diff options
Diffstat (limited to 'java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java')
-rw-r--r-- | java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java | 48 |
1 files changed, 42 insertions, 6 deletions
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java index 761ec1b050..0084f27717 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java @@ -29,7 +29,7 @@ import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQRequestBody; import org.apache.qpid.framing.AMQResponseBody; import org.apache.qpid.framing.RequestResponseMappingException; -import org.apache.qpid.nclient.model.AMQPMethodEvent; +import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; public class RequestManager { @@ -56,7 +56,7 @@ public class RequestManager */ private long lastProcessedResponseId; - private ConcurrentHashMap<Long, Long> requestSentMap; + private ConcurrentHashMap<Long, CorrelationID> requestSentMap; public RequestManager(long connectionId, int channel, boolean serverFlag) { @@ -65,7 +65,7 @@ public class RequestManager this.connectionId = connectionId; requestIdCount = 1L; lastProcessedResponseId = 0L; - requestSentMap = new ConcurrentHashMap<Long, Long>(); + requestSentMap = new ConcurrentHashMap<Long, CorrelationID>(); } // *** Functions to originate a request *** @@ -80,7 +80,7 @@ public class RequestManager logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + "] TX REQ: Req[" + requestId + " " + lastProcessedResponseId + "]; " + evt.getMethod()); } - requestSentMap.put(requestId, evt.getCorrelationId()); + requestSentMap.put(requestId, new CorrelationID(evt.getCorrelationId(), evt.getLocalCorrelationId())); return requestFrame; } @@ -103,9 +103,9 @@ public class RequestManager throw new RequestResponseMappingException(requestId, "Failed to locate requestId " + requestId + " in requestSentMap."); } - long localCorrelationId = requestSentMap.get(requestId); + CorrelationID correlationID = requestSentMap.get(requestId); AMQPMethodEvent methodEvent = new AMQPMethodEvent(channel, responseBody.getMethodPayload(), - requestId,localCorrelationId); + correlationID.getSystemCorrelationID(),correlationID.getLocalCorrelationID()); events.add(methodEvent); requestSentMap.remove(requestId); } @@ -126,4 +126,40 @@ public class RequestManager { return requestIdCount++; } + + private class CorrelationID + { + // Use for the request/response stuff + private long _systemCorrelationID; + // used internally to track callbacks + private long _localCorrelationID; + + CorrelationID(long systemCorrelationID,long localCorrelationID) + { + _localCorrelationID = localCorrelationID; + _systemCorrelationID = systemCorrelationID; + } + + public long getLocalCorrelationID() + { + return _localCorrelationID; + } + + public void setLocalCorrelationID(long correlationID) + { + _localCorrelationID = correlationID; + } + + public long getSystemCorrelationID() + { + return _systemCorrelationID; + } + + public void setSystemCorrelationID(long correlationID) + { + _systemCorrelationID = correlationID; + } + + + } } |