summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2009-07-22 09:52:02 +0000
committerMartin Ritchie <ritchiem@apache.org>2009-07-22 09:52:02 +0000
commitba01534206bc194dab376f25fcc3fa3687d0dc2c (patch)
tree6f7f204aa120473340f4fbc2c10889e463d475b9
parent33ee5b9247bd4d1e6b7eb88869286ed77c2baf17 (diff)
downloadqpid-python-ba01534206bc194dab376f25fcc3fa3687d0dc2c.tar.gz
QPID-1992 : Addition of new Broker Logging Framework
Provided static CurrentActor for accessing ThreadLocal. Included Test to validate setting of ThreadLocals. Added Test for AMQPActor Added getRootMessageLogger() to IApplicationRegistry Adjusted *ProtocolSessions to start counting at 0. Allowed Setting of Vhost on the MockProtocolSession Created a fixed Principle in MockProtocolSession Changes to MockProtocolSession, prevent NPEs when the AMQPActor creates its log string. Converted CurrentActor to use a Stack allowing a variety of actors to take their turn on a thread. Improved package structure Added testing for Actors Moved FileMonitorTools functionality to FileUtils and provided a Test Converted Log4jMessageLoggerTest to a proper UnitTest Moved Test cases to test package Updated other broker tests to set the authenticated user before setting the virtualhost, Whilst the logging could output null as the username it would be better if the tests correctly set the authorizedID. Update to include tests for disabled logging Fully tested LogSubjects Updated MockAMQQueue to be able to take a Virtualhost as per a normal Queue. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@796650 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java17
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/BrokerMessages.java34
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogActor.java43
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogMessage.java26
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogSubject.java37
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RawMessageLogger.java44
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLogger.java56
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLoggerImpl.java52
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java79
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java115
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java45
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java54
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java57
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLogger.java55
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/AbstractLogSubject.java64
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java62
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java54
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java48
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubject.java46
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java45
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubject.java49
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java54
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java10
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java11
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java21
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java14
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/RootMessageLoggerImplTest.java86
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/StatusUpdateConfigurationTest.java73
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java206
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java202
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java262
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/ManagementActorTest.java125
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/TestBlankActor.java33
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLoggerTest.java288
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/rawloggers/UnitTestMessageLogger.java60
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/rawloggers/UnitTestMessageLoggerTest.java102
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java258
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java68
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java79
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java69
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubjectTest.java57
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/QueueLogSubjectTest.java60
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubjectTest.java114
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java31
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java1
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java12
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java8
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java28
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java9
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java20
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java45
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java182
-rw-r--r--qpid/java/common/src/test/java/org/apache/qpid/util/FileUtilsTest.java80
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java5
61 files changed, 3646 insertions, 141 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
index e0d325a5b0..fc16b75e1a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
@@ -94,6 +94,7 @@ public class ServerConfiguration implements SignalHandler
envVarMap.put("QPID_SOCKETWRITEBUFFER", "connector.socketWriteBuffer");
envVarMap.put("QPID_TCPNODELAY", "connector.tcpNoDelay");
envVarMap.put("QPID_ENABLEPOOLEDALLOCATOR", "advanced.enablePooledAllocator");
+ envVarMap.put("QPID_STATUS-UPDATES", "status-updates");
}
public ServerConfiguration(File configurationURL) throws ConfigurationException
@@ -186,7 +187,12 @@ public class ServerConfiguration implements SignalHandler
}
return conf;
}
-
+
+ public boolean getStatusEnabled()
+ {
+ return getConfig().getBoolean("status-updates", true);
+ }
+
// Our configuration class needs to make the interpolate method
// public so it can be called below from the config method.
private static class MyConfiguration extends CompositeConfiguration
@@ -541,4 +547,13 @@ public class ServerConfiguration implements SignalHandler
getConfig().getLong("housekeeping.expiredMessageCheckPeriod",
DEFAULT_HOUSEKEEPING_PERIOD));
}
+
+ public boolean getStatusUpdates()
+ {
+ // Retrieve the setting from configuration but default to on.
+ String value = getConfig().getString("status-updates", "on");
+
+ return value.equalsIgnoreCase("on");
+ }
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
index 054674aed4..5d7adc6371 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
@@ -54,8 +54,11 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
- final AMQChannel channel = new AMQChannel(session,channelId, virtualHost.getMessageStore()
- );
+ final AMQChannel channel = new AMQChannel(session,channelId,
+ virtualHost.getMessageStore());
+
+
+
session.addChannel(channel);
ChannelOpenOkBody response;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/BrokerMessages.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/BrokerMessages.java
new file mode 100644
index 0000000000..e9cc7449cd
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/BrokerMessages.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging;
+
+public class BrokerMessages
+{
+
+ public static LogMessage BRK_1001(String version, String build)
+ {
+ return new LogMessage()
+ {
+
+ };
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogActor.java
new file mode 100644
index 0000000000..203a5d160d
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogActor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging;
+
+/**
+ * LogActor the entity that is stored as in a ThreadLocal and used to perform logging.
+ *
+ * The actor is responsible for formatting its display name for the log entry.
+ *
+ * The actor performs the requested logging.
+ */
+public interface LogActor
+{
+ /**
+ * Logs the specified LogMessage about the LogSubject
+ *
+ * Currently logging has a global setting however this will later be revised and
+ * as such the LogActor will need to take into consideration any new configuration
+ * as a means of enabling the logging of LogActors and LogSubjects.
+ *
+ * @param subject The subject that is being logged
+ * @param message The message to log
+ */
+ public void message(LogSubject subject, LogMessage message);
+} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogMessage.java
new file mode 100644
index 0000000000..5c112ff100
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogMessage.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging;
+
+public interface LogMessage
+{
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogSubject.java
new file mode 100644
index 0000000000..e53ef364bf
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogSubject.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging;
+
+/**
+ * Each LogSubject that wishes to be logged will implement this to provide their
+ * own display representation.
+ *
+ * The display representation is retrieved through the toString() method.
+ */
+public interface LogSubject
+{
+ /**
+ * Logs the message as provided by String.valueOf(message).
+ *
+ * @returns String the display representation of this LogSubject
+ */
+ public String toString();
+} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RawMessageLogger.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RawMessageLogger.java
new file mode 100644
index 0000000000..7d515f3263
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RawMessageLogger.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging;
+
+/**
+ * A RawMessage Logger takes the given String and any Throwable and writes the
+ * data to its resource.
+ */
+public interface RawMessageLogger
+{
+
+ /**
+ * Log the given message.
+ *
+ * @param message String to log.
+ */
+ public void rawMessage(String message);
+
+ /**
+ * Log the message and formatted stack trace for any Throwable.
+ *
+ * @param message String to log.
+ * @param throwable Throwable for which to provide stack trace.
+ */
+ public void rawMessage(String message, Throwable throwable);
+} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLogger.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLogger.java
new file mode 100644
index 0000000000..cd7992faa7
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLogger.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging;
+
+/**
+ * The RootMessageLogger is used by the LogActors to query if
+ * logging is enabled for the requested message and to provide the actual
+ * message that should be logged.
+ */
+public interface RootMessageLogger
+{
+ /**
+ * Determine if the LogSubject and the LogActor should be
+ * generating log messages.
+ *
+ * @param subject The subject of this log request
+ * @param actor The actor requesting the logging
+ * @return boolean true if the message should be logged.
+ */
+ boolean isMessageEnabled(LogActor actor, LogSubject subject);
+
+
+ /**
+ * Log the raw message to the configured logger.
+ *
+ * @param message The message to log
+ */
+ public void rawMessage(String message);
+
+ /**
+ * Log the raw message to the configured logger.
+ * Along with a formated stack trace from the Throwable.
+ *
+ * @param message The message to log
+ * @param throwable Optional Throwable that should provide stact trace
+ */
+ void rawMessage(String message, Throwable throwable);
+} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLoggerImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLoggerImpl.java
new file mode 100644
index 0000000000..9270c316b6
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLoggerImpl.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging;
+
+import org.apache.qpid.server.configuration.ServerConfiguration;
+
+public class RootMessageLoggerImpl implements RootMessageLogger
+{
+ private boolean _enabled;
+
+ RawMessageLogger _rawLogger;
+ private static final String MESSAGE = "MESSAGE ";
+
+ public RootMessageLoggerImpl(ServerConfiguration configuration, RawMessageLogger rawLogger)
+ {
+ _enabled = configuration.getStatusUpdates();
+ _rawLogger = rawLogger;
+ }
+
+ public boolean isMessageEnabled(LogActor actor, LogSubject subject)
+ {
+ return _enabled;
+ }
+
+ public void rawMessage(String message)
+ {
+ _rawLogger.rawMessage(MESSAGE + message);
+ }
+
+ public void rawMessage(String message, Throwable throwable)
+ {
+ _rawLogger.rawMessage(MESSAGE + message, throwable);
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java
new file mode 100644
index 0000000000..3170040a77
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging.actors;
+
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.text.MessageFormat;
+
+/**
+ * An AMQPChannelActor represtents a connection through the AMQP port with an
+ * associated Channel.
+ *
+ * <p/>
+ * This is responsible for correctly formatting the LogActor String in the log
+ * <p/>
+ * [con:1(user@127.0.0.1/)/ch:1]
+ * <p/>
+ * To do this it requires access to the IO Layers as well as a Channel
+ */
+public class AMQPChannelActor extends AbstractActor
+{
+
+ /**
+ * Create a new ChannelActor
+ *
+ * @param channel The Channel for this LogActor
+ * @param rootLogger The root Logger that this LogActor should use
+ */
+ public AMQPChannelActor(AMQChannel channel, RootMessageLogger rootLogger)
+ {
+ super(rootLogger);
+
+ AMQProtocolSession session = channel.getProtocolSession();
+
+ /**
+ * LOG FORMAT used by the AMQPConnectorActor follows
+ * ChannelLogSubject.CHANNEL_FORMAT :
+ * con:{0}({1}@{2}/{3})/ch:{4}
+ *
+ * Uses a MessageFormat call to insert the requried values according to
+ * these indicies:
+ *
+ * 0 - Connection ID
+ * 1 - User ID
+ * 2 - IP
+ * 3 - Virtualhost
+ */
+ _logString = "[" + MessageFormat.format(ChannelLogSubject.CHANNEL_FORMAT,
+ session.getSessionID(),
+ session.getAuthorizedID().getName(),
+ session.getRemoteAddress(),
+ session.getVirtualHost().getName(),
+ channel.getChannelId())
+ + "] ";
+ }
+}
+
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java
new file mode 100644
index 0000000000..432b1d8203
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging.actors;
+
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+
+import java.text.MessageFormat;
+
+/**
+ * An AMQPConnectionActor represtents a connectionthrough the AMQP port.
+ * <p/>
+ * This is responsible for correctly formatting the LogActor String in the log
+ * <p/>
+ * [ con:1(user@127.0.0.1/) ]
+ * <p/>
+ * To do this it requires access to the IO Layers.
+ */
+public class AMQPConnectionActor extends AbstractActor
+{
+ /**
+ * 0 - Connection ID
+ * 1 - Remote Address
+ */
+ public static String SOCKET_FORMAT = "con:{0}({1})";
+
+ /**
+ * LOG FORMAT for the ConnectionLogSubject,
+ * Uses a MessageFormat call to insert the requried values according to
+ * these indicies:
+ *
+ * 0 - Connection ID
+ * 1 - User ID
+ * 2 - IP
+ */
+ public static final String USER_FORMAT = "con:{0}({1}@{2})";
+
+ public AMQPConnectionActor(AMQProtocolSession session, RootMessageLogger rootLogger)
+ {
+ super(rootLogger);
+
+ _logString = "[" + MessageFormat.format(SOCKET_FORMAT,
+ session.getSessionID(),
+ session.getRemoteAddress())
+
+ + "] ";
+ }
+
+ /**
+ * Call when the connection has been authorized so that the logString
+ * can be updated with the new user identity.
+ *
+ * @param session the authorized session
+ */
+ public void connectionAuthorized(AMQProtocolSession session)
+ {
+ _logString = "[" + MessageFormat.format(USER_FORMAT,
+ session.getSessionID(),
+ session.getAuthorizedID().getName(),
+ session.getRemoteAddress())
+ + "] ";
+
+ }
+
+ /**
+ * Called once the user has been authenticated and they are now selecting
+ * the virtual host they wish to use.
+ *
+ * @param session the session that now has a virtualhost associated with it.
+ */
+ public void virtualHostSelected(AMQProtocolSession session)
+ {
+
+ /**
+ * LOG FORMAT used by the AMQPConnectorActor follows
+ * ConnectionLogSubject.CONNECTION_FORMAT :
+ * con:{0}({1}@{2}/{3})
+ *
+ * Uses a MessageFormat call to insert the requried values according to
+ * these indicies:
+ *
+ * 0 - Connection ID
+ * 1 - User ID
+ * 2 - IP
+ * 3 - Virtualhost
+ */
+ _logString = "[" + MessageFormat.format(ConnectionLogSubject.CONNECTION_FORMAT,
+ session.getSessionID(),
+ session.getAuthorizedID().getName(),
+ session.getRemoteAddress(),
+ session.getVirtualHost().getName())
+ + "] ";
+
+ }
+}
+
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java
new file mode 100644
index 0000000000..95f2dc9ff6
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.actors;
+
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogMessage;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.RootMessageLogger;
+
+public abstract class AbstractActor implements LogActor
+{
+ protected String _logString;
+ protected RootMessageLogger _rootLogger;
+
+ public AbstractActor(RootMessageLogger rootLogger)
+ {
+ _rootLogger = rootLogger;
+ }
+
+ public void message(LogSubject subject, LogMessage message)
+ {
+ if (_rootLogger.isMessageEnabled(this, subject))
+ {
+ _rootLogger.rawMessage(_logString + String.valueOf(subject) + message);
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java
new file mode 100644
index 0000000000..221e57eebb
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.actors;
+
+import org.apache.qpid.server.logging.LogActor;
+
+import java.util.LinkedList;
+import java.util.Deque;
+
+public class CurrentActor
+{
+ private static final ThreadLocal<Deque<LogActor>> _currentActor = new ThreadLocal<Deque<LogActor>>()
+ {
+ protected Deque<LogActor> initialValue()
+ {
+ return new LinkedList<LogActor>();
+ }
+ };
+
+ public static void set(LogActor actor)
+ {
+ Deque<LogActor> stack = _currentActor.get();
+ stack.addFirst(actor);
+ }
+
+ public static void remove()
+ {
+ Deque<LogActor> stack = _currentActor.get();
+ stack.remove();
+ }
+
+ public static LogActor get()
+ {
+ return _currentActor.get().peek();
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java
new file mode 100644
index 0000000000..58d55a13bb
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.actors;
+
+import org.apache.qpid.server.logging.RootMessageLogger;
+
+import java.text.MessageFormat;
+import java.security.Principal;
+
+public class ManagementActor extends AbstractActor
+{
+
+ /**
+ * LOG FORMAT for the ManagementActor,
+ * Uses a MessageFormat call to insert the requried values according to
+ * these indicies:
+ *
+ * 0 - Connection ID
+ * 1 - User ID
+ * 2 - IP
+ */
+ public static final String MANAGEMENT_FORMAT = "mng:{0}({1}@{2})";
+
+ /**
+ * //todo Correct interface to provide connection details
+ * @param user
+ * @param rootLogger The RootLogger to use for this Actor
+ */
+ public ManagementActor(Principal user, RootMessageLogger rootLogger)
+ {
+ super(rootLogger);
+
+ _logString = "["+ MessageFormat.format(MANAGEMENT_FORMAT,
+ "<MNG:ConnectionID>",
+ user.getName(),
+ "<MNG:RemoteAddress>")
+ + "] ";
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLogger.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLogger.java
new file mode 100644
index 0000000000..3774155626
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLogger.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging.rawloggers;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.logging.RawMessageLogger;
+
+public class Log4jMessageLogger implements RawMessageLogger
+{
+ public static final String DEFAULT_LEVEL = "INFO";
+ public static final String DEFAULT_LOGGER = "qpid.message";
+ private Level _level;
+ private Logger _rawMessageLogger;
+
+ public Log4jMessageLogger()
+ {
+ this(DEFAULT_LEVEL, DEFAULT_LOGGER);
+ }
+
+ public Log4jMessageLogger(String level, String logger)
+ {
+ _level = Level.toLevel(level);
+
+ _rawMessageLogger = Logger.getLogger(logger);
+ }
+
+ public void rawMessage(String message)
+ {
+ rawMessage(message, null);
+ }
+
+ public void rawMessage(String message, Throwable throwable)
+ {
+ _rawMessageLogger.log(_level, message, throwable);
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/AbstractLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/AbstractLogSubject.java
new file mode 100644
index 0000000000..4fb5bdcc93
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/AbstractLogSubject.java
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import org.apache.qpid.server.logging.LogSubject;
+
+import java.text.MessageFormat;
+
+/**
+ * The LogSubjects all have a similar requriement to format their output and
+ * provide the String value.
+ *
+ * This Abstract LogSubject provides this basic functionality, allowing the
+ * actual LogSubjects to provide their formating and data.
+ */
+public abstract class AbstractLogSubject implements LogSubject
+{
+ /**
+ * The logString that will be returned via toString
+ */
+ protected String logString;
+
+ /**
+ * Set the toString logging of this LogSubject. Based on a format provided
+ * by format and the var args.
+ * @param format The Message to format
+ * @param args The values to put in to the message.
+ */
+ protected void setLogStringWithFormat(String format, Object... args)
+ {
+ logString = "[" + MessageFormat.format(format, args) + "] ";
+ }
+
+ /**
+ * ToString is how the Logging infrastructure will get the text for this
+ * LogSubject
+ *
+ * @return String representing this LogSubject
+ */
+ @Override
+ public String toString()
+ {
+ return logString;
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java
new file mode 100644
index 0000000000..fd171fea5a
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.queue.AMQQueue;
+
+public class BindingLogSubject extends AbstractLogSubject
+{
+
+ /**
+ * LOG FORMAT for the ChannelLogSubject,
+ * Uses a MessageFormat call to insert the requried values according to
+ * these indicies:
+ *
+ * 0 - Virtualhost Name
+ * 1 - Exchange Type
+ * 2 - Exchange Name
+ * 3 - Queue Name
+ * 4 - Binding RoutingKey
+ */
+ protected static String BINDING_FORMAT = "vh(/{0})/ex({1}/{2})/qu({3})/rk({4})";
+
+ /**
+ * Create a BindingLogSubject that Logs in the following format.
+ *
+ * [ vh(/)/ex(amq.direct)/qu(testQueue)/bd(testQueue) ]
+ *
+ * @param routingKey
+ * @param exchange
+ * @param queue
+ */
+ public BindingLogSubject(AMQShortString routingKey, Exchange exchange,
+ AMQQueue queue)
+ {
+ setLogStringWithFormat(BINDING_FORMAT, queue.getVirtualHost().getName(),
+ exchange.getType(),
+ exchange.getName(),
+ queue.getName(),
+ routingKey);
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
new file mode 100644
index 0000000000..1b22de6d01
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+
+public class ChannelLogSubject extends AbstractLogSubject
+{
+ /**
+ * LOG FORMAT for the ChannelLogSubject,
+ * Uses a MessageFormat call to insert the requried values according to
+ * these indicies:
+ *
+ * 0 - Connection ID
+ * 1 - User ID
+ * 2 - IP
+ * 3 - Virtualhost
+ * 4 - Channel ID
+ */
+ public static String CHANNEL_FORMAT = ConnectionLogSubject.CONNECTION_FORMAT
+ + "/ch:{4}";
+
+ public ChannelLogSubject(AMQChannel channel)
+ {
+ AMQProtocolSession session = channel.getProtocolSession();
+
+ // Provide the value for the 4th replacement.
+ setLogStringWithFormat(CHANNEL_FORMAT,
+ session.getSessionID(),
+ session.getAuthorizedID().getName(),
+ session.getRemoteAddress(),
+ session.getVirtualHost().getName(),
+ channel.getChannelId());
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java
new file mode 100644
index 0000000000..e07dbcda23
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+
+/** The Connection LogSubject */
+public class ConnectionLogSubject extends AbstractLogSubject
+{
+
+ /**
+ * LOG FORMAT for the ConnectionLogSubject,
+ * Uses a MessageFormat call to insert the requried values according to
+ * these indicies:
+ *
+ * 0 - Connection ID
+ * 1 - User ID
+ * 2 - IP
+ * 3 - Virtualhost
+ */
+ public static final String CONNECTION_FORMAT = "con:{0}({1}@{2}/{3})";
+
+ public ConnectionLogSubject(AMQProtocolSession session)
+ {
+ setLogStringWithFormat(CONNECTION_FORMAT, session.getSessionID(),
+ session.getAuthorizedID().getName(),
+ session.getRemoteAddress(),
+ session.getVirtualHost().getName());
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubject.java
new file mode 100644
index 0000000000..21e5f5e4ce
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubject.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class ExchangeLogSubject extends AbstractLogSubject
+{
+
+ /**
+ * LOG FORMAT for the ExchangeLogSubject,
+ * Uses a MessageFormat call to insert the requried values according to
+ * these indicies:
+ *
+ * 0 - Virtualhost Name
+ * 1 - Exchange Type
+ * 2 - Exchange Name
+ */
+ protected static String BINDING_FORMAT = "vh(/{0})/ex({1}/{2})";
+
+ /** Create an ExchangeLogSubject that Logs in the following format. */
+ public ExchangeLogSubject(Exchange exchange, VirtualHost vhost)
+ {
+ setLogStringWithFormat(BINDING_FORMAT, vhost.getName(),
+ exchange.getType(), exchange.getName());
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java
new file mode 100644
index 0000000000..89f31ef477
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import org.apache.qpid.server.queue.AMQQueue;
+
+public class QueueLogSubject extends AbstractLogSubject
+{
+
+ /**
+ * LOG FORMAT for the ExchangeLogSubject,
+ * Uses a MessageFormat call to insert the requried values according to
+ * these indicies:
+ *
+ * 0 - Virtualhost name
+ * 1 - queue name
+ */
+ protected static String BINDING_FORMAT = "vh(/{0})/qu({1})";
+
+ /** Create an QueueLogSubject that Logs in the following format. */
+ public QueueLogSubject(AMQQueue queue)
+ {
+ setLogStringWithFormat(BINDING_FORMAT,
+ queue.getVirtualHost().getName(),
+ queue.getName());
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubject.java
new file mode 100644
index 0000000000..b68ef2e9a9
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubject.java
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import org.apache.qpid.server.subscription.Subscription;
+
+public class SubscriptionLogSubject extends AbstractLogSubject
+{
+
+ /**
+ * LOG FORMAT for the SubscriptionLogSubject,
+ * Uses a MessageFormat call to insert the requried values according to
+ * these indicies:
+ *
+ * 0 - Subscription ID
+ * 1 - queue name
+ */
+ protected static String BINDING_FORMAT = "sub:{0}(qu({1}))";
+
+ /**
+ * Create an QueueLogSubject that Logs in the following format.
+ *
+ * @param subscription
+ */
+ public SubscriptionLogSubject(Subscription subscription)
+ {
+
+ setLogStringWithFormat(BINDING_FORMAT, subscription.getSubscriptionID(),
+ subscription.getQueue().getName());
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 205ca73f13..e46a52f3bf 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -21,26 +21,39 @@
package org.apache.qpid.server.protocol;
import org.apache.log4j.Logger;
-
+import org.apache.mina.common.CloseFuture;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.common.IoSession;
-import org.apache.mina.common.CloseFuture;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
-
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.codec.AMQDecoder;
import org.apache.qpid.common.ClientProperties;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.MethodDispatcher;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.handler.ServerMethodDispatcherImpl;
+import org.apache.qpid.server.logging.actors.AMQPConnectionActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.output.ProtocolOutputConverter;
@@ -54,7 +67,6 @@ import org.apache.qpid.transport.Sender;
import javax.management.JMException;
import javax.security.sasl.SaslServer;
-
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.security.Principal;
@@ -64,6 +76,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicLong;
public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
{
@@ -71,6 +84,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
+ private static final AtomicLong idGenerator = new AtomicLong(0);
+
// to save boxing the channelId and looking up in a map... cache in an array the low numbered
// channels. This value must be of the form 2^x - 1.
private static final int CHANNEL_CACHE_SIZE = 0xff;
@@ -120,6 +135,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
private static final long LAST_WRITE_FUTURE_JOIN_TIMEOUT = 60000L;
private org.apache.mina.common.WriteFuture _lastWriteFuture;
+ // Create a simple ID that increments for ever new Session
+ private final long _sessionID = idGenerator.getAndIncrement();
+
+ private AMQPConnectionActor _actor;
+
public ManagedObject getManagedObject()
{
return _managedObject;
@@ -134,6 +154,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
_codecFactory = codecFactory;
+ _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger());
+
try
{
IoServiceConfig config = session.getServiceConfig();
@@ -158,6 +180,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
_codecFactory = codecFactory;
+ _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger());
}
private AMQProtocolSessionMBean createMBean() throws AMQException
@@ -183,6 +206,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
return (AMQProtocolSession) minaProtocolSession.getAttachment();
}
+ public long getSessionID()
+ {
+ return _sessionID;
+ }
+
public void dataBlockReceived(AMQDataBlock message) throws Exception
{
_lastReceived = message;
@@ -235,6 +263,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
}
}
+ CurrentActor.set(_actor);
try
{
body.handle(channelId, this);
@@ -244,7 +273,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
closeChannel(channelId);
throw e;
}
-
+ finally
+ {
+ CurrentActor.remove();
+ }
}
private void protocolInitiationReceived(ProtocolInitiation pi)
@@ -796,6 +828,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
{
_virtualHost = virtualHost;
+ _actor.virtualHostSelected(this);
+
_virtualHost.getConnectionRegistry().registerConnection(this);
_managedObject = createMBean();
@@ -820,6 +854,9 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
public void setAuthorizedID(Principal authorizedID)
{
_authorizedID = authorizedID;
+
+ // Let the actor know that this connection is now Authorized
+ _actor.connectionAuthorized(this);
}
public Principal getAuthorizedID()
@@ -827,6 +864,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
return _authorizedID;
}
+ public SocketAddress getRemoteAddress()
+ {
+ return _minaProtocolSession.getRemoteAddress();
+ }
+
public MethodRegistry getMethodRegistry()
{
return MethodRegistry.getMethodRegistry(getProtocolVersion());
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index 1bac601225..f721730d9c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -36,6 +36,7 @@ import java.security.Principal;
public interface AMQProtocolSession extends AMQVersionAwareProtocolSession
{
+ long getSessionID();
public static final class ProtocolSessionIdentifier
{
@@ -198,6 +199,8 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession
/** @return a Principal that was used to authorized this session */
Principal getAuthorizedID();
+ public java.net.SocketAddress getRemoteAddress();
+
public MethodRegistry getMethodRegistry();
public MethodDispatcher getMethodDispatcher();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index e994967dc5..8c66508307 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -202,6 +202,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public void bind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException
{
+
exchange.registerQueue(routingKey, this, arguments);
if (isDurable() && exchange.isDurable())
{
@@ -209,6 +210,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
_bindings.addBinding(routingKey, arguments, exchange);
+// ExchangeBinding binding = new ExchangeBinding(routingKey, exchange, arguments);
+
+ //fixme MR logging in progress
+// _bindings.addBinding(binding);
+//
+// if (_logger.isMessageEnabled(binding))
+// {
+// _logger.message(binding, "QM-1001 : Created Binding");
+// }
}
public void unBind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
index 22b4623ae1..b58b849133 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
@@ -35,6 +35,7 @@ import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.logging.RootMessageLogger;
/**
* An abstract application registry that provides access to configuration information and handles the
@@ -70,6 +71,8 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
protected PluginManager _pluginManager;
+ protected RootMessageLogger _rootMessageLogger;
+
static
{
Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownService()));
@@ -287,4 +290,9 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
return _pluginManager;
}
+ public RootMessageLogger getRootMessageLogger()
+ {
+ return _rootMessageLogger;
+ }
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
index 39164883f9..31a85b878a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
@@ -33,6 +33,8 @@ import org.apache.qpid.server.security.auth.database.ConfigurationFilePrincipalD
import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.logging.RootMessageLoggerImpl;
+import org.apache.qpid.server.logging.rawloggers.Log4jMessageLogger;
public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
{
@@ -44,9 +46,12 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
public void initialise() throws Exception
{
+ _rootMessageLogger = new RootMessageLoggerImpl(_configuration,
+ new Log4jMessageLogger());
+
initialiseManagedObjectRegistry();
- _virtualHostRegistry = new VirtualHostRegistry();
+ _virtualHostRegistry = new VirtualHostRegistry(this);
_pluginManager = new PluginManager(_configuration.getPluginDirectory());
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
index bbfda3addc..7d17639f22 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
@@ -33,6 +33,7 @@ import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
import org.apache.qpid.server.security.access.ACLManager;
import org.apache.qpid.server.security.access.ACLPlugin;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.logging.RootMessageLogger;
import org.apache.mina.common.IoAcceptor;
public interface IApplicationRegistry
@@ -69,6 +70,8 @@ public interface IApplicationRegistry
PluginManager getPluginManager();
+ RootMessageLogger getRootMessageLogger();
+
/**
* Register any acceptors for this registry
* @param bindAddress The address that the acceptor has been bound with
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
index 9419572399..19eabce9ff 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
@@ -52,6 +52,8 @@ public interface Subscription
AMQShortString getConsumerTag();
+ long getSubscriptionID();
+
boolean isSuspended();
boolean hasInterest(QueueEntry msg);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index 7aa9d1e3af..51da884d1e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.subscription;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -66,6 +67,11 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
private QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
private final Lock _stateChangeLock;
+ private static final AtomicLong idGenerator = new AtomicLong(0);
+ // Create a simple ID that increments for ever new Subscription
+ private final long _subscriptionID = idGenerator.getAndIncrement();
+
+
static final class BrowserSubscription extends SubscriptionImpl
{
public BrowserSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
@@ -526,6 +532,11 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
return _consumerTag;
}
+ public long getSubscriptionID()
+ {
+ return _subscriptionID;
+ }
+
public AMQProtocolSession getProtocolSession()
{
return _channel.getProtocolSession();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
index eda2d3a94e..9ef1e029d3 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
@@ -20,17 +20,12 @@
*/
package org.apache.qpid.server.util;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Properties;
-
-import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.MapConfiguration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.logging.RootMessageLoggerImpl;
+import org.apache.qpid.server.logging.rawloggers.Log4jMessageLogger;
import org.apache.qpid.server.management.NoopManagedObjectRegistry;
import org.apache.qpid.server.plugins.PluginManager;
import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -41,6 +36,10 @@ import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticat
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Properties;
+
public class NullApplicationRegistry extends ApplicationRegistry
{
public NullApplicationRegistry() throws ConfigurationException
@@ -51,9 +50,11 @@ public class NullApplicationRegistry extends ApplicationRegistry
public void initialise() throws Exception
{
_logger.info("Initialising NullApplicationRegistry");
-
+
+ _rootMessageLogger = new RootMessageLoggerImpl(_configuration, new Log4jMessageLogger());
+
_configuration.setHousekeepingExpiredMessageCheckPeriod(200);
-
+
Properties users = new Properties();
users.put("guest", "guest");
@@ -65,7 +66,7 @@ public class NullApplicationRegistry extends ApplicationRegistry
_authenticationManager = new PrincipalDatabaseAuthenticationManager(null, null);
_managedObjectRegistry = new NoopManagedObjectRegistry();
- _virtualHostRegistry = new VirtualHostRegistry();
+ _virtualHostRegistry = new VirtualHostRegistry(this);
PropertiesConfiguration vhostProps = new PropertiesConfiguration();
VirtualHostConfiguration hostConfig = new VirtualHostConfiguration("test", vhostProps);
VirtualHost dummyHost = new VirtualHost(hostConfig);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java
index 27917fac8a..5543adbeb5 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.server.virtualhost;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
@@ -32,6 +35,12 @@ public class VirtualHostRegistry
private String _defaultVirtualHostName;
+ private ApplicationRegistry _applicationRegistry;
+
+ public VirtualHostRegistry(ApplicationRegistry applicationRegistry)
+ {
+ _applicationRegistry = applicationRegistry;
+ }
public synchronized void registerVirtualHost(VirtualHost host) throws Exception
{
@@ -67,4 +76,9 @@ public class VirtualHostRegistry
{
return new ArrayList<VirtualHost>(_registry.values());
}
+
+ public ApplicationRegistry getApplicationRegistry()
+ {
+ return _applicationRegistry;
+ }
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/RootMessageLoggerImplTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/RootMessageLoggerImplTest.java
new file mode 100644
index 0000000000..012a590687
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/RootMessageLoggerImplTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging;
+
+import junit.framework.TestCase;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.logging.rawloggers.UnitTestMessageLogger;
+
+import java.util.List;
+
+public class RootMessageLoggerImplTest extends TestCase
+{
+
+ RootMessageLogger _rootLogger;
+ UnitTestMessageLogger _rawLogger;
+
+ public void setUp() throws ConfigurationException
+ {
+ Configuration config = new PropertiesConfiguration();
+ ServerConfiguration serverConfig = new ServerConfiguration(config);
+
+ _rawLogger = new UnitTestMessageLogger();
+
+ _rootLogger = new RootMessageLoggerImpl(serverConfig, _rawLogger);
+ }
+
+ public void tearDown()
+ {
+ _rawLogger.clearLogMessages();
+ }
+
+ public void testLog()
+ {
+ String message = "test logging";
+
+ _rootLogger.rawMessage(message);
+
+ List<Object> logs = _rawLogger.getLogMessages();
+
+ assertEquals("Message log size not as expected.", 1, logs.size());
+
+ assertTrue(logs.get(0).toString().contains(message));
+ }
+
+ public void testLogWithThrowable()
+ {
+ String message = "test logging";
+ Exception exception = new Exception("Test");
+
+ _rootLogger.rawMessage(message, exception);
+
+ List<Object> logs = _rawLogger.getLogMessages();
+
+ assertEquals("Message log size not as expected.", 2, logs.size());
+
+ String loggedMessage = (String) logs.get(0);
+ assertTrue("Message not found in log:" + loggedMessage,
+ loggedMessage.contains(message));
+
+ Exception fromLog = (Exception) logs.get(1);
+ assertEquals(exception.getMessage(), fromLog.getMessage());
+ }
+
+
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/StatusUpdateConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/StatusUpdateConfigurationTest.java
new file mode 100644
index 0000000000..9a3c18bf99
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/StatusUpdateConfigurationTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging;
+
+import junit.framework.TestCase;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+
+/**
+ * Set of test to validate the effects of the changes made to the
+ * ServerConfiguration to enable the enabling/disabling of status update
+ * messages.
+ *
+ * The default is to on.
+ */
+public class StatusUpdateConfigurationTest extends TestCase
+{
+
+ /**
+ * Validate that with no configuration the status updates will default to
+ * enabled.
+ * @throws org.apache.commons.configuration.ConfigurationException
+ * - if there was a problem in creating the configuratino
+ */
+ public void testEnabled() throws ConfigurationException
+ {
+
+ ServerConfiguration serverConfig = new ServerConfiguration(
+ new PropertiesConfiguration());
+
+ assertTrue("Status Updates not enabled as expected.",
+ serverConfig.getStatusUpdates());
+ }
+
+
+ /**
+ * Validate that through the config it is possible to disable status updates
+ * @throws org.apache.commons.configuration.ConfigurationException
+ * - if there was a problem in creating the configuratino
+ */
+ public void testUpdateControls() throws ConfigurationException
+ {
+
+ Configuration config = new PropertiesConfiguration();
+ ServerConfiguration serverConfig = new ServerConfiguration(config);
+
+ config.setProperty("status-updates", "off");
+
+
+ assertFalse("Status Updates should not be enabled.",
+ serverConfig.getStatusUpdates());
+ }
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java
new file mode 100644
index 0000000000..298e3bc22c
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java
@@ -0,0 +1,206 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.actors;
+
+import junit.framework.TestCase;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.MockProtocolSession;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.logging.actors.AMQPConnectionActor;
+import org.apache.qpid.server.logging.rawloggers.UnitTestMessageLogger;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.RootMessageLoggerImpl;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.LogMessage;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.AMQChannel;
+
+import java.util.List;
+
+/**
+ * Test : AMQPConnectionActorTest
+ * Validate the AMQPConnectionActor class.
+ *
+ * The test creates a new AMQPActor and then logs a message using it.
+ *
+ * The test then verifies that the logged message was the only one created and
+ * that the message contains the required message.
+ */
+public class AMQPChannelActorTest extends TestCase
+{
+
+ LogActor _amqpActor;
+ UnitTestMessageLogger _rawLogger;
+
+ public void setUp() throws ConfigurationException, AMQException
+ {
+ Configuration config = new PropertiesConfiguration();
+ ServerConfiguration serverConfig = new ServerConfiguration(config);
+
+ _rawLogger = new UnitTestMessageLogger();
+ RootMessageLogger rootLogger =
+ new RootMessageLoggerImpl(serverConfig, _rawLogger);
+
+ // Create a single session for this test.
+ // Re-use is ok as we are testing the LogActor object is set correctly,
+ // not the value of the output.
+ AMQProtocolSession session = new MockProtocolSession(new MemoryMessageStore());
+ // Use the first Virtualhost that has been defined to initialise
+ // the MockProtocolSession. This prevents a NPE when the
+ // AMQPActor attempts to lookup the name of the VHost.
+ try
+ {
+ session.setVirtualHost(ApplicationRegistry.getInstance().
+ getVirtualHostRegistry().getVirtualHosts().
+ toArray(new VirtualHost[1])[0]);
+ }
+ catch (AMQException e)
+ {
+ fail("Unable to set virtualhost on session:" + e.getMessage());
+ }
+
+
+ AMQChannel channel = new AMQChannel(session, 1, session.getVirtualHost().getMessageStore());
+
+ _amqpActor = new AMQPChannelActor(channel, rootLogger);
+
+ }
+
+ public void tearDown()
+ {
+ _rawLogger.clearLogMessages();
+ }
+
+ /**
+ * Test that when logging on behalf of the channel
+ * The test sends a message then verifies that it entered the logs.
+ *
+ * The log message should be fully repalaced (no '{n}' values) and should
+ * contain the channel id ('/ch:1') identification.
+ */
+ public void testChannel()
+ {
+ final String message = "test logging";
+
+ _amqpActor.message(new LogSubject()
+ {
+ public String toString()
+ {
+ return "[AMQPActorTest]";
+ }
+
+ }, new LogMessage()
+ {
+ public String toString()
+ {
+ return message;
+ }
+ });
+
+ List<Object> logs = _rawLogger.getLogMessages();
+
+ assertEquals("Message log size not as expected.", 1, logs.size());
+
+ // Verify that the logged message is present in the output
+ assertTrue("Message was not found in log message:" + logs.get(0),
+ logs.get(0).toString().contains(message));
+
+ // Verify that the message has the correct type
+ assertTrue("Message contains the [con: prefix",
+ logs.get(0).toString().contains("[con:"));
+
+
+ // Verify that all the values were presented to the MessageFormatter
+ // so we will not end up with '{n}' entries in the log.
+ assertFalse("Verify that the string does not contain any '{'." + logs.get(0),
+ logs.get(0).toString().contains("{"));
+
+ // Verify that the logged message contains the 'ch:1' marker
+ assertTrue("Message was not logged as part of channel 1" + logs.get(0),
+ logs.get(0).toString().contains("/ch:1"));
+
+ }
+
+ public void testChannelLoggingOff() throws ConfigurationException, AMQException
+ {
+ Configuration config = new PropertiesConfiguration();
+ config.addProperty("status-updates", "OFF");
+
+ ServerConfiguration serverConfig = new ServerConfiguration(config);
+
+ _rawLogger = new UnitTestMessageLogger();
+ RootMessageLogger rootLogger =
+ new RootMessageLoggerImpl(serverConfig, _rawLogger);
+
+ // Create a single session for this test.
+ // Re-use is ok as we are testing the LogActor object is set correctly,
+ // not the value of the output.
+ AMQProtocolSession session = new MockProtocolSession(new MemoryMessageStore());
+ // Use the first Virtualhost that has been defined to initialise
+ // the MockProtocolSession. This prevents a NPE when the
+ // AMQPActor attempts to lookup the name of the VHost.
+ try
+ {
+ session.setVirtualHost(ApplicationRegistry.getInstance().
+ getVirtualHostRegistry().getVirtualHosts().
+ toArray(new VirtualHost[1])[0]);
+ }
+ catch (AMQException e)
+ {
+ fail("Unable to set virtualhost on session:" + e.getMessage());
+ }
+
+
+ AMQChannel channel = new AMQChannel(session, 1, session.getVirtualHost().getMessageStore());
+
+ _amqpActor = new AMQPChannelActor(channel, rootLogger);
+
+ final String message = "test logging";
+
+ _amqpActor.message(new LogSubject()
+ {
+ public String toString()
+ {
+ return "[AMQPActorTest]";
+ }
+
+ }, new LogMessage()
+ {
+ public String toString()
+ {
+ return message;
+ }
+ });
+
+ List<Object> logs = _rawLogger.getLogMessages();
+
+ assertEquals("Message log size not as expected.", 0, logs.size());
+
+ }
+
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java
new file mode 100644
index 0000000000..c220865864
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java
@@ -0,0 +1,202 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.actors;
+
+import junit.framework.TestCase;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.MockProtocolSession;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.logging.actors.AMQPConnectionActor;
+import org.apache.qpid.server.logging.rawloggers.UnitTestMessageLogger;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.RootMessageLoggerImpl;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.LogMessage;
+import org.apache.qpid.server.logging.LogActor;
+
+import java.util.List;
+
+/**
+ * Test : AMQPConnectionActorTest
+ * Validate the AMQPConnectionActor class.
+ *
+ * The test creates a new AMQPActor and then logs a message using it.
+ *
+ * The test then verifies that the logged message was the only one created and
+ * that the message contains the required message.
+ */
+public class AMQPConnectionActorTest extends TestCase
+{
+
+ LogActor _amqpActor;
+ UnitTestMessageLogger _rawLogger;
+
+ public void setUp() throws ConfigurationException
+ {
+ Configuration config = new PropertiesConfiguration();
+ ServerConfiguration serverConfig = new ServerConfiguration(config);
+
+ _rawLogger = new UnitTestMessageLogger();
+ RootMessageLogger rootLogger =
+ new RootMessageLoggerImpl(serverConfig, _rawLogger);
+
+ // Create a single session for this test.
+ // Re-use is ok as we are testing the LogActor object is set correctly,
+ // not the value of the output.
+ AMQProtocolSession session = new MockProtocolSession(new MemoryMessageStore());
+ // Use the first Virtualhost that has been defined to initialise
+ // the MockProtocolSession. This prevents a NPE when the
+ // AMQPActor attempts to lookup the name of the VHost.
+ try
+ {
+ session.setVirtualHost(ApplicationRegistry.getInstance().
+ getVirtualHostRegistry().getVirtualHosts().
+ toArray(new VirtualHost[1])[0]);
+ }
+ catch (AMQException e)
+ {
+ fail("Unable to set virtualhost on session:" + e.getMessage());
+ }
+
+ _amqpActor = new AMQPConnectionActor(session, rootLogger);
+ }
+
+ public void tearDown()
+ {
+ _rawLogger.clearLogMessages();
+ }
+
+ /**
+ * Test the AMQPActor logging as a Connection level.
+ *
+ * The test sends a message then verifies that it entered the logs.
+ *
+ * The log message should be fully repalaced (no '{n}' values) and should
+ * not contain any channel identification.
+ *
+ */
+ public void testConnection()
+ {
+ final String message = "test logging";
+
+ _amqpActor.message(new LogSubject()
+ {
+ public String toString()
+ {
+ return "[AMQPActorTest]";
+ }
+
+ }, new LogMessage()
+ {
+ public String toString()
+ {
+ return message;
+ }
+ });
+
+ List<Object> logs = _rawLogger.getLogMessages();
+
+ assertEquals("Message log size not as expected.", 1, logs.size());
+
+ // Verify that the logged message is present in the output
+ assertTrue("Message was not found in log message",
+ logs.get(0).toString().contains(message));
+
+ // Verify that the message has the correct type
+ assertTrue("Message contains the [con: prefix",
+ logs.get(0).toString().contains("[con:"));
+
+ // Verify that all the values were presented to the MessageFormatter
+ // so we will not end up with '{n}' entries in the log.
+ assertFalse("Verify that the string does not contain any '{'.",
+ logs.get(0).toString().contains("{"));
+
+ // Verify that the logged message does not contains the 'ch:' marker
+ assertFalse("Message was logged with a channel identifier." + logs.get(0),
+ logs.get(0).toString().contains("/ch:"));
+ }
+
+
+
+ public void testConnectionLoggingOff() throws ConfigurationException, AMQException
+ {
+ Configuration config = new PropertiesConfiguration();
+ config.addProperty("status-updates", "OFF");
+
+ ServerConfiguration serverConfig = new ServerConfiguration(config);
+
+ _rawLogger = new UnitTestMessageLogger();
+ RootMessageLogger rootLogger =
+ new RootMessageLoggerImpl(serverConfig, _rawLogger);
+
+ // Create a single session for this test.
+ // Re-use is ok as we are testing the LogActor object is set correctly,
+ // not the value of the output.
+ AMQProtocolSession session = new MockProtocolSession(new MemoryMessageStore());
+ // Use the first Virtualhost that has been defined to initialise
+ // the MockProtocolSession. This prevents a NPE when the
+ // AMQPActor attempts to lookup the name of the VHost.
+ try
+ {
+ session.setVirtualHost(ApplicationRegistry.getInstance().
+ getVirtualHostRegistry().getVirtualHosts().
+ toArray(new VirtualHost[1])[0]);
+ }
+ catch (AMQException e)
+ {
+ fail("Unable to set virtualhost on session:" + e.getMessage());
+ }
+
+
+ _amqpActor = new AMQPConnectionActor(session, rootLogger);
+
+ final String message = "test logging";
+
+ _amqpActor.message(new LogSubject()
+ {
+ public String toString()
+ {
+ return "[AMQPActorTest]";
+ }
+
+ }, new LogMessage()
+ {
+ public String toString()
+ {
+ return message;
+ }
+ });
+
+ List<Object> logs = _rawLogger.getLogMessages();
+
+ assertEquals("Message log size not as expected.", 0, logs.size());
+
+ }
+
+
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java
new file mode 100644
index 0000000000..c1cc3253a8
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java
@@ -0,0 +1,262 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.actors;
+
+import junit.framework.TestCase;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.logging.LogMessage;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.MockProtocolSession;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+/**
+ * Test : CurrentActorTest
+ * Summary:
+ * Validate ThreadLocal operation.
+ *
+ * Test creates THREADS number of threads which all then execute the same test
+ * together ( as close as looping Thread.start() will allow).
+ *
+ * Test:
+ * Test sets the CurrentActor then proceeds to retrieve the value and use it.
+ *
+ * The test also validates that it is the same LogActor that this thread set.
+ *
+ * Finally the LogActor is removed and tested to make sure that it was
+ * successfully removed.
+ *
+ * By having a higher number of threads than would normally be used in the
+ * Poolling filter we aim to catch the race condition where a ThreadLocal remove
+ * is called before one or more threads call get(). This way we can ensure that
+ * the remove does not affect more than the Thread it was called in.
+ */
+public class CurrentActorTest extends TestCase
+{
+ //Set this to be a reasonably large number
+ int THREADS = 10;
+
+ // Record any exceptions that are thrown by the threads
+ final Exception[] _errors = new Exception[THREADS];
+
+ // Create a single session for this test.
+ AMQProtocolSession session;
+
+ public void setUp()
+ {
+ // Create a single session for this test.
+ // Re-use is ok as we are testing the LogActor object is set correctly,
+ // not the value of the output.
+ session = new MockProtocolSession(new MemoryMessageStore());
+ // Use the first Virtualhost that has been defined to initialise
+ // the MockProtocolSession. This prevents a NPE when the
+ // AMQPActor attempts to lookup the name of the VHost.
+ try
+ {
+ session.setVirtualHost(ApplicationRegistry.getInstance().
+ getVirtualHostRegistry().getVirtualHosts().
+ toArray(new VirtualHost[1])[0]);
+ }
+ catch (AMQException e)
+ {
+ fail("Unable to set virtualhost on session:" + e.getMessage());
+ }
+ }
+
+ public void testFIFO() throws AMQException
+ {
+ // Create a new actor using retrieving the rootMessageLogger from
+ // the default ApplicationRegistry.
+ //fixme reminder that we need a better approach for broker testing.
+ AMQPConnectionActor connectionActor = new AMQPConnectionActor(session,
+ ApplicationRegistry.getInstance().
+ getRootMessageLogger());
+
+ CurrentActor.set(connectionActor);
+
+ //Use the Actor to send a simple message
+ CurrentActor.get().message(new LogSubject()
+ {
+ public String toString()
+ {
+ return "[CurrentActorTest] ";
+ }
+
+ }, new LogMessage()
+ {
+ public String toString()
+ {
+ return "Connection Log Msg";
+ }
+ });
+
+ // Verify it was the same actor as we set earlier
+ assertEquals("Retrieved actor is not as expected ",
+ connectionActor, CurrentActor.get());
+
+ /**
+ * Set the actor to nwo be the Channel actor so testing the ability
+ * to push the actor on to the stack
+ */
+
+ AMQChannel channel = new AMQChannel(session, 1, session.getVirtualHost().getMessageStore());
+
+ AMQPChannelActor channelActor = new AMQPChannelActor(channel,
+ ApplicationRegistry.getInstance().
+ getRootMessageLogger());
+
+ CurrentActor.set(channelActor);
+
+ //Use the Actor to send a simple message
+ CurrentActor.get().message(new LogSubject()
+ {
+ public String toString()
+ {
+ return "[CurrentActorTest] ";
+ }
+
+ }, new LogMessage()
+ {
+ public String toString()
+ {
+ return "Channel Log Msg";
+ }
+ });
+
+ // Verify it was the same actor as we set earlier
+ assertEquals("Retrieved actor is not as expected ",
+ channelActor, CurrentActor.get());
+
+ // Remove the ChannelActor from the stack
+ CurrentActor.remove();
+
+ // Verify we now have the same connection actor as we set earlier
+ assertEquals("Retrieved actor is not as expected ",
+ connectionActor, CurrentActor.get());
+
+ // Verify that removing the last actor returns us to a null value.
+ CurrentActor.remove();
+
+ assertNull("CurrentActor should be null", CurrentActor.get());
+
+ }
+
+ public void testThreadLocal()
+ {
+
+ // Setup the threads
+ Thread[] threads = new Thread[THREADS];
+ for (int count = 0; count < THREADS; count++)
+ {
+ Runnable test = new Test(count);
+ threads[count] = new Thread(test);
+ }
+
+ //Run the threads
+ for (int count = 0; count < THREADS; count++)
+ {
+ threads[count].start();
+ }
+
+ // Wait for them to finish
+ for (int count = 0; count < THREADS; count++)
+ {
+ try
+ {
+ threads[count].join();
+ }
+ catch (InterruptedException e)
+ {
+ //if we are interrupted then we will exit shortly.
+ }
+ }
+
+ // Verify that none of the tests threw an exception
+ for (int count = 0; count < THREADS; count++)
+ {
+ if (_errors[count] != null)
+ {
+ _errors[count].printStackTrace();
+ fail("Error occured in thread:" + count);
+ }
+ }
+ }
+
+ public class Test implements Runnable
+ {
+ int count;
+
+ Test(int count)
+ {
+ this.count = count;
+ }
+
+ public void run()
+ {
+
+ // Create a new actor using retrieving the rootMessageLogger from
+ // the default ApplicationRegistry.
+ //fixme reminder that we need a better approach for broker testing.
+ AMQPConnectionActor actor = new AMQPConnectionActor(session,
+ ApplicationRegistry.getInstance().
+ getRootMessageLogger());
+
+ CurrentActor.set(actor);
+
+ try
+ {
+ //Use the Actor to send a simple message
+ CurrentActor.get().message(new LogSubject()
+ {
+ public String toString()
+ {
+ return "[CurrentActorTest] ";
+ }
+
+ }, new LogMessage()
+ {
+ public String toString()
+ {
+ return "Running Thread:" + count;
+ }
+ });
+
+ // Verify it was the same actor as we set earlier
+ assertEquals("Retrieved actor is not as expected ",
+ actor, CurrentActor.get());
+
+ // Verify that removing the actor works for this thread
+ CurrentActor.remove();
+
+ assertNull("CurrentActor should be null", CurrentActor.get());
+ }
+ catch (Exception e)
+ {
+ _errors[count] = e;
+ }
+
+ }
+ }
+
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/ManagementActorTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/ManagementActorTest.java
new file mode 100644
index 0000000000..fa0bb6529e
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/ManagementActorTest.java
@@ -0,0 +1,125 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.actors;
+
+import junit.framework.TestCase;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogMessage;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.RootMessageLoggerImpl;
+import org.apache.qpid.server.logging.rawloggers.UnitTestMessageLogger;
+
+import java.security.Principal;
+import java.util.List;
+
+/**
+ * Test : AMQPConnectionActorTest
+ * Validate the AMQPConnectionActor class.
+ *
+ * The test creates a new AMQPActor and then logs a message using it.
+ *
+ * The test then verifies that the logged message was the only one created and
+ * that the message contains the required message.
+ */
+public class ManagementActorTest extends TestCase
+{
+
+ LogActor _amqpActor;
+ UnitTestMessageLogger _rawLogger;
+
+ public void setUp() throws ConfigurationException
+ {
+ Configuration config = new PropertiesConfiguration();
+ ServerConfiguration serverConfig = new ServerConfiguration(config);
+
+ _rawLogger = new UnitTestMessageLogger();
+ RootMessageLogger rootLogger =
+ new RootMessageLoggerImpl(serverConfig, _rawLogger);
+
+ _amqpActor = new ManagementActor(new Principal()
+ {
+ public String getName()
+ {
+ return "ManagementActorTest";
+ }
+ }, rootLogger);
+ }
+
+ public void tearDown()
+ {
+ _rawLogger.clearLogMessages();
+ }
+
+ /**
+ * Test the AMQPActor logging as a Connection level.
+ *
+ * The test sends a message then verifies that it entered the logs.
+ *
+ * The log message should be fully repalaced (no '{n}' values) and should
+ * not contain any channel identification.
+ */
+ public void testConnection()
+ {
+ final String message = "test logging";
+
+ _amqpActor.message(new LogSubject()
+ {
+ public String toString()
+ {
+ return "[AMQPActorTest]";
+ }
+
+ }, new LogMessage()
+ {
+ public String toString()
+ {
+ return message;
+ }
+ });
+
+ List<Object> logs = _rawLogger.getLogMessages();
+
+ assertEquals("Message log size not as expected.", 1, logs.size());
+
+ // Verify that the logged message is present in the output
+ assertTrue("Message was not found in log message",
+ logs.get(0).toString().contains(message));
+
+ // Verify that all the values were presented to the MessageFormatter
+ // so we will not end up with '{n}' entries in the log.
+ assertFalse("Verify that the string does not contain any '{'.",
+ logs.get(0).toString().contains("{"));
+
+ // Verify that the message has the correct type
+ assertTrue("Message contains the [mng: prefix",
+ logs.get(0).toString().contains("[mng:"));
+
+ // Verify that the logged message does not contains the 'ch:' marker
+ assertFalse("Message was logged with a channel identifier." + logs.get(0),
+ logs.get(0).toString().contains("/ch:"));
+ }
+
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/TestBlankActor.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/TestBlankActor.java
new file mode 100644
index 0000000000..ec84d8bc9b
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/TestBlankActor.java
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.actors;
+
+import org.apache.qpid.server.logging.RootMessageLogger;
+
+public class TestBlankActor extends AbstractActor
+{
+ public TestBlankActor(RootMessageLogger rootLogger)
+ {
+ super(rootLogger);
+ _logString = "[Blank]";
+ }
+}
+ \ No newline at end of file
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLoggerTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLoggerTest.java
new file mode 100644
index 0000000000..d7a5aa667b
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLoggerTest.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging.rawloggers;
+
+import junit.framework.TestCase;
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+/** Test that the Log4jMessageLogger defaults behave as expected */
+public class Log4jMessageLoggerTest extends TestCase
+{
+ private File _lodgfile;
+
+ Level _rootLevel;
+ Log4jTestAppender _appender;
+
+ @Override
+ public void setUp() throws IOException
+ {
+ // Setup a file for logging
+ _appender = new Log4jTestAppender();
+
+ Logger root = Logger.getRootLogger();
+ root.addAppender(_appender);
+
+ _rootLevel = Logger.getRootLogger().getLevel();
+ if (_rootLevel != Level.INFO)
+ {
+ root.setLevel(Level.INFO);
+ root.warn("Root Logger set to:" + _rootLevel + " Resetting to INFO for test.");
+ }
+ root.warn("Adding Test Appender:" + _appender);
+ }
+
+ @Override
+ public void tearDown()
+ {
+ Logger root = Logger.getRootLogger();
+ root.warn("Removing Test Appender:" + _appender);
+ root.warn("Resetting Root Level to : " + _rootLevel);
+
+ Logger.getRootLogger().setLevel(_rootLevel);
+
+ Logger.getRootLogger().removeAppender(_appender);
+
+ //Call close on our appender. This will clear the log messages
+ // from Memory
+ _appender.close();
+ }
+
+ /**
+ * Verify that the default configuraion of Log4jMessageLogger will
+ * log a message.
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public void testDefaultLogsMessage() throws IOException, InterruptedException
+ {
+ // Create a logger to test
+ Log4jMessageLogger logger = new Log4jMessageLogger();
+
+ //Create Message for test
+ String message = "testDefaults";
+
+ // Log the message
+ logger.rawMessage(message);
+
+ verifyLogPresent(message);
+ }
+
+ /**
+ * This test checks that if the Root Logger level is set such that the INFO
+ * messages would not be logged then the Log4jMessageLogger default of INFO
+ * will result in logging not being presented.
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public void testDefaultsLogsAtInfo() throws IOException, InterruptedException
+ {
+ // Create a logger to test
+ Log4jMessageLogger logger = new Log4jMessageLogger();
+
+ //Create Message for test
+ String message = "testDefaults";
+
+ //Set default logger level to off
+ Logger.getRootLogger().setLevel(Level.WARN);
+
+ // Log the message
+ logger.rawMessage(message);
+
+ verifyNoLog(message);
+ }
+
+ /**
+ * Test that changing the logger works.
+ * <p/>
+ * Test this by setting the default logger level to off which has been
+ * verified to work by test 'testDefaultsLevelObeyed'
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public void testDefaultLoggerAdjustment() throws IOException, InterruptedException
+ {
+ String loggerName = "TestLogger";
+ // Create a logger to test
+ Log4jMessageLogger logger = new Log4jMessageLogger(Log4jMessageLogger.DEFAULT_LEVEL, loggerName);
+
+ //Create Message for test
+ String message = "testDefaults";
+
+ //Disable the default Log4jMessageLogger logger
+ Level originalLevel = Logger.getLogger(Log4jMessageLogger.DEFAULT_LOGGER).getLevel();
+ Logger.getLogger(Log4jMessageLogger.DEFAULT_LOGGER).setLevel(Level.OFF);
+
+ // Log the message
+ logger.rawMessage(message);
+
+ verifyLogPresent(message);
+
+ // Restore the logging level
+ Logger.getLogger(Log4jMessageLogger.DEFAULT_LOGGER).setLevel(originalLevel);
+ }
+
+ /**
+ * Test that changing the log level has an effect.
+ * Set the level to be debug
+ * but only set the logger to log at INFO
+ * there should be no data printed
+ * subsequently changing the root logger to allow DEBUG should
+ * show the message
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public void testDefaultsLevelObeyed() throws IOException, InterruptedException
+ {
+ // Create a logger to test
+ Log4jMessageLogger logger = new Log4jMessageLogger("DEBUG", Log4jMessageLogger.DEFAULT_LOGGER);
+
+ //Create Message for test
+ String message = "testDefaults";
+
+ //Set root logger to INFO only
+ Logger.getRootLogger().setLevel(Level.INFO);
+
+ // Log the message
+ logger.rawMessage(message);
+
+ verifyNoLog(message);
+
+ //Set root logger to INFO only
+ Logger.getRootLogger().setLevel(Level.DEBUG);
+
+ // Log the message
+ logger.rawMessage(message);
+
+ verifyLogPresent(message);
+ }
+
+ /**
+ * Check that the Log Message reached log4j
+ * @param message the message to search for
+ */
+ private void verifyLogPresent(String message)
+ {
+ List<String> results = findMessageInLog(message);
+
+ //Validate we only got one message
+ assertEquals("The result set was not as expected.", 1, results.size());
+
+ // Validate message
+ String line = results.get(0);
+
+ assertNotNull("No Message retrieved from log file", line);
+ assertTrue("Message not contained in log.:" + line,
+ line.contains(message));
+ }
+
+ /**
+ * Check that the given Message is not present in the log4j records.
+ * @param message the message to search for
+ */
+ private void verifyNoLog(String message)
+ {
+ List<String> results = findMessageInLog(message);
+
+ //Validate we only got one message
+ if (results.size() > 0)
+ {
+ System.err.println("Unexpected Log messages");
+
+ for (String msg : results)
+ {
+ System.err.println(msg);
+ }
+ }
+
+ assertEquals("No messages expected.", 0, results.size());
+ }
+
+ /**
+ * Get the appenders list of events and return a list of all the messages
+ * that contain the given message
+ *
+ * @param message the search string
+ * @return The list of all logged messages that contain the search string.
+ */
+ private List<String> findMessageInLog(String message)
+ {
+ List<LoggingEvent> log = _appender.getLog();
+
+ // Search Results for requested message
+ List<String> result = new LinkedList<String>();
+
+ for (LoggingEvent event : log)
+ {
+ if (String.valueOf(event.getMessage()).contains(message))
+ {
+ result.add(String.valueOf(event.getMessage()));
+ }
+ }
+
+ return result;
+ }
+
+
+ /**
+ * Log4j Appender that simply records all the Logging Events so we can
+ * verify that the above logging will make it to log4j in a unit test.
+ */
+ private class Log4jTestAppender extends AppenderSkeleton
+ {
+ List<LoggingEvent> _log = new LinkedList<LoggingEvent>();
+
+ protected void append(LoggingEvent loggingEvent)
+ {
+ _log.add(loggingEvent);
+ }
+
+ public void close()
+ {
+ _log.clear();
+ }
+
+ /**
+ * @return the list of LoggingEvents that have occured in this Appender
+ */
+ public List<LoggingEvent> getLog()
+ {
+ return _log;
+ }
+
+ public boolean requiresLayout()
+ {
+ return false;
+ }
+ }
+}
+
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/rawloggers/UnitTestMessageLogger.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/rawloggers/UnitTestMessageLogger.java
new file mode 100644
index 0000000000..df50cfb57a
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/rawloggers/UnitTestMessageLogger.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging.rawloggers;
+
+import org.apache.qpid.server.logging.RawMessageLogger;
+
+import java.util.List;
+import java.util.LinkedList;
+
+public class UnitTestMessageLogger implements RawMessageLogger
+{
+ List<Object> _log;
+
+ public UnitTestMessageLogger()
+ {
+ _log = new LinkedList<Object>();
+ }
+
+
+ public void rawMessage(String message)
+ {
+ _log.add(message);
+ }
+
+ public void rawMessage(String message, Throwable throwable)
+ {
+ _log.add(message);
+ _log.add(throwable);
+ }
+
+
+ public List<Object> getLogMessages()
+ {
+ return _log;
+ }
+
+ public void clearLogMessages()
+ {
+ _log.clear();
+ }
+
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/rawloggers/UnitTestMessageLoggerTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/rawloggers/UnitTestMessageLoggerTest.java
new file mode 100644
index 0000000000..e10de48432
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/rawloggers/UnitTestMessageLoggerTest.java
@@ -0,0 +1,102 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.rawloggers;
+
+import junit.framework.TestCase;
+
+import java.util.List;
+
+/**
+ * Test: UnitTestMessageLoggerTest
+ *
+ * This test verifies that UnitTestMessageLogger adhears to its interface.
+ *
+ * Messages are logged, and Throwables recorded in an array that can be
+ * retreived and cleared.
+ *
+ */
+public class UnitTestMessageLoggerTest extends TestCase
+{
+ private static final String TEST_MESSAGE = "Test";
+ private static final String TEST_THROWABLE = "Test Throwable";
+
+ public void testRawMessage()
+ {
+ UnitTestMessageLogger logger = new UnitTestMessageLogger();
+
+ assertEquals("Messages logged before test start", 0,
+ logger.getLogMessages().size());
+
+ // Log a message
+ logger.rawMessage(TEST_MESSAGE);
+
+ List<Object> messages = logger.getLogMessages();
+
+ assertEquals("Expected to have 1 messages logged", 1, messages.size());
+
+ assertEquals("First message not what was logged",
+ TEST_MESSAGE, messages.get(0));
+ }
+
+ public void testRawMessageWithThrowable()
+ {
+ UnitTestMessageLogger logger = new UnitTestMessageLogger();
+
+ assertEquals("Messages logged before test start", 0,
+ logger.getLogMessages().size());
+
+ // Log a message
+ Throwable throwable = new Throwable(TEST_THROWABLE);
+
+ logger.rawMessage(TEST_MESSAGE, throwable);
+
+ List<Object> messages = logger.getLogMessages();
+
+ assertEquals("Expected to have 2 entries", 2, messages.size());
+
+ assertEquals("Message text not what was logged",
+ TEST_MESSAGE, messages.get(0));
+
+ assertEquals("Message throwable not what was logged",
+ TEST_THROWABLE, ((Throwable) messages.get(1)).getMessage());
+
+ }
+
+ public void testClear()
+ {
+ UnitTestMessageLogger logger = new UnitTestMessageLogger();
+
+ assertEquals("Messages logged before test start", 0,
+ logger.getLogMessages().size());
+
+ // Log a message
+ logger.rawMessage(TEST_MESSAGE);
+
+ assertEquals("Expected to have 1 messages logged",
+ 1, logger.getLogMessages().size());
+
+ logger.clearLogMessages();
+
+ assertEquals("Expected to have no messagse after a clear",
+ 0, logger.getLogMessages().size());
+
+ }
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java
new file mode 100644
index 0000000000..04081db8e3
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java
@@ -0,0 +1,258 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import junit.framework.TestCase;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogMessage;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.RootMessageLoggerImpl;
+import org.apache.qpid.server.logging.actors.TestBlankActor;
+import org.apache.qpid.server.logging.rawloggers.UnitTestMessageLogger;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.util.List;
+
+public abstract class AbstractTestLogSubject extends TestCase
+{
+ protected Configuration _config = new PropertiesConfiguration();
+ protected LogSubject _subject = null;
+
+ protected List<Object> performLog() throws ConfigurationException
+ {
+ if (_subject == null)
+ {
+ throw new NullPointerException("LogSubject has not been set");
+ }
+
+ ServerConfiguration serverConfig = new ServerConfiguration(_config);
+
+ UnitTestMessageLogger logger = new UnitTestMessageLogger();
+ RootMessageLogger rootLogger =
+ new RootMessageLoggerImpl(serverConfig, logger);
+
+ LogActor actor_actor = new TestBlankActor(rootLogger);
+
+ actor_actor.message(_subject, new LogMessage()
+ {
+ public String toString()
+ {
+ return "<Log Message>";
+ }
+ });
+
+ return logger.getLogMessages();
+ }
+
+ /**
+ * Verify that the connection section has the expected items
+ *
+ * @param connectionID - The connection id (int) to check for
+ * @param user - the Connected username
+ * @param ipString - the ipString/hostname
+ * @param vhost - the virtualhost that the user connected to.
+ * @param message - the message these values should appear in.
+ */
+ protected void verifyConnection(long connectionID, String user, String ipString, String vhost, String message)
+ {
+ // This should return us MockProtocolSessionUser@null/test
+ String connectionSlice = getSlice("con:" + connectionID, message);
+
+ assertNotNull("Unable to find connection 'con:" + connectionID + "'",
+ connectionSlice);
+
+ // Exract the userName
+ String[] userNameParts = connectionSlice.split("@");
+
+ assertEquals("Unable to split Username from rest of Connection:"
+ + connectionSlice, 2, userNameParts.length);
+
+ assertEquals("Username not as expected", userNameParts[0], user);
+
+ // Extract IP.
+ String[] ipParts = userNameParts[1].split("/");
+
+ assertEquals("Unable to split IP from rest of Connection:"
+ + userNameParts[1], 2, ipParts.length);
+
+ assertEquals("IP not as expected", ipParts[0], ipString);
+
+ //Finally check vhost
+ assertEquals("Virtualhost name not as expected.", vhost, ipParts[1]);
+ }
+
+ /**
+ * Verify that the RoutingKey is present in the provided message.
+ *
+ * @param message The message to check
+ * @param routingKey The routing key to check against
+ */
+ protected void verifyRoutingKey(String message, AMQShortString routingKey)
+ {
+ String routingKeySlice = getSlice("rk", message);
+
+ assertNotNull("Routing Key not found:" + message, routingKey);
+
+ assertEquals("Routing key not correct",
+ routingKey.toString(), routingKeySlice);
+ }
+
+ /**
+ * Verify that the given Queue's name exists in the provided message
+ *
+ * @param message The message to check
+ * @param queue The queue to check against
+ */
+ protected void verifyQueue(String message, AMQQueue queue)
+ {
+ String queueSlice = getSlice("qu", message);
+
+ assertNotNull("Queue not found:" + message, queueSlice);
+
+ assertEquals("Queue name not correct",
+ queue.getName().toString(), queueSlice);
+ }
+
+ /**
+ * Verify that the given exchange (name and type) are present in the
+ * provided message.
+ *
+ * @param message The message to check
+ * @param exchange the exchange to check against
+ */
+ protected void verifyExchange(String message, Exchange exchange)
+ {
+ String exchangeSilce = getSlice("ex", message);
+
+ assertNotNull("Exchange not found:" + message, exchangeSilce);
+
+ String[] exchangeParts = exchangeSilce.split("/");
+
+ assertEquals("Exchange should be in two parts ex(type/name)", 2,
+ exchangeParts.length);
+
+ assertEquals("Exchange type not correct",
+ exchange.getType().toString(), exchangeParts[0]);
+
+ assertEquals("Exchange name not correct",
+ exchange.getName().toString(), exchangeParts[1]);
+
+ }
+
+ /**
+ * Verify that a VirtualHost with the given name appears in the given
+ * message.
+ *
+ * @param message the message to search
+ * @param vhost the vhostName to check against
+ */
+ protected void verifyVirtualHost(String message, VirtualHost vhost)
+ {
+ String vhostSlice = getSlice("vh", message);
+
+ assertNotNull("Virtualhost not found:" + message, vhostSlice);
+
+ assertEquals("Virtualhost not correct", "/" + vhost.getName(), vhostSlice);
+ }
+
+ /**
+ * Parse the log message and return the slice according to the following:
+ * Given Example:
+ * con:1(guest@127.0.0.1/test)/ch:2/ex(amq.direct)/qu(myQueue)/rk(myQueue)
+ *
+ * Each item (except channel) is of the format <key>(<values>)
+ *
+ * So Given an ID to slice on:
+ * con:1 - Connection 1
+ * ex - exchange
+ * qu - queue
+ * rk - routing key
+ *
+ * @param sliceID the slice to locate
+ * @param message the message to search in
+ *
+ * @return the slice if found otherwise null is returned
+ */
+ protected String getSlice(String sliceID, String message)
+ {
+ int indexOfSlice = message.indexOf(sliceID + "(");
+
+ if (indexOfSlice == -1)
+ {
+ return null;
+ }
+
+ int endIndex = message.indexOf(')', indexOfSlice);
+
+ if (endIndex == -1)
+ {
+ return null;
+ }
+
+ return message.substring(indexOfSlice + 1 + sliceID.length(),
+ endIndex);
+ }
+
+ /**
+ * Test that when Logging occurs a single log statement is provided
+ *
+ * @throws ConfigurationException
+ */
+ public void testEnabled() throws ConfigurationException
+ {
+ List<Object> logs = performLog();
+
+ assertEquals("Log has to many messagse", 1, logs.size());
+
+ validateLogStatement(String.valueOf(logs.get(0)));
+ }
+
+ /**
+ * Call to the individiual tests to validate the message is formatted as
+ * expected
+ *
+ * @param message the message whos format needs validation
+ */
+ protected abstract void validateLogStatement(String message);
+
+ /**
+ * Ensure that when status-updates are off this does not perform logging
+ *
+ * @throws ConfigurationException
+ */
+ public void testDisabled() throws ConfigurationException
+ {
+ _config.addProperty("status-updates", "OFF");
+
+ List<Object> logs = performLog();
+
+ assertEquals("Log has to many messagse", 0, logs.size());
+ }
+
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java
new file mode 100644
index 0000000000..845d02267f
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.MockAMQQueue;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class BindingLogSubjectTest extends AbstractTestLogSubject
+{
+
+ AMQQueue _queue;
+ AMQShortString _routingKey;
+ Exchange _exchange;
+ VirtualHost _testVhost;
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ _testVhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().
+ getVirtualHost("test");
+ // Configure items for subjectCreation
+ _routingKey = new AMQShortString("RoutingKey");
+ _exchange = _testVhost.getExchangeRegistry().getDefaultExchange();
+ _queue = new MockAMQQueue("BindingLogSubjectTest");
+ ((MockAMQQueue) _queue).setVirtualHost(_testVhost);
+
+ _subject = new BindingLogSubject(_routingKey, _exchange, _queue);
+ }
+
+ /**
+ * Validate that the logged Subject message is as expected:
+ * MESSAGE [Blank][vh(/test)/ex(direct/<<default>>)/qu(BindingLogSubjectTest)/rk(RoutingKey)] <Log Message>
+ * @param message the message whos format needs validation
+ */
+ @Override
+ protected void validateLogStatement(String message)
+ {
+ verifyVirtualHost(message, _testVhost);
+ verifyExchange(message, _exchange);
+ verifyQueue(message, _queue);
+ verifyRoutingKey(message, _routingKey);
+ }
+
+
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java
new file mode 100644
index 0000000000..9d5cb70f4b
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java
@@ -0,0 +1,79 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.MockProtocolSession;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class ChannelLogSubjectTest extends ConnectionLogSubjectTest
+{
+ private final int _channelID = 1;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ // Create a single session for this test.
+ // Re-use is ok as we are testing the LogActor object is set correctly,
+ // not the value of the output.
+ _session = new MockProtocolSession(new MemoryMessageStore());
+ // Use the first Virtualhost that has been defined to initialise
+ // the MockProtocolSession. This prevents a NPE when the
+ // AMQPActor attempts to lookup the name of the VHost.
+ try
+ {
+ _session.setVirtualHost(ApplicationRegistry.getInstance().
+ getVirtualHostRegistry().getVirtualHosts().
+ toArray(new VirtualHost[1])[0]);
+ }
+ catch (AMQException e)
+ {
+ fail("Unable to set virtualhost on session:" + e.getMessage());
+ }
+
+ AMQChannel channel = new AMQChannel(_session, _channelID, _session.getVirtualHost().getMessageStore());
+
+ _subject = new ChannelLogSubject(channel);
+ }
+
+ /**
+ * MESSAGE [Blank][con:0(MockProtocolSessionUser@null/test)/ch:1] <Log Message>
+ *
+ * @param message the message whos format needs validation
+ */
+ protected void validateLogStatement(String message)
+ {
+ // Use the ConnectionLogSubjectTest to vaildate that the connection
+ // section is ok
+ super.validateLogStatement(message);
+
+ // Finally check that the channel identifier is correctly added
+ assertTrue("Channel 1 identifier not found as part of Subject",
+ message.contains(")/ch:1]"));
+ }
+
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java
new file mode 100644
index 0000000000..ff2d9b5e11
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java
@@ -0,0 +1,69 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.MockProtocolSession;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class ConnectionLogSubjectTest extends AbstractTestLogSubject
+{
+ AMQProtocolSession _session;
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ // Create a single session for this test.
+ // Re-use is ok as we are testing the LogActor object is set correctly,
+ // not the value of the output.
+ _session = new MockProtocolSession(new MemoryMessageStore());
+ // Use the first Virtualhost that has been defined to initialise
+ // the MockProtocolSession. This prevents a NPE when the
+ // AMQPActor attempts to lookup the name of the VHost.
+ try
+ {
+ _session.setVirtualHost(ApplicationRegistry.getInstance().
+ getVirtualHostRegistry().getVirtualHosts().
+ toArray(new VirtualHost[1])[0]);
+ }
+ catch (AMQException e)
+ {
+ fail("Unable to set virtualhost on session:" + e.getMessage());
+ }
+
+ _subject = new ConnectionLogSubject(_session);
+ }
+
+ /**
+ * MESSAGE [Blank][con:0(MockProtocolSessionUser@null/test)] <Log Message>
+ *
+ * @param message the message whos format needs validation
+ */
+ protected void validateLogStatement(String message)
+ {
+ verifyConnection(_session.getSessionID(), "MockProtocolSessionUser", "null", "test", message);
+ }
+
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubjectTest.java
new file mode 100644
index 0000000000..35df4c5976
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubjectTest.java
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.MockAMQQueue;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.framing.AMQShortString;
+
+public class ExchangeLogSubjectTest extends AbstractTestLogSubject
+{
+ Exchange _exchange;
+ VirtualHost _testVhost;
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ _testVhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().
+ getVirtualHost("test");
+
+ _exchange = _testVhost.getExchangeRegistry().getDefaultExchange();
+ _subject = new ExchangeLogSubject(_exchange,_testVhost);
+ }
+
+ /**
+ * Validate that the logged Subject message is as expected:
+ * MESSAGE [Blank][vh(/test)/ex(direct/<<default>>)] <Log Message>
+ * @param message the message whos format needs validation
+ */
+ @Override
+ protected void validateLogStatement(String message)
+ {
+ verifyVirtualHost(message, _testVhost);
+ verifyExchange(message, _exchange);
+ }
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/QueueLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/QueueLogSubjectTest.java
new file mode 100644
index 0000000000..7ef1f8d903
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/QueueLogSubjectTest.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.MockAMQQueue;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class QueueLogSubjectTest extends AbstractTestLogSubject
+{
+
+ AMQQueue _queue;
+ VirtualHost _testVhost;
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ _testVhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().
+ getVirtualHost("test");
+
+ _queue = new MockAMQQueue("QueueLogSubjectTest");
+ ((MockAMQQueue) _queue).setVirtualHost(_testVhost);
+
+ _subject = new QueueLogSubject(_queue);
+ }
+
+ /**
+ * Validate that the logged Subject message is as expected:
+ * MESSAGE [Blank][vh(/test)/qu(BindingLogSubjectTest)] <Log Message>
+ *
+ * @param message the message whos format needs validation
+ */
+ @Override
+ protected void validateLogStatement(String message)
+ {
+ System.err.println(message);
+ verifyVirtualHost(message, _testVhost);
+ verifyQueue(message, _queue);
+ }
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubjectTest.java
new file mode 100644
index 0000000000..0b0b0d78d1
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubjectTest.java
@@ -0,0 +1,114 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.flow.LimitlessCreditManager;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.MockAMQQueue;
+import org.apache.qpid.server.queue.MockProtocolSession;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionFactory;
+import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class SubscriptionLogSubjectTest extends AbstractTestLogSubject
+{
+
+ AMQQueue _queue;
+ VirtualHost _testVhost;
+ private boolean _acks;
+ private FieldTable _filters;
+ private boolean _noLocal;
+ private int _channelID = 1;
+ Subscription _subscription;
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ _testVhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().
+ getVirtualHost("test");
+
+ _queue = new MockAMQQueue("QueueLogSubjectTest");
+ ((MockAMQQueue) _queue).setVirtualHost(_testVhost);
+
+ // Create a single session for this test.
+ // Re-use is ok as we are testing the LogActor object is set correctly,
+ // not the value of the output.
+ AMQProtocolSession session = new MockProtocolSession(new MemoryMessageStore());
+ // Use the first Virtualhost that has been defined to initialise
+ // the MockProtocolSession. This prevents a NPE when the
+ // AMQPActor attempts to lookup the name of the VHost.
+ try
+ {
+ session.setVirtualHost(ApplicationRegistry.getInstance().
+ getVirtualHostRegistry().getVirtualHosts().
+ toArray(new VirtualHost[1])[0]);
+ }
+ catch (AMQException e)
+ {
+ fail("Unable to set virtualhost on session:" + e.getMessage());
+ }
+
+ AMQChannel channel = new AMQChannel(session, _channelID, session.getVirtualHost().getMessageStore());
+
+ session.addChannel(channel);
+
+ SubscriptionFactory factory = new SubscriptionFactoryImpl();
+
+ _subscription = factory.createSubscription(_channelID, session, new AMQShortString("cTag"),
+ _acks, _filters, _noLocal,
+ new LimitlessCreditManager());
+
+ _subscription.setQueue(_queue);
+
+ _subject = new SubscriptionLogSubject(_subscription);
+ }
+
+ /**
+ * Validate that the logged Subject message is as expected:
+ * MESSAGE [Blank][sub:0(qu(QueueLogSubjectTest))] <Log Message>
+ *
+ * @param message the message whos format needs validation
+ */
+ @Override
+ protected void validateLogStatement(String message)
+ {
+ String subscriptionSlice = getSlice("sub:"
+ + _subscription.getSubscriptionID(),
+ message);
+
+ assertNotNull("Unable to locate subscription 'sub:" +
+ _subscription.getSubscriptionID() + "'");
+
+ // Adding the ')' is a bit ugly but SubscriptionLogSubject is the only
+ // Subject that nests () and so the simple parser of checking for the
+ // next ')' falls down.
+ verifyQueue(subscriptionSlice+")", _queue);
+ }
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
index d5db87350b..f09b03ab85 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
@@ -21,25 +21,22 @@
package org.apache.qpid.server.protocol;
import junit.framework.TestCase;
-
import org.apache.log4j.Logger;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.SkeletonMessageStore;
import javax.management.JMException;
+import java.security.Principal;
-/**
- * Test class to test MBean operations for AMQMinaProtocolSession.
- */
+/** Test class to test MBean operations for AMQMinaProtocolSession. */
public class AMQProtocolSessionMBeanTest extends TestCase
{
/** Used for debugging. */
@@ -56,11 +53,11 @@ public class AMQProtocolSessionMBeanTest extends TestCase
int channelCount = _mbean.channels().size();
assertTrue(channelCount == 1);
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue_" + System.currentTimeMillis()),
- false,
- new AMQShortString("test"),
- true,
- _protocolSession.getVirtualHost(), null);
- AMQChannel channel = new AMQChannel(_protocolSession,2, _messageStore);
+ false,
+ new AMQShortString("test"),
+ true,
+ _protocolSession.getVirtualHost(), null);
+ AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore);
channel.setDefaultQueue(queue);
_protocolSession.addChannel(channel);
channelCount = _mbean.channels().size();
@@ -114,8 +111,16 @@ public class AMQProtocolSessionMBeanTest extends TestCase
IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
_protocolSession =
- new AMQMinaProtocolSession(new TestIoSession(), appRegistry.getVirtualHostRegistry(), new AMQCodecFactory(true),
- null);
+ new AMQMinaProtocolSession(new TestIoSession(), appRegistry.getVirtualHostRegistry(), new AMQCodecFactory(true),
+ null);
+ // Need to authenticate session for it to work, (well for logging to work)
+ _protocolSession.setAuthorizedID(new Principal()
+ {
+ public String getName()
+ {
+ return "AMQProtocolSessionMBeanTestUser";
+ }
+ });
_protocolSession.setVirtualHost(appRegistry.getVirtualHostRegistry().getVirtualHost("test"));
_channel = new AMQChannel(_protocolSession, 1, _messageStore);
_protocolSession.addChannel(_channel);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
index da35ddc594..49c5f8a14b 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
@@ -34,6 +34,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+import java.net.SocketAddress;
public class InternalTestProtocolSession extends AMQMinaProtocolSession implements ProtocolOutputConverter
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
index 1bdabf345b..9597c1319a 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
@@ -30,6 +30,8 @@ import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
+import java.security.Principal;
+
/** Test class to test MBean operations for AMQMinaProtocolSession. */
public class MaxChannelsTest extends TestCase
{
@@ -40,6 +42,16 @@ public class MaxChannelsTest extends TestCase
{
_session = new AMQMinaProtocolSession(new TestIoSession(), _appRegistry
.getVirtualHostRegistry(), new AMQCodecFactory(true), null);
+
+ // Need to authenticate session for it to work, (well for logging to work)
+ _session.setAuthorizedID(new Principal()
+ {
+ public String getName()
+ {
+ return "AMQProtocolSessionMBeanTestUser";
+ }
+ });
+
_session.setVirtualHost(_appRegistry.getVirtualHostRegistry().getVirtualHost("test"));
// check the channel count is correct
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index f73366c197..651c1311c8 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -44,6 +44,7 @@ public class MockAMQQueue implements AMQQueue
{
private boolean _deleted = false;
private AMQShortString _name;
+ private VirtualHost _virtualhost;
public MockAMQQueue(String name)
{
@@ -75,9 +76,14 @@ public class MockAMQQueue implements AMQQueue
return null; //To change body of implemented methods use File | Settings | File Templates.
}
+ public void setVirtualHost(VirtualHost virtualhost)
+ {
+ _virtualhost = virtualhost;
+ }
+
public VirtualHost getVirtualHost()
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return _virtualhost;
}
public void bind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
index 99c88fac3e..b9dcd972b1 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
@@ -34,7 +34,9 @@ import org.apache.qpid.transport.Sender;
import javax.security.sasl.SaslServer;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
import java.security.Principal;
+import java.net.SocketAddress;
/**
* A protocol session that can be used for testing purposes.
@@ -45,11 +47,21 @@ public class MockProtocolSession implements AMQProtocolSession
private Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>();
+ private static final AtomicLong idGenerator = new AtomicLong(0);
+
+ private final long _sessionID = idGenerator.getAndIncrement();
+ private VirtualHost _virtualHost;
+
public MockProtocolSession(MessageStore messageStore)
{
_messageStore = messageStore;
}
+ public long getSessionID()
+ {
+ return _sessionID;
+ }
+
public void dataBlockReceived(AMQDataBlock message) throws Exception
{
}
@@ -158,12 +170,12 @@ public class MockProtocolSession implements AMQProtocolSession
public VirtualHost getVirtualHost()
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return _virtualHost;
}
public void setVirtualHost(VirtualHost virtualHost)
{
- //To change body of implemented methods use File | Settings | File Templates.
+ _virtualHost = virtualHost;
}
public void addSessionCloseTask(Task task)
@@ -188,6 +200,18 @@ public class MockProtocolSession implements AMQProtocolSession
public Principal getAuthorizedID()
{
+ return new Principal()
+ {
+ public String getName()
+ {
+ return "MockProtocolSessionUser";
+ }
+ };
+
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
return null; //To change body of implemented methods use File | Settings | File Templates.
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java
index 83bcd03177..f56d562354 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java
@@ -40,6 +40,7 @@ import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
public class FirewallPluginTest extends TestCase
{
@@ -89,11 +90,13 @@ public class FirewallPluginTest extends TestCase
public void setUp() throws Exception
{
_store = new TestableMemoryMessageStore();
- PropertiesConfiguration env = new PropertiesConfiguration();
- _virtualHost = new VirtualHost(new VirtualHostConfiguration("test", env));
TestIoSession iosession = new TestIoSession();
iosession.setAddress("127.0.0.1");
- VirtualHostRegistry virtualHostRegistry = null;
+
+ // Retreive VirtualHost from the Registry
+ VirtualHostRegistry virtualHostRegistry = ApplicationRegistry.getInstance().getVirtualHostRegistry();
+ _virtualHost = virtualHostRegistry.getVirtualHost("test");
+
AMQCodecFactory codecFactory = new AMQCodecFactory(true);
_session = new AMQMinaProtocolSession(iosession, virtualHostRegistry, codecFactory);
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
index 33fd669d5c..43152ef780 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
@@ -21,10 +21,6 @@ package org.apache.qpid.server.subscription;
*
*/
-import java.util.ArrayList;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.AMQChannel;
@@ -32,6 +28,11 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.QueueEntry.SubscriptionAcquiredState;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
public class MockSubscription implements Subscription
{
@@ -44,6 +45,10 @@ public class MockSubscription implements Subscription
private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>();
private final Lock _stateChangeLock = new ReentrantLock();
+ private static final AtomicLong idGenerator = new AtomicLong(0);
+ // Create a simple ID that increments for ever new Subscription
+ private final long _subscriptionID = idGenerator.getAndIncrement();
+
public void close()
{
_closed = true;
@@ -66,7 +71,12 @@ public class MockSubscription implements Subscription
public AMQShortString getConsumerTag()
{
- return tag ;
+ return tag;
+ }
+
+ public long getSubscriptionID()
+ {
+ return _subscriptionID;
}
public QueueEntry getLastSeenEntry()
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
index 101c33043d..585ed9a538 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
@@ -21,30 +21,31 @@
package org.apache.qpid.server.util;
import junit.framework.TestCase;
-
import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.ConsumerTagNotUniqueException;
import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.protocol.InternalTestProtocolSession;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.ConsumerTagNotUniqueException;
-import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.AMQException;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.util.MockChannel;
-import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.exchange.ExchangeDefaults;
+
+import java.security.Principal;
public class InternalBrokerBaseCase extends TestCase
{
@@ -64,7 +65,7 @@ public class InternalBrokerBaseCase extends TestCase
configuration.setProperty("virtualhosts.virtualhost.test.store.class", TestableMemoryMessageStore.class.getName());
_registry = new TestApplicationRegistry(new ServerConfiguration(configuration));
ApplicationRegistry.initialise(_registry);
- _virtualHost = _registry.getVirtualHostRegistry().getVirtualHost("test");
+ _virtualHost = _registry.getVirtualHostRegistry().getVirtualHost("test");
_messageStore = _virtualHost.getMessageStore();
@@ -80,6 +81,14 @@ public class InternalBrokerBaseCase extends TestCase
_session = new InternalTestProtocolSession();
+ _session.setAuthorizedID(new Principal()
+ {
+ public String getName()
+ {
+ return "InternalBrokerBaseCaseUser";
+ }
+ });
+
_session.setVirtualHost(_virtualHost);
_channel = new MockChannel(_session, 1, _messageStore);
@@ -176,7 +185,7 @@ public class InternalBrokerBaseCase extends TestCase
for (int count = 0; count < messages; count++)
{
- channel.setPublishFrame(info, _virtualHost.getExchangeRegistry().getExchange(info.getExchange()));
+ channel.setPublishFrame(info, _virtualHost.getExchangeRegistry().getExchange(info.getExchange()));
//Set the body size
ContentHeaderBody _headerBody = new ContentHeaderBody();
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
index c6ecac6a01..84bee7984b 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
@@ -87,7 +87,7 @@ public class TestApplicationRegistry extends ApplicationRegistry
_messageStore = new TestableMemoryMessageStore();
- _virtualHostRegistry = new VirtualHostRegistry();
+ _virtualHostRegistry = new VirtualHostRegistry(this);
PropertiesConfiguration vhostProps = new PropertiesConfiguration();
VirtualHostConfiguration hostConfig = new VirtualHostConfiguration("test", vhostProps);
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java b/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java
index 63222b50db..515c849290 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java
@@ -20,7 +20,18 @@
*/
package org.apache.qpid.util;
-import java.io.*;
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.LinkedList;
+import java.util.List;
/**
* FileUtils provides some simple helper methods for working with files. It follows the convention of wrapping all
@@ -46,7 +57,8 @@ public class FileUtils
{
BufferedInputStream is = null;
- try{
+ try
+ {
try
{
is = new BufferedInputStream(new FileInputStream(filename));
@@ -57,7 +69,9 @@ public class FileUtils
}
return readStreamAsString(is);
- }finally {
+ }
+ finally
+ {
if (is != null)
{
try
@@ -210,68 +224,69 @@ public class FileUtils
/*
* Deletes a given file
*/
- public static boolean deleteFile(String filePath)
- {
- return delete(new File(filePath), false);
- }
+ public static boolean deleteFile(String filePath)
+ {
+ return delete(new File(filePath), false);
+ }
/*
* Deletes a given empty directory
*/
- public static boolean deleteDirectory(String directoryPath)
- {
- File directory = new File(directoryPath);
-
- if (directory.isDirectory())
- {
- if (directory.listFiles().length == 0)
- {
- return delete(directory, true);
- }
- }
-
- return false;
- }
-
- /**
- * Delete a given file/directory,
- * A directory will always require the recursive flag to be set.
- * if a directory is specified and recursive set then delete the whole tree
- * @param file the File object to start at
- * @param recursive boolean to recurse if a directory is specified.
- * @return <code>true</code> if and only if the file or directory is
- * successfully deleted; <code>false</code> otherwise
- */
- public static boolean delete(File file, boolean recursive)
- {
- boolean success = true;
-
- if (file.isDirectory())
- {
- if (recursive)
- {
- File[] files = file.listFiles();
-
- // This can occur if the file is deleted outside the JVM
- if (files == null)
- {
- return false;
- }
-
- for (int i = 0; i < files.length; i++)
- {
- success = delete(files[i], true) && success;
- }
-
- return success && file.delete();
- }
-
- return false;
- }
-
- return file.delete();
- }
+ public static boolean deleteDirectory(String directoryPath)
+ {
+ File directory = new File(directoryPath);
+ if (directory.isDirectory())
+ {
+ if (directory.listFiles().length == 0)
+ {
+ return delete(directory, true);
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * Delete a given file/directory,
+ * A directory will always require the recursive flag to be set.
+ * if a directory is specified and recursive set then delete the whole tree
+ *
+ * @param file the File object to start at
+ * @param recursive boolean to recurse if a directory is specified.
+ *
+ * @return <code>true</code> if and only if the file or directory is
+ * successfully deleted; <code>false</code> otherwise
+ */
+ public static boolean delete(File file, boolean recursive)
+ {
+ boolean success = true;
+
+ if (file.isDirectory())
+ {
+ if (recursive)
+ {
+ File[] files = file.listFiles();
+
+ // This can occur if the file is deleted outside the JVM
+ if (files == null)
+ {
+ return false;
+ }
+
+ for (int i = 0; i < files.length; i++)
+ {
+ success = delete(files[i], true) && success;
+ }
+
+ return success && file.delete();
+ }
+
+ return false;
+ }
+
+ return file.delete();
+ }
public static class UnableToCopyException extends Exception
{
@@ -294,7 +309,6 @@ public class FileUtils
throw new IllegalArgumentException("Unable to copy '" + source.toString() + "' to '" + dst + "' a file with same name exists.");
}
-
if (source.isFile())
{
copy(source, dst);
@@ -303,22 +317,48 @@ public class FileUtils
//else we have a source directory
if (!dst.isDirectory() && !dst.mkdir())
{
- throw new UnableToCopyException("Unable to create destination directory");
+ throw new UnableToCopyException("Unable to create destination directory");
}
-
for (File file : source.listFiles())
{
- if (file.isFile())
- {
- copy(file, new File(dst.toString() + File.separator + file.getName()));
- }
- else
- {
- copyRecursive(file, new File(dst + File.separator + file.getName()));
- }
+ if (file.isFile())
+ {
+ copy(file, new File(dst.toString() + File.separator + file.getName()));
+ }
+ else
+ {
+ copyRecursive(file, new File(dst + File.separator + file.getName()));
+ }
}
+ }
+
+ /**
+ * Checks the specified file for instances of the search string.
+ *
+ * @param file the file to search
+ * @param search the search String
+ *
+ * @throws java.io.IOException
+ * @return the list of matching entries
+ */
+ public static List<String> searchFile(File file, String search)
+ throws IOException
+ {
+
+ List<String> results = new LinkedList<String>();
+
+ BufferedReader reader = new BufferedReader(new FileReader(file));
+ while (reader.ready())
+ {
+ String line = reader.readLine();
+ if (line.contains(search))
+ {
+ results.add(line);
+ }
+ }
+ return results;
}
}
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/util/FileUtilsTest.java b/qpid/java/common/src/test/java/org/apache/qpid/util/FileUtilsTest.java
index 94e7e20a86..7eba5f092e 100644
--- a/qpid/java/common/src/test/java/org/apache/qpid/util/FileUtilsTest.java
+++ b/qpid/java/common/src/test/java/org/apache/qpid/util/FileUtilsTest.java
@@ -22,11 +22,12 @@ package org.apache.qpid.util;
import junit.framework.TestCase;
-import java.io.File;
-import java.io.IOException;
import java.io.BufferedWriter;
-import java.io.FileWriter;
+import java.io.File;
import java.io.FileNotFoundException;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.List;
public class FileUtilsTest extends TestCase
{
@@ -47,7 +48,7 @@ public class FileUtilsTest extends TestCase
//Create initial file
File test = createTestFile(fileName, TEST_DATA);
-
+
try
{
//Check number of files before copy
@@ -137,7 +138,6 @@ public class FileUtilsTest extends TestCase
testSubDir.deleteOnExit();
testDir.deleteOnExit();
-
//Perform Copy
File copyDir = new File(testDir.toString() + COPY);
try
@@ -282,7 +282,7 @@ public class FileUtilsTest extends TestCase
public void testDeleteNonExistentFile()
{
- File test = new File("FileUtilsTest-testDelete-"+System.currentTimeMillis());
+ File test = new File("FileUtilsTest-testDelete-" + System.currentTimeMillis());
assertTrue("File exists", !test.exists());
assertFalse("File is a directory", test.isDirectory());
@@ -303,7 +303,6 @@ public class FileUtilsTest extends TestCase
}
}
-
/**
* Given two lists of File arrays ensure they are the same length and all entries in Before are in After
*
@@ -543,4 +542,71 @@ public class FileUtilsTest extends TestCase
}
}
+ public static final String SEARCH_STRING = "testSearch";
+
+ /**
+ * Test searchFile(File file, String search) will find a match when it
+ * exists.
+ *
+ * @throws java.io.IOException if unable to perform test setup
+ */
+ public void testSearchSucceed() throws IOException
+ {
+ File _logfile = File.createTempFile("FileUtilsTest-testSearchSucceed", ".out");
+
+ prepareFileForSearchTest(_logfile);
+
+ List<String> results = FileUtils.searchFile(_logfile, SEARCH_STRING);
+
+ assertNotNull("Null result set returned", results);
+
+ assertEquals("Results do not contain expected count", 1, results.size());
+ }
+
+ /**
+ * Test searchFile(File file, String search) will not find a match when the
+ * test string does not exist.
+ *
+ * @throws java.io.IOException if unable to perform test setup
+ */
+ public void testSearchFail() throws IOException
+ {
+ File _logfile = File.createTempFile("FileUtilsTest-testSearchFail", ".out");
+
+ prepareFileForSearchTest(_logfile);
+
+ List<String> results = FileUtils.searchFile(_logfile, "Hello");
+
+ assertNotNull("Null result set returned", results);
+
+ //Validate we only got one message
+ if (results.size() > 0)
+ {
+ System.err.println("Unexpected messages");
+
+ for (String msg : results)
+ {
+ System.err.println(msg);
+ }
+ }
+
+ assertEquals("Results contains data when it was not expected",
+ 0, results.size());
+ }
+
+ /**
+ * Write the SEARCH_STRING in to the given file.
+ *
+ * @param logfile The file to write the SEARCH_STRING into
+ *
+ * @throws IOException if an error occurs
+ */
+ private void prepareFileForSearchTest(File logfile) throws IOException
+ {
+ BufferedWriter writer = new BufferedWriter(new FileWriter(logfile));
+ writer.append(SEARCH_STRING);
+ writer.flush();
+ writer.close();
+ }
+
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
index 66ec9686dd..dd01cb2f8e 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
@@ -141,6 +141,11 @@ public class SubscriptionTestHelper implements Subscription
return null; //To change body of implemented methods use File | Settings | File Templates.
}
+ public long getSubscriptionID()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public boolean isActive()
{
return false; //To change body of implemented methods use File | Settings | File Templates.