Wednesday, December 16, 2015

Understanding Solr's AnalyticsQuery

Back in Solr 4.9 an AnalyticsQuery API was added that allows developers to plug custom analytic logic into Solr. The AnalyticsQuery class provides a clean and simple API that gives developers access to all the rich functionality in Lucene and is strategically placed within Solr’s distributed search architecture. Using this API you can harness all the power of Lucene and Solr to write custom analytic logic.

Let’s dive in!

The AnalyticsQuery API is a natural extension of the PostFilter API. So let’s start with PostFilters and then we’ll move quickly to the AnalyticsQuery API.

PostFilters

Lucene has the concept of a Collector. A Collector collects the results that match a search.

The PostFilter API wraps a DelegatingCollector around a ranking collector to filter results. The DelegatingCollector is passed each search result, applies filtering logic to the result and then “delegates” the result to the ranking Collector, if it chooses.

DelegatingCollectors can wrap other DelegatingCollectors to form a pipeline. Each Delegating collector looks at the search result and delegates to the next collector. The order that DelegatingCollectors will be executed is controlled by a “cost” parameter that is built into Solr’s local parameter query syntax.

This was put in so that costly PostFilters could be run after less costly PostFilters, thus reducing the number of documents seen by the more costly PostFilter.

DelegatingCollectors also have a finish() method that notifies the collector that all documents have been collected.

The AnalyticsQuery API

The PostFilter API is very strategically positioned because it sees every search result. The AnalyticsQuery API capitilizes on that strategic positioning by adding a few key features.
  • It provides easy access to the ResponseBuilder object, so analytics data can be easily output and piped between AnalyticsQueries. 
  • It provides a way to insert a custom MergeStrategy, so that analytic output from the shards can be merged. 

Extending the AnalyticsQuery class

To hook into the AnalyticsQuery API you need to extend the AnalyticsQuery class. This class has two constructors:

public AnalyticsQuery();

Use the above constructor if you’re doing analytics on a single Solr server.

public AnalyticsQuery(MergeStrategy mergeStrategy);

Use the above constructor if you’re doing distributed analytics. If you use this constructor you’ll need to also write a MergeStrategy implementation. In my next blog I’ll cover the MergeStrategy API, but for now it’s enough to know that it exists, and what it’s used for.

Then you have to implement the hook method:

public abstract DelegatingCollector getAnalyticsCollector(ResponseBuilder rb, IndexSearcher searcher); 

This is where you return your DelegatingCollector implementation that will collect the analytics. Notice that you are passed the current IndexSearcher and ResponseBuilder. You can access both the request context and the Solr response objects through the response builder.

Through the request context you can pipeline the result of one AnalyticsQuery to another AnalyticsQuery, controlling the order of execution using the “cost” parameter. You can place your analytic output directly into the Solr response through the response object in the ResponseBuilder.

Below is a very simple class that extends AnalyticsQuery

 public class MyAnalyticsQuery extends AnalyticsQuery {

    public MyAnalyticsQuery(MergeStrategy mergeStrategy)
    {
      super(mergeStrategy);
    }

    public DelegatingCollector getAnalyticsCollector(ResponseBuilder rb, IndexSearcher searcher)
    {
      return new MyAnalyticsCollector(rb);
    }
  }

And here is a very simple implementation of a DelegatingCollector (MyAnalyticsCollector):

 class MyAnalyticsCollector extends DelegatingCollector {
    ResponseBuilder rb;
    int count;

    public MyAnalyticsCollector(ResponseBuilder rb) {
      this.rb = rb;
    }

    public void collect(int doc) throws IOException {
      ++count;
      leafDelegate.collect(doc);
    }

    public void finish() throws IOException {
      NamedList analytics = new NamedList();
      rb.rsp.add("analytics", analytics);
      analytics.add("mycount", count);
      if(this.delegate instanceof DelegatingCollector) {
        ((DelegatingCollector)this.delegate).finish();
      }
    }
  }
Notice that in the collect(int doc) method, it is incrementing a counter and then calling the collect method on it’s delegate. The collect method is called for each document in the result set. In the finish() method the count is placed into the output response. See the full API for the DelegatingCollector class to understand how it works.

Plugging In Your Code

Just like PostFilters, an AnalyticsQuery is passed in through a custom QParserPlugin as a filter query. Here is an example AnalyticsQuery being passed in through the “fq” parameter:

