diff options
author | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-10-03 16:00:24 +0000 |
---|---|---|
committer | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-10-03 16:00:24 +0000 |
commit | a5b1a1073e2596da8b5fbcd24769aec87107d212 (patch) | |
tree | 66ae04f95841345d6bbd06a1fb303c27ece65902 | |
parent | 98147863bc1e7816eedf6c957d96390ae35ebc60 (diff) | |
download | qpid-python-a5b1a1073e2596da8b5fbcd24769aec87107d212.tar.gz |
QPID-2835 Implement CON Operational Logging on 0-10
Committed patch from SorinS <ssuciu@gmail.com>
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1003984 13f79535-47bb-0310-9956-ffa450edef68
14 files changed, 129 insertions, 64 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/GenericActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/GenericActor.java index 09db81e7e5..9afc76ce78 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/GenericActor.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/GenericActor.java @@ -72,7 +72,7 @@ public class GenericActor extends AbstractActor { public String toLogString() { - return "[" + subjectMessage + "]"; + return "[" + subjectMessage + "] "; } }, _defaultMessageLogger); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java index 448c8508a5..bcda385f64 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java @@ -35,4 +35,5 @@ public interface AMQConnectionModel */ public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException; + public long getConnectionId(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index c55c07a145..5368dfe532 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -1151,6 +1151,11 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol return _id; } + public long getConnectionId() + { + return getSessionID(); + } + public String getAddress() { return String.valueOf(getRemoteAddress()); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index e894dda341..eb957ee33c 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -20,8 +20,8 @@ */ package org.apache.qpid.server.protocol; -import org.apache.log4j.Logger; +import org.apache.log4j.Logger; import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory.VERSION; import org.apache.qpid.server.registry.IApplicationRegistry; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java index f1e79839c9..1fe4ec792e 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java @@ -22,12 +22,13 @@ package org.apache.qpid.server.protocol; import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.transport.NetworkDriver; -import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.network.InputHandler; import org.apache.qpid.transport.network.Assembler; import org.apache.qpid.transport.network.Disassembler; import org.apache.qpid.server.configuration.*; import org.apache.qpid.server.transport.ServerConnection; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.registry.IApplicationRegistry; import java.net.SocketAddress; @@ -55,6 +56,10 @@ public class ProtocolEngine_0_10 extends InputHandler implements ProtocolEngine _networkDriver = networkDriver; _id = appRegistry.getConfigStore().createId(); _appRegistry = appRegistry; + + // FIXME Two log messages to maintain compatinbility with earlier protocol versions + CurrentActor.get().message(ConnectionMessages.OPEN(null, null, false, false)); + CurrentActor.get().message(ConnectionMessages.OPEN(null, "0-10", false, true)); } public void setNetworkDriver(NetworkDriver driver) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java index 3c924f3231..a1a7bd119b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java @@ -20,38 +20,33 @@ */ package org.apache.qpid.server.transport; -import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT; -import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT; -import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT; -import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.USER_FORMAT; +import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.*; +import java.text.MessageFormat; + +import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.configuration.ConnectionConfig; -import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.GenericActor; +import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.Connection; -import org.apache.qpid.transport.Method; -import org.apache.qpid.transport.ConnectionCloseCode; -import org.apache.qpid.transport.Session; -import org.apache.qpid.transport.SessionDetachCode; -import org.apache.qpid.transport.SessionDetach; -import org.apache.qpid.transport.Binary; -import org.apache.qpid.transport.SessionDetached; -import org.apache.qpid.transport.SessionException; -import org.apache.qpid.transport.ExecutionException; import org.apache.qpid.transport.ExecutionErrorCode; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.AMQException; - -import java.text.MessageFormat; +import org.apache.qpid.transport.ExecutionException; +import org.apache.qpid.transport.Method; -public class ServerConnection extends Connection implements AMQConnectionModel +public class ServerConnection extends Connection implements AMQConnectionModel, LogSubject { private ConnectionConfig _config; private Runnable _onOpenTask; public ServerConnection() { + CurrentActor.set(GenericActor.getInstance(this)); } @Override @@ -64,9 +59,19 @@ public class ServerConnection extends Connection implements AMQConnectionModel protected void setState(State state) { super.setState(state); - if(state == State.OPEN && _onOpenTask != null) + + if (state == State.OPEN) + { + if (_onOpenTask != null) + { + _onOpenTask.run(); + } + CurrentActor.get().message(ConnectionMessages.OPEN(getClientId(), "0-10", true, true)); + } + + if (state == State.CLOSED) { - _onOpenTask.run(); + CurrentActor.get().message(this, ConnectionMessages.CLOSE()); } } @@ -137,8 +142,8 @@ public class ServerConnection extends Connection implements AMQConnectionModel return " [" + MessageFormat.format(CONNECTION_FORMAT, getConnectionId(), - getAuthorizationID(), - _config.getAddress(), + getClientId(), + getConfig().getAddress(), getVirtualHost().getName()) + "] "; } @@ -147,8 +152,8 @@ public class ServerConnection extends Connection implements AMQConnectionModel return " [" + MessageFormat.format(USER_FORMAT, getConnectionId(), - getAuthorizationID(), - _config.getAddress()) + getClientId(), + getConfig().getAddress()) + "] "; } @@ -156,8 +161,8 @@ public class ServerConnection extends Connection implements AMQConnectionModel { return " [" + MessageFormat.format(SOCKET_FORMAT, - this.getConnectionId(), - _config.getAddress()) + getConnectionId(), + getConfig().getAddress()) + "] "; } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java index a95f4e5c42..4a304b3e66 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java @@ -21,7 +21,9 @@ package org.apache.qpid.server.transport; import org.apache.qpid.transport.*; - +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.GenericActor; +import org.apache.qpid.common.ClientProperties; import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.registry.IApplicationRegistry; @@ -71,7 +73,7 @@ public class ServerConnectionDelegate extends ServerDelegate SessionDelegate serverSessionDelegate = new ServerSessionDelegate(_appRegistry); ServerSession ssn = new ServerSession(conn, serverSessionDelegate, new Binary(atc.getName()), 0); - //ssn.setSessionListener(new Echo()); + return ssn; } @@ -112,6 +114,7 @@ public class ServerConnectionDelegate extends ServerDelegate else { sconn.invoke(new ConnectionOpenOk(Collections.emptyList())); + CurrentActor.set(GenericActor.getInstance(sconn)); sconn.setState(Connection.State.OPEN); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index 1f4e32a3e0..71add9c097 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -578,9 +578,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo public String getClientID() { - //fixme this will only work for 0-10 connections - // In 0-8 there is an explicit ClientID property that is != Principal. - return getPrincipal().getName(); + return getConnection().getClientId(); } public LogSubject getLogSubject() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 2ee0a86e7c..adfd178ec3 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -23,7 +23,9 @@ package org.apache.qpid.client; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import javax.jms.ExceptionListener; import javax.jms.JMSException; @@ -159,7 +161,6 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec ConnectionSettings conSettings = new ConnectionSettings(); retriveConnectionSettings(conSettings,brokerDetail); - _qpidConnection.connect(conSettings); _conn._connected = true; @@ -371,6 +372,17 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec conSettings.setVerifyHostname(brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_SSL_VERIFY_HOSTNAME)); + // Pass client name from connection URL + Map<String, Object> clientProps = new HashMap<String, Object>(); + try + { + clientProps.put("clientName", _conn.getClientID()); + conSettings.setClientProperties(clientProps); + } + catch (JMSException e) + { + // Ignore + } if (brokerDetail.getProperty(BrokerDetails.OPTIONS_TCP_NO_DELAY) != null) { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 3c56aa22bd..174dc54a72 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -32,6 +32,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; import javax.security.sasl.SaslClient; import javax.security.sasl.SaslServer; @@ -111,9 +112,10 @@ public class Connection extends ConnectionInvoker private String userID; private ConnectionSettings conSettings; private SecurityLayer securityLayer; + private String _clientId; - // want to make this final - private int _connectionId; + private static final AtomicLong idGenerator = new AtomicLong(0); + private final long _connectionId = idGenerator.incrementAndGet(); public Connection() {} @@ -147,6 +149,16 @@ public class Connection extends ConnectionInvoker } } + public String getClientId() + { + return _clientId; + } + + public void setClientId(String id) + { + _clientId = id; + } + void setLocale(String locale) { this.locale = locale; @@ -321,12 +333,7 @@ public class Connection extends ConnectionInvoker _sessionFactory = sessionFactory; } - public void setConnectionId(int id) - { - _connectionId = id; - } - - public int getConnectionId() + public long getConnectionId() { return _connectionId; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java index 3bd5c7fe0d..644a2daa58 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java @@ -69,6 +69,9 @@ public class ServerDelegate extends ConnectionDelegate conn.setLocale(ok.getLocale()); String mechanism = ok.getMechanism(); + String clientName = (String) ok.getClientProperties().get("clientName"); + conn.setClientId(clientName); + if (mechanism == null || mechanism.length() == 0) { conn.connectionTune diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java index 3bc6730623..84e66c25bd 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java @@ -20,8 +20,6 @@ package org.apache.qpid.transport.network.nio; * */ - -import java.io.EOFException; import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; @@ -30,7 +28,6 @@ import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.ConnectionDelegate; @@ -44,8 +41,7 @@ public class NioHandler implements Runnable private Receiver<ByteBuffer> _receiver; private SocketChannel _ch; private ByteBuffer _readBuf; - private static Map<Integer,NioSender> _handlers = new ConcurrentHashMap<Integer,NioSender>(); - private AtomicInteger _count = new AtomicInteger(); + private static Map<Long,NioSender> _handlers = new ConcurrentHashMap<Long,NioSender>(); private NioHandler(){} @@ -91,7 +87,6 @@ public class NioHandler implements Runnable con.setSender(new Disassembler(sender, 64*1024 - 1)); con.setConnectionDelegate(delegate); - con.setConnectionId(_count.incrementAndGet()); _handlers.put(con.getConnectionId(),sender); _receiver = new InputHandler(new Assembler(con), InputHandler.State.FRAME_HDR); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ConnectionLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ConnectionLoggingTest.java index e6b29c392d..d28429aa39 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ConnectionLoggingTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ConnectionLoggingTest.java @@ -53,10 +53,18 @@ public class ConnectionLoggingTest extends AbstractTestLogging assertLoggingNotYetOccured(CONNECTION_PREFIX); Connection connection = getConnection(); + String clientid = connection.getClientID(); - List<String> results = waitAndFindMatches(CONNECTION_PREFIX); + // Wait until opened + waitForMessage("CON-1001"); + + // Close the conneciton + connection.close(); - assertTrue("No CON messages logged", results.size() > 0); + // Wait to ensure that the desired message is logged + waitForMessage("CON-1002"); + + List<String> results = waitAndFindMatches("CON-1001"); // Validation // We should have at least three messages when running InVM but when running External @@ -79,9 +87,9 @@ public class ConnectionLoggingTest extends AbstractTestLogging //Use just the data from the last connection for the test results = connectionData.get(connectionID); - // If we are running inVM we will get three open messagse, if running externally weN will also have - // open and close messages from the failed 0-10 negotiation - assertTrue("CON messages not logged:" + results.size(), results.size() >= 3); + // If we are running inVM or with 0-10 we will get three open messagse + // if running externally with 0-8/0-9 we will also have open and close messages from the failed 0-10 negotiation + assertTrue("CON messages not logged:" + results.size(), results.size() >= 3); String log = getLogMessage(results, 0); // MESSAGE [con:1(/127.0.0.1:52540)] CON-1001 : Open @@ -94,14 +102,12 @@ public class ConnectionLoggingTest extends AbstractTestLogging // 3 - Assert the options are correct // MESSAGE [con:1(/127.0.0.1:52540)] CON-1001 : Open : Client ID : clientid : Protocol Version : 0-9 - validateConnectionOpen(results, 0, true, true, connection.getClientID()); + validateConnectionOpen(results, 0, true, true, clientid); // MESSAGE [con:1(/127.0.0.1:52540)] CON-1001 : Open : Protocol Version : 0-9 validateConnectionOpen(results, 1, true, false, null); validateConnectionOpen(results, 2, false, false, null); - - connection.close(); } private void validateConnectionOpen(List<String> results, int positionFromEnd, @@ -115,7 +121,7 @@ public class ConnectionLoggingTest extends AbstractTestLogging if(clientIdOptionPresent && clientIdValue != null) { - assertTrue("Client ID value is not present", fromMessage(log).contains(clientIdValue)); + assertTrue("Client ID value is not present: " + clientIdValue, fromMessage(log).contains(clientIdValue)); } assertEquals("unexpected Protocol Version option state", @@ -144,8 +150,13 @@ public class ConnectionLoggingTest extends AbstractTestLogging { assertLoggingNotYetOccured(CONNECTION_PREFIX); - // Open and then close the conneciton - getConnection().close(); + Connection connection = getConnection(); + + // Wait until opened + waitForMessage("CON-1001"); + + // Close the conneciton + connection.close(); // Wait to ensure that the desired message is logged waitForMessage("CON-1002"); @@ -163,12 +174,19 @@ public class ConnectionLoggingTest extends AbstractTestLogging assertTrue("Message does not end with close:" + log, log.endsWith("Close")); // Extract connection ID to validate there is a CON-1001 messasge for it - int connectionID = getConnectionID(log); + int closeConnectionID = getConnectionID(fromSubject(log)); + assertTrue("Could not find connection id in CLOSE", closeConnectionID != -1); //Previous log message should be the open log = getLogMessageFromEnd(results, 1); // MESSAGE [con:1(/127.0.0.1:52540)] CON-1001 : Open : Client ID : clientid : Protocol Version : 0-9 validateMessageID("CON-1001",log); - assertEquals("Connection IDs do not match", connectionID, getConnectionID(fromActor(log))); + + // Extract connection ID to validate it matches the CON-1002 messasge + int openConnectionID = getConnectionID(fromActor(log)); + assertTrue("Could not find connection id in OPEN", openConnectionID != -1); + + // Check connection ids match + assertEquals("Connection IDs do not match", closeConnectionID, openConnectionID); } } diff --git a/qpid/java/test-profiles/Java010Excludes b/qpid/java/test-profiles/Java010Excludes index 6da362f33a..44b9cff9d8 100755 --- a/qpid/java/test-profiles/Java010Excludes +++ b/qpid/java/test-profiles/Java010Excludes @@ -22,7 +22,20 @@ org.apache.qpid.server.exchange.ReturnUnroutableMandatoryMessageTest#* org.apache.qpid.test.client.QueueBrowsingFlowToDiskTest#* // 0-10 Broker does not follow the same Logging convention as the Java broker -org.apache.qpid.server.logging.* +org.apache.qpid.server.logging.AccessControlLoggingTest#* +org.apache.qpid.server.logging.AlertingTest#* +org.apache.qpid.server.logging.BindingLoggingTest#* +org.apache.qpid.server.logging.BrokerLoggingTest#* +org.apache.qpid.server.logging.ChannelLoggingTest#* +org.apache.qpid.server.logging.DerbyMessageStoreLoggingTest#* +org.apache.qpid.server.logging.DurableQueueLoggingTest#* +org.apache.qpid.server.logging.ExchangeLoggingTest#* +org.apache.qpid.server.logging.ManagementLoggingTest#* +org.apache.qpid.server.logging.MemoryMessageStoreLoggingTest#* +org.apache.qpid.server.logging.QueueLoggingTest#* +org.apache.qpid.server.logging.SubscriptionLoggingTest#* +org.apache.qpid.server.logging.TransientQueueLoggingTest#* +org.apache.qpid.server.logging.VirtualHostLoggingTest#* org.apache.qpid.server.logging.messages.* org.apache.qpid.server.logging.subjects.* org.apache.qpid.server.logging.actors.* |