Friday, April 17, 2015

Computing the Complement of Two TupleStreams

This is the third blog in the series. In the last blog, the concept of TupleStream Decorators was introduced and a simple example was shown using the ReducerStream. In this blog we'll introduce a stream Decorator called the MergeStream and see how it can be used with the ReducerStream to calculate the Complement between two streams.

The MergeStream

The MergeStream unions two TupleStreams, ordering the Tuples based on a Comparator. When used in combination with the ReducerStream, this allows Tuples from different streams to be operated on as one unit. This provides the building block for a number of relational operations, including intersections, complements and joins.
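
To make the merge step concrete, here is a minimal plain-Java sketch of a two-way ordered merge. It is not the Solr implementation: the class name MergeSketch is hypothetical, and in-memory lists stand in for TupleStreams. It only illustrates why both inputs must already be sorted by the same Comparator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical illustration only: plain lists stand in for TupleStreams.
class MergeSketch {

    // Merges two lists, each already sorted by comp, into one sorted list.
    static <T> List<T> merge(List<T> a, List<T> b, Comparator<T> comp) {
        List<T> out = new ArrayList<>();
        int i = 0, j = 0;
        while (i < a.size() && j < b.size()) {
            // Take the smaller head element; on a tie, take from a first.
            if (comp.compare(a.get(i), b.get(j)) <= 0) {
                out.add(a.get(i++));
            } else {
                out.add(b.get(j++));
            }
        }
        // One input is exhausted; copy whatever remains of the other.
        while (i < a.size()) out.add(a.get(i++));
        while (j < b.size()) out.add(b.get(j++));
        return out;
    }

    public static void main(String[] args) {
        List<String> merged = merge(Arrays.asList("a", "c", "d"),
                                    Arrays.asList("b", "c", "e"),
                                    Comparator.naturalOrder());
        System.out.println(merged); // prints [a, b, c, c, d, e]
    }
}
```

Because equal values end up next to each other in the merged output, a later grouping step can treat them as one unit.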

A Simple Example: Computing the Complement

To find the complement of two sets, A and B, you need to find all the Tuples in set B that are NOT in set A. The example below will show how easy it is to compute the complement of streams of any size using the MergeStream and ReducerStream.
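
For inputs small enough to fit in memory, the definition reads directly as a set difference. The sketch below uses only java.util collections, and the class name SetComplementSketch is hypothetical. It is included to pin down the direction of the operation (B minus A), not as the streaming approach this blog describes.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical illustration of the definition, using in-memory sets.
class SetComplementSketch {

    // Returns the elements of b that are not in a.
    static Set<String> complement(Set<String> a, Set<String> b) {
        Set<String> result = new TreeSet<>(b); // copy so b is left untouched
        result.removeAll(a);
        return result;
    }

    public static void main(String[] args) {
        Set<String> a = new TreeSet<>(Arrays.asList("x", "y"));
        Set<String> b = new TreeSet<>(Arrays.asList("w", "x", "z"));
        System.out.println(complement(a, b)); // prints [w, z]
    }
}
```

This approach holds both sets in memory, which is the limitation the streaming version avoids.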

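import java.io.IOException;
import java.util.*;

//Streaming API classes (package location as of Solr 5.1)
import org.apache.solr.client.solrj.io.*;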

public class StreamingClient {

   public static void main(String args[]) throws IOException {
      String zkHost = args[0];
      String collectionA = args[1];
      String collectionB = args[2];

      //Query properties for StreamA

      Map propsA = new HashMap();
      propsA.put("q", "*:*");
      propsA.put("qt", "/export");
      propsA.put("sort", "fieldA asc");
      propsA.put("fl", "fieldA");
      //Construct StreamA

      CloudSolrStream streamA = new CloudSolrStream(zkHost,
                                                    collectionA,
                                                    propsA);

      //Include the Collection name with each tuple

      streamA.setTrace(true);

      //Query properties for StreamB

      Map propsB = new HashMap();
      propsB.put("q", "*:*");
      propsB.put("qt", "/export");
      propsB.put("sort", "fieldA asc");
      propsB.put("fl", "fieldA");
      //Construct StreamB

      CloudSolrStream streamB = new CloudSolrStream(zkHost,
                                                    collectionB,
                                                    propsB);

      //Include the Collection name with each tuple

      streamB.setTrace(true);

      //Union streamA and streamB ordering by fieldA

      Comparator comp = new AscFieldComp("fieldA");  
      MergeStream streamM = new MergeStream(streamA, streamB, comp);

      //Wrap a ReducerStream around the MergeStream

      ReducerStream streamR = new ReducerStream(streamM, comp);

      try {
        streamR.open();
        while(true) {
          Tuple tuple = streamR.read();
          if(tuple.EOF) {
            break;
          }

          List<Map> maps = tuple.getMaps();
          boolean complement = true;
          for(Map map : maps) {

             //Trace is turned on
             //so the "_COLLECTION_" field will be set.

             String col = (String)map.get("_COLLECTION_");

             //Check to see if there is a Tuple
             //from collectionA

             if(col.equals(collectionA)) {
                complement = false;
                break;
             }
          }

          if(complement) {
            for(Map map : maps) {
               String fieldA = (String)map.get("fieldA");
               //print the complement
               System.out.println("Complement:"+ fieldA);
            }
          }
        }
      } finally {
        streamR.close();
      }
   }
}

The example above does the following things:
  1. Creates two CloudSolrStreams that connect to two different collections.
  2. Calls CloudSolrStream.setTrace() on each stream so that the Tuples can be traced back to their collection.
  3. The query parameters for both streams sort on fieldA asc, so both streams return their Tuples in ascending order by fieldA.
  4. The two CloudSolrStreams are wrapped with a MergeStream. Note the Comparator, which also orders by fieldA ascending. This unions the two streams, ordering Tuples by fieldA.
  5. The MergeStream is then wrapped with a ReducerStream. Note again that the Comparator is on fieldA, so it groups Tuples that have the same fieldA value.
  6. The Tuples that have been grouped together can be retrieved with the Tuple.getMaps() method. A Tuple group may contain Tuples from streamA, streamB, or both.
  7. To compute the Complement, all we need to do is look for Tuple groups that don't have a Tuple from streamA.
  8. The "_COLLECTION_" field used for this check is present because CloudSolrStream.setTrace() was set to true.
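
The steps above can be tried without a Solr cluster. The following is a minimal sketch under stated assumptions: sorted in-memory lists stand in for the two collections, a String[] {source, value} pair stands in for a traced Tuple, and the class name ComplementSketch is hypothetical. It mirrors the merge, group and filter logic, not the Solr classes themselves.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only: in-memory sorted lists stand in for the
// two Solr collections, and a String[]{source, value} stands in for a Tuple.
class ComplementSketch {

    // Returns the values of sortedB that do not appear in sortedA.
    // Both inputs must be sorted ascending, like the "fieldA asc" streams.
    static List<String> complement(List<String> sortedA, List<String> sortedB) {
        // Step 1: merge, tagging each value with its source (the "trace").
        List<String[]> merged = new ArrayList<>();
        int i = 0, j = 0;
        while (i < sortedA.size() || j < sortedB.size()) {
            boolean takeA = j >= sortedB.size()
                || (i < sortedA.size()
                    && sortedA.get(i).compareTo(sortedB.get(j)) <= 0);
            merged.add(takeA ? new String[] {"A", sortedA.get(i++)}
                             : new String[] {"B", sortedB.get(j++)});
        }

        // Step 2: walk the merged list one group of equal values at a time.
        List<String> result = new ArrayList<>();
        int start = 0;
        while (start < merged.size()) {
            String key = merged.get(start)[1];
            int end = start;
            boolean sawA = false;
            while (end < merged.size() && merged.get(end)[1].equals(key)) {
                sawA |= merged.get(end)[0].equals("A");
                end++;
            }
            // Step 3: a group with no member from A belongs to the complement.
            if (!sawA) {
                for (int k = start; k < end; k++) {
                    result.add(merged.get(k)[1]);
                }
            }
            start = end;
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> a = Arrays.asList("1", "3", "5");
        List<String> b = Arrays.asList("2", "3", "4", "5", "6");
        System.out.println(complement(a, b)); // prints [2, 4, 6]
    }
}
```

Unlike this sketch, which materializes the merged list for readability, the streaming version only ever holds one group of Tuples at a time, which is why it scales to streams of any size.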
