Sunday, April 19, 2015

Streaming Aggregation

The last several blogs dealt primarily with streaming transformations. There's still a lot more to say about streaming transformations, but it's time to start exploring streaming aggregation.

Streaming aggregation deals with computing analytics from TupleStreams. Solrj.io is designed to support two models of streaming aggregation:
  • Metrics: Metric aggregations gather analytics on TupleStreams as Tuples are read without transforming the stream. When all the Tuples have been read from the stream the aggregates are placed on the final EOF Tuple. This technique is likely the preferred approach for aggregations of low to moderate cardinality.  
  • Roll-ups: Roll-up aggregations transform the TupleStream by rolling up aggregates and emitting a stream of Tuples that contain the aggregates. This is likely the preferred approach for aggregating high-cardinality fields and for time-series aggregation.
This blog deals specifically with Metric aggregation. Future blogs will demonstrate Roll-up aggregations.
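To make the two output shapes concrete, here is a small self-contained sketch that uses plain Java Maps as stand-in Tuples (none of this is the actual solrj.io API, and the fieldA and price fields are invented for illustration): a metric aggregation leaves the data records untouched and attaches its totals to a final end-of-stream marker, while a roll-up replaces the stream with one record per group.

```java
import java.util.*;

public class AggregationModels {

   public static void main(String[] args) {
      // Three stand-in "Tuples" with a grouping field and a numeric field.
      List<Map<String, Object>> tuples = List.of(
         Map.of("fieldA", "x", "price", 10),
         Map.of("fieldA", "x", "price", 20),
         Map.of("fieldA", "y", "price", 5));

      // Metric model: the data records pass through unchanged; the
      // aggregate rides on a final marker record (like the EOF Tuple).
      long count = tuples.size();
      Map<String, Object> eofMarker = Map.of("EOF", true, "count", count);
      System.out.println("metric EOF marker: " + eofMarker);

      // Roll-up model: the stream is transformed into one record per
      // group, each carrying that group's aggregate.
      Map<String, Integer> rollup = new TreeMap<>();
      for (Map<String, Object> t : tuples) {
         rollup.merge((String) t.get("fieldA"),
                      (Integer) t.get("price"),
                      Integer::sum);
      }
      System.out.println("roll-up records:   " + rollup);
   }
}
```

Note how the metric model's cardinality is fixed (one marker record) regardless of stream size, while the roll-up's output grows with the number of distinct groups, which is why roll-ups suit high-cardinality fields.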

Custom TupleStreams

The initial solrj.io release doesn't come with TupleStream implementations that perform streaming aggregation. Future releases will almost certainly contain aggregation support, but for now we'll need to add some custom TupleStreams to demonstrate in-line aggregation.

You'll see that developing custom TupleStreams is straightforward, and that's partly why they were left out of the initial release. Building a specific aggregation is easy; the hard part is designing a general-purpose aggregation library that handles most of the common use cases.

The EOF Tuple

Metric aggregations were designed to work in concert with the EOF Tuple. You'll recall that the EOF Tuple marks the end of the stream. The EOF Tuple does not contain data from the stream, but it is designed to hold Metric aggregations.

Metric Aggregation

Metric aggregation is accomplished by decorating a TupleStream with an aggregating TupleStream that gathers metrics as the Tuples are read. When the aggregating TupleStream reads the EOF Tuple, it places its aggregates onto the EOF Tuple. The EOF Tuple will then contain all the in-line aggregates.

Simple Example

The example below will demonstrate a very simple case of in-line aggregation. This code is deliberately kept short and simplistic to demonstrate the mechanics of in-line aggregation. Future blogs will show how to do more interesting things like combining streaming transformations with streaming aggregation, parallel aggregation and using richer aggregate data structures.

The Code Sample
import org.apache.solr.client.solrj.io.*;
import java.io.IOException;
import java.util.*;

public class StreamingClient {

   public static void main(String[] args) throws IOException {
      String zkHost = args[0];
      String collection = args[1];

      Map<String, String> props = new HashMap<>();
      props.put("q", "*:*");
      props.put("qt", "/export");
      props.put("sort", "fieldA asc");
      props.put("fl", "fieldA,fieldB,fieldC");
      
      CloudSolrStream cloudStream = new CloudSolrStream(zkHost, 
                                                    collection, 
                                                    props);

      //Wrap the CloudSolrStream in a CountStream
      //The CountStream implementation is below

      CountStream countStream = new CountStream(cloudStream);
      
      try {
       
        countStream.open();
        while(true) {
          
          Tuple tuple = countStream.read();
          if(tuple.EOF) {
             long count = tuple.getLong("count");
             System.out.println("Tuple count:"+count);
             break;
          }

          String fieldA = tuple.getString("fieldA");
          String fieldB = tuple.getString("fieldB");
          String fieldC = tuple.getString("fieldC");
          System.out.println(fieldA + ", " + fieldB + ", " + fieldC);
        }
      
      } finally {
       countStream.close();
      }
   }
}

public class CountStream implements TupleStream {

  private TupleStream source;
  private long count;

  public CountStream(TupleStream source) {
     this.source = source;
  }

  public Tuple read() throws IOException {
     Tuple tuple = source.read();
   
     if(tuple.EOF) {
       tuple.put("count", count);
       return tuple;
     }

     ++count;
     return tuple;
  }

  public List<TupleStream> children() {
    List<TupleStream> children = new ArrayList<>();
    children.add(source);
    return children;
  }

  public void setStreamContext(StreamContext context) {
    source.setStreamContext(context);
  }

  public void open() throws IOException {
    source.open();
  }

  public void close() throws IOException {
    source.close();
  }
}

The code above is doing the following things:

  • Creating a CloudSolrStream
  • Wrapping the CloudSolrStream with a CountStream
  • Opening, reading and closing the CountStream.
  • The CountStream class implements the TupleStream interface. Its read() method simply reads a Tuple from the wrapped TupleStream and increments a counter.
  • When the EOF Tuple is encountered, the CountStream places the count onto it and returns it.
  • The StreamingClient reads the Tuples from the CountStream and prints the field values from each Tuple. The CountStream does not change the Tuples emitted by the CloudSolrStream it wraps. When the StreamingClient encounters the EOF Tuple, it reads the count field, which was placed there by the CountStream.
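The decorator pattern above extends naturally to other metrics. The sketch below adds a sum alongside a count, using a tiny stand-in stream interface instead of the real solrj.io classes so it can compile and run in isolation; MiniStream, SumCountStream, and the price field are all invented names for illustration, not part of the library.

```java
import java.io.IOException;
import java.util.*;

public class SumCountDemo {

   // Minimal stand-in for solrj.io's TupleStream, with Maps as Tuples.
   interface MiniStream {
      Map<String, Object> read() throws IOException;
   }

   // Decorator that counts tuples and sums a numeric field, then
   // places both aggregates on the EOF marker, CountStream-style.
   static class SumCountStream implements MiniStream {
      private final MiniStream source;
      private final String field;
      private long count;
      private double sum;

      SumCountStream(MiniStream source, String field) {
         this.source = source;
         this.field = field;
      }

      public Map<String, Object> read() throws IOException {
         Map<String, Object> tuple = source.read();
         if (Boolean.TRUE.equals(tuple.get("EOF"))) {
            tuple.put("count", count);
            tuple.put("sum", sum);
            return tuple;
         }
         ++count;
         sum += ((Number) tuple.get(field)).doubleValue();
         return tuple;
      }
   }

   public static void main(String[] args) throws IOException {
      // An in-memory source stream ending with a mutable EOF marker.
      List<Map<String, Object>> data = new ArrayList<>();
      data.add(new HashMap<>(Map.of("price", 10.0)));
      data.add(new HashMap<>(Map.of("price", 2.5)));
      data.add(new HashMap<>(Map.of("EOF", true)));
      Iterator<Map<String, Object>> it = data.iterator();

      MiniStream stream = new SumCountStream(it::next, "price");
      Map<String, Object> tuple;
      while (!Boolean.TRUE.equals((tuple = stream.read()).get("EOF"))) {
         System.out.println(tuple);
      }
      // The EOF marker now carries both aggregates.
      System.out.println("count=" + tuple.get("count") + " sum=" + tuple.get("sum"));
   }
}
```

A real implementation would implement solrj.io's TupleStream interface exactly as CountStream does above, delegating open(), close(), children() and setStreamContext() to the wrapped stream.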
