diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java | 177 |
1 files changed, 121 insertions, 56 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 8198cec821..e197dddfde 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -23,7 +23,9 @@ package org.apache.qpid.server; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; @@ -32,9 +34,9 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; import java.util.UUID; -import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; @@ -89,7 +91,6 @@ import org.apache.qpid.server.subscription.ClientDeliveryMethod; import org.apache.qpid.server.subscription.RecordDeliveryMethod; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; -import org.apache.qpid.server.subscription.SubscriptionImpl; import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; @@ -137,11 +138,9 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm private final LinkedList<AsyncCommand> _unfinishedCommandsQueue = new LinkedList<AsyncCommand>(); - private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500; - private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH); - // Set of messages being acknoweledged in the current transaction + // Set of messages being acknowledged in the current transaction private SortedSet<QueueEntry> _acknowledgedMessages = new TreeSet<QueueEntry>(); private final AtomicBoolean _suspended = new AtomicBoolean(false); @@ -157,7 +156,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm private final AMQProtocolSession _session; private AtomicBoolean _closing = new AtomicBoolean(false); - private final Set<AMQQueue> _blockingQueues = new ConcurrentSkipListSet<AMQQueue>(); + private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>()); private final AtomicBoolean _blocking = new AtomicBoolean(false); @@ -170,11 +169,13 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm private List<QueueEntry> _resendList = new ArrayList<QueueEntry>(); private static final AMQShortString IMMEDIATE_DELIVERY_REPLY_TEXT = new AMQShortString("Immediate delivery is not possible."); - private final UUID _id; + private final UUID _qmfId; private long _createTime = System.currentTimeMillis(); private final ClientDeliveryMethod _clientDeliveryMethod; + private final TransactionTimeoutHelper _transactionTimeoutHelper; + public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore) throws AMQException { @@ -183,7 +184,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm _actor = new AMQPChannelActor(this, session.getLogActor().getRootMessageLogger()); _logSubject = new ChannelLogSubject(this); - _id = getConfigStore().createId(); + _qmfId = getConfigStore().createId(); _actor.message(ChannelMessages.CREATE()); getConfigStore().addConfiguredObject(this); @@ -194,6 +195,8 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm _transaction = new AsyncAutoCommitTransaction(_messageStore, this); _clientDeliveryMethod = session.createDeliveryMethod(_channelId); + + _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject); } public ConfigStore getConfigStore() @@ -264,6 +267,11 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm return _txnCount.get(); } + public Long getTxnStart() + { + return _txnStarts.get(); + } + public int getChannelId() { return _channelId; @@ -441,7 +449,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm * @param acks Are acks enabled for this subscriber * @param filters Filters to apply to this subscriber * - * @param noLocal Flag stopping own messages being receivied. + * @param noLocal Flag stopping own messages being received. * @param exclusive Flag requesting exclusive access to the queue * @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests * @@ -948,9 +956,11 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm public void commit() throws AMQException { - commit(null); + commit(null, false); } - public void commit(Runnable immediateAction) throws AMQException + + + public void commit(final Runnable immediateAction, boolean async) throws AMQException { if (!isTransactional()) @@ -958,11 +968,29 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm throw new AMQException("Fatal error: commit called on non-transactional channel"); } - _transaction.commit(immediateAction); + if(async && _transaction instanceof LocalTransaction) + { + + ((LocalTransaction)_transaction).commitAsync(new Runnable() + { + @Override + public void run() + { + immediateAction.run(); + _txnCommits.incrementAndGet(); + _txnStarts.incrementAndGet(); + decrementOutstandingTxnsIfNecessary(); + } + }); + } + else + { + _transaction.commit(immediateAction); - _txnCommits.incrementAndGet(); - _txnStarts.incrementAndGet(); - decrementOutstandingTxnsIfNecessary(); + _txnCommits.incrementAndGet(); + _txnStarts.incrementAndGet(); + decrementOutstandingTxnsIfNecessary(); + } } public void rollback() throws AMQException @@ -1357,9 +1385,34 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm return _actor; } - public void block(AMQQueue queue) + public synchronized void block() { - if(_blockingQueues.add(queue)) + if(_blockingEntities.add(this)) + { + if(_blocking.compareAndSet(false,true)) + { + _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED("** All Queues **")); + flow(false); + } + } + } + + public synchronized void unblock() + { + if(_blockingEntities.remove(this)) + { + if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false)) + { + _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED()); + + flow(true); + } + } + } + + public synchronized void block(AMQQueue queue) + { + if(_blockingEntities.add(queue)) { if(_blocking.compareAndSet(false,true)) @@ -1370,11 +1423,11 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm } } - public void unblock(AMQQueue queue) + public synchronized void unblock(AMQQueue queue) { - if(_blockingQueues.remove(queue)) + if(_blockingEntities.remove(queue)) { - if(_blocking.compareAndSet(true,false) && !isClosing()) + if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false) && !isClosing()) { _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED()); @@ -1393,6 +1446,11 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm return false; } + public int getUnacknowledgedMessageCount() + { + return getUnacknowledgedMessageMap().size(); + } + private void flow(boolean flow) { MethodRegistry methodRegistry = _session.getMethodRegistry(); @@ -1400,6 +1458,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm _session.writeFrame(responseBody.generateFrame(_channelId)); } + @Override public boolean getBlocking() { return _blocking.get(); @@ -1456,9 +1515,10 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm return false; } - public UUID getId() + @Override + public UUID getQMFId() { - return _id; + return _qmfId; } public String getSessionName() @@ -1484,30 +1544,42 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm long openTime = currentTime - _transaction.getTransactionStartTime(); long idleTime = currentTime - _txnUpdateTime.get(); - // Log a warning on idle or open transactions - if (idleWarn > 0L && idleTime > idleWarn) - { - CurrentActor.get().message(_logSubject, ChannelMessages.IDLE_TXN(idleTime)); - _logger.warn("IDLE TRANSACTION ALERT " + _logSubject.toString() + " " + idleTime + " ms"); - } - else if (openWarn > 0L && openTime > openWarn) + _transactionTimeoutHelper.logIfNecessary(idleTime, idleWarn, ChannelMessages.IDLE_TXN(idleTime), + TransactionTimeoutHelper.IDLE_TRANSACTION_ALERT); + if (_transactionTimeoutHelper.isTimedOut(idleTime, idleClose)) { - CurrentActor.get().message(_logSubject, ChannelMessages.OPEN_TXN(openTime)); - _logger.warn("OPEN TRANSACTION ALERT " + _logSubject.toString() + " " + openTime + " ms"); + closeConnection("Idle transaction timed out"); + return; } - // Close connection for idle or open transactions that have timed out - if (idleClose > 0L && idleTime > idleClose) + _transactionTimeoutHelper.logIfNecessary(openTime, openWarn, ChannelMessages.OPEN_TXN(openTime), + TransactionTimeoutHelper.OPEN_TRANSACTION_ALERT); + if (_transactionTimeoutHelper.isTimedOut(openTime, openClose)) { - getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out"); - } - else if (openClose > 0L && openTime > openClose) - { - getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out"); + closeConnection("Open transaction timed out"); + return; } } } + /** + * Typically called from the HouseKeepingThread instead of the main receiver thread, + * therefore uses a lock to close the connection in a thread-safe manner. + */ + private void closeConnection(String reason) throws AMQException + { + Lock receivedLock = _session.getReceivedLock(); + receivedLock.lock(); + try + { + _session.close(AMQConstant.RESOURCE_ERROR, reason); + } + finally + { + receivedLock.unlock(); + } + } + public void deadLetter(long deliveryTag) throws AMQException { final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap(); @@ -1563,23 +1635,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm _unfinishedCommandsQueue.add(new AsyncCommand(future, action)); } - public void completeAsyncCommands() - { - AsyncCommand cmd; - while((cmd = _unfinishedCommandsQueue.peek()) != null && cmd.isReadyForCompletion()) - { - cmd.complete(); - _unfinishedCommandsQueue.poll(); - } - while(_unfinishedCommandsQueue.size() > UNFINISHED_COMMAND_QUEUE_THRESHOLD) - { - cmd = _unfinishedCommandsQueue.poll(); - cmd.awaitReadyForCompletion(); - cmd.complete(); - } - } - - public void sync() { AsyncCommand cmd; @@ -1588,6 +1643,10 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm cmd.awaitReadyForCompletion(); cmd.complete(); } + if(_transaction instanceof LocalTransaction) + { + ((LocalTransaction)_transaction).sync(); + } } private static class AsyncCommand @@ -1624,6 +1683,12 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm public int compareTo(AMQSessionModel session) { - return getId().compareTo(session.getId()); + return getQMFId().compareTo(session.getQMFId()); + } + + @Override + public int getConsumerCount() + { + return _tag2SubscriptionMap.size(); } } |