Class CommitProcessor
- java.lang.Object
- 
- java.lang.Thread
- 
- org.apache.zookeeper.server.ZooKeeperThread
- 
- org.apache.zookeeper.server.ZooKeeperCriticalThread
- 
- org.apache.zookeeper.server.quorum.CommitProcessor
 
 
 
 
- 
- All Implemented Interfaces:
- Runnable,- RequestProcessor
 
 public class CommitProcessor extends ZooKeeperCriticalThread implements RequestProcessor This RequestProcessor matches the incoming committed requests with the locally submitted requests. The trick is that locally submitted requests that change the state of the system will come back as incoming committed requests, so we need to match them up. Instead of just waiting for the committed requests, we process the uncommitted requests that belong to other sessions. The CommitProcessor is multi-threaded. Communication between threads is handled via queues, atomics, and wait/notifyAll synchronized on the processor. The CommitProcessor acts as a gateway for allowing requests to continue with the remainder of the processing pipeline. It will allow many read requests but only a single write request to be in flight simultaneously, thus ensuring that write requests are processed in transaction id order. - 1 commit processor main thread, which watches the request queues and assigns requests to worker threads based on their sessionId so that read and write requests for a particular session are always assigned to the same thread (and hence are guaranteed to run in order). - 0-N worker threads, which run the rest of the request processor pipeline on the requests. If configured with 0 worker threads, the primary commit processor thread runs the pipeline directly. Typical (default) thread counts are: on a 32 core machine, 1 commit processor thread and 32 worker threads. Multi-threading constraints: - Each session's requests must be processed in order. - Write requests must be processed in zxid order - Must ensure no race condition between writes in one session that would trigger a watch being set by a read request in another session The current implementation solves the third constraint by simply allowing no read requests to be processed in parallel with write requests.
- 
- 
Nested Class Summary- 
Nested classes/interfaces inherited from class java.lang.ThreadThread.State, Thread.UncaughtExceptionHandler
 - 
Nested classes/interfaces inherited from interface org.apache.zookeeper.server.RequestProcessorRequestProcessor.RequestProcessorException
 
- 
 - 
Field SummaryFields Modifier and Type Field Description protected LinkedBlockingQueue<Request>committedRequestsRequests that have been committed.protected AtomicIntegernumRequestsProcessingThe number of requests currently being processedprotected Map<Long,Deque<Request>>pendingRequestsRequests that we are holding until commit comes in.protected LinkedBlockingQueue<Request>queuedRequestsIncoming requests.protected LinkedBlockingQueue<Request>queuedWriteRequestsIncoming requests that are waiting on a commit, contained in order of arrivalprotected booleanstoppedprotected booleanstoppedMainLoopFor testing purposes, we use a separated stopping condition for the outer loop.protected WorkerServiceworkerPoolstatic StringZOOKEEPER_COMMIT_PROC_MAX_COMMIT_BATCH_SIZEDefault max commit batch size: 1static StringZOOKEEPER_COMMIT_PROC_MAX_READ_BATCH_SIZEDefault max read batch size: -1 to disable the featurestatic StringZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADSDefault: numCoresstatic StringZOOKEEPER_COMMIT_PROC_SHUTDOWN_TIMEOUTDefault worker pool shutdown timeout in ms: 5000 (5s)- 
Fields inherited from class java.lang.ThreadMAX_PRIORITY, MIN_PRIORITY, NORM_PRIORITY
 
- 
 - 
Constructor SummaryConstructors Constructor Description CommitProcessor(RequestProcessor nextProcessor, String id, boolean matchSyncs, ZooKeeperServerListener listener)
 - 
Method SummaryAll Methods Static Methods Instance Methods Concrete Methods Modifier and Type Method Description voidcommit(Request request)protected voidendOfIteration()static intgetMaxCommitBatchSize()static intgetMaxReadBatchSize()protected booleanneedCommit(Request request)voidprocessRequest(Request request)voidrun()static voidsetMaxCommitBatchSize(int size)static voidsetMaxReadBatchSize(int size)voidshutdown()voidstart()protected voidwaitForEmptyPool()- 
Methods inherited from class org.apache.zookeeper.server.ZooKeeperCriticalThreadhandleException
 - 
