summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java575
1 files changed, 248 insertions, 327 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 1314b2b715..847c8b8459 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -21,7 +21,6 @@
package org.apache.qpid.server;
import org.apache.log4j.Logger;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.configuration.Configured;
import org.apache.qpid.framing.AMQShortString;
@@ -30,14 +29,23 @@ import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.ack.UnacknowledgedMessage;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
import org.apache.qpid.server.configuration.Configurator;
-import org.apache.qpid.server.exchange.NoRouteException;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.NoRouteException;
+import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.*;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.IncomingMessage;
+import org.apache.qpid.server.queue.MessageHandleFactory;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
+import org.apache.qpid.server.subscription.ClientDeliveryMethod;
+import org.apache.qpid.server.subscription.RecordDeliveryMethod;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.LocalTransactionalContext;
@@ -45,13 +53,13 @@ import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
import java.util.Collection;
-import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
public class AMQChannel
{
@@ -61,13 +69,8 @@ public class AMQChannel
private final int _channelId;
- // private boolean _transactional;
-
- private long _prefetch_HighWaterMark;
- private long _prefetch_LowWaterMark;
-
- private long _prefetchSize;
+ private final Pre0_10CreditManager _creditManager = new Pre0_10CreditManager(0l,0l);
/**
* The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that
@@ -86,10 +89,11 @@ public class AMQChannel
* been received by this channel. As the frames are received the message gets updated and once all frames have been
* received the message can then be routed.
*/
- private AMQMessage _currentMessage;
+ private IncomingMessage _currentMessage;
+
+ /** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. */
+ private final Map<AMQShortString, Subscription> _tag2SubscriptionMap = new HashMap<AMQShortString, Subscription>();
- /** Maps from consumer tag to queue instance. Allows us to unsubscribe from a queue. */
- private final Map<AMQShortString, AMQQueue> _consumerTag2QueueMap = new ConcurrentHashMap<AMQShortString, AMQQueue>();
private final MessageStore _messageStore;
@@ -97,7 +101,7 @@ public class AMQChannel
private final AtomicBoolean _suspended = new AtomicBoolean(false);
- private TransactionalContext _txnContext, _nonTransactedContext;
+ private TransactionalContext _txnContext;
/**
* A context used by the message store enabling it to track context for a given channel even across thread
@@ -109,8 +113,6 @@ public class AMQChannel
private MessageHandleFactory _messageHandleFactory = new MessageHandleFactory();
- private Set<Long> _browsedAcks = new HashSet<Long>();
-
// Why do we need this reference ? - ritchiem
private final AMQProtocolSession _session;
private boolean _closing;
@@ -118,7 +120,7 @@ public class AMQChannel
@Configured(path = "advanced.enableJMSXUserID",
defaultValue = "false")
public boolean ENABLE_JMSXUserID;
-
+
public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore)
throws AMQException
@@ -129,8 +131,8 @@ public class AMQChannel
_session = session;
_channelId = channelId;
_storeContext = new StoreContext("Session: " + session.getClientIdentifier() + "; channel: " + channelId);
- _prefetch_HighWaterMark = DEFAULT_PREFETCH;
- _prefetch_LowWaterMark = _prefetch_HighWaterMark / 2;
+
+
_messageStore = messageStore;
// by default the session is non-transactional
@@ -140,7 +142,7 @@ public class AMQChannel
/** Sets this channel to be part of a local transaction */
public void setLocalTransactional()
{
- _txnContext = new LocalTransactionalContext(_messageStore, _storeContext, _returnMessages);
+ _txnContext = new LocalTransactionalContext(this);
}
public boolean isTransactional()
@@ -156,55 +158,15 @@ public class AMQChannel
return _channelId;
}
- public long getPrefetchCount()
- {
- return _prefetch_HighWaterMark;
- }
-
- public void setPrefetchCount(long prefetchCount)
- {
- _prefetch_HighWaterMark = prefetchCount;
- }
-
- public long getPrefetchSize()
- {
- return _prefetchSize;
- }
-
- public void setPrefetchSize(long prefetchSize)
- {
- _prefetchSize = prefetchSize;
- }
-
- public long getPrefetchLowMarkCount()
- {
- return _prefetch_LowWaterMark;
- }
-
- public void setPrefetchLowMarkCount(long prefetchCount)
- {
- _prefetch_LowWaterMark = prefetchCount;
- }
-
- public long getPrefetchHighMarkCount()
- {
- return _prefetch_HighWaterMark;
- }
-
- public void setPrefetchHighMarkCount(long prefetchCount)
- {
- _prefetch_HighWaterMark = prefetchCount;
- }
-
- public void setPublishFrame(MessagePublishInfo info, AMQProtocolSession publisher, final Exchange e) throws AMQException
+ public void setPublishFrame(MessagePublishInfo info, final Exchange e) throws AMQException
{
- _currentMessage = new AMQMessage(_messageStore.getNewMessageId(), info, _txnContext);
- _currentMessage.setPublisher(publisher);
+ _currentMessage = new IncomingMessage(_messageStore.getNewMessageId(), info, _txnContext, _session);
+ _currentMessage.setMessageStore(_messageStore);
_currentMessage.setExchange(e);
}
- public void publishContentHeader(ContentHeaderBody contentHeaderBody, AMQProtocolSession protocolSession)
+ public void publishContentHeader(ContentHeaderBody contentHeaderBody)
throws AMQException
{
if (_currentMessage == null)
@@ -215,7 +177,7 @@ public class AMQChannel
{
if (_log.isDebugEnabled())
{
- _log.debug(debugIdentity() + "Content header received on channel " + _channelId);
+ _log.debug("Content header received on channel " + _channelId);
}
if (ENABLE_JMSXUserID)
@@ -225,25 +187,48 @@ public class AMQChannel
//fixme: fudge for QPID-677
properties.getHeaders().keySet();
- properties.setUserId(protocolSession.getAuthorizedID().getName());
+ properties.setUserId(_session.getAuthorizedID().getName());
}
_currentMessage.setContentHeaderBody(contentHeaderBody);
+
_currentMessage.setExpiration();
routeCurrentMessage();
- _currentMessage.routingComplete(_messageStore, _storeContext, _messageHandleFactory);
- // check and deliver if header says body length is zero
- if (contentHeaderBody.bodySize == 0)
+ _currentMessage.routingComplete(_messageStore, _messageHandleFactory);
+
+ deliverCurrentMessageIfComplete();
+
+ }
+ }
+
+ private void deliverCurrentMessageIfComplete()
+ throws AMQException
+ {
+ // check and deliver if header says body length is zero
+ if (_currentMessage.allContentReceived())
+ {
+ try
{
- _txnContext.messageProcessed(protocolSession);
+ _currentMessage.deliverToQueues();
+ }
+ catch (NoRouteException e)
+ {
+ _returnMessages.add(e);
+ }
+ finally
+ {
+ // callback to allow the context to do any post message processing
+ // primary use is to allow message return processing in the non-tx case
+ _txnContext.messageProcessed(_session);
_currentMessage = null;
}
}
+
}
- public void publishContentBody(ContentBody contentBody, AMQProtocolSession protocolSession) throws AMQException
+ public void publishContentBody(ContentBody contentBody) throws AMQException
{
if (_currentMessage == null)
{
@@ -260,15 +245,11 @@ public class AMQChannel
// returns true iff the message was delivered (i.e. if all data was
// received
- if (_currentMessage.addContentBodyFrame(_storeContext,
- protocolSession.getMethodRegistry().getProtocolVersionMethodConverter().convertToContentChunk(
- contentBody)))
- {
- // callback to allow the context to do any post message processing
- // primary use is to allow message return processing in the non-tx case
- _txnContext.messageProcessed(protocolSession);
- _currentMessage = null;
- }
+ _currentMessage.addContentBodyFrame(
+ _session.getMethodRegistry().getProtocolVersionMethodConverter().convertToContentChunk(
+ contentBody));
+
+ deliverCurrentMessageIfComplete();
}
catch (AMQException e)
{
@@ -287,6 +268,7 @@ public class AMQChannel
}
catch (NoRouteException e)
{
+ //_currentMessage.incrementReference();
_returnMessages.add(e);
}
}
@@ -307,18 +289,17 @@ public class AMQChannel
*
* @param tag the tag chosen by the client (if null, server will generate one)
* @param queue the queue to subscribe to
- * @param session the protocol session of the subscriber
- * @param noLocal Flag stopping own messages being receivied.
- * @param exclusive Flag requesting exclusive access to the queue
* @param acks Are acks enabled for this subscriber
* @param filters Filters to apply to this subscriber
*
+ * @param noLocal Flag stopping own messages being receivied.
+ * @param exclusive Flag requesting exclusive access to the queue
* @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests
*
* @throws ConsumerTagNotUniqueException if the tag is not unique
* @throws AMQException if something goes wrong
*/
- public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, AMQProtocolSession session, boolean acks,
+ public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, boolean acks,
FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException, ConsumerTagNotUniqueException
{
if (tag == null)
@@ -326,77 +307,65 @@ public class AMQChannel
tag = new AMQShortString("sgen_" + getNextConsumerTag());
}
- if (_consumerTag2QueueMap.containsKey(tag))
+ if (_tag2SubscriptionMap.containsKey(tag))
{
throw new ConsumerTagNotUniqueException();
}
+ Subscription subscription =
+ SubscriptionFactoryImpl.INSTANCE.createSubscription(_channelId, _session, tag, acks, filters, noLocal, _creditManager);
+
+
+ // So to keep things straight we put before the call and catch all exceptions from the register and tidy up.
// We add before we register as the Async Delivery process may AutoClose the subscriber
// so calling _cT2QM.remove before we have done put which was after the register succeeded.
// So to keep things straight we put before the call and catch all exceptions from the register and tidy up.
- _consumerTag2QueueMap.put(tag, queue);
+
+ _tag2SubscriptionMap.put(tag, subscription);
try
{
- queue.registerProtocolSession(session, _channelId, tag, acks, filters, noLocal, exclusive);
+ queue.registerSubscription(subscription, exclusive);
}
catch (AMQException e)
{
- _consumerTag2QueueMap.remove(tag);
+ _tag2SubscriptionMap.remove(tag);
throw e;
}
-
return tag;
}
/**
* Unsubscribe a consumer from a queue.
- * @param session
* @param consumerTag
* @return true if the consumerTag had a mapped queue that could be unregistered.
* @throws AMQException
*/
- public boolean unsubscribeConsumer(AMQProtocolSession session, AMQShortString consumerTag) throws AMQException
+ public boolean unsubscribeConsumer(AMQShortString consumerTag) throws AMQException
{
- if (_log.isDebugEnabled())
- {
- _log.debug("Unacked Map Dump size:" + _unacknowledgedMessageMap.size());
- _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
- {
-
- public boolean callback(UnacknowledgedMessage message) throws AMQException
- {
- _log.debug(message);
-
- return true;
- }
- public void visitComplete()
- {
- }
- });
- }
-
- AMQQueue q = _consumerTag2QueueMap.remove(consumerTag);
- if (q != null)
+ Subscription sub = _tag2SubscriptionMap.remove(consumerTag);
+ if (sub != null)
{
- q.unregisterProtocolSession(session, _channelId, consumerTag);
+ sub.getQueue().unregisterSubscription(sub);
return true;
}
+ else
+ {
+ _log.warn("Attempt to unsubscribe consumer with tag '"+consumerTag+"' which is not registered.");
+ }
return false;
}
/**
* Called from the protocol session to close this channel and clean up. T
*
- * @param session The session to close
- *
* @throws AMQException if there is an error during closure
*/
- public void close(AMQProtocolSession session) throws AMQException
+ public void close() throws AMQException
{
_txnContext.rollback();
- unsubscribeAllConsumers(session);
+ unsubscribeAllConsumers();
try
{
requeue();
@@ -414,11 +383,11 @@ public class AMQChannel
_closing = closing;
}
- private void unsubscribeAllConsumers(AMQProtocolSession session) throws AMQException
+ private void unsubscribeAllConsumers() throws AMQException
{
if (_log.isInfoEnabled())
{
- if (!_consumerTag2QueueMap.isEmpty())
+ if (!_tag2SubscriptionMap.isEmpty())
{
_log.info("Unsubscribing all consumers on channel " + toString());
}
@@ -428,17 +397,19 @@ public class AMQChannel
}
}
- for (Map.Entry<AMQShortString, AMQQueue> me : _consumerTag2QueueMap.entrySet())
+ for (Map.Entry<AMQShortString, Subscription> me : _tag2SubscriptionMap.entrySet())
{
if (_log.isInfoEnabled())
{
_log.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString());
}
- me.getValue().unregisterProtocolSession(session, _channelId, me.getKey());
+ Subscription sub = me.getValue();
+
+ sub.getQueue().unregisterSubscription(sub);
}
- _consumerTag2QueueMap.clear();
+ _tag2SubscriptionMap.clear();
}
/**
@@ -447,9 +418,9 @@ public class AMQChannel
* @param entry the record of the message on the queue that was delivered
* @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of the
* delivery tag)
- * @param consumerTag The tag for the consumer that is to acknowledge this message.
+ * @param subscription The consumer that is to acknowledge this message.
*/
- public void addUnacknowledgedMessage(QueueEntry entry, long deliveryTag, AMQShortString consumerTag)
+ public void addUnacknowledgedMessage(QueueEntry entry, long deliveryTag, Subscription subscription)
{
if (_log.isDebugEnabled())
{
@@ -462,16 +433,13 @@ public class AMQChannel
if (_log.isDebugEnabled())
{
_log.debug(debugIdentity() + " Adding unacked message(" + entry.getMessage().toString() + " DT:" + deliveryTag
- + ") with a queue(" + entry.getQueue() + ") for " + consumerTag);
+ + ") with a queue(" + entry.getQueue() + ") for " + subscription);
}
}
}
- synchronized (_unacknowledgedMessageMap.getLock())
- {
- _unacknowledgedMessageMap.add(deliveryTag, new UnacknowledgedMessage(entry, consumerTag, deliveryTag,_unacknowledgedMessageMap));
- checkSuspension();
- }
+ _unacknowledgedMessageMap.add(deliveryTag, entry);
+
}
private final String id = "(" + System.identityHashCode(this) + ")";
@@ -490,7 +458,7 @@ public class AMQChannel
public void requeue() throws AMQException
{
// we must create a new map since all the messages will get a new delivery tag when they are redelivered
- Collection<UnacknowledgedMessage> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
+ Collection<QueueEntry> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
// Deliver these messages out of the transaction as their delivery was never
// part of the transaction only the receive.
@@ -505,13 +473,9 @@ public class AMQChannel
if (!(_txnContext instanceof NonTransactionalContext))
{
- // if (_nonTransactedContext == null)
- {
- _nonTransactedContext =
- new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages);
- }
- deliveryContext = _nonTransactedContext;
+ deliveryContext =
+ new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages);
}
else
{
@@ -519,22 +483,23 @@ public class AMQChannel
}
}
- for (UnacknowledgedMessage unacked : messagesToBeDelivered)
+ for (QueueEntry unacked : messagesToBeDelivered)
{
if (!unacked.isQueueDeleted())
{
- // Ensure message is released for redelivery
- unacked.entry.release();
-
// Mark message redelivered
unacked.getMessage().setRedelivered(true);
+ // Ensure message is released for redelivery
+ unacked.release();
+
// Deliver Message
- deliveryContext.deliver(unacked.entry, false);
+ deliveryContext.requeue(unacked);
- // Should we allow access To the DM to directy deliver the message?
- // As we don't need to check for Consumers or worry about incrementing the message count?
- // unacked.queue.getDeliveryManager().deliver(_storeContext, unacked.queue.getName(), unacked.message, false);
+ }
+ else
+ {
+ unacked.discard(_storeContext);
}
}
@@ -549,32 +514,29 @@ public class AMQChannel
*/
public void requeue(long deliveryTag) throws AMQException
{
- UnacknowledgedMessage unacked = _unacknowledgedMessageMap.remove(deliveryTag);
+ QueueEntry unacked = _unacknowledgedMessageMap.remove(deliveryTag);
if (unacked != null)
{
+ // Mark message redelivered
+ unacked.getMessage().setRedelivered(true);
// Ensure message is released for redelivery
if (!unacked.isQueueDeleted())
{
- unacked.entry.release();
+ unacked.release();
}
- // Mark message redelivered
- unacked.getMessage().setRedelivered(true);
// Deliver these messages out of the transaction as their delivery was never
// part of the transaction only the receive.
TransactionalContext deliveryContext;
if (!(_txnContext instanceof NonTransactionalContext))
{
- // if (_nonTransactedContext == null)
- {
- _nonTransactedContext =
+
+ deliveryContext =
new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages);
- }
- deliveryContext = _nonTransactedContext;
}
else
{
@@ -584,7 +546,7 @@ public class AMQChannel
if (!unacked.isQueueDeleted())
{
// Redeliver the messages to the front of the queue
- deliveryContext.deliver(unacked.entry, true);
+ deliveryContext.requeue(unacked);
// Deliver increments the message count but we have already deliverted this once so don't increment it again
// this was because deliver did an increment changed this.
}
@@ -592,11 +554,8 @@ public class AMQChannel
{
_log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.getMessage().debugIdentity()
+ "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message.");
- // _log.error("Requested requeue of message:" + deliveryTag +
- // " but no queue defined using DeadLetter queue:" + getDeadLetterQueue());
- //
- // deliveryContext.deliver(unacked.message, getDeadLetterQueue(), false);
- //
+
+ unacked.discard(_storeContext);
}
}
else
@@ -604,25 +563,6 @@ public class AMQChannel
_log.warn("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists."
+ _unacknowledgedMessageMap.size());
- if (_log.isDebugEnabled())
- {
- _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
- {
- int count = 0;
-
- public boolean callback(UnacknowledgedMessage message) throws AMQException
- {
- _log.debug(
- (count++) + ": (" + message.getMessage().debugIdentity() + ")" + "[" + message.deliveryTag + "]");
-
- return false; // Continue
- }
-
- public void visitComplete()
- {
- }
- });
- }
}
}
@@ -636,8 +576,10 @@ public class AMQChannel
*/
public void resend(final boolean requeue) throws AMQException
{
- final List<UnacknowledgedMessage> msgToRequeue = new LinkedList<UnacknowledgedMessage>();
- final List<UnacknowledgedMessage> msgToResend = new LinkedList<UnacknowledgedMessage>();
+
+
+ final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
+ final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
if (_log.isDebugEnabled())
{
@@ -647,23 +589,25 @@ public class AMQChannel
// Process the Unacked-Map.
// Marking messages who still have a consumer for to be resent
// and those that don't to be requeued.
+
_unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
{
- public boolean callback(UnacknowledgedMessage message) throws AMQException
+ public boolean callback(final long deliveryTag, QueueEntry message) throws AMQException
{
- AMQShortString consumerTag = message.consumerTag;
+
AMQMessage msg = message.getMessage();
msg.setRedelivered(true);
- if (consumerTag != null)
+ final Subscription subscription = message.getDeliveredSubscription();
+ if (subscription != null)
{
// Consumer exists
- if (_consumerTag2QueueMap.containsKey(consumerTag))
+ if (!subscription.isClosed())
{
- msgToResend.add(message);
+ msgToResend.put(deliveryTag, message);
}
else // consumer has gone
{
- msgToRequeue.add(message);
+ msgToRequeue.put(deliveryTag, message);
}
}
else
@@ -675,7 +619,7 @@ public class AMQChannel
{
if (requeue)
{
- msgToRequeue.add(message);
+ msgToRequeue.put(deliveryTag, message);
}
else
{
@@ -684,7 +628,8 @@ public class AMQChannel
}
else
{
- _log.info("Message.queue is null and no DeadLetter Queue so dropping message:" + message);
+ message.discard(_storeContext);
+ _log.warn("Message.queue is null and no DeadLetter Queue so dropping message:" + message);
}
}
@@ -697,6 +642,8 @@ public class AMQChannel
}
});
+ _unacknowledgedMessageMap.clear();
+
// Process Messages to Resend
if (_log.isDebugEnabled())
{
@@ -710,9 +657,15 @@ public class AMQChannel
}
}
- for (UnacknowledgedMessage message : msgToResend)
+ for (Map.Entry<Long, QueueEntry> entry : msgToResend.entrySet())
{
+ QueueEntry message = entry.getValue();
+ long deliveryTag = entry.getKey();
+
+
+
AMQMessage msg = message.getMessage();
+ AMQQueue queue = message.getQueue();
// Our Java Client will always suspend the channel when resending!
// If the client has requested the messages be resent then it is
@@ -727,46 +680,20 @@ public class AMQChannel
// else
// {
// release to allow it to be delivered
- message.entry.release();
// Without any details from the client about what has been processed we have to mark
// all messages in the unacked map as redelivered.
msg.setRedelivered(true);
- Subscription sub = message.entry.getDeliveredSubscription();
+ Subscription sub = message.getDeliveredSubscription();
if (sub != null)
{
- // Get the lock so we can tell if the sub scription has closed.
- // will stop delivery to this subscription until the lock is released.
- // note: this approach would allow the use of a single queue if the
- // PreDeliveryQueue would allow head additions.
- // In the Java Qpid client we are suspended whilst doing this so it is all rather Mute..
- // needs guidance from AMQP WG Model SIG
- synchronized (sub.getSendLock())
+
+ if(!queue.resend(message, sub))
{
- if (sub.isClosed())
- {
- if (_log.isDebugEnabled())
- {
- _log.debug("Subscription(" + System.identityHashCode(sub)
- + ") closed during resend so requeuing message");
- }
- // move this message to requeue
- msgToRequeue.add(message);
- }
- else
- {
- if (_log.isDebugEnabled())
- {
- _log.debug("Requeuing " + msg.debugIdentity() + " for resend via sub:"
- + System.identityHashCode(sub));
- }
-
- sub.addToResendQueue(message.entry);
- _unacknowledgedMessageMap.remove(message.deliveryTag);
- }
- } // sync(sub.getSendLock)
+ msgToRequeue.put(deliveryTag, message);
+ }
}
else
{
@@ -777,7 +704,7 @@ public class AMQChannel
+ ")to prevent loss");
}
// move this message to requeue
- msgToRequeue.add(message);
+ msgToRequeue.put(deliveryTag, message);
}
} // for all messages
// } else !isSuspend
@@ -795,13 +722,9 @@ public class AMQChannel
TransactionalContext deliveryContext;
if (!(_txnContext instanceof NonTransactionalContext))
{
- if (_nonTransactedContext == null)
- {
- _nonTransactedContext =
- new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages);
- }
- deliveryContext = _nonTransactedContext;
+ deliveryContext =
+ new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages);
}
else
{
@@ -809,14 +732,17 @@ public class AMQChannel
}
// Process Messages to Requeue at the front of the queue
- for (UnacknowledgedMessage message : msgToRequeue)
+ for (Map.Entry<Long, QueueEntry> entry : msgToRequeue.entrySet())
{
- message.entry.release();
- message.entry.setRedelivered(true);
+ QueueEntry message = entry.getValue();
+ long deliveryTag = entry.getKey();
+
+ message.release();
+ message.setRedelivered(true);
- deliveryContext.deliver(message.entry, true);
+ deliveryContext.requeue(message);
- _unacknowledgedMessageMap.remove(message.deliveryTag);
+ _unacknowledgedMessageMap.remove(deliveryTag);
}
}
@@ -827,38 +753,47 @@ public class AMQChannel
*
* @param queue the queue that has been deleted
*
- * @throws org.apache.qpid.AMQException if there is an error processing the unacked messages
*/
- public void queueDeleted(final AMQQueue queue) throws AMQException
+ /* public void queueDeleted(final AMQQueue queue)
{
- _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
+ try
{
- public boolean callback(UnacknowledgedMessage message) throws AMQException
+ _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
{
- if (message.getQueue() == queue)
+ public boolean callback(UnacknowledgedMessage message)
{
- try
+ if (message.getQueue() == queue)
{
- message.discard(_storeContext);
- message.setQueueDeleted(true);
+ try
+ {
+ message.discard(_storeContext);
+ message.setQueueDeleted(true);
+ }
+ catch (AMQException e)
+ {
+ _log.error(
+ "Error decrementing ref count on message " + message.getMessage().getMessageId() + ": " + e, e);
+ throw new RuntimeException(e);
+ }
}
- catch (AMQException e)
- {
- _log.error(
- "Error decrementing ref count on message " + message.getMessage().getMessageId() + ": " + e, e);
- }
+
+ return false;
}
- return false;
- }
+ public void visitComplete()
+ {
+ }
+ });
+ }
+ catch (AMQException e)
+ {
+ _log.error("Unexpected Error while handling deletion of queue", e);
+ throw new RuntimeException(e);
+ }
- public void visitComplete()
- {
- }
- });
}
-
+*/
/**
* Acknowledge one or more messages.
*
@@ -870,23 +805,7 @@ public class AMQChannel
*/
public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException
{
- synchronized (_unacknowledgedMessageMap.getLock())
- {
- if (_log.isDebugEnabled())
- {
- _log.debug("Unacked (PreAck) Size:" + _unacknowledgedMessageMap.size());
- }
-
- _unacknowledgedMessageMap.acknowledgeMessage(deliveryTag, multiple, _txnContext);
-
- if (_log.isDebugEnabled())
- {
- _log.debug("Unacked (PostAck) Size:" + _unacknowledgedMessageMap.size());
- }
-
- }
-
- checkSuspension();
+ _unacknowledgedMessageMap.acknowledgeMessage(deliveryTag, multiple, _txnContext);
}
/**
@@ -899,43 +818,22 @@ public class AMQChannel
return _unacknowledgedMessageMap;
}
- private void checkSuspension()
- {
- boolean suspend;
-
- suspend =
- ((_prefetch_HighWaterMark != 0) && (_unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark))
- || ((_prefetchSize != 0) && (_prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes()));
-
- setSuspended(suspend);
- }
public void setSuspended(boolean suspended)
{
- boolean isSuspended = _suspended.get();
- if (isSuspended && !suspended)
- {
- // Continue being suspended if we are above the _prefetch_LowWaterMark
- suspended = _unacknowledgedMessageMap.size() > _prefetch_LowWaterMark;
- }
boolean wasSuspended = _suspended.getAndSet(suspended);
if (wasSuspended != suspended)
{
if (wasSuspended)
{
- _log.debug("Unsuspending channel " + this);
// may need to deliver queued messages
- for (AMQQueue q : _consumerTag2QueueMap.values())
+ for (Subscription s : _tag2SubscriptionMap.values())
{
- q.deliverAsync();
+ s.getQueue().deliverAsync(s);
}
}
- else
- {
- _log.debug("Suspending channel " + this);
- }
}
}
@@ -961,12 +859,7 @@ public class AMQChannel
public String toString()
{
- StringBuilder sb = new StringBuilder(30);
- sb.append("Channel: id ").append(_channelId).append(", transaction mode: ").append(isTransactional());
- sb.append(", prefetch marks: ").append(_prefetch_LowWaterMark);
- sb.append("/").append(_prefetch_HighWaterMark);
-
- return sb.toString();
+ return "["+_session.toString()+":"+_channelId+"]";
}
public void setDefaultQueue(AMQQueue queue)
@@ -984,14 +877,14 @@ public class AMQChannel
return _storeContext;
}
- public void processReturns(AMQProtocolSession session) throws AMQException
+ public void processReturns() throws AMQException
{
if (!_returnMessages.isEmpty())
{
for (RequiredDeliveryException bouncedMessage : _returnMessages)
{
AMQMessage message = bouncedMessage.getAMQMessage();
- session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(),
+ _session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(),
new AMQShortString(bouncedMessage.getMessage()));
message.decrementReference(_storeContext);
@@ -1001,40 +894,68 @@ public class AMQChannel
}
}
- public boolean wouldSuspend(AMQMessage msg)
+
+ public TransactionalContext getTransactionalContext()
{
- if (isSuspended())
- {
- return true;
- }
- else
- {
- boolean willSuspend =
- ((_prefetch_HighWaterMark != 0) && ((_unacknowledgedMessageMap.size() + 1) > _prefetch_HighWaterMark));
- if (!willSuspend)
- {
- final long unackedSize = _unacknowledgedMessageMap.getUnacknowledgeBytes();
+ return _txnContext;
+ }
- willSuspend = (_prefetchSize != 0) && (unackedSize != 0) && (_prefetchSize < (msg.getSize() + unackedSize));
- }
+ public boolean isClosing()
+ {
+ return _closing;
+ }
- if (willSuspend)
- {
- setSuspended(true);
- }
+ public AMQProtocolSession getProtocolSession()
+ {
+ return _session;
+ }
- return willSuspend;
- }
+ public FlowCreditManager getCreditManager()
+ {
+ return _creditManager;
+ }
+ public void setCredit(final long prefetchSize, final int prefetchCount)
+ {
+ _creditManager.setCreditLimits(prefetchSize, prefetchCount);
}
- public TransactionalContext getTransactionalContext()
+ public List<RequiredDeliveryException> getReturnMessages()
{
- return _txnContext;
+ return _returnMessages;
}
- public boolean isClosing()
+ public MessageStore getMessageStore()
{
- return _closing;
+ return _messageStore;
+ }
+
+ private final ClientDeliveryMethod _clientDeliveryMethod = new ClientDeliveryMethod()
+ {
+
+ public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag)
+ throws AMQException
+ {
+ getProtocolSession().getProtocolOutputConverter().writeDeliver(entry.getMessage(), getChannelId(), deliveryTag, sub.getConsumerTag());
+ }
+ };
+
+ public ClientDeliveryMethod getClientDeliveryMethod()
+ {
+ return _clientDeliveryMethod;
+ }
+
+ private final RecordDeliveryMethod _recordDeliveryMethod = new RecordDeliveryMethod()
+ {
+
+ public void recordMessageDelivery(final Subscription sub, final QueueEntry entry, final long deliveryTag)
+ {
+ addUnacknowledgedMessage(entry, deliveryTag, sub);
+ }
+ };
+
+ public RecordDeliveryMethod getRecordDeliveryMethod()
+ {
+ return _recordDeliveryMethod;
}
}