summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2012-08-02 15:10:00 +0000
committerRobert Godfrey <rgodfrey@apache.org>2012-08-02 15:10:00 +0000
commit2ccde5b0be8aaa7a91110c1ac01f21d1abdb6dc6 (patch)
treeec3a12bcf13efbb3f6de4e399234b51e7298260d
parent1dae8306e77a0914bee371f582852806d2b3b3b1 (diff)
downloadqpid-python-2ccde5b0be8aaa7a91110c1ac01f21d1abdb6dc6.tar.gz
QPID-4183 : [Merge from trunk] Implement Session/ConnectionModel interfaces in AMQP 1.0 code to restore functionality
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.18@1368514 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java196
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java163
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java2
5 files changed, 360 insertions, 5 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
index 0b8bdff5c9..e6282315c6 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
@@ -146,7 +146,7 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa
_conn = new ConnectionEndpoint(container, asSaslServerProvider(_appRegistry.getAuthenticationManager(
getLocalAddress())));
- _conn.setConnectionEventListener(new Connection_1_0(_appRegistry));
+ _conn.setConnectionEventListener(new Connection_1_0(_appRegistry, _conn, _connectionId));
_conn.setFrameOutputHandler(this);
_conn.setRemoteAddress(_network.getRemoteAddress());
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
index 876a8eb275..a48441bf30 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
@@ -166,7 +166,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
_conn = new ConnectionEndpoint(container, asSaslServerProvider(ApplicationRegistry.getInstance()
.getAuthenticationManager(getLocalAddress())));
- _conn.setConnectionEventListener(new Connection_1_0(_appRegistry));
+ _conn.setConnectionEventListener(new Connection_1_0(_appRegistry, _conn, _connectionId));
_conn.setRemoteAddress(getRemoteAddress());
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
index 318a240b27..f429d8ba9f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
@@ -20,21 +20,35 @@
*/
package org.apache.qpid.server.protocol.v1_0;
+import java.text.MessageFormat;
+import java.util.Collection;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
import org.apache.qpid.amqp_1_0.transport.ConnectionEventListener;
import org.apache.qpid.amqp_1_0.transport.SessionEndpoint;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
+
public class Connection_1_0 implements ConnectionEventListener
{
private IApplicationRegistry _appRegistry;
private VirtualHost _vhost;
+ private final ConnectionEndpoint _conn;
+ private final long _connectionId;
+ private final Collection<Session_1_0> _sessions = Collections.synchronizedCollection(new ArrayList<Session_1_0>());
public static interface Task
@@ -48,18 +62,27 @@ public class Connection_1_0 implements ConnectionEventListener
- public Connection_1_0(IApplicationRegistry appRegistry)
+ public Connection_1_0(IApplicationRegistry appRegistry, ConnectionEndpoint conn, long connectionId)
{
_appRegistry = appRegistry;
_vhost = _appRegistry.getVirtualHostRegistry().getDefaultVirtualHost();
+ _conn = conn;
+ _connectionId = connectionId;
+ _vhost.getConnectionRegistry().registerConnection(_model);
+
}
public void remoteSessionCreation(SessionEndpoint endpoint)
{
Session_1_0 session = new Session_1_0(_vhost, _appRegistry, this);
+ _sessions.add(session);
endpoint.setSessionEventListener(session);
}
+ void sessionEnded(Session_1_0 session)
+ {
+ _sessions.remove(session);
+ }
void removeConnectionCloseTask(final Task task)
{
@@ -86,6 +109,8 @@ public class Connection_1_0 implements ConnectionEventListener
{
_closeTasks.clear();
}
+ _vhost.getConnectionRegistry().deregisterConnection(_model);
+
}
@@ -94,5 +119,174 @@ public class Connection_1_0 implements ConnectionEventListener
closeReceived();
}
+ private final AMQConnectionModel _model = new AMQConnectionModel()
+ {
+ private final StatisticsCounter _messageDeliveryStatistics = new StatisticsCounter();
+ private final StatisticsCounter _messageReceiptStatistics = new StatisticsCounter();
+ private final StatisticsCounter _dataDeliveryStatistics = new StatisticsCounter();
+ private final StatisticsCounter _dataReceiptStatistics = new StatisticsCounter();
+
+ private final LogSubject _logSubject = new LogSubject()
+ {
+ @Override
+ public String toLogString()
+ {
+ return "[" +
+ MessageFormat.format(CONNECTION_FORMAT,
+ getConnectionId(),
+ getClientId(),
+ getRemoteAddressString(),
+ _vhost.getName())
+ + "] ";
+
+ }
+ };
+
+ @Override
+ public void close(AMQConstant cause, String message) throws AMQException
+ {
+ // TODO
+ }
+
+ @Override
+ public void block()
+ {
+ // TODO
+ }
+
+ @Override
+ public void unblock()
+ {
+ // TODO
+ }
+
+ @Override
+ public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
+ {
+ // TODO
+ }
+
+ @Override
+ public long getConnectionId()
+ {
+ return _connectionId;
+ }
+
+ @Override
+ public List<AMQSessionModel> getSessionModels()
+ {
+ return new ArrayList<AMQSessionModel>(_sessions);
+ }
+
+ @Override
+ public LogSubject getLogSubject()
+ {
+ return _logSubject;
+ }
+
+ @Override
+ public String getUserName()
+ {
+ return getPrincipalAsString();
+ }
+
+ @Override
+ public boolean isSessionNameUnique(byte[] name)
+ {
+ return true; // TODO
+ }
+
+ @Override
+ public String getRemoteAddressString()
+ {
+ return String.valueOf(_conn.getRemoteAddress());
+ }
+
+ @Override
+ public String getClientId()
+ {
+ return _conn.getRemoteContainerId();
+ }
+
+ @Override
+ public String getClientVersion()
+ {
+ return ""; //TODO
+ }
+
+ @Override
+ public String getPrincipalAsString()
+ {
+ return String.valueOf(_conn.getUser());
+ }
+
+ @Override
+ public long getSessionCountLimit()
+ {
+ return 0; // TODO
+ }
+
+ @Override
+ public long getLastIoTime()
+ {
+ return 0; // TODO
+ }
+
+ @Override
+ public void initialiseStatistics()
+ {
+ // TODO
+ }
+
+ @Override
+ public void registerMessageReceived(long messageSize, long timestamp)
+ {
+ // TODO
+ }
+
+ @Override
+ public void registerMessageDelivered(long messageSize)
+ {
+ // TODO
+ }
+
+ @Override
+ public StatisticsCounter getMessageDeliveryStatistics()
+ {
+ return _messageDeliveryStatistics;
+ }
+
+ @Override
+ public StatisticsCounter getMessageReceiptStatistics()
+ {
+ return _messageReceiptStatistics;
+ }
+
+ @Override
+ public StatisticsCounter getDataDeliveryStatistics()
+ {
+ return _dataDeliveryStatistics;
+ }
+
+ @Override
+ public StatisticsCounter getDataReceiptStatistics()
+ {
+ return _dataReceiptStatistics;
+ }
+
+ @Override
+ public void resetStatistics()
+ {
+ // TODO
+ }
+
+
+ };
+
+ AMQConnectionModel getModel()
+ {
+ return _model;
+ }
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 09c56ec94f..999ffc55e5 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.protocol.v1_0;
+import java.text.MessageFormat;
import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
@@ -35,18 +36,26 @@ import org.apache.qpid.amqp_1_0.type.transport.*;
import org.apache.qpid.amqp_1_0.type.transport.Error;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
+import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.transport.ServerConnection;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.*;
-public class Session_1_0 implements SessionEventListener
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
+
+public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSubject
{
private static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
private IApplicationRegistry _appRegistry;
@@ -56,6 +65,7 @@ public class Session_1_0 implements SessionEventListener
private final LinkedHashMap<Integer, ServerTransaction> _openTransactions =
new LinkedHashMap<Integer, ServerTransaction>();
private final Connection_1_0 _connection;
+ private UUID _id = UUID.randomUUID();
public Session_1_0(VirtualHost vhost, IApplicationRegistry appRegistry, final Connection_1_0 connection)
@@ -405,6 +415,8 @@ public class Session_1_0 implements SessionEventListener
iter.remove();
}
+ _connection.sessionEnded(this);
+
}
Integer binaryToInteger(final Binary txnId)
@@ -443,4 +455,153 @@ public class Session_1_0 implements SessionEventListener
public void forceEnd()
{
}
+
+ @Override
+ public UUID getQMFId()
+ {
+ return _id;
+ }
+
+ @Override
+ public AMQConnectionModel getConnectionModel()
+ {
+ return _connection.getModel();
+ }
+
+ @Override
+ public String getClientID()
+ {
+ // TODO
+ return "";
+ }
+
+ @Override
+ public void close() throws AMQException
+ {
+ // TODO - required for AMQSessionModel / management initiated closing
+ }
+
+ @Override
+ public LogSubject getLogSubject()
+ {
+ return this;
+ }
+
+ @Override
+ public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException
+ {
+ // TODO - required for AMQSessionModel / long running transaction detection
+ }
+
+ @Override
+ public void block(AMQQueue queue)
+ {
+ // TODO - required for AMQSessionModel / producer side flow control
+ }
+
+ @Override
+ public void unblock(AMQQueue queue)
+ {
+ // TODO - required for AMQSessionModel / producer side flow control
+ }
+
+ @Override
+ public void block()
+ {
+ // TODO - required for AMQSessionModel / producer side flow control
+ }
+
+ @Override
+ public void unblock()
+ {
+ // TODO - required for AMQSessionModel / producer side flow control
+ }
+
+ @Override
+ public boolean getBlocking()
+ {
+ // TODO
+ return false;
+ }
+
+ @Override
+ public boolean onSameConnection(InboundMessage inbound)
+ {
+ // TODO
+ return false;
+ }
+
+ @Override
+ public int getUnacknowledgedMessageCount()
+ {
+ // TODO
+ return 0;
+ }
+
+ @Override
+ public Long getTxnCount()
+ {
+ // TODO
+ return 0l;
+ }
+
+ @Override
+ public Long getTxnStart()
+ {
+ // TODO
+ return 0l;
+ }
+
+ @Override
+ public Long getTxnCommits()
+ {
+ // TODO
+ return 0l;
+ }
+
+ @Override
+ public Long getTxnRejects()
+ {
+ // TODO
+ return 0l;
+ }
+
+ @Override
+ public int getChannelId()
+ {
+ // TODO
+ return 0;
+ }
+
+ @Override
+ public int getConsumerCount()
+ {
+ // TODO
+ return 0;
+ }
+
+ @Override
+ public int compareTo(AMQSessionModel o)
+ {
+ return getQMFId().compareTo(o.getQMFId());
+ }
+
+
+
+ public String toLogString()
+ {
+ long connectionId = getConnectionModel().getConnectionId();
+
+ String remoteAddress = getConnectionModel().getRemoteAddressString();
+
+ return "[" +
+ MessageFormat.format(CHANNEL_FORMAT,
+ connectionId,
+ getClientID(),
+ remoteAddress,
+ _vhost.getName(), // TODO - virtual host
+ 0) // TODO - channel)
+ + "] ";
+ }
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
index ad8eafc850..8a3d3716c7 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
@@ -637,7 +637,7 @@ class Subscription_1_0 implements Subscription
public AMQSessionModel getSessionModel()
{
// TODO
- return null;
+ return getSession();
}
@Override