summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java45
1 files changed, 31 insertions, 14 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 08eb05680c..34bc57a826 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -50,6 +50,7 @@ import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.AMQPChannelActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
+import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.message.MessageMetaData;
@@ -75,6 +76,7 @@ import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.TransportException;
import java.util.ArrayList;
import java.util.Collection;
@@ -137,7 +139,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
private final AtomicBoolean _suspended = new AtomicBoolean(false);
private ServerTransaction _transaction;
-
+
private final AtomicLong _txnStarts = new AtomicLong(0);
private final AtomicLong _txnCommits = new AtomicLong(0);
private final AtomicLong _txnRejects = new AtomicLong(0);
@@ -201,12 +203,12 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
// theory
return !(_transaction instanceof AutoCommitTransaction);
}
-
+
public boolean inTransaction()
{
return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0;
}
-
+
private void incrementOutstandingTxnsIfNecessary()
{
if(isTransactional())
@@ -216,7 +218,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
_txnCount.compareAndSet(0,1);
}
}
-
+
private void decrementOutstandingTxnsIfNecessary()
{
if(isTransactional())
@@ -314,7 +316,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
try
{
_currentMessage.getStoredMessage().flushToStore();
-
final ArrayList<? extends BaseQueue> destinationQueues = _currentMessage.getDestinationQueues();
if(!checkMessageUserId(_currentMessage.getContentHeader()))
@@ -323,7 +324,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
}
else
{
- if(destinationQueues == null || _currentMessage.getDestinationQueues().isEmpty())
+ if(destinationQueues == null || destinationQueues.isEmpty())
{
if (_currentMessage.isMandatory() || _currentMessage.isImmediate())
{
@@ -331,7 +332,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
}
else
{
- _logger.warn("MESSAGE DISCARDED: No routes for message - " + createAMQMessage(_currentMessage));
+ _actor.message(ExchangeMessages.DISCARDMSG(_currentMessage.getExchange().asString(), _currentMessage.getRoutingKey()));
}
}
@@ -385,6 +386,13 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
_currentMessage = null;
throw e;
}
+ catch (RuntimeException e)
+ {
+ // we want to make sure we don't keep a reference to the message in the
+ // event of an error
+ _currentMessage = null;
+ throw e;
+ }
}
protected void routeCurrentMessage() throws AMQException
@@ -435,7 +443,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
{
throw new AMQException("Consumer already exists with same tag: " + tag);
}
-
+
Subscription subscription =
SubscriptionFactoryImpl.INSTANCE.createSubscription(_channelId, _session, tag, acks, filters, noLocal, _creditManager);
@@ -456,6 +464,11 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
_tag2SubscriptionMap.remove(tag);
throw e;
}
+ catch (RuntimeException e)
+ {
+ _tag2SubscriptionMap.remove(tag);
+ throw e;
+ }
return tag;
}
@@ -513,7 +526,11 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
}
catch (AMQException e)
{
- _logger.error("Caught AMQException whilst attempting to reque:" + e);
+ _logger.error("Caught AMQException whilst attempting to requeue:" + e);
+ }
+ catch (TransportException e)
+ {
+ _logger.error("Caught TransportException whilst attempting to requeue:" + e);
}
getConfigStore().removeConfiguredObject(this);
@@ -944,7 +961,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
finally
{
_rollingBack = false;
-
+
_txnRejects.incrementAndGet();
_txnStarts.incrementAndGet();
decrementOutstandingTxnsIfNecessary();
@@ -1038,9 +1055,9 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag)
throws AMQException
{
+ _session.registerMessageDelivered(entry.getMessage().getSize());
getProtocolSession().getProtocolOutputConverter().writeDeliver(entry, getChannelId(),
deliveryTag, sub.getConsumerTag());
- _session.registerMessageDelivered(entry.getMessage().getSize());
}
};
@@ -1083,7 +1100,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
? ((BasicContentHeaderProperties) header.getProperties()).getUserId()
: null;
- return (!MSG_AUTH || _session.getPrincipal().getName().equals(userID == null? "" : userID.toString()));
+ return (!MSG_AUTH || _session.getAuthorizedPrincipal().getName().equals(userID == null? "" : userID.toString()));
}
@@ -1425,12 +1442,12 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
{
return _createTime;
}
-
+
public void mgmtClose() throws AMQException
{
_session.mgmtCloseChannel(_channelId);
}
-
+
public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException
{
if (inTransaction())