diff options
Diffstat (limited to 'java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java')
-rw-r--r-- | java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java | 57 |
1 files changed, 41 insertions, 16 deletions
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index ff7ce0a79d..9c012eb782 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -25,6 +25,7 @@ import java.security.Principal; import java.text.MessageFormat; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import javax.security.auth.Subject; @@ -42,6 +43,7 @@ import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.stats.StatisticsCounter; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.Connection; @@ -56,7 +58,8 @@ import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTIO import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.USER_FORMAT; -public class ServerConnection extends Connection implements AMQConnectionModel, LogSubject, AuthorizationHolder +public class ServerConnection extends Connection implements AMQConnectionModel<ServerConnection, ServerSession>, + LogSubject, AuthorizationHolder { private Runnable _onOpenTask; private AtomicBoolean _logClosed = new AtomicBoolean(false); @@ -72,6 +75,10 @@ public class ServerConnection extends Connection implements AMQConnectionModel, private AtomicLong _lastIoTime = new AtomicLong(); private boolean _blocking; private Transport _transport; + + private final CopyOnWriteArrayList<Action<? super ServerConnection>> _taskList = + new CopyOnWriteArrayList<Action<? super ServerConnection>>(); + private volatile boolean _stopped; public ServerConnection(final long connectionId, Broker broker) @@ -197,7 +204,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel, _onOpenTask = task; } - public void closeSession(AMQSessionModel session, AMQConstant cause, String message) + public void closeSession(ServerSession session, AMQConstant cause, String message) { ExecutionException ex = new ExecutionException(); ExecutionErrorCode code = ExecutionErrorCode.INTERNAL_ERROR; @@ -211,7 +218,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel, } ex.setErrorCode(code); ex.setDescription(message); - ((ServerSession)session).invoke(ex); + session.invoke(ex); session.close(cause, message); } @@ -315,6 +322,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel, public void close(AMQConstant cause, String message) { closeSubscriptions(); + performDeleteTasks(); ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL; try { @@ -327,6 +335,14 @@ public class ServerConnection extends Connection implements AMQConnectionModel, close(replyCode, message); } + protected void performDeleteTasks() + { + for(Action<? super ServerConnection> task : _taskList) + { + task.performAction(this); + } + } + public synchronized void block() { if(!_blocking) @@ -367,12 +383,12 @@ public class ServerConnection extends Connection implements AMQConnectionModel, super.removeSession(ssn); } - public List<AMQSessionModel> getSessionModels() + public List<ServerSession> getSessionModels() { - List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>(); + List<ServerSession> sessions = new ArrayList<ServerSession>(); for (Session ssn : getChannels()) { - sessions.add((AMQSessionModel) ssn); + sessions.add((ServerSession) ssn); } return sessions; } @@ -475,14 +491,10 @@ public class ServerConnection extends Connection implements AMQConnectionModel, return String.valueOf(getRemoteAddress()); } - public String getUserName() - { - return _authorizedPrincipal.getName(); - } - @Override public void closed() { + performDeleteTasks(); closeSubscriptions(); super.closed(); } @@ -522,6 +534,12 @@ public class ServerConnection extends Connection implements AMQConnectionModel, } @Override + public String getRemoteContainerName() + { + return getConnectionDelegate().getClientId(); + } + + @Override public String getClientVersion() { return getConnectionDelegate().getClientVersion(); @@ -533,11 +551,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel, return getConnectionDelegate().getClientProduct(); } - public String getPrincipalAsString() - { - return getAuthorizedPrincipal() == null ? null : getAuthorizedPrincipal().getName(); - } - public long getSessionCountLimit() { return getChannelMax(); @@ -565,4 +578,16 @@ public class ServerConnection extends Connection implements AMQConnectionModel, super.doHeartBeat(); } + + @Override + public void addDeleteTask(final Action<? super ServerConnection> task) + { + _taskList.add(task); + } + + @Override + public void removeDeleteTask(final Action<? super ServerConnection> task) + { + _taskList.remove(task); + } } |