summaryrefslogtreecommitdiff
path: root/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java')
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java64
1 files changed, 64 insertions, 0 deletions
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java
new file mode 100644
index 0000000000..890b0dd6eb
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java
@@ -0,0 +1,64 @@
+package org.apache.qpid.nclient.amqp;
+
+import java.security.SecureRandom;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.nclient.model.AMQPMethodEvent;
+
+public abstract class AMQPCallBackSupport
+{
+ private SecureRandom _localCorrelationIdGenerator = new SecureRandom();
+ protected ConcurrentHashMap<Long,AMQPCallBack> _cbMap = new ConcurrentHashMap<Long,AMQPCallBack>();
+
+ //the channelId assigned for this instance
+ protected int _channelId;
+
+ public AMQPCallBackSupport(int channelId)
+ {
+ _channelId = channelId;
+ }
+
+ private long getNextCorrelationId()
+ {
+ return _localCorrelationIdGenerator.nextLong();
+ }
+
+
+ // For methods that still use nowait, hopefully they will remove nowait
+ protected AMQPMethodEvent handleNoWait(boolean noWait,AMQMethodBody methodBody,AMQPCallBack cb)
+ {
+ if(noWait)
+ {
+ // u only need to register if u are expecting a response
+ long localCorrelationId = getNextCorrelationId();
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID,localCorrelationId);
+ _cbMap.put(localCorrelationId, cb);
+ return msg;
+ }
+ else
+ {
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID);
+ return msg;
+ }
+ }
+
+ protected AMQPMethodEvent handleAsynchronousCall(AMQMethodBody methodBody,AMQPCallBack cb)
+ {
+ long localCorrelationId = getNextCorrelationId();
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID,localCorrelationId);
+ _cbMap.put(localCorrelationId, cb);
+ return msg;
+ }
+
+ protected void invokeCallBack(long localCorrelationId, AMQMethodBody methodBody)
+ {
+ if(_cbMap.contains(localCorrelationId))
+ {
+ AMQPCallBack cb = (AMQPCallBack)_cbMap.get(localCorrelationId);
+ cb.brokerResponded(methodBody);
+ }
+ }
+
+}