summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-03-07 16:36:26 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-03-07 16:36:26 +0000
commit414bb380bf3b0edf51dcdc665fcef6a7b9dc5d9d (patch)
tree3928c2a587da59331246dafdd4d85591f3a5a483
parentfda07b599eda172c7979ba217390fabc66ab2b21 (diff)
downloadqpid-python-414bb380bf3b0edf51dcdc665fcef6a7b9dc5d9d.tar.gz
QPID-5611 : [Java Broker] remove LogActors
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1575315 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java5
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java36
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/Broker.java52
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/TransactionTimeoutHelper.java6
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java10
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java41
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java6
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/logging/AbstractRootMessageLogger.java176
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/logging/CompositeStartupMessageLogger.java38
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/logging/Log4jMessageLogger.java12
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/logging/LogActor.java66
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/logging/NullRootMessageLogger.java8
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/logging/RootMessageLogger.java35
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/logging/SystemLog.java58
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/logging/SystemOutMessageLogger.java7
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java61
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java53
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java71
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/AbstractManagementActor.java68
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java145
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/GenericActor.java107
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/HttpManagementActor.java62
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java99
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/QueueActor.java53
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java6
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AbstractConfiguredObject.java2
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java6
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java17
-rwxr-xr-xjava/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java31
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java36
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java71
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java100
-rwxr-xr-xjava/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java69
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java107
-rwxr-xr-xjava/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java9
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/security/auth/ManagementConnectionPrincipal.java (renamed from java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/BrokerActor.java)33
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/security/auth/TaskPrincipal.java (renamed from java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/TestLogActor.java)18
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/security/auth/jmx/JMXConnectionPrincipal.java82
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/security/auth/jmx/JMXPasswordAuthenticator.java55
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java5
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/store/OperationalLoggingListener.java20
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java8
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java49
-rwxr-xr-xjava/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java34
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/TransactionTimeoutHelperTest.java46
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/TaskExecutorTest.java40
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java3
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java3
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java3
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/logging/Log4jMessageLoggerTest.java13
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java106
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java47
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/AbstractManagementActorTest.java86
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseActorTestCase.java59
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java1
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java251
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/HttpManagementActorTest.java81
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/ManagementActorTest.java186
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/QueueActorTest.java75
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/AbstractTestMessages.java55
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/ExchangeMessagesTest.java16
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java19
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java13
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java7
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java12
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java1
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java15
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java2
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java7
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java3
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/store/OperationalLoggingListenerTest.java33
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java14
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java3
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/HouseKeepingTaskTest.java45
-rw-r--r--java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/RuleSet.java6
-rw-r--r--java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControlTest.java5
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java10
-rwxr-xr-xjava/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java53
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java42
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java82
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java3
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java3
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java66
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java322
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java10
-rw-r--r--java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java6
-rw-r--r--java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java16
-rw-r--r--java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java57
-rw-r--r--java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/ServletConnectionPrincipal.java9
-rw-r--r--java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java15
-rw-r--r--java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/SaslServlet.java6
-rw-r--r--java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/session/LoginLogoutReporter.java10
-rw-r--r--java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/session/LoginLogoutReporterTest.java19
-rw-r--r--java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagedObjectRegistry.java23
-rw-r--r--java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/MBeanInvocationHandlerImpl.java32
-rw-r--r--java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/ManagementLogonLogoffReporter.java49
-rw-r--r--java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/ManagementLogonLogoffReporterTest.java29
-rw-r--r--java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java4
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java4
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/logging/VirtualHostLoggingTest.java2
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java19
101 files changed, 1448 insertions, 2732 deletions
diff --git a/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java b/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
index 54051ab630..0d963ebdae 100644
--- a/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
+++ b/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
@@ -40,9 +40,6 @@ import junit.framework.TestCase;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.jmx.AMQManagedObject;
import org.apache.qpid.server.jmx.ManagedObjectRegistry;
-import org.apache.qpid.server.logging.SystemOutMessageLogger;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.TestLogActor;
import org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore;
public class BDBHAMessageStoreManagerMBeanTest extends TestCase
@@ -65,7 +62,6 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase
{
super.setUp();
- CurrentActor.set(new TestLogActor(new SystemOutMessageLogger()));
_store = mock(BDBHAMessageStore.class);
_mBeanParent = mock(AMQManagedObject.class);
when(_mBeanParent.getRegistry()).thenReturn(mock(ManagedObjectRegistry.class));
@@ -76,7 +72,6 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase
protected void tearDown() throws Exception
{
super.tearDown();
- CurrentActor.remove();
}
public void testObjectName() throws Exception
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
index 7f119880b0..d61da537f3 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
@@ -21,6 +21,7 @@ package org.apache.qpid.server.store.berkeleydb;
import java.io.File;
import java.net.InetSocketAddress;
+import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -35,10 +36,9 @@ import java.util.concurrent.Executors;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.security.auth.TaskPrincipal;
import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.logging.RootMessageLogger;
-import org.apache.qpid.server.logging.actors.AbstractActor;
-import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.HAMessageStore;
import org.apache.qpid.server.store.MessageStore;
@@ -66,6 +66,8 @@ import com.sleepycat.je.rep.StateChangeEvent;
import com.sleepycat.je.rep.StateChangeListener;
import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
+import javax.security.auth.Subject;
+
public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMessageStore
{
private static final Logger LOGGER = Logger.getLogger(BDBHAMessageStore.class);
@@ -608,7 +610,6 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
private void executeStateChangeAsync(final Callable<Void> callable, final String threadName)
{
- final RootMessageLogger _rootLogger = CurrentActor.get().getRootMessageLogger();
_executor.execute(new Runnable()
{
@@ -618,25 +619,28 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
{
final String originalThreadName = Thread.currentThread().getName();
Thread.currentThread().setName(threadName);
+ Subject subject = new Subject(false, SecurityManager.SYSTEM.getPrincipals(),
+ Collections.emptySet(), Collections.emptySet());
+ subject.getPrincipals().add(new TaskPrincipal("BDB HA State Change"));
try
{
- CurrentActor.set(new AbstractActor(_rootLogger)
+ Subject.doAs(subject, new PrivilegedAction<Object>()
{
@Override
- public String getLogMessage()
+ public Object run()
{
- return threadName;
+
+ try
+ {
+ callable.call();
+ }
+ catch (Exception e)
+ {
+ LOGGER.error("Exception during state change", e);
+ }
+ return null;
}
});
-
- try
- {
- callable.call();
- }
- catch (Exception e)
- {
- LOGGER.error("Exception during state change", e);
- }
}
finally
{
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java b/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java
index 66171c6fc2..8117f3ebfc 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java
@@ -23,6 +23,8 @@ package org.apache.qpid.server;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
import java.util.List;
import java.util.Properties;
import java.util.Set;
@@ -32,13 +34,16 @@ import org.apache.log4j.PropertyConfigurator;
import org.apache.qpid.server.configuration.ConfigurationEntryStore;
import org.apache.qpid.server.configuration.BrokerConfigurationStoreCreator;
import org.apache.qpid.server.configuration.store.ManagementModeStoreHandler;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.logging.SystemOutMessageLogger;
-import org.apache.qpid.server.logging.actors.BrokerActor;
-import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.log4j.LoggingManagementFacade;
import org.apache.qpid.server.logging.messages.BrokerMessages;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.security.auth.TaskPrincipal;
+
+import javax.security.auth.Subject;
public class Broker
{
@@ -80,17 +85,22 @@ public class Broker
public void startup(final BrokerOptions options) throws Exception
{
- CurrentActor.set(new BrokerActor(new SystemOutMessageLogger()));
- try
+ Subject subject = SecurityManager.SYSTEM;
+ subject = new Subject(false, subject.getPrincipals(), subject.getPublicCredentials(), subject.getPrivateCredentials());
+ subject.getPrincipals().add(new TaskPrincipal("Broker"));
+ Subject.doAs(subject, new PrivilegedExceptionAction<Object>()
{
- startupImpl(options);
- addShutdownHook();
- }
- finally
- {
- CurrentActor.remove();
+ @Override
+ public Object run() throws Exception
+ {
+ SystemLog.setRootMessageLogger(new SystemOutMessageLogger());
+ startupImpl(options);
+ addShutdownHook();
+
+ return null;
+ }
+ });
- }
}
private void startupImpl(final BrokerOptions options) throws Exception
@@ -98,7 +108,7 @@ public class Broker
String storeLocation = options.getConfigurationStoreLocation();
String storeType = options.getConfigurationStoreType();
- CurrentActor.get().message(BrokerMessages.CONFIG(storeLocation));
+ SystemLog.message(BrokerMessages.CONFIG(storeLocation));
//Allow skipping the logging configuration for people who are
//embedding the broker and want to configure it themselves.
@@ -159,7 +169,7 @@ public class Broker
{
if (logConfigFile.exists() && logConfigFile.canRead())
{
- CurrentActor.get().message(BrokerMessages.LOG_CONFIG(logConfigFile.getAbsolutePath()));
+ SystemLog.message(BrokerMessages.LOG_CONFIG(logConfigFile.getAbsolutePath()));
if (logWatchTime > 0)
{
@@ -270,8 +280,20 @@ public class Broker
{
public void run()
{
- LOGGER.debug("Shutdown hook running");
- Broker.this.shutdown();
+
+ Subject subject = SecurityManager.SYSTEM;
+ subject = new Subject(false, subject.getPrincipals(), subject.getPublicCredentials(), subject.getPrivateCredentials());
+ subject.getPrincipals().add(new TaskPrincipal("Shutdown"));
+ Subject.doAs(subject, new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ LOGGER.debug("Shutdown hook running");
+ Broker.this.shutdown();
+ return null;
+ }
+ });
}
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/TransactionTimeoutHelper.java b/java/broker-core/src/main/java/org/apache/qpid/server/TransactionTimeoutHelper.java
index 13f4fbf254..2824c1b7dc 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/TransactionTimeoutHelper.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/TransactionTimeoutHelper.java
@@ -18,10 +18,9 @@
*/
package org.apache.qpid.server;
-import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -73,8 +72,7 @@ public class TransactionTimeoutHelper
{
if (isTimedOut(timeSoFar, warnTimeout))
{
- LogActor logActor = CurrentActor.get();
- logActor.message(_logSubject, warnMessage);
+ SystemLog.message(_logSubject, warnMessage);
}
if(isTimedOut(timeSoFar, closeTimeout))
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java b/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
index 69e05ad989..9864fd2be1 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
@@ -21,7 +21,7 @@
package org.apache.qpid.server.binding;
import org.apache.qpid.server.exchange.ExchangeImpl;
-import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.logging.messages.BindingMessages;
import org.apache.qpid.server.logging.subjects.BindingLogSubject;
import org.apache.qpid.server.model.Binding;
@@ -94,9 +94,9 @@ public class BindingImpl
//Perform ACLs
queue.getVirtualHost().getSecurityManager().authoriseCreateBinding(this);
_logSubject = new BindingLogSubject(_bindingKey,exchange,queue);
- CurrentActor.get().message(_logSubject, BindingMessages.CREATED(String.valueOf(getArguments()),
- getArguments() != null
- && !getArguments().isEmpty()));
+ SystemLog.message(_logSubject, BindingMessages.CREATED(String.valueOf(getArguments()),
+ getArguments() != null
+ && !getArguments().isEmpty()));
}
@@ -229,7 +229,7 @@ public class BindingImpl
{
listener.stateChanged(this, State.ACTIVE, State.DELETED);
}
- CurrentActor.get().message(_logSubject, BindingMessages.DELETED());
+ SystemLog.message(_logSubject, BindingMessages.DELETED());
}
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java b/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java
index e917202612..43ff07e6d0 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java
@@ -22,8 +22,6 @@ package org.apache.qpid.server.configuration.updater;
import java.security.AccessController;
import java.security.PrivilegedAction;
-import java.security.PrivilegedActionException;
-import java.security.PrivilegedExceptionAction;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
@@ -39,10 +37,7 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.model.State;
-import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
public class TaskExecutor
@@ -210,47 +205,29 @@ public class TaskExecutor
private class CallableWrapper<T> implements Task<T>
{
private Task<T> _userTask;
- private LogActor _actor;
private Subject _contextSubject;
public CallableWrapper(Task<T> userWork)
{
_userTask = userWork;
- _actor = CurrentActor.get();
_contextSubject = Subject.getSubject(AccessController.getContext());
}
@Override
public T call()
{
- CurrentActor.set(_actor);
-
- try
- {
- T result = null;
- result = Subject.doAs(_contextSubject, new PrivilegedAction<T>()
+ T result = null;
+ result = Subject.doAs(_contextSubject, new PrivilegedAction<T>()
+ {
+ @Override
+ public T run()
{
- @Override
- public T run()
- {
- return executeTask(_userTask);
- }
- });
+ return executeTask(_userTask);
+ }
+ });
- return result;
- }
- finally
- {
- try
- {
- CurrentActor.remove();
- }
- catch (Exception e)
- {
- LOGGER.warn("Unexpected exception on current actor removal", e);
- }
- }
+ return result;
}
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index e01f4b7db9..7396f74c0e 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -25,7 +25,7 @@ import java.util.ArrayList;
import org.apache.log4j.Logger;
import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.logging.subjects.ExchangeLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
@@ -166,7 +166,7 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
}
};
// Log Exchange creation
- CurrentActor.get().message(ExchangeMessages.CREATED(getExchangeType().getType(), getName(), _durable));
+ SystemLog.message(ExchangeMessages.CREATED(getExchangeType().getType(), getName(), _durable));
}
public abstract ExchangeType<T> getExchangeType();
@@ -204,7 +204,7 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
_alternateExchange.removeReference(this);
}
- CurrentActor.get().message(_logSubject, ExchangeMessages.DELETED());
+ SystemLog.message(_logSubject, ExchangeMessages.DELETED());
for(Action<ExchangeImpl> task : _closeTaskList)
{
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/AbstractRootMessageLogger.java b/java/broker-core/src/main/java/org/apache/qpid/server/logging/AbstractRootMessageLogger.java
index 98da9074ef..4717e7ccf2 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/logging/AbstractRootMessageLogger.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/logging/AbstractRootMessageLogger.java
@@ -21,10 +21,32 @@
package org.apache.qpid.server.logging;
+import org.apache.qpid.server.connection.ConnectionPrincipal;
+import org.apache.qpid.server.connection.SessionPrincipal;
+import org.apache.qpid.server.logging.subjects.LogSubjectFormat;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
+import org.apache.qpid.server.security.auth.ManagementConnectionPrincipal;
+import org.apache.qpid.server.security.auth.TaskPrincipal;
+
+import javax.security.auth.Subject;
+import java.security.AccessController;
+import java.security.Principal;
+import java.text.MessageFormat;
+import java.util.Set;
+
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT;
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.USER_FORMAT;
+
public abstract class AbstractRootMessageLogger implements RootMessageLogger
{
public static final String DEFAULT_LOG_HIERARCHY_PREFIX = "qpid.message.";
-
+
+ private final String _msgPrefix = System.getProperty("qpid.logging.prefix","");
+
private boolean _enabled = true;
public AbstractRootMessageLogger()
@@ -42,17 +64,159 @@ public abstract class AbstractRootMessageLogger implements RootMessageLogger
return _enabled;
}
- public boolean isMessageEnabled(LogActor actor, LogSubject subject, String logHierarchy)
+ public boolean isMessageEnabled(String logHierarchy)
{
return _enabled;
}
- public boolean isMessageEnabled(LogActor actor, String logHierarchy)
+ public void message(LogMessage message)
{
- return _enabled;
+ if (isMessageEnabled(message.getLogHierarchy()))
+ {
+ rawMessage(_msgPrefix + getActor() + message, message.getLogHierarchy());
+ }
+ }
+
+ public void message(LogSubject subject, LogMessage message)
+ {
+ if (isMessageEnabled(message.getLogHierarchy()))
+ {
+ rawMessage(_msgPrefix + getActor() + subject.toLogString() + message,
+ message.getLogHierarchy());
+ }
+ }
+ abstract void rawMessage(String message, String logHierarchy);
+
+ abstract void rawMessage(String message, Throwable throwable, String logHierarchy);
+
+
+ protected String getActor()
+ {
+ Subject subject = Subject.getSubject(AccessController.getContext());
+
+ SessionPrincipal sessionPrincipal = getPrincipal(subject, SessionPrincipal.class);
+ String message;
+ if(sessionPrincipal != null)
+ {
+ message = generateSessionMessage(sessionPrincipal.getSession());
+ }
+ else
+ {
+ ConnectionPrincipal connPrincipal = getPrincipal(subject, ConnectionPrincipal.class);
+
+ if(connPrincipal != null)
+ {
+ message = generateConnectionMessage(connPrincipal.getConnection());
+ }
+ else
+ {
+ TaskPrincipal taskPrincipal = getPrincipal(subject, TaskPrincipal.class);
+ if(taskPrincipal != null)
+ {
+ message = generateTaskMessage(taskPrincipal);
+ }
+ else
+ {
+ ManagementConnectionPrincipal managementConnection = getPrincipal(subject,ManagementConnectionPrincipal.class);
+ if(managementConnection != null)
+ {
+ message = generateManagementConnectionMessage(managementConnection, getPrincipal(subject, AuthenticatedPrincipal.class));
+ }
+ else
+ {
+ message = "<<UNKNOWN>> ";
+ }
+ }
+ }
+ }
+ return message;
+ }
+
+ private String generateManagementConnectionMessage(final ManagementConnectionPrincipal managementConnection,
+ final AuthenticatedPrincipal userPrincipal)
+ {
+ String remoteAddress = managementConnection.getRemoteAddress().toString();
+ String user = userPrincipal == null ? "N/A" : userPrincipal.getName();
+ return "[" + MessageFormat.format(LogSubjectFormat.MANAGEMENT_FORMAT, user, remoteAddress) + "] ";
+ }
+
+ private String generateTaskMessage(final TaskPrincipal taskPrincipal)
+ {
+ return "["+taskPrincipal.getName()+"] ";
+ }
+
+ protected String generateConnectionMessage(final AMQConnectionModel connection)
+ {
+ if (connection.getAuthorizedPrincipal() != null)
+ {
+ if (connection.getVirtualHostName() != null)
+ {
+ /**
+ * LOG FORMAT used by the AMQPConnectorActor follows
+ * ConnectionLogSubject.CONNECTION_FORMAT :
+ * con:{0}({1}@{2}/{3})
+ *
+ * Uses a MessageFormat call to insert the required values
+ * according to these indices:
+ *
+ * 0 - Connection ID 1 - User ID 2 - IP 3 - Virtualhost
+ */
+ return "[" + MessageFormat.format(CONNECTION_FORMAT,
+ connection.getConnectionId(),
+ connection.getAuthorizedPrincipal().getName(),
+ connection.getRemoteAddressString(),
+ connection.getVirtualHostName())
+ + "] ";
+
+ }
+ else
+ {
+ return"[" + MessageFormat.format(USER_FORMAT,
+ connection.getConnectionId(),
+ connection.getAuthorizedPrincipal().getName(),
+ connection.getRemoteAddressString())
+ + "] ";
+
+ }
+ }
+ else
+ {
+ return "[" + MessageFormat.format(SOCKET_FORMAT,
+ connection.getConnectionId(),
+ connection.getRemoteAddressString())
+ + "] ";
+ }
+ }
+
+ protected String generateSessionMessage(final AMQSessionModel session)
+ {
+ AMQConnectionModel connection = session.getConnectionModel();
+ return "[" + MessageFormat.format(CHANNEL_FORMAT, connection == null ? -1L : connection.getConnectionId(),
+ (connection == null || connection.getAuthorizedPrincipal() == null)
+ ? "?"
+ : connection.getAuthorizedPrincipal().getName(),
+ (connection == null || connection.getRemoteAddressString() == null)
+ ? "?"
+ : connection.getRemoteAddressString(),
+ (connection == null || connection.getVirtualHostName() == null)
+ ? "?"
+ : connection.getVirtualHostName(),
+ session.getChannelId())
+ + "] ";
+ }
+
+ private <P extends Principal> P getPrincipal(Subject subject, Class<P> clazz)
+ {
+ if(subject != null)
+ {
+ Set<P> principals = subject.getPrincipals(clazz);
+ if(principals != null && !principals.isEmpty())
+ {
+ return principals.iterator().next();
+ }
+ }
+ return null;
}
- public abstract void rawMessage(String message, String logHierarchy);
- public abstract void rawMessage(String message, Throwable throwable, String logHierarchy);
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/CompositeStartupMessageLogger.java b/java/broker-core/src/main/java/org/apache/qpid/server/logging/CompositeStartupMessageLogger.java
index e0a51b3a3e..d21fd62045 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/logging/CompositeStartupMessageLogger.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/logging/CompositeStartupMessageLogger.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.logging;
-public class CompositeStartupMessageLogger extends AbstractRootMessageLogger
+public class CompositeStartupMessageLogger implements RootMessageLogger
{
private RootMessageLogger[] _loggers;
@@ -30,22 +30,50 @@ public class CompositeStartupMessageLogger extends AbstractRootMessageLogger
_loggers = loggers;
}
+
+ @Override
+ public boolean isEnabled()
+ {
+ for(RootMessageLogger l : _loggers)
+ {
+ if(l.isEnabled())
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
@Override
- public void rawMessage(String message, String logHierarchy)
+ public boolean isMessageEnabled(final String logHierarchy)
{
for(RootMessageLogger l : _loggers)
{
- l.rawMessage(message, logHierarchy);
+ if(l.isMessageEnabled(logHierarchy))
+ {
+ return true;
+ }
}
+ return false;
}
@Override
- public void rawMessage(String message, Throwable throwable, String logHierarchy)
+ public void message(final LogMessage message)
{
for(RootMessageLogger l : _loggers)
{
- l.rawMessage(message, throwable, logHierarchy);
+ l.message(message);
}
}
+ @Override
+ public void message(final LogSubject subject, final LogMessage message)
+ {
+ for(RootMessageLogger l : _loggers)
+ {
+ l.message(subject, message);
+ }
+ }
+
+
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/Log4jMessageLogger.java b/java/broker-core/src/main/java/org/apache/qpid/server/logging/Log4jMessageLogger.java
index b4e9f2f333..896ca84361 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/logging/Log4jMessageLogger.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/logging/Log4jMessageLogger.java
@@ -33,15 +33,9 @@ public class Log4jMessageLogger extends AbstractRootMessageLogger
{
super(statusUpdatesEnabled);
}
-
- @Override
- public boolean isMessageEnabled(LogActor actor, LogSubject subject, String logHierarchy)
- {
- return isMessageEnabled(actor, logHierarchy);
- }
@Override
- public boolean isMessageEnabled(LogActor actor, String logHierarchy)
+ public boolean isMessageEnabled(String logHierarchy)
{
if(isEnabled())
{
@@ -55,13 +49,13 @@ public class Log4jMessageLogger extends AbstractRootMessageLogger
}
@Override
- public void rawMessage(String message, String logHierarchy)
+ void rawMessage(String message, String logHierarchy)
{
rawMessage(message, null, logHierarchy);
}
@Override
- public void rawMessage(String message, Throwable throwable, String logHierarchy)
+ void rawMessage(String message, Throwable throwable, String logHierarchy)
{
Logger logger = Logger.getLogger(logHierarchy);
logger.info(message, throwable);
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/LogActor.java b/java/broker-core/src/main/java/org/apache/qpid/server/logging/LogActor.java
deleted file mode 100644
index 18f03c2716..0000000000
--- a/java/broker-core/src/main/java/org/apache/qpid/server/logging/LogActor.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- *
- */
-package org.apache.qpid.server.logging;
-
-/**
- * LogActor the entity that is stored as in a ThreadLocal and used to perform logging.
- *
- * The actor is responsible for formatting its display name for the log entry.
- *
- * The actor performs the requested logging.
- */
-public interface LogActor
-{
- /**
- * Logs the specified LogMessage about the LogSubject
- *
- * Currently logging has a global setting however this will later be revised and
- * as such the LogActor will need to take into consideration any new configuration
- * as a means of enabling the logging of LogActors and LogSubjects.
- *
- * @param subject The subject that is being logged
- * @param message The message to log
- */
- public void message(LogSubject subject, LogMessage message);
-
- /**
- * Logs the specified LogMessage against this actor
- *
- * Currently logging has a global setting however this will later be revised and
- * as such the LogActor will need to take into consideration any new configuration
- * as a means of enabling the logging of LogActors and LogSubjects.
- *
- * @param message The message to log
- */
- public void message(LogMessage message);
-
- /**
- *
- * @return the RootMessageLogger that is currently in use by this LogActor.
- */
- RootMessageLogger getRootMessageLogger();
-
- /**
- *
- * @return the String representing this LogActor
- */
- public String getLogMessage();
-} \ No newline at end of file
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/NullRootMessageLogger.java b/java/broker-core/src/main/java/org/apache/qpid/server/logging/NullRootMessageLogger.java
index 64e16d9ee8..c7cf609503 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/logging/NullRootMessageLogger.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/logging/NullRootMessageLogger.java
@@ -24,13 +24,7 @@ public class NullRootMessageLogger extends AbstractRootMessageLogger
{
@Override
- public boolean isMessageEnabled(LogActor actor, LogSubject subject, String logHierarchy)
- {
- return false;
- }
-
- @Override
- public boolean isMessageEnabled(LogActor actor, String logHierarchy)
+ public boolean isMessageEnabled(String logHierarchy)
{
return false;
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/RootMessageLogger.java b/java/broker-core/src/main/java/org/apache/qpid/server/logging/RootMessageLogger.java
index c31e528c55..92318ed8b0 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/logging/RootMessageLogger.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/logging/RootMessageLogger.java
@@ -33,43 +33,20 @@ public interface RootMessageLogger
* @return boolean true if enabled.
*/
boolean isEnabled();
-
- /**
- * Determine if the LogSubject and the LogActor should be
- * generating log messages.
- * @param actor The actor requesting the logging
- * @param subject The subject of this log request
- * @param logHierarchy The log hierarchy for this request
- *
- * @return boolean true if the message should be logged.
- */
- boolean isMessageEnabled(LogActor actor, LogSubject subject, String logHierarchy);
/**
* Determine if the LogActor should be generating log messages.
*
- * @param actor The actor requesting the logging
* @param logHierarchy The log hierarchy for this request
*
* @return boolean true if the message should be logged.
*/
- boolean isMessageEnabled(LogActor actor, String logHierarchy);
+ boolean isMessageEnabled(String logHierarchy);
+
+ void message(LogMessage message);
+
+ void message(LogSubject subject, LogMessage message);
+
- /**
- * Log the raw message to the configured logger.
- *
- * @param message The message to log
- * @param logHierarchy The log hierarchy for this request
- */
- public void rawMessage(String message, String logHierarchy);
- /**
- * Log the raw message to the configured logger.
- * Along with a formatted stack trace from the Throwable.
- *
- * @param message The message to log
- * @param throwable Optional Throwable that should provide stack trace
- * @param logHierarchy The log hierarchy for this request
- */
- void rawMessage(String message, Throwable throwable, String logHierarchy);
} \ No newline at end of file
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/SystemLog.java b/java/broker-core/src/main/java/org/apache/qpid/server/logging/SystemLog.java
new file mode 100644
index 0000000000..b722c5360b
--- /dev/null
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/logging/SystemLog.java
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging;
+
+public class SystemLog
+{
+ private static RootMessageLogger _rootMessageLogger = new NullRootMessageLogger();
+
+ /**
+ * Logs the specified LogMessage about the LogSubject
+ *
+ * @param subject The subject that is being logged
+ * @param message The message to log
+ */
+ public static void message(LogSubject subject, LogMessage message)
+ {
+ getRootMessageLogger().message(subject, message);
+ }
+
+ /**
+ * Logs the specified LogMessage
+ *
+ * @param message The message to log
+ */
+ public static void message(LogMessage message)
+ {
+ getRootMessageLogger().message((message));
+ }
+
+ private synchronized static RootMessageLogger getRootMessageLogger()
+ {
+ final RootMessageLogger rootMessageLogger = _rootMessageLogger;
+ return rootMessageLogger == null ? new NullRootMessageLogger() : rootMessageLogger;
+ }
+
+ public static synchronized void setRootMessageLogger(final RootMessageLogger rootMessageLogger)
+ {
+ _rootMessageLogger = rootMessageLogger;
+ }
+}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/SystemOutMessageLogger.java b/java/broker-core/src/main/java/org/apache/qpid/server/logging/SystemOutMessageLogger.java
index 63c1bcc712..297085427d 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/logging/SystemOutMessageLogger.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/logging/SystemOutMessageLogger.java
@@ -23,14 +23,9 @@ package org.apache.qpid.server.logging;
public class SystemOutMessageLogger extends AbstractRootMessageLogger
{
- @Override
- public boolean isMessageEnabled(LogActor actor, LogSubject subject, String logHierarchy)
- {
- return true;
- }
@Override
- public boolean isMessageEnabled(LogActor actor, String logHierarchy)
+ public boolean isMessageEnabled(String logHierarchy)
{
return true;
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java b/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java
deleted file mode 100644
index 9228a2674d..0000000000
--- a/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- *
- */
-package org.apache.qpid.server.logging.actors;
-
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.logging.RootMessageLogger;
-import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
-
-/**
- * An AMQPChannelActor represents a connection through the AMQP port with an
- * associated Channel.
- *
- * <p/>
- * This is responsible for correctly formatting the LogActor String in the log
- * <p/>
- * [con:1(user@127.0.0.1/)/ch:1]
- * <p/>
- * To do this it requires access to the IO Layers as well as a Channel
- */
-public class AMQPChannelActor extends AbstractActor
-{
- private final ChannelLogSubject _logString;
-
- /**
- * Create a new ChannelActor
- *
- * @param channel The Channel for this LogActor
- * @param rootLogger The root Logger that this LogActor should use
- */
- public AMQPChannelActor(AMQSessionModel channel, RootMessageLogger rootLogger)
- {
- super(rootLogger);
-
-
- _logString = new ChannelLogSubject(channel);
- }
-
- public String getLogMessage()
- {
- return _logString.toLogString();
- }
-}
-
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java b/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java
deleted file mode 100644
index d4213b2876..0000000000
--- a/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- *
- */
-package org.apache.qpid.server.logging.actors;
-
-import org.apache.qpid.server.logging.RootMessageLogger;
-import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
-
-
-/**
- * An AMQPConnectionActor represents a connection through the AMQP port.
- * <p/>
- * This is responsible for correctly formatting the LogActor String in the log
- * <p/>
- * [ con:1(user@127.0.0.1/) ]
- * <p/>
- * To do this it requires access to the IO Layers.
- */
-public class AMQPConnectionActor extends AbstractActor
-{
- private ConnectionLogSubject _logSubject;
-
- public AMQPConnectionActor(AMQConnectionModel session, RootMessageLogger rootLogger)
- {
- super(rootLogger);
-
- _logSubject = new ConnectionLogSubject(session);
- }
-
- public String getLogMessage()
- {
- return _logSubject.toLogString();
- }
-}
-
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java b/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java
deleted file mode 100644
index e8c6c9c323..0000000000
--- a/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.logging.actors;
-
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.logging.LogMessage;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.RootMessageLogger;
-
-public abstract class AbstractActor implements LogActor
-{
- private final String _msgPrefix = System.getProperty("qpid.logging.prefix","");
-
- private RootMessageLogger _rootLogger;
-
- public AbstractActor(RootMessageLogger rootLogger)
- {
- if(rootLogger == null)
- {
- throw new NullPointerException("RootMessageLogger cannot be null");
- }
- _rootLogger = rootLogger;
- }
-
- public void message(LogSubject subject, LogMessage message)
- {
- if (_rootLogger.isMessageEnabled(this, subject, message.getLogHierarchy()))
- {
- _rootLogger.rawMessage(_msgPrefix + getLogMessage() + subject.toLogString() + message, message.getLogHierarchy());
- }
- }
-
- public void message(LogMessage message)
- {
- if (_rootLogger.isMessageEnabled(this, message.getLogHierarchy()))
- {
- _rootLogger.rawMessage(_msgPrefix + getLogMessage() + message, message.getLogHierarchy());
- }
- }
-
- public RootMessageLogger getRootMessageLogger()
- {
- return _rootLogger;
- }
-
- public String toString()
- {
- return getLogMessage();
- }
-
- abstract public String getLogMessage();
-
-}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/AbstractManagementActor.java b/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/AbstractManagementActor.java
deleted file mode 100644
index 8cf121b3d9..0000000000
--- a/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/AbstractManagementActor.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.logging.actors;
-
-import java.security.AccessController;
-
-import javax.security.auth.Subject;
-
-import org.apache.qpid.server.logging.RootMessageLogger;
-import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
-
-public abstract class AbstractManagementActor extends AbstractActor
-{
- /**
- * Holds the principal name to display when principal subject is not available.
- * <p>
- * This is useful for cases when users invoke JMX operation over JConsole
- * attached to the local JVM.
- */
- protected static final String UNKNOWN_PRINCIPAL = "N/A";
-
- /** used when the principal name cannot be discovered from the Subject */
- private final String _fallbackPrincipalName;
-
- public AbstractManagementActor(RootMessageLogger rootLogger, String fallbackPrincipalName)
- {
- super(rootLogger);
- _fallbackPrincipalName = fallbackPrincipalName;
- }
-
- /**
- * Returns current {@link AuthenticatedPrincipal} name or {@link #_fallbackPrincipalName}
- * if it can't be found.
- */
- protected String getPrincipalName()
- {
- String identity = _fallbackPrincipalName;
-
- final Subject subject = Subject.getSubject(AccessController.getContext());
- if (subject != null)
- {
- AuthenticatedPrincipal authenticatedPrincipal = AuthenticatedPrincipal.getOptionalAuthenticatedPrincipalFromSubject(subject);
- if(authenticatedPrincipal != null)
- {
- identity = authenticatedPrincipal.getName();
- }
- }
- return identity;
- }
-}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java b/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java
deleted file mode 100644
index 91d9ef7dbc..0000000000
--- a/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.logging.actors;
-
-import java.util.EmptyStackException;
-import java.util.Stack;
-
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.logging.LogMessage;
-import org.apache.qpid.server.logging.LogSubject;
-
-/**
- * The CurrentActor is a ThreadLocal wrapper that allows threads in the broker
- * to retrieve an actor to perform logging. This approach is used so for two
- * reasons:
- * 1) We do not have to pass a logging actor around the system
- * 2) We can set new actors at the point we have enough information. i.e.
- * - Set a low level ConnectionActor when processing bytes from the wire.
- * - Set a ChannelActor when we are processing the frame
- * <p/>
- * The code performing the logging need not worry about what type of actor is
- * currently set so can perform its logging. The resulting log entry though will
- * contain customised details from the the currently set Actor.
- * <p/>
- * The Actor model also allows the pre-creation of fixed messages so the
- * performance impact of the additional logging data is minimised.
- * <p/>
- * This class does not perform any checks to ensure that there is an Actor set
- * when calling remove or get. As a result the application developer must ensure
- * that they have called set before they attempt to use the actor via get or
- * remove the set actor.
- * <p/>
- * The checking of the return via get should not be done as the logging is
- * desired. It is preferable to cause the NullPointerException to highlight the
- * programming error rather than miss a log message.
- * <p/>
- * The same is true for the remove. A NPE will occur if no set has been called
- * highlighting the programming error.
- */
-public class CurrentActor
-{
- /** The ThreadLocal variable with initialiser */
- private static final ThreadLocal<Stack<LogActor>> _currentActor = new ThreadLocal<Stack<LogActor>>()
- {
- // Initialise the CurrentActor to be an empty List
- protected Stack<LogActor> initialValue()
- {
- return new Stack<LogActor>();
- }
- };
-
- private static LogActor _defaultActor;
-
- private CurrentActor()
- {
- }
-
- /**
- * Set a new {@link LogActor} to be the Current Actor
- * <p/>
- * This pushes the Actor in to the LIFO Queue
- *
- * @param actor The new LogActor
- */
- public static void set(LogActor actor)
- {
- Stack<LogActor> stack = _currentActor.get();
- stack.push(actor);
- }
-
- /**
- * Remove all {@link LogActor}s
- */
- public static void removeAll()
- {
- Stack<LogActor> stack = _currentActor.get();
- stack.clear();
- }
-
- /**
- * Remove the current {@link LogActor}.
- * <p/>
- * Calling remove without calling set will result in an EmptyStackException.
- */
- public static void remove()
- {
- Stack<LogActor> stack = _currentActor.get();
- stack.pop();
-
- if (stack.isEmpty())
- {
- _currentActor.remove();
- }
- }
-
- /**
- * Return the current head of the list of {@link LogActor}s.
- *
- * @return Current LogActor
- */
- public static LogActor get()
- {
- try
- {
- return _currentActor.get().peek();
- }
- catch (EmptyStackException ese)
- {
- return _defaultActor;
- }
- }
-
- public static void setDefault(LogActor defaultActor)
- {
- _defaultActor = defaultActor;
- }
-
- public static void message(LogSubject subject, LogMessage message)
- {
- get().message(subject, message);
- }
-
- public static void message(LogMessage message)
- {
- get().message(message);
- }
-}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/GenericActor.java b/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/GenericActor.java
deleted file mode 100644
index 4cf2bd4508..0000000000
--- a/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/GenericActor.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.server.logging.actors;
-
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.RootMessageLogger;
-
-public class GenericActor extends AbstractActor
-{
-
- private static RootMessageLogger _defaultMessageLogger;
-
- private LogSubject _logSubject;
-
- public static RootMessageLogger getDefaultMessageLogger()
- {
- return _defaultMessageLogger;
- }
-
- public static void setDefaultMessageLogger(RootMessageLogger defaultMessageLogger)
- {
- _defaultMessageLogger = defaultMessageLogger;
- }
-
- public GenericActor(final String logSubject)
- {
- this(new LogSubject()
- {
- @Override
- public String toLogString()
- {
- return logSubject;
- }
- });
- }
-
-
- public GenericActor(LogSubject logSubject)
- {
- this(logSubject, CurrentActor.get().getRootMessageLogger());
- }
-
- public GenericActor(LogSubject logSubject, RootMessageLogger rootLogger)
- {
- super(rootLogger);
- _logSubject = logSubject;
- }
-
- public String getLogMessage()
- {
- return _logSubject.toLogString();
- }
-
- public LogSubject getLogSubject()
- {
- return _logSubject;
- }
-
- public static LogActor getInstance(final String logMessage, RootMessageLogger rootLogger)
- {
- return new GenericActor(new LogSubject()
- {
- public String toLogString()
- {
- return logMessage;
- }
-
- }, rootLogger);
- }
-
- public static LogActor getInstance(final String subjectMessage)
- {
- return new GenericActor(new LogSubject()
- {
- public String toLogString()
- {
- return "[" + subjectMessage + "] ";
- }
-
- }, _defaultMessageLogger);
- }
-
- public static LogActor getInstance(LogSubject logSubject)
- {
- return new GenericActor(logSubject, _defaultMessageLogger);
- }
-}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/HttpManagementActor.java b/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/HttpManagementActor.java
deleted file mode 100644
index 9b445c2bd9..0000000000
--- a/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/HttpManagementActor.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.logging.actors;
-
-import java.text.MessageFormat;
-
-import org.apache.qpid.server.logging.RootMessageLogger;
-import org.apache.qpid.server.logging.subjects.LogSubjectFormat;
-
-/**
- * HttpManagement actor to use in {@link AbstractServlet} to log all http management operational logging.
- *
- * An instance is required per http Session.
- */
-public class HttpManagementActor extends AbstractManagementActor
-{
- private String _cachedLogString;
- private String _lastPrincipalName;
- private String _address;
-
- public HttpManagementActor(RootMessageLogger rootLogger, String ip, int port)
- {
- super(rootLogger, UNKNOWN_PRINCIPAL);
- _address = ip + ":" + port;
- }
-
- private synchronized String getAndCacheLogString()
- {
- String principalName = getPrincipalName();
-
- if(!principalName.equals(_lastPrincipalName))
- {
- _lastPrincipalName = principalName;
- _cachedLogString = "[" + MessageFormat.format(LogSubjectFormat.MANAGEMENT_FORMAT, principalName, _address) + "] ";
- }
-
- return _cachedLogString;
- }
-
- public String getLogMessage()
- {
- return getAndCacheLogString();
- }
-}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java b/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java
deleted file mode 100644
index ba5ea47fc1..0000000000
--- a/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.logging.actors;
-
-import org.apache.qpid.server.logging.RootMessageLogger;
-import org.apache.qpid.server.logging.subjects.LogSubjectFormat;
-
-import java.text.MessageFormat;
-
-/**
- * Management actor to use in {@link MBeanInvocationHandlerImpl} to log all management operational logging.
- */
-public class ManagementActor extends AbstractManagementActor
-{
- private String _lastThreadName = null;
-
- /**
- * The logString to be used for logging
- */
- private String _logStringContainingPrincipal;
-
- /** @param rootLogger The RootLogger to use for this Actor */
- public ManagementActor(RootMessageLogger rootLogger)
- {
- super(rootLogger, UNKNOWN_PRINCIPAL);
- }
-
- public ManagementActor(RootMessageLogger rootLogger, String principalName)
- {
- super(rootLogger, principalName);
- }
-
- private synchronized String getAndCacheLogString()
- {
- String currentName = Thread.currentThread().getName();
-
- String actor;
- String logString = _logStringContainingPrincipal;
-
- // Record the last thread name so we don't have to recreate the log string
- if (_logStringContainingPrincipal == null || !currentName.equals(_lastThreadName))
- {
- _lastThreadName = currentName;
- String principalName = getPrincipalName();
-
- // Management Thread names have this format.
- // RMI TCP Connection(2)-169.24.29.116
- // This is true for both LocalAPI and JMX Connections
- // However to be defensive lets test.
- String[] split = currentName.split("\\(");
- if (split.length == 2)
- {
- String ip = currentName.split("-")[1];
- actor = MessageFormat.format(LogSubjectFormat.MANAGEMENT_FORMAT, principalName, ip);
- }
- else
- {
- // This is a precautionary path as it is not expected to occur
- // however rather than adjusting the thread name of the two
- // tests that will use this it is safer all round to do this.
- // it is also currently used by tests :
- // AMQBrokerManagerMBeanTest
- // ExchangeMBeanTest
- actor = currentName;
- }
-
- logString = "[" + actor + "] ";
- if(principalName != UNKNOWN_PRINCIPAL )
- {
- _logStringContainingPrincipal = logString;
- }
-
- }
- return logString;
- }
-
- public String getLogMessage()
- {
- return getAndCacheLogString();
- }
-}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/QueueActor.java b/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/QueueActor.java
deleted file mode 100644
index 4b17e8c0e6..0000000000
--- a/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/QueueActor.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.logging.actors;
-
-import org.apache.qpid.server.logging.RootMessageLogger;
-import org.apache.qpid.server.logging.subjects.QueueLogSubject;
-import org.apache.qpid.server.queue.AMQQueue;
-
-/**
- * This Actor is used when while the queue is performing an asynchronous process
- * of its queue.
- */
-public class QueueActor extends AbstractActor
-{
- private QueueLogSubject _logSubject;
-
- /**
- * Create an QueueLogSubject that Logs in the following format.
- *
- * @param queue The queue that this Actor is working for
- * @param rootLogger the Root logger to use.
- */
- public QueueActor(AMQQueue queue, RootMessageLogger rootLogger)
- {
- super(rootLogger);
-
- _logSubject = new QueueLogSubject(queue);
- }
-
- public String getLogMessage()
- {
- return _logSubject.toLogString();
- }
-}
-
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index f056937b1b..d63a765144 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -123,6 +123,12 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>
//children
Collection<? extends Binding> getBindings();
+ // TODO - Undo this commented out line when we stop supporting 1.6 for compilation
+ // In 1.6 this causes the build to break at AbstractQueue because the 1.6 compiler can't work out that
+ // the definition in terms of the Consumer implementation meets both this, and the contract for AMQQueue
+
+ // Collection<? extends Consumer> getConsumers();
+
//operations
void visit(QueueEntryVisitor visitor);
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AbstractConfiguredObject.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AbstractConfiguredObject.java
index 66dcd1ab3e..eda61f92b0 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AbstractConfiguredObject.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AbstractConfiguredObject.java
@@ -357,7 +357,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
{
Object value = attr.getValue((X)this);
if(value != null && attr.getAnnotation().secure() &&
- !SecurityManager.SYSTEM.equals(Subject.getSubject(AccessController.getContext())))
+ !SecurityManager.isSystemProcess())
{
return SECURE_VALUES.get(value.getClass());
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java
index 097c179514..1fcf169dc8 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java
@@ -34,7 +34,7 @@ import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
-import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.logging.messages.BrokerMessages;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.KeyStore;
@@ -99,7 +99,7 @@ public class AmqpPortAdapter extends PortAdapter<AmqpPortAdapter>
_transport.start();
for(Transport transport : getTransports())
{
- CurrentActor.get().message(BrokerMessages.LISTENING(String.valueOf(transport), getPort()));
+ SystemLog.message(BrokerMessages.LISTENING(String.valueOf(transport), getPort()));
}
}
@@ -110,7 +110,7 @@ public class AmqpPortAdapter extends PortAdapter<AmqpPortAdapter>
{
for(Transport transport : getTransports())
{
- CurrentActor.get().message(BrokerMessages.SHUTTING_DOWN(String.valueOf(transport), getPort()));
+ SystemLog.message(BrokerMessages.SHUTTING_DOWN(String.valueOf(transport), getPort()));
}
_transport.close();
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
index 01798ad4ac..ed5d371079 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
@@ -41,8 +41,7 @@ import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.logging.LogRecorder;
import org.apache.qpid.server.logging.RootMessageLogger;
-import org.apache.qpid.server.logging.actors.BrokerActor;
-import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.logging.messages.BrokerMessages;
import org.apache.qpid.server.model.*;
import org.apache.qpid.server.plugin.PreferencesProviderFactory;
@@ -1046,22 +1045,16 @@ public class BrokerAdapter<X extends Broker<X>> extends AbstractConfiguredObject
changeState(_authenticationProviders, currentState, State.ACTIVE, false);
changeState(_accessControlProviders, currentState, State.ACTIVE, false);
- CurrentActor.set(new BrokerActor(getRootMessageLogger()));
- try
- {
- changeState(_vhostAdapters, currentState, State.ACTIVE, false);
- }
- finally
- {
- CurrentActor.remove();
- }
+
+ changeState(_vhostAdapters, currentState, State.ACTIVE, false);
changeState(_portAdapters, currentState,State.ACTIVE, false);
changeState(_plugins, currentState,State.ACTIVE, false);
if (isManagementMode())
{
- CurrentActor.get().message(BrokerMessages.MANAGEMENT_MODE(BrokerOptions.MANAGEMENT_MODE_USER_NAME, _brokerOptions.getManagementModePassword()));
+ SystemLog.message(BrokerMessages.MANAGEMENT_MODE(BrokerOptions.MANAGEMENT_MODE_USER_NAME,
+ _brokerOptions.getManagementModePassword()));
}
return true;
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
index 01b220fd79..2b86f257b5 100755
--- a/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
@@ -30,17 +30,16 @@ import java.util.Set;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLPeerUnverifiedException;
-import javax.net.ssl.SSLSocket;
+import javax.security.auth.Subject;
import org.apache.log4j.Logger;
import org.apache.qpid.protocol.ServerProtocolEngine;
-import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.plugin.ProtocolEngineCreator;
-import org.apache.qpid.transport.Binary;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.security.SSLStatus;
@@ -144,6 +143,12 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
return _delegate.getConnectionId();
}
+ @Override
+ public Subject getSubject()
+ {
+ return _delegate.getSubject();
+ }
+
private static final int MINIMUM_REQUIRED_HEADER_BYTES = 8;
public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
@@ -243,6 +248,12 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
{
return _id;
}
+
+ @Override
+ public Subject getSubject()
+ {
+ return new Subject();
+ }
}
private class SelfDelegateProtocolEngine implements ServerProtocolEngine
@@ -384,6 +395,12 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
return _id;
}
+ @Override
+ public Subject getSubject()
+ {
+ return _delegate.getSubject();
+ }
+
public void exception(Throwable t)
{
_logger.error("Error establishing session", t);
@@ -423,7 +440,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
public void readerIdle()
{
- CurrentActor.get().message(ConnectionMessages.IDLE_CLOSE());
+ SystemLog.message(ConnectionMessages.IDLE_CLOSE());
_network.close();
}
@@ -547,6 +564,12 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
}
@Override
+ public Subject getSubject()
+ {
+ return _decryptEngine.getSubject();
+ }
+
+ @Override
public long getLastReadTime()
{
return _lastReadTime;
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 11eb0b8a19..7ed54499ec 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -36,6 +36,7 @@ import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.*;
import org.apache.qpid.server.model.Queue;
@@ -44,10 +45,7 @@ import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.filter.FilterManager;
-import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.QueueActor;
import org.apache.qpid.server.logging.messages.QueueMessages;
import org.apache.qpid.server.logging.subjects.QueueLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
@@ -180,7 +178,6 @@ public abstract class AbstractQueue
private LogSubject _logSubject;
- private LogActor _logActor;
private boolean _noLocal;
@@ -245,7 +242,6 @@ public abstract class AbstractQueue
_asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
_logSubject = new QueueLogSubject(this);
- _logActor = new QueueActor(this, CurrentActor.get().getRootMessageLogger());
virtualHost.getSecurityManager().authoriseCreateQueue(this);
@@ -411,14 +407,14 @@ public abstract class AbstractQueue
// Log the creation of this Queue.
// The priorities display is toggled on if we set priorities > 0
- CurrentActor.get().message(_logSubject,
- QueueMessages.CREATED(ownerString,
- _entries.getPriorities(),
- ownerString != null ,
- _lifetimePolicy != LifetimePolicy.PERMANENT,
- durable,
- !durable,
- _entries.getPriorities() > 0));
+ SystemLog.message(_logSubject,
+ QueueMessages.CREATED(ownerString,
+ _entries.getPriorities(),
+ ownerString != null,
+ _lifetimePolicy != LifetimePolicy.PERMANENT,
+ durable,
+ !durable,
+ _entries.getPriorities() > 0));
if(attributes != null && attributes.containsKey(Queue.MESSAGE_GROUP_KEY))
{
@@ -1684,7 +1680,7 @@ public abstract class AbstractQueue
stop();
//Log Queue Deletion
- CurrentActor.get().message(_logSubject, QueueMessages.DELETED());
+ SystemLog.message(_logSubject, QueueMessages.DELETED());
}
return getQueueDepthMessages();
@@ -1707,7 +1703,7 @@ public abstract class AbstractQueue
{
_overfull.set(true);
//Overfull log message
- _logActor.message(_logSubject, QueueMessages.OVERFULL(_atomicQueueSize.get(), _capacity));
+ SystemLog.message(_logSubject, QueueMessages.OVERFULL(_atomicQueueSize.get(), _capacity));
_blockedChannels.add(channel);
@@ -1717,13 +1713,12 @@ public abstract class AbstractQueue
{
//Underfull log message
- _logActor.message(_logSubject, QueueMessages.UNDERFULL(_atomicQueueSize.get(), _flowResumeCapacity));
+ SystemLog.message(_logSubject, QueueMessages.UNDERFULL(_atomicQueueSize.get(), _flowResumeCapacity));
channel.unblock(this);
_blockedChannels.remove(channel);
}
-
}
@@ -1739,7 +1734,7 @@ public abstract class AbstractQueue
{
if(_overfull.compareAndSet(true,false))
{//Underfull log message
- _logActor.message(_logSubject, QueueMessages.UNDERFULL(_atomicQueueSize.get(), _flowResumeCapacity));
+ SystemLog.message(_logSubject, QueueMessages.UNDERFULL(_atomicQueueSize.get(), _flowResumeCapacity));
}
for(final AMQSessionModel blockedChannel : _blockedChannels)
@@ -2401,11 +2396,6 @@ public abstract class AbstractQueue
_unackedMsgBytes.addAndGet(entry.getSize());
}
- public LogActor getLogActor()
- {
- return _logActor;
- }
-
@Override
public int getMaximumDeliveryAttempts()
{
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
index 8c03e8f882..ca6be01136 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
@@ -24,17 +24,14 @@ import org.apache.log4j.Logger;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.JMSSelectorFilter;
import org.apache.qpid.server.filter.MessageFilter;
-import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.GenericActor;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.logging.messages.SubscriptionMessages;
import org.apache.qpid.server.logging.subjects.QueueLogSubject;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.State;
@@ -56,7 +53,7 @@ import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPT
class QueueConsumerImpl
extends AbstractConfiguredObject<QueueConsumerImpl>
- implements QueueConsumer<QueueConsumerImpl>
+ implements QueueConsumer<QueueConsumerImpl>, LogSubject
{
@@ -76,9 +73,6 @@ class QueueConsumerImpl
private final Class<? extends ServerMessage> _messageClass;
private final Object _sessionReference;
private final AbstractQueue _queue;
- private GenericActor _logActor = new GenericActor("[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getConsumerNumber())
- + "(UNKNOWN)"
- + "] ");
static final EnumMap<ConsumerTarget.State, State> STATE_MAP =
new EnumMap<ConsumerTarget.State, State>(ConsumerTarget.State.class);
@@ -97,7 +91,7 @@ class QueueConsumerImpl
{
public void stateChanged(QueueConsumerImpl sub, State oldState, State newState)
{
- CurrentActor.get().message(SubscriptionMessages.STATE(newState.toString()));
+ SystemLog.message(SubscriptionMessages.STATE(newState.toString()));
}
};
@ManagedAttributeField
@@ -184,12 +178,12 @@ class QueueConsumerImpl
{
if(_targetClosed.compareAndSet(false,true))
{
- CurrentActor.get().message(getLogSubject(), SubscriptionMessages.CLOSE());
+ SystemLog.message(getLogSubject(), SubscriptionMessages.CLOSE());
}
}
else
{
- CurrentActor.get().message(getLogSubject(),SubscriptionMessages.STATE(newState.toString()));
+ SystemLog.message(getLogSubject(), SubscriptionMessages.STATE(newState.toString()));
}
}
@@ -302,36 +296,17 @@ class QueueConsumerImpl
private void setupLogging()
{
- String queueString = new QueueLogSubject(_queue).toLogString();
-
- _logActor = new GenericActor("[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getConsumerNumber())
- + "("
- // queueString is [vh(/{0})/qu({1}) ] so need to trim
- // ^ ^^
- + queueString.substring(1,queueString.length() - 3)
- + ")"
- + "] ");
-
-
- if (CurrentActor.get().getRootMessageLogger().isMessageEnabled(_logActor, _logActor.getLogSubject(), SubscriptionMessages.CREATE_LOG_HIERARCHY))
- {
- final String filterLogString = getFilterLogString();
- CurrentActor.get().message(_logActor.getLogSubject(), SubscriptionMessages.CREATE(filterLogString, _queue.isDurable() && _exclusive,
- filterLogString.length() > 0));
- }
+ final String filterLogString = getFilterLogString();
+ SystemLog.message(this,
+ SubscriptionMessages.CREATE(filterLogString, _queue.isDurable() && _exclusive,
+ filterLogString.length() > 0));
}
protected final LogSubject getLogSubject()
{
- return _logActor.getLogSubject();
+ return this;
}
- final LogActor getLogActor()
- {
- return _logActor;
- }
-
-
@Override
public final void flush()
{
@@ -586,4 +561,30 @@ class QueueConsumerImpl
{
return getAttributeNames(getClass());
}
+
+ @Override
+ public String toLogString()
+ {
+ String logString;
+ if(_queue == null)
+ {
+ logString = "[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getConsumerNumber())
+ + "(UNKNOWN)"
+ + "] ";
+ }
+ else
+ {
+ String queueString = new QueueLogSubject(_queue).toLogString();
+ logString = "[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getConsumerNumber())
+ + "("
+ // queueString is [vh(/{0})/qu({1}) ] so need to trim
+ // ^ ^^
+ + queueString.substring(1,queueString.length() - 3)
+ + ")"
+ + "] ";
+
+ }
+
+ return logString;
+ }
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
index d9f3297acf..4d21e9d23a 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
@@ -20,16 +20,20 @@
*/
package org.apache.qpid.server.queue;
+import java.security.PrivilegedAction;
+import java.util.Collections;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.security.auth.TaskPrincipal;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.transport.TransportException;
+import javax.security.auth.Subject;
+
/**
* QueueRunners are Runnables used to process a queue when requiring
* asynchronous message delivery to consumers, which is necessary
@@ -62,60 +66,68 @@ public class QueueRunner implements Runnable
{
if(_scheduled.compareAndSet(SCHEDULED,RUNNING))
{
- long runAgain = Long.MIN_VALUE;
- _stateChange.set(false);
- try
- {
- CurrentActor.set(_queue.getLogActor());
-
- runAgain = _queue.processQueue(this);
- }
- catch (final ConnectionScopedRuntimeException e)
- {
- final String errorMessage = "Problem during asynchronous delivery by " + toString();
- if(_logger.isDebugEnabled())
- {
- _logger.debug(errorMessage, e);
- }
- else
- {
- _logger.info(errorMessage + ' ' + e.getMessage());
- }
- }
- catch (final TransportException transe)
+ Subject subject = new Subject(false, org.apache.qpid.server.security.SecurityManager.SYSTEM.getPrincipals(), Collections
+ .emptySet(), Collections.emptySet());
+ subject.getPrincipals().add(new TaskPrincipal("Queue Delivery"));
+ Subject.doAs(subject, new PrivilegedAction<Object>()
{
- final String errorMessage = "Problem during asynchronous delivery by " + toString();
- if(_logger.isDebugEnabled())
- {
- _logger.debug(errorMessage, transe);
- }
- else
+ @Override
+ public Object run()
{
- _logger.info(errorMessage + ' ' + transe.getMessage());
- }
- }
- finally
- {
- _scheduled.compareAndSet(RUNNING, IDLE);
- final long stateChangeCount = _queue.getStateChangeCount();
- _lastRunAgain.set(runAgain);
- _lastRunTime.set(System.nanoTime());
- if(runAgain == 0L || runAgain != stateChangeCount || _stateChange.compareAndSet(true,false))
- {
- if(_scheduled.compareAndSet(IDLE, SCHEDULED))
+ long runAgain = Long.MIN_VALUE;
+ _stateChange.set(false);
+ try
+ {
+ runAgain = _queue.processQueue(QueueRunner.this);
+ }
+ catch (final ConnectionScopedRuntimeException e)
+ {
+ final String errorMessage = "Problem during asynchronous delivery by " + toString();
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug(errorMessage, e);
+ }
+ else
+ {
+ _logger.info(errorMessage + ' ' + e.getMessage());
+ }
+ }
+ catch (final TransportException transe)
+ {
+ final String errorMessage = "Problem during asynchronous delivery by " + toString();
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug(errorMessage, transe);
+ }
+ else
+ {
+ _logger.info(errorMessage + ' ' + transe.getMessage());
+ }
+ }
+ finally
{
- _queue.execute(this);
+ _scheduled.compareAndSet(RUNNING, IDLE);
+ final long stateChangeCount = _queue.getStateChangeCount();
+ _lastRunAgain.set(runAgain);
+ _lastRunTime.set(System.nanoTime());
+ if(runAgain == 0L || runAgain != stateChangeCount || _stateChange.compareAndSet(true,false))
+ {
+ if(_scheduled.compareAndSet(IDLE, SCHEDULED))
+ {
+ _queue.execute(QueueRunner.this);
+ }
+ }
}
+ return null;
}
- CurrentActor.remove();
- }
+ });
}
}
public String toString()
{
- return "QueueRunner-" + _queue.getLogActor();
+ return "QueueRunner-" + _queue.getLogSubject();
}
public void execute(Executor executor)
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
index 9ef32e61e2..610c5f8202 100755
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
@@ -23,9 +23,13 @@ package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.security.auth.TaskPrincipal;
import org.apache.qpid.transport.TransportException;
+import javax.security.auth.Subject;
+import java.security.PrivilegedAction;
+import java.util.Collections;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -58,37 +62,46 @@ class SubFlushRunner implements Runnable
{
if(_scheduled.compareAndSet(SCHEDULED, RUNNING))
{
- boolean complete = false;
- _stateChange.set(false);
- try
+ Subject subject = new Subject(false, SecurityManager.SYSTEM.getPrincipals(), Collections.emptySet(), Collections.emptySet());
+ subject.getPrincipals().add(new TaskPrincipal("Sub. Delivery"));
+ Subject.doAs(subject, new PrivilegedAction<Object>()
{
- CurrentActor.set(_sub.getLogActor());
- complete = getQueue().flushConsumer(_sub, ITERATIONS);
- }
- catch (final TransportException transe)
- {
- final String errorMessage = "Problem during asynchronous delivery by " + toString();
- if(_logger.isDebugEnabled())
- {
- _logger.debug(errorMessage, transe);
- }
- else
- {
- _logger.info(errorMessage + ' ' + transe.getMessage());
- }
- }
- finally
- {
- CurrentActor.remove();
- _scheduled.compareAndSet(RUNNING, IDLE);
- if ((!complete || _stateChange.compareAndSet(true,false))&& !_sub.isSuspended())
+ @Override
+ public Object run()
{
- if(_scheduled.compareAndSet(IDLE,SCHEDULED))
+ boolean complete = false;
+ _stateChange.set(false);
+ try
{
- getQueue().execute(this);
+ complete = getQueue().flushConsumer(_sub, ITERATIONS);
}
+ catch (final TransportException transe)
+ {
+ final String errorMessage = "Problem during asynchronous delivery by " + toString();
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug(errorMessage, transe);
+ }
+ else
+ {
+ _logger.info(errorMessage + ' ' + transe.getMessage());
+ }
+ }
+ finally
+ {
+ _scheduled.compareAndSet(RUNNING, IDLE);
+ if ((!complete || _stateChange.compareAndSet(true,false))&& !_sub.isSuspended())
+ {
+ if(_scheduled.compareAndSet(IDLE,SCHEDULED))
+ {
+ getQueue().execute(SubFlushRunner.this);
+ }
+ }
+ }
+ return null;
}
- }
+ });
+
}
}
@@ -99,7 +112,7 @@ class SubFlushRunner implements Runnable
public String toString()
{
- return "SubFlushRunner-" + _sub.getLogActor();
+ return "SubFlushRunner-" + _sub.toLogString();
}
public void execute(Executor executor)
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/java/broker-core/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
index 429f1b3fa1..0dd241906b 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
@@ -20,7 +20,9 @@
*/
package org.apache.qpid.server.registry;
+import java.security.PrivilegedAction;
import java.util.Collection;
+import java.util.Collections;
import java.util.Timer;
import java.util.TimerTask;
@@ -34,28 +36,23 @@ import org.apache.qpid.server.configuration.ConfiguredObjectRecoverer;
import org.apache.qpid.server.configuration.RecovererProvider;
import org.apache.qpid.server.configuration.startup.DefaultRecovererProvider;
import org.apache.qpid.server.configuration.store.StoreConfigurationChangeListener;
-import org.apache.qpid.server.logging.CompositeStartupMessageLogger;
-import org.apache.qpid.server.logging.Log4jMessageLogger;
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.logging.LogRecorder;
-import org.apache.qpid.server.logging.RootMessageLogger;
-import org.apache.qpid.server.logging.SystemOutMessageLogger;
-import org.apache.qpid.server.logging.actors.AbstractActor;
-import org.apache.qpid.server.logging.actors.BrokerActor;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.GenericActor;
+import org.apache.qpid.server.logging.*;
import org.apache.qpid.server.logging.messages.BrokerMessages;
import org.apache.qpid.server.logging.messages.VirtualHostMessages;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.security.auth.TaskPrincipal;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.util.SystemUtils;
+import javax.security.auth.Subject;
+
/**
* An abstract application registry that provides access to configuration information and handles the
@@ -103,38 +100,28 @@ public class ApplicationRegistry implements IApplicationRegistry
//Create the composite (log4j+SystemOut MessageLogger to be used during startup
RootMessageLogger[] messageLoggers = {new SystemOutMessageLogger(), _rootMessageLogger};
CompositeStartupMessageLogger startupMessageLogger = new CompositeStartupMessageLogger(messageLoggers);
+ SystemLog.setRootMessageLogger(startupMessageLogger);
- BrokerActor actor = new BrokerActor(startupMessageLogger);
- CurrentActor.set(actor);
- CurrentActor.setDefault(actor);
- GenericActor.setDefaultMessageLogger(_rootMessageLogger);
- try
- {
- logStartupMessages(CurrentActor.get());
+ logStartupMessages();
- _taskExecutor = new TaskExecutor();
- _taskExecutor.start();
+ _taskExecutor = new TaskExecutor();
+ _taskExecutor.start();
- StoreConfigurationChangeListener storeChangeListener = new StoreConfigurationChangeListener(_store);
- RecovererProvider provider = new DefaultRecovererProvider((StatisticsGatherer)this, _virtualHostRegistry, _logRecorder, _rootMessageLogger, _taskExecutor, brokerOptions, storeChangeListener);
- ConfiguredObjectRecoverer<? extends ConfiguredObject> brokerRecoverer = provider.getRecoverer(Broker.class.getSimpleName());
- _broker = (Broker) brokerRecoverer.create(provider, _store.getRootEntry());
+ StoreConfigurationChangeListener storeChangeListener = new StoreConfigurationChangeListener(_store);
+ RecovererProvider provider = new DefaultRecovererProvider((StatisticsGatherer)this, _virtualHostRegistry, _logRecorder, _rootMessageLogger, _taskExecutor, brokerOptions, storeChangeListener);
+ ConfiguredObjectRecoverer<? extends ConfiguredObject> brokerRecoverer = provider.getRecoverer(Broker.class.getSimpleName());
+ _broker = (Broker) brokerRecoverer.create(provider, _store.getRootEntry());
- _virtualHostRegistry.setDefaultVirtualHostName((String)_broker.getAttribute(Broker.DEFAULT_VIRTUAL_HOST));
+ _virtualHostRegistry.setDefaultVirtualHostName((String)_broker.getAttribute(Broker.DEFAULT_VIRTUAL_HOST));
- initialiseStatisticsReporting();
+ initialiseStatisticsReporting();
- // starting the broker
- _broker.setDesiredState(State.INITIALISING, State.ACTIVE);
+ // starting the broker
+ _broker.setDesiredState(State.INITIALISING, State.ACTIVE);
- CurrentActor.get().message(BrokerMessages.READY());
- }
- finally
- {
- CurrentActor.remove();
- }
+ SystemLog.message(BrokerMessages.READY());
+ SystemLog.setRootMessageLogger(_rootMessageLogger);
- CurrentActor.setDefault(new BrokerActor(_rootMessageLogger));
}
private void initialiseStatisticsReporting()
@@ -158,29 +145,40 @@ public class ApplicationRegistry implements IApplicationRegistry
private final boolean _reset;
private final RootMessageLogger _logger;
+ private final Subject _subject;
public StatisticsReportingTask(boolean reset, RootMessageLogger logger)
{
_reset = reset;
_logger = logger;
+ _subject = new Subject(false, SecurityManager.SYSTEM.getPrincipals(), Collections.emptySet(), Collections.emptySet());
+ _subject.getPrincipals().add(new TaskPrincipal("Statistics"));
+ _subject.setReadOnly();
+
}
public void run()
{
- CurrentActor.set(new AbstractActor(_logger)
+ Subject.doAs(_subject, new PrivilegedAction<Object>()
{
- public String getLogMessage()
+ @Override
+ public Object run()
{
- return "[" + Thread.currentThread().getName() + "] ";
+ reportStatistics();
+ return null;
}
});
+ }
+
+ protected void reportStatistics()
+ {
try
{
- CurrentActor.get().message(BrokerMessages.STATS_DATA(DELIVERED, _dataDelivered.getPeak() / 1024.0, _dataDelivered.getTotal()));
- CurrentActor.get().message(BrokerMessages.STATS_MSGS(DELIVERED, _messagesDelivered.getPeak(), _messagesDelivered.getTotal()));
- CurrentActor.get().message(BrokerMessages.STATS_DATA(RECEIVED, _dataReceived.getPeak() / 1024.0, _dataReceived.getTotal()));
- CurrentActor.get().message(BrokerMessages.STATS_MSGS(RECEIVED, _messagesReceived.getPeak(), _messagesReceived.getTotal()));
- Collection<VirtualHost> hosts = _virtualHostRegistry.getVirtualHosts();
+ SystemLog.message(BrokerMessages.STATS_DATA(DELIVERED, _dataDelivered.getPeak() / 1024.0, _dataDelivered.getTotal()));
+ SystemLog.message(BrokerMessages.STATS_MSGS(DELIVERED, _messagesDelivered.getPeak(), _messagesDelivered.getTotal()));
+ SystemLog.message(BrokerMessages.STATS_DATA(RECEIVED, _dataReceived.getPeak() / 1024.0, _dataReceived.getTotal()));
+ SystemLog.message(BrokerMessages.STATS_MSGS(RECEIVED, _messagesReceived.getPeak(), _messagesReceived.getTotal()));
+ Collection<VirtualHost> hosts = _virtualHostRegistry.getVirtualHosts();
if (hosts.size() > 1)
{
@@ -192,10 +190,10 @@ public class ApplicationRegistry implements IApplicationRegistry
StatisticsCounter dataReceived = vhost.getDataReceiptStatistics();
StatisticsCounter messagesReceived = vhost.getMessageReceiptStatistics();
- CurrentActor.get().message(VirtualHostMessages.STATS_DATA(name, DELIVERED, dataDelivered.getPeak() / 1024.0, dataDelivered.getTotal()));
- CurrentActor.get().message(VirtualHostMessages.STATS_MSGS(name, DELIVERED, messagesDelivered.getPeak(), messagesDelivered.getTotal()));
- CurrentActor.get().message(VirtualHostMessages.STATS_DATA(name, RECEIVED, dataReceived.getPeak() / 1024.0, dataReceived.getTotal()));
- CurrentActor.get().message(VirtualHostMessages.STATS_MSGS(name, RECEIVED, messagesReceived.getPeak(), messagesReceived.getTotal()));
+ SystemLog.message(VirtualHostMessages.STATS_DATA(name, DELIVERED, dataDelivered.getPeak() / 1024.0, dataDelivered.getTotal()));
+ SystemLog.message(VirtualHostMessages.STATS_MSGS(name, DELIVERED, messagesDelivered.getPeak(), messagesDelivered.getTotal()));
+ SystemLog.message(VirtualHostMessages.STATS_DATA(name, RECEIVED, dataReceived.getPeak() / 1024.0, dataReceived.getTotal()));
+ SystemLog.message(VirtualHostMessages.STATS_MSGS(name, RECEIVED, messagesReceived.getPeak(), messagesReceived.getTotal()));
}
}
@@ -208,10 +206,6 @@ public class ApplicationRegistry implements IApplicationRegistry
{
ApplicationRegistry._logger.warn("Unexpected exception occurred while reporting the statistics", e);
}
- finally
- {
- CurrentActor.remove();
- }
}
}
@@ -241,8 +235,6 @@ public class ApplicationRegistry implements IApplicationRegistry
_logger.info("Shutting down ApplicationRegistry:" + this);
}
- //Set the Actor for Broker Shutdown
- CurrentActor.set(new BrokerActor(_rootMessageLogger));
try
{
//Stop Statistics Reporting
@@ -264,7 +256,7 @@ public class ApplicationRegistry implements IApplicationRegistry
_taskExecutor.stop();
}
- CurrentActor.get().message(BrokerMessages.STOPPED());
+ SystemLog.message(BrokerMessages.STOPPED());
_logRecorder.closeLogRecorder();
@@ -275,7 +267,6 @@ public class ApplicationRegistry implements IApplicationRegistry
{
_taskExecutor.stopImmediately();
}
- CurrentActor.remove();
}
_store = null;
_broker = null;
@@ -334,17 +325,17 @@ public class ApplicationRegistry implements IApplicationRegistry
_dataReceived = new StatisticsCounter("bytes-received");
}
- private void logStartupMessages(LogActor logActor)
+ private void logStartupMessages()
{
- logActor.message(BrokerMessages.STARTUP(QpidProperties.getReleaseVersion(), QpidProperties.getBuildVersion()));
+ SystemLog.message(BrokerMessages.STARTUP(QpidProperties.getReleaseVersion(), QpidProperties.getBuildVersion()));
- logActor.message(BrokerMessages.PLATFORM(System.getProperty("java.vendor"),
+ SystemLog.message(BrokerMessages.PLATFORM(System.getProperty("java.vendor"),
System.getProperty("java.runtime.version", System.getProperty("java.version")),
SystemUtils.getOSName(),
SystemUtils.getOSVersion(),
SystemUtils.getOSArch()));
- logActor.message(BrokerMessages.MAX_MEMORY(Runtime.getRuntime().maxMemory()));
+ SystemLog.message(BrokerMessages.MAX_MEMORY(Runtime.getRuntime().maxMemory()));
}
@Override
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java b/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java
index 1f0d482ed2..37c379da86 100755
--- a/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java
@@ -151,6 +151,12 @@ public class SecurityManager implements ConfigurationChangeListener
return _logger;
}
+ public static boolean isSystemProcess()
+ {
+ Subject subject = Subject.getSubject(AccessController.getContext());
+ return !(subject == null || subject.getPrincipals(SystemPrincipal.class).isEmpty());
+ }
+
private static final class SystemPrincipal implements Principal
{
private SystemPrincipal()
@@ -172,8 +178,7 @@ public class SecurityManager implements ConfigurationChangeListener
private boolean checkAllPlugins(AccessCheck checker)
{
// If we are running as SYSTEM then no ACL checking
- final Subject subject = Subject.getSubject(AccessController.getContext());
- if(subject != null && !subject.getPrincipals(SystemPrincipal.class).isEmpty())
+ if(isSystemProcess())
{
return true;
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/BrokerActor.java b/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/ManagementConnectionPrincipal.java
index 9e77452228..640c61c5af 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/BrokerActor.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/ManagementConnectionPrincipal.java
@@ -18,36 +18,9 @@
* under the License.
*
*/
-package org.apache.qpid.server.logging.actors;
+package org.apache.qpid.server.security.auth;
-import org.apache.qpid.server.logging.RootMessageLogger;
-
-public class BrokerActor extends AbstractActor
+public interface ManagementConnectionPrincipal extends SocketConnectionPrincipal
{
- private final String _logString;
-
- /**
- * Create a new BrokerActor
- *
- * @param logger
- */
- public BrokerActor(RootMessageLogger logger)
- {
- super(logger);
-
- _logString = "[Broker] ";
- }
-
- public BrokerActor(String name, RootMessageLogger logger)
- {
- super(logger);
-
- _logString = "[Broker(" + name + ")] ";
- }
-
- public String getLogMessage()
- {
- return _logString;
- }
-
+ public String getType();
}
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/TestLogActor.java b/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/TaskPrincipal.java
index 30f4e16e42..e1cd3e9d62 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/TestLogActor.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/TaskPrincipal.java
@@ -18,20 +18,22 @@
* under the License.
*
*/
-package org.apache.qpid.server.logging.actors;
+package org.apache.qpid.server.security.auth;
-import org.apache.qpid.server.logging.RootMessageLogger;
+import java.security.Principal;
-public class TestLogActor extends AbstractActor
+public class TaskPrincipal implements Principal
{
- public TestLogActor(RootMessageLogger rootLogger)
+
+ private final String _name;
+
+ public TaskPrincipal(final String name)
{
- super(rootLogger);
+ _name = name;
}
- public String getLogMessage()
+ public String getName()
{
- return "[Test Actor] ";
+ return _name;
}
}
- \ No newline at end of file
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/jmx/JMXConnectionPrincipal.java b/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/jmx/JMXConnectionPrincipal.java
new file mode 100644
index 0000000000..35ac197e74
--- /dev/null
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/jmx/JMXConnectionPrincipal.java
@@ -0,0 +1,82 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.security.auth.jmx;
+
+import org.apache.qpid.server.security.auth.ManagementConnectionPrincipal;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
+public class JMXConnectionPrincipal implements ManagementConnectionPrincipal
+{
+ private final InetSocketAddress _address;
+
+ public JMXConnectionPrincipal(final String host)
+ {
+ _address = new InetSocketAddress(host,0);
+ }
+
+ @Override
+ public SocketAddress getRemoteAddress()
+ {
+ return _address;
+ }
+
+ @Override
+ public String getName()
+ {
+ return _address.toString();
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ final JMXConnectionPrincipal that = (JMXConnectionPrincipal) o;
+
+ if (!_address.equals(that._address))
+ {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return _address.hashCode();
+ }
+
+ @Override
+ public String getType()
+ {
+ return "JMX";
+ }
+}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/jmx/JMXPasswordAuthenticator.java b/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/jmx/JMXPasswordAuthenticator.java
index 94de7754f6..ae080ff779 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/jmx/JMXPasswordAuthenticator.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/jmx/JMXPasswordAuthenticator.java
@@ -20,18 +20,14 @@
*/
package org.apache.qpid.server.security.auth.jmx;
-import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.rmi.server.RemoteServer;
import java.rmi.server.ServerNotActiveException;
-import java.security.Principal;
import java.security.PrivilegedAction;
import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
-import org.apache.qpid.server.security.auth.SocketConnectionPrincipal;
import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
import javax.management.remote.JMXAuthenticator;
@@ -123,7 +119,7 @@ public class JMXPasswordAuthenticator implements JMXAuthenticator
originalSubject.getPrincipals(),
originalSubject.getPublicCredentials(),
originalSubject.getPrivateCredentials());
- subject.getPrincipals().add(new JMSConnectionPrincipal(clientHost));
+ subject.getPrincipals().add(new JMXConnectionPrincipal(clientHost));
subject.setReadOnly();
}
catch(ServerNotActiveException e)
@@ -153,53 +149,4 @@ public class JMXPasswordAuthenticator implements JMXAuthenticator
}
- private static class JMSConnectionPrincipal implements SocketConnectionPrincipal
- {
- private final InetSocketAddress _address;
-
- public JMSConnectionPrincipal(final String host)
- {
- _address = new InetSocketAddress(host,0);
- }
-
- @Override
- public SocketAddress getRemoteAddress()
- {
- return _address;
- }
-
- @Override
- public String getName()
- {
- return _address.toString();
- }
-
- @Override
- public boolean equals(final Object o)
- {
- if (this == o)
- {
- return true;
- }
- if (o == null || getClass() != o.getClass())
- {
- return false;
- }
-
- final JMSConnectionPrincipal that = (JMSConnectionPrincipal) o;
-
- if (!_address.equals(that._address))
- {
- return false;
- }
-
- return true;
- }
-
- @Override
- public int hashCode()
- {
- return _address.hashCode();
- }
- }
} \ No newline at end of file
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java b/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java
index 3c5fb44d23..bdc059e912 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java
@@ -28,10 +28,9 @@ import java.util.Map;
import java.util.UUID;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
-import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
-import org.apache.qpid.server.util.ServerScopedRuntimeException;
import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION;
@@ -103,7 +102,7 @@ public class DurableConfigurationRecoverer implements ConfigurationRecoveryHandl
checkUnresolvedDependencies();
applyUpgrade();
- CurrentActor.get().message(_logSubject, ConfigStoreMessages.RECOVERY_COMPLETE());
+ SystemLog.message(_logSubject, ConfigStoreMessages.RECOVERY_COMPLETE());
return CURRENT_CONFIG_VERSION;
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/store/OperationalLoggingListener.java b/java/broker-core/src/main/java/org/apache/qpid/server/store/OperationalLoggingListener.java
index 4ab1a3ab05..1420c950d9 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/store/OperationalLoggingListener.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/store/OperationalLoggingListener.java
@@ -20,7 +20,7 @@
package org.apache.qpid.server.store;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
import org.apache.qpid.server.logging.messages.MessageStoreMessages;
import org.apache.qpid.server.logging.messages.TransactionLogMessages;
@@ -49,31 +49,31 @@ public class OperationalLoggingListener implements EventListener
switch(event)
{
case BEFORE_INIT:
- CurrentActor.get().message(_logSubject, ConfigStoreMessages.CREATED());
+ SystemLog.message(_logSubject, ConfigStoreMessages.CREATED());
break;
case AFTER_INIT:
- CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED());
- CurrentActor.get().message(_logSubject, TransactionLogMessages.CREATED());
+ SystemLog.message(_logSubject, MessageStoreMessages.CREATED());
+ SystemLog.message(_logSubject, TransactionLogMessages.CREATED());
String storeLocation = _store.getStoreLocation();
if (storeLocation != null)
{
- CurrentActor.get().message(_logSubject, MessageStoreMessages.STORE_LOCATION(storeLocation));
+ SystemLog.message(_logSubject, MessageStoreMessages.STORE_LOCATION(storeLocation));
}
break;
case BEFORE_ACTIVATE:
- CurrentActor.get().message(_logSubject, MessageStoreMessages.RECOVERY_START());
+ SystemLog.message(_logSubject, MessageStoreMessages.RECOVERY_START());
break;
case AFTER_ACTIVATE:
- CurrentActor.get().message(_logSubject, MessageStoreMessages.RECOVERY_COMPLETE());
+ SystemLog.message(_logSubject, MessageStoreMessages.RECOVERY_COMPLETE());
break;
case AFTER_CLOSE:
- CurrentActor.get().message(_logSubject,MessageStoreMessages.CLOSED());
+ SystemLog.message(_logSubject, MessageStoreMessages.CLOSED());
break;
case PERSISTENT_MESSAGE_SIZE_OVERFULL:
- CurrentActor.get().message(_logSubject,MessageStoreMessages.OVERFULL());
+ SystemLog.message(_logSubject, MessageStoreMessages.OVERFULL());
break;
case PERSISTENT_MESSAGE_SIZE_UNDERFULL:
- CurrentActor.get().message(_logSubject,MessageStoreMessages.UNDERFULL());
+ SystemLog.message(_logSubject, MessageStoreMessages.UNDERFULL());
break;
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 739dd1cc5b..c5eb70c1f0 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -37,6 +37,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.exchange.AMQUnknownExchangeType;
import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.configuration.ExchangeConfiguration;
@@ -48,7 +49,6 @@ import org.apache.qpid.server.exchange.DefaultExchangeFactory;
import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.VirtualHostMessages;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageNode;
@@ -150,7 +150,7 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
_id = UUIDGenerator.generateVhostUUID(_name);
- CurrentActor.get().message(VirtualHostMessages.CREATED(_name));
+ SystemLog.message(VirtualHostMessages.CREATED(_name));
_securityManager = new SecurityManager(parentSecurityManager, _vhostConfig.getConfig().getString("security.acl"), _name);
@@ -704,7 +704,7 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
_state = State.STOPPED;
- CurrentActor.get().message(VirtualHostMessages.CLOSED());
+ SystemLog.message(VirtualHostMessages.CLOSED());
}
protected void closeStorage()
@@ -911,7 +911,7 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
{
if (state == State.ERRORED)
{
- CurrentActor.get().message(VirtualHostMessages.ERRORED());
+ SystemLog.message(VirtualHostMessages.ERRORED());
}
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java
index 1b0e50fd34..83ef437410 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java
@@ -22,9 +22,11 @@ package org.apache.qpid.server.virtualhost;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.logging.RootMessageLogger;
-import org.apache.qpid.server.logging.actors.AbstractActor;
-import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.security.auth.TaskPrincipal;
+
+import javax.security.auth.Subject;
+import java.security.PrivilegedAction;
+import java.util.Collections;
public abstract class HouseKeepingTask implements Runnable
{
@@ -34,12 +36,17 @@ public abstract class HouseKeepingTask implements Runnable
private String _name;
- private RootMessageLogger _rootLogger;
+ private final Subject _subject;
+
public HouseKeepingTask(VirtualHost vhost)
{
_virtualHost = vhost;
_name = _virtualHost.getName() + ":" + this.getClass().getSimpleName();
- _rootLogger = CurrentActor.get().getRootMessageLogger();
+ _subject = new Subject(false, org.apache.qpid.server.security.SecurityManager.SYSTEM.getPrincipals(), Collections
+ .emptySet(), Collections.emptySet());
+ _subject.getPrincipals().add(new TaskPrincipal(_name));
+ _subject.setReadOnly();
+
}
final public void run()
@@ -47,27 +54,27 @@ public abstract class HouseKeepingTask implements Runnable
String originalThreadName = Thread.currentThread().getName();
Thread.currentThread().setName(_name);
- CurrentActor.set(new AbstractActor(_rootLogger)
- {
- @Override
- public String getLogMessage()
- {
- return _name;
- }
- });
-
try
{
- execute();
- }
- catch (Exception e)
- {
- _logger.warn(this.getClass().getSimpleName() + " throw exception: " + e, e);
+ Subject.doAs(_subject, new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ try
+ {
+ execute();
+ }
+ catch (Exception e)
+ {
+ _logger.warn(this.getClass().getSimpleName() + " throw exception: " + e, e);
+ }
+ return null;
+ }
+ });
}
finally
{
- CurrentActor.remove();
-
// eagerly revert the thread name to make thread dumps more meaningful if captured after task has finished
Thread.currentThread().setName(originalThreadName);
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
index 302c61e491..c5ea2c7ac1 100755
--- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
@@ -28,7 +28,7 @@ import java.util.UUID;
import org.apache.log4j.Logger;
import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.logging.messages.TransactionLogMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.message.EnqueueableMessage;
@@ -82,7 +82,7 @@ public class VirtualHostConfigRecoveryHandler implements
{
_logSubject = new MessageStoreLogSubject(_virtualHost.getName(), store.getClass().getSimpleName());
_store = store;
- CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_START(null, false));
+ SystemLog.message(_logSubject, TransactionLogMessages.RECOVERY_START(null, false));
return this;
}
@@ -149,9 +149,9 @@ public class VirtualHostConfigRecoveryHandler implements
else
{
StringBuilder xidString = xidAsString(id);
- CurrentActor.get().message(_logSubject,
- TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(),
- Long.toString(messageId)));
+ SystemLog.message(_logSubject,
+ TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(),
+ Long.toString(messageId)));
}
@@ -159,9 +159,9 @@ public class VirtualHostConfigRecoveryHandler implements
else
{
StringBuilder xidString = xidAsString(id);
- CurrentActor.get().message(_logSubject,
- TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
- record.getResource().getId().toString()));
+ SystemLog.message(_logSubject,
+ TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
+ record.getResource().getId().toString()));
}
}
@@ -199,9 +199,9 @@ public class VirtualHostConfigRecoveryHandler implements
else
{
StringBuilder xidString = xidAsString(id);
- CurrentActor.get().message(_logSubject,
- TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(),
- Long.toString(messageId)));
+ SystemLog.message(_logSubject,
+ TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(),
+ Long.toString(messageId)));
}
@@ -209,9 +209,9 @@ public class VirtualHostConfigRecoveryHandler implements
else
{
StringBuilder xidString = xidAsString(id);
- CurrentActor.get().message(_logSubject,
- TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
- record.getResource().getId().toString()));
+ SystemLog.message(_logSubject,
+ TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
+ record.getResource().getId().toString()));
}
}
@@ -238,7 +238,7 @@ public class VirtualHostConfigRecoveryHandler implements
_logger.warn("Message id " + m.getMessageNumber() + " in store, but not in any queue - removing....");
m.remove();
}
- CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false));
+ SystemLog.message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false));
}
public void complete()
@@ -316,9 +316,9 @@ public class VirtualHostConfigRecoveryHandler implements
for(Map.Entry<String,Integer> entry : _queueRecoveries.entrySet())
{
- CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERED(entry.getValue(), entry.getKey()));
+ SystemLog.message(_logSubject, TransactionLogMessages.RECOVERED(entry.getValue(), entry.getKey()));
- CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(entry.getKey(), true));
+ SystemLog.message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(entry.getKey(), true));
}
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/TransactionTimeoutHelperTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/TransactionTimeoutHelperTest.java
index 7a7954c8f9..4548bd9210 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/TransactionTimeoutHelperTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/TransactionTimeoutHelperTest.java
@@ -21,7 +21,9 @@ package org.apache.qpid.server;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.qpid.server.logging.messages.ChannelMessages.IDLE_TXN_LOG_HIERARCHY;
import static org.apache.qpid.server.logging.messages.ChannelMessages.OPEN_TXN_LOG_HIERARCHY;
+import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
@@ -29,10 +31,10 @@ import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
-import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.test.utils.QpidTestCase;
import org.hamcrest.Description;
@@ -40,7 +42,7 @@ import org.mockito.ArgumentMatcher;
public class TransactionTimeoutHelperTest extends QpidTestCase
{
- private final LogActor _logActor = mock(LogActor.class);
+ private final RootMessageLogger _rootLogger = mock(RootMessageLogger.class);
private final LogSubject _logSubject = mock(LogSubject.class);
private final ServerTransaction _transaction = mock(ServerTransaction.class);
private final CloseAction _closeAction = mock(CloseAction.class);
@@ -53,7 +55,7 @@ public class TransactionTimeoutHelperTest extends QpidTestCase
_transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, 5, 10, 5, 10);
- verifyZeroInteractions(_logActor, _closeAction);
+ verifyZeroInteractions(_rootLogger, _closeAction);
}
public void testOpenTransactionProducesWarningOnly() throws Exception
@@ -64,7 +66,7 @@ public class TransactionTimeoutHelperTest extends QpidTestCase
_transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, SECONDS.toMillis(30), 0, 0, 0);
- verify(_logActor).message(same(_logSubject), isLogMessage(OPEN_TXN_LOG_HIERARCHY, "CHN-1007 : Open Transaction : 61,\\d{3} ms"));
+ verify(_rootLogger).message(same(_logSubject), isLogMessage(OPEN_TXN_LOG_HIERARCHY, "CHN-1007 : Open Transaction : 61,\\d{3} ms"));
verifyZeroInteractions(_closeAction);
}
@@ -77,7 +79,7 @@ public class TransactionTimeoutHelperTest extends QpidTestCase
_transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, 0, SECONDS.toMillis(30), 0, 0);
verify(_closeAction).doTimeoutAction("Open transaction timed out");
- verifyZeroInteractions(_logActor);
+ verifyZeroInteractions(_rootLogger);
}
public void testOpenTransactionProducesWarningAndTimeoutAction() throws Exception
@@ -88,7 +90,7 @@ public class TransactionTimeoutHelperTest extends QpidTestCase
_transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, SECONDS.toMillis(15), SECONDS.toMillis(30), 0, 0);
- verify(_logActor).message(same(_logSubject), isLogMessage(OPEN_TXN_LOG_HIERARCHY, "CHN-1007 : Open Transaction : 61,\\d{3} ms"));
+ verify(_rootLogger).message(same(_logSubject), isLogMessage(OPEN_TXN_LOG_HIERARCHY, "CHN-1007 : Open Transaction : 61,\\d{3} ms"));
verify(_closeAction).doTimeoutAction("Open transaction timed out");
}
@@ -101,7 +103,7 @@ public class TransactionTimeoutHelperTest extends QpidTestCase
_transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, 0, 0, SECONDS.toMillis(30), 0);
- verify(_logActor).message(same(_logSubject), isLogMessage(IDLE_TXN_LOG_HIERARCHY, "CHN-1008 : Idle Transaction : 31,\\d{3} ms"));
+ verify(_rootLogger).message(same(_logSubject), isLogMessage(IDLE_TXN_LOG_HIERARCHY, "CHN-1008 : Idle Transaction : 31,\\d{3} ms"));
verifyZeroInteractions(_closeAction);
}
@@ -115,7 +117,7 @@ public class TransactionTimeoutHelperTest extends QpidTestCase
_transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, 0, 0, 0, SECONDS.toMillis(30));
verify(_closeAction).doTimeoutAction("Idle transaction timed out");
- verifyZeroInteractions(_logActor);
+ verifyZeroInteractions(_rootLogger);
}
public void testIdleTransactionProducesWarningAndTimeoutAction() throws Exception
@@ -127,7 +129,7 @@ public class TransactionTimeoutHelperTest extends QpidTestCase
_transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, 0, 0, SECONDS.toMillis(15), SECONDS.toMillis(30));
- verify(_logActor).message(same(_logSubject), isLogMessage(IDLE_TXN_LOG_HIERARCHY, "CHN-1008 : Idle Transaction : 31,\\d{3} ms"));
+ verify(_rootLogger).message(same(_logSubject), isLogMessage(IDLE_TXN_LOG_HIERARCHY, "CHN-1008 : Idle Transaction : 31,\\d{3} ms"));
verify(_closeAction).doTimeoutAction("Idle transaction timed out");
}
@@ -140,8 +142,8 @@ public class TransactionTimeoutHelperTest extends QpidTestCase
_transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, SECONDS.toMillis(60), 0, SECONDS.toMillis(30), 0);
- verify(_logActor).message(same(_logSubject), isLogMessage(IDLE_TXN_LOG_HIERARCHY, "CHN-1008 : Idle Transaction : 31,\\d{3} ms"));
- verify(_logActor).message(same(_logSubject), isLogMessage(OPEN_TXN_LOG_HIERARCHY, "CHN-1007 : Open Transaction : 61,\\d{3} ms"));
+ verify(_rootLogger).message(same(_logSubject), isLogMessage(IDLE_TXN_LOG_HIERARCHY, "CHN-1008 : Idle Transaction : 31,\\d{3} ms"));
+ verify(_rootLogger).message(same(_logSubject), isLogMessage(OPEN_TXN_LOG_HIERARCHY, "CHN-1007 : Open Transaction : 61,\\d{3} ms"));
verifyZeroInteractions(_closeAction);
}
@@ -149,26 +151,14 @@ public class TransactionTimeoutHelperTest extends QpidTestCase
protected void setUp() throws Exception
{
super.setUp();
-
- CurrentActor.set(_logActor);
-
+ when(_logSubject.toLogString()).thenReturn("");
+ when(_rootLogger.isEnabled()).thenReturn(true);
+ when(_rootLogger.isMessageEnabled(anyString())).thenReturn(true);
+ SystemLog.setRootMessageLogger(_rootLogger);
_transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject, _closeAction);
_now = System.currentTimeMillis();
}
- @Override
- protected void tearDown() throws Exception
- {
- try
- {
- super.tearDown();
- }
- finally
- {
- CurrentActor.remove();
- }
- }
-
private void configureMockTransaction(final long startTime, final long updateTime)
{
when(_transaction.isTransactional()).thenReturn(true);
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/TaskExecutorTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/TaskExecutorTest.java
index a54ba06c53..083160a31d 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/TaskExecutorTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/TaskExecutorTest.java
@@ -20,11 +20,9 @@
*/
package org.apache.qpid.server.configuration.updater;
-import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@@ -37,12 +35,7 @@ import javax.security.auth.Subject;
import junit.framework.TestCase;
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.logging.NullRootMessageLogger;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.TestLogActor;
import org.apache.qpid.server.model.State;
-import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
public class TaskExecutorTest extends TestCase
@@ -215,37 +208,26 @@ public class TaskExecutorTest extends TestCase
public void testSubmitAndWaitCurrentActorAndSecurityManagerSubjectAreRespected() throws Exception
{
_executor.start();
- LogActor actor = new TestLogActor(new NullRootMessageLogger());
Subject subject = new Subject();
- final AtomicReference<LogActor> taskLogActor = new AtomicReference<LogActor>();
final AtomicReference<Subject> taskSubject = new AtomicReference<Subject>();
- try
+ Subject.doAs(subject, new PrivilegedAction<Object>()
{
- CurrentActor.set(actor);
- Subject.doAs(subject, new PrivilegedAction<Object>()
+ @Override
+ public Object run()
+ {
+ _executor.submitAndWait(new TaskExecutor.Task<Object>()
{
@Override
- public Object run()
- { _executor.submitAndWait(new TaskExecutor.Task<Object>()
+ public Void call()
{
- @Override
- public Void call()
- {
- taskLogActor.set(CurrentActor.get());
- taskSubject.set(Subject.getSubject(AccessController.getContext()));
- return null;
- }
- });
- return null;
+ taskSubject.set(Subject.getSubject(AccessController.getContext()));
+ return null;
}
});
+ return null;
+ }
+ });
- }
- finally
- {
- CurrentActor.remove();
- }
- assertEquals("Unexpected task log actor", actor, taskLogActor.get());
assertEquals("Unexpected security manager subject", subject, taskSubject.get());
}
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
index fa75d41810..0532386ff9 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
@@ -33,8 +33,6 @@ import java.util.UUID;
import junit.framework.TestCase;
import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
@@ -54,7 +52,6 @@ public class FanoutExchangeTest extends TestCase
public void setUp() throws UnknownExchangeException
{
- CurrentActor.setDefault(mock(LogActor.class));
Map<String,Object> attributes = new HashMap<String, Object>();
attributes.put(Exchange.ID, UUID.randomUUID());
attributes.put(Exchange.NAME, "test");
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
index 8c9132d166..50b448eea3 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
@@ -24,8 +24,6 @@ import java.util.Collection;
import junit.framework.TestCase;
import org.apache.qpid.server.binding.BindingImpl;
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
@@ -148,7 +146,6 @@ public class HeadersBindingTest extends TestCase
VirtualHost vhost = mock(VirtualHost.class);
when(_queue.getVirtualHost()).thenReturn(vhost);
when(vhost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class));
- CurrentActor.set(mock(LogActor.class));
_exchange = mock(ExchangeImpl.class);
when(_exchange.getExchangeType()).thenReturn(mock(ExchangeType.class));
}
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
index 76752de5d0..9377bbac47 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
@@ -30,8 +30,6 @@ import java.util.Set;
import java.util.UUID;
import junit.framework.TestCase;
import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
@@ -58,7 +56,6 @@ public class HeadersExchangeTest extends TestCase
{
super.setUp();
- CurrentActor.setDefault(mock(LogActor.class));
_virtualHost = mock(VirtualHost.class);
SecurityManager securityManager = mock(SecurityManager.class);
when(_virtualHost.getSecurityManager()).thenReturn(securityManager);
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/logging/Log4jMessageLoggerTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/logging/Log4jMessageLoggerTest.java
index f6e301ec07..2078c3634f 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/logging/Log4jMessageLoggerTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/logging/Log4jMessageLoggerTest.java
@@ -26,8 +26,6 @@ import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
-import org.apache.qpid.server.logging.actors.BrokerActor;
-
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
@@ -156,18 +154,17 @@ public class Log4jMessageLoggerTest extends TestCase
Logger.getLogger(loggerName2).setLevel(Level.OFF);
Log4jMessageLogger msgLogger = new Log4jMessageLogger();
- BrokerActor actor = new BrokerActor(msgLogger);
-
+
assertTrue("Expected message logger to be enabled", msgLogger.isEnabled());
- assertTrue("Message should be enabled", msgLogger.isMessageEnabled(actor, loggerName1));
- assertFalse("Message should be disabled", msgLogger.isMessageEnabled(actor, loggerName2));
+ assertTrue("Message should be enabled", msgLogger.isMessageEnabled(loggerName1));
+ assertFalse("Message should be disabled", msgLogger.isMessageEnabled(loggerName2));
Logger.getLogger(loggerName1).setLevel(Level.WARN);
Logger.getLogger(loggerName2).setLevel(Level.INFO);
- assertFalse("Message should be disabled", msgLogger.isMessageEnabled(actor, loggerName1));
- assertTrue("Message should be enabled", msgLogger.isMessageEnabled(actor, loggerName2));
+ assertFalse("Message should be disabled", msgLogger.isMessageEnabled(loggerName1));
+ assertTrue("Message should be enabled", msgLogger.isMessageEnabled(loggerName2));
}
/**
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java
deleted file mode 100644
index 6ef319bcdf..0000000000
--- a/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.logging.actors;
-
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.util.BrokerTestHelper;
-
-import java.util.List;
-
-/**
- * Test : AMQPChannelActorTest
- * Validate the AMQPChannelActor class.
- *
- * The test creates a new AMQPActor and then logs a message using it.
- *
- * The test then verifies that the logged message was the only one created and
- * that the message contains the required message.
- */
-public class AMQPChannelActorTest extends BaseConnectionActorTestCase
-{
-
- public void setUp()
- {
- // do nothing
- }
-
- private void setUpNow() throws Exception
- {
- super.setUp();
- AMQSessionModel channel = BrokerTestHelper.createSession(1, getConnection());
-
- setAmqpActor(new AMQPChannelActor(channel, getRootLogger()));
- }
-
-
- /**
- * Test that when logging on behalf of the channel
- * The test sends a message then verifies that it entered the logs.
- *
- * The log message should be fully replaced (no '{n}' values) and should
- * contain the channel id ('/ch:1') identification.
- */
- public void testChannel() throws Exception
- {
- setUpNow();
-
- final String message = sendTestLogMessage(getAmqpActor());
-
- List<Object> logs = getRawLogger().getLogMessages();
-
- assertEquals("Message log size not as expected.", 1, logs.size());
-
- // Verify that the logged message is present in the output
- assertTrue("Message was not found in log message:" + logs.get(0),
- logs.get(0).toString().contains(message));
-
- // Verify that the message has the correct type
- assertTrue("Message contains the [con: prefix",
- logs.get(0).toString().contains("[con:"));
-
-
- // Verify that all the values were presented to the MessageFormatter
- // so we will not end up with '{n}' entries in the log.
- assertFalse("Verify that the string does not contain any '{'." + logs.get(0),
- logs.get(0).toString().contains("{"));
-
- // Verify that the logged message contains the 'ch:1' marker
- assertTrue("Message was not logged as part of channel 1" + logs.get(0),
- logs.get(0).toString().contains("/ch:1"));
- }
-
- /**
- * Test that if logging is configured to be off via system property that
- * no logging is presented
- */
- public void testChannelLoggingOFF() throws Exception
- {
- setStatusUpdatesEnabled(false);
-
- setUpNow();
-
- sendTestLogMessage(getAmqpActor());
-
- List<Object> logs = getRawLogger().getLogMessages();
-
- assertEquals("Message log size not as expected.", 0, logs.size());
- }
-}
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java
index bee2930fd7..57ef51e170 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java
@@ -20,9 +20,14 @@
*/
package org.apache.qpid.server.logging.actors;
+import org.apache.qpid.server.connection.ConnectionPrincipal;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.SystemLog;
+import javax.security.auth.Subject;
+import java.security.PrivilegedAction;
+import java.util.Collections;
import java.util.List;
/**
@@ -54,6 +59,8 @@ public class AMQPConnectionActorTest extends BaseConnectionActorTestCase
{
super.setUp();
+ // ignore all the startup log messages
+ getRawLogger().clearLogMessages();
final String message = sendLogMessage();
List<Object> logs = getRawLogger().getLogMessages();
@@ -95,24 +102,34 @@ public class AMQPConnectionActorTest extends BaseConnectionActorTestCase
private String sendLogMessage()
{
final String message = "test logging";
-
- getAmqpActor().message(new LogSubject()
+ Subject subject = new Subject(false, Collections.singleton(new ConnectionPrincipal(getConnection())), Collections.emptySet(), Collections.emptySet());
+ Subject.doAs(subject, new PrivilegedAction<Object>()
{
- public String toLogString()
+ @Override
+ public Object run()
{
- return "[AMQPActorTest]";
- }
+ SystemLog.message(new LogSubject()
+ {
+ public String toLogString()
+ {
+ return "[AMQPActorTest]";
+ }
+
+ }, new LogMessage()
+ {
+ public String toString()
+ {
+ return message;
+ }
+
+ public String getLogHierarchy()
+ {
+ return "test.hierarchy";
+ }
+ }
+ );
+ return null;
- }, new LogMessage()
- {
- public String toString()
- {
- return message;
- }
-
- public String getLogHierarchy()
- {
- return "test.hierarchy";
}
});
return message;
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/AbstractManagementActorTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/AbstractManagementActorTest.java
deleted file mode 100644
index bf38bb64bf..0000000000
--- a/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/AbstractManagementActorTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.logging.actors;
-
-import java.security.Principal;
-import java.security.PrivilegedAction;
-import java.util.Collections;
-
-import javax.security.auth.Subject;
-
-import org.apache.qpid.server.logging.NullRootMessageLogger;
-import org.apache.qpid.server.security.auth.TestPrincipalUtils;
-import org.apache.qpid.test.utils.QpidTestCase;
-
-public class AbstractManagementActorTest extends QpidTestCase
-{
- private AbstractManagementActor _logActor;
-
- @Override
- public void setUp()
- {
- _logActor = new AbstractManagementActor(new NullRootMessageLogger(), AbstractManagementActor.UNKNOWN_PRINCIPAL)
- {
- @Override
- public String getLogMessage()
- {
- return null;
- }
- };
- }
-
- public void testGetPrincipalName()
- {
- Subject subject = TestPrincipalUtils.createTestSubject("guest");
-
- final String principalName = Subject.doAs(subject,
- new PrivilegedAction<String>()
- {
- public String run()
- {
- return _logActor.getPrincipalName();
- }
- });
-
- assertEquals("guest", principalName);
- }
-
- public void testGetPrincipalNameUsingSubjectWithoutAuthenticatedPrincipal()
- {
- Subject subject = new Subject(true, Collections.<Principal>emptySet(), Collections.emptySet(), Collections.emptySet());
-
- final String principalName = Subject.doAs(subject,
- new PrivilegedAction<String>()
- {
- public String run()
- {
- return _logActor.getPrincipalName();
- }
- });
-
- assertEquals(AbstractManagementActor.UNKNOWN_PRINCIPAL, principalName);
- }
-
- public void testGetPrincipalWithoutSubject()
- {
- assertEquals(AbstractManagementActor.UNKNOWN_PRINCIPAL, _logActor.getPrincipalName());
- }
-}
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseActorTestCase.java b/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseActorTestCase.java
index 30c3a51604..4ff46658ef 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseActorTestCase.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseActorTestCase.java
@@ -20,17 +20,16 @@
*/
package org.apache.qpid.server.logging.actors;
-import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.logging.UnitTestMessageLogger;
import org.apache.qpid.test.utils.QpidTestCase;
public class BaseActorTestCase extends QpidTestCase
{
private boolean _statusUpdatesEnabled = true;
- private LogActor _amqpActor;
private UnitTestMessageLogger _rawLogger;
private RootMessageLogger _rootLogger;
@@ -38,10 +37,9 @@ public class BaseActorTestCase extends QpidTestCase
public void setUp() throws Exception
{
super.setUp();
- CurrentActor.removeAll();
- CurrentActor.setDefault(null);
_rawLogger = new UnitTestMessageLogger(_statusUpdatesEnabled);
_rootLogger = _rawLogger;
+ SystemLog.setRootMessageLogger(_rootLogger);
}
@Override
@@ -51,40 +49,39 @@ public class BaseActorTestCase extends QpidTestCase
{
_rawLogger.clearLogMessages();
}
- CurrentActor.removeAll();
- CurrentActor.setDefault(null);
super.tearDown();
}
- public String sendTestLogMessage(LogActor actor)
+ public String sendTestLogMessage()
{
String message = "Test logging: " + getName();
- sendTestLogMessage(actor, message);
+ sendTestLogMessage(message);
return message;
}
- public void sendTestLogMessage(LogActor actor, final String message)
+ public void sendTestLogMessage(final String message)
{
- actor.message(new LogSubject()
- {
- public String toLogString()
- {
- return message;
- }
+ SystemLog.message(new LogSubject()
+ {
+ public String toLogString()
+ {
+ return message;
+ }
- }, new LogMessage()
- {
- public String toString()
- {
- return message;
- }
+ }, new LogMessage()
+ {
+ public String toString()
+ {
+ return message;
+ }
- public String getLogHierarchy()
- {
- return "test.hierarchy";
- }
- });
+ public String getLogHierarchy()
+ {
+ return "test.hierarchy";
+ }
+ }
+ );
}
public boolean isStatusUpdatesEnabled()
@@ -97,16 +94,6 @@ public class BaseActorTestCase extends QpidTestCase
_statusUpdatesEnabled = statusUpdatesEnabled;
}
- public LogActor getAmqpActor()
- {
- return _amqpActor;
- }
-
- public void setAmqpActor(LogActor amqpActor)
- {
- _amqpActor = amqpActor;
- }
-
public UnitTestMessageLogger getRawLogger()
{
return _rawLogger;
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java b/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java
index 1cb6474e41..797aa477a9 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java
@@ -37,7 +37,6 @@ public class BaseConnectionActorTestCase extends BaseActorTestCase
BrokerTestHelper.setUp();
_session = BrokerTestHelper.createConnection();
_virtualHost = BrokerTestHelper.createVirtualHost("test");
- setAmqpActor(new AMQPConnectionActor(_session, getRootLogger()));
}
public VirtualHost getVirtualHost()
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java
deleted file mode 100644
index 300fcd70d3..0000000000
--- a/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.logging.actors;
-
-import org.apache.commons.configuration.ConfigurationException;
-
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.logging.NullRootMessageLogger;
-import org.apache.qpid.server.util.BrokerTestHelper;
-
-/**
- * Test : CurrentActorTest
- * Summary:
- * Validate ThreadLocal operation.
- *
- * Test creates THREADS number of threads which all then execute the same test
- * together ( as close as looping Thread.start() will allow).
- *
- * Test:
- * Test sets the CurrentActor then proceeds to retrieve the value and use it.
- *
- * The test also validates that it is the same LogActor that this thread set.
- *
- * Finally the LogActor is removed and tested to make sure that it was
- * successfully removed.
- *
- * By having a higher number of threads than would normally be used in the
- * Pooling filter we aim to catch the race condition where a ThreadLocal remove
- * is called before one or more threads call get(). This way we can ensure that
- * the remove does not affect more than the Thread it was called in.
- */
-public class CurrentActorTest extends BaseConnectionActorTestCase
-{
- //Set this to be a reasonably large number
- private static final int THREADS = 10;
-
- /**
- * Test that CurrentActor behaves as LIFO queue.
- *
- * Test creates two Actors Connection and Channel and then sets the
- * CurrentActor.
- *
- * The test validates that CurrentActor remembers the Connection actor
- * after the Channel actor has been removed.
- *
- * And then finally validates that removing the Connection actor results
- * in there being no actors set.
- *
- * @throws org.apache.commons.configuration.ConfigurationException
- */
- public void testLIFO() throws ConfigurationException
- {
- assertTrue("Unexpected actor: " + CurrentActor.get(), CurrentActor.get() instanceof TestLogActor);
- AMQPConnectionActor connectionActor = new AMQPConnectionActor(getConnection(),
- new NullRootMessageLogger());
-
- /*
- * Push the actor on to the stack:
- *
- * CurrentActor -> Connection
- * Stack -> null
- */
- CurrentActor.set(connectionActor);
-
- //Use the Actor to send a simple message
- sendTestLogMessage(CurrentActor.get());
-
- // Verify it was the same actor as we set earlier
- assertEquals("Retrieved actor is not as expected ",
- connectionActor, CurrentActor.get());
-
- /**
- * Set the actor to now be the Channel actor so testing the ability
- * to push the actor on to the stack:
- *
- * CurrentActor -> Channel
- * Stack -> Connection, null
- *
- */
-
- AMQSessionModel channel = BrokerTestHelper.createSession(1, getConnection());
-
- AMQPChannelActor channelActor = new AMQPChannelActor(channel,
- new NullRootMessageLogger());
-
- CurrentActor.set(channelActor);
-
- //Use the Actor to send a simple message
- sendTestLogMessage(CurrentActor.get());
-
- // Verify it was the same actor as we set earlier
- assertEquals("Retrieved actor is not as expected ",
- channelActor, CurrentActor.get());
-
- // Remove the ChannelActor from the stack
- CurrentActor.remove();
- /*
- * Pop the actor on to the stack:
- *
- * CurrentActor -> Connection
- * Stack -> null
- */
-
-
- // Verify we now have the same connection actor as we set earlier
- assertEquals("Retrieved actor is not as expected ",
- connectionActor, CurrentActor.get());
-
- // Verify that removing the our last actor it returns us to the test
- // default that the ApplicationRegistry sets.
- CurrentActor.remove();
- /*
- * Pop the actor on to the stack:
- *
- * CurrentActor -> null
- */
-
-
- assertEquals("CurrentActor not the Test default", TestLogActor.class ,CurrentActor.get().getClass());
- }
-
- /**
- * Test the setting CurrentActor is done correctly as a ThreadLocal.
- *
- * The test starts 'THREADS' threads that all set the CurrentActor log
- * a message then remove the actor.
- *
- * Checks are done to ensure that there is no set actor after the remove.
- *
- * If the ThreadLocal was not working then having concurrent actor sets
- * would result in more than one actor and so the remove will not result
- * in the clearing of the CurrentActor
- *
- */
- public void testThreadLocal()
- {
-
- // Setup the threads
- LogMessagesWithAConnectionActor[] threads = new LogMessagesWithAConnectionActor[THREADS];
- for (int count = 0; count < THREADS; count++)
- {
- threads[count] = new LogMessagesWithAConnectionActor();
- }
-
- //Run the threads
- for (int count = 0; count < THREADS; count++)
- {
- threads[count].start();
- }
-
- // Wait for them to finish
- for (int count = 0; count < THREADS; count++)
- {
- try
- {
- threads[count].join();
- }
- catch (InterruptedException e)
- {
- //if we are interrupted then we will exit shortly.
- }
- }
-
- // Verify that none of the tests threw an exception
- for (int count = 0; count < THREADS; count++)
- {
- if (threads[count].getException() != null)
- {
- threads[count].getException().printStackTrace();
- fail("Error occurred in thread:" + count + "("+threads[count].getException()+")");
- }
- }
- }
-
- /**
- * Creates a new ConnectionActor and logs the given number of messages
- * before removing the actor and validating that there is no set actor.
- */
- public class LogMessagesWithAConnectionActor extends Thread
- {
- private Throwable _exception;
-
- public LogMessagesWithAConnectionActor()
- {
- }
-
- public void run()
- {
-
- // Create a new actor using retrieving the rootMessageLogger from
- // the default ApplicationRegistry.
- //TODO reminder that we need a better approach for broker testing.
- try
- {
- LogActor defaultActor = CurrentActor.get();
-
- AMQPConnectionActor actor = new AMQPConnectionActor(getConnection(),
- new NullRootMessageLogger());
-
- CurrentActor.set(actor);
-
- //Use the Actor to send a simple message
- sendTestLogMessage(CurrentActor.get());
-
- // Verify it was the same actor as we set earlier
- if(!actor.equals(CurrentActor.get()))
- {
- throw new IllegalArgumentException("Retrieved actor is not as expected ");
- }
-
- // Verify that removing the actor works for this thread
- CurrentActor.remove();
-
- if(CurrentActor.get() != defaultActor)
- {
- throw new IllegalArgumentException("CurrentActor ("+CurrentActor.get()+") should be default actor" + defaultActor);
- }
- }
- catch (Throwable e)
- {
- _exception = e;
- }
-
- }
-
- public Throwable getException()
- {
- return _exception;
- }
- }
-
-}
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/HttpManagementActorTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/HttpManagementActorTest.java
index 905de4b639..5b98693970 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/HttpManagementActorTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/HttpManagementActorTest.java
@@ -22,33 +22,74 @@ package org.apache.qpid.server.logging.actors;
import javax.security.auth.Subject;
+import org.apache.qpid.server.logging.LogMessage;
+import org.apache.qpid.server.logging.SystemLog;
+import org.apache.qpid.server.security.auth.ManagementConnectionPrincipal;
import org.apache.qpid.server.security.auth.TestPrincipalUtils;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.security.PrivilegedAction;
+import java.util.Collections;
import java.util.List;
public class HttpManagementActorTest extends BaseActorTestCase
{
+ public static final LogMessage EMPTY_MESSAGE = new LogMessage()
+ {
+ @Override
+ public String getLogHierarchy()
+ {
+ return "";
+ }
+
+ public String toString()
+ {
+ return "";
+ }
+ };
private static final String IP = "127.0.0.1";
private static final int PORT = 1;
- private static final String SUFFIX = "(" + IP + ":" + PORT + ")] ";
+ private static final String SUFFIX = "(/" + IP + ":" + PORT + ")] ";
+ private ManagementConnectionPrincipal _connectionPrincipal;
@Override
public void setUp() throws Exception
{
super.setUp();
- setAmqpActor(new HttpManagementActor(getRootLogger(), IP, PORT));
+ _connectionPrincipal = new ManagementConnectionPrincipal()
+ {
+ @Override
+ public String getType()
+ {
+ return "HTTP";
+ }
+
+ @Override
+ public SocketAddress getRemoteAddress()
+ {
+ return new InetSocketAddress(IP, PORT);
+ }
+
+ @Override
+ public String getName()
+ {
+ return getRemoteAddress().toString();
+ }
+ };
}
public void testSubjectPrincipalNameAppearance()
{
Subject subject = TestPrincipalUtils.createTestSubject("guest");
+ subject.getPrincipals().add(_connectionPrincipal);
+
final String message = Subject.doAs(subject, new PrivilegedAction<String>()
{
public String run()
{
- return sendTestLogMessage(getAmqpActor());
+ return sendTestLogMessage();
}
});
@@ -74,18 +115,46 @@ public class HttpManagementActorTest extends BaseActorTestCase
private void assertLogMessageWithoutPrincipal()
{
- String message = getAmqpActor().getLogMessage();
- assertEquals("Unexpected log message", "[mng:" + AbstractManagementActor.UNKNOWN_PRINCIPAL + SUFFIX, message);
+ getRawLogger().getLogMessages().clear();
+ Subject subject = new Subject(false,
+ Collections.singleton(_connectionPrincipal),
+ Collections.emptySet(),
+ Collections.emptySet());
+ Subject.doAs(subject, new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ SystemLog.message(EMPTY_MESSAGE);
+ List<Object> logs = getRawLogger().getLogMessages();
+ assertEquals("Message log size not as expected.", 1, logs.size());
+
+ String logMessage = logs.get(0).toString();
+ assertEquals("Unexpected log message",
+ "[mng:" + "N/A" + SUFFIX,
+ logMessage);
+ return null;
+ }
+ });
}
private void assertLogMessageWithPrincipal(String principalName)
{
+ getRawLogger().getLogMessages().clear();
+
Subject subject = TestPrincipalUtils.createTestSubject(principalName);
+ subject.getPrincipals().add(_connectionPrincipal);
final String message = Subject.doAs(subject, new PrivilegedAction<String>()
{
public String run()
{
- return getAmqpActor().getLogMessage();
+ SystemLog.message(EMPTY_MESSAGE);
+ List<Object> logs = getRawLogger().getLogMessages();
+ assertEquals("Message log size not as expected.", 1, logs.size());
+
+ String logMessage = logs.get(0).toString();
+
+ return logMessage;
}
});
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/ManagementActorTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/ManagementActorTest.java
deleted file mode 100644
index a0bfa592db..0000000000
--- a/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/ManagementActorTest.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.logging.actors;
-
-import javax.security.auth.Subject;
-
-import org.apache.qpid.server.security.auth.TestPrincipalUtils;
-
-import java.security.PrivilegedAction;
-import java.util.List;
-
-public class ManagementActorTest extends BaseActorTestCase
-{
-
- private static final String IP = "127.0.0.1";
- private static final String CONNECTION_ID = "1";
- private String _threadName;
-
- @Override
- public void setUp() throws Exception
- {
- super.setUp();
- setAmqpActor(new ManagementActor(getRootLogger()));
-
- // Set the thread name to be the same as a RMI JMX Connection would use
- _threadName = Thread.currentThread().getName();
- Thread.currentThread().setName("RMI TCP Connection(" + CONNECTION_ID + ")-" + IP);
- }
-
- @Override
- public void tearDown() throws Exception
- {
- Thread.currentThread().setName(_threadName);
- super.tearDown();
- }
-
- /**
- * Test the AMQPActor logging as a Connection level.
- *
- * The test sends a message then verifies that it entered the logs.
- *
- * The log message should be fully replaced (no '{n}' values) and should
- * not contain any channel identification.
- */
- public void testConnection()
- {
- final String message = sendTestLogMessage(getAmqpActor());
-
- List<Object> logs = getRawLogger().getLogMessages();
-
- assertEquals("Message log size not as expected.", 1, logs.size());
-
- // Verify that the logged message is present in the output
- assertTrue("Message was not found in log message",
- logs.get(0).toString().contains(message));
-
- // Verify that all the values were presented to the MessageFormatter
- // so we will not end up with '{n}' entries in the log.
- assertFalse("Verify that the string does not contain any '{'.",
- logs.get(0).toString().contains("{"));
-
- // Verify that the message has the correct type
- assertTrue("Message does not contain the [mng: prefix",
- logs.get(0).toString().contains("[mng:"));
-
- // Verify that the logged message does not contains the 'ch:' marker
- assertFalse("Message was logged with a channel identifier." + logs.get(0),
- logs.get(0).toString().contains("/ch:"));
-
- // Verify that the message has the right values
- assertTrue("Message contains the [mng: prefix",
- logs.get(0).toString().contains("[mng:N/A(" + IP + ")"));
- }
-
- /**
- * Tests appearance of principal name in log message
- */
- public void testSubjectPrincipalNameAppearance()
- {
- Subject subject = TestPrincipalUtils.createTestSubject("guest");
-
- final String message = Subject.doAs(subject, new PrivilegedAction<String>()
- {
- public String run()
- {
- return sendTestLogMessage(getAmqpActor());
- }
- });
-
- // Verify that the log message was created
- assertNotNull("Test log message is not created!", message);
-
- List<Object> logs = getRawLogger().getLogMessages();
-
- // Verify that at least one log message was added to log
- assertEquals("Message log size not as expected.", 1, logs.size());
-
- String logMessage = logs.get(0).toString();
-
- // Verify that the logged message is present in the output
- assertTrue("Message was not found in log message", logMessage.contains(message));
-
- // Verify that the message has the right principal value
- assertTrue("Message contains the [mng: prefix", logMessage.contains("[mng:guest(" + IP + ")"));
- }
-
- public void testGetLogMessageWithSubject()
- {
- assertLogMessageInRMIThreadWithPrincipal("RMI TCP Connection(" + CONNECTION_ID + ")-" + IP, "my_principal");
- }
-
- public void testGetLogMessageWithoutSubjectButWithActorPrincipal()
- {
- String principalName = "my_principal";
- setAmqpActor(new ManagementActor(getRootLogger(), principalName));
- String message = getAmqpActor().getLogMessage();
- assertEquals("Unexpected log message", "[mng:" + principalName + "(" + IP + ")] ", message);
- }
-
- /** It's necessary to test successive calls because ManagementActor caches its log message based on thread and principal name */
- public void testGetLogMessageCaching()
- {
- String originalThreadName = "RMI TCP Connection(1)-" + IP;
- assertLogMessageInRMIThreadWithoutPrincipal(originalThreadName);
- assertLogMessageInRMIThreadWithPrincipal(originalThreadName, "my_principal");
- assertLogMessageInRMIThreadWithPrincipal("RMI TCP Connection(2)-" + IP, "my_principal");
- }
-
- public void testGetLogMessageAfterRemovingSubject()
- {
- assertLogMessageInRMIThreadWithPrincipal("RMI TCP Connection(1)-" + IP, "my_principal");
-
- Thread.currentThread().setName("RMI TCP Connection(2)-" + IP );
- String message = getAmqpActor().getLogMessage();
- assertEquals("Unexpected log message", "[mng:N/A(" + IP + ")] ", message);
-
- assertLogMessageWithoutPrincipal("TEST");
- }
-
- private void assertLogMessageInRMIThreadWithoutPrincipal(String threadName)
- {
- Thread.currentThread().setName(threadName );
- String message = getAmqpActor().getLogMessage();
- assertEquals("Unexpected log message", "[mng:N/A(" + IP + ")] ", message);
- }
-
- private void assertLogMessageWithoutPrincipal(String threadName)
- {
- Thread.currentThread().setName(threadName );
- String message = getAmqpActor().getLogMessage();
- assertEquals("Unexpected log message", "[" + threadName +"] ", message);
- }
-
- private void assertLogMessageInRMIThreadWithPrincipal(String threadName, String principalName)
- {
- Thread.currentThread().setName(threadName);
- Subject subject = TestPrincipalUtils.createTestSubject(principalName);
- final String message = Subject.doAs(subject, new PrivilegedAction<String>()
- {
- public String run()
- {
- return getAmqpActor().getLogMessage();
- }
- });
-
- assertEquals("Unexpected log message", "[mng:" + principalName + "(" + IP + ")] ", message);
- }
-}
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/QueueActorTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/QueueActorTest.java
deleted file mode 100644
index b371bf6e2f..0000000000
--- a/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/QueueActorTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.logging.actors;
-
-import java.util.List;
-
-import org.apache.qpid.server.util.BrokerTestHelper;
-
-public class QueueActorTest extends BaseConnectionActorTestCase
-{
-
- @Override
- public void setUp() throws Exception
- {
- super.setUp();
- setAmqpActor(new QueueActor(BrokerTestHelper.createQueue(getName(), getVirtualHost()), getRootLogger()));
- }
-
- /**
- * Test the QueueActor as a logger.
- *
- * The test logs a message then verifies that it entered the logs correctly
- *
- * The log message should be fully replaced (no '{n}' values) and should
- * contain the correct queue identification.
- */
- public void testQueueActor()
- {
- final String message = sendTestLogMessage(getAmqpActor());
-
- List<Object> logs = getRawLogger().getLogMessages();
-
- assertEquals("Message log size not as expected.", 1, logs.size());
-
- String log = logs.get(0).toString();
-
- // Verify that the logged message is present in the output
- assertTrue("Message was not found in log message",
- log.contains(message));
-
- // Verify that all the values were presented to the MessageFormatter
- // so we will not end up with '{n}' entries in the log.
- assertFalse("Verify that the string does not contain any '{':" + log,
- log.contains("{"));
-
- // Verify that the message has the correct type
- assertTrue("Message contains the [vh: prefix:" + log,
- log.contains("[vh("));
-
- // Verify that the logged message contains the 'qu(' marker
- String expected = "qu(" + getName() + ")";
- assertTrue("Message was not logged with a queue identifier '"+expected+"' actual:" + log,
- log.contains(expected));
- }
-
-}
-
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/AbstractTestMessages.java b/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/AbstractTestMessages.java
index 229d75c69f..5eb4d2f1b0 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/AbstractTestMessages.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/AbstractTestMessages.java
@@ -23,11 +23,10 @@ package org.apache.qpid.server.logging.messages;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.logging.UnitTestMessageLogger;
-import org.apache.qpid.server.logging.actors.TestLogActor;
import org.apache.qpid.server.logging.subjects.TestBlankSubject;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -38,7 +37,6 @@ public abstract class AbstractTestMessages extends QpidTestCase
{
protected Configuration _config = new PropertiesConfiguration();
protected LogMessage _logMessage = null;
- protected LogActor _actor;
protected UnitTestMessageLogger _logger;
protected LogSubject _logSubject = new TestBlankSubject();
@@ -47,10 +45,9 @@ public abstract class AbstractTestMessages extends QpidTestCase
{
super.setUp();
- _logger = new UnitTestMessageLogger();
-
- _actor = new TestLogActor(_logger);
BrokerTestHelper.setUp();
+ _logger = new UnitTestMessageLogger();
+ SystemLog.setRootMessageLogger(_logger);
}
@Override
@@ -60,14 +57,24 @@ public abstract class AbstractTestMessages extends QpidTestCase
super.tearDown();
}
+ protected List<Object> getLog()
+ {
+ return _logger.getLogMessages();
+ }
+
+ protected void clearLog()
+ {
+ _logger.clearLogMessages();
+ }
+
+
protected List<Object> performLog()
{
if (_logMessage == null)
{
throw new NullPointerException("LogMessage has not been set");
}
-
- _actor.message(_logSubject, _logMessage);
+ SystemLog.message(_logSubject, _logMessage);
return _logger.getLogMessages();
}
@@ -77,6 +84,16 @@ public abstract class AbstractTestMessages extends QpidTestCase
validateLogMessage(logs, tag, false, expected);
}
+ protected void validateLogMessageNoSubject(List<Object> logs, String tag, String[] expected)
+ {
+ validateLogMessage(logs, tag, null, false, expected);
+ }
+
+ protected void validateLogMessageNoSubject(List<Object> logs, String tag, boolean useStringForNull, String[] expected)
+ {
+ validateLogMessage(logs, tag, null, useStringForNull, expected);
+ }
+
/**
* Validate that only a single log message occurred and that the message
* section starts with the specified tag
@@ -88,6 +105,11 @@ public abstract class AbstractTestMessages extends QpidTestCase
*/
protected void validateLogMessage(List<Object> logs, String tag, boolean useStringForNull, String[] expected)
{
+ validateLogMessage(logs, tag, _logSubject, useStringForNull, expected);
+ }
+
+ protected void validateLogMessage(List<Object> logs, String tag, LogSubject subject, boolean useStringForNull, String[] expected)
+ {
assertEquals("Log has incorrect message count", 1, logs.size());
//We trim() here as we don't care about extra white space at the end of the log message
@@ -98,11 +120,22 @@ public abstract class AbstractTestMessages extends QpidTestCase
// Simple switch to print out all the logged messages
//System.err.println(log);
- int msgIndex = log.indexOf(_logSubject.toLogString())+_logSubject.toLogString().length();
+ int msgIndex;
+ String message;
+ if(subject != null)
+ {
+ final String subjectString = subject.toLogString();
+ msgIndex = log.indexOf(subjectString)+ subjectString.length();
- assertTrue("Unable to locate Subject:" + log, msgIndex != -1);
+ assertTrue("Unable to locate Subject:" + log, msgIndex != -1);
- String message = log.substring(msgIndex);
+ message = log.substring(msgIndex);
+ }
+ else
+ {
+ msgIndex = log.indexOf(tag);
+ message = log.substring(msgIndex);
+ }
assertTrue("Message does not start with tag:" + tag + ":" + message,
message.startsWith(tag));
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/ExchangeMessagesTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/ExchangeMessagesTest.java
index d970a1f732..f3982f9be3 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/ExchangeMessagesTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/ExchangeMessagesTest.java
@@ -32,32 +32,32 @@ public class ExchangeMessagesTest extends AbstractTestMessages
{
public void testExchangeCreated_Transient() throws Exception
{
- ExchangeImpl exchange = BrokerTestHelper.createExchange("test");
+ ExchangeImpl exchange = BrokerTestHelper.createExchange("test", false);
String type = exchange.getTypeName();
String name = exchange.getName();
_logMessage = ExchangeMessages.CREATED(type, name, false);
- List<Object> log = performLog();
+ List<Object> log = getLog();
String[] expected = {"Create :", "Type:", type, "Name:", name};
- validateLogMessage(log, "EXH-1001", expected);
+ validateLogMessageNoSubject(log, "EXH-1001", expected);
}
public void testExchangeCreated_Persistent() throws Exception
{
- ExchangeImpl exchange = BrokerTestHelper.createExchange("test");
+ ExchangeImpl exchange = BrokerTestHelper.createExchange("test", true);
String type = exchange.getTypeName();
String name = exchange.getName();
_logMessage = ExchangeMessages.CREATED(type, name, true);
- List<Object> log = performLog();
+ List<Object> log = getLog();
String[] expected = {"Create :", "Durable", "Type:", type, "Name:", name};
- validateLogMessage(log, "EXH-1001", expected);
+ validateLogMessageNoSubject(log, "EXH-1001", expected);
}
public void testExchangeDeleted()
@@ -72,11 +72,11 @@ public class ExchangeMessagesTest extends AbstractTestMessages
public void testExchangeDiscardedMessage() throws Exception
{
- ExchangeImpl exchange = BrokerTestHelper.createExchange("test");
+ ExchangeImpl exchange = BrokerTestHelper.createExchange("test", false);
final String name = exchange.getName();
final String routingKey = "routingKey";
-
+ clearLog();
_logMessage = ExchangeMessages.DISCARDMSG(name, routingKey);
List<Object> log = performLog();
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java b/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java
index e7366dfe65..fd976e252e 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java
@@ -22,12 +22,10 @@ package org.apache.qpid.server.logging.subjects;
import org.apache.qpid.server.exchange.ExchangeImpl;
-import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.logging.UnitTestMessageLogger;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.TestLogActor;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -61,14 +59,8 @@ public abstract class AbstractTestLogSubject extends QpidTestCase
public void tearDown() throws Exception
{
BrokerTestHelper.tearDown();
- try
- {
- CurrentActor.removeAll();
- }
- finally
- {
- super.tearDown();
- }
+ super.tearDown();
+
}
protected List<Object> performLog(boolean statusUpdatesEnabled)
@@ -79,10 +71,9 @@ public abstract class AbstractTestLogSubject extends QpidTestCase
}
UnitTestMessageLogger logger = new UnitTestMessageLogger(statusUpdatesEnabled);
+ SystemLog.setRootMessageLogger(logger);
- LogActor actor = new TestLogActor(logger);
-
- actor.message(_subject, new LogMessage()
+ SystemLog.message(_subject, new LogMessage()
{
public String toString()
{
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
index 46a5ab9a3c..b8ecc4a2c0 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
@@ -28,15 +28,10 @@ import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
-import junit.framework.TestCase;
-
import org.apache.qpid.server.configuration.ConfigurationEntry;
import org.apache.qpid.server.configuration.RecovererProvider;
import org.apache.qpid.server.configuration.startup.VirtualHostRecoverer;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
-import org.apache.qpid.server.logging.SystemOutMessageLogger;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.TestLogActor;
import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.store.TestMemoryMessageStore;
import org.apache.qpid.server.util.BrokerTestHelper;
@@ -54,7 +49,6 @@ public class VirtualHostTest extends QpidTestCase
protected void setUp() throws Exception
{
super.setUp();
- CurrentActor.set(new TestLogActor(new SystemOutMessageLogger()));
_broker = BrokerTestHelper.createBrokerMock();
TaskExecutor taskExecutor = mock(TaskExecutor.class);
@@ -65,13 +59,6 @@ public class VirtualHostTest extends QpidTestCase
_statisticsGatherer = mock(StatisticsGatherer.class);
}
- @Override
- protected void tearDown() throws Exception
- {
- super.tearDown();
- CurrentActor.remove();
- }
-
public void testInitialisingState()
{
VirtualHost host = createHost();
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
index 64bcf8f730..4302e4a83c 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
@@ -37,9 +37,6 @@ import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.DefaultExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeImpl;
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.logging.RootMessageLogger;
-import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.plugin.ExchangeType;
@@ -74,10 +71,6 @@ public class AMQQueueFactoryTest extends QpidTestCase
when(_virtualHost.getConfiguration()).thenReturn(vhostConfig);
_queueConfiguration = mock(QueueConfiguration.class);
when(vhostConfig.getQueueConfiguration(anyString())).thenReturn(_queueConfiguration);
- LogActor logActor = mock(LogActor.class);
- CurrentActor.set(logActor);
- RootMessageLogger rootLogger = mock(RootMessageLogger.class);
- when(logActor.getRootMessageLogger()).thenReturn(rootLogger);
DurableConfigurationStore store = mock(DurableConfigurationStore.class);
when(_virtualHost.getDurableConfigurationStore()).thenReturn(store);
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
index 0657c8cdad..d5639c1de8 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
@@ -20,9 +20,6 @@ package org.apache.qpid.server.queue;
import junit.framework.TestCase;
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.logging.RootMessageLogger;
-import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
@@ -64,15 +61,6 @@ public abstract class QueueEntryImplTestBase extends TestCase
_queueEntry3 = getQueueEntryImpl(3);
}
-
- protected void mockLogging()
- {
- final LogActor logActor = mock(LogActor.class);
- when(logActor.getRootMessageLogger()).thenReturn(mock(RootMessageLogger.class));
- CurrentActor.setDefault(logActor);
- }
-
-
public void testAcquire()
{
assertTrue("Queue entry should be in AVAILABLE state before invoking of acquire method",
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
index 9039654c15..29413dd1a7 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
@@ -41,7 +41,6 @@ public class SimpleQueueEntryImplTest extends QueueEntryImplTestBase
@Override
public void setUp() throws Exception
{
- mockLogging();
Map<String,Object> queueAttributes = new HashMap<String, Object>();
queueAttributes.put(Queue.ID, UUID.randomUUID());
queueAttributes.put(Queue.NAME, "SimpleQueueEntryImplTest");
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
index ef64226048..792e656a97 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
@@ -20,15 +20,12 @@
package org.apache.qpid.server.queue;
import java.util.Collections;
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.logging.RootMessageLogger;
-import org.apache.qpid.server.logging.actors.CurrentActor;
+
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.security.*;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Arrays;
@@ -77,8 +74,6 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase
@Override
protected void setUp() throws Exception
{
- mockLogging();
-
Map<String,Object> attributes = new HashMap<String,Object>();
attributes.put(Queue.ID,UUID.randomUUID());
attributes.put(Queue.NAME, getName());
@@ -117,14 +112,6 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase
}
- protected void mockLogging()
- {
- final LogActor logActor = mock(LogActor.class);
- when(logActor.getRootMessageLogger()).thenReturn(mock(RootMessageLogger.class));
- CurrentActor.setDefault(logActor);
- }
-
-
@Override
public SortedQueueEntryList getTestList()
{
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java
index c1d5a9e9fe..e45a7667cd 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java
@@ -46,8 +46,6 @@ public class SortedQueueEntryTest extends QueueEntryImplTestBase
@Override
public void setUp() throws Exception
{
- mockLogging();
-
Map<String,Object> attributes = new HashMap<String,Object>();
attributes.put(Queue.ID,UUID.randomUUID());
attributes.put(Queue.NAME, getName());
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
index c03727087a..e3e1a3f659 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
@@ -20,13 +20,9 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.logging.RootMessageLogger;
-import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.security.*;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -57,9 +53,6 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase
queueAttributes.put(Queue.NAME, getName());
final VirtualHost virtualHost = mock(VirtualHost.class);
when(virtualHost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class));
- final LogActor logActor = mock(LogActor.class);
- when(logActor.getRootMessageLogger()).thenReturn(mock(RootMessageLogger.class));
- CurrentActor.set(logActor);
_testQueue = new StandardQueue(virtualHost, queueAttributes);
_sqel = (StandardQueueEntryList) _testQueue.getEntries();
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
index 613dd07741..0ad56f3061 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
@@ -40,8 +40,6 @@ import org.apache.commons.configuration.Configuration;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.exchange.ExchangeImpl;
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.model.ExclusivityPolicy;
import org.apache.qpid.server.model.LifetimePolicy;
@@ -95,7 +93,6 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
_queueId = UUIDGenerator.generateRandomUUID();
_exchangeId = UUIDGenerator.generateRandomUUID();
- CurrentActor.set(mock(LogActor.class));
_storeName = getName();
_storePath = TMP_FOLDER + File.separator + _storeName;
FileUtils.delete(new File(_storePath), true);
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/store/OperationalLoggingListenerTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/store/OperationalLoggingListenerTest.java
index c309dad5eb..20a718bd23 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/store/OperationalLoggingListenerTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/store/OperationalLoggingListenerTest.java
@@ -19,11 +19,10 @@ package org.apache.qpid.server.store;
import java.util.ArrayList;
import java.util.List;
import junit.framework.TestCase;
-import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.RootMessageLogger;
-import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
import org.apache.qpid.server.logging.messages.MessageStoreMessages;
import org.apache.qpid.server.logging.messages.TransactionLogMessages;
@@ -65,7 +64,7 @@ public class OperationalLoggingListenerTest extends TestCase
{
final List<LogMessage> messages = new ArrayList<LogMessage>();
- CurrentActor.set(new TestActor(messages));
+ SystemLog.setRootMessageLogger(new TestRootLogger(messages));
if(setStoreLocation)
{
@@ -103,14 +102,6 @@ public class OperationalLoggingListenerTest extends TestCase
assertEquals(messages.remove(0).toString(), MessageStoreMessages.CLOSED().toString());
}
- @Override
- protected void tearDown() throws Exception
- {
- super.tearDown();
- CurrentActor.remove();
- }
-
-
private static final LogSubject LOG_SUBJECT = new LogSubject()
{
public String toLogString()
@@ -155,11 +146,11 @@ public class OperationalLoggingListenerTest extends TestCase
}
}
- private static class TestActor implements LogActor
+ private static class TestRootLogger implements RootMessageLogger
{
private final List<LogMessage> _messages;
- public TestActor(List<LogMessage> messages)
+ private TestRootLogger(final List<LogMessage> messages)
{
_messages = messages;
}
@@ -169,19 +160,23 @@ public class OperationalLoggingListenerTest extends TestCase
_messages.add(message);
}
- public void message(LogMessage message)
+ @Override
+ public boolean isEnabled()
{
- _messages.add(message);
+ return true;
}
- public RootMessageLogger getRootMessageLogger()
+ @Override
+ public boolean isMessageEnabled(final String logHierarchy)
{
- return null;
+ return true;
}
- public String getLogMessage()
+ public void message(LogMessage message)
{
- return null;
+ _messages.add(message);
}
+
}
+
}
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
index ea8b492552..f93036142c 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
@@ -38,11 +38,6 @@ import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.configuration.store.JsonConfigurationEntryStore;
import org.apache.qpid.server.exchange.DefaultExchangeFactory;
-import org.apache.qpid.server.logging.RootMessageLogger;
-import org.apache.qpid.server.logging.SystemOutMessageLogger;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.GenericActor;
-import org.apache.qpid.server.logging.actors.TestLogActor;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.AMQQueue;
@@ -72,22 +67,17 @@ public class BrokerTestHelper
when(broker.getAttribute(Broker.VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD)).thenReturn(10000l);
when(broker.getId()).thenReturn(UUID.randomUUID());
when(broker.getSubjectCreator(any(SocketAddress.class))).thenReturn(subjectCreator);
- RootMessageLogger rootMessageLogger = CurrentActor.get().getRootMessageLogger();
- when(broker.getRootMessageLogger()).thenReturn(rootMessageLogger);
when(broker.getVirtualHostRegistry()).thenReturn(new VirtualHostRegistry());
when(broker.getSecurityManager()).thenReturn(new SecurityManager(mock(Broker.class), false));
- GenericActor.setDefaultMessageLogger(rootMessageLogger);
return broker;
}
public static void setUp()
{
- CurrentActor.set(new TestLogActor(new SystemOutMessageLogger()));
}
public static void tearDown()
{
- CurrentActor.remove();
}
public static VirtualHost createVirtualHost(VirtualHostConfiguration virtualHostConfiguration, VirtualHostRegistry virtualHostRegistry)
@@ -170,7 +160,7 @@ public class BrokerTestHelper
return connection;
}
- public static ExchangeImpl createExchange(String hostName) throws Exception
+ public static ExchangeImpl createExchange(String hostName, final boolean durable) throws Exception
{
SecurityManager securityManager = new SecurityManager(mock(Broker.class), false);
VirtualHost virtualHost = mock(VirtualHost.class);
@@ -181,7 +171,7 @@ public class BrokerTestHelper
attributes.put(org.apache.qpid.server.model.Exchange.ID, UUIDGenerator.generateExchangeUUID("amp.direct", virtualHost.getName()));
attributes.put(org.apache.qpid.server.model.Exchange.NAME, "amq.direct");
attributes.put(org.apache.qpid.server.model.Exchange.TYPE, "direct");
- attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, false);
+ attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, durable);
return factory.createExchange(attributes);
}
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
index 1eaccc4e5f..5455f03ebd 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
@@ -36,8 +36,6 @@ import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.exchange.HeadersExchange;
import org.apache.qpid.server.exchange.TopicExchange;
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.plugin.ExchangeType;
@@ -185,7 +183,6 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
_store = mock(DurableConfigurationStore.class);
- CurrentActor.set(mock(LogActor.class));
}
public void testUpgradeEmptyStore() throws Exception
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/HouseKeepingTaskTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/HouseKeepingTaskTest.java
index 8b4a52bb79..7d5a24f1aa 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/HouseKeepingTaskTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/HouseKeepingTaskTest.java
@@ -20,56 +20,13 @@
*/
package org.apache.qpid.server.virtualhost;
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.logging.NullRootMessageLogger;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.TestLogActor;
import org.apache.qpid.test.utils.QpidTestCase;
-import java.util.concurrent.CountDownLatch;
-
public class HouseKeepingTaskTest extends QpidTestCase
{
- /**
- * Tests that the abstract HouseKeepingTask properly cleans up any LogActor
- * it adds to the CurrentActor stack by verifying the CurrentActor set
- * before task execution is the CurrentActor after execution.
- */
- public void testCurrentActorStackBalance() throws Exception
- {
- //create and set a test actor
- LogActor testActor = new TestLogActor(new NullRootMessageLogger());
- CurrentActor.set(testActor);
-
- //verify it is returned correctly before executing a HouseKeepingTask
- assertEquals("Expected LogActor was not returned", testActor, CurrentActor.get());
-
- final CountDownLatch latch = new CountDownLatch(1);
- HouseKeepingTask testTask = new HouseKeepingTask(new MockVirtualHost("HouseKeepingTaskTestVhost"))
- {
- @Override
- public void execute()
- {
- latch.countDown();
- }
- };
-
- //run the test HouseKeepingTask using the current Thread to influence its CurrentActor stack
- testTask.run();
-
- assertEquals("The expected LogActor was not returned, the CurrentActor stack has become unbalanced",
- testActor, CurrentActor.get());
- assertEquals("HouseKeepingTask execute() method was not run", 0, latch.getCount());
-
- //clean up the test actor
- CurrentActor.remove();
- }
public void testThreadNameIsSetForDurationOfTask() throws Exception
{
- //create and set a test actor
- LogActor testActor = new TestLogActor(new NullRootMessageLogger());
- CurrentActor.set(testActor);
String originalThreadName = Thread.currentThread().getName();
@@ -84,8 +41,6 @@ public class HouseKeepingTaskTest extends QpidTestCase
assertEquals("Thread name should have been set during execution", expectedThreadNameDuringExecution, testTask.getThreadNameDuringExecution());
assertEquals("Thread name should have been reverted after task has run", originalThreadName, Thread.currentThread().getName());
- //clean up the test actor
- CurrentActor.remove();
}
diff --git a/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/RuleSet.java b/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/RuleSet.java
index fcb5bcbf70..58f3c71c7c 100644
--- a/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/RuleSet.java
+++ b/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/RuleSet.java
@@ -38,7 +38,7 @@ import javax.security.auth.Subject;
import org.apache.commons.lang.BooleanUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.logging.messages.AccessControlMessages;
import org.apache.qpid.server.security.Result;
import org.apache.qpid.server.security.access.ObjectProperties;
@@ -321,14 +321,14 @@ public class RuleSet
switch (permission)
{
case ALLOW_LOG:
- CurrentActor.get().message(AccessControlMessages.ALLOWED(
+ SystemLog.message(AccessControlMessages.ALLOWED(
action.getOperation().toString(),
action.getObjectType().toString(),
action.getProperties().toString()));
case ALLOW:
return Result.ALLOWED;
case DENY_LOG:
- CurrentActor.get().message(AccessControlMessages.DENIED(
+ SystemLog.message(AccessControlMessages.DENIED(
action.getOperation().toString(),
action.getObjectType().toString(),
action.getProperties().toString()));
diff --git a/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControlTest.java b/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControlTest.java
index e907a88001..f5f1866c3a 100644
--- a/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControlTest.java
+++ b/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControlTest.java
@@ -33,9 +33,8 @@ import junit.framework.TestCase;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.qpid.server.connection.ConnectionPrincipal;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.logging.UnitTestMessageLogger;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.TestLogActor;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.security.Result;
import org.apache.qpid.server.security.access.ObjectProperties;
@@ -67,7 +66,7 @@ public class DefaultAccessControlTest extends TestCase
private void configureAccessControl(final RuleSet rs) throws ConfigurationException
{
_plugin = new DefaultAccessControl(rs);
- CurrentActor.set(new TestLogActor(messageLogger));
+ SystemLog.setRootMessageLogger(messageLogger);
}
private RuleSet createGroupRuleSet()
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index 120ba2d951..d41af30a09 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -22,8 +22,7 @@ package org.apache.qpid.server.protocol.v0_10;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.flow.FlowCreditManager;
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
@@ -387,7 +386,6 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
protected void sendToDLQOrDiscard(MessageInstance entry)
{
- final LogActor logActor = CurrentActor.get();
final ServerMessage msg = entry.getMessage();
int requeues = entry.routeToAlternate(new Action<MessageInstance>()
@@ -395,7 +393,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
@Override
public void performAction(final MessageInstance requeueEntry)
{
- logActor.message( ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
+ SystemLog.message(ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
requeueEntry.getOwningResource().getName()));
}
}, null);
@@ -410,12 +408,12 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
if(alternateExchange != null)
{
- logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(),
+ SystemLog.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(),
alternateExchange.getName()));
}
else
{
- logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(),
+ SystemLog.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(),
queue.getName(),
msg.getInitialRoutingAddress()));
}
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
index 9fe1babe20..9346e3e7bb 100755
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.protocol.v0_10;
import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Transport;
@@ -30,8 +31,11 @@ import org.apache.qpid.transport.network.Disassembler;
import org.apache.qpid.transport.network.InputHandler;
import org.apache.qpid.transport.network.NetworkConnection;
+import javax.security.auth.Subject;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocolEngine
@@ -63,15 +67,31 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
}
}
- public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+ public void setNetworkConnection(final NetworkConnection network, final Sender<ByteBuffer> sender)
{
- _network = network;
+ if(!getSubject().equals(Subject.getSubject(AccessController.getContext())))
+ {
+ Subject.doAs(getSubject(), new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ setNetworkConnection(network,sender);
+ return null;
+ }
+ });
+ }
+ else
+ {
+ SystemLog.message(ConnectionMessages.OPEN(null, null, null, null, false, false, false, false));
+ _network = network;
+
+ _connection.setNetworkConnection(network);
+ _connection.setSender(new Disassembler(wrapSender(sender), MAX_FRAME_SIZE));
+ // FIXME Two log messages to maintain compatibility with earlier protocol versions
+ SystemLog.message(ConnectionMessages.OPEN(null, "0-10", null, null, false, true, false, false));
- _connection.setNetworkConnection(network);
- _connection.setSender(new Disassembler(wrapSender(sender), MAX_FRAME_SIZE));
- // FIXME Two log messages to maintain compatibility with earlier protocol versions
- _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, null, null, false, false, false, false));
- _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", null, null, false, true, false, false));
+ }
}
private Sender<ByteBuffer> wrapSender(final Sender<ByteBuffer> sender)
@@ -155,8 +175,17 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
public void readerIdle()
{
- _connection.getLogActor().message(ConnectionMessages.IDLE_CLOSE());
- _network.close();
+ Subject.doAs(_connection.getAuthorizedSubject(), new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ SystemLog.message(ConnectionMessages.IDLE_CLOSE());
+ _network.close();
+ return null;
+ }
+ });
+
}
public String getAddress()
@@ -189,4 +218,10 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
{
return _connection.getConnectionId();
}
+
+ @Override
+ public Subject getSubject()
+ {
+ return _connection.getAuthorizedSubject();
+ }
}
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
index 5bfc398bcf..5096223889 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
@@ -32,10 +32,8 @@ import java.util.concurrent.atomic.AtomicLong;
import javax.security.auth.Subject;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.connection.ConnectionPrincipal;
-import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.AMQPConnectionActor;
-import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Port;
@@ -65,7 +63,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
{
private Runnable _onOpenTask;
private AtomicBoolean _logClosed = new AtomicBoolean(false);
- private LogActor _actor;
private final Subject _authorizedSubject = new Subject();
private Principal _authorizedPrincipal = null;
@@ -87,7 +84,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
{
_connectionId = connectionId;
_authorizedSubject.getPrincipals().add(new ConnectionPrincipal(this));
- _actor = new AMQPConnectionActor(this, broker.getRootMessageLogger());
}
public Object getReference()
@@ -112,7 +108,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
{
_onOpenTask.run();
}
- _actor.message(ConnectionMessages.OPEN(getClientId(), "0-10", getClientVersion(), getClientProduct(), true, true, true, true));
+ SystemLog.message(ConnectionMessages.OPEN(getClientId(), "0-10", getClientVersion(), getClientProduct(), true, true, true, true));
getVirtualHost().getConnectionRegistry().registerConnection(this);
}
@@ -135,7 +131,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
{
if(_logClosed.compareAndSet(false, true))
{
- CurrentActor.get().message(this, ConnectionMessages.CLOSE());
+ SystemLog.message(this, ConnectionMessages.CLOSE());
}
}
@@ -258,42 +254,31 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
Subject subject;
if (event.isConnectionControl())
{
- CurrentActor.set(_actor);
subject = _authorizedSubject;
}
else
{
ServerSession channel = (ServerSession) getSession(event.getChannel());
- LogActor channelActor = null;
if (channel != null)
{
subject = channel.getAuthorizedSubject();
- channelActor = channel.getLogActor();
}
else
{
subject = _authorizedSubject;
}
-
- CurrentActor.set(channelActor == null ? _actor : channelActor);
}
- try
+ Subject.doAs(subject, new PrivilegedAction<Void>()
{
- Subject.doAs(subject, new PrivilegedAction<Void>()
+ @Override
+ public Void run()
{
- @Override
- public Void run()
- {
- ServerConnection.super.received(event);
- return null;
- }
- });
- }
- finally
- {
- CurrentActor.remove();
- }
+ ServerConnection.super.received(event);
+ return null;
+ }
+ });
+
}
public String toLogString()
@@ -331,11 +316,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
}
}
- public LogActor getLogActor()
- {
- return _actor;
- }
-
public void close(AMQConstant cause, String message)
{
closeSubscriptions();
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 5627b2eabe..453b3f85a5 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -20,7 +20,9 @@
*/
package org.apache.qpid.server.protocol.v0_10;
+import java.security.AccessController;
import java.security.Principal;
+import java.security.PrivilegedAction;
import java.text.MessageFormat;
import java.util.Collection;
import java.util.Collections;
@@ -43,15 +45,13 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
import org.apache.qpid.server.connection.SessionPrincipal;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.TransactionTimeoutHelper;
import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
-import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
@@ -102,7 +102,6 @@ public class ServerSession extends Session
private final UUID _id = UUID.randomUUID();
private final Subject _subject = new Subject();
private long _createTime = System.currentTimeMillis();
- private LogActor _actor = GenericActor.getInstance(this);
private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>());
@@ -161,18 +160,44 @@ public class ServerSession extends Session
});
}
- protected void setState(State state)
+ protected void setState(final State state)
{
- super.setState(state);
-
- if (state == State.OPEN)
+ if(runningAsSubject())
{
- _actor.message(ChannelMessages.CREATE());
- if(_blocking.get())
+ super.setState(state);
+
+ if (state == State.OPEN)
{
- invokeBlock();
+ SystemLog.message(ChannelMessages.CREATE());
+ if(_blocking.get())
+ {
+ invokeBlock();
+ }
}
}
+ else
+ {
+ runAsSubject(new PrivilegedAction<Void>() {
+
+ @Override
+ public Void run()
+ {
+ setState(state);
+ return null;
+ }
+ });
+
+ }
+ }
+
+ private <T> T runAsSubject(final PrivilegedAction<T> privilegedAction)
+ {
+ return Subject.doAs(getAuthorizedSubject(), privilegedAction);
+ }
+
+ private boolean runningAsSubject()
+ {
+ return getAuthorizedSubject().equals(Subject.getSubject(AccessController.getContext()));
}
private void invokeBlock()
@@ -394,7 +419,7 @@ public class ServerSession extends Session
{
operationalLoggingMessage = ChannelMessages.CLOSE();
}
- CurrentActor.get().message(getLogSubject(), operationalLoggingMessage);
+ SystemLog.message(getLogSubject(), operationalLoggingMessage);
}
@Override
@@ -678,14 +703,10 @@ public class ServerSession extends Session
return (ServerConnection) super.getConnection();
}
- public LogActor getLogActor()
- {
- return _actor;
- }
public LogSubject getLogSubject()
{
- return (LogSubject) this;
+ return this;
}
public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose)
@@ -704,7 +725,7 @@ public class ServerSession extends Session
}
- private void block(Object queue, String name)
+ private void block(final Object queue, final String name)
{
synchronized (_blockingEntities)
{
@@ -717,7 +738,7 @@ public class ServerSession extends Session
{
invokeBlock();
}
- _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED(name));
+ SystemLog.message(_logSubject, ChannelMessages.FLOW_ENFORCED(name));
}
@@ -735,25 +756,22 @@ public class ServerSession extends Session
unblock(this);
}
- private void unblock(Object queue)
+ private void unblock(final Object queue)
{
- synchronized(_blockingEntities)
+ if(_blockingEntities.remove(queue) && _blockingEntities.isEmpty())
{
- if(_blockingEntities.remove(queue) && _blockingEntities.isEmpty())
+ if(_blocking.compareAndSet(true,false) && !isClosing())
{
- if(_blocking.compareAndSet(true,false) && !isClosing())
- {
- _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED());
- MessageFlow mf = new MessageFlow();
- mf.setUnit(MessageCreditUnit.MESSAGE);
- mf.setDestination("");
- _outstandingCredit.set(Integer.MAX_VALUE);
- mf.setValue(Integer.MAX_VALUE);
- invoke(mf);
+ SystemLog.message(_logSubject, ChannelMessages.FLOW_REMOVED());
+ MessageFlow mf = new MessageFlow();
+ mf.setUnit(MessageCreditUnit.MESSAGE);
+ mf.setDestination("");
+ _outstandingCredit.set(Integer.MAX_VALUE);
+ mf.setValue(Integer.MAX_VALUE);
+ invoke(mf);
- }
}
}
}
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 9d7764414f..d21da824e9 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -29,6 +29,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.model.ExclusivityPolicy;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.store.StoreException;
@@ -345,7 +346,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
else
{
- serverSession.getLogActor().message(ExchangeMessages.DISCARDMSG(exchange.getName(), messageMetaData.getRoutingKey()));
+ SystemLog.message(ExchangeMessages.DISCARDMSG(exchange.getName(), messageMetaData.getRoutingKey()));
}
}
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java b/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
index 421adb33a8..858482a034 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
@@ -19,8 +19,6 @@
package org.apache.qpid.server.protocol.v0_10;
import org.apache.qpid.server.logging.RootMessageLogger;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -41,7 +39,6 @@ public class ServerSessionTest extends QpidTestCase
super.setUp();
BrokerTestHelper.setUp();
_virtualHost = BrokerTestHelper.createVirtualHost(getName());
- GenericActor.setDefaultMessageLogger(CurrentActor.get().getRootMessageLogger());
}
@Override
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 99068a9d6c..e248fc539a 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v0_8;
import java.nio.ByteBuffer;
import java.security.AccessControlException;
+import java.security.PrivilegedAction;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -54,11 +55,9 @@ import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.filter.SimpleFilterManager;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
-import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.AMQPChannelActor;
-import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
@@ -154,7 +153,6 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
private final AtomicBoolean _blocking = new AtomicBoolean(false);
- private LogActor _actor;
private LogSubject _logSubject;
private volatile boolean _rollingBack;
@@ -178,19 +176,17 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
private Subject _subject;
- public AMQChannel(T session, int channelId, MessageStore messageStore)
+ public AMQChannel(T session, int channelId, final MessageStore messageStore)
throws AMQException
{
_session = session;
_channelId = channelId;
- _actor = new AMQPChannelActor(this, session.getLogActor().getRootMessageLogger());
_subject = new Subject(false, session.getAuthorizedSubject().getPrincipals(),
session.getAuthorizedSubject().getPublicCredentials(),
session.getAuthorizedSubject().getPrivateCredentials());
_subject.getPrincipals().add(new SessionPrincipal(this));
_logSubject = new ChannelLogSubject(this);
- _actor.message(ChannelMessages.CREATE());
_messageStore = messageStore;
@@ -214,6 +210,18 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
}
}
});
+
+ Subject.doAs(_subject, new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ SystemLog.message(ChannelMessages.CREATE());
+
+ return null;
+ }
+ });
+
}
/** Sets this channel to be part of a local transaction */
@@ -449,12 +457,13 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
}
else
{
- _actor.message(ExchangeMessages.DISCARDMSG(_currentMessage.getExchangeName().asString(),
- _currentMessage.getMessagePublishInfo().getRoutingKey() == null
- ? null
- : _currentMessage.getMessagePublishInfo()
- .getRoutingKey()
- .toString()));
+ SystemLog.message(ExchangeMessages.DISCARDMSG(_currentMessage.getExchangeName().asString(),
+ _currentMessage.getMessagePublishInfo().getRoutingKey()
+ == null
+ ? null
+ : _currentMessage.getMessagePublishInfo()
+ .getRoutingKey()
+ .toString()));
}
}
@@ -684,7 +693,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
LogMessage operationalLogMessage = cause == null ?
ChannelMessages.CLOSE() :
ChannelMessages.CLOSE_FORCED(cause.getCode(), message);
- CurrentActor.get().message(_logSubject, operationalLogMessage);
+ SystemLog.message(_logSubject, operationalLogMessage);
unsubscribeAllConsumers();
@@ -977,7 +986,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
// Log Flow Started before we start the subscriptions
if (!suspended)
{
- _actor.message(_logSubject, ChannelMessages.FLOW("Started"));
+ SystemLog.message(_logSubject, ChannelMessages.FLOW("Started"));
}
@@ -1028,7 +1037,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
// stopped.
if (suspended)
{
- _actor.message(_logSubject, ChannelMessages.FLOW("Stopped"));
+ SystemLog.message(_logSubject, ChannelMessages.FLOW("Stopped"));
}
}
@@ -1174,7 +1183,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
public void setCredit(final long prefetchSize, final int prefetchCount)
{
- _actor.message(ChannelMessages.PREFETCH_SIZE(prefetchSize, prefetchCount));
+ SystemLog.message(ChannelMessages.PREFETCH_SIZE(prefetchSize, prefetchCount));
_creditManager.setCreditLimits(prefetchSize, prefetchCount);
}
@@ -1431,19 +1440,13 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
}
}
-
- public LogActor getLogActor()
- {
- return _actor;
- }
-
public synchronized void block()
{
if(_blockingEntities.add(this))
{
if(_blocking.compareAndSet(false,true))
{
- _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED("** All Queues **"));
+ SystemLog.message(_logSubject, ChannelMessages.FLOW_ENFORCED("** All Queues **"));
flow(false);
}
}
@@ -1455,7 +1458,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
{
if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false))
{
- _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED());
+ SystemLog.message(_logSubject, ChannelMessages.FLOW_REMOVED());
flow(true);
}
@@ -1469,7 +1472,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
if(_blocking.compareAndSet(false,true))
{
- _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED(queue.getName()));
+ SystemLog.message(_logSubject, ChannelMessages.FLOW_ENFORCED(queue.getName()));
flow(false);
}
}
@@ -1481,8 +1484,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
{
if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false) && !isClosing())
{
- _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED());
-
+ SystemLog.message(_logSubject, ChannelMessages.FLOW_REMOVED());
flow(true);
}
}
@@ -1552,14 +1554,14 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
else
{
final ServerMessage msg = rejectedQueueEntry.getMessage();
- final Consumer sub = rejectedQueueEntry.getDeliveredConsumer();
+
int requeues = rejectedQueueEntry.routeToAlternate(new Action<MessageInstance>()
{
@Override
public void performAction(final MessageInstance requeueEntry)
{
- _actor.message( _logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
+ SystemLog.message( _logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
requeueEntry.getOwningResource().getName()));
}
}, null);
@@ -1577,7 +1579,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
if (altExchange == null)
{
_logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag);
- _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getInitialRoutingAddress()));
+ SystemLog.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getInitialRoutingAddress()));
}
else
@@ -1585,7 +1587,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
_logger.debug(
"Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: "
+ deliveryTag);
- _actor.message(_logSubject,
+ SystemLog.message(_logSubject,
ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName()));
}
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index 167a505c19..80c4a9fde7 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
+import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
import java.util.ArrayList;
@@ -55,15 +56,12 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.connection.ConnectionPrincipal;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl;
-import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.AMQPConnectionActor;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.ManagementActor;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
import org.apache.qpid.server.model.Broker;
@@ -143,7 +141,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private final long _connectionID;
private Object _reference = new Object();
- private AMQPConnectionActor _actor;
private LogSubject _logSubject;
private long _lastIoTime;
@@ -172,7 +169,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private long _readBytes;
public AMQProtocolEngine(Broker broker,
- NetworkConnection network,
+ final NetworkConnection network,
final long connectionId,
Port port,
Transport transport)
@@ -184,21 +181,44 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
_receivedLock = new ReentrantLock();
_stateManager = new AMQStateManager(broker, this);
_codecFactory = new AMQCodecFactory(true, this);
-
- setNetworkConnection(network);
_connectionID = connectionId;
+ _logSubject = new ConnectionLogSubject(this);
- _actor = new AMQPConnectionActor(this, _broker.getRootMessageLogger());
_authorizedSubject.getPrincipals().add(new ConnectionPrincipal(this));
+ runAsSubject(new PrivilegedAction<Void>()
+ {
- _logSubject = new ConnectionLogSubject(this);
+ @Override
+ public Void run()
+ {
+ setNetworkConnection(network);
+
+ SystemLog.message(ConnectionMessages.OPEN(null, null, null, null, false, false, false, false));
+
+ _closeWhenNoRoute = (Boolean)_broker.getAttribute(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE);
- _actor.message(ConnectionMessages.OPEN(null, null, null, null, false, false, false, false));
+ initialiseStatistics();
- _closeWhenNoRoute = (Boolean)_broker.getAttribute(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE);
+ return null;
+ }
+ });
- initialiseStatistics();
+ }
+ private <T> T runAsSubject(PrivilegedAction<T> action)
+ {
+ return Subject.doAs(getAuthorizedSubject(), action);
+ }
+
+ private boolean runningAsSubject()
+ {
+ return getAuthorizedSubject().equals(Subject.getSubject(AccessController.getContext()));
+ }
+
+ @Override
+ public Subject getSubject()
+ {
+ return _authorizedSubject;
}
public void setNetworkConnection(NetworkConnection network)
@@ -217,11 +237,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
return _connectionID;
}
- public LogActor getLogActor()
- {
- return _actor;
- }
-
public void setMaxFrameSize(long frameMax)
{
_maxFrameSize = frameMax;
@@ -400,74 +415,57 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
AMQBody body = frame.getBodyFrame();
- //Look up the Channel's Actor and set that as the current actor
- // If that is not available then we can use the ConnectionActor
- // that is associated with this AMQMPSession.
- LogActor channelActor = null;
- if (amqChannel != null)
+ long startTime = 0;
+ String frameToString = null;
+ if (_logger.isDebugEnabled())
{
- channelActor = amqChannel.getLogActor();
+ startTime = System.currentTimeMillis();
+ frameToString = frame.toString();
+ _logger.debug("RECV: " + frame);
}
- CurrentActor.set(channelActor == null ? _actor : channelActor);
- try
+ // Check that this channel is not closing
+ if (channelAwaitingClosure(channelId))
{
- long startTime = 0;
- String frameToString = null;
- if (_logger.isDebugEnabled())
+ if ((frame.getBodyFrame() instanceof ChannelCloseOkBody))
{
- startTime = System.currentTimeMillis();
- frameToString = frame.toString();
- _logger.debug("RECV: " + frame);
- }
-
- // Check that this channel is not closing
- if (channelAwaitingClosure(channelId))
- {
- if ((frame.getBodyFrame() instanceof ChannelCloseOkBody))
- {
- if (_logger.isInfoEnabled())
- {
- _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok");
- }
- }
- else
+ if (_logger.isInfoEnabled())
{
- // The channel has been told to close, we don't process any more frames until
- // it's closed.
- return;
+ _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok");
}
}
-
- try
- {
- body.handle(channelId, this);
- }
- catch(AMQConnectionException e)
- {
- _logger.info(e.getMessage() + " whilst processing frame: " + body);
- closeConnection(channelId, e);
- throw e;
- }
- catch (AMQException e)
- {
- closeChannel(channelId, e.getErrorCode() == null ? AMQConstant.INTERNAL_ERROR : e.getErrorCode(), e.getMessage());
- throw e;
- }
- catch (TransportException e)
+ else
{
- closeChannel(channelId, AMQConstant.CHANNEL_ERROR, e.getMessage());
- throw e;
+ // The channel has been told to close, we don't process any more frames until
+ // it's closed.
+ return;
}
+ }
- if(_logger.isDebugEnabled())
- {
- _logger.debug("Frame handled in " + (System.currentTimeMillis() - startTime) + " ms. Frame: " + frameToString);
- }
+ try
+ {
+ body.handle(channelId, this);
}
- finally
+ catch(AMQConnectionException e)
+ {
+ _logger.info(e.getMessage() + " whilst processing frame: " + body);
+ closeConnection(channelId, e);
+ throw e;
+ }
+ catch (AMQException e)
{
- CurrentActor.remove();
+ closeChannel(channelId, e.getErrorCode() == null ? AMQConstant.INTERNAL_ERROR : e.getErrorCode(), e.getMessage());
+ throw e;
+ }
+ catch (TransportException e)
+ {
+ closeChannel(channelId, AMQConstant.CHANNEL_ERROR, e.getMessage());
+ throw e;
+ }
+
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Frame handled in " + (System.currentTimeMillis() - startTime) + " ms. Frame: " + frameToString);
}
}
@@ -478,7 +476,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
try
{
// Log incoming protocol negotiation request
- _actor.message(ConnectionMessages.OPEN(null, pi.getProtocolMajor() + "-" + pi.getProtocolMinor(), null, null, false, true, false, false));
+ SystemLog.message(ConnectionMessages.OPEN(null,
+ pi.getProtocolMajor() + "-" + pi.getProtocolMinor(),
+ null,
+ null,
+ false,
+ true,
+ false,
+ false));
ProtocolVersion pv = pi.checkVersion(); // Fails if not correct
@@ -778,22 +783,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
_maxNoOfChannels = value;
}
- public void commitTransactions(AMQChannel<AMQProtocolEngine> channel) throws AMQException
- {
- if ((channel != null) && channel.isTransactional())
- {
- channel.commit();
- }
- }
-
- public void rollbackTransactions(AMQChannel<AMQProtocolEngine> channel) throws AMQException
- {
- if ((channel != null) && channel.isTransactional())
- {
- channel.rollback();
- }
- }
-
/**
* Close a specific channel. This will remove any resources used by the channel, including: <ul><li>any queue
* subscriptions (this may in turn remove queues if they are auto delete</li> </ul>
@@ -908,77 +897,89 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
@Override
public void closeSession()
{
- if(_closing.compareAndSet(false,true))
- {
- // force sync of outstanding async work
- _receivedLock.lock();
- try
- {
- receivedComplete();
- }
- finally
- {
- _receivedLock.unlock();
- }
- // REMOVE THIS SHOULD NOT BE HERE.
- if (CurrentActor.get() == null)
- {
- CurrentActor.set(_actor);
- }
- if (!_closed)
+ if(runningAsSubject())
+ {
+ if(_closing.compareAndSet(false,true))
{
- if (_virtualHost != null)
+ // force sync of outstanding async work
+ _receivedLock.lock();
+ try
{
- _virtualHost.getConnectionRegistry().deregisterConnection(this);
+ receivedComplete();
}
-
- closeAllChannels();
-
- for (Action<? super AMQProtocolEngine> task : _taskList)
+ finally
{
- task.performAction(this);
+ _receivedLock.unlock();
}
- synchronized(this)
+ if (!_closed)
{
- _closed = true;
- notifyAll();
+ if (_virtualHost != null)
+ {
+ _virtualHost.getConnectionRegistry().deregisterConnection(this);
+ }
+
+ closeAllChannels();
+
+ for (Action<? super AMQProtocolEngine> task : _taskList)
+ {
+ task.performAction(this);
+ }
+
+ synchronized(this)
+ {
+ _closed = true;
+ notifyAll();
+ }
+ SystemLog.message(_logSubject, ConnectionMessages.CLOSE());
}
- CurrentActor.get().message(_logSubject, ConnectionMessages.CLOSE());
}
- }
- else
- {
- synchronized(this)
+ else
{
+ synchronized(this)
+ {
- boolean lockHeld = _receivedLock.isHeldByCurrentThread();
+ boolean lockHeld = _receivedLock.isHeldByCurrentThread();
- while(!_closed)
- {
- try
+ while(!_closed)
{
- if(lockHeld)
+ try
+ {
+ if(lockHeld)
+ {
+ _receivedLock.unlock();
+ }
+ wait(1000);
+ }
+ catch (InterruptedException e)
{
- _receivedLock.unlock();
+ // do nothing
}
- wait(1000);
- }
- catch (InterruptedException e)
- {
- // do nothing
- }
- finally
- {
- if(lockHeld)
+ finally
{
- _receivedLock.lock();
+ if(lockHeld)
+ {
+ _receivedLock.lock();
+ }
}
}
}
}
}
+ else
+ {
+ runAsSubject(new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ closeSession();
+ return null;
+ }
+ });
+
+ }
}
private void closeConnection(int channelId, AMQConnectionException e)
@@ -1091,7 +1092,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
setContextKey(new AMQShortString(clientId));
}
- _actor.message(ConnectionMessages.OPEN(clientId, _protocolVersion.toString(), _clientVersion, _clientProduct, true, true, true, true));
+ SystemLog.message(ConnectionMessages.OPEN(clientId,
+ _protocolVersion.toString(),
+ _clientVersion,
+ _clientProduct,
+ true,
+ true,
+ true,
+ true));
}
}
@@ -1361,40 +1369,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
return String.valueOf(getRemoteAddress());
}
- public void mgmtCloseChannel(int channelId)
- {
- MethodRegistry methodRegistry = getMethodRegistry();
- ChannelCloseBody responseBody =
- methodRegistry.createChannelCloseBody(
- AMQConstant.REPLY_SUCCESS.getCode(),
- new AMQShortString("The channel was closed using the broker's management interface."),
- 0,0);
-
- // This seems ugly but because we use AMQChannel.close() in both normal
- // broker operation and as part of the management interface it cannot
- // be avoided. The Current Actor will be null when this method is
- // called via the QMF management interface. As such we need to set one.
- boolean removeActor = false;
- if (CurrentActor.get() == null)
- {
- removeActor = true;
- CurrentActor.set(new ManagementActor(_actor.getRootMessageLogger()));
- }
-
- try
- {
- writeFrame(responseBody.generateFrame(channelId));
- closeChannel(channelId);
- }
- finally
- {
- if (removeActor)
- {
- CurrentActor.remove();
- }
- }
- }
-
public void closeSession(AMQChannel<AMQProtocolEngine> session, AMQConstant cause, String message)
{
int channelId = session.getChannelId();
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java
index 045367b667..ade087ef21 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java
@@ -35,9 +35,7 @@ import org.apache.qpid.framing.MethodDispatcher;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.protocol.AMQConnectionModel;
-import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -48,8 +46,6 @@ public interface AMQProtocolSession<T extends AMQProtocolSession<T>>
{
long getSessionID();
- LogActor getLogActor();
-
void setMaxFrameSize(long frameMax);
long getMaxFrameSize();
@@ -202,14 +198,8 @@ public interface AMQProtocolSession<T extends AMQProtocolSession<T>>
void setMaximumNumberOfChannels(Long value);
- void commitTransactions(AMQChannel<T> channel) throws AMQException;
-
- void rollbackTransactions(AMQChannel<T> channel) throws AMQException;
-
List<AMQChannel<T>> getChannels();
- void mgmtCloseChannel(int channelId);
-
public Principal getPeerPrincipal();
Lock getReceivedLock();
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
index f82689e36a..3c3c17a6fb 100644
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
@@ -511,6 +511,12 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
return _connectionId;
}
+ @Override
+ public Subject getSubject()
+ {
+ return _connection.getSubject();
+ }
+
public long getLastReadTime()
{
return _lastReadTime;
diff --git a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
index d11b1e029c..9a4adbc4c7 100644
--- a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
+++ b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
@@ -33,7 +33,7 @@ import javax.servlet.DispatcherType;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
-import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.logging.messages.ManagementConsoleMessages;
import org.apache.qpid.server.management.plugin.filter.ForbiddingAuthorisationFilter;
import org.apache.qpid.server.management.plugin.filter.RedirectingAuthorisationFilter;
@@ -140,7 +140,7 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem
private void start()
{
- CurrentActor.get().message(ManagementConsoleMessages.STARTUP(OPERATIONAL_LOGGING_NAME));
+ SystemLog.message(ManagementConsoleMessages.STARTUP(OPERATIONAL_LOGGING_NAME));
Collection<Port> httpPorts = getHttpPorts(getBroker().getPorts());
_server = createServer(httpPorts);
@@ -154,7 +154,7 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem
throw new ServerScopedRuntimeException("Failed to start HTTP management on ports : " + httpPorts, e);
}
- CurrentActor.get().message(ManagementConsoleMessages.READY(OPERATIONAL_LOGGING_NAME));
+ SystemLog.message(ManagementConsoleMessages.READY(OPERATIONAL_LOGGING_NAME));
}
private void stop()
@@ -172,7 +172,7 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem
}
}
- CurrentActor.get().message(ManagementConsoleMessages.STOPPED(OPERATIONAL_LOGGING_NAME));
+ SystemLog.message(ManagementConsoleMessages.STOPPED(OPERATIONAL_LOGGING_NAME));
}
public int getSessionTimeout()
@@ -381,13 +381,14 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem
Connector[] connectors = server.getConnectors();
for (Connector connector : connectors)
{
- CurrentActor.get().message(ManagementConsoleMessages.LISTENING(stringifyConnectorScheme(connector), connector.getPort()));
+ SystemLog.message(ManagementConsoleMessages.LISTENING(stringifyConnectorScheme(connector),
+ connector.getPort()));
if (connector instanceof SslSocketConnector)
{
SslContextFactory sslContextFactory = ((SslSocketConnector)connector).getSslContextFactory();
if (sslContextFactory != null && sslContextFactory.getKeyStorePath() != null)
{
- CurrentActor.get().message(ManagementConsoleMessages.SSL_KEYSTORE(sslContextFactory.getKeyStorePath()));
+ SystemLog.message(ManagementConsoleMessages.SSL_KEYSTORE(sslContextFactory.getKeyStorePath()));
}
}
}
@@ -398,7 +399,8 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem
Connector[] connectors = server.getConnectors();
for (Connector connector : connectors)
{
- CurrentActor.get().message(ManagementConsoleMessages.SHUTTING_DOWN(stringifyConnectorScheme(connector), connector.getPort()));
+ SystemLog.message(ManagementConsoleMessages.SHUTTING_DOWN(stringifyConnectorScheme(connector),
+ connector.getPort()));
}
}
diff --git a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java
index 5d39b3c0c1..023453e74f 100644
--- a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java
+++ b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java
@@ -22,11 +22,8 @@ package org.apache.qpid.server.management.plugin;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import java.security.AccessControlException;
import java.security.Principal;
import java.security.PrivilegedAction;
-import java.security.PrivilegedActionException;
-import java.security.PrivilegedExceptionAction;
import java.security.cert.X509Certificate;
import java.util.Collections;
@@ -37,9 +34,6 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;
import org.apache.commons.codec.binary.Base64;
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.HttpManagementActor;
import org.apache.qpid.server.management.plugin.servlet.ServletConnectionPrincipal;
import org.apache.qpid.server.management.plugin.session.LoginLogoutReporter;
import org.apache.qpid.server.model.AuthenticationProvider;
@@ -51,7 +45,6 @@ import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationS
import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
import org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManagerFactory;
-import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.transport.network.security.ssl.SSLUtil;
public class HttpManagementUtil
@@ -117,55 +110,33 @@ public class HttpManagementUtil
subject.getPrincipals().add(new ServletConnectionPrincipal(request));
subject.setReadOnly();
- LogActor actor = createHttpManagementActor(broker, request);
+ assertManagementAccess(broker.getSecurityManager(), subject);
- assertManagementAccess(broker.getSecurityManager(), subject, actor);
-
- saveAuthorisedSubject(session, subject, actor);
+ saveAuthorisedSubject(session, subject);
}
}
- public static void assertManagementAccess(final SecurityManager securityManager, Subject subject, LogActor actor)
+ public static void assertManagementAccess(final SecurityManager securityManager, Subject subject)
{
- CurrentActor.set(actor);
- try
+ Subject.doAs(subject, new PrivilegedAction<Void>()
{
- Subject.doAs(subject, new PrivilegedAction<Void>()
+ @Override
+ public Void run()
{
- @Override
- public Void run()
- {
- securityManager.accessManagement();
- return null;
- }
- });
- }
- finally
- {
- CurrentActor.remove();
- }
- }
-
- public static HttpManagementActor getOrCreateAndCacheLogActor(HttpServletRequest request, Broker broker)
- {
- HttpSession session = request.getSession();
- HttpManagementActor actor = (HttpManagementActor) session.getAttribute(ATTR_LOG_ACTOR);
- if (actor == null)
- {
- actor = createHttpManagementActor(broker, request);
- session.setAttribute(ATTR_LOG_ACTOR, actor);
- }
- return actor;
+ securityManager.accessManagement();
+ return null;
+ }
+ });
}
- public static void saveAuthorisedSubject(HttpSession session, Subject subject, LogActor logActor)
+ public static void saveAuthorisedSubject(HttpSession session, Subject subject)
{
session.setAttribute(ATTR_SUBJECT, subject);
// Cause the user logon to be logged.
- session.setAttribute(ATTR_LOGIN_LOGOUT_REPORTER, new LoginLogoutReporter(logActor, subject));
+ session.setAttribute(ATTR_LOGIN_LOGOUT_REPORTER, new LoginLogoutReporter(subject));
}
public static Subject tryToAuthenticate(HttpServletRequest request, HttpManagementConfiguration managementConfig)
@@ -245,9 +216,5 @@ public class HttpManagementUtil
return null;
}
- private static HttpManagementActor createHttpManagementActor(Broker broker, HttpServletRequest request)
- {
- return new HttpManagementActor(broker.getRootMessageLogger(), request.getRemoteAddr(), request.getRemotePort());
- }
}
diff --git a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/ServletConnectionPrincipal.java b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/ServletConnectionPrincipal.java
index 0925e2b088..18a026ec93 100644
--- a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/ServletConnectionPrincipal.java
+++ b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/ServletConnectionPrincipal.java
@@ -20,13 +20,14 @@
*/
package org.apache.qpid.server.management.plugin.servlet;
+import org.apache.qpid.server.security.auth.ManagementConnectionPrincipal;
import org.apache.qpid.server.security.auth.SocketConnectionPrincipal;
import javax.servlet.ServletRequest;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
-public class ServletConnectionPrincipal implements SocketConnectionPrincipal
+public class ServletConnectionPrincipal implements ManagementConnectionPrincipal
{
private final InetSocketAddress _address;
@@ -74,4 +75,10 @@ public class ServletConnectionPrincipal implements SocketConnectionPrincipal
{
return _address.hashCode();
}
+
+ @Override
+ public String getType()
+ {
+ return "HTTP";
+ }
}
diff --git a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java
index 1133b6e091..a9e80db3bf 100644
--- a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java
+++ b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java
@@ -33,13 +33,9 @@ import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.HttpManagementActor;
import org.apache.qpid.server.management.plugin.HttpManagementConfiguration;
import org.apache.qpid.server.management.plugin.HttpManagementUtil;
import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.map.JsonMappingException;
@@ -88,7 +84,6 @@ public abstract class AbstractServlet extends HttpServlet
/**
* Performs the GET action as the logged-in {@link Subject}.
- * The {@link LogActor} is set before this method is called.
* Subclasses commonly override this method
*/
protected void doGetWithSubjectAndActor(HttpServletRequest request, HttpServletResponse resp) throws ServletException, IOException
@@ -117,7 +112,6 @@ public abstract class AbstractServlet extends HttpServlet
/**
* Performs the POST action as the logged-in {@link Subject}.
- * The {@link LogActor} is set before this method is called.
* Subclasses commonly override this method
*/
protected void doPostWithSubjectAndActor(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
@@ -145,7 +139,6 @@ public abstract class AbstractServlet extends HttpServlet
/**
* Performs the PUT action as the logged-in {@link Subject}.
- * The {@link LogActor} is set before this method is called.
* Subclasses commonly override this method
*/
protected void doPutWithSubjectAndActor(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
@@ -174,7 +167,6 @@ public abstract class AbstractServlet extends HttpServlet
/**
* Performs the PUT action as the logged-in {@link Subject}.
- * The {@link LogActor} is set before this method is called.
* Subclasses commonly override this method
*/
protected void doDeleteWithSubjectAndActor(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
@@ -198,8 +190,6 @@ public abstract class AbstractServlet extends HttpServlet
return;
}
- HttpManagementActor logActor = HttpManagementUtil.getOrCreateAndCacheLogActor(request, _broker);
- CurrentActor.set(logActor);
try
{
Subject.doAs(subject, privilegedExceptionAction);
@@ -223,11 +213,6 @@ public abstract class AbstractServlet extends HttpServlet
}
throw new ConnectionScopedRuntimeException(e.getCause());
}
- finally
- {
- CurrentActor.remove();
- }
-
}
protected Subject getAuthorisedSubject(HttpServletRequest request)
diff --git a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/SaslServlet.java b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/SaslServlet.java
index b59e9bc04d..2ca67fadc9 100644
--- a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/SaslServlet.java
+++ b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/SaslServlet.java
@@ -27,7 +27,6 @@ import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.management.plugin.HttpManagementConfiguration;
import org.apache.qpid.server.management.plugin.HttpManagementUtil;
import org.apache.qpid.server.model.Broker;
@@ -247,10 +246,9 @@ public class SaslServlet extends AbstractServlet
subject.setReadOnly();
Broker broker = getBroker();
- LogActor actor = HttpManagementUtil.getOrCreateAndCacheLogActor(request, broker);
try
{
- HttpManagementUtil.assertManagementAccess(broker.getSecurityManager(), subject, actor);
+ HttpManagementUtil.assertManagementAccess(broker.getSecurityManager(), subject);
}
catch(SecurityException e)
{
@@ -258,7 +256,7 @@ public class SaslServlet extends AbstractServlet
return;
}
- HttpManagementUtil.saveAuthorisedSubject(request.getSession(), subject, actor);
+ HttpManagementUtil.saveAuthorisedSubject(request.getSession(), subject);
session.removeAttribute(ATTR_ID);
session.removeAttribute(ATTR_SASL_SERVER);
session.removeAttribute(ATTR_EXPIRY);
diff --git a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/session/LoginLogoutReporter.java b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/session/LoginLogoutReporter.java
index 238f1b4719..5155654e82 100644
--- a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/session/LoginLogoutReporter.java
+++ b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/session/LoginLogoutReporter.java
@@ -28,7 +28,7 @@ import javax.servlet.http.HttpSessionBindingEvent;
import javax.servlet.http.HttpSessionBindingListener;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.logging.messages.ManagementConsoleMessages;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
@@ -40,14 +40,12 @@ import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
public class LoginLogoutReporter implements HttpSessionBindingListener
{
private static final Logger LOGGER = Logger.getLogger(LoginLogoutReporter.class);
- private final LogActor _logActor;
private final Subject _subject;
private final Principal _principal;
- public LoginLogoutReporter(LogActor logActor, Subject subject)
+ public LoginLogoutReporter(Subject subject)
{
super();
- _logActor = logActor;
_subject = subject;
_principal = AuthenticatedPrincipal.getAuthenticatedPrincipalFromSubject(_subject);
}
@@ -76,7 +74,7 @@ public class LoginLogoutReporter implements HttpSessionBindingListener
@Override
public Void run()
{
- _logActor.message(ManagementConsoleMessages.OPEN(_principal.getName()));
+ SystemLog.message(ManagementConsoleMessages.OPEN(_principal.getName()));
return null;
}
});
@@ -94,7 +92,7 @@ public class LoginLogoutReporter implements HttpSessionBindingListener
@Override
public Void run()
{
- _logActor.message(ManagementConsoleMessages.CLOSE(_principal.getName()));
+ SystemLog.message(ManagementConsoleMessages.CLOSE(_principal.getName()));
return null;
}
});
diff --git a/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/session/LoginLogoutReporterTest.java b/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/session/LoginLogoutReporterTest.java
index 1d43c44587..16ae5220ab 100644
--- a/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/session/LoginLogoutReporterTest.java
+++ b/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/session/LoginLogoutReporterTest.java
@@ -19,16 +19,20 @@
*/
package org.apache.qpid.server.management.plugin.session;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.when;
import javax.security.auth.Subject;
-import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogMessage;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.mockito.ArgumentMatcher;
-import org.mockito.Mockito;
import junit.framework.TestCase;
@@ -36,7 +40,7 @@ public class LoginLogoutReporterTest extends TestCase
{
private LoginLogoutReporter _loginLogoutReport;
private Subject _subject = new Subject();
- private LogActor _logActor = Mockito.mock(LogActor.class);
+ private RootMessageLogger _logger = mock(RootMessageLogger.class);
@Override
protected void setUp() throws Exception
@@ -44,19 +48,22 @@ public class LoginLogoutReporterTest extends TestCase
super.setUp();
_subject.getPrincipals().add(new AuthenticatedPrincipal("mockusername"));
- _loginLogoutReport = new LoginLogoutReporter(_logActor, _subject);
+ when(_logger.isEnabled()).thenReturn(true);
+ when(_logger.isMessageEnabled(anyString())).thenReturn(true);
+ SystemLog.setRootMessageLogger(_logger);
+ _loginLogoutReport = new LoginLogoutReporter(_subject);
}
public void testLoginLogged()
{
_loginLogoutReport.valueBound(null);
- verify(_logActor).message(isLogMessageWithMessage("MNG-1007 : Open : User mockusername"));
+ verify(_logger).message(isLogMessageWithMessage("MNG-1007 : Open : User mockusername"));
}
public void testLogoutLogged()
{
_loginLogoutReport.valueUnbound(null);
- verify(_logActor).message(isLogMessageWithMessage("MNG-1008 : Close : User mockusername"));
+ verify(_logger).message(isLogMessageWithMessage("MNG-1008 : Close : User mockusername"));
}
private LogMessage isLogMessageWithMessage(final String expectedMessage)
diff --git a/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagedObjectRegistry.java b/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagedObjectRegistry.java
index e2b9a98784..8b1463b476 100644
--- a/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagedObjectRegistry.java
+++ b/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagedObjectRegistry.java
@@ -20,10 +20,9 @@
*/
package org.apache.qpid.server.jmx;
-import javax.net.ssl.KeyManager;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.BrokerProperties;
-import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.logging.messages.ManagementConsoleMessages;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.KeyStore;
@@ -32,7 +31,6 @@ import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.security.auth.jmx.JMXPasswordAuthenticator;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
-import org.apache.qpid.ssl.SSLContextFactory;
import javax.management.JMException;
import javax.management.MBeanServer;
@@ -99,12 +97,12 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry
@Override
public void start() throws IOException
{
- CurrentActor.get().message(ManagementConsoleMessages.STARTUP(OPERATIONAL_LOGGING_NAME));
+ SystemLog.message(ManagementConsoleMessages.STARTUP(OPERATIONAL_LOGGING_NAME));
//check if system properties are set to use the JVM's out-of-the-box JMXAgent
if (areOutOfTheBoxJMXOptionsSet())
{
- CurrentActor.get().message(ManagementConsoleMessages.READY(OPERATIONAL_LOGGING_NAME));
+ SystemLog.message(ManagementConsoleMessages.READY(OPERATIONAL_LOGGING_NAME));
}
else
{
@@ -138,7 +136,7 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry
throw new ServerScopedRuntimeException("Unable to create SSLContext for key store", e);
}
- CurrentActor.get().message(ManagementConsoleMessages.SSL_KEYSTORE(keyStore.getName()));
+ SystemLog.message(ManagementConsoleMessages.SSL_KEYSTORE(keyStore.getName()));
//create the SSL RMI socket factories
csf = new SslRMIClientSocketFactory();
@@ -250,8 +248,8 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry
_cs.start();
String connectorServer = (connectorSslEnabled ? "SSL " : "") + "JMX RMIConnectorServer";
- CurrentActor.get().message(ManagementConsoleMessages.LISTENING(connectorServer, jmxPortConnectorServer));
- CurrentActor.get().message(ManagementConsoleMessages.READY(OPERATIONAL_LOGGING_NAME));
+ SystemLog.message(ManagementConsoleMessages.LISTENING(connectorServer, jmxPortConnectorServer));
+ SystemLog.message(ManagementConsoleMessages.READY(OPERATIONAL_LOGGING_NAME));
}
private Registry createRmiRegistry(int jmxPortRegistryServer, boolean useCustomRmiRegistry)
@@ -269,7 +267,7 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry
rmiRegistry = LocateRegistry.createRegistry(jmxPortRegistryServer, null, null);
}
- CurrentActor.get().message(ManagementConsoleMessages.LISTENING("RMI Registry", jmxPortRegistryServer));
+ SystemLog.message(ManagementConsoleMessages.LISTENING("RMI Registry", jmxPortRegistryServer));
return rmiRegistry;
}
@@ -294,7 +292,7 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry
unregisterAllMbeans();
- CurrentActor.get().message(ManagementConsoleMessages.STOPPED(OPERATIONAL_LOGGING_NAME));
+ SystemLog.message(ManagementConsoleMessages.STOPPED(OPERATIONAL_LOGGING_NAME));
}
private void closeConnectorAndRegistryServers()
@@ -338,7 +336,7 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry
if (_rmiRegistry != null)
{
// Stopping the RMI registry
- CurrentActor.get().message(ManagementConsoleMessages.SHUTTING_DOWN("RMI Registry", _registryPort.getPort()));
+ SystemLog.message(ManagementConsoleMessages.SHUTTING_DOWN("RMI Registry", _registryPort.getPort()));
try
{
boolean success = UnicastRemoteObject.unexportObject(_rmiRegistry, false);
@@ -365,7 +363,8 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry
// Stopping the JMX ConnectorServer
try
{
- CurrentActor.get().message(ManagementConsoleMessages.SHUTTING_DOWN("JMX RMIConnectorServer", _cs.getAddress().getPort()));
+ SystemLog.message(ManagementConsoleMessages.SHUTTING_DOWN("JMX RMIConnectorServer",
+ _cs.getAddress().getPort()));
_cs.stop();
}
catch (IOException e)
diff --git a/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/MBeanInvocationHandlerImpl.java b/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/MBeanInvocationHandlerImpl.java
index a38addae7b..b20685985f 100644
--- a/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/MBeanInvocationHandlerImpl.java
+++ b/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/MBeanInvocationHandlerImpl.java
@@ -23,8 +23,6 @@ package org.apache.qpid.server.jmx;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.BrokerProperties;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.ManagementActor;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.security.SecurityManager;
@@ -39,17 +37,13 @@ import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.RuntimeErrorException;
import javax.management.remote.MBeanServerForwarder;
-import javax.management.remote.rmi.RMIServer;
import javax.security.auth.Subject;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
-import java.rmi.server.RemoteServer;
-import java.rmi.server.ServerNotActiveException;
import java.security.AccessControlContext;
import java.security.AccessController;
-import java.security.PrivilegedAction;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
@@ -64,7 +58,6 @@ public class MBeanInvocationHandlerImpl implements InvocationHandler
private final static String DELEGATE = "JMImplementation:type=MBeanServerDelegate";
private MBeanServer _mbs;
- private final ManagementActor _logActor;
private final boolean _managementRightsInferAllAccess;
private final Broker _broker;
@@ -73,7 +66,6 @@ public class MBeanInvocationHandlerImpl implements InvocationHandler
{
_managementRightsInferAllAccess = Boolean.valueOf(System.getProperty(BrokerProperties.PROPERTY_MANAGEMENT_RIGHTS_INFER_ALL_ACCESS, "true"));
_broker = broker;
- _logActor = new ManagementActor(broker.getRootMessageLogger());
}
public static MBeanServerForwarder newProxyInstance(Broker broker)
@@ -117,9 +109,9 @@ public class MBeanInvocationHandlerImpl implements InvocationHandler
return false;
}
- public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
+ public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable
{
- String methodName = method.getName();
+ final String methodName = method.getName();
if (methodName.equals("getMBeanServer"))
{
@@ -167,16 +159,7 @@ public class MBeanInvocationHandlerImpl implements InvocationHandler
throw new SecurityException("Access denied: no authenticated principal", e);
}
- // Save the subject
- CurrentActor.set(_logActor);
- try
- {
- return authoriseAndInvoke(method, args);
- }
- finally
- {
- CurrentActor.remove();
- }
+ return authoriseAndInvoke(method, args);
}
catch (InvocationTargetException e)
{
@@ -207,7 +190,7 @@ public class MBeanInvocationHandlerImpl implements InvocationHandler
}
}
- private Object authoriseAndInvoke(final Method method, final Object[] args) throws Throwable
+ private Object authoriseAndInvoke(final Method method, final Object[] args) throws Exception
{
String methodName;
// Get the component, type and impact, which may be null
@@ -239,8 +222,11 @@ public class MBeanInvocationHandlerImpl implements InvocationHandler
{
try
{
+ Subject subject = Subject.getSubject(AccessController.getContext());
+ subject = new Subject(false, subject.getPrincipals(), subject.getPublicCredentials(), subject.getPrivateCredentials());
+ subject.getPrincipals().addAll(SecurityManager.SYSTEM.getPrincipals());
- return Subject.doAs(SecurityManager.SYSTEM, new PrivilegedExceptionAction<Object>()
+ return Subject.doAs(subject, new PrivilegedExceptionAction<Object>()
{
@Override
public Object run() throws IllegalAccessException, InvocationTargetException
@@ -251,7 +237,7 @@ public class MBeanInvocationHandlerImpl implements InvocationHandler
}
catch (PrivilegedActionException e)
{
- throw e.getCause();
+ throw (Exception) e.getCause();
}
}
else
diff --git a/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/ManagementLogonLogoffReporter.java b/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/ManagementLogonLogoffReporter.java
index ae0574dc21..79d944fc5c 100644
--- a/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/ManagementLogonLogoffReporter.java
+++ b/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/ManagementLogonLogoffReporter.java
@@ -27,12 +27,19 @@ import javax.management.Notification;
import javax.management.NotificationFilter;
import javax.management.NotificationListener;
import javax.management.remote.JMXConnectionNotification;
+import javax.security.auth.Subject;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.RootMessageLogger;
-import org.apache.qpid.server.logging.actors.ManagementActor;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.logging.messages.ManagementConsoleMessages;
+import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
+import org.apache.qpid.server.security.auth.jmx.JMXConnectionPrincipal;
+
+import java.rmi.server.RemoteServer;
+import java.rmi.server.ServerNotActiveException;
+import java.security.PrivilegedAction;
+import java.util.Collections;
public class ManagementLogonLogoffReporter implements NotificationListener, NotificationFilter
{
@@ -65,19 +72,41 @@ public class ManagementLogonLogoffReporter implements NotificationListener, Not
final String[] splitConnectionId = connectionId.split(" ");
user = splitConnectionId[1];
}
+ Subject originalSubject = new Subject(false, Collections.singleton(new AuthenticatedPrincipal(user)), Collections.emptySet(), Collections.emptySet());
+ Subject subject;
- // use a separate instance of actor as subject is not set on connect/disconnect
- // we need to pass principal name explicitly into log actor
- LogActor logActor = new ManagementActor(_rootMessageLogger, user);
- if (JMXConnectionNotification.OPENED.equals(type))
+ try
{
- logActor.message(ManagementConsoleMessages.OPEN(user));
+ String clientHost = RemoteServer.getClientHost();
+ subject = new Subject(false,
+ originalSubject.getPrincipals(),
+ originalSubject.getPublicCredentials(),
+ originalSubject.getPrivateCredentials());
+ subject.getPrincipals().add(new JMXConnectionPrincipal(clientHost));
+ subject.setReadOnly();
}
- else if (JMXConnectionNotification.CLOSED.equals(type) ||
- JMXConnectionNotification.FAILED.equals(type))
+ catch(ServerNotActiveException e)
{
- logActor.message(ManagementConsoleMessages.CLOSE(user));
+ subject = originalSubject;
}
+ final String username = user;
+ Subject.doAs(subject, new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ if (JMXConnectionNotification.OPENED.equals(type))
+ {
+ SystemLog.message(ManagementConsoleMessages.OPEN(username));
+ }
+ else if (JMXConnectionNotification.CLOSED.equals(type) ||
+ JMXConnectionNotification.FAILED.equals(type))
+ {
+ SystemLog.message(ManagementConsoleMessages.CLOSE(username));
+ }
+ return null;
+ }
+ });
}
@Override
diff --git a/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/ManagementLogonLogoffReporterTest.java b/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/ManagementLogonLogoffReporterTest.java
index ba9c2cdaa5..515a9d88f2 100644
--- a/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/ManagementLogonLogoffReporterTest.java
+++ b/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/ManagementLogonLogoffReporterTest.java
@@ -23,6 +23,7 @@ import static javax.management.remote.JMXConnectionNotification.OPENED;
import static javax.management.remote.JMXConnectionNotification.CLOSED;
import static javax.management.remote.JMXConnectionNotification.FAILED;
+import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;
@@ -31,10 +32,14 @@ import static org.mockito.Matchers.anyString;
import javax.management.remote.JMXConnectionNotification;
-import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.RootMessageLogger;
import junit.framework.TestCase;
+import org.apache.qpid.server.logging.SystemLog;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.mockito.ArgumentMatcher;
public class ManagementLogonLogoffReporterTest extends TestCase
{
@@ -52,8 +57,8 @@ public class ManagementLogonLogoffReporterTest extends TestCase
_usernameAccessor = mock(UsernameAccessor.class);
_rootMessageLogger = mock(RootMessageLogger.class);
// Enable messaging so we can valid the generated strings
- when(_rootMessageLogger.isMessageEnabled(any(LogActor.class), anyString())).thenReturn(true);
-
+ when(_rootMessageLogger.isMessageEnabled(anyString())).thenReturn(true);
+ SystemLog.setRootMessageLogger(_rootMessageLogger);
_reporter = new ManagementLogonLogoffReporter(_rootMessageLogger, _usernameAccessor);
}
@@ -64,7 +69,21 @@ public class ManagementLogonLogoffReporterTest extends TestCase
_reporter.handleNotification(openNotification, null);
- verify(_rootMessageLogger).rawMessage("[main] MNG-1007 : Open : User jmxuser", "qpid.message.managementconsole.open");
+ verify(_rootMessageLogger).message(messageMatch("MNG-1007 : Open : User jmxuser",
+ "qpid.message.managementconsole.open"));
+ }
+
+ private LogMessage messageMatch(final String message, final String hierarchy)
+ {
+ return argThat(new ArgumentMatcher<LogMessage>()
+ {
+ @Override
+ public boolean matches(final Object argument)
+ {
+ LogMessage actual = (LogMessage) argument;
+ return actual.getLogHierarchy().equals(hierarchy) && actual.toString().equals(message);
+ }
+ });
}
public void testClosedNotification()
@@ -74,7 +93,7 @@ public class ManagementLogonLogoffReporterTest extends TestCase
_reporter.handleNotification(closeNotification, null);
- verify(_rootMessageLogger).rawMessage("[main] MNG-1008 : Close : User jmxuser", "qpid.message.managementconsole.close");
+ verify(_rootMessageLogger).message(messageMatch("MNG-1008 : Close : User jmxuser", "qpid.message.managementconsole.close"));
}
public void tesNotifiedForLogOnTypeEvents()
diff --git a/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java b/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java
index e8362f79f0..5c6918e87d 100644
--- a/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java
+++ b/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java
@@ -20,10 +20,14 @@
*/
package org.apache.qpid.protocol;
+import javax.security.auth.Subject;
+
public interface ServerProtocolEngine extends ProtocolEngine
{
/**
* Gets the connection ID associated with this ProtocolEngine
*/
long getConnectionId();
+
+ Subject getSubject();
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java b/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java
index cec339c033..047151684f 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java
@@ -81,7 +81,9 @@ public class ChannelLoggingTest extends AbstractTestLogging
String log = getLogMessage(results, 0);
// MESSAGE [con:0(guest@anonymous(3273383)/test)/ch:1] CHN-1001 : Create
validateMessageID("CHN-1001", log);
- assertEquals("Incorrect Channel in actor:"+fromActor(log), isBroker010()? 0 : 1, getChannelID(fromActor(log)));
+ final String fromActor = fromActor(log);
+ final int channelID = getChannelID(fromActor);
+ assertEquals("Incorrect Channel in actor:"+fromActor(log), isBroker010()? 0 : 1, channelID);
if (!isBroker010())
{
diff --git a/java/systests/src/main/java/org/apache/qpid/server/logging/VirtualHostLoggingTest.java b/java/systests/src/main/java/org/apache/qpid/server/logging/VirtualHostLoggingTest.java
index 25dd5fd2f8..824a84eda8 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/logging/VirtualHostLoggingTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/logging/VirtualHostLoggingTest.java
@@ -69,7 +69,7 @@ public class VirtualHostLoggingTest extends AbstractTestLogging
{
List<String> vhosts = Arrays.asList("test");
- assertEquals("Each vhost did not create a store.", vhosts.size(), results.size());
+ assertEquals("Each vhost did not create a store.", 2*vhosts.size(), results.size());
for (int index = 0; index < results.size(); index++)
{
diff --git a/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java b/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java
index 8bad73d0ea..c2713a5e1f 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java
@@ -20,11 +20,15 @@
*/
package org.apache.qpid.test.utils;
+import java.security.PrivilegedAction;
import java.util.Set;
import org.apache.log4j.Logger;
import org.apache.qpid.server.Broker;
+import org.apache.qpid.server.security.auth.TaskPrincipal;
+
+import javax.security.auth.Subject;
public class InternalBrokerHolder implements BrokerHolder
{
@@ -57,8 +61,21 @@ public class InternalBrokerHolder implements BrokerHolder
{
LOGGER.info("Shutting down Broker instance");
- _broker.shutdown();
+ Subject subject = org.apache.qpid.server.security.SecurityManager.SYSTEM;
+ subject = new Subject(false, subject.getPrincipals(), subject.getPublicCredentials(), subject.getPrivateCredentials());
+ subject.getPrincipals().add(new TaskPrincipal("Shutdown"));
+
+ Subject.doAs(subject, new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ _broker.shutdown();
+ return null;
+ }
+
+ });
waitUntilPortsAreFree();
LOGGER.info("Broker instance shutdown");