Integrating Apache Camel and Content Pump

by Rob Rudin

When you start developing a new application on MarkLogic, or if you're using MarkLogic for the first time, typically one of your first tasks will be to load content - JSON, XML, Office files, anything really - into a MarkLogic database. MarkLogic supports several options for loading content, with Content Pump (mlcp) providing a number of useful features, including:

  • You can load content from text files, delimited text files, aggregate XML files, Hadoop sequence files, and compressed ZIP/GZIP files
  • You can apply a transform to modify and enrich content before inserting it into a database
  • You can distribute ingestion across all nodes in a MarkLogic cluster via Hadoop

And it's easy to get started with mlcp and its command-line interface - just enter some configuration parameters, and you're off and running.

However, many ETL processes are far more complex than just loading content from files. You may need to listen to a JMS queue for new messages, or periodically pull data from a relational database, or invoke a method in a third-party JAR file. You may also have complex routing rules that require splitting data records and later aggregating the results back together, or you may wish to send emails or generate notifications when certain events occur during your ETL process.

For scenarios such as these, developers often rely on integration tools such as Apache Camel. Camel provides a messaging and routing framework that implements many of the well-known Enterprise Integration Patterns that define solutions for common integration problems. Just as importantly, Camel provides dozens of components that allow you to easily consume data from and publish data to a variety of interfaces. With Camel, addressing the use cases mentioned above becomes a straightforward task.

So with mlcp providing a rich interface for loading content into MarkLogic, and with Camel providing a flexible integration framework, the question becomes - how can we integrate Camel and mlcp together? Fortunately, this integration is fairly simple, as Camel supports writing custom components in Java and mlcp itself is written in Java.

To show a basic example of using a Camel component that can invoke mlcp, I've put together a GitHub repository that demonstrates a "hot folder", i.e. a directory that Camel watches for new files; when a file shows up, it's ingested into MarkLogic via mlcp. This sample project uses Gradle for declaring a dependency on the sample Camel component and for running Camel via Spring.

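Since we're just watching a folder and then handing files off to mlcp in this example, our Camel routes file is very simple: it contains a single route that consumes files from the fileUri endpoint and sends each one to the mlcpUri endpoint.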

The fileUri and mlcpUri properties are defined in the file - Gradle makes these easy to override. Of course, change the host/port/username/password as needed to match those of the XDBC server that mlcp will talk to. With the default fileUri, Camel will create a directory named "inbox" in your current directory if one does not yet exist.
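Camel's file component does the watching for you, so you never write this code yourself. To make the behaviour of the route concrete, here is a minimal plain-Java sketch of a polling hot folder. It is not how Camel is implemented. It assumes a hypothetical ingest callback standing in for the mlcp endpoint and a hypothetical ".done" archive folder that keeps a file from being processed twice.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Plain-JDK illustration of a "hot folder" (not Camel itself): each poll hands
 * every file currently in the inbox to an ingest callback, then moves it into
 * a ".done" subdirectory (a hypothetical name) so it is not processed twice.
 */
final class HotFolder {
    private final Path inbox;
    private final Path done;

    HotFolder(Path inbox) throws IOException {
        this.inbox = inbox;
        this.done = inbox.resolve(".done");
        // Creates the inbox (and the archive folder inside it) if it does not exist yet.
        Files.createDirectories(done);
    }

    /** Processes the files present right now; returns their archived locations. */
    List<Path> pollOnce(Consumer<Path> ingest) throws IOException {
        List<Path> files;
        try (Stream<Path> entries = Files.list(inbox)) {
            // Skip the ".done" directory and anything else that is not a regular file.
            files = entries.filter(Files::isRegularFile).sorted().collect(Collectors.toList());
        }
        List<Path> archived = new ArrayList<>();
        for (Path file : files) {
            ingest.accept(file); // e.g. run mlcp with -input_file_path pointing at this file
            Path target = done.resolve(file.getFileName());
            Files.move(file, target, StandardCopyOption.REPLACE_EXISTING);
            archived.add(target);
        }
        return archived;
    }
}
```

A real loop would call pollOnce periodically, for example from a ScheduledExecutorService. It would also need to avoid picking up files that are still being written; Camel's file component offers read-lock options for exactly that.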

The Camel component for mlcp knows how to parse mlcpUri into a series of arguments that will be passed into mlcp. And it supports all of the import arguments - this example just shows a collection being specified. You'll notice that there's no "input_file_path" parameter in the URI - this will be supplied automatically by the mlcp Camel component.
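The sample component's actual URI syntax isn't shown here, so the following is only a sketch of the idea under an assumed format: mlcp://host:port?name=value&... Each query parameter becomes an mlcp option of the same name. Host and port become -host and -port. -input_file_path is always appended by the component for the file being processed. The class name MlcpArgs and the URI layout are hypothetical; the option names themselves (-host, -port, -username, -output_collections, -input_file_path) are standard mlcp import options.

```java
import java.net.URI;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch: converts an endpoint URI such as
 *   mlcp://localhost:8000?username=admin&output_collections=sample-collection
 * into mlcp "import" arguments. The URI scheme and layout are assumptions,
 * not the syntax of the sample Camel component.
 */
final class MlcpArgs {
    static List<String> toImportArgs(String endpointUri, String inputFilePath) {
        URI uri = URI.create(endpointUri);
        if (uri.getHost() == null || uri.getPort() < 0) {
            throw new IllegalArgumentException("host and port are required: " + endpointUri);
        }
        List<String> args = new ArrayList<>(List.of(
                "import", "-host", uri.getHost(), "-port", String.valueOf(uri.getPort())));
        String query = uri.getRawQuery();
        if (query != null && !query.isEmpty()) {
            for (String pair : query.split("&")) {
                int eq = pair.indexOf('=');
                if (eq <= 0) {
                    throw new IllegalArgumentException("expected name=value, got: " + pair);
                }
                String name = URLDecoder.decode(pair.substring(0, eq), StandardCharsets.UTF_8);
                String value = URLDecoder.decode(pair.substring(eq + 1), StandardCharsets.UTF_8);
                if (name.equals("input_file_path")) {
                    continue; // supplied per file by the component, never by the URI
                }
                args.add("-" + name);
                args.add(value);
            }
        }
        // The file Camel just picked up becomes mlcp's input.
        args.add("-input_file_path");
        args.add(inputFilePath);
        return args;
    }
}
```

Under this assumed format, loading delimited text files would just mean appending &input_file_type=delimited_text to the URI, which becomes -input_file_type delimited_text on the mlcp side.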

Finally, we need a simple Gradle task that will fire up Camel. For brevity, I've omitted the dependencies, but you can view the real Gradle file to see everything, which includes an easy way to set up a MarkLogic application for testing.

We can now run "./gradlew camelRun". Camel will watch the "inbox" directory for new files, and every time a file appears, it will be loaded into MarkLogic and into the "sample-collection" collection. You can customize this as you wish - for example, you may want to load aggregate XML files or delimited text files, and so you would just add the necessary parameters to the mlcpUri property to configure mlcp appropriately. And now that Camel is able to send files to mlcp, you can use all of Camel's support for Enterprise Integration Patterns and for connecting to a wide variety of interfaces.

I hope this gives you a sense of how straightforward it is to integrate mlcp with a framework such as Camel; similar frameworks exist, and as long as they provide some mechanism for adding new components, the integration should be very similar. The net benefit is that you can both quickly and easily load content into MarkLogic while reusing the tools that you're familiar with for implementing ETL processes.

If you have any stories you'd like to share about similar ETL work, or any other comments, please post them to this blog entry. If you have questions pertaining to ETL and MarkLogic, I encourage you to post those to Stack Overflow with a "marklogic" tag.

Keeping Reputation Consistent

by Kasey Alderete

In designing Samplestack, a sample MarkLogic application that provides search and updates across Question & Answer content, our team wanted to demonstrate how the database’s built-in capabilities enhance the application developer’s experience. One of the differentiating features of MarkLogic’s enterprise NoSQL database is its support for ACID transactions, and more specifically for multi-document, multi-statement transactions.

It was a no-brainer that we would look for ways to meet requirements and keep the data consistent through the use of transactions where appropriate.

Once we defined the application requirements, we ended up with a scenario that required the database to successfully execute multi-statement transactions.

  • When an answer is selected as the ‘accepted’ answer, parallel updates are required for the content and the user(s):
    • Update the answer to indicate its status as ‘accepted’
    • Increase the reputation of the user with the ‘accepted’ answer
    • Decrease the reputation of the user with the previously ‘accepted’ answer (if applicable)

But how does this apply to YOUR application? What considerations did we take into account to determine this was the best course of action, and how is it implemented? The documentation gives a great overview of the mechanics of transactions in MarkLogic, but I’m going to provide a little more context. I'll walk through how we implemented the scenario above while ensuring that user reputation stayed consistent with the state of the Q&A data. 

Note: When discussing “we” during the implementation – I mean my talented engineering colleague Charles Greer who provided both the brains and the muscle behind the operation.

Document Data Model

Before diving into transactions, I need to explain how the data is modeled in Samplestack since that played a large role in determining where the reputation and related updates needed to occur upon answer acceptance.

When setting up our data model, we thought about the types of information we’d capture:

  • Questions
  • User votes
  • Answers
  • Votes on questions and answers
  • Comments
  • Answer acceptances
  • User name
  • User reputation
  • User location
  • Question metadata (dates, tags)

We also thought about the most common types of updates users would be making:

  • Asking questions
  • Answering questions
  • Voting
  • Accepting answers

And the range of queries (searches) we needed to support for end users:

  • Using keywords/full text search
  • By user
  • By tag
  • Whether questions were resolved (had accepted answers)
  • By date

We wanted to denormalize the data where sensible to enhance searchability, but to keep frequent updates scalable and bounded.

Much of the data could be logically grouped into either “Question & Answer” (QnA) content tracking the thread of a conversation and associated metadata (tags, votes on content) or “User” data with specifics on the user’s activity and profile. Users participate in QnA threads, so the user name appeared in both groupings. Including it in the QnA document provided a way of searching for their content updates. User records allowed us to keep fields that might be more frequently changed (user location, user votes) in a separate document so we wouldn’t have to update every QnA thread where the user participated in the case of a vote or a physical move.

One key decision was to leave user reputation out of the QnA document. Reputation could change constantly (when users had their answers accepted and their content voted on), meaning every document containing a user’s reputation would have to be touched during an update. This could translate into thousands of documents for an active user participating in many QnA threads. We did not have an explicit requirement to search or sort documents by reputation, so we chose to normalize reputation and keep it in the user record only. We still wanted to show reputation alongside user names, but we accomplished that with a transform that joins search results with user reputations. Joining user reputation with QnA documents to display one page of search results cost less than performing a join for sort or search across all results.
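Samplestack performs this join in a search-result transform. The sketch below only mimics the idea in plain Java, to show why the cost is bounded by the page size. It assumes hypothetical SearchHit/DisplayHit shapes and a fetchReputations callback that stands in for reading the relevant user records from the database.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/**
 * Illustrative page-level join: QnA search hits carry only user names, and
 * reputation is looked up from user records for just the users on this page.
 * Record shapes are hypothetical simplifications, not Samplestack's documents.
 */
final class ReputationJoin {
    record SearchHit(String qnaUri, String userName) {}
    record DisplayHit(String qnaUri, String userName, int reputation) {}

    static List<DisplayHit> decoratePage(
            List<SearchHit> page,
            Function<Set<String>, Map<String, Integer>> fetchReputations) {
        // Collect each distinct user on the page once, preserving order.
        Set<String> users = new LinkedHashSet<>();
        for (SearchHit hit : page) {
            users.add(hit.userName());
        }
        // One batched lookup of user records, sized by the page, not the result set.
        Map<String, Integer> reputation = fetchReputations.apply(users);
        List<DisplayHit> decorated = new ArrayList<>(page.size());
        for (SearchHit hit : page) {
            // Assumption: a user without a record is shown with reputation 0.
            decorated.add(new DisplayHit(hit.qnaUri(), hit.userName(),
                    reputation.getOrDefault(hit.userName(), 0)));
        }
        return decorated;
    }
}
```

Because the lookup touches only the users on one page, a reputation change never requires rewriting QnA documents. The trade-off is that results cannot be sorted or filtered by reputation without a wider join, which matched the absence of such a requirement.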

Here’s a look at where we landed with our two record types modeled as JSON documents:

User Record

Key fields used for the “Contributor” role in the application (simplified for this walk-through)

Question and Answer document

Basic structure of a QnA thread (simplified for this walk-through)

This meant that for our anticipated user updates, there were never more than 3 or 4 documents requiring simultaneous database updates. We chose this limit as it made sense based on our project requirements. The key outcome was that it was a known, constrained set of document updates as a basis for future scale and performance.
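To make the bounded write set concrete, here is a hypothetical sketch that lists the document updates implied by accepting an answer. The URIs, the Update record and the one-point reputation change are illustrative assumptions, not Samplestack's code. The "accepted"/"acceptedAnswerId" fields follow the example in the next section.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch: enumerates the document updates implied by accepting an
 * answer. URIs and the +1/-1 reputation change are illustrative assumptions.
 */
final class AcceptancePlan {
    record Update(String uri, String change) {}

    static List<Update> plan(String qnaUri, String answerId, String answerOwner,
                             String previousAnswerId, String previousOwner) {
        List<Update> updates = new ArrayList<>();
        if (answerId.equals(previousAnswerId)) {
            return updates; // re-accepting the same answer changes nothing
        }
        // 1. The QnA document records which answer is accepted.
        updates.add(new Update(qnaUri, "accepted=true, acceptedAnswerId=" + answerId));
        if (answerOwner.equals(previousOwner)) {
            return updates; // same user gains and loses credit: net zero
        }
        // 2. The new answer's author gains reputation.
        updates.add(new Update("/users/" + answerOwner + ".json", "reputation +1"));
        // 3. The previous answer's author (if any) loses reputation.
        if (previousOwner != null) {
            updates.add(new Update("/users/" + previousOwner + ".json", "reputation -1"));
        }
        return updates;
    }
}
```

Whatever the inputs, this plan never touches more than three documents. That is the kind of known, constrained update set the data model was designed to guarantee.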

Considering Transactions

Given our data model, we knew the updates required as a part of accepting an answer would span multiple documents. But what if there was a system failure? Or another user searched the database while an update was in progress? Without transactions there would exist the potential for a user reputation to be inconsistent with the QnA document denoting the accepted answer.

[Figure: Mary's question, "How do I solve this problem?", shows Joe's answer, "Look it up in the documentation.", as accepted, yet Joe's user record still shows Reputation: 0.]

We wanted to be production-ready for an enterprise environment and knew that having eventually consistent data would not be good enough. If a failure or another query happened mid-update, we did not want to present an ‘unstable’ state where an answer had been accepted but no one received credit. We wanted either to roll back all of the updates or to complete them all at once.

In the User Interface, when the Question Asker selects ‘accept’…

[Figure: Mary's question, "How do I solve this problem?", with Joe's answer A1, "Look it up in the documentation."]

Upon click, simultaneous updates to both the QnA and User documents must be made:

[Figure: the QnA document gets "accepted": true and "acceptedAnswerId": A1, while Joe's user record goes to Reputation: 1.]
We concluded database transactions allowed us to avoid the risks of system failure or mid-update access by another application to the same dataset. With MarkLogic, we could update multiple documents in a single transaction – keeping the reputation consistent with the QnA data.

The most common example illustrating the need for transactions is a debit paired with a credit, but as Samplestack demonstrates, data integrity is not only relevant for financial applications. Any situation that demands that data meet all validation rules at every point in time requires consistency. Also keep in mind, when designing your data model, that a normalized value stored in only one place cannot drift out of sync with copies of itself; with denormalized data, you may need transactions to keep redundant or related data synchronized.

Implementing Multi-Statement Transactions

Samplestack is a three-tiered application based on the Reference Architecture. The Java version of the application primarily uses the Java Client API for managing interactions between the application middle tier and the database, including in the case of updating reputation using multi-statement transactions.

Let’s walk through a selection of the application code to highlight the key components to successfully executing a transaction upon answer acceptance. Keep in mind the following code is specific to the Samplestack application and includes references to private functions defined elsewhere in the codebase (not necessarily cut-and-paste for your application).

1. Open a transaction

2. Perform the required updates

This application uses DocumentPatchBuilder to make the document changes.

3. Either rollback or commit

One tricky part is making sure you account for error scenarios and include the rollback. Remember too that because this is a multi-statement transaction, its updates will not be visible to others until you commit. They will, however, be visible to you in real time within the transaction, for example to searches you run before committing. Part of the benefit of performing the update via MarkLogic is that search and other indexes are updated in real time during a transaction. You’ve made the latest information available while keeping reputation consistent.
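The Samplestack code for these three steps is not reproduced here. In the Java Client API the general shape is as follows. DatabaseClient.openTransaction() returns a Transaction. That Transaction is passed to each document write or patch call. The work ends with Transaction.commit() or, on failure, Transaction.rollback().

Because running that requires a MarkLogic server, the sketch below uses a hypothetical in-memory ToyStore to show the same open/update/commit-or-rollback pattern. It also shows the visibility rules described above. It is single-threaded and does not model MarkLogic's locking.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/**
 * Single-threaded, in-memory stand-in for a store with multi-statement
 * transactions (hypothetical, not the MarkLogic Java Client API). It has no
 * locking or conflict detection; it only models visibility: a transaction
 * sees its own writes, and others see them only after commit.
 */
final class ToyStore {
    private final Map<String, String> committed = new HashMap<>();

    final class Tx {
        private final Map<String, String> pending = new HashMap<>();
        private boolean open = true;

        /** Reads this transaction's own writes first, then committed state. */
        String read(String uri) {
            return pending.containsKey(uri) ? pending.get(uri) : committed.get(uri);
        }

        void write(String uri, String doc) {
            ensureOpen();
            pending.put(uri, doc);
        }

        void commit() {
            ensureOpen();
            committed.putAll(pending); // all buffered writes become visible together
            open = false;
        }

        void rollback() {
            pending.clear(); // discard every buffered write
            open = false;
        }

        private void ensureOpen() {
            if (!open) throw new IllegalStateException("transaction already finished");
        }
    }

    Tx openTransaction() {
        return new Tx();
    }

    /** Read outside any transaction: sees committed state only. */
    String read(String uri) {
        return committed.get(uri);
    }

    /** 1. open, 2. perform the updates, 3. commit, or roll back on any failure. */
    void runInTransaction(Consumer<Tx> updates) {
        Tx tx = openTransaction();
        try {
            updates.accept(tx);
            tx.commit();
        } catch (RuntimeException e) {
            tx.rollback();
            throw e;
        }
    }
}
```

The try/catch in runInTransaction is the "tricky part" mentioned above. Any exception thrown while applying the QnA and reputation updates discards all of them, so readers never see an accepted answer without the matching reputation change.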

Armed with this overview of the design and implementation considerations for multi-document, multi-statement transactions, you should be well on your way to maintaining data consistency in your own applications!


By the way, MLCP just got better

by Dave Cassel

MarkLogic 8 came out recently and it has an amazing set of new features. With all the big new things, MarkLogic Content Pump (MLCP) got some improvements that you might have missed.

Alternate Database

With MLCP 1.3-1, you can now specify the database that you want to interact with. In the past, if you didn't already have an XDBC app server pointing to your target database, you'd have to set one up so that MLCP could run import, export, or copy operations there. Not any more: now with the "-database" option, you can specify which database you want to work with.

Even better, with the Enhanced HTTP Server on port 8000, you don't even need to set up an XDBC app server. Out of the box, I can do this:

$ mlcp.sh export -host localhost -port 8000 -username admin -password admin -database Documents -output_file_path docs
$ mlcp.sh export -host localhost -port 8000 -username admin -password admin -database Modules -output_file_path modules

More Data Types

The updated MLCP also helps you with new data types.

MLCP just got that much more helpful!
