diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-02-16 13:03:11 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-02-16 13:03:11 +0000 |
commit | b90aa16c3fd5f7ef66bf97f4a86045ff3c8f7f32 (patch) | |
tree | 56eab58037f302659d1257350002f01773165f45 | |
parent | 267709b3bb825f6cfb652510307687bc67f3dbb0 (diff) | |
download | qpid-python-b90aa16c3fd5f7ef66bf97f4a86045ff3c8f7f32.tar.gz |
QPID-373 Queue|Exchange}{Bind|Declare} should be synchronous to correctly receive/handle error
Updated AMQSession to be synchronous
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@508382 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 181 |
1 files changed, 92 insertions, 89 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 94a004ffd0..c35c067bf6 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -58,6 +58,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.AMQInvalidSelectorException; import org.apache.qpid.AMQUndeliveredException; +import org.apache.qpid.AMQInvalidRoutingKeyException; import org.apache.qpid.client.failover.FailoverSupport; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.JMSBytesMessage; @@ -98,6 +99,8 @@ import org.apache.qpid.framing.TxCommitBody; import org.apache.qpid.framing.TxCommitOkBody; import org.apache.qpid.framing.TxRollbackBody; import org.apache.qpid.framing.TxRollbackOkBody; +import org.apache.qpid.framing.QueueBindOkBody; +import org.apache.qpid.framing.QueueDeclareOkBody; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; @@ -129,44 +132,32 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private AtomicBoolean _startedAtLeastOnce = new AtomicBoolean(false); /** - * Used to reference durable subscribers so they requests for unsubscribe can be handled - * correctly. Note this only keeps a record of subscriptions which have been created - * in the current instance. It does not remember subscriptions between executions of the - * client + * Used to reference durable subscribers so they requests for unsubscribe can be handled correctly. Note this only + * keeps a record of subscriptions which have been created in the current instance. It does not remember + * subscriptions between executions of the client */ private final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions = new ConcurrentHashMap<String, TopicSubscriberAdaptor>(); private final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap = new ConcurrentHashMap<BasicMessageConsumer, String>(); - /** - * Used in the consume method. We generate the consume tag on the client so that we can use the nowait - * feature. - */ + /** Used in the consume method. We generate the consume tag on the client so that we can use the nowait feature. */ private int _nextTag = 1; - /** - * This queue is bounded and is used to store messages before being dispatched to the consumer - */ + /** This queue is bounded and is used to store messages before being dispatched to the consumer */ private final FlowControllingBlockingQueue _queue; private Dispatcher _dispatcher; private MessageFactoryRegistry _messageFactoryRegistry; - /** - * Set of all producers created by this session - */ + /** Set of all producers created by this session */ private Map _producers = new ConcurrentHashMap(); - /** - * Maps from consumer tag (String) to JMSMessageConsumer instance - */ + /** Maps from consumer tag (String) to JMSMessageConsumer instance */ private Map<AMQShortString, BasicMessageConsumer> _consumers = new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>(); - /** - * Maps from destination to count of JMSMessageConsumers - */ + /** Maps from destination to count of JMSMessageConsumers */ private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount = new ConcurrentHashMap<Destination, AtomicInteger>(); @@ -183,18 +174,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi protected static final boolean DEFAULT_MANDATORY = true; /** - * The counter of the next producer id. This id is generated by the session and used only to allow the - * producer to identify itself to the session when deregistering itself. - * <p/> - * Access to this id does not require to be synchronized since according to the JMS specification only one - * thread of control is allowed to create producers for any given session instance. + * The counter of the next producer id. This id is generated by the session and used only to allow the producer to + * identify itself to the session when deregistering itself. <p/> Access to this id does not require to be + * synchronized since according to the JMS specification only one thread of control is allowed to create producers + * for any given session instance. */ private long _nextProducerId; /** - * Set when recover is called. This is to handle the case where recover() is called by application code - * during onMessage() processing. We need to make sure we do not send an auto ack if recover was called. + * Set when recover is called. This is to handle the case where recover() is called by application code during + * onMessage() processing. We need to make sure we do not send an auto ack if recover was called. */ private boolean _inRecovery; @@ -203,16 +193,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private boolean _hasMessageListeners; - /** - * Responsible for decoding a message fragment and passing it to the appropriate message consumer. - */ + /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ private class Dispatcher extends Thread { - /** - * Track the 'stopped' state of the dispatcher, a session starts in the stopped state. - */ + /** Track the 'stopped' state of the dispatcher, a session starts in the stopped state. */ private final AtomicBoolean _closed = new AtomicBoolean(false); private final Object _lock = new Object(); @@ -303,16 +289,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi message.getContentHeader(), message.getBodies()); - int errorCode = message.getBounceBody().replyCode; + AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().replyCode); AMQShortString reason = message.getBounceBody().replyText; _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); //@TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions. - if (errorCode == AMQConstant.NO_CONSUMERS.getCode()) + if (errorCode == AMQConstant.NO_CONSUMERS) { _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage)); } - else if (errorCode == AMQConstant.NO_ROUTE.getCode()) + else if (errorCode == (AMQConstant.NO_ROUTE) { _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage)); } @@ -649,8 +635,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } /** - * Called when the server initiates the closure of the session - * unilaterally. + * Called when the server initiates the closure of the session unilaterally. * * @param e the exception that caused this session to be closed. Null causes the */ @@ -676,10 +661,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } /** - * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after - * failover when the client has veoted resubscription. - * <p/> - * The caller of this method must already hold the failover mutex. + * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after failover + * when the client has veoted resubscription. <p/> The caller of this method must already hold the failover mutex. */ void markClosed() { @@ -920,7 +903,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * Creates a QueueReceiver * * @param destination + * * @return QueueReceiver - a wrapper around our MessageConsumer + * * @throws JMSException */ public QueueReceiver createQueueReceiver(Destination destination) throws JMSException @@ -936,7 +921,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * * @param destination * @param messageSelector + * * @return QueueReceiver - a wrapper around our MessageConsumer + * * @throws JMSException */ public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException @@ -1105,6 +1092,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi ex.setLinkedException(ise); throw ex; } + catch (AMQInvalidRoutingKeyException e) + { + throw new InvalidDestinationException(amqd.getRoutingKey().toString()); + } catch (AMQException e) { JMSException ex = new JMSException("Error registering consumer: " + e); @@ -1155,7 +1146,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } - public void declareExchange(AMQShortString name, AMQShortString type) + public void declareExchange(AMQShortString name, AMQShortString type) throws AMQException { declareExchange(name, type, getProtocolHandler()); } @@ -1177,12 +1168,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi getProtocolHandler().syncWrite(frame, ExchangeDeclareOkBody.class); } - private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler) + private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException { declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler); } - private void declareExchange(AMQShortString name, AMQShortString type, AMQProtocolHandler protocolHandler) + private void declareExchange(AMQShortString name, AMQShortString type, AMQProtocolHandler protocolHandler) throws AMQException { // TODO: Be aware of possible changes to parameter order as versions change. AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId, @@ -1192,11 +1183,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi false, // durable name, // exchange false, // internal - true, // nowait + false, // nowait false, // passive getTicket(), // ticket type); // type - protocolHandler.writeFrame(exchangeDeclare); + + protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); } /** @@ -1204,7 +1196,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * * @param amqd * @param protocolHandler + * * @return the queue name. This is useful where the broker is generating a queue name on behalf of the client. + * * @throws AMQException */ private AMQShortString declareQueue(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException @@ -1217,6 +1211,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi amqd.setQueueName(protocolHandler.generateQueueName()); } + //TODO verify the destiation is valid. else throw + // TODO: Be aware of possible changes to parameter order as versions change. AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) @@ -1224,12 +1220,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi amqd.isAutoDelete(), // autoDelete amqd.isDurable(), // durable amqd.isExclusive(), // exclusive - true, // nowait + false, // nowait false, // passive amqd.getAMQQueueName(), // queue getTicket()); // ticket - protocolHandler.writeFrame(queueDeclare); + protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class); return amqd.getAMQQueueName(); } @@ -1240,18 +1236,20 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) ft, // arguments amqd.getExchangeName(), // exchange - true, // nowait + false, // nowait queueName, // queue amqd.getRoutingKey(), // routingKey getTicket()); // ticket - protocolHandler.writeFrame(queueBind); + + protocolHandler.syncWrite(queueBind, QueueBindOkBody.class); } /** * Register to consume from the queue. * * @param queueName + * * @return the consumer tag generated by the broker */ private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, @@ -1336,7 +1334,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * Creates a QueueReceiver wrapping a MessageConsumer * * @param queue + * * @return QueueReceiver + * * @throws JMSException */ public QueueReceiver createReceiver(Queue queue) throws JMSException @@ -1352,7 +1352,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * * @param queue * @param messageSelector + * * @return QueueReceiver + * * @throws JMSException */ public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException @@ -1399,7 +1401,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * Creates a non-durable subscriber * * @param topic + * * @return TopicSubscriber - a wrapper round our MessageConsumer + * * @throws JMSException */ public TopicSubscriber createSubscriber(Topic topic) throws JMSException @@ -1416,7 +1420,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @param topic * @param messageSelector * @param noLocal + * * @return TopicSubscriber - a wrapper round our MessageConsumer + * * @throws JMSException */ public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException @@ -1493,9 +1499,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - /** - * Note, currently this does not handle reuse of the same name with different topics correctly. - */ + /** Note, currently this does not handle reuse of the same name with different topics correctly. */ public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException { @@ -1606,8 +1610,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } /** - * Invoked by the MINA IO thread (indirectly) when a message is received from the transport. - * Puts the message onto the queue read by the dispatcher. + * Invoked by the MINA IO thread (indirectly) when a message is received from the transport. Puts the message onto + * the queue read by the dispatcher. * * @param message the message that has been received */ @@ -1622,13 +1626,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } /** - * Acknowledge a message or several messages. This method can be called via AbstractJMSMessage or from - * a BasicConsumer. The former where the mode is CLIENT_ACK and the latter where the mode is - * AUTO_ACK or similar. + * Acknowledge a message or several messages. This method can be called via AbstractJMSMessage or from a + * BasicConsumer. The former where the mode is CLIENT_ACK and the latter where the mode is AUTO_ACK or similar. * * @param deliveryTag the tag of the last message to be acknowledged - * @param multiple if true will acknowledge all messages up to and including the one specified by the - * delivery tag + * @param multiple if true will acknowledge all messages up to and including the one specified by the delivery + * tag */ public void acknowledgeMessage(long deliveryTag, boolean multiple) { @@ -1710,7 +1713,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { //stop the server delivering messages to this session suspendChannel(); - + if (_dispatcher != null) { _dispatcher.setConnectionStopped(true); @@ -1721,6 +1724,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * Callers must hold the failover mutex before calling this method. * * @param consumer + * * @throws AMQException */ void registerConsumer(BasicMessageConsumer consumer, boolean nowait) throws AMQException @@ -1746,8 +1750,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } /** - * Called by the MessageConsumer when closing, to deregister the consumer from the - * map from consumerTag to consumer instance. + * Called by the MessageConsumer when closing, to deregister the consumer from the map from consumerTag to consumer + * instance. * * @param consumer the consum */ @@ -1883,7 +1887,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } - public int getTicket() { return _ticket; @@ -1898,30 +1901,30 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void requestAccess(AMQShortString realm, boolean exclusive, boolean passive, boolean active, boolean write, boolean read) throws AMQException { getProtocolHandler().writeCommandFrameAndWaitForReply(AccessRequestBody.createAMQFrame(getChannelId(), - getProtocolMajorVersion(), - getProtocolMinorVersion(), - active, - exclusive, - passive, - read, - realm, - write), - new BlockingMethodFrameListener(_channelId) - { - - public boolean processMethod(int channelId, AMQMethodBody frame) throws AMQException - { - if(frame instanceof AccessRequestOkBody) - { - setTicket(((AccessRequestOkBody)frame).getTicket()); - return true; - } - else - { - return false; - } - } - }); + getProtocolMajorVersion(), + getProtocolMinorVersion(), + active, + exclusive, + passive, + read, + realm, + write), + new BlockingMethodFrameListener(_channelId) + { + + public boolean processMethod(int channelId, AMQMethodBody frame) throws AMQException + { + if (frame instanceof AccessRequestOkBody) + { + setTicket(((AccessRequestOkBody) frame).getTicket()); + return true; + } + else + { + return false; + } + } + }); } |