diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java | 88 |
1 files changed, 65 insertions, 23 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index 999d268181..9046175c84 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -43,7 +43,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.ConcurrentHashMap; import java.util.ArrayList; -import java.nio.ByteBuffer; public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener { @@ -51,7 +50,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr private final Lock _stateChangeLock = new ReentrantLock(); private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE); - private final AtomicReference<QueueEntry> _queueContext = new AtomicReference<QueueEntry>(null); + private AMQQueue.Context _queueContext; private final AtomicBoolean _deleted = new AtomicBoolean(false); @@ -76,16 +75,21 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr private final ServerSession _session; private AtomicBoolean _stopped = new AtomicBoolean(true); private ConcurrentHashMap<Integer, QueueEntry> _sentMap = new ConcurrentHashMap<Integer, QueueEntry>(); + private static final Struct[] EMPTY_STRUCT_ARRAY = new Struct[0]; public Subscription_0_10(ServerSession session, String destination, MessageAcceptMode acceptMode, - MessageAcquireMode acquireMode, FlowCreditManager_0_10 creditManager, FilterManager filters) + MessageAcquireMode acquireMode, + MessageFlowMode flowMode, + FlowCreditManager_0_10 creditManager, + FilterManager filters) { _session = session; _destination = destination; _acceptMode = acceptMode; _acquireMode = acquireMode; _creditManager = creditManager; + _flowMode = flowMode; _filters = filters; _creditManager.addStateListener(this); _state.set(_creditManager.hasCredit() ? State.ACTIVE : State.SUSPENDED); @@ -139,10 +143,11 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr - if (_noLocal) + if (_noLocal + && (entry.getMessage() instanceof MessageTransferMessage) + && ((MessageTransferMessage)entry.getMessage()).getSession() == _session) { - - + return false; } @@ -241,18 +246,15 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr MessageTransferMessage msg = (MessageTransferMessage) serverMsg; - - MessageTransfer xfr = new MessageTransfer(); - xfr.setDestination(_destination); - if(msg.getBody() != null) + Struct[] headers; + if(msg.getHeader() == null) { - xfr.setBody(msg.getBody()); + headers = EMPTY_STRUCT_ARRAY; + } + else + { + headers = msg.getHeader().getStructs(); } - - xfr.setAcceptMode(_acceptMode); - xfr.setAcquireMode(_acquireMode); - - Struct[] headers = msg.getHeader().getStructs(); ArrayList<Struct> newHeaders = new ArrayList<Struct>(headers.length); DeliveryProperties origDeliveryProps = null; @@ -297,11 +299,23 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr deliveryProps.setRedelivered(entry.isRedelivered()); newHeaders.add(deliveryProps); - xfr.setHeader(new Header(newHeaders)); + Header header = new Header(newHeaders); + + MessageTransfer xfr = new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody()); if(_acceptMode == MessageAcceptMode.NONE) { - xfr.setCompletionListener(new MessageAcceptCompletionListener(this, _session, entry)); + xfr.setCompletionListener(new MessageAcceptCompletionListener(this, _session, entry, _flowMode == MessageFlowMode.WINDOW)); + } + else if(_flowMode == MessageFlowMode.WINDOW) + { + xfr.setCompletionListener(new Method.CompletionListener() + { + public void onComplete(Method method) + { + restoreCredit(entry); + } + }); } @@ -329,6 +343,11 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr { reject(entry); } + + public boolean acquire() + { + return entry.acquire(Subscription_0_10.this); + } }); } else @@ -350,6 +369,14 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr reject(entry); } + public boolean acquire() + { + boolean acquired = entry.acquire(Subscription_0_10.this); + _session.acknowledge(Subscription_0_10.this,entry); + return acquired; + + } + }); } @@ -367,6 +394,14 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr { entry.setRedelivered(true); entry.release(); + try + { + entry.requeue(new StoreContext()); + } + catch (AMQException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } } public void queueDeleted(AMQQueue queue) @@ -394,6 +429,11 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr _creditManager.restoreCredit(1, queueEntry.getSize()); } + public void onDequeue(QueueEntry queueEntry) + { + + } + public void setStateListener(StateListener listener) { _stateListener = listener; @@ -404,14 +444,14 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr return _state.get(); } - public QueueEntry getLastSeenEntry() + public AMQQueue.Context getQueueContext() { - return _queueContext.get(); + return _queueContext; } - public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue) + public void setQueueContext(AMQQueue.Context queueContext) { - return _queueContext.compareAndSet(expected, newValue); + _queueContext = queueContext; } public boolean isActive() @@ -453,7 +493,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr creditManager.addCredit(value, 0L); break; case BYTE: - creditManager.addCredit(0L, value); + creditManager.addCredit(0l, value); break; } @@ -472,6 +512,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr public void setFlowMode(MessageFlowMode flowMode) { + _creditManager.removeListener(this); switch(flowMode) @@ -485,6 +526,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr default: throw new RuntimeException("Unknown message flow mode: " + flowMode); } + _flowMode = flowMode; if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED)) { _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED); |