diff options
Diffstat (limited to 'qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java')
-rw-r--r-- | qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java | 86 |
1 files changed, 81 insertions, 5 deletions
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java index 8e24d55da0..b515fda4a7 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java @@ -30,7 +30,9 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Queue; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; import javax.security.auth.Subject; @@ -51,6 +53,7 @@ import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.protocol.SessionModelListener; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; @@ -64,6 +67,7 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod private final AmqpPort<?> _port; private final Broker<?> _broker; private final SubjectCreator _subjectCreator; + private final ProtocolEngine_1_0_0_SASL _protocolEngine; private VirtualHostImpl _vhost; private final Transport _transport; private final ConnectionEndpoint _conn; @@ -98,15 +102,24 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod private List<Action<? super Connection_1_0>> _closeTasks = Collections.synchronizedList(new ArrayList<Action<? super Connection_1_0>>()); + + private final Queue<Action<? super Connection_1_0>> _asyncTaskList = + new ConcurrentLinkedQueue<>(); + + private boolean _closedOnOpen; + public Connection_1_0(Broker<?> broker, ConnectionEndpoint conn, long connectionId, AmqpPort<?> port, - Transport transport, final SubjectCreator subjectCreator) + Transport transport, + final SubjectCreator subjectCreator, + final ProtocolEngine_1_0_0_SASL protocolEngine) { + _protocolEngine = protocolEngine; _broker = broker; _port = port; _transport = transport; @@ -207,6 +220,13 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod _closeTasks.add( task ); } + private void addAsyncTask(final Action<Connection_1_0> action) + { + _asyncTaskList.add(action); + notifyWork(); + } + + public void closeReceived() { Collection<Session_1_0> sessions = new ArrayList(_sessions); @@ -245,9 +265,19 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod @Override - public void close(AMQConstant cause, String message) + public void closeAsync(AMQConstant cause, String message) { - _conn.close(); + Action<Connection_1_0> action = new Action<Connection_1_0>() + { + @Override + public void performAction(final Connection_1_0 object) + { + _conn.close(); + + } + }; + addAsyncTask(action); + } @Override @@ -263,9 +293,16 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod } @Override - public void closeSession(Session_1_0 session, AMQConstant cause, String message) + public void closeSessionAsync(final Session_1_0 session, final AMQConstant cause, final String message) { - session.close(cause, message); + addAsyncTask(new Action<Connection_1_0>() + { + @Override + public void performAction(final Connection_1_0 object) + { + session.close(cause, message); + } + }); } @Override @@ -363,6 +400,11 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod return _port; } + public ServerProtocolEngine getProtocolEngine() + { + return _protocolEngine; + } + @Override public Transport getTransport() { @@ -480,4 +522,38 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod } + public void transportStateChanged() + { + for (Session_1_0 session : _sessions) + { + session.transportStateChanged(); + } + } + + @Override + public void notifyWork() + { + _protocolEngine.notifyWork(); + } + + @Override + public boolean isMessageAssignmentSuspended() + { + return _protocolEngine.isMessageAssignmentSuspended(); + } + + public void processPending() + { + while(_asyncTaskList.peek() != null) + { + Action<? super Connection_1_0> asyncAction = _asyncTaskList.poll(); + asyncAction.performAction(this); + } + + for (AMQSessionModel session : getSessionModels()) + { + session.processPending(); + } + + } } |