summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2007-01-22 10:26:03 +0000
committerRobert Greig <rgreig@apache.org>2007-01-22 10:26:03 +0000
commit0420b0021e02254d161f027529b80991060d3cc8 (patch)
tree83ba65aa92d0de13654115f504a2ba952dc06bc0
parent56c9945fab16ae5c2df607b4705a79ba85128674 (diff)
downloadqpid-python-0420b0021e02254d161f027529b80991060d3cc8.tar.gz
QPID-275 : Patch supplied by Rob Godfrey - Add support for get / purge / qos size / default exchanges and some other small fixes highlighted by the python tests
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@498574 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java77
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java81
2 files changed, 158 insertions, 0 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
new file mode 100644
index 0000000000..1eb3152973
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
@@ -0,0 +1,77 @@
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.framing.*;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.ConsumerTagNotUniqueException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidSelectorException;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+
+public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetBody>
+{
+ private static final Logger _log = Logger.getLogger(BasicGetMethodHandler.class);
+
+ private static final BasicGetMethodHandler _instance = new BasicGetMethodHandler();
+
+ public static BasicGetMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private BasicGetMethodHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
+ ExchangeRegistry exchangeRegistry, AMQProtocolSession session,
+ AMQMethodEvent<BasicGetBody> evt) throws AMQException
+ {
+ BasicGetBody body = evt.getMethod();
+ final int channelId = evt.getChannelId();
+
+ AMQChannel channel = session.getChannel(channelId);
+ if (channel == null)
+ {
+ _log.error("Channel " + channelId + " not found");
+ // TODO: either alert or error that the
+ }
+ else
+ {
+ AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : queueRegistry.getQueue(body.queue);
+
+ if (queue == null)
+ {
+ _log.info("No queue for '" + body.queue + "'");
+ if(body.queue!=null)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_FOUND.getCode(),
+ "No such queue, '" + body.queue + "'");
+ }
+ else
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(),
+ "No queue name provided, no default queue defined.");
+ }
+ }
+ else
+ {
+ if(!queue.performGet(session, channel, !body.noAck))
+ {
+
+
+ // TODO - set clusterId
+ session.writeFrame(BasicGetEmptyBody.createAMQFrame(channelId, (byte) 8, (byte) 0, null));
+ }
+ }
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
new file mode 100644
index 0000000000..ad63d36351
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
@@ -0,0 +1,81 @@
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.framing.*;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+
+public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBody>
+{
+ private static final QueuePurgeHandler _instance = new QueuePurgeHandler();
+
+ public static QueuePurgeHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private final boolean _failIfNotFound;
+
+ public QueuePurgeHandler()
+ {
+ this(true);
+ }
+
+ public QueuePurgeHandler(boolean failIfNotFound)
+ {
+ _failIfNotFound = failIfNotFound;
+ }
+
+ public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueuePurgeBody> evt) throws AMQException
+ {
+ QueuePurgeBody body = evt.getMethod();
+ AMQQueue queue;
+ if(body.queue == null)
+ {
+ queue = session.getChannel(evt.getChannelId()).getDefaultQueue();
+ if(queue == null)
+ {
+ if(_failIfNotFound)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(),"No queue specified.");
+ }
+
+ }
+ }
+ else
+ {
+ queue = queues.getQueue(body.queue);
+ }
+
+ if(queue == null)
+ {
+ if(_failIfNotFound)
+ {
+ throw body.getChannelException(404, "Queue " + body.queue + " does not exist.");
+ }
+ }
+ else
+ {
+ long purged = queue.clearQueue(session.getChannel(evt.getChannelId()).getStoreContext());
+
+
+ if(!body.nowait)
+ {
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ session.writeFrame(QueuePurgeOkBody.createAMQFrame(evt.getChannelId(),
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ purged)); // messageCount
+ }
+ }
+ }
+}