summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java88
1 files changed, 65 insertions, 23 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
index 999d268181..9046175c84 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
@@ -43,7 +43,6 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.ConcurrentHashMap;
import java.util.ArrayList;
-import java.nio.ByteBuffer;
public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener
{
@@ -51,7 +50,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
private final Lock _stateChangeLock = new ReentrantLock();
private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE);
- private final AtomicReference<QueueEntry> _queueContext = new AtomicReference<QueueEntry>(null);
+ private AMQQueue.Context _queueContext;
private final AtomicBoolean _deleted = new AtomicBoolean(false);
@@ -76,16 +75,21 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
private final ServerSession _session;
private AtomicBoolean _stopped = new AtomicBoolean(true);
private ConcurrentHashMap<Integer, QueueEntry> _sentMap = new ConcurrentHashMap<Integer, QueueEntry>();
+ private static final Struct[] EMPTY_STRUCT_ARRAY = new Struct[0];
public Subscription_0_10(ServerSession session, String destination, MessageAcceptMode acceptMode,
- MessageAcquireMode acquireMode, FlowCreditManager_0_10 creditManager, FilterManager filters)
+ MessageAcquireMode acquireMode,
+ MessageFlowMode flowMode,
+ FlowCreditManager_0_10 creditManager,
+ FilterManager filters)
{
_session = session;
_destination = destination;
_acceptMode = acceptMode;
_acquireMode = acquireMode;
_creditManager = creditManager;
+ _flowMode = flowMode;
_filters = filters;
_creditManager.addStateListener(this);
_state.set(_creditManager.hasCredit() ? State.ACTIVE : State.SUSPENDED);
@@ -139,10 +143,11 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
- if (_noLocal)
+ if (_noLocal
+ && (entry.getMessage() instanceof MessageTransferMessage)
+ && ((MessageTransferMessage)entry.getMessage()).getSession() == _session)
{
-
-
+ return false;
}
@@ -241,18 +246,15 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
MessageTransferMessage msg = (MessageTransferMessage) serverMsg;
-
- MessageTransfer xfr = new MessageTransfer();
- xfr.setDestination(_destination);
- if(msg.getBody() != null)
+ Struct[] headers;
+ if(msg.getHeader() == null)
{
- xfr.setBody(msg.getBody());
+ headers = EMPTY_STRUCT_ARRAY;
+ }
+ else
+ {
+ headers = msg.getHeader().getStructs();
}
-
- xfr.setAcceptMode(_acceptMode);
- xfr.setAcquireMode(_acquireMode);
-
- Struct[] headers = msg.getHeader().getStructs();
ArrayList<Struct> newHeaders = new ArrayList<Struct>(headers.length);
DeliveryProperties origDeliveryProps = null;
@@ -297,11 +299,23 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
deliveryProps.setRedelivered(entry.isRedelivered());
newHeaders.add(deliveryProps);
- xfr.setHeader(new Header(newHeaders));
+ Header header = new Header(newHeaders);
+
+ MessageTransfer xfr = new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody());
if(_acceptMode == MessageAcceptMode.NONE)
{
- xfr.setCompletionListener(new MessageAcceptCompletionListener(this, _session, entry));
+ xfr.setCompletionListener(new MessageAcceptCompletionListener(this, _session, entry, _flowMode == MessageFlowMode.WINDOW));
+ }
+ else if(_flowMode == MessageFlowMode.WINDOW)
+ {
+ xfr.setCompletionListener(new Method.CompletionListener()
+ {
+ public void onComplete(Method method)
+ {
+ restoreCredit(entry);
+ }
+ });
}
@@ -329,6 +343,11 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
{
reject(entry);
}
+
+ public boolean acquire()
+ {
+ return entry.acquire(Subscription_0_10.this);
+ }
});
}
else
@@ -350,6 +369,14 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
reject(entry);
}
+ public boolean acquire()
+ {
+ boolean acquired = entry.acquire(Subscription_0_10.this);
+ _session.acknowledge(Subscription_0_10.this,entry);
+ return acquired;
+
+ }
+
});
}
@@ -367,6 +394,14 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
{
entry.setRedelivered(true);
entry.release();
+ try
+ {
+ entry.requeue(new StoreContext());
+ }
+ catch (AMQException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
}
public void queueDeleted(AMQQueue queue)
@@ -394,6 +429,11 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
_creditManager.restoreCredit(1, queueEntry.getSize());
}
+ public void onDequeue(QueueEntry queueEntry)
+ {
+
+ }
+
public void setStateListener(StateListener listener)
{
_stateListener = listener;
@@ -404,14 +444,14 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
return _state.get();
}
- public QueueEntry getLastSeenEntry()
+ public AMQQueue.Context getQueueContext()
{
- return _queueContext.get();
+ return _queueContext;
}
- public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue)
+ public void setQueueContext(AMQQueue.Context queueContext)
{
- return _queueContext.compareAndSet(expected, newValue);
+ _queueContext = queueContext;
}
public boolean isActive()
@@ -453,7 +493,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
creditManager.addCredit(value, 0L);
break;
case BYTE:
- creditManager.addCredit(0L, value);
+ creditManager.addCredit(0l, value);
break;
}
@@ -472,6 +512,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
public void setFlowMode(MessageFlowMode flowMode)
{
+
_creditManager.removeListener(this);
switch(flowMode)
@@ -485,6 +526,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
default:
throw new RuntimeException("Unknown message flow mode: " + flowMode);
}
+ _flowMode = flowMode;
if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
{
_stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);