diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java | 575 |
1 files changed, 248 insertions, 327 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 1314b2b715..847c8b8459 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -21,7 +21,6 @@ package org.apache.qpid.server; import org.apache.log4j.Logger; - import org.apache.qpid.AMQException; import org.apache.qpid.configuration.Configured; import org.apache.qpid.framing.AMQShortString; @@ -30,14 +29,23 @@ import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.ack.UnacknowledgedMessage; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl; import org.apache.qpid.server.configuration.Configurator; -import org.apache.qpid.server.exchange.NoRouteException; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.exchange.NoRouteException; +import org.apache.qpid.server.flow.FlowCreditManager; +import org.apache.qpid.server.flow.Pre0_10CreditManager; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.*; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.IncomingMessage; +import org.apache.qpid.server.queue.MessageHandleFactory; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; +import org.apache.qpid.server.subscription.ClientDeliveryMethod; +import org.apache.qpid.server.subscription.RecordDeliveryMethod; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.LocalTransactionalContext; @@ -45,13 +53,13 @@ import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; import java.util.Collection; -import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; +import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; public class AMQChannel { @@ -61,13 +69,8 @@ public class AMQChannel private final int _channelId; - // private boolean _transactional; - - private long _prefetch_HighWaterMark; - private long _prefetch_LowWaterMark; - - private long _prefetchSize; + private final Pre0_10CreditManager _creditManager = new Pre0_10CreditManager(0l,0l); /** * The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that @@ -86,10 +89,11 @@ public class AMQChannel * been received by this channel. As the frames are received the message gets updated and once all frames have been * received the message can then be routed. */ - private AMQMessage _currentMessage; + private IncomingMessage _currentMessage; + + /** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. */ + private final Map<AMQShortString, Subscription> _tag2SubscriptionMap = new HashMap<AMQShortString, Subscription>(); - /** Maps from consumer tag to queue instance. Allows us to unsubscribe from a queue. */ - private final Map<AMQShortString, AMQQueue> _consumerTag2QueueMap = new ConcurrentHashMap<AMQShortString, AMQQueue>(); private final MessageStore _messageStore; @@ -97,7 +101,7 @@ public class AMQChannel private final AtomicBoolean _suspended = new AtomicBoolean(false); - private TransactionalContext _txnContext, _nonTransactedContext; + private TransactionalContext _txnContext; /** * A context used by the message store enabling it to track context for a given channel even across thread @@ -109,8 +113,6 @@ public class AMQChannel private MessageHandleFactory _messageHandleFactory = new MessageHandleFactory(); - private Set<Long> _browsedAcks = new HashSet<Long>(); - // Why do we need this reference ? - ritchiem private final AMQProtocolSession _session; private boolean _closing; @@ -118,7 +120,7 @@ public class AMQChannel @Configured(path = "advanced.enableJMSXUserID", defaultValue = "false") public boolean ENABLE_JMSXUserID; - + public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore) throws AMQException @@ -129,8 +131,8 @@ public class AMQChannel _session = session; _channelId = channelId; _storeContext = new StoreContext("Session: " + session.getClientIdentifier() + "; channel: " + channelId); - _prefetch_HighWaterMark = DEFAULT_PREFETCH; - _prefetch_LowWaterMark = _prefetch_HighWaterMark / 2; + + _messageStore = messageStore; // by default the session is non-transactional @@ -140,7 +142,7 @@ public class AMQChannel /** Sets this channel to be part of a local transaction */ public void setLocalTransactional() { - _txnContext = new LocalTransactionalContext(_messageStore, _storeContext, _returnMessages); + _txnContext = new LocalTransactionalContext(this); } public boolean isTransactional() @@ -156,55 +158,15 @@ public class AMQChannel return _channelId; } - public long getPrefetchCount() - { - return _prefetch_HighWaterMark; - } - - public void setPrefetchCount(long prefetchCount) - { - _prefetch_HighWaterMark = prefetchCount; - } - - public long getPrefetchSize() - { - return _prefetchSize; - } - - public void setPrefetchSize(long prefetchSize) - { - _prefetchSize = prefetchSize; - } - - public long getPrefetchLowMarkCount() - { - return _prefetch_LowWaterMark; - } - - public void setPrefetchLowMarkCount(long prefetchCount) - { - _prefetch_LowWaterMark = prefetchCount; - } - - public long getPrefetchHighMarkCount() - { - return _prefetch_HighWaterMark; - } - - public void setPrefetchHighMarkCount(long prefetchCount) - { - _prefetch_HighWaterMark = prefetchCount; - } - - public void setPublishFrame(MessagePublishInfo info, AMQProtocolSession publisher, final Exchange e) throws AMQException + public void setPublishFrame(MessagePublishInfo info, final Exchange e) throws AMQException { - _currentMessage = new AMQMessage(_messageStore.getNewMessageId(), info, _txnContext); - _currentMessage.setPublisher(publisher); + _currentMessage = new IncomingMessage(_messageStore.getNewMessageId(), info, _txnContext, _session); + _currentMessage.setMessageStore(_messageStore); _currentMessage.setExchange(e); } - public void publishContentHeader(ContentHeaderBody contentHeaderBody, AMQProtocolSession protocolSession) + public void publishContentHeader(ContentHeaderBody contentHeaderBody) throws AMQException { if (_currentMessage == null) @@ -215,7 +177,7 @@ public class AMQChannel { if (_log.isDebugEnabled()) { - _log.debug(debugIdentity() + "Content header received on channel " + _channelId); + _log.debug("Content header received on channel " + _channelId); } if (ENABLE_JMSXUserID) @@ -225,25 +187,48 @@ public class AMQChannel //fixme: fudge for QPID-677 properties.getHeaders().keySet(); - properties.setUserId(protocolSession.getAuthorizedID().getName()); + properties.setUserId(_session.getAuthorizedID().getName()); } _currentMessage.setContentHeaderBody(contentHeaderBody); + _currentMessage.setExpiration(); routeCurrentMessage(); - _currentMessage.routingComplete(_messageStore, _storeContext, _messageHandleFactory); - // check and deliver if header says body length is zero - if (contentHeaderBody.bodySize == 0) + _currentMessage.routingComplete(_messageStore, _messageHandleFactory); + + deliverCurrentMessageIfComplete(); + + } + } + + private void deliverCurrentMessageIfComplete() + throws AMQException + { + // check and deliver if header says body length is zero + if (_currentMessage.allContentReceived()) + { + try { - _txnContext.messageProcessed(protocolSession); + _currentMessage.deliverToQueues(); + } + catch (NoRouteException e) + { + _returnMessages.add(e); + } + finally + { + // callback to allow the context to do any post message processing + // primary use is to allow message return processing in the non-tx case + _txnContext.messageProcessed(_session); _currentMessage = null; } } + } - public void publishContentBody(ContentBody contentBody, AMQProtocolSession protocolSession) throws AMQException + public void publishContentBody(ContentBody contentBody) throws AMQException { if (_currentMessage == null) { @@ -260,15 +245,11 @@ public class AMQChannel // returns true iff the message was delivered (i.e. if all data was // received - if (_currentMessage.addContentBodyFrame(_storeContext, - protocolSession.getMethodRegistry().getProtocolVersionMethodConverter().convertToContentChunk( - contentBody))) - { - // callback to allow the context to do any post message processing - // primary use is to allow message return processing in the non-tx case - _txnContext.messageProcessed(protocolSession); - _currentMessage = null; - } + _currentMessage.addContentBodyFrame( + _session.getMethodRegistry().getProtocolVersionMethodConverter().convertToContentChunk( + contentBody)); + + deliverCurrentMessageIfComplete(); } catch (AMQException e) { @@ -287,6 +268,7 @@ public class AMQChannel } catch (NoRouteException e) { + //_currentMessage.incrementReference(); _returnMessages.add(e); } } @@ -307,18 +289,17 @@ public class AMQChannel * * @param tag the tag chosen by the client (if null, server will generate one) * @param queue the queue to subscribe to - * @param session the protocol session of the subscriber - * @param noLocal Flag stopping own messages being receivied. - * @param exclusive Flag requesting exclusive access to the queue * @param acks Are acks enabled for this subscriber * @param filters Filters to apply to this subscriber * + * @param noLocal Flag stopping own messages being receivied. + * @param exclusive Flag requesting exclusive access to the queue * @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests * * @throws ConsumerTagNotUniqueException if the tag is not unique * @throws AMQException if something goes wrong */ - public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, AMQProtocolSession session, boolean acks, + public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, boolean acks, FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException, ConsumerTagNotUniqueException { if (tag == null) @@ -326,77 +307,65 @@ public class AMQChannel tag = new AMQShortString("sgen_" + getNextConsumerTag()); } - if (_consumerTag2QueueMap.containsKey(tag)) + if (_tag2SubscriptionMap.containsKey(tag)) { throw new ConsumerTagNotUniqueException(); } + Subscription subscription = + SubscriptionFactoryImpl.INSTANCE.createSubscription(_channelId, _session, tag, acks, filters, noLocal, _creditManager); + + + // So to keep things straight we put before the call and catch all exceptions from the register and tidy up. // We add before we register as the Async Delivery process may AutoClose the subscriber // so calling _cT2QM.remove before we have done put which was after the register succeeded. // So to keep things straight we put before the call and catch all exceptions from the register and tidy up. - _consumerTag2QueueMap.put(tag, queue); + + _tag2SubscriptionMap.put(tag, subscription); try { - queue.registerProtocolSession(session, _channelId, tag, acks, filters, noLocal, exclusive); + queue.registerSubscription(subscription, exclusive); } catch (AMQException e) { - _consumerTag2QueueMap.remove(tag); + _tag2SubscriptionMap.remove(tag); throw e; } - return tag; } /** * Unsubscribe a consumer from a queue. - * @param session * @param consumerTag * @return true if the consumerTag had a mapped queue that could be unregistered. * @throws AMQException */ - public boolean unsubscribeConsumer(AMQProtocolSession session, AMQShortString consumerTag) throws AMQException + public boolean unsubscribeConsumer(AMQShortString consumerTag) throws AMQException { - if (_log.isDebugEnabled()) - { - _log.debug("Unacked Map Dump size:" + _unacknowledgedMessageMap.size()); - _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() - { - - public boolean callback(UnacknowledgedMessage message) throws AMQException - { - _log.debug(message); - - return true; - } - public void visitComplete() - { - } - }); - } - - AMQQueue q = _consumerTag2QueueMap.remove(consumerTag); - if (q != null) + Subscription sub = _tag2SubscriptionMap.remove(consumerTag); + if (sub != null) { - q.unregisterProtocolSession(session, _channelId, consumerTag); + sub.getQueue().unregisterSubscription(sub); return true; } + else + { + _log.warn("Attempt to unsubscribe consumer with tag '"+consumerTag+"' which is not registered."); + } return false; } /** * Called from the protocol session to close this channel and clean up. T * - * @param session The session to close - * * @throws AMQException if there is an error during closure */ - public void close(AMQProtocolSession session) throws AMQException + public void close() throws AMQException { _txnContext.rollback(); - unsubscribeAllConsumers(session); + unsubscribeAllConsumers(); try { requeue(); @@ -414,11 +383,11 @@ public class AMQChannel _closing = closing; } - private void unsubscribeAllConsumers(AMQProtocolSession session) throws AMQException + private void unsubscribeAllConsumers() throws AMQException { if (_log.isInfoEnabled()) { - if (!_consumerTag2QueueMap.isEmpty()) + if (!_tag2SubscriptionMap.isEmpty()) { _log.info("Unsubscribing all consumers on channel " + toString()); } @@ -428,17 +397,19 @@ public class AMQChannel } } - for (Map.Entry<AMQShortString, AMQQueue> me : _consumerTag2QueueMap.entrySet()) + for (Map.Entry<AMQShortString, Subscription> me : _tag2SubscriptionMap.entrySet()) { if (_log.isInfoEnabled()) { _log.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString()); } - me.getValue().unregisterProtocolSession(session, _channelId, me.getKey()); + Subscription sub = me.getValue(); + + sub.getQueue().unregisterSubscription(sub); } - _consumerTag2QueueMap.clear(); + _tag2SubscriptionMap.clear(); } /** @@ -447,9 +418,9 @@ public class AMQChannel * @param entry the record of the message on the queue that was delivered * @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of the * delivery tag) - * @param consumerTag The tag for the consumer that is to acknowledge this message. + * @param subscription The consumer that is to acknowledge this message. */ - public void addUnacknowledgedMessage(QueueEntry entry, long deliveryTag, AMQShortString consumerTag) + public void addUnacknowledgedMessage(QueueEntry entry, long deliveryTag, Subscription subscription) { if (_log.isDebugEnabled()) { @@ -462,16 +433,13 @@ public class AMQChannel if (_log.isDebugEnabled()) { _log.debug(debugIdentity() + " Adding unacked message(" + entry.getMessage().toString() + " DT:" + deliveryTag - + ") with a queue(" + entry.getQueue() + ") for " + consumerTag); + + ") with a queue(" + entry.getQueue() + ") for " + subscription); } } } - synchronized (_unacknowledgedMessageMap.getLock()) - { - _unacknowledgedMessageMap.add(deliveryTag, new UnacknowledgedMessage(entry, consumerTag, deliveryTag,_unacknowledgedMessageMap)); - checkSuspension(); - } + _unacknowledgedMessageMap.add(deliveryTag, entry); + } private final String id = "(" + System.identityHashCode(this) + ")"; @@ -490,7 +458,7 @@ public class AMQChannel public void requeue() throws AMQException { // we must create a new map since all the messages will get a new delivery tag when they are redelivered - Collection<UnacknowledgedMessage> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages(); + Collection<QueueEntry> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages(); // Deliver these messages out of the transaction as their delivery was never // part of the transaction only the receive. @@ -505,13 +473,9 @@ public class AMQChannel if (!(_txnContext instanceof NonTransactionalContext)) { - // if (_nonTransactedContext == null) - { - _nonTransactedContext = - new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages); - } - deliveryContext = _nonTransactedContext; + deliveryContext = + new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages); } else { @@ -519,22 +483,23 @@ public class AMQChannel } } - for (UnacknowledgedMessage unacked : messagesToBeDelivered) + for (QueueEntry unacked : messagesToBeDelivered) { if (!unacked.isQueueDeleted()) { - // Ensure message is released for redelivery - unacked.entry.release(); - // Mark message redelivered unacked.getMessage().setRedelivered(true); + // Ensure message is released for redelivery + unacked.release(); + // Deliver Message - deliveryContext.deliver(unacked.entry, false); + deliveryContext.requeue(unacked); - // Should we allow access To the DM to directy deliver the message? - // As we don't need to check for Consumers or worry about incrementing the message count? - // unacked.queue.getDeliveryManager().deliver(_storeContext, unacked.queue.getName(), unacked.message, false); + } + else + { + unacked.discard(_storeContext); } } @@ -549,32 +514,29 @@ public class AMQChannel */ public void requeue(long deliveryTag) throws AMQException { - UnacknowledgedMessage unacked = _unacknowledgedMessageMap.remove(deliveryTag); + QueueEntry unacked = _unacknowledgedMessageMap.remove(deliveryTag); if (unacked != null) { + // Mark message redelivered + unacked.getMessage().setRedelivered(true); // Ensure message is released for redelivery if (!unacked.isQueueDeleted()) { - unacked.entry.release(); + unacked.release(); } - // Mark message redelivered - unacked.getMessage().setRedelivered(true); // Deliver these messages out of the transaction as their delivery was never // part of the transaction only the receive. TransactionalContext deliveryContext; if (!(_txnContext instanceof NonTransactionalContext)) { - // if (_nonTransactedContext == null) - { - _nonTransactedContext = + + deliveryContext = new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages); - } - deliveryContext = _nonTransactedContext; } else { @@ -584,7 +546,7 @@ public class AMQChannel if (!unacked.isQueueDeleted()) { // Redeliver the messages to the front of the queue - deliveryContext.deliver(unacked.entry, true); + deliveryContext.requeue(unacked); // Deliver increments the message count but we have already deliverted this once so don't increment it again // this was because deliver did an increment changed this. } @@ -592,11 +554,8 @@ public class AMQChannel { _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.getMessage().debugIdentity() + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message."); - // _log.error("Requested requeue of message:" + deliveryTag + - // " but no queue defined using DeadLetter queue:" + getDeadLetterQueue()); - // - // deliveryContext.deliver(unacked.message, getDeadLetterQueue(), false); - // + + unacked.discard(_storeContext); } } else @@ -604,25 +563,6 @@ public class AMQChannel _log.warn("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists." + _unacknowledgedMessageMap.size()); - if (_log.isDebugEnabled()) - { - _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() - { - int count = 0; - - public boolean callback(UnacknowledgedMessage message) throws AMQException - { - _log.debug( - (count++) + ": (" + message.getMessage().debugIdentity() + ")" + "[" + message.deliveryTag + "]"); - - return false; // Continue - } - - public void visitComplete() - { - } - }); - } } } @@ -636,8 +576,10 @@ public class AMQChannel */ public void resend(final boolean requeue) throws AMQException { - final List<UnacknowledgedMessage> msgToRequeue = new LinkedList<UnacknowledgedMessage>(); - final List<UnacknowledgedMessage> msgToResend = new LinkedList<UnacknowledgedMessage>(); + + + final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>(); + final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>(); if (_log.isDebugEnabled()) { @@ -647,23 +589,25 @@ public class AMQChannel // Process the Unacked-Map. // Marking messages who still have a consumer for to be resent // and those that don't to be requeued. + _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() { - public boolean callback(UnacknowledgedMessage message) throws AMQException + public boolean callback(final long deliveryTag, QueueEntry message) throws AMQException { - AMQShortString consumerTag = message.consumerTag; + AMQMessage msg = message.getMessage(); msg.setRedelivered(true); - if (consumerTag != null) + final Subscription subscription = message.getDeliveredSubscription(); + if (subscription != null) { // Consumer exists - if (_consumerTag2QueueMap.containsKey(consumerTag)) + if (!subscription.isClosed()) { - msgToResend.add(message); + msgToResend.put(deliveryTag, message); } else // consumer has gone { - msgToRequeue.add(message); + msgToRequeue.put(deliveryTag, message); } } else @@ -675,7 +619,7 @@ public class AMQChannel { if (requeue) { - msgToRequeue.add(message); + msgToRequeue.put(deliveryTag, message); } else { @@ -684,7 +628,8 @@ public class AMQChannel } else { - _log.info("Message.queue is null and no DeadLetter Queue so dropping message:" + message); + message.discard(_storeContext); + _log.warn("Message.queue is null and no DeadLetter Queue so dropping message:" + message); } } @@ -697,6 +642,8 @@ public class AMQChannel } }); + _unacknowledgedMessageMap.clear(); + // Process Messages to Resend if (_log.isDebugEnabled()) { @@ -710,9 +657,15 @@ public class AMQChannel } } - for (UnacknowledgedMessage message : msgToResend) + for (Map.Entry<Long, QueueEntry> entry : msgToResend.entrySet()) { + QueueEntry message = entry.getValue(); + long deliveryTag = entry.getKey(); + + + AMQMessage msg = message.getMessage(); + AMQQueue queue = message.getQueue(); // Our Java Client will always suspend the channel when resending! // If the client has requested the messages be resent then it is @@ -727,46 +680,20 @@ public class AMQChannel // else // { // release to allow it to be delivered - message.entry.release(); // Without any details from the client about what has been processed we have to mark // all messages in the unacked map as redelivered. msg.setRedelivered(true); - Subscription sub = message.entry.getDeliveredSubscription(); + Subscription sub = message.getDeliveredSubscription(); if (sub != null) { - // Get the lock so we can tell if the sub scription has closed. - // will stop delivery to this subscription until the lock is released. - // note: this approach would allow the use of a single queue if the - // PreDeliveryQueue would allow head additions. - // In the Java Qpid client we are suspended whilst doing this so it is all rather Mute.. - // needs guidance from AMQP WG Model SIG - synchronized (sub.getSendLock()) + + if(!queue.resend(message, sub)) { - if (sub.isClosed()) - { - if (_log.isDebugEnabled()) - { - _log.debug("Subscription(" + System.identityHashCode(sub) - + ") closed during resend so requeuing message"); - } - // move this message to requeue - msgToRequeue.add(message); - } - else - { - if (_log.isDebugEnabled()) - { - _log.debug("Requeuing " + msg.debugIdentity() + " for resend via sub:" - + System.identityHashCode(sub)); - } - - sub.addToResendQueue(message.entry); - _unacknowledgedMessageMap.remove(message.deliveryTag); - } - } // sync(sub.getSendLock) + msgToRequeue.put(deliveryTag, message); + } } else { @@ -777,7 +704,7 @@ public class AMQChannel + ")to prevent loss"); } // move this message to requeue - msgToRequeue.add(message); + msgToRequeue.put(deliveryTag, message); } } // for all messages // } else !isSuspend @@ -795,13 +722,9 @@ public class AMQChannel TransactionalContext deliveryContext; if (!(_txnContext instanceof NonTransactionalContext)) { - if (_nonTransactedContext == null) - { - _nonTransactedContext = - new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages); - } - deliveryContext = _nonTransactedContext; + deliveryContext = + new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages); } else { @@ -809,14 +732,17 @@ public class AMQChannel } // Process Messages to Requeue at the front of the queue - for (UnacknowledgedMessage message : msgToRequeue) + for (Map.Entry<Long, QueueEntry> entry : msgToRequeue.entrySet()) { - message.entry.release(); - message.entry.setRedelivered(true); + QueueEntry message = entry.getValue(); + long deliveryTag = entry.getKey(); + + message.release(); + message.setRedelivered(true); - deliveryContext.deliver(message.entry, true); + deliveryContext.requeue(message); - _unacknowledgedMessageMap.remove(message.deliveryTag); + _unacknowledgedMessageMap.remove(deliveryTag); } } @@ -827,38 +753,47 @@ public class AMQChannel * * @param queue the queue that has been deleted * - * @throws org.apache.qpid.AMQException if there is an error processing the unacked messages */ - public void queueDeleted(final AMQQueue queue) throws AMQException + /* public void queueDeleted(final AMQQueue queue) { - _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() + try { - public boolean callback(UnacknowledgedMessage message) throws AMQException + _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() { - if (message.getQueue() == queue) + public boolean callback(UnacknowledgedMessage message) { - try + if (message.getQueue() == queue) { - message.discard(_storeContext); - message.setQueueDeleted(true); + try + { + message.discard(_storeContext); + message.setQueueDeleted(true); + } + catch (AMQException e) + { + _log.error( + "Error decrementing ref count on message " + message.getMessage().getMessageId() + ": " + e, e); + throw new RuntimeException(e); + } } - catch (AMQException e) - { - _log.error( - "Error decrementing ref count on message " + message.getMessage().getMessageId() + ": " + e, e); - } + + return false; } - return false; - } + public void visitComplete() + { + } + }); + } + catch (AMQException e) + { + _log.error("Unexpected Error while handling deletion of queue", e); + throw new RuntimeException(e); + } - public void visitComplete() - { - } - }); } - +*/ /** * Acknowledge one or more messages. * @@ -870,23 +805,7 @@ public class AMQChannel */ public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException { - synchronized (_unacknowledgedMessageMap.getLock()) - { - if (_log.isDebugEnabled()) - { - _log.debug("Unacked (PreAck) Size:" + _unacknowledgedMessageMap.size()); - } - - _unacknowledgedMessageMap.acknowledgeMessage(deliveryTag, multiple, _txnContext); - - if (_log.isDebugEnabled()) - { - _log.debug("Unacked (PostAck) Size:" + _unacknowledgedMessageMap.size()); - } - - } - - checkSuspension(); + _unacknowledgedMessageMap.acknowledgeMessage(deliveryTag, multiple, _txnContext); } /** @@ -899,43 +818,22 @@ public class AMQChannel return _unacknowledgedMessageMap; } - private void checkSuspension() - { - boolean suspend; - - suspend = - ((_prefetch_HighWaterMark != 0) && (_unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark)) - || ((_prefetchSize != 0) && (_prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes())); - - setSuspended(suspend); - } public void setSuspended(boolean suspended) { - boolean isSuspended = _suspended.get(); - if (isSuspended && !suspended) - { - // Continue being suspended if we are above the _prefetch_LowWaterMark - suspended = _unacknowledgedMessageMap.size() > _prefetch_LowWaterMark; - } boolean wasSuspended = _suspended.getAndSet(suspended); if (wasSuspended != suspended) { if (wasSuspended) { - _log.debug("Unsuspending channel " + this); // may need to deliver queued messages - for (AMQQueue q : _consumerTag2QueueMap.values()) + for (Subscription s : _tag2SubscriptionMap.values()) { - q.deliverAsync(); + s.getQueue().deliverAsync(s); } } - else - { - _log.debug("Suspending channel " + this); - } } } @@ -961,12 +859,7 @@ public class AMQChannel public String toString() { - StringBuilder sb = new StringBuilder(30); - sb.append("Channel: id ").append(_channelId).append(", transaction mode: ").append(isTransactional()); - sb.append(", prefetch marks: ").append(_prefetch_LowWaterMark); - sb.append("/").append(_prefetch_HighWaterMark); - - return sb.toString(); + return "["+_session.toString()+":"+_channelId+"]"; } public void setDefaultQueue(AMQQueue queue) @@ -984,14 +877,14 @@ public class AMQChannel return _storeContext; } - public void processReturns(AMQProtocolSession session) throws AMQException + public void processReturns() throws AMQException { if (!_returnMessages.isEmpty()) { for (RequiredDeliveryException bouncedMessage : _returnMessages) { AMQMessage message = bouncedMessage.getAMQMessage(); - session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(), + _session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(), new AMQShortString(bouncedMessage.getMessage())); message.decrementReference(_storeContext); @@ -1001,40 +894,68 @@ public class AMQChannel } } - public boolean wouldSuspend(AMQMessage msg) + + public TransactionalContext getTransactionalContext() { - if (isSuspended()) - { - return true; - } - else - { - boolean willSuspend = - ((_prefetch_HighWaterMark != 0) && ((_unacknowledgedMessageMap.size() + 1) > _prefetch_HighWaterMark)); - if (!willSuspend) - { - final long unackedSize = _unacknowledgedMessageMap.getUnacknowledgeBytes(); + return _txnContext; + } - willSuspend = (_prefetchSize != 0) && (unackedSize != 0) && (_prefetchSize < (msg.getSize() + unackedSize)); - } + public boolean isClosing() + { + return _closing; + } - if (willSuspend) - { - setSuspended(true); - } + public AMQProtocolSession getProtocolSession() + { + return _session; + } - return willSuspend; - } + public FlowCreditManager getCreditManager() + { + return _creditManager; + } + public void setCredit(final long prefetchSize, final int prefetchCount) + { + _creditManager.setCreditLimits(prefetchSize, prefetchCount); } - public TransactionalContext getTransactionalContext() + public List<RequiredDeliveryException> getReturnMessages() { - return _txnContext; + return _returnMessages; } - public boolean isClosing() + public MessageStore getMessageStore() { - return _closing; + return _messageStore; + } + + private final ClientDeliveryMethod _clientDeliveryMethod = new ClientDeliveryMethod() + { + + public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag) + throws AMQException + { + getProtocolSession().getProtocolOutputConverter().writeDeliver(entry.getMessage(), getChannelId(), deliveryTag, sub.getConsumerTag()); + } + }; + + public ClientDeliveryMethod getClientDeliveryMethod() + { + return _clientDeliveryMethod; + } + + private final RecordDeliveryMethod _recordDeliveryMethod = new RecordDeliveryMethod() + { + + public void recordMessageDelivery(final Subscription sub, final QueueEntry entry, final long deliveryTag) + { + addUnacknowledgedMessage(entry, deliveryTag, sub); + } + }; + + public RecordDeliveryMethod getRecordDeliveryMethod() + { + return _recordDeliveryMethod; } } |