/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package org.apache.qpid.server; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl; import org.apache.qpid.server.configuration.ConfigStore; import org.apache.qpid.server.configuration.ConfiguredObject; import org.apache.qpid.server.configuration.ConnectionConfig; import org.apache.qpid.server.configuration.SessionConfig; import org.apache.qpid.server.configuration.SessionConfigType; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.flow.FlowCreditManager; import 
org.apache.qpid.server.flow.Pre0_10CreditManager; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.AMQPChannelActor; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.messages.ExchangeMessages; import org.apache.qpid.server.logging.subjects.ChannelLogSubject; import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.message.MessageMetaData; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.protocol.AMQProtocolEngine; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.InboundMessageAdapter; import org.apache.qpid.server.queue.IncomingMessage; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.subscription.ClientDeliveryMethod; import org.apache.qpid.server.subscription.RecordDeliveryMethod; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; import org.apache.qpid.server.subscription.SubscriptionImpl; import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; import 
org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.TransportException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoCommitTransaction.FutureRecorder { public static final int DEFAULT_PREFETCH = 4096; private static final Logger _logger = Logger.getLogger(AMQChannel.class); private static final boolean MSG_AUTH = ApplicationRegistry.getInstance().getConfiguration().getMsgAuth(); private final int _channelId; private final Pre0_10CreditManager _creditManager = new Pre0_10CreditManager(0l,0l); /** * The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that * value of this represents the last tag sent out */ private long _deliveryTag = 0; /** A channel has a default queue (the last declared) that is used when no queue name is explictily set */ private AMQQueue _defaultQueue; /** This tag is unique per subscription to a queue. The server returns this in response to a basic.consume request. */ private int _consumerTag; /** * The current message - which may be partial in the sense that not all frames have been received yet - which has * been received by this channel. As the frames are received the message gets updated and once all frames have been * received the message can then be routed. */ private IncomingMessage _currentMessage; /** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. 
*/ protected final Map _tag2SubscriptionMap = new HashMap(); private final MessageStore _messageStore; private final LinkedList _unfinishedCommandsQueue = new LinkedList(); private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500; private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH); // Set of messages being acknoweledged in the current transaction private SortedSet _acknowledgedMessages = new TreeSet(); private final AtomicBoolean _suspended = new AtomicBoolean(false); private ServerTransaction _transaction; private final AtomicLong _txnStarts = new AtomicLong(0); private final AtomicLong _txnCommits = new AtomicLong(0); private final AtomicLong _txnRejects = new AtomicLong(0); private final AtomicLong _txnCount = new AtomicLong(0); private final AtomicLong _txnUpdateTime = new AtomicLong(0); private final AMQProtocolSession _session; private AtomicBoolean _closing = new AtomicBoolean(false); private final ConcurrentMap _blockingQueues = new ConcurrentHashMap(); private final AtomicBoolean _blocking = new AtomicBoolean(false); private LogActor _actor; private LogSubject _logSubject; private volatile boolean _rollingBack; private static final Runnable NULL_TASK = new Runnable() { public void run() {} }; private List _resendList = new ArrayList(); private static final AMQShortString IMMEDIATE_DELIVERY_REPLY_TEXT = new AMQShortString("Immediate delivery is not possible."); private final UUID _id; private long _createTime = System.currentTimeMillis(); private final ClientDeliveryMethod _clientDeliveryMethod; public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore) throws AMQException { _session = session; _channelId = channelId; _actor = new AMQPChannelActor(this, session.getLogActor().getRootMessageLogger()); _logSubject = new ChannelLogSubject(this); _id = getConfigStore().createId(); _actor.message(ChannelMessages.CREATE()); getConfigStore().addConfiguredObject(this); 
_messageStore = messageStore; // by default the session is non-transactional _transaction = new AsyncAutoCommitTransaction(_messageStore, this); _clientDeliveryMethod = session.createDeliveryMethod(_channelId); } public ConfigStore getConfigStore() { return getVirtualHost().getConfigStore(); } /** Sets this channel to be part of a local transaction */ public void setLocalTransactional() { _transaction = new LocalTransaction(_messageStore); _txnStarts.incrementAndGet(); } public boolean isTransactional() { return _transaction.isTransactional(); } public void receivedComplete() { sync(); } public boolean inTransaction() { return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0; } private void incrementOutstandingTxnsIfNecessary() { if(isTransactional()) { //There can currently only be at most one outstanding transaction //due to only having LocalTransaction support. Set value to 1 if 0. _txnCount.compareAndSet(0,1); } } private void decrementOutstandingTxnsIfNecessary() { if(isTransactional()) { //There can currently only be at most one outstanding transaction //due to only having LocalTransaction support. Set value to 0 if 1. _txnCount.compareAndSet(1,0); } } public Long getTxnStarts() { return _txnStarts.get(); } public Long getTxnCommits() { return _txnCommits.get(); } public Long getTxnRejects() { return _txnRejects.get(); } public Long getTxnCount() { return _txnCount.get(); } public int getChannelId() { return _channelId; } public void setPublishFrame(MessagePublishInfo info, final Exchange e) throws AMQSecurityException { String routingKey = info.getRoutingKey() == null ? 
null : info.getRoutingKey().asString(); if (!getVirtualHost().getSecurityManager().authorisePublish(info.isImmediate(), routingKey, e.getName())) { throw new AMQSecurityException("Permission denied: " + e.getName()); } _currentMessage = new IncomingMessage(info); _currentMessage.setExchange(e); } public void publishContentHeader(ContentHeaderBody contentHeaderBody) throws AMQException { if (_currentMessage == null) { throw new AMQException("Received content header without previously receiving a BasicPublish frame"); } else { if (_logger.isDebugEnabled()) { _logger.debug("Content header received on channel " + _channelId); } _currentMessage.setContentHeaderBody(contentHeaderBody); _currentMessage.setExpiration(); MessageMetaData mmd = _currentMessage.headersReceived(getProtocolSession().getLastReceivedTime()); final StoredMessage handle = _messageStore.addMessage(mmd); _currentMessage.setStoredMessage(handle); routeCurrentMessage(); _transaction.addPostTransactionAction(new ServerTransaction.Action() { public void postCommit() { } public void onRollback() { handle.remove(); } }); deliverCurrentMessageIfComplete(); } } private void deliverCurrentMessageIfComplete() throws AMQException { // check and deliver if header says body length is zero if (_currentMessage.allContentReceived()) { try { final List destinationQueues = _currentMessage.getDestinationQueues(); if(!checkMessageUserId(_currentMessage.getContentHeader())) { _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.ACCESS_REFUSED, "Access Refused", _currentMessage)); } else { if(destinationQueues == null || destinationQueues.isEmpty()) { if (_currentMessage.isMandatory() || _currentMessage.isImmediate()) { _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message", _currentMessage)); } else { _actor.message(ExchangeMessages.DISCARDMSG(_currentMessage.getExchange().asString(), _currentMessage.getRoutingKey())); } } else { 
_transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues), getProtocolSession().getLastReceivedTime()); incrementOutstandingTxnsIfNecessary(); updateTransactionalActivity(); } } _currentMessage.getStoredMessage().flushToStore(); } finally { long bodySize = _currentMessage.getSize(); long timestamp = ((BasicContentHeaderProperties) _currentMessage.getContentHeader().getProperties()).getTimestamp(); _session.registerMessageReceived(bodySize, timestamp); _currentMessage = null; } } } public void publishContentBody(ContentBody contentBody) throws AMQException { if (_currentMessage == null) { throw new AMQException("Received content body without previously receiving a JmsPublishBody"); } if (_logger.isDebugEnabled()) { _logger.debug(debugIdentity() + "Content body received on channel " + _channelId); } try { // returns true iff the message was delivered (i.e. if all data was // received final ContentChunk contentChunk = _session.getMethodRegistry().getProtocolVersionMethodConverter().convertToContentChunk(contentBody); _currentMessage.addContentBodyFrame(contentChunk); deliverCurrentMessageIfComplete(); } catch (AMQException e) { // we want to make sure we don't keep a reference to the message in the // event of an error _currentMessage = null; throw e; } catch (RuntimeException e) { // we want to make sure we don't keep a reference to the message in the // event of an error _currentMessage = null; throw e; } } protected void routeCurrentMessage() throws AMQException { _currentMessage.route(); } public long getNextDeliveryTag() { return ++_deliveryTag; } public int getNextConsumerTag() { return ++_consumerTag; } public Subscription getSubscription(AMQShortString subscription) { return _tag2SubscriptionMap.get(subscription); } /** * Subscribe to a queue. 
We register all subscriptions in the channel so that if the channel is closed we can clean * up all subscriptions, even if the client does not explicitly unsubscribe from all queues. * * @param tag the tag chosen by the client (if null, server will generate one) * @param queue the queue to subscribe to * @param acks Are acks enabled for this subscriber * @param filters Filters to apply to this subscriber * * @param noLocal Flag stopping own messages being receivied. * @param exclusive Flag requesting exclusive access to the queue * @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests * * @throws AMQException if something goes wrong */ public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, boolean acks, FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException { if (tag == null) { tag = new AMQShortString("sgen_" + getNextConsumerTag()); } if (_tag2SubscriptionMap.containsKey(tag)) { throw new AMQException("Consumer already exists with same tag: " + tag); } Subscription subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(_channelId, _session, tag, acks, filters, noLocal, _creditManager); // So to keep things straight we put before the call and catch all exceptions from the register and tidy up. // We add before we register as the Async Delivery process may AutoClose the subscriber // so calling _cT2QM.remove before we have done put which was after the register succeeded. // So to keep things straight we put before the call and catch all exceptions from the register and tidy up. _tag2SubscriptionMap.put(tag, subscription); try { queue.registerSubscription(subscription, exclusive); } catch (AMQException e) { _tag2SubscriptionMap.remove(tag); throw e; } catch (RuntimeException e) { _tag2SubscriptionMap.remove(tag); throw e; } return tag; } /** * Unsubscribe a consumer from a queue. 
* @param consumerTag * @return true if the consumerTag had a mapped queue that could be unregistered. * @throws AMQException */ public boolean unsubscribeConsumer(AMQShortString consumerTag) throws AMQException { Subscription sub = _tag2SubscriptionMap.remove(consumerTag); if (sub != null) { try { sub.getSendLock(); sub.getQueue().unregisterSubscription(sub); } finally { sub.releaseSendLock(); } return true; } else { _logger.warn("Attempt to unsubscribe consumer with tag '"+consumerTag+"' which is not registered."); } return false; } /** * Called from the protocol session to close this channel and clean up. T * * @throws AMQException if there is an error during closure */ public void close() throws AMQException { if(!_closing.compareAndSet(false, true)) { //Channel is already closing return; } CurrentActor.get().message(_logSubject, ChannelMessages.CLOSE()); unsubscribeAllConsumers(); _transaction.rollback(); try { requeue(); } catch (AMQException e) { _logger.error("Caught AMQException whilst attempting to requeue:" + e); } catch (TransportException e) { _logger.error("Caught TransportException whilst attempting to requeue:" + e); } getConfigStore().removeConfiguredObject(this); } private void unsubscribeAllConsumers() throws AMQException { if (_logger.isInfoEnabled()) { if (!_tag2SubscriptionMap.isEmpty()) { _logger.info("Unsubscribing all consumers on channel " + toString()); } else { _logger.info("No consumers to unsubscribe on channel " + toString()); } } for (Map.Entry me : _tag2SubscriptionMap.entrySet()) { if (_logger.isInfoEnabled()) { _logger.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString()); } Subscription sub = me.getValue(); try { sub.getSendLock(); sub.getQueue().unregisterSubscription(sub); } finally { sub.releaseSendLock(); } } _tag2SubscriptionMap.clear(); } /** * Add a message to the channel-based list of unacknowledged messages * * @param entry the record of the message on the queue that was delivered * @param 
deliveryTag the delivery tag used when delivering the message (see protocol spec for description of the * delivery tag) * @param subscription The consumer that is to acknowledge this message. */ public void addUnacknowledgedMessage(QueueEntry entry, long deliveryTag, Subscription subscription) { if (_logger.isDebugEnabled()) { if (entry.getQueue() == null) { _logger.debug("Adding unacked message with a null queue:" + entry); } else { if (_logger.isDebugEnabled()) { _logger.debug(debugIdentity() + " Adding unacked message(" + entry.getMessage().toString() + " DT:" + deliveryTag + ") with a queue(" + entry.getQueue() + ") for " + subscription); } } } _unacknowledgedMessageMap.add(deliveryTag, entry); } private final String id = "(" + System.identityHashCode(this) + ")"; public String debugIdentity() { return _channelId + id; } /** * Called to attempt re-delivery all outstanding unacknowledged messages on the channel. May result in delivery to * this same channel or to other subscribers. * * @throws org.apache.qpid.AMQException if the requeue fails */ public void requeue() throws AMQException { // we must create a new map since all the messages will get a new delivery tag when they are redelivered Collection messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages(); if (!messagesToBeDelivered.isEmpty()) { if (_logger.isInfoEnabled()) { _logger.info("Requeuing " + messagesToBeDelivered.size() + " unacked messages. for " + toString()); } } for (QueueEntry unacked : messagesToBeDelivered) { if (!unacked.isQueueDeleted()) { // Mark message redelivered unacked.setRedelivered(); // Ensure message is released for redelivery unacked.release(); } else { unacked.discard(); } } } /** * Requeue a single message * * @param deliveryTag The message to requeue * * @throws AMQException If something goes wrong. 
*/ public void requeue(long deliveryTag) throws AMQException { QueueEntry unacked = _unacknowledgedMessageMap.remove(deliveryTag); if (unacked != null) { // Mark message redelivered unacked.setRedelivered(); // Ensure message is released for redelivery if (!unacked.isQueueDeleted()) { // Ensure message is released for redelivery unacked.release(); } else { _logger.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message."); unacked.discard(); } } else { _logger.warn("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists." + _unacknowledgedMessageMap.size()); } } public boolean isMaxDeliveryCountEnabled(final long deliveryTag) { final QueueEntry queueEntry = _unacknowledgedMessageMap.get(deliveryTag); if (queueEntry != null) { final int maximumDeliveryCount = queueEntry.getQueue().getMaximumDeliveryCount(); return maximumDeliveryCount > 0; } return false; } public boolean isDeliveredTooManyTimes(final long deliveryTag) { final QueueEntry queueEntry = _unacknowledgedMessageMap.get(deliveryTag); if (queueEntry != null) { final int maximumDeliveryCount = queueEntry.getQueue().getMaximumDeliveryCount(); final int numDeliveries = queueEntry.getDeliveryCount(); return maximumDeliveryCount != 0 && numDeliveries >= maximumDeliveryCount; } return false; } /** * Called to resend all outstanding unacknowledged messages to this same channel. * * @param requeue Are the messages to be requeued or dropped. * * @throws AMQException When something goes wrong. */ public void resend(final boolean requeue) throws AMQException { final Map msgToRequeue = new LinkedHashMap(); final Map msgToResend = new LinkedHashMap(); if (_logger.isDebugEnabled()) { _logger.debug("unacked map Size:" + _unacknowledgedMessageMap.size()); } // Process the Unacked-Map. // Marking messages who still have a consumer for to be resent // and those that don't to be requeued. 
_unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue, msgToResend, requeue, _messageStore)); // Process Messages to Resend if (_logger.isDebugEnabled()) { if (!msgToResend.isEmpty()) { _logger.debug("Preparing (" + msgToResend.size() + ") message to resend."); } else { _logger.debug("No message to resend."); } } for (Map.Entry entry : msgToResend.entrySet()) { QueueEntry message = entry.getValue(); long deliveryTag = entry.getKey(); //Amend the delivery counter as the client hasn't seen these messages yet. message.decrementDeliveryCount(); AMQQueue queue = message.getQueue(); // Our Java Client will always suspend the channel when resending! // If the client has requested the messages be resent then it is // their responsibility to ensure that thay are capable of receiving them // i.e. The channel hasn't been server side suspended. // if (isSuspended()) // { // _logger.info("Channel is suspended so requeuing"); // //move this message to requeue // msgToRequeue.add(message); // } // else // { // release to allow it to be delivered // Without any details from the client about what has been processed we have to mark // all messages in the unacked map as redelivered. 
message.setRedelivered(); Subscription sub = message.getDeliveredSubscription(); if (sub != null) { if(!queue.resend(message,sub)) { msgToRequeue.put(deliveryTag, message); } } else { if (_logger.isInfoEnabled()) { _logger.info("DeliveredSubscription not recorded so just requeueing(" + message.toString() + ")to prevent loss"); } // move this message to requeue msgToRequeue.put(deliveryTag, message); } } // for all messages // } else !isSuspend if (_logger.isInfoEnabled()) { if (!msgToRequeue.isEmpty()) { _logger.info("Preparing (" + msgToRequeue.size() + ") message to requeue to."); } } // Process Messages to Requeue at the front of the queue for (Map.Entry entry : msgToRequeue.entrySet()) { QueueEntry message = entry.getValue(); long deliveryTag = entry.getKey(); //Amend the delivery counter as the client hasn't seen these messages yet. message.decrementDeliveryCount(); _unacknowledgedMessageMap.remove(deliveryTag); message.setRedelivered(); message.release(); } } /** * Acknowledge one or more messages. * * @param deliveryTag the last delivery tag * @param multiple if true will acknowledge all messages up to an including the delivery tag. if false only * acknowledges the single message specified by the delivery tag * * @throws AMQException if the delivery tag is unknown (e.g. not outstanding) on this channel */ public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException { Collection ackedMessages = getAckedMessages(deliveryTag, multiple); _transaction.dequeue(ackedMessages, new MessageAcknowledgeAction(ackedMessages)); updateTransactionalActivity(); } private Collection getAckedMessages(long deliveryTag, boolean multiple) { return _unacknowledgedMessageMap.acknowledge(deliveryTag, multiple); } /** * Used only for testing purposes. 
* * @return the map of unacknowledged messages */ public UnacknowledgedMessageMap getUnacknowledgedMessageMap() { return _unacknowledgedMessageMap; } /** * Called from the ChannelFlowHandler to suspend this Channel * @param suspended boolean, should this Channel be suspended */ public void setSuspended(boolean suspended) { boolean wasSuspended = _suspended.getAndSet(suspended); if (wasSuspended != suspended) { // Log Flow Started before we start the subscriptions if (!suspended) { _actor.message(_logSubject, ChannelMessages.FLOW("Started")); } // This section takes two different approaches to perform to perform // the same function. Ensuring that the Subscription has taken note // of the change in Channel State // Here we have become unsuspended and so we ask each the queue to // perform an Async delivery for each of the subscriptions in this // Channel. The alternative would be to ensure that the subscription // had received the change in suspension state. That way the logic // behind decieding to start an async delivery was located with the // Subscription. if (wasSuspended) { // may need to deliver queued messages for (Subscription s : _tag2SubscriptionMap.values()) { s.getQueue().deliverAsync(s); } } // Here we have become suspended so we need to ensure that each of // the Subscriptions has noticed this change so that we can be sure // they are not still sending messages. Again the code here is a // very simplistic approach to ensure that the change of suspension // has been noticed by each of the Subscriptions. Unlike the above // case we don't actually need to do anything else. if (!wasSuspended) { // may need to deliver queued messages for (Subscription s : _tag2SubscriptionMap.values()) { try { s.getSendLock(); } finally { s.releaseSendLock(); } } } // Log Suspension only after we have confirmed all suspensions are // stopped. 
if (suspended) { _actor.message(_logSubject, ChannelMessages.FLOW("Stopped")); } } } public boolean isSuspended() { return _suspended.get() || _closing.get() || _session.isClosing(); } public void commit() throws AMQException { commit(null); } public void commit(Runnable immediateAction) throws AMQException { if (!isTransactional()) { throw new AMQException("Fatal error: commit called on non-transactional channel"); } _transaction.commit(immediateAction); _txnCommits.incrementAndGet(); _txnStarts.incrementAndGet(); decrementOutstandingTxnsIfNecessary(); } public void rollback() throws AMQException { rollback(NULL_TASK); } public void rollback(Runnable postRollbackTask) throws AMQException { if (!isTransactional()) { throw new AMQException("Fatal error: commit called on non-transactional channel"); } // stop all subscriptions _rollingBack = true; boolean requiresSuspend = _suspended.compareAndSet(false,true); // ensure all subscriptions have seen the change to the channel state for(Subscription sub : _tag2SubscriptionMap.values()) { sub.getSendLock(); sub.releaseSendLock(); } try { _transaction.rollback(); } finally { _rollingBack = false; _txnRejects.incrementAndGet(); _txnStarts.incrementAndGet(); decrementOutstandingTxnsIfNecessary(); } postRollbackTask.run(); for(QueueEntry entry : _resendList) { Subscription sub = entry.getDeliveredSubscription(); if(sub == null || sub.isClosed()) { entry.release(); } else { sub.getQueue().resend(entry, sub); } } _resendList.clear(); if(requiresSuspend) { _suspended.set(false); for(Subscription sub : _tag2SubscriptionMap.values()) { sub.getQueue().deliverAsync(sub); } } } /** * Update last transaction activity timestamp */ private void updateTransactionalActivity() { if (isTransactional()) { _txnUpdateTime.set(getProtocolSession().getLastReceivedTime()); } } public String toString() { return "["+_session.toString()+":"+_channelId+"]"; } public void setDefaultQueue(AMQQueue queue) { _defaultQueue = queue; } public AMQQueue 
getDefaultQueue() { return _defaultQueue; } public boolean isClosing() { return _closing.get(); } public AMQProtocolSession getProtocolSession() { return _session; } public FlowCreditManager getCreditManager() { return _creditManager; } public void setCredit(final long prefetchSize, final int prefetchCount) { _actor.message(ChannelMessages.PREFETCH_SIZE(prefetchSize, prefetchCount)); _creditManager.setCreditLimits(prefetchSize, prefetchCount); } public MessageStore getMessageStore() { return _messageStore; } public ClientDeliveryMethod getClientDeliveryMethod() { return _clientDeliveryMethod; } private final RecordDeliveryMethod _recordDeliveryMethod = new RecordDeliveryMethod() { public void recordMessageDelivery(final Subscription sub, final QueueEntry entry, final long deliveryTag) { addUnacknowledgedMessage(entry, deliveryTag, sub); } }; public RecordDeliveryMethod getRecordDeliveryMethod() { return _recordDeliveryMethod; } private AMQMessage createAMQMessage(IncomingMessage incomingMessage) throws AMQException { AMQMessage message = new AMQMessage(incomingMessage.getStoredMessage()); message.setExpiration(incomingMessage.getExpiration()); message.setClientIdentifier(_session); return message; } private boolean checkMessageUserId(ContentHeaderBody header) { AMQShortString userID = header.getProperties() instanceof BasicContentHeaderProperties ? ((BasicContentHeaderProperties) header.getProperties()).getUserId() : null; return (!MSG_AUTH || _session.getAuthorizedPrincipal().getName().equals(userID == null? 
"" : userID.toString())); } public Object getID() { return _channelId; } public AMQConnectionModel getConnectionModel() { return _session; } public String getClientID() { return String.valueOf(_session.getContextKey()); } public LogSubject getLogSubject() { return _logSubject; } private class MessageDeliveryAction implements ServerTransaction.Action { private IncomingMessage _incommingMessage; private List _destinationQueues; public MessageDeliveryAction(IncomingMessage currentMessage, List destinationQueues) { _incommingMessage = currentMessage; _destinationQueues = destinationQueues; } public void postCommit() { try { final boolean immediate = _incommingMessage.isImmediate(); final AMQMessage amqMessage = createAMQMessage(_incommingMessage); MessageReference ref = amqMessage.newReference(); for(int i = 0; i < _destinationQueues.size(); i++) { BaseQueue queue = _destinationQueues.get(i); BaseQueue.PostEnqueueAction action; if(immediate) { action = new ImmediateAction(queue); } else { action = null; } queue.enqueue(amqMessage, isTransactional(), action); if(queue instanceof AMQQueue) { ((AMQQueue)queue).checkCapacity(AMQChannel.this); } } _incommingMessage.getStoredMessage().flushToStore(); ref.release(); } catch (AMQException e) { // TODO throw new RuntimeException(e); } } public void onRollback() { // Maybe keep track of entries that were created and then delete them here in case of failure // to in memory enqueue } private class ImmediateAction implements BaseQueue.PostEnqueueAction { private final BaseQueue _queue; public ImmediateAction(BaseQueue queue) { _queue = queue; } public void onEnqueue(QueueEntry entry) { if (!entry.getDeliveredToConsumer() && entry.acquire()) { ServerTransaction txn = new LocalTransaction(_messageStore); Collection entries = new ArrayList(1); entries.add(entry); final AMQMessage message = (AMQMessage) entry.getMessage(); txn.dequeue(_queue, entry.getMessage(), new MessageAcknowledgeAction(entries) { @Override public void postCommit() 
{ try { final ProtocolOutputConverter outputConverter = _session.getProtocolOutputConverter(); outputConverter.writeReturn(message.getMessagePublishInfo(), message.getContentHeaderBody(), message, _channelId, AMQConstant.NO_CONSUMERS.getCode(), IMMEDIATE_DELIVERY_REPLY_TEXT); } catch (AMQException e) { throw new RuntimeException(e); } super.postCommit(); } } ); txn.commit(); } } } } private class MessageAcknowledgeAction implements ServerTransaction.Action { private final Collection _ackedMessages; public MessageAcknowledgeAction(Collection ackedMessages) { _ackedMessages = ackedMessages; } public void postCommit() { try { for(QueueEntry entry : _ackedMessages) { entry.discard(); } } finally { _acknowledgedMessages.clear(); } } public void onRollback() { // explicit rollbacks resend the message after the rollback-ok is sent if(_rollingBack) { _resendList.addAll(_ackedMessages); } else { try { for(QueueEntry entry : _ackedMessages) { entry.release(); } } finally { _acknowledgedMessages.clear(); } } } } private class WriteReturnAction implements ServerTransaction.Action { private final AMQConstant _errorCode; private final IncomingMessage _message; private final String _description; public WriteReturnAction(AMQConstant errorCode, String description, IncomingMessage message) { _errorCode = errorCode; _message = message; _description = description; } public void postCommit() { try { _session.getProtocolOutputConverter().writeReturn(_message.getMessagePublishInfo(), _message.getContentHeader(), _message, _channelId, _errorCode.getCode(), new AMQShortString(_description)); } catch (AMQException e) { //TODO throw new RuntimeException(e); } } public void onRollback() { //To change body of implemented methods use File | Settings | File Templates. 
}
    }

    public LogActor getLogActor()
    {
        return _actor;
    }

    /**
     * Records that the given queue is blocking this channel. When the channel goes from unblocked to
     * blocked, the event is logged and a channel.flow(false) frame is sent to the client.
     *
     * @param queue the queue that is blocking this channel
     */
    public void block(AMQQueue queue)
    {
        if(_blockingQueues.putIfAbsent(queue, Boolean.TRUE) == null)
        {
            if(_blocking.compareAndSet(false,true))
            {
                _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED(queue.getNameShortString().toString()));
                flow(false);
            }
        }
    }

    /**
     * Records that the given queue no longer blocks this channel. If the queue was recorded as blocking and
     * the channel was marked blocked, the event is logged and a channel.flow(true) frame is sent to the client.
     * Calling this for a queue that was never recorded as blocking is a no-op.
     *
     * @param queue the queue that has stopped blocking this channel
     */
    public void unblock(AMQQueue queue)
    {
        // remove() returns null when the queue was not present; compare explicitly rather than
        // auto-unboxing the result, which would throw a NullPointerException in that case
        if(_blockingQueues.remove(queue) != null)
        {
            if(_blocking.compareAndSet(true,false))
            {
                _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED());

                flow(true);
            }
        }
    }

    /** Writes a channel.flow frame with the given active flag to the client on this channel. */
    private void flow(boolean flow)
    {
        MethodRegistry methodRegistry = _session.getMethodRegistry();
        AMQMethodBody responseBody = methodRegistry.createChannelFlowBody(flow);
        _session.writeFrame(responseBody.generateFrame(_channelId));
    }

    /** @return true while the channel is marked as blocked */
    public boolean getBlocking()
    {
        return _blocking.get();
    }

    public VirtualHost getVirtualHost()
    {
        return getProtocolSession().getVirtualHost();
    }

    public ConfiguredObject getParent()
    {
        return getVirtualHost();
    }

    public SessionConfigType getConfigType()
    {
        return SessionConfigType.getInstance();
    }

    public int getChannel()
    {
        return getChannelId();
    }

    /** @return always true */
    public boolean isAttached()
    {
        return true;
    }

    /** @return always 0 */
    public long getDetachedLifespan()
    {
        return 0;
    }

    /** @return the protocol session, cast to {@link AMQProtocolEngine} */
    public ConnectionConfig getConnectionConfig()
    {
        return (AMQProtocolEngine)getProtocolSession();
    }

    /** @return always null */
    public Long getExpiryTime()
    {
        return null;
    }

    /** @return always null */
    public Long getMaxClientRate()
    {
        return null;
    }

    /** @return always false */
    public boolean isDurable()
    {
        return false;
    }

    public UUID getId()
    {
        return _id;
    }

    /** @return the connection address and the channel id, separated by "/" */
    public String getSessionName()
    {
        return getConnectionConfig().getAddress() + "/" + getChannelId();
    }

    public long getCreateTime()
    {
        return _createTime;
    }

    /** Asks the protocol session to close this channel on behalf of management. */
    public void mgmtClose() throws AMQException
    {
        _session.mgmtCloseChannel(_channelId);
    }

    /**
     * Checks how long the current transaction has been open and idle against the supplied thresholds.
     *
     * @param openWarn  open-time warning threshold, compared against milliseconds; not applied unless above zero
     * @param openClose open-time close threshold, compared against milliseconds; not applied unless above zero
     * @param idleWarn  idle-time warning threshold, compared against milliseconds; not applied unless above zero
     * @param idleClose idle-time close threshold, compared against milliseconds; not applied unless above zero
     *
     * @throws AMQException if closing the session fails
     */
    public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException
    {
        if (inTransaction())
        {
            long currentTime = System.currentTimeMillis();
            long openTime = currentTime - _transaction.getTransactionStartTime();
            long idleTime = currentTime - _txnUpdateTime.get();
// Log a warning on idle or open transactions if (idleWarn > 0L && idleTime > idleWarn) { CurrentActor.get().message(_logSubject, ChannelMessages.IDLE_TXN(idleTime)); _logger.warn("IDLE TRANSACTION ALERT " + _logSubject.toString() + " " + idleTime + " ms"); } else if (openWarn > 0L && openTime > openWarn) { CurrentActor.get().message(_logSubject, ChannelMessages.OPEN_TXN(openTime)); _logger.warn("OPEN TRANSACTION ALERT " + _logSubject.toString() + " " + openTime + " ms"); } // Close connection for idle or open transactions that have timed out if (idleClose > 0L && idleTime > idleClose) { getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out"); } else if (openClose > 0L && openTime > openClose) { getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out"); } } } public void deadLetter(long deliveryTag) throws AMQException { final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap(); final QueueEntry rejectedQueueEntry = unackedMap.get(deliveryTag); if (rejectedQueueEntry == null) { _logger.warn("No message found, unable to DLQ delivery tag: " + deliveryTag); return; } else { final ServerMessage msg = rejectedQueueEntry.getMessage(); final AMQQueue queue = rejectedQueueEntry.getQueue(); final Exchange altExchange = queue.getAlternateExchange(); unackedMap.remove(deliveryTag); if (altExchange == null) { _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag); _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey())); rejectedQueueEntry.discard(); return; } final InboundMessage m = new InboundMessageAdapter(rejectedQueueEntry); final List destinationQueues = altExchange.route(m); if (destinationQueues == null || destinationQueues.isEmpty()) { _logger.debug("Routing process provided no queues to enqueue the message on, must discard 
message as unable to DLQ: delivery tag: " + deliveryTag); _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName())); rejectedQueueEntry.discard(); return; } rejectedQueueEntry.routeToAlternate(); //output operational logging for each delivery post commit for (final BaseQueue destinationQueue : destinationQueues) { _actor.message(_logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), destinationQueue.getNameShortString().asString())); } } } public void recordFuture(final MessageStore.StoreFuture future, final ServerTransaction.Action action) { _unfinishedCommandsQueue.add(new AsyncCommand(future, action)); } public void completeAsyncCommands() { AsyncCommand cmd; while((cmd = _unfinishedCommandsQueue.peek()) != null && cmd.isReadyForCompletion()) { cmd.complete(); _unfinishedCommandsQueue.poll(); } while(_unfinishedCommandsQueue.size() > UNFINISHED_COMMAND_QUEUE_THRESHOLD) { cmd = _unfinishedCommandsQueue.poll(); cmd.awaitReadyForCompletion(); cmd.complete(); } } public void sync() { AsyncCommand cmd; while((cmd = _unfinishedCommandsQueue.poll()) != null) { cmd.awaitReadyForCompletion(); cmd.complete(); } } private static class AsyncCommand { private final MessageStore.StoreFuture _future; private ServerTransaction.Action _action; public AsyncCommand(final MessageStore.StoreFuture future, final ServerTransaction.Action action) { _future = future; _action = action; } void awaitReadyForCompletion() { _future.waitForCompletion(); } void complete() { if(!_future.isComplete()) { _future.waitForCompletion(); } _action.postCommit(); _action = null; } boolean isReadyForCompletion() { return _future.isComplete(); } } }