summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java33
1 files changed, 19 insertions, 14 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index c0db61cf3a..77dfbc0376 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -25,10 +25,10 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.subscription.Subscription_0_10;
-import org.apache.qpid.server.txn.Transaction;
+import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
-import org.apache.qpid.server.PrincipalHolder;
+import org.apache.qpid.server.security.PrincipalHolder;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.AMQException;
@@ -46,7 +46,6 @@ public class ServerSession extends Session implements PrincipalHolder
{
private static final String NULL_DESTINTATION = UUID.randomUUID().toString();
-
public static interface MessageDispositionChangeListener
{
public void onAccept();
@@ -69,7 +68,7 @@ public class ServerSession extends Session implements PrincipalHolder
private final SortedMap<Integer, MessageDispositionChangeListener> _messageDispositionListenerMap =
new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>();
- private Transaction _transaction;
+ private ServerTransaction _transaction;
private Principal _principal;
@@ -97,10 +96,16 @@ public class ServerSession extends Session implements PrincipalHolder
_reference = new WeakReference(this);
}
+ @Override
+ protected boolean isFull(int id)
+ {
+ return isCommandsFull(id);
+ }
+
public void enqueue(final ServerMessage message, final ArrayList<AMQQueue> queues)
{
- _transaction.enqueue(queues,message, new Transaction.Action()
+ _transaction.enqueue(queues,message, new ServerTransaction.Action()
{
AMQQueue[] _queues = queues.toArray(new AMQQueue[queues.size()]);
@@ -117,6 +122,7 @@ public class ServerSession extends Session implements PrincipalHolder
{
// TODO
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ throw new RuntimeException(e);
}
}
}
@@ -130,10 +136,11 @@ public class ServerSession extends Session implements PrincipalHolder
}
-
- public void sendMessage(MessageTransfer xfr)
+
+ public void sendMessage(MessageTransfer xfr,
+ Runnable postIdSettingAction)
{
- invoke(xfr);
+ invoke(xfr, postIdSettingAction);
}
public void onMessageDispositionChange(MessageTransfer xfr, MessageDispositionChangeListener acceptListener)
@@ -231,12 +238,12 @@ public class ServerSession extends Session implements PrincipalHolder
public void dispositionChange(RangeSet ranges, MessageDispositionAction action)
{
- if(!_messageDispositionListenerMap.isEmpty())
+ if(ranges != null && !_messageDispositionListenerMap.isEmpty())
{
Iterator<Integer> unacceptedMessages = _messageDispositionListenerMap.keySet().iterator();
Iterator<Range> rangeIter = ranges.iterator();
- if(rangeIter.hasNext())
+ if(rangeIter.hasNext())
{
Range range = rangeIter.next();
@@ -266,7 +273,6 @@ public class ServerSession extends Session implements PrincipalHolder
}
-
}
}
@@ -287,14 +293,14 @@ public class ServerSession extends Session implements PrincipalHolder
for (Task task : _taskList)
{
task.doTask(this);
- }
+ }
}
public void acknowledge(final Subscription_0_10 sub, final QueueEntry entry)
{
_transaction.dequeue(entry.getQueue(), entry.getMessage(),
- new Transaction.Action()
+ new ServerTransaction.Action()
{
public void postCommit()
@@ -307,7 +313,6 @@ public class ServerSession extends Session implements PrincipalHolder
entry.release();
}
});
-
}
public Collection<Subscription_0_10> getSubscriptions()