
Getting Started with Apache NiFi: Migrating from Relational to MarkLogic

by Gabo Manuel

The typical process of migrating data from a relational database into MarkLogic has usually meant ad-hoc code or CSV dumps to be processed by MLCP. Apache NiFi introduces a code-free approach to migrating content directly from a relational database system into MarkLogic. Here we walk you through getting started with migrating data from a relational database into MarkLogic.

Note: The following steps assume that MarkLogic v9 (or higher) is already running and is available locally. Additionally, Java 8 needs to be installed and configured.

NiFi Setup

Check out instructions on how to download or build and run NiFi from source code, as well as a quick guide to getting it up and running.

MarkLogic’s NAR files are available for download, along with instructions on what to do with them.

With NiFi running, point your browser to http://localhost:8080/nifi. Don't be surprised by a blank page right after a fresh start of the scripts; NiFi takes a while to load.

Apache NiFi Toolbar
Figure 1: Apache NiFi toolbar

Now that we have our NiFi instance running, we can start configuring our processes. For additional information about the available processors, visit the Apache NiFi documentation.

Defining the Flow

We want to establish a basic flow with the following steps:

  1. Retrieve records from the relational database
  2. Convert each row into a JSON document
  3. Ingest data into MarkLogic

Let’s get started.

NiFi Basics: How to add a processor

To add a Processor to your dataflow canvas, drag the Processor icon () from the top of the screen down to the canvas and drop it there:

Dragging the Processor icon to the canvas

This will generate a dialog box that prompts you to select the Processor to add (see Figure 2). You can use the search box on the upper right to reduce the number of Processors displayed.

Add Processor dialogue
Figure 2: Add Processor screen

Flow Step 1: Retrieving relational data

As shown in the image above, there are several processor options for retrieving data from a relational database. QueryDatabaseTable and GenerateTableFetch have the concept of remembering a column's last value, i.e. a "Maximum-Value Column". This is optimal for processing a table whose records never get updated, e.g. audit logs, since previously processed rows will not be processed again.

Add ExecuteSQL Processor

Let’s add an ExecuteSQL Processor to the dataflow canvas. This processor requires the user to supply the full SQL statement, leaving room to de-normalize the records at the relational-database level via joins. ExecuteSQL will retrieve the records from the database and return them in Apache Avro format.
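Curious what ExecuteSQL actually hands downstream? Here is a minimal sketch using the Apache Avro library to decode a FlowFile's content that has been saved to a local file; the file name "employees.avro" is hypothetical, and you can grab a FlowFile's content for this kind of inspection via NiFi's data provenance view:

    import org.apache.avro.file.DataFileStream;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;

    import java.io.FileInputStream;
    import java.io.IOException;

    public class InspectAvro {
        public static void main(String[] args) throws IOException {
            // "employees.avro" is a hypothetical dump of an ExecuteSQL FlowFile's content
            try (DataFileStream<GenericRecord> records = new DataFileStream<>(
                    new FileInputStream("employees.avro"),
                    new GenericDatumReader<GenericRecord>())) {
                while (records.hasNext()) {
                    // each Avro record corresponds to one relational row
                    System.out.println(records.next());
                }
            }
        }
    }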

Configure ExecuteSQL Processor

Configure the Processor by right-clicking on the Processor and selecting Configure:

ExecuteSQL properties
Figure 3: ExecuteSQL Processor Properties

The "Database Connection Pooling Service" needs to be created and configured by selecting “Create new service…”.

Create New Service dropdown

Refer to the screenshot in Figure 4 for the Database Connection Pooling Service property values. Be sure to supply the appropriate Connection URL, Driver Class Name, and Driver Location for your relational database. The example below is for a MySQL database running on my local machine.

ExecuteSQL Processor’s Database Connection Pooling Service
Figure 4: ExecuteSQL Processor’s Database Connection Pooling Service properties
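In text form, and purely as an illustration (the database name, credentials, and driver path are placeholders, and property names can vary slightly across NiFi versions), the pooling service for a local MySQL instance looks something like:

    Database Connection URL : jdbc:mysql://localhost:3306/hr
    Database Driver Class Name : com.mysql.cj.jdbc.Driver
    Database Driver Location(s) : /path/to/mysql-connector-java.jar
    Database User : nifi_user
    Password : ********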

Important: Remember to enable the connection pool after configuration:

screenshot of Enable

The "SQL select query" property value is the actual SQL select query used to get the data from the relational database. It can be as simple as "select * from employee" or as complex as a 5-table join.

If you have a large number of records or expect a very long transaction period, you may want to adjust the "Max Wait Time" property.

Make sure to review the SCHEDULING tab to ensure the processor only runs when you want it to. By default, it runs again almost immediately after the previous execution completes. If you only want the process to run once, consider setting a very long run schedule with the timer-driven strategy, or specifying a fixed CRON value, e.g. the Quartz-style expression "0 0 2 * * ?" to run nightly at 2 a.m.

Configure Processor Scheduling
Figure 5: Configuring Processor Scheduling

Lastly, check the SETTINGS tab and auto-terminate in case of failure:

auto-terminate upon failure
Figure 6: Under Settings, auto-terminate upon failure

Flow Step 2: Convert each row into a JSON document

This goal needs to be split into two steps:

  1. Convert Avro to JSON
  2. Split the entire result set into individual rows/records.

Convert Avro to JSON

Add a ConvertAvroToJSON Processor and configure it as follows:

ConvertAvroToJSON processor properties
Figure 7: ConvertAvroToJSON Processor Properties

The "Wrap Single Record" property ensures that we will always be processing an array.

If set to False, the record is not processed as an array: {"emp_id" : 1, "first_name" : "Ruth", "last_name" : "Shaw", ... }

If set to True, the record will be processed as an array: [{"emp_id" : 1, "first_name" : "Ruth", "last_name" : "Shaw", ... }]

Under the SETTINGS tab, we auto-terminate on failure (see Figure 6).

Split the entire result set into individual rows/records

Add a SplitJson Processor with the following configuration:

SplitJson processor properties
Figure 8: SplitJson Processor Properties

If you are familiar with XPath, this article helps translate XPath to JsonPath.

Note: JsonPath is outside the scope of MarkLogic and will not work in your XQuery or Server-side JavaScript out of the box.
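To get a feel for what the SplitJson Processor does with such an expression, here is a minimal sketch using the Jayway JsonPath library; the library choice and sample data are illustrative, and your configured expression may differ from the $.* shown here:

    import com.jayway.jsonpath.JsonPath;
    import java.util.List;

    public class SplitDemo {
        public static void main(String[] args) {
            // the wrapped array produced by ConvertAvroToJSON, abbreviated
            String json = "[{\"emp_id\":1,\"first_name\":\"Ruth\"},"
                        + "{\"emp_id\":2,\"first_name\":\"Sam\"}]";
            // "$.*" selects each element of the top-level array;
            // SplitJson emits each match as its own FlowFile
            List<Object> rows = JsonPath.read(json, "$.*");
            rows.forEach(System.out::println);
        }
    }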

Under SETTINGS, auto-terminate on failure and original:

auto-terminate upon failure and original

Flow Step 3: Ingest data into MarkLogic

Apache NiFi is meant to run continuously, and the starting Processor will execute at the configured frequency until it is stopped or disabled. Therefore, for consistency during re-runs, we recommend using the existing primary key as part of the resulting document URI. For example, an employee with an ID of 1 will result in a URI of /employee/1.json. To do this, we extract the primary key and store it as a FlowFile attribute.

Add an EvaluateJsonPath Processor and configure it as follows:

EvaluateJsonPath processor properties
Figure 9: EvaluateJsonPath Processor Properties

Setting “Destination” to flowfile-attribute stores the extracted primary key value as a FlowFile attribute instead of overwriting the FlowFile’s content. The emp_id property is added using the () icon. The value of this property is a JsonPath expression to be evaluated, e.g. $.emp_id for our employee records.

Under SETTINGS, we auto-terminate on failure and unmatched.

For our final step, we push the documents into MarkLogic using a PutMarkLogic Processor, which makes use of MarkLogic’s Data Movement SDK behind the scenes. This allows documents to be written in batches, improving throughput and reducing latency.
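As a rough picture of what the Data Movement SDK does on the processor's behalf, here is a minimal sketch of a batched write; the host, port, and credentials are assumptions, and PutMarkLogic manages all of this for you:

    import com.marklogic.client.DatabaseClient;
    import com.marklogic.client.DatabaseClientFactory;
    import com.marklogic.client.datamovement.DataMovementManager;
    import com.marklogic.client.datamovement.WriteBatcher;
    import com.marklogic.client.io.Format;
    import com.marklogic.client.io.StringHandle;

    public class BatchedWrite {
        public static void main(String[] args) {
            // connection details are assumptions; match them to your environment
            DatabaseClient client = DatabaseClientFactory.newClient("localhost", 8000,
                    new DatabaseClientFactory.DigestAuthContext("admin", "admin"));
            DataMovementManager dmm = client.newDataMovementManager();
            WriteBatcher batcher = dmm.newWriteBatcher()
                    .withBatchSize(100)   // documents written per batch
                    .withThreadCount(4);  // parallel writer threads
            dmm.startJob(batcher);
            // one add() per document; full batches are flushed automatically
            batcher.add("/employee/1.json",
                    new StringHandle("{\"emp_id\":1,\"first_name\":\"Ruth\"}")
                            .withFormat(Format.JSON));
            batcher.flushAndWait();  // write any remaining partial batch
            dmm.stopJob(batcher);
            client.release();
        }
    }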

If you search for “marklogic” when adding a Processor, two processors will be available:

MarkLogic processors in the Add Processor dialog

We configure the PutMarkLogic Processor as follows:

PutMarkLogic processor properties
Figure 10: PutMarkLogic Processor Properties

Create the "DatabaseClient Service" property value similar to the "Database Connection Pooling Service". Use the following screenshot as reference:

PutMarkLogic DatabaseClient Service
Figure 11: PutMarkLogic Processor’s DatabaseClient Service properties

The port for the DatabaseClient Service needs to support the MarkLogic REST API. You can also adjust the "Database" value if you want the documents written to a specific database rather than the content database configured for the specified port.

You can also configure the PutMarkLogic Processor’s "Permissions" property, as needed by your security model.

Note that the “URI attribute name” property value refers to the emp_id attribute extracted by the earlier EvaluateJsonPath Processor; combined with the URI prefix and suffix values, it forms final URIs like /employee/1.json.

Under SETTINGS, we auto-terminate on both success and failure.

Wiring them all together

We now have all the processors in place. It’s time to wire them all together. Your NiFi dataflow canvas should look similar to this:

NiFi canvas with processors only
Figure 12: Dataflow canvas with only Processors

Hovering the mouse over the ExecuteSQL Processor causes the "connection" icon () to appear. Click and drag this icon to the ConvertAvroToJSON Processor and drop it there to make a connection:

Connecting processors on the canvas

If the connection is successfully created, a connection configuration screen will display. Configure the relationship for success:

Configuring the relationship for success

We repeat the same process for the rest of our Processors:

  • Drag the connector from ConvertAvroToJSON to SplitJson processor to establish a connection on success.
  • Drag the connector from SplitJson to EvaluateJsonPath processor to establish a connection on split.
    Configuring the relationship for split
  • Drag the connector from EvaluateJsonPath to PutMarkLogic processor to establish a connection on matched.
    Configuring the relationship for match

You should now have a flow similar to this:

NiFi canvas with processors and connectors
Figure 13: Dataflow canvas with Processors and connections

Run it!

Right click on a blank area to display the menu and click “Start”:

Start NiFi flow

Note: Processors cannot be modified while they are running. If you need to re-configure a processor, you must first stop it.

Use MarkLogic’s Query Console (http://localhost:8000) to confirm that the relational rows were converted to documents:

Query Console to confirm data movement

With content of:

Query Console to confirm data contents
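If you would rather script this check, here is a minimal sketch with the MarkLogic Java Client API that reads one migrated document back; the URI and connection details are assumptions based on the examples above:

    import com.marklogic.client.DatabaseClient;
    import com.marklogic.client.DatabaseClientFactory;
    import com.marklogic.client.io.StringHandle;

    public class VerifyMigration {
        public static void main(String[] args) {
            DatabaseClient client = DatabaseClientFactory.newClient("localhost", 8000,
                    new DatabaseClientFactory.DigestAuthContext("admin", "admin"));
            // read back the document written for employee 1
            String json = client.newJSONDocumentManager()
                    .read("/employee/1.json", new StringHandle()).get();
            System.out.println(json);
            client.release();
        }
    }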

Congratulations! Now you have your data migrated from your relational database into MarkLogic.

Bring DHF into the Mix!

A detailed guide on setting up an Operational Data Hub is available to help you get started, along with the Data Hub Framework QuickStart tutorial, which will walk you through creating your entities and input flows.

To incorporate your existing flows, let’s adjust a few things. First, we either create a new MarkLogic DatabaseClient Service, or adjust the service currently being used by our PutMarkLogic Processor, as follows. Note that you must disable the service to make any changes.

DatabaseClient Service properties for DHF
Figure 14: PutMarkLogic Processor’s DatabaseClient Service properties, updated for DHF

Note that the port now matches the staging data hub configuration; 8010 is the default port used in the MarkLogic Data Hub QuickStart tutorial.

Modify our PutMarkLogic Processor as follows. Note that you must stop the Processor prior to making any changes:

PutMarkLogic properties for DHF
Figure 15: PutMarkLogic Processor Properties, updated for DHF

The data hub documentation on ingesting with the REST API will help you determine which transform module to use, given your flow implementation.

If you run the new flow into your MarkLogic data hub, you can check the staging database using the DHF QuickStart UI to confirm:

DHF QuickStart to confirm data migration

With content of:

DHF QuickStart to confirm data contents

Congratulations! Not only have you migrated data from your relational database into MarkLogic, you even get the opportunity to harmonize your data before downstream systems can access it!

Saving and Reusing a NiFi Template

Wiring the processors took no time at all, but doing it over and over again becomes tedious and can easily introduce mistakes. In addition, our data sources may not be limited to relational sources (e.g. REST calls, Twitter feeds, CSVs). To "save" our work, we can select components and save them as a NiFi template that we can reuse later.

Select the components on the canvas that you want to save as a template, including the connectors. Notice the darker blue borders on the selected components.

Select components for template
Figure 16: Selecting processors and connectors to save as template

Click on the "Create Template" icon () on the operate panel that appears on the left-hand side.

Operate panel

Provide a name and description for the template and click CREATE.

Create template

If successful, you will see a prompt like this:

Create template confirmation

We can now make use of the template icon () at the top. Click and drag it down to the canvas to get the following prompt:

Add template prompt

If you expand the drop-down, you can hover over the question mark icon to show the description of the template. This is a good reason to be concise but descriptive when creating your templates.

Template description

When you add the template to your existing canvas, the new processors may overlap existing processors. Rearrange them as needed (e.g. side by side).

After adding this template, you may notice that the PutMarkLogic and SplitJson processors have yellow triangle alert icons because their sources have not yet been specified. Note that the template creates a new MarkLogic DatabaseClient Service for the new processor instance. Double-click on the processor to configure it and click the "Go To" icon (green arrow) next to the DatabaseClient Service property. Notice that we now have an extra instance of DefaultMarkLogicDatabaseClientService that is currently disabled. We have two options at this point:

  1. Delete this instance and select the existing and enabled instance. This is recommended if we will be using the same MarkLogic instance with the same credentials and transforms.
  2. Rename and enable this new instance. This is recommended if we will be using a different instance and/or a different transform (e.g. a different ingest flow, if we are using DHF).

If we choose Option 1, then configure the new instance of the PutMarkLogic processor to use the original instance of DefaultMarkLogicDatabaseClientService.

More information about templates is available in the Apache NiFi User Guide.