q=*:*&fq={!myanalytic param1=a param2=b cost=101}&fq={!myanalytic2 param1=a param2=b cost=102}

In the local parameter syntax above there are two filter queries pointing to custom QParserPlugins. The local parameter syntax “!myanalytic” tells Solr to load the “myanalytic” QParserPlugin, which will return a Query that extends AnalyticsQuery.

Notice the “cost” parameters which will control the order of execution. In this case the “myanalytic” AnalyticsQuery will execute ahead of the “myanalytic2″ AnalyticsQuery.

Interaction with the CollapsingQParserPlugin

The CollapsingQParserPlugin is a PostFilter that performs field collapsing. Because it’s a PostFilter it can be layered in among AnalyticsQueries using the “cost” parameter to control order of execution. So, you can conveniently collect analytics before and after field collapsing.

Sunday, April 19, 2015

Parallel Streaming Transformations

Prior blogs have covered the basics of solrj.io. In this blog we begin to learn how to harness the parallel computing capabilities of solrj.io.

This blog introduces the ParallelStream which sends a TupleStream to worker nodes where it can be processed in parallel. Both streaming transformations and streaming aggregations can be parallelized using the ParallelStream. This blog focuses on an example of a parallel streaming transformation. Future blogs will provide examples of parallel streaming aggregation.

Worker Collections

Parallel streaming operations are handled by Solr nodes within a worker collection. Worker collections are SolrCloud collections that have been configured to handle streaming operations. The only requirement for worker collections is that the Solr nodes in the collection must be configured with a StreamHandler to handle the streaming requests.

Worker collections can hold an index with data or they can be empty collections that only exist to handle streaming operations.

Stream Partitioning

Each worker node is shuffled 1/Nth of the search results. So if there are 7 worker nodes specified each node will be shuffled 1/7th of the search results. The partitioning is setup under the covers by the StreamHandler on the worker nodes.

Search results are hash partitioned by keys. This ensures that all results with the same key(s) are sent to the same worker node. When coupled with the ability to sort by keys, hash partitioning provides shuffling functionality that allows many streaming operations to be parallelized.

ParallelStream

A TupleStream decorator called the ParallelStream wraps a TupleStream and sends it to N worker nodes to be operated on in parallel.

The ParallelStream serializes the byte code of the underlying TupleStream and sends it to the workers before the TupleStream is opened. The workers deserialize the TupleStream, open the stream, read the Tuples and stream them back to the ParallelStream.

The TupleStream sent to the workers can be programmed to stream only aggregated results or the top N results back to the ParallelStream.

UniqueStream

The example below also introduces a TupleStream decorator called the UniqueStream. The UniqueStream emits a unique stream of Tuples. Tuples are de-duplicated based on a Comparator.

The example will show how the UniqueStream can be used with the ParallelStream to perform the unique operation in parallel.

Simple Parallel Streaming Transformation Example
org.apache.solr.client.solrj.io.*;
import java.util.*;

public class StreamingClient {

