diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java | 33 |
1 files changed, 19 insertions, 14 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index c0db61cf3a..77dfbc0376 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -25,10 +25,10 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.subscription.Subscription_0_10; -import org.apache.qpid.server.txn.Transaction; +import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; -import org.apache.qpid.server.PrincipalHolder; +import org.apache.qpid.server.security.PrincipalHolder; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.AMQException; @@ -46,7 +46,6 @@ public class ServerSession extends Session implements PrincipalHolder { private static final String NULL_DESTINTATION = UUID.randomUUID().toString(); - public static interface MessageDispositionChangeListener { public void onAccept(); @@ -69,7 +68,7 @@ public class ServerSession extends Session implements PrincipalHolder private final SortedMap<Integer, MessageDispositionChangeListener> _messageDispositionListenerMap = new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>(); - private Transaction _transaction; + private ServerTransaction _transaction; private Principal _principal; @@ -97,10 +96,16 @@ public class ServerSession extends Session implements PrincipalHolder _reference = new WeakReference(this); } + @Override + protected boolean isFull(int id) + { + return isCommandsFull(id); + } + public void enqueue(final ServerMessage message, final ArrayList<AMQQueue> queues) { - _transaction.enqueue(queues,message, new Transaction.Action() + _transaction.enqueue(queues,message, new ServerTransaction.Action() { AMQQueue[] _queues = queues.toArray(new AMQQueue[queues.size()]); @@ -117,6 +122,7 @@ public class ServerSession extends Session implements PrincipalHolder { // TODO e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + throw new RuntimeException(e); } } } @@ -130,10 +136,11 @@ public class ServerSession extends Session implements PrincipalHolder } - - public void sendMessage(MessageTransfer xfr) + + public void sendMessage(MessageTransfer xfr, + Runnable postIdSettingAction) { - invoke(xfr); + invoke(xfr, postIdSettingAction); } public void onMessageDispositionChange(MessageTransfer xfr, MessageDispositionChangeListener acceptListener) @@ -231,12 +238,12 @@ public class ServerSession extends Session implements PrincipalHolder public void dispositionChange(RangeSet ranges, MessageDispositionAction action) { - if(!_messageDispositionListenerMap.isEmpty()) + if(ranges != null && !_messageDispositionListenerMap.isEmpty()) { Iterator<Integer> unacceptedMessages = _messageDispositionListenerMap.keySet().iterator(); Iterator<Range> rangeIter = ranges.iterator(); - if(rangeIter.hasNext()) + if(rangeIter.hasNext()) { Range range = rangeIter.next(); @@ -266,7 +273,6 @@ public class ServerSession extends Session implements PrincipalHolder } - } } @@ -287,14 +293,14 @@ public class ServerSession extends Session implements PrincipalHolder for (Task task : _taskList) { task.doTask(this); - } + } } public void acknowledge(final Subscription_0_10 sub, final QueueEntry entry) { _transaction.dequeue(entry.getQueue(), entry.getMessage(), - new Transaction.Action() + new ServerTransaction.Action() { public void postCommit() @@ -307,7 +313,6 @@ public class ServerSession extends Session implements PrincipalHolder entry.release(); } }); - } public Collection<Subscription_0_10> getSubscriptions() |