public interface QueryBatcher extends Batcher
To facilitate long-running read, update, and delete use cases, a QueryBatcher coordinates threads to process batches of uris that match a query or that come from an Iterator. Each batch of uris matching a query comes from a single forest, and the host for that forest is the target of the DatabaseClient provided to the listener's processEvent method. The query is performed directly on each forest associated with the database of the DatabaseClient provided to the DataMovementManager. The end goal of each job is determined by the listeners registered with onUrisReady. The data set from which batches are made, and on which processing is performed, is determined by the query or Iterator used to construct this instance.
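The following is a minimal, library-free sketch of the coordination described above for the Iterator case: uris are grouped into fixed-size batches and each batch is handed to a listener on a pool of worker threads. It does not use the Data Movement SDK. The class `IteratorBatchSketch` and its `run` method are hypothetical names, and the real QueryBatcher also handles forests, failures, and retries, none of which is modeled here.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical, library-free sketch; not the Data Movement SDK implementation.
class IteratorBatchSketch {
    /**
     * Groups uris into batches of at most batchSize and runs the listener for
     * each batch on a fixed pool of threadCount threads. Returns the batch count.
     */
    static int run(Iterator<String> uris, int batchSize, int threadCount,
                   Consumer<List<String>> listener) {
        ExecutorService pool = Executors.newFixedThreadPool(threadCount);
        int batches = 0;
        List<String> current = new ArrayList<>(batchSize);
        try {
            // The calling thread iterates and queues every batch.
            while (uris.hasNext()) {
                current.add(uris.next());
                if (current.size() == batchSize) {
                    List<String> batch = current;
                    pool.submit(() -> listener.accept(batch));
                    batches++;
                    current = new ArrayList<>(batchSize);
                }
            }
            // Queue the final, possibly partial, batch.
            if (!current.isEmpty()) {
                List<String> batch = current;
                pool.submit(() -> listener.accept(batch));
                batches++;
            }
        } finally {
            // Accept no new work, then wait for the queued batches to finish.
            pool.shutdown();
            try {
                pool.awaitTermination(1, TimeUnit.MINUTES);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return batches;
    }
}
```

With the SDK itself you would instead pass the Iterator to dataMovementManager.newQueryBatcher and register the processing logic with onUrisReady.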
The provided listeners include ApplyTransformListener, DeleteListener, ExportListener, and ExportToWriterListener. The provided listeners are used by adding an instance via onUrisReady, like so:
```java
QueryBatcher qhb = dataMovementManager.newQueryBatcher(query)
    .withConsistentSnapshot()
    .onUrisReady( new DeleteListener() )
    .onQueryFailure(exception -> exception.printStackTrace());
JobTicket ticket = dataMovementManager.startJob(qhb);
qhb.awaitCompletion();
dataMovementManager.stopJob(ticket);
```
Custom listeners will generally use the MarkLogic Java Client API to manipulate the documents for the uris in each batch.
QueryBatcher is designed to scale to very large result sets. Rather than loading all matches into memory, it paginates through them. To avoid queueing too many tasks while running a query, QueryBatcher adds another task only when an existing task has completed its query and is about to send the matching uris to the onUrisReady listeners.
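The pagination strategy just described (queue the next query only once the current one has returned its page) can be modeled in plain Java. This is a hypothetical sketch over an in-memory list that stands in for one forest's matches; `ChainedPagingSketch` is an illustrative name, and the Phaser is used only so the caller can tell when every queued page task has finished. It is not how the SDK is implemented internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.function.Consumer;

// Hypothetical sketch: each page task queues the next page only after its own "query" returns.
class ChainedPagingSketch {
    private final List<String> matches;            // stands in for one forest's matching uris
    private final int pageSize;
    private final Consumer<List<String>> listener;
    private final ExecutorService pool;
    private final Phaser inFlight = new Phaser(1);  // party 1 is the caller waiting in run()

    ChainedPagingSketch(List<String> matches, int pageSize, int threads,
                        Consumer<List<String>> listener) {
        this.matches = matches;
        this.pageSize = pageSize;
        this.listener = listener;
        this.pool = Executors.newFixedThreadPool(threads);
    }

    /** Runs the job and blocks until every page has been passed to the listener. */
    void run() {
        submitPage(0);
        inFlight.arriveAndAwaitAdvance(); // waits until all registered page tasks deregister
        pool.shutdown();
    }

    private void submitPage(int start) {
        inFlight.register(); // register before the task can possibly finish
        pool.submit(() -> {
            try {
                int end = Math.min(start + pageSize, matches.size());
                // "Query" one page and copy it so the listener owns its batch.
                List<String> page = new ArrayList<>(matches.subList(start, end));
                if (end < matches.size()) {
                    submitPage(end); // only now is the next query queued
                }
                if (!page.isEmpty()) {
                    listener.accept(page);
                }
            } finally {
                inFlight.arriveAndDeregister();
            }
        });
    }
}
```

At any moment at most one page query per list is queued, no matter how large the list is, which is the property the paragraph above describes.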
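For pagination to succeed, you must not modify the result set during pagination. If your job deletes matching documents, or modifies them so that they no longer match, you must either use withConsistentSnapshot() or construct the QueryBatcher with an Iterator instead of a query.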
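Why modifying the result set breaks pagination can be seen in a small, library-free simulation. Paging through a live list by offset while deleting each processed page skips matches. Paging through a copy taken at the start, which is the role withConsistentSnapshot() plays on the server, visits every match. `PaginationDriftDemo` and its methods are hypothetical names, and the server-side snapshot mechanism is not modeled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo of offset pagination over a result set that shrinks while we page.
class PaginationDriftDemo {
    /** Pages through the live list by offset, deleting each page after processing it. */
    static int deleteWhilePaging(List<String> live, int pageSize) {
        int processed = 0;
        for (int offset = 0; offset < live.size(); offset += pageSize) {
            List<String> page = new ArrayList<>(
                live.subList(offset, Math.min(offset + pageSize, live.size())));
            processed += page.size();
            live.removeAll(page); // later items shift left, so the next offset skips some
        }
        return processed;
    }

    /** Pages through a copy taken up front, so deletions do not shift later pages. */
    static int deleteFromSnapshot(List<String> live, int pageSize) {
        List<String> snapshot = new ArrayList<>(live);
        int processed = 0;
        for (int offset = 0; offset < snapshot.size(); offset += pageSize) {
            List<String> page =
                snapshot.subList(offset, Math.min(offset + pageSize, snapshot.size()));
            processed += page.size();
            live.removeAll(page);
        }
        return processed;
    }

    /** Builds n hypothetical uris of the form /docN.txt. */
    static List<String> uris(int n) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < n; i++) out.add("/doc" + i + ".txt");
        return out;
    }
}
```

With 10 matches and a page size of 3, deleteWhilePaging processes only 6 uris and leaves 4 documents undeleted, while deleteFromSnapshot processes all 10.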
Sample usage with withConsistentSnapshot():

```java
QueryDefinition query = new StructuredQueryBuilder().collection("myCollection");
QueryBatcher qhb = dataMovementManager.newQueryBatcher(query)
    .withBatchSize(1000)
    .withThreadCount(20)
    .withConsistentSnapshot()
    .onUrisReady(batch -> {
        for ( String uri : batch.getItems() ) {
            if ( uri.endsWith(".txt") ) {
                // delete through the client that targets the host of this batch's forest
                batch.getClient().newDocumentManager().delete(uri);
            }
        }
    })
    .onQueryFailure(exception -> exception.printStackTrace());
JobTicket ticket = dataMovementManager.startJob(qhb);
qhb.awaitCompletion();
dataMovementManager.stopJob(ticket);
```
Example of queueing uris in memory instead of using withConsistentSnapshot():

```java
// Collect the uris first; a synchronized list is used because
// onUrisReady listeners run on multiple threads.
List<String> uris = Collections.synchronizedList(new ArrayList<>());
QueryBatcher getUris = dataMovementManager.newQueryBatcher(query)
    .withBatchSize(5000)
    .onUrisReady( batch -> uris.addAll(Arrays.asList(batch.getItems())) )
    .onQueryFailure(exception -> exception.printStackTrace());
JobTicket getUrisTicket = dataMovementManager.startJob(getUris);
getUris.awaitCompletion();
dataMovementManager.stopJob(getUrisTicket);

// now we have the uris, let's step through them
QueryBatcher performDelete = dataMovementManager.newQueryBatcher(uris.iterator())
    .onUrisReady(new DeleteListener())
    .onQueryFailure(exception -> exception.printStackTrace());
JobTicket ticket = dataMovementManager.startJob(performDelete);
performDelete.awaitCompletion();
dataMovementManager.stopJob(ticket);
```
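If the collected uris might not fit in memory, the same collect-then-iterate pattern can spill them to a file and read them back lazily. Below is a library-free sketch using only java.nio.file; `UriSpool` is a hypothetical helper name, not an SDK class. The stream's iterator() yields an Iterator&lt;String&gt;, the same kind of input passed to newQueryBatcher in the example above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

// Hypothetical helper: spool uris to a temp file, one per line, then stream them back lazily.
class UriSpool {
    static Path write(Iterable<String> uris) {
        try {
            Path file = Files.createTempFile("uris", ".txt");
            file.toFile().deleteOnExit();                      // tidy up when the JVM exits
            Files.write(file, uris, StandardCharsets.UTF_8);   // one uri per line
            return file;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Opens a lazy stream over the spooled uris; the caller must close it. */
    static Stream<String> read(Path file) {
        try {
            return Files.lines(file, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because Files.lines reads on demand, only the current portion of the file is held in memory; close the stream once the job that consumes its iterator has completed.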
To queue uris to disk (if not enough memory is available), see UrisToWriterListener.

| Modifier and Type | Method | Description |
|---|---|---|
| boolean | awaitCompletion() | Blocks until the job is complete. |
| boolean | awaitCompletion(long timeout, java.util.concurrent.TimeUnit unit) | Blocks until the job is complete, waiting at most the given timeout. |
| int | getDefaultDocBatchSize() | Returns defaultDocBatchSize, which is calculated according to server status. |
| int | getDocToUriBatchRatio() | Returns the docToUriBatchRatio set on this QueryBatcher. |
| JobTicket | getJobTicket() | After the job has been started, returns the JobTicket generated when the job was started. |
| long | getMaxBatches() | Returns the maximum number of batches for the current job. |
| int | getMaxDocToUriBatchRatio() | Returns maxDocToUriBatchRatio, which is calculated according to server status. |
| int | getMaxUriBatchSize() | Returns maxUriBatchSize, which is calculated according to server status. |
| QueryFailureListener[] | getQueryFailureListeners() | Gets the array of QueryFailureListener instances registered via onQueryFailure, including the HostAvailabilityListener registered by default. |
| QueryBatcherListener[] | getQueryJobCompletionListeners() | Gets the array of QueryBatcherListener instances registered via onJobCompletion. |
| QueryBatchListener[] | getQuerySuccessListeners() | Deprecated (as of 4.0.4): this should have been called getUrisReadyListeners. |
| QueryBatchListener[] | getUrisReadyListeners() | Gets the array of QueryBatchListener instances registered via onUrisReady. |
| boolean | isStopped() | Returns true if the job is terminated (the last batch was finished or DataMovementManager.stopJob was called), false otherwise. |
| QueryBatcher | onJobCompletion(QueryBatcherListener listener) | Adds a listener to run when the query job is completed, i.e. when all the document URIs have been retrieved and the associated listeners have completed. |
| QueryBatcher | onQueryFailure(QueryFailureListener listener) | Adds a listener to run each time there is an exception retrieving a batch of uris. |
| QueryBatcher | onUrisReady(QueryBatchListener listener) | Adds a listener to run each time a batch of uris is ready. |
| void | retry(QueryEvent queryEvent) | Retries, in the same thread, the query for a batch that failed. |
| void | retryListener(QueryBatch batch, QueryBatchListener queryBatchListener) | Retries applying the listener to a batch of URIs when the batch was successfully retrieved from the server but applying the listener to it failed. |
| void | retryWithFailureListeners(QueryEvent queryEvent) | Retries, in the same thread, the query for a batch that failed. |
| void | setMaxBatches() | Caps the query at the current batch. |
| void | setMaxBatches(long maxBatches) | Sets the limit for the maximum number of batches that can be collected. |
| void | setQueryFailureListeners(QueryFailureListener... listeners) | Removes any existing QueryFailureListener instances registered via onQueryFailure, including the HostAvailabilityListener registered by default, and replaces them with the provided listeners. |
| void | setQueryJobCompletionListeners(QueryBatcherListener... listeners) | Removes any existing QueryBatcherListener instances registered via onJobCompletion and replaces them with the provided listeners. |
| void | setUrisReadyListeners(QueryBatchListener... listeners) | Removes any existing QueryBatchListener instances registered via onUrisReady and replaces them with the provided listeners. |
| QueryBatcher | withBatchSize(int docBatchSize) | Sets the number of documents processed in a batch. |
| QueryBatcher | withBatchSize(int docBatchSize, int docToUriBatchRatio) | Sets the number of documents processed in a batch and the ratio of the document processing batch to the document uri collection batch. |
| QueryBatcher | withConsistentSnapshot() | Specifies that matching uris should be retrieved as they were when this QueryBatcher job started. |
| QueryBatcher | withForestConfig(ForestConfiguration forestConfig) | If the server forest configuration changes mid-job, it can be re-fetched with DataMovementManager.readForestConfig() and then set via withForestConfig. |
| QueryBatcher | withJobId(java.lang.String jobId) | Sets the unique id of the job, which helps with managing multiple concurrent jobs; the job is started with the specified job id. |
| QueryBatcher | withJobName(java.lang.String jobName) | Sets the job name. |
| QueryBatcher | withThreadCount(int threadCount) | Sets the number of threads added to the internal thread pool for this instance to use for retrieving or processing batches of uris. |
Methods inherited from interface Batcher: getBatchSize, getForestConfig, getJobEndTime, getJobId, getJobName, getJobStartTime, getPrimaryClient, getThreadCount, isStarted

QueryBatcher onUrisReady(QueryBatchListener listener)
Add a listener to run each time a batch of uris is ready.
Parameters: listener - the action to perform when a batch of uris is ready

QueryBatcher onQueryFailure(QueryFailureListener listener)
Add a listener to run each time there is an exception retrieving a batch of uris.
These listeners will not run when an exception is thrown by a listener registered with onUrisReady. To learn more, see Handling Exceptions in Listeners.
Parameters: listener - the code to run when a failure occurs

QueryBatcher onJobCompletion(QueryBatcherListener listener)
Add a listener to run when the query job is completed, i.e. when all the document URIs have been retrieved and the associated listeners have completed.
Parameters: listener - the code to run when the query job is completed

void retry(QueryEvent queryEvent)
Retry, in the same thread, the query for a batch that failed.
Parameters: queryEvent - the information about the batch that failed

@Deprecated QueryBatchListener[] getQuerySuccessListeners()
Deprecated (as of 4.0.4): this should have been called getUrisReadyListeners.
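The rule stated for onQueryFailure (its listeners run when retrieving a batch fails, not when an onUrisReady listener throws) can be illustrated with a small, library-free dispatcher. `FailureRoutingSketch`, its `fetch` supplier, and its listener lists are hypothetical stand-ins rather than SDK types, and recording listener errors in a list is this sketch's choice; how the SDK itself reports such errors is covered in Handling Exceptions in Listeners.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical model: only retrieval errors reach the failure listeners.
class FailureRoutingSketch {
    final List<Consumer<List<String>>> urisReady = new ArrayList<>();
    final List<Consumer<RuntimeException>> queryFailure = new ArrayList<>();
    final List<RuntimeException> listenerErrors = new ArrayList<>();

    void processBatch(Supplier<List<String>> fetch) {
        List<String> batch;
        try {
            batch = fetch.get(); // "retrieving a batch of uris"
        } catch (RuntimeException e) {
            queryFailure.forEach(l -> l.accept(e)); // retrieval failed: notify failure listeners
            return;
        }
        for (Consumer<List<String>> l : urisReady) {
            try {
                l.accept(batch);
            } catch (RuntimeException e) {
                listenerErrors.add(e); // recorded here, but NOT routed to queryFailure
            }
        }
    }
}
```

In the usage below, a buggy urisReady listener leaves the failure listeners untouched, while a failed fetch reaches them.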
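
QueryBatchListener[] getUrisReadyListeners()
Get the array of QueryBatchListener instances registered via onUrisReady.

QueryBatcherListener[] getQueryJobCompletionListeners()
Get the array of QueryBatcherListener instances registered via onJobCompletion.

QueryFailureListener[] getQueryFailureListeners()
Get the array of QueryFailureListener instances registered via onQueryFailure, including the HostAvailabilityListener registered by default.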
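The listener accessors follow an add-versus-replace contract: onUrisReady appends a listener, getUrisReadyListeners returns the registered listeners as an array, and setUrisReadyListeners discards the existing ones in favor of those supplied. A hypothetical, library-free registry with the same contract follows; `ListenerRegistry` is an illustrative name, not an SDK type, and returning a fresh array is this sketch's choice.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical registry mirroring the on*/get*/set* listener contract.
class ListenerRegistry<T> {
    private final List<Consumer<T>> listeners = new ArrayList<>();

    /** Like onUrisReady: appends and returns this for chaining. */
    ListenerRegistry<T> on(Consumer<T> listener) {
        listeners.add(listener);
        return this;
    }

    /** Like getUrisReadyListeners: returns a new array, so callers cannot mutate the registry. */
    @SuppressWarnings("unchecked")
    Consumer<T>[] get() {
        return listeners.toArray(new Consumer[0]);
    }

    /** Like setUrisReadyListeners: removes every existing listener, then adds the given ones. */
    @SafeVarargs
    final void set(Consumer<T>... replacements) {
        listeners.clear();
        listeners.addAll(Arrays.asList(replacements));
    }
}
```

A replace drops everything, including defaults, which is why setQueryFailureListeners also removes the HostAvailabilityListener registered by default.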
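
void setUrisReadyListeners(QueryBatchListener... listeners)
Remove any existing QueryBatchListener instances registered via onUrisReady and replace them with the provided listeners.
Parameters: listeners - the QueryBatchListener instances this batcher should use

void setQueryFailureListeners(QueryFailureListener... listeners)
Remove any existing QueryFailureListener instances registered via onQueryFailure, including the HostAvailabilityListener registered by default, and replace them with the provided listeners.
Parameters: listeners - the QueryFailureListener instances this batcher should use

void setQueryJobCompletionListeners(QueryBatcherListener... listeners)
Remove any existing QueryBatcherListener instances registered via onJobCompletion and replace them with the provided listeners.
Parameters: listeners - the QueryBatcherListener instances this batcher should use

QueryBatcher withConsistentSnapshot()
Specifies that matching uris should be retrieved as they were when this QueryBatcher job started. This applies only when the QueryBatcher is constructed with a query, not with an Iterator.
This is required when deleting documents that match the query, or when making any modification (including with ApplyTransformListener) that would cause matching documents to no longer match the query. Otherwise pagination through the result set would fail, because pages shift as documents are deleted or modified so that they no longer match.

QueryBatcher withForestConfig(ForestConfiguration forestConfig)
If the server forest configuration changes mid-job, it can be re-fetched with DataMovementManager.readForestConfig() and then set via withForestConfig.
Specified by: withForestConfig in interface Batcher
Parameters: forestConfig - the updated ForestConfiguration

QueryBatcher withJobName(java.lang.String jobName)
Sets the job name.
Specified by: withJobName in interface Batcher
Parameters: jobName - the name you would like to assign to this job

QueryBatcher withJobId(java.lang.String jobId)
Sets the unique id of the job, which helps with managing multiple concurrent jobs; the job is started with the specified job id.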

QueryBatcher withBatchSize(int docBatchSize)
Sets the number of documents processed in a batch.
Specified by: withBatchSize in interface Batcher
Parameters: docBatchSize - the number of documents processed in a batch

QueryBatcher withBatchSize(int docBatchSize, int docToUriBatchRatio)
Sets the number of documents processed in a batch and the ratio of the document processing batch to the document uri collection batch.
Parameters:
docBatchSize - the number of documents processed in a batch
docToUriBatchRatio - the ratio of the document processing batch to the document uri collection batch. The docToUriBatchRatio should ordinarily be larger than 1, because URIs are small relative to full documents and collecting URIs from indexes is ordinarily faster than processing documents.

int getDocToUriBatchRatio()
Returns the docToUriBatchRatio set on this QueryBatcher.
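The docToUriBatchRatio parameter is easiest to picture as a multiplier. The arithmetic below assumes that a uri collection batch holds docBatchSize × docToUriBatchRatio uris; that reading is suggested by, but not spelled out in, the parameter description, so treat it as an assumption. `BatchRatioSketch` is a hypothetical name.

```java
// Hypothetical arithmetic, ASSUMING uriBatchSize = docBatchSize * docToUriBatchRatio.
class BatchRatioSketch {
    static int uriBatchSize(int docBatchSize, int docToUriBatchRatio) {
        if (docBatchSize < 1 || docToUriBatchRatio < 1) {
            throw new IllegalArgumentException("sizes must be positive");
        }
        // multiplyExact throws instead of silently overflowing
        return Math.multiplyExact(docBatchSize, docToUriBatchRatio);
    }

    /** Number of document batches needed to cover one uri batch (ceiling division). */
    static int docBatchesPerUriBatch(int uriBatchSize, int docBatchSize) {
        return (uriBatchSize + docBatchSize - 1) / docBatchSize;
    }
}
```

Under that assumption, withBatchSize(100, 5) would collect 500 uris per uri batch and process them as 5 document batches of 100; a final partial uri batch of 450 uris would still need 5 document batches.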
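
int getDefaultDocBatchSize()
Returns defaultDocBatchSize, which is calculated according to server status.

int getMaxUriBatchSize()
Returns maxUriBatchSize, which is calculated according to server status.

int getMaxDocToUriBatchRatio()
Returns maxDocToUriBatchRatio, which is calculated according to server status.

QueryBatcher withThreadCount(int threadCount)
Sets the number of threads added to the internal thread pool for this instance to use for retrieving or processing batches of uris.
For Iterators, the calling thread (the one that calls startJob) is used to queue all batches, so startJob will not return until all iteration is complete and all batches are queued. For Iterators, this thread count is therefore the number of threads used for processing the queued batches (running processEvent on the listeners registered with onUrisReady).
Specified by: withThreadCount in interface Batcher
Parameters: threadCount - the number of threads to use in this Batcher

boolean awaitCompletion()
Blocks until the job is complete.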
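awaitCompletion() blocks until the job is complete, and its timed overload waits at most a given timeout. The closest java.util.concurrent analogue is ExecutorService.awaitTermination(long, TimeUnit), which returns false if the timeout elapses first. The sketch below uses it to model a timed wait; reading the timed overload's boolean as "completed before the timeout" is this sketch's assumption, and `AwaitSketch` is a hypothetical name.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical model of a blocking wait with a timeout, built on ExecutorService.
class AwaitSketch {
    /** Runs a task that sleeps for workMillis, then waits up to timeoutMillis for it. */
    static boolean runAndAwait(long workMillis, long timeoutMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.submit(() -> {
            try {
                Thread.sleep(workMillis); // stands in for processing batches
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        pool.shutdown(); // accept no new work; already-queued work still runs
        try {
            // true if all work finished in time, false if the timeout elapsed first
            return pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow(); // interrupt any leftover work so the JVM can exit
        }
    }
}
```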
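
boolean awaitCompletion(long timeout, java.util.concurrent.TimeUnit unit) throws java.lang.InterruptedException
Blocks until the job is complete, waiting at most the given timeout.
Parameters:
timeout - the maximum time to wait
unit - the time unit of the timeout argument
Throws: java.lang.InterruptedException - if interrupted while waiting

boolean isStopped()
Returns true if the job is terminated (the last batch was finished or DataMovementManager.stopJob was called), false otherwise.
Specified by: isStopped in interface Batcher

JobTicket getJobTicket()
After the job has been started, returns the JobTicket generated when the job was started.
Specified by: getJobTicket in interface Batcher
Throws: java.lang.IllegalStateException - if this job has not yet been started

void retryListener(QueryBatch batch, QueryBatchListener queryBatchListener)
Retries applying the listener to the batch of URIs when the batch was successfully retrieved from the server but applying the listener to it failed.
Parameters:
batch - the QueryBatch for which the listener needs to be processed
queryBatchListener - the QueryBatchListener that needs to be applied

void retryWithFailureListeners(QueryEvent queryEvent)
Retry, in the same thread, the query for a batch that failed.
Parameters: queryEvent - the information about the batch that failed

void setMaxBatches(long maxBatches)
Sets the limit for the maximum number of batches that can be collected.
Parameters: maxBatches - the value of the limit

void setMaxBatches()
Caps the query at the current batch.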

long getMaxBatches()
Returns the maximum number of batches for the current job.
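setMaxBatches(long) limits how many batches are collected, and setMaxBatches() caps the job at the current batch. Below is a minimal, library-free sketch of a cap checked before each batch is collected. `CappedCollector` is hypothetical; the unlimited default and treating "the current batch" as "the number collected so far" are assumptions of this single-threaded sketch, which does not model how the SDK tracks batches across threads.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch: stop collecting batches once maxBatches have been collected.
class CappedCollector {
    private long maxBatches = Long.MAX_VALUE; // effectively unlimited until set (assumption)
    private long collected = 0;

    void setMaxBatches(long maxBatches) { this.maxBatches = maxBatches; }

    /** No-argument form: cap at the number of batches collected so far. */
    void setMaxBatches() { this.maxBatches = collected; }

    long getMaxBatches() { return maxBatches; }

    /** Collects batches of batchSize until the input or the cap runs out. */
    List<List<String>> collect(Iterator<String> uris, int batchSize) {
        List<List<String>> out = new ArrayList<>();
        while (uris.hasNext() && collected < maxBatches) {
            List<String> batch = new ArrayList<>(batchSize);
            while (uris.hasNext() && batch.size() < batchSize) {
                batch.add(uris.next());
            }
            out.add(batch);
            collected++;
        }
        return out;
    }
}
```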
Copyright © 2013-2021 MarkLogic Corporation.