diff options
Diffstat (limited to 'zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java')
-rw-r--r-- | zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java | 97 |
1 files changed, 36 insertions, 61 deletions
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java index 9733a48ac..35293359f 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java @@ -18,7 +18,6 @@ package org.apache.zookeeper.server; -import static java.nio.charset.StandardCharsets.UTF_8; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.StringReader; @@ -36,6 +35,7 @@ import java.util.concurrent.LinkedBlockingQueue; import org.apache.jute.BinaryOutputArchive; import org.apache.jute.Record; import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.DeleteContainerRequest; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.BadArgumentsException; import org.apache.zookeeper.KeeperException.Code; @@ -101,7 +101,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req */ private static boolean failCreate = false; - LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>(); + LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<>(); private final RequestProcessor nextProcessor; private final boolean digestEnabled; @@ -311,13 +311,8 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req /** * This method will be called inside the ProcessRequestThread, which is a * singleton, so there will be a single thread calling this code. - * - * @param type - * @param zxid - * @param request - * @param record */ - protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) throws KeeperException, IOException, RequestProcessorException { + protected void pRequest2Txn(int type, long zxid, Request request, Record record) throws KeeperException, IOException, RequestProcessorException { if (request.getHdr() == null) { request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), type)); @@ -328,11 +323,12 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req case OpCode.create2: case OpCode.createTTL: case OpCode.createContainer: { - pRequest2TxnCreate(type, request, record, deserialize); + pRequest2TxnCreate(type, request, record); break; } case OpCode.deleteContainer: { - String path = new String(request.readRequestBytes(), UTF_8); + DeleteContainerRequest txn = (DeleteContainerRequest) record; + String path = txn.getPath(); String parentPath = getParentPathAndValidate(path); ChangeRecord nodeRecord = getRecordForPath(path); if (nodeRecord.childCount > 0) { @@ -359,9 +355,6 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req case OpCode.delete: zks.sessionTracker.checkSession(request.sessionId, request.getOwner()); DeleteRequest deleteRequest = (DeleteRequest) record; - if (deserialize) { - request.readRequestRecord(deleteRequest); - } String path = deleteRequest.getPath(); String parentPath = getParentPathAndValidate(path); ChangeRecord parentRecord = getRecordForPath(parentPath); @@ -387,9 +380,6 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req case OpCode.setData: zks.sessionTracker.checkSession(request.sessionId, request.getOwner()); SetDataRequest setDataRequest = (SetDataRequest) record; - if (deserialize) { - request.readRequestRecord(setDataRequest); - } path = setDataRequest.getPath(); validatePath(path, request.sessionId); nodeRecord = getRecordForPath(path); @@ -559,9 +549,6 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req case OpCode.setACL: zks.sessionTracker.checkSession(request.sessionId, request.getOwner()); SetACLRequest setAclRequest = (SetACLRequest) record; - if (deserialize) { - request.readRequestRecord(setAclRequest); - } path = setAclRequest.getPath(); validatePath(path, request.sessionId); List<ACL> listACL = fixupACL(path, request.authInfo, setAclRequest.getAcl()); @@ -577,8 +564,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req addChangeRecord(nodeRecord); break; case OpCode.createSession: - CreateSessionTxn createSessionTxn = new CreateSessionTxn(); - request.readRequestRecord(createSessionTxn); + CreateSessionTxn createSessionTxn = request.readRequestRecord(CreateSessionTxn::new); request.setTxn(createSessionTxn); // only add the global session tracker but not to ZKDb zks.sessionTracker.trackSession(request.sessionId, createSessionTxn.getTimeOut()); @@ -630,9 +616,6 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req case OpCode.check: zks.sessionTracker.checkSession(request.sessionId, request.getOwner()); CheckVersionRequest checkVersionRequest = (CheckVersionRequest) record; - if (deserialize) { - request.readRequestRecord(checkVersionRequest); - } path = checkVersionRequest.getPath(); validatePath(path, request.sessionId); nodeRecord = getRecordForPath(path); @@ -653,11 +636,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req } } - private void pRequest2TxnCreate(int type, Request request, Record record, boolean deserialize) throws IOException, KeeperException { - if (deserialize) { - request.readRequestRecord(record); - } - + private void pRequest2TxnCreate(int type, Request request, Record record) throws IOException, KeeperException { int flags; String path; List<ACL> acl; @@ -792,39 +771,41 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req case OpCode.createContainer: case OpCode.create: case OpCode.create2: - CreateRequest create2Request = new CreateRequest(); - pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true); + CreateRequest create2Request = request.readRequestRecord(CreateRequest::new); + pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request); break; case OpCode.createTTL: - CreateTTLRequest createTtlRequest = new CreateTTLRequest(); - pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest, true); + CreateTTLRequest createTtlRequest = request.readRequestRecord(CreateTTLRequest::new); + pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest); break; case OpCode.deleteContainer: + DeleteContainerRequest deleteContainerRequest = request.readRequestRecord(DeleteContainerRequest::new); + pRequest2Txn(request.type, zks.getNextZxid(), request, deleteContainerRequest); + break; case OpCode.delete: - DeleteRequest deleteRequest = new DeleteRequest(); - pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true); + DeleteRequest deleteRequest = request.readRequestRecord(DeleteRequest::new); + pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest); break; case OpCode.setData: - SetDataRequest setDataRequest = new SetDataRequest(); - pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true); + SetDataRequest setDataRequest = request.readRequestRecord(SetDataRequest::new); + pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest); break; case OpCode.reconfig: - ReconfigRequest reconfigRequest = new ReconfigRequest(); - request.readRequestRecord(reconfigRequest); - pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest, true); + ReconfigRequest reconfigRequest = request.readRequestRecord(ReconfigRequest::new); + pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest); break; case OpCode.setACL: - SetACLRequest setAclRequest = new SetACLRequest(); - pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest, true); + SetACLRequest setAclRequest = request.readRequestRecord(SetACLRequest::new); + pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest); break; case OpCode.check: - CheckVersionRequest checkRequest = new CheckVersionRequest(); - pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest, true); + CheckVersionRequest checkRequest = request.readRequestRecord(CheckVersionRequest::new); + pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest); break; case OpCode.multi: - MultiOperationRecord multiRequest = new MultiOperationRecord(); + MultiOperationRecord multiRequest; try { - request.readRequestRecord(multiRequest); + multiRequest = request.readRequestRecord(MultiOperationRecord::new); } catch (IOException e) { request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(), Time.currentWallTime(), OpCode.multi)); throw e; @@ -854,7 +835,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req } else { /* Prep the request and convert to a Txn */ try { - pRequest2Txn(op.getType(), zxid, request, subrequest, false); + pRequest2Txn(op.getType(), zxid, request, subrequest); type = op.getType(); txn = request.getTxn(); } catch (KeeperException e) { @@ -899,7 +880,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req case OpCode.createSession: case OpCode.closeSession: if (!request.isLocalSession()) { - pRequest2Txn(request.type, zks.getNextZxid(), request, null, true); + pRequest2Txn(request.type, zks.getNextZxid(), request, null); } break; @@ -944,20 +925,14 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req // log at error level as we are returning a marshalling // error to the user LOG.error("Failed to process {}", request, e); - StringBuilder sb = new StringBuilder(); - byte[] payload = request.readRequestBytes(); - if (payload != null) { - for (byte b : payload) { - sb.append(String.format("%02x", (0xff & b))); - } - } else { - sb.append("request buffer is null"); - } - LOG.error("Dumping request buffer for request type {}: 0x{}", Request.op2String(request.type), sb); - if (request.getHdr() != null) { - request.getHdr().setType(OpCode.error); - request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue())); + String digest = request.requestDigest(); + LOG.error("Dumping request buffer for request type {}: 0x{}", Request.op2String(request.type), digest); + if (request.getHdr() == null) { + request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getZxid(), Time.currentWallTime(), request.type)); } + + request.getHdr().setType(OpCode.error); + request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue())); } } |