summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java30
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/MessageAppendHandler.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/MessageCheckpointHandler.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/MessageCloseHandler.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/MessageEmptyHandler.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/MessageOffsetHandler.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/MessageOkHandler.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/MessageOpenHandler.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/MessageRejectHandler.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/MessageResumeHandler.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java1
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java202
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java35
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java11
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java11
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java7
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java9
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java15
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java13
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java50
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/MessageCheckpointMethodHandler.java50
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java (renamed from java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java)32
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/MessageEmptyMethodHandler.java50
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/MessageOffsetMethodHandler.java50
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/MessageOkMethodHandler.java50
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java (renamed from java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java)36
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/MessageRejectMethodHandler.java50
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/MessageResumeMethodHandler.java50
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java50
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodEvent.java62
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodListener.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java20
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java6
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java6
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java2
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java2
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java2
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java2
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java2
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java2
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java2
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java2
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java2
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java2
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java2
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java2
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java2
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java2
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java32
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java14
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/RequestHandler.java26
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/RequestManager.java34
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java103
-rw-r--r--java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java (renamed from java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodEvent.java)2
91 files changed, 888 insertions, 432 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 51be0d867b..7bdb5e1ecd 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -23,10 +23,14 @@ package org.apache.qpid.server;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
+//import org.apache.qpid.framing.BasicPublishBody;
+//import org.apache.qpid.framing.ContentBody;
+//import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.Content;
+import org.apache.qpid.framing.RequestManager;
+import org.apache.qpid.framing.ResponseManager;
+import org.apache.qpid.protocol.AMQProtocolWriter;
import org.apache.qpid.server.ack.TxAck;
import org.apache.qpid.server.ack.UnacknowledgedMessage;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
@@ -64,6 +68,9 @@ public class AMQChannel
private long _prefetch_HighWaterMark;
private long _prefetch_LowWaterMark;
+
+ private RequestManager _requestManager;
+ private ResponseManager _responseManager;
/**
* The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that
@@ -114,7 +121,7 @@ public class AMQChannel
private final List<AMQDataBlock> _returns = new LinkedList<AMQDataBlock>();
private Set<Long> _browsedAcks = new HashSet<Long>();
- public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges)
+ public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges, AMQProtocolWriter protocolSession)
throws AMQException
{
_channelId = channelId;
@@ -122,6 +129,8 @@ public class AMQChannel
_prefetch_LowWaterMark = _prefetch_HighWaterMark / 2;
_messageStore = messageStore;
_exchanges = exchanges;
+ _requestManager = new RequestManager(channelId, protocolSession);
+ _responseManager = new ResponseManager(channelId, protocolSession);
_txnBuffer = new TxnBuffer(_messageStore);
}
@@ -170,11 +179,11 @@ public class AMQChannel
_prefetch_HighWaterMark = prefetchCount;
}
- public void setPublishFrame(BasicPublishBody publishBody, AMQProtocolSession publisher) throws AMQException
- {
- _currentMessage = new AMQMessage(_messageStore, publishBody);
- _currentMessage.setPublisher(publisher);
- }
+// public void setPublishFrame(BasicPublishBody publishBody, AMQProtocolSession publisher) throws AMQException
+// {
+// _currentMessage = new AMQMessage(_messageStore, publishBody);
+// _currentMessage.setPublisher(publisher);
+// }
// public void publishContentHeader(ContentHeaderBody contentHeaderBody)
// throws AMQException
@@ -269,6 +278,9 @@ public class AMQChannel
}
}
}
+
+ public RequestManager getRequestManager() { return _requestManager; }
+ public ResponseManager getResponseManager() { return _responseManager; }
public long getNextDeliveryTag()
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
index 2f1d6ada00..7461f93539 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
@@ -25,8 +25,8 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ChannelCloseBody;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java
index fd6714de3a..32622f8fed 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java
@@ -22,8 +22,8 @@ package org.apache.qpid.server.handler;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
index 8f91d4ecf4..07ab0537d5 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
@@ -21,12 +21,12 @@
package org.apache.qpid.server.handler;
import org.apache.log4j.Logger;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ChannelFlowBody;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
index ee2ff9e5bb..459ccf40a8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
@@ -24,11 +24,11 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ChannelOpenBody;
import org.apache.qpid.framing.ChannelOpenOkBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
index 94e54e8f1d..6e22d67b72 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
@@ -23,12 +23,12 @@ package org.apache.qpid.server.handler;
import org.apache.qpid.framing.ConnectionCloseBody;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ConnectionCloseOkBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
import org.apache.qpid.AMQException;
import org.apache.log4j.Logger;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java
index cc9277593b..69d50c9237 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java
@@ -22,8 +22,8 @@ package org.apache.qpid.server.handler;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ConnectionCloseOkBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
index f1487b2844..c3b6560ee4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
@@ -24,8 +24,8 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ConnectionOpenBody;
import org.apache.qpid.framing.ConnectionOpenOkBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQState;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
index 42dda66eab..9aea4a7b26 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
@@ -23,8 +23,8 @@ package org.apache.qpid.server.handler;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.framing.*;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.protocol.HeartbeatConfig;
import org.apache.qpid.server.queue.QueueRegistry;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
index b2ddf6d0db..77fddf1ff5 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
@@ -27,8 +27,8 @@ import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ConnectionSecureBody;
import org.apache.qpid.framing.ConnectionStartOkBody;
import org.apache.qpid.framing.ConnectionTuneBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.protocol.HeartbeatConfig;
import org.apache.qpid.server.queue.QueueRegistry;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java
index f0b4e0a515..de38cff3e2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java
@@ -23,8 +23,8 @@ package org.apache.qpid.server.handler;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ConnectionTuneOkBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQState;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
index 87e20faa81..67f77c72ef 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
@@ -21,9 +21,9 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ExchangeBoundBody;
import org.apache.qpid.framing.ExchangeBoundOkBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
index efcfffeaff..cdb3a503ae 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
@@ -25,10 +25,10 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ExchangeDeclareBody;
import org.apache.qpid.framing.ExchangeDeclareOkBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
index 1c72f75dd7..79b4e07c90 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
@@ -24,9 +24,9 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ExchangeDeleteBody;
import org.apache.qpid.framing.ExchangeDeleteOkBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.exchange.ExchangeInUseException;
import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageAppendHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageAppendHandler.java
index 91b5493f32..962bd2004f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageAppendHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageAppendHandler.java
@@ -20,7 +20,13 @@
*/
package org.apache.qpid.server.handler;
+import org.apache.qpid.AMQException;
import org.apache.qpid.framing.MessageAppendBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
public class MessageAppendHandler implements StateAwareMethodListener<MessageAppendBody>
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java
index a84cbfb88e..ad41910141 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java
@@ -20,7 +20,13 @@
*/
package org.apache.qpid.server.handler;
+import org.apache.qpid.AMQException;
import org.apache.qpid.framing.MessageCancelBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
public class MessageCancelHandler implements StateAwareMethodListener<MessageCancelBody>
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCheckpointHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCheckpointHandler.java
index 9dd9a6b18e..2123f9203c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCheckpointHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCheckpointHandler.java
@@ -20,7 +20,13 @@
*/
package org.apache.qpid.server.handler;
+import org.apache.qpid.AMQException;
import org.apache.qpid.framing.MessageCheckpointBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
public class MessageCheckpointHandler implements StateAwareMethodListener<MessageCheckpointBody>
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCloseHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCloseHandler.java
index 5e21c1ee6c..5efe24abfa 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCloseHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCloseHandler.java
@@ -20,7 +20,13 @@
*/
package org.apache.qpid.server.handler;
+import org.apache.qpid.AMQException;
import org.apache.qpid.framing.MessageCloseBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
public class MessageCloseHandler implements StateAwareMethodListener<MessageCloseBody>
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java
index a2c5662703..c019a3b043 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java
@@ -20,7 +20,13 @@
*/
package org.apache.qpid.server.handler;
+import org.apache.qpid.AMQException;
import org.apache.qpid.framing.MessageConsumeBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
public class MessageConsumeHandler implements StateAwareMethodListener<MessageConsumeBody>
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageEmptyHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageEmptyHandler.java
index 37d39a517a..3d5b59c238 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageEmptyHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageEmptyHandler.java
@@ -20,7 +20,13 @@
*/
package org.apache.qpid.server.handler;
+import org.apache.qpid.AMQException;
import org.apache.qpid.framing.MessageEmptyBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
public class MessageEmptyHandler implements StateAwareMethodListener<MessageEmptyBody>
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java
index 15d769e295..5ae0c677a4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java
@@ -20,7 +20,13 @@
*/
package org.apache.qpid.server.handler;
+import org.apache.qpid.AMQException;
import org.apache.qpid.framing.MessageGetBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
public class MessageGetHandler implements StateAwareMethodListener<MessageGetBody>
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOffsetHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOffsetHandler.java
index 7ac075c8d4..de6a886891 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOffsetHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOffsetHandler.java
@@ -20,7 +20,13 @@
*/
package org.apache.qpid.server.handler;
+import org.apache.qpid.AMQException;
import org.apache.qpid.framing.MessageOffsetBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
public class MessageOffsetHandler implements StateAwareMethodListener<MessageOffsetBody>
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOkHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOkHandler.java
index 4b2a1543cd..9fb38bf8b8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOkHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOkHandler.java
@@ -20,7 +20,13 @@
*/
package org.apache.qpid.server.handler;
+import org.apache.qpid.AMQException;
import org.apache.qpid.framing.MessageOkBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
public class MessageOkHandler implements StateAwareMethodListener<MessageOkBody>
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOpenHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOpenHandler.java
index 79679713ba..714227bb7c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOpenHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOpenHandler.java
@@ -20,7 +20,13 @@
*/
package org.apache.qpid.server.handler;
+import org.apache.qpid.AMQException;
import org.apache.qpid.framing.MessageOpenBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
public class MessageOpenHandler implements StateAwareMethodListener<MessageOpenBody>
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java
index baa01df602..6bea5553d9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java
@@ -20,7 +20,13 @@
*/
package org.apache.qpid.server.handler;
+import org.apache.qpid.AMQException;
import org.apache.qpid.framing.MessageQosBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
public class MessageQosHandler implements StateAwareMethodListener<MessageQosBody>
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java
index e178c60b27..6da0b6ffcc 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java
@@ -20,7 +20,13 @@
*/
package org.apache.qpid.server.handler;
+import org.apache.qpid.AMQException;
import org.apache.qpid.framing.MessageRecoverBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
public class MessageRecoverHandler implements StateAwareMethodListener<MessageRecoverBody>
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRejectHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRejectHandler.java
index 401b399fa0..e5a27c762a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRejectHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRejectHandler.java
@@ -20,7 +20,13 @@
*/
package org.apache.qpid.server.handler;
+import org.apache.qpid.AMQException;
import org.apache.qpid.framing.MessageRejectBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
public class MessageRejectHandler implements StateAwareMethodListener<MessageRejectBody>
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageResumeHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageResumeHandler.java
index 429514cc5b..49b5ff8e08 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageResumeHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageResumeHandler.java
@@ -20,7 +20,13 @@
*/
package org.apache.qpid.server.handler;
+import org.apache.qpid.AMQException;
import org.apache.qpid.framing.MessageResumeBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
public class MessageResumeHandler implements StateAwareMethodListener<MessageResumeBody>
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java
index 18027fdc2b..4f6886e885 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java
@@ -20,7 +20,13 @@
*/
package org.apache.qpid.server.handler;
+import org.apache.qpid.AMQException;
import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
public class MessageTransferHandler implements StateAwareMethodListener<MessageTransferBody>
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
index 5e8c8a31ca..915bfa67a6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
@@ -22,13 +22,13 @@ package org.apache.qpid.server.handler;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.QueueBindBody;
import org.apache.qpid.framing.QueueBindOkBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
index 0d2dc1550e..1a7b82829a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
@@ -26,9 +26,9 @@ import org.apache.qpid.configuration.Configured;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.QueueDeclareBody;
import org.apache.qpid.framing.QueueDeclareOkBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
index d49baa97a4..b867d80fdb 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
@@ -20,13 +20,13 @@
*/
package org.apache.qpid.server.handler;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.framing.QueueDeleteBody;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
index 74902c33f8..983e6f7e56 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
@@ -23,9 +23,9 @@ package org.apache.qpid.server.handler;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.TxCommitBody;
import org.apache.qpid.framing.TxCommitOkBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
index 32fb57db70..891dd69d4d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
@@ -23,8 +23,8 @@ package org.apache.qpid.server.handler;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.TxRollbackBody;
import org.apache.qpid.framing.TxRollbackOkBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
index bb0f89f163..0c2a6ca210 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
@@ -23,8 +23,8 @@ package org.apache.qpid.server.handler;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.TxSelectBody;
import org.apache.qpid.framing.TxSelectOkBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java
index d2062d3c17..6596da1f8f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.protocol;
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.framing.AMQMethodBody;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 525978e348..558d5f4aa6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -30,14 +30,18 @@ import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.framing.ConnectionStartBody;
import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.Content;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.ProtocolVersionList;
import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.AMQRequestBody;
+import org.apache.qpid.framing.AMQResponseBody;
+//import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.HeartbeatBody;
-import org.apache.qpid.framing.ContentHeaderBody;
+//import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.codec.AMQDecoder;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
@@ -195,124 +199,130 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
AMQFrame frame = (AMQFrame) message;
- if (frame.bodyFrame instanceof AMQRequest)
+ if (frame.bodyFrame instanceof AMQRequestBody)
{
requestFrameReceived(frame);
}
- else if (frame.bodyFrame instanceof AMQResponse)
+ else if (frame.bodyFrame instanceof AMQResponseBody)
{
responseFrameReceived(frame);
}
- else if (frame.bodyFrame instanceof AMQMethodBody)
- {
- methodFrameReceived(frame);
- }
else
{
- try
- {
- contentFrameReceived(frame);
- }
- catch (RequiredDeliveryException e)
- {
- //need to return the message:
- _logger.info("Returning message to " + this + " channel " + frame.channel
- + ": " + e.getMessage());
- writeFrame(e.getReturnMessage(frame.channel));
- }
+ _logger.error("Received invalid frame: " + frame.toString());
}
+// else if (frame.bodyFrame instanceof AMQMethodBody)
+// {
+// methodFrameReceived(frame);
+// }
+// else
+// {
+// try
+// {
+// contentFrameReceived(frame);
+// }
+// catch (RequiredDeliveryException e)
+// {
+// //need to return the message:
+// _logger.info("Returning message to " + this + " channel " + frame.channel
+// + ": " + e.getMessage());
+// writeFrame(e.getReturnMessage(frame.channel));
+// }
+// }
}
}
- private void requestFrameReceived(AMQFrame frame)
+ private void requestFrameReceived(AMQFrame frame) throws AMQException
{
if (_logger.isDebugEnabled())
{
_logger.debug("Request frame received: " + frame);
}
+ AMQChannel channel = getChannel(frame.channel);
}
- private void responseFrameReceived(AMQFrame frame)
+ private void responseFrameReceived(AMQFrame frame) throws AMQException
{
if (_logger.isDebugEnabled())
{
_logger.debug("Response frame received: " + frame);
}
- }
-
- private void methodFrameReceived(AMQFrame frame)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Method frame received: " + frame);
- }
- final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.channel,
- (AMQMethodBody) frame.bodyFrame);
- try
- {
- boolean wasAnyoneInterested = false;
- for (AMQMethodListener listener : _frameListeners)
- {
- wasAnyoneInterested = listener.methodReceived(evt, this, _queueRegistry, _exchangeRegistry) ||
- wasAnyoneInterested;
- }
- if (!wasAnyoneInterested)
- {
- throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener.");
- }
- }
- catch (AMQChannelException e)
- {
- _logger.error("Closing channel due to: " + e.getMessage());
- writeFrame(e.getCloseFrame(frame.channel));
- }
- catch (AMQException e)
- {
- for (AMQMethodListener listener : _frameListeners)
- {
- listener.error(e);
- }
- _minaProtocolSession.close();
- }
- }
-
- private void contentFrameReceived(AMQFrame frame) throws AMQException
- {
- if (frame.bodyFrame instanceof ContentHeaderBody)
- {
- contentHeaderReceived(frame);
- }
- else if (frame.bodyFrame instanceof ContentBody)
- {
- contentBodyReceived(frame);
- }
- else if (frame.bodyFrame instanceof HeartbeatBody)
- {
- _logger.debug("Received heartbeat from client");
- }
- else
- {
- _logger.warn("Unrecognised frame " + frame.getClass().getName());
- }
- }
-
- private void contentHeaderReceived(AMQFrame frame) throws AMQException
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Content header frame received: " + frame);
- }
- getChannel(frame.channel).publishContentHeader((ContentHeaderBody) frame.bodyFrame);
- }
-
- private void contentBodyReceived(AMQFrame frame) throws AMQException
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Content body frame received: " + frame);
- }
- getChannel(frame.channel).publishContentBody((ContentBody) frame.bodyFrame);
- }
+ AMQChannel channel = getChannel(frame.channel);
+ }
+
+// private void methodFrameReceived(AMQFrame frame)
+// {
+// if (_logger.isDebugEnabled())
+// {
+// _logger.debug("Method frame received: " + frame);
+// }
+// final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.channel,
+// (AMQMethodBody) frame.bodyFrame);
+// try
+// {
+// boolean wasAnyoneInterested = false;
+// for (AMQMethodListener listener : _frameListeners)
+// {
+// wasAnyoneInterested = listener.methodReceived(evt, this, _queueRegistry, _exchangeRegistry) ||
+// wasAnyoneInterested;
+// }
+// if (!wasAnyoneInterested)
+// {
+// throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener.");
+// }
+// }
+// catch (AMQChannelException e)
+// {
+// _logger.error("Closing channel due to: " + e.getMessage());
+// writeFrame(e.getCloseFrame(frame.channel));
+// }
+// catch (AMQException e)
+// {
+// for (AMQMethodListener listener : _frameListeners)
+// {
+// listener.error(e);
+// }
+// _minaProtocolSession.close();
+// }
+// }
+
+// private void contentFrameReceived(AMQFrame frame) throws AMQException
+// {
+// if (frame.bodyFrame instanceof ContentHeaderBody)
+// {
+// contentHeaderReceived(frame);
+// }
+// else if (frame.bodyFrame instanceof ContentBody)
+// {
+// contentBodyReceived(frame);
+// }
+// else if (frame.bodyFrame instanceof HeartbeatBody)
+// {
+// _logger.debug("Received heartbeat from client");
+// }
+// else
+// {
+// _logger.warn("Unrecognised frame " + frame.getClass().getName());
+// }
+// }
+
+// private void contentHeaderReceived(AMQFrame frame) throws AMQException
+// {
+// if (_logger.isDebugEnabled())
+// {
+// _logger.debug("Content header frame received: " + frame);
+// }
+// getChannel(frame.channel).publishContentHeader((ContentHeaderBody) frame.bodyFrame);
+// }
+
+// private void contentBodyReceived(AMQFrame frame) throws AMQException
+// {
+// if (_logger.isDebugEnabled())
+// {
+// _logger.debug("Content body frame received: " + frame);
+// }
+// getChannel(frame.channel).publishContentBody((ContentBody) frame.bodyFrame);
+// }
/**
* Convenience method that writes a frame to the protocol session. Equivalent
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index b1ed999f72..00d22d839e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.AMQException;
@@ -38,14 +39,6 @@ public interface AMQProtocolSession extends AMQProtocolWriter
*/
void dataBlockReceived(AMQDataBlock message) throws Exception;
-// This is now a part of AMQProtocolWriter (inherited) to provide uniformity across both
-// client and server.
-// /**
-// * Write a datablock, encoding where necessary (e.g. into a sequence of bytes)
-// * @param frame the frame to be encoded and written
-// */
-// void writeFrame(AMQDataBlock frame);
-
/**
* Get the context key associated with this session. Context key is described
* in the AMQ protocol specification (RFC 6).
diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
index 509d7761dd..18b8041e7c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
@@ -22,9 +22,9 @@ package org.apache.qpid.server.state;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.*;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.handler.*;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQMethodListener;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java b/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java
index 7d58f242e5..56323258b7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java
@@ -21,7 +21,7 @@
package org.apache.qpid.server.state;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.exchange.ExchangeRegistry;
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 8cde5e557f..b1f19bc191 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -63,7 +63,6 @@ import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.JMSStreamMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.protocol.AMQMethodEvent;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.common.AMQPFilterTypes;
@@ -89,6 +88,7 @@ import org.apache.qpid.framing.TxCommitBody;
import org.apache.qpid.framing.TxCommitOkBody;
import org.apache.qpid.framing.TxRollbackBody;
import org.apache.qpid.framing.TxRollbackOkBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.handler.ExchangeBoundHandler;
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java
deleted file mode 100644
index d855e97204..0000000000
--- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package org.apache.qpid.client.handler;
-
-import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.client.state.AMQStateManager;
-import org.apache.qpid.client.protocol.AMQMethodEvent;
-import org.apache.qpid.client.BasicMessageConsumer;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.ExchangeBoundOkBody;
-import org.apache.qpid.framing.BasicCancelOkBody;
-import org.apache.log4j.Logger;
-
-/**
- * @author Apache Software Foundation
- */
-public class BasicCancelOkMethodHandler implements StateAwareMethodListener
-{
- private static final Logger _logger = Logger.getLogger(BasicCancelOkMethodHandler.class);
- private static final BasicCancelOkMethodHandler _instance = new BasicCancelOkMethodHandler();
-
- public static BasicCancelOkMethodHandler getInstance()
- {
- return _instance;
- }
-
- private BasicCancelOkMethodHandler()
- {
- }
-
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
- {
- _logger.debug("New BasicCancelOk method received");
- BasicCancelOkBody body = (BasicCancelOkBody) evt.getMethod();
- evt.getProtocolSession().confirmConsumerCancelled(evt.getChannelId(), body.consumerTag);
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
index 5ad530a3ea..eb24f1fa74 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
@@ -26,13 +26,14 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidSelectorException;
import org.apache.qpid.client.AMQNoConsumersException;
import org.apache.qpid.client.AMQNoRouteException;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ChannelCloseBody;
import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.AMQMethodEvent;
public class ChannelCloseMethodHandler implements StateAwareMethodListener
{
@@ -45,7 +46,7 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener
return _handler;
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
{
_logger.debug("ChannelClose method received");
ChannelCloseBody method = (ChannelCloseBody) evt.getMethod();
@@ -61,7 +62,7 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQFrame frame = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)0, (byte)9);
- evt.getProtocolSession().writeFrame(frame);
+ protocolSession.writeFrame(frame);
if (errorCode != AMQConstant.REPLY_SUCCESS.getCode())
{
_logger.error("Channel close received with errorCode " + errorCode + ", and reason " + reason);
@@ -85,6 +86,6 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener
}
}
- evt.getProtocolSession().channelClosed(evt.getChannelId(), errorCode, reason);
+ protocolSession.channelClosed(evt.getChannelId(), errorCode, reason);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java
index 383cebbcab..a99c963eb4 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java
@@ -21,7 +21,8 @@
package org.apache.qpid.client.handler;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.log4j.Logger;
@@ -37,7 +38,7 @@ public class ChannelCloseOkMethodHandler implements StateAwareMethodListener
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
{
_logger.info("Received channel-close-ok for channel-id " + evt.getChannelId());
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java
index 9a4f7af849..7a13972d8f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java
@@ -22,7 +22,8 @@ package org.apache.qpid.client.handler;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.ChannelFlowOkBody;
@@ -41,7 +42,7 @@ public class ChannelFlowOkMethodHandler implements StateAwareMethodListener
{
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
{
ChannelFlowOkBody method = (ChannelFlowOkBody) evt.getMethod();
_logger.debug("Received Channel.Flow-Ok message, active = " + method.active);
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
index 5cec420920..36e9c947f3 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
@@ -24,7 +24,8 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQConnectionClosedException;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
@@ -47,7 +48,7 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener
{
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
{
_logger.info("ConnectionClose frame received");
ConnectionCloseBody method = (ConnectionCloseBody) evt.getMethod();
@@ -62,7 +63,7 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- evt.getProtocolSession().writeFrame(ConnectionCloseOkBody.createAMQFrame((short)0, (byte)0, (byte)9));
+ protocolSession.writeFrame(ConnectionCloseOkBody.createAMQFrame((short)0, (byte)0, (byte)9));
if (errorCode != 200)
{
@@ -70,7 +71,7 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener
{
_logger.info("Authentication Error:"+Thread.currentThread().getName());
- evt.getProtocolSession().closeProtocolSession();
+ protocolSession.closeProtocolSession();
//todo this is a bit of a fudge (could be conssidered such as each new connection needs a new state manager or at least a fresh state.
stateManager.changeState(AMQState.CONNECTION_NOT_STARTED);
@@ -88,7 +89,7 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener
// this actually closes the connection in the case where it is not an error.
- evt.getProtocolSession().closeProtocolSession();
+ protocolSession.closeProtocolSession();
stateManager.changeState(AMQState.CONNECTION_CLOSED);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java
index b5001a6e64..da903e7c1d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java
@@ -21,7 +21,8 @@
package org.apache.qpid.client.handler;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
@@ -39,7 +40,7 @@ public class ConnectionOpenOkMethodHandler implements StateAwareMethodListener
{
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
{
stateManager.changeState(AMQState.CONNECTION_OPEN);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java
index a658e3e787..699c8955bc 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java
@@ -22,7 +22,8 @@ package org.apache.qpid.client.handler;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.ConnectionRedirectBody;
@@ -44,7 +45,7 @@ public class ConnectionRedirectMethodHandler implements StateAwareMethodListener
{
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
{
_logger.info("ConnectionRedirect frame received");
ConnectionRedirectBody method = (ConnectionRedirectBody) evt.getMethod();
@@ -63,6 +64,6 @@ public class ConnectionRedirectMethodHandler implements StateAwareMethodListener
host = method.host.substring(0, portIndex);
port = Integer.parseInt(method.host.substring(portIndex + 1));
}
- evt.getProtocolSession().failover(host, port);
+ protocolSession.failover(host, port);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java
index 062cb268d8..87a8bbd529 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java
@@ -24,9 +24,10 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ConnectionSecureOkBody;
import org.apache.qpid.framing.ConnectionSecureBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.client.protocol.AMQMethodEvent;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
@@ -40,9 +41,9 @@ public class ConnectionSecureMethodHandler implements StateAwareMethodListener
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
{
- SaslClient client = evt.getProtocolSession().getSaslClient();
+ SaslClient client = protocolSession.getSaslClient();
if (client == null)
{
throw new AMQException("No SASL client set up - cannot proceed with authentication");
@@ -60,7 +61,7 @@ public class ConnectionSecureMethodHandler implements StateAwareMethodListener
AMQFrame responseFrame = ConnectionSecureOkBody.createAMQFrame(evt.getChannelId(),
(byte)0, (byte)9, // AMQP version (major, minor)
response); // response
- evt.getProtocolSession().writeFrame(responseFrame);
+ protocolSession.writeFrame(responseFrame);
}
catch (SaslException e)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
index d1b0082d36..a60c298bd2 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
@@ -24,7 +24,6 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.common.QpidProperties;
-import org.apache.qpid.client.protocol.AMQMethodEvent;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.security.AMQCallbackHandler;
import org.apache.qpid.client.security.CallbackHandlerRegistry;
@@ -35,6 +34,7 @@ import org.apache.qpid.framing.ConnectionStartBody;
import org.apache.qpid.framing.ConnectionStartOkBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
+import org.apache.qpid.protocol.AMQMethodEvent;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
@@ -59,7 +59,7 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener
{
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
{
ConnectionStartBody body = (ConnectionStartBody) evt.getMethod();
@@ -81,25 +81,24 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener
throw new AMQException("No supported security mechanism found, passed: " + new String(body.mechanisms));
}
- final AMQProtocolSession ps = evt.getProtocolSession();
byte[] saslResponse;
try
{
SaslClient sc = Sasl.createSaslClient(new String[]{mechanism},
null, "AMQP", "localhost",
- null, createCallbackHandler(mechanism, ps));
+ null, createCallbackHandler(mechanism, protocolSession));
if (sc == null)
{
throw new AMQException("Client SASL configuration error: no SaslClient could be created for mechanism " +
mechanism + ". Please ensure all factories are registered. See DynamicSaslRegistrar for " +
" details of how to register non-standard SASL client providers.");
}
- ps.setSaslClient(sc);
+ protocolSession.setSaslClient(sc);
saslResponse = (sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) : null);
}
catch (SaslException e)
{
- ps.setSaslClient(null);
+ protocolSession.setSaslClient(null);
throw new AMQException("Unable to create SASL client: " + e, e);
}
@@ -122,14 +121,14 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener
stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
FieldTable clientProperties = FieldTableFactory.newFieldTable();
- clientProperties.put(ClientProperties.instance.toString(), ps.getClientID());
+ clientProperties.put(ClientProperties.instance.toString(), protocolSession.getClientID());
clientProperties.put(ClientProperties.product.toString(), QpidProperties.getProductName());
clientProperties.put(ClientProperties.version.toString(), QpidProperties.getReleaseVersion());
clientProperties.put(ClientProperties.platform.toString(), getFullSystemInfo());
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- ps.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(),
+ protocolSession.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(),
(byte)0, (byte)9, // AMQP version (major, minor)
clientProperties, // clientProperties
selectedLocale, // locale
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
index d6ff53c416..e4e74be684 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
@@ -23,7 +23,6 @@ package org.apache.qpid.client.handler;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.ConnectionTuneParameters;
-import org.apache.qpid.client.protocol.AMQMethodEvent;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
@@ -32,6 +31,7 @@ import org.apache.qpid.framing.ConnectionOpenBody;
import org.apache.qpid.framing.ConnectionTuneBody;
import org.apache.qpid.framing.ConnectionTuneOkBody;
import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.protocol.AMQMethodEvent;
public class ConnectionTuneMethodHandler implements StateAwareMethodListener
{
@@ -48,13 +48,12 @@ public class ConnectionTuneMethodHandler implements StateAwareMethodListener
{
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
{
_logger.debug("ConnectionTune frame received");
ConnectionTuneBody frame = (ConnectionTuneBody) evt.getMethod();
- AMQProtocolSession session = evt.getProtocolSession();
- ConnectionTuneParameters params = session.getConnectionTuneParameters();
+ ConnectionTuneParameters params = protocolSession.getConnectionTuneParameters();
if (params == null)
{
params = new ConnectionTuneParameters();
@@ -63,11 +62,11 @@ public class ConnectionTuneMethodHandler implements StateAwareMethodListener
params.setFrameMax(frame.frameMax);
params.setChannelMax(frame.channelMax);
params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.heartbeat));
- session.setConnectionTuneParameters(params);
+ protocolSession.setConnectionTuneParameters(params);
stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);
- session.writeFrame(createTuneOkFrame(evt.getChannelId(), params));
- session.writeFrame(createConnectionOpenFrame(evt.getChannelId(), session.getAMQConnection().getVirtualHost(), null, true));
+ protocolSession.writeFrame(createTuneOkFrame(evt.getChannelId(), params));
+ protocolSession.writeFrame(createConnectionOpenFrame(evt.getChannelId(), session.getAMQConnection().getVirtualHost(), null, true));
}
protected AMQFrame createConnectionOpenFrame(int channel, String path, String capabilities, boolean insist)
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java
index 858726745e..41e57113b0 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java
@@ -19,10 +19,11 @@ package org.apache.qpid.client.handler;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.ExchangeBoundOkBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
/**
* @author Apache Software Foundation
@@ -41,7 +42,7 @@ public class ExchangeBoundOkMethodHandler implements StateAwareMethodListener
{
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
{
if (_logger.isDebugEnabled())
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java
new file mode 100644
index 0000000000..543ea0c7ad
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.MessageAppendBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+
+public class MessageAppendMethodHandler implements StateAwareMethodListener
+{
+ private static MessageAppendMethodHandler _instance = new MessageAppendMethodHandler();
+
+ public static MessageAppendMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private MessageAppendMethodHandler() {}
+
+
+ public void methodReceived (AMQStateManager stateManager,
+ AMQProtocolSession protocolSession,
+ AMQMethodEvent<MessageAppendBody> evt)
+ throws AMQException
+ {
+ // TODO
+ }
+}
+
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageCheckpointMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageCheckpointMethodHandler.java
new file mode 100644
index 0000000000..f809dbc4c1
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageCheckpointMethodHandler.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.MessageCheckpointBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+
+public class MessageCheckpointMethodHandler implements StateAwareMethodListener
+{
+ private static MessageCheckpointMethodHandler _instance = new MessageCheckpointMethodHandler();
+
+ public static MessageCheckpointMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private MessageCheckpointMethodHandler() {}
+
+
+ public void methodReceived (AMQStateManager stateManager,
+ AMQProtocolSession protocolSession,
+ AMQMethodEvent<MessageCheckpointBody> evt)
+ throws AMQException
+ {
+ // TODO
+ }
+}
+
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java
index 6e12899204..cd04e89f6e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java
@@ -18,33 +18,33 @@
* under the License.
*
*/
-package org.apache.qpid.client.handler;
+package org.apache.qpid.server.handler;
-import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicDeliverBody;
+import org.apache.qpid.framing.MessageCloseBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.client.protocol.AMQMethodEvent;
-import org.apache.qpid.client.message.UnprocessedMessage;
-public class BasicDeliverMethodHandler implements StateAwareMethodListener
+public class MessageCloseMethodHandler implements StateAwareMethodListener
{
- private static final Logger _logger = Logger.getLogger(BasicDeliverMethodHandler.class);
+ private static MessageCloseMethodHandler _instance = new MessageCloseMethodHandler();
- private static final BasicDeliverMethodHandler _instance = new BasicDeliverMethodHandler();
-
- public static BasicDeliverMethodHandler getInstance()
+ public static MessageCloseMethodHandler getInstance()
{
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ private MessageCloseMethodHandler() {}
+
+
+ public void methodReceived (AMQStateManager stateManager,
+ AMQProtocolSession protocolSession,
+ AMQMethodEvent<MessageCloseBody> evt)
+ throws AMQException
{
- final UnprocessedMessage msg = new UnprocessedMessage();
- msg.deliverBody = (BasicDeliverBody) evt.getMethod();
- msg.channelId = evt.getChannelId();
- _logger.debug("New JmsDeliver method received");
- evt.getProtocolSession().unprocessedMessageReceived(msg);
+ // TODO
}
}
+
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageEmptyMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageEmptyMethodHandler.java
new file mode 100644
index 0000000000..76534b285f
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageEmptyMethodHandler.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.MessageEmptyBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+
+public class MessageEmptyMethodHandler implements StateAwareMethodListener
+{
+ private static MessageEmptyMethodHandler _instance = new MessageEmptyMethodHandler();
+
+ public static MessageEmptyMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private MessageEmptyMethodHandler() {}
+
+
+ public void methodReceived (AMQStateManager stateManager,
+ AMQProtocolSession protocolSession,
+ AMQMethodEvent<MessageEmptyBody> evt)
+ throws AMQException
+ {
+ // TODO
+ }
+}
+
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageOffsetMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageOffsetMethodHandler.java
new file mode 100644
index 0000000000..80e5412049
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageOffsetMethodHandler.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.MessageOffsetBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+
+public class MessageOffsetMethodHandler implements StateAwareMethodListener
+{
+ private static MessageOffsetMethodHandler _instance = new MessageOffsetMethodHandler();
+
+ public static MessageOffsetMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private MessageOffsetMethodHandler() {}
+
+
+ public void methodReceived (AMQStateManager stateManager,
+ AMQProtocolSession protocolSession,
+ AMQMethodEvent<MessageOffsetBody> evt)
+ throws AMQException
+ {
+ // TODO
+ }
+}
+
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageOkMethodHandler.java
new file mode 100644
index 0000000000..6d1d94de67
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageOkMethodHandler.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.MessageOkBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+
+public class MessageOkMethodHandler implements StateAwareMethodListener
+{
+ private static MessageOkMethodHandler _instance = new MessageOkMethodHandler();
+
+ public static MessageOkMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private MessageOkMethodHandler() {}
+
+
+ public void methodReceived (AMQStateManager stateManager,
+ AMQProtocolSession protocolSession,
+ AMQMethodEvent<MessageOkBody> evt)
+ throws AMQException
+ {
+ // TODO
+ }
+}
+
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java
index 8785a96396..1535b16563 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java
@@ -18,35 +18,33 @@
* under the License.
*
*/
-package org.apache.qpid.client.handler;
+package org.apache.qpid.server.handler;
-import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.framing.MessageOpenBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.framing.BasicReturnBody;
-public class BasicReturnMethodHandler implements StateAwareMethodListener
+public class MessageOpenMethodHandler implements StateAwareMethodListener
{
- private static final Logger _logger = Logger.getLogger(BasicReturnMethodHandler.class);
+ private static MessageOpenMethodHandler _instance = new MessageOpenMethodHandler();
- private static final BasicReturnMethodHandler _instance = new BasicReturnMethodHandler();
-
- public static BasicReturnMethodHandler getInstance()
+ public static MessageOpenMethodHandler getInstance()
{
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ private MessageOpenMethodHandler() {}
+
+
+ public void methodReceived (AMQStateManager stateManager,
+ AMQProtocolSession protocolSession,
+ AMQMethodEvent<MessageOpenBody> evt)
+ throws AMQException
{
- _logger.debug("New JmsBounce method received");
- final UnprocessedMessage msg = new UnprocessedMessage();
- msg.deliverBody = null;
- msg.bounceBody = (BasicReturnBody) evt.getMethod();
- msg.channelId = evt.getChannelId();
-
- evt.getProtocolSession().unprocessedMessageReceived(msg);
+ // TODO
}
-} \ No newline at end of file
+}
+
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageRejectMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageRejectMethodHandler.java
new file mode 100644
index 0000000000..bbce2ae1ee
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageRejectMethodHandler.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.MessageRejectBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+
+public class MessageRejectMethodHandler implements StateAwareMethodListener
+{
+ private static MessageRejectMethodHandler _instance = new MessageRejectMethodHandler();
+
+ public static MessageRejectMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private MessageRejectMethodHandler() {}
+
+
+ public void methodReceived (AMQStateManager stateManager,
+ AMQProtocolSession protocolSession,
+ AMQMethodEvent<MessageRejectBody> evt)
+ throws AMQException
+ {
+ // TODO
+ }
+}
+
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageResumeMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageResumeMethodHandler.java
new file mode 100644
index 0000000000..810ed25029
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageResumeMethodHandler.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.MessageResumeBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+
+public class MessageResumeMethodHandler implements StateAwareMethodListener
+{
+ private static MessageResumeMethodHandler _instance = new MessageResumeMethodHandler();
+
+ public static MessageResumeMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private MessageResumeMethodHandler() {}
+
+
+ public void methodReceived (AMQStateManager stateManager,
+ AMQProtocolSession protocolSession,
+ AMQMethodEvent<MessageResumeBody> evt)
+ throws AMQException
+ {
+ // TODO
+ }
+}
+
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
new file mode 100644
index 0000000000..6a21bed7b5
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+
+public class MessageTransferMethodHandler implements StateAwareMethodListener
+{
+ private static MessageTransferMethodHandler _instance = new MessageTransferMethodHandler();
+
+ public static MessageTransferMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private MessageTransferMethodHandler() {}
+
+
+ public void methodReceived (AMQStateManager stateManager,
+ AMQProtocolSession protocolSession,
+ AMQMethodEvent<MessageTransferBody> evt)
+ throws AMQException
+ {
+ // TODO
+ }
+}
+
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java
index 3271a715a2..b8b75accb4 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java
@@ -17,11 +17,12 @@
*/
package org.apache.qpid.client.handler;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.client.state.AMQStateManager;
-import org.apache.qpid.client.protocol.AMQMethodEvent;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.QueueDeleteOkBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.log4j.Logger;
/**
@@ -41,7 +42,7 @@ public class QueueDeleteOkMethodHandler implements StateAwareMethodListener
{
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
{
if (_logger.isDebugEnabled())
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodEvent.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodEvent.java
deleted file mode 100644
index 403aa3486e..0000000000
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodEvent.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.protocol;
-
-import org.apache.qpid.framing.AMQMethodBody;
-
-public class AMQMethodEvent
-{
- private AMQMethodBody _method;
-
- private int _channelId;
-
- private AMQProtocolSession _protocolSession;
-
- public AMQMethodEvent(int channelId, AMQMethodBody method, AMQProtocolSession protocolSession)
- {
- _channelId = channelId;
- _method = method;
- _protocolSession = protocolSession;
- }
-
- public AMQMethodBody getMethod()
- {
- return _method;
- }
-
- public int getChannelId()
- {
- return _channelId;
- }
-
- public AMQProtocolSession getProtocolSession()
- {
- return _protocolSession;
- }
-
- public String toString()
- {
- StringBuffer buf = new StringBuffer("Method event: ");
- buf.append("\nChannel id: ").append(_channelId);
- buf.append("\nMethod: ").append(_method);
- return buf.toString();
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodListener.java
index 68299033a5..2cbd8f0e32 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodListener.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodListener.java
@@ -21,6 +21,8 @@
package org.apache.qpid.client.protocol;
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
public interface AMQMethodListener
{
@@ -34,7 +36,7 @@ public interface AMQMethodListener
* to all registered listeners using the error() method (see below) allowing them to
* perform cleanup if necessary.
*/
- boolean methodReceived(AMQMethodEvent evt) throws AMQException;
+ boolean methodReceived(AMQMethodEvent evt, AMQProtocolSession protocolSession) throws AMQException;
/**
* Callback when an error has occurred. Allows listeners to clean up.
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 68797edc77..a0399e1450 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -46,6 +46,7 @@ import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.HeartbeatBody;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.ssl.BogusSSLContextFactory;
import java.util.Iterator;
@@ -313,14 +314,14 @@ public class AMQProtocolHandler extends IoHandlerAdapter
_logger.debug("Method frame received: " + frame);
}
- final AMQMethodEvent evt = new AMQMethodEvent(frame.channel, (AMQMethodBody) frame.bodyFrame, _protocolSession);
+ final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.channel, (AMQMethodBody) frame.bodyFrame);
try
{
boolean wasAnyoneInterested = false;
while (it.hasNext())
{
final AMQMethodListener listener = (AMQMethodListener) it.next();
- wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
+ wasAnyoneInterested = listener.methodReceived(evt, _protocolSession) || wasAnyoneInterested;
}
if (!wasAnyoneInterested)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
index 21ae3fc71f..4fff4fab00 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
@@ -22,6 +22,8 @@ package org.apache.qpid.client.protocol;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
public abstract class BlockingMethodFrameListener implements AMQMethodListener
{
@@ -53,7 +55,7 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener
* @return true if the listener has dealt with this frame
* @throws AMQException
*/
- public boolean methodReceived(AMQMethodEvent evt) throws AMQException
+ public boolean methodReceived(AMQMethodEvent evt, AMQProtocolSession protocolSession) throws AMQException
{
AMQMethodBody method = evt.getMethod();
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
index 50bd1667f9..0d9d70b244 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
@@ -21,9 +21,10 @@
package org.apache.qpid.client.state;
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.client.handler.*;
-import org.apache.qpid.client.protocol.AMQMethodEvent;
import org.apache.qpid.client.protocol.AMQMethodListener;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.framing.*;
import org.apache.log4j.Logger;
@@ -101,9 +102,16 @@ public class AMQStateManager implements AMQMethodListener
frame2handlerMap.put(ChannelCloseBody.class, ChannelCloseMethodHandler.getInstance());
frame2handlerMap.put(ChannelCloseOkBody.class, ChannelCloseOkMethodHandler.getInstance());
frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
- frame2handlerMap.put(BasicDeliverBody.class, BasicDeliverMethodHandler.getInstance());
- frame2handlerMap.put(BasicReturnBody.class, BasicReturnMethodHandler.getInstance());
- frame2handlerMap.put(BasicCancelOkBody.class, BasicCancelOkMethodHandler.getInstance());
+ frame2handlerMap.put(MessageAppendBody.class, MessageAppendMethodHandler.getInstance());
+ frame2handlerMap.put(MessageCheckpointBody.class, MessageCheckpointMethodHandler.getInstance());
+ frame2handlerMap.put(MessageCloseBody.class, MessageCloseMethodHandler.getInstance());
+ frame2handlerMap.put(MessageEmptyBody.class, MessageEmptyMethodHandler.getInstance());
+ frame2handlerMap.put(MessageOffsetBody.class, MessageOffsetMethodHandler.getInstance());
+ frame2handlerMap.put(MessageOkBody.class, MessageOkMethodHandler.getInstance());
+ frame2handlerMap.put(MessageOpenBody.class, MessageOpenMethodHandler.getInstance());
+ frame2handlerMap.put(MessageRejectBody.class, MessageRejectMethodHandler.getInstance());
+ frame2handlerMap.put(MessageResumeBody.class, MessageResumeMethodHandler.getInstance());
+ frame2handlerMap.put(MessageTransferBody.class, MessageTransferMethodHandler.getInstance());
frame2handlerMap.put(ChannelFlowOkBody.class, ChannelFlowOkMethodHandler.getInstance());
frame2handlerMap.put(QueueDeleteOkBody.class, QueueDeleteOkMethodHandler.getInstance());
frame2handlerMap.put(ExchangeBoundOkBody.class, ExchangeBoundOkMethodHandler.getInstance());
@@ -146,12 +154,12 @@ public class AMQStateManager implements AMQMethodListener
}
}
- public boolean methodReceived(AMQMethodEvent evt) throws AMQException
+ public boolean methodReceived(AMQMethodEvent evt, AMQProtocolSession protocolSession) throws AMQException
{
StateAwareMethodListener handler = findStateTransitionHandler(_currentState, evt.getMethod());
if (handler != null)
{
- handler.methodReceived(this, evt);
+ handler.methodReceived(this, protocolSession, evt);
return true;
}
return false;
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java b/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java
index 37a0f9f376..f2c8916868 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java
@@ -20,7 +20,8 @@
*/
package org.apache.qpid.client.state;
-import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.AMQException;
/**
@@ -30,5 +31,6 @@ import org.apache.qpid.AMQException;
*/
public interface StateAwareMethodListener
{
- void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException;
+ void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession,
+ AMQMethodEvent evt) throws AMQException;
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java
index 7a01995abb..16dd5d8fa7 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java
@@ -23,10 +23,10 @@ package org.apache.qpid.server.cluster;
import org.apache.mina.common.IoSession;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.protocol.AMQMethodEvent;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
/**
* Hack to assist with reuse of the client handlers for connection setup in
@@ -51,8 +51,8 @@ class ClientAdapter implements MethodHandler
public void handle(int channel, AMQMethodBody method) throws AMQException
{
- AMQMethodEvent evt = new AMQMethodEvent(channel, method, _session);
- _stateMgr.methodReceived(evt);
+ AMQMethodEvent evt = new AMQMethodEvent(channel, method);
+ _stateMgr.methodReceived(evt, _session);
}
private class SessionAdapter extends AMQProtocolSession
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java
index 8e7fb1ff49..c4107a435b 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java
@@ -24,7 +24,7 @@ import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQMethodBody;
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java
index 022ee098ab..8e6d133600 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java
@@ -25,7 +25,7 @@ import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.cluster.util.LogMessage;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQMethodBody;
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java
index 5944d99a14..260f27fa82 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java
@@ -21,12 +21,12 @@
package org.apache.qpid.server.cluster.handler;
import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
import org.apache.qpid.server.cluster.ClusteredProtocolSession;
import org.apache.qpid.AMQException;
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java
index 46ba3e5015..97174d782c 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java
@@ -49,6 +49,7 @@ import org.apache.qpid.framing.BasicQosBody;
import org.apache.qpid.framing.TxSelectBody;
import org.apache.qpid.framing.TxCommitBody;
import org.apache.qpid.framing.TxRollbackBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.cluster.ClusterCapability;
import org.apache.qpid.server.cluster.ClusteredProtocolSession;
import org.apache.qpid.server.cluster.GroupManager;
@@ -76,7 +77,6 @@ import org.apache.qpid.server.handler.BasicQosHandler;
import org.apache.qpid.server.handler.TxSelectHandler;
import org.apache.qpid.server.handler.TxCommitHandler;
import org.apache.qpid.server.handler.TxRollbackHandler;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQState;
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java
index 5b2c6f4a9a..8250d57d98 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java
@@ -22,8 +22,8 @@ package org.apache.qpid.server.cluster.handler;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java
index 2cd0989f10..5cd94f12d1 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java
@@ -22,8 +22,8 @@ package org.apache.qpid.server.cluster.handler;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java
index 00f37951f2..eaf80338d2 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java
@@ -22,9 +22,9 @@ package org.apache.qpid.server.cluster.handler;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.cluster.ClusteredProtocolSession;
import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java
index 3e8528f533..3bf40fbaee 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java
@@ -22,8 +22,8 @@ package org.apache.qpid.server.cluster.handler;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.QueueDeclareBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java
index 729a38c970..e9f039ed09 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java
@@ -23,9 +23,9 @@ package org.apache.qpid.server.cluster.handler;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicCancelBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.cluster.ClusteredProtocolSession;
import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.ClusteredQueue;
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java
index 4d3f1261b2..693c70b780 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java
@@ -24,9 +24,9 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicConsumeBody;
import org.apache.qpid.framing.BasicConsumeOkBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.cluster.ClusteredProtocolSession;
import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.ClusteredQueue;
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java
index 03c644889e..ea42db3ded 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java
@@ -22,12 +22,12 @@ package org.apache.qpid.server.cluster.handler;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicConsumeBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.cluster.BroadcastPolicy;
import org.apache.qpid.server.cluster.GroupManager;
import org.apache.qpid.server.cluster.util.LogMessage;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.handler.BasicConsumeMethodHandler;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java
index db340c6a61..e68ae9eb3d 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.cluster.handler;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.cluster.BroadcastPolicy;
import org.apache.qpid.server.cluster.ClusteredProtocolSession;
import org.apache.qpid.server.cluster.GroupManager;
@@ -32,7 +33,6 @@ import org.apache.qpid.server.cluster.Member;
import org.apache.qpid.server.cluster.SimpleSendable;
import org.apache.qpid.server.cluster.policy.StandardPolicies;
import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java
index d46913d042..fd38ea6153 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java
@@ -22,8 +22,8 @@ package org.apache.qpid.server.cluster.handler;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
index a1058c6ff6..c927c0cebf 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
@@ -29,11 +29,11 @@ import org.apache.qpid.framing.ExchangeDeleteBody;
import org.apache.qpid.framing.QueueBindBody;
import org.apache.qpid.framing.QueueDeclareBody;
import org.apache.qpid.framing.QueueDeleteBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.cluster.MethodHandlerFactory;
import org.apache.qpid.server.cluster.MethodHandlerRegistry;
import org.apache.qpid.server.cluster.handler.WrappingMethodHandlerFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQState;
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
index cf931e8306..ac4879561c 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
@@ -31,11 +31,11 @@ import org.apache.qpid.framing.QueueDeleteBody;
import org.apache.qpid.framing.ClusterSynchBody;
import org.apache.qpid.framing.BasicConsumeBody;
import org.apache.qpid.framing.BasicCancelBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.cluster.ClusteredProtocolSession;
import org.apache.qpid.server.cluster.util.LogMessage;
import org.apache.qpid.server.cluster.util.Bindings;
import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java
index af43ab6474..00a27a8869 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java
@@ -33,11 +33,11 @@ public class AMQRequestBody extends AMQBody
// Constructor
public AMQRequestBody() {}
public AMQRequestBody(long requestId, long responseMark,
- AMQMethodBody methodPayload)
+ AMQMethodBody methodPayload)
{
- this.requestId = requestId;
- this.responseMark = responseMark;
- this.methodPayload = methodPayload;
+ this.requestId = requestId;
+ this.responseMark = responseMark;
+ this.methodPayload = methodPayload;
}
@@ -49,42 +49,42 @@ public class AMQRequestBody extends AMQBody
protected byte getFrameType()
{
- return (byte)AmqpConstants.frameRequestAsInt();
+ return (byte)AmqpConstants.frameRequestAsInt();
}
protected int getSize()
{
- return 8 + 8 + 4 + methodPayload.getBodySize();
+ return 8 + 8 + 4 + methodPayload.getBodySize();
}
protected void writePayload(ByteBuffer buffer)
{
- EncodingUtils.writeLong(buffer, requestId);
- EncodingUtils.writeLong(buffer, responseMark);
- EncodingUtils.writeUnsignedShort(buffer, 0); // reserved, set to 0
+ EncodingUtils.writeLong(buffer, requestId);
+ EncodingUtils.writeLong(buffer, responseMark);
+ EncodingUtils.writeUnsignedShort(buffer, 0); // reserved, set to 0
methodPayload.writePayload(buffer);
}
protected void populateFromBuffer(ByteBuffer buffer, long size)
throws AMQFrameDecodingException, AMQProtocolVersionException
{
- requestId = EncodingUtils.readLong(buffer);
- responseMark = EncodingUtils.readLong(buffer);
- int reserved = EncodingUtils.readShort(buffer); // reserved, throw away
- methodPayload.populateFromBuffer(buffer, size - 8 - 8 - 4);
+ requestId = EncodingUtils.readLong(buffer);
+ responseMark = EncodingUtils.readLong(buffer);
+ int reserved = EncodingUtils.readShort(buffer); // reserved, throw away
+ methodPayload.populateFromBuffer(buffer, size - 8 - 8 - 4);
}
public String toString()
{
- return "Req[" + requestId + " " + responseMark + "] C" +
- methodPayload.getClazz() + " M" + methodPayload.getMethod();
+ return "Req[" + requestId + " " + responseMark + "] C" +
+ methodPayload.getClazz() + " M" + methodPayload.getMethod();
}
public static AMQFrame createAMQFrame(int channelId, long requestId,
long responseMark, AMQMethodBody methodPayload)
{
AMQRequestBody requestFrame = new AMQRequestBody(requestId, responseMark,
- methodPayload);
+ methodPayload);
AMQFrame frame = new AMQFrame();
frame.channel = channelId;
frame.bodyFrame = requestFrame;
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java
index 67fc485f48..90038da2d4 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java
@@ -33,9 +33,9 @@ public class AMQResponseBody extends AMQBody
// Constructor
public AMQResponseBody() {}
public AMQResponseBody(long getResponseId, long getRequestId,
- int batchOffset, AMQMethodBody methodPayload)
+ int batchOffset, AMQMethodBody methodPayload)
{
- this.responseId = responseId;
+ this.responseId = responseId;
this.requestId = requestId;
this.batchOffset = batchOffset;
this.methodPayload = methodPayload;
@@ -49,12 +49,12 @@ public class AMQResponseBody extends AMQBody
protected byte getFrameType()
{
- return (byte)AmqpConstants.frameResponseAsInt();
+ return (byte)AmqpConstants.frameResponseAsInt();
}
protected int getSize()
{
- return 8 + 8 + 4 + methodPayload.getBodySize();
+ return 8 + 8 + 4 + methodPayload.getBodySize();
}
protected void writePayload(ByteBuffer buffer)
@@ -76,15 +76,15 @@ public class AMQResponseBody extends AMQBody
public String toString()
{
- return "Res[" + responseId + " " + requestId + "-" + requestId + batchOffset + "] C" +
- methodPayload.getClazz() + " M" + methodPayload.getMethod();
+ return "Res[" + responseId + " " + requestId + "-" + requestId + batchOffset + "] C" +
+ methodPayload.getClazz() + " M" + methodPayload.getMethod();
}
public static AMQFrame createAMQFrame(int channelId, long responseId,
long requestId, int batchOffset, AMQMethodBody methodPayload)
{
AMQResponseBody responseFrame = new AMQResponseBody(responseId,
- requestId, batchOffset, methodPayload);
+ requestId, batchOffset, methodPayload);
AMQFrame frame = new AMQFrame();
frame.channel = channelId;
frame.bodyFrame = responseFrame;
diff --git a/java/common/src/main/java/org/apache/qpid/framing/RequestHandler.java b/java/common/src/main/java/org/apache/qpid/framing/RequestHandler.java
new file mode 100644
index 0000000000..f779258e00
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/RequestHandler.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+public interface RequestHandler
+{
+ public boolean requestReceived(AMQRequestBody requestBody);
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java b/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
index e673a8e343..55c25151da 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
@@ -26,7 +26,7 @@ import org.apache.qpid.protocol.AMQProtocolWriter;
public class RequestManager
{
- private int channel;
+ private int channel;
AMQProtocolWriter protocolSession;
/**
@@ -39,60 +39,60 @@ public class RequestManager
/**
* These keep track of the last requestId and responseId to be received.
*/
- private long lastReceivedResponseId;
+ private long lastProcessedResponseId;
private Hashtable<Long, AMQResponseCallback> requestSentMap;
- public RequestManager(int channel, AMQProtocolWriter protocolSession)
+ public RequestManager(int channel, AMQProtocolWriter protocolSession)
{
- this.channel = channel;
+ this.channel = channel;
this.protocolSession = protocolSession;
- requestIdCount = 1L;
- lastReceivedResponseId = 0L;
+ requestIdCount = 1L;
+ lastProcessedResponseId = 0L;
requestSentMap = new Hashtable<Long, AMQResponseCallback>();
}
// *** Functions to originate a request ***
public long sendRequest(AMQMethodBody requestMethodBody,
- AMQResponseCallback responseCallback)
+ AMQResponseCallback responseCallback)
{
- long requestId = getNextRequestId(); // Get new request ID
- AMQFrame requestFrame = AMQRequestBody.createAMQFrame(channel, requestId,
- lastReceivedResponseId, requestMethodBody);
+ long requestId = getNextRequestId(); // Get new request ID
+ AMQFrame requestFrame = AMQRequestBody.createAMQFrame(channel, requestId,
+ lastProcessedResponseId, requestMethodBody);
protocolSession.writeFrame(requestFrame);
requestSentMap.put(requestId, responseCallback);
return requestId;
}
public void responseReceived(AMQResponseBody responseBody)
- throws RequestResponseMappingException
+ throws RequestResponseMappingException
{
- lastReceivedResponseId = responseBody.getResponseId();
long requestIdStart = responseBody.getRequestId();
long requestIdStop = requestIdStart + responseBody.getBatchOffset();
for (long requestId = requestIdStart; requestId <= requestIdStop; requestId++)
{
- AMQResponseCallback responseCallback = requestSentMap.get(requestId);
+ AMQResponseCallback responseCallback = requestSentMap.get(requestId);
if (responseCallback == null)
- throw new RequestResponseMappingException(requestId,
- "Failed to locate requestId " + requestId + " in requestSentMap.");
+ throw new RequestResponseMappingException(requestId,
+ "Failed to locate requestId " + requestId + " in requestSentMap.");
responseCallback.responseFrameReceived(responseBody);
requestSentMap.remove(requestId);
}
+ lastProcessedResponseId = responseBody.getResponseId();
}
// *** Management functions ***
public int requestsMapSize()
{
- return requestSentMap.size();
+ return requestSentMap.size();
}
// *** Private helper functions ***
private long getNextRequestId()
{
- return requestIdCount++;
+ return requestIdCount++;
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
index a895464a1f..280d8d562a 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
@@ -28,10 +28,11 @@ import org.apache.qpid.protocol.AMQProtocolWriter;
public class ResponseManager
{
- private int channel;
+ private int channel;
+ RequestHandler requestHandler;
AMQProtocolWriter protocolSession;
- /**
+ /**
* Determines the batch behaviour of the manager.
*
* Responses are sent to the RequestResponseManager through sendResponse().
@@ -48,7 +49,7 @@ public class ResponseManager
* MANUAL: No response is sent until it is explicitly released by calling
* function xxxx(). (TODO)
*/
- public enum batchResponseModeEnum { NONE }
+ public enum batchResponseModeEnum { NONE }
private batchResponseModeEnum batchResponseMode;
/**
@@ -70,79 +71,79 @@ public class ResponseManager
private class ResponseStatus implements Comparable<ResponseStatus>
{
- public long requestId;
+ public long requestId;
public AMQMethodBody responseMethodBody;
public ResponseStatus(long requestId)
{
- this.requestId = requestId;
- responseMethodBody = null;
+ this.requestId = requestId;
+ responseMethodBody = null;
}
public int compareTo(ResponseStatus o)
{
- return (int)(requestId - o.requestId);
+ return (int)(requestId - o.requestId);
}
}
private Hashtable<Long, ResponseStatus> responseMap;
- public ResponseManager(int channel, AMQProtocolWriter protocolSession)
+ public ResponseManager(int channel, RequestHandler requestHandler,
+ AMQProtocolWriter protocolSession)
{
- this.channel = channel;
+ this.channel = channel;
+ this.requestHandler = requestHandler;
this.protocolSession = protocolSession;
responseIdCount = 1L;
lastReceivedRequestId = 0L;
responseMap = new Hashtable<Long, ResponseStatus>();
}
- // *** Functions to handle an incoming request ***
+ // *** Functions to handle an incoming request ***
public void requestReceived(AMQRequestBody requestBody)
{
- long requestId = requestBody.getRequestId();
+ long requestId = requestBody.getRequestId();
// TODO: responseMark is used in HA, but until then, ignore...
long responseMark = requestBody.getResponseMark();
- lastReceivedRequestId = requestId;
+ lastReceivedRequestId = requestId;
responseMap.put(requestId, new ResponseStatus(requestId));
-
- // TODO: Initiate some action based on the MethodBody - like send to handlers,
- // but how to do this in a way that will work for both client and server?
+ requestHandler.requestReceived(requestBody);
}
public void sendResponse(long requestId, AMQMethodBody responseMethodBody)
- throws RequestResponseMappingException
+ throws RequestResponseMappingException
{
- ResponseStatus responseStatus = responseMap.get(requestId);
+ ResponseStatus responseStatus = responseMap.get(requestId);
if (responseStatus == null)
- throw new RequestResponseMappingException(requestId,
- "Failed to locate requestId " + requestId + " in responseMap.");
+ throw new RequestResponseMappingException(requestId,
+ "Failed to locate requestId " + requestId + " in responseMap.");
if (responseStatus.responseMethodBody != null)
- throw new RequestResponseMappingException(requestId, "RequestId " +
- requestId + " already has a response in responseMap.");
+ throw new RequestResponseMappingException(requestId, "RequestId " +
+ requestId + " already has a response in responseMap.");
responseStatus.responseMethodBody = responseMethodBody;
doBatches();
}
// *** Management functions ***
- public batchResponseModeEnum getBatchResponseMode()
+ public batchResponseModeEnum getBatchResponseMode()
{
- return batchResponseMode;
+ return batchResponseMode;
}
public void setBatchResponseMode(batchResponseModeEnum batchResponseMode)
{
- if (this.batchResponseMode != batchResponseMode)
+ if (this.batchResponseMode != batchResponseMode)
{
- this.batchResponseMode = batchResponseMode;
- doBatches();
+ this.batchResponseMode = batchResponseMode;
+ doBatches();
}
}
public int responsesMapSize()
{
- return responseMap.size();
+ return responseMap.size();
}
/**
@@ -153,12 +154,12 @@ public class ResponseManager
*/
public int outstandingResponses()
{
- int cnt = 0;
+ int cnt = 0;
for (Long requestId : responseMap.keySet())
{
- if (responseMap.get(requestId).responseMethodBody == null)
- cnt++;
- }
+ if (responseMap.get(requestId).responseMethodBody == null)
+ cnt++;
+ }
return cnt;
}
@@ -170,12 +171,12 @@ public class ResponseManager
*/
public int batchedResponses()
{
- int cnt = 0;
+ int cnt = 0;
for (Long requestId : responseMap.keySet())
{
- if (responseMap.get(requestId).responseMethodBody != null)
- cnt++;
- }
+ if (responseMap.get(requestId).responseMethodBody != null)
+ cnt++;
+ }
return cnt;
}
@@ -183,39 +184,39 @@ public class ResponseManager
private long getNextResponseId()
{
- return responseIdCount++;
+ return responseIdCount++;
}
private void doBatches()
{
- switch (batchResponseMode)
+ switch (batchResponseMode)
{
- case NONE:
- Iterator<Long> lItr = responseMap.keySet().iterator();
- while (lItr.hasNext())
+ case NONE:
+ Iterator<Long> lItr = responseMap.keySet().iterator();
+ while (lItr.hasNext())
{
- long requestId = lItr.next();
- ResponseStatus responseStatus = responseMap.get(requestId);
- if (responseStatus.responseMethodBody != null)
+ long requestId = lItr.next();
+ ResponseStatus responseStatus = responseMap.get(requestId);
+ if (responseStatus.responseMethodBody != null)
{
- sendResponseBatch(requestId, 0, responseStatus.responseMethodBody);
+ sendResponseBatch(requestId, 0, responseStatus.responseMethodBody);
lItr.remove();
}
}
- break;
+ break;
// TODO: Add additional batch mode handlers here...
- // case DELAY_FIXED:
- // case MANUAL:
+ // case DELAY_FIXED:
+ // case MANUAL:
}
}
private void sendResponseBatch(long firstRequestId, int numAdditionalRequests,
- AMQMethodBody responseMethodBody)
+ AMQMethodBody responseMethodBody)
{
- long responseId = getNextResponseId(); // Get new request ID
- AMQFrame responseFrame = AMQResponseBody.createAMQFrame(channel, responseId,
- firstRequestId, numAdditionalRequests, responseMethodBody);
+ long responseId = getNextResponseId(); // Get new request ID
+ AMQFrame responseFrame = AMQResponseBody.createAMQFrame(channel, responseId,
+ firstRequestId, numAdditionalRequests, responseMethodBody);
protocolSession.writeFrame(responseFrame);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodEvent.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java
index 3d12828900..ab36041cb8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodEvent.java
+++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.server.protocol;
+package org.apache.qpid.protocol;
import org.apache.qpid.framing.AMQMethodBody;