The MergeStream
The MergeStream unions two TupleStreams, ordering the Tuples by a Comparator. When used in combination with the ReducerStream, this allows Tuples from different streams to be grouped and operated on as one unit. This provides the building block for a number of relational operations, including intersections, complements and joins.
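The merge step is the classic two-way merge of sorted inputs. The following is a minimal plain-Java sketch of that idea, not Solr's MergeStream code. The class MergeSketch and its merge helper are hypothetical. The sketch works on in-memory iterators rather than TupleStreams, and it assumes both inputs are already sorted by the same Comparator and contain no null elements.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

public class MergeSketch {

    // Hypothetical helper: merges two iterators that are already sorted by comp
    // into one sorted list. Illustrates the MergeStream idea; NOT the Solr implementation.
    // Assumes neither input contains null (null marks "this input is exhausted").
    static <T> List<T> merge(Iterator<T> a, Iterator<T> b, Comparator<T> comp) {
        List<T> out = new ArrayList<>();
        T nextA = a.hasNext() ? a.next() : null;
        T nextB = b.hasNext() ? b.next() : null;
        while (nextA != null || nextB != null) {
            // Take from A when B is exhausted or A's head sorts first (ties go to A here).
            if (nextB == null || (nextA != null && comp.compare(nextA, nextB) <= 0)) {
                out.add(nextA);
                nextA = a.hasNext() ? a.next() : null;
            } else {
                out.add(nextB);
                nextB = b.hasNext() ? b.next() : null;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> a = List.of("apple", "cherry", "plum");
        List<String> b = List.of("banana", "cherry", "fig");
        List<String> merged = merge(a.iterator(), b.iterator(), Comparator.naturalOrder());
        System.out.println(merged);
    }
}
```

Running main prints [apple, banana, cherry, cherry, fig, plum]. Equal keys from both inputs end up adjacent in the merged output, which is what makes grouping them afterwards straightforward. Unlike this sketch, a stream hands back one Tuple per read() call instead of building a list.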
A Simple Example: Computing the Complement
To find the complement of set A in set B, you need to find all the Tuples in set B that are NOT in set A. The example below shows how to compute this complement for streams of any size using the MergeStream and ReducerStream.
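For small data sets that fit in memory, the same result is plain set difference (B minus A). The sketch below is only a reference for what the streaming example should produce. The class ComplementReference is hypothetical, and plain strings stand in for fieldA values. Unlike the streaming version, it loads both sets into memory and collapses duplicate keys.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class ComplementReference {

    // Hypothetical in-memory reference for "keys in B that are NOT in A" (B minus A).
    // Only practical when both sets fit in memory; duplicate keys collapse into one.
    static Set<String> complement(List<String> a, List<String> b) {
        Set<String> result = new TreeSet<>(b); // sorted copy of B's keys
        result.removeAll(a);                   // drop every key that also appears in A
        return result;
    }

    public static void main(String[] args) {
        List<String> a = List.of("apple", "cherry", "plum");
        List<String> b = List.of("banana", "cherry", "fig");
        System.out.println(complement(a, b)); // [banana, fig]
    }
}
```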
```java
import org.apache.solr.client.solrj.io.*;
import java.io.IOException;
import java.util.*;

public class StreamingClient {

    public static void main(String[] args) throws IOException {

        String zkHost = args[0];
        String collectionA = args[1];
        String collectionB = args[2];

        //Query properties for StreamA
        Map propsA = new HashMap();
        propsA.put("q", "*:*");
        propsA.put("qt", "/export");
        propsA.put("sort", "fieldA asc");
        propsA.put("fl", "fieldA");

        //Construct StreamA
        CloudSolrStream streamA = new CloudSolrStream(zkHost, collectionA, propsA);

        //Include the Collection name with each tuple
        streamA.setTrace(true);

        //Query properties for StreamB
        Map propsB = new HashMap();
        propsB.put("q", "*:*");
        propsB.put("qt", "/export");
        propsB.put("sort", "fieldA asc");
        propsB.put("fl", "fieldA");

        //Construct StreamB
        CloudSolrStream streamB = new CloudSolrStream(zkHost, collectionB, propsB);

        //Include the Collection name with each tuple
        streamB.setTrace(true);

        //Union streamA and streamB ordering by fieldA
        Comparator comp = new AscFieldComp("fieldA");
        MergeStream streamM = new MergeStream(streamA, streamB, comp);

        //Wrap a ReducerStream around the MergeStream
        //so Tuples with the same fieldA value are grouped together
        ReducerStream streamR = new ReducerStream(streamM, comp);

        try {
            streamR.open();
            while(true) {
                Tuple tuple = streamR.read();
                if(tuple.EOF) {
                    break;
                }

                List<Map> maps = tuple.getMaps();
                boolean complement = true;
                for(Map map : maps) {
                    //Trace is turned on
                    //so the "_COLLECTION_" field will be set.
                    String col = (String)map.get("_COLLECTION_");

                    //Check to see if there is a Tuple
                    //from collectionA
                    if(col.equals(collectionA)) {
                        complement = false;
                        break;
                    }
                }

                if(complement) {
                    for(Map map : maps) {
                        String fieldA = (String)map.get("fieldA");
                        //print the complement
                        System.out.println("Complement:" + fieldA);
                    }
                }
            }
        } finally {
            streamR.close();
        }
    }
}
```
The example above does the following things:
- Creates two CloudSolrStreams that connect to two different collections.
- CloudSolrStream.setTrace(true) is called on both streams so that each Tuple can be traced back to the collection it came from.
- The query parameters for both streams sort on fieldA asc, so both streams return their Tuples in ascending fieldA order.
- The two CloudSolrStreams are wrapped with a MergeStream. Its Comparator also orders by fieldA ascending, matching the sort of both streams, so the MergeStream unions the two streams while preserving that order.
- The MergeStream is then wrapped with a ReducerStream. Because its Comparator is also on fieldA, it groups adjacent Tuples that share the same fieldA value.
- The Tuples in each group can be retrieved with the Tuple.getMaps() method. A group may contain Tuples from streamA, streamB, or both.
- To compute the complement, all we need to do is look for groups that don't contain a Tuple from streamA.
- The check relies on the "_COLLECTION_" field, which is present because CloudSolrStream.setTrace() was set to true.
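The grouping and filtering done in the ReducerStream loop can be simulated in plain Java without a Solr cluster. In this sketch, ReduceComplementSketch and its complement helper are hypothetical. Each tuple is a Map standing in for the Maps returned by Tuple.getMaps(), the "_COLLECTION_" entries mimic what setTrace(true) adds, and the input list is assumed to be already merged and sorted by fieldA, as the MergeStream output would be.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ReduceComplementSketch {

    // Hypothetical helper (not Solr's API): walks tuples already sorted by keyField,
    // groups adjacent tuples with equal keys (the ReducerStream idea), and returns the
    // keys of every tuple in groups that contain no tuple tagged with collectionA.
    static List<String> complement(List<Map<String, String>> merged, String keyField, String collectionA) {
        List<String> out = new ArrayList<>();
        int i = 0;
        while (i < merged.size()) {
            String key = merged.get(i).get(keyField);
            // Collect the run of adjacent tuples sharing this key.
            List<Map<String, String>> group = new ArrayList<>();
            while (i < merged.size() && merged.get(i).get(keyField).equals(key)) {
                group.add(merged.get(i));
                i++;
            }
            // A group belongs to the complement only if no member came from collectionA.
            boolean fromA = false;
            for (Map<String, String> t : group) {
                if (collectionA.equals(t.get("_COLLECTION_"))) {
                    fromA = true;
                    break;
                }
            }
            if (!fromA) {
                for (Map<String, String> t : group) {
                    out.add(t.get(keyField));
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Merged, sorted, traced tuples (illustrative data only).
        List<Map<String, String>> merged = List.of(
            Map.of("fieldA", "apple", "_COLLECTION_", "collectionA"),
            Map.of("fieldA", "banana", "_COLLECTION_", "collectionB"),
            Map.of("fieldA", "cherry", "_COLLECTION_", "collectionA"),
            Map.of("fieldA", "cherry", "_COLLECTION_", "collectionB"),
            Map.of("fieldA", "fig", "_COLLECTION_", "collectionB"),
            Map.of("fieldA", "fig", "_COLLECTION_", "collectionB"),
            Map.of("fieldA", "plum", "_COLLECTION_", "collectionA"));
        for (String key : complement(merged, "fieldA", "collectionA")) {
            System.out.println("Complement:" + key);
        }
    }
}
```

Running main prints Complement:banana, then Complement:fig twice. The cherry group is skipped because one of its members came from collectionA, and the fig group is printed once per Tuple, just like the loop in the streaming example.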