diff options
Diffstat (limited to 'qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java')
-rw-r--r-- | qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java | 170 |
1 files changed, 126 insertions, 44 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index 8567be37f0..2280377fca 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.protocol.v0_10; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.USER_FORMAT; +import static org.apache.qpid.transport.Connection.State.CLOSING; import java.net.SocketAddress; import java.security.Principal; @@ -30,6 +31,8 @@ import java.security.PrivilegedAction; import java.text.MessageFormat; import java.util.ArrayList; import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -37,6 +40,7 @@ import java.util.concurrent.atomic.AtomicLong; import javax.security.auth.Subject; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.connection.ConnectionPrincipal; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.LogSubject; @@ -66,7 +70,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S { private final Broker<?> _broker; - private Runnable _onOpenTask; private AtomicBoolean _logClosed = new AtomicBoolean(false); private final Subject _authorizedSubject = new Subject(); @@ -75,20 +78,26 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S private final long _connectionId; private final Object _reference = new Object(); private VirtualHostImpl<?,?,?> _virtualHost; - private AmqpPort<?> _port; - private AtomicLong _lastIoTime = new AtomicLong(); + private final AmqpPort<?> _port; + private final AtomicLong _lastIoTime = new AtomicLong(); private boolean _blocking; - private Transport _transport; + private final Transport _transport; - private final CopyOnWriteArrayList<Action<? super ServerConnection>> _taskList = + private final CopyOnWriteArrayList<Action<? super ServerConnection>> _connectionCloseTaskList = new CopyOnWriteArrayList<Action<? super ServerConnection>>(); + private final Queue<Action<? super ServerConnection>> _asyncTaskList = + new ConcurrentLinkedQueue<>(); + private final CopyOnWriteArrayList<SessionModelListener> _sessionListeners = new CopyOnWriteArrayList<SessionModelListener>(); private volatile boolean _stopped; private int _messageCompressionThreshold; - private int _maxMessageSize; + private final int _maxMessageSize; + + private ServerProtocolEngine _serverProtocolEngine; + private boolean _ignoreFutureInput; public ServerConnection(final long connectionId, Broker<?> broker, @@ -140,10 +149,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S if (state == State.OPEN) { - if (_onOpenTask != null) - { - _onOpenTask.run(); - } getEventLogger().message(ConnectionMessages.OPEN(getClientId(), "0-10", getClientVersion(), @@ -189,6 +194,16 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S super.setConnectionDelegate(delegate); } + public ServerProtocolEngine getProtocolEngine() + { + return _serverProtocolEngine; + } + + public void setProtocolEngine(final ServerProtocolEngine serverProtocolEngine) + { + _serverProtocolEngine = serverProtocolEngine; + } + public VirtualHostImpl<?,?,?> getVirtualHost() { return _virtualHost; @@ -237,28 +252,32 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S return _stopped; } - public void onOpen(final Runnable task) - { - _onOpenTask = task; - } - - public void closeSession(ServerSession session, AMQConstant cause, String message) + public void closeSessionAsync(final ServerSession session, final AMQConstant cause, final String message) { - ExecutionException ex = new ExecutionException(); - ExecutionErrorCode code = ExecutionErrorCode.INTERNAL_ERROR; - try - { - code = ExecutionErrorCode.get(cause.getCode()); - } - catch (IllegalArgumentException iae) + addAsyncTask(new Action<ServerConnection>() { - // Ignore, already set to INTERNAL_ERROR - } - ex.setErrorCode(code); - ex.setDescription(message); - session.invoke(ex); - session.close(cause, message); + @Override + public void performAction(final ServerConnection conn) + { + ExecutionException ex = new ExecutionException(); + ExecutionErrorCode code = ExecutionErrorCode.INTERNAL_ERROR; + try + { + code = ExecutionErrorCode.get(cause.getCode()); + } + catch (IllegalArgumentException iae) + { + // Ignore, already set to INTERNAL_ERROR + } + ex.setErrorCode(code); + ex.setDescription(message); + session.invoke(ex); + + session.close(cause, message); + } + }); + } public LogSubject getLogSubject() @@ -355,25 +374,35 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S } } - public void close(AMQConstant cause, String message) + public void closeAsync(final AMQConstant cause, final String message) { - closeSubscriptions(); - performDeleteTasks(); - ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL; - try - { - replyCode = ConnectionCloseCode.get(cause.getCode()); - } - catch (IllegalArgumentException iae) + + addAsyncTask(new Action<ServerConnection>() { - // Ignore - } - close(replyCode, message); + @Override + public void performAction(final ServerConnection object) + { + closeSubscriptions(); + performDeleteTasks(); + + setState(CLOSING); + ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL; + try + { + replyCode = ConnectionCloseCode.get(cause.getCode()); + } + catch (IllegalArgumentException iae) + { + // Ignore + } + sendConnectionClose(replyCode, message); + } + }); } protected void performDeleteTasks() { - for(Action<? super ServerConnection> task : _taskList) + for(Action<? super ServerConnection> task : _connectionCloseTaskList) { task.performAction(this); } @@ -646,13 +675,19 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S @Override public void addDeleteTask(final Action<? super ServerConnection> task) { - _taskList.add(task); + _connectionCloseTaskList.add(task); + } + + private void addAsyncTask(final Action<ServerConnection> action) + { + _asyncTaskList.add(action); + notifyWork(); } @Override public void removeDeleteTask(final Action<? super ServerConnection> task) { - _taskList.remove(task); + _connectionCloseTaskList.remove(task); } public int getMessageCompressionThreshold() @@ -664,4 +699,51 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S { return _maxMessageSize; } + + public void transportStateChanged() + { + for (AMQSessionModel ssn : getSessionModels()) + { + ssn.transportStateChanged(); + } + } + + @Override + public void notifyWork() + { + _serverProtocolEngine.notifyWork(); + } + + + @Override + public boolean isMessageAssignmentSuspended() + { + return _serverProtocolEngine.isMessageAssignmentSuspended(); + } + + public void processPending() + { + while(_asyncTaskList.peek() != null) + { + Action<? super ServerConnection> asyncAction = _asyncTaskList.poll(); + asyncAction.performAction(this); + } + + for (AMQSessionModel session : getSessionModels()) + { + session.processPending(); + } + + } + + public void closeAndIgnoreFutureInput() + { + _ignoreFutureInput = true; + getSender().close(); + } + + public boolean isIgnoreFutureInput() + { + return _ignoreFutureInput; + } } |