diff options
author | Aidan Skinner <aidan@apache.org> | 2008-06-26 10:25:36 +0000 |
---|---|---|
committer | Aidan Skinner <aidan@apache.org> | 2008-06-26 10:25:36 +0000 |
commit | 1e311aa245383ee1c9dc71116ecd746ff5e80ade (patch) | |
tree | 91adb6ea3f693884aec562cea45d156480fe8a45 | |
parent | 39dd5d83afcb6c32f82fbb414caa717423365dca (diff) | |
download | qpid-python-1e311aa245383ee1c9dc71116ecd746ff5e80ade.tar.gz |
QPID-854 QPID-999 : Merge Changes to the client to make the dispatcher responsible for closing the queue browser when all the messages have been processed.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@671845 13f79535-47bb-0310-9956-ffa450edef68
3 files changed, 231 insertions, 3 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 75e8cbe155..6d386f31e6 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -802,6 +802,10 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess deregisterConsumer(consumer); } + else + { + _queue.add(new UnprocessedMessage.CloseConsumerMessage(consumer)); + } } } } @@ -1150,6 +1154,13 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess public StreamMessage createStreamMessage() throws JMSException { + // This method needs to be improved. Throwables only arrive here from the mina : exceptionRecived + // calls through connection.closeAllSessions which is also called by the public connection.close() + // with a null cause + // When we are closing the Session due to a protocol session error we simply create a new AMQException + // with the correct error code and text this is cleary WRONG as the instanceof check below will fail. + // We need to determin here if the connection should be + synchronized (_connection.getFailoverMutex()) { checkNotClosed(); @@ -1599,7 +1610,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); } - else + else // Queue Browser { if (isQueueBound(getDefaultTopicExchangeName(), AMQTopic.getDurableTopicQueueName(name, _connection))) @@ -2778,7 +2789,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess _lock.wait(); } - if (tagLE(deliveryTag, _rollbackMark.get())) + if (!(message instanceof UnprocessedMessage.CloseConsumerMessage) + && tagLE(deliveryTag, _rollbackMark.get())) { rejectMessage(message, true); } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 727b08a812..47250c0f60 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -507,6 +507,12 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me throw e; } + else if (o instanceof UnprocessedMessage.CloseConsumerMessage) + { + _closed.set(true); + deregisterConsumer(); + return null; + } else { return (AbstractJMSMessage) o; @@ -561,6 +567,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me } else { + // FIXME: wow this is ugly // //fixme this probably is not right // if (!isNoConsume()) { // done in BasicCancelOK Handler but not sending one so just deregister. @@ -615,6 +622,36 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me } /** + * @param closeMessage + * this message signals that we should close the browser + */ + public void notifyCloseMessage(UnprocessedMessage.CloseConsumerMessage closeMessage) + { + if (isMessageListenerSet()) + { + // Currently only possible to get this msg type with a browser. + // If we get the message here then we should probably just close + // this consumer. + // Though an AutoClose consumer with message listener is quite odd.. + // Just log out the fact so we know where we are + _logger.warn("Using an AutoCloseconsumer with message listener is not supported."); + } + else + { + try + { + _synchronousQueue.put(closeMessage); + } + catch (InterruptedException e) + { + _logger.info(" SynchronousQueue.put interupted. Usually result of connection closing," + + "but we shouldn't have close yet"); + } + } + } + + + /** * Called from the AMQSession when a message has arrived for this consumer. This methods handles both the case of a * message listener or a synchronous receive() caller. * @@ -622,6 +659,12 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me */ void notifyMessage(UnprocessedMessage messageFrame) { + if (messageFrame instanceof UnprocessedMessage.CloseConsumerMessage) + { + notifyCloseMessage((UnprocessedMessage.CloseConsumerMessage) messageFrame); + return; + } + final boolean debug = _logger.isDebugEnabled(); if (debug) diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java index 17efd7e24f..076d87cbd1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java @@ -107,4 +107,177 @@ public abstract class UnprocessedMessage<H,B> { return ""; } -} + + public static final class CloseConsumerMessage extends UnprocessedMessage + { + AMQShortString _consumerTag; + + public CloseConsumerMessage(int channelId, long deliveryId, AMQShortString consumerTag, + AMQShortString exchange, AMQShortString routingKey, boolean redelivered) + { + super(channelId, deliveryId, consumerTag, exchange, routingKey, redelivered); + _consumerTag = consumerTag; + } + + public CloseConsumerMessage(BasicMessageConsumer consumer) + { + this(0, 0, consumer.getConsumerTag(), null, null, false); + } + + public BasicDeliverBody getDeliverBody() + { + return new BasicDeliverBody() + { + + public AMQShortString getConsumerTag() + { + return _consumerTag; + } + + @Override + public long getDeliveryTag() + { + return 0; + } + + @Override + public AMQShortString getExchange() + { + return null; + } + + @Override + public boolean getRedelivered() + { + return false; + } + + @Override + public AMQShortString getRoutingKey() + { + return null; + } + + @Override + public boolean execute(MethodDispatcher methodDispatcher, int channelId) throws AMQException + { + return false; + } + + @Override + public AMQFrame generateFrame(int channelId) + { + return null; + } + + @Override + public AMQChannelException getChannelException(AMQConstant code, String message) + { + return null; + } + + @Override + public AMQChannelException getChannelException(AMQConstant code, String message, Throwable cause) + { + return null; + } + + @Override + public AMQChannelException getChannelNotFoundException(int channelId) + { + return null; + } + + @Override + public int getClazz() + { + return 0; + } + + @Override + public AMQConnectionException getConnectionException(AMQConstant code, String message) + { + return null; + } + + @Override + public AMQConnectionException getConnectionException(AMQConstant code, String message, + Throwable cause) + { + return null; + } + + @Override + public byte getMajor() + { + return 0; + } + + @Override + public int getMethod() + { + return 0; + } + + @Override + public byte getMinor() + { + return 0; + } + + @Override + public int getSize() + { + return 0; + } + + @Override + public void writeMethodPayload(ByteBuffer buffer) + { + } + + @Override + public void writePayload(ByteBuffer buffer) + { + } + + @Override + public byte getFrameType() + { + return 0; + } + + @Override + public void handle(int channelId, AMQVersionAwareProtocolSession amqMinaProtocolSession) + throws AMQException + { + + } + }; + } + + @Override + public List getBodies() + { + return null; + } + + @Override + public Object getContentHeader() + { + return null; + } + + @Override + public void receiveBody(Object nativeMessageBody) + { + + } + + @Override + public void setContentHeader(Object nativeMessageHeader) + { + + } + } +}
\ No newline at end of file |