Monday, February 27, 2017

Anomaly Detection in Solr 6.5

Solr 6.5 is just around the corner and along with it comes the new significantTerms Streaming Expression. The significantTerms expression queries a Solr Cloud collection but instead of returning the matching documents, it returns the significant terms in the matching documents.

To determine the significance of a term, a formula is used that compares the number of times the term appears in the foreground set with the number of times it appears in the background set. The foreground set is the search result. The background set is all the documents in the index.

The significantTerms function assigns higher scores to terms that are more frequent in the foreground set and rarer in the background set, in relation to other terms.

For example:

Term    Foreground    Background
A       100           103
B       101           1000

Term A would be considered more significant than term B, because term A is much rarer in the background set: 100 of its 103 total occurrences fall within the foreground set, while term B accounts for only about 10% of its 1000 background occurrences.

This model for scoring terms can be very useful for spotting anomalies in the data. Specifically, we can easily surface terms that are unusually aligned with specific result sets.


A Simple Example with the Enron Emails


For this example we'll start with a single Enron email address (tana.jones@enron.com) and ask the question:

Which address has the most significant relationship with tana.jones@enron.com?

We can start looking for an answer by running an aggregation. Since we're using Streaming Expressions, we'll use the facet expression:

facet(enron,
      q="from:tana.jones@enron.com",
      buckets="to",
      bucketSorts="count(*) desc",
      bucketSizeLimit="100",
      count(*))

This expression queries the index for tana.jones@enron.com in the from field and gathers the facet buckets and counts from the to field. It returns the top 100 buckets from the to field, ordered by count in descending order.
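Streaming Expressions are sent to a collection's /stream handler as the expr parameter. Here's a minimal sketch of running the expression above with curl, assuming a Solr node running locally on the default port:

curl --data-urlencode 'expr=facet(enron,
                                  q="from:tana.jones@enron.com",
                                  buckets="to",
                                  bucketSorts="count(*) desc",
                                  bucketSizeLimit="100",
                                  count(*))' \
     http://localhost:8983/solr/enron/stream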

Running it returns the top 100 addresses that tana.jones@enron.com has emailed. The top five results look like this:

{
  "result-set": {
    "docs": [{
      "count(*)": 789,
      "to": "alan.aronowitz@enron.com"
    }, {
      "count(*)": 376,
      "to": "frank.davis@enron.com"
    }, {
      "count(*)": 372,
      "to": "mark.taylor@enron.com"
    }, {
      "count(*)": 249,
      "to": "brent.hendry@enron.com"
    }, {
      "count(*)": 197,
      "to": "bob.bowen@enron.com"
    }, ...

This gives us some useful information, but does it answer the question? The top address is alan.aronowitz@enron.com with a count of 789. Is this the most significant relationship?

Let's see if the significantTerms expression can surface an anomaly. Here is the expression:

significantTerms(enron, q="from:tana.jones@enron.com", field="to", limit="20")

The expression above runs the query from:tana.jones@enron.com on the enron collection. It then collects the top 20 significant terms from the to field.

The top five results look like this:

{
  "result-set": {
    "docs": [{
      "score": 54.370163,
      "term": "michael.neves@enron.com",
      "foreground": 130,
      "background": 132
    }, {
      "score": 53.911552,
      "term": "lisa.lees@enron.com",
      "foreground": 186,
      "background": 243
    }, {
      "score": 53.806202,
      "term": "frank.davis@enron.com",
      "foreground": 376,
      "background": 596
    }, {
      "score": 51.760098,
      "term": "harry.collins@enron.com",
      "foreground": 106,
      "background": 150
    }, {
      "score": 51.471268,
      "term": "edmund.cooper@enron.com",
      "foreground": 132,
      "background": 222
    }, ...

We have indeed surfaced an interesting anomaly. The first term is michael.neves@enron.com. This address has a foreground count of 130 and background count of 132. This means that michael.neves@enron.com has received 132 emails in the entire corpus and 130 of them have been from tana.jones@enron.com. This signals a strong connection.

alan.aronowitz@enron.com, the highest total receiver of emails from tana.jones@enron.com, isn't in the top 5 results from the significantTerms function.

alan.aronowitz@enron.com shows up at number 8 in the list:
{
  "score": 49.847652,
  "term": "alan.aronowitz@enron.com",
  "foreground": 789,
  "background": 2117
}

Notice that the foreground count is 789 and background count is 2117. This means that 37% of the emails received by alan.aronowitz@enron.com were from tana.jones@enron.com.

By contrast, 98% of the emails received by michael.neves@enron.com came from tana.jones@enron.com.

significantTerms vs. scoreNodes


The significantTerms function works directly with the inverted index and can score terms from single-value, multi-value, and text fields.

The scoreNodes function scores tuples emitted by graph expressions. This allows for anomaly detection in distributed graphs. A prior blog covers the scoreNodes function in more detail.

In Solr 6.5 the scoreNodes scoring algorithm was changed to better surface anomalies. The significantTerms and scoreNodes functions now use the same scoring algorithm.

Use Cases


Anomaly detection has interesting use cases including:

1) Recommendations: Finding products that are unusually connected based on past shopping history.

2) Auto-Suggestion: Suggesting terms that go well together based on indexed query logs.

3) Fraud Anomalies: Finding vendors that are unusually associated with credit card fraud.

4) Text Analytics: Finding significant terms relating to documents in a full text search result set.

5) Log Anomalies: Finding IP addresses that are unusually associated with time periods of suspicious activity.

Thursday, February 16, 2017

Solr's Shiny New Apache Calcite SQL Integration


Solr's new Apache Calcite SQL integration is in Lucene/Solr's master branch now. This blog will discuss Solr's potential as a distributed SQL engine, some thoughts on Apache Calcite, what's currently supported with Solr's Apache Calcite integration and what might be coming next.


Solr as Distributed SQL Engine


The initial SQL interface, which was released with Solr 6.0, uses the Presto project's SQL parser to parse the SQL and then rewrites the queries as Solr Streaming Expressions.

The goals of the initial SQL release were focused on supporting SQL aggregations, using both MapReduce and Solr's native faceting capabilities, and on supporting Solr's full text query language in the SQL predicate.

Even with this limited set of goals it became clear that Solr could be a special SQL engine. There are very few distributed SQL engines that can push down so much processing into the engine, and also rise up above the engine when needed to perform streaming parallel relational algebra. And then of course there is the predicate. Few existing SQL engines can compete with Solr's rich search predicates, which have been developed over a period of 10+ years.

Last but not least is the performance. Solr is not a batch engine adapted for request/response. Solr is a request/response engine from the ground up. Solr's search performance, analytic performance and instant streaming capabilities make it one of the fastest distributed SQL engines available.

The Apache Calcite Integration


Kevin Risden broke ground on Solr's Apache Calcite integration in March of 2016. It wasn't clear at that time that Apache Calcite would be the right fit, but over time it became clear that it was exactly what we needed. There were really two choices for how we could implement the next phase of Solr's SQL integration:

1) Stick with the approach of using a SQL Parser only, and work directly with the parse tree to build the physical query plan in Streaming Expressions.

2) Use a broader framework and plug in rules to push down the parts of the SQL query that we wanted to control.

There were pros and cons to both approaches. The main pro for using just a SQL parser is that we would have total control over the process once the query was parsed. This means we would never run into a scenario where we couldn't implement something that leveraged Solr's capabilities.

The main con to just using the SQL parser is that we would have been responsible for implementing everything, including things like a complete JDBC driver. This is not a small undertaking.

The main pros and cons of using a broader framework were exactly the opposite: less control but the ability to leverage existing features in the framework.

Kevin was very much in favor of using a broader framework, but I was not convinced that we could take full advantage of Solr's capabilities unless we controlled everything.

But in the end Kevin broke ground embedding the Apache Calcite framework into Solr. In the open source world, working code tends to win out. Based on his initial work, I agreed that we should move forward using the wider Apache Calcite framework.

Kevin continued working on the integration. Along the way Cao Manh Dat joined in and added the aggregation support to the branch that Kevin was working on.

Eventually I joined in as well, building on top of the work that Kevin and Dat already contributed. My main focus was to ensure that the initial Apache Calcite implementation was comparable in features and performance to Solr's existing SQL integration.

As I spent more time working with Apache Calcite I came to really appreciate what the project offered. Apache Calcite allows you to selectively push down parts of the SQL implementation such as the predicate, sort, aggregation and joins. It gives you almost full control if you want it, but allows you to leverage any part of the framework that you choose to use. For example you can push down nothing if you want, or you can push down just the predicate or just the sort.

Apache Calcite also provides two very important things: a cost-based query optimizer and a JDBC driver. Solr's initial JDBC driver only implemented part of the specification, and the specification is large. Implementing a cost-based query optimizer is a daunting task. With Apache Calcite we get these features almost for free. We still have to provide hooks into them, but we don't have to implement them.

What's Currently Supported in Lucene/Solr Master


The initial Apache Calcite integration includes:

  • Limited and unlimited selects. Unlimited selects stream the entire result set regardless of its size.
  • Support for Solr search predicates including support for embedding an entire Solr query using the "_query_" field. This means all Solr query syntax is supported including complex full text, graph query, geo-spatial, fuzzy, moreLikeThis etc.
  • Support for score in the field list and order by in queries that have a limit clause.
  • Support for field aliases.
  • Support for faceted and MapReduce aggregations with the aggregationMode parameter.
  • Support for aggregations on multi-valued fields when in facet mode.
  • Support for multi-value fields in simple selects.
  • Parallel execution of MapReduce aggregations on worker nodes.
  • Support for aggregations without a group by clause. These are always pushed down into the search engine.
  • Support for select distinct queries in both faceted and MapReduce mode.
  • Support for group by aggregations in both faceted and MapReduce mode.
  • Support for sorting on fields in the index as well as sorting on aggregations.
  • Support for the having clause. In MapReduce mode the having clause is now pushed down to the worker nodes.
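
As an illustration of several of the items above, here is the kind of SQL statement that can be sent to the /sql handler (as the stmt parameter, with aggregationMode=facet or aggregationMode=map_reduce selecting the execution mode). The collection and field names below are hypothetical:

SELECT fieldA, count(*)
FROM mycollection
WHERE _query_ = '(text_field:"machine learning" AND year_i:[2015 TO 2017])'
GROUP BY fieldA
HAVING count(*) > 10
ORDER BY count(*) desc
LIMIT 100

The _query_ predicate embeds a full Solr query, so anything expressible in Solr's query syntax, including graph and geo-spatial queries, can drive the SQL result set.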


What's Coming Next


Now that the Apache Calcite integration is in master we are free to begin adding new features on a regular basis. Here are some features that are on the top of the list:

1) Support for * in the field list.
2) Automatic selection of aggregationMode (facet or MapReduce). Having Solr choose the right aggregation mode based on the cardinality of fields being aggregated.
3) Support for SELECT ... INTO ...
4) Support for arithmetic operations on fields and aggregations (select (a*b) as c from t). This is now supported in Streaming Expressions (SOLR-9916).
5) Expanded aggregation support.
6) Support for UNION, INTERSECT, and JOIN, using Streaming Expressions' parallel relational algebra capabilities and Apache Calcite's query optimizer.

Tuesday, February 14, 2017

Recommendations With Solr's Graph Expressions Part 1: Finding Products That Go Well Together

Graph Expressions were introduced in Solr 6.1. Graph Expressions are part of the wider Streaming Expressions library. This means that you can combine them with other expressions to build complex and interesting graph queries.

Note: If you're not familiar with graph concepts such as nodes and edges, it may be useful to first review the Wikipedia article on graph theory.

This blog is part one of a three-part series on making recommendations with Graph Expressions.

The three parts are:

1) Finding products that go well together.
2) Using the crowd to find products that go well with a user. 
3) Combining Graph Expressions to make a personalized recommendation.

Before diving into the first part of the recommendation, let's consider the data. For all three blogs we'll be using a simple SolrCloud collection called baskets in the following format:

userID    basketID    productID
user1     basket1     productA
user1     basket1     productA
user1     basket2     productL
user2     basket3     productD
user2     basket3     productM
...

The baskets collection holds all the products that have been added to baskets. Each record has a userID, basketID and productID. We'll be able to use Graph Expressions to mine this data for recommendations.
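As a concrete sketch, each record could be indexed as a simple JSON document like the one below and posted to the collection's /update handler (the field names come from the table above; the schema details are an assumption for the example):

{
  "userID": "user1",
  "basketID": "basket1",
  "productID": "productA"
}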

One more quick note before we get started. One of the main expressions we'll be using is the nodes expression. The nodes expression was originally released as the gatherNodes expression. Starting with Solr 6.4, the nodes function name can be used as shorthand for gatherNodes. You can still use gatherNodes if you like; both names point to the same function.

Now let's get started!

Finding Products That Go Well Together


One approach to recommending products is to start with a product the user has selected and find products that go well with that product.

The Graph Expression below finds the products that go well with productA:

scoreNodes(top(n=25,
               sort="count(*) desc",
               nodes(baskets,
                     random(baskets, q="productID:productA", fl="basketID", rows="250"),
                     walk="basketID->basketID",
                     gather="productID",
                     fq="-productID:productA",
                     count(*))))

Let's explore how the expression works.

Seeding the Graph Expression


The inner random expression is used to seed the Graph Expression:

random(baskets, q="productID:productA", fl="basketID", rows="250")
                                   
The random expression is not a Graph Expression, but in this scenario it's used to seed a Graph Expression with a set of root nodes to begin the traversal.

The random expression returns a pseudo-random set of results that match the query. In this case the random expression is returning 250 basketIDs that contain the productID productA.

The random expression serves two important purposes in seeding the Graph Expression:

1) It limits the scope of the graph traversal to 250 basketIDs. If we seed the graph traversal with all the basketIDs that have productA, we could potentially have a very large number of baskets to work with. This could cause a slow traversal and memory problems as Graph Expressions are tracked in memory.

2) It adds an element of surprise to the recommendation by providing a different set of baskets each time. This can result in different recommendations because each recommendation is seeded with a different set of basketIDs.


Calculating Market Basket Co-Occurrence with the Nodes Expression


Now let's explore the nodes expression, which wraps the random expression. The nodes expression performs a breadth-first graph traversal step, gathering nodes and aggregations along the way. For a full explanation of the nodes expression you can review the online documentation.

Let's look at exactly how the example nodes expression operates:

nodes(baskets,
      random(baskets, q="productID:productA", fl="basketID", rows="250"),
      walk="basketID->basketID",
      fq="-productID:productA",
      gather="productID",
      count(*))

Here is an explanation of the parameters:
  1. baskets: This is the collection that the nodes expression is gathering data from.
  2. random expression: Seeds the nodes expression with a set of pseudo random basketIDs that contain productA.
  3. walk: Walks a relationship in the graph. The basketID->basketID construct tells the nodes expression to take the basketID in the tuples emitted by the random expression and search them against the basketID in the index.
  4. fq: Is a filter query that filters the results of the walk parameter. In this case it filters out records with productA in the productID field. This stops productA from being a recommendation for itself.
  5. gather: Specifies what field to collect from the rows that are returned by the walk parameter. In this case it is gathering the productID field.
  6. count(*): This is a graph aggregation that counts the occurrences of what was gathered. In this case it counts how many times each productID was gathered.

In plain English, this nodes expression gathers the productIDs that co-occur with productA in baskets, and counts how many times the products co-occur.

Scoring the Nodes To Find the Most Significant Product Relationships


With the output of the nodes expression we already know which products co-occur most frequently with productA. But there is something we don't know yet: how often the products occur across all the baskets. If a product occurs in a large percentage of baskets, then it doesn't have any particular relevance to productA.

This is where the scoreNodes function does its magic.


scoreNodes(top(n=25,
               sort="count(*) desc",
               nodes(baskets,
                     random(baskets, q="productID:productA", fl="basketID", rows="250"),
                     walk="basketID->basketID",
                     gather="productID",
                     fq="-productID:productA",
                     count(*))))

In the expression above, the top function emits the top 25 products based on the co-occurrence count. The top 25 products are then scored by the scoreNodes function.

The scoreNodes function scores the products based on the raw co-occurrence counts and their frequency across the entire collection.

The scoring formula is similar to the tf*idf scoring algorithm used to score results from a full text search. In the full text context tf (term frequency) is the number of times the term appears in the document. idf (inverse document frequency) is computed based on the document frequency of the term, or how many documents the term appears in. The idf is used to provide a boost to rarer terms.

scoreNodes uses the same principle to score nodes in a graph traversal. The count(*) aggregation is used as the tf value in the formula. The idf is computed for each node, in this case productID, based on global statistics across the entire collection. The effect of the scoreNodes algorithm is to provide a boost to nodes that are rarer in the collection.
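As a rough illustration (this is the general tf*idf shape, not necessarily the exact formula scoreNodes implements), the score of each gathered node looks something like:

nodeScore ≈ count(*) * log(numDocs / docFreq(productID))

where count(*) is the co-occurrence count from the traversal, numDocs is the number of documents in the collection, and docFreq(productID) is the number of documents that productID appears in.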

The scoreNodes function adds a field to each node tuple called nodeScore, which is the relevance score for the node.

Now we know which products have the most significant relationship with productA.


Can We Still Do Better?


Yes. We now know which products have the most significant relationship with productA. But we don't know if the user will have an interest in the product(s) we're recommending. In the next blog in the series we'll explore a graph expression that uses connections in the graph to personalize the recommendation.
