diff options
91 files changed, 888 insertions, 432 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 51be0d867b..7bdb5e1ecd 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -23,10 +23,14 @@ package org.apache.qpid.server; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.framing.BasicPublishBody; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; +//import org.apache.qpid.framing.BasicPublishBody; +//import org.apache.qpid.framing.ContentBody; +//import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.Content; +import org.apache.qpid.framing.RequestManager; +import org.apache.qpid.framing.ResponseManager; +import org.apache.qpid.protocol.AMQProtocolWriter; import org.apache.qpid.server.ack.TxAck; import org.apache.qpid.server.ack.UnacknowledgedMessage; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; @@ -64,6 +68,9 @@ public class AMQChannel private long _prefetch_HighWaterMark; private long _prefetch_LowWaterMark; + + private RequestManager _requestManager; + private ResponseManager _responseManager; /** * The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that @@ -114,7 +121,7 @@ public class AMQChannel private final List<AMQDataBlock> _returns = new LinkedList<AMQDataBlock>(); private Set<Long> _browsedAcks = new HashSet<Long>(); - public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges) + public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges, AMQProtocolWriter protocolSession) throws AMQException { _channelId = channelId; @@ -122,6 +129,8 @@ public class AMQChannel _prefetch_LowWaterMark = _prefetch_HighWaterMark / 2; _messageStore = messageStore; _exchanges = exchanges; + _requestManager = new RequestManager(channelId, protocolSession); + _responseManager = new ResponseManager(channelId, protocolSession); _txnBuffer = new TxnBuffer(_messageStore); } @@ -170,11 +179,11 @@ public class AMQChannel _prefetch_HighWaterMark = prefetchCount; } - public void setPublishFrame(BasicPublishBody publishBody, AMQProtocolSession publisher) throws AMQException - { - _currentMessage = new AMQMessage(_messageStore, publishBody); - _currentMessage.setPublisher(publisher); - } +// public void setPublishFrame(BasicPublishBody publishBody, AMQProtocolSession publisher) throws AMQException +// { +// _currentMessage = new AMQMessage(_messageStore, publishBody); +// _currentMessage.setPublisher(publisher); +// } // public void publishContentHeader(ContentHeaderBody contentHeaderBody) // throws AMQException @@ -269,6 +278,9 @@ public class AMQChannel } } } + + public RequestManager getRequestManager() { return _requestManager; } + public ResponseManager getResponseManager() { return _responseManager; } public long getNextDeliveryTag() { diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java index 2f1d6ada00..7461f93539 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java @@ -25,8 +25,8 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.ChannelCloseBody; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ChannelCloseOkBody; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java index fd6714de3a..32622f8fed 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java @@ -22,8 +22,8 @@ package org.apache.qpid.server.handler; import org.apache.qpid.AMQException; import org.apache.qpid.framing.ChannelCloseOkBody; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java index 8f91d4ecf4..07ab0537d5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java @@ -21,12 +21,12 @@ package org.apache.qpid.server.handler; import org.apache.log4j.Logger; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ChannelFlowBody; diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java index ee2ff9e5bb..459ccf40a8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java @@ -24,11 +24,11 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ChannelOpenBody; import org.apache.qpid.framing.ChannelOpenOkBody; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java index 94e54e8f1d..6e22d67b72 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java @@ -23,12 +23,12 @@ package org.apache.qpid.server.handler; import org.apache.qpid.framing.ConnectionCloseBody; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ConnectionCloseOkBody; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.AMQException; import org.apache.log4j.Logger; diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java index cc9277593b..69d50c9237 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java @@ -22,8 +22,8 @@ package org.apache.qpid.server.handler; import org.apache.qpid.AMQException; import org.apache.qpid.framing.ConnectionCloseOkBody; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java index f1487b2844..c3b6560ee4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java @@ -24,8 +24,8 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ConnectionOpenBody; import org.apache.qpid.framing.ConnectionOpenOkBody; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQState; diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java index 42dda66eab..9aea4a7b26 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java @@ -23,8 +23,8 @@ package org.apache.qpid.server.handler; import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.framing.*; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.protocol.HeartbeatConfig; import org.apache.qpid.server.queue.QueueRegistry; diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java index b2ddf6d0db..77fddf1ff5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java @@ -27,8 +27,8 @@ import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ConnectionSecureBody; import org.apache.qpid.framing.ConnectionStartOkBody; import org.apache.qpid.framing.ConnectionTuneBody; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.protocol.HeartbeatConfig; import org.apache.qpid.server.queue.QueueRegistry; diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java index f0b4e0a515..de38cff3e2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java @@ -23,8 +23,8 @@ package org.apache.qpid.server.handler; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.ConnectionTuneOkBody; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQState; diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java index 87e20faa81..67f77c72ef 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java @@ -21,9 +21,9 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ExchangeBoundBody; import org.apache.qpid.framing.ExchangeBoundOkBody; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java index efcfffeaff..cdb3a503ae 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java @@ -25,10 +25,10 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ExchangeDeclareBody; import org.apache.qpid.framing.ExchangeDeclareOkBody; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.exchange.ExchangeFactory; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java index 1c72f75dd7..79b4e07c90 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java @@ -24,9 +24,9 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ExchangeDeleteBody; import org.apache.qpid.framing.ExchangeDeleteOkBody; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.exchange.ExchangeInUseException; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageAppendHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageAppendHandler.java index 91b5493f32..962bd2004f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageAppendHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageAppendHandler.java @@ -20,7 +20,13 @@ */ package org.apache.qpid.server.handler; +import org.apache.qpid.AMQException; import org.apache.qpid.framing.MessageAppendBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; public class MessageAppendHandler implements StateAwareMethodListener<MessageAppendBody> diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java index a84cbfb88e..ad41910141 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java @@ -20,7 +20,13 @@ */ package org.apache.qpid.server.handler; +import org.apache.qpid.AMQException; import org.apache.qpid.framing.MessageCancelBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; public class MessageCancelHandler implements StateAwareMethodListener<MessageCancelBody> diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCheckpointHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCheckpointHandler.java index 9dd9a6b18e..2123f9203c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCheckpointHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCheckpointHandler.java @@ -20,7 +20,13 @@ */ package org.apache.qpid.server.handler; +import org.apache.qpid.AMQException; import org.apache.qpid.framing.MessageCheckpointBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; public class MessageCheckpointHandler implements StateAwareMethodListener<MessageCheckpointBody> diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCloseHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCloseHandler.java index 5e21c1ee6c..5efe24abfa 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCloseHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCloseHandler.java @@ -20,7 +20,13 @@ */ package org.apache.qpid.server.handler; +import org.apache.qpid.AMQException; import org.apache.qpid.framing.MessageCloseBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; public class MessageCloseHandler implements StateAwareMethodListener<MessageCloseBody> diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java index a2c5662703..c019a3b043 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java @@ -20,7 +20,13 @@ */ package org.apache.qpid.server.handler; +import org.apache.qpid.AMQException; import org.apache.qpid.framing.MessageConsumeBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; public class MessageConsumeHandler implements StateAwareMethodListener<MessageConsumeBody> diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageEmptyHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageEmptyHandler.java index 37d39a517a..3d5b59c238 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageEmptyHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageEmptyHandler.java @@ -20,7 +20,13 @@ */ package org.apache.qpid.server.handler; +import org.apache.qpid.AMQException; import org.apache.qpid.framing.MessageEmptyBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; public class MessageEmptyHandler implements StateAwareMethodListener<MessageEmptyBody> diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java index 15d769e295..5ae0c677a4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java @@ -20,7 +20,13 @@ */ package org.apache.qpid.server.handler; +import org.apache.qpid.AMQException; import org.apache.qpid.framing.MessageGetBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; public class MessageGetHandler implements StateAwareMethodListener<MessageGetBody> diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOffsetHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOffsetHandler.java index 7ac075c8d4..de6a886891 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOffsetHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOffsetHandler.java @@ -20,7 +20,13 @@ */ package org.apache.qpid.server.handler; +import org.apache.qpid.AMQException; import org.apache.qpid.framing.MessageOffsetBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; public class MessageOffsetHandler implements StateAwareMethodListener<MessageOffsetBody> diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOkHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOkHandler.java index 4b2a1543cd..9fb38bf8b8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOkHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOkHandler.java @@ -20,7 +20,13 @@ */ package org.apache.qpid.server.handler; +import org.apache.qpid.AMQException; import org.apache.qpid.framing.MessageOkBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; public class MessageOkHandler implements StateAwareMethodListener<MessageOkBody> diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOpenHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOpenHandler.java index 79679713ba..714227bb7c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOpenHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOpenHandler.java @@ -20,7 +20,13 @@ */ package org.apache.qpid.server.handler; +import org.apache.qpid.AMQException; import org.apache.qpid.framing.MessageOpenBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; public class MessageOpenHandler implements StateAwareMethodListener<MessageOpenBody> diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java index baa01df602..6bea5553d9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java @@ -20,7 +20,13 @@ */ package org.apache.qpid.server.handler; +import org.apache.qpid.AMQException; import org.apache.qpid.framing.MessageQosBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; public class MessageQosHandler implements StateAwareMethodListener<MessageQosBody> diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java index e178c60b27..6da0b6ffcc 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java @@ -20,7 +20,13 @@ */ package org.apache.qpid.server.handler; +import org.apache.qpid.AMQException; import org.apache.qpid.framing.MessageRecoverBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; public class MessageRecoverHandler implements StateAwareMethodListener<MessageRecoverBody> diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRejectHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRejectHandler.java index 401b399fa0..e5a27c762a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRejectHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRejectHandler.java @@ -20,7 +20,13 @@ */ package org.apache.qpid.server.handler; +import org.apache.qpid.AMQException; import org.apache.qpid.framing.MessageRejectBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; public class MessageRejectHandler implements StateAwareMethodListener<MessageRejectBody> diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageResumeHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageResumeHandler.java index 429514cc5b..49b5ff8e08 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageResumeHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageResumeHandler.java @@ -20,7 +20,13 @@ */ package org.apache.qpid.server.handler; +import org.apache.qpid.AMQException; import org.apache.qpid.framing.MessageResumeBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; public class MessageResumeHandler implements StateAwareMethodListener<MessageResumeBody> diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java index 18027fdc2b..4f6886e885 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java @@ -20,7 +20,13 @@ */ package org.apache.qpid.server.handler; +import org.apache.qpid.AMQException; import org.apache.qpid.framing.MessageTransferBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; public class MessageTransferHandler implements StateAwareMethodListener<MessageTransferBody> diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java index 5e8c8a31ca..915bfa67a6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java @@ -22,13 +22,13 @@ package org.apache.qpid.server.handler; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.QueueBindBody; import org.apache.qpid.framing.QueueBindOkBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index 0d2dc1550e..1a7b82829a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -26,9 +26,9 @@ import org.apache.qpid.configuration.Configured; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.QueueDeclareBody; import org.apache.qpid.framing.QueueDeclareOkBody; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java index d49baa97a4..b867d80fdb 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java @@ -20,13 +20,13 @@ */ package org.apache.qpid.server.handler; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.framing.QueueDeleteBody; diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java index 74902c33f8..983e6f7e56 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java @@ -23,9 +23,9 @@ package org.apache.qpid.server.handler; import org.apache.qpid.AMQException; import org.apache.qpid.framing.TxCommitBody; import org.apache.qpid.framing.TxCommitOkBody; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java index 32fb57db70..891dd69d4d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java @@ -23,8 +23,8 @@ package org.apache.qpid.server.handler; import org.apache.qpid.AMQException; import org.apache.qpid.framing.TxRollbackBody; import org.apache.qpid.framing.TxRollbackOkBody; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java index bb0f89f163..0c2a6ca210 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java @@ -23,8 +23,8 @@ package org.apache.qpid.server.handler; import org.apache.qpid.AMQException; import org.apache.qpid.framing.TxSelectBody; import org.apache.qpid.framing.TxSelectOkBody; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java index d2062d3c17..6596da1f8f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.protocol; import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.framing.AMQMethodBody; diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 525978e348..558d5f4aa6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -30,14 +30,18 @@ import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.ProtocolInitiation; import org.apache.qpid.framing.ConnectionStartBody; import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.Content; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.ProtocolVersionList; import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.AMQRequestBody; +import org.apache.qpid.framing.AMQResponseBody; +//import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.HeartbeatBody; -import org.apache.qpid.framing.ContentHeaderBody; +//import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.codec.AMQDecoder; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.RequiredDeliveryException; @@ -195,124 +199,130 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { AMQFrame frame = (AMQFrame) message; - if (frame.bodyFrame instanceof AMQRequest) + if (frame.bodyFrame instanceof AMQRequestBody) { requestFrameReceived(frame); } - else if (frame.bodyFrame instanceof AMQResponse) + else if (frame.bodyFrame instanceof AMQResponseBody) { responseFrameReceived(frame); } - else if (frame.bodyFrame instanceof AMQMethodBody) - { - methodFrameReceived(frame); - } else { - try - { - contentFrameReceived(frame); - } - catch (RequiredDeliveryException e) - { - //need to return the message: - _logger.info("Returning message to " + this + " channel " + frame.channel - + ": " + e.getMessage()); - writeFrame(e.getReturnMessage(frame.channel)); - } + _logger.error("Received invalid frame: " + frame.toString()); } +// else if (frame.bodyFrame instanceof AMQMethodBody) +// { +// methodFrameReceived(frame); +// } +// else +// { +// try +// { +// contentFrameReceived(frame); +// } +// catch (RequiredDeliveryException e) +// { +// //need to return the message: +// _logger.info("Returning message to " + this + " channel " + frame.channel +// + ": " + e.getMessage()); +// writeFrame(e.getReturnMessage(frame.channel)); +// } +// } } } - private void requestFrameReceived(AMQFrame frame) + private void requestFrameReceived(AMQFrame frame) throws AMQException { if (_logger.isDebugEnabled()) { _logger.debug("Request frame received: " + frame); } + AMQChannel channel = getChannel(frame.channel); } - private void responseFrameReceived(AMQFrame frame) + private void responseFrameReceived(AMQFrame frame) throws AMQException { if (_logger.isDebugEnabled()) { _logger.debug("Response frame received: " + frame); } - } - - private void methodFrameReceived(AMQFrame frame) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Method frame received: " + frame); - } - final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.channel, - (AMQMethodBody) frame.bodyFrame); - try - { - boolean wasAnyoneInterested = false; - for (AMQMethodListener listener : _frameListeners) - { - wasAnyoneInterested = listener.methodReceived(evt, this, _queueRegistry, _exchangeRegistry) || - wasAnyoneInterested; - } - if (!wasAnyoneInterested) - { - throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener."); - } - } - catch (AMQChannelException e) - { - _logger.error("Closing channel due to: " + e.getMessage()); - writeFrame(e.getCloseFrame(frame.channel)); - } - catch (AMQException e) - { - for (AMQMethodListener listener : _frameListeners) - { - listener.error(e); - } - _minaProtocolSession.close(); - } - } - - private void contentFrameReceived(AMQFrame frame) throws AMQException - { - if (frame.bodyFrame instanceof ContentHeaderBody) - { - contentHeaderReceived(frame); - } - else if (frame.bodyFrame instanceof ContentBody) - { - contentBodyReceived(frame); - } - else if (frame.bodyFrame instanceof HeartbeatBody) - { - _logger.debug("Received heartbeat from client"); - } - else - { - _logger.warn("Unrecognised frame " + frame.getClass().getName()); - } - } - - private void contentHeaderReceived(AMQFrame frame) throws AMQException - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Content header frame received: " + frame); - } - getChannel(frame.channel).publishContentHeader((ContentHeaderBody) frame.bodyFrame); - } - - private void contentBodyReceived(AMQFrame frame) throws AMQException - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Content body frame received: " + frame); - } - getChannel(frame.channel).publishContentBody((ContentBody) frame.bodyFrame); - } + AMQChannel channel = getChannel(frame.channel); + } + +// private void methodFrameReceived(AMQFrame frame) +// { +// if (_logger.isDebugEnabled()) +// { +// _logger.debug("Method frame received: " + frame); +// } +// final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.channel, +// (AMQMethodBody) frame.bodyFrame); +// try +// { +// boolean wasAnyoneInterested = false; +// for (AMQMethodListener listener : _frameListeners) +// { +// wasAnyoneInterested = listener.methodReceived(evt, this, _queueRegistry, _exchangeRegistry) || +// wasAnyoneInterested; +// } +// if (!wasAnyoneInterested) +// { +// throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener."); +// } +// } +// catch (AMQChannelException e) +// { +// _logger.error("Closing channel due to: " + e.getMessage()); +// writeFrame(e.getCloseFrame(frame.channel)); +// } +// catch (AMQException e) +// { +// for (AMQMethodListener listener : _frameListeners) +// { +// listener.error(e); +// } +// _minaProtocolSession.close(); +// } +// } + +// private void contentFrameReceived(AMQFrame frame) throws AMQException +// { +// if (frame.bodyFrame instanceof ContentHeaderBody) +// { +// contentHeaderReceived(frame); +// } +// else if (frame.bodyFrame instanceof ContentBody) +// { +// contentBodyReceived(frame); +// } +// else if (frame.bodyFrame instanceof HeartbeatBody) +// { +// _logger.debug("Received heartbeat from client"); +// } +// else +// { +// _logger.warn("Unrecognised frame " + frame.getClass().getName()); +// } +// } + +// private void contentHeaderReceived(AMQFrame frame) throws AMQException +// { +// if (_logger.isDebugEnabled()) +// { +// _logger.debug("Content header frame received: " + frame); +// } +// getChannel(frame.channel).publishContentHeader((ContentHeaderBody) frame.bodyFrame); +// } + +// private void contentBodyReceived(AMQFrame frame) throws AMQException +// { +// if (_logger.isDebugEnabled()) +// { +// _logger.debug("Content body frame received: " + frame); +// } +// getChannel(frame.channel).publishContentBody((ContentBody) frame.bodyFrame); +// } /** * Convenience method that writes a frame to the protocol session. Equivalent diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index b1ed999f72..00d22d839e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.AMQException; @@ -38,14 +39,6 @@ public interface AMQProtocolSession extends AMQProtocolWriter */ void dataBlockReceived(AMQDataBlock message) throws Exception; -// This is now a part of AMQProtocolWriter (inherited) to provide uniformity across both -// client and server. -// /** -// * Write a datablock, encoding where necessary (e.g. into a sequence of bytes) -// * @param frame the frame to be encoded and written -// */ -// void writeFrame(AMQDataBlock frame); - /** * Get the context key associated with this session. Context key is described * in the AMQ protocol specification (RFC 6). diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java index 509d7761dd..18b8041e7c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java @@ -22,9 +22,9 @@ package org.apache.qpid.server.state; import org.apache.qpid.AMQException; import org.apache.qpid.framing.*; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.handler.*; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQMethodListener; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java b/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java index 7d58f242e5..56323258b7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java +++ b/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java @@ -21,7 +21,7 @@ package org.apache.qpid.server.state; import org.apache.qpid.AMQException; -import org.apache.qpid.server.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.exchange.ExchangeRegistry; diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 8cde5e557f..b1f19bc191 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -63,7 +63,6 @@ import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.JMSStreamMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; -import org.apache.qpid.client.protocol.AMQMethodEvent; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.util.FlowControllingBlockingQueue; import org.apache.qpid.common.AMQPFilterTypes; @@ -89,6 +88,7 @@ import org.apache.qpid.framing.TxCommitBody; import org.apache.qpid.framing.TxCommitOkBody; import org.apache.qpid.framing.TxRollbackBody; import org.apache.qpid.framing.TxRollbackOkBody; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.handler.ExchangeBoundHandler; diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java deleted file mode 100644 index d855e97204..0000000000 --- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java +++ /dev/null @@ -1,35 +0,0 @@ -package org.apache.qpid.client.handler; - -import org.apache.qpid.client.state.StateAwareMethodListener; -import org.apache.qpid.client.state.AMQStateManager; -import org.apache.qpid.client.protocol.AMQMethodEvent; -import org.apache.qpid.client.BasicMessageConsumer; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.ExchangeBoundOkBody; -import org.apache.qpid.framing.BasicCancelOkBody; -import org.apache.log4j.Logger; - -/** - * @author Apache Software Foundation - */ -public class BasicCancelOkMethodHandler implements StateAwareMethodListener -{ - private static final Logger _logger = Logger.getLogger(BasicCancelOkMethodHandler.class); - private static final BasicCancelOkMethodHandler _instance = new BasicCancelOkMethodHandler(); - - public static BasicCancelOkMethodHandler getInstance() - { - return _instance; - } - - private BasicCancelOkMethodHandler() - { - } - - public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException - { - _logger.debug("New BasicCancelOk method received"); - BasicCancelOkBody body = (BasicCancelOkBody) evt.getMethod(); - evt.getProtocolSession().confirmConsumerCancelled(evt.getChannelId(), body.consumerTag); - } -} diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java index 5ad530a3ea..eb24f1fa74 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java @@ -26,13 +26,14 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQInvalidSelectorException; import org.apache.qpid.client.AMQNoConsumersException; import org.apache.qpid.client.AMQNoRouteException; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.client.protocol.AMQMethodEvent; +import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ChannelCloseBody; import org.apache.qpid.framing.ChannelCloseOkBody; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.protocol.AMQMethodEvent; public class ChannelCloseMethodHandler implements StateAwareMethodListener { @@ -45,7 +46,7 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener return _handler; } - public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException { _logger.debug("ChannelClose method received"); ChannelCloseBody method = (ChannelCloseBody) evt.getMethod(); @@ -61,7 +62,7 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQFrame frame = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)0, (byte)9); - evt.getProtocolSession().writeFrame(frame); + protocolSession.writeFrame(frame); if (errorCode != AMQConstant.REPLY_SUCCESS.getCode()) { _logger.error("Channel close received with errorCode " + errorCode + ", and reason " + reason); @@ -85,6 +86,6 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener } } - evt.getProtocolSession().channelClosed(evt.getChannelId(), errorCode, reason); + protocolSession.channelClosed(evt.getChannelId(), errorCode, reason); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java index 383cebbcab..a99c963eb4 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java @@ -21,7 +21,8 @@ package org.apache.qpid.client.handler; import org.apache.qpid.AMQException; -import org.apache.qpid.client.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.log4j.Logger; @@ -37,7 +38,7 @@ public class ChannelCloseOkMethodHandler implements StateAwareMethodListener return _instance; } - public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException { _logger.info("Received channel-close-ok for channel-id " + evt.getChannelId()); diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java index 9a4f7af849..7a13972d8f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java @@ -22,7 +22,8 @@ package org.apache.qpid.client.handler; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.client.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.ChannelFlowOkBody; @@ -41,7 +42,7 @@ public class ChannelFlowOkMethodHandler implements StateAwareMethodListener { } - public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException { ChannelFlowOkBody method = (ChannelFlowOkBody) evt.getMethod(); _logger.debug("Received Channel.Flow-Ok message, active = " + method.active); diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java index 5cec420920..36e9c947f3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java @@ -24,7 +24,8 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.AMQConnectionClosedException; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.client.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; @@ -47,7 +48,7 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener { } - public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException { _logger.info("ConnectionClose frame received"); ConnectionCloseBody method = (ConnectionCloseBody) evt.getMethod(); @@ -62,7 +63,7 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - evt.getProtocolSession().writeFrame(ConnectionCloseOkBody.createAMQFrame((short)0, (byte)0, (byte)9)); + protocolSession.writeFrame(ConnectionCloseOkBody.createAMQFrame((short)0, (byte)0, (byte)9)); if (errorCode != 200) { @@ -70,7 +71,7 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener { _logger.info("Authentication Error:"+Thread.currentThread().getName()); - evt.getProtocolSession().closeProtocolSession(); + protocolSession.closeProtocolSession(); //todo this is a bit of a fudge (could be conssidered such as each new connection needs a new state manager or at least a fresh state. stateManager.changeState(AMQState.CONNECTION_NOT_STARTED); @@ -88,7 +89,7 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener // this actually closes the connection in the case where it is not an error. - evt.getProtocolSession().closeProtocolSession(); + protocolSession.closeProtocolSession(); stateManager.changeState(AMQState.CONNECTION_CLOSED); } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java index b5001a6e64..da903e7c1d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java @@ -21,7 +21,8 @@ package org.apache.qpid.client.handler; import org.apache.qpid.AMQException; -import org.apache.qpid.client.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; @@ -39,7 +40,7 @@ public class ConnectionOpenOkMethodHandler implements StateAwareMethodListener { } - public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException { stateManager.changeState(AMQState.CONNECTION_OPEN); } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java index a658e3e787..699c8955bc 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java @@ -22,7 +22,8 @@ package org.apache.qpid.client.handler; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.client.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.ConnectionRedirectBody; @@ -44,7 +45,7 @@ public class ConnectionRedirectMethodHandler implements StateAwareMethodListener { } - public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException { _logger.info("ConnectionRedirect frame received"); ConnectionRedirectBody method = (ConnectionRedirectBody) evt.getMethod(); @@ -63,6 +64,6 @@ public class ConnectionRedirectMethodHandler implements StateAwareMethodListener host = method.host.substring(0, portIndex); port = Integer.parseInt(method.host.substring(portIndex + 1)); } - evt.getProtocolSession().failover(host, port); + protocolSession.failover(host, port); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java index 062cb268d8..87a8bbd529 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java @@ -24,9 +24,10 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ConnectionSecureOkBody; import org.apache.qpid.framing.ConnectionSecureBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; -import org.apache.qpid.client.protocol.AMQMethodEvent; import javax.security.sasl.SaslClient; import javax.security.sasl.SaslException; @@ -40,9 +41,9 @@ public class ConnectionSecureMethodHandler implements StateAwareMethodListener return _instance; } - public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException { - SaslClient client = evt.getProtocolSession().getSaslClient(); + SaslClient client = protocolSession.getSaslClient(); if (client == null) { throw new AMQException("No SASL client set up - cannot proceed with authentication"); @@ -60,7 +61,7 @@ public class ConnectionSecureMethodHandler implements StateAwareMethodListener AMQFrame responseFrame = ConnectionSecureOkBody.createAMQFrame(evt.getChannelId(), (byte)0, (byte)9, // AMQP version (major, minor) response); // response - evt.getProtocolSession().writeFrame(responseFrame); + protocolSession.writeFrame(responseFrame); } catch (SaslException e) { diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java index d1b0082d36..a60c298bd2 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java @@ -24,7 +24,6 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.common.ClientProperties; import org.apache.qpid.common.QpidProperties; -import org.apache.qpid.client.protocol.AMQMethodEvent; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.security.AMQCallbackHandler; import org.apache.qpid.client.security.CallbackHandlerRegistry; @@ -35,6 +34,7 @@ import org.apache.qpid.framing.ConnectionStartBody; import org.apache.qpid.framing.ConnectionStartOkBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; +import org.apache.qpid.protocol.AMQMethodEvent; import javax.security.sasl.Sasl; import javax.security.sasl.SaslClient; @@ -59,7 +59,7 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener { } - public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException { ConnectionStartBody body = (ConnectionStartBody) evt.getMethod(); @@ -81,25 +81,24 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener throw new AMQException("No supported security mechanism found, passed: " + new String(body.mechanisms)); } - final AMQProtocolSession ps = evt.getProtocolSession(); byte[] saslResponse; try { SaslClient sc = Sasl.createSaslClient(new String[]{mechanism}, null, "AMQP", "localhost", - null, createCallbackHandler(mechanism, ps)); + null, createCallbackHandler(mechanism, protocolSession)); if (sc == null) { throw new AMQException("Client SASL configuration error: no SaslClient could be created for mechanism " + mechanism + ". Please ensure all factories are registered. See DynamicSaslRegistrar for " + " details of how to register non-standard SASL client providers."); } - ps.setSaslClient(sc); + protocolSession.setSaslClient(sc); saslResponse = (sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) : null); } catch (SaslException e) { - ps.setSaslClient(null); + protocolSession.setSaslClient(null); throw new AMQException("Unable to create SASL client: " + e, e); } @@ -122,14 +121,14 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener stateManager.changeState(AMQState.CONNECTION_NOT_TUNED); FieldTable clientProperties = FieldTableFactory.newFieldTable(); - clientProperties.put(ClientProperties.instance.toString(), ps.getClientID()); + clientProperties.put(ClientProperties.instance.toString(), protocolSession.getClientID()); clientProperties.put(ClientProperties.product.toString(), QpidProperties.getProductName()); clientProperties.put(ClientProperties.version.toString(), QpidProperties.getReleaseVersion()); clientProperties.put(ClientProperties.platform.toString(), getFullSystemInfo()); // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - ps.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(), + protocolSession.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(), (byte)0, (byte)9, // AMQP version (major, minor) clientProperties, // clientProperties selectedLocale, // locale diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java index d6ff53c416..e4e74be684 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java @@ -23,7 +23,6 @@ package org.apache.qpid.client.handler; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.client.ConnectionTuneParameters; -import org.apache.qpid.client.protocol.AMQMethodEvent; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; @@ -32,6 +31,7 @@ import org.apache.qpid.framing.ConnectionOpenBody; import org.apache.qpid.framing.ConnectionTuneBody; import org.apache.qpid.framing.ConnectionTuneOkBody; import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.protocol.AMQMethodEvent; public class ConnectionTuneMethodHandler implements StateAwareMethodListener { @@ -48,13 +48,12 @@ public class ConnectionTuneMethodHandler implements StateAwareMethodListener { } - public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException { _logger.debug("ConnectionTune frame received"); ConnectionTuneBody frame = (ConnectionTuneBody) evt.getMethod(); - AMQProtocolSession session = evt.getProtocolSession(); - ConnectionTuneParameters params = session.getConnectionTuneParameters(); + ConnectionTuneParameters params = protocolSession.getConnectionTuneParameters(); if (params == null) { params = new ConnectionTuneParameters(); @@ -63,11 +62,11 @@ public class ConnectionTuneMethodHandler implements StateAwareMethodListener params.setFrameMax(frame.frameMax); params.setChannelMax(frame.channelMax); params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.heartbeat)); - session.setConnectionTuneParameters(params); + protocolSession.setConnectionTuneParameters(params); stateManager.changeState(AMQState.CONNECTION_NOT_OPENED); - session.writeFrame(createTuneOkFrame(evt.getChannelId(), params)); - session.writeFrame(createConnectionOpenFrame(evt.getChannelId(), session.getAMQConnection().getVirtualHost(), null, true)); + protocolSession.writeFrame(createTuneOkFrame(evt.getChannelId(), params)); + protocolSession.writeFrame(createConnectionOpenFrame(evt.getChannelId(), session.getAMQConnection().getVirtualHost(), null, true)); } protected AMQFrame createConnectionOpenFrame(int channel, String path, String capabilities, boolean insist) diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java index 858726745e..41e57113b0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java @@ -19,10 +19,11 @@ package org.apache.qpid.client.handler; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.client.protocol.AMQMethodEvent; +import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.ExchangeBoundOkBody; +import org.apache.qpid.protocol.AMQMethodEvent; /** * @author Apache Software Foundation @@ -41,7 +42,7 @@ public class ExchangeBoundOkMethodHandler implements StateAwareMethodListener { } - public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException { if (_logger.isDebugEnabled()) { diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java new file mode 100644 index 0000000000..543ea0c7ad --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java @@ -0,0 +1,50 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.handler; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.MessageAppendBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.StateAwareMethodListener; + +public class MessageAppendMethodHandler implements StateAwareMethodListener +{ + private static MessageAppendMethodHandler _instance = new MessageAppendMethodHandler(); + + public static MessageAppendMethodHandler getInstance() + { + return _instance; + } + + private MessageAppendMethodHandler() {} + + + public void methodReceived (AMQStateManager stateManager, + AMQProtocolSession protocolSession, + AMQMethodEvent<MessageAppendBody> evt) + throws AMQException + { + // TODO + } +} + diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageCheckpointMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageCheckpointMethodHandler.java new file mode 100644 index 0000000000..f809dbc4c1 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageCheckpointMethodHandler.java @@ -0,0 +1,50 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.handler; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.MessageCheckpointBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.StateAwareMethodListener; + +public class MessageCheckpointMethodHandler implements StateAwareMethodListener +{ + private static MessageCheckpointMethodHandler _instance = new MessageCheckpointMethodHandler(); + + public static MessageCheckpointMethodHandler getInstance() + { + return _instance; + } + + private MessageCheckpointMethodHandler() {} + + + public void methodReceived (AMQStateManager stateManager, + AMQProtocolSession protocolSession, + AMQMethodEvent<MessageCheckpointBody> evt) + throws AMQException + { + // TODO + } +} + diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java index 6e12899204..cd04e89f6e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java @@ -18,33 +18,33 @@ * under the License. * */ -package org.apache.qpid.client.handler; +package org.apache.qpid.server.handler; -import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.BasicDeliverBody; +import org.apache.qpid.framing.MessageCloseBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; -import org.apache.qpid.client.protocol.AMQMethodEvent; -import org.apache.qpid.client.message.UnprocessedMessage; -public class BasicDeliverMethodHandler implements StateAwareMethodListener +public class MessageCloseMethodHandler implements StateAwareMethodListener { - private static final Logger _logger = Logger.getLogger(BasicDeliverMethodHandler.class); + private static MessageCloseMethodHandler _instance = new MessageCloseMethodHandler(); - private static final BasicDeliverMethodHandler _instance = new BasicDeliverMethodHandler(); - - public static BasicDeliverMethodHandler getInstance() + public static MessageCloseMethodHandler getInstance() { return _instance; } - public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + private MessageCloseMethodHandler() {} + + + public void methodReceived (AMQStateManager stateManager, + AMQProtocolSession protocolSession, + AMQMethodEvent<MessageCloseBody> evt) + throws AMQException { - final UnprocessedMessage msg = new UnprocessedMessage(); - msg.deliverBody = (BasicDeliverBody) evt.getMethod(); - msg.channelId = evt.getChannelId(); - _logger.debug("New JmsDeliver method received"); - evt.getProtocolSession().unprocessedMessageReceived(msg); + // TODO } } + diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageEmptyMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageEmptyMethodHandler.java new file mode 100644 index 0000000000..76534b285f --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageEmptyMethodHandler.java @@ -0,0 +1,50 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.handler; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.MessageEmptyBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.StateAwareMethodListener; + +public class MessageEmptyMethodHandler implements StateAwareMethodListener +{ + private static MessageEmptyMethodHandler _instance = new MessageEmptyMethodHandler(); + + public static MessageEmptyMethodHandler getInstance() + { + return _instance; + } + + private MessageEmptyMethodHandler() {} + + + public void methodReceived (AMQStateManager stateManager, + AMQProtocolSession protocolSession, + AMQMethodEvent<MessageEmptyBody> evt) + throws AMQException + { + // TODO + } +} + diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageOffsetMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageOffsetMethodHandler.java new file mode 100644 index 0000000000..80e5412049 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageOffsetMethodHandler.java @@ -0,0 +1,50 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.handler; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.MessageOffsetBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.StateAwareMethodListener; + +public class MessageOffsetMethodHandler implements StateAwareMethodListener +{ + private static MessageOffsetMethodHandler _instance = new MessageOffsetMethodHandler(); + + public static MessageOffsetMethodHandler getInstance() + { + return _instance; + } + + private MessageOffsetMethodHandler() {} + + + public void methodReceived (AMQStateManager stateManager, + AMQProtocolSession protocolSession, + AMQMethodEvent<MessageOffsetBody> evt) + throws AMQException + { + // TODO + } +} + diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageOkMethodHandler.java new file mode 100644 index 0000000000..6d1d94de67 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageOkMethodHandler.java @@ -0,0 +1,50 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.handler; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.MessageOkBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.StateAwareMethodListener; + +public class MessageOkMethodHandler implements StateAwareMethodListener +{ + private static MessageOkMethodHandler _instance = new MessageOkMethodHandler(); + + public static MessageOkMethodHandler getInstance() + { + return _instance; + } + + private MessageOkMethodHandler() {} + + + public void methodReceived (AMQStateManager stateManager, + AMQProtocolSession protocolSession, + AMQMethodEvent<MessageOkBody> evt) + throws AMQException + { + // TODO + } +} + diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java index 8785a96396..1535b16563 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java @@ -18,35 +18,33 @@ * under the License. * */ -package org.apache.qpid.client.handler; +package org.apache.qpid.server.handler; -import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.client.message.UnprocessedMessage; -import org.apache.qpid.client.protocol.AMQMethodEvent; +import org.apache.qpid.framing.MessageOpenBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; -import org.apache.qpid.framing.BasicReturnBody; -public class BasicReturnMethodHandler implements StateAwareMethodListener +public class MessageOpenMethodHandler implements StateAwareMethodListener { - private static final Logger _logger = Logger.getLogger(BasicReturnMethodHandler.class); + private static MessageOpenMethodHandler _instance = new MessageOpenMethodHandler(); - private static final BasicReturnMethodHandler _instance = new BasicReturnMethodHandler(); - - public static BasicReturnMethodHandler getInstance() + public static MessageOpenMethodHandler getInstance() { return _instance; } - public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + private MessageOpenMethodHandler() {} + + + public void methodReceived (AMQStateManager stateManager, + AMQProtocolSession protocolSession, + AMQMethodEvent<MessageOpenBody> evt) + throws AMQException { - _logger.debug("New JmsBounce method received"); - final UnprocessedMessage msg = new UnprocessedMessage(); - msg.deliverBody = null; - msg.bounceBody = (BasicReturnBody) evt.getMethod(); - msg.channelId = evt.getChannelId(); - - evt.getProtocolSession().unprocessedMessageReceived(msg); + // TODO } -}
\ No newline at end of file +} + diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageRejectMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageRejectMethodHandler.java new file mode 100644 index 0000000000..bbce2ae1ee --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageRejectMethodHandler.java @@ -0,0 +1,50 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.handler; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.MessageRejectBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.StateAwareMethodListener; + +public class MessageRejectMethodHandler implements StateAwareMethodListener +{ + private static MessageRejectMethodHandler _instance = new MessageRejectMethodHandler(); + + public static MessageRejectMethodHandler getInstance() + { + return _instance; + } + + private MessageRejectMethodHandler() {} + + + public void methodReceived (AMQStateManager stateManager, + AMQProtocolSession protocolSession, + AMQMethodEvent<MessageRejectBody> evt) + throws AMQException + { + // TODO + } +} + diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageResumeMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageResumeMethodHandler.java new file mode 100644 index 0000000000..810ed25029 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageResumeMethodHandler.java @@ -0,0 +1,50 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.handler; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.MessageResumeBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.StateAwareMethodListener; + +public class MessageResumeMethodHandler implements StateAwareMethodListener +{ + private static MessageResumeMethodHandler _instance = new MessageResumeMethodHandler(); + + public static MessageResumeMethodHandler getInstance() + { + return _instance; + } + + private MessageResumeMethodHandler() {} + + + public void methodReceived (AMQStateManager stateManager, + AMQProtocolSession protocolSession, + AMQMethodEvent<MessageResumeBody> evt) + throws AMQException + { + // TODO + } +} + diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java new file mode 100644 index 0000000000..6a21bed7b5 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java @@ -0,0 +1,50 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.handler; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.MessageTransferBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.StateAwareMethodListener; + +public class MessageTransferMethodHandler implements StateAwareMethodListener +{ + private static MessageTransferMethodHandler _instance = new MessageTransferMethodHandler(); + + public static MessageTransferMethodHandler getInstance() + { + return _instance; + } + + private MessageTransferMethodHandler() {} + + + public void methodReceived (AMQStateManager stateManager, + AMQProtocolSession protocolSession, + AMQMethodEvent<MessageTransferBody> evt) + throws AMQException + { + // TODO + } +} + diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java index 3271a715a2..b8b75accb4 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java @@ -17,11 +17,12 @@ */ package org.apache.qpid.client.handler; +import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.client.state.AMQStateManager; -import org.apache.qpid.client.protocol.AMQMethodEvent; import org.apache.qpid.AMQException; import org.apache.qpid.framing.QueueDeleteOkBody; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.log4j.Logger; /** @@ -41,7 +42,7 @@ public class QueueDeleteOkMethodHandler implements StateAwareMethodListener { } - public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException { if (_logger.isDebugEnabled()) { diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodEvent.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodEvent.java deleted file mode 100644 index 403aa3486e..0000000000 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodEvent.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.protocol; - -import org.apache.qpid.framing.AMQMethodBody; - -public class AMQMethodEvent -{ - private AMQMethodBody _method; - - private int _channelId; - - private AMQProtocolSession _protocolSession; - - public AMQMethodEvent(int channelId, AMQMethodBody method, AMQProtocolSession protocolSession) - { - _channelId = channelId; - _method = method; - _protocolSession = protocolSession; - } - - public AMQMethodBody getMethod() - { - return _method; - } - - public int getChannelId() - { - return _channelId; - } - - public AMQProtocolSession getProtocolSession() - { - return _protocolSession; - } - - public String toString() - { - StringBuffer buf = new StringBuffer("Method event: "); - buf.append("\nChannel id: ").append(_channelId); - buf.append("\nMethod: ").append(_method); - return buf.toString(); - } -} diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodListener.java index 68299033a5..2cbd8f0e32 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodListener.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodListener.java @@ -21,6 +21,8 @@ package org.apache.qpid.client.protocol; import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.client.protocol.AMQProtocolSession; public interface AMQMethodListener { @@ -34,7 +36,7 @@ public interface AMQMethodListener * to all registered listeners using the error() method (see below) allowing them to * perform cleanup if necessary. */ - boolean methodReceived(AMQMethodEvent evt) throws AMQException; + boolean methodReceived(AMQMethodEvent evt, AMQProtocolSession protocolSession) throws AMQException; /** * Callback when an error has occurred. Allows listeners to clean up. diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 68797edc77..a0399e1450 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -46,6 +46,7 @@ import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.HeartbeatBody; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.ssl.BogusSSLContextFactory; import java.util.Iterator; @@ -313,14 +314,14 @@ public class AMQProtocolHandler extends IoHandlerAdapter _logger.debug("Method frame received: " + frame); } - final AMQMethodEvent evt = new AMQMethodEvent(frame.channel, (AMQMethodBody) frame.bodyFrame, _protocolSession); + final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.channel, (AMQMethodBody) frame.bodyFrame); try { boolean wasAnyoneInterested = false; while (it.hasNext()) { final AMQMethodListener listener = (AMQMethodListener) it.next(); - wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; + wasAnyoneInterested = listener.methodReceived(evt, _protocolSession) || wasAnyoneInterested; } if (!wasAnyoneInterested) { diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java index 21ae3fc71f..4fff4fab00 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java @@ -22,6 +22,8 @@ package org.apache.qpid.client.protocol; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.client.protocol.AMQProtocolSession; public abstract class BlockingMethodFrameListener implements AMQMethodListener { @@ -53,7 +55,7 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener * @return true if the listener has dealt with this frame * @throws AMQException */ - public boolean methodReceived(AMQMethodEvent evt) throws AMQException + public boolean methodReceived(AMQMethodEvent evt, AMQProtocolSession protocolSession) throws AMQException { AMQMethodBody method = evt.getMethod(); diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java index 50bd1667f9..0d9d70b244 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java @@ -21,9 +21,10 @@ package org.apache.qpid.client.state; import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.client.handler.*; -import org.apache.qpid.client.protocol.AMQMethodEvent; import org.apache.qpid.client.protocol.AMQMethodListener; +import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.framing.*; import org.apache.log4j.Logger; @@ -101,9 +102,16 @@ public class AMQStateManager implements AMQMethodListener frame2handlerMap.put(ChannelCloseBody.class, ChannelCloseMethodHandler.getInstance()); frame2handlerMap.put(ChannelCloseOkBody.class, ChannelCloseOkMethodHandler.getInstance()); frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance()); - frame2handlerMap.put(BasicDeliverBody.class, BasicDeliverMethodHandler.getInstance()); - frame2handlerMap.put(BasicReturnBody.class, BasicReturnMethodHandler.getInstance()); - frame2handlerMap.put(BasicCancelOkBody.class, BasicCancelOkMethodHandler.getInstance()); + frame2handlerMap.put(MessageAppendBody.class, MessageAppendMethodHandler.getInstance()); + frame2handlerMap.put(MessageCheckpointBody.class, MessageCheckpointMethodHandler.getInstance()); + frame2handlerMap.put(MessageCloseBody.class, MessageCloseMethodHandler.getInstance()); + frame2handlerMap.put(MessageEmptyBody.class, MessageEmptyMethodHandler.getInstance()); + frame2handlerMap.put(MessageOffsetBody.class, MessageOffsetMethodHandler.getInstance()); + frame2handlerMap.put(MessageOkBody.class, MessageOkMethodHandler.getInstance()); + frame2handlerMap.put(MessageOpenBody.class, MessageOpenMethodHandler.getInstance()); + frame2handlerMap.put(MessageRejectBody.class, MessageRejectMethodHandler.getInstance()); + frame2handlerMap.put(MessageResumeBody.class, MessageResumeMethodHandler.getInstance()); + frame2handlerMap.put(MessageTransferBody.class, MessageTransferMethodHandler.getInstance()); frame2handlerMap.put(ChannelFlowOkBody.class, ChannelFlowOkMethodHandler.getInstance()); frame2handlerMap.put(QueueDeleteOkBody.class, QueueDeleteOkMethodHandler.getInstance()); frame2handlerMap.put(ExchangeBoundOkBody.class, ExchangeBoundOkMethodHandler.getInstance()); @@ -146,12 +154,12 @@ public class AMQStateManager implements AMQMethodListener } } - public boolean methodReceived(AMQMethodEvent evt) throws AMQException + public boolean methodReceived(AMQMethodEvent evt, AMQProtocolSession protocolSession) throws AMQException { StateAwareMethodListener handler = findStateTransitionHandler(_currentState, evt.getMethod()); if (handler != null) { - handler.methodReceived(this, evt); + handler.methodReceived(this, protocolSession, evt); return true; } return false; diff --git a/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java b/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java index 37a0f9f376..f2c8916868 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java @@ -20,7 +20,8 @@ */ package org.apache.qpid.client.state; -import org.apache.qpid.client.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.AMQException; /** @@ -30,5 +31,6 @@ import org.apache.qpid.AMQException; */ public interface StateAwareMethodListener { - void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException; + void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, + AMQMethodEvent evt) throws AMQException; } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java index 7a01995abb..16dd5d8fa7 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java @@ -23,10 +23,10 @@ package org.apache.qpid.server.cluster; import org.apache.mina.common.IoSession; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.protocol.AMQMethodEvent; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.protocol.AMQMethodEvent; /** * Hack to assist with reuse of the client handlers for connection setup in @@ -51,8 +51,8 @@ class ClientAdapter implements MethodHandler public void handle(int channel, AMQMethodBody method) throws AMQException { - AMQMethodEvent evt = new AMQMethodEvent(channel, method, _session); - _stateMgr.methodReceived(evt); + AMQMethodEvent evt = new AMQMethodEvent(channel, method); + _stateMgr.methodReceived(evt, _session); } private class SessionAdapter extends AMQProtocolSession diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java index 8e7fb1ff49..c4107a435b 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java @@ -24,7 +24,7 @@ import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQMethodBody; diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java index 022ee098ab..8e6d133600 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java @@ -25,7 +25,7 @@ import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.cluster.util.LogMessage; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQMethodBody; diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java index 5944d99a14..260f27fa82 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java @@ -21,12 +21,12 @@ package org.apache.qpid.server.cluster.handler; import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.cluster.ClusteredProtocolSession; import org.apache.qpid.AMQException; diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java index 46ba3e5015..97174d782c 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java @@ -49,6 +49,7 @@ import org.apache.qpid.framing.BasicQosBody; import org.apache.qpid.framing.TxSelectBody; import org.apache.qpid.framing.TxCommitBody; import org.apache.qpid.framing.TxRollbackBody; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.cluster.ClusterCapability; import org.apache.qpid.server.cluster.ClusteredProtocolSession; import org.apache.qpid.server.cluster.GroupManager; @@ -76,7 +77,6 @@ import org.apache.qpid.server.handler.BasicQosHandler; import org.apache.qpid.server.handler.TxSelectHandler; import org.apache.qpid.server.handler.TxCommitHandler; import org.apache.qpid.server.handler.TxRollbackHandler; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQState; diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java index 5b2c6f4a9a..8250d57d98 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java @@ -22,8 +22,8 @@ package org.apache.qpid.server.cluster.handler; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java index 2cd0989f10..5cd94f12d1 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java @@ -22,8 +22,8 @@ package org.apache.qpid.server.cluster.handler; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java index 00f37951f2..eaf80338d2 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java @@ -22,9 +22,9 @@ package org.apache.qpid.server.cluster.handler; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.cluster.ClusteredProtocolSession; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java index 3e8528f533..3bf40fbaee 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java @@ -22,8 +22,8 @@ package org.apache.qpid.server.cluster.handler; import org.apache.qpid.AMQException; import org.apache.qpid.framing.QueueDeclareBody; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java index 729a38c970..e9f039ed09 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java @@ -23,9 +23,9 @@ package org.apache.qpid.server.cluster.handler; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicCancelBody; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.cluster.ClusteredProtocolSession; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.ClusteredQueue; diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java index 4d3f1261b2..693c70b780 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java @@ -24,9 +24,9 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicConsumeBody; import org.apache.qpid.framing.BasicConsumeOkBody; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.cluster.ClusteredProtocolSession; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.ClusteredQueue; diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java index 03c644889e..ea42db3ded 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java @@ -22,12 +22,12 @@ package org.apache.qpid.server.cluster.handler; import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicConsumeBody; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.cluster.BroadcastPolicy; import org.apache.qpid.server.cluster.GroupManager; import org.apache.qpid.server.cluster.util.LogMessage; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.handler.BasicConsumeMethodHandler; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java index db340c6a61..e68ae9eb3d 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.cluster.handler; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.cluster.BroadcastPolicy; import org.apache.qpid.server.cluster.ClusteredProtocolSession; import org.apache.qpid.server.cluster.GroupManager; @@ -32,7 +33,6 @@ import org.apache.qpid.server.cluster.Member; import org.apache.qpid.server.cluster.SimpleSendable; import org.apache.qpid.server.cluster.policy.StandardPolicies; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java index d46913d042..fd38ea6153 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java @@ -22,8 +22,8 @@ package org.apache.qpid.server.cluster.handler; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java index a1058c6ff6..c927c0cebf 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java @@ -29,11 +29,11 @@ import org.apache.qpid.framing.ExchangeDeleteBody; import org.apache.qpid.framing.QueueBindBody; import org.apache.qpid.framing.QueueDeclareBody; import org.apache.qpid.framing.QueueDeleteBody; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.cluster.MethodHandlerFactory; import org.apache.qpid.server.cluster.MethodHandlerRegistry; import org.apache.qpid.server.cluster.handler.WrappingMethodHandlerFactory; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQState; diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java index cf931e8306..ac4879561c 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java @@ -31,11 +31,11 @@ import org.apache.qpid.framing.QueueDeleteBody; import org.apache.qpid.framing.ClusterSynchBody; import org.apache.qpid.framing.BasicConsumeBody; import org.apache.qpid.framing.BasicCancelBody; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.cluster.ClusteredProtocolSession; import org.apache.qpid.server.cluster.util.LogMessage; import org.apache.qpid.server.cluster.util.Bindings; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java index af43ab6474..00a27a8869 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java @@ -33,11 +33,11 @@ public class AMQRequestBody extends AMQBody // Constructor public AMQRequestBody() {} public AMQRequestBody(long requestId, long responseMark, - AMQMethodBody methodPayload) + AMQMethodBody methodPayload) { - this.requestId = requestId; - this.responseMark = responseMark; - this.methodPayload = methodPayload; + this.requestId = requestId; + this.responseMark = responseMark; + this.methodPayload = methodPayload; } @@ -49,42 +49,42 @@ public class AMQRequestBody extends AMQBody protected byte getFrameType() { - return (byte)AmqpConstants.frameRequestAsInt(); + return (byte)AmqpConstants.frameRequestAsInt(); } protected int getSize() { - return 8 + 8 + 4 + methodPayload.getBodySize(); + return 8 + 8 + 4 + methodPayload.getBodySize(); } protected void writePayload(ByteBuffer buffer) { - EncodingUtils.writeLong(buffer, requestId); - EncodingUtils.writeLong(buffer, responseMark); - EncodingUtils.writeUnsignedShort(buffer, 0); // reserved, set to 0 + EncodingUtils.writeLong(buffer, requestId); + EncodingUtils.writeLong(buffer, responseMark); + EncodingUtils.writeUnsignedShort(buffer, 0); // reserved, set to 0 methodPayload.writePayload(buffer); } protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException, AMQProtocolVersionException { - requestId = EncodingUtils.readLong(buffer); - responseMark = EncodingUtils.readLong(buffer); - int reserved = EncodingUtils.readShort(buffer); // reserved, throw away - methodPayload.populateFromBuffer(buffer, size - 8 - 8 - 4); + requestId = EncodingUtils.readLong(buffer); + responseMark = EncodingUtils.readLong(buffer); + int reserved = EncodingUtils.readShort(buffer); // reserved, throw away + methodPayload.populateFromBuffer(buffer, size - 8 - 8 - 4); } public String toString() { - return "Req[" + requestId + " " + responseMark + "] C" + - methodPayload.getClazz() + " M" + methodPayload.getMethod(); + return "Req[" + requestId + " " + responseMark + "] C" + + methodPayload.getClazz() + " M" + methodPayload.getMethod(); } public static AMQFrame createAMQFrame(int channelId, long requestId, long responseMark, AMQMethodBody methodPayload) { AMQRequestBody requestFrame = new AMQRequestBody(requestId, responseMark, - methodPayload); + methodPayload); AMQFrame frame = new AMQFrame(); frame.channel = channelId; frame.bodyFrame = requestFrame; diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java index 67fc485f48..90038da2d4 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java @@ -33,9 +33,9 @@ public class AMQResponseBody extends AMQBody // Constructor public AMQResponseBody() {} public AMQResponseBody(long getResponseId, long getRequestId, - int batchOffset, AMQMethodBody methodPayload) + int batchOffset, AMQMethodBody methodPayload) { - this.responseId = responseId; + this.responseId = responseId; this.requestId = requestId; this.batchOffset = batchOffset; this.methodPayload = methodPayload; @@ -49,12 +49,12 @@ public class AMQResponseBody extends AMQBody protected byte getFrameType() { - return (byte)AmqpConstants.frameResponseAsInt(); + return (byte)AmqpConstants.frameResponseAsInt(); } protected int getSize() { - return 8 + 8 + 4 + methodPayload.getBodySize(); + return 8 + 8 + 4 + methodPayload.getBodySize(); } protected void writePayload(ByteBuffer buffer) @@ -76,15 +76,15 @@ public class AMQResponseBody extends AMQBody public String toString() { - return "Res[" + responseId + " " + requestId + "-" + requestId + batchOffset + "] C" + - methodPayload.getClazz() + " M" + methodPayload.getMethod(); + return "Res[" + responseId + " " + requestId + "-" + requestId + batchOffset + "] C" + + methodPayload.getClazz() + " M" + methodPayload.getMethod(); } public static AMQFrame createAMQFrame(int channelId, long responseId, long requestId, int batchOffset, AMQMethodBody methodPayload) { AMQResponseBody responseFrame = new AMQResponseBody(responseId, - requestId, batchOffset, methodPayload); + requestId, batchOffset, methodPayload); AMQFrame frame = new AMQFrame(); frame.channel = channelId; frame.bodyFrame = responseFrame; diff --git a/java/common/src/main/java/org/apache/qpid/framing/RequestHandler.java b/java/common/src/main/java/org/apache/qpid/framing/RequestHandler.java new file mode 100644 index 0000000000..f779258e00 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/RequestHandler.java @@ -0,0 +1,26 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +public interface RequestHandler +{ + public boolean requestReceived(AMQRequestBody requestBody); +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java b/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java index e673a8e343..55c25151da 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java +++ b/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java @@ -26,7 +26,7 @@ import org.apache.qpid.protocol.AMQProtocolWriter; public class RequestManager { - private int channel; + private int channel; AMQProtocolWriter protocolSession; /** @@ -39,60 +39,60 @@ public class RequestManager /** * These keep track of the last requestId and responseId to be received. */ - private long lastReceivedResponseId; + private long lastProcessedResponseId; private Hashtable<Long, AMQResponseCallback> requestSentMap; - public RequestManager(int channel, AMQProtocolWriter protocolSession) + public RequestManager(int channel, AMQProtocolWriter protocolSession) { - this.channel = channel; + this.channel = channel; this.protocolSession = protocolSession; - requestIdCount = 1L; - lastReceivedResponseId = 0L; + requestIdCount = 1L; + lastProcessedResponseId = 0L; requestSentMap = new Hashtable<Long, AMQResponseCallback>(); } // *** Functions to originate a request *** public long sendRequest(AMQMethodBody requestMethodBody, - AMQResponseCallback responseCallback) + AMQResponseCallback responseCallback) { - long requestId = getNextRequestId(); // Get new request ID - AMQFrame requestFrame = AMQRequestBody.createAMQFrame(channel, requestId, - lastReceivedResponseId, requestMethodBody); + long requestId = getNextRequestId(); // Get new request ID + AMQFrame requestFrame = AMQRequestBody.createAMQFrame(channel, requestId, + lastProcessedResponseId, requestMethodBody); protocolSession.writeFrame(requestFrame); requestSentMap.put(requestId, responseCallback); return requestId; } public void responseReceived(AMQResponseBody responseBody) - throws RequestResponseMappingException + throws RequestResponseMappingException { - lastReceivedResponseId = responseBody.getResponseId(); long requestIdStart = responseBody.getRequestId(); long requestIdStop = requestIdStart + responseBody.getBatchOffset(); for (long requestId = requestIdStart; requestId <= requestIdStop; requestId++) { - AMQResponseCallback responseCallback = requestSentMap.get(requestId); + AMQResponseCallback responseCallback = requestSentMap.get(requestId); if (responseCallback == null) - throw new RequestResponseMappingException(requestId, - "Failed to locate requestId " + requestId + " in requestSentMap."); + throw new RequestResponseMappingException(requestId, + "Failed to locate requestId " + requestId + " in requestSentMap."); responseCallback.responseFrameReceived(responseBody); requestSentMap.remove(requestId); } + lastProcessedResponseId = responseBody.getResponseId(); } // *** Management functions *** public int requestsMapSize() { - return requestSentMap.size(); + return requestSentMap.size(); } // *** Private helper functions *** private long getNextRequestId() { - return requestIdCount++; + return requestIdCount++; } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java index a895464a1f..280d8d562a 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java @@ -28,10 +28,11 @@ import org.apache.qpid.protocol.AMQProtocolWriter; public class ResponseManager { - private int channel; + private int channel; + RequestHandler requestHandler; AMQProtocolWriter protocolSession; - /** + /** * Determines the batch behaviour of the manager. * * Responses are sent to the RequestResponseManager through sendResponse(). @@ -48,7 +49,7 @@ public class ResponseManager * MANUAL: No response is sent until it is explicitly released by calling * function xxxx(). (TODO) */ - public enum batchResponseModeEnum { NONE } + public enum batchResponseModeEnum { NONE } private batchResponseModeEnum batchResponseMode; /** @@ -70,79 +71,79 @@ public class ResponseManager private class ResponseStatus implements Comparable<ResponseStatus> { - public long requestId; + public long requestId; public AMQMethodBody responseMethodBody; public ResponseStatus(long requestId) { - this.requestId = requestId; - responseMethodBody = null; + this.requestId = requestId; + responseMethodBody = null; } public int compareTo(ResponseStatus o) { - return (int)(requestId - o.requestId); + return (int)(requestId - o.requestId); } } private Hashtable<Long, ResponseStatus> responseMap; - public ResponseManager(int channel, AMQProtocolWriter protocolSession) + public ResponseManager(int channel, RequestHandler requestHandler, + AMQProtocolWriter protocolSession) { - this.channel = channel; + this.channel = channel; + this.requestHandler = requestHandler; this.protocolSession = protocolSession; responseIdCount = 1L; lastReceivedRequestId = 0L; responseMap = new Hashtable<Long, ResponseStatus>(); } - // *** Functions to handle an incoming request *** + // *** Functions to handle an incoming request *** public void requestReceived(AMQRequestBody requestBody) { - long requestId = requestBody.getRequestId(); + long requestId = requestBody.getRequestId(); // TODO: responseMark is used in HA, but until then, ignore... long responseMark = requestBody.getResponseMark(); - lastReceivedRequestId = requestId; + lastReceivedRequestId = requestId; responseMap.put(requestId, new ResponseStatus(requestId)); - - // TODO: Initiate some action based on the MethodBody - like send to handlers, - // but how to do this in a way that will work for both client and server? + requestHandler.requestReceived(requestBody); } public void sendResponse(long requestId, AMQMethodBody responseMethodBody) - throws RequestResponseMappingException + throws RequestResponseMappingException { - ResponseStatus responseStatus = responseMap.get(requestId); + ResponseStatus responseStatus = responseMap.get(requestId); if (responseStatus == null) - throw new RequestResponseMappingException(requestId, - "Failed to locate requestId " + requestId + " in responseMap."); + throw new RequestResponseMappingException(requestId, + "Failed to locate requestId " + requestId + " in responseMap."); if (responseStatus.responseMethodBody != null) - throw new RequestResponseMappingException(requestId, "RequestId " + - requestId + " already has a response in responseMap."); + throw new RequestResponseMappingException(requestId, "RequestId " + + requestId + " already has a response in responseMap."); responseStatus.responseMethodBody = responseMethodBody; doBatches(); } // *** Management functions *** - public batchResponseModeEnum getBatchResponseMode() + public batchResponseModeEnum getBatchResponseMode() { - return batchResponseMode; + return batchResponseMode; } public void setBatchResponseMode(batchResponseModeEnum batchResponseMode) { - if (this.batchResponseMode != batchResponseMode) + if (this.batchResponseMode != batchResponseMode) { - this.batchResponseMode = batchResponseMode; - doBatches(); + this.batchResponseMode = batchResponseMode; + doBatches(); } } public int responsesMapSize() { - return responseMap.size(); + return responseMap.size(); } /** @@ -153,12 +154,12 @@ public class ResponseManager */ public int outstandingResponses() { - int cnt = 0; + int cnt = 0; for (Long requestId : responseMap.keySet()) { - if (responseMap.get(requestId).responseMethodBody == null) - cnt++; - } + if (responseMap.get(requestId).responseMethodBody == null) + cnt++; + } return cnt; } @@ -170,12 +171,12 @@ public class ResponseManager */ public int batchedResponses() { - int cnt = 0; + int cnt = 0; for (Long requestId : responseMap.keySet()) { - if (responseMap.get(requestId).responseMethodBody != null) - cnt++; - } + if (responseMap.get(requestId).responseMethodBody != null) + cnt++; + } return cnt; } @@ -183,39 +184,39 @@ public class ResponseManager private long getNextResponseId() { - return responseIdCount++; + return responseIdCount++; } private void doBatches() { - switch (batchResponseMode) + switch (batchResponseMode) { - case NONE: - Iterator<Long> lItr = responseMap.keySet().iterator(); - while (lItr.hasNext()) + case NONE: + Iterator<Long> lItr = responseMap.keySet().iterator(); + while (lItr.hasNext()) { - long requestId = lItr.next(); - ResponseStatus responseStatus = responseMap.get(requestId); - if (responseStatus.responseMethodBody != null) + long requestId = lItr.next(); + ResponseStatus responseStatus = responseMap.get(requestId); + if (responseStatus.responseMethodBody != null) { - sendResponseBatch(requestId, 0, responseStatus.responseMethodBody); + sendResponseBatch(requestId, 0, responseStatus.responseMethodBody); lItr.remove(); } } - break; + break; // TODO: Add additional batch mode handlers here... - // case DELAY_FIXED: - // case MANUAL: + // case DELAY_FIXED: + // case MANUAL: } } private void sendResponseBatch(long firstRequestId, int numAdditionalRequests, - AMQMethodBody responseMethodBody) + AMQMethodBody responseMethodBody) { - long responseId = getNextResponseId(); // Get new request ID - AMQFrame responseFrame = AMQResponseBody.createAMQFrame(channel, responseId, - firstRequestId, numAdditionalRequests, responseMethodBody); + long responseId = getNextResponseId(); // Get new request ID + AMQFrame responseFrame = AMQResponseBody.createAMQFrame(channel, responseId, + firstRequestId, numAdditionalRequests, responseMethodBody); protocolSession.writeFrame(responseFrame); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodEvent.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java index 3d12828900..ab36041cb8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodEvent.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpid.server.protocol; +package org.apache.qpid.protocol; import org.apache.qpid.framing.AMQMethodBody; |