summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java')
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java80
1 files changed, 58 insertions, 22 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 223de4f84e..67204427fb 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -56,6 +56,7 @@ import org.apache.qpid.server.TransactionTimeoutHelper;
import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.ChannelMessages;
@@ -74,7 +75,7 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.AlreadyKnownDtxException;
@@ -136,6 +137,7 @@ public class ServerSession extends Session
private org.apache.qpid.server.model.Session<?> _modelObject;
private long _blockTime;
private long _blockingTimeout;
+ private boolean _wireBlockingState;
public static interface MessageDispositionChangeListener
{
@@ -188,7 +190,7 @@ public class ServerSession extends Session
@Override
public void doTimeoutAction(String reason)
{
- getConnectionModel().closeSession(ServerSession.this, AMQConstant.RESOURCE_ERROR, reason);
+ getConnectionModel().closeSessionAsync(ServerSession.this, AMQConstant.RESOURCE_ERROR, reason);
}
}, getVirtualHost());
@@ -207,10 +209,6 @@ public class ServerSession extends Session
if (state == State.OPEN)
{
getVirtualHost().getEventLogger().message(ChannelMessages.CREATE());
- if(_blocking.get())
- {
- invokeBlock();
- }
}
}
else
@@ -244,6 +242,17 @@ public class ServerSession extends Session
invoke(new MessageStop(""));
}
+ private void invokeUnblock()
+ {
+ MessageFlow mf = new MessageFlow();
+ mf.setUnit(MessageCreditUnit.MESSAGE);
+ mf.setDestination("");
+ _outstandingCredit.set(Integer.MAX_VALUE);
+ mf.setValue(Integer.MAX_VALUE);
+ invoke(mf);
+ }
+
+
@Override
protected boolean isFull(int id)
{
@@ -823,12 +832,11 @@ public class ServerSession extends Session
if(_blocking.compareAndSet(false,true))
{
+ getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(name));
if(getState() == State.OPEN)
{
- invokeBlock();
+ getConnection().notifyWork();
}
- _blockTime = System.currentTimeMillis();
- getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(name));
}
@@ -852,28 +860,30 @@ public class ServerSession extends Session
{
if(_blocking.compareAndSet(true,false) && !isClosing())
{
- _blockTime = 0l;
getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_REMOVED());
- MessageFlow mf = new MessageFlow();
- mf.setUnit(MessageCreditUnit.MESSAGE);
- mf.setDestination("");
- _outstandingCredit.set(Integer.MAX_VALUE);
- mf.setValue(Integer.MAX_VALUE);
- invoke(mf);
-
-
+ getConnection().notifyWork();
}
}
}
+
boolean blockingTimeoutExceeded()
{
long blockTime = _blockTime;
- boolean b = _blocking.get() && blockTime != 0 && (System.currentTimeMillis() - blockTime) > _blockingTimeout;
+ boolean b = _wireBlockingState && blockTime != 0 && (System.currentTimeMillis() - blockTime) > _blockingTimeout;
return b;
}
@Override
+ public void transportStateChanged()
+ {
+ for(ConsumerTarget_0_10 consumerTarget : getSubscriptions())
+ {
+ consumerTarget.transportStateChanged();
+ }
+ }
+
+ @Override
public Object getConnectionReference()
{
return getConnection().getReference();
@@ -1002,17 +1012,17 @@ public class ServerSession extends Session
return _unfinishedCommandsQueue.isEmpty() ? null : _unfinishedCommandsQueue.getLast();
}
- public void recordFuture(final StoreFuture future, final ServerTransaction.Action action)
+ public void recordFuture(final FutureResult future, final ServerTransaction.Action action)
{
_unfinishedCommandsQueue.add(new AsyncCommand(future, action));
}
private static class AsyncCommand
{
- private final StoreFuture _future;
+ private final FutureResult _future;
private ServerTransaction.Action _action;
- public AsyncCommand(final StoreFuture future, final ServerTransaction.Action action)
+ public AsyncCommand(final FutureResult future, final ServerTransaction.Action action)
{
_future = future;
_action = action;
@@ -1125,6 +1135,32 @@ public class ServerSession extends Session
}
}
+ @Override
+ public void processPending()
+ {
+ boolean desiredBlockingState = _blocking.get();
+ if (desiredBlockingState != _wireBlockingState)
+ {
+ _wireBlockingState = desiredBlockingState;
+
+ if (desiredBlockingState)
+ {
+ invokeBlock();
+ }
+ else
+ {
+ invokeUnblock();
+ }
+ _blockTime = desiredBlockingState ? System.currentTimeMillis() : 0;
+ }
+
+
+ for(ConsumerTarget target : getSubscriptions())
+ {
+ target.processPending();
+ }
+ }
+
public final long getMaxUncommittedInMemorySize()
{