Methods inherited from class java.lang.ThreadactiveCount, checkAccess, clone, countStackFrames, currentThread, destroy, dumpStack, enumerate, getAllStackTraces, getContextClassLoader, getDefaultUncaughtExceptionHandler, getId, getName, getPriority, getStackTrace, getState, getThreadGroup, getUncaughtExceptionHandler, holdsLock, interrupt, interrupted, isAlive, isDaemon, isInterrupted, join, join, join, resume, setContextClassLoader, setDaemon, setDefaultUncaughtExceptionHandler, setName, setPriority, setUncaughtExceptionHandler, sleep, sleep, stop, stop, suspend, toString, yield
 
- 
 
- 
- 
- 
Field Detail- 
ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADSpublic static final String ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS Default: numCores- See Also:
- Constant Field Values
 
 - 
ZOOKEEPER_COMMIT_PROC_SHUTDOWN_TIMEOUTpublic static final String ZOOKEEPER_COMMIT_PROC_SHUTDOWN_TIMEOUT Default worker pool shutdown timeout in ms: 5000 (5s)- See Also:
- Constant Field Values
 
 - 
ZOOKEEPER_COMMIT_PROC_MAX_READ_BATCH_SIZEpublic static final String ZOOKEEPER_COMMIT_PROC_MAX_READ_BATCH_SIZE Default max read batch size: -1 to disable the feature- See Also:
- Constant Field Values
 
 - 
ZOOKEEPER_COMMIT_PROC_MAX_COMMIT_BATCH_SIZEpublic static final String ZOOKEEPER_COMMIT_PROC_MAX_COMMIT_BATCH_SIZE Default max commit batch size: 1- See Also:
- Constant Field Values
 
 - 
queuedRequestsprotected LinkedBlockingQueue<Request> queuedRequests Incoming requests.
 - 
queuedWriteRequestsprotected final LinkedBlockingQueue<Request> queuedWriteRequests Incoming requests that are waiting on a commit, contained in order of arrival
 - 
committedRequestsprotected final LinkedBlockingQueue<Request> committedRequests Requests that have been committed.
 - 
pendingRequestsprotected final Map<Long,Deque<Request>> pendingRequests Requests that we are holding until commit comes in. Keys represent session ids, each value is a linked list of the session's requests.
 - 
numRequestsProcessingprotected final AtomicInteger numRequestsProcessing The number of requests currently being processed
 - 
stoppedMainLoopprotected volatile boolean stoppedMainLoop For testing purposes, we use a separated stopping condition for the outer loop.
 - 
stoppedprotected volatile boolean stopped 
 - 
workerPoolprotected WorkerService workerPool 
 
- 
 - 
Constructor Detail- 
CommitProcessorpublic CommitProcessor(RequestProcessor nextProcessor, String id, boolean matchSyncs, ZooKeeperServerListener listener) 
 
- 
 - 
Method Detail- 
needCommitprotected boolean needCommit(Request request) 
 - 
endOfIterationprotected void endOfIteration() 
 - 
waitForEmptyPoolprotected void waitForEmptyPool() throws InterruptedException- Throws:
- InterruptedException
 
 - 
getMaxReadBatchSizepublic static int getMaxReadBatchSize() 
 - 
getMaxCommitBatchSizepublic static int getMaxCommitBatchSize() 
 - 
setMaxReadBatchSizepublic static void setMaxReadBatchSize(int size) 
 - 
setMaxCommitBatchSizepublic static void setMaxCommitBatchSize(int size) 
 - 
commitpublic void commit(Request request) 
 - 
processRequestpublic void processRequest(Request request) - Specified by:
- processRequestin interface- RequestProcessor
 
 - 
shutdownpublic void shutdown() - Specified by:
- shutdownin interface- RequestProcessor
 
 
- 
 
-