diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java | 196 |
1 files changed, 195 insertions, 1 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java index 318a240b27..f429d8ba9f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java @@ -20,21 +20,35 @@ */ package org.apache.qpid.server.protocol.v1_0; +import java.text.MessageFormat; +import java.util.Collection; +import org.apache.qpid.AMQException; +import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; import org.apache.qpid.amqp_1_0.transport.ConnectionEventListener; import org.apache.qpid.amqp_1_0.transport.SessionEndpoint; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT; + public class Connection_1_0 implements ConnectionEventListener { private IApplicationRegistry _appRegistry; private VirtualHost _vhost; + private final ConnectionEndpoint _conn; + private final long _connectionId; + private final Collection<Session_1_0> _sessions = Collections.synchronizedCollection(new ArrayList<Session_1_0>()); public static interface Task @@ -48,18 +62,27 @@ public class Connection_1_0 implements ConnectionEventListener - public Connection_1_0(IApplicationRegistry appRegistry) + public Connection_1_0(IApplicationRegistry appRegistry, ConnectionEndpoint conn, long connectionId) { _appRegistry = appRegistry; _vhost = _appRegistry.getVirtualHostRegistry().getDefaultVirtualHost(); + _conn = conn; + _connectionId = connectionId; + _vhost.getConnectionRegistry().registerConnection(_model); + } public void remoteSessionCreation(SessionEndpoint endpoint) { Session_1_0 session = new Session_1_0(_vhost, _appRegistry, this); + _sessions.add(session); endpoint.setSessionEventListener(session); } + void sessionEnded(Session_1_0 session) + { + _sessions.remove(session); + } void removeConnectionCloseTask(final Task task) { @@ -86,6 +109,8 @@ public class Connection_1_0 implements ConnectionEventListener { _closeTasks.clear(); } + _vhost.getConnectionRegistry().deregisterConnection(_model); + } @@ -94,5 +119,174 @@ public class Connection_1_0 implements ConnectionEventListener closeReceived(); } + private final AMQConnectionModel _model = new AMQConnectionModel() + { + private final StatisticsCounter _messageDeliveryStatistics = new StatisticsCounter(); + private final StatisticsCounter _messageReceiptStatistics = new StatisticsCounter(); + private final StatisticsCounter _dataDeliveryStatistics = new StatisticsCounter(); + private final StatisticsCounter _dataReceiptStatistics = new StatisticsCounter(); + + private final LogSubject _logSubject = new LogSubject() + { + @Override + public String toLogString() + { + return "[" + + MessageFormat.format(CONNECTION_FORMAT, + getConnectionId(), + getClientId(), + getRemoteAddressString(), + _vhost.getName()) + + "] "; + + } + }; + + @Override + public void close(AMQConstant cause, String message) throws AMQException + { + // TODO + } + + @Override + public void block() + { + // TODO + } + + @Override + public void unblock() + { + // TODO + } + + @Override + public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException + { + // TODO + } + + @Override + public long getConnectionId() + { + return _connectionId; + } + + @Override + public List<AMQSessionModel> getSessionModels() + { + return new ArrayList<AMQSessionModel>(_sessions); + } + + @Override + public LogSubject getLogSubject() + { + return _logSubject; + } + + @Override + public String getUserName() + { + return getPrincipalAsString(); + } + + @Override + public boolean isSessionNameUnique(byte[] name) + { + return true; // TODO + } + + @Override + public String getRemoteAddressString() + { + return String.valueOf(_conn.getRemoteAddress()); + } + + @Override + public String getClientId() + { + return _conn.getRemoteContainerId(); + } + + @Override + public String getClientVersion() + { + return ""; //TODO + } + + @Override + public String getPrincipalAsString() + { + return String.valueOf(_conn.getUser()); + } + + @Override + public long getSessionCountLimit() + { + return 0; // TODO + } + + @Override + public long getLastIoTime() + { + return 0; // TODO + } + + @Override + public void initialiseStatistics() + { + // TODO + } + + @Override + public void registerMessageReceived(long messageSize, long timestamp) + { + // TODO + } + + @Override + public void registerMessageDelivered(long messageSize) + { + // TODO + } + + @Override + public StatisticsCounter getMessageDeliveryStatistics() + { + return _messageDeliveryStatistics; + } + + @Override + public StatisticsCounter getMessageReceiptStatistics() + { + return _messageReceiptStatistics; + } + + @Override + public StatisticsCounter getDataDeliveryStatistics() + { + return _dataDeliveryStatistics; + } + + @Override + public StatisticsCounter getDataReceiptStatistics() + { + return _dataReceiptStatistics; + } + + @Override + public void resetStatistics() + { + // TODO + } + + + }; + + AMQConnectionModel getModel() + { + return _model; + } + } |