How to use MarkLogic in Apache Spark applications

by Hemant Puranik

By now you may have heard that Apache Spark is the fastest-growing project in the open source ‘Big Data’ community. Spark is quickly becoming an attractive platform for developers because it addresses multiple data-processing use cases through different components: Spark SQL, Spark Streaming, machine learning, a graph engine, and so on. While Spark provides the framework for cluster computing, it does not include its own distributed data persistence technology (i.e. a database or a file system) and relies on technologies like Hadoop/HDFS for that purpose. In fact, Spark can work with any Hadoop-compatible data format. Since the MarkLogic Connector for Hadoop already provides the interface for using MarkLogic as a MapReduce input source, I decided to use the same connector as an input source for my Spark application.

Spark Example Application: MarkLogicWordCount

MarkLogicWordCount is an example application that actually does much more than a simple word count. This application is designed to work with simple XML documents that contain name:value pairs. An example XML document is shown below:
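
The element names and values below are illustrative rather than the actual sample data, but the structure is representative: a flat set of elements whose text content is the value.

    <complaint>
      <product>Mortgage</product>
      <issue>Loan servicing, payments, escrow account</issue>
      <company>Example Bank</company>
      <state>CA</state>
      <submitted_via>Web</submitted_via>
    </complaint>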

The complaint XML documents are stored within a MarkLogic database. The MarkLogicWordCount application loads all the documents from the database into a Spark RDD (Resilient Distributed Dataset) and performs the following operations:

  1. Extracts XML elements as name:value pairs where element content is the value.
  2. Counts distinct values for each element name.
  3. Counts occurrences of each distinct name:value pair across the document set.
  4. Saves the results from steps 2 and 3 into the specified HDFS target location.

The application produces the output as shown below:
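
The excerpt below is illustrative: the element names come from the complaint documents, but the counts are placeholders rather than actual results. It simply shows the shape of what the application writes out.

    product  11
    product:Consumer loan  <count>
    product:Credit card  <count>
    product:Mortgage  <count>
    ... (one line for each of the 11 distinct product names)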

As you can see in the output above, the first line indicates that 11 distinct product names were found in the given data set (the result of step 2), and the following 11 lines indicate the number of times (or the number of documents in which) each of the 11 product names was found within the data set (the result of step 3). The output contains this statistical profile for every element name and its associated name:value pairs found in the data set.

Setup

The MarkLogic Connector for Hadoop is supported against the Hortonworks Data Platform (HDP) and the Cloudera Distribution of Hadoop (CDH). Recent releases of HDP and CDH come bundled with Apache Spark. Please refer to Getting Started with the MarkLogic Connector for Hadoop in order to set up the Hadoop connector. Also refer to the setup instructions specific to MarkLogicWordCount.

Let’s walk through the code

Although Spark is developed in Scala, it also supports application development in Java and Python. MarkLogicWordCount is a Java application.

Loading MarkLogic documents in Spark RDD

The first logical step within the application is to load documents from the MarkLogic database into a Spark RDD. The new RDD is created using the newAPIHadoopRDD method on the SparkContext object. The MarkLogic-specific configuration properties are passed using a Hadoop Configuration object. These properties are loaded from the configuration XML that is passed as an input argument to the MarkLogicWordCount application; they include the username, password, MarkLogic host, port, database name, and so on. We use the DocumentInputFormat class, which reads documents from the MarkLogic instance as key-value pairs of DocumentURI and MarkLogicNode objects. The following code demonstrates how to create an RDD based on documents within a MarkLogic database.
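
Here is a minimal sketch; the path to the connector configuration XML is assumed to be the first program argument, and the exact details differ from the actual source.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    import com.marklogic.mapreduce.DocumentInputFormat;
    import com.marklogic.mapreduce.DocumentURI;
    import com.marklogic.mapreduce.MarkLogicNode;

    // ... inside main(String[] args) ...

    SparkConf sparkConf = new SparkConf().setAppName("MarkLogicWordCount");
    JavaSparkContext sc = new JavaSparkContext(sparkConf);

    // Hadoop Configuration carrying the MarkLogic-specific connector properties
    // (host, port, user name, password, database, ...), loaded from the
    // configuration XML passed to the application.
    Configuration hadoopConf = new Configuration();
    hadoopConf.addResource(new Path(args[0]));

    // DocumentInputFormat reads each document from MarkLogic as a
    // (DocumentURI, MarkLogicNode) key-value pair.
    JavaPairRDD<DocumentURI, MarkLogicNode> documents = sc.newAPIHadoopRDD(
        hadoopConf,
        DocumentInputFormat.class,
        DocumentURI.class,
        MarkLogicNode.class);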

Apply transformation to RDD

Now that we have loaded the documents into a Spark RDD, let’s apply the necessary transformations to produce the intended output. A Spark RDD supports two types of operations: transformations, which create a new dataset, and actions, which return a value or save the dataset back to the persistence layer. When performing RDD operations, Spark’s API relies heavily on passing functions that are defined within the Spark application and executed on the Spark cluster. For example, map is a transformation that you can apply to a Spark RDD: it takes a user-defined function, passes each dataset element through that function, and returns a new RDD that represents the results. Since Spark is developed in Scala, which is fundamentally a functional programming language, this programming paradigm is very natural to Scala developers. Since we are developing the MarkLogicWordCount application in Java, we will implement the functions by extending the classes available in the Spark Java API, specifically in the package org.apache.spark.api.java.function. We will also use the Java-specific RDDs in org.apache.spark.api.java, which provide the same methods as Scala RDDs but take Java functions.
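
As a generic illustration of this style (not code from MarkLogicWordCount), here is how a Java function object is passed to map, assuming lines is an existing JavaRDD<String>:

    // Map each string to its length by passing an anonymous implementation of
    // org.apache.spark.api.java.function.Function to the map transformation.
    JavaRDD<Integer> lengths = lines.map(new Function<String, Integer>() {
        public Integer call(String s) {
            return s.length();
        }
    });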

Within the MarkLogicWordCount example, we apply the following transformation steps to the RDD into which we loaded the documents from the MarkLogic database.

  1. Transform the dataset of XML documents into the dataset that contains the XML name→value pairs found in each input XML document.
  2. Group the values of the same element name to transform name→value pairs to name→valueList.
  3. Count distinct values for each element name to transform name→valueList to name→distinctValueCount
  4. Now again use the name→value pair dataset created in step 1 and transform it to a dataset that maps each name:value pair to its occurrence count i.e. name:value→count.
  5. Aggregate the occurrence count of each distinct name:value pair i.e. name:value→AggregateOccurrenceCount. This is the value distribution for each element.
  6. Filter out the name:value pairs that occur rarely within the dataset, i.e. those whose value distribution is statistically insignificant. Note that depending on the use case (for example, anomaly detection) you may want to filter out the most commonly occurring name:value pairs and keep the rarely occurring ones in the dataset.
  7. Combine the dataset produced in step 3 (name→distinctValueCount) and the dataset produced in step 6 (name:value→AggregateOccurrenceCount) into a single dataset.
  8. Sort the combined dataset so that each distinct value count for each element name and its associated value distribution statistics appear in alphabetical order.
  9. Finally save the dataset to the target HDFS location that is specified as an input argument to MarkLogicWordCount.

The following code demonstrates how these steps are accomplished within the MarkLogicWordCount example.
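
The listing below is a condensed sketch of that pipeline against the Spark 1.x Java API, continuing from the documents RDD created earlier; the XML extraction logic and the filtering threshold are simplified and illustrative rather than the exact MarkLogicWordCount source.

    // Additional imports assumed for this section:
    // org.apache.spark.api.java.function.{Function, Function2, PairFunction, PairFlatMapFunction},
    // org.w3c.dom.{Node, NodeList}, scala.Tuple2, java.util.{ArrayList, List, HashSet, Set}

    // Step 1: explode each document into (elementName, elementValue) pairs.
    // Simplified: each child element of the root element is treated as a name:value pair.
    JavaPairRDD<String, String> nameValuePairs = documents.flatMapToPair(
        new PairFlatMapFunction<Tuple2<DocumentURI, MarkLogicNode>, String, String>() {
            public Iterable<Tuple2<String, String>> call(Tuple2<DocumentURI, MarkLogicNode> doc) {
                List<Tuple2<String, String>> pairs = new ArrayList<Tuple2<String, String>>();
                // Assumes the wrapped node is the document node and its first child is the root element.
                Node root = doc._2().get().getFirstChild();
                NodeList children = root.getChildNodes();
                for (int i = 0; i < children.getLength(); i++) {
                    Node child = children.item(i);
                    if (child.getNodeType() == Node.ELEMENT_NODE) {
                        pairs.add(new Tuple2<String, String>(child.getNodeName(), child.getTextContent()));
                    }
                }
                return pairs;
            }
        });

    // Steps 2-3: group the values seen for each element name, then count the distinct values.
    JavaPairRDD<String, Integer> distinctValueCounts = nameValuePairs
        .groupByKey()
        .mapValues(new Function<Iterable<String>, Integer>() {
            public Integer call(Iterable<String> values) {
                Set<String> distinct = new HashSet<String>();
                for (String v : values) {
                    distinct.add(v);
                }
                return distinct.size();
            }
        });

    // Steps 4-5: count occurrences of each distinct name:value pair across the document set.
    JavaPairRDD<String, Integer> pairCounts = nameValuePairs
        .mapToPair(new PairFunction<Tuple2<String, String>, String, Integer>() {
            public Tuple2<String, Integer> call(Tuple2<String, String> nv) {
                return new Tuple2<String, Integer>(nv._1() + ":" + nv._2(), 1);
            }
        })
        .reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer a, Integer b) {
                return a + b;
            }
        });

    // Step 6: drop name:value pairs that occur too rarely (illustrative threshold).
    final int minOccurrences = 5;
    JavaPairRDD<String, Integer> frequentPairs = pairCounts.filter(
        new Function<Tuple2<String, Integer>, Boolean>() {
            public Boolean call(Tuple2<String, Integer> kv) {
                return kv._2() >= minOccurrences;
            }
        });

    // Steps 7-9: combine both result sets, sort alphabetically by key, and save
    // to the HDFS target directory (assumed here to be the second program argument).
    distinctValueCounts
        .union(frequentPairs)
        .sortByKey()
        .saveAsTextFile(args[1]);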

All RDD transformations in Spark are lazy; they are applied only when Spark encounters an action. In this case the only action performed on the RDD is saveAsTextFile. By default, a transformed RDD may be recomputed each time you run an action on it. However, Spark allows you to cache the RDD in memory using the persist or cache methods for much faster access whenever you need to perform multiple actions on the RDD.
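
For example, although this application runs only one action, the nameValuePairs RDD in the sketch above feeds two separate branches of the pipeline, so caching it (purely illustrative here) would avoid reading the documents from MarkLogic twice:

    // Keep the extracted name:value pairs in memory so both downstream branches
    // (distinct-value counts and occurrence counts) reuse them instead of
    // recomputing them from the MarkLogic source. cache() is shorthand for
    // persist(StorageLevel.MEMORY_ONLY()).
    nameValuePairs.cache();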

Note that in the code above, for many of the transformation steps, custom functions are passed as input parameters. To take a look at the implementation of these functions, refer to the complete source code of MarkLogicWordCount. Feel free to download the source code, build it, and try out the MarkLogicWordCount application yourself.

Conclusion

As demonstrated in this article, you can use the MarkLogic Connector for Hadoop as an input source to Spark and take advantage of the Spark framework to develop your ‘Big Data’ applications on top of MarkLogic.
