summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java196
1 files changed, 195 insertions, 1 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
index 318a240b27..f429d8ba9f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
@@ -20,21 +20,35 @@
*/
package org.apache.qpid.server.protocol.v1_0;
+import java.text.MessageFormat;
+import java.util.Collection;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
import org.apache.qpid.amqp_1_0.transport.ConnectionEventListener;
import org.apache.qpid.amqp_1_0.transport.SessionEndpoint;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
+
public class Connection_1_0 implements ConnectionEventListener
{
private IApplicationRegistry _appRegistry;
private VirtualHost _vhost;
+ private final ConnectionEndpoint _conn;
+ private final long _connectionId;
+ private final Collection<Session_1_0> _sessions = Collections.synchronizedCollection(new ArrayList<Session_1_0>());
public static interface Task
@@ -48,18 +62,27 @@ public class Connection_1_0 implements ConnectionEventListener
- public Connection_1_0(IApplicationRegistry appRegistry)
+ public Connection_1_0(IApplicationRegistry appRegistry, ConnectionEndpoint conn, long connectionId)
{
_appRegistry = appRegistry;
_vhost = _appRegistry.getVirtualHostRegistry().getDefaultVirtualHost();
+ _conn = conn;
+ _connectionId = connectionId;
+ _vhost.getConnectionRegistry().registerConnection(_model);
+
}
public void remoteSessionCreation(SessionEndpoint endpoint)
{
Session_1_0 session = new Session_1_0(_vhost, _appRegistry, this);
+ _sessions.add(session);
endpoint.setSessionEventListener(session);
}
+ void sessionEnded(Session_1_0 session)
+ {
+ _sessions.remove(session);
+ }
void removeConnectionCloseTask(final Task task)
{
@@ -86,6 +109,8 @@ public class Connection_1_0 implements ConnectionEventListener
{
_closeTasks.clear();
}
+ _vhost.getConnectionRegistry().deregisterConnection(_model);
+
}
@@ -94,5 +119,174 @@ public class Connection_1_0 implements ConnectionEventListener
closeReceived();
}
+ private final AMQConnectionModel _model = new AMQConnectionModel()
+ {
+ private final StatisticsCounter _messageDeliveryStatistics = new StatisticsCounter();
+ private final StatisticsCounter _messageReceiptStatistics = new StatisticsCounter();
+ private final StatisticsCounter _dataDeliveryStatistics = new StatisticsCounter();
+ private final StatisticsCounter _dataReceiptStatistics = new StatisticsCounter();
+
+ private final LogSubject _logSubject = new LogSubject()
+ {
+ @Override
+ public String toLogString()
+ {
+ return "[" +
+ MessageFormat.format(CONNECTION_FORMAT,
+ getConnectionId(),
+ getClientId(),
+ getRemoteAddressString(),
+ _vhost.getName())
+ + "] ";
+
+ }
+ };
+
+ @Override
+ public void close(AMQConstant cause, String message) throws AMQException
+ {
+ // TODO
+ }
+
+ @Override
+ public void block()
+ {
+ // TODO
+ }
+
+ @Override
+ public void unblock()
+ {
+ // TODO
+ }
+
+ @Override
+ public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
+ {
+ // TODO
+ }
+
+ @Override
+ public long getConnectionId()
+ {
+ return _connectionId;
+ }
+
+ @Override
+ public List<AMQSessionModel> getSessionModels()
+ {
+ return new ArrayList<AMQSessionModel>(_sessions);
+ }
+
+ @Override
+ public LogSubject getLogSubject()
+ {
+ return _logSubject;
+ }
+
+ @Override
+ public String getUserName()
+ {
+ return getPrincipalAsString();
+ }
+
+ @Override
+ public boolean isSessionNameUnique(byte[] name)
+ {
+ return true; // TODO
+ }
+
+ @Override
+ public String getRemoteAddressString()
+ {
+ return String.valueOf(_conn.getRemoteAddress());
+ }
+
+ @Override
+ public String getClientId()
+ {
+ return _conn.getRemoteContainerId();
+ }
+
+ @Override
+ public String getClientVersion()
+ {
+ return ""; //TODO
+ }
+
+ @Override
+ public String getPrincipalAsString()
+ {
+ return String.valueOf(_conn.getUser());
+ }
+
+ @Override
+ public long getSessionCountLimit()
+ {
+ return 0; // TODO
+ }
+
+ @Override
+ public long getLastIoTime()
+ {
+ return 0; // TODO
+ }
+
+ @Override
+ public void initialiseStatistics()
+ {
+ // TODO
+ }
+
+ @Override
+ public void registerMessageReceived(long messageSize, long timestamp)
+ {
+ // TODO
+ }
+
+ @Override
+ public void registerMessageDelivered(long messageSize)
+ {
+ // TODO
+ }
+
+ @Override
+ public StatisticsCounter getMessageDeliveryStatistics()
+ {
+ return _messageDeliveryStatistics;
+ }
+
+ @Override
+ public StatisticsCounter getMessageReceiptStatistics()
+ {
+ return _messageReceiptStatistics;
+ }
+
+ @Override
+ public StatisticsCounter getDataDeliveryStatistics()
+ {
+ return _dataDeliveryStatistics;
+ }
+
+ @Override
+ public StatisticsCounter getDataReceiptStatistics()
+ {
+ return _dataReceiptStatistics;
+ }
+
+ @Override
+ public void resetStatistics()
+ {
+ // TODO
+ }
+
+
+ };
+
+ AMQConnectionModel getModel()
+ {
+ return _model;
+ }
+
}