diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2014-03-07 16:36:26 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2014-03-07 16:36:26 +0000 |
commit | 414bb380bf3b0edf51dcdc665fcef6a7b9dc5d9d (patch) | |
tree | 3928c2a587da59331246dafdd4d85591f3a5a483 | |
parent | fda07b599eda172c7979ba217390fabc66ab2b21 (diff) | |
download | qpid-python-414bb380bf3b0edf51dcdc665fcef6a7b9dc5d9d.tar.gz |
QPID-5611 : [Java Broker] remove LogActors
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1575315 13f79535-47bb-0310-9956-ffa450edef68
101 files changed, 1448 insertions, 2732 deletions
diff --git a/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java b/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java index 54051ab630..0d963ebdae 100644 --- a/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java +++ b/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java @@ -40,9 +40,6 @@ import junit.framework.TestCase; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.jmx.AMQManagedObject; import org.apache.qpid.server.jmx.ManagedObjectRegistry; -import org.apache.qpid.server.logging.SystemOutMessageLogger; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.TestLogActor; import org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore; public class BDBHAMessageStoreManagerMBeanTest extends TestCase @@ -65,7 +62,6 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase { super.setUp(); - CurrentActor.set(new TestLogActor(new SystemOutMessageLogger())); _store = mock(BDBHAMessageStore.class); _mBeanParent = mock(AMQManagedObject.class); when(_mBeanParent.getRegistry()).thenReturn(mock(ManagedObjectRegistry.class)); @@ -76,7 +72,6 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase protected void tearDown() throws Exception { super.tearDown(); - CurrentActor.remove(); } public void testObjectName() throws Exception diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java index 7f119880b0..d61da537f3 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.store.berkeleydb; import java.io.File; import java.net.InetSocketAddress; +import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -35,10 +36,9 @@ import java.util.concurrent.Executors; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; import org.apache.log4j.Logger; +import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.security.auth.TaskPrincipal; import org.apache.qpid.server.store.StoreException; -import org.apache.qpid.server.logging.RootMessageLogger; -import org.apache.qpid.server.logging.actors.AbstractActor; -import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.HAMessageStore; import org.apache.qpid.server.store.MessageStore; @@ -66,6 +66,8 @@ import com.sleepycat.je.rep.StateChangeEvent; import com.sleepycat.je.rep.StateChangeListener; import com.sleepycat.je.rep.util.ReplicationGroupAdmin; +import javax.security.auth.Subject; + public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMessageStore { private static final Logger LOGGER = Logger.getLogger(BDBHAMessageStore.class); @@ -608,7 +610,6 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess private void executeStateChangeAsync(final Callable<Void> callable, final String threadName) { - final RootMessageLogger _rootLogger = CurrentActor.get().getRootMessageLogger(); _executor.execute(new Runnable() { @@ -618,25 +619,28 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess { final String originalThreadName = Thread.currentThread().getName(); Thread.currentThread().setName(threadName); + Subject subject = new Subject(false, SecurityManager.SYSTEM.getPrincipals(), + Collections.emptySet(), Collections.emptySet()); + subject.getPrincipals().add(new TaskPrincipal("BDB HA State Change")); try { - CurrentActor.set(new AbstractActor(_rootLogger) + Subject.doAs(subject, new PrivilegedAction<Object>() { @Override - public String getLogMessage() + public Object run() { - return threadName; + + try + { + callable.call(); + } + catch (Exception e) + { + LOGGER.error("Exception during state change", e); + } + return null; } }); - - try - { - callable.call(); - } - catch (Exception e) - { - LOGGER.error("Exception during state change", e); - } } finally { diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java b/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java index 66171c6fc2..8117f3ebfc 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java @@ -23,6 +23,8 @@ package org.apache.qpid.server; import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.security.PrivilegedAction; +import java.security.PrivilegedExceptionAction; import java.util.List; import java.util.Properties; import java.util.Set; @@ -32,13 +34,16 @@ import org.apache.log4j.PropertyConfigurator; import org.apache.qpid.server.configuration.ConfigurationEntryStore; import org.apache.qpid.server.configuration.BrokerConfigurationStoreCreator; import org.apache.qpid.server.configuration.store.ManagementModeStoreHandler; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.logging.SystemOutMessageLogger; -import org.apache.qpid.server.logging.actors.BrokerActor; -import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.log4j.LoggingManagementFacade; import org.apache.qpid.server.logging.messages.BrokerMessages; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.security.auth.TaskPrincipal; + +import javax.security.auth.Subject; public class Broker { @@ -80,17 +85,22 @@ public class Broker public void startup(final BrokerOptions options) throws Exception { - CurrentActor.set(new BrokerActor(new SystemOutMessageLogger())); - try + Subject subject = SecurityManager.SYSTEM; + subject = new Subject(false, subject.getPrincipals(), subject.getPublicCredentials(), subject.getPrivateCredentials()); + subject.getPrincipals().add(new TaskPrincipal("Broker")); + Subject.doAs(subject, new PrivilegedExceptionAction<Object>() { - startupImpl(options); - addShutdownHook(); - } - finally - { - CurrentActor.remove(); + @Override + public Object run() throws Exception + { + SystemLog.setRootMessageLogger(new SystemOutMessageLogger()); + startupImpl(options); + addShutdownHook(); + + return null; + } + }); - } } private void startupImpl(final BrokerOptions options) throws Exception @@ -98,7 +108,7 @@ public class Broker String storeLocation = options.getConfigurationStoreLocation(); String storeType = options.getConfigurationStoreType(); - CurrentActor.get().message(BrokerMessages.CONFIG(storeLocation)); + SystemLog.message(BrokerMessages.CONFIG(storeLocation)); //Allow skipping the logging configuration for people who are //embedding the broker and want to configure it themselves. @@ -159,7 +169,7 @@ public class Broker { if (logConfigFile.exists() && logConfigFile.canRead()) { - CurrentActor.get().message(BrokerMessages.LOG_CONFIG(logConfigFile.getAbsolutePath())); + SystemLog.message(BrokerMessages.LOG_CONFIG(logConfigFile.getAbsolutePath())); if (logWatchTime > 0) { @@ -270,8 +280,20 @@ public class Broker { public void run() { - LOGGER.debug("Shutdown hook running"); - Broker.this.shutdown(); + + Subject subject = SecurityManager.SYSTEM; + subject = new Subject(false, subject.getPrincipals(), subject.getPublicCredentials(), subject.getPrivateCredentials()); + subject.getPrincipals().add(new TaskPrincipal("Shutdown")); + Subject.doAs(subject, new PrivilegedAction<Object>() + { + @Override + public Object run() + { + LOGGER.debug("Shutdown hook running"); + Broker.this.shutdown(); + return null; + } + }); } } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/TransactionTimeoutHelper.java b/java/broker-core/src/main/java/org/apache/qpid/server/TransactionTimeoutHelper.java index 13f4fbf254..2824c1b7dc 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/TransactionTimeoutHelper.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/TransactionTimeoutHelper.java @@ -18,10 +18,9 @@ */ package org.apache.qpid.server; -import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.txn.ServerTransaction; @@ -73,8 +72,7 @@ public class TransactionTimeoutHelper { if (isTimedOut(timeSoFar, warnTimeout)) { - LogActor logActor = CurrentActor.get(); - logActor.message(_logSubject, warnMessage); + SystemLog.message(_logSubject, warnMessage); } if(isTimedOut(timeSoFar, closeTimeout)) diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java b/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java index 69e05ad989..9864fd2be1 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java @@ -21,7 +21,7 @@ package org.apache.qpid.server.binding; import org.apache.qpid.server.exchange.ExchangeImpl; -import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.logging.messages.BindingMessages; import org.apache.qpid.server.logging.subjects.BindingLogSubject; import org.apache.qpid.server.model.Binding; @@ -94,9 +94,9 @@ public class BindingImpl //Perform ACLs queue.getVirtualHost().getSecurityManager().authoriseCreateBinding(this); _logSubject = new BindingLogSubject(_bindingKey,exchange,queue); - CurrentActor.get().message(_logSubject, BindingMessages.CREATED(String.valueOf(getArguments()), - getArguments() != null - && !getArguments().isEmpty())); + SystemLog.message(_logSubject, BindingMessages.CREATED(String.valueOf(getArguments()), + getArguments() != null + && !getArguments().isEmpty())); } @@ -229,7 +229,7 @@ public class BindingImpl { listener.stateChanged(this, State.ACTIVE, State.DELETED); } - CurrentActor.get().message(_logSubject, BindingMessages.DELETED()); + SystemLog.message(_logSubject, BindingMessages.DELETED()); } } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java b/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java index e917202612..43ff07e6d0 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java @@ -22,8 +22,6 @@ package org.apache.qpid.server.configuration.updater; import java.security.AccessController; import java.security.PrivilegedAction; -import java.security.PrivilegedActionException; -import java.security.PrivilegedExceptionAction; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; @@ -39,10 +37,7 @@ import java.util.concurrent.atomic.AtomicReference; import javax.security.auth.Subject; import org.apache.log4j.Logger; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.model.State; -import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.util.ServerScopedRuntimeException; public class TaskExecutor @@ -210,47 +205,29 @@ public class TaskExecutor private class CallableWrapper<T> implements Task<T> { private Task<T> _userTask; - private LogActor _actor; private Subject _contextSubject; public CallableWrapper(Task<T> userWork) { _userTask = userWork; - _actor = CurrentActor.get(); _contextSubject = Subject.getSubject(AccessController.getContext()); } @Override public T call() { - CurrentActor.set(_actor); - - try - { - T result = null; - result = Subject.doAs(_contextSubject, new PrivilegedAction<T>() + T result = null; + result = Subject.doAs(_contextSubject, new PrivilegedAction<T>() + { + @Override + public T run() { - @Override - public T run() - { - return executeTask(_userTask); - } - }); + return executeTask(_userTask); + } + }); - return result; - } - finally - { - try - { - CurrentActor.remove(); - } - catch (Exception e) - { - LOGGER.warn("Unexpected exception on current actor removal", e); - } - } + return result; } } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index e01f4b7db9..7396f74c0e 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -25,7 +25,7 @@ import java.util.ArrayList; import org.apache.log4j.Logger; import org.apache.qpid.server.binding.BindingImpl; import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.logging.messages.ExchangeMessages; import org.apache.qpid.server.logging.subjects.ExchangeLogSubject; import org.apache.qpid.server.message.InstanceProperties; @@ -166,7 +166,7 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> } }; // Log Exchange creation - CurrentActor.get().message(ExchangeMessages.CREATED(getExchangeType().getType(), getName(), _durable)); + SystemLog.message(ExchangeMessages.CREATED(getExchangeType().getType(), getName(), _durable)); } public abstract ExchangeType<T> getExchangeType(); @@ -204,7 +204,7 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> _alternateExchange.removeReference(this); } - CurrentActor.get().message(_logSubject, ExchangeMessages.DELETED()); + SystemLog.message(_logSubject, ExchangeMessages.DELETED()); for(Action<ExchangeImpl> task : _closeTaskList) { diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/AbstractRootMessageLogger.java b/java/broker-core/src/main/java/org/apache/qpid/server/logging/AbstractRootMessageLogger.java index 98da9074ef..4717e7ccf2 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/logging/AbstractRootMessageLogger.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/logging/AbstractRootMessageLogger.java @@ -21,10 +21,32 @@ package org.apache.qpid.server.logging; +import org.apache.qpid.server.connection.ConnectionPrincipal; +import org.apache.qpid.server.connection.SessionPrincipal; +import org.apache.qpid.server.logging.subjects.LogSubjectFormat; +import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; +import org.apache.qpid.server.security.auth.ManagementConnectionPrincipal; +import org.apache.qpid.server.security.auth.TaskPrincipal; + +import javax.security.auth.Subject; +import java.security.AccessController; +import java.security.Principal; +import java.text.MessageFormat; +import java.util.Set; + +import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT; +import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT; +import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT; +import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.USER_FORMAT; + public abstract class AbstractRootMessageLogger implements RootMessageLogger { public static final String DEFAULT_LOG_HIERARCHY_PREFIX = "qpid.message."; - + + private final String _msgPrefix = System.getProperty("qpid.logging.prefix",""); + private boolean _enabled = true; public AbstractRootMessageLogger() @@ -42,17 +64,159 @@ public abstract class AbstractRootMessageLogger implements RootMessageLogger return _enabled; } - public boolean isMessageEnabled(LogActor actor, LogSubject subject, String logHierarchy) + public boolean isMessageEnabled(String logHierarchy) { return _enabled; } - public boolean isMessageEnabled(LogActor actor, String logHierarchy) + public void message(LogMessage message) { - return _enabled; + if (isMessageEnabled(message.getLogHierarchy())) + { + rawMessage(_msgPrefix + getActor() + message, message.getLogHierarchy()); + } + } + + public void message(LogSubject subject, LogMessage message) + { + if (isMessageEnabled(message.getLogHierarchy())) + { + rawMessage(_msgPrefix + getActor() + subject.toLogString() + message, + message.getLogHierarchy()); + } + } + abstract void rawMessage(String message, String logHierarchy); + + abstract void rawMessage(String message, Throwable throwable, String logHierarchy); + + + protected String getActor() + { + Subject subject = Subject.getSubject(AccessController.getContext()); + + SessionPrincipal sessionPrincipal = getPrincipal(subject, SessionPrincipal.class); + String message; + if(sessionPrincipal != null) + { + message = generateSessionMessage(sessionPrincipal.getSession()); + } + else + { + ConnectionPrincipal connPrincipal = getPrincipal(subject, ConnectionPrincipal.class); + + if(connPrincipal != null) + { + message = generateConnectionMessage(connPrincipal.getConnection()); + } + else + { + TaskPrincipal taskPrincipal = getPrincipal(subject, TaskPrincipal.class); + if(taskPrincipal != null) + { + message = generateTaskMessage(taskPrincipal); + } + else + { + ManagementConnectionPrincipal managementConnection = getPrincipal(subject,ManagementConnectionPrincipal.class); + if(managementConnection != null) + { + message = generateManagementConnectionMessage(managementConnection, getPrincipal(subject, AuthenticatedPrincipal.class)); + } + else + { + message = "<<UNKNOWN>> "; + } + } + } + } + return message; + } + + private String generateManagementConnectionMessage(final ManagementConnectionPrincipal managementConnection, + final AuthenticatedPrincipal userPrincipal) + { + String remoteAddress = managementConnection.getRemoteAddress().toString(); + String user = userPrincipal == null ? "N/A" : userPrincipal.getName(); + return "[" + MessageFormat.format(LogSubjectFormat.MANAGEMENT_FORMAT, user, remoteAddress) + "] "; + } + + private String generateTaskMessage(final TaskPrincipal taskPrincipal) + { + return "["+taskPrincipal.getName()+"] "; + } + + protected String generateConnectionMessage(final AMQConnectionModel connection) + { + if (connection.getAuthorizedPrincipal() != null) + { + if (connection.getVirtualHostName() != null) + { + /** + * LOG FORMAT used by the AMQPConnectorActor follows + * ConnectionLogSubject.CONNECTION_FORMAT : + * con:{0}({1}@{2}/{3}) + * + * Uses a MessageFormat call to insert the required values + * according to these indices: + * + * 0 - Connection ID 1 - User ID 2 - IP 3 - Virtualhost + */ + return "[" + MessageFormat.format(CONNECTION_FORMAT, + connection.getConnectionId(), + connection.getAuthorizedPrincipal().getName(), + connection.getRemoteAddressString(), + connection.getVirtualHostName()) + + "] "; + + } + else + { + return"[" + MessageFormat.format(USER_FORMAT, + connection.getConnectionId(), + connection.getAuthorizedPrincipal().getName(), + connection.getRemoteAddressString()) + + "] "; + + } + } + else + { + return "[" + MessageFormat.format(SOCKET_FORMAT, + connection.getConnectionId(), + connection.getRemoteAddressString()) + + "] "; + } + } + + protected String generateSessionMessage(final AMQSessionModel session) + { + AMQConnectionModel connection = session.getConnectionModel(); + return "[" + MessageFormat.format(CHANNEL_FORMAT, connection == null ? -1L : connection.getConnectionId(), + (connection == null || connection.getAuthorizedPrincipal() == null) + ? "?" + : connection.getAuthorizedPrincipal().getName(), + (connection == null || connection.getRemoteAddressString() == null) + ? "?" + : connection.getRemoteAddressString(), + (connection == null || connection.getVirtualHostName() == null) + ? "?" + : connection.getVirtualHostName(), + session.getChannelId()) + + "] "; + } + + private <P extends Principal> P getPrincipal(Subject subject, Class<P> clazz) + { + if(subject != null) + { + Set<P> principals = subject.getPrincipals(clazz); + if(principals != null && !principals.isEmpty()) + { + return principals.iterator().next(); + } + } + return null; } - public abstract void rawMessage(String message, String logHierarchy); - public abstract void rawMessage(String message, Throwable throwable, String logHierarchy); } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/CompositeStartupMessageLogger.java b/java/broker-core/src/main/java/org/apache/qpid/server/logging/CompositeStartupMessageLogger.java index e0a51b3a3e..d21fd62045 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/logging/CompositeStartupMessageLogger.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/logging/CompositeStartupMessageLogger.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.logging; -public class CompositeStartupMessageLogger extends AbstractRootMessageLogger +public class CompositeStartupMessageLogger implements RootMessageLogger { private RootMessageLogger[] _loggers; @@ -30,22 +30,50 @@ public class CompositeStartupMessageLogger extends AbstractRootMessageLogger _loggers = loggers; } + + @Override + public boolean isEnabled() + { + for(RootMessageLogger l : _loggers) + { + if(l.isEnabled()) + { + return true; + } + } + return false; + } + @Override - public void rawMessage(String message, String logHierarchy) + public boolean isMessageEnabled(final String logHierarchy) { for(RootMessageLogger l : _loggers) { - l.rawMessage(message, logHierarchy); + if(l.isMessageEnabled(logHierarchy)) + { + return true; + } } + return false; } @Override - public void rawMessage(String message, Throwable throwable, String logHierarchy) + public void message(final LogMessage message) { for(RootMessageLogger l : _loggers) { - l.rawMessage(message, throwable, logHierarchy); + l.message(message); } } + @Override + public void message(final LogSubject subject, final LogMessage message) + { + for(RootMessageLogger l : _loggers) + { + l.message(subject, message); + } + } + + } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/Log4jMessageLogger.java b/java/broker-core/src/main/java/org/apache/qpid/server/logging/Log4jMessageLogger.java index b4e9f2f333..896ca84361 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/logging/Log4jMessageLogger.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/logging/Log4jMessageLogger.java @@ -33,15 +33,9 @@ public class Log4jMessageLogger extends AbstractRootMessageLogger { super(statusUpdatesEnabled); } - - @Override - public boolean isMessageEnabled(LogActor actor, LogSubject subject, String logHierarchy) - { - return isMessageEnabled(actor, logHierarchy); - } @Override - public boolean isMessageEnabled(LogActor actor, String logHierarchy) + public boolean isMessageEnabled(String logHierarchy) { if(isEnabled()) { @@ -55,13 +49,13 @@ public class Log4jMessageLogger extends AbstractRootMessageLogger } @Override - public void rawMessage(String message, String logHierarchy) + void rawMessage(String message, String logHierarchy) { rawMessage(message, null, logHierarchy); } @Override - public void rawMessage(String message, Throwable throwable, String logHierarchy) + void rawMessage(String message, Throwable throwable, String logHierarchy) { Logger logger = Logger.getLogger(logHierarchy); logger.info(message, throwable); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/LogActor.java b/java/broker-core/src/main/java/org/apache/qpid/server/logging/LogActor.java deleted file mode 100644 index 18f03c2716..0000000000 --- a/java/broker-core/src/main/java/org/apache/qpid/server/logging/LogActor.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.server.logging; - -/** - * LogActor the entity that is stored as in a ThreadLocal and used to perform logging. - * - * The actor is responsible for formatting its display name for the log entry. - * - * The actor performs the requested logging. - */ -public interface LogActor -{ - /** - * Logs the specified LogMessage about the LogSubject - * - * Currently logging has a global setting however this will later be revised and - * as such the LogActor will need to take into consideration any new configuration - * as a means of enabling the logging of LogActors and LogSubjects. - * - * @param subject The subject that is being logged - * @param message The message to log - */ - public void message(LogSubject subject, LogMessage message); - - /** - * Logs the specified LogMessage against this actor - * - * Currently logging has a global setting however this will later be revised and - * as such the LogActor will need to take into consideration any new configuration - * as a means of enabling the logging of LogActors and LogSubjects. - * - * @param message The message to log - */ - public void message(LogMessage message); - - /** - * - * @return the RootMessageLogger that is currently in use by this LogActor. - */ - RootMessageLogger getRootMessageLogger(); - - /** - * - * @return the String representing this LogActor - */ - public String getLogMessage(); -}
\ No newline at end of file diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/NullRootMessageLogger.java b/java/broker-core/src/main/java/org/apache/qpid/server/logging/NullRootMessageLogger.java index 64e16d9ee8..c7cf609503 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/logging/NullRootMessageLogger.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/logging/NullRootMessageLogger.java @@ -24,13 +24,7 @@ public class NullRootMessageLogger extends AbstractRootMessageLogger { @Override - public boolean isMessageEnabled(LogActor actor, LogSubject subject, String logHierarchy) - { - return false; - } - - @Override - public boolean isMessageEnabled(LogActor actor, String logHierarchy) + public boolean isMessageEnabled(String logHierarchy) { return false; } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/RootMessageLogger.java b/java/broker-core/src/main/java/org/apache/qpid/server/logging/RootMessageLogger.java index c31e528c55..92318ed8b0 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/logging/RootMessageLogger.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/logging/RootMessageLogger.java @@ -33,43 +33,20 @@ public interface RootMessageLogger * @return boolean true if enabled. */ boolean isEnabled(); - - /** - * Determine if the LogSubject and the LogActor should be - * generating log messages. - * @param actor The actor requesting the logging - * @param subject The subject of this log request - * @param logHierarchy The log hierarchy for this request - * - * @return boolean true if the message should be logged. - */ - boolean isMessageEnabled(LogActor actor, LogSubject subject, String logHierarchy); /** * Determine if the LogActor should be generating log messages. * - * @param actor The actor requesting the logging * @param logHierarchy The log hierarchy for this request * * @return boolean true if the message should be logged. */ - boolean isMessageEnabled(LogActor actor, String logHierarchy); + boolean isMessageEnabled(String logHierarchy); + + void message(LogMessage message); + + void message(LogSubject subject, LogMessage message); + - /** - * Log the raw message to the configured logger. - * - * @param message The message to log - * @param logHierarchy The log hierarchy for this request - */ - public void rawMessage(String message, String logHierarchy); - /** - * Log the raw message to the configured logger. - * Along with a formatted stack trace from the Throwable. - * - * @param message The message to log - * @param throwable Optional Throwable that should provide stack trace - * @param logHierarchy The log hierarchy for this request - */ - void rawMessage(String message, Throwable throwable, String logHierarchy); }
\ No newline at end of file diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/SystemLog.java b/java/broker-core/src/main/java/org/apache/qpid/server/logging/SystemLog.java new file mode 100644 index 0000000000..b722c5360b --- /dev/null +++ b/java/broker-core/src/main/java/org/apache/qpid/server/logging/SystemLog.java @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging; + +public class SystemLog +{ + private static RootMessageLogger _rootMessageLogger = new NullRootMessageLogger(); + + /** + * Logs the specified LogMessage about the LogSubject + * + * @param subject The subject that is being logged + * @param message The message to log + */ + public static void message(LogSubject subject, LogMessage message) + { + getRootMessageLogger().message(subject, message); + } + + /** + * Logs the specified LogMessage + * + * @param message The message to log + */ + public static void message(LogMessage message) + { + getRootMessageLogger().message((message)); + } + + private synchronized static RootMessageLogger getRootMessageLogger() + { + final RootMessageLogger rootMessageLogger = _rootMessageLogger; + return rootMessageLogger == null ? new NullRootMessageLogger() : rootMessageLogger; + } + + public static synchronized void setRootMessageLogger(final RootMessageLogger rootMessageLogger) + { + _rootMessageLogger = rootMessageLogger; + } +} diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/SystemOutMessageLogger.java b/java/broker-core/src/main/java/org/apache/qpid/server/logging/SystemOutMessageLogger.java index 63c1bcc712..297085427d 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/logging/SystemOutMessageLogger.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/logging/SystemOutMessageLogger.java @@ -23,14 +23,9 @@ package org.apache.qpid.server.logging; public class SystemOutMessageLogger extends AbstractRootMessageLogger { - @Override - public boolean isMessageEnabled(LogActor actor, LogSubject subject, String logHierarchy) - { - return true; - } @Override - public boolean isMessageEnabled(LogActor actor, String logHierarchy) + public boolean isMessageEnabled(String logHierarchy) { return true; } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java b/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java deleted file mode 100644 index 9228a2674d..0000000000 --- a/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.server.logging.actors; - -import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.logging.RootMessageLogger; -import org.apache.qpid.server.logging.subjects.ChannelLogSubject; - -/** - * An AMQPChannelActor represents a connection through the AMQP port with an - * associated Channel. - * - * <p/> - * This is responsible for correctly formatting the LogActor String in the log - * <p/> - * [con:1(user@127.0.0.1/)/ch:1] - * <p/> - * To do this it requires access to the IO Layers as well as a Channel - */ -public class AMQPChannelActor extends AbstractActor -{ - private final ChannelLogSubject _logString; - - /** - * Create a new ChannelActor - * - * @param channel The Channel for this LogActor - * @param rootLogger The root Logger that this LogActor should use - */ - public AMQPChannelActor(AMQSessionModel channel, RootMessageLogger rootLogger) - { - super(rootLogger); - - - _logString = new ChannelLogSubject(channel); - } - - public String getLogMessage() - { - return _logString.toLogString(); - } -} - diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java b/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java deleted file mode 100644 index d4213b2876..0000000000 --- a/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.server.logging.actors; - -import org.apache.qpid.server.logging.RootMessageLogger; -import org.apache.qpid.server.logging.subjects.ConnectionLogSubject; -import org.apache.qpid.server.protocol.AMQConnectionModel; - - -/** - * An AMQPConnectionActor represents a connection through the AMQP port. - * <p/> - * This is responsible for correctly formatting the LogActor String in the log - * <p/> - * [ con:1(user@127.0.0.1/) ] - * <p/> - * To do this it requires access to the IO Layers. - */ -public class AMQPConnectionActor extends AbstractActor -{ - private ConnectionLogSubject _logSubject; - - public AMQPConnectionActor(AMQConnectionModel session, RootMessageLogger rootLogger) - { - super(rootLogger); - - _logSubject = new ConnectionLogSubject(session); - } - - public String getLogMessage() - { - return _logSubject.toLogString(); - } -} - diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java b/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java deleted file mode 100644 index e8c6c9c323..0000000000 --- a/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.logging.actors; - -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.LogMessage; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.RootMessageLogger; - -public abstract class AbstractActor implements LogActor -{ - private final String _msgPrefix = System.getProperty("qpid.logging.prefix",""); - - private RootMessageLogger _rootLogger; - - public AbstractActor(RootMessageLogger rootLogger) - { - if(rootLogger == null) - { - throw new NullPointerException("RootMessageLogger cannot be null"); - } - _rootLogger = rootLogger; - } - - public void message(LogSubject subject, LogMessage message) - { - if (_rootLogger.isMessageEnabled(this, subject, message.getLogHierarchy())) - { - _rootLogger.rawMessage(_msgPrefix + getLogMessage() + subject.toLogString() + message, message.getLogHierarchy()); - } - } - - public void message(LogMessage message) - { - if (_rootLogger.isMessageEnabled(this, message.getLogHierarchy())) - { - _rootLogger.rawMessage(_msgPrefix + getLogMessage() + message, message.getLogHierarchy()); - } - } - - public RootMessageLogger getRootMessageLogger() - { - return _rootLogger; - } - - public String toString() - { - return getLogMessage(); - } - - abstract public String getLogMessage(); - -} diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/AbstractManagementActor.java b/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/AbstractManagementActor.java deleted file mode 100644 index 8cf121b3d9..0000000000 --- a/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/AbstractManagementActor.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.logging.actors; - -import java.security.AccessController; - -import javax.security.auth.Subject; - -import org.apache.qpid.server.logging.RootMessageLogger; -import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; - -public abstract class AbstractManagementActor extends AbstractActor -{ - /** - * Holds the principal name to display when principal subject is not available. - * <p> - * This is useful for cases when users invoke JMX operation over JConsole - * attached to the local JVM. - */ - protected static final String UNKNOWN_PRINCIPAL = "N/A"; - - /** used when the principal name cannot be discovered from the Subject */ - private final String _fallbackPrincipalName; - - public AbstractManagementActor(RootMessageLogger rootLogger, String fallbackPrincipalName) - { - super(rootLogger); - _fallbackPrincipalName = fallbackPrincipalName; - } - - /** - * Returns current {@link AuthenticatedPrincipal} name or {@link #_fallbackPrincipalName} - * if it can't be found. - */ - protected String getPrincipalName() - { - String identity = _fallbackPrincipalName; - - final Subject subject = Subject.getSubject(AccessController.getContext()); - if (subject != null) - { - AuthenticatedPrincipal authenticatedPrincipal = AuthenticatedPrincipal.getOptionalAuthenticatedPrincipalFromSubject(subject); - if(authenticatedPrincipal != null) - { - identity = authenticatedPrincipal.getName(); - } - } - return identity; - } -} diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java b/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java deleted file mode 100644 index 91d9ef7dbc..0000000000 --- a/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.logging.actors; - -import java.util.EmptyStackException; -import java.util.Stack; - -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.LogMessage; -import org.apache.qpid.server.logging.LogSubject; - -/** - * The CurrentActor is a ThreadLocal wrapper that allows threads in the broker - * to retrieve an actor to perform logging. This approach is used so for two - * reasons: - * 1) We do not have to pass a logging actor around the system - * 2) We can set new actors at the point we have enough information. i.e. - * - Set a low level ConnectionActor when processing bytes from the wire. - * - Set a ChannelActor when we are processing the frame - * <p/> - * The code performing the logging need not worry about what type of actor is - * currently set so can perform its logging. The resulting log entry though will - * contain customised details from the the currently set Actor. - * <p/> - * The Actor model also allows the pre-creation of fixed messages so the - * performance impact of the additional logging data is minimised. - * <p/> - * This class does not perform any checks to ensure that there is an Actor set - * when calling remove or get. As a result the application developer must ensure - * that they have called set before they attempt to use the actor via get or - * remove the set actor. - * <p/> - * The checking of the return via get should not be done as the logging is - * desired. It is preferable to cause the NullPointerException to highlight the - * programming error rather than miss a log message. - * <p/> - * The same is true for the remove. A NPE will occur if no set has been called - * highlighting the programming error. - */ -public class CurrentActor -{ - /** The ThreadLocal variable with initialiser */ - private static final ThreadLocal<Stack<LogActor>> _currentActor = new ThreadLocal<Stack<LogActor>>() - { - // Initialise the CurrentActor to be an empty List - protected Stack<LogActor> initialValue() - { - return new Stack<LogActor>(); - } - }; - - private static LogActor _defaultActor; - - private CurrentActor() - { - } - - /** - * Set a new {@link LogActor} to be the Current Actor - * <p/> - * This pushes the Actor in to the LIFO Queue - * - * @param actor The new LogActor - */ - public static void set(LogActor actor) - { - Stack<LogActor> stack = _currentActor.get(); - stack.push(actor); - } - - /** - * Remove all {@link LogActor}s - */ - public static void removeAll() - { - Stack<LogActor> stack = _currentActor.get(); - stack.clear(); - } - - /** - * Remove the current {@link LogActor}. - * <p/> - * Calling remove without calling set will result in an EmptyStackException. - */ - public static void remove() - { - Stack<LogActor> stack = _currentActor.get(); - stack.pop(); - - if (stack.isEmpty()) - { - _currentActor.remove(); - } - } - - /** - * Return the current head of the list of {@link LogActor}s. - * - * @return Current LogActor - */ - public static LogActor get() - { - try - { - return _currentActor.get().peek(); - } - catch (EmptyStackException ese) - { - return _defaultActor; - } - } - - public static void setDefault(LogActor defaultActor) - { - _defaultActor = defaultActor; - } - - public static void message(LogSubject subject, LogMessage message) - { - get().message(subject, message); - } - - public static void message(LogMessage message) - { - get().message(message); - } -} diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/GenericActor.java b/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/GenericActor.java deleted file mode 100644 index 4cf2bd4508..0000000000 --- a/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/GenericActor.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.server.logging.actors; - -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.RootMessageLogger; - -public class GenericActor extends AbstractActor -{ - - private static RootMessageLogger _defaultMessageLogger; - - private LogSubject _logSubject; - - public static RootMessageLogger getDefaultMessageLogger() - { - return _defaultMessageLogger; - } - - public static void setDefaultMessageLogger(RootMessageLogger defaultMessageLogger) - { - _defaultMessageLogger = defaultMessageLogger; - } - - public GenericActor(final String logSubject) - { - this(new LogSubject() - { - @Override - public String toLogString() - { - return logSubject; - } - }); - } - - - public GenericActor(LogSubject logSubject) - { - this(logSubject, CurrentActor.get().getRootMessageLogger()); - } - - public GenericActor(LogSubject logSubject, RootMessageLogger rootLogger) - { - super(rootLogger); - _logSubject = logSubject; - } - - public String getLogMessage() - { - return _logSubject.toLogString(); - } - - public LogSubject getLogSubject() - { - return _logSubject; - } - - public static LogActor getInstance(final String logMessage, RootMessageLogger rootLogger) - { - return new GenericActor(new LogSubject() - { - public String toLogString() - { - return logMessage; - } - - }, rootLogger); - } - - public static LogActor getInstance(final String subjectMessage) - { - return new GenericActor(new LogSubject() - { - public String toLogString() - { - return "[" + subjectMessage + "] "; - } - - }, _defaultMessageLogger); - } - - public static LogActor getInstance(LogSubject logSubject) - { - return new GenericActor(logSubject, _defaultMessageLogger); - } -} diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/HttpManagementActor.java b/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/HttpManagementActor.java deleted file mode 100644 index 9b445c2bd9..0000000000 --- a/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/HttpManagementActor.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.logging.actors; - -import java.text.MessageFormat; - -import org.apache.qpid.server.logging.RootMessageLogger; -import org.apache.qpid.server.logging.subjects.LogSubjectFormat; - -/** - * HttpManagement actor to use in {@link AbstractServlet} to log all http management operational logging. - * - * An instance is required per http Session. - */ -public class HttpManagementActor extends AbstractManagementActor -{ - private String _cachedLogString; - private String _lastPrincipalName; - private String _address; - - public HttpManagementActor(RootMessageLogger rootLogger, String ip, int port) - { - super(rootLogger, UNKNOWN_PRINCIPAL); - _address = ip + ":" + port; - } - - private synchronized String getAndCacheLogString() - { - String principalName = getPrincipalName(); - - if(!principalName.equals(_lastPrincipalName)) - { - _lastPrincipalName = principalName; - _cachedLogString = "[" + MessageFormat.format(LogSubjectFormat.MANAGEMENT_FORMAT, principalName, _address) + "] "; - } - - return _cachedLogString; - } - - public String getLogMessage() - { - return getAndCacheLogString(); - } -} diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java b/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java deleted file mode 100644 index ba5ea47fc1..0000000000 --- a/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.logging.actors; - -import org.apache.qpid.server.logging.RootMessageLogger; -import org.apache.qpid.server.logging.subjects.LogSubjectFormat; - -import java.text.MessageFormat; - -/** - * Management actor to use in {@link MBeanInvocationHandlerImpl} to log all management operational logging. - */ -public class ManagementActor extends AbstractManagementActor -{ - private String _lastThreadName = null; - - /** - * The logString to be used for logging - */ - private String _logStringContainingPrincipal; - - /** @param rootLogger The RootLogger to use for this Actor */ - public ManagementActor(RootMessageLogger rootLogger) - { - super(rootLogger, UNKNOWN_PRINCIPAL); - } - - public ManagementActor(RootMessageLogger rootLogger, String principalName) - { - super(rootLogger, principalName); - } - - private synchronized String getAndCacheLogString() - { - String currentName = Thread.currentThread().getName(); - - String actor; - String logString = _logStringContainingPrincipal; - - // Record the last thread name so we don't have to recreate the log string - if (_logStringContainingPrincipal == null || !currentName.equals(_lastThreadName)) - { - _lastThreadName = currentName; - String principalName = getPrincipalName(); - - // Management Thread names have this format. - // RMI TCP Connection(2)-169.24.29.116 - // This is true for both LocalAPI and JMX Connections - // However to be defensive lets test. - String[] split = currentName.split("\\("); - if (split.length == 2) - { - String ip = currentName.split("-")[1]; - actor = MessageFormat.format(LogSubjectFormat.MANAGEMENT_FORMAT, principalName, ip); - } - else - { - // This is a precautionary path as it is not expected to occur - // however rather than adjusting the thread name of the two - // tests that will use this it is safer all round to do this. - // it is also currently used by tests : - // AMQBrokerManagerMBeanTest - // ExchangeMBeanTest - actor = currentName; - } - - logString = "[" + actor + "] "; - if(principalName != UNKNOWN_PRINCIPAL ) - { - _logStringContainingPrincipal = logString; - } - - } - return logString; - } - - public String getLogMessage() - { - return getAndCacheLogString(); - } -} diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/QueueActor.java b/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/QueueActor.java deleted file mode 100644 index 4b17e8c0e6..0000000000 --- a/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/QueueActor.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.logging.actors; - -import org.apache.qpid.server.logging.RootMessageLogger; -import org.apache.qpid.server.logging.subjects.QueueLogSubject; -import org.apache.qpid.server.queue.AMQQueue; - -/** - * This Actor is used when while the queue is performing an asynchronous process - * of its queue. - */ -public class QueueActor extends AbstractActor -{ - private QueueLogSubject _logSubject; - - /** - * Create an QueueLogSubject that Logs in the following format. - * - * @param queue The queue that this Actor is working for - * @param rootLogger the Root logger to use. - */ - public QueueActor(AMQQueue queue, RootMessageLogger rootLogger) - { - super(rootLogger); - - _logSubject = new QueueLogSubject(queue); - } - - public String getLogMessage() - { - return _logSubject.toLogString(); - } -} - diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java index f056937b1b..d63a765144 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java @@ -123,6 +123,12 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X> //children Collection<? extends Binding> getBindings(); + // TODO - Undo this commented out line when we stop supporting 1.6 for compilation + // In 1.6 this causes the build to break at AbstractQueue because the 1.6 compiler can't work out that + // the definition in terms of the Consumer implementation meets both this, and the contract for AMQQueue + + // Collection<? extends Consumer> getConsumers(); + //operations void visit(QueueEntryVisitor visitor); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AbstractConfiguredObject.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AbstractConfiguredObject.java index 66dcd1ab3e..eda61f92b0 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AbstractConfiguredObject.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AbstractConfiguredObject.java @@ -357,7 +357,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im { Object value = attr.getValue((X)this); if(value != null && attr.getAnnotation().secure() && - !SecurityManager.SYSTEM.equals(Subject.getSubject(AccessController.getContext()))) + !SecurityManager.isSystemProcess()) { return SECURE_VALUES.get(value.getClass()); } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java index 097c179514..1fcf169dc8 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java @@ -34,7 +34,7 @@ import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.configuration.IllegalConfigurationException; -import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.logging.messages.BrokerMessages; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.KeyStore; @@ -99,7 +99,7 @@ public class AmqpPortAdapter extends PortAdapter<AmqpPortAdapter> _transport.start(); for(Transport transport : getTransports()) { - CurrentActor.get().message(BrokerMessages.LISTENING(String.valueOf(transport), getPort())); + SystemLog.message(BrokerMessages.LISTENING(String.valueOf(transport), getPort())); } } @@ -110,7 +110,7 @@ public class AmqpPortAdapter extends PortAdapter<AmqpPortAdapter> { for(Transport transport : getTransports()) { - CurrentActor.get().message(BrokerMessages.SHUTTING_DOWN(String.valueOf(transport), getPort())); + SystemLog.message(BrokerMessages.SHUTTING_DOWN(String.valueOf(transport), getPort())); } _transport.close(); } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java index 01798ad4ac..ed5d371079 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java @@ -41,8 +41,7 @@ import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.logging.LogRecorder; import org.apache.qpid.server.logging.RootMessageLogger; -import org.apache.qpid.server.logging.actors.BrokerActor; -import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.logging.messages.BrokerMessages; import org.apache.qpid.server.model.*; import org.apache.qpid.server.plugin.PreferencesProviderFactory; @@ -1046,22 +1045,16 @@ public class BrokerAdapter<X extends Broker<X>> extends AbstractConfiguredObject changeState(_authenticationProviders, currentState, State.ACTIVE, false); changeState(_accessControlProviders, currentState, State.ACTIVE, false); - CurrentActor.set(new BrokerActor(getRootMessageLogger())); - try - { - changeState(_vhostAdapters, currentState, State.ACTIVE, false); - } - finally - { - CurrentActor.remove(); - } + + changeState(_vhostAdapters, currentState, State.ACTIVE, false); changeState(_portAdapters, currentState,State.ACTIVE, false); changeState(_plugins, currentState,State.ACTIVE, false); if (isManagementMode()) { - CurrentActor.get().message(BrokerMessages.MANAGEMENT_MODE(BrokerOptions.MANAGEMENT_MODE_USER_NAME, _brokerOptions.getManagementModePassword())); + SystemLog.message(BrokerMessages.MANAGEMENT_MODE(BrokerOptions.MANAGEMENT_MODE_USER_NAME, + _brokerOptions.getManagementModePassword())); } return true; } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index 01b220fd79..2b86f257b5 100755 --- a/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -30,17 +30,16 @@ import java.util.Set; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLPeerUnverifiedException; -import javax.net.ssl.SSLSocket; +import javax.security.auth.Subject; import org.apache.log4j.Logger; import org.apache.qpid.protocol.ServerProtocolEngine; -import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.plugin.ProtocolEngineCreator; -import org.apache.qpid.transport.Binary; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.transport.network.security.SSLStatus; @@ -144,6 +143,12 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine return _delegate.getConnectionId(); } + @Override + public Subject getSubject() + { + return _delegate.getSubject(); + } + private static final int MINIMUM_REQUIRED_HEADER_BYTES = 8; public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender) @@ -243,6 +248,12 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine { return _id; } + + @Override + public Subject getSubject() + { + return new Subject(); + } } private class SelfDelegateProtocolEngine implements ServerProtocolEngine @@ -384,6 +395,12 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine return _id; } + @Override + public Subject getSubject() + { + return _delegate.getSubject(); + } + public void exception(Throwable t) { _logger.error("Error establishing session", t); @@ -423,7 +440,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine public void readerIdle() { - CurrentActor.get().message(ConnectionMessages.IDLE_CLOSE()); + SystemLog.message(ConnectionMessages.IDLE_CLOSE()); _network.close(); } @@ -547,6 +564,12 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine } @Override + public Subject getSubject() + { + return _decryptEngine.getSubject(); + } + + @Override public long getLastReadTime() { return _lastReadTime; diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 11eb0b8a19..7ed54499ec 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -36,6 +36,7 @@ import org.apache.qpid.server.binding.BindingImpl; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.connection.SessionPrincipal; import org.apache.qpid.server.exchange.ExchangeImpl; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.model.*; import org.apache.qpid.server.model.Queue; @@ -44,10 +45,7 @@ import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.filter.FilterManager; -import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.QueueActor; import org.apache.qpid.server.logging.messages.QueueMessages; import org.apache.qpid.server.logging.subjects.QueueLogSubject; import org.apache.qpid.server.message.InstanceProperties; @@ -180,7 +178,6 @@ public abstract class AbstractQueue private LogSubject _logSubject; - private LogActor _logActor; private boolean _noLocal; @@ -245,7 +242,6 @@ public abstract class AbstractQueue _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService(); _logSubject = new QueueLogSubject(this); - _logActor = new QueueActor(this, CurrentActor.get().getRootMessageLogger()); virtualHost.getSecurityManager().authoriseCreateQueue(this); @@ -411,14 +407,14 @@ public abstract class AbstractQueue // Log the creation of this Queue. // The priorities display is toggled on if we set priorities > 0 - CurrentActor.get().message(_logSubject, - QueueMessages.CREATED(ownerString, - _entries.getPriorities(), - ownerString != null , - _lifetimePolicy != LifetimePolicy.PERMANENT, - durable, - !durable, - _entries.getPriorities() > 0)); + SystemLog.message(_logSubject, + QueueMessages.CREATED(ownerString, + _entries.getPriorities(), + ownerString != null, + _lifetimePolicy != LifetimePolicy.PERMANENT, + durable, + !durable, + _entries.getPriorities() > 0)); if(attributes != null && attributes.containsKey(Queue.MESSAGE_GROUP_KEY)) { @@ -1684,7 +1680,7 @@ public abstract class AbstractQueue stop(); //Log Queue Deletion - CurrentActor.get().message(_logSubject, QueueMessages.DELETED()); + SystemLog.message(_logSubject, QueueMessages.DELETED()); } return getQueueDepthMessages(); @@ -1707,7 +1703,7 @@ public abstract class AbstractQueue { _overfull.set(true); //Overfull log message - _logActor.message(_logSubject, QueueMessages.OVERFULL(_atomicQueueSize.get(), _capacity)); + SystemLog.message(_logSubject, QueueMessages.OVERFULL(_atomicQueueSize.get(), _capacity)); _blockedChannels.add(channel); @@ -1717,13 +1713,12 @@ public abstract class AbstractQueue { //Underfull log message - _logActor.message(_logSubject, QueueMessages.UNDERFULL(_atomicQueueSize.get(), _flowResumeCapacity)); + SystemLog.message(_logSubject, QueueMessages.UNDERFULL(_atomicQueueSize.get(), _flowResumeCapacity)); channel.unblock(this); _blockedChannels.remove(channel); } - } @@ -1739,7 +1734,7 @@ public abstract class AbstractQueue { if(_overfull.compareAndSet(true,false)) {//Underfull log message - _logActor.message(_logSubject, QueueMessages.UNDERFULL(_atomicQueueSize.get(), _flowResumeCapacity)); + SystemLog.message(_logSubject, QueueMessages.UNDERFULL(_atomicQueueSize.get(), _flowResumeCapacity)); } for(final AMQSessionModel blockedChannel : _blockedChannels) @@ -2401,11 +2396,6 @@ public abstract class AbstractQueue _unackedMsgBytes.addAndGet(entry.getSize()); } - public LogActor getLogActor() - { - return _logActor; - } - @Override public int getMaximumDeliveryAttempts() { diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java index 8c03e8f882..ca6be01136 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java @@ -24,17 +24,14 @@ import org.apache.log4j.Logger; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.JMSSelectorFilter; import org.apache.qpid.server.filter.MessageFilter; -import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.GenericActor; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.logging.messages.SubscriptionMessages; import org.apache.qpid.server.logging.subjects.QueueLogSubject; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.model.Consumer; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.ManagedAttributeField; import org.apache.qpid.server.model.State; @@ -56,7 +53,7 @@ import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPT class QueueConsumerImpl extends AbstractConfiguredObject<QueueConsumerImpl> - implements QueueConsumer<QueueConsumerImpl> + implements QueueConsumer<QueueConsumerImpl>, LogSubject { @@ -76,9 +73,6 @@ class QueueConsumerImpl private final Class<? extends ServerMessage> _messageClass; private final Object _sessionReference; private final AbstractQueue _queue; - private GenericActor _logActor = new GenericActor("[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getConsumerNumber()) - + "(UNKNOWN)" - + "] "); static final EnumMap<ConsumerTarget.State, State> STATE_MAP = new EnumMap<ConsumerTarget.State, State>(ConsumerTarget.State.class); @@ -97,7 +91,7 @@ class QueueConsumerImpl { public void stateChanged(QueueConsumerImpl sub, State oldState, State newState) { - CurrentActor.get().message(SubscriptionMessages.STATE(newState.toString())); + SystemLog.message(SubscriptionMessages.STATE(newState.toString())); } }; @ManagedAttributeField @@ -184,12 +178,12 @@ class QueueConsumerImpl { if(_targetClosed.compareAndSet(false,true)) { - CurrentActor.get().message(getLogSubject(), SubscriptionMessages.CLOSE()); + SystemLog.message(getLogSubject(), SubscriptionMessages.CLOSE()); } } else { - CurrentActor.get().message(getLogSubject(),SubscriptionMessages.STATE(newState.toString())); + SystemLog.message(getLogSubject(), SubscriptionMessages.STATE(newState.toString())); } } @@ -302,36 +296,17 @@ class QueueConsumerImpl private void setupLogging() { - String queueString = new QueueLogSubject(_queue).toLogString(); - - _logActor = new GenericActor("[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getConsumerNumber()) - + "(" - // queueString is [vh(/{0})/qu({1}) ] so need to trim - // ^ ^^ - + queueString.substring(1,queueString.length() - 3) - + ")" - + "] "); - - - if (CurrentActor.get().getRootMessageLogger().isMessageEnabled(_logActor, _logActor.getLogSubject(), SubscriptionMessages.CREATE_LOG_HIERARCHY)) - { - final String filterLogString = getFilterLogString(); - CurrentActor.get().message(_logActor.getLogSubject(), SubscriptionMessages.CREATE(filterLogString, _queue.isDurable() && _exclusive, - filterLogString.length() > 0)); - } + final String filterLogString = getFilterLogString(); + SystemLog.message(this, + SubscriptionMessages.CREATE(filterLogString, _queue.isDurable() && _exclusive, + filterLogString.length() > 0)); } protected final LogSubject getLogSubject() { - return _logActor.getLogSubject(); + return this; } - final LogActor getLogActor() - { - return _logActor; - } - - @Override public final void flush() { @@ -586,4 +561,30 @@ class QueueConsumerImpl { return getAttributeNames(getClass()); } + + @Override + public String toLogString() + { + String logString; + if(_queue == null) + { + logString = "[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getConsumerNumber()) + + "(UNKNOWN)" + + "] "; + } + else + { + String queueString = new QueueLogSubject(_queue).toLogString(); + logString = "[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getConsumerNumber()) + + "(" + // queueString is [vh(/{0})/qu({1}) ] so need to trim + // ^ ^^ + + queueString.substring(1,queueString.length() - 3) + + ")" + + "] "; + + } + + return logString; + } } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java index d9f3297acf..4d21e9d23a 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java @@ -20,16 +20,20 @@ */ package org.apache.qpid.server.queue; +import java.security.PrivilegedAction; +import java.util.Collections; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.log4j.Logger; -import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.security.auth.TaskPrincipal; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.transport.TransportException; +import javax.security.auth.Subject; + /** * QueueRunners are Runnables used to process a queue when requiring * asynchronous message delivery to consumers, which is necessary @@ -62,60 +66,68 @@ public class QueueRunner implements Runnable { if(_scheduled.compareAndSet(SCHEDULED,RUNNING)) { - long runAgain = Long.MIN_VALUE; - _stateChange.set(false); - try - { - CurrentActor.set(_queue.getLogActor()); - - runAgain = _queue.processQueue(this); - } - catch (final ConnectionScopedRuntimeException e) - { - final String errorMessage = "Problem during asynchronous delivery by " + toString(); - if(_logger.isDebugEnabled()) - { - _logger.debug(errorMessage, e); - } - else - { - _logger.info(errorMessage + ' ' + e.getMessage()); - } - } - catch (final TransportException transe) + Subject subject = new Subject(false, org.apache.qpid.server.security.SecurityManager.SYSTEM.getPrincipals(), Collections + .emptySet(), Collections.emptySet()); + subject.getPrincipals().add(new TaskPrincipal("Queue Delivery")); + Subject.doAs(subject, new PrivilegedAction<Object>() { - final String errorMessage = "Problem during asynchronous delivery by " + toString(); - if(_logger.isDebugEnabled()) - { - _logger.debug(errorMessage, transe); - } - else + @Override + public Object run() { - _logger.info(errorMessage + ' ' + transe.getMessage()); - } - } - finally - { - _scheduled.compareAndSet(RUNNING, IDLE); - final long stateChangeCount = _queue.getStateChangeCount(); - _lastRunAgain.set(runAgain); - _lastRunTime.set(System.nanoTime()); - if(runAgain == 0L || runAgain != stateChangeCount || _stateChange.compareAndSet(true,false)) - { - if(_scheduled.compareAndSet(IDLE, SCHEDULED)) + long runAgain = Long.MIN_VALUE; + _stateChange.set(false); + try + { + runAgain = _queue.processQueue(QueueRunner.this); + } + catch (final ConnectionScopedRuntimeException e) + { + final String errorMessage = "Problem during asynchronous delivery by " + toString(); + if(_logger.isDebugEnabled()) + { + _logger.debug(errorMessage, e); + } + else + { + _logger.info(errorMessage + ' ' + e.getMessage()); + } + } + catch (final TransportException transe) + { + final String errorMessage = "Problem during asynchronous delivery by " + toString(); + if(_logger.isDebugEnabled()) + { + _logger.debug(errorMessage, transe); + } + else + { + _logger.info(errorMessage + ' ' + transe.getMessage()); + } + } + finally { - _queue.execute(this); + _scheduled.compareAndSet(RUNNING, IDLE); + final long stateChangeCount = _queue.getStateChangeCount(); + _lastRunAgain.set(runAgain); + _lastRunTime.set(System.nanoTime()); + if(runAgain == 0L || runAgain != stateChangeCount || _stateChange.compareAndSet(true,false)) + { + if(_scheduled.compareAndSet(IDLE, SCHEDULED)) + { + _queue.execute(QueueRunner.this); + } + } } + return null; } - CurrentActor.remove(); - } + }); } } public String toString() { - return "QueueRunner-" + _queue.getLogActor(); + return "QueueRunner-" + _queue.getLogSubject(); } public void execute(Executor executor) diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java index 9ef32e61e2..610c5f8202 100755 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java @@ -23,9 +23,13 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; -import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.security.auth.TaskPrincipal; import org.apache.qpid.transport.TransportException; +import javax.security.auth.Subject; +import java.security.PrivilegedAction; +import java.util.Collections; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -58,37 +62,46 @@ class SubFlushRunner implements Runnable { if(_scheduled.compareAndSet(SCHEDULED, RUNNING)) { - boolean complete = false; - _stateChange.set(false); - try + Subject subject = new Subject(false, SecurityManager.SYSTEM.getPrincipals(), Collections.emptySet(), Collections.emptySet()); + subject.getPrincipals().add(new TaskPrincipal("Sub. Delivery")); + Subject.doAs(subject, new PrivilegedAction<Object>() { - CurrentActor.set(_sub.getLogActor()); - complete = getQueue().flushConsumer(_sub, ITERATIONS); - } - catch (final TransportException transe) - { - final String errorMessage = "Problem during asynchronous delivery by " + toString(); - if(_logger.isDebugEnabled()) - { - _logger.debug(errorMessage, transe); - } - else - { - _logger.info(errorMessage + ' ' + transe.getMessage()); - } - } - finally - { - CurrentActor.remove(); - _scheduled.compareAndSet(RUNNING, IDLE); - if ((!complete || _stateChange.compareAndSet(true,false))&& !_sub.isSuspended()) + @Override + public Object run() { - if(_scheduled.compareAndSet(IDLE,SCHEDULED)) + boolean complete = false; + _stateChange.set(false); + try { - getQueue().execute(this); + complete = getQueue().flushConsumer(_sub, ITERATIONS); } + catch (final TransportException transe) + { + final String errorMessage = "Problem during asynchronous delivery by " + toString(); + if(_logger.isDebugEnabled()) + { + _logger.debug(errorMessage, transe); + } + else + { + _logger.info(errorMessage + ' ' + transe.getMessage()); + } + } + finally + { + _scheduled.compareAndSet(RUNNING, IDLE); + if ((!complete || _stateChange.compareAndSet(true,false))&& !_sub.isSuspended()) + { + if(_scheduled.compareAndSet(IDLE,SCHEDULED)) + { + getQueue().execute(SubFlushRunner.this); + } + } + } + return null; } - } + }); + } } @@ -99,7 +112,7 @@ class SubFlushRunner implements Runnable public String toString() { - return "SubFlushRunner-" + _sub.getLogActor(); + return "SubFlushRunner-" + _sub.toLogString(); } public void execute(Executor executor) diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/java/broker-core/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java index 429f1b3fa1..0dd241906b 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java @@ -20,7 +20,9 @@ */ package org.apache.qpid.server.registry; +import java.security.PrivilegedAction; import java.util.Collection; +import java.util.Collections; import java.util.Timer; import java.util.TimerTask; @@ -34,28 +36,23 @@ import org.apache.qpid.server.configuration.ConfiguredObjectRecoverer; import org.apache.qpid.server.configuration.RecovererProvider; import org.apache.qpid.server.configuration.startup.DefaultRecovererProvider; import org.apache.qpid.server.configuration.store.StoreConfigurationChangeListener; -import org.apache.qpid.server.logging.CompositeStartupMessageLogger; -import org.apache.qpid.server.logging.Log4jMessageLogger; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.LogRecorder; -import org.apache.qpid.server.logging.RootMessageLogger; -import org.apache.qpid.server.logging.SystemOutMessageLogger; -import org.apache.qpid.server.logging.actors.AbstractActor; -import org.apache.qpid.server.logging.actors.BrokerActor; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.GenericActor; +import org.apache.qpid.server.logging.*; import org.apache.qpid.server.logging.messages.BrokerMessages; import org.apache.qpid.server.logging.messages.VirtualHostMessages; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.State; import org.apache.qpid.server.configuration.updater.TaskExecutor; +import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.security.auth.TaskPrincipal; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import org.apache.qpid.util.SystemUtils; +import javax.security.auth.Subject; + /** * An abstract application registry that provides access to configuration information and handles the @@ -103,38 +100,28 @@ public class ApplicationRegistry implements IApplicationRegistry //Create the composite (log4j+SystemOut MessageLogger to be used during startup RootMessageLogger[] messageLoggers = {new SystemOutMessageLogger(), _rootMessageLogger}; CompositeStartupMessageLogger startupMessageLogger = new CompositeStartupMessageLogger(messageLoggers); + SystemLog.setRootMessageLogger(startupMessageLogger); - BrokerActor actor = new BrokerActor(startupMessageLogger); - CurrentActor.set(actor); - CurrentActor.setDefault(actor); - GenericActor.setDefaultMessageLogger(_rootMessageLogger); - try - { - logStartupMessages(CurrentActor.get()); + logStartupMessages(); - _taskExecutor = new TaskExecutor(); - _taskExecutor.start(); + _taskExecutor = new TaskExecutor(); + _taskExecutor.start(); - StoreConfigurationChangeListener storeChangeListener = new StoreConfigurationChangeListener(_store); - RecovererProvider provider = new DefaultRecovererProvider((StatisticsGatherer)this, _virtualHostRegistry, _logRecorder, _rootMessageLogger, _taskExecutor, brokerOptions, storeChangeListener); - ConfiguredObjectRecoverer<? extends ConfiguredObject> brokerRecoverer = provider.getRecoverer(Broker.class.getSimpleName()); - _broker = (Broker) brokerRecoverer.create(provider, _store.getRootEntry()); + StoreConfigurationChangeListener storeChangeListener = new StoreConfigurationChangeListener(_store); + RecovererProvider provider = new DefaultRecovererProvider((StatisticsGatherer)this, _virtualHostRegistry, _logRecorder, _rootMessageLogger, _taskExecutor, brokerOptions, storeChangeListener); + ConfiguredObjectRecoverer<? extends ConfiguredObject> brokerRecoverer = provider.getRecoverer(Broker.class.getSimpleName()); + _broker = (Broker) brokerRecoverer.create(provider, _store.getRootEntry()); - _virtualHostRegistry.setDefaultVirtualHostName((String)_broker.getAttribute(Broker.DEFAULT_VIRTUAL_HOST)); + _virtualHostRegistry.setDefaultVirtualHostName((String)_broker.getAttribute(Broker.DEFAULT_VIRTUAL_HOST)); - initialiseStatisticsReporting(); + initialiseStatisticsReporting(); - // starting the broker - _broker.setDesiredState(State.INITIALISING, State.ACTIVE); + // starting the broker + _broker.setDesiredState(State.INITIALISING, State.ACTIVE); - CurrentActor.get().message(BrokerMessages.READY()); - } - finally - { - CurrentActor.remove(); - } + SystemLog.message(BrokerMessages.READY()); + SystemLog.setRootMessageLogger(_rootMessageLogger); - CurrentActor.setDefault(new BrokerActor(_rootMessageLogger)); } private void initialiseStatisticsReporting() @@ -158,29 +145,40 @@ public class ApplicationRegistry implements IApplicationRegistry private final boolean _reset; private final RootMessageLogger _logger; + private final Subject _subject; public StatisticsReportingTask(boolean reset, RootMessageLogger logger) { _reset = reset; _logger = logger; + _subject = new Subject(false, SecurityManager.SYSTEM.getPrincipals(), Collections.emptySet(), Collections.emptySet()); + _subject.getPrincipals().add(new TaskPrincipal("Statistics")); + _subject.setReadOnly(); + } public void run() { - CurrentActor.set(new AbstractActor(_logger) + Subject.doAs(_subject, new PrivilegedAction<Object>() { - public String getLogMessage() + @Override + public Object run() { - return "[" + Thread.currentThread().getName() + "] "; + reportStatistics(); + return null; } }); + } + + protected void reportStatistics() + { try { - CurrentActor.get().message(BrokerMessages.STATS_DATA(DELIVERED, _dataDelivered.getPeak() / 1024.0, _dataDelivered.getTotal())); - CurrentActor.get().message(BrokerMessages.STATS_MSGS(DELIVERED, _messagesDelivered.getPeak(), _messagesDelivered.getTotal())); - CurrentActor.get().message(BrokerMessages.STATS_DATA(RECEIVED, _dataReceived.getPeak() / 1024.0, _dataReceived.getTotal())); - CurrentActor.get().message(BrokerMessages.STATS_MSGS(RECEIVED, _messagesReceived.getPeak(), _messagesReceived.getTotal())); - Collection<VirtualHost> hosts = _virtualHostRegistry.getVirtualHosts(); + SystemLog.message(BrokerMessages.STATS_DATA(DELIVERED, _dataDelivered.getPeak() / 1024.0, _dataDelivered.getTotal())); + SystemLog.message(BrokerMessages.STATS_MSGS(DELIVERED, _messagesDelivered.getPeak(), _messagesDelivered.getTotal())); + SystemLog.message(BrokerMessages.STATS_DATA(RECEIVED, _dataReceived.getPeak() / 1024.0, _dataReceived.getTotal())); + SystemLog.message(BrokerMessages.STATS_MSGS(RECEIVED, _messagesReceived.getPeak(), _messagesReceived.getTotal())); + Collection<VirtualHost> hosts = _virtualHostRegistry.getVirtualHosts(); if (hosts.size() > 1) { @@ -192,10 +190,10 @@ public class ApplicationRegistry implements IApplicationRegistry StatisticsCounter dataReceived = vhost.getDataReceiptStatistics(); StatisticsCounter messagesReceived = vhost.getMessageReceiptStatistics(); - CurrentActor.get().message(VirtualHostMessages.STATS_DATA(name, DELIVERED, dataDelivered.getPeak() / 1024.0, dataDelivered.getTotal())); - CurrentActor.get().message(VirtualHostMessages.STATS_MSGS(name, DELIVERED, messagesDelivered.getPeak(), messagesDelivered.getTotal())); - CurrentActor.get().message(VirtualHostMessages.STATS_DATA(name, RECEIVED, dataReceived.getPeak() / 1024.0, dataReceived.getTotal())); - CurrentActor.get().message(VirtualHostMessages.STATS_MSGS(name, RECEIVED, messagesReceived.getPeak(), messagesReceived.getTotal())); + SystemLog.message(VirtualHostMessages.STATS_DATA(name, DELIVERED, dataDelivered.getPeak() / 1024.0, dataDelivered.getTotal())); + SystemLog.message(VirtualHostMessages.STATS_MSGS(name, DELIVERED, messagesDelivered.getPeak(), messagesDelivered.getTotal())); + SystemLog.message(VirtualHostMessages.STATS_DATA(name, RECEIVED, dataReceived.getPeak() / 1024.0, dataReceived.getTotal())); + SystemLog.message(VirtualHostMessages.STATS_MSGS(name, RECEIVED, messagesReceived.getPeak(), messagesReceived.getTotal())); } } @@ -208,10 +206,6 @@ public class ApplicationRegistry implements IApplicationRegistry { ApplicationRegistry._logger.warn("Unexpected exception occurred while reporting the statistics", e); } - finally - { - CurrentActor.remove(); - } } } @@ -241,8 +235,6 @@ public class ApplicationRegistry implements IApplicationRegistry _logger.info("Shutting down ApplicationRegistry:" + this); } - //Set the Actor for Broker Shutdown - CurrentActor.set(new BrokerActor(_rootMessageLogger)); try { //Stop Statistics Reporting @@ -264,7 +256,7 @@ public class ApplicationRegistry implements IApplicationRegistry _taskExecutor.stop(); } - CurrentActor.get().message(BrokerMessages.STOPPED()); + SystemLog.message(BrokerMessages.STOPPED()); _logRecorder.closeLogRecorder(); @@ -275,7 +267,6 @@ public class ApplicationRegistry implements IApplicationRegistry { _taskExecutor.stopImmediately(); } - CurrentActor.remove(); } _store = null; _broker = null; @@ -334,17 +325,17 @@ public class ApplicationRegistry implements IApplicationRegistry _dataReceived = new StatisticsCounter("bytes-received"); } - private void logStartupMessages(LogActor logActor) + private void logStartupMessages() { - logActor.message(BrokerMessages.STARTUP(QpidProperties.getReleaseVersion(), QpidProperties.getBuildVersion())); + SystemLog.message(BrokerMessages.STARTUP(QpidProperties.getReleaseVersion(), QpidProperties.getBuildVersion())); - logActor.message(BrokerMessages.PLATFORM(System.getProperty("java.vendor"), + SystemLog.message(BrokerMessages.PLATFORM(System.getProperty("java.vendor"), System.getProperty("java.runtime.version", System.getProperty("java.version")), SystemUtils.getOSName(), SystemUtils.getOSVersion(), SystemUtils.getOSArch())); - logActor.message(BrokerMessages.MAX_MEMORY(Runtime.getRuntime().maxMemory())); + SystemLog.message(BrokerMessages.MAX_MEMORY(Runtime.getRuntime().maxMemory())); } @Override diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java b/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java index 1f0d482ed2..37c379da86 100755 --- a/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java @@ -151,6 +151,12 @@ public class SecurityManager implements ConfigurationChangeListener return _logger; } + public static boolean isSystemProcess() + { + Subject subject = Subject.getSubject(AccessController.getContext()); + return !(subject == null || subject.getPrincipals(SystemPrincipal.class).isEmpty()); + } + private static final class SystemPrincipal implements Principal { private SystemPrincipal() @@ -172,8 +178,7 @@ public class SecurityManager implements ConfigurationChangeListener private boolean checkAllPlugins(AccessCheck checker) { // If we are running as SYSTEM then no ACL checking - final Subject subject = Subject.getSubject(AccessController.getContext()); - if(subject != null && !subject.getPrincipals(SystemPrincipal.class).isEmpty()) + if(isSystemProcess()) { return true; } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/BrokerActor.java b/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/ManagementConnectionPrincipal.java index 9e77452228..640c61c5af 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/BrokerActor.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/ManagementConnectionPrincipal.java @@ -18,36 +18,9 @@ * under the License. * */ -package org.apache.qpid.server.logging.actors; +package org.apache.qpid.server.security.auth; -import org.apache.qpid.server.logging.RootMessageLogger; - -public class BrokerActor extends AbstractActor +public interface ManagementConnectionPrincipal extends SocketConnectionPrincipal { - private final String _logString; - - /** - * Create a new BrokerActor - * - * @param logger - */ - public BrokerActor(RootMessageLogger logger) - { - super(logger); - - _logString = "[Broker] "; - } - - public BrokerActor(String name, RootMessageLogger logger) - { - super(logger); - - _logString = "[Broker(" + name + ")] "; - } - - public String getLogMessage() - { - return _logString; - } - + public String getType(); } diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/TestLogActor.java b/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/TaskPrincipal.java index 30f4e16e42..e1cd3e9d62 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/TestLogActor.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/TaskPrincipal.java @@ -18,20 +18,22 @@ * under the License. * */ -package org.apache.qpid.server.logging.actors; +package org.apache.qpid.server.security.auth; -import org.apache.qpid.server.logging.RootMessageLogger; +import java.security.Principal; -public class TestLogActor extends AbstractActor +public class TaskPrincipal implements Principal { - public TestLogActor(RootMessageLogger rootLogger) + + private final String _name; + + public TaskPrincipal(final String name) { - super(rootLogger); + _name = name; } - public String getLogMessage() + public String getName() { - return "[Test Actor] "; + return _name; } } -
\ No newline at end of file diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/jmx/JMXConnectionPrincipal.java b/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/jmx/JMXConnectionPrincipal.java new file mode 100644 index 0000000000..35ac197e74 --- /dev/null +++ b/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/jmx/JMXConnectionPrincipal.java @@ -0,0 +1,82 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.security.auth.jmx; + +import org.apache.qpid.server.security.auth.ManagementConnectionPrincipal; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +public class JMXConnectionPrincipal implements ManagementConnectionPrincipal +{ + private final InetSocketAddress _address; + + public JMXConnectionPrincipal(final String host) + { + _address = new InetSocketAddress(host,0); + } + + @Override + public SocketAddress getRemoteAddress() + { + return _address; + } + + @Override + public String getName() + { + return _address.toString(); + } + + @Override + public boolean equals(final Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + + final JMXConnectionPrincipal that = (JMXConnectionPrincipal) o; + + if (!_address.equals(that._address)) + { + return false; + } + + return true; + } + + @Override + public int hashCode() + { + return _address.hashCode(); + } + + @Override + public String getType() + { + return "JMX"; + } +} diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/jmx/JMXPasswordAuthenticator.java b/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/jmx/JMXPasswordAuthenticator.java index 94de7754f6..ae080ff779 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/jmx/JMXPasswordAuthenticator.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/jmx/JMXPasswordAuthenticator.java @@ -20,18 +20,14 @@ */ package org.apache.qpid.server.security.auth.jmx; -import java.net.InetSocketAddress; import java.net.SocketAddress; import java.rmi.server.RemoteServer; import java.rmi.server.ServerNotActiveException; -import java.security.Principal; import java.security.PrivilegedAction; import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus; -import org.apache.qpid.server.security.auth.SocketConnectionPrincipal; import org.apache.qpid.server.security.auth.SubjectAuthenticationResult; import javax.management.remote.JMXAuthenticator; @@ -123,7 +119,7 @@ public class JMXPasswordAuthenticator implements JMXAuthenticator originalSubject.getPrincipals(), originalSubject.getPublicCredentials(), originalSubject.getPrivateCredentials()); - subject.getPrincipals().add(new JMSConnectionPrincipal(clientHost)); + subject.getPrincipals().add(new JMXConnectionPrincipal(clientHost)); subject.setReadOnly(); } catch(ServerNotActiveException e) @@ -153,53 +149,4 @@ public class JMXPasswordAuthenticator implements JMXAuthenticator } - private static class JMSConnectionPrincipal implements SocketConnectionPrincipal - { - private final InetSocketAddress _address; - - public JMSConnectionPrincipal(final String host) - { - _address = new InetSocketAddress(host,0); - } - - @Override - public SocketAddress getRemoteAddress() - { - return _address; - } - - @Override - public String getName() - { - return _address.toString(); - } - - @Override - public boolean equals(final Object o) - { - if (this == o) - { - return true; - } - if (o == null || getClass() != o.getClass()) - { - return false; - } - - final JMSConnectionPrincipal that = (JMSConnectionPrincipal) o; - - if (!_address.equals(that._address)) - { - return false; - } - - return true; - } - - @Override - public int hashCode() - { - return _address.hashCode(); - } - } }
\ No newline at end of file diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java b/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java index 3c5fb44d23..bdc059e912 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java @@ -28,10 +28,9 @@ import java.util.Map; import java.util.UUID; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.IllegalConfigurationException; -import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.logging.messages.ConfigStoreMessages; import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; -import org.apache.qpid.server.util.ServerScopedRuntimeException; import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION; @@ -103,7 +102,7 @@ public class DurableConfigurationRecoverer implements ConfigurationRecoveryHandl checkUnresolvedDependencies(); applyUpgrade(); - CurrentActor.get().message(_logSubject, ConfigStoreMessages.RECOVERY_COMPLETE()); + SystemLog.message(_logSubject, ConfigStoreMessages.RECOVERY_COMPLETE()); return CURRENT_CONFIG_VERSION; } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/store/OperationalLoggingListener.java b/java/broker-core/src/main/java/org/apache/qpid/server/store/OperationalLoggingListener.java index 4ab1a3ab05..1420c950d9 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/store/OperationalLoggingListener.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/store/OperationalLoggingListener.java @@ -20,7 +20,7 @@ package org.apache.qpid.server.store; import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.logging.messages.ConfigStoreMessages; import org.apache.qpid.server.logging.messages.MessageStoreMessages; import org.apache.qpid.server.logging.messages.TransactionLogMessages; @@ -49,31 +49,31 @@ public class OperationalLoggingListener implements EventListener switch(event) { case BEFORE_INIT: - CurrentActor.get().message(_logSubject, ConfigStoreMessages.CREATED()); + SystemLog.message(_logSubject, ConfigStoreMessages.CREATED()); break; case AFTER_INIT: - CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED()); - CurrentActor.get().message(_logSubject, TransactionLogMessages.CREATED()); + SystemLog.message(_logSubject, MessageStoreMessages.CREATED()); + SystemLog.message(_logSubject, TransactionLogMessages.CREATED()); String storeLocation = _store.getStoreLocation(); if (storeLocation != null) { - CurrentActor.get().message(_logSubject, MessageStoreMessages.STORE_LOCATION(storeLocation)); + SystemLog.message(_logSubject, MessageStoreMessages.STORE_LOCATION(storeLocation)); } break; case BEFORE_ACTIVATE: - CurrentActor.get().message(_logSubject, MessageStoreMessages.RECOVERY_START()); + SystemLog.message(_logSubject, MessageStoreMessages.RECOVERY_START()); break; case AFTER_ACTIVATE: - CurrentActor.get().message(_logSubject, MessageStoreMessages.RECOVERY_COMPLETE()); + SystemLog.message(_logSubject, MessageStoreMessages.RECOVERY_COMPLETE()); break; case AFTER_CLOSE: - CurrentActor.get().message(_logSubject,MessageStoreMessages.CLOSED()); + SystemLog.message(_logSubject, MessageStoreMessages.CLOSED()); break; case PERSISTENT_MESSAGE_SIZE_OVERFULL: - CurrentActor.get().message(_logSubject,MessageStoreMessages.OVERFULL()); + SystemLog.message(_logSubject, MessageStoreMessages.OVERFULL()); break; case PERSISTENT_MESSAGE_SIZE_UNDERFULL: - CurrentActor.get().message(_logSubject,MessageStoreMessages.UNDERFULL()); + SystemLog.message(_logSubject, MessageStoreMessages.UNDERFULL()); break; } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 739dd1cc5b..c5eb70c1f0 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -37,6 +37,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.exchange.AMQUnknownExchangeType; import org.apache.qpid.server.exchange.ExchangeImpl; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.configuration.ExchangeConfiguration; @@ -48,7 +49,6 @@ import org.apache.qpid.server.exchange.DefaultExchangeFactory; import org.apache.qpid.server.exchange.DefaultExchangeRegistry; import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.VirtualHostMessages; import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageNode; @@ -150,7 +150,7 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg _id = UUIDGenerator.generateVhostUUID(_name); - CurrentActor.get().message(VirtualHostMessages.CREATED(_name)); + SystemLog.message(VirtualHostMessages.CREATED(_name)); _securityManager = new SecurityManager(parentSecurityManager, _vhostConfig.getConfig().getString("security.acl"), _name); @@ -704,7 +704,7 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg _state = State.STOPPED; - CurrentActor.get().message(VirtualHostMessages.CLOSED()); + SystemLog.message(VirtualHostMessages.CLOSED()); } protected void closeStorage() @@ -911,7 +911,7 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg { if (state == State.ERRORED) { - CurrentActor.get().message(VirtualHostMessages.ERRORED()); + SystemLog.message(VirtualHostMessages.ERRORED()); } } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java index 1b0e50fd34..83ef437410 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java @@ -22,9 +22,11 @@ package org.apache.qpid.server.virtualhost; import org.apache.log4j.Logger; -import org.apache.qpid.server.logging.RootMessageLogger; -import org.apache.qpid.server.logging.actors.AbstractActor; -import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.security.auth.TaskPrincipal; + +import javax.security.auth.Subject; +import java.security.PrivilegedAction; +import java.util.Collections; public abstract class HouseKeepingTask implements Runnable { @@ -34,12 +36,17 @@ public abstract class HouseKeepingTask implements Runnable private String _name; - private RootMessageLogger _rootLogger; + private final Subject _subject; + public HouseKeepingTask(VirtualHost vhost) { _virtualHost = vhost; _name = _virtualHost.getName() + ":" + this.getClass().getSimpleName(); - _rootLogger = CurrentActor.get().getRootMessageLogger(); + _subject = new Subject(false, org.apache.qpid.server.security.SecurityManager.SYSTEM.getPrincipals(), Collections + .emptySet(), Collections.emptySet()); + _subject.getPrincipals().add(new TaskPrincipal(_name)); + _subject.setReadOnly(); + } final public void run() @@ -47,27 +54,27 @@ public abstract class HouseKeepingTask implements Runnable String originalThreadName = Thread.currentThread().getName(); Thread.currentThread().setName(_name); - CurrentActor.set(new AbstractActor(_rootLogger) - { - @Override - public String getLogMessage() - { - return _name; - } - }); - try { - execute(); - } - catch (Exception e) - { - _logger.warn(this.getClass().getSimpleName() + " throw exception: " + e, e); + Subject.doAs(_subject, new PrivilegedAction<Object>() + { + @Override + public Object run() + { + try + { + execute(); + } + catch (Exception e) + { + _logger.warn(this.getClass().getSimpleName() + " throw exception: " + e, e); + } + return null; + } + }); } finally { - CurrentActor.remove(); - // eagerly revert the thread name to make thread dumps more meaningful if captured after task has finished Thread.currentThread().setName(originalThreadName); } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java index 302c61e491..c5ea2c7ac1 100755 --- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java @@ -28,7 +28,7 @@ import java.util.UUID; import org.apache.log4j.Logger; import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.logging.messages.TransactionLogMessages; import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; import org.apache.qpid.server.message.EnqueueableMessage; @@ -82,7 +82,7 @@ public class VirtualHostConfigRecoveryHandler implements { _logSubject = new MessageStoreLogSubject(_virtualHost.getName(), store.getClass().getSimpleName()); _store = store; - CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_START(null, false)); + SystemLog.message(_logSubject, TransactionLogMessages.RECOVERY_START(null, false)); return this; } @@ -149,9 +149,9 @@ public class VirtualHostConfigRecoveryHandler implements else { StringBuilder xidString = xidAsString(id); - CurrentActor.get().message(_logSubject, - TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(), - Long.toString(messageId))); + SystemLog.message(_logSubject, + TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(), + Long.toString(messageId))); } @@ -159,9 +159,9 @@ public class VirtualHostConfigRecoveryHandler implements else { StringBuilder xidString = xidAsString(id); - CurrentActor.get().message(_logSubject, - TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(), - record.getResource().getId().toString())); + SystemLog.message(_logSubject, + TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(), + record.getResource().getId().toString())); } } @@ -199,9 +199,9 @@ public class VirtualHostConfigRecoveryHandler implements else { StringBuilder xidString = xidAsString(id); - CurrentActor.get().message(_logSubject, - TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(), - Long.toString(messageId))); + SystemLog.message(_logSubject, + TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(), + Long.toString(messageId))); } @@ -209,9 +209,9 @@ public class VirtualHostConfigRecoveryHandler implements else { StringBuilder xidString = xidAsString(id); - CurrentActor.get().message(_logSubject, - TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(), - record.getResource().getId().toString())); + SystemLog.message(_logSubject, + TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(), + record.getResource().getId().toString())); } } @@ -238,7 +238,7 @@ public class VirtualHostConfigRecoveryHandler implements _logger.warn("Message id " + m.getMessageNumber() + " in store, but not in any queue - removing...."); m.remove(); } - CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false)); + SystemLog.message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false)); } public void complete() @@ -316,9 +316,9 @@ public class VirtualHostConfigRecoveryHandler implements for(Map.Entry<String,Integer> entry : _queueRecoveries.entrySet()) { - CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERED(entry.getValue(), entry.getKey())); + SystemLog.message(_logSubject, TransactionLogMessages.RECOVERED(entry.getValue(), entry.getKey())); - CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(entry.getKey(), true)); + SystemLog.message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(entry.getKey(), true)); } diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/TransactionTimeoutHelperTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/TransactionTimeoutHelperTest.java index 7a7954c8f9..4548bd9210 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/TransactionTimeoutHelperTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/TransactionTimeoutHelperTest.java @@ -21,7 +21,9 @@ package org.apache.qpid.server; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.qpid.server.logging.messages.ChannelMessages.IDLE_TXN_LOG_HIERARCHY; import static org.apache.qpid.server.logging.messages.ChannelMessages.OPEN_TXN_LOG_HIERARCHY; +import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.eq; import static org.mockito.Matchers.same; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -29,10 +31,10 @@ import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction; -import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.RootMessageLogger; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.test.utils.QpidTestCase; import org.hamcrest.Description; @@ -40,7 +42,7 @@ import org.mockito.ArgumentMatcher; public class TransactionTimeoutHelperTest extends QpidTestCase { - private final LogActor _logActor = mock(LogActor.class); + private final RootMessageLogger _rootLogger = mock(RootMessageLogger.class); private final LogSubject _logSubject = mock(LogSubject.class); private final ServerTransaction _transaction = mock(ServerTransaction.class); private final CloseAction _closeAction = mock(CloseAction.class); @@ -53,7 +55,7 @@ public class TransactionTimeoutHelperTest extends QpidTestCase _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, 5, 10, 5, 10); - verifyZeroInteractions(_logActor, _closeAction); + verifyZeroInteractions(_rootLogger, _closeAction); } public void testOpenTransactionProducesWarningOnly() throws Exception @@ -64,7 +66,7 @@ public class TransactionTimeoutHelperTest extends QpidTestCase _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, SECONDS.toMillis(30), 0, 0, 0); - verify(_logActor).message(same(_logSubject), isLogMessage(OPEN_TXN_LOG_HIERARCHY, "CHN-1007 : Open Transaction : 61,\\d{3} ms")); + verify(_rootLogger).message(same(_logSubject), isLogMessage(OPEN_TXN_LOG_HIERARCHY, "CHN-1007 : Open Transaction : 61,\\d{3} ms")); verifyZeroInteractions(_closeAction); } @@ -77,7 +79,7 @@ public class TransactionTimeoutHelperTest extends QpidTestCase _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, 0, SECONDS.toMillis(30), 0, 0); verify(_closeAction).doTimeoutAction("Open transaction timed out"); - verifyZeroInteractions(_logActor); + verifyZeroInteractions(_rootLogger); } public void testOpenTransactionProducesWarningAndTimeoutAction() throws Exception @@ -88,7 +90,7 @@ public class TransactionTimeoutHelperTest extends QpidTestCase _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, SECONDS.toMillis(15), SECONDS.toMillis(30), 0, 0); - verify(_logActor).message(same(_logSubject), isLogMessage(OPEN_TXN_LOG_HIERARCHY, "CHN-1007 : Open Transaction : 61,\\d{3} ms")); + verify(_rootLogger).message(same(_logSubject), isLogMessage(OPEN_TXN_LOG_HIERARCHY, "CHN-1007 : Open Transaction : 61,\\d{3} ms")); verify(_closeAction).doTimeoutAction("Open transaction timed out"); } @@ -101,7 +103,7 @@ public class TransactionTimeoutHelperTest extends QpidTestCase _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, 0, 0, SECONDS.toMillis(30), 0); - verify(_logActor).message(same(_logSubject), isLogMessage(IDLE_TXN_LOG_HIERARCHY, "CHN-1008 : Idle Transaction : 31,\\d{3} ms")); + verify(_rootLogger).message(same(_logSubject), isLogMessage(IDLE_TXN_LOG_HIERARCHY, "CHN-1008 : Idle Transaction : 31,\\d{3} ms")); verifyZeroInteractions(_closeAction); } @@ -115,7 +117,7 @@ public class TransactionTimeoutHelperTest extends QpidTestCase _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, 0, 0, 0, SECONDS.toMillis(30)); verify(_closeAction).doTimeoutAction("Idle transaction timed out"); - verifyZeroInteractions(_logActor); + verifyZeroInteractions(_rootLogger); } public void testIdleTransactionProducesWarningAndTimeoutAction() throws Exception @@ -127,7 +129,7 @@ public class TransactionTimeoutHelperTest extends QpidTestCase _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, 0, 0, SECONDS.toMillis(15), SECONDS.toMillis(30)); - verify(_logActor).message(same(_logSubject), isLogMessage(IDLE_TXN_LOG_HIERARCHY, "CHN-1008 : Idle Transaction : 31,\\d{3} ms")); + verify(_rootLogger).message(same(_logSubject), isLogMessage(IDLE_TXN_LOG_HIERARCHY, "CHN-1008 : Idle Transaction : 31,\\d{3} ms")); verify(_closeAction).doTimeoutAction("Idle transaction timed out"); } @@ -140,8 +142,8 @@ public class TransactionTimeoutHelperTest extends QpidTestCase _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, SECONDS.toMillis(60), 0, SECONDS.toMillis(30), 0); - verify(_logActor).message(same(_logSubject), isLogMessage(IDLE_TXN_LOG_HIERARCHY, "CHN-1008 : Idle Transaction : 31,\\d{3} ms")); - verify(_logActor).message(same(_logSubject), isLogMessage(OPEN_TXN_LOG_HIERARCHY, "CHN-1007 : Open Transaction : 61,\\d{3} ms")); + verify(_rootLogger).message(same(_logSubject), isLogMessage(IDLE_TXN_LOG_HIERARCHY, "CHN-1008 : Idle Transaction : 31,\\d{3} ms")); + verify(_rootLogger).message(same(_logSubject), isLogMessage(OPEN_TXN_LOG_HIERARCHY, "CHN-1007 : Open Transaction : 61,\\d{3} ms")); verifyZeroInteractions(_closeAction); } @@ -149,26 +151,14 @@ public class TransactionTimeoutHelperTest extends QpidTestCase protected void setUp() throws Exception { super.setUp(); - - CurrentActor.set(_logActor); - + when(_logSubject.toLogString()).thenReturn(""); + when(_rootLogger.isEnabled()).thenReturn(true); + when(_rootLogger.isMessageEnabled(anyString())).thenReturn(true); + SystemLog.setRootMessageLogger(_rootLogger); _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject, _closeAction); _now = System.currentTimeMillis(); } - @Override - protected void tearDown() throws Exception - { - try - { - super.tearDown(); - } - finally - { - CurrentActor.remove(); - } - } - private void configureMockTransaction(final long startTime, final long updateTime) { when(_transaction.isTransactional()).thenReturn(true); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/TaskExecutorTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/TaskExecutorTest.java index a54ba06c53..083160a31d 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/TaskExecutorTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/TaskExecutorTest.java @@ -20,11 +20,9 @@ */ package org.apache.qpid.server.configuration.updater; -import java.security.AccessControlContext; import java.security.AccessController; import java.security.PrivilegedAction; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -37,12 +35,7 @@ import javax.security.auth.Subject; import junit.framework.TestCase; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.NullRootMessageLogger; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.TestLogActor; import org.apache.qpid.server.model.State; -import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.util.ServerScopedRuntimeException; public class TaskExecutorTest extends TestCase @@ -215,37 +208,26 @@ public class TaskExecutorTest extends TestCase public void testSubmitAndWaitCurrentActorAndSecurityManagerSubjectAreRespected() throws Exception { _executor.start(); - LogActor actor = new TestLogActor(new NullRootMessageLogger()); Subject subject = new Subject(); - final AtomicReference<LogActor> taskLogActor = new AtomicReference<LogActor>(); final AtomicReference<Subject> taskSubject = new AtomicReference<Subject>(); - try + Subject.doAs(subject, new PrivilegedAction<Object>() { - CurrentActor.set(actor); - Subject.doAs(subject, new PrivilegedAction<Object>() + @Override + public Object run() + { + _executor.submitAndWait(new TaskExecutor.Task<Object>() { @Override - public Object run() - { _executor.submitAndWait(new TaskExecutor.Task<Object>() + public Void call() { - @Override - public Void call() - { - taskLogActor.set(CurrentActor.get()); - taskSubject.set(Subject.getSubject(AccessController.getContext())); - return null; - } - }); - return null; + taskSubject.set(Subject.getSubject(AccessController.getContext())); + return null; } }); + return null; + } + }); - } - finally - { - CurrentActor.remove(); - } - assertEquals("Unexpected task log actor", actor, taskLogActor.get()); assertEquals("Unexpected security manager subject", subject, taskSubject.get()); } diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java index fa75d41810..0532386ff9 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java @@ -33,8 +33,6 @@ import java.util.UUID; import junit.framework.TestCase; import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; @@ -54,7 +52,6 @@ public class FanoutExchangeTest extends TestCase public void setUp() throws UnknownExchangeException { - CurrentActor.setDefault(mock(LogActor.class)); Map<String,Object> attributes = new HashMap<String, Object>(); attributes.put(Exchange.ID, UUID.randomUUID()); attributes.put(Exchange.NAME, "test"); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java index 8c9132d166..50b448eea3 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java @@ -24,8 +24,6 @@ import java.util.Collection; import junit.framework.TestCase; import org.apache.qpid.server.binding.BindingImpl; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; @@ -148,7 +146,6 @@ public class HeadersBindingTest extends TestCase VirtualHost vhost = mock(VirtualHost.class); when(_queue.getVirtualHost()).thenReturn(vhost); when(vhost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class)); - CurrentActor.set(mock(LogActor.class)); _exchange = mock(ExchangeImpl.class); when(_exchange.getExchangeType()).thenReturn(mock(ExchangeType.class)); } diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java index 76752de5d0..9377bbac47 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java @@ -30,8 +30,6 @@ import java.util.Set; import java.util.UUID; import junit.framework.TestCase; import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; @@ -58,7 +56,6 @@ public class HeadersExchangeTest extends TestCase { super.setUp(); - CurrentActor.setDefault(mock(LogActor.class)); _virtualHost = mock(VirtualHost.class); SecurityManager securityManager = mock(SecurityManager.class); when(_virtualHost.getSecurityManager()).thenReturn(securityManager); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/logging/Log4jMessageLoggerTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/logging/Log4jMessageLoggerTest.java index f6e301ec07..2078c3634f 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/logging/Log4jMessageLoggerTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/logging/Log4jMessageLoggerTest.java @@ -26,8 +26,6 @@ import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.log4j.spi.LoggingEvent; -import org.apache.qpid.server.logging.actors.BrokerActor; - import java.io.IOException; import java.util.LinkedList; import java.util.List; @@ -156,18 +154,17 @@ public class Log4jMessageLoggerTest extends TestCase Logger.getLogger(loggerName2).setLevel(Level.OFF); Log4jMessageLogger msgLogger = new Log4jMessageLogger(); - BrokerActor actor = new BrokerActor(msgLogger); - + assertTrue("Expected message logger to be enabled", msgLogger.isEnabled()); - assertTrue("Message should be enabled", msgLogger.isMessageEnabled(actor, loggerName1)); - assertFalse("Message should be disabled", msgLogger.isMessageEnabled(actor, loggerName2)); + assertTrue("Message should be enabled", msgLogger.isMessageEnabled(loggerName1)); + assertFalse("Message should be disabled", msgLogger.isMessageEnabled(loggerName2)); Logger.getLogger(loggerName1).setLevel(Level.WARN); Logger.getLogger(loggerName2).setLevel(Level.INFO); - assertFalse("Message should be disabled", msgLogger.isMessageEnabled(actor, loggerName1)); - assertTrue("Message should be enabled", msgLogger.isMessageEnabled(actor, loggerName2)); + assertFalse("Message should be disabled", msgLogger.isMessageEnabled(loggerName1)); + assertTrue("Message should be enabled", msgLogger.isMessageEnabled(loggerName2)); } /** diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java deleted file mode 100644 index 6ef319bcdf..0000000000 --- a/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.logging.actors; - -import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.util.BrokerTestHelper; - -import java.util.List; - -/** - * Test : AMQPChannelActorTest - * Validate the AMQPChannelActor class. - * - * The test creates a new AMQPActor and then logs a message using it. - * - * The test then verifies that the logged message was the only one created and - * that the message contains the required message. - */ -public class AMQPChannelActorTest extends BaseConnectionActorTestCase -{ - - public void setUp() - { - // do nothing - } - - private void setUpNow() throws Exception - { - super.setUp(); - AMQSessionModel channel = BrokerTestHelper.createSession(1, getConnection()); - - setAmqpActor(new AMQPChannelActor(channel, getRootLogger())); - } - - - /** - * Test that when logging on behalf of the channel - * The test sends a message then verifies that it entered the logs. - * - * The log message should be fully replaced (no '{n}' values) and should - * contain the channel id ('/ch:1') identification. - */ - public void testChannel() throws Exception - { - setUpNow(); - - final String message = sendTestLogMessage(getAmqpActor()); - - List<Object> logs = getRawLogger().getLogMessages(); - - assertEquals("Message log size not as expected.", 1, logs.size()); - - // Verify that the logged message is present in the output - assertTrue("Message was not found in log message:" + logs.get(0), - logs.get(0).toString().contains(message)); - - // Verify that the message has the correct type - assertTrue("Message contains the [con: prefix", - logs.get(0).toString().contains("[con:")); - - - // Verify that all the values were presented to the MessageFormatter - // so we will not end up with '{n}' entries in the log. - assertFalse("Verify that the string does not contain any '{'." + logs.get(0), - logs.get(0).toString().contains("{")); - - // Verify that the logged message contains the 'ch:1' marker - assertTrue("Message was not logged as part of channel 1" + logs.get(0), - logs.get(0).toString().contains("/ch:1")); - } - - /** - * Test that if logging is configured to be off via system property that - * no logging is presented - */ - public void testChannelLoggingOFF() throws Exception - { - setStatusUpdatesEnabled(false); - - setUpNow(); - - sendTestLogMessage(getAmqpActor()); - - List<Object> logs = getRawLogger().getLogMessages(); - - assertEquals("Message log size not as expected.", 0, logs.size()); - } -} diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java index bee2930fd7..57ef51e170 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java @@ -20,9 +20,14 @@ */ package org.apache.qpid.server.logging.actors; +import org.apache.qpid.server.connection.ConnectionPrincipal; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.SystemLog; +import javax.security.auth.Subject; +import java.security.PrivilegedAction; +import java.util.Collections; import java.util.List; /** @@ -54,6 +59,8 @@ public class AMQPConnectionActorTest extends BaseConnectionActorTestCase { super.setUp(); + // ignore all the startup log messages + getRawLogger().clearLogMessages(); final String message = sendLogMessage(); List<Object> logs = getRawLogger().getLogMessages(); @@ -95,24 +102,34 @@ public class AMQPConnectionActorTest extends BaseConnectionActorTestCase private String sendLogMessage() { final String message = "test logging"; - - getAmqpActor().message(new LogSubject() + Subject subject = new Subject(false, Collections.singleton(new ConnectionPrincipal(getConnection())), Collections.emptySet(), Collections.emptySet()); + Subject.doAs(subject, new PrivilegedAction<Object>() { - public String toLogString() + @Override + public Object run() { - return "[AMQPActorTest]"; - } + SystemLog.message(new LogSubject() + { + public String toLogString() + { + return "[AMQPActorTest]"; + } + + }, new LogMessage() + { + public String toString() + { + return message; + } + + public String getLogHierarchy() + { + return "test.hierarchy"; + } + } + ); + return null; - }, new LogMessage() - { - public String toString() - { - return message; - } - - public String getLogHierarchy() - { - return "test.hierarchy"; } }); return message; diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/AbstractManagementActorTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/AbstractManagementActorTest.java deleted file mode 100644 index bf38bb64bf..0000000000 --- a/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/AbstractManagementActorTest.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.logging.actors; - -import java.security.Principal; -import java.security.PrivilegedAction; -import java.util.Collections; - -import javax.security.auth.Subject; - -import org.apache.qpid.server.logging.NullRootMessageLogger; -import org.apache.qpid.server.security.auth.TestPrincipalUtils; -import org.apache.qpid.test.utils.QpidTestCase; - -public class AbstractManagementActorTest extends QpidTestCase -{ - private AbstractManagementActor _logActor; - - @Override - public void setUp() - { - _logActor = new AbstractManagementActor(new NullRootMessageLogger(), AbstractManagementActor.UNKNOWN_PRINCIPAL) - { - @Override - public String getLogMessage() - { - return null; - } - }; - } - - public void testGetPrincipalName() - { - Subject subject = TestPrincipalUtils.createTestSubject("guest"); - - final String principalName = Subject.doAs(subject, - new PrivilegedAction<String>() - { - public String run() - { - return _logActor.getPrincipalName(); - } - }); - - assertEquals("guest", principalName); - } - - public void testGetPrincipalNameUsingSubjectWithoutAuthenticatedPrincipal() - { - Subject subject = new Subject(true, Collections.<Principal>emptySet(), Collections.emptySet(), Collections.emptySet()); - - final String principalName = Subject.doAs(subject, - new PrivilegedAction<String>() - { - public String run() - { - return _logActor.getPrincipalName(); - } - }); - - assertEquals(AbstractManagementActor.UNKNOWN_PRINCIPAL, principalName); - } - - public void testGetPrincipalWithoutSubject() - { - assertEquals(AbstractManagementActor.UNKNOWN_PRINCIPAL, _logActor.getPrincipalName()); - } -} diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseActorTestCase.java b/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseActorTestCase.java index 30c3a51604..4ff46658ef 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseActorTestCase.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseActorTestCase.java @@ -20,17 +20,16 @@ */ package org.apache.qpid.server.logging.actors; -import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.RootMessageLogger; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.logging.UnitTestMessageLogger; import org.apache.qpid.test.utils.QpidTestCase; public class BaseActorTestCase extends QpidTestCase { private boolean _statusUpdatesEnabled = true; - private LogActor _amqpActor; private UnitTestMessageLogger _rawLogger; private RootMessageLogger _rootLogger; @@ -38,10 +37,9 @@ public class BaseActorTestCase extends QpidTestCase public void setUp() throws Exception { super.setUp(); - CurrentActor.removeAll(); - CurrentActor.setDefault(null); _rawLogger = new UnitTestMessageLogger(_statusUpdatesEnabled); _rootLogger = _rawLogger; + SystemLog.setRootMessageLogger(_rootLogger); } @Override @@ -51,40 +49,39 @@ public class BaseActorTestCase extends QpidTestCase { _rawLogger.clearLogMessages(); } - CurrentActor.removeAll(); - CurrentActor.setDefault(null); super.tearDown(); } - public String sendTestLogMessage(LogActor actor) + public String sendTestLogMessage() { String message = "Test logging: " + getName(); - sendTestLogMessage(actor, message); + sendTestLogMessage(message); return message; } - public void sendTestLogMessage(LogActor actor, final String message) + public void sendTestLogMessage(final String message) { - actor.message(new LogSubject() - { - public String toLogString() - { - return message; - } + SystemLog.message(new LogSubject() + { + public String toLogString() + { + return message; + } - }, new LogMessage() - { - public String toString() - { - return message; - } + }, new LogMessage() + { + public String toString() + { + return message; + } - public String getLogHierarchy() - { - return "test.hierarchy"; - } - }); + public String getLogHierarchy() + { + return "test.hierarchy"; + } + } + ); } public boolean isStatusUpdatesEnabled() @@ -97,16 +94,6 @@ public class BaseActorTestCase extends QpidTestCase _statusUpdatesEnabled = statusUpdatesEnabled; } - public LogActor getAmqpActor() - { - return _amqpActor; - } - - public void setAmqpActor(LogActor amqpActor) - { - _amqpActor = amqpActor; - } - public UnitTestMessageLogger getRawLogger() { return _rawLogger; diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java b/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java index 1cb6474e41..797aa477a9 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java @@ -37,7 +37,6 @@ public class BaseConnectionActorTestCase extends BaseActorTestCase BrokerTestHelper.setUp(); _session = BrokerTestHelper.createConnection(); _virtualHost = BrokerTestHelper.createVirtualHost("test"); - setAmqpActor(new AMQPConnectionActor(_session, getRootLogger())); } public VirtualHost getVirtualHost() diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java deleted file mode 100644 index 300fcd70d3..0000000000 --- a/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java +++ /dev/null @@ -1,251 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.logging.actors; - -import org.apache.commons.configuration.ConfigurationException; - -import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.NullRootMessageLogger; -import org.apache.qpid.server.util.BrokerTestHelper; - -/** - * Test : CurrentActorTest - * Summary: - * Validate ThreadLocal operation. - * - * Test creates THREADS number of threads which all then execute the same test - * together ( as close as looping Thread.start() will allow). - * - * Test: - * Test sets the CurrentActor then proceeds to retrieve the value and use it. - * - * The test also validates that it is the same LogActor that this thread set. - * - * Finally the LogActor is removed and tested to make sure that it was - * successfully removed. - * - * By having a higher number of threads than would normally be used in the - * Pooling filter we aim to catch the race condition where a ThreadLocal remove - * is called before one or more threads call get(). This way we can ensure that - * the remove does not affect more than the Thread it was called in. - */ -public class CurrentActorTest extends BaseConnectionActorTestCase -{ - //Set this to be a reasonably large number - private static final int THREADS = 10; - - /** - * Test that CurrentActor behaves as LIFO queue. - * - * Test creates two Actors Connection and Channel and then sets the - * CurrentActor. - * - * The test validates that CurrentActor remembers the Connection actor - * after the Channel actor has been removed. - * - * And then finally validates that removing the Connection actor results - * in there being no actors set. - * - * @throws org.apache.commons.configuration.ConfigurationException - */ - public void testLIFO() throws ConfigurationException - { - assertTrue("Unexpected actor: " + CurrentActor.get(), CurrentActor.get() instanceof TestLogActor); - AMQPConnectionActor connectionActor = new AMQPConnectionActor(getConnection(), - new NullRootMessageLogger()); - - /* - * Push the actor on to the stack: - * - * CurrentActor -> Connection - * Stack -> null - */ - CurrentActor.set(connectionActor); - - //Use the Actor to send a simple message - sendTestLogMessage(CurrentActor.get()); - - // Verify it was the same actor as we set earlier - assertEquals("Retrieved actor is not as expected ", - connectionActor, CurrentActor.get()); - - /** - * Set the actor to now be the Channel actor so testing the ability - * to push the actor on to the stack: - * - * CurrentActor -> Channel - * Stack -> Connection, null - * - */ - - AMQSessionModel channel = BrokerTestHelper.createSession(1, getConnection()); - - AMQPChannelActor channelActor = new AMQPChannelActor(channel, - new NullRootMessageLogger()); - - CurrentActor.set(channelActor); - - //Use the Actor to send a simple message - sendTestLogMessage(CurrentActor.get()); - - // Verify it was the same actor as we set earlier - assertEquals("Retrieved actor is not as expected ", - channelActor, CurrentActor.get()); - - // Remove the ChannelActor from the stack - CurrentActor.remove(); - /* - * Pop the actor on to the stack: - * - * CurrentActor -> Connection - * Stack -> null - */ - - - // Verify we now have the same connection actor as we set earlier - assertEquals("Retrieved actor is not as expected ", - connectionActor, CurrentActor.get()); - - // Verify that removing the our last actor it returns us to the test - // default that the ApplicationRegistry sets. - CurrentActor.remove(); - /* - * Pop the actor on to the stack: - * - * CurrentActor -> null - */ - - - assertEquals("CurrentActor not the Test default", TestLogActor.class ,CurrentActor.get().getClass()); - } - - /** - * Test the setting CurrentActor is done correctly as a ThreadLocal. - * - * The test starts 'THREADS' threads that all set the CurrentActor log - * a message then remove the actor. - * - * Checks are done to ensure that there is no set actor after the remove. - * - * If the ThreadLocal was not working then having concurrent actor sets - * would result in more than one actor and so the remove will not result - * in the clearing of the CurrentActor - * - */ - public void testThreadLocal() - { - - // Setup the threads - LogMessagesWithAConnectionActor[] threads = new LogMessagesWithAConnectionActor[THREADS]; - for (int count = 0; count < THREADS; count++) - { - threads[count] = new LogMessagesWithAConnectionActor(); - } - - //Run the threads - for (int count = 0; count < THREADS; count++) - { - threads[count].start(); - } - - // Wait for them to finish - for (int count = 0; count < THREADS; count++) - { - try - { - threads[count].join(); - } - catch (InterruptedException e) - { - //if we are interrupted then we will exit shortly. - } - } - - // Verify that none of the tests threw an exception - for (int count = 0; count < THREADS; count++) - { - if (threads[count].getException() != null) - { - threads[count].getException().printStackTrace(); - fail("Error occurred in thread:" + count + "("+threads[count].getException()+")"); - } - } - } - - /** - * Creates a new ConnectionActor and logs the given number of messages - * before removing the actor and validating that there is no set actor. - */ - public class LogMessagesWithAConnectionActor extends Thread - { - private Throwable _exception; - - public LogMessagesWithAConnectionActor() - { - } - - public void run() - { - - // Create a new actor using retrieving the rootMessageLogger from - // the default ApplicationRegistry. - //TODO reminder that we need a better approach for broker testing. - try - { - LogActor defaultActor = CurrentActor.get(); - - AMQPConnectionActor actor = new AMQPConnectionActor(getConnection(), - new NullRootMessageLogger()); - - CurrentActor.set(actor); - - //Use the Actor to send a simple message - sendTestLogMessage(CurrentActor.get()); - - // Verify it was the same actor as we set earlier - if(!actor.equals(CurrentActor.get())) - { - throw new IllegalArgumentException("Retrieved actor is not as expected "); - } - - // Verify that removing the actor works for this thread - CurrentActor.remove(); - - if(CurrentActor.get() != defaultActor) - { - throw new IllegalArgumentException("CurrentActor ("+CurrentActor.get()+") should be default actor" + defaultActor); - } - } - catch (Throwable e) - { - _exception = e; - } - - } - - public Throwable getException() - { - return _exception; - } - } - -} diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/HttpManagementActorTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/HttpManagementActorTest.java index 905de4b639..5b98693970 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/HttpManagementActorTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/HttpManagementActorTest.java @@ -22,33 +22,74 @@ package org.apache.qpid.server.logging.actors; import javax.security.auth.Subject; +import org.apache.qpid.server.logging.LogMessage; +import org.apache.qpid.server.logging.SystemLog; +import org.apache.qpid.server.security.auth.ManagementConnectionPrincipal; import org.apache.qpid.server.security.auth.TestPrincipalUtils; +import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.security.PrivilegedAction; +import java.util.Collections; import java.util.List; public class HttpManagementActorTest extends BaseActorTestCase { + public static final LogMessage EMPTY_MESSAGE = new LogMessage() + { + @Override + public String getLogHierarchy() + { + return ""; + } + + public String toString() + { + return ""; + } + }; private static final String IP = "127.0.0.1"; private static final int PORT = 1; - private static final String SUFFIX = "(" + IP + ":" + PORT + ")] "; + private static final String SUFFIX = "(/" + IP + ":" + PORT + ")] "; + private ManagementConnectionPrincipal _connectionPrincipal; @Override public void setUp() throws Exception { super.setUp(); - setAmqpActor(new HttpManagementActor(getRootLogger(), IP, PORT)); + _connectionPrincipal = new ManagementConnectionPrincipal() + { + @Override + public String getType() + { + return "HTTP"; + } + + @Override + public SocketAddress getRemoteAddress() + { + return new InetSocketAddress(IP, PORT); + } + + @Override + public String getName() + { + return getRemoteAddress().toString(); + } + }; } public void testSubjectPrincipalNameAppearance() { Subject subject = TestPrincipalUtils.createTestSubject("guest"); + subject.getPrincipals().add(_connectionPrincipal); + final String message = Subject.doAs(subject, new PrivilegedAction<String>() { public String run() { - return sendTestLogMessage(getAmqpActor()); + return sendTestLogMessage(); } }); @@ -74,18 +115,46 @@ public class HttpManagementActorTest extends BaseActorTestCase private void assertLogMessageWithoutPrincipal() { - String message = getAmqpActor().getLogMessage(); - assertEquals("Unexpected log message", "[mng:" + AbstractManagementActor.UNKNOWN_PRINCIPAL + SUFFIX, message); + getRawLogger().getLogMessages().clear(); + Subject subject = new Subject(false, + Collections.singleton(_connectionPrincipal), + Collections.emptySet(), + Collections.emptySet()); + Subject.doAs(subject, new PrivilegedAction<Object>() + { + @Override + public Object run() + { + SystemLog.message(EMPTY_MESSAGE); + List<Object> logs = getRawLogger().getLogMessages(); + assertEquals("Message log size not as expected.", 1, logs.size()); + + String logMessage = logs.get(0).toString(); + assertEquals("Unexpected log message", + "[mng:" + "N/A" + SUFFIX, + logMessage); + return null; + } + }); } private void assertLogMessageWithPrincipal(String principalName) { + getRawLogger().getLogMessages().clear(); + Subject subject = TestPrincipalUtils.createTestSubject(principalName); + subject.getPrincipals().add(_connectionPrincipal); final String message = Subject.doAs(subject, new PrivilegedAction<String>() { public String run() { - return getAmqpActor().getLogMessage(); + SystemLog.message(EMPTY_MESSAGE); + List<Object> logs = getRawLogger().getLogMessages(); + assertEquals("Message log size not as expected.", 1, logs.size()); + + String logMessage = logs.get(0).toString(); + + return logMessage; } }); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/ManagementActorTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/ManagementActorTest.java deleted file mode 100644 index a0bfa592db..0000000000 --- a/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/ManagementActorTest.java +++ /dev/null @@ -1,186 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.logging.actors; - -import javax.security.auth.Subject; - -import org.apache.qpid.server.security.auth.TestPrincipalUtils; - -import java.security.PrivilegedAction; -import java.util.List; - -public class ManagementActorTest extends BaseActorTestCase -{ - - private static final String IP = "127.0.0.1"; - private static final String CONNECTION_ID = "1"; - private String _threadName; - - @Override - public void setUp() throws Exception - { - super.setUp(); - setAmqpActor(new ManagementActor(getRootLogger())); - - // Set the thread name to be the same as a RMI JMX Connection would use - _threadName = Thread.currentThread().getName(); - Thread.currentThread().setName("RMI TCP Connection(" + CONNECTION_ID + ")-" + IP); - } - - @Override - public void tearDown() throws Exception - { - Thread.currentThread().setName(_threadName); - super.tearDown(); - } - - /** - * Test the AMQPActor logging as a Connection level. - * - * The test sends a message then verifies that it entered the logs. - * - * The log message should be fully replaced (no '{n}' values) and should - * not contain any channel identification. - */ - public void testConnection() - { - final String message = sendTestLogMessage(getAmqpActor()); - - List<Object> logs = getRawLogger().getLogMessages(); - - assertEquals("Message log size not as expected.", 1, logs.size()); - - // Verify that the logged message is present in the output - assertTrue("Message was not found in log message", - logs.get(0).toString().contains(message)); - - // Verify that all the values were presented to the MessageFormatter - // so we will not end up with '{n}' entries in the log. - assertFalse("Verify that the string does not contain any '{'.", - logs.get(0).toString().contains("{")); - - // Verify that the message has the correct type - assertTrue("Message does not contain the [mng: prefix", - logs.get(0).toString().contains("[mng:")); - - // Verify that the logged message does not contains the 'ch:' marker - assertFalse("Message was logged with a channel identifier." + logs.get(0), - logs.get(0).toString().contains("/ch:")); - - // Verify that the message has the right values - assertTrue("Message contains the [mng: prefix", - logs.get(0).toString().contains("[mng:N/A(" + IP + ")")); - } - - /** - * Tests appearance of principal name in log message - */ - public void testSubjectPrincipalNameAppearance() - { - Subject subject = TestPrincipalUtils.createTestSubject("guest"); - - final String message = Subject.doAs(subject, new PrivilegedAction<String>() - { - public String run() - { - return sendTestLogMessage(getAmqpActor()); - } - }); - - // Verify that the log message was created - assertNotNull("Test log message is not created!", message); - - List<Object> logs = getRawLogger().getLogMessages(); - - // Verify that at least one log message was added to log - assertEquals("Message log size not as expected.", 1, logs.size()); - - String logMessage = logs.get(0).toString(); - - // Verify that the logged message is present in the output - assertTrue("Message was not found in log message", logMessage.contains(message)); - - // Verify that the message has the right principal value - assertTrue("Message contains the [mng: prefix", logMessage.contains("[mng:guest(" + IP + ")")); - } - - public void testGetLogMessageWithSubject() - { - assertLogMessageInRMIThreadWithPrincipal("RMI TCP Connection(" + CONNECTION_ID + ")-" + IP, "my_principal"); - } - - public void testGetLogMessageWithoutSubjectButWithActorPrincipal() - { - String principalName = "my_principal"; - setAmqpActor(new ManagementActor(getRootLogger(), principalName)); - String message = getAmqpActor().getLogMessage(); - assertEquals("Unexpected log message", "[mng:" + principalName + "(" + IP + ")] ", message); - } - - /** It's necessary to test successive calls because ManagementActor caches its log message based on thread and principal name */ - public void testGetLogMessageCaching() - { - String originalThreadName = "RMI TCP Connection(1)-" + IP; - assertLogMessageInRMIThreadWithoutPrincipal(originalThreadName); - assertLogMessageInRMIThreadWithPrincipal(originalThreadName, "my_principal"); - assertLogMessageInRMIThreadWithPrincipal("RMI TCP Connection(2)-" + IP, "my_principal"); - } - - public void testGetLogMessageAfterRemovingSubject() - { - assertLogMessageInRMIThreadWithPrincipal("RMI TCP Connection(1)-" + IP, "my_principal"); - - Thread.currentThread().setName("RMI TCP Connection(2)-" + IP ); - String message = getAmqpActor().getLogMessage(); - assertEquals("Unexpected log message", "[mng:N/A(" + IP + ")] ", message); - - assertLogMessageWithoutPrincipal("TEST"); - } - - private void assertLogMessageInRMIThreadWithoutPrincipal(String threadName) - { - Thread.currentThread().setName(threadName ); - String message = getAmqpActor().getLogMessage(); - assertEquals("Unexpected log message", "[mng:N/A(" + IP + ")] ", message); - } - - private void assertLogMessageWithoutPrincipal(String threadName) - { - Thread.currentThread().setName(threadName ); - String message = getAmqpActor().getLogMessage(); - assertEquals("Unexpected log message", "[" + threadName +"] ", message); - } - - private void assertLogMessageInRMIThreadWithPrincipal(String threadName, String principalName) - { - Thread.currentThread().setName(threadName); - Subject subject = TestPrincipalUtils.createTestSubject(principalName); - final String message = Subject.doAs(subject, new PrivilegedAction<String>() - { - public String run() - { - return getAmqpActor().getLogMessage(); - } - }); - - assertEquals("Unexpected log message", "[mng:" + principalName + "(" + IP + ")] ", message); - } -} diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/QueueActorTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/QueueActorTest.java deleted file mode 100644 index b371bf6e2f..0000000000 --- a/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/QueueActorTest.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.logging.actors; - -import java.util.List; - -import org.apache.qpid.server.util.BrokerTestHelper; - -public class QueueActorTest extends BaseConnectionActorTestCase -{ - - @Override - public void setUp() throws Exception - { - super.setUp(); - setAmqpActor(new QueueActor(BrokerTestHelper.createQueue(getName(), getVirtualHost()), getRootLogger())); - } - - /** - * Test the QueueActor as a logger. - * - * The test logs a message then verifies that it entered the logs correctly - * - * The log message should be fully replaced (no '{n}' values) and should - * contain the correct queue identification. - */ - public void testQueueActor() - { - final String message = sendTestLogMessage(getAmqpActor()); - - List<Object> logs = getRawLogger().getLogMessages(); - - assertEquals("Message log size not as expected.", 1, logs.size()); - - String log = logs.get(0).toString(); - - // Verify that the logged message is present in the output - assertTrue("Message was not found in log message", - log.contains(message)); - - // Verify that all the values were presented to the MessageFormatter - // so we will not end up with '{n}' entries in the log. - assertFalse("Verify that the string does not contain any '{':" + log, - log.contains("{")); - - // Verify that the message has the correct type - assertTrue("Message contains the [vh: prefix:" + log, - log.contains("[vh(")); - - // Verify that the logged message contains the 'qu(' marker - String expected = "qu(" + getName() + ")"; - assertTrue("Message was not logged with a queue identifier '"+expected+"' actual:" + log, - log.contains(expected)); - } - -} - diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/AbstractTestMessages.java b/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/AbstractTestMessages.java index 229d75c69f..5eb4d2f1b0 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/AbstractTestMessages.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/AbstractTestMessages.java @@ -23,11 +23,10 @@ package org.apache.qpid.server.logging.messages; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.PropertiesConfiguration; -import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.logging.UnitTestMessageLogger; -import org.apache.qpid.server.logging.actors.TestLogActor; import org.apache.qpid.server.logging.subjects.TestBlankSubject; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.test.utils.QpidTestCase; @@ -38,7 +37,6 @@ public abstract class AbstractTestMessages extends QpidTestCase { protected Configuration _config = new PropertiesConfiguration(); protected LogMessage _logMessage = null; - protected LogActor _actor; protected UnitTestMessageLogger _logger; protected LogSubject _logSubject = new TestBlankSubject(); @@ -47,10 +45,9 @@ public abstract class AbstractTestMessages extends QpidTestCase { super.setUp(); - _logger = new UnitTestMessageLogger(); - - _actor = new TestLogActor(_logger); BrokerTestHelper.setUp(); + _logger = new UnitTestMessageLogger(); + SystemLog.setRootMessageLogger(_logger); } @Override @@ -60,14 +57,24 @@ public abstract class AbstractTestMessages extends QpidTestCase super.tearDown(); } + protected List<Object> getLog() + { + return _logger.getLogMessages(); + } + + protected void clearLog() + { + _logger.clearLogMessages(); + } + + protected List<Object> performLog() { if (_logMessage == null) { throw new NullPointerException("LogMessage has not been set"); } - - _actor.message(_logSubject, _logMessage); + SystemLog.message(_logSubject, _logMessage); return _logger.getLogMessages(); } @@ -77,6 +84,16 @@ public abstract class AbstractTestMessages extends QpidTestCase validateLogMessage(logs, tag, false, expected); } + protected void validateLogMessageNoSubject(List<Object> logs, String tag, String[] expected) + { + validateLogMessage(logs, tag, null, false, expected); + } + + protected void validateLogMessageNoSubject(List<Object> logs, String tag, boolean useStringForNull, String[] expected) + { + validateLogMessage(logs, tag, null, useStringForNull, expected); + } + /** * Validate that only a single log message occurred and that the message * section starts with the specified tag @@ -88,6 +105,11 @@ public abstract class AbstractTestMessages extends QpidTestCase */ protected void validateLogMessage(List<Object> logs, String tag, boolean useStringForNull, String[] expected) { + validateLogMessage(logs, tag, _logSubject, useStringForNull, expected); + } + + protected void validateLogMessage(List<Object> logs, String tag, LogSubject subject, boolean useStringForNull, String[] expected) + { assertEquals("Log has incorrect message count", 1, logs.size()); //We trim() here as we don't care about extra white space at the end of the log message @@ -98,11 +120,22 @@ public abstract class AbstractTestMessages extends QpidTestCase // Simple switch to print out all the logged messages //System.err.println(log); - int msgIndex = log.indexOf(_logSubject.toLogString())+_logSubject.toLogString().length(); + int msgIndex; + String message; + if(subject != null) + { + final String subjectString = subject.toLogString(); + msgIndex = log.indexOf(subjectString)+ subjectString.length(); - assertTrue("Unable to locate Subject:" + log, msgIndex != -1); + assertTrue("Unable to locate Subject:" + log, msgIndex != -1); - String message = log.substring(msgIndex); + message = log.substring(msgIndex); + } + else + { + msgIndex = log.indexOf(tag); + message = log.substring(msgIndex); + } assertTrue("Message does not start with tag:" + tag + ":" + message, message.startsWith(tag)); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/ExchangeMessagesTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/ExchangeMessagesTest.java index d970a1f732..f3982f9be3 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/ExchangeMessagesTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/ExchangeMessagesTest.java @@ -32,32 +32,32 @@ public class ExchangeMessagesTest extends AbstractTestMessages { public void testExchangeCreated_Transient() throws Exception { - ExchangeImpl exchange = BrokerTestHelper.createExchange("test"); + ExchangeImpl exchange = BrokerTestHelper.createExchange("test", false); String type = exchange.getTypeName(); String name = exchange.getName(); _logMessage = ExchangeMessages.CREATED(type, name, false); - List<Object> log = performLog(); + List<Object> log = getLog(); String[] expected = {"Create :", "Type:", type, "Name:", name}; - validateLogMessage(log, "EXH-1001", expected); + validateLogMessageNoSubject(log, "EXH-1001", expected); } public void testExchangeCreated_Persistent() throws Exception { - ExchangeImpl exchange = BrokerTestHelper.createExchange("test"); + ExchangeImpl exchange = BrokerTestHelper.createExchange("test", true); String type = exchange.getTypeName(); String name = exchange.getName(); _logMessage = ExchangeMessages.CREATED(type, name, true); - List<Object> log = performLog(); + List<Object> log = getLog(); String[] expected = {"Create :", "Durable", "Type:", type, "Name:", name}; - validateLogMessage(log, "EXH-1001", expected); + validateLogMessageNoSubject(log, "EXH-1001", expected); } public void testExchangeDeleted() @@ -72,11 +72,11 @@ public class ExchangeMessagesTest extends AbstractTestMessages public void testExchangeDiscardedMessage() throws Exception { - ExchangeImpl exchange = BrokerTestHelper.createExchange("test"); + ExchangeImpl exchange = BrokerTestHelper.createExchange("test", false); final String name = exchange.getName(); final String routingKey = "routingKey"; - + clearLog(); _logMessage = ExchangeMessages.DISCARDMSG(name, routingKey); List<Object> log = performLog(); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java b/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java index e7366dfe65..fd976e252e 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java @@ -22,12 +22,10 @@ package org.apache.qpid.server.logging.subjects; import org.apache.qpid.server.exchange.ExchangeImpl; -import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.logging.UnitTestMessageLogger; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.TestLogActor; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -61,14 +59,8 @@ public abstract class AbstractTestLogSubject extends QpidTestCase public void tearDown() throws Exception { BrokerTestHelper.tearDown(); - try - { - CurrentActor.removeAll(); - } - finally - { - super.tearDown(); - } + super.tearDown(); + } protected List<Object> performLog(boolean statusUpdatesEnabled) @@ -79,10 +71,9 @@ public abstract class AbstractTestLogSubject extends QpidTestCase } UnitTestMessageLogger logger = new UnitTestMessageLogger(statusUpdatesEnabled); + SystemLog.setRootMessageLogger(logger); - LogActor actor = new TestLogActor(logger); - - actor.message(_subject, new LogMessage() + SystemLog.message(_subject, new LogMessage() { public String toString() { diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java index 46a5ab9a3c..b8ecc4a2c0 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java @@ -28,15 +28,10 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; -import junit.framework.TestCase; - import org.apache.qpid.server.configuration.ConfigurationEntry; import org.apache.qpid.server.configuration.RecovererProvider; import org.apache.qpid.server.configuration.startup.VirtualHostRecoverer; import org.apache.qpid.server.configuration.updater.TaskExecutor; -import org.apache.qpid.server.logging.SystemOutMessageLogger; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.TestLogActor; import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.store.TestMemoryMessageStore; import org.apache.qpid.server.util.BrokerTestHelper; @@ -54,7 +49,6 @@ public class VirtualHostTest extends QpidTestCase protected void setUp() throws Exception { super.setUp(); - CurrentActor.set(new TestLogActor(new SystemOutMessageLogger())); _broker = BrokerTestHelper.createBrokerMock(); TaskExecutor taskExecutor = mock(TaskExecutor.class); @@ -65,13 +59,6 @@ public class VirtualHostTest extends QpidTestCase _statisticsGatherer = mock(StatisticsGatherer.class); } - @Override - protected void tearDown() throws Exception - { - super.tearDown(); - CurrentActor.remove(); - } - public void testInitialisingState() { VirtualHost host = createHost(); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java index 64bcf8f730..4302e4a83c 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java @@ -37,9 +37,6 @@ import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.DefaultExchangeFactory; import org.apache.qpid.server.exchange.ExchangeImpl; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.RootMessageLogger; -import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.plugin.ExchangeType; @@ -74,10 +71,6 @@ public class AMQQueueFactoryTest extends QpidTestCase when(_virtualHost.getConfiguration()).thenReturn(vhostConfig); _queueConfiguration = mock(QueueConfiguration.class); when(vhostConfig.getQueueConfiguration(anyString())).thenReturn(_queueConfiguration); - LogActor logActor = mock(LogActor.class); - CurrentActor.set(logActor); - RootMessageLogger rootLogger = mock(RootMessageLogger.class); - when(logActor.getRootMessageLogger()).thenReturn(rootLogger); DurableConfigurationStore store = mock(DurableConfigurationStore.class); when(_virtualHost.getDurableConfigurationStore()).thenReturn(store); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java index 0657c8cdad..d5639c1de8 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java @@ -20,9 +20,6 @@ package org.apache.qpid.server.queue; import junit.framework.TestCase; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.RootMessageLogger; -import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; @@ -64,15 +61,6 @@ public abstract class QueueEntryImplTestBase extends TestCase _queueEntry3 = getQueueEntryImpl(3); } - - protected void mockLogging() - { - final LogActor logActor = mock(LogActor.class); - when(logActor.getRootMessageLogger()).thenReturn(mock(RootMessageLogger.class)); - CurrentActor.setDefault(logActor); - } - - public void testAcquire() { assertTrue("Queue entry should be in AVAILABLE state before invoking of acquire method", diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java index 9039654c15..29413dd1a7 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java @@ -41,7 +41,6 @@ public class SimpleQueueEntryImplTest extends QueueEntryImplTestBase @Override public void setUp() throws Exception { - mockLogging(); Map<String,Object> queueAttributes = new HashMap<String, Object>(); queueAttributes.put(Queue.ID, UUID.randomUUID()); queueAttributes.put(Queue.NAME, "SimpleQueueEntryImplTest"); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java index ef64226048..792e656a97 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java @@ -20,15 +20,12 @@ package org.apache.qpid.server.queue; import java.util.Collections; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.RootMessageLogger; -import org.apache.qpid.server.logging.actors.CurrentActor; + import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.security.*; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Arrays; @@ -77,8 +74,6 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase @Override protected void setUp() throws Exception { - mockLogging(); - Map<String,Object> attributes = new HashMap<String,Object>(); attributes.put(Queue.ID,UUID.randomUUID()); attributes.put(Queue.NAME, getName()); @@ -117,14 +112,6 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase } - protected void mockLogging() - { - final LogActor logActor = mock(LogActor.class); - when(logActor.getRootMessageLogger()).thenReturn(mock(RootMessageLogger.class)); - CurrentActor.setDefault(logActor); - } - - @Override public SortedQueueEntryList getTestList() { diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java index c1d5a9e9fe..e45a7667cd 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java @@ -46,8 +46,6 @@ public class SortedQueueEntryTest extends QueueEntryImplTestBase @Override public void setUp() throws Exception { - mockLogging(); - Map<String,Object> attributes = new HashMap<String,Object>(); attributes.put(Queue.ID,UUID.randomUUID()); attributes.put(Queue.NAME, getName()); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java index c03727087a..e3e1a3f659 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java @@ -20,13 +20,9 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.RootMessageLogger; -import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.security.*; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -57,9 +53,6 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase queueAttributes.put(Queue.NAME, getName()); final VirtualHost virtualHost = mock(VirtualHost.class); when(virtualHost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class)); - final LogActor logActor = mock(LogActor.class); - when(logActor.getRootMessageLogger()).thenReturn(mock(RootMessageLogger.class)); - CurrentActor.set(logActor); _testQueue = new StandardQueue(virtualHost, queueAttributes); _sqel = (StandardQueueEntryList) _testQueue.getEntries(); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java index 613dd07741..0ad56f3061 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java @@ -40,8 +40,6 @@ import org.apache.commons.configuration.Configuration; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.server.binding.BindingImpl; import org.apache.qpid.server.exchange.ExchangeImpl; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.model.ExclusivityPolicy; import org.apache.qpid.server.model.LifetimePolicy; @@ -95,7 +93,6 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest _queueId = UUIDGenerator.generateRandomUUID(); _exchangeId = UUIDGenerator.generateRandomUUID(); - CurrentActor.set(mock(LogActor.class)); _storeName = getName(); _storePath = TMP_FOLDER + File.separator + _storeName; FileUtils.delete(new File(_storePath), true); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/store/OperationalLoggingListenerTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/store/OperationalLoggingListenerTest.java index c309dad5eb..20a718bd23 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/store/OperationalLoggingListenerTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/store/OperationalLoggingListenerTest.java @@ -19,11 +19,10 @@ package org.apache.qpid.server.store; import java.util.ArrayList; import java.util.List; import junit.framework.TestCase; -import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.RootMessageLogger; -import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.logging.messages.ConfigStoreMessages; import org.apache.qpid.server.logging.messages.MessageStoreMessages; import org.apache.qpid.server.logging.messages.TransactionLogMessages; @@ -65,7 +64,7 @@ public class OperationalLoggingListenerTest extends TestCase { final List<LogMessage> messages = new ArrayList<LogMessage>(); - CurrentActor.set(new TestActor(messages)); + SystemLog.setRootMessageLogger(new TestRootLogger(messages)); if(setStoreLocation) { @@ -103,14 +102,6 @@ public class OperationalLoggingListenerTest extends TestCase assertEquals(messages.remove(0).toString(), MessageStoreMessages.CLOSED().toString()); } - @Override - protected void tearDown() throws Exception - { - super.tearDown(); - CurrentActor.remove(); - } - - private static final LogSubject LOG_SUBJECT = new LogSubject() { public String toLogString() @@ -155,11 +146,11 @@ public class OperationalLoggingListenerTest extends TestCase } } - private static class TestActor implements LogActor + private static class TestRootLogger implements RootMessageLogger { private final List<LogMessage> _messages; - public TestActor(List<LogMessage> messages) + private TestRootLogger(final List<LogMessage> messages) { _messages = messages; } @@ -169,19 +160,23 @@ public class OperationalLoggingListenerTest extends TestCase _messages.add(message); } - public void message(LogMessage message) + @Override + public boolean isEnabled() { - _messages.add(message); + return true; } - public RootMessageLogger getRootMessageLogger() + @Override + public boolean isMessageEnabled(final String logHierarchy) { - return null; + return true; } - public String getLogMessage() + public void message(LogMessage message) { - return null; + _messages.add(message); } + } + } diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java index ea8b492552..f93036142c 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java @@ -38,11 +38,6 @@ import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.configuration.store.JsonConfigurationEntryStore; import org.apache.qpid.server.exchange.DefaultExchangeFactory; -import org.apache.qpid.server.logging.RootMessageLogger; -import org.apache.qpid.server.logging.SystemOutMessageLogger; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.GenericActor; -import org.apache.qpid.server.logging.actors.TestLogActor; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.AMQQueue; @@ -72,22 +67,17 @@ public class BrokerTestHelper when(broker.getAttribute(Broker.VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD)).thenReturn(10000l); when(broker.getId()).thenReturn(UUID.randomUUID()); when(broker.getSubjectCreator(any(SocketAddress.class))).thenReturn(subjectCreator); - RootMessageLogger rootMessageLogger = CurrentActor.get().getRootMessageLogger(); - when(broker.getRootMessageLogger()).thenReturn(rootMessageLogger); when(broker.getVirtualHostRegistry()).thenReturn(new VirtualHostRegistry()); when(broker.getSecurityManager()).thenReturn(new SecurityManager(mock(Broker.class), false)); - GenericActor.setDefaultMessageLogger(rootMessageLogger); return broker; } public static void setUp() { - CurrentActor.set(new TestLogActor(new SystemOutMessageLogger())); } public static void tearDown() { - CurrentActor.remove(); } public static VirtualHost createVirtualHost(VirtualHostConfiguration virtualHostConfiguration, VirtualHostRegistry virtualHostRegistry) @@ -170,7 +160,7 @@ public class BrokerTestHelper return connection; } - public static ExchangeImpl createExchange(String hostName) throws Exception + public static ExchangeImpl createExchange(String hostName, final boolean durable) throws Exception { SecurityManager securityManager = new SecurityManager(mock(Broker.class), false); VirtualHost virtualHost = mock(VirtualHost.class); @@ -181,7 +171,7 @@ public class BrokerTestHelper attributes.put(org.apache.qpid.server.model.Exchange.ID, UUIDGenerator.generateExchangeUUID("amp.direct", virtualHost.getName())); attributes.put(org.apache.qpid.server.model.Exchange.NAME, "amq.direct"); attributes.put(org.apache.qpid.server.model.Exchange.TYPE, "direct"); - attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, false); + attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, durable); return factory.createExchange(attributes); } diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java index 1eaccc4e5f..5455f03ebd 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java @@ -36,8 +36,6 @@ import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.exchange.HeadersExchange; import org.apache.qpid.server.exchange.TopicExchange; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.model.Binding; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.plugin.ExchangeType; @@ -185,7 +183,6 @@ public class DurableConfigurationRecovererTest extends QpidTestCase _store = mock(DurableConfigurationStore.class); - CurrentActor.set(mock(LogActor.class)); } public void testUpgradeEmptyStore() throws Exception diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/HouseKeepingTaskTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/HouseKeepingTaskTest.java index 8b4a52bb79..7d5a24f1aa 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/HouseKeepingTaskTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/HouseKeepingTaskTest.java @@ -20,56 +20,13 @@ */ package org.apache.qpid.server.virtualhost; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.NullRootMessageLogger; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.TestLogActor; import org.apache.qpid.test.utils.QpidTestCase; -import java.util.concurrent.CountDownLatch; - public class HouseKeepingTaskTest extends QpidTestCase { - /** - * Tests that the abstract HouseKeepingTask properly cleans up any LogActor - * it adds to the CurrentActor stack by verifying the CurrentActor set - * before task execution is the CurrentActor after execution. - */ - public void testCurrentActorStackBalance() throws Exception - { - //create and set a test actor - LogActor testActor = new TestLogActor(new NullRootMessageLogger()); - CurrentActor.set(testActor); - - //verify it is returned correctly before executing a HouseKeepingTask - assertEquals("Expected LogActor was not returned", testActor, CurrentActor.get()); - - final CountDownLatch latch = new CountDownLatch(1); - HouseKeepingTask testTask = new HouseKeepingTask(new MockVirtualHost("HouseKeepingTaskTestVhost")) - { - @Override - public void execute() - { - latch.countDown(); - } - }; - - //run the test HouseKeepingTask using the current Thread to influence its CurrentActor stack - testTask.run(); - - assertEquals("The expected LogActor was not returned, the CurrentActor stack has become unbalanced", - testActor, CurrentActor.get()); - assertEquals("HouseKeepingTask execute() method was not run", 0, latch.getCount()); - - //clean up the test actor - CurrentActor.remove(); - } public void testThreadNameIsSetForDurationOfTask() throws Exception { - //create and set a test actor - LogActor testActor = new TestLogActor(new NullRootMessageLogger()); - CurrentActor.set(testActor); String originalThreadName = Thread.currentThread().getName(); @@ -84,8 +41,6 @@ public class HouseKeepingTaskTest extends QpidTestCase assertEquals("Thread name should have been set during execution", expectedThreadNameDuringExecution, testTask.getThreadNameDuringExecution()); assertEquals("Thread name should have been reverted after task has run", originalThreadName, Thread.currentThread().getName()); - //clean up the test actor - CurrentActor.remove(); } diff --git a/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/RuleSet.java b/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/RuleSet.java index fcb5bcbf70..58f3c71c7c 100644 --- a/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/RuleSet.java +++ b/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/RuleSet.java @@ -38,7 +38,7 @@ import javax.security.auth.Subject; import org.apache.commons.lang.BooleanUtils; import org.apache.commons.lang.StringUtils; import org.apache.log4j.Logger; -import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.logging.messages.AccessControlMessages; import org.apache.qpid.server.security.Result; import org.apache.qpid.server.security.access.ObjectProperties; @@ -321,14 +321,14 @@ public class RuleSet switch (permission) { case ALLOW_LOG: - CurrentActor.get().message(AccessControlMessages.ALLOWED( + SystemLog.message(AccessControlMessages.ALLOWED( action.getOperation().toString(), action.getObjectType().toString(), action.getProperties().toString())); case ALLOW: return Result.ALLOWED; case DENY_LOG: - CurrentActor.get().message(AccessControlMessages.DENIED( + SystemLog.message(AccessControlMessages.DENIED( action.getOperation().toString(), action.getObjectType().toString(), action.getProperties().toString())); diff --git a/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControlTest.java b/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControlTest.java index e907a88001..f5f1866c3a 100644 --- a/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControlTest.java +++ b/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControlTest.java @@ -33,9 +33,8 @@ import junit.framework.TestCase; import org.apache.commons.configuration.ConfigurationException; import org.apache.qpid.server.connection.ConnectionPrincipal; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.logging.UnitTestMessageLogger; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.TestLogActor; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.security.Result; import org.apache.qpid.server.security.access.ObjectProperties; @@ -67,7 +66,7 @@ public class DefaultAccessControlTest extends TestCase private void configureAccessControl(final RuleSet rs) throws ConfigurationException { _plugin = new DefaultAccessControl(rs); - CurrentActor.set(new TestLogActor(messageLogger)); + SystemLog.setRootMessageLogger(messageLogger); } private RuleSet createGroupRuleSet() diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java index 120ba2d951..d41af30a09 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java @@ -22,8 +22,7 @@ package org.apache.qpid.server.protocol.v0_10; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.flow.FlowCreditManager; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; @@ -387,7 +386,6 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC protected void sendToDLQOrDiscard(MessageInstance entry) { - final LogActor logActor = CurrentActor.get(); final ServerMessage msg = entry.getMessage(); int requeues = entry.routeToAlternate(new Action<MessageInstance>() @@ -395,7 +393,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC @Override public void performAction(final MessageInstance requeueEntry) { - logActor.message( ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), + SystemLog.message(ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), requeueEntry.getOwningResource().getName())); } }, null); @@ -410,12 +408,12 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC if(alternateExchange != null) { - logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), + SystemLog.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), alternateExchange.getName())); } else { - logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), + SystemLog.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getInitialRoutingAddress())); } diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java index 9fe1babe20..9346e3e7bb 100755 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.protocol.v0_10; import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; @@ -30,8 +31,11 @@ import org.apache.qpid.transport.network.Disassembler; import org.apache.qpid.transport.network.InputHandler; import org.apache.qpid.transport.network.NetworkConnection; +import javax.security.auth.Subject; import java.net.SocketAddress; import java.nio.ByteBuffer; +import java.security.AccessController; +import java.security.PrivilegedAction; public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocolEngine @@ -63,15 +67,31 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol } } - public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender) + public void setNetworkConnection(final NetworkConnection network, final Sender<ByteBuffer> sender) { - _network = network; + if(!getSubject().equals(Subject.getSubject(AccessController.getContext()))) + { + Subject.doAs(getSubject(), new PrivilegedAction<Object>() + { + @Override + public Object run() + { + setNetworkConnection(network,sender); + return null; + } + }); + } + else + { + SystemLog.message(ConnectionMessages.OPEN(null, null, null, null, false, false, false, false)); + _network = network; + + _connection.setNetworkConnection(network); + _connection.setSender(new Disassembler(wrapSender(sender), MAX_FRAME_SIZE)); + // FIXME Two log messages to maintain compatibility with earlier protocol versions + SystemLog.message(ConnectionMessages.OPEN(null, "0-10", null, null, false, true, false, false)); - _connection.setNetworkConnection(network); - _connection.setSender(new Disassembler(wrapSender(sender), MAX_FRAME_SIZE)); - // FIXME Two log messages to maintain compatibility with earlier protocol versions - _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, null, null, false, false, false, false)); - _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", null, null, false, true, false, false)); + } } private Sender<ByteBuffer> wrapSender(final Sender<ByteBuffer> sender) @@ -155,8 +175,17 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol public void readerIdle() { - _connection.getLogActor().message(ConnectionMessages.IDLE_CLOSE()); - _network.close(); + Subject.doAs(_connection.getAuthorizedSubject(), new PrivilegedAction<Object>() + { + @Override + public Object run() + { + SystemLog.message(ConnectionMessages.IDLE_CLOSE()); + _network.close(); + return null; + } + }); + } public String getAddress() @@ -189,4 +218,10 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol { return _connection.getConnectionId(); } + + @Override + public Subject getSubject() + { + return _connection.getAuthorizedSubject(); + } } diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index 5bfc398bcf..5096223889 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -32,10 +32,8 @@ import java.util.concurrent.atomic.AtomicLong; import javax.security.auth.Subject; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.connection.ConnectionPrincipal; -import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.actors.AMQPConnectionActor; -import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Port; @@ -65,7 +63,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S { private Runnable _onOpenTask; private AtomicBoolean _logClosed = new AtomicBoolean(false); - private LogActor _actor; private final Subject _authorizedSubject = new Subject(); private Principal _authorizedPrincipal = null; @@ -87,7 +84,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S { _connectionId = connectionId; _authorizedSubject.getPrincipals().add(new ConnectionPrincipal(this)); - _actor = new AMQPConnectionActor(this, broker.getRootMessageLogger()); } public Object getReference() @@ -112,7 +108,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S { _onOpenTask.run(); } - _actor.message(ConnectionMessages.OPEN(getClientId(), "0-10", getClientVersion(), getClientProduct(), true, true, true, true)); + SystemLog.message(ConnectionMessages.OPEN(getClientId(), "0-10", getClientVersion(), getClientProduct(), true, true, true, true)); getVirtualHost().getConnectionRegistry().registerConnection(this); } @@ -135,7 +131,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S { if(_logClosed.compareAndSet(false, true)) { - CurrentActor.get().message(this, ConnectionMessages.CLOSE()); + SystemLog.message(this, ConnectionMessages.CLOSE()); } } @@ -258,42 +254,31 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S Subject subject; if (event.isConnectionControl()) { - CurrentActor.set(_actor); subject = _authorizedSubject; } else { ServerSession channel = (ServerSession) getSession(event.getChannel()); - LogActor channelActor = null; if (channel != null) { subject = channel.getAuthorizedSubject(); - channelActor = channel.getLogActor(); } else { subject = _authorizedSubject; } - - CurrentActor.set(channelActor == null ? _actor : channelActor); } - try + Subject.doAs(subject, new PrivilegedAction<Void>() { - Subject.doAs(subject, new PrivilegedAction<Void>() + @Override + public Void run() { - @Override - public Void run() - { - ServerConnection.super.received(event); - return null; - } - }); - } - finally - { - CurrentActor.remove(); - } + ServerConnection.super.received(event); + return null; + } + }); + } public String toLogString() @@ -331,11 +316,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S } } - public LogActor getLogActor() - { - return _actor; - } - public void close(AMQConstant cause, String message) { closeSubscriptions(); diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index 5627b2eabe..453b3f85a5 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -20,7 +20,9 @@ */ package org.apache.qpid.server.protocol.v0_10; +import java.security.AccessController; import java.security.Principal; +import java.security.PrivilegedAction; import java.text.MessageFormat; import java.util.Collection; import java.util.Collections; @@ -43,15 +45,13 @@ import java.util.concurrent.atomic.AtomicReference; import javax.security.auth.Subject; import org.apache.qpid.server.connection.SessionPrincipal; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.TransactionTimeoutHelper; import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction; -import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.subjects.ChannelLogSubject; import org.apache.qpid.server.message.InstanceProperties; @@ -102,7 +102,6 @@ public class ServerSession extends Session private final UUID _id = UUID.randomUUID(); private final Subject _subject = new Subject(); private long _createTime = System.currentTimeMillis(); - private LogActor _actor = GenericActor.getInstance(this); private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>()); @@ -161,18 +160,44 @@ public class ServerSession extends Session }); } - protected void setState(State state) + protected void setState(final State state) { - super.setState(state); - - if (state == State.OPEN) + if(runningAsSubject()) { - _actor.message(ChannelMessages.CREATE()); - if(_blocking.get()) + super.setState(state); + + if (state == State.OPEN) { - invokeBlock(); + SystemLog.message(ChannelMessages.CREATE()); + if(_blocking.get()) + { + invokeBlock(); + } } } + else + { + runAsSubject(new PrivilegedAction<Void>() { + + @Override + public Void run() + { + setState(state); + return null; + } + }); + + } + } + + private <T> T runAsSubject(final PrivilegedAction<T> privilegedAction) + { + return Subject.doAs(getAuthorizedSubject(), privilegedAction); + } + + private boolean runningAsSubject() + { + return getAuthorizedSubject().equals(Subject.getSubject(AccessController.getContext())); } private void invokeBlock() @@ -394,7 +419,7 @@ public class ServerSession extends Session { operationalLoggingMessage = ChannelMessages.CLOSE(); } - CurrentActor.get().message(getLogSubject(), operationalLoggingMessage); + SystemLog.message(getLogSubject(), operationalLoggingMessage); } @Override @@ -678,14 +703,10 @@ public class ServerSession extends Session return (ServerConnection) super.getConnection(); } - public LogActor getLogActor() - { - return _actor; - } public LogSubject getLogSubject() { - return (LogSubject) this; + return this; } public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) @@ -704,7 +725,7 @@ public class ServerSession extends Session } - private void block(Object queue, String name) + private void block(final Object queue, final String name) { synchronized (_blockingEntities) { @@ -717,7 +738,7 @@ public class ServerSession extends Session { invokeBlock(); } - _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED(name)); + SystemLog.message(_logSubject, ChannelMessages.FLOW_ENFORCED(name)); } @@ -735,25 +756,22 @@ public class ServerSession extends Session unblock(this); } - private void unblock(Object queue) + private void unblock(final Object queue) { - synchronized(_blockingEntities) + if(_blockingEntities.remove(queue) && _blockingEntities.isEmpty()) { - if(_blockingEntities.remove(queue) && _blockingEntities.isEmpty()) + if(_blocking.compareAndSet(true,false) && !isClosing()) { - if(_blocking.compareAndSet(true,false) && !isClosing()) - { - _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED()); - MessageFlow mf = new MessageFlow(); - mf.setUnit(MessageCreditUnit.MESSAGE); - mf.setDestination(""); - _outstandingCredit.set(Integer.MAX_VALUE); - mf.setValue(Integer.MAX_VALUE); - invoke(mf); + SystemLog.message(_logSubject, ChannelMessages.FLOW_REMOVED()); + MessageFlow mf = new MessageFlow(); + mf.setUnit(MessageCreditUnit.MESSAGE); + mf.setDestination(""); + _outstandingCredit.set(Integer.MAX_VALUE); + mf.setValue(Integer.MAX_VALUE); + invoke(mf); - } } } } diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 9d7764414f..d21da824e9 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -29,6 +29,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.exchange.ExchangeImpl; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.model.ExclusivityPolicy; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.store.StoreException; @@ -345,7 +346,7 @@ public class ServerSessionDelegate extends SessionDelegate } else { - serverSession.getLogActor().message(ExchangeMessages.DISCARDMSG(exchange.getName(), messageMetaData.getRoutingKey())); + SystemLog.message(ExchangeMessages.DISCARDMSG(exchange.getName(), messageMetaData.getRoutingKey())); } } diff --git a/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java b/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java index 421adb33a8..858482a034 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java @@ -19,8 +19,6 @@ package org.apache.qpid.server.protocol.v0_10; import org.apache.qpid.server.logging.RootMessageLogger; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -41,7 +39,6 @@ public class ServerSessionTest extends QpidTestCase super.setUp(); BrokerTestHelper.setUp(); _virtualHost = BrokerTestHelper.createVirtualHost(getName()); - GenericActor.setDefaultMessageLogger(CurrentActor.get().getRootMessageLogger()); } @Override diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 99068a9d6c..e248fc539a 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v0_8; import java.nio.ByteBuffer; import java.security.AccessControlException; +import java.security.PrivilegedAction; import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; @@ -54,11 +55,9 @@ import org.apache.qpid.server.filter.FilterManagerFactory; import org.apache.qpid.server.filter.SimpleFilterManager; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.flow.Pre0_10CreditManager; -import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.actors.AMQPChannelActor; -import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.messages.ExchangeMessages; import org.apache.qpid.server.logging.subjects.ChannelLogSubject; @@ -154,7 +153,6 @@ public class AMQChannel<T extends AMQProtocolSession<T>> private final AtomicBoolean _blocking = new AtomicBoolean(false); - private LogActor _actor; private LogSubject _logSubject; private volatile boolean _rollingBack; @@ -178,19 +176,17 @@ public class AMQChannel<T extends AMQProtocolSession<T>> private Subject _subject; - public AMQChannel(T session, int channelId, MessageStore messageStore) + public AMQChannel(T session, int channelId, final MessageStore messageStore) throws AMQException { _session = session; _channelId = channelId; - _actor = new AMQPChannelActor(this, session.getLogActor().getRootMessageLogger()); _subject = new Subject(false, session.getAuthorizedSubject().getPrincipals(), session.getAuthorizedSubject().getPublicCredentials(), session.getAuthorizedSubject().getPrivateCredentials()); _subject.getPrincipals().add(new SessionPrincipal(this)); _logSubject = new ChannelLogSubject(this); - _actor.message(ChannelMessages.CREATE()); _messageStore = messageStore; @@ -214,6 +210,18 @@ public class AMQChannel<T extends AMQProtocolSession<T>> } } }); + + Subject.doAs(_subject, new PrivilegedAction<Object>() + { + @Override + public Object run() + { + SystemLog.message(ChannelMessages.CREATE()); + + return null; + } + }); + } /** Sets this channel to be part of a local transaction */ @@ -449,12 +457,13 @@ public class AMQChannel<T extends AMQProtocolSession<T>> } else { - _actor.message(ExchangeMessages.DISCARDMSG(_currentMessage.getExchangeName().asString(), - _currentMessage.getMessagePublishInfo().getRoutingKey() == null - ? null - : _currentMessage.getMessagePublishInfo() - .getRoutingKey() - .toString())); + SystemLog.message(ExchangeMessages.DISCARDMSG(_currentMessage.getExchangeName().asString(), + _currentMessage.getMessagePublishInfo().getRoutingKey() + == null + ? null + : _currentMessage.getMessagePublishInfo() + .getRoutingKey() + .toString())); } } @@ -684,7 +693,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> LogMessage operationalLogMessage = cause == null ? ChannelMessages.CLOSE() : ChannelMessages.CLOSE_FORCED(cause.getCode(), message); - CurrentActor.get().message(_logSubject, operationalLogMessage); + SystemLog.message(_logSubject, operationalLogMessage); unsubscribeAllConsumers(); @@ -977,7 +986,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> // Log Flow Started before we start the subscriptions if (!suspended) { - _actor.message(_logSubject, ChannelMessages.FLOW("Started")); + SystemLog.message(_logSubject, ChannelMessages.FLOW("Started")); } @@ -1028,7 +1037,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> // stopped. if (suspended) { - _actor.message(_logSubject, ChannelMessages.FLOW("Stopped")); + SystemLog.message(_logSubject, ChannelMessages.FLOW("Stopped")); } } @@ -1174,7 +1183,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> public void setCredit(final long prefetchSize, final int prefetchCount) { - _actor.message(ChannelMessages.PREFETCH_SIZE(prefetchSize, prefetchCount)); + SystemLog.message(ChannelMessages.PREFETCH_SIZE(prefetchSize, prefetchCount)); _creditManager.setCreditLimits(prefetchSize, prefetchCount); } @@ -1431,19 +1440,13 @@ public class AMQChannel<T extends AMQProtocolSession<T>> } } - - public LogActor getLogActor() - { - return _actor; - } - public synchronized void block() { if(_blockingEntities.add(this)) { if(_blocking.compareAndSet(false,true)) { - _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED("** All Queues **")); + SystemLog.message(_logSubject, ChannelMessages.FLOW_ENFORCED("** All Queues **")); flow(false); } } @@ -1455,7 +1458,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> { if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false)) { - _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED()); + SystemLog.message(_logSubject, ChannelMessages.FLOW_REMOVED()); flow(true); } @@ -1469,7 +1472,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> if(_blocking.compareAndSet(false,true)) { - _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED(queue.getName())); + SystemLog.message(_logSubject, ChannelMessages.FLOW_ENFORCED(queue.getName())); flow(false); } } @@ -1481,8 +1484,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> { if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false) && !isClosing()) { - _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED()); - + SystemLog.message(_logSubject, ChannelMessages.FLOW_REMOVED()); flow(true); } } @@ -1552,14 +1554,14 @@ public class AMQChannel<T extends AMQProtocolSession<T>> else { final ServerMessage msg = rejectedQueueEntry.getMessage(); - final Consumer sub = rejectedQueueEntry.getDeliveredConsumer(); + int requeues = rejectedQueueEntry.routeToAlternate(new Action<MessageInstance>() { @Override public void performAction(final MessageInstance requeueEntry) { - _actor.message( _logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), + SystemLog.message( _logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), requeueEntry.getOwningResource().getName())); } }, null); @@ -1577,7 +1579,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> if (altExchange == null) { _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag); - _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getInitialRoutingAddress())); + SystemLog.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getInitialRoutingAddress())); } else @@ -1585,7 +1587,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> _logger.debug( "Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: " + deliveryTag); - _actor.message(_logSubject, + SystemLog.message(_logSubject, ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName())); } } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 167a505c19..80c4a9fde7 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; +import java.security.AccessController; import java.security.Principal; import java.security.PrivilegedAction; import java.util.ArrayList; @@ -55,15 +56,12 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.connection.ConnectionPrincipal; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl; -import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.actors.AMQPConnectionActor; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.ManagementActor; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.logging.subjects.ConnectionLogSubject; import org.apache.qpid.server.model.Broker; @@ -143,7 +141,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private final long _connectionID; private Object _reference = new Object(); - private AMQPConnectionActor _actor; private LogSubject _logSubject; private long _lastIoTime; @@ -172,7 +169,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private long _readBytes; public AMQProtocolEngine(Broker broker, - NetworkConnection network, + final NetworkConnection network, final long connectionId, Port port, Transport transport) @@ -184,21 +181,44 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _receivedLock = new ReentrantLock(); _stateManager = new AMQStateManager(broker, this); _codecFactory = new AMQCodecFactory(true, this); - - setNetworkConnection(network); _connectionID = connectionId; + _logSubject = new ConnectionLogSubject(this); - _actor = new AMQPConnectionActor(this, _broker.getRootMessageLogger()); _authorizedSubject.getPrincipals().add(new ConnectionPrincipal(this)); + runAsSubject(new PrivilegedAction<Void>() + { - _logSubject = new ConnectionLogSubject(this); + @Override + public Void run() + { + setNetworkConnection(network); + + SystemLog.message(ConnectionMessages.OPEN(null, null, null, null, false, false, false, false)); + + _closeWhenNoRoute = (Boolean)_broker.getAttribute(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE); - _actor.message(ConnectionMessages.OPEN(null, null, null, null, false, false, false, false)); + initialiseStatistics(); - _closeWhenNoRoute = (Boolean)_broker.getAttribute(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE); + return null; + } + }); - initialiseStatistics(); + } + private <T> T runAsSubject(PrivilegedAction<T> action) + { + return Subject.doAs(getAuthorizedSubject(), action); + } + + private boolean runningAsSubject() + { + return getAuthorizedSubject().equals(Subject.getSubject(AccessController.getContext())); + } + + @Override + public Subject getSubject() + { + return _authorizedSubject; } public void setNetworkConnection(NetworkConnection network) @@ -217,11 +237,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return _connectionID; } - public LogActor getLogActor() - { - return _actor; - } - public void setMaxFrameSize(long frameMax) { _maxFrameSize = frameMax; @@ -400,74 +415,57 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi AMQBody body = frame.getBodyFrame(); - //Look up the Channel's Actor and set that as the current actor - // If that is not available then we can use the ConnectionActor - // that is associated with this AMQMPSession. - LogActor channelActor = null; - if (amqChannel != null) + long startTime = 0; + String frameToString = null; + if (_logger.isDebugEnabled()) { - channelActor = amqChannel.getLogActor(); + startTime = System.currentTimeMillis(); + frameToString = frame.toString(); + _logger.debug("RECV: " + frame); } - CurrentActor.set(channelActor == null ? _actor : channelActor); - try + // Check that this channel is not closing + if (channelAwaitingClosure(channelId)) { - long startTime = 0; - String frameToString = null; - if (_logger.isDebugEnabled()) + if ((frame.getBodyFrame() instanceof ChannelCloseOkBody)) { - startTime = System.currentTimeMillis(); - frameToString = frame.toString(); - _logger.debug("RECV: " + frame); - } - - // Check that this channel is not closing - if (channelAwaitingClosure(channelId)) - { - if ((frame.getBodyFrame() instanceof ChannelCloseOkBody)) - { - if (_logger.isInfoEnabled()) - { - _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok"); - } - } - else + if (_logger.isInfoEnabled()) { - // The channel has been told to close, we don't process any more frames until - // it's closed. - return; + _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok"); } } - - try - { - body.handle(channelId, this); - } - catch(AMQConnectionException e) - { - _logger.info(e.getMessage() + " whilst processing frame: " + body); - closeConnection(channelId, e); - throw e; - } - catch (AMQException e) - { - closeChannel(channelId, e.getErrorCode() == null ? AMQConstant.INTERNAL_ERROR : e.getErrorCode(), e.getMessage()); - throw e; - } - catch (TransportException e) + else { - closeChannel(channelId, AMQConstant.CHANNEL_ERROR, e.getMessage()); - throw e; + // The channel has been told to close, we don't process any more frames until + // it's closed. + return; } + } - if(_logger.isDebugEnabled()) - { - _logger.debug("Frame handled in " + (System.currentTimeMillis() - startTime) + " ms. Frame: " + frameToString); - } + try + { + body.handle(channelId, this); } - finally + catch(AMQConnectionException e) + { + _logger.info(e.getMessage() + " whilst processing frame: " + body); + closeConnection(channelId, e); + throw e; + } + catch (AMQException e) { - CurrentActor.remove(); + closeChannel(channelId, e.getErrorCode() == null ? AMQConstant.INTERNAL_ERROR : e.getErrorCode(), e.getMessage()); + throw e; + } + catch (TransportException e) + { + closeChannel(channelId, AMQConstant.CHANNEL_ERROR, e.getMessage()); + throw e; + } + + if(_logger.isDebugEnabled()) + { + _logger.debug("Frame handled in " + (System.currentTimeMillis() - startTime) + " ms. Frame: " + frameToString); } } @@ -478,7 +476,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi try { // Log incoming protocol negotiation request - _actor.message(ConnectionMessages.OPEN(null, pi.getProtocolMajor() + "-" + pi.getProtocolMinor(), null, null, false, true, false, false)); + SystemLog.message(ConnectionMessages.OPEN(null, + pi.getProtocolMajor() + "-" + pi.getProtocolMinor(), + null, + null, + false, + true, + false, + false)); ProtocolVersion pv = pi.checkVersion(); // Fails if not correct @@ -778,22 +783,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _maxNoOfChannels = value; } - public void commitTransactions(AMQChannel<AMQProtocolEngine> channel) throws AMQException - { - if ((channel != null) && channel.isTransactional()) - { - channel.commit(); - } - } - - public void rollbackTransactions(AMQChannel<AMQProtocolEngine> channel) throws AMQException - { - if ((channel != null) && channel.isTransactional()) - { - channel.rollback(); - } - } - /** * Close a specific channel. This will remove any resources used by the channel, including: <ul><li>any queue * subscriptions (this may in turn remove queues if they are auto delete</li> </ul> @@ -908,77 +897,89 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi @Override public void closeSession() { - if(_closing.compareAndSet(false,true)) - { - // force sync of outstanding async work - _receivedLock.lock(); - try - { - receivedComplete(); - } - finally - { - _receivedLock.unlock(); - } - // REMOVE THIS SHOULD NOT BE HERE. - if (CurrentActor.get() == null) - { - CurrentActor.set(_actor); - } - if (!_closed) + if(runningAsSubject()) + { + if(_closing.compareAndSet(false,true)) { - if (_virtualHost != null) + // force sync of outstanding async work + _receivedLock.lock(); + try { - _virtualHost.getConnectionRegistry().deregisterConnection(this); + receivedComplete(); } - - closeAllChannels(); - - for (Action<? super AMQProtocolEngine> task : _taskList) + finally { - task.performAction(this); + _receivedLock.unlock(); } - synchronized(this) + if (!_closed) { - _closed = true; - notifyAll(); + if (_virtualHost != null) + { + _virtualHost.getConnectionRegistry().deregisterConnection(this); + } + + closeAllChannels(); + + for (Action<? super AMQProtocolEngine> task : _taskList) + { + task.performAction(this); + } + + synchronized(this) + { + _closed = true; + notifyAll(); + } + SystemLog.message(_logSubject, ConnectionMessages.CLOSE()); } - CurrentActor.get().message(_logSubject, ConnectionMessages.CLOSE()); } - } - else - { - synchronized(this) + else { + synchronized(this) + { - boolean lockHeld = _receivedLock.isHeldByCurrentThread(); + boolean lockHeld = _receivedLock.isHeldByCurrentThread(); - while(!_closed) - { - try + while(!_closed) { - if(lockHeld) + try + { + if(lockHeld) + { + _receivedLock.unlock(); + } + wait(1000); + } + catch (InterruptedException e) { - _receivedLock.unlock(); + // do nothing } - wait(1000); - } - catch (InterruptedException e) - { - // do nothing - } - finally - { - if(lockHeld) + finally { - _receivedLock.lock(); + if(lockHeld) + { + _receivedLock.lock(); + } } } } } } + else + { + runAsSubject(new PrivilegedAction<Object>() + { + @Override + public Object run() + { + closeSession(); + return null; + } + }); + + } } private void closeConnection(int channelId, AMQConnectionException e) @@ -1091,7 +1092,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi setContextKey(new AMQShortString(clientId)); } - _actor.message(ConnectionMessages.OPEN(clientId, _protocolVersion.toString(), _clientVersion, _clientProduct, true, true, true, true)); + SystemLog.message(ConnectionMessages.OPEN(clientId, + _protocolVersion.toString(), + _clientVersion, + _clientProduct, + true, + true, + true, + true)); } } @@ -1361,40 +1369,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return String.valueOf(getRemoteAddress()); } - public void mgmtCloseChannel(int channelId) - { - MethodRegistry methodRegistry = getMethodRegistry(); - ChannelCloseBody responseBody = - methodRegistry.createChannelCloseBody( - AMQConstant.REPLY_SUCCESS.getCode(), - new AMQShortString("The channel was closed using the broker's management interface."), - 0,0); - - // This seems ugly but because we use AMQChannel.close() in both normal - // broker operation and as part of the management interface it cannot - // be avoided. The Current Actor will be null when this method is - // called via the QMF management interface. As such we need to set one. - boolean removeActor = false; - if (CurrentActor.get() == null) - { - removeActor = true; - CurrentActor.set(new ManagementActor(_actor.getRootMessageLogger())); - } - - try - { - writeFrame(responseBody.generateFrame(channelId)); - closeChannel(channelId); - } - finally - { - if (removeActor) - { - CurrentActor.remove(); - } - } - } - public void closeSession(AMQChannel<AMQProtocolEngine> session, AMQConstant cause, String message) { int channelId = session.getChannelId(); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java index 045367b667..ade087ef21 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java @@ -35,9 +35,7 @@ import org.apache.qpid.framing.MethodDispatcher; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; -import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.protocol.AMQConnectionModel; -import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -48,8 +46,6 @@ public interface AMQProtocolSession<T extends AMQProtocolSession<T>> { long getSessionID(); - LogActor getLogActor(); - void setMaxFrameSize(long frameMax); long getMaxFrameSize(); @@ -202,14 +198,8 @@ public interface AMQProtocolSession<T extends AMQProtocolSession<T>> void setMaximumNumberOfChannels(Long value); - void commitTransactions(AMQChannel<T> channel) throws AMQException; - - void rollbackTransactions(AMQChannel<T> channel) throws AMQException; - List<AMQChannel<T>> getChannels(); - void mgmtCloseChannel(int channelId); - public Principal getPeerPrincipal(); Lock getReceivedLock(); diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java index f82689e36a..3c3c17a6fb 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java @@ -511,6 +511,12 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut return _connectionId; } + @Override + public Subject getSubject() + { + return _connection.getSubject(); + } + public long getLastReadTime() { return _lastReadTime; diff --git a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java index d11b1e029c..9a4adbc4c7 100644 --- a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java +++ b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java @@ -33,7 +33,7 @@ import javax.servlet.DispatcherType; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.IllegalConfigurationException; -import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.logging.messages.ManagementConsoleMessages; import org.apache.qpid.server.management.plugin.filter.ForbiddingAuthorisationFilter; import org.apache.qpid.server.management.plugin.filter.RedirectingAuthorisationFilter; @@ -140,7 +140,7 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem private void start() { - CurrentActor.get().message(ManagementConsoleMessages.STARTUP(OPERATIONAL_LOGGING_NAME)); + SystemLog.message(ManagementConsoleMessages.STARTUP(OPERATIONAL_LOGGING_NAME)); Collection<Port> httpPorts = getHttpPorts(getBroker().getPorts()); _server = createServer(httpPorts); @@ -154,7 +154,7 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem throw new ServerScopedRuntimeException("Failed to start HTTP management on ports : " + httpPorts, e); } - CurrentActor.get().message(ManagementConsoleMessages.READY(OPERATIONAL_LOGGING_NAME)); + SystemLog.message(ManagementConsoleMessages.READY(OPERATIONAL_LOGGING_NAME)); } private void stop() @@ -172,7 +172,7 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem } } - CurrentActor.get().message(ManagementConsoleMessages.STOPPED(OPERATIONAL_LOGGING_NAME)); + SystemLog.message(ManagementConsoleMessages.STOPPED(OPERATIONAL_LOGGING_NAME)); } public int getSessionTimeout() @@ -381,13 +381,14 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem Connector[] connectors = server.getConnectors(); for (Connector connector : connectors) { - CurrentActor.get().message(ManagementConsoleMessages.LISTENING(stringifyConnectorScheme(connector), connector.getPort())); + SystemLog.message(ManagementConsoleMessages.LISTENING(stringifyConnectorScheme(connector), + connector.getPort())); if (connector instanceof SslSocketConnector) { SslContextFactory sslContextFactory = ((SslSocketConnector)connector).getSslContextFactory(); if (sslContextFactory != null && sslContextFactory.getKeyStorePath() != null) { - CurrentActor.get().message(ManagementConsoleMessages.SSL_KEYSTORE(sslContextFactory.getKeyStorePath())); + SystemLog.message(ManagementConsoleMessages.SSL_KEYSTORE(sslContextFactory.getKeyStorePath())); } } } @@ -398,7 +399,8 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem Connector[] connectors = server.getConnectors(); for (Connector connector : connectors) { - CurrentActor.get().message(ManagementConsoleMessages.SHUTTING_DOWN(stringifyConnectorScheme(connector), connector.getPort())); + SystemLog.message(ManagementConsoleMessages.SHUTTING_DOWN(stringifyConnectorScheme(connector), + connector.getPort())); } } diff --git a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java index 5d39b3c0c1..023453e74f 100644 --- a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java +++ b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java @@ -22,11 +22,8 @@ package org.apache.qpid.server.management.plugin; import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.security.AccessControlException; import java.security.Principal; import java.security.PrivilegedAction; -import java.security.PrivilegedActionException; -import java.security.PrivilegedExceptionAction; import java.security.cert.X509Certificate; import java.util.Collections; @@ -37,9 +34,6 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpSession; import org.apache.commons.codec.binary.Base64; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.HttpManagementActor; import org.apache.qpid.server.management.plugin.servlet.ServletConnectionPrincipal; import org.apache.qpid.server.management.plugin.session.LoginLogoutReporter; import org.apache.qpid.server.model.AuthenticationProvider; @@ -51,7 +45,6 @@ import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationS import org.apache.qpid.server.security.auth.SubjectAuthenticationResult; import org.apache.qpid.server.security.auth.UsernamePrincipal; import org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManagerFactory; -import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.transport.network.security.ssl.SSLUtil; public class HttpManagementUtil @@ -117,55 +110,33 @@ public class HttpManagementUtil subject.getPrincipals().add(new ServletConnectionPrincipal(request)); subject.setReadOnly(); - LogActor actor = createHttpManagementActor(broker, request); + assertManagementAccess(broker.getSecurityManager(), subject); - assertManagementAccess(broker.getSecurityManager(), subject, actor); - - saveAuthorisedSubject(session, subject, actor); + saveAuthorisedSubject(session, subject); } } - public static void assertManagementAccess(final SecurityManager securityManager, Subject subject, LogActor actor) + public static void assertManagementAccess(final SecurityManager securityManager, Subject subject) { - CurrentActor.set(actor); - try + Subject.doAs(subject, new PrivilegedAction<Void>() { - Subject.doAs(subject, new PrivilegedAction<Void>() + @Override + public Void run() { - @Override - public Void run() - { - securityManager.accessManagement(); - return null; - } - }); - } - finally - { - CurrentActor.remove(); - } - } - - public static HttpManagementActor getOrCreateAndCacheLogActor(HttpServletRequest request, Broker broker) - { - HttpSession session = request.getSession(); - HttpManagementActor actor = (HttpManagementActor) session.getAttribute(ATTR_LOG_ACTOR); - if (actor == null) - { - actor = createHttpManagementActor(broker, request); - session.setAttribute(ATTR_LOG_ACTOR, actor); - } - return actor; + securityManager.accessManagement(); + return null; + } + }); } - public static void saveAuthorisedSubject(HttpSession session, Subject subject, LogActor logActor) + public static void saveAuthorisedSubject(HttpSession session, Subject subject) { session.setAttribute(ATTR_SUBJECT, subject); // Cause the user logon to be logged. - session.setAttribute(ATTR_LOGIN_LOGOUT_REPORTER, new LoginLogoutReporter(logActor, subject)); + session.setAttribute(ATTR_LOGIN_LOGOUT_REPORTER, new LoginLogoutReporter(subject)); } public static Subject tryToAuthenticate(HttpServletRequest request, HttpManagementConfiguration managementConfig) @@ -245,9 +216,5 @@ public class HttpManagementUtil return null; } - private static HttpManagementActor createHttpManagementActor(Broker broker, HttpServletRequest request) - { - return new HttpManagementActor(broker.getRootMessageLogger(), request.getRemoteAddr(), request.getRemotePort()); - } } diff --git a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/ServletConnectionPrincipal.java b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/ServletConnectionPrincipal.java index 0925e2b088..18a026ec93 100644 --- a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/ServletConnectionPrincipal.java +++ b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/ServletConnectionPrincipal.java @@ -20,13 +20,14 @@ */ package org.apache.qpid.server.management.plugin.servlet; +import org.apache.qpid.server.security.auth.ManagementConnectionPrincipal; import org.apache.qpid.server.security.auth.SocketConnectionPrincipal; import javax.servlet.ServletRequest; import java.net.InetSocketAddress; import java.net.SocketAddress; -public class ServletConnectionPrincipal implements SocketConnectionPrincipal +public class ServletConnectionPrincipal implements ManagementConnectionPrincipal { private final InetSocketAddress _address; @@ -74,4 +75,10 @@ public class ServletConnectionPrincipal implements SocketConnectionPrincipal { return _address.hashCode(); } + + @Override + public String getType() + { + return "HTTP"; + } } diff --git a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java index 1133b6e091..a9e80db3bf 100644 --- a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java +++ b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java @@ -33,13 +33,9 @@ import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.apache.log4j.Logger; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.HttpManagementActor; import org.apache.qpid.server.management.plugin.HttpManagementConfiguration; import org.apache.qpid.server.management.plugin.HttpManagementUtil; import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.codehaus.jackson.JsonGenerationException; import org.codehaus.jackson.map.JsonMappingException; @@ -88,7 +84,6 @@ public abstract class AbstractServlet extends HttpServlet /** * Performs the GET action as the logged-in {@link Subject}. - * The {@link LogActor} is set before this method is called. * Subclasses commonly override this method */ protected void doGetWithSubjectAndActor(HttpServletRequest request, HttpServletResponse resp) throws ServletException, IOException @@ -117,7 +112,6 @@ public abstract class AbstractServlet extends HttpServlet /** * Performs the POST action as the logged-in {@link Subject}. - * The {@link LogActor} is set before this method is called. * Subclasses commonly override this method */ protected void doPostWithSubjectAndActor(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException @@ -145,7 +139,6 @@ public abstract class AbstractServlet extends HttpServlet /** * Performs the PUT action as the logged-in {@link Subject}. - * The {@link LogActor} is set before this method is called. * Subclasses commonly override this method */ protected void doPutWithSubjectAndActor(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException @@ -174,7 +167,6 @@ public abstract class AbstractServlet extends HttpServlet /** * Performs the PUT action as the logged-in {@link Subject}. - * The {@link LogActor} is set before this method is called. * Subclasses commonly override this method */ protected void doDeleteWithSubjectAndActor(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException @@ -198,8 +190,6 @@ public abstract class AbstractServlet extends HttpServlet return; } - HttpManagementActor logActor = HttpManagementUtil.getOrCreateAndCacheLogActor(request, _broker); - CurrentActor.set(logActor); try { Subject.doAs(subject, privilegedExceptionAction); @@ -223,11 +213,6 @@ public abstract class AbstractServlet extends HttpServlet } throw new ConnectionScopedRuntimeException(e.getCause()); } - finally - { - CurrentActor.remove(); - } - } protected Subject getAuthorisedSubject(HttpServletRequest request) diff --git a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/SaslServlet.java b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/SaslServlet.java index b59e9bc04d..2ca67fadc9 100644 --- a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/SaslServlet.java +++ b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/SaslServlet.java @@ -27,7 +27,6 @@ import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.SerializationConfig; import org.apache.log4j.Logger; -import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.management.plugin.HttpManagementConfiguration; import org.apache.qpid.server.management.plugin.HttpManagementUtil; import org.apache.qpid.server.model.Broker; @@ -247,10 +246,9 @@ public class SaslServlet extends AbstractServlet subject.setReadOnly(); Broker broker = getBroker(); - LogActor actor = HttpManagementUtil.getOrCreateAndCacheLogActor(request, broker); try { - HttpManagementUtil.assertManagementAccess(broker.getSecurityManager(), subject, actor); + HttpManagementUtil.assertManagementAccess(broker.getSecurityManager(), subject); } catch(SecurityException e) { @@ -258,7 +256,7 @@ public class SaslServlet extends AbstractServlet return; } - HttpManagementUtil.saveAuthorisedSubject(request.getSession(), subject, actor); + HttpManagementUtil.saveAuthorisedSubject(request.getSession(), subject); session.removeAttribute(ATTR_ID); session.removeAttribute(ATTR_SASL_SERVER); session.removeAttribute(ATTR_EXPIRY); diff --git a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/session/LoginLogoutReporter.java b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/session/LoginLogoutReporter.java index 238f1b4719..5155654e82 100644 --- a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/session/LoginLogoutReporter.java +++ b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/session/LoginLogoutReporter.java @@ -28,7 +28,7 @@ import javax.servlet.http.HttpSessionBindingEvent; import javax.servlet.http.HttpSessionBindingListener; import org.apache.log4j.Logger; -import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.logging.messages.ManagementConsoleMessages; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; @@ -40,14 +40,12 @@ import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; public class LoginLogoutReporter implements HttpSessionBindingListener { private static final Logger LOGGER = Logger.getLogger(LoginLogoutReporter.class); - private final LogActor _logActor; private final Subject _subject; private final Principal _principal; - public LoginLogoutReporter(LogActor logActor, Subject subject) + public LoginLogoutReporter(Subject subject) { super(); - _logActor = logActor; _subject = subject; _principal = AuthenticatedPrincipal.getAuthenticatedPrincipalFromSubject(_subject); } @@ -76,7 +74,7 @@ public class LoginLogoutReporter implements HttpSessionBindingListener @Override public Void run() { - _logActor.message(ManagementConsoleMessages.OPEN(_principal.getName())); + SystemLog.message(ManagementConsoleMessages.OPEN(_principal.getName())); return null; } }); @@ -94,7 +92,7 @@ public class LoginLogoutReporter implements HttpSessionBindingListener @Override public Void run() { - _logActor.message(ManagementConsoleMessages.CLOSE(_principal.getName())); + SystemLog.message(ManagementConsoleMessages.CLOSE(_principal.getName())); return null; } }); diff --git a/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/session/LoginLogoutReporterTest.java b/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/session/LoginLogoutReporterTest.java index 1d43c44587..16ae5220ab 100644 --- a/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/session/LoginLogoutReporterTest.java +++ b/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/session/LoginLogoutReporterTest.java @@ -19,16 +19,20 @@ */ package org.apache.qpid.server.management.plugin.session; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.when; import javax.security.auth.Subject; -import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogMessage; +import org.apache.qpid.server.logging.RootMessageLogger; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.mockito.ArgumentMatcher; -import org.mockito.Mockito; import junit.framework.TestCase; @@ -36,7 +40,7 @@ public class LoginLogoutReporterTest extends TestCase { private LoginLogoutReporter _loginLogoutReport; private Subject _subject = new Subject(); - private LogActor _logActor = Mockito.mock(LogActor.class); + private RootMessageLogger _logger = mock(RootMessageLogger.class); @Override protected void setUp() throws Exception @@ -44,19 +48,22 @@ public class LoginLogoutReporterTest extends TestCase super.setUp(); _subject.getPrincipals().add(new AuthenticatedPrincipal("mockusername")); - _loginLogoutReport = new LoginLogoutReporter(_logActor, _subject); + when(_logger.isEnabled()).thenReturn(true); + when(_logger.isMessageEnabled(anyString())).thenReturn(true); + SystemLog.setRootMessageLogger(_logger); + _loginLogoutReport = new LoginLogoutReporter(_subject); } public void testLoginLogged() { _loginLogoutReport.valueBound(null); - verify(_logActor).message(isLogMessageWithMessage("MNG-1007 : Open : User mockusername")); + verify(_logger).message(isLogMessageWithMessage("MNG-1007 : Open : User mockusername")); } public void testLogoutLogged() { _loginLogoutReport.valueUnbound(null); - verify(_logActor).message(isLogMessageWithMessage("MNG-1008 : Close : User mockusername")); + verify(_logger).message(isLogMessageWithMessage("MNG-1008 : Close : User mockusername")); } private LogMessage isLogMessageWithMessage(final String expectedMessage) diff --git a/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagedObjectRegistry.java b/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagedObjectRegistry.java index e2b9a98784..8b1463b476 100644 --- a/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagedObjectRegistry.java +++ b/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagedObjectRegistry.java @@ -20,10 +20,9 @@ */ package org.apache.qpid.server.jmx; -import javax.net.ssl.KeyManager; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.BrokerProperties; -import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.logging.messages.ManagementConsoleMessages; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.KeyStore; @@ -32,7 +31,6 @@ import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.security.auth.jmx.JMXPasswordAuthenticator; import org.apache.qpid.server.util.ServerScopedRuntimeException; -import org.apache.qpid.ssl.SSLContextFactory; import javax.management.JMException; import javax.management.MBeanServer; @@ -99,12 +97,12 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry @Override public void start() throws IOException { - CurrentActor.get().message(ManagementConsoleMessages.STARTUP(OPERATIONAL_LOGGING_NAME)); + SystemLog.message(ManagementConsoleMessages.STARTUP(OPERATIONAL_LOGGING_NAME)); //check if system properties are set to use the JVM's out-of-the-box JMXAgent if (areOutOfTheBoxJMXOptionsSet()) { - CurrentActor.get().message(ManagementConsoleMessages.READY(OPERATIONAL_LOGGING_NAME)); + SystemLog.message(ManagementConsoleMessages.READY(OPERATIONAL_LOGGING_NAME)); } else { @@ -138,7 +136,7 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry throw new ServerScopedRuntimeException("Unable to create SSLContext for key store", e); } - CurrentActor.get().message(ManagementConsoleMessages.SSL_KEYSTORE(keyStore.getName())); + SystemLog.message(ManagementConsoleMessages.SSL_KEYSTORE(keyStore.getName())); //create the SSL RMI socket factories csf = new SslRMIClientSocketFactory(); @@ -250,8 +248,8 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry _cs.start(); String connectorServer = (connectorSslEnabled ? "SSL " : "") + "JMX RMIConnectorServer"; - CurrentActor.get().message(ManagementConsoleMessages.LISTENING(connectorServer, jmxPortConnectorServer)); - CurrentActor.get().message(ManagementConsoleMessages.READY(OPERATIONAL_LOGGING_NAME)); + SystemLog.message(ManagementConsoleMessages.LISTENING(connectorServer, jmxPortConnectorServer)); + SystemLog.message(ManagementConsoleMessages.READY(OPERATIONAL_LOGGING_NAME)); } private Registry createRmiRegistry(int jmxPortRegistryServer, boolean useCustomRmiRegistry) @@ -269,7 +267,7 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry rmiRegistry = LocateRegistry.createRegistry(jmxPortRegistryServer, null, null); } - CurrentActor.get().message(ManagementConsoleMessages.LISTENING("RMI Registry", jmxPortRegistryServer)); + SystemLog.message(ManagementConsoleMessages.LISTENING("RMI Registry", jmxPortRegistryServer)); return rmiRegistry; } @@ -294,7 +292,7 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry unregisterAllMbeans(); - CurrentActor.get().message(ManagementConsoleMessages.STOPPED(OPERATIONAL_LOGGING_NAME)); + SystemLog.message(ManagementConsoleMessages.STOPPED(OPERATIONAL_LOGGING_NAME)); } private void closeConnectorAndRegistryServers() @@ -338,7 +336,7 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry if (_rmiRegistry != null) { // Stopping the RMI registry - CurrentActor.get().message(ManagementConsoleMessages.SHUTTING_DOWN("RMI Registry", _registryPort.getPort())); + SystemLog.message(ManagementConsoleMessages.SHUTTING_DOWN("RMI Registry", _registryPort.getPort())); try { boolean success = UnicastRemoteObject.unexportObject(_rmiRegistry, false); @@ -365,7 +363,8 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry // Stopping the JMX ConnectorServer try { - CurrentActor.get().message(ManagementConsoleMessages.SHUTTING_DOWN("JMX RMIConnectorServer", _cs.getAddress().getPort())); + SystemLog.message(ManagementConsoleMessages.SHUTTING_DOWN("JMX RMIConnectorServer", + _cs.getAddress().getPort())); _cs.stop(); } catch (IOException e) diff --git a/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/MBeanInvocationHandlerImpl.java b/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/MBeanInvocationHandlerImpl.java index a38addae7b..b20685985f 100644 --- a/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/MBeanInvocationHandlerImpl.java +++ b/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/MBeanInvocationHandlerImpl.java @@ -23,8 +23,6 @@ package org.apache.qpid.server.jmx; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.BrokerProperties; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.ManagementActor; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.security.SecurityManager; @@ -39,17 +37,13 @@ import javax.management.MBeanServer; import javax.management.ObjectName; import javax.management.RuntimeErrorException; import javax.management.remote.MBeanServerForwarder; -import javax.management.remote.rmi.RMIServer; import javax.security.auth.Subject; import java.lang.reflect.InvocationHandler; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.Proxy; -import java.rmi.server.RemoteServer; -import java.rmi.server.ServerNotActiveException; import java.security.AccessControlContext; import java.security.AccessController; -import java.security.PrivilegedAction; import java.security.PrivilegedActionException; import java.security.PrivilegedExceptionAction; import java.util.Arrays; @@ -64,7 +58,6 @@ public class MBeanInvocationHandlerImpl implements InvocationHandler private final static String DELEGATE = "JMImplementation:type=MBeanServerDelegate"; private MBeanServer _mbs; - private final ManagementActor _logActor; private final boolean _managementRightsInferAllAccess; private final Broker _broker; @@ -73,7 +66,6 @@ public class MBeanInvocationHandlerImpl implements InvocationHandler { _managementRightsInferAllAccess = Boolean.valueOf(System.getProperty(BrokerProperties.PROPERTY_MANAGEMENT_RIGHTS_INFER_ALL_ACCESS, "true")); _broker = broker; - _logActor = new ManagementActor(broker.getRootMessageLogger()); } public static MBeanServerForwarder newProxyInstance(Broker broker) @@ -117,9 +109,9 @@ public class MBeanInvocationHandlerImpl implements InvocationHandler return false; } - public Object invoke(Object proxy, Method method, Object[] args) throws Throwable + public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable { - String methodName = method.getName(); + final String methodName = method.getName(); if (methodName.equals("getMBeanServer")) { @@ -167,16 +159,7 @@ public class MBeanInvocationHandlerImpl implements InvocationHandler throw new SecurityException("Access denied: no authenticated principal", e); } - // Save the subject - CurrentActor.set(_logActor); - try - { - return authoriseAndInvoke(method, args); - } - finally - { - CurrentActor.remove(); - } + return authoriseAndInvoke(method, args); } catch (InvocationTargetException e) { @@ -207,7 +190,7 @@ public class MBeanInvocationHandlerImpl implements InvocationHandler } } - private Object authoriseAndInvoke(final Method method, final Object[] args) throws Throwable + private Object authoriseAndInvoke(final Method method, final Object[] args) throws Exception { String methodName; // Get the component, type and impact, which may be null @@ -239,8 +222,11 @@ public class MBeanInvocationHandlerImpl implements InvocationHandler { try { + Subject subject = Subject.getSubject(AccessController.getContext()); + subject = new Subject(false, subject.getPrincipals(), subject.getPublicCredentials(), subject.getPrivateCredentials()); + subject.getPrincipals().addAll(SecurityManager.SYSTEM.getPrincipals()); - return Subject.doAs(SecurityManager.SYSTEM, new PrivilegedExceptionAction<Object>() + return Subject.doAs(subject, new PrivilegedExceptionAction<Object>() { @Override public Object run() throws IllegalAccessException, InvocationTargetException @@ -251,7 +237,7 @@ public class MBeanInvocationHandlerImpl implements InvocationHandler } catch (PrivilegedActionException e) { - throw e.getCause(); + throw (Exception) e.getCause(); } } else diff --git a/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/ManagementLogonLogoffReporter.java b/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/ManagementLogonLogoffReporter.java index ae0574dc21..79d944fc5c 100644 --- a/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/ManagementLogonLogoffReporter.java +++ b/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/ManagementLogonLogoffReporter.java @@ -27,12 +27,19 @@ import javax.management.Notification; import javax.management.NotificationFilter; import javax.management.NotificationListener; import javax.management.remote.JMXConnectionNotification; +import javax.security.auth.Subject; import org.apache.log4j.Logger; -import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.RootMessageLogger; -import org.apache.qpid.server.logging.actors.ManagementActor; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.logging.messages.ManagementConsoleMessages; +import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; +import org.apache.qpid.server.security.auth.jmx.JMXConnectionPrincipal; + +import java.rmi.server.RemoteServer; +import java.rmi.server.ServerNotActiveException; +import java.security.PrivilegedAction; +import java.util.Collections; public class ManagementLogonLogoffReporter implements NotificationListener, NotificationFilter { @@ -65,19 +72,41 @@ public class ManagementLogonLogoffReporter implements NotificationListener, Not final String[] splitConnectionId = connectionId.split(" "); user = splitConnectionId[1]; } + Subject originalSubject = new Subject(false, Collections.singleton(new AuthenticatedPrincipal(user)), Collections.emptySet(), Collections.emptySet()); + Subject subject; - // use a separate instance of actor as subject is not set on connect/disconnect - // we need to pass principal name explicitly into log actor - LogActor logActor = new ManagementActor(_rootMessageLogger, user); - if (JMXConnectionNotification.OPENED.equals(type)) + try { - logActor.message(ManagementConsoleMessages.OPEN(user)); + String clientHost = RemoteServer.getClientHost(); + subject = new Subject(false, + originalSubject.getPrincipals(), + originalSubject.getPublicCredentials(), + originalSubject.getPrivateCredentials()); + subject.getPrincipals().add(new JMXConnectionPrincipal(clientHost)); + subject.setReadOnly(); } - else if (JMXConnectionNotification.CLOSED.equals(type) || - JMXConnectionNotification.FAILED.equals(type)) + catch(ServerNotActiveException e) { - logActor.message(ManagementConsoleMessages.CLOSE(user)); + subject = originalSubject; } + final String username = user; + Subject.doAs(subject, new PrivilegedAction<Object>() + { + @Override + public Object run() + { + if (JMXConnectionNotification.OPENED.equals(type)) + { + SystemLog.message(ManagementConsoleMessages.OPEN(username)); + } + else if (JMXConnectionNotification.CLOSED.equals(type) || + JMXConnectionNotification.FAILED.equals(type)) + { + SystemLog.message(ManagementConsoleMessages.CLOSE(username)); + } + return null; + } + }); } @Override diff --git a/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/ManagementLogonLogoffReporterTest.java b/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/ManagementLogonLogoffReporterTest.java index ba9c2cdaa5..515a9d88f2 100644 --- a/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/ManagementLogonLogoffReporterTest.java +++ b/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/ManagementLogonLogoffReporterTest.java @@ -23,6 +23,7 @@ import static javax.management.remote.JMXConnectionNotification.OPENED; import static javax.management.remote.JMXConnectionNotification.CLOSED; import static javax.management.remote.JMXConnectionNotification.FAILED; +import static org.mockito.Matchers.argThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.mockito.Mockito.verify; @@ -31,10 +32,14 @@ import static org.mockito.Matchers.anyString; import javax.management.remote.JMXConnectionNotification; -import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.RootMessageLogger; import junit.framework.TestCase; +import org.apache.qpid.server.logging.SystemLog; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.mockito.ArgumentMatcher; public class ManagementLogonLogoffReporterTest extends TestCase { @@ -52,8 +57,8 @@ public class ManagementLogonLogoffReporterTest extends TestCase _usernameAccessor = mock(UsernameAccessor.class); _rootMessageLogger = mock(RootMessageLogger.class); // Enable messaging so we can valid the generated strings - when(_rootMessageLogger.isMessageEnabled(any(LogActor.class), anyString())).thenReturn(true); - + when(_rootMessageLogger.isMessageEnabled(anyString())).thenReturn(true); + SystemLog.setRootMessageLogger(_rootMessageLogger); _reporter = new ManagementLogonLogoffReporter(_rootMessageLogger, _usernameAccessor); } @@ -64,7 +69,21 @@ public class ManagementLogonLogoffReporterTest extends TestCase _reporter.handleNotification(openNotification, null); - verify(_rootMessageLogger).rawMessage("[main] MNG-1007 : Open : User jmxuser", "qpid.message.managementconsole.open"); + verify(_rootMessageLogger).message(messageMatch("MNG-1007 : Open : User jmxuser", + "qpid.message.managementconsole.open")); + } + + private LogMessage messageMatch(final String message, final String hierarchy) + { + return argThat(new ArgumentMatcher<LogMessage>() + { + @Override + public boolean matches(final Object argument) + { + LogMessage actual = (LogMessage) argument; + return actual.getLogHierarchy().equals(hierarchy) && actual.toString().equals(message); + } + }); } public void testClosedNotification() @@ -74,7 +93,7 @@ public class ManagementLogonLogoffReporterTest extends TestCase _reporter.handleNotification(closeNotification, null); - verify(_rootMessageLogger).rawMessage("[main] MNG-1008 : Close : User jmxuser", "qpid.message.managementconsole.close"); + verify(_rootMessageLogger).message(messageMatch("MNG-1008 : Close : User jmxuser", "qpid.message.managementconsole.close")); } public void tesNotifiedForLogOnTypeEvents() diff --git a/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java b/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java index e8362f79f0..5c6918e87d 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java @@ -20,10 +20,14 @@ */ package org.apache.qpid.protocol; +import javax.security.auth.Subject; + public interface ServerProtocolEngine extends ProtocolEngine { /** * Gets the connection ID associated with this ProtocolEngine */ long getConnectionId(); + + Subject getSubject(); } diff --git a/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java b/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java index cec339c033..047151684f 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java @@ -81,7 +81,9 @@ public class ChannelLoggingTest extends AbstractTestLogging String log = getLogMessage(results, 0); // MESSAGE [con:0(guest@anonymous(3273383)/test)/ch:1] CHN-1001 : Create validateMessageID("CHN-1001", log); - assertEquals("Incorrect Channel in actor:"+fromActor(log), isBroker010()? 0 : 1, getChannelID(fromActor(log))); + final String fromActor = fromActor(log); + final int channelID = getChannelID(fromActor); + assertEquals("Incorrect Channel in actor:"+fromActor(log), isBroker010()? 0 : 1, channelID); if (!isBroker010()) { diff --git a/java/systests/src/main/java/org/apache/qpid/server/logging/VirtualHostLoggingTest.java b/java/systests/src/main/java/org/apache/qpid/server/logging/VirtualHostLoggingTest.java index 25dd5fd2f8..824a84eda8 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/logging/VirtualHostLoggingTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/logging/VirtualHostLoggingTest.java @@ -69,7 +69,7 @@ public class VirtualHostLoggingTest extends AbstractTestLogging { List<String> vhosts = Arrays.asList("test"); - assertEquals("Each vhost did not create a store.", vhosts.size(), results.size()); + assertEquals("Each vhost did not create a store.", 2*vhosts.size(), results.size()); for (int index = 0; index < results.size(); index++) { diff --git a/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java b/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java index 8bad73d0ea..c2713a5e1f 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java +++ b/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java @@ -20,11 +20,15 @@ */ package org.apache.qpid.test.utils; +import java.security.PrivilegedAction; import java.util.Set; import org.apache.log4j.Logger; import org.apache.qpid.server.Broker; +import org.apache.qpid.server.security.auth.TaskPrincipal; + +import javax.security.auth.Subject; public class InternalBrokerHolder implements BrokerHolder { @@ -57,8 +61,21 @@ public class InternalBrokerHolder implements BrokerHolder { LOGGER.info("Shutting down Broker instance"); - _broker.shutdown(); + Subject subject = org.apache.qpid.server.security.SecurityManager.SYSTEM; + subject = new Subject(false, subject.getPrincipals(), subject.getPublicCredentials(), subject.getPrivateCredentials()); + subject.getPrincipals().add(new TaskPrincipal("Shutdown")); + + Subject.doAs(subject, new PrivilegedAction<Object>() + { + @Override + public Object run() + { + _broker.shutdown(); + return null; + } + + }); waitUntilPortsAreFree(); LOGGER.info("Broker instance shutdown"); |