Showing posts with label streaming. Show all posts

Tuesday, April 7, 2015

The Streaming API (Solrj.io) : The Basics

This is the first blog in a series about the new Streaming API for SolrCloud, otherwise known as Solrj.io.

What is Solrj.io?

Solrj.io is a new Solrj package that provides the programming API for SolrCloud's new parallel computing framework.

Solrj.io is a Java API located in the org.apache.solr.client.solrj.io package.

The Base Abstractions

Solrj.io has two important base abstractions:
Tuple: A Tuple is simply a thin wrapper around a Map of key/value pairs. In its most basic form, a Tuple represents a single record from a search result set.

TupleStream: The TupleStream abstracts a search result set as a stream of Tuples. The TupleStream provides a simple interface for opening, reading and closing streams of Tuples.
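
To make the two abstractions concrete, here is a minimal sketch of what they might look like. The names SketchTuple, SketchTupleStream and ListTupleStream are hypothetical stand-ins invented for this illustration, not the Solrj.io classes, and the in-memory list stands in for a real search result set.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a Tuple: a thin wrapper around a Map.
class SketchTuple {
    final Map<String, Object> fields;
    final boolean EOF; // true only on the marker that ends a stream

    SketchTuple(Map<String, Object> fields) {
        this.fields = fields;
        this.EOF = fields.containsKey("EOF");
    }

    String getString(String key) {
        Object value = fields.get(key);
        return value == null ? null : value.toString();
    }
}

// Hypothetical stand-in for a TupleStream: open, read, close.
interface SketchTupleStream {
    void open() throws IOException;
    SketchTuple read() throws IOException;
    void close() throws IOException;
}

// An in-memory stream over a fixed list of records, followed by an EOF tuple.
class ListTupleStream implements SketchTupleStream {
    private final List<Map<String, Object>> records;
    private Iterator<Map<String, Object>> it;

    ListTupleStream(List<Map<String, Object>> records) {
        this.records = records;
    }

    public void open() {
        it = records.iterator();
    }

    public SketchTuple read() {
        if (it.hasNext()) {
            return new SketchTuple(it.next());
        }
        Map<String, Object> eof = new HashMap<>();
        eof.put("EOF", true);
        return new SketchTuple(eof);
    }

    public void close() {
        it = null;
    }
}

class TupleSketchDemo {
    public static void main(String[] args) {
        List<Map<String, Object>> records = new ArrayList<>();
        Map<String, Object> record = new HashMap<>();
        record.put("fieldA", "hello");
        records.add(record);

        ListTupleStream stream = new ListTupleStream(records);
        stream.open();
        SketchTuple tuple = stream.read();
        while (!tuple.EOF) {
            System.out.println(tuple.getString("fieldA"));
            tuple = stream.read();
        }
        stream.close();
    }
}
```

The end of the stream is signalled by a marker tuple rather than by null or an exception, which is the same convention the example later in this post relies on.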

The Base Implementations

Solrj.io has two base TupleStream implementations:
SolrStream: Abstracts a search result set from a single Solr instance as a stream of Tuples.
CloudSolrStream: Abstracts a search result set from a SolrCloud collection as a stream of Tuples.

CloudSolrStream is a SolrCloud smart client. You provide CloudSolrStream with a zkHost and a collection name and it will automatically pick a replica from each shard to perform the query.

CloudSolrStream queries each shard and performs a streaming merge of the results based on an internal Comparator. This streaming merge allows CloudSolrStream to merge very large result sets from the shards with very little memory.
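
The idea behind a streaming merge can be sketched in plain Java. The class below is a hypothetical illustration, not CloudSolrStream's actual code: it assumes each shard hands back an already-sorted iterator and uses a PriorityQueue to hold just one pending record per shard.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;

class StreamingMergeSketch {

    // One sorted shard stream plus the record currently at its head.
    private static final class Head<T> {
        final T value;
        final Iterator<T> source;

        Head(T value, Iterator<T> source) {
            this.value = value;
            this.source = source;
        }
    }

    // Lazily merges already-sorted iterators. Only one record per shard
    // is held in memory at any time, regardless of result set size.
    static <T> Iterator<T> merge(List<Iterator<T>> shards, Comparator<T> comp) {
        PriorityQueue<Head<T>> heads =
            new PriorityQueue<>((a, b) -> comp.compare(a.value, b.value));
        for (Iterator<T> shard : shards) {
            if (shard.hasNext()) {
                heads.add(new Head<>(shard.next(), shard));
            }
        }
        return new Iterator<T>() {
            public boolean hasNext() {
                return !heads.isEmpty();
            }

            public T next() {
                Head<T> smallest = heads.poll();
                if (smallest == null) {
                    throw new NoSuchElementException();
                }
                // Refill from the shard that just gave up its head.
                if (smallest.source.hasNext()) {
                    heads.add(new Head<>(smallest.source.next(), smallest.source));
                }
                return smallest.value;
            }
        };
    }

    public static void main(String[] args) {
        List<Iterator<String>> shards = new ArrayList<>();
        shards.add(Arrays.asList("a", "d", "f").iterator());
        shards.add(Arrays.asList("b", "c", "g").iterator());
        Iterator<String> merged = merge(shards, Comparator.<String>naturalOrder());
        while (merged.hasNext()) {
            System.out.println(merged.next());
        }
    }
}
```

Because the merge only ever compares the heads of the shard streams, its memory use grows with the number of shards, not with the number of documents.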

A Simple Example

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.solr.client.solrj.io.*;

public class StreamingClient {

   public static void main(String[] args) throws IOException {
      String zkHost = args[0];
      String collection = args[1];

      // Query parameters; "qt" routes the search to the /export handler.
      Map<String, String> props = new HashMap<>();
      props.put("q", "*:*");
      props.put("qt", "/export");
      props.put("sort", "fieldA asc");
      props.put("fl", "fieldA,fieldB,fieldC");

      CloudSolrStream cstream = new CloudSolrStream(zkHost,
                                                    collection,
                                                    props);
      try {

        cstream.open();
        while (true) {

          Tuple tuple = cstream.read();
          // The EOF Tuple marks the end of the stream.
          if (tuple.EOF) {
             break;
          }

          String fieldA = tuple.getString("fieldA");
          String fieldB = tuple.getString("fieldB");
          String fieldC = tuple.getString("fieldC");
          System.out.println(fieldA + ", " + fieldB + ", " + fieldC);
        }

      } finally {
        // Always release the stream, even if a read fails.
        cstream.close();
      }
   }
}

The example above does the following things:
  1. Creates a map with query parameters. Notice the "qt" parameter is set to "/export". This will cause the search to be handled by the /export handler, which is designed to sort and export entire result sets.
  2. Constructs a CloudSolrStream passing it the zkHost, collection and query parameters.
  3. Opens the stream and reads the Tuples until a Tuple with the EOF property set to true is encountered. This EOF Tuple signifies the end of the stream.
  4. For each Tuple, fieldA, fieldB and fieldC are read. These fields are present because they are specified in the field list (fl) query parameter.
  5. The Tuples will be returned in ascending order of fieldA. This is specified by the sort parameter.

Streamed Results

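It's important to understand that the Tuples in the example are streamed: each Tuple is read, processed and discarded, so the client never holds the full result set in memory. This means that the example can handle result sets with millions of documents without running into memory issues.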

TupleStream Decorators 

The code sample shows a simple iteration of Tuples. What if we want to do something more exciting with the stream of Tuples?

TupleStream Decorators can be used to perform operations on TupleStreams. A TupleStream Decorator wraps one or more TupleStreams and performs operations on the Tuples as they are read. 
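
As a rough illustration of the decorator idea, the sketch below wraps one stream in another that drops consecutive duplicates. RecordStream, ListRecordStream and UniqueSketchStream are hypothetical names made up for this example, not classes from Solrj.io. The sketch assumes the wrapped stream is already sorted on the key field and, unlike the EOF Tuple used earlier, it returns null at the end of the stream to keep the code short.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical minimal stream contract for this sketch.
interface RecordStream {
    void open();
    Map<String, Object> read(); // null marks the end of the stream here
    void close();
}

// A source stream backed by an in-memory list.
class ListRecordStream implements RecordStream {
    private final List<Map<String, Object>> records;
    private Iterator<Map<String, Object>> it;

    ListRecordStream(List<Map<String, Object>> records) {
        this.records = records;
    }

    public void open() {
        it = records.iterator();
    }

    public Map<String, Object> read() {
        return it.hasNext() ? it.next() : null;
    }

    public void close() {
        it = null;
    }
}

// Decorator: wraps another stream and emits only the first record of each
// run of equal key values. Assumes the wrapped stream is sorted on the key.
class UniqueSketchStream implements RecordStream {
    private final RecordStream inner;
    private final String key;
    private Object lastValue;
    private boolean first;

    UniqueSketchStream(RecordStream inner, String key) {
        this.inner = inner;
        this.key = key;
    }

    public void open() {
        inner.open();
        first = true;
        lastValue = null;
    }

    public Map<String, Object> read() {
        Map<String, Object> record;
        while ((record = inner.read()) != null) {
            Object value = record.get(key);
            if (first || !Objects.equals(value, lastValue)) {
                first = false;
                lastValue = value;
                return record;
            }
            // Same key as the previous record: skip it and keep reading.
        }
        return null;
    }

    public void close() {
        inner.close();
    }
}

class UniqueSketchDemo {
    public static void main(String[] args) {
        List<Map<String, Object>> records = new ArrayList<>();
        for (String v : new String[] {"a", "a", "b", "c", "c"}) {
            records.add(Collections.<String, Object>singletonMap("fieldA", v));
        }
        RecordStream unique = new UniqueSketchStream(new ListRecordStream(records), "fieldA");
        unique.open();
        Map<String, Object> record;
        while ((record = unique.read()) != null) {
            System.out.println(record.get("fieldA"));
        }
        unique.close();
    }
}
```

The decorator implements the same interface as the stream it wraps, so the calling code reads from it exactly as it would from the underlying stream.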

In the next blog we'll see how TupleStream Decorators can be used to both transform TupleStreams and gather metrics on TupleStreams.


Sunday, March 29, 2015

Parallel Computing with SolrCloud

Coming in Solr 5.1 is a new general-purpose parallel computing framework for SolrCloud. This blog introduces the framework's main concepts. Follow-up blogs will cover these concepts in greater detail.

The Framework

The new parallel computing framework has three main components:
  • Shuffling
  • Worker Collections
  • Streaming API (Solrj.io)

An overview of each component follows.

Shuffling

Shuffling will be a familiar concept for people who have worked with other parallel computing frameworks such as Hadoop. In general terms, shuffling involves the sorting and partitioning of records. Sorting and partitioning provide the foundation for many parallel computing activities.

Starting with Solr 5.1, SolrCloud has the ability to shuffle entire result sets, which means sorting and partitioning them in full.

The efficient sorting of entire result sets is handled by Solr's "/export" handler. The /export handler uses a stream sorting technique that efficiently sorts and streams very large result sets. 

The partitioning of result sets is handled by a new Solr query, called a HashQuery, that hash partitions search results based on arbitrary fields in the documents.
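
Hash partitioning itself is a simple idea, and a minimal sketch may help. The code below is not the HashQuery implementation: the class and method names are hypothetical, and String.hashCode is used only as an assumed stand-in for whatever hash function Solr applies to the partition fields.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class HashPartitionSketch {

    // Maps a partition key to a worker index in [0, numWorkers).
    // floorMod keeps the result non-negative even for negative hash codes.
    static int workerFor(String key, int numWorkers) {
        return Math.floorMod(key.hashCode(), numWorkers);
    }

    // Splits the keys into one list per worker. Records with the same key
    // always land on the same worker, and input order is kept within a worker.
    static List<List<String>> partition(List<String> keys, int numWorkers) {
        List<List<String>> parts = new ArrayList<>();
        for (int i = 0; i < numWorkers; i++) {
            parts.add(new ArrayList<>());
        }
        for (String key : keys) {
            parts.get(workerFor(key, numWorkers)).add(key);
        }
        return parts;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "a", "b", "c", "c");
        List<List<String>> parts = partition(keys, 3);
        for (int i = 0; i < parts.size(); i++) {
            System.out.println("worker " + i + ": " + parts.get(i));
        }
    }
}
```

The property that matters for parallel work is that every record with a given key value goes to the same worker, so each worker can process its keys without seeing the other partitions.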

Worker Collections

Solr 5.1 introduces the concept of Worker Collections. A Worker Collection can be any SolrCloud collection of any size. It can contain search indexes or it can exist only to perform work within the parallel computing framework. Worker Collections are created and managed using SolrCloud's standard Collections API.

Each node in a Worker Collection has a new request handler called a Stream Handler. The Stream Handlers on each worker node perform operations on partitioned result sets in parallel.

The operations that worker nodes perform in parallel are defined by the Streaming API.

Streaming API (Solrj.io)

The Streaming API is a new Java API located in the org.apache.solr.client.solrj.io package. The Streaming API abstracts Solr search results as streams of Tuples called TupleStreams. A Tuple is a thin wrapper for a Map with key/value pairs that represent a search result.

The TupleStream interface defines a simple API for opening, reading and closing streams of search results.

Two of the core TupleStreams are the SolrStream and the CloudSolrStream. The SolrStream class abstracts a stream of search results from a single Solr instance. The CloudSolrStream class abstracts a stream of search results from a SolrCloud collection.

TupleStreams can be wrapped or decorated by other TupleStreams. Decorator streams perform operations on their underlying streams. Streaming operations typically fall into one of two categories:

  • Streaming Transformation: These types of operations transform the underlying stream(s). Examples of streaming transformations include unique, group by, rollup, union, intersect, complement and join.
  • Streaming Aggregation: These types of operations gather metrics and build aggregations on the underlying streams. Examples include sum, count, average, min and max.

A core set of TupleStream decorators comes with the Streaming API, and developers can create their own decorator implementations that perform customized streaming operations.
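
To give a feel for a streaming aggregation, here is a small sketch that rolls a sorted stream up into one record per group with a count and a sum. RollupSketch and its methods are hypothetical names for this illustration only, not part of the Streaming API, and the sketch assumes the input is already sorted on the group field.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;

class RollupSketch {

    // Convenience helper to build an input record for the demo.
    static Map<String, Object> record(String group, Number value) {
        Map<String, Object> rec = new LinkedHashMap<>();
        rec.put("group", group);
        rec.put("value", value);
        return rec;
    }

    // Wraps a stream sorted on groupField and emits one record per group
    // holding the group value, a count and a sum of valueField.
    // Only the current group's running totals are kept in memory.
    static Iterator<Map<String, Object>> rollup(Iterator<Map<String, Object>> sorted,
                                                String groupField,
                                                String valueField) {
        return new Iterator<Map<String, Object>>() {
            private Map<String, Object> pending = sorted.hasNext() ? sorted.next() : null;

            public boolean hasNext() {
                return pending != null;
            }

            public Map<String, Object> next() {
                if (pending == null) {
                    throw new NoSuchElementException();
                }
                Object group = pending.get(groupField);
                long count = 0;
                double sum = 0;
                // Consume records until the group value changes.
                while (pending != null && Objects.equals(group, pending.get(groupField))) {
                    count++;
                    sum += ((Number) pending.get(valueField)).doubleValue();
                    pending = sorted.hasNext() ? sorted.next() : null;
                }
                Map<String, Object> out = new LinkedHashMap<>();
                out.put(groupField, group);
                out.put("count", count);
                out.put("sum", sum);
                return out;
            }
        };
    }

    public static void main(String[] args) {
        List<Map<String, Object>> input = Arrays.asList(
            record("a", 1), record("a", 2), record("b", 5));
        Iterator<Map<String, Object>> rolled = rollup(input.iterator(), "group", "value");
        while (rolled.hasNext()) {
            System.out.println(rolled.next());
        }
    }
}
```

Because the input is sorted, the aggregation can finish a group as soon as the group value changes, which keeps it a streaming operation.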

The Streaming API also includes a ParallelStream implementation. The ParallelStream wraps a TupleStream and pushes it to a Worker Collection to be executed in parallel. Using partition keys, TupleStreams can be partitioned evenly across worker nodes. This allows the Streaming API to perform parallel partitioned relational algebra on TupleStreams.

For a deeper dive into the Solrj.io package you can read the next blog in the series.



Solr temporal graph queries for event correlation, root cause analysis and temporal anomaly detection

Temporal graph queries will be available in the 8.9 release of Apache Solr. Temporal graph queries are designed for key log analytics use ...