   public static void main(String args[]) throws IOException {
      String zkHost = args[0];
      String collection = args[1];
      String workerCollection = args[2];

      Map props = new HashMap();
      props.put("q", "*:*");
      props.put("qt", "/export");
      props.put("sort", "fieldA asc");
      props.put("fl", "fieldA");

      //Set the partition keys
      props.put("partitionKeys", "fieldA");
      
      CloudSolrStream streamC = new CloudSolrStream(zkHost, 
                                                    collection, 
                                                    props);
      Comparator comp = new AscFieldComp("fieldA");
      UniqueStream streamU = new UniqueStream(streamC, comp);
       
      ParallelStream streamP = new ParallelStream(zkHost,
                                                  workerCollection,
                                                  streamU,
                                                  7,
                                                  comp);
                             
      try {
       
        streamP.open();
        while(true) {
          
          Tuple tuple = streamP.read();
          if(tuple.EOF) {
             break;
          }

          String uniqueValue = tuple.getString("fieldA");
          //Print all the unique values
          System.out.println(uniqueValue);

      } finally {
        streamP.close();
      }
   }
}
The example above does the following things:
  1. Creates a CloudSolrStream that connects to a SolrCloud collection.
  2. In the query parameters the sort is set to fieldA asc. This will return the Tuples in ascending order based on fieldA.
  3. The query parameters also set the partitionKeys to fieldA. This will hash partition the search results on fieldA.
  4. The CloudSolrStream is then decorated with a UniqueStream. Note the Comparator which is used by the UniqueStream. In this case the AscFieldComp will cause the UniqueStream to emit unique Tuples based on the value in fieldA. The sorting and partitioning of search results on fieldA allows the UniqueStreams operation to be run in parallel on the worker nodes.
  5. The UniqueStream is decorated with a ParallelStream. Note that the ParallelStream is constructed with the following parameters:
    • zkHost
    • The name of the workerCollection
    • The TupleStream that is being sent to the workers
    • The number of workers
    • A Comparator to order the results coming back from the workers
  6. Then the ParalleStream is opened, iterated and closed.
  7. Under the covers the ParallelStream serializes the UniqueStream and sends it to 7 worker nodes. The workers open the UniqueStream, read the unique Tuples and stream them back to the ParallelStream.

Streaming Aggregation

The last several blogs dealt primarily with streaming transformations. There's still a lot more to talk about with streaming transformation but it's time to start exploring streaming aggregation.

Streaming aggregation deals with computing analytics from TupleStreams. Solrj.io is designed to support two models of streaming aggregation:
  • Metrics: Metric aggregations gather analytics on TupleStreams as Tuples are read without transforming the stream. When all the Tuples have been read from the stream the aggregates are placed on the final EOF Tuple. This technique is likely the preferred approach for aggregations of low to moderate cardinality.  
  • Roll-ups: Roll-up aggregations transform the TupleStream by rolling up aggregates and emitting a stream of Tuples that contain the aggregates. This is likely the preferred approach for handling aggregation of high cardinality fields and time series aggregation
This blog deals specifically with Metric aggregation. Future blogs will demonstrate Roll-up aggregations.

Custom TupleStreams

The initial solrj.io release doesn't come with TupleStream implementations that perform streaming aggregation. Future releases will almost certainly contain aggregation support, but for now we'll need to add some custom TupleStreams to demonstrate in-line aggregation.

You'll see that developing custom TupleStreams is extremely easy, and that's partially why they were left out of the initial release. Building specific aggregations is very easy. The hard part is designing a general purpose aggregation library that handles most of the general use cases.

The EOF Tuple

Metric aggregations were designed to work in concert with the EOF Tuple. You'll recall that the EOF Tuple marks the end of the stream. The EOF Tuple does not contain data from the stream, but it is designed to hold Metric aggregations.

Metric Aggregation

Metric aggregation is accomplished by decorating a TupleStream with an aggregating TupleStream that gathers metrics as the Tuples are read. When the aggregating TupleStream reads the EOF Tuple, it places it's aggregates onto the EOF Tuple. The EOF Tuple will then contain all the in-line aggregates.

Simple Example

The example below will demonstrate a very simple case of in-line aggregation. This code is deliberately kept short and simplistic to demonstrate the mechanics of in-line aggregation. Future blogs will show how to do more interesting things like combining streaming transformations with streaming aggregation, parallel aggregation and using richer aggregate data structures.

The Code Sample
import org.apache.solr.client.solrj.io.*;
import java.util.*;

public class StreamingClient {

   public static void main(String args[]) throws IOException {
      String zkHost = args[0];
      String collection = args[1];

      Map props = new HashMap();
      props.put("q", "*:*");
      props.put("qt", "/export");
      props.put("sort", "fieldA asc");
      props.put("fl", "fieldA,fieldB,fieldC");
      
      CloudSolrStream cloudstream = new CloudSolrStream(zkHost, 
                                                    collection, 
                                                    props);

      //Wrap the CloudSolrStream in a CountStream
      //The CountStream implementation is below

      CountStream countStream = new CountStream(cloudStream);
      
      try {
       
        countStream.open();
        while(true) {
          
          Tuple tuple = countStream.read();
          if(tuple.EOF) {
             long count = tuple.getLong("count");
             System.out.println("Tuple count:"+count);
             break;
          }

          String fieldA = tuple.getString("fieldA");
          String fieldB = tuple.getString("fieldB");
          String fieldC = tuple.getString("fieldC");
          System.out.println(fieldA + ", " + fieldB + ", " + fieldC);
        }
      
      } finally {
       countStream.close();
      }
   }
}

public class CountStream implements TupleStream {

