summaryrefslogtreecommitdiff
path: root/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java')
-rw-r--r--java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java102
1 files changed, 51 insertions, 51 deletions
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
index cae61f9d80..7f8237cc85 100644
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.protocol.v1_0;
+import java.security.Principal;
import java.text.MessageFormat;
import java.util.Collection;
import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
@@ -42,7 +43,7 @@ import java.util.List;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
-public class Connection_1_0 implements ConnectionEventListener
+public class Connection_1_0 implements ConnectionEventListener, AMQConnectionModel<Connection_1_0,Session_1_0>
{
private final Port _port;
@@ -53,8 +54,33 @@ public class Connection_1_0 implements ConnectionEventListener
private final Collection<Session_1_0> _sessions = Collections.synchronizedCollection(new ArrayList<Session_1_0>());
private final Object _reference = new Object();
- private List<Action<Connection_1_0>> _closeTasks =
- Collections.synchronizedList(new ArrayList<Action<Connection_1_0>>());
+
+ private StatisticsCounter _messageDeliveryStatistics = new StatisticsCounter();
+ private StatisticsCounter _messageReceiptStatistics = new StatisticsCounter();
+ private StatisticsCounter _dataDeliveryStatistics = new StatisticsCounter();
+ private StatisticsCounter _dataReceiptStatistics = new StatisticsCounter();
+
+ private final LogSubject _logSubject = new LogSubject()
+ {
+ @Override
+ public String toLogString()
+ {
+ return "[" +
+ MessageFormat.format(CONNECTION_FORMAT,
+ getConnectionId(),
+ getClientId(),
+ getRemoteAddressString(),
+ _vhost.getName())
+ + "] ";
+
+ }
+ };
+
+ private volatile boolean _stopped;
+
+
+ private List<Action<? super Connection_1_0>> _closeTasks =
+ Collections.synchronizedList(new ArrayList<Action<? super Connection_1_0>>());
@@ -69,7 +95,7 @@ public class Connection_1_0 implements ConnectionEventListener
_transport = transport;
_conn = conn;
_connectionId = connectionId;
- _vhost.getConnectionRegistry().registerConnection(_model);
+ _vhost.getConnectionRegistry().registerConnection(this);
}
@@ -80,7 +106,7 @@ public class Connection_1_0 implements ConnectionEventListener
public void remoteSessionCreation(SessionEndpoint endpoint)
{
- Session_1_0 session = new Session_1_0(_vhost, this);
+ Session_1_0 session = new Session_1_0(_vhost, this, endpoint);
_sessions.add(session);
endpoint.setSessionEventListener(session);
}
@@ -90,24 +116,24 @@ public class Connection_1_0 implements ConnectionEventListener
_sessions.remove(session);
}
- void removeConnectionCloseTask(final Action<Connection_1_0> task)
+ public void removeDeleteTask(final Action<? super Connection_1_0> task)
{
_closeTasks.remove( task );
}
- void addConnectionCloseTask(final Action<Connection_1_0> task)
+ public void addDeleteTask(final Action<? super Connection_1_0> task)
{
_closeTasks.add( task );
}
public void closeReceived()
{
- List<Action<Connection_1_0>> taskCopy;
+ List<Action<? super Connection_1_0>> taskCopy;
synchronized (_closeTasks)
{
- taskCopy = new ArrayList<Action<Connection_1_0>>(_closeTasks);
+ taskCopy = new ArrayList<Action<? super Connection_1_0>>(_closeTasks);
}
- for(Action<Connection_1_0> task : taskCopy)
+ for(Action<? super Connection_1_0> task : taskCopy)
{
task.performAction(this);
}
@@ -115,7 +141,7 @@ public class Connection_1_0 implements ConnectionEventListener
{
_closeTasks.clear();
}
- _vhost.getConnectionRegistry().deregisterConnection(_model);
+ _vhost.getConnectionRegistry().deregisterConnection(this);
}
@@ -125,30 +151,6 @@ public class Connection_1_0 implements ConnectionEventListener
closeReceived();
}
- private final AMQConnectionModel _model = new AMQConnectionModel()
- {
- private StatisticsCounter _messageDeliveryStatistics = new StatisticsCounter();
- private StatisticsCounter _messageReceiptStatistics = new StatisticsCounter();
- private StatisticsCounter _dataDeliveryStatistics = new StatisticsCounter();
- private StatisticsCounter _dataReceiptStatistics = new StatisticsCounter();
-
- private final LogSubject _logSubject = new LogSubject()
- {
- @Override
- public String toLogString()
- {
- return "[" +
- MessageFormat.format(CONNECTION_FORMAT,
- getConnectionId(),
- getClientId(),
- getRemoteAddressString(),
- _vhost.getName())
- + "] ";
-
- }
- };
-
- private volatile boolean _stopped;
@Override
public void close(AMQConstant cause, String message)
@@ -169,9 +171,9 @@ public class Connection_1_0 implements ConnectionEventListener
}
@Override
- public void closeSession(AMQSessionModel session, AMQConstant cause, String message)
+ public void closeSession(Session_1_0 session, AMQConstant cause, String message)
{
- // TODO
+ session.close(cause, message);
}
@Override
@@ -181,9 +183,9 @@ public class Connection_1_0 implements ConnectionEventListener
}
@Override
- public List<AMQSessionModel> getSessionModels()
+ public List<Session_1_0> getSessionModels()
{
- return new ArrayList<AMQSessionModel>(_sessions);
+ return new ArrayList<Session_1_0>(_sessions);
}
@Override
@@ -193,12 +195,6 @@ public class Connection_1_0 implements ConnectionEventListener
}
@Override
- public String getUserName()
- {
- return getPrincipalAsString();
- }
-
- @Override
public boolean isSessionNameUnique(byte[] name)
{
return true; // TODO
@@ -216,7 +212,13 @@ public class Connection_1_0 implements ConnectionEventListener
return _conn.getRemoteContainerId();
}
- @Override
+ @Override
+ public String getRemoteContainerName()
+ {
+ return _conn.getRemoteContainerId();
+ }
+
+ @Override
public String getClientVersion()
{
return ""; //TODO
@@ -228,10 +230,9 @@ public class Connection_1_0 implements ConnectionEventListener
return ""; //TODO
}
- @Override
- public String getPrincipalAsString()
+ public Principal getAuthorizedPrincipal()
{
- return String.valueOf(_conn.getUser());
+ return _conn.getUser();
}
@Override
@@ -337,11 +338,10 @@ public class Connection_1_0 implements ConnectionEventListener
}
- };
AMQConnectionModel getModel()
{
- return _model;
+ return this;
}