summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-03-13 10:35:42 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-03-13 10:35:42 +0000
commitc4f018f7c10d2169ced4c59e776844ee5cf52d33 (patch)
tree421d85cf41bf382bab618587298d9d6bef825bdc /java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
parent685ad5615e73f02f76f69841162fb9aa126892d2 (diff)
downloadqpid-python-c4f018f7c10d2169ced4c59e776844ee5cf52d33.tar.gz
QPID-346,QPID-386,QPID-403, QPID-410 Rollback, Basic-Reject, QueueBrowser NO_ACK.
QPID-346 Message loss after rollback\recover QPID-386 Updated Transactional Tests to cover underlying AMQP/Qpid state. QPID-403 Implement Basic.Reject QPID-410 Queue Browsers should use not acknowledge messages. ------------------------------------- Broker TxAck - Added comment and fixed white space UnacknowledgedMessage - Added comment for messageDecrement AMQChannel - Added extra debugging. + Created a NonTransactionalContext for requeuing messages as using txContext will tie the requeue to any runing transaction. + Updated message reference counting. So it is in terms of queues don't increment when giving to client. BasicCancelMethodHandler - Added Debug log. BasicConsumeMethodHandler - Reverted to directly writes frames to the session, throwing ChannelException caused problems. Added Trace and debug logging. BasicRejectMethodHandler, ChannelCloseHandler, ConnectionCloseMethodHandler - Added Debug logging AMQPFastProtocolHandler - moved error log to before session.write AMQMessage - Added additional debug via debugIdentity() and comments AMQQueue - Decoupled reference counting from dequeue operation. ConcurrentSelectorDeliveryManager - Added comments and increased info in debug logging SubscriptionImpl - Disabled use of acks for browsers. For now put setDeliveredToConsumer back in the finally block. commented that I'm not sure this is correct as even an error writing to client will cause msg to be marked delivered to consumer. + On Close ensured that it is only called once. + Had problem where closing browser was causing two CancelOk frames to be sent back to client. RequiredDeliveryException - Added comment to explain incrementReference LocalTransactionalContext - Commented out incrementReference as it shouldn't be required here. NonTransactionalContext - Removed incrementReference on deliver + - Fixed bug where browsers - acks would cause messages to be discarded. new JIRA this needs tidied up. TxnBuffer - Added debug logging. Client ------ AMQQueueBrowser - Added comments AMQSession - Added comments and debug + Updated to cause closed consumer to reject messages rather than receive them. + Prevented NoConsumer's from rollingback and rejecting.. they simply clear their SyncQueue - JIRA to ensure clean state with rollback BasicMessageConsumer - Added trace level debuging on close calls + Forced noConsume-rs to use NO_ACK + added more logging Closeable - Updated to use isClosed rather than directly calling _closed.get() to aid in future work on ensuring multi threaded close still allows pending acks to be processed first. ChannelCloseOkMethodHandler - updated comment AMQProtocolSession - Update comments,whitespace TransportConnection - removed static block FlowControllingBlockingQueue - Added isEmpty() Method PropertyValueTest - Added VM Broker setup + Updated test to run once and 50 times to pull out delivery tag problems that were occuring. + Adjusted logging level to be more helpful. moved some info down to trace and debug. MessageRequeueTest - Moved QpidClientConnection its own file. + Fixed it so it actually runs more than one consumer, concurrently.Now 3 was 1. ConcurrentLinkedMessageQueueAtomicSize - Implemented iterator(). Added QueueBrowserTest to system tests to test QueueBrowsering. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@517638 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java42
1 files changed, 37 insertions, 5 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
index da61f2ffd5..56eae279dc 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
@@ -25,6 +25,8 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicConsumeBody;
import org.apache.qpid.framing.BasicConsumeOkBody;
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.ConnectionCloseBody;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.AMQChannel;
@@ -67,12 +69,22 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
}
else
{
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("BasicConsume: from '" + body.queue +
+ "' for:" + body.consumerTag +
+ " nowait:" + body.nowait +
+ " args:" + body.arguments);
+ }
AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.queue);
if (queue == null)
{
- _log.info("No queue for '" + body.queue + "'");
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("No queue for '" + body.queue + "'");
+ }
if (body.queue != null)
{
String msg = "No such queue, '" + body.queue + "'";
@@ -105,14 +117,34 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
}
catch (org.apache.qpid.AMQInvalidArgumentException ise)
{
- _log.info("Closing connection due to invalid selector");
- throw body.getChannelException(AMQConstant.INVALID_ARGUMENT, ise.getMessage());
+ _log.debug("Closing connection due to invalid selector");
+ // Why doesn't this ChannelException work.
+// throw body.getChannelException(AMQConstant.INVALID_ARGUMENT, ise.getMessage());
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ session.writeFrame(ChannelCloseBody.createAMQFrame(channelId,
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ BasicConsumeBody.getClazz((byte) 8, (byte) 0), // classId
+ BasicConsumeBody.getMethod((byte) 8, (byte) 0), // methodId
+ AMQConstant.INVALID_ARGUMENT.getCode(), // replyCode
+ new AMQShortString(ise.getMessage()))); // replyText
}
catch (ConsumerTagNotUniqueException e)
{
AMQShortString msg = new AMQShortString("Non-unique consumer tag, '" + body.consumerTag + "'");
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "Non-unique consumer tag, '" + body.consumerTag + "'");
+ // If the above doesn't work then perhaps this is wrong too.
+// throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+// "Non-unique consumer tag, '" + body.consumerTag + "'");
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ BasicConsumeBody.getClazz((byte)8, (byte)0), // classId
+ BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId
+ AMQConstant.NOT_ALLOWED.getCode(), // replyCode
+ msg)); // replyText
}
catch (AMQQueue.ExistingExclusiveSubscription e)
{