summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/BrokerInstance.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfigStore.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionConfiguration.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java13
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/management/AMQManagedObject.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageReference.java13
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java55
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java4
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/BrokerReceiver.java2
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java43
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java47
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/ConfiguredQueueBindingListener.java2
21 files changed, 157 insertions, 76 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/BrokerInstance.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/BrokerInstance.java
index ad4e40a562..8150cd7404 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/BrokerInstance.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/BrokerInstance.java
@@ -255,6 +255,7 @@ public class BrokerInstance
private void configureLogging(File logConfigFile, int logWatchTime) throws Exception
{
+ _logger.info("configuring logging using file " + logConfigFile.getName());
if (logConfigFile.exists() && logConfigFile.canRead())
{
CurrentActor.get().message(BrokerMessages.LOG_CONFIG(logConfigFile.getAbsolutePath()));
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfigStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfigStore.java
index 0e03e33be8..7dfe9ff49a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfigStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfigStore.java
@@ -29,6 +29,8 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import sun.misc.Unsafe;
+
public class ConfigStore
{
private ConcurrentHashMap<ConfigObjectType, ConcurrentHashMap<UUID, ConfiguredObject>> _typeMap =
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java
index c4cad1e5c9..18f41588d5 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java
@@ -229,13 +229,12 @@ public abstract class ConfigurationPlugin
return getListValue(property, Collections.<String>emptyList());
}
- @SuppressWarnings("unchecked")
protected List<String> getListValue(String property, List<String> defaultValue)
{
- return (List<String>) _configuration.getList(property, defaultValue);
+ return _configuration.getList(property, defaultValue);
}
- /// Validation Helpers
+ // Validation Helpers
protected boolean contains(String property)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionConfiguration.java
index c5fbb6efd9..7a2632d923 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionConfiguration.java
@@ -85,8 +85,5 @@ public class SlowConsumerDetectionConfiguration extends ConfigurationPlugin
}
}
- System.out.println("Configured SCDC");
- System.out.println("Delay:" + getDelay());
- System.out.println("TimeUnit:" + getTimeUnit());
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
index c06305ee4e..caec2c1324 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
@@ -57,20 +57,21 @@ public class ConnectionRegistry implements IConnectionRegistry, Closeable
{
connection.close(cause, message);
}
- catch (AMQException e)
+ catch (Exception e)
{
- _logger.warn("Error closing connection:" + e.getMessage());
+ _logger.warn("Error closing connection: " + e.getMessage());
+ deregisterConnection(connection);
}
}
- public void registerConnection(AMQConnectionModel connnection)
+ public void registerConnection(AMQConnectionModel connection)
{
- _registry.add(connnection);
+ _registry.add(connection);
}
- public void deregisterConnection(AMQConnectionModel connnection)
+ public void deregisterConnection(AMQConnectionModel connection)
{
- _registry.remove(connnection);
+ _registry.remove(connection);
}
@Override
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/AMQManagedObject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/AMQManagedObject.java
index c4ffcd26bf..8bce180784 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/AMQManagedObject.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/AMQManagedObject.java
@@ -20,9 +20,13 @@
*/
package org.apache.qpid.server.management;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.qpid.server.logging.actors.ManagementActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.thread.Threading;
import javax.management.ListenerNotFoundException;
import javax.management.MBeanInfo;
@@ -45,12 +49,13 @@ public abstract class AMQManagedObject extends DefaultManagedObject
/**
* broadcaster support class
*/
- protected NotificationBroadcasterSupport _broadcaster = new NotificationBroadcasterSupport();
+ protected NotificationBroadcasterSupport _broadcaster = new NotificationBroadcasterSupport(
+ Executors.newCachedThreadPool(Threading.getThreadFactory()));
/**
* sequence number for notifications
*/
- protected long _notificationSequenceNumber = 0;
+ protected AtomicLong _notificationSequenceNumber = new AtomicLong(0);
protected MBeanInfo _mbeanInfo;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageReference.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageReference.java
index 055403ff08..399f8f9327 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageReference.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageReference.java
@@ -20,13 +20,12 @@
*/
package org.apache.qpid.server.message;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.concurrent.atomic.AtomicBoolean;
public abstract class MessageReference<M extends ServerMessage>
{
- private static final AtomicReferenceFieldUpdater<MessageReference, ServerMessage> _messageUpdater =
- AtomicReferenceFieldUpdater.newUpdater(MessageReference.class, ServerMessage.class,"_message");
+ private final AtomicBoolean _released = new AtomicBoolean(false);
private volatile M _message;
@@ -47,10 +46,12 @@ public abstract class MessageReference<M extends ServerMessage>
public void release()
{
- M message = (M) _messageUpdater.getAndSet(this,null);
- if(message != null)
+ if(!_released.getAndSet(true))
{
- onRelease(message);
+ if(_message != null)
+ {
+ onRelease(_message);
+ }
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java
index b61da12b05..a6bab017a1 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java
@@ -63,6 +63,7 @@ public class PluginManager implements Closeable
private static final Logger _logger = Logger.getLogger(PluginManager.class);
private static final int FELIX_STOP_TIMEOUT = 30000;
+ private static final String QPID_VER_SUFFIX = "version=0.9,";
private Framework _felix;
@@ -133,33 +134,33 @@ public class PluginManager implements Closeable
"org.osgi.service.startlevel; version=1.0.0," +
"org.osgi.service.url; version=1.0.0," +
"org.osgi.util.tracker; version=1.0.0," +
- "org.apache.qpid.junit.extensions.util; version=0.7," +
- "org.apache.qpid; version=0.7," +
- "org.apache.qpid.common; version=0.7," +
- "org.apache.qpid.exchange; version=0.7," +
- "org.apache.qpid.framing; version=0.7," +
- "org.apache.qpid.management.common.mbeans.annotations; version=0.7," +
- "org.apache.qpid.protocol; version=0.7," +
- "org.apache.qpid.server.binding; version=0.7," +
- "org.apache.qpid.server.configuration; version=0.7," +
- "org.apache.qpid.server.configuration.plugins; version=0.7," +
- "org.apache.qpid.server.configuration.management; version=0.7," +
- "org.apache.qpid.server.exchange; version=0.7," +
- "org.apache.qpid.server.logging; version=0.7," +
- "org.apache.qpid.server.logging.actors; version=0.7," +
- "org.apache.qpid.server.logging.subjects; version=0.7," +
- "org.apache.qpid.server.management; version=0.7," +
- "org.apache.qpid.server.persistent; version=0.7," +
- "org.apache.qpid.server.plugins; version=0.7," +
- "org.apache.qpid.server.protocol; version=0.7," +
- "org.apache.qpid.server.queue; version=0.7," +
- "org.apache.qpid.server.registry; version=0.7," +
- "org.apache.qpid.server.security; version=0.7," +
- "org.apache.qpid.server.security.access; version=0.7," +
- "org.apache.qpid.server.security.access.plugins; version=0.7," +
- "org.apache.qpid.server.virtualhost; version=0.7," +
- "org.apache.qpid.server.virtualhost.plugins; version=0.7," +
- "org.apache.qpid.util; version=0.7," +
+ "org.apache.qpid.junit.extensions.util; " + QPID_VER_SUFFIX +
+ "org.apache.qpid; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.common; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.exchange; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.framing; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.management.common.mbeans.annotations; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.protocol; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.binding; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.configuration; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.configuration.plugins; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.configuration.management; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.exchange; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.logging; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.logging.actors; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.logging.subjects; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.management; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.persistent; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.plugins; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.protocol; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.queue; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.registry; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.security; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.security.access; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.security.access.plugins; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.virtualhost; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.virtualhost.plugins; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.util; " + QPID_VER_SUFFIX +
"org.apache.commons.configuration; version=1.0.0," +
"org.apache.commons.lang; version=1.0.0," +
"org.apache.commons.lang.builder; version=1.0.0," +
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index 1185557d8f..c339bd9f90 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -361,7 +361,6 @@ public class AMQProtocolEngine implements Receiver<java.nio.ByteBuffer>, Managab
mechanisms.getBytes(),
locales.getBytes());
_sender.send(responseBody.generateFrame(0).toNioByteBuffer());
-
}
catch (AMQException e)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
index 77101e7d58..b009b6f522 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
@@ -135,7 +135,7 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
public String getVersion()
{
- return (_protocolSession.getClientVersion() == null) ? null : _protocolSession.getClientVersion().toString();
+ return _protocolSession.getClientVersion();
}
public Date getLastIoTime()
@@ -324,7 +324,7 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
public void notifyClients(String notificationMsg)
{
Notification n =
- new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, ++_notificationSequenceNumber,
+ new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, _notificationSequenceNumber.getAndIncrement(),
System.currentTimeMillis(), notificationMsg);
_broadcaster.sendNotification(n);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/BrokerReceiver.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/BrokerReceiver.java
index 2fdf27d1aa..3a5bc7de48 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/BrokerReceiver.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/BrokerReceiver.java
@@ -54,7 +54,7 @@ public class BrokerReceiver implements Receiver<java.nio.ByteBuffer>, LogSubject
private IApplicationRegistry _appRegistry;
private volatile Receiver<java.nio.ByteBuffer> _delegate = new SelfDelegateProtocolEngine();
-
+
public BrokerReceiver(IApplicationRegistry appRegistry,
String fqdn,
Set<VERSION> supported,
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
index 92b0236b6c..3befd43d89 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
@@ -38,7 +38,7 @@ import org.apache.qpid.transport.network.NetworkConnection;
public class ProtocolEngine_0_10 extends InputHandler implements ConnectionConfig
{
- public static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
+ public static final int MAX_FRAME_SIZE = Integer.getInteger("qpid.maxFrameSize", 64 * 1024 - 1);
private NetworkConnection _network;
private ServerConnection _connection;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index b5294b6d2f..784582b83e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -57,7 +57,44 @@ import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
import java.text.SimpleDateFormat;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Set;
+
+import javax.management.JMException;
+import javax.management.MBeanException;
+import javax.management.MBeanNotificationInfo;
+import javax.management.Notification;
+import javax.management.OperationsException;
+import javax.management.monitor.MonitorNotification;
+import javax.management.openmbean.ArrayType;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.OpenType;
+import javax.management.openmbean.SimpleType;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+import javax.management.openmbean.TabularType;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.management.common.mbeans.ManagedQueue;
+import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
+import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
+import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.txn.LocalTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.transport.MessageProperties;
/**
* AMQQueueMBean is the management bean for an {@link AMQQueue}.
@@ -298,7 +335,6 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
*/
public void checkForNotification(ServerMessage msg) throws AMQException
{
-
final Set<NotificationCheck> notificationChecks = _queue.getNotificationChecks();
if(!notificationChecks.isEmpty())
@@ -317,7 +353,6 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
}
}
}
-
}
/**
@@ -330,7 +365,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
notificationMsg = notification.name() + " " + notificationMsg;
_lastNotification =
- new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, ++_notificationSequenceNumber,
+ new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, _notificationSequenceNumber.getAndIncrement(),
System.currentTimeMillis(), notificationMsg);
_broadcaster.sendNotification(_lastNotification);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
index f1407b8770..580fe8e834 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
@@ -55,6 +55,7 @@ import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticat
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.transport.network.IncomingNetworkTransport;
import org.apache.qpid.transport.network.NetworkTransport;
/**
@@ -71,7 +72,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
protected final ServerConfiguration _configuration;
- protected final Map<Integer, NetworkTransport> _transports = new HashMap<Integer, NetworkTransport>();
+ protected final Map<Integer, IncomingNetworkTransport> _transports = new HashMap<Integer, IncomingNetworkTransport>();
protected ManagedObjectRegistry _managedObjectRegistry;
@@ -374,12 +375,12 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
try
{
transport.close();
+ CurrentActor.get().message(BrokerMessages.SHUTTING_DOWN(transport.getAddress().toString(), port));
}
catch (Throwable e)
{
_logger.error("Unable to close network driver due to:" + e.getMessage());
}
- CurrentActor.get().message(BrokerMessages.SHUTTING_DOWN(transport.getAddress().toString(), port));
}
}
}
@@ -389,7 +390,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
return _configuration;
}
- public void registerTransport(int port, NetworkTransport transport)
+ public void registerTransport(int port, IncomingNetworkTransport transport)
{
synchronized (_transports)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
index 9d138055bf..3357a42e68 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
@@ -36,6 +36,7 @@ import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.transport.network.IncomingNetworkTransport;
import org.apache.qpid.transport.network.NetworkTransport;
public interface IApplicationRegistry
@@ -81,7 +82,7 @@ public interface IApplicationRegistry
/**
* Register any network transports for this registry
*/
- void registerTransport(int port, NetworkTransport transport);
+ void registerTransport(int port, IncomingNetworkTransport transport);
public UUID getBrokerId();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
index 0865165925..2e694b24ea 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
@@ -134,6 +134,8 @@ public class DerbyMessageStore implements MessageStore
private static final String DELETE_FROM_META_DATA = "DELETE FROM " + META_DATA_TABLE_NAME + " WHERE message_id = ?";
private static final String SELECT_ALL_FROM_META_DATA = "SELECT message_id, meta_data FROM " + META_DATA_TABLE_NAME;
+ private static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006";
+
private LogSubject _logSubject;
private boolean _configured;
@@ -631,9 +633,9 @@ public class DerbyMessageStore implements MessageStore
}
catch (SQLException e)
{
- if (e.getSQLState().equalsIgnoreCase("XJ015"))
+ if (e.getSQLState().equalsIgnoreCase(DERBY_SINGLE_DB_SHUTDOWN_CODE))
{
- //XJ015 is expected and represents a clean shutdown, do nothing.
+ //expected and represents a clean shutdown of this database only, do nothing.
}
else
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
index 9952700ae1..511d8e7fed 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
@@ -906,10 +906,9 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
public String toLogString()
{
String queueInfo = MessageFormat.format(QUEUE_FORMAT, _queue.getVirtualHost().getName(),
- _queue.getNameShortString());
- String result = "[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getSubscriptionID()) + "("
- // queueString is "vh(/{0})/qu({1}) " so need to trim
- + queueInfo.substring(0, queueInfo.length() - 1) + ")" + "] ";
+ _queue.getNameShortString());
+ String result = "[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getSubscriptionID()) +
+ "(" + queueInfo.trim() + ")" + "] ";
return result;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
index d6abee45d8..2439e607b1 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
@@ -91,6 +91,10 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
if (state == State.CLOSED)
{
CurrentActor.get().message(this, ConnectionMessages.CLOSE());
+ if (_virtualHost != null)
+ {
+ _virtualHost.getConnectionRegistry().deregisterConnection(this);
+ }
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java
index 2db1944cd1..9ba9e2f4a4 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java
@@ -45,13 +45,13 @@ public abstract class HouseKeepingTask implements Runnable
{
// Don't need to undo this as this is a thread pool thread so will
// always go through here before we do any real work.
- Thread.currentThread().setName(_name);
+ //Thread.currentThread().setName(_name); // XXX temporary
CurrentActor.set(new AbstractActor(_rootLogger)
{
@Override
public String getLogMessage()
{
- return _name;
+ return _name + " ";
}
});
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
index c54173a281..1038e8fbd0 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.management.NotCompliantMBeanException;
@@ -215,6 +216,25 @@ public class VirtualHostImpl implements VirtualHost
_connectionRegistry = new ConnectionRegistry();
_houseKeepingTasks = new ScheduledThreadPoolExecutor(_configuration.getHouseKeepingThreadCount());
+ _houseKeepingTasks.setThreadFactory(new ThreadFactory()
+ {
+ public Thread newThread(Runnable r)
+ {
+ Thread t = new Thread(r);
+ String name = "HouseKeeping";
+ StackTraceElement[] trace = Thread.currentThread().getStackTrace();
+ for (StackTraceElement elt : trace)
+ {
+ if (elt.getClassName().endsWith("Test"))
+ {
+ name += "-" + elt.getClassName();
+// break; // FIXME
+ }
+ }
+ t.setName(name);
+ return t;
+ }
+ });
_queueRegistry = new DefaultQueueRegistry(this);
@@ -248,6 +268,7 @@ public class VirtualHostImpl implements VirtualHost
_brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
_brokerMBean.register();
+
initialiseHouseKeeping(hostConfig.getHousekeepingExpiredMessageCheckPeriod());
}
@@ -275,12 +296,22 @@ public class VirtualHostImpl implements VirtualHost
}
catch (Exception e)
{
- _logger.error("Exception in housekeeping for queue: "
- + q.getNameShortString().toString(), e);
- //Don't throw exceptions as this will stop the
- // house keeping task from running.
+ _logger.error("Exception in housekeeping for queue: " + q.getName(), e);
+ // Don't throw exceptions as this will stop the task from running.
}
}
+ }
+ }
+
+ class CheckTransactionsTask extends HouseKeepingTask
+ {
+ public CheckTransactionsTask(VirtualHost vhost)
+ {
+ super(vhost);
+ }
+
+ public void execute()
+ {
for (AMQConnectionModel connection : getConnectionRegistry().getConnections())
{
_logger.debug("Checking for long running open transactions on connection " + connection);
@@ -293,17 +324,19 @@ public class VirtualHostImpl implements VirtualHost
_configuration.getTransactionTimeoutOpenClose(),
_configuration.getTransactionTimeoutIdleWarn(),
_configuration.getTransactionTimeoutIdleClose());
- }
+ }
catch (Exception e)
{
_logger.error("Exception in housekeeping for connection: " + connection.toString(), e);
+ // Don't throw exceptions as this will stop the task from running.
}
}
}
}
- }
+ };
scheduleHouseKeepingTask(period, new ExpiredMessagesTask(this));
+ scheduleHouseKeepingTask(period, new CheckTransactionsTask(this));
Map<String, VirtualHostPluginFactory> plugins =
ApplicationRegistry.getInstance().getPluginManager().getVirtualHostPlugins();
@@ -368,13 +401,11 @@ public class VirtualHostImpl implements VirtualHost
_houseKeepingTasks.setCorePoolSize(newSize);
}
-
public int getHouseKeepingActiveCount()
{
return _houseKeepingTasks.getActiveCount();
}
-
private void initialiseMessageStore(VirtualHostConfiguration hostConfig) throws Exception
{
String messageStoreClass = hostConfig.getMessageStoreClass();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/ConfiguredQueueBindingListener.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/ConfiguredQueueBindingListener.java
index 12206013eb..3346f80b7c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/ConfiguredQueueBindingListener.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/ConfiguredQueueBindingListener.java
@@ -83,9 +83,11 @@ public class ConfiguredQueueBindingListener implements BindingListener
if (config != null)
{
_cache.add(queue);
+ _log.error("=== SCD === ADD " + queue.getName());
}
else
{
+ _log.error("=== SCD === REMOVE " + queue.getName());
_cache.remove(queue);
}
}