summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-02-16 13:03:11 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-02-16 13:03:11 +0000
commitb90aa16c3fd5f7ef66bf97f4a86045ff3c8f7f32 (patch)
tree56eab58037f302659d1257350002f01773165f45
parent267709b3bb825f6cfb652510307687bc67f3dbb0 (diff)
downloadqpid-python-b90aa16c3fd5f7ef66bf97f4a86045ff3c8f7f32.tar.gz
QPID-373 Queue|Exchange}{Bind|Declare} should be synchronous to correctly receive/handle error
Updated AMQSession to be synchronous git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@508382 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java181
1 files changed, 92 insertions, 89 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 94a004ffd0..c35c067bf6 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -58,6 +58,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidSelectorException;
import org.apache.qpid.AMQUndeliveredException;
+import org.apache.qpid.AMQInvalidRoutingKeyException;
import org.apache.qpid.client.failover.FailoverSupport;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.JMSBytesMessage;
@@ -98,6 +99,8 @@ import org.apache.qpid.framing.TxCommitBody;
import org.apache.qpid.framing.TxCommitOkBody;
import org.apache.qpid.framing.TxRollbackBody;
import org.apache.qpid.framing.TxRollbackOkBody;
+import org.apache.qpid.framing.QueueBindOkBody;
+import org.apache.qpid.framing.QueueDeclareOkBody;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -129,44 +132,32 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private AtomicBoolean _startedAtLeastOnce = new AtomicBoolean(false);
/**
- * Used to reference durable subscribers so they requests for unsubscribe can be handled
- * correctly. Note this only keeps a record of subscriptions which have been created
- * in the current instance. It does not remember subscriptions between executions of the
- * client
+ * Used to reference durable subscribers so they requests for unsubscribe can be handled correctly. Note this only
+ * keeps a record of subscriptions which have been created in the current instance. It does not remember
+ * subscriptions between executions of the client
*/
private final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions =
new ConcurrentHashMap<String, TopicSubscriberAdaptor>();
private final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap =
new ConcurrentHashMap<BasicMessageConsumer, String>();
- /**
- * Used in the consume method. We generate the consume tag on the client so that we can use the nowait
- * feature.
- */
+ /** Used in the consume method. We generate the consume tag on the client so that we can use the nowait feature. */
private int _nextTag = 1;
- /**
- * This queue is bounded and is used to store messages before being dispatched to the consumer
- */
+ /** This queue is bounded and is used to store messages before being dispatched to the consumer */
private final FlowControllingBlockingQueue _queue;
private Dispatcher _dispatcher;
private MessageFactoryRegistry _messageFactoryRegistry;
- /**
- * Set of all producers created by this session
- */
+ /** Set of all producers created by this session */
private Map _producers = new ConcurrentHashMap();
- /**
- * Maps from consumer tag (String) to JMSMessageConsumer instance
- */
+ /** Maps from consumer tag (String) to JMSMessageConsumer instance */
private Map<AMQShortString, BasicMessageConsumer> _consumers = new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
- /**
- * Maps from destination to count of JMSMessageConsumers
- */
+ /** Maps from destination to count of JMSMessageConsumers */
private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount =
new ConcurrentHashMap<Destination, AtomicInteger>();
@@ -183,18 +174,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
protected static final boolean DEFAULT_MANDATORY = true;
/**
- * The counter of the next producer id. This id is generated by the session and used only to allow the
- * producer to identify itself to the session when deregistering itself.
- * <p/>
- * Access to this id does not require to be synchronized since according to the JMS specification only one
- * thread of control is allowed to create producers for any given session instance.
+ * The counter of the next producer id. This id is generated by the session and used only to allow the producer to
+ * identify itself to the session when deregistering itself. <p/> Access to this id does not require to be
+ * synchronized since according to the JMS specification only one thread of control is allowed to create producers
+ * for any given session instance.
*/
private long _nextProducerId;
/**
- * Set when recover is called. This is to handle the case where recover() is called by application code
- * during onMessage() processing. We need to make sure we do not send an auto ack if recover was called.
+ * Set when recover is called. This is to handle the case where recover() is called by application code during
+ * onMessage() processing. We need to make sure we do not send an auto ack if recover was called.
*/
private boolean _inRecovery;
@@ -203,16 +193,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private boolean _hasMessageListeners;
- /**
- * Responsible for decoding a message fragment and passing it to the appropriate message consumer.
- */
+ /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
private class Dispatcher extends Thread
{
- /**
- * Track the 'stopped' state of the dispatcher, a session starts in the stopped state.
- */
+ /** Track the 'stopped' state of the dispatcher, a session starts in the stopped state. */
private final AtomicBoolean _closed = new AtomicBoolean(false);
private final Object _lock = new Object();
@@ -303,16 +289,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
message.getContentHeader(),
message.getBodies());
- int errorCode = message.getBounceBody().replyCode;
+ AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().replyCode);
AMQShortString reason = message.getBounceBody().replyText;
_logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
//@TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
- if (errorCode == AMQConstant.NO_CONSUMERS.getCode())
+ if (errorCode == AMQConstant.NO_CONSUMERS)
{
_connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage));
}
- else if (errorCode == AMQConstant.NO_ROUTE.getCode())
+ else if (errorCode == (AMQConstant.NO_ROUTE)
{
_connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage));
}
@@ -649,8 +635,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
/**
- * Called when the server initiates the closure of the session
- * unilaterally.
+ * Called when the server initiates the closure of the session unilaterally.
*
* @param e the exception that caused this session to be closed. Null causes the
*/
@@ -676,10 +661,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
/**
- * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after
- * failover when the client has veoted resubscription.
- * <p/>
- * The caller of this method must already hold the failover mutex.
+ * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after failover
+ * when the client has veoted resubscription. <p/> The caller of this method must already hold the failover mutex.
*/
void markClosed()
{
@@ -920,7 +903,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* Creates a QueueReceiver
*
* @param destination
+ *
* @return QueueReceiver - a wrapper around our MessageConsumer
+ *
* @throws JMSException
*/
public QueueReceiver createQueueReceiver(Destination destination) throws JMSException
@@ -936,7 +921,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*
* @param destination
* @param messageSelector
+ *
* @return QueueReceiver - a wrapper around our MessageConsumer
+ *
* @throws JMSException
*/
public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException
@@ -1105,6 +1092,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
ex.setLinkedException(ise);
throw ex;
}
+ catch (AMQInvalidRoutingKeyException e)
+ {
+ throw new InvalidDestinationException(amqd.getRoutingKey().toString());
+ }
catch (AMQException e)
{
JMSException ex = new JMSException("Error registering consumer: " + e);
@@ -1155,7 +1146,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
- public void declareExchange(AMQShortString name, AMQShortString type)
+ public void declareExchange(AMQShortString name, AMQShortString type) throws AMQException
{
declareExchange(name, type, getProtocolHandler());
}
@@ -1177,12 +1168,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
getProtocolHandler().syncWrite(frame, ExchangeDeclareOkBody.class);
}
- private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler)
+ private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException
{
declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler);
}
- private void declareExchange(AMQShortString name, AMQShortString type, AMQProtocolHandler protocolHandler)
+ private void declareExchange(AMQShortString name, AMQShortString type, AMQProtocolHandler protocolHandler) throws AMQException
{
// TODO: Be aware of possible changes to parameter order as versions change.
AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId,
@@ -1192,11 +1183,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
false, // durable
name, // exchange
false, // internal
- true, // nowait
+ false, // nowait
false, // passive
getTicket(), // ticket
type); // type
- protocolHandler.writeFrame(exchangeDeclare);
+
+ protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
}
/**
@@ -1204,7 +1196,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*
* @param amqd
* @param protocolHandler
+ *
* @return the queue name. This is useful where the broker is generating a queue name on behalf of the client.
+ *
* @throws AMQException
*/
private AMQShortString declareQueue(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException
@@ -1217,6 +1211,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
amqd.setQueueName(protocolHandler.generateQueueName());
}
+ //TODO verify the destiation is valid. else throw
+
// TODO: Be aware of possible changes to parameter order as versions change.
AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId,
getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
@@ -1224,12 +1220,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
amqd.isAutoDelete(), // autoDelete
amqd.isDurable(), // durable
amqd.isExclusive(), // exclusive
- true, // nowait
+ false, // nowait
false, // passive
amqd.getAMQQueueName(), // queue
getTicket()); // ticket
- protocolHandler.writeFrame(queueDeclare);
+ protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class);
return amqd.getAMQQueueName();
}
@@ -1240,18 +1236,20 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
ft, // arguments
amqd.getExchangeName(), // exchange
- true, // nowait
+ false, // nowait
queueName, // queue
amqd.getRoutingKey(), // routingKey
getTicket()); // ticket
- protocolHandler.writeFrame(queueBind);
+
+ protocolHandler.syncWrite(queueBind, QueueBindOkBody.class);
}
/**
* Register to consume from the queue.
*
* @param queueName
+ *
* @return the consumer tag generated by the broker
*/
private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler,
@@ -1336,7 +1334,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* Creates a QueueReceiver wrapping a MessageConsumer
*
* @param queue
+ *
* @return QueueReceiver
+ *
* @throws JMSException
*/
public QueueReceiver createReceiver(Queue queue) throws JMSException
@@ -1352,7 +1352,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*
* @param queue
* @param messageSelector
+ *
* @return QueueReceiver
+ *
* @throws JMSException
*/
public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException
@@ -1399,7 +1401,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* Creates a non-durable subscriber
*
* @param topic
+ *
* @return TopicSubscriber - a wrapper round our MessageConsumer
+ *
* @throws JMSException
*/
public TopicSubscriber createSubscriber(Topic topic) throws JMSException
@@ -1416,7 +1420,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* @param topic
* @param messageSelector
* @param noLocal
+ *
* @return TopicSubscriber - a wrapper round our MessageConsumer
+ *
* @throws JMSException
*/
public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException
@@ -1493,9 +1499,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
- /**
- * Note, currently this does not handle reuse of the same name with different topics correctly.
- */
+ /** Note, currently this does not handle reuse of the same name with different topics correctly. */
public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
throws JMSException
{
@@ -1606,8 +1610,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
/**
- * Invoked by the MINA IO thread (indirectly) when a message is received from the transport.
- * Puts the message onto the queue read by the dispatcher.
+ * Invoked by the MINA IO thread (indirectly) when a message is received from the transport. Puts the message onto
+ * the queue read by the dispatcher.
*
* @param message the message that has been received
*/
@@ -1622,13 +1626,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
/**
- * Acknowledge a message or several messages. This method can be called via AbstractJMSMessage or from
- * a BasicConsumer. The former where the mode is CLIENT_ACK and the latter where the mode is
- * AUTO_ACK or similar.
+ * Acknowledge a message or several messages. This method can be called via AbstractJMSMessage or from a
+ * BasicConsumer. The former where the mode is CLIENT_ACK and the latter where the mode is AUTO_ACK or similar.
*
* @param deliveryTag the tag of the last message to be acknowledged
- * @param multiple if true will acknowledge all messages up to and including the one specified by the
- * delivery tag
+ * @param multiple if true will acknowledge all messages up to and including the one specified by the delivery
+ * tag
*/
public void acknowledgeMessage(long deliveryTag, boolean multiple)
{
@@ -1710,7 +1713,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
//stop the server delivering messages to this session
suspendChannel();
-
+
if (_dispatcher != null)
{
_dispatcher.setConnectionStopped(true);
@@ -1721,6 +1724,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* Callers must hold the failover mutex before calling this method.
*
* @param consumer
+ *
* @throws AMQException
*/
void registerConsumer(BasicMessageConsumer consumer, boolean nowait) throws AMQException
@@ -1746,8 +1750,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
/**
- * Called by the MessageConsumer when closing, to deregister the consumer from the
- * map from consumerTag to consumer instance.
+ * Called by the MessageConsumer when closing, to deregister the consumer from the map from consumerTag to consumer
+ * instance.
*
* @param consumer the consum
*/
@@ -1883,7 +1887,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
-
public int getTicket()
{
return _ticket;
@@ -1898,30 +1901,30 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public void requestAccess(AMQShortString realm, boolean exclusive, boolean passive, boolean active, boolean write, boolean read) throws AMQException
{
getProtocolHandler().writeCommandFrameAndWaitForReply(AccessRequestBody.createAMQFrame(getChannelId(),
- getProtocolMajorVersion(),
- getProtocolMinorVersion(),
- active,
- exclusive,
- passive,
- read,
- realm,
- write),
- new BlockingMethodFrameListener(_channelId)
- {
-
- public boolean processMethod(int channelId, AMQMethodBody frame) throws AMQException
- {
- if(frame instanceof AccessRequestOkBody)
- {
- setTicket(((AccessRequestOkBody)frame).getTicket());
- return true;
- }
- else
- {
- return false;
- }
- }
- });
+ getProtocolMajorVersion(),
+ getProtocolMinorVersion(),
+ active,
+ exclusive,
+ passive,
+ read,
+ realm,
+ write),
+ new BlockingMethodFrameListener(_channelId)
+ {
+
+ public boolean processMethod(int channelId, AMQMethodBody frame) throws AMQException
+ {
+ if (frame instanceof AccessRequestOkBody)
+ {
+ setTicket(((AccessRequestOkBody) frame).getTicket());
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ });
}