summaryrefslogtreecommitdiff
path: root/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java')
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java57
1 files changed, 41 insertions, 16 deletions
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
index ff7ce0a79d..9c012eb782 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
@@ -25,6 +25,7 @@ import java.security.Principal;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.security.auth.Subject;
@@ -42,6 +43,7 @@ import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Connection;
@@ -56,7 +58,8 @@ import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTIO
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.USER_FORMAT;
-public class ServerConnection extends Connection implements AMQConnectionModel, LogSubject, AuthorizationHolder
+public class ServerConnection extends Connection implements AMQConnectionModel<ServerConnection, ServerSession>,
+ LogSubject, AuthorizationHolder
{
private Runnable _onOpenTask;
private AtomicBoolean _logClosed = new AtomicBoolean(false);
@@ -72,6 +75,10 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
private AtomicLong _lastIoTime = new AtomicLong();
private boolean _blocking;
private Transport _transport;
+
+ private final CopyOnWriteArrayList<Action<? super ServerConnection>> _taskList =
+ new CopyOnWriteArrayList<Action<? super ServerConnection>>();
+
private volatile boolean _stopped;
public ServerConnection(final long connectionId, Broker broker)
@@ -197,7 +204,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
_onOpenTask = task;
}
- public void closeSession(AMQSessionModel session, AMQConstant cause, String message)
+ public void closeSession(ServerSession session, AMQConstant cause, String message)
{
ExecutionException ex = new ExecutionException();
ExecutionErrorCode code = ExecutionErrorCode.INTERNAL_ERROR;
@@ -211,7 +218,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
}
ex.setErrorCode(code);
ex.setDescription(message);
- ((ServerSession)session).invoke(ex);
+ session.invoke(ex);
session.close(cause, message);
}
@@ -315,6 +322,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
public void close(AMQConstant cause, String message)
{
closeSubscriptions();
+ performDeleteTasks();
ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL;
try
{
@@ -327,6 +335,14 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
close(replyCode, message);
}
+ protected void performDeleteTasks()
+ {
+ for(Action<? super ServerConnection> task : _taskList)
+ {
+ task.performAction(this);
+ }
+ }
+
public synchronized void block()
{
if(!_blocking)
@@ -367,12 +383,12 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
super.removeSession(ssn);
}
- public List<AMQSessionModel> getSessionModels()
+ public List<ServerSession> getSessionModels()
{
- List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>();
+ List<ServerSession> sessions = new ArrayList<ServerSession>();
for (Session ssn : getChannels())
{
- sessions.add((AMQSessionModel) ssn);
+ sessions.add((ServerSession) ssn);
}
return sessions;
}
@@ -475,14 +491,10 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
return String.valueOf(getRemoteAddress());
}
- public String getUserName()
- {
- return _authorizedPrincipal.getName();
- }
-
@Override
public void closed()
{
+ performDeleteTasks();
closeSubscriptions();
super.closed();
}
@@ -522,6 +534,12 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
}
@Override
+ public String getRemoteContainerName()
+ {
+ return getConnectionDelegate().getClientId();
+ }
+
+ @Override
public String getClientVersion()
{
return getConnectionDelegate().getClientVersion();
@@ -533,11 +551,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
return getConnectionDelegate().getClientProduct();
}
- public String getPrincipalAsString()
- {
- return getAuthorizedPrincipal() == null ? null : getAuthorizedPrincipal().getName();
- }
-
public long getSessionCountLimit()
{
return getChannelMax();
@@ -565,4 +578,16 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
super.doHeartBeat();
}
+
+ @Override
+ public void addDeleteTask(final Action<? super ServerConnection> task)
+ {
+ _taskList.add(task);
+ }
+
+ @Override
+ public void removeDeleteTask(final Action<? super ServerConnection> task)
+ {
+ _taskList.remove(task);
+ }
}