diff options
Diffstat (limited to 'qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java')
-rw-r--r-- | qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java | 192 |
1 files changed, 52 insertions, 140 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java index 492ec9d7bf..4fe41d512a 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java @@ -20,17 +20,17 @@ package org.apache.qpid.server.store.berkeleydb; * */ +import java.util.Map; + import org.apache.log4j.Logger; import org.apache.qpid.server.connection.IConnectionRegistry; +import org.apache.qpid.server.logging.messages.MessageStoreMessages; import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.store.DurableConfigurationRecoverer; import org.apache.qpid.server.store.DurableConfigurationStore; -import org.apache.qpid.server.store.Event; -import org.apache.qpid.server.store.EventListener; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.OperationalLoggingListener; import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory; import org.apache.qpid.server.virtualhost.AbstractVirtualHost; @@ -47,8 +47,7 @@ public class BDBHAVirtualHost extends AbstractVirtualHost private static final Logger LOGGER = Logger.getLogger(BDBHAVirtualHost.class); private BDBMessageStore _messageStore; - - private boolean _inVhostInitiatedClose; + private MessageStoreLogSubject _messageStoreLogSubject; BDBHAVirtualHost(VirtualHostRegistry virtualHostRegistry, StatisticsGatherer brokerStatisticsGatherer, @@ -60,28 +59,18 @@ public class BDBHAVirtualHost extends AbstractVirtualHost protected void initialiseStorage(VirtualHost virtualHost) { - _messageStore = new BDBMessageStore(new ReplicatedEnvironmentFacadeFactory()); - - final MessageStoreLogSubject storeLogSubject = - new MessageStoreLogSubject(getName(), _messageStore.getClass().getSimpleName()); - OperationalLoggingListener.listen(_messageStore, storeLogSubject, getEventLogger()); - - _messageStore.addEventListener(new BeforeActivationListener(), Event.BEFORE_ACTIVATE); - _messageStore.addEventListener(new AfterActivationListener(), Event.AFTER_ACTIVATE); - _messageStore.addEventListener(new BeforeCloseListener(), Event.BEFORE_CLOSE); - + setState(State.PASSIVE); + _messageStoreLogSubject = new MessageStoreLogSubject(getName(), BDBMessageStore.class.getSimpleName()); + _messageStore = new BDBMessageStore(new ReplicatedEnvironmentFacadeFactory()); + getEventLogger().message(_messageStoreLogSubject, MessageStoreMessages.CREATED()); - _messageStore.addEventListener(new AfterInitialisationListener(), Event.AFTER_INIT); - _messageStore.addEventListener(new BeforePassivationListener(), Event.BEFORE_PASSIVATE); + Map<String, Object> messageStoreSettings = virtualHost.getMessageStoreSettings(); - _messageStore.openConfigurationStore( - virtualHost.getName(), virtualHost.getMessageStoreSettings() - ); + _messageStore.openConfigurationStore(virtualHost.getName(), messageStoreSettings); + _messageStore.openMessageStore(virtualHost.getName(), messageStoreSettings); - _messageStore.openMessageStore( - virtualHost.getName(), virtualHost.getMessageStoreSettings() - ); + getEventLogger().message(_messageStoreLogSubject, MessageStoreMessages.STORE_LOCATION(_messageStore.getStoreLocation())); // Make the virtualhost model object a replication group listener ReplicatedEnvironmentFacade environmentFacade = (ReplicatedEnvironmentFacade) _messageStore.getEnvironmentFacade(); @@ -89,30 +78,6 @@ public class BDBHAVirtualHost extends AbstractVirtualHost } - - protected void closeStorage() - { - //Close MessageStore - if (_messageStore != null) - { - //Remove MessageStore Interface should not throw Exception - try - { - _inVhostInitiatedClose = true; - _messageStore.closeMessageStore(); - _messageStore.closeConfigurationStore(); - } - catch (Exception e) - { - getLogger().error("Failed to close message store", e); - } - finally - { - _inVhostInitiatedClose = false; - } - } - } - @Override public DurableConfigurationStore getDurableConfigurationStore() { @@ -125,77 +90,62 @@ public class BDBHAVirtualHost extends AbstractVirtualHost return _messageStore; } - private final class AfterInitialisationListener implements EventListener + private void activate() { - public void event(Event event) + try { - setState(State.PASSIVE); - } + _messageStore.getEnvironmentFacade().getEnvironment().flushLog(true); - } + DurableConfigurationRecoverer configRecoverer = + new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(), + new DefaultUpgraderProvider(BDBHAVirtualHost.this, getExchangeRegistry()), getEventLogger()); + _messageStore.recoverConfigurationStore(getModel(), configRecoverer); - private final class BeforePassivationListener implements EventListener - { - public void event(Event event) - { - State finalState = State.ERRORED; - - try - { - /* the approach here is not ideal as there is a race condition where a - * queue etc could be created while the virtual host is on the way to - * the passivated state. However the store state change from MASTER to UNKNOWN - * is documented as exceptionally rare.. - */ - - getConnectionRegistry().close(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT); - removeHouseKeepingTasks(); + initialiseModel(); - getQueueRegistry().stopAllAndUnregisterMBeans(); - getExchangeRegistry().clearAndUnregisterMbeans(); - getDtxRegistry().close(); + VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(BDBHAVirtualHost.this, getMessageStoreLogSubject()); + _messageStore.recoverMessageStore(getModel(), recoveryHandler, recoveryHandler); - finalState = State.PASSIVE; - } - finally - { - setState(finalState); - reportIfError(getState()); - } + attainActivation(); } - - } - - - private final class BeforeActivationListener implements EventListener - { - @Override - public void event(Event event) + catch (Exception e) { - initialiseModel(); + LOGGER.error("Failed to activate on hearing MASTER change event", e); } } - private final class AfterActivationListener implements EventListener + private void passivate() { - @Override - public void event(Event event) + State finalState = State.ERRORED; + + try { - attainActivation(); + /* the approach here is not ideal as there is a race condition where a + * queue etc could be created while the virtual host is on the way to + * the passivated state. However the store state change from MASTER to UNKNOWN + * is documented as exceptionally rare. + */ + + getConnectionRegistry().close(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT); + removeHouseKeepingTasks(); + + getQueueRegistry().stopAllAndUnregisterMBeans(); + getExchangeRegistry().clearAndUnregisterMbeans(); + getDtxRegistry().close(); + + finalState = State.PASSIVE; + } + finally + { + setState(finalState); + reportIfError(getState()); } } - private final class BeforeCloseListener implements EventListener + @Override + protected MessageStoreLogSubject getMessageStoreLogSubject() { - @Override - public void event(Event event) - { - if(!_inVhostInitiatedClose) - { - shutdownHouseKeeping(); - } - - } + return _messageStoreLogSubject; } private class BDBHAMessageStoreStateChangeListener implements StateChangeListener @@ -208,8 +158,7 @@ public class BDBHAVirtualHost extends AbstractVirtualHost if (LOGGER.isInfoEnabled()) { - LOGGER.info("Received BDB event indicating transition to state " + state - + " when current message store state is " + _messageStore._messageStoreStateManager.getState()); + LOGGER.info("Received BDB event indicating transition to state " + state); } switch (state) @@ -232,43 +181,6 @@ public class BDBHAVirtualHost extends AbstractVirtualHost throw new IllegalStateException("Unexpected state change: " + state); } } - - private void activate() - { - try - { - _messageStore.getEnvironmentFacade().getEnvironment().flushLog(true); - - DurableConfigurationRecoverer configRecoverer = - new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(), - new DefaultUpgraderProvider(BDBHAVirtualHost.this, getExchangeRegistry()), getEventLogger()); - _messageStore.recoverConfigurationStore(getModel(), configRecoverer); - - VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(BDBHAVirtualHost.this); - _messageStore.recoverMessageStore(getModel(), recoveryHandler, recoveryHandler); - } - catch (Exception e) - { - LOGGER.error("Failed to activate on hearing MASTER change event", e); - } - } - - private void passivate() - { - try - { - //TODO: move this this into the store method passivate() - if (_messageStore._messageStoreStateManager.isNotInState(org.apache.qpid.server.store.State.INITIALISED)) - { - _messageStore._messageStoreStateManager.attainState(org.apache.qpid.server.store.State.INITIALISED); - } - } - catch (Exception e) - { - LOGGER.error("Failed to passivate on hearing REPLICA or DETACHED change event", e); - } - } - } } |