summaryrefslogtreecommitdiff
path: root/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
diff options
context:
space:
mode:
Diffstat (limited to 'zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java')
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java97
1 files changed, 36 insertions, 61 deletions
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
index 9733a48ac..35293359f 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
@@ -18,7 +18,6 @@
package org.apache.zookeeper.server;
-import static java.nio.charset.StandardCharsets.UTF_8;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.StringReader;
@@ -36,6 +35,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import org.apache.jute.BinaryOutputArchive;
import org.apache.jute.Record;
import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.DeleteContainerRequest;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.BadArgumentsException;
import org.apache.zookeeper.KeeperException.Code;
@@ -101,7 +101,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
*/
private static boolean failCreate = false;
- LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>();
+ LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<>();
private final RequestProcessor nextProcessor;
private final boolean digestEnabled;
@@ -311,13 +311,8 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
/**
* This method will be called inside the ProcessRequestThread, which is a
* singleton, so there will be a single thread calling this code.
- *
- * @param type
- * @param zxid
- * @param request
- * @param record
*/
- protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) throws KeeperException, IOException, RequestProcessorException {
+ protected void pRequest2Txn(int type, long zxid, Request request, Record record) throws KeeperException, IOException, RequestProcessorException {
if (request.getHdr() == null) {
request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
Time.currentWallTime(), type));
@@ -328,11 +323,12 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
case OpCode.create2:
case OpCode.createTTL:
case OpCode.createContainer: {
- pRequest2TxnCreate(type, request, record, deserialize);
+ pRequest2TxnCreate(type, request, record);
break;
}
case OpCode.deleteContainer: {
- String path = new String(request.readRequestBytes(), UTF_8);
+ DeleteContainerRequest txn = (DeleteContainerRequest) record;
+ String path = txn.getPath();
String parentPath = getParentPathAndValidate(path);
ChangeRecord nodeRecord = getRecordForPath(path);
if (nodeRecord.childCount > 0) {
@@ -359,9 +355,6 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
case OpCode.delete:
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
DeleteRequest deleteRequest = (DeleteRequest) record;
- if (deserialize) {
- request.readRequestRecord(deleteRequest);
- }
String path = deleteRequest.getPath();
String parentPath = getParentPathAndValidate(path);
ChangeRecord parentRecord = getRecordForPath(parentPath);
@@ -387,9 +380,6 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
case OpCode.setData:
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
SetDataRequest setDataRequest = (SetDataRequest) record;
- if (deserialize) {
- request.readRequestRecord(setDataRequest);
- }
path = setDataRequest.getPath();
validatePath(path, request.sessionId);
nodeRecord = getRecordForPath(path);
@@ -559,9 +549,6 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
case OpCode.setACL:
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
SetACLRequest setAclRequest = (SetACLRequest) record;
- if (deserialize) {
- request.readRequestRecord(setAclRequest);
- }
path = setAclRequest.getPath();
validatePath(path, request.sessionId);
List<ACL> listACL = fixupACL(path, request.authInfo, setAclRequest.getAcl());
@@ -577,8 +564,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
addChangeRecord(nodeRecord);
break;
case OpCode.createSession:
- CreateSessionTxn createSessionTxn = new CreateSessionTxn();
- request.readRequestRecord(createSessionTxn);
+ CreateSessionTxn createSessionTxn = request.readRequestRecord(CreateSessionTxn::new);
request.setTxn(createSessionTxn);
// only add the global session tracker but not to ZKDb
zks.sessionTracker.trackSession(request.sessionId, createSessionTxn.getTimeOut());
@@ -630,9 +616,6 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
case OpCode.check:
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
CheckVersionRequest checkVersionRequest = (CheckVersionRequest) record;
- if (deserialize) {
- request.readRequestRecord(checkVersionRequest);
- }
path = checkVersionRequest.getPath();
validatePath(path, request.sessionId);
nodeRecord = getRecordForPath(path);
@@ -653,11 +636,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
}
}
- private void pRequest2TxnCreate(int type, Request request, Record record, boolean deserialize) throws IOException, KeeperException {
- if (deserialize) {
- request.readRequestRecord(record);
- }
-
+ private void pRequest2TxnCreate(int type, Request request, Record record) throws IOException, KeeperException {
int flags;
String path;
List<ACL> acl;
@@ -792,39 +771,41 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
case OpCode.createContainer:
case OpCode.create:
case OpCode.create2:
- CreateRequest create2Request = new CreateRequest();
- pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);
+ CreateRequest create2Request = request.readRequestRecord(CreateRequest::new);
+ pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request);
break;
case OpCode.createTTL:
- CreateTTLRequest createTtlRequest = new CreateTTLRequest();
- pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest, true);
+ CreateTTLRequest createTtlRequest = request.readRequestRecord(CreateTTLRequest::new);
+ pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest);
break;
case OpCode.deleteContainer:
+ DeleteContainerRequest deleteContainerRequest = request.readRequestRecord(DeleteContainerRequest::new);
+ pRequest2Txn(request.type, zks.getNextZxid(), request, deleteContainerRequest);
+ break;
case OpCode.delete:
- DeleteRequest deleteRequest = new DeleteRequest();
- pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true);
+ DeleteRequest deleteRequest = request.readRequestRecord(DeleteRequest::new);
+ pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest);
break;
case OpCode.setData:
- SetDataRequest setDataRequest = new SetDataRequest();
- pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true);
+ SetDataRequest setDataRequest = request.readRequestRecord(SetDataRequest::new);
+ pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest);
break;
case OpCode.reconfig:
- ReconfigRequest reconfigRequest = new ReconfigRequest();
- request.readRequestRecord(reconfigRequest);
- pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest, true);
+ ReconfigRequest reconfigRequest = request.readRequestRecord(ReconfigRequest::new);
+ pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest);
break;
case OpCode.setACL:
- SetACLRequest setAclRequest = new SetACLRequest();
- pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest, true);
+ SetACLRequest setAclRequest = request.readRequestRecord(SetACLRequest::new);
+ pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest);
break;
case OpCode.check:
- CheckVersionRequest checkRequest = new CheckVersionRequest();
- pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest, true);
+ CheckVersionRequest checkRequest = request.readRequestRecord(CheckVersionRequest::new);
+ pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest);
break;
case OpCode.multi:
- MultiOperationRecord multiRequest = new MultiOperationRecord();
+ MultiOperationRecord multiRequest;
try {
- request.readRequestRecord(multiRequest);
+ multiRequest = request.readRequestRecord(MultiOperationRecord::new);
} catch (IOException e) {
request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(), Time.currentWallTime(), OpCode.multi));
throw e;
@@ -854,7 +835,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
} else {
/* Prep the request and convert to a Txn */
try {
- pRequest2Txn(op.getType(), zxid, request, subrequest, false);
+ pRequest2Txn(op.getType(), zxid, request, subrequest);
type = op.getType();
txn = request.getTxn();
} catch (KeeperException e) {
@@ -899,7 +880,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
case OpCode.createSession:
case OpCode.closeSession:
if (!request.isLocalSession()) {
- pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
+ pRequest2Txn(request.type, zks.getNextZxid(), request, null);
}
break;
@@ -944,20 +925,14 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
// log at error level as we are returning a marshalling
// error to the user
LOG.error("Failed to process {}", request, e);
- StringBuilder sb = new StringBuilder();
- byte[] payload = request.readRequestBytes();
- if (payload != null) {
- for (byte b : payload) {
- sb.append(String.format("%02x", (0xff & b)));
- }
- } else {
- sb.append("request buffer is null");
- }
- LOG.error("Dumping request buffer for request type {}: 0x{}", Request.op2String(request.type), sb);
- if (request.getHdr() != null) {
- request.getHdr().setType(OpCode.error);
- request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue()));
+ String digest = request.requestDigest();
+ LOG.error("Dumping request buffer for request type {}: 0x{}", Request.op2String(request.type), digest);
+ if (request.getHdr() == null) {
+ request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getZxid(), Time.currentWallTime(), request.type));
}
+
+ request.getHdr().setType(OpCode.error);
+ request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue()));
}
}