diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org')
21 files changed, 157 insertions, 76 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/BrokerInstance.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/BrokerInstance.java index ad4e40a562..8150cd7404 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/BrokerInstance.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/BrokerInstance.java @@ -255,6 +255,7 @@ public class BrokerInstance private void configureLogging(File logConfigFile, int logWatchTime) throws Exception { + _logger.info("configuring logging using file " + logConfigFile.getName()); if (logConfigFile.exists() && logConfigFile.canRead()) { CurrentActor.get().message(BrokerMessages.LOG_CONFIG(logConfigFile.getAbsolutePath())); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfigStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfigStore.java index 0e03e33be8..7dfe9ff49a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfigStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfigStore.java @@ -29,6 +29,8 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import sun.misc.Unsafe; + public class ConfigStore { private ConcurrentHashMap<ConfigObjectType, ConcurrentHashMap<UUID, ConfiguredObject>> _typeMap = diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java index c4cad1e5c9..18f41588d5 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java @@ -229,13 +229,12 @@ public abstract class ConfigurationPlugin return getListValue(property, Collections.<String>emptyList()); } - @SuppressWarnings("unchecked") protected List<String> getListValue(String property, List<String> defaultValue) { - return (List<String>) _configuration.getList(property, defaultValue); + return _configuration.getList(property, defaultValue); } - /// Validation Helpers + // Validation Helpers protected boolean contains(String property) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionConfiguration.java index c5fbb6efd9..7a2632d923 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionConfiguration.java @@ -85,8 +85,5 @@ public class SlowConsumerDetectionConfiguration extends ConfigurationPlugin } } - System.out.println("Configured SCDC"); - System.out.println("Delay:" + getDelay()); - System.out.println("TimeUnit:" + getTimeUnit()); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java index c06305ee4e..caec2c1324 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java @@ -57,20 +57,21 @@ public class ConnectionRegistry implements IConnectionRegistry, Closeable { connection.close(cause, message); } - catch (AMQException e) + catch (Exception e) { - _logger.warn("Error closing connection:" + e.getMessage()); + _logger.warn("Error closing connection: " + e.getMessage()); + deregisterConnection(connection); } } - public void registerConnection(AMQConnectionModel connnection) + public void registerConnection(AMQConnectionModel connection) { - _registry.add(connnection); + _registry.add(connection); } - public void deregisterConnection(AMQConnectionModel connnection) + public void deregisterConnection(AMQConnectionModel connection) { - _registry.remove(connnection); + _registry.remove(connection); } @Override diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/AMQManagedObject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/AMQManagedObject.java index c4ffcd26bf..8bce180784 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/AMQManagedObject.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/AMQManagedObject.java @@ -20,9 +20,13 @@ */ package org.apache.qpid.server.management; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; + import org.apache.qpid.server.logging.actors.ManagementActor; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.thread.Threading; import javax.management.ListenerNotFoundException; import javax.management.MBeanInfo; @@ -45,12 +49,13 @@ public abstract class AMQManagedObject extends DefaultManagedObject /** * broadcaster support class */ - protected NotificationBroadcasterSupport _broadcaster = new NotificationBroadcasterSupport(); + protected NotificationBroadcasterSupport _broadcaster = new NotificationBroadcasterSupport( + Executors.newCachedThreadPool(Threading.getThreadFactory())); /** * sequence number for notifications */ - protected long _notificationSequenceNumber = 0; + protected AtomicLong _notificationSequenceNumber = new AtomicLong(0); protected MBeanInfo _mbeanInfo; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageReference.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageReference.java index 055403ff08..399f8f9327 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageReference.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageReference.java @@ -20,13 +20,12 @@ */ package org.apache.qpid.server.message; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.concurrent.atomic.AtomicBoolean; public abstract class MessageReference<M extends ServerMessage> { - private static final AtomicReferenceFieldUpdater<MessageReference, ServerMessage> _messageUpdater = - AtomicReferenceFieldUpdater.newUpdater(MessageReference.class, ServerMessage.class,"_message"); + private final AtomicBoolean _released = new AtomicBoolean(false); private volatile M _message; @@ -47,10 +46,12 @@ public abstract class MessageReference<M extends ServerMessage> public void release() { - M message = (M) _messageUpdater.getAndSet(this,null); - if(message != null) + if(!_released.getAndSet(true)) { - onRelease(message); + if(_message != null) + { + onRelease(_message); + } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java index b61da12b05..a6bab017a1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java @@ -63,6 +63,7 @@ public class PluginManager implements Closeable private static final Logger _logger = Logger.getLogger(PluginManager.class); private static final int FELIX_STOP_TIMEOUT = 30000; + private static final String QPID_VER_SUFFIX = "version=0.9,"; private Framework _felix; @@ -133,33 +134,33 @@ public class PluginManager implements Closeable "org.osgi.service.startlevel; version=1.0.0," + "org.osgi.service.url; version=1.0.0," + "org.osgi.util.tracker; version=1.0.0," + - "org.apache.qpid.junit.extensions.util; version=0.7," + - "org.apache.qpid; version=0.7," + - "org.apache.qpid.common; version=0.7," + - "org.apache.qpid.exchange; version=0.7," + - "org.apache.qpid.framing; version=0.7," + - "org.apache.qpid.management.common.mbeans.annotations; version=0.7," + - "org.apache.qpid.protocol; version=0.7," + - "org.apache.qpid.server.binding; version=0.7," + - "org.apache.qpid.server.configuration; version=0.7," + - "org.apache.qpid.server.configuration.plugins; version=0.7," + - "org.apache.qpid.server.configuration.management; version=0.7," + - "org.apache.qpid.server.exchange; version=0.7," + - "org.apache.qpid.server.logging; version=0.7," + - "org.apache.qpid.server.logging.actors; version=0.7," + - "org.apache.qpid.server.logging.subjects; version=0.7," + - "org.apache.qpid.server.management; version=0.7," + - "org.apache.qpid.server.persistent; version=0.7," + - "org.apache.qpid.server.plugins; version=0.7," + - "org.apache.qpid.server.protocol; version=0.7," + - "org.apache.qpid.server.queue; version=0.7," + - "org.apache.qpid.server.registry; version=0.7," + - "org.apache.qpid.server.security; version=0.7," + - "org.apache.qpid.server.security.access; version=0.7," + - "org.apache.qpid.server.security.access.plugins; version=0.7," + - "org.apache.qpid.server.virtualhost; version=0.7," + - "org.apache.qpid.server.virtualhost.plugins; version=0.7," + - "org.apache.qpid.util; version=0.7," + + "org.apache.qpid.junit.extensions.util; " + QPID_VER_SUFFIX + + "org.apache.qpid; " + QPID_VER_SUFFIX + + "org.apache.qpid.common; " + QPID_VER_SUFFIX + + "org.apache.qpid.exchange; " + QPID_VER_SUFFIX + + "org.apache.qpid.framing; " + QPID_VER_SUFFIX + + "org.apache.qpid.management.common.mbeans.annotations; " + QPID_VER_SUFFIX + + "org.apache.qpid.protocol; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.binding; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.configuration; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.configuration.plugins; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.configuration.management; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.exchange; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.logging; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.logging.actors; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.logging.subjects; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.management; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.persistent; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.plugins; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.protocol; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.queue; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.registry; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.security; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.security.access; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.security.access.plugins; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.virtualhost; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.virtualhost.plugins; " + QPID_VER_SUFFIX + + "org.apache.qpid.util; " + QPID_VER_SUFFIX + "org.apache.commons.configuration; version=1.0.0," + "org.apache.commons.lang; version=1.0.0," + "org.apache.commons.lang.builder; version=1.0.0," + diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index 1185557d8f..c339bd9f90 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -361,7 +361,6 @@ public class AMQProtocolEngine implements Receiver<java.nio.ByteBuffer>, Managab mechanisms.getBytes(), locales.getBytes()); _sender.send(responseBody.generateFrame(0).toNioByteBuffer()); - } catch (AMQException e) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java index 77101e7d58..b009b6f522 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java @@ -135,7 +135,7 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed public String getVersion() { - return (_protocolSession.getClientVersion() == null) ? null : _protocolSession.getClientVersion().toString(); + return _protocolSession.getClientVersion(); } public Date getLastIoTime() @@ -324,7 +324,7 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed public void notifyClients(String notificationMsg) { Notification n = - new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, ++_notificationSequenceNumber, + new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, _notificationSequenceNumber.getAndIncrement(), System.currentTimeMillis(), notificationMsg); _broadcaster.sendNotification(n); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/BrokerReceiver.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/BrokerReceiver.java index 2fdf27d1aa..3a5bc7de48 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/BrokerReceiver.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/BrokerReceiver.java @@ -54,7 +54,7 @@ public class BrokerReceiver implements Receiver<java.nio.ByteBuffer>, LogSubject private IApplicationRegistry _appRegistry; private volatile Receiver<java.nio.ByteBuffer> _delegate = new SelfDelegateProtocolEngine(); - + public BrokerReceiver(IApplicationRegistry appRegistry, String fqdn, Set<VERSION> supported, diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java index 92b0236b6c..3befd43d89 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java @@ -38,7 +38,7 @@ import org.apache.qpid.transport.network.NetworkConnection; public class ProtocolEngine_0_10 extends InputHandler implements ConnectionConfig { - public static final int MAX_FRAME_SIZE = 64 * 1024 - 1; + public static final int MAX_FRAME_SIZE = Integer.getInteger("qpid.maxFrameSize", 64 * 1024 - 1); private NetworkConnection _network; private ServerConnection _connection; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index b5294b6d2f..784582b83e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -57,7 +57,44 @@ import javax.management.openmbean.TabularDataSupport; import javax.management.openmbean.TabularType; import java.text.SimpleDateFormat; -import java.util.*; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Set; + +import javax.management.JMException; +import javax.management.MBeanException; +import javax.management.MBeanNotificationInfo; +import javax.management.Notification; +import javax.management.OperationsException; +import javax.management.monitor.MonitorNotification; +import javax.management.openmbean.ArrayType; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.OpenType; +import javax.management.openmbean.SimpleType; +import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; +import javax.management.openmbean.TabularType; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.management.common.mbeans.ManagedQueue; +import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor; +import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription; +import org.apache.qpid.server.management.AMQManagedObject; +import org.apache.qpid.server.management.ManagedObject; +import org.apache.qpid.server.message.AMQMessage; +import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.MessageTransferMessage; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.txn.LocalTransaction; +import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.transport.MessageProperties; /** * AMQQueueMBean is the management bean for an {@link AMQQueue}. @@ -298,7 +335,6 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que */ public void checkForNotification(ServerMessage msg) throws AMQException { - final Set<NotificationCheck> notificationChecks = _queue.getNotificationChecks(); if(!notificationChecks.isEmpty()) @@ -317,7 +353,6 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que } } } - } /** @@ -330,7 +365,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que notificationMsg = notification.name() + " " + notificationMsg; _lastNotification = - new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, ++_notificationSequenceNumber, + new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, _notificationSequenceNumber.getAndIncrement(), System.currentTimeMillis(), notificationMsg); _broadcaster.sendNotification(_lastNotification); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java index f1407b8770..580fe8e834 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java @@ -55,6 +55,7 @@ import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticat import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.transport.network.IncomingNetworkTransport; import org.apache.qpid.transport.network.NetworkTransport; /** @@ -71,7 +72,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry protected final ServerConfiguration _configuration; - protected final Map<Integer, NetworkTransport> _transports = new HashMap<Integer, NetworkTransport>(); + protected final Map<Integer, IncomingNetworkTransport> _transports = new HashMap<Integer, IncomingNetworkTransport>(); protected ManagedObjectRegistry _managedObjectRegistry; @@ -374,12 +375,12 @@ public abstract class ApplicationRegistry implements IApplicationRegistry try { transport.close(); + CurrentActor.get().message(BrokerMessages.SHUTTING_DOWN(transport.getAddress().toString(), port)); } catch (Throwable e) { _logger.error("Unable to close network driver due to:" + e.getMessage()); } - CurrentActor.get().message(BrokerMessages.SHUTTING_DOWN(transport.getAddress().toString(), port)); } } } @@ -389,7 +390,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry return _configuration; } - public void registerTransport(int port, NetworkTransport transport) + public void registerTransport(int port, IncomingNetworkTransport transport) { synchronized (_transports) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java index 9d138055bf..3357a42e68 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java @@ -36,6 +36,7 @@ import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.transport.network.IncomingNetworkTransport; import org.apache.qpid.transport.network.NetworkTransport; public interface IApplicationRegistry @@ -81,7 +82,7 @@ public interface IApplicationRegistry /** * Register any network transports for this registry */ - void registerTransport(int port, NetworkTransport transport); + void registerTransport(int port, IncomingNetworkTransport transport); public UUID getBrokerId(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java index 0865165925..2e694b24ea 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java @@ -134,6 +134,8 @@ public class DerbyMessageStore implements MessageStore private static final String DELETE_FROM_META_DATA = "DELETE FROM " + META_DATA_TABLE_NAME + " WHERE message_id = ?"; private static final String SELECT_ALL_FROM_META_DATA = "SELECT message_id, meta_data FROM " + META_DATA_TABLE_NAME; + private static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006"; + private LogSubject _logSubject; private boolean _configured; @@ -631,9 +633,9 @@ public class DerbyMessageStore implements MessageStore } catch (SQLException e) { - if (e.getSQLState().equalsIgnoreCase("XJ015")) + if (e.getSQLState().equalsIgnoreCase(DERBY_SINGLE_DB_SHUTDOWN_CODE)) { - //XJ015 is expected and represents a clean shutdown, do nothing. + //expected and represents a clean shutdown of this database only, do nothing. } else { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index 9952700ae1..511d8e7fed 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -906,10 +906,9 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr public String toLogString() { String queueInfo = MessageFormat.format(QUEUE_FORMAT, _queue.getVirtualHost().getName(), - _queue.getNameShortString()); - String result = "[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getSubscriptionID()) + "(" - // queueString is "vh(/{0})/qu({1}) " so need to trim - + queueInfo.substring(0, queueInfo.length() - 1) + ")" + "] "; + _queue.getNameShortString()); + String result = "[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getSubscriptionID()) + + "(" + queueInfo.trim() + ")" + "] "; return result; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java index d6abee45d8..2439e607b1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java @@ -91,6 +91,10 @@ public class ServerConnection extends Connection implements AMQConnectionModel, if (state == State.CLOSED) { CurrentActor.get().message(this, ConnectionMessages.CLOSE()); + if (_virtualHost != null) + { + _virtualHost.getConnectionRegistry().deregisterConnection(this); + } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java index 2db1944cd1..9ba9e2f4a4 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java @@ -45,13 +45,13 @@ public abstract class HouseKeepingTask implements Runnable { // Don't need to undo this as this is a thread pool thread so will // always go through here before we do any real work. - Thread.currentThread().setName(_name); + //Thread.currentThread().setName(_name); // XXX temporary CurrentActor.set(new AbstractActor(_rootLogger) { @Override public String getLogMessage() { - return _name; + return _name + " "; } }); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java index c54173a281..1038e8fbd0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import javax.management.NotCompliantMBeanException; @@ -215,6 +216,25 @@ public class VirtualHostImpl implements VirtualHost _connectionRegistry = new ConnectionRegistry(); _houseKeepingTasks = new ScheduledThreadPoolExecutor(_configuration.getHouseKeepingThreadCount()); + _houseKeepingTasks.setThreadFactory(new ThreadFactory() + { + public Thread newThread(Runnable r) + { + Thread t = new Thread(r); + String name = "HouseKeeping"; + StackTraceElement[] trace = Thread.currentThread().getStackTrace(); + for (StackTraceElement elt : trace) + { + if (elt.getClassName().endsWith("Test")) + { + name += "-" + elt.getClassName(); +// break; // FIXME + } + } + t.setName(name); + return t; + } + }); _queueRegistry = new DefaultQueueRegistry(this); @@ -248,6 +268,7 @@ public class VirtualHostImpl implements VirtualHost _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean); _brokerMBean.register(); + initialiseHouseKeeping(hostConfig.getHousekeepingExpiredMessageCheckPeriod()); } @@ -275,12 +296,22 @@ public class VirtualHostImpl implements VirtualHost } catch (Exception e) { - _logger.error("Exception in housekeeping for queue: " - + q.getNameShortString().toString(), e); - //Don't throw exceptions as this will stop the - // house keeping task from running. + _logger.error("Exception in housekeeping for queue: " + q.getName(), e); + // Don't throw exceptions as this will stop the task from running. } } + } + } + + class CheckTransactionsTask extends HouseKeepingTask + { + public CheckTransactionsTask(VirtualHost vhost) + { + super(vhost); + } + + public void execute() + { for (AMQConnectionModel connection : getConnectionRegistry().getConnections()) { _logger.debug("Checking for long running open transactions on connection " + connection); @@ -293,17 +324,19 @@ public class VirtualHostImpl implements VirtualHost _configuration.getTransactionTimeoutOpenClose(), _configuration.getTransactionTimeoutIdleWarn(), _configuration.getTransactionTimeoutIdleClose()); - } + } catch (Exception e) { _logger.error("Exception in housekeeping for connection: " + connection.toString(), e); + // Don't throw exceptions as this will stop the task from running. } } } } - } + }; scheduleHouseKeepingTask(period, new ExpiredMessagesTask(this)); + scheduleHouseKeepingTask(period, new CheckTransactionsTask(this)); Map<String, VirtualHostPluginFactory> plugins = ApplicationRegistry.getInstance().getPluginManager().getVirtualHostPlugins(); @@ -368,13 +401,11 @@ public class VirtualHostImpl implements VirtualHost _houseKeepingTasks.setCorePoolSize(newSize); } - public int getHouseKeepingActiveCount() { return _houseKeepingTasks.getActiveCount(); } - private void initialiseMessageStore(VirtualHostConfiguration hostConfig) throws Exception { String messageStoreClass = hostConfig.getMessageStoreClass(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/ConfiguredQueueBindingListener.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/ConfiguredQueueBindingListener.java index 12206013eb..3346f80b7c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/ConfiguredQueueBindingListener.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/ConfiguredQueueBindingListener.java @@ -83,9 +83,11 @@ public class ConfiguredQueueBindingListener implements BindingListener if (config != null) { _cache.add(queue); + _log.error("=== SCD === ADD " + queue.getName()); } else { + _log.error("=== SCD === REMOVE " + queue.getName()); _cache.remove(queue); } } |