diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java | 63 |
1 files changed, 4 insertions, 59 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 8141533045..4f86c82578 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -22,7 +22,6 @@ package org.apache.qpid.server; import org.apache.log4j.Logger; -import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.framing.AMQMethodBody; @@ -142,7 +141,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel private final AtomicLong _txnCommits = new AtomicLong(0); private final AtomicLong _txnRejects = new AtomicLong(0); private final AtomicLong _txnCount = new AtomicLong(0); - private final AtomicLong _txnUpdateTime = new AtomicLong(0); private final AMQProtocolSession _session; private AtomicBoolean _closing = new AtomicBoolean(false); @@ -202,11 +200,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel return !(_transaction instanceof AutoCommitTransaction); } - public boolean inTransaction() - { - return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0; - } - private void incrementOutstandingTxnsIfNecessary() { if(isTransactional()) @@ -302,6 +295,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel }); deliverCurrentMessageIfComplete(); + } } @@ -339,15 +333,11 @@ public class AMQChannel implements SessionConfig, AMQSessionModel { _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues, isTransactional())); incrementOutstandingTxnsIfNecessary(); - updateTransactionalActivity(); } } } finally { - long bodySize = _currentMessage.getSize(); - long timestamp = ((BasicContentHeaderProperties) _currentMessage.getContentHeader().getProperties()).getTimestamp(); - _session.registerMessageReceived(bodySize, timestamp); _currentMessage = null; } } @@ -804,7 +794,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel { Collection<QueueEntry> ackedMessages = getAckedMessages(deliveryTag, multiple); _transaction.dequeue(ackedMessages, new MessageAcknowledgeAction(ackedMessages)); - updateTransactionalActivity(); } private Collection<QueueEntry> getAckedMessages(long deliveryTag, boolean multiple) @@ -979,17 +968,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel } - /** - * Update last transaction activity timestamp - */ - private void updateTransactionalActivity() - { - if (isTransactional()) - { - _txnUpdateTime.set(System.currentTimeMillis()); - } - } - public String toString() { return "["+_session.toString()+":"+_channelId+"]"; @@ -1040,7 +1018,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel { getProtocolSession().getProtocolOutputConverter().writeDeliver(entry, getChannelId(), deliveryTag, sub.getConsumerTag()); - _session.registerMessageDelivered(entry.getMessage().getSize()); } }; @@ -1079,11 +1056,11 @@ public class AMQChannel implements SessionConfig, AMQSessionModel private boolean checkMessageUserId(ContentHeaderBody header) { AMQShortString userID = - header.getProperties() instanceof BasicContentHeaderProperties - ? ((BasicContentHeaderProperties) header.getProperties()).getUserId() + header.properties instanceof BasicContentHeaderProperties + ? ((BasicContentHeaderProperties) header.properties).getUserId() : null; - return (!MSG_AUTH || _session.getAuthorizedPrincipal().getName().equals(userID == null? "" : userID.toString())); + return (!MSG_AUTH || _session.getPrincipal().getName().equals(userID == null? "" : userID.toString())); } @@ -1430,36 +1407,4 @@ public class AMQChannel implements SessionConfig, AMQSessionModel { _session.mgmtCloseChannel(_channelId); } - - public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException - { - if (inTransaction()) - { - long currentTime = System.currentTimeMillis(); - long openTime = currentTime - _transaction.getTransactionStartTime(); - long idleTime = currentTime - _txnUpdateTime.get(); - - // Log a warning on idle or open transactions - if (idleWarn > 0L && idleTime > idleWarn) - { - CurrentActor.get().message(_logSubject, ChannelMessages.IDLE_TXN(idleTime)); - _logger.warn("IDLE TRANSACTION ALERT " + _logSubject.toString() + " " + idleTime + " ms"); - } - else if (openWarn > 0L && openTime > openWarn) - { - CurrentActor.get().message(_logSubject, ChannelMessages.OPEN_TXN(openTime)); - _logger.warn("OPEN TRANSACTION ALERT " + _logSubject.toString() + " " + openTime + " ms"); - } - - // Close connection for idle or open transactions that have timed out - if (idleClose > 0L && idleTime > idleClose) - { - getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out"); - } - else if (openClose > 0L && openTime > openClose) - { - getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out"); - } - } - } } |