summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/handler
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/handler')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java77
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java67
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java74
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java173
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java206
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java96
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java58
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java73
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java83
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java125
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java77
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java53
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java66
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java141
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java72
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java63
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java101
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java126
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java162
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java58
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java178
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java108
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java71
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/OnCurrentThreadExecutor.java34
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java159
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java254
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java125
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java123
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java123
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl.java574
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_9.java164
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_91.java168
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_8_0.java86
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java76
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java82
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java63
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/UnexpectedMethodException.java36
37 files changed, 4375 insertions, 0 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java
new file mode 100644
index 0000000000..d587ef0c16
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java
@@ -0,0 +1,77 @@
+package org.apache.qpid.server.handler;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
+import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+
+/**
+ * @author Apache Software Foundation
+ *
+ *
+ */
+public class AccessRequestHandler implements StateAwareMethodListener<AccessRequestBody>
+{
+ private static final AccessRequestHandler _instance = new AccessRequestHandler();
+
+
+ public static AccessRequestHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private AccessRequestHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, AccessRequestBody body, int channelId) throws AMQException
+ {
+ AMQProtocolSession session = stateManager.getProtocolSession();
+
+ MethodRegistry methodRegistry = session.getMethodRegistry();
+
+ // We don't implement access control class, but to keep clients happy that expect it
+ // always use the "0" ticket.
+ AccessRequestOkBody response;
+ if(methodRegistry instanceof MethodRegistry_0_9)
+ {
+ response = ((MethodRegistry_0_9)methodRegistry).createAccessRequestOkBody(0);
+ }
+ else if(methodRegistry instanceof MethodRegistry_8_0)
+ {
+ response = ((MethodRegistry_8_0)methodRegistry).createAccessRequestOkBody(0);
+ }
+ else
+ {
+ throw new AMQException(AMQConstant.COMMAND_INVALID, "AccessRequest not present in AMQP versions other than 0-8, 0-9");
+ }
+
+
+ session.writeFrame(response.generateFrame(channelId));
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java
new file mode 100644
index 0000000000..f90e7c3dff
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java
@@ -0,0 +1,67 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicAckBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+
+public class BasicAckMethodHandler implements StateAwareMethodListener<BasicAckBody>
+{
+ private static final Logger _log = Logger.getLogger(BasicAckMethodHandler.class);
+
+ private static final BasicAckMethodHandler _instance = new BasicAckMethodHandler();
+
+ public static BasicAckMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private BasicAckMethodHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, BasicAckBody body, int channelId) throws AMQException
+ {
+ AMQProtocolSession protocolSession = stateManager.getProtocolSession();
+
+
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Ack(Tag:" + body.getDeliveryTag() + ":Mult:" + body.getMultiple() + ") received on channel " + channelId);
+ }
+
+ final AMQChannel channel = protocolSession.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId);
+ }
+
+ // this method throws an AMQException if the delivery tag is not known
+ channel.acknowledgeMessage(body.getDeliveryTag(), body.getMultiple());
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
new file mode 100644
index 0000000000..29054f55c1
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
@@ -0,0 +1,74 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicCancelBody;
+import org.apache.qpid.framing.BasicCancelOkBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.log4j.Logger;
+
+public class BasicCancelMethodHandler implements StateAwareMethodListener<BasicCancelBody>
+{
+ private static final Logger _log = Logger.getLogger(BasicCancelMethodHandler.class);
+
+ private static final BasicCancelMethodHandler _instance = new BasicCancelMethodHandler();
+
+ public static BasicCancelMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private BasicCancelMethodHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, BasicCancelBody body, int channelId) throws AMQException
+ {
+ AMQProtocolSession session = stateManager.getProtocolSession();
+
+ final AMQChannel channel = session.getChannel(channelId);
+
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId);
+ }
+
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("BasicCancel: for:" + body.getConsumerTag() +
+ " nowait:" + body.getNowait());
+ }
+
+ channel.unsubscribeConsumer(body.getConsumerTag());
+ if (!body.getNowait())
+ {
+ MethodRegistry methodRegistry = session.getMethodRegistry();
+ BasicCancelOkBody cancelOkBody = methodRegistry.createBasicCancelOkBody(body.getConsumerTag());
+ session.writeFrame(cancelOkBody.generateFrame(channelId));
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
new file mode 100644
index 0000000000..a5999711bc
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
@@ -0,0 +1,173 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class BasicConsumeMethodHandler implements StateAwareMethodListener<BasicConsumeBody>
+{
+ private static final Logger _logger = Logger.getLogger(BasicConsumeMethodHandler.class);
+
+ private static final BasicConsumeMethodHandler _instance = new BasicConsumeMethodHandler();
+
+ public static BasicConsumeMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private BasicConsumeMethodHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, BasicConsumeBody body, int channelId) throws AMQException
+ {
+ AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
+
+ AMQChannel channel = protocolConnection.getChannel(channelId);
+
+ VirtualHost vHost = protocolConnection.getVirtualHost();
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId);
+ }
+ else
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("BasicConsume: from '" + body.getQueue() +
+ "' for:" + body.getConsumerTag() +
+ " nowait:" + body.getNowait() +
+ " args:" + body.getArguments());
+ }
+
+ AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.getQueue().intern());
+
+ if (queue == null)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("No queue for '" + body.getQueue() + "'");
+ }
+ if (body.getQueue() != null)
+ {
+ String msg = "No such queue, '" + body.getQueue() + "'";
+ throw body.getChannelException(AMQConstant.NOT_FOUND, msg);
+ }
+ else
+ {
+ String msg = "No queue name provided, no default queue defined.";
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED, msg);
+ }
+ }
+ else
+ {
+ final AMQShortString consumerTagName;
+
+ if (queue.isExclusive() && !queue.isDurable())
+ {
+ AMQSessionModel session = queue.getExclusiveOwningSession();
+ if (session == null || session.getConnectionModel() != protocolConnection)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection.");
+ }
+ }
+
+ if (body.getConsumerTag() != null)
+ {
+ consumerTagName = body.getConsumerTag().intern();
+ }
+ else
+ {
+ consumerTagName = null;
+ }
+
+ try
+ {
+ if(consumerTagName == null || channel.getSubscription(consumerTagName) == null)
+ {
+
+ AMQShortString consumerTag = channel.subscribeToQueue(consumerTagName, queue, !body.getNoAck(),
+ body.getArguments(), body.getNoLocal(), body.getExclusive());
+ if (!body.getNowait())
+ {
+ MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createBasicConsumeOkBody(consumerTag);
+ protocolConnection.writeFrame(responseBody.generateFrame(channelId));
+
+ }
+ }
+ else
+ {
+ AMQShortString msg = new AMQShortString("Non-unique consumer tag, '" + body.getConsumerTag() + "'");
+
+ MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), // replyCode
+ msg, // replytext
+ body.getClazz(),
+ body.getMethod());
+ protocolConnection.writeFrame(responseBody.generateFrame(0));
+ }
+
+ }
+ catch (org.apache.qpid.AMQInvalidArgumentException ise)
+ {
+ _logger.debug("Closing connection due to invalid selector");
+
+ MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createChannelCloseBody(AMQConstant.INVALID_ARGUMENT.getCode(),
+ new AMQShortString(ise.getMessage()),
+ body.getClazz(),
+ body.getMethod());
+ protocolConnection.writeFrame(responseBody.generateFrame(channelId));
+
+
+ }
+ catch (AMQQueue.ExistingExclusiveSubscription e)
+ {
+ throw body.getChannelException(AMQConstant.ACCESS_REFUSED,
+ "Cannot subscribe to queue "
+ + queue.getNameShortString()
+ + " as it already has an existing exclusive consumer");
+ }
+ catch (AMQQueue.ExistingSubscriptionPreventsExclusive e)
+ {
+ throw body.getChannelException(AMQConstant.ACCESS_REFUSED,
+ "Cannot subscribe to queue "
+ + queue.getNameShortString()
+ + " exclusively as it already has a consumer");
+ }
+
+ }
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
new file mode 100644
index 0000000000..790027f293
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+
+package org.apache.qpid.server.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicGetBody;
+import org.apache.qpid.framing.BasicGetEmptyBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.flow.MessageOnlyCreditManager;
+import org.apache.qpid.server.subscription.SubscriptionImpl;
+import org.apache.qpid.server.subscription.ClientDeliveryMethod;
+import org.apache.qpid.server.subscription.RecordDeliveryMethod;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetBody>
+{
+ private static final Logger _log = Logger.getLogger(BasicGetMethodHandler.class);
+
+ private static final BasicGetMethodHandler _instance = new BasicGetMethodHandler();
+
+ public static BasicGetMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private BasicGetMethodHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, BasicGetBody body, int channelId) throws AMQException
+ {
+ AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
+
+
+ VirtualHost vHost = protocolConnection.getVirtualHost();
+
+ AMQChannel channel = protocolConnection.getChannel(channelId);
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId);
+ }
+ else
+ {
+ AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.getQueue());
+ if (queue == null)
+ {
+ _log.info("No queue for '" + body.getQueue() + "'");
+ if(body.getQueue()!=null)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_FOUND,
+ "No such queue, '" + body.getQueue()+ "'");
+ }
+ else
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "No queue name provided, no default queue defined.");
+ }
+ }
+ else
+ {
+ if (queue.isExclusive())
+ {
+ AMQSessionModel session = queue.getExclusiveOwningSession();
+ if (session == null || session.getConnectionModel() != protocolConnection)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue is exclusive, but not created on this Connection.");
+ }
+ }
+
+ if (!performGet(queue,protocolConnection, channel, !body.getNoAck()))
+ {
+ MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
+ // TODO - set clusterId
+ BasicGetEmptyBody responseBody = methodRegistry.createBasicGetEmptyBody(null);
+
+
+ protocolConnection.writeFrame(responseBody.generateFrame(channelId));
+ }
+ }
+ }
+ }
+
+ public static boolean performGet(final AMQQueue queue,
+ final AMQProtocolSession session,
+ final AMQChannel channel,
+ final boolean acks)
+ throws AMQException
+ {
+
+ final FlowCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L);
+
+ final ClientDeliveryMethod getDeliveryMethod = new ClientDeliveryMethod()
+ {
+
+ int _msg;
+
+ public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag)
+ throws AMQException
+ {
+ singleMessageCredit.useCreditForMessage(entry.getMessage());
+ if(entry.getMessage() instanceof AMQMessage)
+ {
+ session.getProtocolOutputConverter().writeGetOk(entry, channel.getChannelId(),
+ deliveryTag, queue.getMessageCount());
+ }
+ else
+ {
+ //TODO Convert AMQP 0-10 message
+ throw new AMQException(AMQConstant.NOT_IMPLEMENTED, "Not implemented conversion of 0-10 message", null);
+ }
+
+ }
+ };
+ final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod()
+ {
+
+ public void recordMessageDelivery(final Subscription sub, final QueueEntry entry, final long deliveryTag)
+ {
+ channel.addUnacknowledgedMessage(entry, deliveryTag, null);
+ }
+ };
+
+ Subscription sub;
+ if(acks)
+ {
+ sub = SubscriptionFactoryImpl.INSTANCE.createSubscription(channel, session, null, acks, null, false, singleMessageCredit, getDeliveryMethod, getRecordMethod);
+ }
+ else
+ {
+ sub = new GetNoAckSubscription(channel,
+ session,
+ null,
+ null,
+ false,
+ singleMessageCredit,
+ getDeliveryMethod,
+ getRecordMethod);
+ }
+
+ queue.registerSubscription(sub,false);
+ queue.flushSubscription(sub);
+ queue.unregisterSubscription(sub);
+ return(!singleMessageCredit.hasCredit());
+
+
+ }
+
+ public static final class GetNoAckSubscription extends SubscriptionImpl.NoAckSubscription
+ {
+ public GetNoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
+ AMQShortString consumerTag, FieldTable filters,
+ boolean noLocal, FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod)
+ throws AMQException
+ {
+ super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
+ }
+
+ public boolean isTransient()
+ {
+ return true;
+ }
+
+ public boolean wouldSuspend(QueueEntry msg)
+ {
+ return !getCreditManager().useCreditForMessage(msg.getMessage());
+ }
+
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
new file mode 100644
index 0000000000..8f23b1c4d4
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
@@ -0,0 +1,96 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class BasicPublishMethodHandler implements StateAwareMethodListener<BasicPublishBody>
+{
+ private static final Logger _logger = Logger.getLogger(BasicPublishMethodHandler.class);
+
+ private static final BasicPublishMethodHandler _instance = new BasicPublishMethodHandler();
+
+
+ public static BasicPublishMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private BasicPublishMethodHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, BasicPublishBody body, int channelId) throws AMQException
+ {
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Publish received on channel " + channelId);
+ }
+
+ AMQShortString exchangeName = body.getExchange();
+ // TODO: check the delivery tag field details - is it unique across the broker or per subscriber?
+ if (exchangeName == null)
+ {
+ exchangeName = ExchangeDefaults.DEFAULT_EXCHANGE_NAME;
+ }
+
+ VirtualHost vHost = session.getVirtualHost();
+ Exchange exch = vHost.getExchangeRegistry().getExchange(exchangeName);
+ // if the exchange does not exist we raise a channel exception
+ if (exch == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange name");
+ }
+ else
+ {
+ // The partially populated BasicDeliver frame plus the received route body
+ // is stored in the channel. Once the final body frame has been received
+ // it is routed to the exchange.
+ AMQChannel channel = session.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId);
+ }
+
+ MessagePublishInfo info = session.getMethodRegistry().getProtocolVersionMethodConverter().convertToInfo(body);
+ info.setExchange(exchangeName);
+ channel.setPublishFrame(info, exch);
+ }
+ }
+
+}
+
+
+
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
new file mode 100644
index 0000000000..dd3281c65f
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicQosBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.AMQChannel;
+
+public class BasicQosHandler implements StateAwareMethodListener<BasicQosBody>
+{
+ private static final BasicQosHandler _instance = new BasicQosHandler();
+
+ public static BasicQosHandler getInstance()
+ {
+ return _instance;
+ }
+
+ public void methodReceived(AMQStateManager stateManager, BasicQosBody body, int channelId) throws AMQException
+ {
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ AMQChannel channel = session.getChannel(channelId);
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId);
+ }
+
+ channel.setCredit(body.getPrefetchSize(), body.getPrefetchCount());
+
+
+ MethodRegistry methodRegistry = session.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createBasicQosOkBody();
+ session.writeFrame(responseBody.generateFrame(channelId));
+
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
new file mode 100644
index 0000000000..c7842cd643
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicRecoverBody;
+import org.apache.qpid.framing.BasicRecoverOkBody;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+
+public class BasicRecoverMethodHandler implements StateAwareMethodListener<BasicRecoverBody>
+{
+ private static final Logger _logger = Logger.getLogger(BasicRecoverMethodHandler.class);
+
+ private static final BasicRecoverMethodHandler _instance = new BasicRecoverMethodHandler();
+
+ public static BasicRecoverMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ public void methodReceived(AMQStateManager stateManager, BasicRecoverBody body, int channelId) throws AMQException
+ {
+ AMQProtocolSession session = stateManager.getProtocolSession();
+
+ _logger.debug("Recover received on protocol session " + session + " and channel " + channelId);
+ AMQChannel channel = session.getChannel(channelId);
+
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId);
+ }
+
+ channel.resend(body.getRequeue());
+
+ // Qpid 0-8 hacks a synchronous -ok onto recover.
+ // In Qpid 0-9 we create a separate sync-recover, sync-recover-ok pair to be "more" compliant
+ if(session.getProtocolVersion().equals(ProtocolVersion.v8_0))
+ {
+ MethodRegistry_8_0 methodRegistry = (MethodRegistry_8_0) session.getMethodRegistry();
+ AMQMethodBody recoverOk = methodRegistry.createBasicRecoverOkBody();
+ session.writeFrame(recoverOk.generateFrame(channelId));
+
+ }
+
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java
new file mode 100644
index 0000000000..f9feada6fe
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java
@@ -0,0 +1,83 @@
+package org.apache.qpid.server.handler;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.framing.BasicRecoverBody;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.BasicRecoverSyncBody;
+import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91;
+import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
+import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.AMQException;
+
+public class BasicRecoverSyncMethodHandler implements StateAwareMethodListener<BasicRecoverSyncBody>
+{
+ private static final Logger _logger = Logger.getLogger(BasicRecoverSyncMethodHandler.class);
+
+ private static final BasicRecoverSyncMethodHandler _instance = new BasicRecoverSyncMethodHandler();
+
+ public static BasicRecoverSyncMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ public void methodReceived(AMQStateManager stateManager, BasicRecoverSyncBody body, int channelId) throws AMQException
+ {
+ AMQProtocolSession session = stateManager.getProtocolSession();
+
+ _logger.debug("Recover received on protocol session " + session + " and channel " + channelId);
+ AMQChannel channel = session.getChannel(channelId);
+
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId);
+ }
+
+ channel.resend(body.getRequeue());
+
+ // Qpid 0-8 hacks a synchronous -ok onto recover.
+ // In Qpid 0-9 we create a separate sync-recover, sync-recover-ok pair to be "more" compliant
+ if(session.getProtocolVersion().equals(ProtocolVersion.v0_9))
+ {
+ MethodRegistry_0_9 methodRegistry = (MethodRegistry_0_9) session.getMethodRegistry();
+ AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody();
+ session.writeFrame(recoverOk.generateFrame(channelId));
+
+ }
+ else if(session.getProtocolVersion().equals(ProtocolVersion.v0_91))
+ {
+ MethodRegistry_0_91 methodRegistry = (MethodRegistry_0_91) session.getMethodRegistry();
+ AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody();
+ session.writeFrame(recoverOk.generateFrame(channelId));
+
+ }
+
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
new file mode 100644
index 0000000000..62dd76f832
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
@@ -0,0 +1,125 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicRejectBody;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.log4j.Logger;
+
+public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicRejectBody>
+{
+ private static final Logger _logger = Logger.getLogger(BasicRejectMethodHandler.class);
+
+ private static BasicRejectMethodHandler _instance = new BasicRejectMethodHandler();
+
+ public static BasicRejectMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private BasicRejectMethodHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, BasicRejectBody body, int channelId) throws AMQException
+ {
+ AMQProtocolSession session = stateManager.getProtocolSession();
+
+ AMQChannel channel = session.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId);
+ }
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rejecting:" + body.getDeliveryTag() +
+ ": Requeue:" + body.getRequeue() +
+ //": Resend:" + evt.getMethod().resend +
+ " on channel:" + channel.debugIdentity());
+ }
+
+ long deliveryTag = body.getDeliveryTag();
+
+ QueueEntry message = channel.getUnacknowledgedMessageMap().get(deliveryTag);
+
+ if (message == null)
+ {
+ _logger.warn("Dropping reject request as message is null for tag:" + deliveryTag);
+// throw evt.getMethod().getChannelException(AMQConstant.NOT_FOUND, "Delivery Tag(" + deliveryTag + ")not known");
+ }
+ else
+ {
+ if (message.isQueueDeleted())
+ {
+ _logger.warn("Message's Queue as already been purged, unable to Reject. " +
+ "Dropping message should use Dead Letter Queue");
+ message = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
+ if(message != null)
+ {
+ message.discard();
+ }
+ //sendtoDeadLetterQueue(msg)
+ return;
+ }
+
+ if (message.getMessage() == null)
+ {
+ _logger.warn("Message as already been purged, unable to Reject.");
+ return;
+ }
+
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage() +
+ ": Requeue:" + body.getRequeue() +
+ //": Resend:" + evt.getMethod().resend +
+ " on channel:" + channel.debugIdentity());
+ }
+
+ // If we haven't requested message to be resent to this consumer then reject it from ever getting it.
+ //if (!evt.getMethod().resend)
+ {
+ message.reject();
+ }
+
+ if (body.getRequeue())
+ {
+ channel.requeue(deliveryTag);
+ }
+ else
+ {
+ _logger.warn("Dropping message as requeue not required and there is no dead letter queue");
+ message = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
+ //sendtoDeadLetterQueue(AMQMessage message)
+// message.queue = channel.getDefaultDeadLetterQueue();
+// channel.requeue(deliveryTag);
+ }
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
new file mode 100644
index 0000000000..9133cce6b7
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.AMQChannel;
+
+public class ChannelCloseHandler implements StateAwareMethodListener<ChannelCloseBody>
+{
+ private static final Logger _logger = Logger.getLogger(ChannelCloseHandler.class);
+
+ private static ChannelCloseHandler _instance = new ChannelCloseHandler();
+
+ public static ChannelCloseHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private ChannelCloseHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, ChannelCloseBody body, int channelId) throws AMQException
+ {
+ AMQProtocolSession session = stateManager.getProtocolSession();
+
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Received channel close for id " + channelId + " citing class " + body.getClassId() +
+ " and method " + body.getMethodId());
+ }
+
+
+ AMQChannel channel = session.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, "Trying to close unknown channel");
+ }
+
+ session.closeChannel(channelId);
+ // Client requested closure so we don't wait for ok we send it
+ stateManager.getProtocolSession().closeChannelOk(channelId);
+
+ MethodRegistry methodRegistry = session.getMethodRegistry();
+ ChannelCloseOkBody responseBody = methodRegistry.createChannelCloseOkBody();
+ session.writeFrame(responseBody.generateFrame(channelId));
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java
new file mode 100644
index 0000000000..a857490e7e
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+
+public class ChannelCloseOkHandler implements StateAwareMethodListener<ChannelCloseOkBody>
+{
+ private static final Logger _logger = Logger.getLogger(ChannelCloseOkHandler.class);
+
+ private static ChannelCloseOkHandler _instance = new ChannelCloseOkHandler();
+
+ public static ChannelCloseOkHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private ChannelCloseOkHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, ChannelCloseOkBody body, int channelId) throws AMQException
+ {
+
+ _logger.info("Received channel-close-ok for channel-id " + channelId);
+
+ // Let the Protocol Session know the channel is now closed.
+ stateManager.getProtocolSession().closeChannelOk(channelId);
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
new file mode 100644
index 0000000000..696ca8a63b
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
@@ -0,0 +1,66 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+
+public class ChannelFlowHandler implements StateAwareMethodListener<ChannelFlowBody>
+{
+ private static final Logger _logger = Logger.getLogger(ChannelFlowHandler.class);
+
+ private static ChannelFlowHandler _instance = new ChannelFlowHandler();
+
+ public static ChannelFlowHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private ChannelFlowHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, ChannelFlowBody body, int channelId) throws AMQException
+ {
+ AMQProtocolSession session = stateManager.getProtocolSession();
+
+
+ AMQChannel channel = session.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId);
+ }
+
+ channel.setSuspended(!body.getActive());
+ _logger.debug("Channel.Flow for channel " + channelId + ", active=" + body.getActive());
+
+ MethodRegistry methodRegistry = session.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createChannelFlowOkBody(body.getActive());
+ session.writeFrame(responseBody.generateFrame(channelId));
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
new file mode 100644
index 0000000000..6d874ee971
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
@@ -0,0 +1,141 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ChannelOpenBody;
+import org.apache.qpid.framing.ChannelOpenOkBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
+import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91;
+import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenBody>
+{
+ private static final Logger _logger = Logger.getLogger(ChannelOpenHandler.class);
+
+ private static ChannelOpenHandler _instance = new ChannelOpenHandler();
+
+ public static ChannelOpenHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private ChannelOpenHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, ChannelOpenBody body, int channelId) throws AMQException
+ {
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ VirtualHost virtualHost = session.getVirtualHost();
+
+ // Protect the broker against out of order frame request.
+ if (virtualHost == null)
+ {
+ throw new AMQException(AMQConstant.COMMAND_INVALID, "Virtualhost has not yet been set. ConnectionOpen has not been called.", null);
+ }
+ _logger.info("Connecting to: " + virtualHost.getName());
+
+ final AMQChannel channel = new AMQChannel(session,channelId, virtualHost.getMessageStore());
+
+ session.addChannel(channel);
+
+ ChannelOpenOkBody response;
+
+ ProtocolVersion pv = session.getProtocolVersion();
+
+ if(pv.equals(ProtocolVersion.v8_0))
+ {
+ MethodRegistry_8_0 methodRegistry = (MethodRegistry_8_0) MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
+ response = methodRegistry.createChannelOpenOkBody();
+
+ }
+ else if(pv.equals(ProtocolVersion.v0_9))
+ {
+ MethodRegistry_0_9 methodRegistry = (MethodRegistry_0_9) MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+ UUID uuid = UUID.randomUUID();
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+ DataOutputStream dataOut = new DataOutputStream(output);
+ try
+ {
+ dataOut.writeLong(uuid.getMostSignificantBits());
+ dataOut.writeLong(uuid.getLeastSignificantBits());
+ dataOut.flush();
+ dataOut.close();
+ }
+ catch (IOException e)
+ {
+ // This *really* shouldn't happen as we're not doing any I/O
+ throw new RuntimeException("I/O exception when writing to byte array", e);
+ }
+
+ // should really associate this channelId to the session
+ byte[] channelName = output.toByteArray();
+
+ response = methodRegistry.createChannelOpenOkBody(channelName);
+ }
+ else if(pv.equals(ProtocolVersion.v0_91))
+ {
+ MethodRegistry_0_91 methodRegistry = (MethodRegistry_0_91) MethodRegistry.getMethodRegistry(ProtocolVersion.v0_91);
+ UUID uuid = UUID.randomUUID();
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+ DataOutputStream dataOut = new DataOutputStream(output);
+ try
+ {
+ dataOut.writeLong(uuid.getMostSignificantBits());
+ dataOut.writeLong(uuid.getLeastSignificantBits());
+ dataOut.flush();
+ dataOut.close();
+ }
+ catch (IOException e)
+ {
+ // This *really* shouldn't happen as we're not doing any I/O
+ throw new RuntimeException("I/O exception when writing to byte array", e);
+ }
+
+ // should really associate this channelId to the session
+ byte[] channelName = output.toByteArray();
+
+ response = methodRegistry.createChannelOpenOkBody(channelName);
+ }
+ else
+ {
+ throw new AMQException(AMQConstant.INTERNAL_ERROR, "Got channel open for protocol version not catered for: " + pv, null);
+ }
+
+
+ session.writeFrame(response.generateFrame(channelId));
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
new file mode 100644
index 0000000000..dade5d5f54
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
@@ -0,0 +1,72 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ConnectionCloseOkBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+
+public class ConnectionCloseMethodHandler implements StateAwareMethodListener<ConnectionCloseBody>
+{
+ private static final Logger _logger = Logger.getLogger(ConnectionCloseMethodHandler.class);
+
+ private static ConnectionCloseMethodHandler _instance = new ConnectionCloseMethodHandler();
+
+ public static ConnectionCloseMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private ConnectionCloseMethodHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, ConnectionCloseBody body, int channelId) throws AMQException
+ {
+ AMQProtocolSession session = stateManager.getProtocolSession();
+
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("ConnectionClose received with reply code/reply text " + body.getReplyCode() + "/" +
+ body.getReplyText() + " for " + session);
+ }
+ try
+ {
+ session.closeSession();
+ }
+ catch (Exception e)
+ {
+ _logger.error("Error closing protocol session: " + e, e);
+ }
+
+ MethodRegistry methodRegistry = session.getMethodRegistry();
+ ConnectionCloseOkBody responseBody = methodRegistry.createConnectionCloseOkBody();
+ session.writeFrame(responseBody.generateFrame(channelId));
+
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java
new file mode 100644
index 0000000000..bc6e5ab403
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ConnectionCloseOkBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.state.AMQState;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+
+public class ConnectionCloseOkMethodHandler implements StateAwareMethodListener<ConnectionCloseOkBody>
+{
+ private static final Logger _logger = Logger.getLogger(ConnectionCloseOkMethodHandler.class);
+
+ private static ConnectionCloseOkMethodHandler _instance = new ConnectionCloseOkMethodHandler();
+
+ public static ConnectionCloseOkMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private ConnectionCloseOkMethodHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, ConnectionCloseOkBody body, int channelId) throws AMQException
+ {
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ //todo should this not do more than just log the method?
+ _logger.info("Received Connection-close-ok");
+
+ try
+ {
+ stateManager.changeState(AMQState.CONNECTION_CLOSED);
+ session.closeSession();
+ }
+ catch (Exception e)
+ {
+ _logger.error("Error closing protocol session: " + e, e);
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
new file mode 100644
index 0000000000..9a79467526
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
@@ -0,0 +1,101 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ConnectionOpenBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.state.AMQState;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class ConnectionOpenMethodHandler implements StateAwareMethodListener<ConnectionOpenBody>
+{
+ private static final Logger _logger = Logger.getLogger(ConnectionOpenMethodHandler.class);
+
+ private static ConnectionOpenMethodHandler _instance = new ConnectionOpenMethodHandler();
+
+ public static ConnectionOpenMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private ConnectionOpenMethodHandler()
+ {
+ }
+
+ private static AMQShortString generateClientID()
+ {
+ return new AMQShortString(Long.toString(System.currentTimeMillis()));
+ }
+
+ public void methodReceived(AMQStateManager stateManager, ConnectionOpenBody body, int channelId) throws AMQException
+ {
+ AMQProtocolSession session = stateManager.getProtocolSession();
+
+ //ignore leading '/'
+ String virtualHostName;
+ if ((body.getVirtualHost() != null) && body.getVirtualHost().charAt(0) == '/')
+ {
+ virtualHostName = new StringBuilder(body.getVirtualHost().subSequence(1, body.getVirtualHost().length())).toString();
+ }
+ else
+ {
+ virtualHostName = body.getVirtualHost() == null ? null : String.valueOf(body.getVirtualHost());
+ }
+
+ VirtualHost virtualHost = stateManager.getVirtualHostRegistry().getVirtualHost(virtualHostName);
+
+ if (virtualHost == null)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown virtual host: '" + virtualHostName + "'");
+ }
+ else
+ {
+ // Check virtualhost access
+ if (!virtualHost.getSecurityManager().accessVirtualhost(virtualHostName, session.getRemoteAddress()))
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied: '" + virtualHost.getName() + "'");
+ }
+
+ session.setVirtualHost(virtualHost);
+
+ // See Spec (0.8.2). Section 3.1.2 Virtual Hosts
+ if (session.getContextKey() == null)
+ {
+ session.setContextKey(generateClientID());
+ }
+
+ MethodRegistry methodRegistry = session.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(body.getVirtualHost());
+
+ stateManager.changeState(AMQState.CONNECTION_OPEN);
+
+ session.writeFrame(responseBody.generateFrame(channelId));
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
new file mode 100644
index 0000000000..d4b79134a2
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
@@ -0,0 +1,126 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ConnectionSecureBody;
+import org.apache.qpid.framing.ConnectionSecureOkBody;
+import org.apache.qpid.framing.ConnectionTuneBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.security.auth.AuthenticationResult;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
+import org.apache.qpid.server.state.AMQState;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+
+public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener<ConnectionSecureOkBody>
+{
+ private static final Logger _logger = Logger.getLogger(ConnectionSecureOkMethodHandler.class);
+
+ private static ConnectionSecureOkMethodHandler _instance = new ConnectionSecureOkMethodHandler();
+
+ public static ConnectionSecureOkMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private ConnectionSecureOkMethodHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, ConnectionSecureOkBody body, int channelId) throws AMQException
+ {
+ AMQProtocolSession session = stateManager.getProtocolSession();
+
+ AuthenticationManager authMgr = ApplicationRegistry.getInstance().getAuthenticationManager();
+
+ SaslServer ss = session.getSaslServer();
+ if (ss == null)
+ {
+ throw new AMQException("No SASL context set up in session");
+ }
+ MethodRegistry methodRegistry = session.getMethodRegistry();
+ AuthenticationResult authResult = authMgr.authenticate(ss, body.getResponse());
+ switch (authResult.status)
+ {
+ case ERROR:
+ Exception cause = authResult.getCause();
+
+ _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage()));
+
+ // This should be abstracted
+ stateManager.changeState(AMQState.CONNECTION_CLOSING);
+
+ ConnectionCloseBody connectionCloseBody =
+ methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(),
+ AMQConstant.NOT_ALLOWED.getName(),
+ body.getClazz(),
+ body.getMethod());
+
+ session.writeFrame(connectionCloseBody.generateFrame(0));
+ disposeSaslServer(session);
+ break;
+ case SUCCESS:
+ _logger.info("Connected as: " + ss.getAuthorizationID());
+ stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
+
+ ConnectionTuneBody tuneBody =
+ methodRegistry.createConnectionTuneBody(ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount(),
+ ConnectionStartOkMethodHandler.getConfiguredFrameSize(),
+ ApplicationRegistry.getInstance().getConfiguration().getHeartBeatDelay());
+ session.writeFrame(tuneBody.generateFrame(0));
+ session.setAuthorizedID(new UsernamePrincipal(ss.getAuthorizationID()));
+ disposeSaslServer(session);
+ break;
+ case CONTINUE:
+ stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
+
+ ConnectionSecureBody secureBody = methodRegistry.createConnectionSecureBody(authResult.challenge);
+ session.writeFrame(secureBody.generateFrame(0));
+ }
+ }
+
+ private void disposeSaslServer(AMQProtocolSession ps)
+ {
+ SaslServer ss = ps.getSaslServer();
+ if (ss != null)
+ {
+ ps.setSaslServer(null);
+ try
+ {
+ ss.dispose();
+ }
+ catch (SaslException e)
+ {
+ _logger.error("Error disposing of Sasl server: " + e);
+ }
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
new file mode 100644
index 0000000000..4442f969c4
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
@@ -0,0 +1,162 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ConnectionSecureBody;
+import org.apache.qpid.framing.ConnectionStartOkBody;
+import org.apache.qpid.framing.ConnectionTuneBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.security.auth.AuthenticationResult;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
+import org.apache.qpid.server.state.AMQState;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+
+
+public class ConnectionStartOkMethodHandler implements StateAwareMethodListener<ConnectionStartOkBody>
+{
+ private static final Logger _logger = Logger.getLogger(ConnectionStartOkMethodHandler.class);
+
+ private static ConnectionStartOkMethodHandler _instance = new ConnectionStartOkMethodHandler();
+
+ public static ConnectionStartOkMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private ConnectionStartOkMethodHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, ConnectionStartOkBody body, int channelId) throws AMQException
+ {
+ AMQProtocolSession session = stateManager.getProtocolSession();
+
+ _logger.info("SASL Mechanism selected: " + body.getMechanism());
+ _logger.info("Locale selected: " + body.getLocale());
+
+ AuthenticationManager authMgr = ApplicationRegistry.getInstance().getAuthenticationManager();
+
+ SaslServer ss = null;
+ try
+ {
+ ss = authMgr.createSaslServer(String.valueOf(body.getMechanism()), session.getLocalFQDN());
+
+ if (ss == null)
+ {
+ throw body.getConnectionException(AMQConstant.RESOURCE_ERROR, "Unable to create SASL Server:" + body.getMechanism());
+ }
+
+ session.setSaslServer(ss);
+
+ AuthenticationResult authResult = authMgr.authenticate(ss, body.getResponse());
+
+ //save clientProperties
+ if (session.getClientProperties() == null)
+ {
+ session.setClientProperties(body.getClientProperties());
+ }
+
+ MethodRegistry methodRegistry = session.getMethodRegistry();
+
+ switch (authResult.status)
+ {
+ case ERROR:
+ Exception cause = authResult.getCause();
+
+ _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage()));
+
+ stateManager.changeState(AMQState.CONNECTION_CLOSING);
+
+ ConnectionCloseBody closeBody =
+ methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), // replyCode
+ AMQConstant.NOT_ALLOWED.getName(),
+ body.getClazz(),
+ body.getMethod());
+
+ session.writeFrame(closeBody.generateFrame(0));
+ disposeSaslServer(session);
+ break;
+
+ case SUCCESS:
+ _logger.info("Connected as: " + ss.getAuthorizationID());
+ session.setAuthorizedID(new UsernamePrincipal(ss.getAuthorizationID()));
+
+ stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
+
+ ConnectionTuneBody tuneBody = methodRegistry.createConnectionTuneBody(ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount(),
+ getConfiguredFrameSize(),
+ ApplicationRegistry.getInstance().getConfiguration().getHeartBeatDelay());
+ session.writeFrame(tuneBody.generateFrame(0));
+ break;
+ case CONTINUE:
+ stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
+
+ ConnectionSecureBody secureBody = methodRegistry.createConnectionSecureBody(authResult.challenge);
+ session.writeFrame(secureBody.generateFrame(0));
+ }
+ }
+ catch (SaslException e)
+ {
+ disposeSaslServer(session);
+ throw new AMQException("SASL error: " + e, e);
+ }
+ }
+
+ private void disposeSaslServer(AMQProtocolSession ps)
+ {
+ SaslServer ss = ps.getSaslServer();
+ if (ss != null)
+ {
+ ps.setSaslServer(null);
+ try
+ {
+ ss.dispose();
+ }
+ catch (SaslException e)
+ {
+ _logger.error("Error disposing of Sasl server: " + e);
+ }
+ }
+ }
+
+ static int getConfiguredFrameSize()
+ {
+ final ServerConfiguration config = ApplicationRegistry.getInstance().getConfiguration();
+ final int framesize = config.getFrameSize();
+ _logger.info("Framesize set to " + framesize);
+ return framesize;
+ }
+}
+
+
+
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java
new file mode 100644
index 0000000000..1da2760639
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ConnectionTuneOkBody;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.state.AMQState;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+
+public class ConnectionTuneOkMethodHandler implements StateAwareMethodListener<ConnectionTuneOkBody>
+{
+ private static final Logger _logger = Logger.getLogger(ConnectionTuneOkMethodHandler.class);
+
+ private static ConnectionTuneOkMethodHandler _instance = new ConnectionTuneOkMethodHandler();
+
+ public static ConnectionTuneOkMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ public void methodReceived(AMQStateManager stateManager, ConnectionTuneOkBody body, int channelId) throws AMQException
+ {
+ AMQProtocolSession session = stateManager.getProtocolSession();
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug(body);
+ }
+ stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);
+ session.initHeartbeats(body.getHeartbeat());
+ session.setMaxFrameSize(body.getFrameMax());
+
+ long maxChannelNumber = body.getChannelMax();
+ //0 means no implied limit, except that forced by protocol limitations (0xFFFF)
+ session.setMaximumNumberOfChannels( maxChannelNumber == 0 ? 0xFFFFL : maxChannelNumber);
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
new file mode 100644
index 0000000000..ccd42204d9
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
@@ -0,0 +1,178 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+/**
+ * @author Apache Software Foundation
+ *
+ *
+ */
+public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBoundBody>
+{
+ private static final ExchangeBoundHandler _instance = new ExchangeBoundHandler();
+
+ public static final int OK = 0;
+
+ public static final int EXCHANGE_NOT_FOUND = 1;
+
+ public static final int QUEUE_NOT_FOUND = 2;
+
+ public static final int NO_BINDINGS = 3;
+
+ public static final int QUEUE_NOT_BOUND = 4;
+
+ public static final int NO_QUEUE_BOUND_WITH_RK = 5;
+
+ public static final int SPECIFIC_QUEUE_NOT_BOUND_WITH_RK = 6;
+
+ public static ExchangeBoundHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private ExchangeBoundHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, ExchangeBoundBody body, int channelId) throws AMQException
+ {
+ AMQProtocolSession session = stateManager.getProtocolSession();
+
+ VirtualHost virtualHost = session.getVirtualHost();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+ MethodRegistry methodRegistry = session.getMethodRegistry();
+
+
+
+
+ AMQShortString exchangeName = body.getExchange();
+ AMQShortString queueName = body.getQueue();
+ AMQShortString routingKey = body.getRoutingKey();
+ if (exchangeName == null)
+ {
+ throw new AMQException("Exchange exchange must not be null");
+ }
+ Exchange exchange = virtualHost.getExchangeRegistry().getExchange(exchangeName);
+ ExchangeBoundOkBody response;
+ if (exchange == null)
+ {
+
+
+ response = methodRegistry.createExchangeBoundOkBody(EXCHANGE_NOT_FOUND,
+ new AMQShortString("Exchange " + exchangeName + " not found"));
+ }
+ else if (routingKey == null)
+ {
+ if (queueName == null)
+ {
+ if (exchange.hasBindings())
+ {
+ response = methodRegistry.createExchangeBoundOkBody(OK, null);
+ }
+ else
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(NO_BINDINGS, // replyCode
+ null); // replyText
+ }
+ }
+ else
+ {
+
+ AMQQueue queue = queueRegistry.getQueue(queueName);
+ if (queue == null)
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode
+ new AMQShortString("Queue " + queueName + " not found")); // replyText
+ }
+ else
+ {
+ if (exchange.isBound(queue))
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode
+ null); // replyText
+ }
+ else
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_BOUND, // replyCode
+ new AMQShortString("Queue " + queueName + " not bound to exchange " + exchangeName)); // replyText
+ }
+ }
+ }
+ }
+ else if (queueName != null)
+ {
+ AMQQueue queue = queueRegistry.getQueue(queueName);
+ if (queue == null)
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode
+ new AMQShortString("Queue " + queueName + " not found")); // replyText
+ }
+ else
+ {
+ if (exchange.isBound(body.getRoutingKey(), queue))
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode
+ null); // replyText
+ }
+ else
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, // replyCode
+ new AMQShortString("Queue " + queueName + " not bound with routing key " +
+ body.getRoutingKey() + " to exchange " + exchangeName)); // replyText
+ }
+ }
+ }
+ else
+ {
+ if (exchange.isBound(body.getRoutingKey()))
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode
+ null); // replyText
+ }
+ else
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(NO_QUEUE_BOUND_WITH_RK, // replyCode
+ new AMQShortString("No queue bound with routing key " + body.getRoutingKey() +
+ " to exchange " + exchangeName)); // replyText
+ }
+ }
+ session.writeFrame(response.generateFrame(channelId));
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
new file mode 100644
index 0000000000..98a0d33487
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
@@ -0,0 +1,108 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQUnknownExchangeType;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class ExchangeDeclareHandler implements StateAwareMethodListener<ExchangeDeclareBody>
+{
+ private static final Logger _logger = Logger.getLogger(ExchangeDeclareHandler.class);
+
+ private static final ExchangeDeclareHandler _instance = new ExchangeDeclareHandler();
+
+ public static ExchangeDeclareHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private ExchangeDeclareHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, ExchangeDeclareBody body, int channelId) throws AMQException
+ {
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ VirtualHost virtualHost = session.getVirtualHost();
+ ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+ ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Request to declare exchange of type " + body.getType() + " with name " + body.getExchange());
+ }
+
+ synchronized(exchangeRegistry)
+ {
+ Exchange exchange = exchangeRegistry.getExchange(body.getExchange());
+
+ if (exchange == null)
+ {
+ if(body.getPassive() && ((body.getType() == null) || body.getType().length() ==0))
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + body.getExchange());
+ }
+ else
+ {
+ try
+ {
+ exchange = exchangeFactory.createExchange(body.getExchange() == null ? null : body.getExchange().intern(),
+ body.getType() == null ? null : body.getType().intern(),
+ body.getDurable(),
+ body.getPassive(), body.getTicket());
+ exchangeRegistry.registerExchange(exchange);
+
+ if (exchange.isDurable())
+ {
+ virtualHost.getDurableConfigurationStore().createExchange(exchange);
+ }
+ }
+ catch(AMQUnknownExchangeType e)
+ {
+ throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange: " + body.getExchange(),e);
+ }
+ }
+ }
+ else if (!exchange.getTypeShortString().equals(body.getType()))
+ {
+
+ throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + body.getExchange() + " of type " + exchange.getTypeShortString() + " to " + body.getType() +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor(),null);
+ }
+ }
+ if(!body.getNowait())
+ {
+ MethodRegistry methodRegistry = session.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createExchangeDeclareOkBody();
+ session.writeFrame(responseBody.generateFrame(channelId));
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
new file mode 100644
index 0000000000..586aaf9336
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
@@ -0,0 +1,71 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ExchangeDeleteBody;
+import org.apache.qpid.framing.ExchangeDeleteOkBody;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.exchange.ExchangeInUseException;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeDeleteBody>
+{
+ private static final ExchangeDeleteHandler _instance = new ExchangeDeleteHandler();
+
+ public static ExchangeDeleteHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private ExchangeDeleteHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, ExchangeDeleteBody body, int channelId) throws AMQException
+ {
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ VirtualHost virtualHost = session.getVirtualHost();
+ ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+
+ try
+ {
+ if(exchangeRegistry.getExchange(body.getExchange()) == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "No such exchange: " + body.getExchange());
+ }
+ exchangeRegistry.unregisterExchange(body.getExchange(), body.getIfUnused());
+
+ ExchangeDeleteOkBody responseBody = session.getMethodRegistry().createExchangeDeleteOkBody();
+
+ session.writeFrame(responseBody.generateFrame(channelId));
+ }
+ catch (ExchangeInUseException e)
+ {
+ throw body.getChannelException(AMQConstant.IN_USE, "Exchange in use");
+ // TODO: sort out consistent channel close mechanism that does all clean up etc.
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/OnCurrentThreadExecutor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/OnCurrentThreadExecutor.java
new file mode 100644
index 0000000000..ac516b6133
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/OnCurrentThreadExecutor.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import java.util.concurrent.Executor;
+
+/**
+ * An executor that executes the task on the current thread.
+ */
+public class OnCurrentThreadExecutor implements Executor
+{
+ public void execute(Runnable command)
+ {
+ command.run();
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
new file mode 100644
index 0000000000..0eb69e4b16
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
@@ -0,0 +1,159 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.QueueBindBody;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.util.Map;
+
+public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
+{
+ private static final Logger _log = Logger.getLogger(QueueBindHandler.class);
+
+ private static final QueueBindHandler _instance = new QueueBindHandler();
+
+ public static QueueBindHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private QueueBindHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, QueueBindBody body, int channelId) throws AMQException
+ {
+ AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
+ VirtualHost virtualHost = protocolConnection.getVirtualHost();
+ ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
+ final AMQQueue queue;
+ final AMQShortString routingKey;
+
+ if (body.getQueue() == null)
+ {
+ AMQChannel channel = protocolConnection.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId);
+ }
+
+ queue = channel.getDefaultQueue();
+
+ if (queue == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null");
+ }
+
+ if (body.getRoutingKey() == null)
+ {
+ routingKey = queue.getNameShortString();
+ }
+ else
+ {
+ routingKey = body.getRoutingKey().intern();
+ }
+ }
+ else
+ {
+ queue = queueRegistry.getQueue(body.getQueue());
+ routingKey = body.getRoutingKey() == null ? AMQShortString.EMPTY_STRING : body.getRoutingKey().intern();
+ }
+
+ if (queue == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
+ }
+ final Exchange exch = exchangeRegistry.getExchange(body.getExchange());
+ if (exch == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist.");
+ }
+
+
+ try
+ {
+ if (queue.isExclusive() && !queue.isDurable())
+ {
+ AMQSessionModel session = queue.getExclusiveOwningSession();
+ if (session == null || session.getConnectionModel() != protocolConnection)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection.");
+ }
+ }
+
+ if (!exch.isBound(routingKey, body.getArguments(), queue))
+ {
+ String bindingKey = String.valueOf(routingKey);
+ Map<String,Object> arguments = FieldTable.convertToMap(body.getArguments());
+
+ if(!virtualHost.getBindingFactory().addBinding(bindingKey, queue, exch, arguments))
+ {
+ Binding oldBinding = virtualHost.getBindingFactory().getBinding(bindingKey, queue, exch, arguments);
+
+ Map<String, Object> oldArgs = oldBinding.getArguments();
+ if((oldArgs == null && !arguments.isEmpty()) || (oldArgs != null && !oldArgs.equals(arguments)))
+ {
+ virtualHost.getBindingFactory().replaceBinding(bindingKey, queue, exch, arguments);
+ }
+ }
+ }
+ }
+ catch (AMQException e)
+ {
+ throw body.getChannelException(AMQConstant.CHANNEL_ERROR, e.toString());
+ }
+
+ if (_log.isInfoEnabled())
+ {
+ _log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey);
+ }
+ if (!body.getNowait())
+ {
+ MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody();
+ protocolConnection.writeFrame(responseBody.generateFrame(channelId));
+
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
new file mode 100644
index 0000000000..8939cfa334
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
@@ -0,0 +1,254 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import java.util.UUID;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.QueueDeclareBody;
+import org.apache.qpid.framing.QueueDeclareOkBody;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclareBody>
+{
+ private static final Logger _logger = Logger.getLogger(QueueDeclareHandler.class);
+
+ private static final QueueDeclareHandler _instance = new QueueDeclareHandler();
+
+ public static QueueDeclareHandler getInstance()
+ {
+ return _instance;
+ }
+
+ public boolean autoRegister = ApplicationRegistry.getInstance().getConfiguration().getQueueAutoRegister();
+
+ private final AtomicInteger _counter = new AtomicInteger();
+
+ public void methodReceived(AMQStateManager stateManager, QueueDeclareBody body, int channelId) throws AMQException
+ {
+ final AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
+ final AMQSessionModel session = protocolConnection.getChannel(channelId);
+ VirtualHost virtualHost = protocolConnection.getVirtualHost();
+ ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+ DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
+
+ final AMQShortString queueName;
+
+ // if we aren't given a queue name, we create one which we return to the client
+ if ((body.getQueue() == null) || (body.getQueue().length() == 0))
+ {
+ queueName = createName();
+ }
+ else
+ {
+ queueName = body.getQueue().intern();
+ }
+
+ AMQQueue queue;
+
+ //TODO: do we need to check that the queue already exists with exactly the same "configuration"?
+
+ synchronized (queueRegistry)
+ {
+ queue = queueRegistry.getQueue(queueName);
+
+ AMQSessionModel owningSession = null;
+
+ if (queue != null)
+ {
+ owningSession = queue.getExclusiveOwningSession();
+ }
+
+ if (queue == null)
+ {
+ if (body.getPassive())
+ {
+ String msg = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ").";
+ throw body.getChannelException(AMQConstant.NOT_FOUND, msg);
+ }
+ else
+ {
+ queue = createQueue(queueName, body, virtualHost, protocolConnection);
+ queue.setPrincipalHolder(protocolConnection);
+ if (queue.isDurable() && !queue.isAutoDelete())
+ {
+ store.createQueue(queue, body.getArguments());
+ }
+ if(body.getAutoDelete())
+ {
+ queue.setDeleteOnNoConsumers(true);
+ }
+ queueRegistry.registerQueue(queue);
+ if (body.getExclusive())
+ {
+ queue.setExclusiveOwningSession(protocolConnection.getChannel(channelId));
+ queue.setPrincipalHolder(protocolConnection);
+
+ if(!body.getDurable())
+ {
+ final AMQQueue q = queue;
+ final AMQProtocolSession.Task sessionCloseTask = new AMQProtocolSession.Task()
+ {
+ public void doTask(AMQProtocolSession session) throws AMQException
+ {
+ q.setExclusiveOwningSession(null);
+ }
+ };
+ protocolConnection.addSessionCloseTask(sessionCloseTask);
+ queue.addQueueDeleteTask(new AMQQueue.Task() {
+ public void doTask(AMQQueue queue) throws AMQException
+ {
+ protocolConnection.removeSessionCloseTask(sessionCloseTask);
+ }
+ });
+ }
+ }
+ if (autoRegister)
+ {
+ Exchange defaultExchange = exchangeRegistry.getDefaultExchange();
+
+ virtualHost.getBindingFactory().addBinding(String.valueOf(queueName), queue, defaultExchange, Collections.EMPTY_MAP);
+ _logger.info("Queue " + queueName + " bound to default exchange(" + defaultExchange.getNameShortString() + ")");
+ }
+ }
+ }
+ else if (queue.isExclusive() && !queue.isDurable() && (owningSession == null || owningSession.getConnectionModel() != protocolConnection))
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection.");
+ }
+ else if(!body.getPassive() && ((queue.isExclusive()) != body.getExclusive()))
+ {
+
+ throw body.getChannelException(AMQConstant.ALREADY_EXISTS,
+ "Cannot re-declare queue '" + queue.getNameShortString() + "' with different exclusivity (was: "
+ + queue.isExclusive() + " requested " + body.getExclusive() + ")");
+ }
+ else if (!body.getPassive() && body.getExclusive() && !(queue.isDurable() ? String.valueOf(queue.getOwner()).equals(session.getClientID()) : (owningSession == null || owningSession.getConnectionModel() == protocolConnection)))
+ {
+ throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + queueName + "'), "
+ + "as exclusive queue with same name "
+ + "declared on another client ID('"
+ + queue.getOwner() + "') your clientID('" + session.getClientID() + "')");
+
+ }
+ else if(!body.getPassive() && queue.isAutoDelete() != body.getAutoDelete())
+ {
+ throw body.getChannelException(AMQConstant.ALREADY_EXISTS,
+ "Cannot re-declare queue '" + queue.getNameShortString() + "' with different auto-delete (was: "
+ + queue.isAutoDelete() + " requested " + body.getAutoDelete() + ")");
+ }
+ else if(!body.getPassive() && queue.isDurable() != body.getDurable())
+ {
+ throw body.getChannelException(AMQConstant.ALREADY_EXISTS,
+ "Cannot re-declare queue '" + queue.getNameShortString() + "' with different durability (was: "
+ + queue.isDurable() + " requested " + body.getDurable() + ")");
+ }
+
+
+ AMQChannel channel = protocolConnection.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId);
+ }
+
+ //set this as the default queue on the channel:
+ channel.setDefaultQueue(queue);
+ }
+
+ if (!body.getNowait())
+ {
+ MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
+ QueueDeclareOkBody responseBody =
+ methodRegistry.createQueueDeclareOkBody(queueName,
+ queue.getMessageCount(),
+ queue.getConsumerCount());
+ protocolConnection.writeFrame(responseBody.generateFrame(channelId));
+
+ _logger.info("Queue " + queueName + " declared successfully");
+ }
+ }
+
+ protected AMQShortString createName()
+ {
+ return new AMQShortString("tmp_" + UUID.randomUUID());
+ }
+
+ protected AMQQueue createQueue(final AMQShortString queueName,
+ QueueDeclareBody body,
+ VirtualHost virtualHost,
+ final AMQProtocolSession session)
+ throws AMQException
+ {
+ final QueueRegistry registry = virtualHost.getQueueRegistry();
+ AMQShortString owner = body.getExclusive() ? session.getContextKey() : null;
+
+ final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, body.getDurable(), owner, body.getAutoDelete(),
+ body.getExclusive(),virtualHost, body.getArguments());
+
+ if (body.getExclusive() && !body.getDurable())
+ {
+ final AMQProtocolSession.Task deleteQueueTask =
+ new AMQProtocolSession.Task()
+ {
+ public void doTask(AMQProtocolSession session) throws AMQException
+ {
+ if (registry.getQueue(queueName) == queue)
+ {
+ queue.delete();
+ }
+ }
+ };
+
+ session.addSessionCloseTask(deleteQueueTask);
+
+ queue.addQueueDeleteTask(new AMQQueue.Task()
+ {
+ public void doTask(AMQQueue queue)
+ {
+ session.removeSessionCloseTask(deleteQueueTask);
+ }
+ });
+ }
+
+ return queue;
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
new file mode 100644
index 0000000000..da52268e52
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
@@ -0,0 +1,125 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.QueueDeleteBody;
+import org.apache.qpid.framing.QueueDeleteOkBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.AMQChannel;
+
+public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteBody>
+{
+ private static final QueueDeleteHandler _instance = new QueueDeleteHandler();
+
+ public static QueueDeleteHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private final boolean _failIfNotFound;
+
+ public QueueDeleteHandler()
+ {
+ this(true);
+ }
+
+ public QueueDeleteHandler(boolean failIfNotFound)
+ {
+ _failIfNotFound = failIfNotFound;
+
+ }
+
+ public void methodReceived(AMQStateManager stateManager, QueueDeleteBody body, int channelId) throws AMQException
+ {
+ AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
+ VirtualHost virtualHost = protocolConnection.getVirtualHost();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+ DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
+
+ AMQQueue queue;
+ if (body.getQueue() == null)
+ {
+ AMQChannel channel = protocolConnection.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId);
+ }
+
+ //get the default queue on the channel:
+ queue = channel.getDefaultQueue();
+ }
+ else
+ {
+ queue = queueRegistry.getQueue(body.getQueue());
+ }
+
+ if (queue == null)
+ {
+ if (_failIfNotFound)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
+ }
+ }
+ else
+ {
+ if (body.getIfEmpty() && !queue.isEmpty())
+ {
+ throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is not empty.");
+ }
+ else if (body.getIfUnused() && !queue.isUnused())
+ {
+ // TODO - Error code
+ throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is still used.");
+ }
+ else
+ {
+ AMQSessionModel session = queue.getExclusiveOwningSession();
+ if (queue.isExclusive() && !queue.isDurable() && (session == null || session.getConnectionModel() != protocolConnection))
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection.");
+ }
+
+ int purged = queue.delete();
+
+ if (queue.isDurable())
+ {
+ store.removeQueue(queue);
+ }
+
+ MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
+ QueueDeleteOkBody responseBody = methodRegistry.createQueueDeleteOkBody(purged);
+ protocolConnection.writeFrame(responseBody.generateFrame(channelId));
+ }
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
new file mode 100644
index 0000000000..759eec0129
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.QueuePurgeBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.AMQChannel;
+
+public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBody>
+{
+ private static final QueuePurgeHandler _instance = new QueuePurgeHandler();
+
+ public static QueuePurgeHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private final boolean _failIfNotFound;
+
+ public QueuePurgeHandler()
+ {
+ this(true);
+ }
+
+ public QueuePurgeHandler(boolean failIfNotFound)
+ {
+ _failIfNotFound = failIfNotFound;
+ }
+
+ public void methodReceived(AMQStateManager stateManager, QueuePurgeBody body, int channelId) throws AMQException
+ {
+ AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
+ VirtualHost virtualHost = protocolConnection.getVirtualHost();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
+ AMQChannel channel = protocolConnection.getChannel(channelId);
+
+
+ AMQQueue queue;
+ if(body.getQueue() == null)
+ {
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId);
+ }
+
+ //get the default queue on the channel:
+ queue = channel.getDefaultQueue();
+
+ if(queue == null)
+ {
+ if(_failIfNotFound)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,"No queue specified.");
+ }
+ }
+ }
+ else
+ {
+ queue = queueRegistry.getQueue(body.getQueue());
+ }
+
+ if(queue == null)
+ {
+ if(_failIfNotFound)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
+ }
+ }
+ else
+ {
+ AMQSessionModel session = queue.getExclusiveOwningSession();
+
+ if (queue.isExclusive() && (session == null || session.getConnectionModel() != protocolConnection))
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue is exclusive, but not created on this Connection.");
+ }
+
+ long purged = queue.clearQueue();
+
+
+ if(!body.getNowait())
+ {
+
+ MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createQueuePurgeOkBody(purged);
+ protocolConnection.writeFrame(responseBody.generateFrame(channelId));
+
+ }
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
new file mode 100644
index 0000000000..8391a4b184
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
@@ -0,0 +1,123 @@
+package org.apache.qpid.server.handler;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.QueueUnbindBody;
+import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindBody>
+{
+ private static final Logger _log = Logger.getLogger(QueueUnbindHandler.class);
+
+ private static final QueueUnbindHandler _instance = new QueueUnbindHandler();
+
+ public static QueueUnbindHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private QueueUnbindHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, QueueUnbindBody body, int channelId) throws AMQException
+ {
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ VirtualHost virtualHost = session.getVirtualHost();
+ ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
+
+ final AMQQueue queue;
+ final AMQShortString routingKey;
+
+ if (body.getQueue() == null)
+ {
+ AMQChannel channel = session.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId);
+ }
+
+ queue = channel.getDefaultQueue();
+
+ if (queue == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null");
+ }
+
+ routingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().intern();
+
+ }
+ else
+ {
+ queue = queueRegistry.getQueue(body.getQueue());
+ routingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().intern();
+ }
+
+ if (queue == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
+ }
+ final Exchange exch = exchangeRegistry.getExchange(body.getExchange());
+ if (exch == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist.");
+ }
+
+ if(virtualHost.getBindingFactory().getBinding(String.valueOf(routingKey), queue, exch, FieldTable.convertToMap(body.getArguments())) == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND,"No such binding");
+ }
+ else
+ {
+ virtualHost.getBindingFactory().removeBinding(String.valueOf(routingKey), queue, exch, FieldTable.convertToMap(body.getArguments()));
+ }
+
+
+ if (_log.isInfoEnabled())
+ {
+ _log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey);
+ }
+
+ MethodRegistry_0_9 methodRegistry = (MethodRegistry_0_9) session.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createQueueUnbindOkBody();
+ session.writeFrame(responseBody.generateFrame(channelId));
+
+
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl.java
new file mode 100644
index 0000000000..e290afcde3
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl.java
@@ -0,0 +1,574 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import java.util.Map;
+import java.util.HashMap;
+
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.AMQException;
+
+public class ServerMethodDispatcherImpl implements MethodDispatcher
+{
+ private final AMQStateManager _stateManager;
+
+ private static interface DispatcherFactory
+ {
+ public MethodDispatcher createMethodDispatcher(AMQStateManager stateManager);
+ }
+
+ private static final Map<ProtocolVersion, DispatcherFactory> _dispatcherFactories =
+ new HashMap<ProtocolVersion, DispatcherFactory>();
+
+
+ static
+ {
+ _dispatcherFactories.put(ProtocolVersion.v8_0,
+ new DispatcherFactory()
+ {
+ public MethodDispatcher createMethodDispatcher(AMQStateManager stateManager)
+ {
+ return new ServerMethodDispatcherImpl_8_0(stateManager);
+ }
+ });
+
+ _dispatcherFactories.put(ProtocolVersion.v0_9,
+ new DispatcherFactory()
+ {
+ public MethodDispatcher createMethodDispatcher(AMQStateManager stateManager)
+ {
+ return new ServerMethodDispatcherImpl_0_9(stateManager);
+ }
+ });
+ _dispatcherFactories.put(ProtocolVersion.v0_91,
+ new DispatcherFactory()
+ {
+ public MethodDispatcher createMethodDispatcher(AMQStateManager stateManager)
+ {
+ return new ServerMethodDispatcherImpl_0_91(stateManager);
+ }
+ });
+
+ }
+
+
+ private static final AccessRequestHandler _accessRequestHandler = AccessRequestHandler.getInstance();
+ private static final ChannelCloseHandler _channelCloseHandler = ChannelCloseHandler.getInstance();
+ private static final ChannelOpenHandler _channelOpenHandler = ChannelOpenHandler.getInstance();
+ private static final ChannelCloseOkHandler _channelCloseOkHandler = ChannelCloseOkHandler.getInstance();
+ private static final ConnectionCloseMethodHandler _connectionCloseMethodHandler = ConnectionCloseMethodHandler.getInstance();
+ private static final ConnectionCloseOkMethodHandler _connectionCloseOkMethodHandler = ConnectionCloseOkMethodHandler.getInstance();
+ private static final ConnectionOpenMethodHandler _connectionOpenMethodHandler = ConnectionOpenMethodHandler.getInstance();
+ private static final ConnectionTuneOkMethodHandler _connectionTuneOkMethodHandler = ConnectionTuneOkMethodHandler.getInstance();
+ private static final ConnectionSecureOkMethodHandler _connectionSecureOkMethodHandler = ConnectionSecureOkMethodHandler.getInstance();
+ private static final ConnectionStartOkMethodHandler _connectionStartOkMethodHandler = ConnectionStartOkMethodHandler.getInstance();
+ private static final ExchangeDeclareHandler _exchangeDeclareHandler = ExchangeDeclareHandler.getInstance();
+ private static final ExchangeDeleteHandler _exchangeDeleteHandler = ExchangeDeleteHandler.getInstance();
+ private static final ExchangeBoundHandler _exchangeBoundHandler = ExchangeBoundHandler.getInstance();
+ private static final BasicAckMethodHandler _basicAckMethodHandler = BasicAckMethodHandler.getInstance();
+ private static final BasicRecoverMethodHandler _basicRecoverMethodHandler = BasicRecoverMethodHandler.getInstance();
+ private static final BasicConsumeMethodHandler _basicConsumeMethodHandler = BasicConsumeMethodHandler.getInstance();
+ private static final BasicGetMethodHandler _basicGetMethodHandler = BasicGetMethodHandler.getInstance();
+ private static final BasicCancelMethodHandler _basicCancelMethodHandler = BasicCancelMethodHandler.getInstance();
+ private static final BasicPublishMethodHandler _basicPublishMethodHandler = BasicPublishMethodHandler.getInstance();
+ private static final BasicQosHandler _basicQosHandler = BasicQosHandler.getInstance();
+ private static final QueueBindHandler _queueBindHandler = QueueBindHandler.getInstance();
+ private static final QueueDeclareHandler _queueDeclareHandler = QueueDeclareHandler.getInstance();
+ private static final QueueDeleteHandler _queueDeleteHandler = QueueDeleteHandler.getInstance();
+ private static final QueuePurgeHandler _queuePurgeHandler = QueuePurgeHandler.getInstance();
+ private static final ChannelFlowHandler _channelFlowHandler = ChannelFlowHandler.getInstance();
+ private static final TxSelectHandler _txSelectHandler = TxSelectHandler.getInstance();
+ private static final TxCommitHandler _txCommitHandler = TxCommitHandler.getInstance();
+ private static final TxRollbackHandler _txRollbackHandler = TxRollbackHandler.getInstance();
+ private static final BasicRejectMethodHandler _basicRejectMethodHandler = BasicRejectMethodHandler.getInstance();
+
+
+
+ public static MethodDispatcher createMethodDispatcher(AMQStateManager stateManager, ProtocolVersion protocolVersion)
+ {
+ return _dispatcherFactories.get(protocolVersion).createMethodDispatcher(stateManager);
+ }
+
+
+ public ServerMethodDispatcherImpl(AMQStateManager stateManager)
+ {
+ _stateManager = stateManager;
+ }
+
+
+ protected AMQStateManager getStateManager()
+ {
+ return _stateManager;
+ }
+
+
+
+ public boolean dispatchAccessRequest(AccessRequestBody body, int channelId) throws AMQException
+ {
+ _accessRequestHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchBasicAck(BasicAckBody body, int channelId) throws AMQException
+ {
+ _basicAckMethodHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchBasicCancel(BasicCancelBody body, int channelId) throws AMQException
+ {
+ _basicCancelMethodHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchBasicConsume(BasicConsumeBody body, int channelId) throws AMQException
+ {
+ _basicConsumeMethodHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchBasicGet(BasicGetBody body, int channelId) throws AMQException
+ {
+ _basicGetMethodHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchBasicPublish(BasicPublishBody body, int channelId) throws AMQException
+ {
+ _basicPublishMethodHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchBasicQos(BasicQosBody body, int channelId) throws AMQException
+ {
+ _basicQosHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchBasicRecover(BasicRecoverBody body, int channelId) throws AMQException
+ {
+ _basicRecoverMethodHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchBasicReject(BasicRejectBody body, int channelId) throws AMQException
+ {
+ _basicRejectMethodHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchChannelOpen(ChannelOpenBody body, int channelId) throws AMQException
+ {
+ _channelOpenHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+
+ public boolean dispatchAccessRequestOk(AccessRequestOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchBasicCancelOk(BasicCancelOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchBasicConsumeOk(BasicConsumeOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchBasicDeliver(BasicDeliverBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchBasicGetEmpty(BasicGetEmptyBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchBasicGetOk(BasicGetOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchBasicQosOk(BasicQosOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchBasicReturn(BasicReturnBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchChannelClose(ChannelCloseBody body, int channelId) throws AMQException
+ {
+ _channelCloseHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+
+ public boolean dispatchChannelCloseOk(ChannelCloseOkBody body, int channelId) throws AMQException
+ {
+ _channelCloseOkHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+
+ public boolean dispatchChannelFlow(ChannelFlowBody body, int channelId) throws AMQException
+ {
+ _channelFlowHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchChannelFlowOk(ChannelFlowOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchChannelOpenOk(ChannelOpenOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+
+ public boolean dispatchConnectionOpen(ConnectionOpenBody body, int channelId) throws AMQException
+ {
+ _connectionOpenMethodHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+
+ public boolean dispatchConnectionClose(ConnectionCloseBody body, int channelId) throws AMQException
+ {
+ _connectionCloseMethodHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+
+ public boolean dispatchConnectionCloseOk(ConnectionCloseOkBody body, int channelId) throws AMQException
+ {
+ _connectionCloseOkMethodHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchConnectionOpenOk(ConnectionOpenOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchConnectionRedirect(ConnectionRedirectBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchConnectionSecure(ConnectionSecureBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchConnectionStart(ConnectionStartBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchConnectionTune(ConnectionTuneBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchDtxSelectOk(DtxSelectOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchDtxStartOk(DtxStartOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchExchangeBoundOk(ExchangeBoundOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchExchangeDeclareOk(ExchangeDeclareOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchExchangeDeleteOk(ExchangeDeleteOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchFileCancelOk(FileCancelOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchFileConsumeOk(FileConsumeOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchFileDeliver(FileDeliverBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchFileOpen(FileOpenBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchFileOpenOk(FileOpenOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchFileQosOk(FileQosOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchFileReturn(FileReturnBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchFileStage(FileStageBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchQueueBindOk(QueueBindOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchQueueDeclareOk(QueueDeclareOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchQueueDeleteOk(QueueDeleteOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchQueuePurgeOk(QueuePurgeOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchStreamCancelOk(StreamCancelOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchStreamConsumeOk(StreamConsumeOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchStreamDeliver(StreamDeliverBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchStreamQosOk(StreamQosOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchStreamReturn(StreamReturnBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchTxCommitOk(TxCommitOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchTxRollbackOk(TxRollbackOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchTxSelectOk(TxSelectOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+
+ public boolean dispatchConnectionSecureOk(ConnectionSecureOkBody body, int channelId) throws AMQException
+ {
+ _connectionSecureOkMethodHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchConnectionStartOk(ConnectionStartOkBody body, int channelId) throws AMQException
+ {
+ _connectionStartOkMethodHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchConnectionTuneOk(ConnectionTuneOkBody body, int channelId) throws AMQException
+ {
+ _connectionTuneOkMethodHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchDtxSelect(DtxSelectBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchDtxStart(DtxStartBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchExchangeBound(ExchangeBoundBody body, int channelId) throws AMQException
+ {
+ _exchangeBoundHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchExchangeDeclare(ExchangeDeclareBody body, int channelId) throws AMQException
+ {
+ _exchangeDeclareHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchExchangeDelete(ExchangeDeleteBody body, int channelId) throws AMQException
+ {
+ _exchangeDeleteHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchFileAck(FileAckBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchFileCancel(FileCancelBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchFileConsume(FileConsumeBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchFilePublish(FilePublishBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchFileQos(FileQosBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchFileReject(FileRejectBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchQueueBind(QueueBindBody body, int channelId) throws AMQException
+ {
+ _queueBindHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchQueueDeclare(QueueDeclareBody body, int channelId) throws AMQException
+ {
+ _queueDeclareHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchQueueDelete(QueueDeleteBody body, int channelId) throws AMQException
+ {
+ _queueDeleteHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchQueuePurge(QueuePurgeBody body, int channelId) throws AMQException
+ {
+ _queuePurgeHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchStreamCancel(StreamCancelBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchStreamConsume(StreamConsumeBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchStreamPublish(StreamPublishBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchStreamQos(StreamQosBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTunnelRequest(TunnelRequestBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTxCommit(TxCommitBody body, int channelId) throws AMQException
+ {
+ _txCommitHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchTxRollback(TxRollbackBody body, int channelId) throws AMQException
+ {
+ _txRollbackHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchTxSelect(TxSelectBody body, int channelId) throws AMQException
+ {
+ _txSelectHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+
+
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_9.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_9.java
new file mode 100644
index 0000000000..8b1dca77ba
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_9.java
@@ -0,0 +1,164 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+
+import org.apache.qpid.framing.amqp_0_9.MethodDispatcher_0_9;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.AMQException;
+
+
+
+public class ServerMethodDispatcherImpl_0_9
+ extends ServerMethodDispatcherImpl
+ implements MethodDispatcher_0_9
+
+{
+
+ private static final BasicRecoverSyncMethodHandler _basicRecoverSyncMethodHandler =
+ BasicRecoverSyncMethodHandler.getInstance();
+ private static final QueueUnbindHandler _queueUnbindHandler =
+ QueueUnbindHandler.getInstance();
+
+
+ public ServerMethodDispatcherImpl_0_9(AMQStateManager stateManager)
+ {
+ super(stateManager);
+ }
+
+ public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException
+ {
+ _basicRecoverSyncMethodHandler.methodReceived(getStateManager(), body, channelId);
+ return true;
+ }
+
+ public boolean dispatchBasicRecoverSyncOk(BasicRecoverSyncOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchChannelOk(ChannelOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchChannelPing(ChannelPingBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchChannelPong(ChannelPongBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchChannelResume(ChannelResumeBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageAppend(MessageAppendBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageCancel(MessageCancelBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageCheckpoint(MessageCheckpointBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageClose(MessageCloseBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageConsume(MessageConsumeBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageEmpty(MessageEmptyBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageGet(MessageGetBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageOffset(MessageOffsetBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageOk(MessageOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageOpen(MessageOpenBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageQos(MessageQosBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageRecover(MessageRecoverBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageReject(MessageRejectBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageResume(MessageResumeBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageTransfer(MessageTransferBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchQueueUnbind(QueueUnbindBody body, int channelId) throws AMQException
+ {
+ _queueUnbindHandler.methodReceived(getStateManager(),body,channelId);
+ return true;
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_91.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_91.java
new file mode 100644
index 0000000000..32cd4c4e9f
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_91.java
@@ -0,0 +1,168 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_0_91.MethodDispatcher_0_91;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.AMQException;
+
+
+public class ServerMethodDispatcherImpl_0_91
+ extends ServerMethodDispatcherImpl
+ implements MethodDispatcher_0_91
+
+{
+
+ private static final BasicRecoverSyncMethodHandler _basicRecoverSyncMethodHandler =
+ BasicRecoverSyncMethodHandler.getInstance();
+ private static final QueueUnbindHandler _queueUnbindHandler =
+ QueueUnbindHandler.getInstance();
+
+
+ public ServerMethodDispatcherImpl_0_91(AMQStateManager stateManager)
+ {
+ super(stateManager);
+ }
+
+ public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException
+ {
+ _basicRecoverSyncMethodHandler.methodReceived(getStateManager(), body, channelId);
+ return true;
+ }
+
+ public boolean dispatchBasicRecoverSyncOk(BasicRecoverSyncOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchChannelOk(ChannelOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchChannelPing(ChannelPingBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchChannelPong(ChannelPongBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchChannelResume(ChannelResumeBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageAppend(MessageAppendBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageCancel(MessageCancelBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageCheckpoint(MessageCheckpointBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageClose(MessageCloseBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageConsume(MessageConsumeBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageEmpty(MessageEmptyBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageGet(MessageGetBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageOffset(MessageOffsetBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageOk(MessageOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageOpen(MessageOpenBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageQos(MessageQosBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageRecover(MessageRecoverBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageReject(MessageRejectBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageResume(MessageResumeBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageTransfer(MessageTransferBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchBasicRecoverOk(BasicRecoverOkBody body, int channelId) throws AMQException
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchQueueUnbind(QueueUnbindBody body, int channelId) throws AMQException
+ {
+ _queueUnbindHandler.methodReceived(getStateManager(),body,channelId);
+ return true;
+ }
+} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_8_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_8_0.java
new file mode 100644
index 0000000000..d599ca3d4e
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_8_0.java
@@ -0,0 +1,86 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.framing.amqp_8_0.MethodDispatcher_8_0;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.AMQException;
+
+public class ServerMethodDispatcherImpl_8_0
+ extends ServerMethodDispatcherImpl
+ implements MethodDispatcher_8_0
+{
+ public ServerMethodDispatcherImpl_8_0(AMQStateManager stateManager)
+ {
+ super(stateManager);
+ }
+
+ public boolean dispatchBasicRecoverOk(BasicRecoverOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchChannelAlert(ChannelAlertBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchTestContent(TestContentBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTestContentOk(TestContentOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTestInteger(TestIntegerBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTestIntegerOk(TestIntegerOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTestString(TestStringBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTestStringOk(TestStringOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTestTable(TestTableBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTestTableOk(TestTableOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
new file mode 100644
index 0000000000..abd2bccc8d
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.TxCommitBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+
+public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody>
+{
+ private static final Logger _log = Logger.getLogger(TxCommitHandler.class);
+
+ private static TxCommitHandler _instance = new TxCommitHandler();
+
+ public static TxCommitHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private TxCommitHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, TxCommitBody body, int channelId) throws AMQException
+ {
+ AMQProtocolSession session = stateManager.getProtocolSession();
+
+ try
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Commit received on channel " + channelId);
+ }
+ AMQChannel channel = session.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId);
+ }
+ channel.commit();
+
+ MethodRegistry methodRegistry = session.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createTxCommitOkBody();
+ session.writeFrame(responseBody.generateFrame(channelId));
+
+ }
+ catch (AMQException e)
+ {
+ throw body.getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage());
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
new file mode 100644
index 0000000000..4643dee0a3
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
@@ -0,0 +1,82 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.TxRollbackBody;
+import org.apache.qpid.framing.TxRollbackOkBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+
+public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBody>
+{
+ private static TxRollbackHandler _instance = new TxRollbackHandler();
+
+ public static TxRollbackHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private TxRollbackHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, TxRollbackBody body, final int channelId) throws AMQException
+ {
+ final AMQProtocolSession session = stateManager.getProtocolSession();
+
+ try
+ {
+ AMQChannel channel = session.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId);
+ }
+
+
+
+ final MethodRegistry methodRegistry = session.getMethodRegistry();
+ final AMQMethodBody responseBody = methodRegistry.createTxRollbackOkBody();
+
+ Runnable task = new Runnable()
+ {
+
+ public void run()
+ {
+ session.writeFrame(responseBody.generateFrame(channelId));
+ }
+ };
+
+ channel.rollback(task);
+
+ }
+ catch (AMQException e)
+ {
+ throw body.getChannelException(e.getErrorCode(), "Failed to rollback: " + e.getMessage());
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
new file mode 100644
index 0000000000..308f5b73cf
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.TxSelectBody;
+import org.apache.qpid.framing.TxSelectOkBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.AMQChannel;
+
+public class TxSelectHandler implements StateAwareMethodListener<TxSelectBody>
+{
+ private static TxSelectHandler _instance = new TxSelectHandler();
+
+ public static TxSelectHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private TxSelectHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, TxSelectBody body, int channelId) throws AMQException
+ {
+ AMQProtocolSession session = stateManager.getProtocolSession();
+
+ AMQChannel channel = session.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId);
+ }
+
+ channel.setLocalTransactional();
+
+ MethodRegistry methodRegistry = session.getMethodRegistry();
+ TxSelectOkBody responseBody = methodRegistry.createTxSelectOkBody();
+ session.writeFrame(responseBody.generateFrame(channelId));
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/UnexpectedMethodException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/UnexpectedMethodException.java
new file mode 100644
index 0000000000..3526fdcae5
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/UnexpectedMethodException.java
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.AMQException;
+
+public class UnexpectedMethodException extends AMQException
+{
+
+ private static final long serialVersionUID = -255921574946294892L;
+
+ public UnexpectedMethodException(AMQMethodBody body)
+ {
+ super("Unexpected method recevied: " + body.getClass().getName());
+ }
+}