Sunday, April 19, 2015

Parallel Streaming Transformations

Prior blogs have covered the basics of solrj.io. In this blog we begin to learn how to harness the parallel computing capabilities of solrj.io.

This blog introduces the ParallelStream, which sends a TupleStream to worker nodes where it can be processed in parallel. Both streaming transformations and streaming aggregations can be parallelized using the ParallelStream. This blog focuses on an example of a parallel streaming transformation. Future blogs will provide examples of parallel streaming aggregation.

Worker Collections

Parallel streaming operations are handled by Solr nodes within a worker collection. Worker collections are SolrCloud collections that have been configured to handle streaming operations. The only requirement for worker collections is that the Solr nodes in the collection must be configured with a StreamHandler to handle the streaming requests.

Worker collections can hold an index with data or they can be empty collections that only exist to handle streaming operations.

Stream Partitioning

Each worker node is shuffled 1/Nth of the search results. So if there are 7 worker nodes specified, each node will be shuffled 1/7th of the search results. The partitioning is set up under the covers by the StreamHandler on the worker nodes.

Search results are hash partitioned by keys. This ensures that all results with the same key(s) are sent to the same worker node. When coupled with the ability to sort by keys, hash partitioning provides shuffling functionality that allows many streaming operations to be parallelized.
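
To make the idea concrete, here is a minimal sketch of hash partitioning in plain Java. It is not Solr code: the class HashPartitionSketch and its methods are hypothetical names made up for illustration, and the use of String.hashCode() is an assumption, not necessarily the hash function Solr applies. The point it shows is that equal keys always land on the same worker, and that a sorted input stays sorted within each partition.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; not part of solrj.io.
class HashPartitionSketch {

    // Maps a key to a worker index in the range [0, numWorkers).
    static int workerFor(String key, int numWorkers) {
        // floorMod keeps the index non-negative even when hashCode() is negative.
        return Math.floorMod(key.hashCode(), numWorkers);
    }

    // Splits a list of keys into one list per worker, preserving input order.
    static List<List<String>> partition(List<String> sortedKeys, int numWorkers) {
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < numWorkers; i++) {
            partitions.add(new ArrayList<>());
        }
        for (String key : sortedKeys) {
            partitions.get(workerFor(key, numWorkers)).add(key);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "a", "b", "c", "c", "d");
        List<List<String>> parts = partition(keys, 3);
        for (int i = 0; i < parts.size(); i++) {
            System.out.println("worker " + i + ": " + parts.get(i));
        }
    }
}
```

Because both copies of a duplicate key go to the same worker, each worker can de-duplicate or aggregate its own slice without talking to the others.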

ParallelStream

A TupleStream decorator called the ParallelStream wraps a TupleStream and sends it to N worker nodes to be operated on in parallel.

The ParallelStream serializes the byte code of the underlying TupleStream and sends it to the workers before the TupleStream is opened. The workers deserialize the TupleStream, open the stream, read the Tuples and stream them back to the ParallelStream.
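
The round trip described above can be sketched with standard Java object serialization. This is an assumption made for illustration: the post does not spell out the wire format, and the class SerializeSketch, the StreamSpec stand-in and the Base64 encoding are all hypothetical choices, not the ParallelStream's actual implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Base64;

// Hypothetical illustration only; not part of solrj.io.
class SerializeSketch {

    // Stand-in for a stream definition that has not been opened yet.
    static class StreamSpec implements Serializable {
        private static final long serialVersionUID = 1L;
        final String collection;
        final String sortField;

        StreamSpec(String collection, String sortField) {
            this.collection = collection;
            this.sortField = sortField;
        }
    }

    // Client side: turn the object into text that fits in a request parameter.
    static String encode(Serializable obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return Base64.getEncoder().encodeToString(bytes.toByteArray());
    }

    // Worker side: rebuild the object before opening and reading it.
    static Object decode(String encoded) {
        byte[] raw = Base64.getDecoder().decode(encoded);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String wire = encode(new StreamSpec("collection1", "fieldA"));
        StreamSpec copy = (StreamSpec) decode(wire);
        System.out.println(copy.collection + " sorted by " + copy.sortField);
    }
}
```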

The TupleStream sent to the workers can be programmed to stream only aggregated results or the top N results back to the ParallelStream.

UniqueStream

The example below also introduces a TupleStream decorator called the UniqueStream. The UniqueStream emits a unique stream of Tuples. Tuples are de-duplicated based on a Comparator.

The example will show how the UniqueStream can be used with the ParallelStream to perform the unique operation in parallel.
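
Before looking at the Solr example, it may help to see why a unique operation over a sorted stream is cheap. The sketch below is plain Java, not the UniqueStream source; SortedUniqueSketch is a hypothetical name, and it assumes the input is already sorted on the same field the Comparator compares. Under that assumption, duplicates are always adjacent, so only the previously emitted item has to be remembered.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical illustration only; not part of solrj.io.
class SortedUniqueSketch {

    // Emits the first item of every run of items that compare as equal.
    // Correct only when the input is sorted consistently with comp.
    static <T> List<T> unique(List<T> sorted, Comparator<T> comp) {
        List<T> out = new ArrayList<>();
        T previous = null;
        boolean first = true;
        for (T current : sorted) {
            if (first || comp.compare(previous, current) != 0) {
                out.add(current);
                previous = current;
                first = false;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> values = Arrays.asList("a", "a", "b", "c", "c");
        System.out.println(unique(values, Comparator.<String>naturalOrder()));
    }
}
```

Memory use stays constant no matter how long the stream is, which is what makes this approach suitable for streaming.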

Simple Parallel Streaming Transformation Example

import org.apache.solr.client.solrj.io.*;
import java.io.IOException;
import java.util.*;

public class StreamingClient {

   public static void main(String args[]) throws IOException {
      String zkHost = args[0];
      String collection = args[1];
      String workerCollection = args[2];

      Map props = new HashMap();
      props.put("q", "*:*");
      props.put("qt", "/export");
      props.put("sort", "fieldA asc");
      props.put("fl", "fieldA");

      //Set the partition keys
      props.put("partitionKeys", "fieldA");
      
      CloudSolrStream streamC = new CloudSolrStream(zkHost, 
                                                    collection, 
                                                    props);
      Comparator comp = new AscFieldComp("fieldA");
      UniqueStream streamU = new UniqueStream(streamC, comp);
       
      ParallelStream streamP = new ParallelStream(zkHost,
                                                  workerCollection,
                                                  streamU,
                                                  7,
                                                  comp);
                             
      try {
       
        streamP.open();
        while(true) {
          
          Tuple tuple = streamP.read();
          if(tuple.EOF) {
             break;
          }

          String uniqueValue = tuple.getString("fieldA");
          //Print all the unique values
          System.out.println(uniqueValue);
        }
      } finally {
        streamP.close();
      }
   }
}

The example above does the following things:
  1. Creates a CloudSolrStream that connects to a SolrCloud collection.
  2. In the query parameters the sort is set to fieldA asc. This will return the Tuples in ascending order based on fieldA.
  3. The query parameters also set the partitionKeys to fieldA. This will hash partition the search results on fieldA.
  4. The CloudSolrStream is then decorated with a UniqueStream. Note the Comparator which is used by the UniqueStream. In this case the AscFieldComp will cause the UniqueStream to emit unique Tuples based on the value in fieldA. The sorting and partitioning of search results on fieldA allows the UniqueStream's operation to be run in parallel on the worker nodes.
  5. The UniqueStream is decorated with a ParallelStream. Note that the ParallelStream is constructed with the following parameters:
    • zkHost
    • The name of the workerCollection
    • The TupleStream that is being sent to the workers
    • The number of workers
    • A Comparator to order the results coming back from the workers
  6. Then the ParallelStream is opened, iterated and closed.
  7. Under the covers the ParallelStream serializes the UniqueStream and sends it to 7 worker nodes. The workers open the UniqueStream, read the unique Tuples and stream them back to the ParallelStream.
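
The Comparator passed to the ParallelStream is there to order the results coming back from the workers. One way to picture that step is a k-way merge of already sorted lists, sketched below in plain Java. This is an illustration under assumptions, not the ParallelStream source: MergeSketch and Head are hypothetical names, and the priority queue is simply one common way to do such a merge.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical illustration only; not part of solrj.io.
class MergeSketch {

    // Pairs the next unread value of one worker with the rest of its results.
    static final class Head<T> {
        final T value;
        final Iterator<T> rest;

        Head(T value, Iterator<T> rest) {
            this.value = value;
            this.rest = rest;
        }
    }

    // Merges per-worker results, each already sorted by comp, into one sorted list.
    static <T> List<T> merge(List<List<T>> workerResults, Comparator<T> comp) {
        Comparator<Head<T>> byValue = (x, y) -> comp.compare(x.value, y.value);
        PriorityQueue<Head<T>> queue = new PriorityQueue<>(byValue);
        for (List<T> results : workerResults) {
            Iterator<T> it = results.iterator();
            if (it.hasNext()) {
                queue.add(new Head<>(it.next(), it));
            }
        }
        List<T> merged = new ArrayList<>();
        while (!queue.isEmpty()) {
            // The smallest head across all workers is the next value overall.
            Head<T> head = queue.poll();
            merged.add(head.value);
            if (head.rest.hasNext()) {
                queue.add(new Head<>(head.rest.next(), head.rest));
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<List<String>> workers = Arrays.asList(
            Arrays.asList("a", "d"),
            Arrays.asList("b"),
            Arrays.asList("c", "e"));
        System.out.println(merge(workers, Comparator.<String>naturalOrder()));
    }
}
```

Since hash partitioning sends each key to exactly one worker, the merged output of the unique example contains no duplicates across workers.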
