diff options
Diffstat (limited to 'trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java')
-rw-r--r-- | trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java | 1196 |
1 files changed, 0 insertions, 1196 deletions
diff --git a/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java deleted file mode 100644 index 333c1b9cac..0000000000 --- a/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ /dev/null @@ -1,1196 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server; - -import java.util.*; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.framing.*; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.framing.abstraction.ContentChunk; -import org.apache.qpid.server.ack.UnacknowledgedMessageMap; -import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.flow.FlowCreditManager; -import org.apache.qpid.server.flow.Pre0_10CreditManager; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.*; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; -import org.apache.qpid.server.subscription.ClientDeliveryMethod; -import org.apache.qpid.server.subscription.RecordDeliveryMethod; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.txn.*; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.message.AMQMessage; -import org.apache.qpid.server.message.MessageMetaData; -import org.apache.qpid.server.message.MessageReference; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.messages.ChannelMessages; -import org.apache.qpid.server.logging.subjects.ChannelLogSubject; -import org.apache.qpid.server.logging.actors.AMQPChannelActor; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.output.ProtocolOutputConverter; - -public class AMQChannel -{ - public static final int DEFAULT_PREFETCH = 5000; - - private static final Logger _logger = Logger.getLogger(AMQChannel.class); - - private static final boolean MSG_AUTH = - ApplicationRegistry.getInstance().getConfiguration().getMsgAuth(); - - - private final int _channelId; - - - private final Pre0_10CreditManager _creditManager = new Pre0_10CreditManager(0l,0l); - - /** - * The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that - * value of this represents the <b>last</b> tag sent out - */ - private long _deliveryTag = 0; - - /** A channel has a default queue (the last declared) that is used when no queue name is explictily set */ - private AMQQueue _defaultQueue; - - /** This tag is unique per subscription to a queue. The server returns this in response to a basic.consume request. */ - private int _consumerTag; - - /** - * The current message - which may be partial in the sense that not all frames have been received yet - which has - * been received by this channel. As the frames are received the message gets updated and once all frames have been - * received the message can then be routed. - */ - private IncomingMessage _currentMessage; - - /** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. */ - protected final Map<AMQShortString, Subscription> _tag2SubscriptionMap = new HashMap<AMQShortString, Subscription>(); - - private final MessageStore _messageStore; - - private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH); - - // Set of messages being acknoweledged in the current transaction - private SortedSet<QueueEntry> _acknowledgedMessages = new TreeSet<QueueEntry>(); - - private final AtomicBoolean _suspended = new AtomicBoolean(false); - - private ServerTransaction _transaction; - - // Why do we need this reference ? - ritchiem - private final AMQProtocolSession _session; - private boolean _closing; - - private final ConcurrentMap<AMQQueue, Boolean> _blockingQueues = new ConcurrentHashMap<AMQQueue, Boolean>(); - - private final AtomicBoolean _blocking = new AtomicBoolean(false); - - - private LogActor _actor; - private LogSubject _logSubject; - private volatile boolean _rollingBack; - - private static final Runnable NULL_TASK = new Runnable() { public void run() {} }; - private List<QueueEntry> _resendList = new ArrayList<QueueEntry>(); - private static final - AMQShortString IMMEDIATE_DELIVERY_REPLY_TEXT = new AMQShortString("Immediate delivery is not possible."); - - public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore) - throws AMQException - { - _session = session; - _channelId = channelId; - - _actor = new AMQPChannelActor(this, session.getLogActor().getRootMessageLogger()); - _logSubject = new ChannelLogSubject(this); - - _actor.message(ChannelMessages.CHN_CREATE()); - - _messageStore = messageStore; - - // by default the session is non-transactional - _transaction = new AutoCommitTransaction(_messageStore); - } - - /** Sets this channel to be part of a local transaction */ - public void setLocalTransactional() - { - _transaction = new LocalTransaction(_messageStore); - } - - public boolean isTransactional() - { - // this does not look great but there should only be one "non-transactional" - // transactional context, while there could be several transactional ones in - // theory - return !(_transaction instanceof AutoCommitTransaction); - } - - public int getChannelId() - { - return _channelId; - } - - public void setPublishFrame(MessagePublishInfo info, final Exchange e) throws AMQException - { - - _currentMessage = new IncomingMessage(info); - _currentMessage.setExchange(e); - } - - public void publishContentHeader(ContentHeaderBody contentHeaderBody) - throws AMQException - { - if (_currentMessage == null) - { - throw new AMQException("Received content header without previously receiving a BasicPublish frame"); - } - else - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Content header received on channel " + _channelId); - } - - _currentMessage.setContentHeaderBody(contentHeaderBody); - - _currentMessage.setExpiration(); - - - MessageMetaData mmd = _currentMessage.headersReceived(); - final StoredMessage<MessageMetaData> handle = _messageStore.addMessage(mmd); - _currentMessage.setStoredMessage(handle); - - routeCurrentMessage(); - - - _transaction.addPostCommitAction(new ServerTransaction.Action() - { - - public void postCommit() - { - } - - public void onRollback() - { - handle.remove(); - } - }); - - deliverCurrentMessageIfComplete(); - - } - } - - private void deliverCurrentMessageIfComplete() - throws AMQException - { - // check and deliver if header says body length is zero - if (_currentMessage.allContentReceived()) - { - try - { - - final ArrayList<AMQQueue> destinationQueues = _currentMessage.getDestinationQueues(); - - if(!checkMessageUserId(_currentMessage.getContentHeader())) - { - _transaction.addPostCommitAction(new WriteReturnAction(AMQConstant.ACCESS_REFUSED, "Access Refused", _currentMessage)); - } - else - { - if(destinationQueues == null || _currentMessage.getDestinationQueues().isEmpty()) - { - if (_currentMessage.isMandatory() || _currentMessage.isImmediate()) - { - _transaction.addPostCommitAction(new WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message", _currentMessage)); - } - else - { - _logger.warn("MESSAGE DISCARDED: No routes for message - " + createAMQMessage(_currentMessage)); - } - - } - else - { - _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues)); - - } - } - } - finally - { - _currentMessage = null; - } - } - - } - - public void publishContentBody(ContentBody contentBody) throws AMQException - { - if (_currentMessage == null) - { - throw new AMQException("Received content body without previously receiving a JmsPublishBody"); - } - - if (_logger.isDebugEnabled()) - { - _logger.debug(debugIdentity() + "Content body received on channel " + _channelId); - } - - try - { - - // returns true iff the message was delivered (i.e. if all data was - // received - final ContentChunk contentChunk = - _session.getMethodRegistry().getProtocolVersionMethodConverter().convertToContentChunk(contentBody); - - _currentMessage.addContentBodyFrame(contentChunk); - - deliverCurrentMessageIfComplete(); - } - catch (AMQException e) - { - // we want to make sure we don't keep a reference to the message in the - // event of an error - _currentMessage = null; - throw e; - } - } - - protected void routeCurrentMessage() throws AMQException - { - _currentMessage.route(); - } - - public long getNextDeliveryTag() - { - return ++_deliveryTag; - } - - public int getNextConsumerTag() - { - return ++_consumerTag; - } - - - public Subscription getSubscription(AMQShortString subscription) - { - return _tag2SubscriptionMap.get(subscription); - } - - /** - * Subscribe to a queue. We register all subscriptions in the channel so that if the channel is closed we can clean - * up all subscriptions, even if the client does not explicitly unsubscribe from all queues. - * - * @param tag the tag chosen by the client (if null, server will generate one) - * @param queue the queue to subscribe to - * @param acks Are acks enabled for this subscriber - * @param filters Filters to apply to this subscriber - * - * @param noLocal Flag stopping own messages being receivied. - * @param exclusive Flag requesting exclusive access to the queue - * @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests - * - * @throws AMQException if something goes wrong - */ - public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, boolean acks, - FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException - { - if (tag == null) - { - tag = new AMQShortString("sgen_" + getNextConsumerTag()); - } - - if (_tag2SubscriptionMap.containsKey(tag)) - { - throw new AMQException("Consumer already exists with same tag: " + tag); - } - - Subscription subscription = - SubscriptionFactoryImpl.INSTANCE.createSubscription(_channelId, _session, tag, acks, filters, noLocal, _creditManager); - - - // So to keep things straight we put before the call and catch all exceptions from the register and tidy up. - // We add before we register as the Async Delivery process may AutoClose the subscriber - // so calling _cT2QM.remove before we have done put which was after the register succeeded. - // So to keep things straight we put before the call and catch all exceptions from the register and tidy up. - - _tag2SubscriptionMap.put(tag, subscription); - - try - { - queue.registerSubscription(subscription, exclusive); - } - catch (AMQException e) - { - _tag2SubscriptionMap.remove(tag); - throw e; - } - return tag; - } - - /** - * Unsubscribe a consumer from a queue. - * @param consumerTag - * @return true if the consumerTag had a mapped queue that could be unregistered. - * @throws AMQException - */ - public boolean unsubscribeConsumer(AMQShortString consumerTag) throws AMQException - { - - Subscription sub = _tag2SubscriptionMap.remove(consumerTag); - if (sub != null) - { - try - { - sub.getSendLock(); - sub.getQueue().unregisterSubscription(sub); - } - finally - { - sub.releaseSendLock(); - } - return true; - } - else - { - _logger.warn("Attempt to unsubscribe consumer with tag '"+consumerTag+"' which is not registered."); - } - return false; - } - - /** - * Called from the protocol session to close this channel and clean up. T - * - * @throws AMQException if there is an error during closure - */ - public void close() throws AMQException - { - setClosing(true); - - unsubscribeAllConsumers(); - _transaction.rollback(); - - try - { - requeue(); - } - catch (AMQException e) - { - _logger.error("Caught AMQException whilst attempting to reque:" + e); - } - - } - - private void setClosing(boolean closing) - { - _closing = closing; - - CurrentActor.get().message(_logSubject, ChannelMessages.CHN_CLOSE()); - } - - private void unsubscribeAllConsumers() throws AMQException - { - if (_logger.isInfoEnabled()) - { - if (!_tag2SubscriptionMap.isEmpty()) - { - _logger.info("Unsubscribing all consumers on channel " + toString()); - } - else - { - _logger.info("No consumers to unsubscribe on channel " + toString()); - } - } - - for (Map.Entry<AMQShortString, Subscription> me : _tag2SubscriptionMap.entrySet()) - { - if (_logger.isInfoEnabled()) - { - _logger.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString()); - } - - Subscription sub = me.getValue(); - - try - { - sub.getSendLock(); - sub.getQueue().unregisterSubscription(sub); - } - finally - { - sub.releaseSendLock(); - } - - } - - _tag2SubscriptionMap.clear(); - } - - /** - * Add a message to the channel-based list of unacknowledged messages - * - * @param entry the record of the message on the queue that was delivered - * @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of the - * delivery tag) - * @param subscription The consumer that is to acknowledge this message. - */ - public void addUnacknowledgedMessage(QueueEntry entry, long deliveryTag, Subscription subscription) - { - if (_logger.isDebugEnabled()) - { - if (entry.getQueue() == null) - { - _logger.debug("Adding unacked message with a null queue:" + entry); - } - else - { - if (_logger.isDebugEnabled()) - { - _logger.debug(debugIdentity() + " Adding unacked message(" + entry.getMessage().toString() + " DT:" + deliveryTag - + ") with a queue(" + entry.getQueue() + ") for " + subscription); - } - } - } - - _unacknowledgedMessageMap.add(deliveryTag, entry); - - } - - private final String id = "(" + System.identityHashCode(this) + ")"; - - public String debugIdentity() - { - return _channelId + id; - } - - /** - * Called to attempt re-delivery all outstanding unacknowledged messages on the channel. May result in delivery to - * this same channel or to other subscribers. - * - * @throws org.apache.qpid.AMQException if the requeue fails - */ - public void requeue() throws AMQException - { - // we must create a new map since all the messages will get a new delivery tag when they are redelivered - Collection<QueueEntry> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages(); - - if (!messagesToBeDelivered.isEmpty()) - { - if (_logger.isInfoEnabled()) - { - _logger.info("Requeuing " + messagesToBeDelivered.size() + " unacked messages. for " + toString()); - } - - } - - for (QueueEntry unacked : messagesToBeDelivered) - { - if (!unacked.isQueueDeleted()) - { - // Mark message redelivered - unacked.setRedelivered(); - - // Ensure message is released for redelivery - unacked.release(); - - } - else - { - unacked.discard(); - } - } - - } - - /** - * Requeue a single message - * - * @param deliveryTag The message to requeue - * - * @throws AMQException If something goes wrong. - */ - public void requeue(long deliveryTag) throws AMQException - { - QueueEntry unacked = _unacknowledgedMessageMap.remove(deliveryTag); - - if (unacked != null) - { - // Mark message redelivered - unacked.setRedelivered(); - - // Ensure message is released for redelivery - if (!unacked.isQueueDeleted()) - { - - // Ensure message is released for redelivery - unacked.release(); - - } - else - { - _logger.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked - + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message."); - - unacked.discard(); - } - } - else - { - _logger.warn("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists." - + _unacknowledgedMessageMap.size()); - - } - - } - - /** - * Called to resend all outstanding unacknowledged messages to this same channel. - * - * @param requeue Are the messages to be requeued or dropped. - * - * @throws AMQException When something goes wrong. - */ - public void resend(final boolean requeue) throws AMQException - { - - - final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>(); - final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>(); - - if (_logger.isDebugEnabled()) - { - _logger.debug("unacked map Size:" + _unacknowledgedMessageMap.size()); - } - - // Process the Unacked-Map. - // Marking messages who still have a consumer for to be resent - // and those that don't to be requeued. - _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, - msgToRequeue, - msgToResend, - requeue, - _messageStore)); - - - // Process Messages to Resend - if (_logger.isDebugEnabled()) - { - if (!msgToResend.isEmpty()) - { - _logger.debug("Preparing (" + msgToResend.size() + ") message to resend."); - } - else - { - _logger.debug("No message to resend."); - } - } - - for (Map.Entry<Long, QueueEntry> entry : msgToResend.entrySet()) - { - QueueEntry message = entry.getValue(); - long deliveryTag = entry.getKey(); - - - - ServerMessage msg = message.getMessage(); - AMQQueue queue = message.getQueue(); - - // Our Java Client will always suspend the channel when resending! - // If the client has requested the messages be resent then it is - // their responsibility to ensure that thay are capable of receiving them - // i.e. The channel hasn't been server side suspended. - // if (isSuspended()) - // { - // _logger.info("Channel is suspended so requeuing"); - // //move this message to requeue - // msgToRequeue.add(message); - // } - // else - // { - // release to allow it to be delivered - - // Without any details from the client about what has been processed we have to mark - // all messages in the unacked map as redelivered. - message.setRedelivered(); - - Subscription sub = message.getDeliveredSubscription(); - - if (sub != null) - { - - if(!queue.resend(message,sub)) - { - msgToRequeue.put(deliveryTag, message); - } - } - else - { - - if (_logger.isInfoEnabled()) - { - _logger.info("DeliveredSubscription not recorded so just requeueing(" + message.toString() - + ")to prevent loss"); - } - // move this message to requeue - msgToRequeue.put(deliveryTag, message); - } - } // for all messages - // } else !isSuspend - - if (_logger.isInfoEnabled()) - { - if (!msgToRequeue.isEmpty()) - { - _logger.info("Preparing (" + msgToRequeue.size() + ") message to requeue to."); - } - } - - // Process Messages to Requeue at the front of the queue - for (Map.Entry<Long, QueueEntry> entry : msgToRequeue.entrySet()) - { - QueueEntry message = entry.getValue(); - long deliveryTag = entry.getKey(); - _unacknowledgedMessageMap.remove(deliveryTag); - - message.setRedelivered(); - message.release(); - - } - } - - - /** - * Acknowledge one or more messages. - * - * @param deliveryTag the last delivery tag - * @param multiple if true will acknowledge all messages up to an including the delivery tag. if false only - * acknowledges the single message specified by the delivery tag - * - * @throws AMQException if the delivery tag is unknown (e.g. not outstanding) on this channel - */ - public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException - { - Collection<QueueEntry> ackedMessages = getAckedMessages(deliveryTag, multiple); - _transaction.dequeue(ackedMessages, new MessageAcknowledgeAction(ackedMessages)); - } - - private Collection<QueueEntry> getAckedMessages(long deliveryTag, boolean multiple) - { - - Map<Long, QueueEntry> ackedMessageMap = new LinkedHashMap<Long,QueueEntry>(); - _unacknowledgedMessageMap.collect(deliveryTag, multiple, ackedMessageMap); - _unacknowledgedMessageMap.remove(ackedMessageMap); - return ackedMessageMap.values(); - } - - /** - * Used only for testing purposes. - * - * @return the map of unacknowledged messages - */ - public UnacknowledgedMessageMap getUnacknowledgedMessageMap() - { - return _unacknowledgedMessageMap; - } - - /** - * Called from the ChannelFlowHandler to suspend this Channel - * @param suspended boolean, should this Channel be suspended - */ - public void setSuspended(boolean suspended) - { - boolean wasSuspended = _suspended.getAndSet(suspended); - if (wasSuspended != suspended) - { - // Log Flow Started before we start the subscriptions - if (!suspended) - { - _actor.message(_logSubject, ChannelMessages.CHN_FLOW("Started")); - } - - - // This section takes two different approaches to perform to perform - // the same function. Ensuring that the Subscription has taken note - // of the change in Channel State - - // Here we have become unsuspended and so we ask each the queue to - // perform an Async delivery for each of the subscriptions in this - // Channel. The alternative would be to ensure that the subscription - // had received the change in suspension state. That way the logic - // behind decieding to start an async delivery was located with the - // Subscription. - if (wasSuspended) - { - // may need to deliver queued messages - for (Subscription s : _tag2SubscriptionMap.values()) - { - s.getQueue().deliverAsync(s); - } - } - - - // Here we have become suspended so we need to ensure that each of - // the Subscriptions has noticed this change so that we can be sure - // they are not still sending messages. Again the code here is a - // very simplistic approach to ensure that the change of suspension - // has been noticed by each of the Subscriptions. Unlike the above - // case we don't actually need to do anything else. - if (!wasSuspended) - { - // may need to deliver queued messages - for (Subscription s : _tag2SubscriptionMap.values()) - { - try - { - s.getSendLock(); - } - finally - { - s.releaseSendLock(); - } - } - } - - - // Log Suspension only after we have confirmed all suspensions are - // stopped. - if (suspended) - { - _actor.message(_logSubject, ChannelMessages.CHN_FLOW("Stopped")); - } - - } - } - - public boolean isSuspended() - { - return _suspended.get() || _closing || _session.isClosing(); - } - - public void commit() throws AMQException - { - if (!isTransactional()) - { - throw new AMQException("Fatal error: commit called on non-transactional channel"); - } - - _transaction.commit(); - - } - - public void rollback() throws AMQException - { - rollback(NULL_TASK); - } - - public void rollback(Runnable postRollbackTask) throws AMQException - { - if (!isTransactional()) - { - throw new AMQException("Fatal error: commit called on non-transactional channel"); - } - - // stop all subscriptions - _rollingBack = true; - boolean requiresSuspend = _suspended.compareAndSet(false,true); - - // ensure all subscriptions have seen the change to the channel state - for(Subscription sub : _tag2SubscriptionMap.values()) - { - sub.getSendLock(); - sub.releaseSendLock(); - } - - try - { - _transaction.rollback(); - } - finally - { - _rollingBack = false; - } - - postRollbackTask.run(); - - for(QueueEntry entry : _resendList) - { - Subscription sub = entry.getDeliveredSubscription(); - if(sub == null || sub.isClosed()) - { - entry.release(); - } - else - { - sub.getQueue().resend(entry, sub); - } - } - _resendList.clear(); - - if(requiresSuspend) - { - _suspended.set(false); - for(Subscription sub : _tag2SubscriptionMap.values()) - { - sub.getQueue().deliverAsync(sub); - } - - } - - - } - - public String toString() - { - return "["+_session.toString()+":"+_channelId+"]"; - } - - public void setDefaultQueue(AMQQueue queue) - { - _defaultQueue = queue; - } - - public AMQQueue getDefaultQueue() - { - return _defaultQueue; - } - - - public boolean isClosing() - { - return _closing; - } - - public AMQProtocolSession getProtocolSession() - { - return _session; - } - - public FlowCreditManager getCreditManager() - { - return _creditManager; - } - - public void setCredit(final long prefetchSize, final int prefetchCount) - { - _actor.message(ChannelMessages.CHN_PREFETCH_SIZE(prefetchSize, prefetchCount)); - _creditManager.setCreditLimits(prefetchSize, prefetchCount); - } - - public MessageStore getMessageStore() - { - return _messageStore; - } - - private final ClientDeliveryMethod _clientDeliveryMethod = new ClientDeliveryMethod() - { - - public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag) - throws AMQException - { - getProtocolSession().getProtocolOutputConverter().writeDeliver(entry, getChannelId(), - deliveryTag, sub.getConsumerTag()); - } - - }; - - public ClientDeliveryMethod getClientDeliveryMethod() - { - return _clientDeliveryMethod; - } - - private final RecordDeliveryMethod _recordDeliveryMethod = new RecordDeliveryMethod() - { - - public void recordMessageDelivery(final Subscription sub, final QueueEntry entry, final long deliveryTag) - { - addUnacknowledgedMessage(entry, deliveryTag, sub); - } - }; - - public RecordDeliveryMethod getRecordDeliveryMethod() - { - return _recordDeliveryMethod; - } - - - private AMQMessage createAMQMessage(IncomingMessage incomingMessage) - throws AMQException - { - - AMQMessage message = new AMQMessage(incomingMessage.getStoredMessage()); - - message.setExpiration(incomingMessage.getExpiration()); - message.setClientIdentifier(_session); - return message; - } - - private boolean checkMessageUserId(ContentHeaderBody header) - { - AMQShortString userID = - header.properties instanceof BasicContentHeaderProperties - ? ((BasicContentHeaderProperties) header.properties).getUserId() - : null; - - return (!MSG_AUTH || _session.getPrincipal().getName().equals(userID == null? "" : userID.toString())); - - } - - private class MessageDeliveryAction implements ServerTransaction.Action - { - private IncomingMessage _incommingMessage; - private ArrayList<AMQQueue> _destinationQueues; - - public MessageDeliveryAction(IncomingMessage currentMessage, - ArrayList<AMQQueue> destinationQueues) - { - _incommingMessage = currentMessage; - _destinationQueues = destinationQueues; - } - - public void postCommit() - { - try - { - final boolean immediate = _incommingMessage.isImmediate(); - - final AMQMessage amqMessage = createAMQMessage(_incommingMessage); - MessageReference ref = amqMessage.newReference(); - - for(AMQQueue queue : _destinationQueues) - { - - QueueEntry entry = queue.enqueue(amqMessage); - queue.checkCapacity(AMQChannel.this); - - - if(immediate && !entry.getDeliveredToConsumer() && entry.acquire()) - { - - - ServerTransaction txn = new LocalTransaction(_messageStore); - Collection<QueueEntry> entries = new ArrayList<QueueEntry>(1); - entries.add(entry); - final AMQMessage message = (AMQMessage) entry.getMessage(); - txn.dequeue(queue, entry.getMessage(), - new MessageAcknowledgeAction(entries) - { - @Override - public void postCommit() - { - try - { - final - ProtocolOutputConverter outputConverter = - _session.getProtocolOutputConverter(); - - outputConverter.writeReturn(message.getMessagePublishInfo(), - message.getContentHeaderBody(), - message, - _channelId, - AMQConstant.NO_CONSUMERS.getCode(), - IMMEDIATE_DELIVERY_REPLY_TEXT); - } - catch (AMQException e) - { - throw new RuntimeException(e); - } - super.postCommit(); - } - } - ); - txn.commit(); - - - - - } - - } - ref.release(); - } - catch (AMQException e) - { - // TODO - throw new RuntimeException(e); - } - - - - - - } - - public void onRollback() - { - // Maybe keep track of entries that were created and then delete them here in case of failure - // to in memory enqueue - } - } - - private class MessageAcknowledgeAction implements ServerTransaction.Action - { - private final Collection<QueueEntry> _ackedMessages; - - - public MessageAcknowledgeAction(Collection<QueueEntry> ackedMessages) - { - _ackedMessages = ackedMessages; - } - - public void postCommit() - { - try - { - for(QueueEntry entry : _ackedMessages) - { - entry.discard(); - } - } - finally - { - _acknowledgedMessages.clear(); - } - - } - - public void onRollback() - { - // explicit rollbacks resend the message after the rollback-ok is sent - if(_rollingBack) - { - _resendList.addAll(_ackedMessages); - } - else - { - try - { - for(QueueEntry entry : _ackedMessages) - { - entry.release(); - } - } - finally - { - _acknowledgedMessages.clear(); - } - } - - } - } - - private class WriteReturnAction implements ServerTransaction.Action - { - private final AMQConstant _errorCode; - private final IncomingMessage _message; - private final String _description; - - public WriteReturnAction(AMQConstant errorCode, - String description, - IncomingMessage message) - { - _errorCode = errorCode; - _message = message; - _description = description; - } - - public void postCommit() - { - try - { - _session.getProtocolOutputConverter().writeReturn(_message.getMessagePublishInfo(), - _message.getContentHeader(), - _message, - _channelId, - _errorCode.getCode(), - new AMQShortString(_description)); - } - catch (AMQException e) - { - //TODO - throw new RuntimeException(e); - } - - } - - public void onRollback() - { - //To change body of implemented methods use File | Settings | File Templates. - } - } - - - public LogActor getLogActor() - { - return _actor; - } - - public void block(AMQQueue queue) - { - if(_blockingQueues.putIfAbsent(queue, Boolean.TRUE) == null) - { - - if(_blocking.compareAndSet(false,true)) - { - _actor.message(_logSubject, ChannelMessages.CHN_FLOW_ENFORCED(queue.getName().toString())); - flow(false); - } - } - } - - public void unblock(AMQQueue queue) - { - if(_blockingQueues.remove(queue)) - { - if(_blocking.compareAndSet(true,false)) - { - _actor.message(_logSubject, ChannelMessages.CHN_FLOW_REMOVED()); - - flow(true); - } - } - } - - private void flow(boolean flow) - { - MethodRegistry methodRegistry = _session.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createChannelFlowBody(flow); - _session.writeFrame(responseBody.generateFrame(_channelId)); - } - - public boolean getBlocking() - { - return _blocking.get(); - } -} |