This blog will describe how to use Decorators to perform operations on TupleStreams. There are two types of operations that are typically performed on TupleStreams.
- Streaming Transformation: These operations transform the underlying stream(s). Examples include unique, group by, rollup, union, intersect, complement and join.
- Streaming Aggregation: These operations gather metrics and build aggregations on the underlying streams. Examples include sum, count, average, min and max.
This blog will focus on Streaming Transformation. Follow-up blogs will cover Streaming Aggregation.
Streaming Transformation
Solrj.io comes with a core set of TupleStream Decorators that can transform underlying TupleStreams. These TupleStream Decorators wrap one or more TupleStreams and perform operations on the Tuples while reading from the underlying streams.
TupleStream Decorators implement the TupleStream interface, so they too can be wrapped by other TupleStream Decorators to build up complex operations.
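The wrapping described above can be sketched in plain Java. This is only an illustration of the decorator idea, not the Solrj.io API: SimpleStream, ListStream, FilterStream and LimitStream are hypothetical names invented for this sketch, and strings stand in for Tuples.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

public class DecoratorSketch {

    /** Hypothetical stand-in for TupleStream: read() returns null at end of stream. */
    public interface SimpleStream {
        String read();
    }

    /** Leaf stream backed by an in-memory list. */
    public static class ListStream implements SimpleStream {
        private final Iterator<String> it;

        public ListStream(List<String> items) {
            this.it = items.iterator();
        }

        public String read() {
            return it.hasNext() ? it.next() : null;
        }
    }

    /** Decorator: passes through only the items that match a predicate. */
    public static class FilterStream implements SimpleStream {
        private final SimpleStream inner;
        private final Predicate<String> keep;

        public FilterStream(SimpleStream inner, Predicate<String> keep) {
            this.inner = inner;
            this.keep = keep;
        }

        public String read() {
            String item;
            while ((item = inner.read()) != null) {
                if (keep.test(item)) {
                    return item;
                }
            }
            return null;
        }
    }

    /** Decorator: emits at most `limit` items, then reports end of stream. */
    public static class LimitStream implements SimpleStream {
        private final SimpleStream inner;
        private int remaining;

        public LimitStream(SimpleStream inner, int limit) {
            this.inner = inner;
            this.remaining = limit;
        }

        public String read() {
            if (remaining <= 0) {
                return null;
            }
            String item = inner.read();
            if (item != null) {
                remaining--;
            }
            return item;
        }
    }

    /** Reads a stream to the end and collects what it emits. */
    public static List<String> drain(SimpleStream stream) {
        List<String> out = new ArrayList<>();
        String item;
        while ((item = stream.read()) != null) {
            out.add(item);
        }
        return out;
    }

    public static void main(String[] args) {
        SimpleStream source = new ListStream(Arrays.asList("sku1", "sku22", "sku3", "sku44", "sku55"));
        // Each decorator is itself a SimpleStream, so decorators can be stacked.
        SimpleStream filtered = new FilterStream(source, s -> s.length() > 4);
        SimpleStream limited = new LimitStream(filtered, 2);
        System.out.println(drain(limited));
    }
}
```

Because every decorator exposes the same read() contract as the stream it wraps, the caller never needs to know how many layers sit underneath.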
Sorted Streams
Many of the TupleStream Decorators rely on the sort order of the underlying TupleStreams to perform memory efficient streaming transformations on very large streams.
For example the UniqueStream emits a stream of unique tuples based on a Comparator. The underlying TupleStream must be sorted by the same fields as this Comparator. This allows the UniqueStream to iterate very large TupleStreams and de-duplicate the Tuples using very little memory.
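To see why sorted input keeps memory use low, here is a minimal plain-Java sketch of de-duplicating a sorted stream. It is not the UniqueStream source; UniqueIterator and uniqueList are hypothetical names used only for illustration. Since equal items are adjacent in sorted input, the iterator only ever holds the last emitted item and one buffered item.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class SortedUniqueSketch {

    /**
     * Emits the first element of each run of equal elements.
     * Assumes the input is sorted by the same comparator and contains no nulls.
     */
    public static final class UniqueIterator<T> implements Iterator<T> {
        private final Iterator<T> in;
        private final Comparator<? super T> comp;
        private T pending; // next element to emit, or null if none is buffered
        private T last;    // most recently emitted element, or null before the first

        public UniqueIterator(Iterator<T> in, Comparator<? super T> comp) {
            this.in = in;
            this.comp = comp;
        }

        @Override
        public boolean hasNext() {
            // Skip elements equal to the last one emitted.
            while (pending == null && in.hasNext()) {
                T candidate = in.next();
                if (last == null || comp.compare(last, candidate) != 0) {
                    pending = candidate;
                }
            }
            return pending != null;
        }

        @Override
        public T next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            last = pending;
            pending = null;
            return last;
        }
    }

    /** Convenience helper that drains a UniqueIterator into a list. */
    public static <T> List<T> uniqueList(List<T> sorted, Comparator<? super T> comp) {
        List<T> out = new ArrayList<>();
        Iterator<T> it = new UniqueIterator<T>(sorted.iterator(), comp);
        while (it.hasNext()) {
            out.add(it.next());
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> sessionIDs = Arrays.asList("s1", "s1", "s2", "s3", "s3", "s3");
        System.out.println(uniqueList(sessionIDs, Comparator.<String>naturalOrder()));
    }
}
```

If the input were not sorted, duplicates could appear far apart and the only way to catch them would be to remember every value seen so far, which is exactly the memory cost the sort order avoids.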
The ReducerStream
The Solrj.io package comes with a TupleStream Decorator called the ReducerStream. The ReducerStream can be thought of as a Swiss Army knife for streaming transformations. It can be used on its own or as a building block for performing higher level streaming transformations such as Joins.
The ReducerStream iterates over a TupleStream and buffers Tuples that are equal based on a Comparator.
This allows tuples to be grouped by common field(s). The read() method emits one Tuple per group. The fields of the emitted Tuple reflect the first tuple encountered in the group.
The Tuple.getMaps() method returns all the Tuples in the group. This method returns a list of maps (including the group head), which hold the data for each Tuple in the group.
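The grouping behavior described above can be approximated in plain Java as follows. This is a sketch of the idea, not the ReducerStream implementation: GroupReader, Group, BY_SESSION and tuple are hypothetical names, and maps stand in for Tuples. Each read() buffers the run of adjacent tuples that compare equal and returns them as one group whose head is the first tuple encountered.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class GroupingSketch {

    /** One emitted group: the head tuple plus every tuple in the group (head included). */
    public static final class Group {
        public final Map<String, Object> head;
        public final List<Map<String, Object>> maps;

        public Group(Map<String, Object> head, List<Map<String, Object>> maps) {
            this.head = head;
            this.maps = maps;
        }
    }

    /** Groups adjacent tuples that compare equal. Assumes the input is sorted by comp. */
    public static final class GroupReader {
        private final Iterator<Map<String, Object>> in;
        private final Comparator<Map<String, Object>> comp;
        private Map<String, Object> lookahead; // first tuple of the next group, if already read

        public GroupReader(Iterator<Map<String, Object>> in, Comparator<Map<String, Object>> comp) {
            this.in = in;
            this.comp = comp;
        }

        /** Returns the next group, or null at end of stream. */
        public Group read() {
            if (lookahead == null && in.hasNext()) {
                lookahead = in.next();
            }
            if (lookahead == null) {
                return null;
            }
            Map<String, Object> head = lookahead;
            lookahead = null;
            List<Map<String, Object>> maps = new ArrayList<>();
            maps.add(head);
            while (in.hasNext()) {
                Map<String, Object> t = in.next();
                if (comp.compare(head, t) == 0) {
                    maps.add(t);
                } else {
                    lookahead = t; // belongs to the next group
                    break;
                }
            }
            return new Group(head, maps);
        }
    }

    /** Compares tuples on the sessionID field. */
    public static final Comparator<Map<String, Object>> BY_SESSION =
        Comparator.comparing(m -> (String) m.get("sessionID"));

    /** Builds a small tuple for the demo. */
    public static Map<String, Object> tuple(String sessionID, double price) {
        Map<String, Object> m = new HashMap<>();
        m.put("sessionID", sessionID);
        m.put("price", price);
        return m;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> sorted =
            Arrays.asList(tuple("s1", 10.0), tuple("s1", 5.0), tuple("s2", 7.5));
        GroupReader reader = new GroupReader(sorted.iterator(), BY_SESSION);
        Group g;
        while ((g = reader.read()) != null) {
            double total = 0.0;
            for (Map<String, Object> m : g.maps) {
                total += (Double) m.get("price");
            }
            System.out.println(g.head.get("sessionID") + " : " + total);
        }
    }
}
```

Only one group is held in memory at a time, so the cost is bounded by the largest group rather than by the size of the whole stream.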
A Simple Example
The example below shows how the ReducerStream can be used to aggregate information about sessions. Session aggregation is a fairly typical use case for Hadoop.
The reason session aggregation is often performed in Hadoop is that it requires the ability to do large scale distributed grouping. This is because sessions typically consist of one or more log records that need to be processed as a unit. These records could be stored on any server in the cluster. Map/Reduce provides a method for doing this type of large scale distributed grouping.
The ReducerStream example shows how this type of operation can be done using Solrj.io.
A couple of notes before jumping into the example:
- The example below collects all of the sessions in one place. Future blogs will show how to use SolrCloud Worker Collections to perform these types of operations in parallel.
- The example below shows an operation which is similar in nature to Map/Reduce. It's important to understand that the ReducerStream is just one of many possible Streaming Transformations that can be performed using Solrj.io. Solrj.io is not Map/Reduce; it's a generic API for performing streaming operations.
The Code
    import org.apache.solr.client.solrj.io.*;
    import java.io.IOException;
    import java.util.*;

    public class StreamingClient {

      public static void main(String args[]) throws IOException {
        String zkHost = args[0];
        String collection = args[1];

        // Query parameters: export every document, sorted by sessionID
        Map props = new HashMap();
        props.put("q", "*:*");
        props.put("qt", "/export");
        props.put("sort", "sessionID asc");
        props.put("fl", "sessionID,sku,price");

        CloudSolrStream cstream = new CloudSolrStream(zkHost, collection, props);
        // Group Tuples on the same field the stream is sorted by
        Comparator comp = new AscFieldComp("sessionID");
        ReducerStream rstream = new ReducerStream(cstream, comp);

        try {
          rstream.open();
          long sessionCount = 0;
          while(true) {
            Tuple sessionTuple = rstream.read();
            if(sessionTuple.EOF) {
              break;
            }

            ++sessionCount;
            String sessionID = sessionTuple.getString("sessionID");
            List<Map> maps = sessionTuple.getMaps();
            double sessionPrice = 0.0;
            // Sum the price of every record in this session
            for(Map map : maps) {
              String recordSessionID = (String)map.get("sessionID");
              assert(sessionID.equals(recordSessionID));
              String sku = (String)map.get("sku");
              double price = (double)map.get("price");
              sessionPrice += price;
            }
            System.out.println(sessionID+" : "+sessionPrice);
          }
          System.out.println("Session Count:"+sessionCount);
        } finally {
          rstream.close();
        }
      }
    }

The example above does the following things:
- Creates a CloudSolrStream that connects to a SolrCloud collection.
- In the query parameters the sort is set to sessionID asc. This will return the Tuples in ascending order based on the sessionID field.
- Decorates the CloudSolrStream with a ReducerStream. Note the Comparator, which is used by the ReducerStream to group the Tuples. In this case the AscFieldComp will cause the ReducerStream to group the Tuples based on the "sessionID" field.
- Opens, iterates and closes the ReducerStream.
- During the iteration the read() method emits one Tuple for each sessionID. The Tuple.getMaps() method is used to access all the Tuples in the session.