  private TupleStream source;
  private int count;

  public CountStream(TupleStream source) {
     this.source = source;
  }

  public Tuple read() throws IOException {
     Tuple tuple = source.read();
   
     if(tuple.EOF) {
       tuple.put("count", count);
       return tuple;
     }

     ++count;
     return tuple;
  }

  public List children() {
    List children = new ArrayList();
    children.add(source);
    return children;
  }

  public void setStreamContext(StreamContext context) {
    source.setStreamContext(context);
  }

  public void open() throws IOException {
    source.open();
  }

  public void close() throws IOException {
    source.close();
  }
}

The code above is doing the following things:

  • Creating a CloudSolrStream
  • Wrapping the CloudSolrStream with a CountStream
  • Opening, reading and closing the CountStream.
  • The CountStream class implements the TupleStream interface. The read() method simply reads a Tuple from the TupleStream it has wrapped and increments a counter.
  • When the EOF Tuple is encountered it places the count onto the EOF Tuple and returns it.
  • The StreamingClient reads the Tuples from the CountStream and prints the field values from each Tuple. The CountStream does not change the Tuples that are emitted from the CloudSolrStream that it has wrapped. When the StreamingClient encounters the EOF Tuple it reads the count field, which was placed there by the CountStream.

Friday, April 17, 2015

Computing the Complement of Two TupleStreams

This is the third blog in the series about Solrj.io. In the last blog, the concept of TupleStream Decorators was introduced and a simple example was shown using the ReducerStream. In this blog we'll introduce a stream Decorator called the MergeStream and see how it can be used with the ReducerStream to calculate the Complement between two streams.

The MergeStream

The MergeStream unions two TuplesStreams ordering the Tuples based on a Comparator. When used in combination with the ReducerStream this allows Tuples from different streams to be operated on as one unit. This provides the building block for a number of relational operations including intersections, complements and joins.

A Simple Example: Computing the Complement

To find the complement of two sets, A and B, you need to find all the Tuples in set B that are NOT in set A. The example below will show how easy it is to compute the complement of streams of any size using the MergeStream and ReducerStream.

import org.apache.solr.client.solrj.io.*;
import java.util.*;

public class StreamingClient {

   public static void main(String args[]) throws IOException {
      String zkHost = args[0];
      String collectionA = args[1];
      String collectionB = args[2];

      //Query properties for StreamA

      Map propsA = new HashMap();
      propsA.put("q", "*:*");
      propsA.put("qt", "/export");
      propsA.put("sort", "fieldA asc");
      propsA.put("fl", "fieldA");
      
      //Construct StreamA

      CloudSolrStream streamA = new CloudSolrStream(zkHost, 
                                                    collectionA, 
                                                    propsA);

      //Include the Collection name with each tuple

      streamA.setTrace(true); 

      //Query properties for StreamB

      Map propsB = new HashMap();
      propsB.put("q", "*:*");
      propsB.put("qt", "/export");
      propsB.put("sort", "fieldA asc");
      propsB.put("fl", "fieldA");
      
      //Construct StreamB

      CloudSolrStream streamB = new CloudSolrStream(zkHost, 
                                                    collectionB, 
                                                    propsB);

      //Include the Collection name with each tuple
 
      streamB.setTrace(true);

      //Union streamA and streamB ordering by fieldA

      Comparator comp = new AscFieldComp("fieldA");  
      MergeStream streamM = MergeStream(streamA, streamB, comp);

      //Wrap a ReducerStream around the MergeStream

      ReducerStream streamR = new ReducerStream(streamM, comp);

      try {
       
        streamR.open();
        while(true) {
          
          Tuple tuple = streamR.read();
          if(tuple.EOF) {
             break;
          }

          List<Map> maps = tuple.getMaps();
          
          boolean complement = true;
          for(Map map : maps) {

             //Trace is turned on
             //so the "_COLLECTION_" field will be set.

             String col = (String)map.get("_COLLECTION_");

             //Check to see if there is a Tuple
             //from collectionA

             if(col.equals(collectionA)) {
                complement = false;
                break;
             }             
          }

          if(complement) {
            for(Map map : maps) {
               String fieldA = (String)map.get("fieldA");
               
               //print the complement
               System.out.println("Complement:"+ fieldA);
            }
          }
        }
      } finally {
          streamR.close();
      }
   }
}
The example above does the following things:
  1. Creates two CloudSolrStreams that connect to two different collections.
  2. Notice that the CloudSolrStream.setTrace() is called so that the Tuples can be traced back to their collection.
  3. The query parameters for both streams sort on fieldA asc. So both streams will return the Tuples in ascending order based on the fieldA.
  4. The two CloudSolrStreams are wrapped with a MergeStream. Note the Comparator, which also orders by fieldA ascending. This will union the two streams ordering Tuples by fieldA.
  5. The MergeStream is then wrapped with a ReducerStream. Note again that the Comparator is on fieldA so it will group Tuples that have the same value based on fieldA.
  6. The Tuples that have been grouped together can be retrieved by the Tuple.getMaps() method. The Tuple group may contain Tuples from streamA and streamB
  7. To compute the Complement all we need to do is look for Tuple groups that don't have a Tuple from streamA.
  8. Notice the "_COLLECTION_" field which is present because CloudSolrStream.setTrace() was set to true.

Monday, April 13, 2015

TupleSteam Decorators

This is the second blog in the series about Solrj.io. The previous blog covered how to use the basic TupleStream implementations to open, read and close a TupleStream from a SolrCloud collection.

This blog will describe how to use Decorators to perform operations on TupleStreams. There are two types of operations that are typically performed on TupleStreams.

