summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java193
1 files changed, 46 insertions, 147 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index e197dddfde..ab4ca81d05 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -51,13 +51,10 @@ import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
-import org.apache.qpid.server.configuration.ConfigStore;
-import org.apache.qpid.server.configuration.ConfiguredObject;
-import org.apache.qpid.server.configuration.ConnectionConfig;
-import org.apache.qpid.server.configuration.SessionConfig;
-import org.apache.qpid.server.configuration.SessionConfigType;
+import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
@@ -75,7 +72,6 @@ import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.protocol.AMQConnectionModel;
-import org.apache.qpid.server.protocol.AMQProtocolEngine;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
@@ -83,7 +79,6 @@ import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.InboundMessageAdapter;
import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
@@ -93,19 +88,19 @@ import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
+import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.TransportException;
-public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoCommitTransaction.FutureRecorder
+public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.FutureRecorder
{
public static final int DEFAULT_PREFETCH = 4096;
private static final Logger _logger = Logger.getLogger(AMQChannel.class);
- private static final boolean MSG_AUTH =
- ApplicationRegistry.getInstance().getConfiguration().getMsgAuth();
-
+ //TODO use Broker property to configure message authorization requirements
+ private boolean _messageAuthorizationRequired = Boolean.getBoolean(BrokerProperties.PROPERTY_MSG_AUTH);
private final int _channelId;
@@ -118,7 +113,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
*/
private long _deliveryTag = 0;
- /** A channel has a default queue (the last declared) that is used when no queue name is explictily set */
+ /** A channel has a default queue (the last declared) that is used when no queue name is explicitly set */
private AMQQueue _defaultQueue;
/** This tag is unique per subscription to a queue. The server returns this in response to a basic.consume request. */
@@ -151,7 +146,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
private final AtomicLong _txnCommits = new AtomicLong(0);
private final AtomicLong _txnRejects = new AtomicLong(0);
private final AtomicLong _txnCount = new AtomicLong(0);
- private final AtomicLong _txnUpdateTime = new AtomicLong(0);
private final AMQProtocolSession _session;
private AtomicBoolean _closing = new AtomicBoolean(false);
@@ -169,12 +163,12 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
private List<QueueEntry> _resendList = new ArrayList<QueueEntry>();
private static final
AMQShortString IMMEDIATE_DELIVERY_REPLY_TEXT = new AMQShortString("Immediate delivery is not possible.");
- private final UUID _qmfId;
private long _createTime = System.currentTimeMillis();
private final ClientDeliveryMethod _clientDeliveryMethod;
private final TransactionTimeoutHelper _transactionTimeoutHelper;
+ private final UUID _id = UUID.randomUUID();
public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore)
throws AMQException
@@ -184,30 +178,36 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
_actor = new AMQPChannelActor(this, session.getLogActor().getRootMessageLogger());
_logSubject = new ChannelLogSubject(this);
- _qmfId = getConfigStore().createId();
_actor.message(ChannelMessages.CREATE());
- getConfigStore().addConfiguredObject(this);
-
_messageStore = messageStore;
// by default the session is non-transactional
_transaction = new AsyncAutoCommitTransaction(_messageStore, this);
- _clientDeliveryMethod = session.createDeliveryMethod(_channelId);
-
- _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject);
- }
+ _clientDeliveryMethod = session.createDeliveryMethod(_channelId);
- public ConfigStore getConfigStore()
- {
- return getVirtualHost().getConfigStore();
+ _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject, new CloseAction()
+ {
+ @Override
+ public void doTimeoutAction(String reason) throws AMQException
+ {
+ closeConnection(reason);
+ }
+ });
}
/** Sets this channel to be part of a local transaction */
public void setLocalTransactional()
{
- _transaction = new LocalTransaction(_messageStore);
+ _transaction = new LocalTransaction(_messageStore, new ActivityTimeAccessor()
+ {
+ @Override
+ public long getActivityTime()
+ {
+ return _session.getLastReceivedTime();
+ }
+ });
_txnStarts.incrementAndGet();
}
@@ -221,12 +221,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
sync();
}
-
- public boolean inTransaction()
- {
- return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0;
- }
-
private void incrementOutstandingTxnsIfNecessary()
{
if(isTransactional())
@@ -247,11 +241,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
}
}
- public Long getTxnStarts()
- {
- return _txnStarts.get();
- }
-
public Long getTxnCommits()
{
return _txnCommits.get();
@@ -369,9 +358,8 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
}
});
- _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues), getProtocolSession().getLastReceivedTime());
+ _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues));
incrementOutstandingTxnsIfNecessary();
- updateTransactionalActivity();
_currentMessage.getStoredMessage().flushToStore();
}
}
@@ -396,7 +384,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
if (_logger.isDebugEnabled())
{
- _logger.debug(debugIdentity() + "Content body received on channel " + _channelId);
+ _logger.debug(debugIdentity() + " content body received on channel " + _channelId);
}
try
@@ -556,9 +544,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
{
_logger.error("Caught TransportException whilst attempting to requeue:" + e);
}
-
- getConfigStore().removeConfiguredObject(this);
-
}
private void unsubscribeAllConsumers() throws AMQException
@@ -860,7 +845,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
{
Collection<QueueEntry> ackedMessages = getAckedMessages(deliveryTag, multiple);
_transaction.dequeue(ackedMessages, new MessageAcknowledgeAction(ackedMessages));
- updateTransactionalActivity();
}
private Collection<QueueEntry> getAckedMessages(long deliveryTag, boolean multiple)
@@ -1054,19 +1038,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
}
}
-
-
- }
-
- /**
- * Update last transaction activity timestamp
- */
- private void updateTransactionalActivity()
- {
- if (isTransactional())
- {
- _txnUpdateTime.set(getProtocolSession().getLastReceivedTime());
- }
}
public String toString()
@@ -1149,10 +1120,16 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
? ((BasicContentHeaderProperties) header.getProperties()).getUserId()
: null;
- return (!MSG_AUTH || _session.getAuthorizedPrincipal().getName().equals(userID == null? "" : userID.toString()));
+ return (!_messageAuthorizationRequired || _session.getAuthorizedPrincipal().getName().equals(userID == null? "" : userID.toString()));
}
+ @Override
+ public UUID getId()
+ {
+ return _id;
+ }
+
public AMQConnectionModel getConnectionModel()
{
return _session;
@@ -1168,6 +1145,12 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
return _logSubject;
}
+ @Override
+ public int compareTo(AMQSessionModel o)
+ {
+ return getId().compareTo(o.getId());
+ }
+
private class MessageDeliveryAction implements ServerTransaction.Action
{
private IncomingMessage _incommingMessage;
@@ -1221,11 +1204,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
// TODO
throw new RuntimeException(e);
}
-
-
-
-
-
}
public void onRollback()
@@ -1375,7 +1353,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
public void onRollback()
{
- //To change body of implemented methods use File | Settings | File Templates.
}
}
@@ -1469,97 +1446,24 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
return getProtocolSession().getVirtualHost();
}
-
- public ConfiguredObject getParent()
- {
- return getVirtualHost();
- }
-
- public SessionConfigType getConfigType()
- {
- return SessionConfigType.getInstance();
- }
-
public int getChannel()
{
return getChannelId();
}
- public boolean isAttached()
- {
- return true;
- }
-
- public long getDetachedLifespan()
- {
- return 0;
- }
-
- public ConnectionConfig getConnectionConfig()
- {
- return (AMQProtocolEngine)getProtocolSession();
- }
-
- public Long getExpiryTime()
- {
- return null;
- }
-
- public Long getMaxClientRate()
- {
- return null;
- }
-
public boolean isDurable()
{
return false;
}
- @Override
- public UUID getQMFId()
- {
- return _qmfId;
- }
-
- public String getSessionName()
- {
- return getConnectionConfig().getAddress() + "/" + getChannelId();
- }
-
public long getCreateTime()
{
return _createTime;
}
- public void mgmtClose() throws AMQException
- {
- _session.mgmtCloseChannel(_channelId);
- }
-
public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException
{
- if (inTransaction())
- {
- long currentTime = System.currentTimeMillis();
- long openTime = currentTime - _transaction.getTransactionStartTime();
- long idleTime = currentTime - _txnUpdateTime.get();
-
- _transactionTimeoutHelper.logIfNecessary(idleTime, idleWarn, ChannelMessages.IDLE_TXN(idleTime),
- TransactionTimeoutHelper.IDLE_TRANSACTION_ALERT);
- if (_transactionTimeoutHelper.isTimedOut(idleTime, idleClose))
- {
- closeConnection("Idle transaction timed out");
- return;
- }
-
- _transactionTimeoutHelper.logIfNecessary(openTime, openWarn, ChannelMessages.OPEN_TXN(openTime),
- TransactionTimeoutHelper.OPEN_TRANSACTION_ALERT);
- if (_transactionTimeoutHelper.isTimedOut(openTime, openClose))
- {
- closeConnection("Open transaction timed out");
- return;
- }
- }
+ _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, openWarn, openClose, idleWarn, idleClose);
}
/**
@@ -1637,6 +1541,11 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
public void sync()
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("sync() called on channel " + debugIdentity());
+ }
+
AsyncCommand cmd;
while((cmd = _unfinishedCommandsQueue.poll()) != null)
{
@@ -1674,16 +1583,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
_action.postCommit();
_action = null;
}
-
- boolean isReadyForCompletion()
- {
- return _future.isComplete();
- }
- }
-
- public int compareTo(AMQSessionModel session)
- {
- return getQMFId().compareTo(session.getQMFId());
}
@Override