diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java | 124 |
1 files changed, 122 insertions, 2 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index d8ec0d881e..bbc2396063 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -21,15 +21,31 @@ package org.apache.qpid.server.transport; import org.apache.qpid.transport.*; -import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.AMQException; -import java.util.ArrayList; +import java.util.*; +import java.util.concurrent.ConcurrentSkipListMap; +import static org.apache.qpid.util.Serial.*; public class ServerSession extends Session { + + + public static interface MessageDispositionChangeListener + { + public void onAccept(); + + public void onRelease(); + + public void onReject(); + } + + + private final SortedMap<Integer, MessageDispositionChangeListener> _messageDispositionListenerMap = + new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>(); + ServerSession(Connection connection, Binary name, long expiry) { super(connection, name, expiry); @@ -58,8 +74,112 @@ public class ServerSession extends Session } } + public void sendMessage(MessageTransfer xfr) { invoke(xfr); } + + public void onMessageDispositionChange(MessageTransfer xfr, MessageDispositionChangeListener acceptListener) + { + _messageDispositionListenerMap.put(xfr.getId(), acceptListener); + } + + + private static interface MessageDispositionAction + { + void performAction(MessageDispositionChangeListener listener); + } + + public void accept(RangeSet ranges) + { + dispositionChange(ranges, new MessageDispositionAction() + { + public void performAction(MessageDispositionChangeListener listener) + { + listener.onAccept(); + } + }); + } + + + public void release(RangeSet ranges) + { + dispositionChange(ranges, new MessageDispositionAction() + { + public void performAction(MessageDispositionChangeListener listener) + { + listener.onRelease(); + } + }); + } + + public void reject(RangeSet ranges) + { + dispositionChange(ranges, new MessageDispositionAction() + { + public void performAction(MessageDispositionChangeListener listener) + { + listener.onReject(); + } + }); + } + + public void dispositionChange(RangeSet ranges, MessageDispositionAction action) + { + if(!_messageDispositionListenerMap.isEmpty()) + { + Iterator<Integer> unacceptedMessages = _messageDispositionListenerMap.keySet().iterator(); + Iterator<Range> rangeIter = ranges.iterator(); + + if(rangeIter.hasNext()) + { + Range range = rangeIter.next(); + + while(range != null && unacceptedMessages.hasNext()) + { + int next = unacceptedMessages.next(); + while(gt(next, range.getUpper())) + { + if(rangeIter.hasNext()) + { + range = rangeIter.next(); + } + else + { + range = null; + break; + } + } + if(range != null && range.includes(next)) + { + MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.remove(next); + action.performAction(changeListener); + } + + + } + + } + + + } + } + + public void removeDispositionListener(Method method) + { + _messageDispositionListenerMap.remove(method.getId()); + } + + public void releaseAll() + { + for(MessageDispositionChangeListener listener : _messageDispositionListenerMap.values()) + { + listener.onRelease(); + } + _messageDispositionListenerMap.clear(); + } + + + } |