The MergeStream
The MergeStream unions two TupleStreams, ordering the Tuples based on a Comparator. When used in combination with the ReducerStream, this allows Tuples from different streams to be operated on as one unit. This provides the building block for a number of relational operations, including intersections, complements, and joins.
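The ordered union described above can be pictured as a two-way sorted merge. The sketch below is plain Java over in-memory iterators and does not use the Solr API; the class name SortedMergeSketch and the method merge are hypothetical names chosen for illustration. It assumes both inputs are already sorted by the same Comparator and contain no null elements.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

// Plain-Java illustration of an ordered union; this is NOT Solr's MergeStream.
class SortedMergeSketch {

    // Merges two inputs that are each already sorted by comp.
    // Assumption: neither input contains null, which is used here as the "exhausted" marker.
    static <T> List<T> merge(Iterator<T> a, Iterator<T> b, Comparator<? super T> comp) {
        List<T> out = new ArrayList<>();
        T nextA = a.hasNext() ? a.next() : null;
        T nextB = b.hasNext() ? b.next() : null;
        while (nextA != null || nextB != null) {
            // Take from A when B is exhausted or A's head sorts first (ties go to A).
            if (nextB == null || (nextA != null && comp.compare(nextA, nextB) <= 0)) {
                out.add(nextA);
                nextA = a.hasNext() ? a.next() : null;
            } else {
                out.add(nextB);
                nextB = b.hasNext() ? b.next() : null;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> a = Arrays.asList("a", "c", "e");
        List<String> b = Arrays.asList("b", "c", "d");
        // Prints [a, b, c, c, d, e]
        System.out.println(merge(a.iterator(), b.iterator(), Comparator.<String>naturalOrder()));
    }
}
```

Because equal values from the two inputs end up next to each other in the output, a later step can group them by scanning for runs of equal keys.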
A Simple Example: Computing the Complement
To find the complement of two sets, A and B, you need to find all the Tuples in set B that are NOT in set A. The example below shows how to compute the complement of streams of any size using the MergeStream and ReducerStream.
import org.apache.solr.client.solrj.io.*;
import java.io.IOException;
import java.util.*;

public class StreamingClient {

    public static void main(String[] args) throws IOException {
        String zkHost = args[0];
        String collectionA = args[1];
        String collectionB = args[2];

        //Query properties for StreamA
        Map propsA = new HashMap();
        propsA.put("q", "*:*");
        propsA.put("qt", "/export");
        propsA.put("sort", "fieldA asc");
        propsA.put("fl", "fieldA");

        //Construct StreamA
        CloudSolrStream streamA = new CloudSolrStream(zkHost,
                                                      collectionA,
                                                      propsA);
        //Include the Collection name with each tuple
        streamA.setTrace(true);

        //Query properties for StreamB
        Map propsB = new HashMap();
        propsB.put("q", "*:*");
        propsB.put("qt", "/export");
        propsB.put("sort", "fieldA asc");
        propsB.put("fl", "fieldA");

        //Construct StreamB
        CloudSolrStream streamB = new CloudSolrStream(zkHost,
                                                      collectionB,
                                                      propsB);
        //Include the Collection name with each tuple
        streamB.setTrace(true);

        //Union streamA and streamB ordering by fieldA
        Comparator comp = new AscFieldComp("fieldA");
        MergeStream streamM = new MergeStream(streamA, streamB, comp);

        //Wrap a ReducerStream around the MergeStream
        ReducerStream streamR = new ReducerStream(streamM, comp);

        try {
            streamR.open();
            while(true) {
                Tuple tuple = streamR.read();
                if(tuple.EOF) {
                    break;
                }

                List<Map> maps = tuple.getMaps();
                boolean complement = true;
                for(Map map : maps) {
                    //Trace is turned on
                    //so the "_COLLECTION_" field will be set.
                    String col = (String)map.get("_COLLECTION_");
                    //Check to see if there is a Tuple
                    //from collectionA
                    if(col.equals(collectionA)) {
                        complement = false;
                        break;
                    }
                }

                if(complement) {
                    for(Map map : maps) {
                        String fieldA = (String)map.get("fieldA");
                        //print the complement
                        System.out.println("Complement:"+ fieldA);
                    }
                }
            }
        } finally {
            streamR.close();
        }
    }
}
The example above does the following things:
- Creates two CloudSolrStreams that connect to two different collections.
- Notice that CloudSolrStream.setTrace() is called so that the Tuples can be traced back to their collection.
- The query parameters for both streams sort on fieldA asc, so both streams return their Tuples in ascending order of fieldA.
- The two CloudSolrStreams are wrapped with a MergeStream. Note the Comparator, which also orders by fieldA ascending. This unions the two streams, ordering Tuples by fieldA.
- The MergeStream is then wrapped with a ReducerStream. Note again that the Comparator is on fieldA, so it groups Tuples that have the same value of fieldA.
- The Tuples that have been grouped together can be retrieved with the Tuple.getMaps() method. A Tuple group may contain Tuples from both streamA and streamB.
- To compute the complement, all we need to do is look for Tuple groups that don't have a Tuple from streamA.
- Notice the "_COLLECTION_" field, which is present because CloudSolrStream.setTrace(true) was called.
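The grouping and filtering steps listed above can be tried without a Solr cluster. The sketch below is plain Java over an in-memory list and does not use the Solr API; ComplementSketch, Row, and complement are hypothetical names, and Row stands in for a traced Tuple carrying fieldA and its source collection. It assumes the input is already sorted by fieldA, as the merged stream would be.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration of grouping a merged, sorted stream and keeping
// only groups with no member from collection A. This is NOT the Solr API.
class ComplementSketch {

    // Hypothetical stand-in for a traced Tuple: a sort key plus its source collection.
    static final class Row {
        final String fieldA;
        final String collection;

        Row(String fieldA, String collection) {
            this.fieldA = fieldA;
            this.collection = collection;
        }
    }

    // Input must already be sorted by fieldA so that equal keys are adjacent.
    static List<String> complement(List<Row> merged, String collectionA) {
        List<String> out = new ArrayList<>();
        int i = 0;
        while (i < merged.size()) {
            // Scan the run of rows sharing the same fieldA (one group).
            int j = i;
            boolean fromA = false;
            while (j < merged.size() && merged.get(j).fieldA.equals(merged.get(i).fieldA)) {
                if (merged.get(j).collection.equals(collectionA)) {
                    fromA = true;
                }
                j++;
            }
            // Keep the group only if no row in it came from collection A.
            if (!fromA) {
                for (int k = i; k < j; k++) {
                    out.add(merged.get(k).fieldA);
                }
            }
            i = j;
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> merged = Arrays.asList(
            new Row("1", "A"), new Row("1", "B"),
            new Row("2", "B"),
            new Row("3", "A"),
            new Row("4", "B"), new Row("4", "B"));
        // Prints [2, 4, 4]
        System.out.println(complement(merged, "A"));
    }
}
```

Reversing the test, so that a group is kept only when it contains rows from both collections, would give an intersection using the same grouping.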