diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java | 1465 |
1 files changed, 1465 insertions, 0 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java new file mode 100644 index 0000000000..08eb05680c --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -0,0 +1,1465 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server; + +import org.apache.log4j.Logger; + +import org.apache.qpid.AMQConnectionException; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQSecurityException; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.ack.UnacknowledgedMessageMap; +import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl; +import org.apache.qpid.server.configuration.ConfigStore; +import org.apache.qpid.server.configuration.ConfiguredObject; +import org.apache.qpid.server.configuration.ConnectionConfig; +import org.apache.qpid.server.configuration.SessionConfig; +import org.apache.qpid.server.configuration.SessionConfigType; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.flow.FlowCreditManager; +import org.apache.qpid.server.flow.Pre0_10CreditManager; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.actors.AMQPChannelActor; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.messages.ChannelMessages; +import org.apache.qpid.server.logging.subjects.ChannelLogSubject; +import org.apache.qpid.server.message.AMQMessage; +import org.apache.qpid.server.message.MessageMetaData; +import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.output.ProtocolOutputConverter; +import org.apache.qpid.server.protocol.AMQProtocolEngine; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.queue.IncomingMessage; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.subscription.ClientDeliveryMethod; +import org.apache.qpid.server.subscription.RecordDeliveryMethod; +import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; +import org.apache.qpid.server.txn.AutoCommitTransaction; +import org.apache.qpid.server.txn.LocalTransaction; +import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.virtualhost.VirtualHost; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +public class AMQChannel implements SessionConfig, AMQSessionModel +{ + public static final int DEFAULT_PREFETCH = 5000; + + private static final Logger _logger = Logger.getLogger(AMQChannel.class); + + private static final boolean MSG_AUTH = + ApplicationRegistry.getInstance().getConfiguration().getMsgAuth(); + + + private final int _channelId; + + + private final Pre0_10CreditManager _creditManager = new Pre0_10CreditManager(0l,0l); + + /** + * The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that + * value of this represents the <b>last</b> tag sent out + */ + private long _deliveryTag = 0; + + /** A channel has a default queue (the last declared) that is used when no queue name is explictily set */ + private AMQQueue _defaultQueue; + + /** This tag is unique per subscription to a queue. The server returns this in response to a basic.consume request. */ + private int _consumerTag; + + /** + * The current message - which may be partial in the sense that not all frames have been received yet - which has + * been received by this channel. As the frames are received the message gets updated and once all frames have been + * received the message can then be routed. + */ + private IncomingMessage _currentMessage; + + /** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. */ + protected final Map<AMQShortString, Subscription> _tag2SubscriptionMap = new HashMap<AMQShortString, Subscription>(); + + private final MessageStore _messageStore; + + private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH); + + // Set of messages being acknoweledged in the current transaction + private SortedSet<QueueEntry> _acknowledgedMessages = new TreeSet<QueueEntry>(); + + private final AtomicBoolean _suspended = new AtomicBoolean(false); + + private ServerTransaction _transaction; + + private final AtomicLong _txnStarts = new AtomicLong(0); + private final AtomicLong _txnCommits = new AtomicLong(0); + private final AtomicLong _txnRejects = new AtomicLong(0); + private final AtomicLong _txnCount = new AtomicLong(0); + private final AtomicLong _txnUpdateTime = new AtomicLong(0); + + private final AMQProtocolSession _session; + private AtomicBoolean _closing = new AtomicBoolean(false); + + private final ConcurrentMap<AMQQueue, Boolean> _blockingQueues = new ConcurrentHashMap<AMQQueue, Boolean>(); + + private final AtomicBoolean _blocking = new AtomicBoolean(false); + + + private LogActor _actor; + private LogSubject _logSubject; + private volatile boolean _rollingBack; + + private static final Runnable NULL_TASK = new Runnable() { public void run() {} }; + private List<QueueEntry> _resendList = new ArrayList<QueueEntry>(); + private static final + AMQShortString IMMEDIATE_DELIVERY_REPLY_TEXT = new AMQShortString("Immediate delivery is not possible."); + private final UUID _id; + private long _createTime = System.currentTimeMillis(); + + public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore) + throws AMQException + { + _session = session; + _channelId = channelId; + + _actor = new AMQPChannelActor(this, session.getLogActor().getRootMessageLogger()); + _logSubject = new ChannelLogSubject(this); + _id = getConfigStore().createId(); + _actor.message(ChannelMessages.CREATE()); + + getConfigStore().addConfiguredObject(this); + + _messageStore = messageStore; + + // by default the session is non-transactional + _transaction = new AutoCommitTransaction(_messageStore); + } + + public ConfigStore getConfigStore() + { + return getVirtualHost().getConfigStore(); + } + + /** Sets this channel to be part of a local transaction */ + public void setLocalTransactional() + { + _transaction = new LocalTransaction(_messageStore); + _txnStarts.incrementAndGet(); + } + + public boolean isTransactional() + { + // this does not look great but there should only be one "non-transactional" + // transactional context, while there could be several transactional ones in + // theory + return !(_transaction instanceof AutoCommitTransaction); + } + + public boolean inTransaction() + { + return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0; + } + + private void incrementOutstandingTxnsIfNecessary() + { + if(isTransactional()) + { + //There can currently only be at most one outstanding transaction + //due to only having LocalTransaction support. Set value to 1 if 0. + _txnCount.compareAndSet(0,1); + } + } + + private void decrementOutstandingTxnsIfNecessary() + { + if(isTransactional()) + { + //There can currently only be at most one outstanding transaction + //due to only having LocalTransaction support. Set value to 0 if 1. + _txnCount.compareAndSet(1,0); + } + } + + public Long getTxnStarts() + { + return _txnStarts.get(); + } + + public Long getTxnCommits() + { + return _txnCommits.get(); + } + + public Long getTxnRejects() + { + return _txnRejects.get(); + } + + public Long getTxnCount() + { + return _txnCount.get(); + } + + public int getChannelId() + { + return _channelId; + } + + public void setPublishFrame(MessagePublishInfo info, final Exchange e) throws AMQSecurityException + { + if (!getVirtualHost().getSecurityManager().authorisePublish(info.isImmediate(), info.getRoutingKey().asString(), e.getName())) + { + throw new AMQSecurityException("Permission denied: " + e.getName()); + } + _currentMessage = new IncomingMessage(info); + _currentMessage.setExchange(e); + } + + public void publishContentHeader(ContentHeaderBody contentHeaderBody) + throws AMQException + { + if (_currentMessage == null) + { + throw new AMQException("Received content header without previously receiving a BasicPublish frame"); + } + else + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Content header received on channel " + _channelId); + } + + _currentMessage.setContentHeaderBody(contentHeaderBody); + + _currentMessage.setExpiration(); + + + MessageMetaData mmd = _currentMessage.headersReceived(); + final StoredMessage<MessageMetaData> handle = _messageStore.addMessage(mmd); + _currentMessage.setStoredMessage(handle); + + routeCurrentMessage(); + + + _transaction.addPostTransactionAction(new ServerTransaction.Action() + { + + public void postCommit() + { + } + + public void onRollback() + { + handle.remove(); + } + }); + + deliverCurrentMessageIfComplete(); + } + } + + private void deliverCurrentMessageIfComplete() + throws AMQException + { + // check and deliver if header says body length is zero + if (_currentMessage.allContentReceived()) + { + try + { + _currentMessage.getStoredMessage().flushToStore(); + + final ArrayList<? extends BaseQueue> destinationQueues = _currentMessage.getDestinationQueues(); + + if(!checkMessageUserId(_currentMessage.getContentHeader())) + { + _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.ACCESS_REFUSED, "Access Refused", _currentMessage)); + } + else + { + if(destinationQueues == null || _currentMessage.getDestinationQueues().isEmpty()) + { + if (_currentMessage.isMandatory() || _currentMessage.isImmediate()) + { + _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message", _currentMessage)); + } + else + { + _logger.warn("MESSAGE DISCARDED: No routes for message - " + createAMQMessage(_currentMessage)); + } + + } + else + { + _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues, isTransactional())); + incrementOutstandingTxnsIfNecessary(); + updateTransactionalActivity(); + } + } + } + finally + { + long bodySize = _currentMessage.getSize(); + long timestamp = ((BasicContentHeaderProperties) _currentMessage.getContentHeader().getProperties()).getTimestamp(); + _session.registerMessageReceived(bodySize, timestamp); + _currentMessage = null; + } + } + + } + + public void publishContentBody(ContentBody contentBody) throws AMQException + { + if (_currentMessage == null) + { + throw new AMQException("Received content body without previously receiving a JmsPublishBody"); + } + + if (_logger.isDebugEnabled()) + { + _logger.debug(debugIdentity() + "Content body received on channel " + _channelId); + } + + try + { + + // returns true iff the message was delivered (i.e. if all data was + // received + final ContentChunk contentChunk = + _session.getMethodRegistry().getProtocolVersionMethodConverter().convertToContentChunk(contentBody); + + _currentMessage.addContentBodyFrame(contentChunk); + + deliverCurrentMessageIfComplete(); + } + catch (AMQException e) + { + // we want to make sure we don't keep a reference to the message in the + // event of an error + _currentMessage = null; + throw e; + } + } + + protected void routeCurrentMessage() throws AMQException + { + _currentMessage.route(); + } + + public long getNextDeliveryTag() + { + return ++_deliveryTag; + } + + public int getNextConsumerTag() + { + return ++_consumerTag; + } + + + public Subscription getSubscription(AMQShortString subscription) + { + return _tag2SubscriptionMap.get(subscription); + } + + /** + * Subscribe to a queue. We register all subscriptions in the channel so that if the channel is closed we can clean + * up all subscriptions, even if the client does not explicitly unsubscribe from all queues. + * + * @param tag the tag chosen by the client (if null, server will generate one) + * @param queue the queue to subscribe to + * @param acks Are acks enabled for this subscriber + * @param filters Filters to apply to this subscriber + * + * @param noLocal Flag stopping own messages being receivied. + * @param exclusive Flag requesting exclusive access to the queue + * @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests + * + * @throws AMQException if something goes wrong + */ + public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, boolean acks, + FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException + { + if (tag == null) + { + tag = new AMQShortString("sgen_" + getNextConsumerTag()); + } + + if (_tag2SubscriptionMap.containsKey(tag)) + { + throw new AMQException("Consumer already exists with same tag: " + tag); + } + + Subscription subscription = + SubscriptionFactoryImpl.INSTANCE.createSubscription(_channelId, _session, tag, acks, filters, noLocal, _creditManager); + + + // So to keep things straight we put before the call and catch all exceptions from the register and tidy up. + // We add before we register as the Async Delivery process may AutoClose the subscriber + // so calling _cT2QM.remove before we have done put which was after the register succeeded. + // So to keep things straight we put before the call and catch all exceptions from the register and tidy up. + + _tag2SubscriptionMap.put(tag, subscription); + + try + { + queue.registerSubscription(subscription, exclusive); + } + catch (AMQException e) + { + _tag2SubscriptionMap.remove(tag); + throw e; + } + return tag; + } + + /** + * Unsubscribe a consumer from a queue. + * @param consumerTag + * @return true if the consumerTag had a mapped queue that could be unregistered. + * @throws AMQException + */ + public boolean unsubscribeConsumer(AMQShortString consumerTag) throws AMQException + { + + Subscription sub = _tag2SubscriptionMap.remove(consumerTag); + if (sub != null) + { + try + { + sub.getSendLock(); + sub.getQueue().unregisterSubscription(sub); + } + finally + { + sub.releaseSendLock(); + } + return true; + } + else + { + _logger.warn("Attempt to unsubscribe consumer with tag '"+consumerTag+"' which is not registered."); + } + return false; + } + + /** + * Called from the protocol session to close this channel and clean up. T + * + * @throws AMQException if there is an error during closure + */ + public void close() throws AMQException + { + if(!_closing.compareAndSet(false, true)) + { + //Channel is already closing + return; + } + + CurrentActor.get().message(_logSubject, ChannelMessages.CLOSE()); + + unsubscribeAllConsumers(); + _transaction.rollback(); + + try + { + requeue(); + } + catch (AMQException e) + { + _logger.error("Caught AMQException whilst attempting to reque:" + e); + } + + getConfigStore().removeConfiguredObject(this); + + } + + private void unsubscribeAllConsumers() throws AMQException + { + if (_logger.isInfoEnabled()) + { + if (!_tag2SubscriptionMap.isEmpty()) + { + _logger.info("Unsubscribing all consumers on channel " + toString()); + } + else + { + _logger.info("No consumers to unsubscribe on channel " + toString()); + } + } + + for (Map.Entry<AMQShortString, Subscription> me : _tag2SubscriptionMap.entrySet()) + { + if (_logger.isInfoEnabled()) + { + _logger.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString()); + } + + Subscription sub = me.getValue(); + + try + { + sub.getSendLock(); + sub.getQueue().unregisterSubscription(sub); + } + finally + { + sub.releaseSendLock(); + } + + } + + _tag2SubscriptionMap.clear(); + } + + /** + * Add a message to the channel-based list of unacknowledged messages + * + * @param entry the record of the message on the queue that was delivered + * @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of the + * delivery tag) + * @param subscription The consumer that is to acknowledge this message. + */ + public void addUnacknowledgedMessage(QueueEntry entry, long deliveryTag, Subscription subscription) + { + if (_logger.isDebugEnabled()) + { + if (entry.getQueue() == null) + { + _logger.debug("Adding unacked message with a null queue:" + entry); + } + else + { + if (_logger.isDebugEnabled()) + { + _logger.debug(debugIdentity() + " Adding unacked message(" + entry.getMessage().toString() + " DT:" + deliveryTag + + ") with a queue(" + entry.getQueue() + ") for " + subscription); + } + } + } + + _unacknowledgedMessageMap.add(deliveryTag, entry); + + } + + private final String id = "(" + System.identityHashCode(this) + ")"; + + public String debugIdentity() + { + return _channelId + id; + } + + /** + * Called to attempt re-delivery all outstanding unacknowledged messages on the channel. May result in delivery to + * this same channel or to other subscribers. + * + * @throws org.apache.qpid.AMQException if the requeue fails + */ + public void requeue() throws AMQException + { + // we must create a new map since all the messages will get a new delivery tag when they are redelivered + Collection<QueueEntry> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages(); + + if (!messagesToBeDelivered.isEmpty()) + { + if (_logger.isInfoEnabled()) + { + _logger.info("Requeuing " + messagesToBeDelivered.size() + " unacked messages. for " + toString()); + } + + } + + for (QueueEntry unacked : messagesToBeDelivered) + { + if (!unacked.isQueueDeleted()) + { + // Mark message redelivered + unacked.setRedelivered(); + + // Ensure message is released for redelivery + unacked.release(); + + } + else + { + unacked.discard(); + } + } + + } + + /** + * Requeue a single message + * + * @param deliveryTag The message to requeue + * + * @throws AMQException If something goes wrong. + */ + public void requeue(long deliveryTag) throws AMQException + { + QueueEntry unacked = _unacknowledgedMessageMap.remove(deliveryTag); + + if (unacked != null) + { + // Mark message redelivered + unacked.setRedelivered(); + + // Ensure message is released for redelivery + if (!unacked.isQueueDeleted()) + { + + // Ensure message is released for redelivery + unacked.release(); + + } + else + { + _logger.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked + + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message."); + + unacked.discard(); + } + } + else + { + _logger.warn("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists." + + _unacknowledgedMessageMap.size()); + + } + + } + + /** + * Called to resend all outstanding unacknowledged messages to this same channel. + * + * @param requeue Are the messages to be requeued or dropped. + * + * @throws AMQException When something goes wrong. + */ + public void resend(final boolean requeue) throws AMQException + { + + + final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>(); + final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>(); + + if (_logger.isDebugEnabled()) + { + _logger.debug("unacked map Size:" + _unacknowledgedMessageMap.size()); + } + + // Process the Unacked-Map. + // Marking messages who still have a consumer for to be resent + // and those that don't to be requeued. + _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, + msgToRequeue, + msgToResend, + requeue, + _messageStore)); + + + // Process Messages to Resend + if (_logger.isDebugEnabled()) + { + if (!msgToResend.isEmpty()) + { + _logger.debug("Preparing (" + msgToResend.size() + ") message to resend."); + } + else + { + _logger.debug("No message to resend."); + } + } + + for (Map.Entry<Long, QueueEntry> entry : msgToResend.entrySet()) + { + QueueEntry message = entry.getValue(); + long deliveryTag = entry.getKey(); + + + + ServerMessage msg = message.getMessage(); + AMQQueue queue = message.getQueue(); + + // Our Java Client will always suspend the channel when resending! + // If the client has requested the messages be resent then it is + // their responsibility to ensure that thay are capable of receiving them + // i.e. The channel hasn't been server side suspended. + // if (isSuspended()) + // { + // _logger.info("Channel is suspended so requeuing"); + // //move this message to requeue + // msgToRequeue.add(message); + // } + // else + // { + // release to allow it to be delivered + + // Without any details from the client about what has been processed we have to mark + // all messages in the unacked map as redelivered. + message.setRedelivered(); + + Subscription sub = message.getDeliveredSubscription(); + + if (sub != null) + { + + if(!queue.resend(message,sub)) + { + msgToRequeue.put(deliveryTag, message); + } + } + else + { + + if (_logger.isInfoEnabled()) + { + _logger.info("DeliveredSubscription not recorded so just requeueing(" + message.toString() + + ")to prevent loss"); + } + // move this message to requeue + msgToRequeue.put(deliveryTag, message); + } + } // for all messages + // } else !isSuspend + + if (_logger.isInfoEnabled()) + { + if (!msgToRequeue.isEmpty()) + { + _logger.info("Preparing (" + msgToRequeue.size() + ") message to requeue to."); + } + } + + // Process Messages to Requeue at the front of the queue + for (Map.Entry<Long, QueueEntry> entry : msgToRequeue.entrySet()) + { + QueueEntry message = entry.getValue(); + long deliveryTag = entry.getKey(); + _unacknowledgedMessageMap.remove(deliveryTag); + + message.setRedelivered(); + message.release(); + + } + } + + + /** + * Acknowledge one or more messages. + * + * @param deliveryTag the last delivery tag + * @param multiple if true will acknowledge all messages up to an including the delivery tag. if false only + * acknowledges the single message specified by the delivery tag + * + * @throws AMQException if the delivery tag is unknown (e.g. not outstanding) on this channel + */ + public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException + { + Collection<QueueEntry> ackedMessages = getAckedMessages(deliveryTag, multiple); + _transaction.dequeue(ackedMessages, new MessageAcknowledgeAction(ackedMessages)); + updateTransactionalActivity(); + } + + private Collection<QueueEntry> getAckedMessages(long deliveryTag, boolean multiple) + { + + Map<Long, QueueEntry> ackedMessageMap = new LinkedHashMap<Long,QueueEntry>(); + _unacknowledgedMessageMap.collect(deliveryTag, multiple, ackedMessageMap); + _unacknowledgedMessageMap.remove(ackedMessageMap); + return ackedMessageMap.values(); + } + + /** + * Used only for testing purposes. + * + * @return the map of unacknowledged messages + */ + public UnacknowledgedMessageMap getUnacknowledgedMessageMap() + { + return _unacknowledgedMessageMap; + } + + /** + * Called from the ChannelFlowHandler to suspend this Channel + * @param suspended boolean, should this Channel be suspended + */ + public void setSuspended(boolean suspended) + { + boolean wasSuspended = _suspended.getAndSet(suspended); + if (wasSuspended != suspended) + { + // Log Flow Started before we start the subscriptions + if (!suspended) + { + _actor.message(_logSubject, ChannelMessages.FLOW("Started")); + } + + + // This section takes two different approaches to perform to perform + // the same function. Ensuring that the Subscription has taken note + // of the change in Channel State + + // Here we have become unsuspended and so we ask each the queue to + // perform an Async delivery for each of the subscriptions in this + // Channel. The alternative would be to ensure that the subscription + // had received the change in suspension state. That way the logic + // behind decieding to start an async delivery was located with the + // Subscription. + if (wasSuspended) + { + // may need to deliver queued messages + for (Subscription s : _tag2SubscriptionMap.values()) + { + s.getQueue().deliverAsync(s); + } + } + + + // Here we have become suspended so we need to ensure that each of + // the Subscriptions has noticed this change so that we can be sure + // they are not still sending messages. Again the code here is a + // very simplistic approach to ensure that the change of suspension + // has been noticed by each of the Subscriptions. Unlike the above + // case we don't actually need to do anything else. + if (!wasSuspended) + { + // may need to deliver queued messages + for (Subscription s : _tag2SubscriptionMap.values()) + { + try + { + s.getSendLock(); + } + finally + { + s.releaseSendLock(); + } + } + } + + + // Log Suspension only after we have confirmed all suspensions are + // stopped. + if (suspended) + { + _actor.message(_logSubject, ChannelMessages.FLOW("Stopped")); + } + + } + } + + public boolean isSuspended() + { + return _suspended.get() || _closing.get() || _session.isClosing(); + } + + public void commit() throws AMQException + { + if (!isTransactional()) + { + throw new AMQException("Fatal error: commit called on non-transactional channel"); + } + + _transaction.commit(); + + _txnCommits.incrementAndGet(); + _txnStarts.incrementAndGet(); + decrementOutstandingTxnsIfNecessary(); + } + + public void rollback() throws AMQException + { + rollback(NULL_TASK); + } + + public void rollback(Runnable postRollbackTask) throws AMQException + { + if (!isTransactional()) + { + throw new AMQException("Fatal error: commit called on non-transactional channel"); + } + + // stop all subscriptions + _rollingBack = true; + boolean requiresSuspend = _suspended.compareAndSet(false,true); + + // ensure all subscriptions have seen the change to the channel state + for(Subscription sub : _tag2SubscriptionMap.values()) + { + sub.getSendLock(); + sub.releaseSendLock(); + } + + try + { + _transaction.rollback(); + } + finally + { + _rollingBack = false; + + _txnRejects.incrementAndGet(); + _txnStarts.incrementAndGet(); + decrementOutstandingTxnsIfNecessary(); + } + + postRollbackTask.run(); + + for(QueueEntry entry : _resendList) + { + Subscription sub = entry.getDeliveredSubscription(); + if(sub == null || sub.isClosed()) + { + entry.release(); + } + else + { + sub.getQueue().resend(entry, sub); + } + } + _resendList.clear(); + + if(requiresSuspend) + { + _suspended.set(false); + for(Subscription sub : _tag2SubscriptionMap.values()) + { + sub.getQueue().deliverAsync(sub); + } + + } + + + } + + /** + * Update last transaction activity timestamp + */ + private void updateTransactionalActivity() + { + if (isTransactional()) + { + _txnUpdateTime.set(System.currentTimeMillis()); + } + } + + public String toString() + { + return "["+_session.toString()+":"+_channelId+"]"; + } + + public void setDefaultQueue(AMQQueue queue) + { + _defaultQueue = queue; + } + + public AMQQueue getDefaultQueue() + { + return _defaultQueue; + } + + + public boolean isClosing() + { + return _closing.get(); + } + + public AMQProtocolSession getProtocolSession() + { + return _session; + } + + public FlowCreditManager getCreditManager() + { + return _creditManager; + } + + public void setCredit(final long prefetchSize, final int prefetchCount) + { + _actor.message(ChannelMessages.PREFETCH_SIZE(prefetchSize, prefetchCount)); + _creditManager.setCreditLimits(prefetchSize, prefetchCount); + } + + public MessageStore getMessageStore() + { + return _messageStore; + } + + private final ClientDeliveryMethod _clientDeliveryMethod = new ClientDeliveryMethod() + { + + public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag) + throws AMQException + { + getProtocolSession().getProtocolOutputConverter().writeDeliver(entry, getChannelId(), + deliveryTag, sub.getConsumerTag()); + _session.registerMessageDelivered(entry.getMessage().getSize()); + } + + }; + + public ClientDeliveryMethod getClientDeliveryMethod() + { + return _clientDeliveryMethod; + } + + private final RecordDeliveryMethod _recordDeliveryMethod = new RecordDeliveryMethod() + { + + public void recordMessageDelivery(final Subscription sub, final QueueEntry entry, final long deliveryTag) + { + addUnacknowledgedMessage(entry, deliveryTag, sub); + } + }; + + public RecordDeliveryMethod getRecordDeliveryMethod() + { + return _recordDeliveryMethod; + } + + + private AMQMessage createAMQMessage(IncomingMessage incomingMessage) + throws AMQException + { + + AMQMessage message = new AMQMessage(incomingMessage.getStoredMessage()); + + message.setExpiration(incomingMessage.getExpiration()); + message.setClientIdentifier(_session); + return message; + } + + private boolean checkMessageUserId(ContentHeaderBody header) + { + AMQShortString userID = + header.getProperties() instanceof BasicContentHeaderProperties + ? ((BasicContentHeaderProperties) header.getProperties()).getUserId() + : null; + + return (!MSG_AUTH || _session.getPrincipal().getName().equals(userID == null? "" : userID.toString())); + + } + + public Object getID() + { + return _channelId; + } + + public AMQConnectionModel getConnectionModel() + { + return _session; + } + + public String getClientID() + { + return String.valueOf(_session.getContextKey()); + } + + public LogSubject getLogSubject() + { + return _logSubject; + } + + private class MessageDeliveryAction implements ServerTransaction.Action + { + private IncomingMessage _incommingMessage; + private ArrayList<? extends BaseQueue> _destinationQueues; + + public MessageDeliveryAction(IncomingMessage currentMessage, + ArrayList<? extends BaseQueue> destinationQueues, + boolean transactional) + { + _incommingMessage = currentMessage; + _destinationQueues = destinationQueues; + } + + public void postCommit() + { + try + { + final boolean immediate = _incommingMessage.isImmediate(); + + final AMQMessage amqMessage = createAMQMessage(_incommingMessage); + MessageReference ref = amqMessage.newReference(); + + for(final BaseQueue queue : _destinationQueues) + { + BaseQueue.PostEnqueueAction action; + + if(immediate) + { + action = new ImmediateAction(queue); + } + else + { + action = null; + } + + queue.enqueue(amqMessage, action); + + if(queue instanceof AMQQueue) + { + ((AMQQueue)queue).checkCapacity(AMQChannel.this); + } + + } + ref.release(); + } + catch (AMQException e) + { + // TODO + throw new RuntimeException(e); + } + + + + + + } + + public void onRollback() + { + // Maybe keep track of entries that were created and then delete them here in case of failure + // to in memory enqueue + } + + private class ImmediateAction implements BaseQueue.PostEnqueueAction + { + private final BaseQueue _queue; + + public ImmediateAction(BaseQueue queue) + { + _queue = queue; + } + + public void onEnqueue(QueueEntry entry) + { + if (!entry.getDeliveredToConsumer() && entry.acquire()) + { + + + ServerTransaction txn = new LocalTransaction(_messageStore); + Collection<QueueEntry> entries = new ArrayList<QueueEntry>(1); + entries.add(entry); + final AMQMessage message = (AMQMessage) entry.getMessage(); + txn.dequeue(_queue, entry.getMessage(), + new MessageAcknowledgeAction(entries) + { + @Override + public void postCommit() + { + try + { + final + ProtocolOutputConverter outputConverter = + _session.getProtocolOutputConverter(); + + outputConverter.writeReturn(message.getMessagePublishInfo(), + message.getContentHeaderBody(), + message, + _channelId, + AMQConstant.NO_CONSUMERS.getCode(), + IMMEDIATE_DELIVERY_REPLY_TEXT); + } + catch (AMQException e) + { + throw new RuntimeException(e); + } + super.postCommit(); + } + } + ); + txn.commit(); + + + } + + } + } + } + + private class MessageAcknowledgeAction implements ServerTransaction.Action + { + private final Collection<QueueEntry> _ackedMessages; + + + public MessageAcknowledgeAction(Collection<QueueEntry> ackedMessages) + { + _ackedMessages = ackedMessages; + } + + public void postCommit() + { + try + { + for(QueueEntry entry : _ackedMessages) + { + entry.discard(); + } + } + finally + { + _acknowledgedMessages.clear(); + } + + } + + public void onRollback() + { + // explicit rollbacks resend the message after the rollback-ok is sent + if(_rollingBack) + { + _resendList.addAll(_ackedMessages); + } + else + { + try + { + for(QueueEntry entry : _ackedMessages) + { + entry.release(); + } + } + finally + { + _acknowledgedMessages.clear(); + } + } + + } + } + + private class WriteReturnAction implements ServerTransaction.Action + { + private final AMQConstant _errorCode; + private final IncomingMessage _message; + private final String _description; + + public WriteReturnAction(AMQConstant errorCode, + String description, + IncomingMessage message) + { + _errorCode = errorCode; + _message = message; + _description = description; + } + + public void postCommit() + { + try + { + _session.getProtocolOutputConverter().writeReturn(_message.getMessagePublishInfo(), + _message.getContentHeader(), + _message, + _channelId, + _errorCode.getCode(), + new AMQShortString(_description)); + } + catch (AMQException e) + { + //TODO + throw new RuntimeException(e); + } + + } + + public void onRollback() + { + //To change body of implemented methods use File | Settings | File Templates. + } + } + + + public LogActor getLogActor() + { + return _actor; + } + + public void block(AMQQueue queue) + { + if(_blockingQueues.putIfAbsent(queue, Boolean.TRUE) == null) + { + + if(_blocking.compareAndSet(false,true)) + { + _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED(queue.getNameShortString().toString())); + flow(false); + } + } + } + + public void unblock(AMQQueue queue) + { + if(_blockingQueues.remove(queue)) + { + if(_blocking.compareAndSet(true,false)) + { + _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED()); + + flow(true); + } + } + } + + private void flow(boolean flow) + { + MethodRegistry methodRegistry = _session.getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createChannelFlowBody(flow); + _session.writeFrame(responseBody.generateFrame(_channelId)); + } + + public boolean getBlocking() + { + return _blocking.get(); + } + + public VirtualHost getVirtualHost() + { + return getProtocolSession().getVirtualHost(); + } + + + public ConfiguredObject getParent() + { + return getVirtualHost(); + } + + public SessionConfigType getConfigType() + { + return SessionConfigType.getInstance(); + } + + public int getChannel() + { + return getChannelId(); + } + + public boolean isAttached() + { + return true; + } + + public long getDetachedLifespan() + { + return 0; + } + + public ConnectionConfig getConnectionConfig() + { + return (AMQProtocolEngine)getProtocolSession(); + } + + public Long getExpiryTime() + { + return null; + } + + public Long getMaxClientRate() + { + return null; + } + + public boolean isDurable() + { + return false; + } + + public UUID getId() + { + return _id; + } + + public String getSessionName() + { + return getConnectionConfig().getAddress() + "/" + getChannelId(); + } + + public long getCreateTime() + { + return _createTime; + } + + public void mgmtClose() throws AMQException + { + _session.mgmtCloseChannel(_channelId); + } + + public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException + { + if (inTransaction()) + { + long currentTime = System.currentTimeMillis(); + long openTime = currentTime - _transaction.getTransactionStartTime(); + long idleTime = currentTime - _txnUpdateTime.get(); + + // Log a warning on idle or open transactions + if (idleWarn > 0L && idleTime > idleWarn) + { + CurrentActor.get().message(_logSubject, ChannelMessages.IDLE_TXN(idleTime)); + _logger.warn("IDLE TRANSACTION ALERT " + _logSubject.toString() + " " + idleTime + " ms"); + } + else if (openWarn > 0L && openTime > openWarn) + { + CurrentActor.get().message(_logSubject, ChannelMessages.OPEN_TXN(openTime)); + _logger.warn("OPEN TRANSACTION ALERT " + _logSubject.toString() + " " + openTime + " ms"); + } + + // Close connection for idle or open transactions that have timed out + if (idleClose > 0L && idleTime > idleClose) + { + getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out"); + } + else if (openClose > 0L && openTime > openClose) + { + getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out"); + } + } + } +} |