diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java | 45 |
1 files changed, 31 insertions, 14 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 08eb05680c..34bc57a826 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -50,6 +50,7 @@ import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.AMQPChannelActor; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.ChannelMessages; +import org.apache.qpid.server.logging.messages.ExchangeMessages; import org.apache.qpid.server.logging.subjects.ChannelLogSubject; import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.message.MessageMetaData; @@ -75,6 +76,7 @@ import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.transport.TransportException; import java.util.ArrayList; import java.util.Collection; @@ -137,7 +139,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel private final AtomicBoolean _suspended = new AtomicBoolean(false); private ServerTransaction _transaction; - + private final AtomicLong _txnStarts = new AtomicLong(0); private final AtomicLong _txnCommits = new AtomicLong(0); private final AtomicLong _txnRejects = new AtomicLong(0); @@ -201,12 +203,12 @@ public class AMQChannel implements SessionConfig, AMQSessionModel // theory return !(_transaction instanceof AutoCommitTransaction); } - + public boolean inTransaction() { return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0; } - + private void incrementOutstandingTxnsIfNecessary() { if(isTransactional()) @@ -216,7 +218,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel _txnCount.compareAndSet(0,1); } } - + private void decrementOutstandingTxnsIfNecessary() { if(isTransactional()) @@ -314,7 +316,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel try { _currentMessage.getStoredMessage().flushToStore(); - final ArrayList<? extends BaseQueue> destinationQueues = _currentMessage.getDestinationQueues(); if(!checkMessageUserId(_currentMessage.getContentHeader())) @@ -323,7 +324,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel } else { - if(destinationQueues == null || _currentMessage.getDestinationQueues().isEmpty()) + if(destinationQueues == null || destinationQueues.isEmpty()) { if (_currentMessage.isMandatory() || _currentMessage.isImmediate()) { @@ -331,7 +332,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel } else { - _logger.warn("MESSAGE DISCARDED: No routes for message - " + createAMQMessage(_currentMessage)); + _actor.message(ExchangeMessages.DISCARDMSG(_currentMessage.getExchange().asString(), _currentMessage.getRoutingKey())); } } @@ -385,6 +386,13 @@ public class AMQChannel implements SessionConfig, AMQSessionModel _currentMessage = null; throw e; } + catch (RuntimeException e) + { + // we want to make sure we don't keep a reference to the message in the + // event of an error + _currentMessage = null; + throw e; + } } protected void routeCurrentMessage() throws AMQException @@ -435,7 +443,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel { throw new AMQException("Consumer already exists with same tag: " + tag); } - + Subscription subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(_channelId, _session, tag, acks, filters, noLocal, _creditManager); @@ -456,6 +464,11 @@ public class AMQChannel implements SessionConfig, AMQSessionModel _tag2SubscriptionMap.remove(tag); throw e; } + catch (RuntimeException e) + { + _tag2SubscriptionMap.remove(tag); + throw e; + } return tag; } @@ -513,7 +526,11 @@ public class AMQChannel implements SessionConfig, AMQSessionModel } catch (AMQException e) { - _logger.error("Caught AMQException whilst attempting to reque:" + e); + _logger.error("Caught AMQException whilst attempting to requeue:" + e); + } + catch (TransportException e) + { + _logger.error("Caught TransportException whilst attempting to requeue:" + e); } getConfigStore().removeConfiguredObject(this); @@ -944,7 +961,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel finally { _rollingBack = false; - + _txnRejects.incrementAndGet(); _txnStarts.incrementAndGet(); decrementOutstandingTxnsIfNecessary(); @@ -1038,9 +1055,9 @@ public class AMQChannel implements SessionConfig, AMQSessionModel public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag) throws AMQException { + _session.registerMessageDelivered(entry.getMessage().getSize()); getProtocolSession().getProtocolOutputConverter().writeDeliver(entry, getChannelId(), deliveryTag, sub.getConsumerTag()); - _session.registerMessageDelivered(entry.getMessage().getSize()); } }; @@ -1083,7 +1100,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel ? ((BasicContentHeaderProperties) header.getProperties()).getUserId() : null; - return (!MSG_AUTH || _session.getPrincipal().getName().equals(userID == null? "" : userID.toString())); + return (!MSG_AUTH || _session.getAuthorizedPrincipal().getName().equals(userID == null? "" : userID.toString())); } @@ -1425,12 +1442,12 @@ public class AMQChannel implements SessionConfig, AMQSessionModel { return _createTime; } - + public void mgmtClose() throws AMQException { _session.mgmtCloseChannel(_channelId); } - + public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException { if (inTransaction()) |