summaryrefslogtreecommitdiff
path: root/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java')
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java48
1 files changed, 42 insertions, 6 deletions
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java
index 761ec1b050..0084f27717 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java
@@ -29,7 +29,7 @@ import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQRequestBody;
import org.apache.qpid.framing.AMQResponseBody;
import org.apache.qpid.framing.RequestResponseMappingException;
-import org.apache.qpid.nclient.model.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
public class RequestManager
{
@@ -56,7 +56,7 @@ public class RequestManager
*/
private long lastProcessedResponseId;
- private ConcurrentHashMap<Long, Long> requestSentMap;
+ private ConcurrentHashMap<Long, CorrelationID> requestSentMap;
public RequestManager(long connectionId, int channel, boolean serverFlag)
{
@@ -65,7 +65,7 @@ public class RequestManager
this.connectionId = connectionId;
requestIdCount = 1L;
lastProcessedResponseId = 0L;
- requestSentMap = new ConcurrentHashMap<Long, Long>();
+ requestSentMap = new ConcurrentHashMap<Long, CorrelationID>();
}
// *** Functions to originate a request ***
@@ -80,7 +80,7 @@ public class RequestManager
logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel +
"] TX REQ: Req[" + requestId + " " + lastProcessedResponseId + "]; " + evt.getMethod());
}
- requestSentMap.put(requestId, evt.getCorrelationId());
+ requestSentMap.put(requestId, new CorrelationID(evt.getCorrelationId(), evt.getLocalCorrelationId()));
return requestFrame;
}
@@ -103,9 +103,9 @@ public class RequestManager
throw new RequestResponseMappingException(requestId,
"Failed to locate requestId " + requestId + " in requestSentMap.");
}
- long localCorrelationId = requestSentMap.get(requestId);
+ CorrelationID correlationID = requestSentMap.get(requestId);
AMQPMethodEvent methodEvent = new AMQPMethodEvent(channel, responseBody.getMethodPayload(),
- requestId,localCorrelationId);
+ correlationID.getSystemCorrelationID(),correlationID.getLocalCorrelationID());
events.add(methodEvent);
requestSentMap.remove(requestId);
}
@@ -126,4 +126,40 @@ public class RequestManager
{
return requestIdCount++;
}
+
+ private class CorrelationID
+ {
+ // Use for the request/response stuff
+ private long _systemCorrelationID;
+ // used internally to track callbacks
+ private long _localCorrelationID;
+
+ CorrelationID(long systemCorrelationID,long localCorrelationID)
+ {
+ _localCorrelationID = localCorrelationID;
+ _systemCorrelationID = systemCorrelationID;
+ }
+
+ public long getLocalCorrelationID()
+ {
+ return _localCorrelationID;
+ }
+
+ public void setLocalCorrelationID(long correlationID)
+ {
+ _localCorrelationID = correlationID;
+ }
+
+ public long getSystemCorrelationID()
+ {
+ return _systemCorrelationID;
+ }
+
+ public void setSystemCorrelationID(long correlationID)
+ {
+ _systemCorrelationID = correlationID;
+ }
+
+
+ }
}