summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java124
1 files changed, 122 insertions, 2 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index d8ec0d881e..bbc2396063 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -21,15 +21,31 @@
package org.apache.qpid.server.transport;
import org.apache.qpid.transport.*;
-import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.AMQException;
-import java.util.ArrayList;
+import java.util.*;
+import java.util.concurrent.ConcurrentSkipListMap;
+import static org.apache.qpid.util.Serial.*;
public class ServerSession extends Session
{
+
+
+ public static interface MessageDispositionChangeListener
+ {
+ public void onAccept();
+
+ public void onRelease();
+
+ public void onReject();
+ }
+
+
+ private final SortedMap<Integer, MessageDispositionChangeListener> _messageDispositionListenerMap =
+ new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>();
+
ServerSession(Connection connection, Binary name, long expiry)
{
super(connection, name, expiry);
@@ -58,8 +74,112 @@ public class ServerSession extends Session
}
}
+
public void sendMessage(MessageTransfer xfr)
{
invoke(xfr);
}
+
+ public void onMessageDispositionChange(MessageTransfer xfr, MessageDispositionChangeListener acceptListener)
+ {
+ _messageDispositionListenerMap.put(xfr.getId(), acceptListener);
+ }
+
+
+ private static interface MessageDispositionAction
+ {
+ void performAction(MessageDispositionChangeListener listener);
+ }
+
+ public void accept(RangeSet ranges)
+ {
+ dispositionChange(ranges, new MessageDispositionAction()
+ {
+ public void performAction(MessageDispositionChangeListener listener)
+ {
+ listener.onAccept();
+ }
+ });
+ }
+
+
+ public void release(RangeSet ranges)
+ {
+ dispositionChange(ranges, new MessageDispositionAction()
+ {
+ public void performAction(MessageDispositionChangeListener listener)
+ {
+ listener.onRelease();
+ }
+ });
+ }
+
+ public void reject(RangeSet ranges)
+ {
+ dispositionChange(ranges, new MessageDispositionAction()
+ {
+ public void performAction(MessageDispositionChangeListener listener)
+ {
+ listener.onReject();
+ }
+ });
+ }
+
+ public void dispositionChange(RangeSet ranges, MessageDispositionAction action)
+ {
+ if(!_messageDispositionListenerMap.isEmpty())
+ {
+ Iterator<Integer> unacceptedMessages = _messageDispositionListenerMap.keySet().iterator();
+ Iterator<Range> rangeIter = ranges.iterator();
+
+ if(rangeIter.hasNext())
+ {
+ Range range = rangeIter.next();
+
+ while(range != null && unacceptedMessages.hasNext())
+ {
+ int next = unacceptedMessages.next();
+ while(gt(next, range.getUpper()))
+ {
+ if(rangeIter.hasNext())
+ {
+ range = rangeIter.next();
+ }
+ else
+ {
+ range = null;
+ break;
+ }
+ }
+ if(range != null && range.includes(next))
+ {
+ MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.remove(next);
+ action.performAction(changeListener);
+ }
+
+
+ }
+
+ }
+
+
+ }
+ }
+
+ public void removeDispositionListener(Method method)
+ {
+ _messageDispositionListenerMap.remove(method.getId());
+ }
+
+ public void releaseAll()
+ {
+ for(MessageDispositionChangeListener listener : _messageDispositionListenerMap.values())
+ {
+ listener.onRelease();
+ }
+ _messageDispositionListenerMap.clear();
+ }
+
+
+
}