summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java16
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java43
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java175
3 files changed, 231 insertions, 3 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 75e8cbe155..6d386f31e6 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -802,6 +802,10 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
deregisterConsumer(consumer);
}
+ else
+ {
+ _queue.add(new UnprocessedMessage.CloseConsumerMessage(consumer));
+ }
}
}
}
@@ -1150,6 +1154,13 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
public StreamMessage createStreamMessage() throws JMSException
{
+ // This method needs to be improved. Throwables only arrive here from the mina : exceptionRecived
+ // calls through connection.closeAllSessions which is also called by the public connection.close()
+ // with a null cause
+ // When we are closing the Session due to a protocol session error we simply create a new AMQException
+ // with the correct error code and text this is cleary WRONG as the instanceof check below will fail.
+ // We need to determin here if the connection should be
+
synchronized (_connection.getFailoverMutex())
{
checkNotClosed();
@@ -1599,7 +1610,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
}
- else
+ else // Queue Browser
{
if (isQueueBound(getDefaultTopicExchangeName(), AMQTopic.getDurableTopicQueueName(name, _connection)))
@@ -2778,7 +2789,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
_lock.wait();
}
- if (tagLE(deliveryTag, _rollbackMark.get()))
+ if (!(message instanceof UnprocessedMessage.CloseConsumerMessage)
+ && tagLE(deliveryTag, _rollbackMark.get()))
{
rejectMessage(message, true);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 727b08a812..47250c0f60 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -507,6 +507,12 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
throw e;
}
+ else if (o instanceof UnprocessedMessage.CloseConsumerMessage)
+ {
+ _closed.set(true);
+ deregisterConsumer();
+ return null;
+ }
else
{
return (AbstractJMSMessage) o;
@@ -561,6 +567,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
}
else
{
+ // FIXME: wow this is ugly
// //fixme this probably is not right
// if (!isNoConsume())
{ // done in BasicCancelOK Handler but not sending one so just deregister.
@@ -615,6 +622,36 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
}
/**
+ * @param closeMessage
+ * this message signals that we should close the browser
+ */
+ public void notifyCloseMessage(UnprocessedMessage.CloseConsumerMessage closeMessage)
+ {
+ if (isMessageListenerSet())
+ {
+ // Currently only possible to get this msg type with a browser.
+ // If we get the message here then we should probably just close
+ // this consumer.
+ // Though an AutoClose consumer with message listener is quite odd..
+ // Just log out the fact so we know where we are
+ _logger.warn("Using an AutoCloseconsumer with message listener is not supported.");
+ }
+ else
+ {
+ try
+ {
+ _synchronousQueue.put(closeMessage);
+ }
+ catch (InterruptedException e)
+ {
+ _logger.info(" SynchronousQueue.put interupted. Usually result of connection closing,"
+ + "but we shouldn't have close yet");
+ }
+ }
+ }
+
+
+ /**
* Called from the AMQSession when a message has arrived for this consumer. This methods handles both the case of a
* message listener or a synchronous receive() caller.
*
@@ -622,6 +659,12 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
*/
void notifyMessage(UnprocessedMessage messageFrame)
{
+ if (messageFrame instanceof UnprocessedMessage.CloseConsumerMessage)
+ {
+ notifyCloseMessage((UnprocessedMessage.CloseConsumerMessage) messageFrame);
+ return;
+ }
+
final boolean debug = _logger.isDebugEnabled();
if (debug)
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
index 17efd7e24f..076d87cbd1 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
@@ -107,4 +107,177 @@ public abstract class UnprocessedMessage<H,B>
{
return "";
}
-}
+
+ public static final class CloseConsumerMessage extends UnprocessedMessage
+ {
+ AMQShortString _consumerTag;
+
+ public CloseConsumerMessage(int channelId, long deliveryId, AMQShortString consumerTag,
+ AMQShortString exchange, AMQShortString routingKey, boolean redelivered)
+ {
+ super(channelId, deliveryId, consumerTag, exchange, routingKey, redelivered);
+ _consumerTag = consumerTag;
+ }
+
+ public CloseConsumerMessage(BasicMessageConsumer consumer)
+ {
+ this(0, 0, consumer.getConsumerTag(), null, null, false);
+ }
+
+ public BasicDeliverBody getDeliverBody()
+ {
+ return new BasicDeliverBody()
+ {
+
+ public AMQShortString getConsumerTag()
+ {
+ return _consumerTag;
+ }
+
+ @Override
+ public long getDeliveryTag()
+ {
+ return 0;
+ }
+
+ @Override
+ public AMQShortString getExchange()
+ {
+ return null;
+ }
+
+ @Override
+ public boolean getRedelivered()
+ {
+ return false;
+ }
+
+ @Override
+ public AMQShortString getRoutingKey()
+ {
+ return null;
+ }
+
+ @Override
+ public boolean execute(MethodDispatcher methodDispatcher, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ @Override
+ public AMQFrame generateFrame(int channelId)
+ {
+ return null;
+ }
+
+ @Override
+ public AMQChannelException getChannelException(AMQConstant code, String message)
+ {
+ return null;
+ }
+
+ @Override
+ public AMQChannelException getChannelException(AMQConstant code, String message, Throwable cause)
+ {
+ return null;
+ }
+
+ @Override
+ public AMQChannelException getChannelNotFoundException(int channelId)
+ {
+ return null;
+ }
+
+ @Override
+ public int getClazz()
+ {
+ return 0;
+ }
+
+ @Override
+ public AMQConnectionException getConnectionException(AMQConstant code, String message)
+ {
+ return null;
+ }
+
+ @Override
+ public AMQConnectionException getConnectionException(AMQConstant code, String message,
+ Throwable cause)
+ {
+ return null;
+ }
+
+ @Override
+ public byte getMajor()
+ {
+ return 0;
+ }
+
+ @Override
+ public int getMethod()
+ {
+ return 0;
+ }
+
+ @Override
+ public byte getMinor()
+ {
+ return 0;
+ }
+
+ @Override
+ public int getSize()
+ {
+ return 0;
+ }
+
+ @Override
+ public void writeMethodPayload(ByteBuffer buffer)
+ {
+ }
+
+ @Override
+ public void writePayload(ByteBuffer buffer)
+ {
+ }
+
+ @Override
+ public byte getFrameType()
+ {
+ return 0;
+ }
+
+ @Override
+ public void handle(int channelId, AMQVersionAwareProtocolSession amqMinaProtocolSession)
+ throws AMQException
+ {
+
+ }
+ };
+ }
+
+ @Override
+ public List getBodies()
+ {
+ return null;
+ }
+
+ @Override
+ public Object getContentHeader()
+ {
+ return null;
+ }
+
+ @Override
+ public void receiveBody(Object nativeMessageBody)
+ {
+
+ }
+
+ @Override
+ public void setContentHeader(Object nativeMessageHeader)
+ {
+
+ }
+ }
+} \ No newline at end of file