  • Streaming Transformation: These types of operations transform the underlying stream(s). Examples of streaming transformations include: unique, group by, rollup, union, intersect, complement, join etc...) 
  • Streaming Aggregation: These types of operations gather metrics and build aggregations on the underlying streams. These types of operations include: sum, count, average, min, max etc...)

This blog will focus on Streaming Transformation. Followup blogs with cover Streaming Aggregation.

Streaming Transformation

Solrj.io comes with a core set of TupleStream Decorators that can transform underlying TupleStreams. These TupleStream Decorators wrap one or more TupleStreams and perform operations on the Tuples while reading from the underlying streams.

TupleStream Decorators implement the TupleStream interface, so they too can be wrapped by other TupleStream Decorators to build up complex operations.

Sorted Streams

Many of the TupleStream Decorators rely on the sort order of the underlying TupleStreams to perform memory efficient streaming transformations on very large streams.

For example the UniqueStream emits a stream of unique tuples based on a Comparator. The underlying TupleStream must be sorted by the same fields as this Comparator. This allows the UniqueStream to iterate very large TupleStreams and de-duplicate the Tuples using very little memory.

The ReducerStream

The Solrj.io package comes with a TupleStream Decorator called the ReducerStream. The ReducerStream can be thought of as a swiss-army knife for streaming transformations. It can be used on it's own or as a building block for performing higher level streaming transformations such as Joins.

The ReducerStream Iterates over a TupleStream and buffers Tuples that are equal based on a Comparator.

This allows tuples to be grouped by common field(s). The read() method emits one Tuple per group. The fields of the emitted Tuple reflect the first tuple encountered in the group.

The Tuple.getMaps() method returns all the Tuples in the group. This method returns a list of maps (including the group head), which hold the data for each Tuple in the group.


A Simple Example

The example below shows how the ReducerStream can be used to aggregate information about sessions. Session aggregation is a fairly typical use case for Hadoop.

The reason session aggregation is often performed in Hadoop is that it requires the ability to do large scale distributed grouping. This is because sessions typically consist of one or more log records that need to be processed as a unit. These records could be stored on any server in the cluster. Map/Reduce provides a method for doing this type of large scale distributed grouping.

The ReducerStream example shows how this type of operation can be done using Solrj.io.

A couple of notes before jumping into the example:

  • The example below collects all of the sessions in one place. Future blogs will show how to use SolrCloud Worker Collections to perform these types of operations in parallel.
  • The example below shows an operation which is similar in nature to Map/Reduce. It's important to understand that the ReducerStream is just one of many possible Streaming Transformations that can be performed using Solrj.io. Solrj.io is not Map/Reduce, it's a generic API for performing streaming operations.

The Code
import org.apache.solr.client.solrj.io.*;
import java.util.*;

public class StreamingClient {

