summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java18
1 files changed, 10 insertions, 8 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 4f86c82578..7cad0bc5c1 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -70,6 +70,7 @@ import org.apache.qpid.server.subscription.ClientDeliveryMethod;
import org.apache.qpid.server.subscription.RecordDeliveryMethod;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
+import org.apache.qpid.server.subscription.SubscriptionImpl;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -136,7 +137,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
private final AtomicBoolean _suspended = new AtomicBoolean(false);
private ServerTransaction _transaction;
-
+
private final AtomicLong _txnStarts = new AtomicLong(0);
private final AtomicLong _txnCommits = new AtomicLong(0);
private final AtomicLong _txnRejects = new AtomicLong(0);
@@ -199,7 +200,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
// theory
return !(_transaction instanceof AutoCommitTransaction);
}
-
+
private void incrementOutstandingTxnsIfNecessary()
{
if(isTransactional())
@@ -209,7 +210,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
_txnCount.compareAndSet(0,1);
}
}
-
+
private void decrementOutstandingTxnsIfNecessary()
{
if(isTransactional())
@@ -308,7 +309,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
try
{
_currentMessage.getStoredMessage().flushToStore();
-
+
final ArrayList<? extends BaseQueue> destinationQueues = _currentMessage.getDestinationQueues();
if(!checkMessageUserId(_currentMessage.getContentHeader()))
@@ -425,7 +426,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
{
throw new AMQException("Consumer already exists with same tag: " + tag);
}
-
+
Subscription subscription =
SubscriptionFactoryImpl.INSTANCE.createSubscription(_channelId, _session, tag, acks, filters, noLocal, _creditManager);
@@ -933,7 +934,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
finally
{
_rollingBack = false;
-
+
_txnRejects.incrementAndGet();
_txnStarts.incrementAndGet();
decrementOutstandingTxnsIfNecessary();
@@ -1017,7 +1018,8 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
throws AMQException
{
getProtocolSession().getProtocolOutputConverter().writeDeliver(entry, getChannelId(),
- deliveryTag, sub.getConsumerTag());
+ deliveryTag,
+ ((SubscriptionImpl)sub).getConsumerTag());
}
};
@@ -1402,7 +1404,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
{
return _createTime;
}
-
+
public void mgmtClose() throws AMQException
{
_session.mgmtCloseChannel(_channelId);