Realtime Search for Hadoop

In the previous part of this article series we focused on the efficient storage of log data in Hadoop. We described how to store the data in Hadoop’s MapFiles, and we tweaked the configuration settings for increased data storage capacity and greater retrieval speed. Today, we discuss how to perform near real-time searches on up to 36.6 billion log messages by a clever combination of Hadoop with Lucene and Solr.

Search requirements

Let’s start with a quick recap of the data we have stored in Hadoop. Our system will store up to 20 TB of log messages. A single log message has an average size of around 600 bytes. This means that we will have up to 36.6 billion distinct log messages stored in Hadoop. Our search requirements state that 95% of all search queries should display their results in less than 10 seconds. As will be shown, our solution is actually much faster than required (2-3 seconds per query).
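The message count can be checked with a few lines of arithmetic. This is only a sketch: it assumes that "TB" is read as a binary terabyte (2^40 bytes), which is the reading that reproduces the 36.6 billion figure. With decimal terabytes the result would be closer to 33.3 billion.

```python
# Assumption: "TB" means a binary terabyte (2**40 bytes).
TIB = 2 ** 40


def message_count(total_bytes, avg_message_bytes):
    """Number of whole messages of the given average size that fit into total_bytes."""
    return total_bytes // avg_message_bytes


total = message_count(20 * TIB, 600)
print(f"{total / 1e9:.2f} billion messages")
```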

However, these requirements could not be met with a pure Hadoop solution. In Hadoop we can use Map/Reduce jobs to retrieve data. A Map/Reduce job that has to read all the data in our cluster typically runs for about 2 hours. This is way above our response time requirements!

Introducing Lucene

The only way to be able to search in real time is to build a search index on the stored data. We have evaluated Lucene for this purpose. Lucene seems to be a very good partner for Hadoop. It is implemented in Java, which means it integrates very well with our own Java-based application. Lucene is also highly scalable and has a powerful query syntax.

Now let’s look at how our solution works. Lucene is able to distinguish multiple indexed fields in a single document. As we have learned in part 2 of this series, our log data can be split up into distinct fields like the timestamp of the message, the log level, the message itself, etc. An initial proof-of-concept implementation showed that this feature is necessary if we want to be able to search for date ranges or ranges of different log levels.

A Lucene index consists of documents. Each document has a number of fields. The contents of a field can consist of one or more terms. The number of unique terms is one of the criteria that determine the memory requirements of an index.

One of the most important findings during the evaluation of Lucene was that the memory requirements of our index were a limiting factor. Lucene is able to index multiple terabytes of data. Usually, Lucene is used to build up a full-text index of rather large files. In these scenarios, 1 TB of data often consists of maybe 10-20 million documents or even fewer. This is not the case with our data structure. For us, 1 TB of data consists of around 1.8 billion documents. Remember, our documents are actually log messages with a size of about 600 bytes. And we want to store 36.6 billion of them!

In our tests we have learned that we need about 1 GB of heap memory for every 150 million documents in the Lucene index. These memory requirements depend heavily on the number of indexed fields, the type of the indexed fields, and whether the contents of a field have to be stored in Lucene or not.

Reducing memory requirements of the Lucene index

A key factor in using Lucene is to reduce the memory requirements of the index as much as possible. We have analyzed typical search queries of our customer and identified fields which need to be indexed in order to be able to run 95% of all search queries in real-time. It turned out that we only needed 6 fields in the Lucene index:

  • timestamp of the log message
  • numeric id of the application which created this log message
  • numeric log level
  • name of application server the application is running on
  • host name of the server the application is running on
  • path in the Hadoop file system where the complete log message can be read

The next step was to optimize the memory requirements for each field. The timestamp in particular proved to be a very memory-intensive field. Our timestamps have an accuracy of milliseconds. This leads to a huge number of unique values inside the timestamp field, which quickly eats up memory. In the search queries, on the other hand, timestamps are only specified up to an accuracy of minutes. We only need the higher accuracy to sort the search results.

After evaluating different ways to reduce the memory requirements for the timestamp field, we settled on a solution where we split up this field into 2 separate fields in the Lucene index. One field stores the timestamp with an accuracy of minutes and is indexed. The other field stores the timestamp with full accuracy and is only stored in Lucene, not indexed. With this solution we have reduced the number of unique terms that Lucene needs to handle and therefore reduced the memory requirements by a great deal. Another benefit of this approach is increased performance when searching for date ranges. The downside is that we need to sort the result set ourselves using the detailed timestamp field after getting the search results from Lucene.
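The split can be illustrated without any Lucene code. The sketch below only shows the idea: the field names `ts_minute` and `ts_full` are hypothetical, and the timestamp is assumed to arrive as milliseconds since the epoch. In the real index the first value would be indexed and the second only stored.

```python
MILLIS_PER_MINUTE = 60_000


def timestamp_fields(epoch_millis):
    """Split a millisecond timestamp into a coarse value and a full-precision value."""
    return {
        # Indexed in the search index: one unique term per minute.
        "ts_minute": epoch_millis // MILLIS_PER_MINUTE,
        # Stored but not indexed: only used to sort the results afterwards.
        "ts_full": epoch_millis,
    }


def sort_hits(hits):
    """Sort search hits by the full-precision timestamp after the index lookup."""
    return sorted(hits, key=lambda hit: hit["ts_full"])


# A single day has 86,400,000 possible millisecond values but only 1,440 minute values.
print(24 * 60 * MILLIS_PER_MINUTE, "vs", 24 * 60)
```

Two messages logged within the same minute share the same `ts_minute` term, so they add nothing to the number of unique terms, while `sort_hits` restores the exact order.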

Each document needs to have a primary key field, which specifies how the document can be retrieved. In our case, the primary key field contains the full path inside HDFS to the MapFile which contains the log message, followed by the index of the log message inside this MapFile. This enables us to directly access the referenced log message.
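A key of this kind could be encoded and decoded as shown below. The article does not specify the exact key layout, so the `#` separator, the `MessageRef` type and the example path are assumptions made purely for illustration.

```python
from typing import NamedTuple

# Hypothetical separator between the MapFile path and the entry index.
SEPARATOR = "#"


class MessageRef(NamedTuple):
    mapfile_path: str  # full HDFS path of the MapFile
    entry_index: int   # position of the log message inside that MapFile


def encode_key(ref):
    """Build the primary key string from a message reference."""
    return f"{ref.mapfile_path}{SEPARATOR}{ref.entry_index}"


def decode_key(key):
    """Recover the message reference; the last separator wins if the path contains one."""
    path, _, index = key.rpartition(SEPARATOR)
    return MessageRef(path, int(index))


# Example path is made up for this sketch.
example_key = encode_key(MessageRef("/logs/2010/06/app-17/part-00000", 4711))
print(example_key, decode_key(example_key))
```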

Working with Index Shards and Solr

With 1 GB of heap memory required per 150 million documents, we would need roughly 240 GB of heap memory to build an index for all documents in Hadoop. This is way too much for a single index! Using a single index also has disadvantages when you look at high availability requirements. If there is a problem with the index, you lose the real-time query functionality.

We already have a number of data nodes running in the Hadoop cluster, so we can split up the Lucene index into smaller parts which can be served on each data node. We assign 6 GB of heap memory on each data node to Lucene so that each data node is able to run the index for up to roughly 1 billion documents.
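The sizing follows directly from the measured ratio of 150 million documents per GB of heap. The sketch below reruns that arithmetic. The exact results (244 GB, 900 million documents per node) are slightly different from the rounder figures used in the text, and the resulting node count is a derived value for illustration, not a statement about the actual cluster.

```python
import math

# Measured ratio from the tests described above.
DOCS_PER_GB_HEAP = 150_000_000


def heap_gb(docs):
    """Heap needed (in GB) to index the given number of documents."""
    return docs / DOCS_PER_GB_HEAP


def docs_per_node(heap_gb_per_node):
    """Documents a single data node can index with the given heap."""
    return heap_gb_per_node * DOCS_PER_GB_HEAP


def nodes_needed(total_docs, heap_gb_per_node):
    """Minimum number of data nodes needed to index all documents."""
    return math.ceil(total_docs / docs_per_node(heap_gb_per_node))


total_docs = 36_600_000_000
print(heap_gb(total_docs), docs_per_node(6), nodes_needed(total_docs, 6))
```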

When we realized that we had to split up the Lucene index into multiple so-called shards, we moved from Lucene to Solr. Solr is a search platform based on Lucene. It provides a Web-based interface to access the index. This means we can use a simple HTTP/REST request to index documents, perform queries and even move an index from one data node to another.
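As an illustration of the HTTP interface, the sketch below builds the URL for a query against a single shard. The parameters `q`, `fq`, `rows` and `wt` are standard Solr query parameters. The host name, the core name `shard-0007` and the field names `log_level` and `app_id` are hypothetical and not taken from the system described here.

```python
from urllib.parse import urlencode


def solr_select_url(base_url, query, filters=(), rows=100):
    """Build a Solr select URL with a main query, optional filter queries and a row limit."""
    params = [("q", query), ("rows", rows), ("wt", "json")]
    params += [("fq", filter_query) for filter_query in filters]
    return f"{base_url}/select?{urlencode(params)}"


# Hypothetical shard location and field names.
url = solr_select_url(
    "http://datanode01:8983/solr/shard-0007",
    "log_level:[3 TO 5]",
    filters=["app_id:42"],
    rows=50,
)
print(url)
```

The same URL pattern can be sent to every shard, which is what makes the parallel fan-out described later straightforward.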

Handling Search Requests. An incoming search query (red arrow) is analyzed for the queried fields. If all fields are indexed, parallel search queries will be sent to all index shards (yellow arrow). The responses will be merged, sorted and then returned to the user. If the search fields are not indexed, a new MapReduce search job will be created and submitted to the Hadoop JobTracker (green arrow).

Each data node is running a single Solr server which can host multiple Lucene indexes. New log messages are indexed into different shards, so that each index has approximately the same number of documents. This evens out the load on each shard and enables scalability. When a new data node is integrated into the cluster, the index shard on this data node will be primarily used for indexing new documents.
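One simple way to get this behaviour is to always index into the shard that currently holds the fewest documents. The article does not say which strategy is actually used, so the least-loaded rule and the shard names below are assumptions for this sketch.

```python
def pick_shard(doc_counts):
    """Return the name of the shard that currently holds the fewest documents."""
    return min(doc_counts, key=doc_counts.get)


# Two existing shards plus an empty shard on a newly added data node.
counts = {"shard-a": 900, "shard-b": 850, "shard-new": 0}

for _ in range(1000):
    target = pick_shard(counts)
    counts[target] += 1  # stands in for indexing one log message

print(counts)
```

With this rule the empty shard on the new data node receives all new documents until it has caught up, after which the load spreads evenly across all shards.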

For performance reasons, the index data files are stored on the local file system of each data node. Each time an index has been modified, it will be backed up into the Hadoop file system. This allows us to quickly redeploy the index onto another data node if the data node which originally hosted it has failed.

When performing a query, all index shards are queried in parallel. This ensures fast response times. When a user formulates a query, it is first analyzed to determine whether it can be run against the Lucene indexes. This is not the case if the user specifies search fields which are not indexed. In that case the query will be run as a Map/Reduce job. If the query can be run against the Lucene indexes, it will be forwarded to all data nodes in parallel. The results of these subqueries are merged and sorted. Then the log messages are read from HDFS using the primary keys inside the Lucene index results.
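The routing decision and the merge step can be sketched as follows. This is not the actual implementation: the field names in `INDEXED_FIELDS`, the `run_on_shard` callback (which would send the query to one shard) and the `submit_mapreduce` callback (which would submit the fallback job) are all hypothetical stand-ins.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical names for the indexed fields.
INDEXED_FIELDS = {"ts_minute", "app_id", "log_level", "app_server", "host"}


def can_use_index(query_fields):
    """A query can use the index only if every queried field is indexed."""
    return set(query_fields) <= INDEXED_FIELDS


def search(query_fields, shards, run_on_shard, submit_mapreduce):
    """Fan out to all shards in parallel, or fall back to a Map/Reduce job."""
    if not can_use_index(query_fields):
        return submit_mapreduce(query_fields)
    with ThreadPoolExecutor(max_workers=max(1, len(shards))) as pool:
        partial_results = list(pool.map(run_on_shard, shards))
    hits = [hit for partial in partial_results for hit in partial]
    # Sort by the full-precision timestamp that is stored with each hit.
    return sorted(hits, key=lambda hit: hit["ts_full"])


# In-memory stand-ins for two shards.
fake_shards = {
    "shard-a": [{"key": "a1", "ts_full": 30}, {"key": "a2", "ts_full": 10}],
    "shard-b": [{"key": "b1", "ts_full": 20}],
}


def fake_run_on_shard(shard):
    return fake_shards[shard]


def fake_submit_mapreduce(query_fields):
    return "mapreduce-job"


result = search(["log_level", "host"], list(fake_shards), fake_run_on_shard, fake_submit_mapreduce)
fallback = search(["message"], list(fake_shards), fake_run_on_shard, fake_submit_mapreduce)
print([hit["key"] for hit in result], fallback)
```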

If a query to a single shard fails, the search results may be incomplete, but the queries to the other shards are not affected. This greatly enhances the availability of the system. Typical query times are about 2-3 seconds with this approach, which is considerably less than stated in our customer’s requirements.
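The isolation of shard failures can be sketched by collecting each shard's result separately and recording the shards that failed instead of aborting the whole query. The function name, the `run_on_shard` callback and the shard names are hypothetical, and a real system would likely also apply per-shard timeouts.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def query_all_shards(shards, run_on_shard):
    """Query every shard in parallel; return the collected hits and the shards that failed."""
    hits, failed = [], []
    with ThreadPoolExecutor(max_workers=max(1, len(shards))) as pool:
        futures = {pool.submit(run_on_shard, shard): shard for shard in shards}
        for future in as_completed(futures):
            try:
                hits.extend(future.result())
            except Exception:
                # One failing shard must not affect the results of the others.
                failed.append(futures[future])
    return hits, sorted(failed)


def flaky_run_on_shard(shard):
    """Stand-in for a shard query; 'shard-b' simulates an unavailable shard."""
    if shard == "shard-b":
        raise ConnectionError("shard unavailable")
    return [f"{shard}-hit"]


partial_hits, failed_shards = query_all_shards(["shard-a", "shard-b", "shard-c"], flaky_run_on_shard)
print(sorted(partial_hits), failed_shards)
```

Returning the list of failed shards alongside the hits lets the caller tell the user that the result set may be incomplete.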

The downside of this approach is that we need a sophisticated mechanism to handle the large number of index shards. For example, we need to be able to identify which shard is running on which data node and which shards are currently not deployed on any data node. We also need to identify failures of index shards and move indexes from one data node to another. We will discuss these topics in the next part of this series.