summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java120
1 files changed, 15 insertions, 105 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
index 52acd9085b..6ec1c512e5 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
@@ -24,18 +24,19 @@ import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.management.NotCompliantMBeanException;
-import javax.management.ObjectName;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInternalException;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
@@ -62,8 +63,6 @@ import org.apache.qpid.server.logging.messages.VirtualHostMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
-import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.DefaultQueueRegistry;
@@ -72,7 +71,7 @@ import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
-import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
@@ -100,7 +99,7 @@ public class VirtualHostImpl implements VirtualHost
private AMQBrokerManagerMBean _brokerMBean;
- private final AuthenticationManager _authenticationManager;
+ private AuthenticationManager _authenticationManager;
private SecurityManager _securityManager;
@@ -112,8 +111,6 @@ public class VirtualHostImpl implements VirtualHost
private BrokerConfig _broker;
private UUID _id;
- private boolean _statisticsEnabled = false;
- private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
private final long _createTime = System.currentTimeMillis();
private final ConcurrentHashMap<BrokerLink,BrokerLink> _links = new ConcurrentHashMap<BrokerLink, BrokerLink>();
@@ -164,12 +161,12 @@ public class VirtualHostImpl implements VirtualHost
public String getObjectInstanceName()
{
- return ObjectName.quote(_name);
+ return _name.toString();
}
public String getName()
{
- return _name;
+ return _name.toString();
}
public VirtualHostImpl getVirtualHost()
@@ -247,13 +244,11 @@ public class VirtualHostImpl implements VirtualHost
initialiseMessageStore(hostConfig);
}
- _authenticationManager = ApplicationRegistry.getInstance().getAuthenticationManager();
+ _authenticationManager = new PrincipalDatabaseAuthenticationManager(_name, _configuration);
_brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
_brokerMBean.register();
initialiseHouseKeeping(hostConfig.getHousekeepingExpiredMessageCheckPeriod());
-
- initialiseStatistics();
}
private void initialiseHouseKeeping(long period)
@@ -286,30 +281,19 @@ public class VirtualHostImpl implements VirtualHost
// house keeping task from running.
}
}
- for (AMQConnectionModel connection : getConnectionRegistry().getConnections())
- {
- _logger.debug("Checking for long running open transactions on connection " + connection);
- for (AMQSessionModel session : connection.getSessionModels())
- {
- _logger.debug("Checking for long running open transactions on session " + session);
- try
- {
- session.checkTransactionStatus(_configuration.getTransactionTimeoutOpenWarn(),
- _configuration.getTransactionTimeoutOpenClose(),
- _configuration.getTransactionTimeoutIdleWarn(),
- _configuration.getTransactionTimeoutIdleClose());
- }
- catch (Exception e)
- {
- _logger.error("Exception in housekeeping for connection: " + connection.toString(), e);
- }
- }
- }
}
}
scheduleHouseKeepingTask(period, new ExpiredMessagesTask(this));
+ class ForceChannelClosuresTask extends TimerTask
+ {
+ public void run()
+ {
+ _connectionRegistry.expireClosedChannels();
+ }
+ }
+
Map<String, VirtualHostPluginFactory> plugins =
ApplicationRegistry.getInstance().getPluginManager().getVirtualHostPlugins();
@@ -643,80 +627,6 @@ public class VirtualHostImpl implements VirtualHost
{
return _bindingFactory;
}
-
- public void registerMessageDelivered(long messageSize)
- {
- if (isStatisticsEnabled())
- {
- _messagesDelivered.registerEvent(1L);
- _dataDelivered.registerEvent(messageSize);
- }
- _appRegistry.registerMessageDelivered(messageSize);
- }
-
- public void registerMessageReceived(long messageSize, long timestamp)
- {
- if (isStatisticsEnabled())
- {
- _messagesReceived.registerEvent(1L, timestamp);
- _dataReceived.registerEvent(messageSize, timestamp);
- }
- _appRegistry.registerMessageReceived(messageSize, timestamp);
- }
-
- public StatisticsCounter getMessageReceiptStatistics()
- {
- return _messagesReceived;
- }
-
- public StatisticsCounter getDataReceiptStatistics()
- {
- return _dataReceived;
- }
-
- public StatisticsCounter getMessageDeliveryStatistics()
- {
- return _messagesDelivered;
- }
-
- public StatisticsCounter getDataDeliveryStatistics()
- {
- return _dataDelivered;
- }
-
- public void resetStatistics()
- {
- _messagesDelivered.reset();
- _dataDelivered.reset();
- _messagesReceived.reset();
- _dataReceived.reset();
-
- for (AMQConnectionModel connection : _connectionRegistry.getConnections())
- {
- connection.resetStatistics();
- }
- }
-
- public void initialiseStatistics()
- {
- setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
- _appRegistry.getConfiguration().isStatisticsGenerationVirtualhostsEnabled());
-
- _messagesDelivered = new StatisticsCounter("messages-delivered-" + getName());
- _dataDelivered = new StatisticsCounter("bytes-delivered-" + getName());
- _messagesReceived = new StatisticsCounter("messages-received-" + getName());
- _dataReceived = new StatisticsCounter("bytes-received-" + getName());
- }
-
- public boolean isStatisticsEnabled()
- {
- return _statisticsEnabled;
- }
-
- public void setStatisticsEnabled(boolean enabled)
- {
- _statisticsEnabled = enabled;
- }
public void createBrokerConnection(final String transport,
final String host,