   public static void main(String args[]) throws IOException {
      String zkHost = args[0];
      String collection = args[1];

      Map props = new HashMap();
      props.put("q", "*:*");
      props.put("qt", "/export");
      props.put("sort", "sessionID asc");
      props.put("fl", "sessionID,sku,price");
      
      CloudSolrStream cstream = new CloudSolrStream(zkHost, 
                                                    collection, 
                                                    props);
      Comparator comp = new AscFieldComp("sessionID");
      ReducerStream rstream = new ReducerStream(cstream, comp);

      try {
       
        rstream.open();
        long sessionCount = 0;
        while(true) {
          
          Tuple sessionTuple = rstream.read();
          if(sessionTuple.EOF) {
             break;
          }

          ++sessionCount;
          String sessionID = sessionTuple.getString("sessionID");

          List<Map> maps = sessionTuple.getMaps();
          
          double sessionPrice = 0.0;
          for(Map map : maps) {
             String recordSessionID = (String) map.get("sessionID")
             assert(sessionID.equals(recordSessionID);
             String sku = (String)map.get("sku");
             double price = (double)map.get("price); 
             sessionPrice += price;
             System.out.println(sessionID+" : "+sessionPrice);
         }

         System.out.println("Session Count:"+sessionCount);

      } finally {
       rstream.close();
      }
   }
}
The example above does the following things:
  1. Creates a CloudSolrStream that connects to a SolrCloud collection.
  2. In the query parameters the sort is set to sessionID asc. This will return the Tuples in ascending order based on the sesssionID field.
  3. Decorates the CloudSolrStream with a ReducerStream. Note the Comparator, which is used by the ReducerStream to group the Tuples. In this case the AscFieldComp will cause the ReducerStream to group the Tuples based on the "sessionID" field.
  4. Then the ReducerStream is opened, iterated and closed.
  5. During the iteration the read() method emits one Tuple for each sessionID. The Tuple.getMaps() method is used to access all the Tuples in the session.

Tuesday, April 7, 2015

The Streaming API (Solrj.io) : The Basics

This is the first blog in a series about the new Streaming API for SolrCloud, otherwise known as Solrj.io.

What is Solrj.io?

Solrj.io is a new Solrj package that provides the programming framework for SolrCloud's new parallel computing framework.

Solrj.io is a Java API located in the org.apache.solr.client.solrj.io package.

The Base Abstractions

Solrlj.io has two important base abstractions:
Tuple: A Tuple is simply a thin wrapper around a Map of key/value pairs. In it's most basic form a Tuple represents a single record from a search result set.

TupleStream: The TupleStream abstracts a search result set as a stream of Tuples. The TupleStream provides a simple interface for opening, reading and closing streams of Tuples.
The Base Implementations

Solrj.io has two base TupleStream implementations:
SolrStream: Abstracts a search result set from a single Solr instance as a stream of Tuples. 
CloudSolrStream: Abstracts a search result set from a SolrCloud collection as a stream of Tuples. 
CloudSolrStream is a SolrCloud smart client. You provide CloudSolrStream with a zkHost and a collection name and it will automatically pick a replica from each shard to perform the query.

CloudSolrStream queries each shard and performs a streaming merge of the results based on an internal Comparator. This streaming merge allows CloudSolrStream to merge very large result sets from the shards with very little memory.
A Simple Example
import org.apache.solr.client.solrj.io.*;
import java.util.*;

public class StreamingClient {

   public static void main(String args[]) throws IOException {
      String zkHost = args[0];
      String collection = args[1];

      Map props = new HashMap();
      props.put("q", "*:*");
      props.put("qt", "/export");
      props.put("sort", "fieldA asc");
      props.put("fl", "fieldA,fieldB,fieldC");
      
      CloudSolrStream cstream = new CloudSolrStream(zkHost, 
                                                    collection, 
                                                    props);
      try {
       
        cstream.open();
        while(true) {
          
          Tuple tuple = cstream.read();
          if(tuple.EOF) {
             break;
          }

          String fieldA = tuple.getString("fieldA");
          String fieldB = tuple.getString("fieldB");
          String fieldC = tuple.getString("fieldC");
          System.out.println(fieldA + ", " + fieldB + ", " + fieldC);
        }
      
      } finally {
       cstream.close();
      }
   }
}

The example above does the following things:
  1. Creates a map with query parameters. Notice the "qt" parameter is set to "/export". This will cause the search to be handled by the /export handler, which is designed to sort and export entire result sets.
  2. Constructs a CloudSolrStream passing it the zkHost, collection and query parameters.
  3. Opens the stream and reads the Tuples until a Tuple with the EOF property set to true is encountered. This EOF Tuple signifies the end of the stream.
  4. For each Tuple fieldA, fieldB and fieldC are read. These fields are present because they are specified in the field list (fl) query parameter.
  5. The Tuples will be returned in ascending order of fieldA. This is specified by the sort parameter.
Streamed Results 

It's important to understand that the Tuples in the example are streamed. This means that the example can handle result sets with millions of documents without running into memory issues.

TupleStream Decorators 

The code sample shows a simple iteration of Tuples. What if we want to do something more exciting with the stream of Tuples?

TupleStream Decorators can be used to perform operations on TupleStreams. A TupleStream Decorator wraps one or more TupleStreams and performs operations on the Tuples as they are read. 

In the next blog we'll see how TupleStream Decorators can be used to both transform TupleStreams and gather metrics on TupleStreams.


Sunday, March 29, 2015

Parallel Computing with SolrCloud

Coming in Solr 5.1 is a new general purpose parallel computing framework for SolrCloud. This blog introduces the main concepts of the parallel computing framework. Followup blogs will cover these concepts in greater detail.

The Framework

The new parallel computing framework has three main components:
  • Shuffling
  • Worker Collections
  • Streaming API (Solrj.io)
An overview of these components is covered below. Follow-up blogs will cover each of these topics in detail.

Shuffling

Shuffling will be a familiar concept for people who have worked with other parallel computing frameworks such as Hadoop. In general terms, shuffling involves the sorting and partitioning of records. Sorting and partitioning provides the foundation for many parallel computing activities.

Starting with Solr 5.1 SolrCloud has the ability to shuffle entire result sets. Basically this means sorting and partitioning entire result sets.

The efficient sorting of entire result sets is handled by Solr's "/export" handler. The /export handler uses a stream sorting technique that efficiently sorts and streams very large result sets. 

The partitioning of result sets is handled by a new Solr query, called a HashQuery, that hash partitions search results based on arbitrary fields in the documents.

Worker Collections

Solr 5.1 introduces the concept of Worker Collections. A Worker Collection can be any SolrCloud collection of any size. It can contain search indexes or it can exist only to perform work within the parallel computing framework. Worker Collections are created and managed using SolrCloud's standard Collections API.

Each node in a Worker Collection has a new request handler called a Stream Handler. The Stream Handlers, on each worker node, perform operations on partitioned result sets in parallel.

The operations that worker nodes perform in parallel are defined by the Streaming API.

Streaming API (Solrj.io)

The Streaming API is a new Java API located in the solrj.io package. The Streaming API abstracts Solr search results as streams of Tuples called TupleStreams. A Tuple is a thin wrapper for a Map with key/value pairs that represent a search result.

The TupleStream interface defines a simple API for opening, reading and closing streams of search results.

Two of the core TupleStreams are the SolrStream and the CloudSolrStream. The SolrStream class abstracts a stream of search results from a single Solr instance. The CloudSolrStream class abstracts a stream of search results from a SolrCloud collection.

TupleStream's can be wrapped or decorated by other TupleStreams. Decorator streams perform operations on their underlying streams. Streaming operations typically fall into one of two categories:

  • Streaming Transformation: These types of operations transform the underlying stream(s). Examples of streaming transformations include: unique, group by, rollup, union, intersect, complement, join etc...) 
  • Streaming Aggregation: These types of operations gather metrics and build aggregations on the underlying streams. These types of operations include: sum, count, average, min, max etc...)

A core set of TupleStream decorators come with the Streaming API and developers can create their own decorator implementations that perform customized streaming operations.

The Streaming API also includes a ParallelStream implementation. The ParallelStream wraps a TupleStream and pushes it to a Worker Collection to be executed in parallel. Using partition keys TupleStreams can be partitioned evenly across worker nodes. This allows the Streaming API to perform parallel partitioned relational algebra on TupleStreams.

For a deeper dive into the Solrj.io package you can read the next blog in the series.



Streaming Expression's Powerful New Data Structures

In the next release of Solr, the Streaming Expression library includes two powerful new data structures called list and cell.  In this blog...