public interface WriteBatcher extends Batcher
To facilitate long-running write jobs, WriteBatcher batches documents added by
many external threads and coordinates internal threads to send the batches
round-robin to all appropriate hosts in the cluster. Appropriate hosts are
those containing a forest associated with the database for the
DatabaseClient provided to DataMovementManager. Many external threads
(threads not managed by WriteBatcher) can concurrently add documents by
calling WriteBatcher add or addAs. Each time enough documents are added to
make a batch, the batch is added to an internal queue, where the first
available internal thread picks it up and writes it to the server. Because
batches are not written until they are full, always call flushAsync() or
flushAndWait() when no more documents will be added, so that any partial
batch is also written.
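The batching behaviour described above can be modelled in a few lines of plain Java. This is a minimal sketch, not the MarkLogic implementation: the class BatchingSketch, its Batch type, and treating hosts as plain name strings are all hypothetical. It shows full batches being handed to hosts in round-robin order as documents arrive, and why a final flush is needed for the trailing partial batch.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of WriteBatcher-style batching (not MarkLogic code).
 * Documents accumulate until batchSize is reached; each full batch goes to the
 * next host in round-robin order, and flush() sends any partial batch.
 */
public class BatchingSketch {
    /** A batch of document URIs assigned to one host (hosts are just names here). */
    static final class Batch {
        final String host;
        final List<String> uris;
        Batch(String host, List<String> uris) { this.host = host; this.uris = uris; }
    }

    private final int batchSize;
    private final List<String> hosts;
    private final List<String> pending = new ArrayList<>();
    private final List<Batch> sent = new ArrayList<>();
    private int nextHost = 0;

    BatchingSketch(int batchSize, List<String> hosts) {
        if (batchSize < 1) throw new IllegalArgumentException("batchSize must be 1 or greater");
        if (hosts.isEmpty()) throw new IllegalArgumentException("at least one host is required");
        this.batchSize = batchSize;
        this.hosts = List.copyOf(hosts);
    }

    /** May be called from many threads at once; the monitor lock guards all state. */
    synchronized void add(String uri) {
        pending.add(uri);
        if (pending.size() == batchSize) send();
    }

    /** Sends the partial batch, if any; without it the trailing documents are never sent. */
    synchronized void flush() {
        if (!pending.isEmpty()) send();
    }

    synchronized List<Batch> sent() { return List.copyOf(sent); }

    synchronized int pendingCount() { return pending.size(); }

    /** Assigns the pending documents to the next host; a real batcher would queue this for a worker thread. */
    private void send() {
        sent.add(new Batch(hosts.get(nextHost), List.copyOf(pending)));
        nextHost = (nextHost + 1) % hosts.size();
        pending.clear();
    }
}
```

With a batch size of 2 and two hosts, adding five documents sends two full batches (to host1, then host2) and leaves one document pending until flush() sends it to host1.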
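import com.marklogic.client.datamovement.DataMovementManager;
import com.marklogic.client.datamovement.JobTicket;
import com.marklogic.client.datamovement.WriteBatcher;
import com.marklogic.client.io.StringHandle;

// dataMovementManager is an existing DataMovementManager; logger is assumed to be an SLF4J Logger
WriteBatcher whb = dataMovementManager.newWriteBatcher()
    .withBatchSize(100)
    .withThreadCount(20)
    .onBatchSuccess(batch -> {
        logger.debug("batch # {}, so far: {}", batch.getJobBatchNumber(), batch.getJobWritesSoFar());
    })
    .onBatchFailure((batch, throwable) -> throwable.printStackTrace());
JobTicket ticket = dataMovementManager.startJob(whb);
whb.add("doc1.txt", new StringHandle("doc1 contents"));
whb.addAs("doc2.txt", "doc2 contents");
whb.flushAndWait(); // send the two docs even though they're not a full batch
dataMovementManager.stopJob(ticket);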
Note: All Closeable content or metadata handles passed to add methods are
closed as soon as possible (after the batch is written) to avoid IO resource
leaks. This differs from the normal usage of the Java Client API: because
WriteBatcher is asynchronous, there is no easy way for you to know which
handles have finished writing and can therefore be closed, so WriteBatcher
closes all handles for you. If you have a resource that must be closed after
a batch is written but is not closed by your handle, override the close
method of a Closeable handle and close your resource there.
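The close-override advice above can be sketched in plain Java. Everything here is hypothetical: StreamHandle stands in for a Closeable content handle, Cursor for an extra resource your handle does not manage, and writeBatchThenClose for the batcher closing handles after a batch is written. The subclass overrides close() so the extra resource is released whenever the handle is closed.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;

/** Hypothetical stand-ins illustrating the close-override pattern (not MarkLogic classes). */
public class CloseSketch {
    /** Stand-in for a Closeable content handle that owns an InputStream. */
    static class StreamHandle implements Closeable {
        private final InputStream content;
        StreamHandle(InputStream content) { this.content = content; }
        @Override public void close() throws IOException { content.close(); }
    }

    /** Stand-in for an extra resource the handle does not manage, such as a cursor or connection. */
    static class Cursor implements Closeable {
        boolean closed = false;
        @Override public void close() { closed = true; }
    }

    /** Overrides close() so the extra resource is released whenever the handle is closed. */
    static class CursorAwareHandle extends StreamHandle {
        private final Cursor cursor;
        CursorAwareHandle(InputStream content, Cursor cursor) {
            super(content);
            this.cursor = cursor;
        }
        @Override public void close() throws IOException {
            try {
                super.close();   // close the stream the handle owns
            } finally {
                cursor.close();  // then release our own resource, even if the stream close failed
            }
        }
    }

    /** Models the batcher's behaviour: once a batch is written, every handle in it is closed. */
    static void writeBatchThenClose(List<? extends Closeable> handles) throws IOException {
        // (a real batcher would send the batch to the server before this point)
        for (Closeable h : handles) h.close();
    }
}
```

Because the batcher, not your code, decides when handles are closed, putting the cleanup in close() ties the resource's lifetime to the handle's.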
Modifier and Type | Method | Description |
---|---|---|
WriteBatcher | add(DocumentWriteOperation writeOperation) | Add a document, by passing in a DocumentWriteOperation, to be batched and then written to the server when a batch is full or flushAsync() or flushAndWait() is called. |
WriteBatcher | add(java.lang.String uri, AbstractWriteHandle contentHandle) | Add a document to be batched then written to the server when a batch is full or flushAsync() or flushAndWait() is called. |
WriteBatcher | add(java.lang.String uri, DocumentMetadataWriteHandle metadataHandle, AbstractWriteHandle contentHandle) | Add a document to be batched then written to the server when a batch is full or flushAsync() or flushAndWait() is called. |
WriteBatcher | add(WriteEvent... docs) | Add docs in the form of WriteEvents. |
void | addAll(java.util.stream.Stream<? extends DocumentWriteOperation> operations) | Writes a document stream to the database. |
WriteBatcher | addAs(java.lang.String uri, DocumentMetadataWriteHandle metadataHandle, java.lang.Object content) | Add a document to be batched then written to the server when a batch is full or flushAsync() or flushAndWait() is called. |
WriteBatcher | addAs(java.lang.String uri, java.lang.Object content) | Add a document to be batched then written to the server when a batch is full or flushAsync() or flushAndWait() is called. |
boolean | awaitCompletion() | Blocks until the job has finished or cancelled all queued tasks. |
boolean | awaitCompletion(long timeout, java.util.concurrent.TimeUnit unit) | Blocks until the job has finished or cancelled all queued tasks. |
void | flushAndWait() | Create a batch from any unbatched documents and write that batch, then wait for all batches to complete (the same as awaitCompletion()). |
void | flushAsync() | Create a batch from any unbatched documents and write that batch asynchronously. |
WriteFailureListener[] | getBatchFailureListeners() | Get the array of WriteFailureListener instances registered via onBatchFailure, including the HostAvailabilityListener registered by default. |
WriteBatchListener[] | getBatchSuccessListeners() | Get the array of WriteBatchListener instances registered via onBatchSuccess. |
DocumentMetadataHandle | getDocumentMetadata() | |
JobTicket | getJobTicket() | After the job has been started, returns the JobTicket generated when the job was started. |
java.lang.String | getTemporalCollection() | The temporal collection configured for temporal document inserts. |
ServerTransform | getTransform() | |
WriteBatcher | onBatchFailure(WriteFailureListener listener) | Add a listener to run each time there is an exception writing a batch. |
WriteBatcher | onBatchSuccess(WriteBatchListener listener) | Add a listener to run each time a batch is successfully written. |
void | retry(WriteBatch queryEvent) | Retry in the same thread to send a batch that failed. |
void | retryWithFailureListeners(WriteBatch writeBatch) | Retry in the same thread to send a batch that failed; if it fails again, the failure listeners registered via onBatchFailure run. |
void | setBatchFailureListeners(WriteFailureListener... listeners) | Remove any existing WriteFailureListener instances registered via onBatchFailure, including the HostAvailabilityListener registered by default, and replace them with the provided listeners. |
void | setBatchSuccessListeners(WriteBatchListener... listeners) | Remove any existing WriteBatchListener instances registered via onBatchSuccess and replace them with the provided listeners. |
WriteBatcher | withBatchSize(int batchSize) | Sets the number of documents to send per batch. |
WriteBatcher | withDefaultMetadata(DocumentMetadataHandle handle) | Sets the DocumentMetadataHandle for write operations. |
WriteBatcher | withForestConfig(ForestConfiguration forestConfig) | If the server forest configuration changes mid-job, it can be re-fetched with DataMovementManager.readForestConfig() and then set via withForestConfig. |
WriteBatcher | withJobId(java.lang.String jobId) | Sets the unique id of the job, to help with managing multiple concurrent jobs, so that the job is started with the specified job id. |
WriteBatcher | withJobName(java.lang.String jobName) | Sets the job name. |
WriteBatcher | withTemporalCollection(java.lang.String collection) | The temporal collection to use for a temporal document insert. |
WriteBatcher | withThreadCount(int threadCount) | Sets the number of threads added to the internal thread pool for this instance to use for writing or reporting on batches of uris. |
WriteBatcher | withTransform(ServerTransform transform) | The ServerTransform to modify each document from each batch before it is written to the database. |
Methods inherited from interface Batcher: getBatchSize, getForestConfig, getJobEndTime, getJobId, getJobName, getJobStartTime, getPrimaryClient, getThreadCount, isStarted, isStopped
WriteBatcher withDefaultMetadata(DocumentMetadataHandle handle)
Sets the DocumentMetadataHandle for write operations.
Parameters:
handle - the passed in DocumentMetadataHandle

void addAll(java.util.stream.Stream<? extends DocumentWriteOperation> operations)
Writes a document stream to the database.
Parameters:
operations - the DocumentWriteOperation stream passed in

DocumentMetadataHandle getDocumentMetadata()

WriteBatcher add(java.lang.String uri, AbstractWriteHandle contentHandle)
Add a document to be batched then written to the server when a batch is full
or flushAsync() or flushAndWait() is called.
Parameters:
uri - the document uri
contentHandle - the document contents

WriteBatcher addAs(java.lang.String uri, java.lang.Object content)
Add a document to be batched then written to the server when a batch is full
or flushAsync() or flushAndWait() is called.
Parameters:
uri - the document uri
content - the document contents

WriteBatcher add(java.lang.String uri, DocumentMetadataWriteHandle metadataHandle, AbstractWriteHandle contentHandle)
Add a document to be batched then written to the server when a batch is full
or flushAsync() or flushAndWait() is called.
Parameters:
uri - the document uri
metadataHandle - the metadata (collections, permissions, metadata values, properties, quality)
contentHandle - the document contents

WriteBatcher addAs(java.lang.String uri, DocumentMetadataWriteHandle metadataHandle, java.lang.Object content)
Add a document to be batched then written to the server when a batch is full
or flushAsync() or flushAndWait() is called.
Parameters:
uri - the document uri
metadataHandle - the metadata (collections, permissions, metadata values, properties, quality)
content - the document contents

WriteBatcher add(WriteEvent... docs)
Add docs in the form of WriteEvents.
Parameters:
docs - the batch of WriteEvents, where each WriteEvent represents one document

WriteBatcher add(DocumentWriteOperation writeOperation)
Add a document, by passing in a DocumentWriteOperation, to be batched and then
written to the server when a batch is full or flushAsync() or flushAndWait()
is called.
Parameters:
writeOperation - the DocumentWriteOperation object containing the document's details to be written to the server

WriteBatcher onBatchSuccess(WriteBatchListener listener)
Add a listener to run each time a batch is successfully written.
Parameters:
listener - the action to perform each time a batch is written successfully

WriteBatcher onBatchFailure(WriteFailureListener listener)
Add a listener to run each time there is an exception writing a batch.
These listeners will not run when an exception is thrown by a listener registered with onBatchSuccess. To learn more, please see Handling Exceptions in Listeners.
Parameters:
listener - the code to run when a failure occurs

void retry(WriteBatch queryEvent)
Retry in the same thread to send a batch that failed.
Parameters:
queryEvent - the information about the batch that failed

WriteBatchListener[] getBatchSuccessListeners()
Get the array of WriteBatchListener instances registered via onBatchSuccess.
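The listener rules for onBatchFailure above, and the infinite-loop risk noted for retryWithFailureListeners below, can be modelled together in plain Java. In this minimal sketch ListenerSketch, Batch, Writer, and the maxAttempts cap are hypothetical, not MarkLogic types: a failed write is retried in the calling thread at most maxAttempts times before the failure listeners run, and an exception thrown by a success listener is caught without ever reaching the failure listeners.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/** Hypothetical model of batch listener dispatch with a bounded retry (not MarkLogic code). */
public class ListenerSketch {
    /** Stand-in for a batch: its URIs and how many write attempts it has had. */
    static final class Batch {
        final List<String> uris;
        int attempts = 0;
        Batch(List<String> uris) { this.uris = uris; }
    }

    /** Stand-in for the server call; throwing signals a failed write. */
    interface Writer { void write(Batch b) throws Exception; }

    private final Writer writer;
    private final int maxAttempts;
    private final List<Consumer<Batch>> successListeners = new ArrayList<>();
    private final List<BiConsumer<Batch, Throwable>> failureListeners = new ArrayList<>();

    ListenerSketch(Writer writer, int maxAttempts) {
        if (maxAttempts < 1) throw new IllegalArgumentException("maxAttempts must be 1 or greater");
        this.writer = writer;
        this.maxAttempts = maxAttempts;
    }

    ListenerSketch onBatchSuccess(Consumer<Batch> l) { successListeners.add(l); return this; }

    ListenerSketch onBatchFailure(BiConsumer<Batch, Throwable> l) { failureListeners.add(l); return this; }

    /** Writes a batch in the calling thread, making at most maxAttempts attempts in total. */
    void send(Batch b) {
        while (true) {
            b.attempts++;
            try {
                writer.write(b);
            } catch (Exception e) {
                if (b.attempts < maxAttempts) continue; // capped retry instead of an unbounded loop
                for (BiConsumer<Batch, Throwable> l : failureListeners) l.accept(b, e);
                return;
            }
            // The write succeeded. A success listener's exception is caught here and is
            // NOT routed to the failure listeners; this sketch simply ignores it.
            for (Consumer<Batch> l : successListeners) {
                try { l.accept(b); } catch (RuntimeException ignored) { }
            }
            return;
        }
    }
}
```

The cap is the design point: a failure listener that unconditionally retries with failure listeners re-enters itself on every failure, whereas counting attempts guarantees the loop ends.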
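
WriteFailureListener[] getBatchFailureListeners()
Get the array of WriteFailureListener instances registered via onBatchFailure, including the HostAvailabilityListener registered by default.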
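
void setBatchSuccessListeners(WriteBatchListener... listeners)
Remove any existing WriteBatchListener instances registered via onBatchSuccess and replace them with the provided listeners.
Parameters:
listeners - the WriteBatchListener instances this batcher should use

void setBatchFailureListeners(WriteFailureListener... listeners)
Remove any existing WriteFailureListener instances registered via onBatchFailure, including the HostAvailabilityListener registered by default, and replace them with the provided listeners.
Parameters:
listeners - the WriteFailureListener instances this batcher should use

WriteBatcher withTemporalCollection(java.lang.String collection)
The temporal collection to use for a temporal document insert.
Parameters:
collection - the temporal collection to use for a temporal document insert

java.lang.String getTemporalCollection()
The temporal collection configured for temporal document inserts.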
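
WriteBatcher withTransform(ServerTransform transform)
The ServerTransform to modify each document from each batch before it is written to the database.
Parameters:
transform - the ServerTransform to run on each document from each batch

ServerTransform getTransform()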
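
WriteBatcher withForestConfig(ForestConfiguration forestConfig)
If the server forest configuration changes mid-job, it can be re-fetched with
DataMovementManager.readForestConfig() and then set via withForestConfig.
Specified by:
withForestConfig in interface Batcher
Parameters:
forestConfig - the updated ForestConfiguration

WriteBatcher withJobName(java.lang.String jobName)
Sets the job name.
Specified by:
withJobName in interface Batcher
Parameters:
jobName - the name you would like to assign to this job

WriteBatcher withJobId(java.lang.String jobId)
Sets the unique id of the job, to help with managing multiple concurrent jobs, so that the job is started with the specified job id.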
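
WriteBatcher withBatchSize(int batchSize)
Sets the number of documents to send per batch.
Specified by:
withBatchSize in interface Batcher
Parameters:
batchSize - the batch size; must be 1 or greater

WriteBatcher withThreadCount(int threadCount)
Sets the number of threads added to the internal thread pool for this instance to use for writing or reporting on batches of uris.
Specified by:
withThreadCount in interface Batcher
Parameters:
threadCount - the number of threads to use in this Batcher

void flushAsync()
Create a batch from any unbatched documents and write that batch asynchronously.

void flushAndWait()
Create a batch from any unbatched documents and write that batch, then wait for all batches to complete (the same as awaitCompletion()).

boolean awaitCompletion()
Blocks until the job has finished or cancelled all queued tasks.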
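flushAndWait() and awaitCompletion() both block until queued batches are done, and the overload that follows adds a timeout. Here is a minimal sketch of such a bounded wait, assuming (hypothetically) that each batch is a task on a java.util.concurrent fixed thread pool; AwaitSketch and submitBatch are illustrative names, not MarkLogic internals, and returning true only when every batch finished before the deadline is this sketch's own convention.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical model: batches run as tasks on a fixed thread pool (not MarkLogic code). */
public class AwaitSketch {
    private final ExecutorService pool;
    private final List<Future<?>> submitted = new CopyOnWriteArrayList<>();

    AwaitSketch(int threadCount) { pool = Executors.newFixedThreadPool(threadCount); }

    /** Like flushAsync(): queue the write and return immediately. */
    void submitBatch(Runnable write) { submitted.add(pool.submit(write)); }

    /** Waits up to the timeout; true if every submitted batch finished (successfully or not). */
    boolean awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        for (Future<?> f : submitted) {
            if (f.isDone()) continue;
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) return false;
            try {
                f.get(remaining, TimeUnit.NANOSECONDS);
            } catch (TimeoutException e) {
                return false;
            } catch (ExecutionException e) {
                // a failed batch has still finished; reporting it would be a failure listener's job
            }
        }
        return true;
    }

    void shutdown() { pool.shutdownNow(); }
}
```

The single shared deadline, rather than a fresh timeout per batch, keeps the total wait bounded by the caller's timeout no matter how many batches are queued.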

boolean awaitCompletion(long timeout, java.util.concurrent.TimeUnit unit) throws java.lang.InterruptedException
Blocks until the job has finished or cancelled all queued tasks, waiting at most the given timeout.
Parameters:
timeout - the maximum time to wait
unit - the time unit of the timeout argument
Throws:
java.lang.InterruptedException - if interrupted while waiting

JobTicket getJobTicket()
After the job has been started, returns the JobTicket generated when the job was started.
Specified by:
getJobTicket in interface Batcher
Throws:
java.lang.IllegalStateException - if this job has not yet been started

void retryWithFailureListeners(WriteBatch writeBatch)
Retry in the same thread to send a batch that failed. If it fails again, all the failure listeners registered with the batcher via onBatchFailure are run.
Note: Use this method with caution, as it can cause an infinite loop. If a batch fails, one of the failure listeners calls this method to retry it, and the batch fails again, the failure listeners run again and the cycle repeats until the batch succeeds.
Parameters:
writeBatch - the information about the batch that failed

Copyright © 2013-2018 MarkLogic Corporation.