diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-03-13 10:35:42 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-03-13 10:35:42 +0000 |
commit | c4f018f7c10d2169ced4c59e776844ee5cf52d33 (patch) | |
tree | 421d85cf41bf382bab618587298d9d6bef825bdc /java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java | |
parent | 685ad5615e73f02f76f69841162fb9aa126892d2 (diff) | |
download | qpid-python-c4f018f7c10d2169ced4c59e776844ee5cf52d33.tar.gz |
QPID-346,QPID-386,QPID-403, QPID-410 Rollback, Basic-Reject, QueueBrowser NO_ACK.
QPID-346 Message loss after rollback\recover
QPID-386 Updated Transactional Tests to cover underlying AMQP/Qpid state.
QPID-403 Implement Basic.Reject
QPID-410 Queue Browsers should use not acknowledge messages.
-------------------------------------
Broker
TxAck - Added comment and fixed white space
UnacknowledgedMessage - Added comment for messageDecrement
AMQChannel - Added extra debugging.
+ Created a NonTransactionalContext for requeuing messages as using txContext will tie the requeue to any runing transaction.
+ Updated message reference counting. So it is in terms of queues don't increment when giving to client.
BasicCancelMethodHandler - Added Debug log.
BasicConsumeMethodHandler - Reverted to directly writes frames to the session, throwing ChannelException caused problems. Added Trace and debug logging.
BasicRejectMethodHandler, ChannelCloseHandler, ConnectionCloseMethodHandler - Added Debug logging
AMQPFastProtocolHandler - moved error log to before session.write
AMQMessage - Added additional debug via debugIdentity() and comments
AMQQueue - Decoupled reference counting from dequeue operation.
ConcurrentSelectorDeliveryManager - Added comments and increased info in debug logging
SubscriptionImpl - Disabled use of acks for browsers. For now put setDeliveredToConsumer back in the finally block. commented that I'm not sure this is correct as even an error writing to client will cause msg to be marked delivered to consumer.
+ On Close ensured that it is only called once.
+ Had problem where closing browser was causing two CancelOk frames to be sent back to client.
RequiredDeliveryException - Added comment to explain incrementReference
LocalTransactionalContext - Commented out incrementReference as it shouldn't be required here.
NonTransactionalContext - Removed incrementReference on deliver
+ - Fixed bug where browsers - acks would cause messages to be discarded. new JIRA this needs tidied up.
TxnBuffer - Added debug logging.
Client
------
AMQQueueBrowser - Added comments
AMQSession - Added comments and debug
+ Updated to cause closed consumer to reject messages rather than receive them.
+ Prevented NoConsumer's from rollingback and rejecting.. they simply clear their SyncQueue - JIRA to ensure clean state with rollback
BasicMessageConsumer - Added trace level debuging on close calls
+ Forced noConsume-rs to use NO_ACK
+ added more logging
Closeable - Updated to use isClosed rather than directly calling _closed.get() to aid in future work on ensuring multi threaded close still allows pending acks to be processed first.
ChannelCloseOkMethodHandler - updated comment
AMQProtocolSession - Update comments,whitespace
TransportConnection - removed static block
FlowControllingBlockingQueue - Added isEmpty() Method
PropertyValueTest - Added VM Broker setup
+ Updated test to run once and 50 times to pull out delivery tag problems that were occuring.
+ Adjusted logging level to be more helpful. moved some info down to trace and debug.
MessageRequeueTest - Moved QpidClientConnection its own file.
+ Fixed it so it actually runs more than one consumer, concurrently.Now 3 was 1.
ConcurrentLinkedMessageQueueAtomicSize - Implemented iterator().
Added QueueBrowserTest to system tests to test QueueBrowsering.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@517638 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java | 42 |
1 files changed, 37 insertions, 5 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java index da61f2ffd5..56eae279dc 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java @@ -25,6 +25,8 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicConsumeBody; import org.apache.qpid.framing.BasicConsumeOkBody; +import org.apache.qpid.framing.ChannelCloseBody; +import org.apache.qpid.framing.ConnectionCloseBody; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.AMQChannel; @@ -67,12 +69,22 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic } else { + if (_log.isDebugEnabled()) + { + _log.debug("BasicConsume: from '" + body.queue + + "' for:" + body.consumerTag + + " nowait:" + body.nowait + + " args:" + body.arguments); + } AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.queue); if (queue == null) { - _log.info("No queue for '" + body.queue + "'"); + if (_log.isTraceEnabled()) + { + _log.trace("No queue for '" + body.queue + "'"); + } if (body.queue != null) { String msg = "No such queue, '" + body.queue + "'"; @@ -105,14 +117,34 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic } catch (org.apache.qpid.AMQInvalidArgumentException ise) { - _log.info("Closing connection due to invalid selector"); - throw body.getChannelException(AMQConstant.INVALID_ARGUMENT, ise.getMessage()); + _log.debug("Closing connection due to invalid selector"); + // Why doesn't this ChannelException work. +// throw body.getChannelException(AMQConstant.INVALID_ARGUMENT, ise.getMessage()); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + session.writeFrame(ChannelCloseBody.createAMQFrame(channelId, + (byte) 8, (byte) 0, // AMQP version (major, minor) + BasicConsumeBody.getClazz((byte) 8, (byte) 0), // classId + BasicConsumeBody.getMethod((byte) 8, (byte) 0), // methodId + AMQConstant.INVALID_ARGUMENT.getCode(), // replyCode + new AMQShortString(ise.getMessage()))); // replyText } catch (ConsumerTagNotUniqueException e) { AMQShortString msg = new AMQShortString("Non-unique consumer tag, '" + body.consumerTag + "'"); - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Non-unique consumer tag, '" + body.consumerTag + "'"); + // If the above doesn't work then perhaps this is wrong too. +// throw body.getConnectionException(AMQConstant.NOT_ALLOWED, +// "Non-unique consumer tag, '" + body.consumerTag + "'"); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + BasicConsumeBody.getClazz((byte)8, (byte)0), // classId + BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId + AMQConstant.NOT_ALLOWED.getCode(), // replyCode + msg)); // replyText } catch (AMQQueue.ExistingExclusiveSubscription e) { |