diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java | 193 |
1 files changed, 46 insertions, 147 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index e197dddfde..ab4ca81d05 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -51,13 +51,10 @@ import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl; -import org.apache.qpid.server.configuration.ConfigStore; -import org.apache.qpid.server.configuration.ConfiguredObject; -import org.apache.qpid.server.configuration.ConnectionConfig; -import org.apache.qpid.server.configuration.SessionConfig; -import org.apache.qpid.server.configuration.SessionConfigType; +import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.flow.Pre0_10CreditManager; @@ -75,7 +72,6 @@ import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.protocol.AMQConnectionModel; -import org.apache.qpid.server.protocol.AMQProtocolEngine; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; @@ -83,7 +79,6 @@ import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.InboundMessageAdapter; import org.apache.qpid.server.queue.IncomingMessage; import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; @@ -93,19 +88,19 @@ import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; +import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.TransportException; -public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoCommitTransaction.FutureRecorder +public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.FutureRecorder { public static final int DEFAULT_PREFETCH = 4096; private static final Logger _logger = Logger.getLogger(AMQChannel.class); - private static final boolean MSG_AUTH = - ApplicationRegistry.getInstance().getConfiguration().getMsgAuth(); - + //TODO use Broker property to configure message authorization requirements + private boolean _messageAuthorizationRequired = Boolean.getBoolean(BrokerProperties.PROPERTY_MSG_AUTH); private final int _channelId; @@ -118,7 +113,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm */ private long _deliveryTag = 0; - /** A channel has a default queue (the last declared) that is used when no queue name is explictily set */ + /** A channel has a default queue (the last declared) that is used when no queue name is explicitly set */ private AMQQueue _defaultQueue; /** This tag is unique per subscription to a queue. The server returns this in response to a basic.consume request. */ @@ -151,7 +146,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm private final AtomicLong _txnCommits = new AtomicLong(0); private final AtomicLong _txnRejects = new AtomicLong(0); private final AtomicLong _txnCount = new AtomicLong(0); - private final AtomicLong _txnUpdateTime = new AtomicLong(0); private final AMQProtocolSession _session; private AtomicBoolean _closing = new AtomicBoolean(false); @@ -169,12 +163,12 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm private List<QueueEntry> _resendList = new ArrayList<QueueEntry>(); private static final AMQShortString IMMEDIATE_DELIVERY_REPLY_TEXT = new AMQShortString("Immediate delivery is not possible."); - private final UUID _qmfId; private long _createTime = System.currentTimeMillis(); private final ClientDeliveryMethod _clientDeliveryMethod; private final TransactionTimeoutHelper _transactionTimeoutHelper; + private final UUID _id = UUID.randomUUID(); public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore) throws AMQException @@ -184,30 +178,36 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm _actor = new AMQPChannelActor(this, session.getLogActor().getRootMessageLogger()); _logSubject = new ChannelLogSubject(this); - _qmfId = getConfigStore().createId(); _actor.message(ChannelMessages.CREATE()); - getConfigStore().addConfiguredObject(this); - _messageStore = messageStore; // by default the session is non-transactional _transaction = new AsyncAutoCommitTransaction(_messageStore, this); - _clientDeliveryMethod = session.createDeliveryMethod(_channelId); - - _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject); - } + _clientDeliveryMethod = session.createDeliveryMethod(_channelId); - public ConfigStore getConfigStore() - { - return getVirtualHost().getConfigStore(); + _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject, new CloseAction() + { + @Override + public void doTimeoutAction(String reason) throws AMQException + { + closeConnection(reason); + } + }); } /** Sets this channel to be part of a local transaction */ public void setLocalTransactional() { - _transaction = new LocalTransaction(_messageStore); + _transaction = new LocalTransaction(_messageStore, new ActivityTimeAccessor() + { + @Override + public long getActivityTime() + { + return _session.getLastReceivedTime(); + } + }); _txnStarts.incrementAndGet(); } @@ -221,12 +221,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm sync(); } - - public boolean inTransaction() - { - return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0; - } - private void incrementOutstandingTxnsIfNecessary() { if(isTransactional()) @@ -247,11 +241,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm } } - public Long getTxnStarts() - { - return _txnStarts.get(); - } - public Long getTxnCommits() { return _txnCommits.get(); @@ -369,9 +358,8 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm } }); - _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues), getProtocolSession().getLastReceivedTime()); + _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues)); incrementOutstandingTxnsIfNecessary(); - updateTransactionalActivity(); _currentMessage.getStoredMessage().flushToStore(); } } @@ -396,7 +384,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm if (_logger.isDebugEnabled()) { - _logger.debug(debugIdentity() + "Content body received on channel " + _channelId); + _logger.debug(debugIdentity() + " content body received on channel " + _channelId); } try @@ -556,9 +544,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm { _logger.error("Caught TransportException whilst attempting to requeue:" + e); } - - getConfigStore().removeConfiguredObject(this); - } private void unsubscribeAllConsumers() throws AMQException @@ -860,7 +845,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm { Collection<QueueEntry> ackedMessages = getAckedMessages(deliveryTag, multiple); _transaction.dequeue(ackedMessages, new MessageAcknowledgeAction(ackedMessages)); - updateTransactionalActivity(); } private Collection<QueueEntry> getAckedMessages(long deliveryTag, boolean multiple) @@ -1054,19 +1038,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm } } - - - } - - /** - * Update last transaction activity timestamp - */ - private void updateTransactionalActivity() - { - if (isTransactional()) - { - _txnUpdateTime.set(getProtocolSession().getLastReceivedTime()); - } } public String toString() @@ -1149,10 +1120,16 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm ? ((BasicContentHeaderProperties) header.getProperties()).getUserId() : null; - return (!MSG_AUTH || _session.getAuthorizedPrincipal().getName().equals(userID == null? "" : userID.toString())); + return (!_messageAuthorizationRequired || _session.getAuthorizedPrincipal().getName().equals(userID == null? "" : userID.toString())); } + @Override + public UUID getId() + { + return _id; + } + public AMQConnectionModel getConnectionModel() { return _session; @@ -1168,6 +1145,12 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm return _logSubject; } + @Override + public int compareTo(AMQSessionModel o) + { + return getId().compareTo(o.getId()); + } + private class MessageDeliveryAction implements ServerTransaction.Action { private IncomingMessage _incommingMessage; @@ -1221,11 +1204,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm // TODO throw new RuntimeException(e); } - - - - - } public void onRollback() @@ -1375,7 +1353,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm public void onRollback() { - //To change body of implemented methods use File | Settings | File Templates. } } @@ -1469,97 +1446,24 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm return getProtocolSession().getVirtualHost(); } - - public ConfiguredObject getParent() - { - return getVirtualHost(); - } - - public SessionConfigType getConfigType() - { - return SessionConfigType.getInstance(); - } - public int getChannel() { return getChannelId(); } - public boolean isAttached() - { - return true; - } - - public long getDetachedLifespan() - { - return 0; - } - - public ConnectionConfig getConnectionConfig() - { - return (AMQProtocolEngine)getProtocolSession(); - } - - public Long getExpiryTime() - { - return null; - } - - public Long getMaxClientRate() - { - return null; - } - public boolean isDurable() { return false; } - @Override - public UUID getQMFId() - { - return _qmfId; - } - - public String getSessionName() - { - return getConnectionConfig().getAddress() + "/" + getChannelId(); - } - public long getCreateTime() { return _createTime; } - public void mgmtClose() throws AMQException - { - _session.mgmtCloseChannel(_channelId); - } - public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException { - if (inTransaction()) - { - long currentTime = System.currentTimeMillis(); - long openTime = currentTime - _transaction.getTransactionStartTime(); - long idleTime = currentTime - _txnUpdateTime.get(); - - _transactionTimeoutHelper.logIfNecessary(idleTime, idleWarn, ChannelMessages.IDLE_TXN(idleTime), - TransactionTimeoutHelper.IDLE_TRANSACTION_ALERT); - if (_transactionTimeoutHelper.isTimedOut(idleTime, idleClose)) - { - closeConnection("Idle transaction timed out"); - return; - } - - _transactionTimeoutHelper.logIfNecessary(openTime, openWarn, ChannelMessages.OPEN_TXN(openTime), - TransactionTimeoutHelper.OPEN_TRANSACTION_ALERT); - if (_transactionTimeoutHelper.isTimedOut(openTime, openClose)) - { - closeConnection("Open transaction timed out"); - return; - } - } + _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, openWarn, openClose, idleWarn, idleClose); } /** @@ -1637,6 +1541,11 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm public void sync() { + if(_logger.isDebugEnabled()) + { + _logger.debug("sync() called on channel " + debugIdentity()); + } + AsyncCommand cmd; while((cmd = _unfinishedCommandsQueue.poll()) != null) { @@ -1674,16 +1583,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm _action.postCommit(); _action = null; } - - boolean isReadyForCompletion() - { - return _future.isComplete(); - } - } - - public int compareTo(AMQSessionModel session) - { - return getQMFId().compareTo(session.getQMFId()); } @Override |