Solr Document Processing with Apache Camel - Part II

In my last post, Solr Document Processing with Apache Camel - Part 1, I made the case for using Apache Camel as a document processing platform. In this article, our objective is to create a simple Apache Camel standalone application for ingesting products into Solr. While this example may seem a bit contrived, it is intended to provide a foundation for future articles in the series.

Our roadmap for today is as follows:

  1. Set up Solr
  2. Create a Camel Application
  3. Index sample products into Solr via Camel

Downloading, Installing and Running Solr

In this section we will perform a vanilla SolrCloud deployment and start out with the schemaless gettingstarted collection.

To begin, open a terminal and move into a directory where we can install Apache Solr.

$ curl -O https://archive.apache.org/dist/lucene/solr/5.4.1/solr-5.4.1.tgz
$ tar -xzf solr-5.4.1.tgz
$ cd solr-5.4.1 

Next, let’s use the quick start method of starting Solr in SolrCloud mode. This will:

  1. Create a two-node SolrCloud cluster, where node one listens on port 8983 and runs embedded ZooKeeper on port 9983, and node two listens on port 7574.
  2. Create a sample collection called gettingstarted.

$ bin/solr start -e cloud -noprompt

GitHub Project

All the code described in this article is available on GitHub in the camel-dp-part2 folder.

The Maven Project

Let’s start by creating a Maven project. To make things simple, we will create our project skeleton using the Maven Archetype plugin. 

$ cd ..
$ mvn archetype:generate \
-DgroupId=com.gastongonzalez.blog.camel \
-DartifactId=camel-dp-part2 \
-Dversion=1.0.0-SNAPSHOT \
-DarchetypeGroupId=org.apache.maven.archetypes \
-DarchetypeArtifactId=maven-archetype-quickstart \
-DinteractiveMode=false

Once the project has been created, change into the Maven project directory.

$ cd camel-dp-part2

Adding Apache Camel Dependencies

As I mentioned in the last post, one of the key drivers for selecting Camel over other integration frameworks and libraries is the relatively light set of dependencies. For our application we only need three external dependencies:

  1. camel-core - Camel’s core library.
  2. camel-gson - A data format that supports unmarshalling JSON using the Gson library.
  3. camel-solr - A component for indexing documents into Solr.

Let’s start by editing the pom.xml and adding a property for the version of Camel, as well as defining the version of SLF4J.

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <camel.version>2.17.0</camel.version>
        <slf4j.version>1.7.5</slf4j.version>
    </properties>

Add our Camel and logging dependencies.

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-core</artifactId>
    <version>${camel.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-solr</artifactId>
    <version>${camel.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-gson</artifactId>
    <version>${camel.version}</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>${slf4j.version}</version>
</dependency>

Adding a Logger

Create a resources directory to hold the logger configuration.

$ mkdir -p src/main/resources

Add a logger configuration: src/main/resources/log4j.properties

log4j.rootLogger=INFO, out
log4j.logger.com.gastongonzalez.blog.camel=DEBUG
log4j.logger.org.apache.camel=INFO
log4j.logger.org.apache.camel.impl.converter=INFO
log4j.logger.org.apache.camel.util.ResolverUtil=INFO
log4j.appender.out=org.apache.log4j.ConsoleAppender
log4j.appender.out.layout=org.apache.log4j.PatternLayout
log4j.appender.out.layout.ConversionPattern=[%30.30t] %-30.30c{1} %-5p %m%n
log4j.throwableRenderer=org.apache.log4j.EnhancedThrowableRenderer

Our Product Data Source

Now that we have the basic structure for our application in place, let's talk about our data source and data model. To make things simple, we'll assume our products are available in one or more JSON files stored on the file system.

We’ll keep the product JSON simple for now and only concern ourselves with a few fields.

Note: I extracted the full Best Buy product data set, grabbed the first nine products and then filtered out all fields except product ID, name and SKU. (For those interested, I use jq for performing JSON transformations such as this.) In a later article, we’ll work with the entire data set and full data model from Best Buy.

{
  "products": [
    {
      "sku": 1319418,
      "productId": 1219052607961,
      "name": "Geek Squad - Add On: Basic Audio Setup"
    },
    {
      "sku": 1402108,
      "productId": 1219051118053,
      "name": "Geek Squad - Steering Wheel Control Installation"
    }
  ]
}

Let’s create a data directory at the root level of our project to store our JSON. Then, download the JSON from GitHub and copy it to our data directory.

$ mkdir -p data/solr
$ curl -O https://raw.githubusercontent.com/GastonGonzalez/camel-to-solr/master/camel-dp-part2/data/solr/product-sample.json
$ mv product-sample.json data/solr

Thinking in Terms of EIPs

Now that we know what our source data looks like, let’s think about how we can use Camel to solve our product ingestion problem. 

  1. Read one or more product JSON files from the file system.
  2. Unmarshal the JSON into Java objects using Gson (preferably POJOs that carry SolrJ @Field annotations).
  3. Submit each product POJO to the Solr indexer.

We can even express these steps using Enterprise Integration Patterns (EIPs).

(Diagram: EIP view of the ingestion route - a file polling consumer, followed by a splitter, ending at a Solr endpoint)

The diagram above mirrors our initial ingestion steps.

  1. Consume JSON files from the filesystem using a polling consumer. Essentially, monitor a directory for the addition of files. In our case, we can think of this as a hot folder where files can be processed as they are written to the data directory (i.e., data/solr). Each file that is added to this directory will produce one Camel message.
  2. Split each JSON product object into a separate Camel message.
  3. Submit each product to the Solr endpoint for indexing.
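
These three steps map almost one-to-one onto Camel's Java DSL. Here is a minimal sketch of the route, assuming a configured GsonDataFormat named gsonDataFormat; it omits the Solr operation header, which the complete version later in this article includes:

from("file:data/solr?noop=true")             // 1. poll the hot folder for product JSON files
    .unmarshal(gsonDataFormat)               //    convert the JSON into our Products POJO
    .split().simple("${body.products}")      // 2. emit one Camel message per product
    .to("solrCloud://localhost:8983/solr/gettingstarted?zkHost=localhost:9983&collection=gettingstarted"); // 3. index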

In subsequent articles we will talk about how we can really flex our investment in Camel and perform various content enrichment, transformation, and data cleansing steps after our splitter and prior to indexing. For now, we simply index the products without modification.

Implementing our Camel Application

Let’s start by creating our POJOs. Essentially, we need our POJOs to do two things: allow us to unmarshal the JSON into a graph of Java objects, and carry SolrJ's @Field annotation so that we can index our products as JavaBeans.

If we look at our JSON, we have an array of product objects specified by products. Naturally, we will need a POJO called com.gastongonzalez.blog.camel.Products to represent our array of Product objects.

package com.gastongonzalez.blog.camel;

import java.util.ArrayList;

public class Products {

    private ArrayList<Product> products;

    public ArrayList<Product> getProducts() {
        return products;
    }

    public void setProducts(ArrayList<Product> products) {
        this.products = products;
    }
}
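
As an aside, this is roughly what camel-gson will do with this POJO behind the scenes: Gson matches the top-level products array in the JSON to the products field by name. A standalone sketch (the class name is just for illustration):

import com.google.gson.Gson;
import java.io.FileReader;

public class GsonSketch {
    public static void main(String[] args) throws Exception {
        // Gson binds the JSON's top-level "products" array to
        // Products.products by matching field names.
        Products products = new Gson()
                .fromJson(new FileReader("data/solr/product-sample.json"), Products.class);
        System.out.println(products.getProducts().size() + " products parsed");
    }
}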

Now, we define the product itself (com.gastongonzalez.blog.camel.Product). Here we specify our SolrJ field annotations.

package com.gastongonzalez.blog.camel;

import org.apache.solr.client.solrj.beans.Field;

public class Product
{
    @Field
    private String name;

    @Field
    private String productId;

    @Field
    private String sku;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getProductId() {
        return productId;
    }

    public void setProductId(String productId) {
        this.productId = productId;
    }

    public String getSku() {
        return sku;
    }

    public void setSku(String sku) {
        this.sku = sku;
    }

    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder("Product{");
        sb.append("name='").append(name).append('\'');
        sb.append(", productId='").append(productId).append('\'');
        sb.append(", sku='").append(sku).append('\'');
        sb.append('}');
        return sb.toString();
    }
}
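
It is worth noting that when SolrJ indexes a bean, it uses its DocumentObjectBinder to read these @Field annotations and build the SolrInputDocument that is actually sent to Solr. A quick sketch of that mechanism, purely for illustration:

import org.apache.solr.client.solrj.beans.DocumentObjectBinder;
import org.apache.solr.common.SolrInputDocument;

public class BinderSketch {
    public static void main(String[] args) {
        Product product = new Product();
        product.setSku("1319418");
        product.setProductId("1219052607961");
        product.setName("Geek Squad - Add On: Basic Audio Setup");

        // The binder reflects over the @Field annotations and copies the
        // annotated values into a SolrInputDocument.
        SolrInputDocument doc = new DocumentObjectBinder().toSolrInputDocument(product);
        System.out.println(doc);
    }
}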

With the POJOs defined, we can now move on to implementing our Camel application using the Java DSL. Begin by editing src/main/java/com/gastongonzalez/blog/camel/App.java.

package com.gastongonzalez.blog.camel;

import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.gson.GsonDataFormat;
import org.apache.camel.component.solr.SolrConstants;
import org.apache.camel.impl.DefaultCamelContext;

public class App
{
    public static void main( String[] args ) throws Exception
    {
        CamelContext context = new DefaultCamelContext();

        final GsonDataFormat gsonDataFormat = new GsonDataFormat();
        gsonDataFormat.setUnmarshalType(Products.class);

        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() throws Exception
            {
                from("file:data/solr?noop=true")
                    .unmarshal(gsonDataFormat)
                        .setBody().simple("${body.products}")
                            .split().body()
                                .setHeader(SolrConstants.OPERATION, constant(SolrConstants.OPERATION_ADD_BEAN))
                    .to("solrCloud://localhost:8983/solr/gettingstarted?zkHost=localhost:9983&collection=gettingstarted");
            }
        });

        context.start();
        Thread.sleep(10000);
        context.stop();
    }
}

Not too bad. It looks pretty expressive at just about 30 lines, but let's walk through it anyway.

  1. The CamelContext is essentially the Camel runtime and takes care of all the plumbing.
  2. GsonDataFormat defines the class that is used for unmarshalling the JSON.
  3. The routing engine and routes are a core Camel concept. They are responsible for routing messages. All routes are registered with the CamelContext by calling addRoutes().
  4. RouteBuilder is used to create one or more routes. In our simple use case we only have one route that begins with from() and ends with to().
    1. from() - Here we make use of the out of the box Camel File component. Among other things, components are responsible for creating endpoints. All Camel endpoints are expressed using a URI. We define our file endpoint with two pieces of information: which directory to monitor (data/solr) and specify that we do not want to do anything to files in the directory after Camel has processed them (noop=true). There are many URI configuration options. For example, if we wanted to delete any files after they have been processed by the file endpoint, we could replace noop=true with delete=true.
    2. unmarshal() - Unmarshalling converts our JSON file to an instance of our Products POJO. At this point, the Camel message body is no longer a File, but rather a Java object.
    3. setBody() - Set body allows us to change the contents of the message body. We use Camel's simple expression language to obtain the Product ArrayList and set the ArrayList as the message body. 
    4. split() - We use the split EIP to take our current message body (an array of Product POJOs) and split each item in the array into a separate Camel message.
    5. setHeader() - In addition to a body, a message also contains headers, which can be inspected by processors, endpoints, etc. At a minimum, the Solr endpoint requires that we specify the type of operation to perform. In our case, we want to index the POJO that is in the message body. Refer to the Solr component documentation for other operations.
    6. to() - Similar to our file endpoint, we define a Solr endpoint using a URI. Here we specify that we want to use SolrCloud (solrCloud). Other supported schemes include solr, if you need to index a standalone Solr instance (e.g., solr://localhost:8983/solr/gettingstarted), and solrs, if the standalone instance is available via HTTPS. We also define the Solr host and port; in the case of SolrCloud, this value is not considered since we are using ZooKeeper. ZooKeeper is required for SolrCloud and is therefore specified as a URI option using zkHost. Lastly, we specify our collection, gettingstarted, to index.
    7. context.start() - Starts the Camel runtime. 
    8. Thread.sleep() - Since this is only a sample application, we sleep for 10 seconds, which is more time than is needed to ensure that our messages are processed. In the next post we will revisit the application's life cycle (see the sketch just after this list). In fact, Camel has a pluggable shutdown strategy, so even if we shut down Camel too soon (while messages are still in-flight), they will still be processed.
    9. context.stop() - Shuts down Camel gracefully.
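
As noted in step 8, sleeping for a fixed interval is a stopgap. One cleaner option, sketched below under the assumption that the route from App.java is reused, is camel-core's org.apache.camel.main.Main class, which keeps the routes running until the JVM is asked to stop. We will handle the lifecycle properly in the next post.

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.main.Main;

public class AppWithMain {
    public static void main(String[] args) throws Exception {
        Main main = new Main();
        main.addRouteBuilder(new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                // The same route as in App.java would go here.
                from("file:data/solr?noop=true").log("Received ${header.CamelFileName}");
            }
        });
        // Blocks and runs the routes until the process is terminated (e.g., Ctrl-C).
        main.run();
    }
}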

Indexing Solr

Now that we have Solr running and have our Camel application completed, let’s compile and run it.

$ mvn clean compile exec:java -Dexec.mainClass=com.gastongonzalez.blog.camel.App

Once the application is shut down, visit http://localhost:8983/solr/#/ and check that documents are indexed in the gettingstarted collection.
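
Alternatively, you can verify from code with a quick SolrJ query. The sketch below assumes SolrJ 5.x is on the classpath (camel-solr pulls it in transitively); the class name is just for illustration.

import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.response.QueryResponse;

public class VerifySketch {
    public static void main(String[] args) throws Exception {
        // Connect through ZooKeeper, just as our Camel route does.
        CloudSolrClient client = new CloudSolrClient("localhost:9983");
        client.setDefaultCollection("gettingstarted");

        QueryResponse response = client.query(new SolrQuery("*:*"));
        System.out.println("Indexed documents: " + response.getResults().getNumFound());
        client.close();
    }
}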


Conclusion

Well, that was a long post, but hopefully it gives you a feel for Camel and its expressive DSL. In our next article we will extend this application and integrate the following features:

  1. Add support for property replacement tokens and environment variable overrides. We can do better than hard coding hostnames and ports.
  2. Refactor the application's lifecycle management.
  3. Package the application as an assembled JAR for easy distribution.
  4. Start working with data enrichment and the aggregation EIP.