diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java | 909 |
1 files changed, 909 insertions, 0 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java new file mode 100644 index 0000000000..aa390d6c26 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -0,0 +1,909 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server; + +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.ack.UnacknowledgedMessageMap; +import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.exchange.NoRouteException; +import org.apache.qpid.server.flow.FlowCreditManager; +import org.apache.qpid.server.flow.Pre0_10CreditManager; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.IncomingMessage; +import org.apache.qpid.server.queue.MessageHandleFactory; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.queue.UnauthorizedAccessException; +import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; +import org.apache.qpid.server.subscription.ClientDeliveryMethod; +import org.apache.qpid.server.subscription.RecordDeliveryMethod; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.txn.LocalTransactionalContext; +import org.apache.qpid.server.txn.NonTransactionalContext; +import org.apache.qpid.server.txn.TransactionalContext; + +public class AMQChannel +{ + public static final int DEFAULT_PREFETCH = 5000; + + private static final Logger _log = Logger.getLogger(AMQChannel.class); + + private final int _channelId; + + + private final Pre0_10CreditManager _creditManager = new Pre0_10CreditManager(0l,0l); + + /** + * The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that + * value of this represents the <b>last</b> tag sent out + */ + private long _deliveryTag = 0; + + /** A channel has a default queue (the last declared) that is used when no queue name is explictily set */ + private AMQQueue _defaultQueue; + + /** This tag is unique per subscription to a queue. The server returns this in response to a basic.consume request. */ + private int _consumerTag; + + /** + * The current message - which may be partial in the sense that not all frames have been received yet - which has + * been received by this channel. As the frames are received the message gets updated and once all frames have been + * received the message can then be routed. + */ + private IncomingMessage _currentMessage; + + /** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. */ + protected final Map<AMQShortString, Subscription> _tag2SubscriptionMap = new HashMap<AMQShortString, Subscription>(); + + private final MessageStore _messageStore; + + private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH); + + private final AtomicBoolean _suspended = new AtomicBoolean(false); + + private TransactionalContext _txnContext; + + /** + * A context used by the message store enabling it to track context for a given channel even across thread + * boundaries + */ + private final StoreContext _storeContext; + + private final List<RequiredDeliveryException> _returnMessages = new LinkedList<RequiredDeliveryException>(); + + private MessageHandleFactory _messageHandleFactory = new MessageHandleFactory(); + + // Why do we need this reference ? - ritchiem + private final AMQProtocolSession _session; + private boolean _closing; + + public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore) + throws AMQException + { + _session = session; + _channelId = channelId; + _storeContext = new StoreContext("Session: " + session.getClientIdentifier() + "; channel: " + channelId); + + + _messageStore = messageStore; + + // by default the session is non-transactional + _txnContext = new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages); + } + + /** Sets this channel to be part of a local transaction */ + public void setLocalTransactional() + { + _txnContext = new LocalTransactionalContext(this); + } + + public boolean isTransactional() + { + // this does not look great but there should only be one "non-transactional" + // transactional context, while there could be several transactional ones in + // theory + return !(_txnContext instanceof NonTransactionalContext); + } + + public int getChannelId() + { + return _channelId; + } + + public void setPublishFrame(MessagePublishInfo info, final Exchange e) throws AMQException + { + + _currentMessage = new IncomingMessage(_messageStore.getNewMessageId(), info, _txnContext, _session); + _currentMessage.setMessageStore(_messageStore); + _currentMessage.setExchange(e); + } + + public void publishContentHeader(ContentHeaderBody contentHeaderBody) + throws AMQException + { + if (_currentMessage == null) + { + throw new AMQException("Received content header without previously receiving a BasicPublish frame"); + } + else + { + if (_log.isDebugEnabled()) + { + _log.debug("Content header received on channel " + _channelId); + } + + _currentMessage.setContentHeaderBody(contentHeaderBody); + + _currentMessage.setExpiration(); + + routeCurrentMessage(); + + _currentMessage.routingComplete(_messageStore, _messageHandleFactory); + + deliverCurrentMessageIfComplete(); + + } + } + + private void deliverCurrentMessageIfComplete() + throws AMQException + { + // check and deliver if header says body length is zero + if (_currentMessage.allContentReceived()) + { + try + { + _currentMessage.deliverToQueues(); + } + catch (NoRouteException e) + { + _returnMessages.add(e); + } + catch(UnauthorizedAccessException ex) + { + _returnMessages.add(ex); + } + finally + { + // callback to allow the context to do any post message processing + // primary use is to allow message return processing in the non-tx case + _txnContext.messageProcessed(_session); + _currentMessage = null; + } + } + + } + + public void publishContentBody(ContentBody contentBody) throws AMQException + { + if (_currentMessage == null) + { + throw new AMQException("Received content body without previously receiving a JmsPublishBody"); + } + + if (_log.isDebugEnabled()) + { + _log.debug(debugIdentity() + "Content body received on channel " + _channelId); + } + + try + { + + // returns true iff the message was delivered (i.e. if all data was + // received + _currentMessage.addContentBodyFrame( + _session.getMethodRegistry().getProtocolVersionMethodConverter().convertToContentChunk( + contentBody)); + + deliverCurrentMessageIfComplete(); + } + catch (AMQException e) + { + // we want to make sure we don't keep a reference to the message in the + // event of an error + _currentMessage = null; + throw e; + } + } + + protected void routeCurrentMessage() throws AMQException + { + try + { + _currentMessage.route(); + } + catch (NoRouteException e) + { + //_currentMessage.incrementReference(); + _returnMessages.add(e); + } + } + + public long getNextDeliveryTag() + { + return ++_deliveryTag; + } + + public int getNextConsumerTag() + { + return ++_consumerTag; + } + + /** + * Subscribe to a queue. We register all subscriptions in the channel so that if the channel is closed we can clean + * up all subscriptions, even if the client does not explicitly unsubscribe from all queues. + * + * @param tag the tag chosen by the client (if null, server will generate one) + * @param queue the queue to subscribe to + * @param acks Are acks enabled for this subscriber + * @param filters Filters to apply to this subscriber + * + * @param noLocal Flag stopping own messages being receivied. + * @param exclusive Flag requesting exclusive access to the queue + * @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests + * + * @throws ConsumerTagNotUniqueException if the tag is not unique + * @throws AMQException if something goes wrong + */ + public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, boolean acks, + FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException, ConsumerTagNotUniqueException + { + if (tag == null) + { + tag = new AMQShortString("sgen_" + getNextConsumerTag()); + } + + if (_tag2SubscriptionMap.containsKey(tag)) + { + throw new ConsumerTagNotUniqueException(); + } + + Subscription subscription = + SubscriptionFactoryImpl.INSTANCE.createSubscription(_channelId, _session, tag, acks, filters, noLocal, _creditManager); + + + // So to keep things straight we put before the call and catch all exceptions from the register and tidy up. + // We add before we register as the Async Delivery process may AutoClose the subscriber + // so calling _cT2QM.remove before we have done put which was after the register succeeded. + // So to keep things straight we put before the call and catch all exceptions from the register and tidy up. + + _tag2SubscriptionMap.put(tag, subscription); + + try + { + queue.registerSubscription(subscription, exclusive); + } + catch (AMQException e) + { + _tag2SubscriptionMap.remove(tag); + throw e; + } + return tag; + } + + /** + * Unsubscribe a consumer from a queue. + * @param consumerTag + * @return true if the consumerTag had a mapped queue that could be unregistered. + * @throws AMQException + */ + public boolean unsubscribeConsumer(AMQShortString consumerTag) throws AMQException + { + + Subscription sub = _tag2SubscriptionMap.remove(consumerTag); + if (sub != null) + { + try + { + sub.getSendLock(); + sub.getQueue().unregisterSubscription(sub); + } + finally + { + sub.releaseSendLock(); + } + return true; + } + else + { + _log.warn("Attempt to unsubscribe consumer with tag '"+consumerTag+"' which is not registered."); + } + return false; + } + + /** + * Called from the protocol session to close this channel and clean up. T + * + * @throws AMQException if there is an error during closure + */ + public void close() throws AMQException + { + _txnContext.rollback(); + unsubscribeAllConsumers(); + try + { + requeue(); + } + catch (AMQException e) + { + _log.error("Caught AMQException whilst attempting to reque:" + e); + } + + setClosing(true); + } + + private void setClosing(boolean closing) + { + _closing = closing; + } + + private void unsubscribeAllConsumers() throws AMQException + { + if (_log.isInfoEnabled()) + { + if (!_tag2SubscriptionMap.isEmpty()) + { + _log.info("Unsubscribing all consumers on channel " + toString()); + } + else + { + _log.info("No consumers to unsubscribe on channel " + toString()); + } + } + + for (Map.Entry<AMQShortString, Subscription> me : _tag2SubscriptionMap.entrySet()) + { + if (_log.isInfoEnabled()) + { + _log.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString()); + } + + Subscription sub = me.getValue(); + + try + { + sub.getSendLock(); + sub.getQueue().unregisterSubscription(sub); + } + finally + { + sub.releaseSendLock(); + } + + } + + _tag2SubscriptionMap.clear(); + } + + /** + * Add a message to the channel-based list of unacknowledged messages + * + * @param entry the record of the message on the queue that was delivered + * @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of the + * delivery tag) + * @param subscription The consumer that is to acknowledge this message. + */ + public void addUnacknowledgedMessage(QueueEntry entry, long deliveryTag, Subscription subscription) + { + if (_log.isDebugEnabled()) + { + if (entry.getQueue() == null) + { + _log.debug("Adding unacked message with a null queue:" + entry.debugIdentity()); + } + else + { + if (_log.isDebugEnabled()) + { + _log.debug(debugIdentity() + " Adding unacked message(" + entry.getMessage().toString() + " DT:" + deliveryTag + + ") with a queue(" + entry.getQueue() + ") for " + subscription); + } + } + } + + _unacknowledgedMessageMap.add(deliveryTag, entry); + + } + + private final String id = "(" + System.identityHashCode(this) + ")"; + + public String debugIdentity() + { + return _channelId + id; + } + + /** + * Called to attempt re-delivery all outstanding unacknowledged messages on the channel. May result in delivery to + * this same channel or to other subscribers. + * + * @throws org.apache.qpid.AMQException if the requeue fails + */ + public void requeue() throws AMQException + { + // we must create a new map since all the messages will get a new delivery tag when they are redelivered + Collection<QueueEntry> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages(); + + // Deliver these messages out of the transaction as their delivery was never + // part of the transaction only the receive. + TransactionalContext deliveryContext = null; + + if (!messagesToBeDelivered.isEmpty()) + { + if (_log.isInfoEnabled()) + { + _log.info("Requeuing " + messagesToBeDelivered.size() + " unacked messages. for " + toString()); + } + + if (!(_txnContext instanceof NonTransactionalContext)) + { + + deliveryContext = + new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages); + } + else + { + deliveryContext = _txnContext; + } + } + + for (QueueEntry unacked : messagesToBeDelivered) + { + if (!unacked.isQueueDeleted()) + { + // Mark message redelivered + unacked.getMessage().setRedelivered(true); + + // Ensure message is released for redelivery + unacked.release(); + + // Deliver Message + deliveryContext.requeue(unacked); + + } + else + { + unacked.discard(_storeContext); + } + } + + } + + /** + * Requeue a single message + * + * @param deliveryTag The message to requeue + * + * @throws AMQException If something goes wrong. + */ + public void requeue(long deliveryTag) throws AMQException + { + QueueEntry unacked = _unacknowledgedMessageMap.remove(deliveryTag); + + if (unacked != null) + { + // Mark message redelivered + unacked.getMessage().setRedelivered(true); + + // Ensure message is released for redelivery + if (!unacked.isQueueDeleted()) + { + unacked.release(); + } + + + // Deliver these messages out of the transaction as their delivery was never + // part of the transaction only the receive. + TransactionalContext deliveryContext; + if (!(_txnContext instanceof NonTransactionalContext)) + { + + deliveryContext = + new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages); + + } + else + { + deliveryContext = _txnContext; + } + + if (!unacked.isQueueDeleted()) + { + // Redeliver the messages to the front of the queue + deliveryContext.requeue(unacked); + // Deliver increments the message count but we have already deliverted this once so don't increment it again + // this was because deliver did an increment changed this. + } + else + { + _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.getMessage().debugIdentity() + + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message."); + + unacked.discard(_storeContext); + } + } + else + { + _log.warn("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists." + + _unacknowledgedMessageMap.size()); + + } + + } + + /** + * Called to resend all outstanding unacknowledged messages to this same channel. + * + * @param requeue Are the messages to be requeued or dropped. + * + * @throws AMQException When something goes wrong. + */ + public void resend(final boolean requeue) throws AMQException + { + + + final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>(); + final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>(); + + if (_log.isDebugEnabled()) + { + _log.debug("unacked map Size:" + _unacknowledgedMessageMap.size()); + } + + // Process the Unacked-Map. + // Marking messages who still have a consumer for to be resent + // and those that don't to be requeued. + _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue, + msgToResend, requeue, _storeContext)); + + + // Process Messages to Resend + if (_log.isDebugEnabled()) + { + if (!msgToResend.isEmpty()) + { + _log.debug("Preparing (" + msgToResend.size() + ") message to resend."); + } + else + { + _log.debug("No message to resend."); + } + } + + for (Map.Entry<Long, QueueEntry> entry : msgToResend.entrySet()) + { + QueueEntry message = entry.getValue(); + long deliveryTag = entry.getKey(); + + + + AMQMessage msg = message.getMessage(); + AMQQueue queue = message.getQueue(); + + // Our Java Client will always suspend the channel when resending! + // If the client has requested the messages be resent then it is + // their responsibility to ensure that thay are capable of receiving them + // i.e. The channel hasn't been server side suspended. + // if (isSuspended()) + // { + // _log.info("Channel is suspended so requeuing"); + // //move this message to requeue + // msgToRequeue.add(message); + // } + // else + // { + // release to allow it to be delivered + + // Without any details from the client about what has been processed we have to mark + // all messages in the unacked map as redelivered. + msg.setRedelivered(true); + + Subscription sub = message.getDeliveredSubscription(); + + if (sub != null) + { + + if(!queue.resend(message, sub)) + { + msgToRequeue.put(deliveryTag, message); + } + } + else + { + + if (_log.isInfoEnabled()) + { + _log.info("DeliveredSubscription not recorded so just requeueing(" + message.toString() + + ")to prevent loss"); + } + // move this message to requeue + msgToRequeue.put(deliveryTag, message); + } + } // for all messages + // } else !isSuspend + + if (_log.isInfoEnabled()) + { + if (!msgToRequeue.isEmpty()) + { + _log.info("Preparing (" + msgToRequeue.size() + ") message to requeue to."); + } + } + + // Deliver these messages out of the transaction as their delivery was never + // part of the transaction only the receive. + TransactionalContext deliveryContext; + if (!(_txnContext instanceof NonTransactionalContext)) + { + + deliveryContext = + new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages); + } + else + { + deliveryContext = _txnContext; + } + + // Process Messages to Requeue at the front of the queue + for (Map.Entry<Long, QueueEntry> entry : msgToRequeue.entrySet()) + { + QueueEntry message = entry.getValue(); + long deliveryTag = entry.getKey(); + + message.release(); + message.setRedelivered(true); + + deliveryContext.requeue(message); + + _unacknowledgedMessageMap.remove(deliveryTag); + } + } + + /** + * Callback indicating that a queue has been deleted. We must update the structure of unacknowledged messages to + * remove the queue reference and also decrement any message reference counts, without actually removing the item + * since we may get an ack for a delivery tag that was generated from the deleted queue. + * + * @param queue the queue that has been deleted + * + */ + /* public void queueDeleted(final AMQQueue queue) + { + try + { + _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() + { + public boolean callback(UnacknowledgedMessage message) + { + if (message.getQueue() == queue) + { + try + { + message.discard(_storeContext); + message.setQueueDeleted(true); + + } + catch (AMQException e) + { + _log.error( + "Error decrementing ref count on message " + message.getMessage().getMessageId() + ": " + e, e); + throw new RuntimeException(e); + } + } + + return false; + } + + public void visitComplete() + { + } + }); + } + catch (AMQException e) + { + _log.error("Unexpected Error while handling deletion of queue", e); + throw new RuntimeException(e); + } + + } +*/ + /** + * Acknowledge one or more messages. + * + * @param deliveryTag the last delivery tag + * @param multiple if true will acknowledge all messages up to an including the delivery tag. if false only + * acknowledges the single message specified by the delivery tag + * + * @throws AMQException if the delivery tag is unknown (e.g. not outstanding) on this channel + */ + public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException + { + _unacknowledgedMessageMap.acknowledgeMessage(deliveryTag, multiple, _txnContext); + } + + /** + * Used only for testing purposes. + * + * @return the map of unacknowledged messages + */ + public UnacknowledgedMessageMap getUnacknowledgedMessageMap() + { + return _unacknowledgedMessageMap; + } + + + public void setSuspended(boolean suspended) + { + + + boolean wasSuspended = _suspended.getAndSet(suspended); + if (wasSuspended != suspended) + { + if (wasSuspended) + { + // may need to deliver queued messages + for (Subscription s : _tag2SubscriptionMap.values()) + { + s.getQueue().deliverAsync(s); + } + } + } + } + + public boolean isSuspended() + { + return _suspended.get(); + } + + public void commit() throws AMQException + { + if (!isTransactional()) + { + throw new AMQException("Fatal error: commit called on non-transactional channel"); + } + + _txnContext.commit(); + } + + public void rollback() throws AMQException + { + _txnContext.rollback(); + } + + public String toString() + { + return "["+_session.toString()+":"+_channelId+"]"; + } + + public void setDefaultQueue(AMQQueue queue) + { + _defaultQueue = queue; + } + + public AMQQueue getDefaultQueue() + { + return _defaultQueue; + } + + public StoreContext getStoreContext() + { + return _storeContext; + } + + public void processReturns() throws AMQException + { + if (!_returnMessages.isEmpty()) + { + for (RequiredDeliveryException bouncedMessage : _returnMessages) + { + AMQMessage message = bouncedMessage.getAMQMessage(); + _session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(), + new AMQShortString(bouncedMessage.getMessage())); + + message.decrementReference(_storeContext); + } + + _returnMessages.clear(); + } + } + + + public TransactionalContext getTransactionalContext() + { + return _txnContext; + } + + public boolean isClosing() + { + return _closing; + } + + public AMQProtocolSession getProtocolSession() + { + return _session; + } + + public FlowCreditManager getCreditManager() + { + return _creditManager; + } + + public void setCredit(final long prefetchSize, final int prefetchCount) + { + _creditManager.setCreditLimits(prefetchSize, prefetchCount); + } + + public List<RequiredDeliveryException> getReturnMessages() + { + return _returnMessages; + } + + public MessageStore getMessageStore() + { + return _messageStore; + } + + private final ClientDeliveryMethod _clientDeliveryMethod = new ClientDeliveryMethod() + { + + public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag) + throws AMQException + { + getProtocolSession().getProtocolOutputConverter().writeDeliver(entry.getMessage(), getChannelId(), deliveryTag, sub.getConsumerTag()); + } + }; + + public ClientDeliveryMethod getClientDeliveryMethod() + { + return _clientDeliveryMethod; + } + + private final RecordDeliveryMethod _recordDeliveryMethod = new RecordDeliveryMethod() + { + + public void recordMessageDelivery(final Subscription sub, final QueueEntry entry, final long deliveryTag) + { + addUnacknowledgedMessage(entry, deliveryTag, sub); + } + }; + + public RecordDeliveryMethod getRecordDeliveryMethod() + { + return _recordDeliveryMethod; + } +} |