From 2ccde5b0be8aaa7a91110c1ac01f21d1abdb6dc6 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Thu, 2 Aug 2012 15:10:00 +0000 Subject: QPID-4183 : [Merge from trunk] Implement Session/ConnectionModel interfaces in AMQP 1.0 code to restore functionality git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.18@1368514 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/server/protocol/ProtocolEngine_1_0_0.java | 2 +- .../server/protocol/ProtocolEngine_1_0_0_SASL.java | 2 +- .../qpid/server/protocol/v1_0/Connection_1_0.java | 196 ++++++++++++++++++++- .../qpid/server/protocol/v1_0/Session_1_0.java | 163 ++++++++++++++++- .../server/protocol/v1_0/Subscription_1_0.java | 2 +- 5 files changed, 360 insertions(+), 5 deletions(-) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java index 0b8bdff5c9..e6282315c6 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java @@ -146,7 +146,7 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa _conn = new ConnectionEndpoint(container, asSaslServerProvider(_appRegistry.getAuthenticationManager( getLocalAddress()))); - _conn.setConnectionEventListener(new Connection_1_0(_appRegistry)); + _conn.setConnectionEventListener(new Connection_1_0(_appRegistry, _conn, _connectionId)); _conn.setFrameOutputHandler(this); _conn.setRemoteAddress(_network.getRemoteAddress()); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java index 876a8eb275..a48441bf30 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java @@ -166,7 +166,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut _conn = new ConnectionEndpoint(container, asSaslServerProvider(ApplicationRegistry.getInstance() .getAuthenticationManager(getLocalAddress()))); - _conn.setConnectionEventListener(new Connection_1_0(_appRegistry)); + _conn.setConnectionEventListener(new Connection_1_0(_appRegistry, _conn, _connectionId)); _conn.setRemoteAddress(getRemoteAddress()); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java index 318a240b27..f429d8ba9f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java @@ -20,21 +20,35 @@ */ package org.apache.qpid.server.protocol.v1_0; +import java.text.MessageFormat; +import java.util.Collection; +import org.apache.qpid.AMQException; +import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; import org.apache.qpid.amqp_1_0.transport.ConnectionEventListener; import org.apache.qpid.amqp_1_0.transport.SessionEndpoint; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT; + public class Connection_1_0 implements ConnectionEventListener { private IApplicationRegistry _appRegistry; private VirtualHost _vhost; + private final ConnectionEndpoint _conn; + private final long _connectionId; + private final Collection _sessions = Collections.synchronizedCollection(new ArrayList()); public static interface Task @@ -48,18 +62,27 @@ public class Connection_1_0 implements ConnectionEventListener - public Connection_1_0(IApplicationRegistry appRegistry) + public Connection_1_0(IApplicationRegistry appRegistry, ConnectionEndpoint conn, long connectionId) { _appRegistry = appRegistry; _vhost = _appRegistry.getVirtualHostRegistry().getDefaultVirtualHost(); + _conn = conn; + _connectionId = connectionId; + _vhost.getConnectionRegistry().registerConnection(_model); + } public void remoteSessionCreation(SessionEndpoint endpoint) { Session_1_0 session = new Session_1_0(_vhost, _appRegistry, this); + _sessions.add(session); endpoint.setSessionEventListener(session); } + void sessionEnded(Session_1_0 session) + { + _sessions.remove(session); + } void removeConnectionCloseTask(final Task task) { @@ -86,6 +109,8 @@ public class Connection_1_0 implements ConnectionEventListener { _closeTasks.clear(); } + _vhost.getConnectionRegistry().deregisterConnection(_model); + } @@ -94,5 +119,174 @@ public class Connection_1_0 implements ConnectionEventListener closeReceived(); } + private final AMQConnectionModel _model = new AMQConnectionModel() + { + private final StatisticsCounter _messageDeliveryStatistics = new StatisticsCounter(); + private final StatisticsCounter _messageReceiptStatistics = new StatisticsCounter(); + private final StatisticsCounter _dataDeliveryStatistics = new StatisticsCounter(); + private final StatisticsCounter _dataReceiptStatistics = new StatisticsCounter(); + + private final LogSubject _logSubject = new LogSubject() + { + @Override + public String toLogString() + { + return "[" + + MessageFormat.format(CONNECTION_FORMAT, + getConnectionId(), + getClientId(), + getRemoteAddressString(), + _vhost.getName()) + + "] "; + + } + }; + + @Override + public void close(AMQConstant cause, String message) throws AMQException + { + // TODO + } + + @Override + public void block() + { + // TODO + } + + @Override + public void unblock() + { + // TODO + } + + @Override + public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException + { + // TODO + } + + @Override + public long getConnectionId() + { + return _connectionId; + } + + @Override + public List getSessionModels() + { + return new ArrayList(_sessions); + } + + @Override + public LogSubject getLogSubject() + { + return _logSubject; + } + + @Override + public String getUserName() + { + return getPrincipalAsString(); + } + + @Override + public boolean isSessionNameUnique(byte[] name) + { + return true; // TODO + } + + @Override + public String getRemoteAddressString() + { + return String.valueOf(_conn.getRemoteAddress()); + } + + @Override + public String getClientId() + { + return _conn.getRemoteContainerId(); + } + + @Override + public String getClientVersion() + { + return ""; //TODO + } + + @Override + public String getPrincipalAsString() + { + return String.valueOf(_conn.getUser()); + } + + @Override + public long getSessionCountLimit() + { + return 0; // TODO + } + + @Override + public long getLastIoTime() + { + return 0; // TODO + } + + @Override + public void initialiseStatistics() + { + // TODO + } + + @Override + public void registerMessageReceived(long messageSize, long timestamp) + { + // TODO + } + + @Override + public void registerMessageDelivered(long messageSize) + { + // TODO + } + + @Override + public StatisticsCounter getMessageDeliveryStatistics() + { + return _messageDeliveryStatistics; + } + + @Override + public StatisticsCounter getMessageReceiptStatistics() + { + return _messageReceiptStatistics; + } + + @Override + public StatisticsCounter getDataDeliveryStatistics() + { + return _dataDeliveryStatistics; + } + + @Override + public StatisticsCounter getDataReceiptStatistics() + { + return _dataReceiptStatistics; + } + + @Override + public void resetStatistics() + { + // TODO + } + + + }; + + AMQConnectionModel getModel() + { + return _model; + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index 09c56ec94f..999ffc55e5 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.protocol.v1_0; +import java.text.MessageFormat; import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint; import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint; @@ -35,18 +36,26 @@ import org.apache.qpid.amqp_1_0.type.transport.*; import org.apache.qpid.amqp_1_0.type.transport.Error; import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; +import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.transport.ServerConnection; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.*; -public class Session_1_0 implements SessionEventListener +import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT; + +public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSubject { private static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy"); private IApplicationRegistry _appRegistry; @@ -56,6 +65,7 @@ public class Session_1_0 implements SessionEventListener private final LinkedHashMap _openTransactions = new LinkedHashMap(); private final Connection_1_0 _connection; + private UUID _id = UUID.randomUUID(); public Session_1_0(VirtualHost vhost, IApplicationRegistry appRegistry, final Connection_1_0 connection) @@ -405,6 +415,8 @@ public class Session_1_0 implements SessionEventListener iter.remove(); } + _connection.sessionEnded(this); + } Integer binaryToInteger(final Binary txnId) @@ -443,4 +455,153 @@ public class Session_1_0 implements SessionEventListener public void forceEnd() { } + + @Override + public UUID getQMFId() + { + return _id; + } + + @Override + public AMQConnectionModel getConnectionModel() + { + return _connection.getModel(); + } + + @Override + public String getClientID() + { + // TODO + return ""; + } + + @Override + public void close() throws AMQException + { + // TODO - required for AMQSessionModel / management initiated closing + } + + @Override + public LogSubject getLogSubject() + { + return this; + } + + @Override + public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException + { + // TODO - required for AMQSessionModel / long running transaction detection + } + + @Override + public void block(AMQQueue queue) + { + // TODO - required for AMQSessionModel / producer side flow control + } + + @Override + public void unblock(AMQQueue queue) + { + // TODO - required for AMQSessionModel / producer side flow control + } + + @Override + public void block() + { + // TODO - required for AMQSessionModel / producer side flow control + } + + @Override + public void unblock() + { + // TODO - required for AMQSessionModel / producer side flow control + } + + @Override + public boolean getBlocking() + { + // TODO + return false; + } + + @Override + public boolean onSameConnection(InboundMessage inbound) + { + // TODO + return false; + } + + @Override + public int getUnacknowledgedMessageCount() + { + // TODO + return 0; + } + + @Override + public Long getTxnCount() + { + // TODO + return 0l; + } + + @Override + public Long getTxnStart() + { + // TODO + return 0l; + } + + @Override + public Long getTxnCommits() + { + // TODO + return 0l; + } + + @Override + public Long getTxnRejects() + { + // TODO + return 0l; + } + + @Override + public int getChannelId() + { + // TODO + return 0; + } + + @Override + public int getConsumerCount() + { + // TODO + return 0; + } + + @Override + public int compareTo(AMQSessionModel o) + { + return getQMFId().compareTo(o.getQMFId()); + } + + + + public String toLogString() + { + long connectionId = getConnectionModel().getConnectionId(); + + String remoteAddress = getConnectionModel().getRemoteAddressString(); + + return "[" + + MessageFormat.format(CHANNEL_FORMAT, + connectionId, + getClientID(), + remoteAddress, + _vhost.getName(), // TODO - virtual host + 0) // TODO - channel) + + "] "; + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java index ad8eafc850..8a3d3716c7 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java @@ -637,7 +637,7 @@ class Subscription_1_0 implements Subscription public AMQSessionModel getSessionModel() { // TODO - return null; + return getSession(); } @Override -- cgit v1.2.1