summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java177
1 files changed, 121 insertions, 56 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 8198cec821..e197dddfde 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -23,7 +23,9 @@ package org.apache.qpid.server;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
@@ -32,9 +34,9 @@ import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
-import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
@@ -89,7 +91,6 @@ import org.apache.qpid.server.subscription.ClientDeliveryMethod;
import org.apache.qpid.server.subscription.RecordDeliveryMethod;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
-import org.apache.qpid.server.subscription.SubscriptionImpl;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -137,11 +138,9 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
private final LinkedList<AsyncCommand> _unfinishedCommandsQueue = new LinkedList<AsyncCommand>();
- private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500;
-
private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH);
- // Set of messages being acknoweledged in the current transaction
+ // Set of messages being acknowledged in the current transaction
private SortedSet<QueueEntry> _acknowledgedMessages = new TreeSet<QueueEntry>();
private final AtomicBoolean _suspended = new AtomicBoolean(false);
@@ -157,7 +156,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
private final AMQProtocolSession _session;
private AtomicBoolean _closing = new AtomicBoolean(false);
- private final Set<AMQQueue> _blockingQueues = new ConcurrentSkipListSet<AMQQueue>();
+ private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>());
private final AtomicBoolean _blocking = new AtomicBoolean(false);
@@ -170,11 +169,13 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
private List<QueueEntry> _resendList = new ArrayList<QueueEntry>();
private static final
AMQShortString IMMEDIATE_DELIVERY_REPLY_TEXT = new AMQShortString("Immediate delivery is not possible.");
- private final UUID _id;
+ private final UUID _qmfId;
private long _createTime = System.currentTimeMillis();
private final ClientDeliveryMethod _clientDeliveryMethod;
+ private final TransactionTimeoutHelper _transactionTimeoutHelper;
+
public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore)
throws AMQException
{
@@ -183,7 +184,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
_actor = new AMQPChannelActor(this, session.getLogActor().getRootMessageLogger());
_logSubject = new ChannelLogSubject(this);
- _id = getConfigStore().createId();
+ _qmfId = getConfigStore().createId();
_actor.message(ChannelMessages.CREATE());
getConfigStore().addConfiguredObject(this);
@@ -194,6 +195,8 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
_transaction = new AsyncAutoCommitTransaction(_messageStore, this);
_clientDeliveryMethod = session.createDeliveryMethod(_channelId);
+
+ _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject);
}
public ConfigStore getConfigStore()
@@ -264,6 +267,11 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
return _txnCount.get();
}
+ public Long getTxnStart()
+ {
+ return _txnStarts.get();
+ }
+
public int getChannelId()
{
return _channelId;
@@ -441,7 +449,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
* @param acks Are acks enabled for this subscriber
* @param filters Filters to apply to this subscriber
*
- * @param noLocal Flag stopping own messages being receivied.
+ * @param noLocal Flag stopping own messages being received.
* @param exclusive Flag requesting exclusive access to the queue
* @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests
*
@@ -948,9 +956,11 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
public void commit() throws AMQException
{
- commit(null);
+ commit(null, false);
}
- public void commit(Runnable immediateAction) throws AMQException
+
+
+ public void commit(final Runnable immediateAction, boolean async) throws AMQException
{
if (!isTransactional())
@@ -958,11 +968,29 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
throw new AMQException("Fatal error: commit called on non-transactional channel");
}
- _transaction.commit(immediateAction);
+ if(async && _transaction instanceof LocalTransaction)
+ {
+
+ ((LocalTransaction)_transaction).commitAsync(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ immediateAction.run();
+ _txnCommits.incrementAndGet();
+ _txnStarts.incrementAndGet();
+ decrementOutstandingTxnsIfNecessary();
+ }
+ });
+ }
+ else
+ {
+ _transaction.commit(immediateAction);
- _txnCommits.incrementAndGet();
- _txnStarts.incrementAndGet();
- decrementOutstandingTxnsIfNecessary();
+ _txnCommits.incrementAndGet();
+ _txnStarts.incrementAndGet();
+ decrementOutstandingTxnsIfNecessary();
+ }
}
public void rollback() throws AMQException
@@ -1357,9 +1385,34 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
return _actor;
}
- public void block(AMQQueue queue)
+ public synchronized void block()
{
- if(_blockingQueues.add(queue))
+ if(_blockingEntities.add(this))
+ {
+ if(_blocking.compareAndSet(false,true))
+ {
+ _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED("** All Queues **"));
+ flow(false);
+ }
+ }
+ }
+
+ public synchronized void unblock()
+ {
+ if(_blockingEntities.remove(this))
+ {
+ if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false))
+ {
+ _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED());
+
+ flow(true);
+ }
+ }
+ }
+
+ public synchronized void block(AMQQueue queue)
+ {
+ if(_blockingEntities.add(queue))
{
if(_blocking.compareAndSet(false,true))
@@ -1370,11 +1423,11 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
}
}
- public void unblock(AMQQueue queue)
+ public synchronized void unblock(AMQQueue queue)
{
- if(_blockingQueues.remove(queue))
+ if(_blockingEntities.remove(queue))
{
- if(_blocking.compareAndSet(true,false) && !isClosing())
+ if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false) && !isClosing())
{
_actor.message(_logSubject, ChannelMessages.FLOW_REMOVED());
@@ -1393,6 +1446,11 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
return false;
}
+ public int getUnacknowledgedMessageCount()
+ {
+ return getUnacknowledgedMessageMap().size();
+ }
+
private void flow(boolean flow)
{
MethodRegistry methodRegistry = _session.getMethodRegistry();
@@ -1400,6 +1458,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
_session.writeFrame(responseBody.generateFrame(_channelId));
}
+ @Override
public boolean getBlocking()
{
return _blocking.get();
@@ -1456,9 +1515,10 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
return false;
}
- public UUID getId()
+ @Override
+ public UUID getQMFId()
{
- return _id;
+ return _qmfId;
}
public String getSessionName()
@@ -1484,30 +1544,42 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
long openTime = currentTime - _transaction.getTransactionStartTime();
long idleTime = currentTime - _txnUpdateTime.get();
- // Log a warning on idle or open transactions
- if (idleWarn > 0L && idleTime > idleWarn)
- {
- CurrentActor.get().message(_logSubject, ChannelMessages.IDLE_TXN(idleTime));
- _logger.warn("IDLE TRANSACTION ALERT " + _logSubject.toString() + " " + idleTime + " ms");
- }
- else if (openWarn > 0L && openTime > openWarn)
+ _transactionTimeoutHelper.logIfNecessary(idleTime, idleWarn, ChannelMessages.IDLE_TXN(idleTime),
+ TransactionTimeoutHelper.IDLE_TRANSACTION_ALERT);
+ if (_transactionTimeoutHelper.isTimedOut(idleTime, idleClose))
{
- CurrentActor.get().message(_logSubject, ChannelMessages.OPEN_TXN(openTime));
- _logger.warn("OPEN TRANSACTION ALERT " + _logSubject.toString() + " " + openTime + " ms");
+ closeConnection("Idle transaction timed out");
+ return;
}
- // Close connection for idle or open transactions that have timed out
- if (idleClose > 0L && idleTime > idleClose)
+ _transactionTimeoutHelper.logIfNecessary(openTime, openWarn, ChannelMessages.OPEN_TXN(openTime),
+ TransactionTimeoutHelper.OPEN_TRANSACTION_ALERT);
+ if (_transactionTimeoutHelper.isTimedOut(openTime, openClose))
{
- getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out");
- }
- else if (openClose > 0L && openTime > openClose)
- {
- getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out");
+ closeConnection("Open transaction timed out");
+ return;
}
}
}
+ /**
+ * Typically called from the HouseKeepingThread instead of the main receiver thread,
+ * therefore uses a lock to close the connection in a thread-safe manner.
+ */
+ private void closeConnection(String reason) throws AMQException
+ {
+ Lock receivedLock = _session.getReceivedLock();
+ receivedLock.lock();
+ try
+ {
+ _session.close(AMQConstant.RESOURCE_ERROR, reason);
+ }
+ finally
+ {
+ receivedLock.unlock();
+ }
+ }
+
public void deadLetter(long deliveryTag) throws AMQException
{
final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap();
@@ -1563,23 +1635,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
_unfinishedCommandsQueue.add(new AsyncCommand(future, action));
}
- public void completeAsyncCommands()
- {
- AsyncCommand cmd;
- while((cmd = _unfinishedCommandsQueue.peek()) != null && cmd.isReadyForCompletion())
- {
- cmd.complete();
- _unfinishedCommandsQueue.poll();
- }
- while(_unfinishedCommandsQueue.size() > UNFINISHED_COMMAND_QUEUE_THRESHOLD)
- {
- cmd = _unfinishedCommandsQueue.poll();
- cmd.awaitReadyForCompletion();
- cmd.complete();
- }
- }
-
-
public void sync()
{
AsyncCommand cmd;
@@ -1588,6 +1643,10 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
cmd.awaitReadyForCompletion();
cmd.complete();
}
+ if(_transaction instanceof LocalTransaction)
+ {
+ ((LocalTransaction)_transaction).sync();
+ }
}
private static class AsyncCommand
@@ -1624,6 +1683,12 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
public int compareTo(AMQSessionModel session)
{
- return getId().compareTo(session.getId());
+ return getQMFId().compareTo(session.getQMFId());
+ }
+
+ @Override
+ public int getConsumerCount()
+ {
+ return _tag2SubscriptionMap.size();
}
}