From 676b0069419e84024417ea72f46487ae003dfd4c Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Fri, 4 Apr 2014 22:10:24 +0000 Subject: Merge from trunk git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-config-store-changes@1584926 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/store/berkeleydb/BDBHAVirtualHost.java | 17 +- .../server/store/berkeleydb/BDBMessageStore.java | 542 ++++++------- .../qpid/server/store/berkeleydb/entry/Xid.java | 52 -- .../server/store/berkeleydb/tuple/XidBinding.java | 2 +- .../store/berkeleydb/BDBMessageStoreTest.java | 436 ++++++++++ .../berkeleydb/upgrade/UpgradeFrom5To6Test.java | 2 +- .../store/berkeleydb/BDBMessageStoreTest.java | 666 ---------------- .../configuration/startup/BrokerStoreUpgrader.java | 21 +- .../store/JsonConfigurationEntryStore.java | 27 +- .../store/ManagementModeStoreHandler.java | 34 +- .../store/MemoryConfigurationEntryStore.java | 25 +- .../server/store/AbstractJDBCMessageStore.java | 589 +++++++------- .../server/store/AbstractMemoryMessageStore.java | 297 ++++++- ...ConfiguredObjectRecordRecoveverAndUpgrader.java | 60 ++ .../server/store/DurableConfigurationStore.java | 18 +- .../qpid/server/store/JsonFileConfigStore.java | 14 +- .../org/apache/qpid/server/store/MessageStore.java | 18 +- .../server/store/MessageStoreRecoveryHandler.java | 33 - .../apache/qpid/server/store/NullMessageStore.java | 34 +- .../store/TransactionLogRecoveryHandler.java | 42 - .../java/org/apache/qpid/server/store/Xid.java | 102 +++ .../handler/ConfiguredObjectRecordHandler.java | 41 + .../handler/DistributedTransactionHandler.java | 30 + .../qpid/server/store/handler/MessageHandler.java | 30 + .../store/handler/MessageInstanceHandler.java | 29 + .../virtualhost/DefaultUpgraderProvider.java | 4 + .../server/virtualhost/MessageStoreRecoverer.java | 357 +++++++++ .../server/virtualhost/StandardVirtualHost.java | 18 +- .../VirtualHostConfigRecoveryHandler.java | 350 -------- .../store/ManagementModeStoreHandlerTest.java | 81 +- .../AbstractDurableConfigurationStoreTestCase.java | 60 +- .../JsonFileConfigStoreConfigurationTest.java | 6 - .../qpid/server/store/JsonFileConfigStoreTest.java | 57 +- .../store/MessageStoreQuotaEventsTestBase.java | 4 - .../qpid/server/store/MessageStoreTestCase.java | 404 ++++++++-- .../qpid/server/store/TestMemoryMessageStore.java | 21 +- .../qpid/server/store/TestMessageMetaData.java | 15 +- .../qpid/server/store/TestMessageMetaDataType.java | 64 +- .../org/apache/qpid/server/store/TestRecord.java | 91 +++ .../server/store/TestableMemoryMessageStore.java | 149 ---- .../store/TestableMemoryMessageStoreFactory.java | 47 -- .../qpid/server/txn/MockStoreTransaction.java | 18 +- .../apache/qpid/server/util/BrokerTestHelper.java | 3 +- .../virtualhost/MessageStoreRecovererTest.java | 414 ++++++++++ ...g.apache.qpid.server.plugin.MessageStoreFactory | 1 - .../apache/qpid/server/protocol/v0_8/AckTest.java | 6 +- .../qpid/server/protocol/v0_8/AcknowledgeTest.java | 6 +- .../protocol/v0_8/QueueBrowserUsesNoAckTest.java | 4 +- .../protocol/v0_8/ReferenceCountingTest.java | 9 +- .../qpid/server/store/derby/DerbyMessageStore.java | 6 - .../server/store/derby/DerbyMessageStoreTest.java | 9 +- .../qpid/server/store/jdbc/JDBCMessageStore.java | 6 - qpid/java/broker-plugins/memory-store/pom.xml | 16 + .../qpid/server/store/MemoryMessageStore.java | 5 - .../qpid/server/store/MemoryMessageStoreTest.java | 47 ++ .../apache/qpid/server/store/MessageStoreTest.java | 880 --------------------- .../qpid/server/store/QuotaMessageStore.java | 8 +- .../apache/qpid/server/store/SlowMessageStore.java | 42 +- .../server/store/VirtualHostMessageStoreTest.java | 872 ++++++++++++++++++++ .../qpid/test/utils/TestBrokerConfiguration.java | 20 +- qpid/java/test-profiles/CPPExcludes | 16 +- qpid/java/test-profiles/JavaBDBExcludes | 4 - qpid/java/test-profiles/JavaTransientExcludes | 18 +- 63 files changed, 4070 insertions(+), 3229 deletions(-) delete mode 100644 qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/Xid.java create mode 100644 qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java delete mode 100644 qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java create mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecordRecoveverAndUpgrader.java delete mode 100755 qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStoreRecoveryHandler.java delete mode 100755 qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java create mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Xid.java create mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/ConfiguredObjectRecordHandler.java create mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/DistributedTransactionHandler.java create mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/MessageHandler.java create mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/MessageInstanceHandler.java create mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java delete mode 100755 qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java create mode 100644 qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestRecord.java delete mode 100644 qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java delete mode 100644 qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java create mode 100644 qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MessageStoreRecovererTest.java create mode 100644 qpid/java/broker-plugins/memory-store/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java delete mode 100644 qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java create mode 100644 qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java index aae0a56a40..a58bc274a9 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java @@ -29,15 +29,15 @@ import org.apache.qpid.server.logging.messages.MessageStoreMessages; import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.stats.StatisticsGatherer; -import org.apache.qpid.server.store.DurableConfigurationRecoverer; +import org.apache.qpid.server.store.ConfiguredObjectRecordRecoveverAndUpgrader; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory; +import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; import org.apache.qpid.server.virtualhost.AbstractVirtualHost; -import org.apache.qpid.server.virtualhost.DefaultUpgraderProvider; +import org.apache.qpid.server.virtualhost.MessageStoreRecoverer; import org.apache.qpid.server.virtualhost.State; -import org.apache.qpid.server.virtualhost.VirtualHostConfigRecoveryHandler; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import com.sleepycat.je.rep.StateChangeEvent; @@ -98,17 +98,12 @@ public class BDBHAVirtualHost extends AbstractVirtualHost { _messageStore.getEnvironmentFacade().getEnvironment().flushLog(true); - DefaultUpgraderProvider upgraderProvider = new DefaultUpgraderProvider(this); - - DurableConfigurationRecoverer configRecoverer = - new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(), - upgraderProvider, getEventLogger()); - _messageStore.recoverConfigurationStore(configRecoverer); + ConfiguredObjectRecordHandler upgraderRecoverer = new ConfiguredObjectRecordRecoveverAndUpgrader(this, getDurableConfigurationRecoverers()); + _messageStore.visitConfiguredObjectRecords(upgraderRecoverer); initialiseModel(); - VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(BDBHAVirtualHost.this, getMessageStoreLogSubject()); - _messageStore.recoverMessageStore(recoveryHandler, recoveryHandler); + new MessageStoreRecoverer(this, getMessageStoreLogSubject()).recover(); attainActivation(); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index c64bc43066..652e4c135d 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -27,8 +27,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; import java.util.Map; import java.util.Random; import java.util.UUID; @@ -38,29 +36,23 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.log4j.Logger; import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; import org.apache.qpid.server.store.EventManager; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStoreRecoveryHandler; -import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMemoryMessage; import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.store.TransactionLogRecoveryHandler; -import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler; import org.apache.qpid.server.store.TransactionLogResource; +import org.apache.qpid.server.store.Xid; import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory.EnvironmentFacadeTask; import org.apache.qpid.server.store.berkeleydb.entry.HierarchyKey; import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction; import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey; -import org.apache.qpid.server.store.berkeleydb.entry.Xid; import org.apache.qpid.server.store.berkeleydb.tuple.ConfiguredObjectBinding; import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding; import org.apache.qpid.server.store.berkeleydb.tuple.HierarchyKeyBinding; @@ -70,6 +62,10 @@ import org.apache.qpid.server.store.berkeleydb.tuple.QueueEntryBinding; import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding; import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding; import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader; +import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; +import org.apache.qpid.server.store.handler.DistributedTransactionHandler; +import org.apache.qpid.server.store.handler.MessageHandler; +import org.apache.qpid.server.store.handler.MessageInstanceHandler; import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.util.FileUtils; @@ -129,7 +125,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore private long _persistentSizeHighThreshold; private final EventManager _eventManager = new EventManager(); - private final String _type; private final EnvironmentFacadeFactory _environmentFacadeFactory; @@ -143,7 +138,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore public BDBMessageStore(EnvironmentFacadeFactory environmentFacadeFactory) { - _type = environmentFacadeFactory.getType(); _environmentFacadeFactory = environmentFacadeFactory; } @@ -160,18 +154,19 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore { if (_environmentFacade == null) { - String[] databaseNames = null; + EnvironmentFacadeTask[] initialisationTasks = null; if (MapValueConverter.getBooleanAttribute(IS_MESSAGE_STORE_TOO, storeSettings, false)) { - databaseNames = new String[CONFIGURATION_STORE_DATABASE_NAMES.length + MESSAGE_STORE_DATABASE_NAMES.length]; + String[] databaseNames = new String[CONFIGURATION_STORE_DATABASE_NAMES.length + MESSAGE_STORE_DATABASE_NAMES.length]; System.arraycopy(CONFIGURATION_STORE_DATABASE_NAMES, 0, databaseNames, 0, CONFIGURATION_STORE_DATABASE_NAMES.length); System.arraycopy(MESSAGE_STORE_DATABASE_NAMES, 0, databaseNames, CONFIGURATION_STORE_DATABASE_NAMES.length, MESSAGE_STORE_DATABASE_NAMES.length); + initialisationTasks = new EnvironmentFacadeTask[]{new UpgradeTask(parent), new OpenDatabasesTask(databaseNames), new DiskSpaceTask(), new MaxMessageIdTask() }; } else { - databaseNames = CONFIGURATION_STORE_DATABASE_NAMES; + initialisationTasks = new EnvironmentFacadeTask[]{new UpgradeTask(parent), new OpenDatabasesTask(CONFIGURATION_STORE_DATABASE_NAMES)}; } - _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(storeSettings, new UpgradeTask(parent), new OpenDatabasesTask(databaseNames)); + _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(storeSettings, initialisationTasks); } else { @@ -181,11 +176,88 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } @Override - public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler) + public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) { checkConfigurationStoreOpen(); - recoverConfig(recoveryHandler); + try + { + int configVersion = getConfigVersion(); + + handler.begin(configVersion); + doVisitAllConfiguredObjectRecords(handler); + + int newConfigVersion = handler.end(); + if(newConfigVersion != configVersion) + { + updateConfigVersion(newConfigVersion); + } + } + catch (DatabaseException e) + { + throw _environmentFacade.handleDatabaseException("Cannot visit configured object records", e); + } + + } + + private void doVisitAllConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) + { + Map configuredObjects = new HashMap(); + Cursor objectsCursor = null; + Cursor hierarchyCursor = null; + try + { + objectsCursor = getConfiguredObjectsDb().openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + DatabaseEntry value = new DatabaseEntry(); + + + while (objectsCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + UUID id = UUIDTupleBinding.getInstance().entryToObject(key); + + BDBConfiguredObjectRecord configuredObject = + (BDBConfiguredObjectRecord) new ConfiguredObjectBinding(id).entryToObject(value); + configuredObjects.put(configuredObject.getId(), configuredObject); + } + + // set parents + hierarchyCursor = getConfiguredObjectHierarchyDb().openCursor(null, null); + while (hierarchyCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + HierarchyKey hk = HierarchyKeyBinding.getInstance().entryToObject(key); + UUID parentId = UUIDTupleBinding.getInstance().entryToObject(value); + BDBConfiguredObjectRecord child = configuredObjects.get(hk.getChildId()); + if(child != null) + { + ConfiguredObjectRecord parent = configuredObjects.get(parentId); + if(parent != null) + { + child.addParent(hk.getParentType(), parent); + } + else if(hk.getParentType().equals("Exchange")) + { + // TODO - remove this hack for the pre-defined exchanges + child.addParent(hk.getParentType(), new BDBConfiguredObjectRecord(parentId, "Exchange", Collections.emptyMap())); + } + } + } + } + finally + { + closeCursorSafely(objectsCursor); + closeCursorSafely(hierarchyCursor); + } + + for (ConfiguredObjectRecord record : configuredObjects.values()) + { + boolean shoudlContinue = handler.handle(record); + if (!shoudlContinue) + { + break; + } + } + } @Override @@ -209,7 +281,8 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore if (_environmentFacade == null) { - _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(messageStoreSettings, new UpgradeTask(parent), new OpenDatabasesTask(MESSAGE_STORE_DATABASE_NAMES), new DiskSpaceTask()); + _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(messageStoreSettings, + new UpgradeTask(parent), new OpenDatabasesTask(MESSAGE_STORE_DATABASE_NAMES), new DiskSpaceTask(), new MaxMessageIdTask()); } _committer = _environmentFacade.createCommitter(parent.getName()); @@ -217,21 +290,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } } - @Override - public synchronized void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler) throws StoreException - { - checkMessageStoreOpen(); - - if(messageRecoveryHandler != null) - { - recoverMessages(messageRecoveryHandler); - } - if(transactionLogRecoveryHandler != null) - { - recoverQueueEntries(transactionLogRecoveryHandler); - } - } - @Override public org.apache.qpid.server.store.Transaction newTransaction() throws StoreException { @@ -314,27 +372,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } } - private void recoverConfig(ConfigurationRecoveryHandler recoveryHandler) throws StoreException - { - try - { - final int configVersion = getConfigVersion(); - recoveryHandler.beginConfigurationRecovery(this, configVersion); - loadConfiguredObjects(recoveryHandler); - - final int newConfigVersion = recoveryHandler.completeConfigurationRecovery(); - if(newConfigVersion != configVersion) - { - updateConfigVersion(newConfigVersion); - } - } - catch (DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("Error recovering persistent state: " + e.getMessage(), e); - } - - } - @SuppressWarnings("resource") private void updateConfigVersion(int newConfigVersion) throws StoreException { @@ -399,62 +436,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } } - private void loadConfiguredObjects(ConfigurationRecoveryHandler crh) throws DatabaseException, StoreException - { - Cursor objectsCursor = null; - Cursor hierarchyCursor = null; - try - { - objectsCursor = getConfiguredObjectsDb().openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - DatabaseEntry value = new DatabaseEntry(); - - Map configuredObjects = - new HashMap(); - - while (objectsCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - UUID id = UUIDTupleBinding.getInstance().entryToObject(key); - - BDBConfiguredObjectRecord configuredObject = - (BDBConfiguredObjectRecord) new ConfiguredObjectBinding(id).entryToObject(value); - configuredObjects.put(configuredObject.getId(), configuredObject); - } - - // set parents - hierarchyCursor = getConfiguredObjectHierarchyDb().openCursor(null, null); - while (hierarchyCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - HierarchyKey hk = HierarchyKeyBinding.getInstance().entryToObject(key); - UUID parentId = UUIDTupleBinding.getInstance().entryToObject(value); - BDBConfiguredObjectRecord child = configuredObjects.get(hk.getChildId()); - if(child != null) - { - ConfiguredObjectRecord parent = configuredObjects.get(parentId); - if(parent != null) - { - child.addParent(hk.getParentType(), parent); - } - else if(hk.getParentType().equals("Exchange")) - { - // TODO - remove this hack for the pre-defined exchanges - child.addParent(hk.getParentType(), new BDBConfiguredObjectRecord(parentId, "Exchange", Collections.emptyMap())); - } - } - } - - for (ConfiguredObjectRecord record : configuredObjects.values()) - { - crh.configuredObject(record); - } - } - finally - { - closeCursorSafely(objectsCursor); - closeCursorSafely(hierarchyCursor); - } - } - private void closeCursorSafely(Cursor cursor) throws StoreException { if (cursor != null) @@ -470,124 +451,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } } - private void recoverMessages(MessageStoreRecoveryHandler msrh) throws StoreException - { - StoredMessageRecoveryHandler mrh = msrh.begin(); - - Cursor cursor = null; - try - { - cursor = getMessageMetaDataDb().openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - DatabaseEntry value = new DatabaseEntry(); - MessageMetaDataBinding valueBinding = MessageMetaDataBinding.getInstance(); - - long maxId = 0; - - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - long messageId = LongBinding.entryToLong(key); - StorableMessageMetaData metaData = valueBinding.entryToObject(value); - - StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true); - - mrh.message(message); - - maxId = Math.max(maxId, messageId); - } - - _messageId.set(maxId); - mrh.completeMessageRecovery(); - } - catch (DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("Cannot recover messages", e); - } - finally - { - closeCursorSafely(cursor); - } - } - - private void recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) - throws StoreException - { - QueueEntryRecoveryHandler qerh = recoveryHandler.begin(this); - - ArrayList entries = new ArrayList(); - - Cursor cursor = null; - try - { - cursor = getDeliveryDb().openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); - - DatabaseEntry value = new DatabaseEntry(); - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - QueueEntryKey qek = keyBinding.entryToObject(key); - - entries.add(qek); - } - - try - { - cursor.close(); - } - finally - { - cursor = null; - } - - for(QueueEntryKey entry : entries) - { - UUID queueId = entry.getQueueId(); - long messageId = entry.getMessageId(); - qerh.queueEntry(queueId, messageId); - } - } - catch (DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("Cannot recover queue entries", e); - } - finally - { - closeCursorSafely(cursor); - } - - TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = qerh.completeQueueEntryRecovery(); - - cursor = null; - try - { - cursor = getXidDb().openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - XidBinding keyBinding = XidBinding.getInstance(); - PreparedTransactionBinding valueBinding = new PreparedTransactionBinding(); - DatabaseEntry value = new DatabaseEntry(); - - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - Xid xid = keyBinding.entryToObject(key); - PreparedTransaction preparedTransaction = valueBinding.entryToObject(value); - dtxrh.dtxRecord(xid.getFormat(),xid.getGlobalId(),xid.getBranchId(), - preparedTransaction.getEnqueues(),preparedTransaction.getDequeues()); - } - - } - catch (DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("Cannot recover transactions", e); - } - finally - { - closeCursorSafely(cursor); - } - - - dtxrh.completeDtxRecordRecovery(); - } void removeMessage(long messageId, boolean sync) throws StoreException { @@ -738,6 +601,12 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore public void create(ConfiguredObjectRecord configuredObject) throws StoreException { checkConfigurationStoreOpen(); + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Create " + configuredObject); + } + com.sleepycat.je.Transaction txn = null; try { @@ -831,7 +700,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Updating " + record.getType() + ", id: " + record.getId()); + LOGGER.debug("Updating, creating " + createIfNecessary + " : " + record); } DatabaseEntry key = new DatabaseEntry(); @@ -889,8 +758,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore if (LOGGER.isDebugEnabled()) { LOGGER.debug("Enqueuing message " + messageId + " on queue " - + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + queue.getId() - + " in transaction " + tx); + + queue.getName() + " with id " + queue.getId() + " in transaction " + tx); } getDeliveryDb().put(tx, key, value); } @@ -898,8 +766,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore { LOGGER.error("Failed to enqueue: " + e.getMessage(), e); throw _environmentFacade.handleDatabaseException("Error writing enqueued message with id " + messageId + " for queue " - + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + queue.getId() - + " to database", e); + + queue.getName() + " with id " + queue.getId() + " to database", e); } } @@ -924,7 +791,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore if (LOGGER.isDebugEnabled()) { LOGGER.debug("Dequeue message id " + messageId + " from queue " - + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id); + + queue.getName() + " with id " + id); } try @@ -934,19 +801,18 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore if (status == OperationStatus.NOTFOUND) { throw new StoreException("Unable to find message with id " + messageId + " on queue " - + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id); + + queue.getName() + " with id " + id); } else if (status != OperationStatus.SUCCESS) { throw new StoreException("Unable to remove message with id " + messageId + " on queue" - + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id); + + queue.getName() + " with id " + id); } if (LOGGER.isDebugEnabled()) { LOGGER.debug("Removed message " + messageId + " on queue " - + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id - + " from delivery db"); + + queue.getName() + " with id " + id); } } @@ -1071,57 +937,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } } - /** - * Primarily for testing purposes. - * - * @param queueId - * - * @return a list of message ids for messages enqueued for a particular queue - */ - List getEnqueuedMessages(UUID queueId) throws StoreException - { - Cursor cursor = null; - try - { - cursor = getDeliveryDb().openCursor(null, null); - - DatabaseEntry key = new DatabaseEntry(); - - QueueEntryKey dd = new QueueEntryKey(queueId, 0); - - QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); - keyBinding.objectToEntry(dd, key); - - DatabaseEntry value = new DatabaseEntry(); - - LinkedList messageIds = new LinkedList(); - - OperationStatus status = cursor.getSearchKeyRange(key, value, LockMode.DEFAULT); - dd = keyBinding.entryToObject(key); - - while ((status == OperationStatus.SUCCESS) && dd.getQueueId().equals(queueId)) - { - - messageIds.add(dd.getMessageId()); - status = cursor.getNext(key, value, LockMode.DEFAULT); - if (status == OperationStatus.SUCCESS) - { - dd = keyBinding.entryToObject(key); - } - } - - return messageIds; - } - catch (DatabaseException e) - { - throw new StoreException("Database error: " + e.getMessage(), e); - } - finally - { - closeCursorSafely(cursor); - } - } - /** * Return a valid, currently unused message id. * @@ -1792,12 +1607,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } } - @Override - public String getStoreType() - { - return _type; - } - private Database getConfiguredObjectsDb() { return _environmentFacade.getOpenDatabase(CONFIGURED_OBJECTS_DB_NAME); @@ -1901,4 +1710,147 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } } + + public class MaxMessageIdTask implements EnvironmentFacadeTask, MessageHandler + { + private long _maxId; + + @Override + public void execute(EnvironmentFacade facade) + { + visitMessagesInternal(this, facade); + _messageId.set(_maxId); + } + + @Override + public boolean handle(StoredMessage storedMessage) + { + long id = storedMessage.getMessageNumber(); + if (_maxId { diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java new file mode 100644 index 0000000000..6fba1b215e --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java @@ -0,0 +1,436 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.File; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.protocol.v0_10.MessageMetaDataType_0_10; +import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10; +import org.apache.qpid.server.protocol.v0_8.MessageMetaData; +import org.apache.qpid.server.protocol.v0_8.MessageMetaDataType_0_8; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreTestCase; +import org.apache.qpid.server.store.StorableMessageMetaData; +import org.apache.qpid.server.store.StoreException; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.Header; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.MessageDeliveryMode; +import org.apache.qpid.transport.MessageDeliveryPriority; +import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.transport.MessageTransfer; +import org.apache.qpid.util.FileUtils; + +/** + * Subclass of MessageStoreTestCase which runs the standard tests from the superclass against + * the BDB Store as well as additional tests specific to the BDB store-implementation. + */ +public class BDBMessageStoreTest extends MessageStoreTestCase +{ + private static byte[] CONTENT_BYTES = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; + + private String _storeLocation; + + @Override + protected void tearDown() throws Exception + { + try + { + super.tearDown(); + } + finally + { + deleteStoreIfExists(); + } + } + + /** + * Tests that message metadata and content are successfully read back from a + * store after it has been reloaded. Both 0-8 and 0-10 metadata is used to + * verify their ability to co-exist within the store and be successful retrieved. + */ + public void testBDBMessagePersistence() throws Exception + { + BDBMessageStore bdbStore = (BDBMessageStore)getStore(); + + // Create content ByteBuffers. + // Split the content into 2 chunks for the 0-8 message, as per broker behaviour. + // Use a single chunk for the 0-10 message as per broker behaviour. + String bodyText = "jfhdjsflsdhfjdshfjdslhfjdslhfsjlhfsjkhfdsjkhfdsjkfhdslkjf"; + + ByteBuffer firstContentBytes_0_8 = ByteBuffer.wrap(bodyText.substring(0, 10).getBytes()); + ByteBuffer secondContentBytes_0_8 = ByteBuffer.wrap(bodyText.substring(10).getBytes()); + + ByteBuffer completeContentBody_0_10 = ByteBuffer.wrap(bodyText.getBytes()); + int bodySize = completeContentBody_0_10.limit(); + + /* + * Create and insert a 0-8 message (metadata and multi-chunk content) + */ + MessagePublishInfo pubInfoBody_0_8 = createPublishInfoBody_0_8(); + BasicContentHeaderProperties props_0_8 = createContentHeaderProperties_0_8(); + + ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize); + + MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8); + StoredMessage storedMessage_0_8 = bdbStore.addMessage(messageMetaData_0_8); + + long origArrivalTime_0_8 = messageMetaData_0_8.getArrivalTime(); + long messageid_0_8 = storedMessage_0_8.getMessageNumber(); + + storedMessage_0_8.addContent(0, firstContentBytes_0_8); + storedMessage_0_8.addContent(firstContentBytes_0_8.limit(), secondContentBytes_0_8); + storedMessage_0_8.flushToStore(); + + /* + * Create and insert a 0-10 message (metadata and content) + */ + MessageProperties msgProps_0_10 = createMessageProperties_0_10(bodySize); + DeliveryProperties delProps_0_10 = createDeliveryProperties_0_10(); + Header header_0_10 = new Header(delProps_0_10, msgProps_0_10); + + MessageTransfer xfr_0_10 = new MessageTransfer("destination", MessageAcceptMode.EXPLICIT, + MessageAcquireMode.PRE_ACQUIRED, header_0_10, completeContentBody_0_10); + + MessageMetaData_0_10 messageMetaData_0_10 = new MessageMetaData_0_10(xfr_0_10); + StoredMessage storedMessage_0_10 = bdbStore.addMessage(messageMetaData_0_10); + + long origArrivalTime_0_10 = messageMetaData_0_10.getArrivalTime(); + long messageid_0_10 = storedMessage_0_10.getMessageNumber(); + + storedMessage_0_10.addContent(0, completeContentBody_0_10); + storedMessage_0_10.flushToStore(); + + /* + * reload the store only (read-only) + */ + reopenStore(); + + /* + * Read back and validate the 0-8 message metadata and content + */ + BDBMessageStore reopenedBdbStore = (BDBMessageStore) getStore(); + StorableMessageMetaData storeableMMD_0_8 = reopenedBdbStore.getMessageMetaData(messageid_0_8); + + assertEquals("Unexpected message type", MessageMetaDataType_0_8.TYPE, storeableMMD_0_8.getType().ordinal()); + assertTrue("Unexpected instance type", storeableMMD_0_8 instanceof MessageMetaData); + MessageMetaData returnedMMD_0_8 = (MessageMetaData) storeableMMD_0_8; + + assertEquals("Message arrival time has changed", origArrivalTime_0_8, returnedMMD_0_8.getArrivalTime()); + + MessagePublishInfo returnedPubBody_0_8 = returnedMMD_0_8.getMessagePublishInfo(); + assertEquals("Message exchange has changed", pubInfoBody_0_8.getExchange(), returnedPubBody_0_8.getExchange()); + assertEquals("Immediate flag has changed", pubInfoBody_0_8.isImmediate(), returnedPubBody_0_8.isImmediate()); + assertEquals("Mandatory flag has changed", pubInfoBody_0_8.isMandatory(), returnedPubBody_0_8.isMandatory()); + assertEquals("Routing key has changed", pubInfoBody_0_8.getRoutingKey(), returnedPubBody_0_8.getRoutingKey()); + + ContentHeaderBody returnedHeaderBody_0_8 = returnedMMD_0_8.getContentHeaderBody(); + assertEquals("ContentHeader ClassID has changed", chb_0_8.getClassId(), returnedHeaderBody_0_8.getClassId()); + assertEquals("ContentHeader weight has changed", chb_0_8.getWeight(), returnedHeaderBody_0_8.getWeight()); + assertEquals("ContentHeader bodySize has changed", chb_0_8.getBodySize(), returnedHeaderBody_0_8.getBodySize()); + + BasicContentHeaderProperties returnedProperties_0_8 = returnedHeaderBody_0_8.getProperties(); + assertEquals("Property ContentType has changed", props_0_8.getContentTypeAsString(), returnedProperties_0_8.getContentTypeAsString()); + assertEquals("Property MessageID has changed", props_0_8.getMessageIdAsString(), returnedProperties_0_8.getMessageIdAsString()); + + ByteBuffer recoveredContent_0_8 = ByteBuffer.allocate((int) chb_0_8.getBodySize()) ; + long recoveredCount_0_8 = reopenedBdbStore.getContent(messageid_0_8, 0, recoveredContent_0_8); + assertEquals("Incorrect amount of payload data recovered", chb_0_8.getBodySize(), recoveredCount_0_8); + String returnedPayloadString_0_8 = new String(recoveredContent_0_8.array()); + assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_8); + + /* + * Read back and validate the 0-10 message metadata and content + */ + StorableMessageMetaData storeableMMD_0_10 = reopenedBdbStore.getMessageMetaData(messageid_0_10); + + assertEquals("Unexpected message type", MessageMetaDataType_0_10.TYPE, storeableMMD_0_10.getType().ordinal()); + assertTrue("Unexpected instance type", storeableMMD_0_10 instanceof MessageMetaData_0_10); + MessageMetaData_0_10 returnedMMD_0_10 = (MessageMetaData_0_10) storeableMMD_0_10; + + assertEquals("Message arrival time has changed", origArrivalTime_0_10, returnedMMD_0_10.getArrivalTime()); + + DeliveryProperties returnedDelProps_0_10 = returnedMMD_0_10.getHeader().getDeliveryProperties(); + assertNotNull("DeliveryProperties were not returned", returnedDelProps_0_10); + assertEquals("Immediate flag has changed", delProps_0_10.getImmediate(), returnedDelProps_0_10.getImmediate()); + assertEquals("Routing key has changed", delProps_0_10.getRoutingKey(), returnedDelProps_0_10.getRoutingKey()); + assertEquals("Message exchange has changed", delProps_0_10.getExchange(), returnedDelProps_0_10.getExchange()); + assertEquals("Message expiration has changed", delProps_0_10.getExpiration(), returnedDelProps_0_10.getExpiration()); + assertEquals("Message delivery priority has changed", delProps_0_10.getPriority(), returnedDelProps_0_10.getPriority()); + + MessageProperties returnedMsgProps = returnedMMD_0_10.getHeader().getMessageProperties(); + assertNotNull("MessageProperties were not returned", returnedMsgProps); + assertTrue("Message correlationID has changed", Arrays.equals(msgProps_0_10.getCorrelationId(), returnedMsgProps.getCorrelationId())); + assertEquals("Message content length has changed", msgProps_0_10.getContentLength(), returnedMsgProps.getContentLength()); + assertEquals("Message content type has changed", msgProps_0_10.getContentType(), returnedMsgProps.getContentType()); + + ByteBuffer recoveredContent = ByteBuffer.allocate((int) msgProps_0_10.getContentLength()) ; + long recoveredCount = reopenedBdbStore.getContent(messageid_0_10, 0, recoveredContent); + assertEquals("Incorrect amount of payload data recovered", msgProps_0_10.getContentLength(), recoveredCount); + + String returnedPayloadString_0_10 = new String(recoveredContent.array()); + assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_10); + + reopenedBdbStore.closeMessageStore(); + } + + private DeliveryProperties createDeliveryProperties_0_10() + { + DeliveryProperties delProps_0_10 = new DeliveryProperties(); + + delProps_0_10.setDeliveryMode(MessageDeliveryMode.PERSISTENT); + delProps_0_10.setImmediate(true); + delProps_0_10.setExchange("exchange12345"); + delProps_0_10.setRoutingKey("routingKey12345"); + delProps_0_10.setExpiration(5); + delProps_0_10.setPriority(MessageDeliveryPriority.ABOVE_AVERAGE); + + return delProps_0_10; + } + + private MessageProperties createMessageProperties_0_10(int bodySize) + { + MessageProperties msgProps_0_10 = new MessageProperties(); + msgProps_0_10.setContentLength(bodySize); + msgProps_0_10.setCorrelationId("qwerty".getBytes()); + msgProps_0_10.setContentType("text/html"); + + return msgProps_0_10; + } + + + private MessagePublishInfo createPublishInfoBody_0_8() + { + return new MessagePublishInfo() + { + public AMQShortString getExchange() + { + return new AMQShortString("exchange12345"); + } + + @Override + public void setExchange(AMQShortString exchange) + { + } + + @Override + public boolean isImmediate() + { + return false; + } + + @Override + public boolean isMandatory() + { + return true; + } + + @Override + public AMQShortString getRoutingKey() + { + return new AMQShortString("routingKey12345"); + } + }; + + } + + private ContentHeaderBody createContentHeaderBody_0_8(BasicContentHeaderProperties props, int length) + { + MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9); + int classForBasic = methodRegistry.createBasicQosOkBody().getClazz(); + return new ContentHeaderBody(classForBasic, 1, props, length); + } + + private BasicContentHeaderProperties createContentHeaderProperties_0_8() + { + BasicContentHeaderProperties props = new BasicContentHeaderProperties(); + props.setDeliveryMode(Integer.valueOf(BasicContentHeaderProperties.PERSISTENT).byteValue()); + props.setContentType("text/html"); + props.getHeaders().setString("Test", "MST"); + return props; + } + + public void testGetContentWithOffset() throws Exception + { + BDBMessageStore bdbStore = (BDBMessageStore) getStore(); + StoredMessage storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(bdbStore); + long messageid_0_8 = storedMessage_0_8.getMessageNumber(); + + // normal case: offset is 0 + ByteBuffer dst = ByteBuffer.allocate(10); + int length = bdbStore.getContent(messageid_0_8, 0, dst); + assertEquals("Unexpected length", CONTENT_BYTES.length, length); + byte[] array = dst.array(); + assertTrue("Unexpected content", Arrays.equals(CONTENT_BYTES, array)); + + // offset is in the middle + dst = ByteBuffer.allocate(10); + length = bdbStore.getContent(messageid_0_8, 5, dst); + assertEquals("Unexpected length", 5, length); + array = dst.array(); + byte[] expected = new byte[10]; + System.arraycopy(CONTENT_BYTES, 5, expected, 0, 5); + assertTrue("Unexpected content", Arrays.equals(expected, array)); + + // offset beyond the content length + dst = ByteBuffer.allocate(10); + try + { + bdbStore.getContent(messageid_0_8, 15, dst); + fail("Should fail for the offset greater than message size"); + } + catch (RuntimeException e) + { + assertEquals("Unexpected exception message", "Offset 15 is greater than message size 10 for message id " + + messageid_0_8 + "!", e.getMessage()); + } + + // buffer is smaller then message size + dst = ByteBuffer.allocate(5); + length = bdbStore.getContent(messageid_0_8, 0, dst); + assertEquals("Unexpected length", 5, length); + array = dst.array(); + expected = new byte[5]; + System.arraycopy(CONTENT_BYTES, 0, expected, 0, 5); + assertTrue("Unexpected content", Arrays.equals(expected, array)); + + // buffer is smaller then message size, offset is not 0 + dst = ByteBuffer.allocate(5); + length = bdbStore.getContent(messageid_0_8, 2, dst); + assertEquals("Unexpected length", 5, length); + array = dst.array(); + expected = new byte[5]; + System.arraycopy(CONTENT_BYTES, 2, expected, 0, 5); + assertTrue("Unexpected content", Arrays.equals(expected, array)); + } + + /** + * Tests that messages which are added to the store and then removed using the + * public MessageStore interfaces are actually removed from the store by then + * interrogating the store with its own implementation methods and verifying + * expected exceptions are thrown to indicate the message is not present. + */ + public void testMessageCreationAndRemoval() throws Exception + { + BDBMessageStore bdbStore = (BDBMessageStore)getStore(); + + StoredMessage storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(bdbStore); + long messageid_0_8 = storedMessage_0_8.getMessageNumber(); + + bdbStore.removeMessage(messageid_0_8, true); + + //verify the removal using the BDB store implementation methods directly + try + { + // the next line should throw since the message id should not be found + bdbStore.getMessageMetaData(messageid_0_8); + fail("No exception thrown when message id not found getting metadata"); + } + catch (StoreException e) + { + // pass since exception expected + } + + //expecting no content, allocate a 1 byte + ByteBuffer dst = ByteBuffer.allocate(1); + + assertEquals("Retrieved content when none was expected", + 0, bdbStore.getContent(messageid_0_8, 0, dst)); + } + + private StoredMessage createAndStoreSingleChunkMessage_0_8(MessageStore store) + { + ByteBuffer chunk1 = ByteBuffer.wrap(CONTENT_BYTES); + + int bodySize = CONTENT_BYTES.length; + + //create and store the message using the MessageStore interface + MessagePublishInfo pubInfoBody_0_8 = createPublishInfoBody_0_8(); + BasicContentHeaderProperties props_0_8 = createContentHeaderProperties_0_8(); + + ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize); + + MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8); + StoredMessage storedMessage_0_8 = store.addMessage(messageMetaData_0_8); + + storedMessage_0_8.addContent(0, chunk1); + storedMessage_0_8.flushToStore(); + + return storedMessage_0_8; + } + + public void testOnDelete() throws Exception + { + String storeLocation = getStore().getStoreLocation(); + + File location = new File(storeLocation); + assertTrue("Store does not exist at " + storeLocation, location.exists()); + + getStore().closeMessageStore(); + assertTrue("Store does not exist at " + storeLocation, location.exists()); + + getStore().onDelete(); + assertFalse("Store exists at " + storeLocation, location.exists()); + } + + + @Override + protected Map getStoreSettings() throws Exception + { + _storeLocation = TMP_FOLDER + File.separator + getTestName(); + deleteStoreIfExists(); + Map messageStoreSettings = new HashMap(); + messageStoreSettings.put(MessageStore.STORE_PATH, _storeLocation); + return messageStoreSettings; + + } + + private void deleteStoreIfExists() + { + if (_storeLocation != null) + { + File location = new File(_storeLocation); + if (location.exists()) + { + FileUtils.delete(location, true); + } + } + } + + @Override + protected MessageStore createMessageStore() + { + return new BDBMessageStore(); + } + +} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java index 0460b1ce4c..717534a6b8 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java @@ -48,7 +48,7 @@ import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.QueueArgumentsConverter; -import org.apache.qpid.server.store.berkeleydb.entry.Xid; +import org.apache.qpid.server.store.Xid; import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding; import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CompoundKey; import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CompoundKeyBinding; diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java deleted file mode 100644 index 465c49e0c4..0000000000 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java +++ /dev/null @@ -1,666 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.io.File; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.List; -import java.util.UUID; - -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.message.AMQMessageHeader; -import org.apache.qpid.server.message.EnqueueableMessage; -import org.apache.qpid.server.message.MessageReference; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.protocol.v0_10.MessageMetaDataType_0_10; -import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10; -import org.apache.qpid.server.protocol.v0_8.MessageMetaData; -import org.apache.qpid.server.protocol.v0_8.MessageMetaDataType_0_8; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStoreRecoveryHandler; -import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; -import org.apache.qpid.server.store.MessageStoreTest; -import org.apache.qpid.server.store.StorableMessageMetaData; -import org.apache.qpid.server.store.StoreException; -import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.store.Transaction; -import org.apache.qpid.server.store.TransactionLogResource; -import org.apache.qpid.transport.DeliveryProperties; -import org.apache.qpid.transport.Header; -import org.apache.qpid.transport.MessageAcceptMode; -import org.apache.qpid.transport.MessageAcquireMode; -import org.apache.qpid.transport.MessageDeliveryMode; -import org.apache.qpid.transport.MessageDeliveryPriority; -import org.apache.qpid.transport.MessageProperties; -import org.apache.qpid.transport.MessageTransfer; - -/** - * Subclass of MessageStoreTest which runs the standard tests from the superclass against - * the BDB Store as well as additional tests specific to the BDB store-implementation. - */ -public class BDBMessageStoreTest extends MessageStoreTest -{ - private static byte[] CONTENT_BYTES = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; - - /** - * Tests that message metadata and content are successfully read back from a - * store after it has been reloaded. Both 0-8 and 0-10 metadata is used to - * verify their ability to co-exist within the store and be successful retrieved. - */ - public void testBDBMessagePersistence() throws Exception - { - MessageStore store = getVirtualHost().getMessageStore(); - - BDBMessageStore bdbStore = assertBDBStore(store); - - // Create content ByteBuffers. - // Split the content into 2 chunks for the 0-8 message, as per broker behaviour. - // Use a single chunk for the 0-10 message as per broker behaviour. - String bodyText = "jfhdjsflsdhfjdshfjdslhfjdslhfsjlhfsjkhfdsjkhfdsjkfhdslkjf"; - - ByteBuffer firstContentBytes_0_8 = ByteBuffer.wrap(bodyText.substring(0, 10).getBytes()); - ByteBuffer secondContentBytes_0_8 = ByteBuffer.wrap(bodyText.substring(10).getBytes()); - - ByteBuffer completeContentBody_0_10 = ByteBuffer.wrap(bodyText.getBytes()); - int bodySize = completeContentBody_0_10.limit(); - - /* - * Create and insert a 0-8 message (metadata and multi-chunk content) - */ - MessagePublishInfo pubInfoBody_0_8 = createPublishInfoBody_0_8(); - BasicContentHeaderProperties props_0_8 = createContentHeaderProperties_0_8(); - - ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize); - - MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8); - StoredMessage storedMessage_0_8 = bdbStore.addMessage(messageMetaData_0_8); - - long origArrivalTime_0_8 = messageMetaData_0_8.getArrivalTime(); - long messageid_0_8 = storedMessage_0_8.getMessageNumber(); - - storedMessage_0_8.addContent(0, firstContentBytes_0_8); - storedMessage_0_8.addContent(firstContentBytes_0_8.limit(), secondContentBytes_0_8); - storedMessage_0_8.flushToStore(); - - /* - * Create and insert a 0-10 message (metadata and content) - */ - MessageProperties msgProps_0_10 = createMessageProperties_0_10(bodySize); - DeliveryProperties delProps_0_10 = createDeliveryProperties_0_10(); - Header header_0_10 = new Header(delProps_0_10, msgProps_0_10); - - MessageTransfer xfr_0_10 = new MessageTransfer("destination", MessageAcceptMode.EXPLICIT, - MessageAcquireMode.PRE_ACQUIRED, header_0_10, completeContentBody_0_10); - - MessageMetaData_0_10 messageMetaData_0_10 = new MessageMetaData_0_10(xfr_0_10); - StoredMessage storedMessage_0_10 = bdbStore.addMessage(messageMetaData_0_10); - - long origArrivalTime_0_10 = messageMetaData_0_10.getArrivalTime(); - long messageid_0_10 = storedMessage_0_10.getMessageNumber(); - - storedMessage_0_10.addContent(0, completeContentBody_0_10); - storedMessage_0_10.flushToStore(); - - /* - * reload the store only (read-only) - */ - BDBMessageStore readOnlyStore = reloadStore(bdbStore); - - /* - * Read back and validate the 0-8 message metadata and content - */ - StorableMessageMetaData storeableMMD_0_8 = readOnlyStore.getMessageMetaData(messageid_0_8); - - assertEquals("Unexpected message type", MessageMetaDataType_0_8.TYPE, storeableMMD_0_8.getType().ordinal()); - assertTrue("Unexpected instance type", storeableMMD_0_8 instanceof MessageMetaData); - MessageMetaData returnedMMD_0_8 = (MessageMetaData) storeableMMD_0_8; - - assertEquals("Message arrival time has changed", origArrivalTime_0_8, returnedMMD_0_8.getArrivalTime()); - - MessagePublishInfo returnedPubBody_0_8 = returnedMMD_0_8.getMessagePublishInfo(); - assertEquals("Message exchange has changed", pubInfoBody_0_8.getExchange(), returnedPubBody_0_8.getExchange()); - assertEquals("Immediate flag has changed", pubInfoBody_0_8.isImmediate(), returnedPubBody_0_8.isImmediate()); - assertEquals("Mandatory flag has changed", pubInfoBody_0_8.isMandatory(), returnedPubBody_0_8.isMandatory()); - assertEquals("Routing key has changed", pubInfoBody_0_8.getRoutingKey(), returnedPubBody_0_8.getRoutingKey()); - - ContentHeaderBody returnedHeaderBody_0_8 = returnedMMD_0_8.getContentHeaderBody(); - assertEquals("ContentHeader ClassID has changed", chb_0_8.getClassId(), returnedHeaderBody_0_8.getClassId()); - assertEquals("ContentHeader weight has changed", chb_0_8.getWeight(), returnedHeaderBody_0_8.getWeight()); - assertEquals("ContentHeader bodySize has changed", chb_0_8.getBodySize(), returnedHeaderBody_0_8.getBodySize()); - - BasicContentHeaderProperties returnedProperties_0_8 = returnedHeaderBody_0_8.getProperties(); - assertEquals("Property ContentType has changed", props_0_8.getContentTypeAsString(), returnedProperties_0_8.getContentTypeAsString()); - assertEquals("Property MessageID has changed", props_0_8.getMessageIdAsString(), returnedProperties_0_8.getMessageIdAsString()); - - ByteBuffer recoveredContent_0_8 = ByteBuffer.allocate((int) chb_0_8.getBodySize()) ; - long recoveredCount_0_8 = readOnlyStore.getContent(messageid_0_8, 0, recoveredContent_0_8); - assertEquals("Incorrect amount of payload data recovered", chb_0_8.getBodySize(), recoveredCount_0_8); - String returnedPayloadString_0_8 = new String(recoveredContent_0_8.array()); - assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_8); - - /* - * Read back and validate the 0-10 message metadata and content - */ - StorableMessageMetaData storeableMMD_0_10 = readOnlyStore.getMessageMetaData(messageid_0_10); - - assertEquals("Unexpected message type", MessageMetaDataType_0_10.TYPE, storeableMMD_0_10.getType().ordinal()); - assertTrue("Unexpected instance type", storeableMMD_0_10 instanceof MessageMetaData_0_10); - MessageMetaData_0_10 returnedMMD_0_10 = (MessageMetaData_0_10) storeableMMD_0_10; - - assertEquals("Message arrival time has changed", origArrivalTime_0_10, returnedMMD_0_10.getArrivalTime()); - - DeliveryProperties returnedDelProps_0_10 = returnedMMD_0_10.getHeader().getDeliveryProperties(); - assertNotNull("DeliveryProperties were not returned", returnedDelProps_0_10); - assertEquals("Immediate flag has changed", delProps_0_10.getImmediate(), returnedDelProps_0_10.getImmediate()); - assertEquals("Routing key has changed", delProps_0_10.getRoutingKey(), returnedDelProps_0_10.getRoutingKey()); - assertEquals("Message exchange has changed", delProps_0_10.getExchange(), returnedDelProps_0_10.getExchange()); - assertEquals("Message expiration has changed", delProps_0_10.getExpiration(), returnedDelProps_0_10.getExpiration()); - assertEquals("Message delivery priority has changed", delProps_0_10.getPriority(), returnedDelProps_0_10.getPriority()); - - MessageProperties returnedMsgProps = returnedMMD_0_10.getHeader().getMessageProperties(); - assertNotNull("MessageProperties were not returned", returnedMsgProps); - assertTrue("Message correlationID has changed", Arrays.equals(msgProps_0_10.getCorrelationId(), returnedMsgProps.getCorrelationId())); - assertEquals("Message content length has changed", msgProps_0_10.getContentLength(), returnedMsgProps.getContentLength()); - assertEquals("Message content type has changed", msgProps_0_10.getContentType(), returnedMsgProps.getContentType()); - - ByteBuffer recoveredContent = ByteBuffer.allocate((int) msgProps_0_10.getContentLength()) ; - long recoveredCount = readOnlyStore.getContent(messageid_0_10, 0, recoveredContent); - assertEquals("Incorrect amount of payload data recovered", msgProps_0_10.getContentLength(), recoveredCount); - - String returnedPayloadString_0_10 = new String(recoveredContent.array()); - assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_10); - - readOnlyStore.closeMessageStore(); - } - - private DeliveryProperties createDeliveryProperties_0_10() - { - DeliveryProperties delProps_0_10 = new DeliveryProperties(); - - delProps_0_10.setDeliveryMode(MessageDeliveryMode.PERSISTENT); - delProps_0_10.setImmediate(true); - delProps_0_10.setExchange("exchange12345"); - delProps_0_10.setRoutingKey("routingKey12345"); - delProps_0_10.setExpiration(5); - delProps_0_10.setPriority(MessageDeliveryPriority.ABOVE_AVERAGE); - - return delProps_0_10; - } - - private MessageProperties createMessageProperties_0_10(int bodySize) - { - MessageProperties msgProps_0_10 = new MessageProperties(); - msgProps_0_10.setContentLength(bodySize); - msgProps_0_10.setCorrelationId("qwerty".getBytes()); - msgProps_0_10.setContentType("text/html"); - - return msgProps_0_10; - } - - /** - * Close the provided store and create a new (read-only) store to read back the data. - * - * Use this method instead of reloading the virtual host like other tests in order - * to avoid the recovery handler deleting the message for not being on a queue. - */ - private BDBMessageStore reloadStore(BDBMessageStore messageStore) throws Exception - { - messageStore.closeMessageStore(); - - - BDBMessageStore newStore = new BDBMessageStore(); - - MessageStoreRecoveryHandler recoveryHandler = mock(MessageStoreRecoveryHandler.class); - when(recoveryHandler.begin()).thenReturn(mock(StoredMessageRecoveryHandler.class)); - VirtualHost virtualHost = getVirtualHostModel(); - newStore.openMessageStore(virtualHost, virtualHost.getMessageStoreSettings()); - - newStore.recoverMessageStore(recoveryHandler, null); - - return newStore; - } - - private MessagePublishInfo createPublishInfoBody_0_8() - { - return new MessagePublishInfo() - { - public AMQShortString getExchange() - { - return new AMQShortString("exchange12345"); - } - - public void setExchange(AMQShortString exchange) - { - } - - public boolean isImmediate() - { - return false; - } - - public boolean isMandatory() - { - return true; - } - - public AMQShortString getRoutingKey() - { - return new AMQShortString("routingKey12345"); - } - }; - - } - - private ContentHeaderBody createContentHeaderBody_0_8(BasicContentHeaderProperties props, int length) - { - MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9); - int classForBasic = methodRegistry.createBasicQosOkBody().getClazz(); - return new ContentHeaderBody(classForBasic, 1, props, length); - } - - private BasicContentHeaderProperties createContentHeaderProperties_0_8() - { - BasicContentHeaderProperties props = new BasicContentHeaderProperties(); - props.setDeliveryMode(Integer.valueOf(BasicContentHeaderProperties.PERSISTENT).byteValue()); - props.setContentType("text/html"); - props.getHeaders().setString("Test", "MST"); - return props; - } - - public void testGetContentWithOffset() throws Exception - { - MessageStore store = getVirtualHost().getMessageStore(); - BDBMessageStore bdbStore = assertBDBStore(store); - StoredMessage storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(store); - long messageid_0_8 = storedMessage_0_8.getMessageNumber(); - - // normal case: offset is 0 - ByteBuffer dst = ByteBuffer.allocate(10); - int length = bdbStore.getContent(messageid_0_8, 0, dst); - assertEquals("Unexpected length", CONTENT_BYTES.length, length); - byte[] array = dst.array(); - assertTrue("Unexpected content", Arrays.equals(CONTENT_BYTES, array)); - - // offset is in the middle - dst = ByteBuffer.allocate(10); - length = bdbStore.getContent(messageid_0_8, 5, dst); - assertEquals("Unexpected length", 5, length); - array = dst.array(); - byte[] expected = new byte[10]; - System.arraycopy(CONTENT_BYTES, 5, expected, 0, 5); - assertTrue("Unexpected content", Arrays.equals(expected, array)); - - // offset beyond the content length - dst = ByteBuffer.allocate(10); - try - { - bdbStore.getContent(messageid_0_8, 15, dst); - fail("Should fail for the offset greater than message size"); - } - catch (RuntimeException e) - { - assertEquals("Unexpected exception message", "Offset 15 is greater than message size 10 for message id " - + messageid_0_8 + "!", e.getMessage()); - } - - // buffer is smaller then message size - dst = ByteBuffer.allocate(5); - length = bdbStore.getContent(messageid_0_8, 0, dst); - assertEquals("Unexpected length", 5, length); - array = dst.array(); - expected = new byte[5]; - System.arraycopy(CONTENT_BYTES, 0, expected, 0, 5); - assertTrue("Unexpected content", Arrays.equals(expected, array)); - - // buffer is smaller then message size, offset is not 0 - dst = ByteBuffer.allocate(5); - length = bdbStore.getContent(messageid_0_8, 2, dst); - assertEquals("Unexpected length", 5, length); - array = dst.array(); - expected = new byte[5]; - System.arraycopy(CONTENT_BYTES, 2, expected, 0, 5); - assertTrue("Unexpected content", Arrays.equals(expected, array)); - } - /** - * Tests that messages which are added to the store and then removed using the - * public MessageStore interfaces are actually removed from the store by then - * interrogating the store with its own implementation methods and verifying - * expected exceptions are thrown to indicate the message is not present. - */ - public void testMessageCreationAndRemoval() throws Exception - { - MessageStore store = getVirtualHost().getMessageStore(); - BDBMessageStore bdbStore = assertBDBStore(store); - - StoredMessage storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(store); - long messageid_0_8 = storedMessage_0_8.getMessageNumber(); - - bdbStore.removeMessage(messageid_0_8, true); - - //verify the removal using the BDB store implementation methods directly - try - { - // the next line should throw since the message id should not be found - bdbStore.getMessageMetaData(messageid_0_8); - fail("No exception thrown when message id not found getting metadata"); - } - catch (StoreException e) - { - // pass since exception expected - } - - //expecting no content, allocate a 1 byte - ByteBuffer dst = ByteBuffer.allocate(1); - - assertEquals("Retrieved content when none was expected", - 0, bdbStore.getContent(messageid_0_8, 0, dst)); - } - private BDBMessageStore assertBDBStore(MessageStore store) - { - - assertEquals("Test requires an instance of BDBMessageStore to proceed", BDBMessageStore.class, store.getClass()); - - return (BDBMessageStore) store; - } - - private StoredMessage createAndStoreSingleChunkMessage_0_8(MessageStore store) - { - ByteBuffer chunk1 = ByteBuffer.wrap(CONTENT_BYTES); - - int bodySize = CONTENT_BYTES.length; - - //create and store the message using the MessageStore interface - MessagePublishInfo pubInfoBody_0_8 = createPublishInfoBody_0_8(); - BasicContentHeaderProperties props_0_8 = createContentHeaderProperties_0_8(); - - ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize); - - MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8); - StoredMessage storedMessage_0_8 = store.addMessage(messageMetaData_0_8); - - storedMessage_0_8.addContent(0, chunk1); - storedMessage_0_8.flushToStore(); - - return storedMessage_0_8; - } - - /** - * Tests transaction commit by utilising the enqueue and dequeue methods available - * in the TransactionLog interface implemented by the store, and verifying the - * behaviour using BDB implementation methods. - */ - public void testTranCommit() throws Exception - { - MessageStore log = getVirtualHost().getMessageStore(); - - BDBMessageStore bdbStore = assertBDBStore(log); - - final UUID mockQueueId = UUIDGenerator.generateRandomUUID(); - TransactionLogResource mockQueue = new TransactionLogResource() - { - @Override - public String getName() - { - return getId().toString(); - } - - @Override - public UUID getId() - { - return mockQueueId; - } - - @Override - public boolean isDurable() - { - return true; - } - }; - - Transaction txn = log.newTransaction(); - - txn.enqueueMessage(mockQueue, new MockMessage(1L)); - txn.enqueueMessage(mockQueue, new MockMessage(5L)); - txn.commitTran(); - - List enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueId); - - assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size()); - Long val = enqueuedIds.get(0); - assertEquals("First Message is incorrect", 1L, val.longValue()); - val = enqueuedIds.get(1); - assertEquals("Second Message is incorrect", 5L, val.longValue()); - } - - - /** - * Tests transaction rollback before a commit has occurred by utilising the - * enqueue and dequeue methods available in the TransactionLog interface - * implemented by the store, and verifying the behaviour using BDB - * implementation methods. - */ - public void testTranRollbackBeforeCommit() throws Exception - { - MessageStore log = getVirtualHost().getMessageStore(); - - BDBMessageStore bdbStore = assertBDBStore(log); - - final UUID mockQueueId = UUIDGenerator.generateRandomUUID(); - TransactionLogResource mockQueue = new TransactionLogResource() - { - @Override - public String getName() - { - return getId().toString(); - } - - @Override - public UUID getId() - { - return mockQueueId; - } - - @Override - public boolean isDurable() - { - return true; - } - }; - - Transaction txn = log.newTransaction(); - - txn.enqueueMessage(mockQueue, new MockMessage(21L)); - txn.abortTran(); - - txn = log.newTransaction(); - txn.enqueueMessage(mockQueue, new MockMessage(22L)); - txn.enqueueMessage(mockQueue, new MockMessage(23L)); - txn.commitTran(); - - List enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueId); - - assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size()); - Long val = enqueuedIds.get(0); - assertEquals("First Message is incorrect", 22L, val.longValue()); - val = enqueuedIds.get(1); - assertEquals("Second Message is incorrect", 23L, val.longValue()); - } - - public void testOnDelete() throws Exception - { - MessageStore log = getVirtualHost().getMessageStore(); - BDBMessageStore bdbStore = assertBDBStore(log); - String storeLocation = bdbStore.getStoreLocation(); - - File location = new File(storeLocation); - assertTrue("Store does not exist at " + storeLocation, location.exists()); - - bdbStore.closeMessageStore(); - assertTrue("Store does not exist at " + storeLocation, location.exists()); - - bdbStore.onDelete(); - assertFalse("Store exists at " + storeLocation, location.exists()); - } - - /** - * Tests transaction rollback after a commit has occurred by utilising the - * enqueue and dequeue methods available in the TransactionLog interface - * implemented by the store, and verifying the behaviour using BDB - * implementation methods. - */ - public void testTranRollbackAfterCommit() throws Exception - { - MessageStore log = getVirtualHost().getMessageStore(); - - BDBMessageStore bdbStore = assertBDBStore(log); - - final UUID mockQueueId = UUIDGenerator.generateRandomUUID(); - TransactionLogResource mockQueue = new TransactionLogResource() - { - @Override - public String getName() - { - return getId().toString(); - } - - @Override - public UUID getId() - { - return mockQueueId; - } - - @Override - public boolean isDurable() - { - return true; - } - }; - - Transaction txn = log.newTransaction(); - - txn.enqueueMessage(mockQueue, new MockMessage(30L)); - txn.commitTran(); - - txn = log.newTransaction(); - txn.enqueueMessage(mockQueue, new MockMessage(31L)); - txn.abortTran(); - - txn = log.newTransaction(); - txn.enqueueMessage(mockQueue, new MockMessage(32L)); - txn.commitTran(); - - List enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueId); - - assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size()); - Long val = enqueuedIds.get(0); - assertEquals("First Message is incorrect", 30L, val.longValue()); - val = enqueuedIds.get(1); - assertEquals("Second Message is incorrect", 32L, val.longValue()); - } - - @SuppressWarnings("rawtypes") - private static class MockMessage implements ServerMessage, EnqueueableMessage - { - private long _messageId; - - public MockMessage(long messageId) - { - _messageId = messageId; - } - - public String getInitialRoutingAddress() - { - return null; - } - - public AMQMessageHeader getMessageHeader() - { - return null; - } - - public StoredMessage getStoredMessage() - { - return null; - } - - public boolean isPersistent() - { - return true; - } - - public long getSize() - { - return 0; - } - - public boolean isImmediate() - { - return false; - } - - public long getExpiration() - { - return 0; - } - - public MessageReference newReference() - { - return null; - } - - public long getMessageNumber() - { - return _messageId; - } - - public long getArrivalTime() - { - return 0; - } - - public int getContent(ByteBuffer buf, int offset) - { - return 0; - } - - public ByteBuffer getContent(int offset, int length) - { - return null; - } - - @Override - public Object getConnectionReference() - { - return null; - } - } -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/BrokerStoreUpgrader.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/BrokerStoreUpgrader.java index 1446cca156..b7b672fd58 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/BrokerStoreUpgrader.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/BrokerStoreUpgrader.java @@ -35,12 +35,12 @@ import org.apache.qpid.server.configuration.store.StoreConfigurationChangeListen import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Model; import org.apache.qpid.server.model.SystemContext; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.ConfiguredObjectRecordImpl; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.DurableConfigurationStoreUpgrader; import org.apache.qpid.server.store.NonNullUpgrader; +import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; public class BrokerStoreUpgrader { @@ -583,17 +583,17 @@ public class BrokerStoreUpgrader public Broker upgrade(DurableConfigurationStore store) { - final BrokerStoreRecoveryHandler recoveryHandler = new BrokerStoreRecoveryHandler(_systemContext); + final BrokerStoreRecoveryHandler recoveryHandler = new BrokerStoreRecoveryHandler(_systemContext, store); store.openConfigurationStore(_systemContext, Collections.emptyMap()); - store.recoverConfigurationStore(recoveryHandler); + store.visitConfiguredObjectRecords(recoveryHandler); return recoveryHandler.getBroker(); } - private static class BrokerStoreRecoveryHandler implements ConfigurationRecoveryHandler + private static class BrokerStoreRecoveryHandler implements ConfiguredObjectRecordHandler { - private static Logger LOGGER = Logger.getLogger(ConfigurationRecoveryHandler.class); + private static Logger LOGGER = Logger.getLogger(BrokerStoreRecoveryHandler.class); private DurableConfigurationStoreUpgrader _upgrader; private DurableConfigurationStore _store; @@ -601,27 +601,28 @@ public class BrokerStoreUpgrader private int _version; private final SystemContext _systemContext; - private BrokerStoreRecoveryHandler(final SystemContext systemContext) + private BrokerStoreRecoveryHandler(final SystemContext systemContext, DurableConfigurationStore store) { _systemContext = systemContext; + _store = store; } @Override - public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion) + public void begin(final int configVersion) { - _store = store; _version = configVersion; } @Override - public void configuredObject(final ConfiguredObjectRecord object) + public boolean handle(final ConfiguredObjectRecord object) { _records.put(object.getId(), object); + return true; } @Override - public int completeConfigurationRecovery() + public int end() { String version = getCurrentVersion(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/JsonConfigurationEntryStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/JsonConfigurationEntryStore.java index 7024068099..59f248c9f5 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/JsonConfigurationEntryStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/JsonConfigurationEntryStore.java @@ -20,14 +20,6 @@ */ package org.apache.qpid.server.configuration.store; -import org.apache.qpid.server.configuration.ConfigurationEntry; -import org.apache.qpid.server.configuration.IllegalConfigurationException; -import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler; -import org.apache.qpid.server.store.ConfiguredObjectRecord; -import org.apache.qpid.server.store.DurableConfigurationStore; -import org.apache.qpid.server.store.StoreException; - import java.io.File; import java.util.ArrayList; import java.util.Collection; @@ -35,6 +27,14 @@ import java.util.Collections; import java.util.Map; import java.util.UUID; +import org.apache.qpid.server.configuration.ConfigurationEntry; +import org.apache.qpid.server.configuration.IllegalConfigurationException; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.store.ConfiguredObjectRecord; +import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.store.StoreException; +import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; + public class JsonConfigurationEntryStore extends MemoryConfigurationEntryStore { public static final String STORE_TYPE = "json"; @@ -124,30 +124,31 @@ public class JsonConfigurationEntryStore extends MemoryConfigurationEntryStore else { final Collection records = new ArrayList(); - final ConfigurationRecoveryHandler replayHandler = new ConfigurationRecoveryHandler() + final ConfiguredObjectRecordHandler replayHandler = new ConfiguredObjectRecordHandler() { private int _configVersion; @Override - public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion) + public void begin(final int configVersion) { _configVersion = configVersion; } @Override - public void configuredObject(ConfiguredObjectRecord record) + public boolean handle(ConfiguredObjectRecord record) { records.add(record); + return true; } @Override - public int completeConfigurationRecovery() + public int end() { return _configVersion; } }; initialStore.openConfigurationStore(_parentObject, Collections.emptyMap()); - initialStore.recoverConfigurationStore(replayHandler); + initialStore.visitConfiguredObjectRecords(replayHandler); update(true, records.toArray(new ConfiguredObjectRecord[records.size()])); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandler.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandler.java index 21fffea80f..cdf44822ef 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandler.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandler.java @@ -38,11 +38,11 @@ import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.ConfiguredObjectRecordImpl; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.StoreException; +import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; import org.apache.qpid.server.util.MapValueConverter; public class ManagementModeStoreHandler implements DurableConfigurationStore @@ -80,20 +80,21 @@ public class ManagementModeStoreHandler implements DurableConfigurationStore _records = new HashMap(); - final ConfigurationRecoveryHandler localRecoveryHandler = new ConfigurationRecoveryHandler() + final ConfiguredObjectRecordHandler localRecoveryHandler = new ConfiguredObjectRecordHandler() { private int _version; private boolean _quiesceRmiPort = _options.getManagementModeRmiPortOverride() > 0; private boolean _quiesceJmxPort = _options.getManagementModeJmxPortOverride() > 0; private boolean _quiesceHttpPort = _options.getManagementModeHttpPortOverride() > 0; + @Override - public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion) + public void begin(final int configVersion) { _version = configVersion; } @Override - public void configuredObject(final ConfiguredObjectRecord object) + public boolean handle(final ConfiguredObjectRecord object) { String entryType = object.getType(); Map attributes = object.getAttributes(); @@ -153,11 +154,12 @@ public class ManagementModeStoreHandler implements DurableConfigurationStore { _records.put(object.getId(), object); } + return true; } @Override - public int completeConfigurationRecovery() + public int end() { return _version; } @@ -166,7 +168,7 @@ public class ManagementModeStoreHandler implements DurableConfigurationStore - _store.recoverConfigurationStore(localRecoveryHandler); + _store.visitConfiguredObjectRecords(localRecoveryHandler); _cliEntries = createPortsFromCommandLineOptions(_options); @@ -179,17 +181,20 @@ public class ManagementModeStoreHandler implements DurableConfigurationStore } @Override - public void recoverConfigurationStore(final ConfigurationRecoveryHandler recoveryHandler) throws StoreException + public void visitConfiguredObjectRecords(final ConfiguredObjectRecordHandler recoveryHandler) throws StoreException { - recoveryHandler.beginConfigurationRecovery(this,0); + recoveryHandler.begin(0); for(ConfiguredObjectRecord record : _records.values()) { - recoveryHandler.configuredObject(record); + if(!recoveryHandler.handle(record)) + { + break; + } } - recoveryHandler.completeConfigurationRecovery(); + recoveryHandler.end(); } @@ -357,16 +362,16 @@ public class ManagementModeStoreHandler implements DurableConfigurationStore final int managementModeJmxPortOverride = options.getManagementModeJmxPortOverride(); final int managementModeHttpPortOverride = options.getManagementModeHttpPortOverride(); - _store.recoverConfigurationStore(new ConfigurationRecoveryHandler() + _store.visitConfiguredObjectRecords(new ConfiguredObjectRecordHandler() { @Override - public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion) + public void begin(final int configVersion) { } @Override - public void configuredObject(final ConfiguredObjectRecord entry) + public boolean handle(final ConfiguredObjectRecord entry) { String entryType = entry.getType(); Map attributes = entry.getAttributes(); @@ -417,11 +422,12 @@ public class ManagementModeStoreHandler implements DurableConfigurationStore // save original state quiescedEntries.put(entry.getId(), attributes.get(ATTRIBUTE_STATE)); } + return true; } @Override - public int completeConfigurationRecovery() + public int end() { return 0; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java index b4f095b51e..d534814410 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java @@ -58,10 +58,9 @@ import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Model; import org.apache.qpid.server.model.SystemContext; import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler; import org.apache.qpid.server.store.ConfiguredObjectRecord; -import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.StoreException; +import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; import org.apache.qpid.util.Strings; import org.apache.qpid.util.Strings.ChainedResolver; @@ -128,30 +127,31 @@ public class MemoryConfigurationEntryStore implements ConfigurationEntryStore _storeLocation = initialStore.getStoreLocation(); } final Collection records = new ArrayList(); - final ConfigurationRecoveryHandler replayHandler = new ConfigurationRecoveryHandler() + final ConfiguredObjectRecordHandler replayHandler = new ConfiguredObjectRecordHandler() { private int _configVersion; @Override - public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion) + public void begin(final int configVersion) { _configVersion = configVersion; } @Override - public void configuredObject(ConfiguredObjectRecord record) + public boolean handle(ConfiguredObjectRecord record) { records.add(record); + return true; } @Override - public int completeConfigurationRecovery() + public int end() { return _configVersion; } }; initialStore.openConfigurationStore(parentObject, Collections.emptyMap()); - initialStore.recoverConfigurationStore(replayHandler); + initialStore.visitConfiguredObjectRecords(replayHandler); update(true, records.toArray(new ConfiguredObjectRecord[records.size()])); @@ -365,10 +365,10 @@ public class MemoryConfigurationEntryStore implements ConfigurationEntryStore } @Override - public void recoverConfigurationStore(final ConfigurationRecoveryHandler recoveryHandler) throws StoreException + public void visitConfiguredObjectRecords(final ConfiguredObjectRecordHandler recoveryHandler) throws StoreException { - recoveryHandler.beginConfigurationRecovery(this,0); + recoveryHandler.begin(0); final Map> parentMap = new HashMap>(); @@ -435,9 +435,12 @@ public class MemoryConfigurationEntryStore implements ConfigurationEntryStore } for(ConfiguredObjectRecord record : records.values()) { - recoveryHandler.configuredObject(record); + if(!recoveryHandler.handle(record)) + { + break; + } } - recoveryHandler.completeConfigurationRecovery(); + recoveryHandler.end(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java index e7b6adaf7a..6be5460d5f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java @@ -49,7 +49,10 @@ import org.apache.log4j.Logger; import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.plugin.MessageMetaDataType; -import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; +import org.apache.qpid.server.store.handler.DistributedTransactionHandler; +import org.apache.qpid.server.store.handler.MessageHandler; +import org.apache.qpid.server.store.handler.MessageInstanceHandler; import org.codehaus.jackson.JsonGenerationException; import org.codehaus.jackson.JsonGenerator; import org.codehaus.jackson.JsonParseException; @@ -221,19 +224,125 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } @Override - public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler) + public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) { checkConfigurationStoreOpen(); try { - recoveryHandler.beginConfigurationRecovery(this, getConfigVersion()); - loadConfiguredObjects(recoveryHandler); - setConfigVersion(recoveryHandler.completeConfigurationRecovery()); + int configVersion = getConfigVersion(); + + handler.begin(configVersion); + doVisitAllConfiguredObjectRecords(handler); + + int newConfigVersion = handler.end(); + if(newConfigVersion != configVersion) + { + setConfigVersion(newConfigVersion); + } } catch (SQLException e) { - throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); + throw new StoreException("Cannot visit configured object records", e); + } + + } + + private void doVisitAllConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws SQLException + { + Connection conn = newAutoCommitConnection(); + Map configuredObjects = new HashMap(); + final ObjectMapper objectMapper = new ObjectMapper(); + try + { + PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS); + try + { + ResultSet rs = stmt.executeQuery(); + try + { + while (rs.next()) + { + String id = rs.getString(1); + String objectType = rs.getString(2); + String attributes = getBlobAsString(rs, 3); + final ConfiguredObjectRecordImpl configuredObjectRecord = + new ConfiguredObjectRecordImpl(UUID.fromString(id), objectType, + objectMapper.readValue(attributes, Map.class)); + configuredObjects.put(configuredObjectRecord.getId(),configuredObjectRecord); + + } + } + catch (JsonMappingException e) + { + throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); + } + catch (JsonParseException e) + { + throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); + } + catch (IOException e) + { + throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); + } + finally + { + rs.close(); + } + } + finally + { + stmt.close(); + } + stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECT_HIERARCHY); + try + { + ResultSet rs = stmt.executeQuery(); + try + { + while (rs.next()) + { + UUID childId = UUID.fromString(rs.getString(1)); + String parentType = rs.getString(2); + UUID parentId = UUID.fromString(rs.getString(3)); + + ConfiguredObjectRecordImpl child = configuredObjects.get(childId); + ConfiguredObjectRecordImpl parent = configuredObjects.get(parentId); + + if(child != null && parent != null) + { + child.addParent(parentType, parent); + } + else if(child != null && child.getType().endsWith("Binding") && parentType.equals("Exchange")) + { + // TODO - remove this hack for amq. exchanges + child.addParent(parentType, new ConfiguredObjectRecordImpl(parentId, parentType, Collections.emptyMap())); + } + } + } + finally + { + rs.close(); + } + } + finally + { + stmt.close(); + } + + } + finally + { + conn.close(); + } + + for(ConfiguredObjectRecord record : configuredObjects.values()) + { + boolean shoudlContinue = handler.handle(record); + if (!shoudlContinue) + { + break; + } } } @@ -282,44 +391,25 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC { createOrOpenMessageStoreDatabase(); upgradeIfNecessary(parent); - } - catch (SQLException e) - { - throw new StoreException("Unable to activate message store ", e); - } - } - } - @Override - public void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler) - { - checkMessageStoreOpen(); - - if(messageRecoveryHandler != null) - { - try - { - recoverMessages(messageRecoveryHandler); - } - catch (SQLException e) - { - throw new StoreException("Error encountered when restoring message data from " + - "persistent store ", e); - } - } - if(transactionLogRecoveryHandler != null) - { - try - { - TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = recoverQueueEntries(transactionLogRecoveryHandler); - recoverXids(dtxrh); + visitMessages(new MessageHandler() + { + @Override + public boolean handle(StoredMessage storedMessage) + { + long id = storedMessage.getMessageNumber(); + if (_messageId.get() < id) + { + _messageId.set(id); + } + return true; + } + }); } catch (SQLException e) { - throw new StoreException("Error encountered when restoring distributed transaction " + - "data from persistent store ", e); + throw new StoreException("Unable to activate message store ", e); } - } } @@ -1043,11 +1133,9 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC getLogger().debug("Enqueuing message " + messageId + " on queue " - + (queue instanceof AMQQueue - ? ((AMQQueue) queue).getName() - : "") - + queue.getId() - + "[Connection" + + queue.getName() + + " with id " + queue.getId() + + " [Connection" + conn + "]"); } @@ -1068,7 +1156,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC catch (SQLException e) { getLogger().error("Failed to enqueue: " + e.getMessage(), e); - throw new StoreException("Error writing enqueued message with id " + messageId + " for queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" ) + " with id " + queue.getId() + throw new StoreException("Error writing enqueued message with id " + messageId + " for queue " + queue.getName() + " with id " + queue.getId() + " to database", e); } @@ -1093,15 +1181,13 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC if(results != 1) { - throw new StoreException("Unable to find message with id " + messageId + " on queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" ) + throw new StoreException("Unable to find message with id " + messageId + " on queue " + queue.getName() + " with id " + queue.getId()); } if (getLogger().isDebugEnabled()) { - getLogger().debug("Dequeuing message " + messageId + " on queue " + (queue instanceof AMQQueue - ? ((AMQQueue) queue).getName() - : "") + getLogger().debug("Dequeuing message " + messageId + " on queue " + queue.getName() + " with id " + queue.getId()); } } @@ -1114,7 +1200,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC catch (SQLException e) { getLogger().error("Failed to dequeue: " + e.getMessage(), e); - throw new StoreException("Error deleting enqueued message with id " + messageId + " for queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" ) + throw new StoreException("Error deleting enqueued message with id " + messageId + " for queue " + queue.getName() + " with id " + queue.getId() + " from database", e); } @@ -1363,131 +1449,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } - private void recoverMessages(MessageStoreRecoveryHandler recoveryHandler) throws SQLException - { - Connection conn = newAutoCommitConnection(); - try - { - MessageStoreRecoveryHandler.StoredMessageRecoveryHandler messageHandler = recoveryHandler.begin(); - - Statement stmt = conn.createStatement(); - try - { - ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_META_DATA); - try - { - - long maxId = 0; - - while(rs.next()) - { - - long messageId = rs.getLong(1); - if(messageId > maxId) - { - maxId = messageId; - } - - byte[] dataAsBytes = getBlobAsBytes(rs, 2); - - ByteBuffer buf = ByteBuffer.wrap(dataAsBytes); - buf.position(1); - buf = buf.slice(); - MessageMetaDataType type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]); - StorableMessageMetaData metaData = type.createMetaData(buf); - StoredJDBCMessage message = new StoredJDBCMessage(messageId, metaData, true); - messageHandler.message(message); - } - - _messageId.set(maxId); - - messageHandler.completeMessageRecovery(); - } - finally - { - rs.close(); - } - } - finally - { - stmt.close(); - } - } - finally - { - conn.close(); - } - } - - - private TransactionLogRecoveryHandler.DtxRecordRecoveryHandler recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) throws SQLException - { - Connection conn = newAutoCommitConnection(); - try - { - TransactionLogRecoveryHandler.QueueEntryRecoveryHandler queueEntryHandler = recoveryHandler.begin(this); - - Statement stmt = conn.createStatement(); - try - { - ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY); - try - { - while(rs.next()) - { - - String id = rs.getString(1); - long messageId = rs.getLong(2); - queueEntryHandler.queueEntry(UUID.fromString(id), messageId); - } - } - finally - { - rs.close(); - } - } - finally - { - stmt.close(); - } - - return queueEntryHandler.completeQueueEntryRecovery(); - } - finally - { - conn.close(); - } - } - - private static final class Xid - { - - private final long _format; - private final byte[] _globalId; - private final byte[] _branchId; - - public Xid(long format, byte[] globalId, byte[] branchId) - { - _format = format; - _globalId = globalId; - _branchId = branchId; - } - - public long getFormat() - { - return _format; - } - - public byte[] getGlobalId() - { - return _globalId; - } - - public byte[] getBranchId() - { - return _branchId; - } - } private static class RecordImpl implements Transaction.Record, TransactionLogResource, EnqueueableMessage { @@ -1550,93 +1511,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } } - private void recoverXids(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh) throws SQLException - { - Connection conn = newAutoCommitConnection(); - try - { - List xids = new ArrayList(); - - Statement stmt = conn.createStatement(); - try - { - ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_XIDS); - try - { - while(rs.next()) - { - - long format = rs.getLong(1); - byte[] globalId = rs.getBytes(2); - byte[] branchId = rs.getBytes(3); - xids.add(new Xid(format, globalId, branchId)); - } - } - finally - { - rs.close(); - } - } - finally - { - stmt.close(); - } - - - - for(Xid xid : xids) - { - List enqueues = new ArrayList(); - List dequeues = new ArrayList(); - - PreparedStatement pstmt = conn.prepareStatement(SELECT_ALL_FROM_XID_ACTIONS); - - try - { - pstmt.setLong(1, xid.getFormat()); - pstmt.setBytes(2, xid.getGlobalId()); - pstmt.setBytes(3, xid.getBranchId()); - - ResultSet rs = pstmt.executeQuery(); - try - { - while(rs.next()) - { - - String actionType = rs.getString(1); - UUID queueId = UUID.fromString(rs.getString(2)); - long messageId = rs.getLong(3); - - RecordImpl record = new RecordImpl(queueId, messageId); - List records = "E".equals(actionType) ? enqueues : dequeues; - records.add(record); - } - } - finally - { - rs.close(); - } - } - finally - { - pstmt.close(); - } - - dtxrh.dtxRecord(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(), - enqueues.toArray(new RecordImpl[enqueues.size()]), - dequeues.toArray(new RecordImpl[dequeues.size()])); - } - - - dtxrh.completeDtxRecordRecovery(); - } - finally - { - conn.close(); - } - - } - private StorableMessageMetaData getMetaData(long messageId) throws SQLException { @@ -2357,43 +2231,81 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } } - private void loadConfiguredObjects(ConfigurationRecoveryHandler recoveryHandler) throws SQLException, - StoreException + @Override + public void visitMessages(MessageHandler handler) throws StoreException { - Connection conn = newAutoCommitConnection(); - Map configuredObjects = new HashMap(); - final ObjectMapper objectMapper = new ObjectMapper(); + checkMessageStoreOpen(); + + Connection conn = null; try { - PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS); + conn = newAutoCommitConnection(); + Statement stmt = conn.createStatement(); try { - ResultSet rs = stmt.executeQuery(); + ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_META_DATA); try { while (rs.next()) { - String id = rs.getString(1); - String objectType = rs.getString(2); - String attributes = getBlobAsString(rs, 3); - final ConfiguredObjectRecordImpl configuredObjectRecord = - new ConfiguredObjectRecordImpl(UUID.fromString(id), objectType, - objectMapper.readValue(attributes, Map.class)); - configuredObjects.put(configuredObjectRecord.getId(),configuredObjectRecord); - + long messageId = rs.getLong(1); + byte[] dataAsBytes = getBlobAsBytes(rs, 2); + ByteBuffer buf = ByteBuffer.wrap(dataAsBytes); + buf.position(1); + buf = buf.slice(); + MessageMetaDataType type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]); + StorableMessageMetaData metaData = type.createMetaData(buf); + StoredJDBCMessage message = new StoredJDBCMessage(messageId, metaData, true); + if (!handler.handle(message)) + { + break; + } } } - catch (JsonMappingException e) - { - throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); - } - catch (JsonParseException e) + finally { - throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); + rs.close(); } - catch (IOException e) + } + finally + { + stmt.close(); + } + } + catch (SQLException e) + { + throw new StoreException("Error encountered when visiting messages", e); + } + finally + { + closeConnection(conn); + } + } + + @Override + public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException + { + checkMessageStoreOpen(); + + Connection conn = null; + try + { + conn = newAutoCommitConnection(); + Statement stmt = conn.createStatement(); + try + { + ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY); + try { - throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); + while(rs.next()) + { + String id = rs.getString(1); + long messageId = rs.getLong(2); + if (!handler.handle(UUID.fromString(id), messageId)) + { + break; + } + } } finally { @@ -2404,31 +2316,41 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC { stmt.close(); } - stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECT_HIERARCHY); + } + catch(SQLException e) + { + throw new StoreException("Error encountered when visiting message instances", e); + } + finally + { + closeConnection(conn); + } + } + + @Override + public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException + { + checkMessageStoreOpen(); + + Connection conn = null; + try + { + conn = newAutoCommitConnection(); + List xids = new ArrayList(); + + Statement stmt = conn.createStatement(); try { - ResultSet rs = stmt.executeQuery(); + ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_XIDS); try { - while (rs.next()) + while(rs.next()) { - UUID childId = UUID.fromString(rs.getString(1)); - String parentType = rs.getString(2); - UUID parentId = UUID.fromString(rs.getString(3)); - - ConfiguredObjectRecordImpl child = configuredObjects.get(childId); - ConfiguredObjectRecordImpl parent = configuredObjects.get(parentId); - - if(child != null && parent != null) - { - child.addParent(parentType, parent); - } - else if(child != null && child.getType().endsWith("Binding") && parentType.equals("Exchange")) - { - // TODO - remove this hack for amq. exchanges - child.addParent(parentType, new ConfiguredObjectRecordImpl(parentId, parentType, Collections.emptyMap())); - } + long format = rs.getLong(1); + byte[] globalId = rs.getBytes(2); + byte[] branchId = rs.getBytes(3); + xids.add(new Xid(format, globalId, branchId)); } } finally @@ -2441,18 +2363,67 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC stmt.close(); } + + + for(Xid xid : xids) + { + List enqueues = new ArrayList(); + List dequeues = new ArrayList(); + + PreparedStatement pstmt = conn.prepareStatement(SELECT_ALL_FROM_XID_ACTIONS); + + try + { + pstmt.setLong(1, xid.getFormat()); + pstmt.setBytes(2, xid.getGlobalId()); + pstmt.setBytes(3, xid.getBranchId()); + + ResultSet rs = pstmt.executeQuery(); + try + { + while(rs.next()) + { + + String actionType = rs.getString(1); + UUID queueId = UUID.fromString(rs.getString(2)); + long messageId = rs.getLong(3); + + RecordImpl record = new RecordImpl(queueId, messageId); + List records = "E".equals(actionType) ? enqueues : dequeues; + records.add(record); + } + } + finally + { + rs.close(); + } + } + finally + { + pstmt.close(); + } + + if (!handler.handle(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(), + enqueues.toArray(new RecordImpl[enqueues.size()]), + dequeues.toArray(new RecordImpl[dequeues.size()]))) + { + break; + } + } + } - finally + catch (SQLException e) { - conn.close(); - } + throw new StoreException("Error encountered when visiting distributed transactions", e); - for(ConfiguredObjectRecord record : configuredObjects.values()) + } + finally { - recoveryHandler.configuredObject(record); + closeConnection(conn); } } + protected abstract String getBlobAsString(ResultSet rs, int col) throws SQLException; protected abstract void storedSizeChange(int storeSizeIncrease); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java index a7e9ef2ab6..99785c48a9 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java @@ -20,17 +20,36 @@ */ package org.apache.qpid.server.store; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import org.apache.qpid.server.message.EnqueueableMessage; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.store.Transaction.Record; +import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; +import org.apache.qpid.server.store.handler.DistributedTransactionHandler; +import org.apache.qpid.server.store.handler.MessageHandler; +import org.apache.qpid.server.store.handler.MessageInstanceHandler; /** A simple message store that stores the messages in a thread-safe structure in memory. */ -abstract public class AbstractMemoryMessageStore extends NullMessageStore +abstract class AbstractMemoryMessageStore implements MessageStore, DurableConfigurationStore { - private final AtomicLong _messageId = new AtomicLong(1); - - private static final Transaction IN_MEMORY_TRANSACTION = new Transaction() + private final class MemoryMessageStoreTransaction implements Transaction { + private Map> _localEnqueueMap = new HashMap>(); + private Map> _localDequeueMap = new HashMap>(); + + private Map _localDistributedTransactionsRecords = new HashMap(); + private Set _localDistributedTransactionsRemoves = new HashSet(); + @Override public StoreFuture commitTranAsync() { @@ -40,50 +59,145 @@ abstract public class AbstractMemoryMessageStore extends NullMessageStore @Override public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) { + Set messageIds = _localEnqueueMap.get(queue.getId()); + if (messageIds == null) + { + messageIds = new HashSet(); + _localEnqueueMap.put(queue.getId(), messageIds); + } + messageIds.add(message.getMessageNumber()); } @Override - public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message) + public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message) { + Set messageIds = _localDequeueMap.get(queue.getId()); + if (messageIds == null) + { + messageIds = new HashSet(); + _localDequeueMap.put(queue.getId(), messageIds); + } + messageIds.add(message.getMessageNumber()); } @Override public void commitTran() { + commitTransactionInternal(this); + _localEnqueueMap.clear(); + _localDequeueMap.clear(); } @Override public void abortTran() { + _localEnqueueMap.clear(); + _localDequeueMap.clear(); } @Override public void removeXid(long format, byte[] globalId, byte[] branchId) { + _localDistributedTransactionsRemoves.add(new Xid(format, globalId, branchId)); } @Override public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues) { + _localDistributedTransactionsRecords.put(new Xid(format, globalId, branchId), new DistributedTransactionRecords(enqueues, dequeues)); } - }; + } + + private final AtomicLong _messageId = new AtomicLong(1); - private final EventManager _eventManager = new EventManager(); + private final ConcurrentHashMap _configuredObjectRecords = new ConcurrentHashMap(); + protected ConcurrentHashMap _messages = new ConcurrentHashMap(); + private Object _transactionLock = new Object(); + private Map> _messageInstances = new HashMap>(); + private Map _distributedTransactions = new HashMap(); + + @SuppressWarnings("unchecked") @Override - public StoredMessage addMessage(StorableMessageMetaData metaData) + public StoredMessage addMessage(final StorableMessageMetaData metaData) { - final long id = _messageId.getAndIncrement(); - StoredMemoryMessage message = new StoredMemoryMessage(id, metaData); + long id = _messageId.getAndIncrement(); + + if(metaData.isPersistent()) + { + return new StoredMemoryMessage(id, metaData) + { + + @Override + public StoreFuture flushToStore() + { + _messages.putIfAbsent(getMessageNumber(), this) ; + return super.flushToStore(); + } + + @Override + public void remove() + { + _messages.remove(getMessageNumber()); + super.remove(); + } + + }; + } + else + { + return new StoredMemoryMessage(id, metaData); + } + } + + private void commitTransactionInternal(MemoryMessageStoreTransaction transaction) + { + synchronized (_transactionLock ) + { + for (Map.Entry> loacalEnqueuedEntry : transaction._localEnqueueMap.entrySet()) + { + Set messageIds = _messageInstances.get(loacalEnqueuedEntry.getKey()); + if (messageIds == null) + { + messageIds = new HashSet(); + _messageInstances.put(loacalEnqueuedEntry.getKey(), messageIds); + } + messageIds.addAll(loacalEnqueuedEntry.getValue()); + } + + for (Map.Entry> loacalDequeueEntry : transaction._localDequeueMap.entrySet()) + { + Set messageIds = _messageInstances.get(loacalDequeueEntry.getKey()); + if (messageIds != null) + { + messageIds.removeAll(loacalDequeueEntry.getValue()); + if (messageIds.isEmpty()) + { + _messageInstances.remove(loacalDequeueEntry.getKey()); + } + } + } + + for (Map.Entry entry : transaction._localDistributedTransactionsRecords.entrySet()) + { + _distributedTransactions.put(entry.getKey(), entry.getValue()); + } + + for (Xid removed : transaction._localDistributedTransactionsRemoves) + { + _distributedTransactions.remove(removed); + } + + } + - return message; } @Override public Transaction newTransaction() { - return IN_MEMORY_TRANSACTION; + return new MemoryMessageStoreTransaction(); } @Override @@ -95,7 +209,164 @@ abstract public class AbstractMemoryMessageStore extends NullMessageStore @Override public void addEventListener(EventListener eventListener, Event... events) { - _eventManager.addEventListener(eventListener, events); } + @Override + public void create(ConfiguredObjectRecord record) + { + if (_configuredObjectRecords.putIfAbsent(record.getId(), record) != null) + { + throw new StoreException("Record with id " + record.getId() + " is already present"); + } + } + + @Override + public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) + { + for (ConfiguredObjectRecord record : records) + { + ConfiguredObjectRecord previousValue = _configuredObjectRecords.replace(record.getId(), record); + if (previousValue == null && !createIfNecessary) + { + throw new StoreException("Record with id " + record.getId() + " does not exist"); + } + } + } + + @Override + public UUID[] remove(final ConfiguredObjectRecord... objects) + { + List removed = new ArrayList(); + for (ConfiguredObjectRecord record : objects) + { + if (_configuredObjectRecords.remove(record.getId()) != null) + { + removed.add(record.getId()); + } + } + return removed.toArray(new UUID[removed.size()]); + } + + @Override + public void closeConfigurationStore() + { + _configuredObjectRecords.clear(); + } + + @Override + public void openConfigurationStore(ConfiguredObject parent, Map storeSettings) + { + } + + @Override + public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws StoreException + { + handler.begin(VirtualHost.CURRENT_CONFIG_VERSION); + for (ConfiguredObjectRecord record : _configuredObjectRecords.values()) + { + if (!handler.handle(record)) + { + break; + } + } + handler.end(); + } + + @Override + public void openMessageStore(ConfiguredObject parent, Map messageStoreSettings) + { + } + + @Override + public void closeMessageStore() + { + _messages.clear(); + synchronized (_transactionLock) + { + _messageInstances.clear(); + _distributedTransactions.clear(); + } + } + + @Override + public String getStoreLocation() + { + return null; + } + + @Override + public void onDelete() + { + } + + @Override + public void visitMessages(MessageHandler handler) throws StoreException + { + for (StoredMemoryMessage message : _messages.values()) + { + if(!handler.handle(message)) + { + break; + } + } + } + + @Override + public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException + { + synchronized (_transactionLock) + { + for (Map.Entry> enqueuedEntry : _messageInstances.entrySet()) + { + UUID resourceId = enqueuedEntry.getKey(); + for (Long messageId : enqueuedEntry.getValue()) + { + if (!handler.handle(resourceId, messageId)) + { + return; + } + } + } + } + } + + @Override + public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException + { + synchronized (_transactionLock) + { + for (Map.Entry entry : _distributedTransactions.entrySet()) + { + Xid xid = entry.getKey(); + DistributedTransactionRecords records = entry.getValue(); + if (!handler.handle(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(), records.getEnqueues(), records.getDequeues())) + { + break; + } + } + } + } + + private static final class DistributedTransactionRecords + { + private Record[] _enqueues; + private Record[] _dequeues; + + public DistributedTransactionRecords(Record[] enqueues, Record[] dequeues) + { + super(); + _enqueues = enqueues; + _dequeues = dequeues; + } + + public Record[] getEnqueues() + { + return _enqueues; + } + + public Record[] getDequeues() + { + return _dequeues; + } + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecordRecoveverAndUpgrader.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecordRecoveverAndUpgrader.java new file mode 100644 index 0000000000..85265d986e --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecordRecoveverAndUpgrader.java @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.util.Map; + +import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; +import org.apache.qpid.server.virtualhost.DefaultUpgraderProvider; +import org.apache.qpid.server.virtualhost.VirtualHost; + +public class ConfiguredObjectRecordRecoveverAndUpgrader implements ConfiguredObjectRecordHandler +{ + private DurableConfigurationRecoverer _configRecoverer; + private DurableConfigurationStore _store; + + public ConfiguredObjectRecordRecoveverAndUpgrader(VirtualHost virtualHost, Map recoverers) + { + DefaultUpgraderProvider upgraderProvider = new DefaultUpgraderProvider(virtualHost); + _configRecoverer = new DurableConfigurationRecoverer(virtualHost.getName(), recoverers, upgraderProvider, virtualHost.getEventLogger()); + _store = virtualHost.getDurableConfigurationStore(); + } + + @Override + public void begin(int configVersion) + { + _configRecoverer.beginConfigurationRecovery(_store, configVersion); + } + + @Override + public boolean handle(ConfiguredObjectRecord record) + { + _configRecoverer.configuredObject(record); + return true; + } + + @Override + public int end() + { + return _configRecoverer.completeConfigurationRecovery(); + } + +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java index 9f610b0199..7d93f18906 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java @@ -20,11 +20,12 @@ */ package org.apache.qpid.server.store; -import org.apache.qpid.server.model.ConfiguredObject; - import java.util.Map; import java.util.UUID; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; + public interface DurableConfigurationStore { String STORE_TYPE = "storeType"; @@ -46,12 +47,6 @@ public interface DurableConfigurationStore */ void openConfigurationStore(ConfiguredObject parent, Map storeSettings) throws StoreException; - /** - * Recovers configuration from the store using given recovery handler - * @param recoveryHandler recovery handler - */ - void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler) throws StoreException; - /** * Makes the specified object persistent. * @@ -85,4 +80,11 @@ public interface DurableConfigurationStore void closeConfigurationStore() throws StoreException; + /** + * Visit all configured object records with given handler. + * + * @param handler a handler to invoke on each configured object record + * @throws StoreException + */ + void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws StoreException; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java index 819da86ca0..a5ace16cfa 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java @@ -31,6 +31,7 @@ import java.util.*; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Model; import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; import org.codehaus.jackson.JsonGenerator; import org.codehaus.jackson.JsonParseException; import org.codehaus.jackson.JsonProcessingException; @@ -97,22 +98,27 @@ public class JsonFileConfigStore implements DurableConfigurationStore } @Override - public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler) + public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) { - recoveryHandler.beginConfigurationRecovery(this,_configVersion); + handler.begin(_configVersion); List records = new ArrayList(_objectsById.values()); for(ConfiguredObjectRecord record : records) { - recoveryHandler.configuredObject(record); + boolean shouldContinue = handler.handle(record); + if (!shouldContinue) + { + break; + } } int oldConfigVersion = _configVersion; - _configVersion = recoveryHandler.completeConfigurationRecovery(); + _configVersion = handler.end(); if(oldConfigVersion != _configVersion) { save(); } } + private void setup(final Map configurationStoreSettings) { Object storePathAttr = configurationStoreSettings.get(DurableConfigurationStore.STORE_PATH); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java index 69f9073f6e..433f618d1a 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java @@ -23,6 +23,9 @@ package org.apache.qpid.server.store; import java.util.Map; import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.store.handler.DistributedTransactionHandler; +import org.apache.qpid.server.store.handler.MessageHandler; +import org.apache.qpid.server.store.handler.MessageInstanceHandler; /** * MessageStore defines the interface to a storage area, which can be used to preserve the state of messages. @@ -43,13 +46,6 @@ public interface MessageStore */ void openMessageStore(ConfiguredObject parent, Map messageStoreSettings); - /** - * Called after opening to recover messages and transactions with given recovery handlers - * @param messageRecoveryHandler - * @param transactionLogRecoveryHandler - */ - void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler); - public StoredMessage addMessage(T metaData); @@ -71,8 +67,10 @@ public interface MessageStore String getStoreLocation(); - // TODO dead method - remove?? - String getStoreType(); - void onDelete(); + + void visitMessages(MessageHandler handler) throws StoreException; + void visitMessageInstances(MessageInstanceHandler handler) throws StoreException; + void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException; + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStoreRecoveryHandler.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStoreRecoveryHandler.java deleted file mode 100755 index ba65b8e1ec..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStoreRecoveryHandler.java +++ /dev/null @@ -1,33 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.store; - -public interface MessageStoreRecoveryHandler -{ - StoredMessageRecoveryHandler begin(); - - public static interface StoredMessageRecoveryHandler - { - void message(StoredMessage message); - - void completeMessageRecovery(); - } -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java index 59b4530014..a3ed4bea05 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java @@ -23,6 +23,10 @@ import java.util.Map; import java.util.UUID; import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; +import org.apache.qpid.server.store.handler.DistributedTransactionHandler; +import org.apache.qpid.server.store.handler.MessageHandler; +import org.apache.qpid.server.store.handler.MessageInstanceHandler; public abstract class NullMessageStore implements MessageStore, DurableConfigurationStore { @@ -32,11 +36,6 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura { } - @Override - public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler) - { - } - @Override public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) { @@ -91,11 +90,6 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura return null; } - @Override - public void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler) - { - } - @Override public void addEventListener(EventListener eventListener, Event... events) { @@ -112,4 +106,24 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura { } + @Override + public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws StoreException + { + } + + @Override + public void visitMessages(MessageHandler handler) throws StoreException + { + } + + @Override + public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException + { + } + + @Override + public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException + { + } + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java deleted file mode 100755 index bd4da648f9..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java +++ /dev/null @@ -1,42 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.store; - -import java.util.UUID; - -public interface TransactionLogRecoveryHandler -{ - QueueEntryRecoveryHandler begin(MessageStore log); - - public static interface QueueEntryRecoveryHandler - { - DtxRecordRecoveryHandler completeQueueEntryRecovery(); - - void queueEntry(UUID queueId, long messageId); - } - - public static interface DtxRecordRecoveryHandler - { - void dtxRecord(long format, byte[] globalId, byte[] branchId, Transaction.Record[] enqueues, Transaction.Record[] dequeues); - - void completeDtxRecordRecovery(); - } -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Xid.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Xid.java new file mode 100644 index 0000000000..4db33861a4 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Xid.java @@ -0,0 +1,102 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.util.Arrays; + +public final class Xid +{ + private final long _format; + private final byte[] _globalId; + private final byte[] _branchId; + + public Xid(long format, byte[] globalId, byte[] branchId) + { + _format = format; + _globalId = globalId; + _branchId = branchId; + } + + public long getFormat() + { + return _format; + } + + public byte[] getGlobalId() + { + return _globalId; + } + + public byte[] getBranchId() + { + return _branchId; + } + + @Override + public int hashCode() + { + final int prime = 31; + int result = 1; + result = prime * result + Arrays.hashCode(_branchId); + result = prime * result + (int) (_format ^ (_format >>> 32)); + result = prime * result + Arrays.hashCode(_globalId); + return result; + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) + { + return true; + } + + if (obj == null) + { + return false; + } + + if (getClass() != obj.getClass()) + { + return false; + } + + Xid other = (Xid) obj; + + if (!Arrays.equals(_branchId, other._branchId)) + { + return false; + } + + if (_format != other._format) + { + return false; + } + + if (!Arrays.equals(_globalId, other._globalId)) + { + return false; + } + return true; + } + + +} \ No newline at end of file diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/ConfiguredObjectRecordHandler.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/ConfiguredObjectRecordHandler.java new file mode 100644 index 0000000000..747c735ff1 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/ConfiguredObjectRecordHandler.java @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.handler; + +import org.apache.qpid.server.store.ConfiguredObjectRecord; + +public interface ConfiguredObjectRecordHandler +{ + // TODO configVersion argument will be removed. + void begin(int configVersion); + + /** + * Handles the given record. + * + * @param record + * @return false is returned if the handler does not wish to handle other record, true otherwise + */ + boolean handle(ConfiguredObjectRecord record); + + //TODO: return should be void + // temporarily returning new config version + int end(); +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/DistributedTransactionHandler.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/DistributedTransactionHandler.java new file mode 100644 index 0000000000..733c93355b --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/DistributedTransactionHandler.java @@ -0,0 +1,30 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.handler; + +import org.apache.qpid.server.store.Transaction.Record; + +public interface DistributedTransactionHandler +{ + + boolean handle(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues); + +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/MessageHandler.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/MessageHandler.java new file mode 100644 index 0000000000..30c1f7b450 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/MessageHandler.java @@ -0,0 +1,30 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.handler; + +import org.apache.qpid.server.store.StoredMessage; + +public interface MessageHandler +{ + + boolean handle(StoredMessage storedMessage); + +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/MessageInstanceHandler.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/MessageInstanceHandler.java new file mode 100644 index 0000000000..3775ec4fee --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/MessageInstanceHandler.java @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.handler; + +import java.util.UUID; + +public interface MessageInstanceHandler +{ + boolean handle(UUID queueId, long messageId); + +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java index 7e0562afec..46b5dbb9fc 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java @@ -78,6 +78,10 @@ public class DefaultUpgraderProvider implements UpgraderProvider public DurableConfigurationStoreUpgrader getUpgrader(final int configVersion, DurableConfigurationRecoverer recoverer) { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Getting upgrader for configVersion: " + configVersion); + } DurableConfigurationStoreUpgrader currentUpgrader = null; switch(configVersion) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java new file mode 100644 index 0000000000..df47c85f64 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java @@ -0,0 +1,357 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost; + +import java.util.HashMap; +import java.util.Map; +import java.util.TreeMap; +import java.util.UUID; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.logging.EventLogger; +import org.apache.qpid.server.logging.messages.MessageStoreMessages; +import org.apache.qpid.server.logging.messages.TransactionLogMessages; +import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; +import org.apache.qpid.server.message.EnqueueableMessage; +import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.plugin.MessageMetaDataType; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StorableMessageMetaData; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.store.Transaction; +import org.apache.qpid.server.store.TransactionLogResource; +import org.apache.qpid.server.store.Transaction.Record; +import org.apache.qpid.server.store.handler.DistributedTransactionHandler; +import org.apache.qpid.server.store.handler.MessageHandler; +import org.apache.qpid.server.store.handler.MessageInstanceHandler; +import org.apache.qpid.server.txn.DtxBranch; +import org.apache.qpid.server.txn.DtxRegistry; +import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.transport.Xid; +import org.apache.qpid.transport.util.Functions; + +public class MessageStoreRecoverer +{ + private static final Logger _logger = Logger.getLogger(MessageStoreRecoverer.class); + + private final VirtualHost _virtualHost; + + private final Map _queueRecoveries = new TreeMap(); + private final Map> _recoveredMessages = new HashMap>(); + private final Map> _unusedMessages = new HashMap>(); + private final EventLogger _eventLogger; + + private final MessageStoreLogSubject _logSubject; + private final MessageStore _store; + + + public MessageStoreRecoverer(VirtualHost virtualHost, MessageStoreLogSubject logSubject) + { + super(); + _virtualHost = virtualHost; + _eventLogger = virtualHost.getEventLogger(); + _logSubject = logSubject; + _store = virtualHost.getMessageStore(); + } + + + public void recover() + { + _eventLogger.message(_logSubject, MessageStoreMessages.RECOVERY_START()); + _store.visitMessages(messageVisitor); + + _eventLogger.message(_logSubject, TransactionLogMessages.RECOVERY_START(null, false)); + _store.visitMessageInstances(messageAndMessageInstanceRecoverer); + + for(Map.Entry entry : _queueRecoveries.entrySet()) + { + _eventLogger.message(_logSubject, TransactionLogMessages.RECOVERED(entry.getValue(), entry.getKey())); + _eventLogger.message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(entry.getKey(), true)); + } + + _store.visitDistributedTransactions(distributedTransactionRecoverer); + + + + for(StoredMessage m : _unusedMessages.values()) + { + _logger.warn("Message id " + m.getMessageNumber() + " in store, but not in any queue - removing...."); + m.remove(); + } + _eventLogger.message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false)); + + _eventLogger.message(_logSubject, MessageStoreMessages.RECOVERED(_recoveredMessages.size() - _unusedMessages.size())); + _eventLogger.message(_logSubject, MessageStoreMessages.RECOVERY_COMPLETE()); + + + } + + MessageHandler messageVisitor = new MessageHandler() + { + + @Override + public boolean handle(StoredMessage message) + { + StorableMessageMetaData metaData = message.getMetaData(); + + @SuppressWarnings("rawtypes") + MessageMetaDataType type = metaData.getType(); + + @SuppressWarnings("unchecked") + ServerMessage serverMessage = type.createMessage(message); + + _recoveredMessages.put(message.getMessageNumber(), serverMessage); + _unusedMessages.put(message.getMessageNumber(), message); + return true; + } + + }; + + MessageInstanceHandler messageAndMessageInstanceRecoverer = new MessageInstanceHandler() + { + @Override + public boolean handle(final UUID queueId, long messageId) + { + AMQQueue queue = _virtualHost.getQueue(queueId); + if(queue != null) + { + String queueName = queue.getName(); + ServerMessage message = _recoveredMessages.get(messageId); + _unusedMessages.remove(messageId); + + if(message != null) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("On recovery, delivering " + message.getMessageNumber() + " to " + queueName); + } + + Integer count = _queueRecoveries.get(queueName); + if (count == null) + { + count = 0; + } + + queue.enqueue(message,null); + + _queueRecoveries.put(queueName, ++count); + } + else + { + _logger.warn("Message id " + messageId + " referenced in log as enqueued in queue " + queueName + " is unknown, entry will be discarded"); + Transaction txn = _store.newTransaction(); + txn.dequeueMessage(queue, new DummyMessage(messageId)); + txn.commitTranAsync(); + } + } + else + { + _logger.warn("Message id " + messageId + " in log references queue with id " + queueId + " which is not in the configuration, entry will be discarded"); + Transaction txn = _store.newTransaction(); + TransactionLogResource mockQueue = + new TransactionLogResource() + { + @Override + public String getName() + { + return "<>"; + } + + @Override + public UUID getId() + { + return queueId; + } + + @Override + public boolean isDurable() + { + return false; + } + }; + txn.dequeueMessage(mockQueue, new DummyMessage(messageId)); + txn.commitTranAsync(); + } + return true; + } + }; + + private DistributedTransactionHandler distributedTransactionRecoverer = new DistributedTransactionHandler() + { + + @Override + public boolean handle(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues) + { + Xid id = new Xid(format, globalId, branchId); + DtxRegistry dtxRegistry = _virtualHost.getDtxRegistry(); + DtxBranch branch = dtxRegistry.getBranch(id); + if(branch == null) + { + branch = new DtxBranch(id, _store, _virtualHost); + dtxRegistry.registerBranch(branch); + } + for(Transaction.Record record : enqueues) + { + final AMQQueue queue = _virtualHost.getQueue(record.getResource().getId()); + if(queue != null) + { + final long messageId = record.getMessage().getMessageNumber(); + final ServerMessage message = _recoveredMessages.get(messageId); + _unusedMessages.remove(messageId); + + if(message != null) + { + final MessageReference ref = message.newReference(); + + branch.enqueue(queue,message); + + branch.addPostTransactionAction(new ServerTransaction.Action() + { + + public void postCommit() + { + queue.enqueue(message, null); + ref.release(); + } + + public void onRollback() + { + ref.release(); + } + }); + } + else + { + StringBuilder xidString = xidAsString(id); + _eventLogger.message(_logSubject, + TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(), + Long.toString(messageId))); + } + } + else + { + StringBuilder xidString = xidAsString(id); + _eventLogger.message(_logSubject, + TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(), + record.getResource().getId().toString())); + + } + } + for(Transaction.Record record : dequeues) + { + final AMQQueue queue = _virtualHost.getQueue(record.getResource().getId()); + if(queue != null) + { + final long messageId = record.getMessage().getMessageNumber(); + final ServerMessage message = _recoveredMessages.get(messageId); + _unusedMessages.remove(messageId); + + if(message != null) + { + final QueueEntry entry = queue.getMessageOnTheQueue(messageId); + + entry.acquire(); + + branch.dequeue(queue, message); + + branch.addPostTransactionAction(new ServerTransaction.Action() + { + + public void postCommit() + { + entry.delete(); + } + + public void onRollback() + { + entry.release(); + } + }); + } + else + { + StringBuilder xidString = xidAsString(id); + _eventLogger.message(_logSubject, + TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(), + Long.toString(messageId))); + + } + + } + else + { + StringBuilder xidString = xidAsString(id); + _eventLogger.message(_logSubject, + TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(), + record.getResource().getId().toString())); + } + + } + + branch.setState(DtxBranch.State.PREPARED); + branch.prePrepareTransaction(); + return true; + } + + private StringBuilder xidAsString(Xid id) + { + return new StringBuilder("(") + .append(id.getFormat()) + .append(',') + .append(Functions.str(id.getGlobalId())) + .append(',') + .append(Functions.str(id.getBranchId())) + .append(')'); + } + + + }; + + + private static class DummyMessage implements EnqueueableMessage + { + + private final long _messageId; + + public DummyMessage(long messageId) + { + _messageId = messageId; + } + + public long getMessageNumber() + { + return _messageId; + } + + public boolean isPersistent() + { + return true; + } + + public StoredMessage getStoredMessage() + { + return null; + } + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java index e3fd938225..14849aea1e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java @@ -29,11 +29,11 @@ import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.plugin.MessageStoreFactory; import org.apache.qpid.server.stats.StatisticsGatherer; - -import org.apache.qpid.server.store.DurableConfigurationRecoverer; +import org.apache.qpid.server.store.ConfiguredObjectRecordRecoveverAndUpgrader; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.DurableConfigurationStoreCreator; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; public class StandardVirtualHost extends AbstractVirtualHost { @@ -107,18 +107,22 @@ public class StandardVirtualHost extends AbstractVirtualHost if (_configurationStoreLogSubject != null) { getEventLogger().message(_configurationStoreLogSubject, ConfigStoreMessages.STORE_LOCATION(configurationStoreSettings.toString())); + getEventLogger().message(_configurationStoreLogSubject, ConfigStoreMessages.RECOVERY_START()); } - DurableConfigurationRecoverer configRecoverer = new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(), - new DefaultUpgraderProvider(this), getEventLogger()); + ConfiguredObjectRecordHandler upgraderRecoverer = new ConfiguredObjectRecordRecoveverAndUpgrader(this, getDurableConfigurationRecoverers()); + + _durableConfigurationStore.visitConfiguredObjectRecords(upgraderRecoverer); - _durableConfigurationStore.recoverConfigurationStore(configRecoverer); + if (_configurationStoreLogSubject != null) + { + getEventLogger().message(_configurationStoreLogSubject, ConfigStoreMessages.RECOVERY_COMPLETE()); + } // If store does not have entries for standard exchanges (amq.*), the following will create them. initialiseModel(); - VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this, getMessageStoreLogSubject()); - _messageStore.recoverMessageStore(recoveryHandler, recoveryHandler); + new MessageStoreRecoverer(this, getMessageStoreLogSubject()).recover(); attainActivation(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java deleted file mode 100755 index 3216115967..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java +++ /dev/null @@ -1,350 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.virtualhost; - -import java.util.HashMap; -import java.util.Map; -import java.util.TreeMap; -import java.util.UUID; - -import org.apache.log4j.Logger; -import org.apache.qpid.server.logging.EventLogger; -import org.apache.qpid.server.logging.messages.MessageStoreMessages; -import org.apache.qpid.server.logging.messages.TransactionLogMessages; -import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; -import org.apache.qpid.server.message.EnqueueableMessage; -import org.apache.qpid.server.message.MessageReference; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStoreRecoveryHandler; -import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.store.Transaction; -import org.apache.qpid.server.store.TransactionLogRecoveryHandler; -import org.apache.qpid.server.store.TransactionLogResource; -import org.apache.qpid.server.txn.DtxBranch; -import org.apache.qpid.server.txn.DtxRegistry; -import org.apache.qpid.server.txn.ServerTransaction; -import org.apache.qpid.transport.Xid; -import org.apache.qpid.transport.util.Functions; - -public class VirtualHostConfigRecoveryHandler implements - MessageStoreRecoveryHandler, - MessageStoreRecoveryHandler.StoredMessageRecoveryHandler, - TransactionLogRecoveryHandler, - TransactionLogRecoveryHandler.QueueEntryRecoveryHandler, - TransactionLogRecoveryHandler.DtxRecordRecoveryHandler -{ - private static final Logger _logger = Logger.getLogger(VirtualHostConfigRecoveryHandler.class); - - private final VirtualHost _virtualHost; - - private final Map _queueRecoveries = new TreeMap(); - private final Map _recoveredMessages = new HashMap(); - private final Map _unusedMessages = new HashMap(); - private final EventLogger _eventLogger; - - private final MessageStoreLogSubject _logSubject; - private MessageStore _store; - - public VirtualHostConfigRecoveryHandler(VirtualHost virtualHost, MessageStoreLogSubject logSubject) - { - _virtualHost = virtualHost; - _eventLogger = virtualHost.getEventLogger(); - _logSubject = logSubject; - } - - public VirtualHostConfigRecoveryHandler begin(MessageStore store) - { - _store = store; - _eventLogger.message(_logSubject, TransactionLogMessages.RECOVERY_START(null, false)); - return this; - } - - public StoredMessageRecoveryHandler begin() - { - _eventLogger.message(_logSubject, MessageStoreMessages.RECOVERY_START()); - return this; - } - - public void message(StoredMessage message) - { - ServerMessage serverMessage = message.getMetaData().getType().createMessage(message); - - _recoveredMessages.put(message.getMessageNumber(), serverMessage); - _unusedMessages.put(message.getMessageNumber(), message); - } - - public void completeMessageRecovery() - { - } - - public void dtxRecord(long format, byte[] globalId, byte[] branchId, - Transaction.Record[] enqueues, - Transaction.Record[] dequeues) - { - Xid id = new Xid(format, globalId, branchId); - DtxRegistry dtxRegistry = _virtualHost.getDtxRegistry(); - DtxBranch branch = dtxRegistry.getBranch(id); - if(branch == null) - { - branch = new DtxBranch(id, _store, _virtualHost); - dtxRegistry.registerBranch(branch); - } - for(Transaction.Record record : enqueues) - { - final AMQQueue queue = _virtualHost.getQueue(record.getResource().getId()); - if(queue != null) - { - final long messageId = record.getMessage().getMessageNumber(); - final ServerMessage message = _recoveredMessages.get(messageId); - _unusedMessages.remove(messageId); - - if(message != null) - { - final MessageReference ref = message.newReference(); - - - branch.enqueue(queue,message); - - branch.addPostTransactionAction(new ServerTransaction.Action() - { - - public void postCommit() - { - queue.enqueue(message, null); - ref.release(); - } - - public void onRollback() - { - ref.release(); - } - }); - } - else - { - StringBuilder xidString = xidAsString(id); - _eventLogger.message(_logSubject, - TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(), - Long.toString(messageId))); - - } - - } - else - { - StringBuilder xidString = xidAsString(id); - _eventLogger.message(_logSubject, - TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(), - record.getResource().getId().toString())); - - } - } - for(Transaction.Record record : dequeues) - { - final AMQQueue queue = _virtualHost.getQueue(record.getResource().getId()); - if(queue != null) - { - final long messageId = record.getMessage().getMessageNumber(); - final ServerMessage message = _recoveredMessages.get(messageId); - _unusedMessages.remove(messageId); - - if(message != null) - { - final QueueEntry entry = queue.getMessageOnTheQueue(messageId); - - entry.acquire(); - - branch.dequeue(queue, message); - - branch.addPostTransactionAction(new ServerTransaction.Action() - { - - public void postCommit() - { - entry.delete(); - } - - public void onRollback() - { - entry.release(); - } - }); - } - else - { - StringBuilder xidString = xidAsString(id); - _eventLogger.message(_logSubject, - TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(), - Long.toString(messageId))); - - } - - } - else - { - StringBuilder xidString = xidAsString(id); - _eventLogger.message(_logSubject, - TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(), - record.getResource().getId().toString())); - } - - } - - branch.setState(DtxBranch.State.PREPARED); - branch.prePrepareTransaction(); - } - - private static StringBuilder xidAsString(Xid id) - { - return new StringBuilder("(") - .append(id.getFormat()) - .append(',') - .append(Functions.str(id.getGlobalId())) - .append(',') - .append(Functions.str(id.getBranchId())) - .append(')'); - } - - public void completeDtxRecordRecovery() - { - for(StoredMessage m : _unusedMessages.values()) - { - _logger.warn("Message id " + m.getMessageNumber() + " in store, but not in any queue - removing...."); - m.remove(); - } - _eventLogger.message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false)); - - _eventLogger.message(_logSubject, MessageStoreMessages.RECOVERED(_recoveredMessages.size() - _unusedMessages.size())); - _eventLogger.message(_logSubject, MessageStoreMessages.RECOVERY_COMPLETE()); - } - - public void queueEntry(final UUID queueId, long messageId) - { - AMQQueue queue = _virtualHost.getQueue(queueId); - if(queue != null) - { - String queueName = queue.getName(); - ServerMessage message = _recoveredMessages.get(messageId); - _unusedMessages.remove(messageId); - - if(message != null) - { - - - if (_logger.isDebugEnabled()) - { - _logger.debug("On recovery, delivering " + message.getMessageNumber() + " to " + queueName); - } - - Integer count = _queueRecoveries.get(queueName); - if (count == null) - { - count = 0; - } - - queue.enqueue(message,null); - - _queueRecoveries.put(queueName, ++count); - } - else - { - _logger.warn("Message id " + messageId + " referenced in log as enqueued in queue " + queueName + " is unknown, entry will be discarded"); - Transaction txn = _store.newTransaction(); - txn.dequeueMessage(queue, new DummyMessage(messageId)); - txn.commitTranAsync(); - } - } - else - { - _logger.warn("Message id " + messageId + " in log references queue with id " + queueId + " which is not in the configuration, entry will be discarded"); - Transaction txn = _store.newTransaction(); - TransactionLogResource mockQueue = - new TransactionLogResource() - { - @Override - public String getName() - { - return "<>"; - } - - @Override - public UUID getId() - { - return queueId; - } - - @Override - public boolean isDurable() - { - return false; - } - }; - txn.dequeueMessage(mockQueue, new DummyMessage(messageId)); - txn.commitTranAsync(); - } - } - - public DtxRecordRecoveryHandler completeQueueEntryRecovery() - { - - for(Map.Entry entry : _queueRecoveries.entrySet()) - { - _eventLogger.message(_logSubject, TransactionLogMessages.RECOVERED(entry.getValue(), entry.getKey())); - - _eventLogger.message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(entry.getKey(), true)); - } - - return this; - } - - private static class DummyMessage implements EnqueueableMessage - { - - - private final long _messageId; - - public DummyMessage(long messageId) - { - _messageId = messageId; - } - - public long getMessageNumber() - { - return _messageId; - } - - - public boolean isPersistent() - { - return true; - } - - - public StoredMessage getStoredMessage() - { - return null; - } - } - -} diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java index 0fe9d1ac49..1de857d224 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java @@ -50,10 +50,10 @@ import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.SystemContext; import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.ConfiguredObjectRecordImpl; import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; import org.apache.qpid.test.utils.QpidTestCase; public class ManagementModeStoreHandlerTest extends QpidTestCase @@ -89,20 +89,22 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase when(_portEntry.getParents()).thenReturn(Collections.singletonMap(Broker.class.getSimpleName(), _root)); when(_portEntry.getType()).thenReturn(Port.class.getSimpleName()); - final ArgumentCaptor recovererArgumentCaptor = ArgumentCaptor.forClass(ConfigurationRecoveryHandler.class); + final ArgumentCaptor recovererArgumentCaptor = ArgumentCaptor.forClass(ConfiguredObjectRecordHandler.class); doAnswer( new Answer() { @Override public Object answer(final InvocationOnMock invocation) throws Throwable { - ConfigurationRecoveryHandler recoverer = recovererArgumentCaptor.getValue(); - recoverer.configuredObject(_root); - recoverer.configuredObject(_portEntry); + ConfiguredObjectRecordHandler recoverer = recovererArgumentCaptor.getValue(); + if(recoverer.handle(_root)) + { + recoverer.handle(_portEntry); + } return null; } } - ).when(_store).recoverConfigurationStore(recovererArgumentCaptor.capture()); + ).when(_store).visitConfiguredObjectRecords(recovererArgumentCaptor.capture()); _options = new BrokerOptions(); _handler = new ManagementModeStoreHandler(_store, _options); @@ -112,21 +114,21 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase private ConfiguredObjectRecord getRootEntry() { BrokerFinder brokerFinder = new BrokerFinder(); - _handler.recoverConfigurationStore(brokerFinder); + _handler.visitConfiguredObjectRecords(brokerFinder); return brokerFinder.getBrokerRecord(); } private ConfiguredObjectRecord getEntry(UUID id) { RecordFinder recordFinder = new RecordFinder(id); - _handler.recoverConfigurationStore(recordFinder); + _handler.visitConfiguredObjectRecords(recordFinder); return recordFinder.getFoundRecord(); } private Collection getChildrenIds(ConfiguredObjectRecord record) { ChildFinder childFinder = new ChildFinder(record); - _handler.recoverConfigurationStore(childFinder); + _handler.visitConfiguredObjectRecords(childFinder); return childFinder.getChildIds(); } @@ -288,21 +290,25 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase attributes.put(VirtualHost.TYPE, "STANDARD"); final ConfiguredObjectRecord virtualHost = new ConfiguredObjectRecordImpl(virtualHostId, VirtualHost.class.getSimpleName(), attributes, Collections.singletonMap(Broker.class.getSimpleName(), _root)); - final ArgumentCaptor recovererArgumentCaptor = ArgumentCaptor.forClass(ConfigurationRecoveryHandler.class); + final ArgumentCaptor recovererArgumentCaptor = ArgumentCaptor.forClass(ConfiguredObjectRecordHandler.class); doAnswer( new Answer() { @Override public Object answer(final InvocationOnMock invocation) throws Throwable { - ConfigurationRecoveryHandler recoverer = recovererArgumentCaptor.getValue(); - recoverer.configuredObject(_root); - recoverer.configuredObject(_portEntry); - recoverer.configuredObject(virtualHost); + ConfiguredObjectRecordHandler recoverer = recovererArgumentCaptor.getValue(); + if(recoverer.handle(_root)) + { + if(recoverer.handle(_portEntry)) + { + recoverer.handle(virtualHost); + } + } return null; } } - ).when(_store).recoverConfigurationStore(recovererArgumentCaptor.capture()); + ).when(_store).visitConfiguredObjectRecords(recovererArgumentCaptor.capture()); State expectedState = mmQuiesceVhosts ? State.QUIESCED : null; if(mmQuiesceVhosts) @@ -457,28 +463,32 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase } - private class BrokerFinder implements ConfigurationRecoveryHandler + private class BrokerFinder implements ConfiguredObjectRecordHandler { private ConfiguredObjectRecord _brokerRecord; + private int _version; + @Override - public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion) + public void begin(final int configVersion) { - + _version = configVersion; } @Override - public void configuredObject(final ConfiguredObjectRecord object) + public boolean handle(final ConfiguredObjectRecord object) { if(object.getType().equals(Broker.class.getSimpleName())) { _brokerRecord = object; + return false; } + return true; } @Override - public int completeConfigurationRecovery() + public int end() { - return 0; + return _version; } public ConfiguredObjectRecord getBrokerRecord() @@ -487,10 +497,11 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase } } - private class RecordFinder implements ConfigurationRecoveryHandler + private class RecordFinder implements ConfiguredObjectRecordHandler { private final UUID _id; private ConfiguredObjectRecord _foundRecord; + private int _version; private RecordFinder(final UUID id) { @@ -498,24 +509,26 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase } @Override - public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion) + public void begin(final int configVersion) { - + _version = configVersion; } @Override - public void configuredObject(final ConfiguredObjectRecord object) + public boolean handle(final ConfiguredObjectRecord object) { if(object.getId().equals(_id)) { _foundRecord = object; + return false; } + return true; } @Override - public int completeConfigurationRecovery() + public int end() { - return 0; + return _version; } public ConfiguredObjectRecord getFoundRecord() @@ -524,10 +537,11 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase } } - private class ChildFinder implements ConfigurationRecoveryHandler + private class ChildFinder implements ConfiguredObjectRecordHandler { private final Collection _childIds = new HashSet(); private final ConfiguredObjectRecord _parent; + private int _version; private ChildFinder(final ConfiguredObjectRecord parent) { @@ -535,13 +549,13 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase } @Override - public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion) + public void begin(final int configVersion) { - + _version = configVersion; } @Override - public void configuredObject(final ConfiguredObjectRecord object) + public boolean handle(final ConfiguredObjectRecord object) { if(object.getParents() != null) @@ -555,12 +569,13 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase } } + return true; } @Override - public int completeConfigurationRecovery() + public int end() { - return 0; + return _version; } public Collection getChildIds() diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java index 83052110a1..b38d9d7bd2 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java @@ -21,9 +21,7 @@ package org.apache.qpid.server.store; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.argThat; -import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -49,6 +47,7 @@ import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.util.FileUtils; import org.mockito.ArgumentCaptor; @@ -71,9 +70,8 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest private String _storePath; private String _storeName; - private ConfigurationRecoveryHandler _recoveryHandler; + private ConfiguredObjectRecordHandler _handler; - private ExchangeImpl _exchange = mock(ExchangeImpl.class); private static final String ROUTING_KEY = "routingKey"; private static final String QUEUE_NAME = "queueName"; private Map _bindingArgs; @@ -96,16 +94,8 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest FileUtils.delete(new File(_storePath), true); setTestSystemProperty("QPID_WORK", TMP_FOLDER); - _recoveryHandler = mock(ConfigurationRecoveryHandler.class); - when(_exchange.getName()).thenReturn(EXCHANGE_NAME); - when(_exchange.getId()).thenReturn(_exchangeId); - when(_exchange.getExchangeType()).thenReturn(mock(ExchangeType.class)); - when(_exchange.getEventLogger()).thenReturn(new EventLogger()); - - ConfiguredObjectRecord exchangeRecord = mock(ConfiguredObjectRecord.class); - when(exchangeRecord.getId()).thenReturn(_exchangeId); - when(exchangeRecord.getType()).thenReturn(Exchange.class.getSimpleName()); - when(_exchange.asObjectRecord()).thenReturn(exchangeRecord); + _handler = mock(ConfiguredObjectRecordHandler.class); + when(_handler.handle(any(ConfiguredObjectRecord.class))).thenReturn(true); _bindingArgs = new HashMap(); String argKey = AMQPFilterTypes.JMS_SELECTOR.toString(); @@ -134,7 +124,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest DurableConfigurationStoreHelper.createExchange(_configStore, exchange); reopenStore(); - verify(_recoveryHandler).configuredObject(matchesRecord(_exchangeId, EXCHANGE, + verify(_handler).handle(matchesRecord(_exchangeId, EXCHANGE, map( org.apache.qpid.server.model.Exchange.NAME, getName(), org.apache.qpid.server.model.Exchange.TYPE, getName()+"Type", org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS.name()))); @@ -168,14 +158,16 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest DurableConfigurationStoreHelper.removeExchange(_configStore, exchange); reopenStore(); - verify(_recoveryHandler, never()).configuredObject(any(ConfiguredObjectRecord.class)); + verify(_handler, never()).handle(any(ConfiguredObjectRecord.class)); } public void testBindQueue() throws Exception { + ExchangeImpl exchange = createTestExchange(); AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false, null); BindingImpl binding = new BindingImpl(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue, - _exchange, _bindingArgs); + exchange, _bindingArgs); + DurableConfigurationStoreHelper.createExchange(_configStore, exchange); DurableConfigurationStoreHelper.createQueue(_configStore, queue); DurableConfigurationStoreHelper.createBinding(_configStore, binding); @@ -187,10 +179,10 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest Map parents = new HashMap(); - parents.put(Exchange.class.getSimpleName(), _exchange.getId()); + parents.put(Exchange.class.getSimpleName(), exchange.getId()); parents.put(Queue.class.getSimpleName(), queue.getId()); - verify(_recoveryHandler).configuredObject(matchesRecord(binding.getId(), BINDING, map, parents)); + verify(_handler).handle(matchesRecord(binding.getId(), BINDING, map, parents)); } @@ -260,15 +252,18 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest public void testUnbindQueue() throws Exception { + ExchangeImpl exchange = createTestExchange(); + DurableConfigurationStoreHelper.createExchange(_configStore, exchange); + AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false, null); BindingImpl binding = new BindingImpl(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue, - _exchange, _bindingArgs); + exchange, _bindingArgs); DurableConfigurationStoreHelper.createBinding(_configStore, binding); DurableConfigurationStoreHelper.removeBinding(_configStore, binding); reopenStore(); - verify(_recoveryHandler, never()).configuredObject(matchesRecord(ANY_UUID, BINDING, + verify(_handler, never()).handle(matchesRecord(ANY_UUID, BINDING, ANY_MAP)); } @@ -282,7 +277,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest queueAttributes.put(Queue.NAME, getName()); queueAttributes.put(Queue.OWNER, getName()+"Owner"); queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name()); - verify(_recoveryHandler).configuredObject(matchesRecord(_queueId, QUEUE, queueAttributes)); + verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes)); } public void testCreateQueueAMQQueueFieldTable() throws Exception @@ -304,7 +299,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name()); queueAttributes.putAll(attributes); - verify(_recoveryHandler).configuredObject(matchesRecord(_queueId, QUEUE, queueAttributes)); + verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes)); } public void testCreateQueueAMQQueueWithAlternateExchange() throws Exception @@ -322,7 +317,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name()); queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString()); - verify(_recoveryHandler).configuredObject(matchesRecord(_queueId, QUEUE, queueAttributes)); + verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes)); } private ExchangeImpl createTestAlternateExchange() @@ -355,7 +350,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest queueAttributes.put(Queue.NAME, getName()); queueAttributes.putAll(attributes); - verify(_recoveryHandler).configuredObject(matchesRecord(_queueId, QUEUE, queueAttributes)); + verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes)); } @@ -382,7 +377,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest queueAttributes.putAll(attributes); queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString()); - verify(_recoveryHandler).configuredObject(matchesRecord(_queueId, QUEUE, queueAttributes)); + verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes)); } public void testRemoveQueue() throws Exception @@ -397,7 +392,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest // remove queue DurableConfigurationStoreHelper.removeQueue(_configStore,queue); reopenStore(); - verify(_recoveryHandler, never()).configuredObject(any(ConfiguredObjectRecord.class)); + verify(_handler, never()).handle(any(ConfiguredObjectRecord.class)); } private AMQQueue createTestQueue(String queueName, @@ -463,11 +458,9 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest { ExchangeImpl exchange = mock(ExchangeImpl.class); Map actualAttributes = new HashMap(); - actualAttributes.put("id", _exchangeId); actualAttributes.put("name", getName()); actualAttributes.put("type", getName() + "Type"); actualAttributes.put("lifetimePolicy", LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS); - when(exchange.getActualAttributes()).thenReturn(actualAttributes); when(exchange.getName()).thenReturn(getName()); when(exchange.getTypeName()).thenReturn(getName() + "Type"); when(exchange.isAutoDelete()).thenReturn(true); @@ -475,11 +468,10 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest ConfiguredObjectRecord exchangeRecord = mock(ConfiguredObjectRecord.class); when(exchangeRecord.getId()).thenReturn(_exchangeId); when(exchangeRecord.getType()).thenReturn(Exchange.class.getSimpleName()); - Map actualAttributesExceptId = new HashMap(actualAttributes); - actualAttributesExceptId.remove("id"); - when(exchangeRecord.getAttributes()).thenReturn(actualAttributesExceptId); + when(exchangeRecord.getAttributes()).thenReturn(actualAttributes); when(exchange.asObjectRecord()).thenReturn(exchangeRecord); - + when(exchange.getExchangeType()).thenReturn(mock(ExchangeType.class)); + when(exchange.getEventLogger()).thenReturn(new EventLogger()); return exchange; } @@ -491,7 +483,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest ConfiguredObject parent = mock(ConfiguredObject.class); when(parent.getName()).thenReturn("testName"); _configStore.openConfigurationStore(parent, _configurationStoreSettings); - _configStore.recoverConfigurationStore(_recoveryHandler); + _configStore.visitConfiguredObjectRecords(_handler); } protected abstract DurableConfigurationStore createConfigStore() throws Exception; diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreConfigurationTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreConfigurationTest.java index 8f2d0029f6..2400a68c93 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreConfigurationTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreConfigurationTest.java @@ -27,10 +27,4 @@ public class JsonFileConfigStoreConfigurationTest extends AbstractDurableConfigu { return new JsonFileConfigStore(); } - - @Override - public void testBindQueue() throws Exception - { - // TODO: Temporarily disable the test as it is already fixed on trunk - } } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java index 1de24e371d..6907898a6c 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java @@ -28,6 +28,7 @@ import java.util.UUID; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.test.utils.TestFileUtils; @@ -43,15 +44,15 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.times; public class JsonFileConfigStoreTest extends QpidTestCase { - private final ConfigurationRecoveryHandler _recoveryHandler = mock(ConfigurationRecoveryHandler.class); - private JsonFileConfigStore _store; private HashMap _configurationStoreSettings; private ConfiguredObject _virtualHost; private File _storeLocation; + private ConfiguredObjectRecordHandler _handler; private static final UUID ANY_UUID = UUID.randomUUID(); @@ -69,6 +70,9 @@ public class JsonFileConfigStoreTest extends QpidTestCase _configurationStoreSettings.put(JsonFileConfigStore.STORE_TYPE, JsonFileConfigStore.TYPE); _configurationStoreSettings.put(JsonFileConfigStore.STORE_PATH, _storeLocation.getAbsolutePath()); _store = new JsonFileConfigStore(); + + _handler = mock(ConfiguredObjectRecordHandler.class); + when(_handler.handle(any(ConfiguredObjectRecord.class))).thenReturn(true); } @Override @@ -113,35 +117,35 @@ public class JsonFileConfigStoreTest extends QpidTestCase } } - public void testStartFromNoStore() throws Exception + public void testVisitEmptyStore() { _store.openConfigurationStore(_virtualHost, _configurationStoreSettings); - _store.recoverConfigurationStore(_recoveryHandler); - InOrder inorder = inOrder(_recoveryHandler); - inorder.verify(_recoveryHandler).beginConfigurationRecovery(eq(_store), eq(0)); - inorder.verify(_recoveryHandler,never()).configuredObject(any(ConfiguredObjectRecord.class)); - inorder.verify(_recoveryHandler).completeConfigurationRecovery(); + _store.visitConfiguredObjectRecords(_handler); + InOrder inorder = inOrder(_handler); + inorder.verify(_handler).begin(eq(0)); + inorder.verify(_handler,never()).handle(any(ConfiguredObjectRecord.class)); + inorder.verify(_handler).end(); _store.closeConfigurationStore(); } public void testUpdatedConfigVersionIsRetained() throws Exception { final int NEW_CONFIG_VERSION = 42; - when(_recoveryHandler.completeConfigurationRecovery()).thenReturn(NEW_CONFIG_VERSION); + when(_handler.end()).thenReturn(NEW_CONFIG_VERSION); _store.openConfigurationStore(_virtualHost, _configurationStoreSettings); - _store.recoverConfigurationStore(_recoveryHandler); + _store.visitConfiguredObjectRecords(_handler); _store.closeConfigurationStore(); _store.openConfigurationStore(_virtualHost, _configurationStoreSettings); - _store.recoverConfigurationStore(_recoveryHandler); - InOrder inorder = inOrder(_recoveryHandler); + _store.visitConfiguredObjectRecords(_handler); + InOrder inorder = inOrder(_handler); // first time the config version should be the initial version - 0 - inorder.verify(_recoveryHandler).beginConfigurationRecovery(eq(_store), eq(0)); + inorder.verify(_handler).begin(eq(0)); // second time the config version should be the updated version - inorder.verify(_recoveryHandler).beginConfigurationRecovery(eq(_store), eq(NEW_CONFIG_VERSION)); + inorder.verify(_handler).begin(eq(NEW_CONFIG_VERSION)); _store.closeConfigurationStore(); } @@ -157,8 +161,9 @@ public class JsonFileConfigStoreTest extends QpidTestCase _store.closeConfigurationStore(); _store.openConfigurationStore(_virtualHost, _configurationStoreSettings); - _store.recoverConfigurationStore(_recoveryHandler); - verify(_recoveryHandler).configuredObject(matchesRecord(queueId, queueType, queueAttr)); + + _store.visitConfiguredObjectRecords(_handler); + verify(_handler, times(1)).handle(matchesRecord(queueId, queueType, queueAttr)); _store.closeConfigurationStore(); } @@ -179,8 +184,8 @@ public class JsonFileConfigStoreTest extends QpidTestCase _store.closeConfigurationStore(); _store.openConfigurationStore(_virtualHost, _configurationStoreSettings); - _store.recoverConfigurationStore(_recoveryHandler); - verify(_recoveryHandler).configuredObject(matchesRecord(queueId, queueType, queueAttr)); + _store.visitConfiguredObjectRecords(_handler); + verify(_handler, times(1)).handle(matchesRecord(queueId, queueType, queueAttr)); _store.closeConfigurationStore(); } @@ -201,8 +206,8 @@ public class JsonFileConfigStoreTest extends QpidTestCase _store.closeConfigurationStore(); _store.openConfigurationStore(_virtualHost, _configurationStoreSettings); - _store.recoverConfigurationStore(_recoveryHandler); - verify(_recoveryHandler, never()).configuredObject(any(ConfiguredObjectRecord.class)); + _store.visitConfiguredObjectRecords(_handler); + verify(_handler, never()).handle(any(ConfiguredObjectRecord.class)); _store.closeConfigurationStore(); } @@ -311,12 +316,12 @@ public class JsonFileConfigStoreTest extends QpidTestCase _store.update(true, bindingRecord, binding2Record); _store.closeConfigurationStore(); _store.openConfigurationStore(_virtualHost, _configurationStoreSettings); - _store.recoverConfigurationStore(_recoveryHandler); - verify(_recoveryHandler).configuredObject(matchesRecord(queueId, "Queue", EMPTY_ATTR)); - verify(_recoveryHandler).configuredObject(matchesRecord(queue2Id, "Queue", EMPTY_ATTR)); - verify(_recoveryHandler).configuredObject(matchesRecord(exchangeId, "Exchange", EMPTY_ATTR)); - verify(_recoveryHandler).configuredObject(matchesRecord(bindingId, "Binding", EMPTY_ATTR)); - verify(_recoveryHandler).configuredObject(matchesRecord(binding2Id, "Binding", EMPTY_ATTR)); + _store.visitConfiguredObjectRecords(_handler); + verify(_handler).handle(matchesRecord(queueId, "Queue", EMPTY_ATTR)); + verify(_handler).handle(matchesRecord(queue2Id, "Queue", EMPTY_ATTR)); + verify(_handler).handle(matchesRecord(exchangeId, "Exchange", EMPTY_ATTR)); + verify(_handler).handle(matchesRecord(bindingId, "Binding", EMPTY_ATTR)); + verify(_handler).handle(matchesRecord(binding2Id, "Binding", EMPTY_ATTR)); _store.closeConfigurationStore(); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java index 451a2744c3..89fef15e7e 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java @@ -33,7 +33,6 @@ import java.util.UUID; import org.apache.log4j.Logger; import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.util.FileUtils; @@ -66,12 +65,9 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple _store = createStore(); - MessageStoreRecoveryHandler recoveryHandler = mock(MessageStoreRecoveryHandler.class); - when(recoveryHandler.begin()).thenReturn(mock(StoredMessageRecoveryHandler.class)); ConfiguredObject parent = mock(ConfiguredObject.class); when(parent.getName()).thenReturn("test"); _store.openMessageStore(parent, storeSettings); - _store.recoverMessageStore(recoveryHandler, null); _transactionResource = UUID.randomUUID(); _events = new ArrayList(); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java index 51d3fc15d2..8bf981bd7b 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java @@ -21,29 +21,31 @@ package org.apache.qpid.server.store; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.argThat; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; import org.apache.qpid.server.store.Transaction.Record; +import org.apache.qpid.server.store.handler.DistributedTransactionHandler; +import org.apache.qpid.server.store.handler.MessageHandler; +import org.apache.qpid.server.store.handler.MessageInstanceHandler; import org.apache.qpid.test.utils.QpidTestCase; +import org.mockito.ArgumentMatcher; public abstract class MessageStoreTestCase extends QpidTestCase { - private MessageStoreRecoveryHandler _messageStoreRecoveryHandler; - private StoredMessageRecoveryHandler _storedMessageRecoveryHandler; - private TransactionLogRecoveryHandler _logRecoveryHandler; - private TransactionLogRecoveryHandler.QueueEntryRecoveryHandler _queueEntryRecoveryHandler; - private TransactionLogRecoveryHandler.DtxRecordRecoveryHandler _dtxRecordRecoveryHandler; - private MessageStore _store; private Map _storeSettings; private ConfiguredObject _parent; @@ -55,35 +57,34 @@ public abstract class MessageStoreTestCase extends QpidTestCase _parent = mock(ConfiguredObject.class); when(_parent.getName()).thenReturn("test"); - _storedMessageRecoveryHandler = mock(StoredMessageRecoveryHandler.class); - _logRecoveryHandler = mock(TransactionLogRecoveryHandler.class); - _messageStoreRecoveryHandler = mock(MessageStoreRecoveryHandler.class); - _queueEntryRecoveryHandler = mock(TransactionLogRecoveryHandler.QueueEntryRecoveryHandler.class); - _dtxRecordRecoveryHandler = mock(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler.class); - - when(_messageStoreRecoveryHandler.begin()).thenReturn(_storedMessageRecoveryHandler); - when(_logRecoveryHandler.begin(any(MessageStore.class))).thenReturn(_queueEntryRecoveryHandler); - when(_queueEntryRecoveryHandler.completeQueueEntryRecovery()).thenReturn(_dtxRecordRecoveryHandler); - _storeSettings = getStoreSettings(); _store = createMessageStore(); _store.openMessageStore(_parent, _storeSettings); - _store.recoverMessageStore(_messageStoreRecoveryHandler, _logRecoveryHandler); + } protected abstract Map getStoreSettings() throws Exception; protected abstract MessageStore createMessageStore(); - public MessageStore getStore() + protected MessageStore getStore() { return _store; } - public void testRecordXid() throws Exception + protected void reopenStore() throws Exception { + _store.closeMessageStore(); + + _store = createMessageStore(); + _store.openMessageStore(_parent, _storeSettings); + } + + public void testAddAndRemoveRecordXid() throws Exception + { + long format = 1l; Record enqueueRecord = getTestRecord(1); Record dequeueRecord = getTestRecord(2); Record[] enqueues = { enqueueRecord }; @@ -92,27 +93,287 @@ public abstract class MessageStoreTestCase extends QpidTestCase byte[] branchId = new byte[] { 2 }; Transaction transaction = _store.newTransaction(); - transaction.recordXid(1l, globalId, branchId, enqueues, dequeues); + transaction.recordXid(format, globalId, branchId, enqueues, dequeues); transaction.commitTran(); + reopenStore(); - verify(_dtxRecordRecoveryHandler).dtxRecord(1l, globalId, branchId, enqueues, dequeues); + + DistributedTransactionHandler handler = mock(DistributedTransactionHandler.class); + _store.visitDistributedTransactions(handler); + verify(handler, times(1)).handle(format,globalId, branchId, enqueues, dequeues); transaction = _store.newTransaction(); transaction.removeXid(1l, globalId, branchId); transaction.commitTran(); reopenStore(); - verify(_dtxRecordRecoveryHandler, times(1)).dtxRecord(1l, globalId, branchId, enqueues, dequeues); + + handler = mock(DistributedTransactionHandler.class); + _store.visitDistributedTransactions(handler); + verify(handler, never()).handle(format,globalId, branchId, enqueues, dequeues); } - private void reopenStore() throws Exception + public void testVisitMessages() throws Exception { - _store.closeMessageStore(); + long messageId = 1; + int contentSize = 0; + final StoredMessage message = _store.addMessage(new TestMessageMetaData(messageId, contentSize)); + StoreFuture flushFuture = message.flushToStore(); + flushFuture.waitForCompletion(); - _store = createMessageStore(); - _store.openMessageStore(_parent, _storeSettings); - _store.recoverMessageStore(_messageStoreRecoveryHandler, _logRecoveryHandler); + MessageHandler handler = mock(MessageHandler.class); + _store.visitMessages(handler); + + verify(handler, times(1)).handle(argThat(new MessageMetaDataMatcher(messageId))); + + } + + public void testVisitMessagesAborted() throws Exception + { + int contentSize = 0; + for (int i = 0; i < 3; i++) + { + final StoredMessage message = _store.addMessage(new TestMessageMetaData(i + 1, contentSize)); + StoreFuture flushFuture = message.flushToStore(); + flushFuture.waitForCompletion(); + } + + MessageHandler handler = mock(MessageHandler.class); + when(handler.handle(any(StoredMessage.class))).thenReturn(true, false); + + _store.visitMessages(handler); + + verify(handler, times(2)).handle(any(StoredMessage.class)); + } + + public void testReopenedMessageStoreUsesLastMessageId() throws Exception + { + int contentSize = 0; + for (int i = 0; i < 3; i++) + { + final StoredMessage message = _store.addMessage(new TestMessageMetaData(i + 1, contentSize)); + StoreFuture flushFuture = message.flushToStore(); + flushFuture.waitForCompletion(); + } + + reopenStore(); + + final StoredMessage message = _store.addMessage(new TestMessageMetaData(4, contentSize)); + + StoreFuture flushFuture = message.flushToStore(); + flushFuture.waitForCompletion(); + + assertEquals("Unexpected message id", 4, message.getMessageNumber()); + } + + public void testVisitMessageInstances() throws Exception + { + long messageId = 1; + int contentSize = 0; + final StoredMessage message = _store.addMessage(new TestMessageMetaData(messageId, contentSize)); + StoreFuture flushFuture = message.flushToStore(); + flushFuture.waitForCompletion(); + + EnqueueableMessage enqueueableMessage = createMockEnqueueableMessage(messageId, message); + + UUID queueId = UUID.randomUUID(); + TransactionLogResource queue = createTransactionLogResource(queueId); + + Transaction transaction = _store.newTransaction(); + transaction.enqueueMessage(queue, enqueueableMessage); + transaction.commitTran(); + + MessageInstanceHandler handler = mock(MessageInstanceHandler.class); + _store.visitMessageInstances(handler); + + verify(handler, times(1)).handle(queueId, messageId); + } + + public void testVisitDistributedTransactions() throws Exception + { + long format = 1l; + byte[] branchId = new byte[] { 2 }; + byte[] globalId = new byte[] { 1 }; + Record enqueueRecord = getTestRecord(1); + Record dequeueRecord = getTestRecord(2); + Record[] enqueues = { enqueueRecord }; + Record[] dequeues = { dequeueRecord }; + + Transaction transaction = _store.newTransaction(); + transaction.recordXid(format, globalId, branchId, enqueues, dequeues); + transaction.commitTran(); + + DistributedTransactionHandler handler = mock(DistributedTransactionHandler.class); + _store.visitDistributedTransactions(handler); + + verify(handler, times(1)).handle(format,globalId, branchId, enqueues, dequeues); + + } + + public void testCommitTransaction() throws Exception + { + final UUID mockQueueId = UUIDGenerator.generateRandomUUID(); + TransactionLogResource mockQueue = createTransactionLogResource(mockQueueId); + + Transaction txn = getStore().newTransaction(); + + long messageId1 = 1L; + long messageId2 = 5L; + final EnqueueableMessage enqueueableMessage1 = createEnqueueableMessage(messageId1); + final EnqueueableMessage enqueueableMessage2 = createEnqueueableMessage(messageId2); + + txn.enqueueMessage(mockQueue, enqueueableMessage1); + txn.enqueueMessage(mockQueue, enqueueableMessage2); + txn.commitTran(); + + QueueFilteringMessageInstanceHandler filter = new QueueFilteringMessageInstanceHandler(mockQueueId); + getStore().visitMessageInstances(filter); + Set enqueuedIds = filter.getEnqueuedIds(); + + assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size()); + assertTrue("Message with id " + messageId1 + " is not found", enqueuedIds.contains(messageId1)); + assertTrue("Message with id " + messageId2 + " is not found", enqueuedIds.contains(messageId2)); } + + public void testRollbackTransactionBeforeCommit() throws Exception + { + final UUID mockQueueId = UUIDGenerator.generateRandomUUID(); + TransactionLogResource mockQueue = createTransactionLogResource(mockQueueId); + + long messageId1 = 21L; + long messageId2 = 22L; + long messageId3 = 23L; + final EnqueueableMessage enqueueableMessage1 = createEnqueueableMessage(messageId1); + final EnqueueableMessage enqueueableMessage2 = createEnqueueableMessage(messageId2); + final EnqueueableMessage enqueueableMessage3 = createEnqueueableMessage(messageId3); + + Transaction txn = getStore().newTransaction(); + + txn.enqueueMessage(mockQueue, enqueueableMessage1); + txn.abortTran(); + + txn = getStore().newTransaction(); + txn.enqueueMessage(mockQueue, enqueueableMessage2); + txn.enqueueMessage(mockQueue, enqueueableMessage3); + txn.commitTran(); + + QueueFilteringMessageInstanceHandler filter = new QueueFilteringMessageInstanceHandler(mockQueueId); + getStore().visitMessageInstances(filter); + Set enqueuedIds = filter.getEnqueuedIds(); + + assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size()); + assertTrue("Message with id " + messageId2 + " is not found", enqueuedIds.contains(messageId2)); + assertTrue("Message with id " + messageId3 + " is not found", enqueuedIds.contains(messageId3)); + } + + public void testRollbackTransactionAfterCommit() throws Exception + { + final UUID mockQueueId = UUIDGenerator.generateRandomUUID(); + TransactionLogResource mockQueue = createTransactionLogResource(mockQueueId); + + long messageId1 = 30L; + long messageId2 = 31L; + long messageId3 = 32L; + + final EnqueueableMessage enqueueableMessage1 = createEnqueueableMessage(messageId1); + final EnqueueableMessage enqueueableMessage2 = createEnqueueableMessage(messageId2); + final EnqueueableMessage enqueueableMessage3 = createEnqueueableMessage(messageId3); + + Transaction txn = getStore().newTransaction(); + + txn.enqueueMessage(mockQueue, enqueueableMessage1); + txn.commitTran(); + + txn = getStore().newTransaction(); + txn.enqueueMessage(mockQueue, enqueueableMessage2); + txn.abortTran(); + + txn = getStore().newTransaction(); + txn.enqueueMessage(mockQueue, enqueueableMessage3); + txn.commitTran(); + + QueueFilteringMessageInstanceHandler filter = new QueueFilteringMessageInstanceHandler(mockQueueId); + getStore().visitMessageInstances(filter); + Set enqueuedIds = filter.getEnqueuedIds(); + + assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size()); + assertTrue("Message with id " + messageId1 + " is not found", enqueuedIds.contains(messageId1)); + assertTrue("Message with id " + messageId3 + " is not found", enqueuedIds.contains(messageId3)); + } + + public void testStoreIgnoresTransientMessage() throws Exception + { + long messageId = 1; + int contentSize = 0; + final StoredMessage message = _store.addMessage(new TestMessageMetaData(messageId, contentSize, false)); + StoreFuture flushFuture = message.flushToStore(); + flushFuture.waitForCompletion(); + + MessageHandler handler = mock(MessageHandler.class); + _store.visitMessages(handler); + + verify(handler, times(0)).handle(argThat(new MessageMetaDataMatcher(messageId))); + } + + public void testAddAndRemoveMessageWithoutContent() throws Exception + { + long messageId = 1; + int contentSize = 0; + final StoredMessage message = _store.addMessage(new TestMessageMetaData(messageId, contentSize)); + StoreFuture flushFuture = message.flushToStore(); + flushFuture.waitForCompletion(); + + final AtomicReference> retrievedMessageRef = new AtomicReference>(); + _store.visitMessages(new MessageHandler() + { + + @Override + public boolean handle(StoredMessage storedMessage) + { + retrievedMessageRef.set(storedMessage); + return true; + } + }); + + StoredMessage retrievedMessage = retrievedMessageRef.get(); + assertNotNull("Message was not found", retrievedMessageRef); + assertEquals("Unexpected retreived message", message.getMessageNumber(), retrievedMessage.getMessageNumber()); + + retrievedMessage.remove(); + + retrievedMessageRef.set(null); + _store.visitMessages(new MessageHandler() + { + + @Override + public boolean handle(StoredMessage storedMessage) + { + retrievedMessageRef.set(storedMessage); + return true; + } + }); + assertNull(retrievedMessageRef.get()); + } + + + private TransactionLogResource createTransactionLogResource(UUID queueId) + { + TransactionLogResource queue = mock(TransactionLogResource.class); + when(queue.getId()).thenReturn(queueId); + when(queue.getName()).thenReturn("testQueue"); + when(queue.isDurable()).thenReturn(true); + return queue; + } + + private EnqueueableMessage createMockEnqueueableMessage(long messageId, final StoredMessage message) + { + EnqueueableMessage enqueueableMessage = mock(EnqueueableMessage.class); + when(enqueueableMessage.isPersistent()).thenReturn(true); + when(enqueueableMessage.getMessageNumber()).thenReturn(messageId); + when(enqueueableMessage.getStoredMessage()).thenReturn(message); + return enqueueableMessage; + } + private Record getTestRecord(long messageNumber) { UUID queueId1 = UUIDGenerator.generateRandomUUID(); @@ -121,77 +382,66 @@ public abstract class MessageStoreTestCase extends QpidTestCase EnqueueableMessage message1 = mock(EnqueueableMessage.class); when(message1.isPersistent()).thenReturn(true); when(message1.getMessageNumber()).thenReturn(messageNumber); - final StoredMessage storedMessage = mock(StoredMessage.class); + final StoredMessage storedMessage = mock(StoredMessage.class); when(storedMessage.getMessageNumber()).thenReturn(messageNumber); when(message1.getStoredMessage()).thenReturn(storedMessage); Record enqueueRecord = new TestRecord(queue1, message1); return enqueueRecord; } - private static class TestRecord implements Record + private EnqueueableMessage createEnqueueableMessage(long messageId1) { - private TransactionLogResource _queue; - private EnqueueableMessage _message; + final StoredMessage message1 = _store.addMessage(new TestMessageMetaData(messageId1, 0)); + StoreFuture flushFuture = message1.flushToStore(); + flushFuture.waitForCompletion(); + EnqueueableMessage enqueueableMessage1 = createMockEnqueueableMessage(messageId1, message1); + return enqueueableMessage1; + } - public TestRecord(TransactionLogResource queue, EnqueueableMessage message) + private class MessageMetaDataMatcher extends ArgumentMatcher> + { + private long _messageNumber; + + public MessageMetaDataMatcher(long messageNumber) { super(); - _queue = queue; - _message = message; + _messageNumber = messageNumber; } - @Override - public TransactionLogResource getResource() + public boolean matches(Object obj) { - return _queue; + return obj instanceof StoredMessage && ((StoredMessage)obj).getMessageNumber() == _messageNumber; } + } - @Override - public EnqueueableMessage getMessage() - { - return _message; - } + private class QueueFilteringMessageInstanceHandler implements MessageInstanceHandler + { + private final UUID _queueId; + private final Set _enqueuedIds = new HashSet(); - @Override - public int hashCode() + public QueueFilteringMessageInstanceHandler(UUID queueId) { - final int prime = 31; - int result = 1; - result = prime * result + ((_message == null) ? 0 : new Long(_message.getMessageNumber()).hashCode()); - result = prime * result + ((_queue == null) ? 0 : _queue.getId().hashCode()); - return result; + _queueId = queueId; } @Override - public boolean equals(Object obj) + public boolean handle(UUID queueId, long messageId) { - if (this == obj) - { - return true; - } - if (obj == null) - { - return false; - } - if (!(obj instanceof Record)) + if (queueId.equals(_queueId)) { - return false; + if (_enqueuedIds.contains(messageId)) + { + fail("Queue with id " + _queueId + " contains duplicate message ids"); + } + _enqueuedIds.add(messageId); } - Record other = (Record) obj; - if (_message == null && other.getMessage() != null) - { - return false; - } - if (_queue == null && other.getResource() != null) - { - return false; - } - if (_message.getMessageNumber() != other.getMessage().getMessageNumber()) - { - return false; - } - return _queue.getId().equals(other.getResource().getId()); + return true; } + public Set getEnqueuedIds() + { + return _enqueuedIds; + } } + } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java index 32df355c07..bfa4e1d52e 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java @@ -20,15 +20,30 @@ */ package org.apache.qpid.server.store; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.qpid.server.store.handler.MessageHandler; + /** A simple message store that stores the messages in a thread-safe structure in memory. */ public class TestMemoryMessageStore extends AbstractMemoryMessageStore { public static final String TYPE = "TestMemory"; - @Override - public String getStoreType() + public int getMessageCount() { - return TYPE; + final AtomicInteger counter = new AtomicInteger(); + visitMessages(new MessageHandler() + { + + @Override + public boolean handle(StoredMessage storedMessage) + { + counter.incrementAndGet(); + return true; + } + }); + return counter.get(); } + } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java index e14b41b221..6e55b468a6 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java @@ -34,13 +34,20 @@ public class TestMessageMetaData implements StorableMessageMetaData private static final TestMessageMetaDataType TYPE = new TestMessageMetaDataType(); - private int _contentSize; - private long _messageId; + private final int _contentSize; + private final long _messageId; + private final boolean _persistent; public TestMessageMetaData(long messageId, int contentSize) + { + this(messageId, contentSize, true); + } + + public TestMessageMetaData(long messageId, int contentSize, boolean persistent) { _contentSize = contentSize; _messageId = messageId; + _persistent = persistent; } @Override @@ -59,7 +66,7 @@ public class TestMessageMetaData implements StorableMessageMetaData } @Override - public MessageMetaDataType getType() + public MessageMetaDataType getType() { return TYPE; } @@ -67,7 +74,7 @@ public class TestMessageMetaData implements StorableMessageMetaData @Override public boolean isPersistent() { - return true; + return _persistent; } @Override diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java index 5622383f3f..e5c94cf66b 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java @@ -70,7 +70,22 @@ public class TestMessageMetaDataType implements MessageMetaDataType { - private StoredMessage _storedMsg; + private final StoredMessage _storedMsg; + + private final MessageReference _messageReference = new MessageReference() + { + + @Override + public ServerMessage getMessage() + { + return TestServerMessage.this; + } + + @Override + public void release() + { + } + }; public TestServerMessage(StoredMessage storedMsg) { @@ -115,7 +130,7 @@ public class TestMessageMetaDataType implements MessageMetaDataType _messages = new HashMap(); - private final AtomicInteger _messageCount = new AtomicInteger(0); - - @Override - public StoredMessage addMessage(StorableMessageMetaData metaData) - { - return new TestableStoredMessage(super.addMessage(metaData)); - } - - public int getMessageCount() - { - return _messageCount.get(); - } - - public Map getMessages() - { - return _messages; - } - - private class TestableTransaction implements Transaction - { - @Override - public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) - { - getMessages().put(message.getMessageNumber(), (AMQQueue)queue); - } - - @Override - public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message) - { - getMessages().remove(message.getMessageNumber()); - } - - @Override - public void commitTran() - { - } - - @Override - public StoreFuture commitTranAsync() - { - return StoreFuture.IMMEDIATE_FUTURE; - } - - public void abortTran() - { - } - - public void removeXid(long format, byte[] globalId, byte[] branchId) - { - } - - public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues) - { - } - } - - - @Override - public Transaction newTransaction() - { - return new TestableTransaction(); - } - - - private class TestableStoredMessage implements StoredMessage - { - private final StoredMessage _storedMessage; - - public TestableStoredMessage(StoredMessage storedMessage) - { - _messageCount.incrementAndGet(); - _storedMessage = storedMessage; - } - - public StorableMessageMetaData getMetaData() - { - return _storedMessage.getMetaData(); - } - - public long getMessageNumber() - { - return _storedMessage.getMessageNumber(); - } - - public void addContent(int offsetInMessage, ByteBuffer src) - { - _storedMessage.addContent(offsetInMessage, src); - } - - public int getContent(int offsetInMessage, ByteBuffer dst) - { - return _storedMessage.getContent(offsetInMessage, dst); - } - - - public ByteBuffer getContent(int offsetInMessage, int size) - { - return _storedMessage.getContent(offsetInMessage, size); - } - - public StoreFuture flushToStore() - { - return _storedMessage.flushToStore(); - } - - public void remove() - { - _storedMessage.remove(); - _messageCount.decrementAndGet(); - } - } -} diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java deleted file mode 100644 index ba9b7c155e..0000000000 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.server.store; - -import java.util.Map; - -import org.apache.qpid.server.plugin.MessageStoreFactory; - -public class TestableMemoryMessageStoreFactory implements MessageStoreFactory -{ - @Override - public String getType() - { - return TestableMemoryMessageStore.TYPE; - } - - @Override - public MessageStore createMessageStore() - { - return new TestableMemoryMessageStore(); - } - - @Override - public void validateAttributes(Map attributes) - { - } - -} diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java index ab18c8f41d..da868a01f1 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java @@ -33,14 +33,14 @@ import org.apache.qpid.server.util.ServerScopedRuntimeException; * Mock implementation of a (Store) Transaction allow its state to be observed. * Also provide a factory method to produce TestTransactionLog objects suitable * for unit test use. - * + * */ class MockStoreTransaction implements Transaction { enum TransactionState {NOT_STARTED, STARTED, COMMITTED, ABORTED}; private TransactionState _state = TransactionState.NOT_STARTED; - + private int _numberOfEnqueuedMessages = 0; private int _numberOfDequeuedMessages = 0; private boolean _throwExceptionOnQueueOp; @@ -52,7 +52,7 @@ class MockStoreTransaction implements Transaction public void setState(TransactionState state) { - _state = state; + _state = state; } public TransactionState getState() @@ -64,10 +64,10 @@ class MockStoreTransaction implements Transaction { if (_throwExceptionOnQueueOp) { - + throw new ServerScopedRuntimeException("Mocked exception"); } - + _numberOfEnqueuedMessages++; } @@ -87,7 +87,7 @@ class MockStoreTransaction implements Transaction { throw new ServerScopedRuntimeException("Mocked exception"); } - + _numberOfDequeuedMessages++; } @@ -124,12 +124,6 @@ class MockStoreTransaction implements Transaction storeTransaction.setState(TransactionState.STARTED); return storeTransaction; } - - @Override - public String getStoreType() - { - return "TEST"; - } }; } } \ No newline at end of file diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java index fd56f3fa1c..1b131a18e1 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java @@ -44,7 +44,6 @@ import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TestMemoryMessageStore; -import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.plugin.PluggableFactoryLoader; @@ -110,7 +109,7 @@ public class BrokerTestHelper when(virtualHost.getAttribute(org.apache.qpid.server.model.VirtualHost.TYPE)).thenReturn(StandardVirtualHostFactory.TYPE); Map messageStoreSettings = new HashMap(); - messageStoreSettings.put(MessageStore.STORE_TYPE, TestableMemoryMessageStore.TYPE); + messageStoreSettings.put(MessageStore.STORE_TYPE, TestMemoryMessageStore.TYPE); when(virtualHost.getMessageStoreSettings()).thenReturn(messageStoreSettings); when(virtualHost.getName()).thenReturn(name); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MessageStoreRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MessageStoreRecovererTest.java new file mode 100644 index 0000000000..ce5616b9ca --- /dev/null +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MessageStoreRecovererTest.java @@ -0,0 +1,414 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isNull; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.UUID; + +import junit.framework.TestCase; + +import org.apache.qpid.server.logging.EventLogger; +import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; +import org.apache.qpid.server.message.EnqueueableMessage; +import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.NullMessageStore; +import org.apache.qpid.server.store.StorableMessageMetaData; +import org.apache.qpid.server.store.StoreException; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.store.TestMessageMetaData; +import org.apache.qpid.server.store.Transaction; +import org.apache.qpid.server.store.Transaction.Record; +import org.apache.qpid.server.store.TransactionLogResource; +import org.apache.qpid.server.store.handler.DistributedTransactionHandler; +import org.apache.qpid.server.store.handler.MessageHandler; +import org.apache.qpid.server.store.handler.MessageInstanceHandler; +import org.apache.qpid.server.txn.DtxBranch; +import org.apache.qpid.server.txn.DtxRegistry; +import org.apache.qpid.server.util.Action; +import org.apache.qpid.transport.Xid; +import org.mockito.ArgumentMatcher; + +public class MessageStoreRecovererTest extends TestCase +{ + private VirtualHost _virtualHost; + + @Override + protected void setUp() throws Exception + { + super.setUp(); + + _virtualHost = mock(VirtualHost.class); + when(_virtualHost.getEventLogger()).thenReturn(new EventLogger()); + + } + + @SuppressWarnings("unchecked") + public void testRecoveryOfSingleMessageOnSingleQueue() + { + final AMQQueue queue = createRegisteredMockQueue(); + + final long messageId = 1; + final StoredMessage storedMessage = createMockStoredMessage(messageId); + + MessageStore store = new NullMessageStore() + { + @Override + public void visitMessages(MessageHandler handler) throws StoreException + { + handler.handle(storedMessage); + } + + @Override + public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException + { + handler.handle(queue.getId(), messageId); + } + }; + + when(_virtualHost.getMessageStore()).thenReturn(store); + + MessageStoreRecoverer recoverer = new MessageStoreRecoverer(_virtualHost, mock(MessageStoreLogSubject.class)); + recoverer.recover(); + + ServerMessage message = storedMessage.getMetaData().getType().createMessage(storedMessage); + verify(queue, times(1)).enqueue(eq(message), (Action)isNull()); + } + + @SuppressWarnings("unchecked") + public void testRecoveryOfMessageInstanceForNonExistingMessage() + { + final AMQQueue queue = createRegisteredMockQueue(); + + final long messageId = 1; + final Transaction transaction = mock(Transaction.class); + + MessageStore store = new NullMessageStore() + { + @Override + public void visitMessages(MessageHandler handler) throws StoreException + { + // no message to visit + } + + @Override + public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException + { + handler.handle(queue.getId(), messageId); + } + + @Override + public Transaction newTransaction() + { + return transaction; + } + }; + + when(_virtualHost.getMessageStore()).thenReturn(store); + + MessageStoreRecoverer recoverer = new MessageStoreRecoverer(_virtualHost, mock(MessageStoreLogSubject.class)); + recoverer.recover(); + + verify(queue, never()).enqueue(any(ServerMessage.class), any(Action.class)); + verify(transaction).dequeueMessage(same(queue), argThat(new MessageNumberMatcher(messageId))); + verify(transaction, times(1)).commitTranAsync(); + } + + public void testRecoveryOfMessageInstanceForNonExistingQueue() + { + final UUID queueId = UUID.randomUUID(); + final Transaction transaction = mock(Transaction.class); + final long messageId = 1; + final StoredMessage storedMessage = createMockStoredMessage(messageId); + + MessageStore store = new NullMessageStore() + { + @Override + public void visitMessages(MessageHandler handler) throws StoreException + { + handler.handle(storedMessage); + } + + @Override + public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException + { + handler.handle(queueId, messageId); + } + + @Override + public Transaction newTransaction() + { + return transaction; + } + }; + + when(_virtualHost.getMessageStore()).thenReturn(store); + + MessageStoreRecoverer recoverer = new MessageStoreRecoverer(_virtualHost, mock(MessageStoreLogSubject.class)); + recoverer.recover(); + + verify(transaction).dequeueMessage(argThat(new QueueIdMatcher(queueId)), argThat(new MessageNumberMatcher(messageId))); + verify(transaction, times(1)).commitTranAsync(); + } + + public void testRecoveryDeletesOrphanMessages() + { + + final long messageId = 1; + final StoredMessage storedMessage = createMockStoredMessage(messageId); + + MessageStore store = new NullMessageStore() + { + @Override + public void visitMessages(MessageHandler handler) throws StoreException + { + handler.handle(storedMessage); + } + + @Override + public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException + { + // No messages instances + } + }; + + when(_virtualHost.getMessageStore()).thenReturn(store); + + MessageStoreRecoverer recoverer = new MessageStoreRecoverer(_virtualHost, mock(MessageStoreLogSubject.class)); + recoverer.recover(); + + verify(storedMessage, times(1)).remove(); + } + + @SuppressWarnings("unchecked") + public void testRecoveryOfSingleEnqueueWithDistributedTransaction() + { + AMQQueue queue = createRegisteredMockQueue(); + + final Transaction transaction = mock(Transaction.class); + + final StoredMessage storedMessage = createMockStoredMessage(1); + long messageId = storedMessage.getMessageNumber(); + + EnqueueableMessage enqueueableMessage = createMockEnqueueableMessage(messageId, storedMessage); + Record enqueueRecord = createMockRecord(queue, enqueueableMessage); + + final long format = 1; + final byte[] globalId = new byte[] {0}; + final byte[] branchId = new byte[] {0}; + final Record[] enqueues = { enqueueRecord }; + final Record[] dequeues = {}; + + MessageStore store = new NullMessageStore() + { + @Override + public void visitMessages(MessageHandler handler) throws StoreException + { + handler.handle(storedMessage); + } + + @Override + public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException + { + // No messages instances + } + + @Override + public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException + { + handler.handle(format, globalId, branchId, enqueues, dequeues); + } + + @Override + public Transaction newTransaction() + { + return transaction; + } + }; + + DtxRegistry dtxRegistry = new DtxRegistry(); + + when(_virtualHost.getMessageStore()).thenReturn(store); + when(_virtualHost.getDtxRegistry()).thenReturn(dtxRegistry); + + MessageStoreRecoverer recoverer = new MessageStoreRecoverer(_virtualHost, mock(MessageStoreLogSubject.class)); + recoverer.recover(); + + DtxBranch branch = dtxRegistry.getBranch(new Xid(format, globalId, branchId)); + assertNotNull("Expected dtx branch to be created", branch); + branch.commit(); + + ServerMessage message = storedMessage.getMetaData().getType().createMessage(storedMessage); + verify(queue, times(1)).enqueue(eq(message), (Action)isNull()); + verify(transaction).commitTran(); + } + + public void testRecoveryOfSingleDequeueWithDistributedTransaction() + { + final AMQQueue queue = createRegisteredMockQueue(); + + + final Transaction transaction = mock(Transaction.class); + + final StoredMessage storedMessage = createMockStoredMessage(1); + final long messageId = storedMessage.getMessageNumber(); + + EnqueueableMessage enqueueableMessage = createMockEnqueueableMessage(messageId, storedMessage); + Record dequeueRecord = createMockRecord(queue, enqueueableMessage); + + QueueEntry queueEntry = mock(QueueEntry.class); + when(queue.getMessageOnTheQueue(messageId)).thenReturn(queueEntry); + + final long format = 1; + final byte[] globalId = new byte[] {0}; + final byte[] branchId = new byte[] {0}; + final Record[] enqueues = {}; + final Record[] dequeues = { dequeueRecord }; + + MessageStore store = new NullMessageStore() + { + @Override + public void visitMessages(MessageHandler handler) throws StoreException + { + handler.handle(storedMessage); + } + + @Override + public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException + { + // We need the message to be enqueued onto the queue so that later the distributed transaction + // can dequeue it. + handler.handle(queue.getId(), messageId); + } + + @Override + public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException + { + handler.handle(format, globalId, branchId, enqueues, dequeues); + } + + @Override + public Transaction newTransaction() + { + return transaction; + } + }; + + DtxRegistry dtxRegistry = new DtxRegistry(); + + when(_virtualHost.getMessageStore()).thenReturn(store); + when(_virtualHost.getDtxRegistry()).thenReturn(dtxRegistry); + + MessageStoreRecoverer recoverer = new MessageStoreRecoverer(_virtualHost, mock(MessageStoreLogSubject.class)); + recoverer.recover(); + + DtxBranch branch = dtxRegistry.getBranch(new Xid(format, globalId, branchId)); + assertNotNull("Expected dtx branch to be created", branch); + branch.commit(); + + verify(queueEntry, times(1)).delete(); + verify(transaction).commitTran(); + } + + + protected Record createMockRecord(AMQQueue queue, EnqueueableMessage enqueueableMessage) + { + Record enqueueRecord = mock(Record.class); + when(enqueueRecord.getMessage()).thenReturn(enqueueableMessage); + when(enqueueRecord.getResource()).thenReturn(queue); + return enqueueRecord; + } + + protected EnqueueableMessage createMockEnqueueableMessage(long messageId, + final StoredMessage storedMessage) + { + EnqueueableMessage enqueueableMessage = mock(EnqueueableMessage.class); + when(enqueueableMessage.getMessageNumber()).thenReturn(messageId); + when(enqueueableMessage.getStoredMessage()).thenReturn(storedMessage); + return enqueueableMessage; + } + + private StoredMessage createMockStoredMessage(final long messageId) + { + TestMessageMetaData metaData = new TestMessageMetaData(messageId, 0); + + @SuppressWarnings("unchecked") + final StoredMessage storedMessage = mock(StoredMessage.class); + when(storedMessage.getMessageNumber()).thenReturn(messageId); + when(storedMessage.getMetaData()).thenReturn(metaData); + return storedMessage; + } + + private AMQQueue createRegisteredMockQueue() + { + AMQQueue queue = mock(AMQQueue.class); + final UUID queueId = UUID.randomUUID(); + when(queue.getId()).thenReturn(queueId); + when(queue.getName()).thenReturn("test-queue"); + when(_virtualHost.getQueue(queueId)).thenReturn(queue); + return queue; + } + + + private final class QueueIdMatcher extends ArgumentMatcher + { + private UUID _queueId; + public QueueIdMatcher(UUID queueId) + { + _queueId = queueId; + } + + @Override + public boolean matches(Object argument) + { + return argument instanceof TransactionLogResource && _queueId.equals( ((TransactionLogResource)argument).getId() ); + } + } + + private final class MessageNumberMatcher extends ArgumentMatcher + { + private final long _messageId; + + private MessageNumberMatcher(long messageId) + { + _messageId = messageId; + } + + @Override + public boolean matches(Object argument) + { + return argument instanceof EnqueueableMessage && ((EnqueueableMessage)argument).getMessageNumber() == _messageId; + } + } +} diff --git a/qpid/java/broker-core/src/test/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory b/qpid/java/broker-core/src/test/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory index 48241614d8..9512fb8117 100644 --- a/qpid/java/broker-core/src/test/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory +++ b/qpid/java/broker-core/src/test/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory @@ -17,4 +17,3 @@ # under the License. # org.apache.qpid.server.store.TestMemoryMessageStoreFactory -org.apache.qpid.server.store.TestableMemoryMessageStoreFactory \ No newline at end of file diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java index e5cfced4e2..6b697f8221 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java @@ -32,7 +32,7 @@ import org.apache.qpid.server.flow.Pre0_10CreditManager; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.store.TestMemoryMessageStore; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.BrokerTestHelper; @@ -53,7 +53,7 @@ public class AckTest extends QpidTestCase private AMQProtocolSession _protocolSession; - private TestableMemoryMessageStore _messageStore; + private TestMemoryMessageStore _messageStore; private AMQChannel _channel; @@ -71,7 +71,7 @@ public class AckTest extends QpidTestCase _protocolSession = _channel.getProtocolSession(); _virtualHost = _protocolSession.getVirtualHost(); _queue = BrokerTestHelper.createQueue(getTestName(), _virtualHost); - _messageStore = (TestableMemoryMessageStore)_virtualHost.getMessageStore(); + _messageStore = (TestMemoryMessageStore)_virtualHost.getMessageStore(); } @Override diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java index 9e551c9c01..399564f5a4 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java @@ -20,13 +20,11 @@ */ package org.apache.qpid.server.protocol.v0_8; - -import org.apache.qpid.AMQException; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.store.TestMemoryMessageStore; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; @@ -175,7 +173,7 @@ public class AcknowledgeTest extends QpidTestCase private void checkStoreContents(int messageCount) { - assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getMessageCount()); + assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestMemoryMessageStore) _messageStore).getMessageCount()); } } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java index e213aa8a20..520e35fe39 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java @@ -27,7 +27,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.store.TestMemoryMessageStore; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; @@ -133,7 +133,7 @@ public class QueueBrowserUsesNoAckTest extends QpidTestCase private void checkStoreContents(int messageCount) { - assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getMessageCount()); + assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestMemoryMessageStore) _messageStore).getMessageCount()); } private AMQShortString browse(AMQChannel channel, AMQQueue queue) throws Exception diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java index 227e9794da..e9c37e7b42 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java @@ -27,7 +27,7 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.store.TestMemoryMessageStore; import org.apache.qpid.test.utils.QpidTestCase; /** @@ -35,12 +35,12 @@ import org.apache.qpid.test.utils.QpidTestCase; */ public class ReferenceCountingTest extends QpidTestCase { - private TestableMemoryMessageStore _store; + private TestMemoryMessageStore _store; protected void setUp() throws Exception { - _store = new TestableMemoryMessageStore(); + _store = new TestMemoryMessageStore(); } /** @@ -83,7 +83,7 @@ public class ReferenceCountingTest extends QpidTestCase MessageMetaData mmd = new MessageMetaData(info, chb); StoredMessage storedMessage = _store.addMessage(mmd); - + storedMessage.flushToStore(); AMQMessage message = new AMQMessage(storedMessage); @@ -141,6 +141,7 @@ public class ReferenceCountingTest extends QpidTestCase MessageMetaData mmd = new MessageMetaData(info, chb); StoredMessage storedMessage = _store.addMessage(mmd); + storedMessage.flushToStore(); AMQMessage message = new AMQMessage(storedMessage); diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java index 9202672ea6..d682076350 100644 --- a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java +++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java @@ -416,12 +416,6 @@ public class DerbyMessageStore extends AbstractJDBCMessageStore implements Messa return _persistentSizeHighThreshold; } - @Override - public String getStoreType() - { - return TYPE; - } - @Override public void onDelete() { diff --git a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java index 20de4ea339..9a2d945494 100644 --- a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java +++ b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java @@ -70,10 +70,13 @@ public class DerbyMessageStoreTest extends MessageStoreTestCase private void deleteStoreIfExists() { - File location = new File(_storeLocation); - if (location.exists()) + if (_storeLocation != null) { - FileUtils.delete(location, true); + File location = new File(_storeLocation); + if (location.exists()) + { + FileUtils.delete(location, true); + } } } diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java index 4ca9cb2395..509184d243 100644 --- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java @@ -337,12 +337,6 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag return _connectionURL; } - @Override - public String getStoreType() - { - return TYPE; - } - @Override protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException { diff --git a/qpid/java/broker-plugins/memory-store/pom.xml b/qpid/java/broker-plugins/memory-store/pom.xml index b71574384e..8bec7ef981 100644 --- a/qpid/java/broker-plugins/memory-store/pom.xml +++ b/qpid/java/broker-plugins/memory-store/pom.xml @@ -36,6 +36,22 @@ ${project.version} provided + + + + org.apache.qpid + qpid-test-utils + ${project.version} + test + + + + org.apache.qpid + qpid-broker-core + ${project.version} + test-jar + test + diff --git a/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index 61fef91e83..c8dd2e6e61 100644 --- a/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -26,9 +26,4 @@ public class MemoryMessageStore extends AbstractMemoryMessageStore { public static final String TYPE = "Memory"; - @Override - public String getStoreType() - { - return TYPE; - } } diff --git a/qpid/java/broker-plugins/memory-store/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java b/qpid/java/broker-plugins/memory-store/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java new file mode 100644 index 0000000000..8fd3cbb1fe --- /dev/null +++ b/qpid/java/broker-plugins/memory-store/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.util.Collections; +import java.util.Map; + +public class MemoryMessageStoreTest extends MessageStoreTestCase +{ + + @Override + protected Map getStoreSettings() throws Exception + { + return Collections.emptyMap(); + } + + @Override + protected MessageStore createMessageStore() + { + return new MemoryMessageStore(); + } + + @Override + protected void reopenStore() throws Exception + { + // cannot re-open memory message store as it is not persistent + } + +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java deleted file mode 100644 index d89f5cc66e..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java +++ /dev/null @@ -1,880 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store; - - -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.io.File; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; - -import org.apache.log4j.Logger; -import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl; -import org.apache.qpid.server.exchange.DirectExchange; -import org.apache.qpid.server.exchange.ExchangeImpl; -import org.apache.qpid.server.exchange.TopicExchange; -import org.apache.qpid.server.logging.EventLogger; -import org.apache.qpid.server.message.InstanceProperties; -import org.apache.qpid.server.message.MessageSource; -import org.apache.qpid.server.model.Binding; -import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.ExclusivityPolicy; -import org.apache.qpid.server.model.LifetimePolicy; -import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.plugin.ExchangeType; -import org.apache.qpid.server.protocol.v0_8.AMQMessage; -import org.apache.qpid.server.protocol.v0_8.MessageMetaData; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.ConflationQueue; -import org.apache.qpid.server.queue.PriorityQueue; -import org.apache.qpid.server.queue.StandardQueue; -import org.apache.qpid.server.txn.AutoCommitTransaction; -import org.apache.qpid.server.txn.ServerTransaction; -import org.apache.qpid.server.util.BrokerTestHelper; -import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.virtualhost.VirtualHostRegistry; -import org.apache.qpid.test.utils.QpidTestCase; -import org.apache.qpid.util.FileUtils; - -/** - * This tests the MessageStores by using the available interfaces. - * - * For persistent stores, it validates that Exchanges, Queues, Bindings and - * Messages are persisted and recovered correctly. - */ -public class MessageStoreTest extends QpidTestCase -{ - private static final Logger _logger = Logger.getLogger(MessageStoreTest.class); - - public static final int DEFAULT_PRIORTY_LEVEL = 5; - public static final String SELECTOR_VALUE = "Test = 'MST'"; - public static final String LVQ_KEY = "MST-LVQ-KEY"; - - private String nonDurableExchangeName = "MST-NonDurableDirectExchange"; - private String directExchangeName = "MST-DirectExchange"; - private String topicExchangeName = "MST-TopicExchange"; - - private String durablePriorityTopicQueueName = "MST-PriorityTopicQueue-Durable"; - private String durableTopicQueueName = "MST-TopicQueue-Durable"; - private String priorityTopicQueueName = "MST-PriorityTopicQueue"; - private String topicQueueName = "MST-TopicQueue"; - - private String durableExclusiveQueueName = "MST-Queue-Durable-Exclusive"; - private String durablePriorityQueueName = "MST-PriorityQueue-Durable"; - private String durableLastValueQueueName = "MST-LastValueQueue-Durable"; - private String durableQueueName = "MST-Queue-Durable"; - private String priorityQueueName = "MST-PriorityQueue"; - private String queueName = "MST-Queue"; - - private String directRouting = "MST-direct"; - private String topicRouting = "MST-topic"; - - private String queueOwner = "MST"; - - private VirtualHost _virtualHost; - private org.apache.qpid.server.model.VirtualHost _virtualHostModel; - private Broker _broker; - private String _storePath; - - public void setUp() throws Exception - { - super.setUp(); - BrokerTestHelper.setUp(); - - String hostName = getName(); - _storePath = System.getProperty("QPID_WORK", TMP_FOLDER + File.separator + getTestName()) + File.separator + hostName; - - Map messageStoreSettings = new HashMap(); - messageStoreSettings.put(MessageStore.STORE_PATH, _storePath); - messageStoreSettings.put(MessageStore.STORE_TYPE, getTestProfileMessageStoreType()); - - _virtualHostModel = mock(org.apache.qpid.server.model.VirtualHost.class); - when(_virtualHostModel.getMessageStoreSettings()).thenReturn(messageStoreSettings); - when(_virtualHostModel.getAttribute(eq(org.apache.qpid.server.model.VirtualHost.TYPE))).thenReturn(StandardVirtualHostFactory.TYPE); - when(_virtualHostModel.getAttribute(eq(org.apache.qpid.server.model.VirtualHost.NAME))).thenReturn(hostName); - when(_virtualHostModel.getType()).thenReturn(StandardVirtualHostFactory.TYPE); - when(_virtualHostModel.getName()).thenReturn(hostName); - - cleanup(new File(_storePath)); - - _broker = BrokerTestHelper.createBrokerMock(); - - reloadVirtualHost(); - } - - protected String getStorePath() - { - return _storePath; - } - - protected org.apache.qpid.server.model.VirtualHost getVirtualHostModel() - { - return _virtualHostModel; - } - - @Override - public void tearDown() throws Exception - { - try - { - if (_virtualHost != null) - { - _virtualHost.close(); - } - } - finally - { - BrokerTestHelper.tearDown(); - super.tearDown(); - } - } - - public VirtualHost getVirtualHost() - { - return _virtualHost; - } - - protected void reloadVirtualHost() - { - VirtualHost original = getVirtualHost(); - - if (getVirtualHost() != null) - { - try - { - getVirtualHost().close(); - } - catch (Exception e) - { - _logger.error("Error closing virtual host", e); - fail(e.getMessage()); - } - } - - try - { - _virtualHost = BrokerTestHelper.createVirtualHost(new VirtualHostRegistry(new EventLogger()), getVirtualHostModel()); - when(_virtualHostModel.getId()).thenReturn(_virtualHost.getId()); - - ConfiguredObjectRecord objectRecord = mock(ConfiguredObjectRecord.class); - when(objectRecord.getId()).thenReturn(_virtualHost.getId()); - when(objectRecord.getType()).thenReturn(org.apache.qpid.server.model.VirtualHost.class.getSimpleName()); - when(_virtualHostModel.asObjectRecord()).thenReturn(objectRecord); - } - catch (Exception e) - { - _logger.error("Error creating virtual host", e); - fail(e.getMessage()); - } - - assertTrue("Virtualhost has not changed, reload was not successful", original != getVirtualHost()); - } - - /** - * Old MessageStoreTest segment which runs against both persistent and non-persistent stores - * creating queues, exchanges and bindings and then verifying message delivery to them. - */ - public void testQueueExchangeAndBindingCreation() throws Exception - { - assertEquals("Should not be any existing queues", 0, getVirtualHost().getQueues().size()); - - createAllQueues(); - createAllTopicQueues(); - - //Register Non-Durable DirectExchange - ExchangeImpl nonDurableExchange = createExchange(DirectExchange.TYPE, nonDurableExchangeName, false); - bindAllQueuesToExchange(nonDurableExchange, directRouting); - - //Register DirectExchange - ExchangeImpl directExchange = createExchange(DirectExchange.TYPE, directExchangeName, true); - bindAllQueuesToExchange(directExchange, directRouting); - - //Register TopicExchange - ExchangeImpl topicExchange = createExchange(TopicExchange.TYPE, topicExchangeName, true); - bindAllTopicQueuesToExchange(topicExchange, topicRouting); - - //Send Message To NonDurable direct Exchange = persistent - sendMessageOnExchange(nonDurableExchange, directRouting, true); - // and non-persistent - sendMessageOnExchange(nonDurableExchange, directRouting, false); - - //Send Message To direct Exchange = persistent - sendMessageOnExchange(directExchange, directRouting, true); - // and non-persistent - sendMessageOnExchange(directExchange, directRouting, false); - - //Send Message To topic Exchange = persistent - sendMessageOnExchange(topicExchange, topicRouting, true); - // and non-persistent - sendMessageOnExchange(topicExchange, topicRouting, false); - - //Ensure all the Queues have four messages (one transient, one persistent) x 2 exchange routings - validateMessageOnQueues(4, true); - //Ensure all the topics have two messages (one transient, one persistent) - validateMessageOnTopics(2, true); - - assertEquals("Not all queues correctly registered", - 10, getVirtualHost().getQueues().size()); - } - - /** - * Tests message persistence by running the testQueueExchangeAndBindingCreation() method above - * before reloading the virtual host and ensuring that the persistent messages were restored. - * - * More specific testing of message persistence is left to store-specific unit testing. - */ - public void testMessagePersistence() throws Exception - { - testQueueExchangeAndBindingCreation(); - - reloadVirtualHost(); - - //Validate durable queues and subscriptions still have the persistent messages - validateMessageOnQueues(2, false); - validateMessageOnTopics(1, false); - } - - /** - * Tests message removal by running the testMessagePersistence() method above before - * clearing the queues, reloading the virtual host, and ensuring that the persistent - * messages were removed from the queues. - */ - public void testMessageRemoval() throws Exception - { - testMessagePersistence(); - - assertEquals("Incorrect number of queues registered after recovery", - 6, getVirtualHost().getQueues().size()); - - //clear the queue - _virtualHost.getQueue(durableQueueName).clearQueue(); - - //check the messages are gone - validateMessageOnQueue(durableQueueName, 0); - - //reload and verify messages arent restored - reloadVirtualHost(); - - validateMessageOnQueue(durableQueueName, 0); - } - - /** - * Tests queue persistence by creating a selection of queues with differing properties, both - * durable and non durable, and ensuring that following the recovery process the correct queues - * are present and any property manipulations (eg queue exclusivity) are correctly recovered. - */ - public void testQueuePersistence() throws Exception - { - assertEquals("Should not be any existing queues", - 0, getVirtualHost().getQueues().size()); - - //create durable and non durable queues/topics - createAllQueues(); - createAllTopicQueues(); - - //reload the virtual host, prompting recovery of the queues/topics - reloadVirtualHost(); - - assertEquals("Incorrect number of queues registered after recovery", - 6, getVirtualHost().getQueues().size()); - - //Validate the non-Durable Queues were not recovered. - assertNull("Non-Durable queue still registered:" + priorityQueueName, - getVirtualHost().getQueue(priorityQueueName)); - assertNull("Non-Durable queue still registered:" + queueName, - getVirtualHost().getQueue(queueName)); - assertNull("Non-Durable queue still registered:" + priorityTopicQueueName, - getVirtualHost().getQueue(priorityTopicQueueName)); - assertNull("Non-Durable queue still registered:" + topicQueueName, - getVirtualHost().getQueue(topicQueueName)); - - //Validate normally expected properties of Queues/Topics - validateDurableQueueProperties(); - - //Update the durable exclusive queue's exclusivity - setQueueExclusivity(false); - validateQueueExclusivityProperty(false); - } - - /** - * Tests queue removal by creating a durable queue, verifying it recovers, and - * then removing it from the store, and ensuring that following the second reload - * process it is not recovered. - */ - public void testDurableQueueRemoval() throws Exception - { - //Register Durable Queue - createQueue(durableQueueName, false, true, false, false); - - assertEquals("Incorrect number of queues registered before recovery", - 1, getVirtualHost().getQueues().size()); - - reloadVirtualHost(); - - assertEquals("Incorrect number of queues registered after first recovery", - 1, getVirtualHost().getQueues().size()); - - //test that removing the queue means it is not recovered next time - final AMQQueue queue = getVirtualHost().getQueue(durableQueueName); - DurableConfigurationStoreHelper.removeQueue(getVirtualHost().getDurableConfigurationStore(),queue); - - reloadVirtualHost(); - - assertEquals("Incorrect number of queues registered after second recovery", - 0, getVirtualHost().getQueues().size()); - assertNull("Durable queue was not removed:" + durableQueueName, - getVirtualHost().getQueue(durableQueueName)); - } - - /** - * Tests exchange persistence by creating a selection of exchanges, both durable - * and non durable, and ensuring that following the recovery process the correct - * durable exchanges are still present. - */ - public void testExchangePersistence() throws Exception - { - int origExchangeCount = getVirtualHost().getExchanges().size(); - - Map> oldExchanges = createExchanges(); - - assertEquals("Incorrect number of exchanges registered before recovery", - origExchangeCount + 3, getVirtualHost().getExchanges().size()); - - reloadVirtualHost(); - - //verify the exchanges present after recovery - validateExchanges(origExchangeCount, oldExchanges); - } - - /** - * Tests exchange removal by creating a durable exchange, verifying it recovers, and - * then removing it from the store, and ensuring that following the second reload - * process it is not recovered. - */ - public void testDurableExchangeRemoval() throws Exception - { - int origExchangeCount = getVirtualHost().getExchanges().size(); - - createExchange(DirectExchange.TYPE, directExchangeName, true); - - assertEquals("Incorrect number of exchanges registered before recovery", - origExchangeCount + 1, getVirtualHost().getExchanges().size()); - - reloadVirtualHost(); - - assertEquals("Incorrect number of exchanges registered after first recovery", - origExchangeCount + 1, getVirtualHost().getExchanges().size()); - - //test that removing the exchange means it is not recovered next time - final ExchangeImpl exchange = getVirtualHost().getExchange(directExchangeName); - DurableConfigurationStoreHelper.removeExchange(getVirtualHost().getDurableConfigurationStore(), exchange); - - reloadVirtualHost(); - - assertEquals("Incorrect number of exchanges registered after second recovery", - origExchangeCount, getVirtualHost().getExchanges().size()); - assertNull("Durable exchange was not removed:" + directExchangeName, - getVirtualHost().getExchange(directExchangeName)); - } - - /** - * Tests binding persistence by creating a selection of queues and exchanges, both durable - * and non durable, then adding bindings with and without selectors before reloading the - * virtual host and verifying that following the recovery process the correct durable - * bindings (those for durable queues to durable exchanges) are still present. - */ - public void testBindingPersistence() throws Exception - { - int origExchangeCount = getVirtualHost().getExchanges().size(); - - createAllQueues(); - createAllTopicQueues(); - - Map> exchanges = createExchanges(); - - ExchangeImpl nonDurableExchange = exchanges.get(nonDurableExchangeName); - ExchangeImpl directExchange = exchanges.get(directExchangeName); - ExchangeImpl topicExchange = exchanges.get(topicExchangeName); - - bindAllQueuesToExchange(nonDurableExchange, directRouting); - bindAllQueuesToExchange(directExchange, directRouting); - bindAllTopicQueuesToExchange(topicExchange, topicRouting); - - assertEquals("Incorrect number of exchanges registered before recovery", - origExchangeCount + 3, getVirtualHost().getExchanges().size()); - - reloadVirtualHost(); - - validateExchanges(origExchangeCount, exchanges); - - validateBindingProperties(); - } - - /** - * Tests binding removal by creating a durable exchange, and queue, binding them together, - * recovering to verify the persistence, then removing it from the store, and ensuring - * that following the second reload process it is not recovered. - */ - public void testDurableBindingRemoval() throws Exception - { - //create durable queue and exchange, bind them - ExchangeImpl exch = createExchange(DirectExchange.TYPE, directExchangeName, true); - createQueue(durableQueueName, false, true, false, false); - bindQueueToExchange(exch, directRouting, getVirtualHost().getQueue(durableQueueName), false); - - assertEquals("Incorrect number of bindings registered before recovery", - 1, getVirtualHost().getQueue(durableQueueName).getBindings().size()); - - //verify binding is actually normally recovered - reloadVirtualHost(); - - assertEquals("Incorrect number of bindings registered after first recovery", - 1, getVirtualHost().getQueue(durableQueueName).getBindings().size()); - - exch = getVirtualHost().getExchange(directExchangeName); - assertNotNull("Exchange was not recovered", exch); - - //remove the binding and verify result after recovery - unbindQueueFromExchange(exch, directRouting, getVirtualHost().getQueue(durableQueueName), false); - - reloadVirtualHost(); - - assertEquals("Incorrect number of bindings registered after second recovery", - 0, getVirtualHost().getQueue(durableQueueName).getBindings().size()); - } - - /** - * Validates that the durable exchanges are still present, the non durable exchange is not, - * and that the new exchanges are not the same objects as the provided list (i.e. that the - * reload actually generated new exchange objects) - */ - private void validateExchanges(int originalNumExchanges, Map> oldExchanges) - { - Collection> exchanges = getVirtualHost().getExchanges(); - Collection exchangeNames = new ArrayList(exchanges.size()); - for(ExchangeImpl exchange : exchanges) - { - exchangeNames.add(exchange.getName()); - } - assertTrue(directExchangeName + " exchange NOT reloaded", - exchangeNames.contains(directExchangeName)); - assertTrue(topicExchangeName + " exchange NOT reloaded", - exchangeNames.contains(topicExchangeName)); - assertTrue(nonDurableExchangeName + " exchange reloaded", - !exchangeNames.contains(nonDurableExchangeName)); - - //check the old exchange objects are not the same as the new exchanges - assertTrue(directExchangeName + " exchange NOT reloaded", - getVirtualHost().getExchange(directExchangeName) != oldExchanges.get(directExchangeName)); - assertTrue(topicExchangeName + " exchange NOT reloaded", - getVirtualHost().getExchange(topicExchangeName) != oldExchanges.get(topicExchangeName)); - - // There should only be the original exchanges + our 2 recovered durable exchanges - assertEquals("Incorrect number of exchanges available", - originalNumExchanges + 2, getVirtualHost().getExchanges().size()); - } - - /** Validates the Durable queues and their properties are as expected following recovery */ - private void validateBindingProperties() - { - - assertEquals("Incorrect number of (durable) queues following recovery", 6, getVirtualHost().getQueues().size()); - - validateBindingProperties(getVirtualHost().getQueue(durablePriorityQueueName).getBindings(), false); - validateBindingProperties(getVirtualHost().getQueue(durablePriorityTopicQueueName).getBindings(), true); - validateBindingProperties(getVirtualHost().getQueue(durableQueueName).getBindings(), false); - validateBindingProperties(getVirtualHost().getQueue(durableTopicQueueName).getBindings(), true); - validateBindingProperties(getVirtualHost().getQueue(durableExclusiveQueueName).getBindings(), false); - } - - /** - * Validate that each queue is bound only once following recovery (i.e. that bindings for non durable - * queues or to non durable exchanges are not recovered), and if a selector should be present - * that it is and contains the correct value - * - * @param bindings the set of bindings to validate - * @param useSelectors if set, check the binding has a JMS_SELECTOR argument and the correct value for it - */ - private void validateBindingProperties(Collection bindings, boolean useSelectors) - { - assertEquals("Each queue should only be bound once.", 1, bindings.size()); - - Binding binding = bindings.iterator().next(); - - if (useSelectors) - { - assertTrue("Binding does not contain a Selector argument.", - binding.getArguments().containsKey(AMQPFilterTypes.JMS_SELECTOR.toString())); - assertEquals("The binding selector argument is incorrect", SELECTOR_VALUE, - binding.getArguments().get(AMQPFilterTypes.JMS_SELECTOR.toString()).toString()); - } - } - - private void setQueueExclusivity(boolean exclusive) throws MessageSource.ExistingConsumerPreventsExclusive - { - AMQQueue queue = getVirtualHost().getQueue(durableExclusiveQueueName); - queue.setExclusivityPolicy(exclusive ? ExclusivityPolicy.CONTAINER : ExclusivityPolicy.NONE); - } - - private void validateQueueExclusivityProperty(boolean expected) - { - AMQQueue queue = getVirtualHost().getQueue(durableExclusiveQueueName); - - assertEquals("Queue exclusivity was incorrect", queue.isExclusive(), expected); - } - - - private void validateDurableQueueProperties() - { - validateQueueProperties(getVirtualHost().getQueue(durablePriorityQueueName), true, true, false, false); - validateQueueProperties(getVirtualHost().getQueue(durablePriorityTopicQueueName), true, true, false, false); - validateQueueProperties(getVirtualHost().getQueue(durableQueueName), false, true, false, false); - validateQueueProperties(getVirtualHost().getQueue(durableTopicQueueName), false, true, false, false); - validateQueueProperties(getVirtualHost().getQueue(durableExclusiveQueueName), false, true, true, false); - validateQueueProperties(getVirtualHost().getQueue(durableLastValueQueueName), false, true, true, true); - } - - private void validateQueueProperties(AMQQueue queue, boolean usePriority, boolean durable, boolean exclusive, boolean lastValueQueue) - { - if(usePriority || lastValueQueue) - { - assertNotSame("Queues cant be both Priority and LastValue based", usePriority, lastValueQueue); - } - - if (usePriority) - { - assertEquals("Queue is no longer a Priority Queue", PriorityQueue.class, queue.getClass()); - assertEquals("Priority Queue does not have set priorities", - DEFAULT_PRIORTY_LEVEL, ((PriorityQueue) queue).getPriorities()); - } - else if (lastValueQueue) - { - assertEquals("Queue is no longer a LastValue Queue", ConflationQueue.class, queue.getClass()); - assertEquals("LastValue Queue Key has changed", LVQ_KEY, ((ConflationQueue) queue).getConflationKey()); - } - else - { - assertEquals("Queue is not 'simple'", StandardQueue.class, queue.getClass()); - } - - assertEquals("Queue owner is not as expected", exclusive ? queueOwner : null, queue.getOwner()); - assertEquals("Queue durability is not as expected", durable, queue.isDurable()); - assertEquals("Queue exclusivity is not as expected", exclusive, queue.isExclusive()); - } - - /** - * Delete the Store Environment path - * - * @param environmentPath The configuration that contains the store environment path. - */ - private void cleanup(File environmentPath) - { - if (environmentPath.exists()) - { - FileUtils.delete(environmentPath, true); - } - } - - private void sendMessageOnExchange(ExchangeImpl exchange, String routingKey, boolean deliveryMode) - { - //Set MessagePersistence - BasicContentHeaderProperties properties = new BasicContentHeaderProperties(); - properties.setDeliveryMode(deliveryMode ? Integer.valueOf(2).byteValue() : Integer.valueOf(1).byteValue()); - FieldTable headers = properties.getHeaders(); - headers.setString("Test", "MST"); - properties.setHeaders(headers); - - MessagePublishInfo messageInfo = new TestMessagePublishInfo(exchange, false, false, routingKey); - - ContentHeaderBody headerBody = new ContentHeaderBody(BasicConsumeBodyImpl.CLASS_ID,0,properties,0l); - - MessageMetaData mmd = new MessageMetaData(messageInfo, headerBody, System.currentTimeMillis()); - - final StoredMessage storedMessage = getVirtualHost().getMessageStore().addMessage(mmd); - storedMessage.flushToStore(); - final AMQMessage currentMessage = new AMQMessage(storedMessage); - - - - ServerTransaction trans = new AutoCommitTransaction(getVirtualHost().getMessageStore()); - exchange.send(currentMessage, routingKey, InstanceProperties.EMPTY, trans, null); - - } - - private void createAllQueues() throws Exception - { - //Register Durable Priority Queue - createQueue(durablePriorityQueueName, true, true, false, false); - - //Register Durable Simple Queue - createQueue(durableQueueName, false, true, false, false); - - //Register Durable Exclusive Simple Queue - createQueue(durableExclusiveQueueName, false, true, true, false); - - //Register Durable LastValue Queue - createQueue(durableLastValueQueueName, false, true, true, true); - - //Register NON-Durable Priority Queue - createQueue(priorityQueueName, true, false, false, false); - - //Register NON-Durable Simple Queue - createQueue(queueName, false, false, false, false); - } - - private void createAllTopicQueues() throws Exception - { - //Register Durable Priority Queue - createQueue(durablePriorityTopicQueueName, true, true, false, false); - - //Register Durable Simple Queue - createQueue(durableTopicQueueName, false, true, false, false); - - //Register NON-Durable Priority Queue - createQueue(priorityTopicQueueName, true, false, false, false); - - //Register NON-Durable Simple Queue - createQueue(topicQueueName, false, false, false, false); - } - - private void createQueue(String queueName, boolean usePriority, boolean durable, boolean exclusive, boolean lastValueQueue) - throws Exception - { - - Map queueArguments = new HashMap(); - - if(usePriority || lastValueQueue) - { - assertNotSame("Queues cant be both Priority and LastValue based", usePriority, lastValueQueue); - } - - if (usePriority) - { - queueArguments.put(Queue.PRIORITIES, DEFAULT_PRIORTY_LEVEL); - } - - if (lastValueQueue) - { - queueArguments.put(Queue.LVQ_KEY, LVQ_KEY); - } - - queueArguments.put(Queue.ID, UUIDGenerator.generateRandomUUID()); - queueArguments.put(Queue.NAME, queueName); - queueArguments.put(Queue.DURABLE, durable); - queueArguments.put(Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT); - queueArguments.put(Queue.EXCLUSIVE, exclusive ? ExclusivityPolicy.CONTAINER : ExclusivityPolicy.NONE); - if(exclusive && queueOwner != null) - { - queueArguments.put(Queue.OWNER, queueOwner); - } - AMQQueue queue = null; - - //Ideally we would be able to use the QueueDeclareHandler here. - queue = getVirtualHost().createQueue(queueArguments); - - validateQueueProperties(queue, usePriority, durable, exclusive, lastValueQueue); - - - - } - - private Map> createExchanges() throws Exception - { - Map> exchanges = new HashMap>(); - - //Register non-durable DirectExchange - exchanges.put(nonDurableExchangeName, createExchange(DirectExchange.TYPE, nonDurableExchangeName, false)); - - //Register durable DirectExchange and TopicExchange - exchanges.put(directExchangeName ,createExchange(DirectExchange.TYPE, directExchangeName, true)); - exchanges.put(topicExchangeName,createExchange(TopicExchange.TYPE, topicExchangeName, true)); - - return exchanges; - } - - private ExchangeImpl createExchange(ExchangeType type, String name, boolean durable) throws Exception - { - ExchangeImpl exchange = null; - - Map attributes = new HashMap(); - - attributes.put(org.apache.qpid.server.model.Exchange.NAME, name); - attributes.put(org.apache.qpid.server.model.Exchange.TYPE, type.getType()); - attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, durable); - attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, - false ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT); - attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null); - exchange = getVirtualHost().createExchange(attributes); - - return exchange; - } - - private void bindAllQueuesToExchange(ExchangeImpl exchange, String routingKey) - { - bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(durablePriorityQueueName), false); - bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(durableQueueName), false); - bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(priorityQueueName), false); - bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(queueName), false); - bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(durableExclusiveQueueName), false); - } - - private void bindAllTopicQueuesToExchange(ExchangeImpl exchange, String routingKey) - { - - bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(durablePriorityTopicQueueName), true); - bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(durableTopicQueueName), true); - bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(priorityTopicQueueName), true); - bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(topicQueueName), true); - } - - - protected void bindQueueToExchange(ExchangeImpl exchange, - String routingKey, - AMQQueue queue, - boolean useSelector) - { - Map bindArguments = new HashMap(); - - if (useSelector) - { - bindArguments.put(AMQPFilterTypes.JMS_SELECTOR.toString(), SELECTOR_VALUE ); - } - - try - { - exchange.addBinding(routingKey, queue, bindArguments); - } - catch (Exception e) - { - fail(e.getMessage()); - } - } - - protected void unbindQueueFromExchange(ExchangeImpl exchange, - String routingKey, - AMQQueue queue, - boolean useSelector) - { - Map bindArguments = new HashMap(); - - if (useSelector) - { - bindArguments.put(AMQPFilterTypes.JMS_SELECTOR.toString(), SELECTOR_VALUE ); - } - - try - { - exchange.deleteBinding(routingKey, queue); - } - catch (Exception e) - { - fail(e.getMessage()); - } - } - - private void validateMessageOnTopics(long messageCount, boolean allQueues) - { - validateMessageOnQueue(durablePriorityTopicQueueName, messageCount); - validateMessageOnQueue(durableTopicQueueName, messageCount); - - if (allQueues) - { - validateMessageOnQueue(priorityTopicQueueName, messageCount); - validateMessageOnQueue(topicQueueName, messageCount); - } - } - - private void validateMessageOnQueues(long messageCount, boolean allQueues) - { - validateMessageOnQueue(durablePriorityQueueName, messageCount); - validateMessageOnQueue(durableQueueName, messageCount); - - if (allQueues) - { - validateMessageOnQueue(priorityQueueName, messageCount); - validateMessageOnQueue(queueName, messageCount); - } - } - - private void validateMessageOnQueue(String queueName, long messageCount) - { - AMQQueue queue = getVirtualHost().getQueue(queueName); - - assertNotNull("Queue(" + queueName + ") not correctly registered:", queue); - - assertEquals("Incorrect Message count on queue:" + queueName, messageCount, queue.getQueueDepthMessages()); - } - - private class TestMessagePublishInfo implements MessagePublishInfo - { - - ExchangeImpl _exchange; - boolean _immediate; - boolean _mandatory; - String _routingKey; - - TestMessagePublishInfo(ExchangeImpl exchange, boolean immediate, boolean mandatory, String routingKey) - { - _exchange = exchange; - _immediate = immediate; - _mandatory = mandatory; - _routingKey = routingKey; - } - - public AMQShortString getExchange() - { - return new AMQShortString(_exchange.getName()); - } - - public void setExchange(AMQShortString exchange) - { - //no-op - } - - public boolean isImmediate() - { - return _immediate; - } - - public boolean isMandatory() - { - return _mandatory; - } - - public AMQShortString getRoutingKey() - { - return new AMQShortString(_routingKey); - } - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java index 406a20d557..0993783e54 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java @@ -27,7 +27,7 @@ import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.message.MessageContentSource; import org.apache.qpid.server.model.ConfiguredObject; -public class QuotaMessageStore extends NullMessageStore +public class QuotaMessageStore extends AbstractMemoryMessageStore { public static final String TYPE = "QuotaMessageStore"; private final AtomicLong _messageId = new AtomicLong(1); @@ -155,10 +155,4 @@ public class QuotaMessageStore extends NullMessageStore } } } - - @Override - public String getStoreType() - { - return TYPE; - } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java index e20196c98d..95bffa89aa 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java @@ -31,6 +31,10 @@ import org.apache.log4j.Logger; import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.plugin.MessageStoreFactory; +import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; +import org.apache.qpid.server.store.handler.DistributedTransactionHandler; +import org.apache.qpid.server.store.handler.MessageHandler; +import org.apache.qpid.server.store.handler.MessageInstanceHandler; public class SlowMessageStore implements MessageStore, DurableConfigurationStore { @@ -63,12 +67,6 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore } } - @Override - public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler) - { - _realDurableConfigurationStore.recoverConfigurationStore(recoveryHandler); - } - private void configureDelays(Map delays) { @@ -293,12 +291,6 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore } } - @Override - public void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler) - { - _realMessageStore.recoverMessageStore(messageRecoveryHandler, transactionLogRecoveryHandler); - } - @Override public void addEventListener(EventListener eventListener, Event... events) { @@ -319,15 +311,33 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore } @Override - public String getStoreType() + public void onDelete() { - return TYPE; + _realMessageStore.onDelete(); } @Override - public void onDelete() + public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws StoreException { - _realMessageStore.onDelete(); + _realDurableConfigurationStore.visitConfiguredObjectRecords(handler); + } + + @Override + public void visitMessages(MessageHandler handler) throws StoreException + { + _realMessageStore.visitMessages(handler); + } + + @Override + public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException + { + _realMessageStore.visitMessageInstances(handler); + } + + @Override + public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException + { + _realMessageStore.visitDistributedTransactions(handler); } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java new file mode 100644 index 0000000000..7db8210753 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java @@ -0,0 +1,872 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import org.apache.log4j.Logger; +import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl; +import org.apache.qpid.server.exchange.DirectExchange; +import org.apache.qpid.server.exchange.ExchangeImpl; +import org.apache.qpid.server.exchange.TopicExchange; +import org.apache.qpid.server.logging.EventLogger; +import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageSource; +import org.apache.qpid.server.model.Binding; +import org.apache.qpid.server.model.ExclusivityPolicy; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.plugin.ExchangeType; +import org.apache.qpid.server.protocol.v0_8.AMQMessage; +import org.apache.qpid.server.protocol.v0_8.MessageMetaData; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.ConflationQueue; +import org.apache.qpid.server.queue.PriorityQueue; +import org.apache.qpid.server.queue.StandardQueue; +import org.apache.qpid.server.txn.AutoCommitTransaction; +import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.BrokerTestHelper; +import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.util.FileUtils; + +/** + * + * Virtualhost/store integration test. Tests for correct behaviour of the message store + * when exercised via the higher level functions of the store. + * + * For persistent stores, it validates that Exchanges, Queues, Bindings and + * Messages are persisted and recovered correctly. + */ +public class VirtualHostMessageStoreTest extends QpidTestCase +{ + private static final Logger _logger = Logger.getLogger(VirtualHostMessageStoreTest.class); + + public static final int DEFAULT_PRIORTY_LEVEL = 5; + public static final String SELECTOR_VALUE = "Test = 'MST'"; + public static final String LVQ_KEY = "MST-LVQ-KEY"; + + private String nonDurableExchangeName = "MST-NonDurableDirectExchange"; + private String directExchangeName = "MST-DirectExchange"; + private String topicExchangeName = "MST-TopicExchange"; + + private String durablePriorityTopicQueueName = "MST-PriorityTopicQueue-Durable"; + private String durableTopicQueueName = "MST-TopicQueue-Durable"; + private String priorityTopicQueueName = "MST-PriorityTopicQueue"; + private String topicQueueName = "MST-TopicQueue"; + + private String durableExclusiveQueueName = "MST-Queue-Durable-Exclusive"; + private String durablePriorityQueueName = "MST-PriorityQueue-Durable"; + private String durableLastValueQueueName = "MST-LastValueQueue-Durable"; + private String durableQueueName = "MST-Queue-Durable"; + private String priorityQueueName = "MST-PriorityQueue"; + private String queueName = "MST-Queue"; + + private String directRouting = "MST-direct"; + private String topicRouting = "MST-topic"; + + private String queueOwner = "MST"; + + private VirtualHost _virtualHost; + private org.apache.qpid.server.model.VirtualHost _virtualHostModel; + private String _storePath; + + public void setUp() throws Exception + { + super.setUp(); + BrokerTestHelper.setUp(); + + String hostName = getName(); + _storePath = System.getProperty("QPID_WORK", TMP_FOLDER + File.separator + getTestName()) + File.separator + hostName; + + Map messageStoreSettings = new HashMap(); + messageStoreSettings.put(MessageStore.STORE_PATH, _storePath); + messageStoreSettings.put(MessageStore.STORE_TYPE, getTestProfileMessageStoreType()); + + _virtualHostModel = mock(org.apache.qpid.server.model.VirtualHost.class); + + when(_virtualHostModel.getMessageStoreSettings()).thenReturn(messageStoreSettings); + when(_virtualHostModel.getAttribute(eq(org.apache.qpid.server.model.VirtualHost.TYPE))).thenReturn(StandardVirtualHostFactory.TYPE); + when(_virtualHostModel.getAttribute(eq(org.apache.qpid.server.model.VirtualHost.NAME))).thenReturn(hostName); + when(_virtualHostModel.getType()).thenReturn(StandardVirtualHostFactory.TYPE); + when(_virtualHostModel.getName()).thenReturn(hostName); + + cleanup(new File(_storePath)); + + reloadVirtualHost(); + } + + protected String getStorePath() + { + return _storePath; + } + + protected org.apache.qpid.server.model.VirtualHost getVirtualHostModel() + { + return _virtualHostModel; + } + + @Override + public void tearDown() throws Exception + { + try + { + if (_virtualHost != null) + { + _virtualHost.close(); + } + } + finally + { + BrokerTestHelper.tearDown(); + super.tearDown(); + } + } + + public VirtualHost getVirtualHost() + { + return _virtualHost; + } + + protected void reloadVirtualHost() + { + VirtualHost original = getVirtualHost(); + + if (getVirtualHost() != null) + { + try + { + getVirtualHost().close(); + } + catch (Exception e) + { + _logger.error("Error closing virtual host", e); + fail(e.getMessage()); + } + } + + try + { + _virtualHost = BrokerTestHelper.createVirtualHost(new VirtualHostRegistry(new EventLogger()), getVirtualHostModel()); + when(_virtualHostModel.getId()).thenReturn(_virtualHost.getId()); + + ConfiguredObjectRecord objectRecord = mock(ConfiguredObjectRecord.class); + when(objectRecord.getId()).thenReturn(_virtualHost.getId()); + when(objectRecord.getType()).thenReturn(org.apache.qpid.server.model.VirtualHost.class.getSimpleName()); + when(_virtualHostModel.asObjectRecord()).thenReturn(objectRecord); + } + catch (Exception e) + { + _logger.error("Error creating virtual host", e); + fail(e.getMessage()); + } + + assertTrue("Virtualhost has not changed, reload was not successful", original != getVirtualHost()); + } + + public void testQueueExchangeAndBindingCreation() throws Exception + { + assertEquals("Should not be any existing queues", 0, getVirtualHost().getQueues().size()); + + createAllQueues(); + createAllTopicQueues(); + + //Register Non-Durable DirectExchange + ExchangeImpl nonDurableExchange = createExchange(DirectExchange.TYPE, nonDurableExchangeName, false); + bindAllQueuesToExchange(nonDurableExchange, directRouting); + + //Register DirectExchange + ExchangeImpl directExchange = createExchange(DirectExchange.TYPE, directExchangeName, true); + bindAllQueuesToExchange(directExchange, directRouting); + + //Register TopicExchange + ExchangeImpl topicExchange = createExchange(TopicExchange.TYPE, topicExchangeName, true); + bindAllTopicQueuesToExchange(topicExchange, topicRouting); + + //Send Message To NonDurable direct Exchange = persistent + sendMessageOnExchange(nonDurableExchange, directRouting, true); + // and non-persistent + sendMessageOnExchange(nonDurableExchange, directRouting, false); + + //Send Message To direct Exchange = persistent + sendMessageOnExchange(directExchange, directRouting, true); + // and non-persistent + sendMessageOnExchange(directExchange, directRouting, false); + + //Send Message To topic Exchange = persistent + sendMessageOnExchange(topicExchange, topicRouting, true); + // and non-persistent + sendMessageOnExchange(topicExchange, topicRouting, false); + + //Ensure all the Queues have four messages (one transient, one persistent) x 2 exchange routings + validateMessageOnQueues(4, true); + //Ensure all the topics have two messages (one transient, one persistent) + validateMessageOnTopics(2, true); + + assertEquals("Not all queues correctly registered", + 10, getVirtualHost().getQueues().size()); + } + + public void testMessagePersistence() throws Exception + { + testQueueExchangeAndBindingCreation(); + + reloadVirtualHost(); + + //Validate durable queues and subscriptions still have the persistent messages + validateMessageOnQueues(2, false); + validateMessageOnTopics(1, false); + } + + /** + * Tests message removal by running the testMessagePersistence() method above before + * clearing the queues, reloading the virtual host, and ensuring that the persistent + * messages were removed from the queues. + */ + public void testMessageRemoval() throws Exception + { + testMessagePersistence(); + + assertEquals("Incorrect number of queues registered after recovery", + 6, getVirtualHost().getQueues().size()); + + //clear the queue + _virtualHost.getQueue(durableQueueName).clearQueue(); + + //check the messages are gone + validateMessageOnQueue(durableQueueName, 0); + + //reload and verify messages arent restored + reloadVirtualHost(); + + validateMessageOnQueue(durableQueueName, 0); + } + + /** + * Tests queue persistence by creating a selection of queues with differing properties, both + * durable and non durable, and ensuring that following the recovery process the correct queues + * are present and any property manipulations (eg queue exclusivity) are correctly recovered. + */ + public void testQueuePersistence() throws Exception + { + assertEquals("Should not be any existing queues", + 0, getVirtualHost().getQueues().size()); + + //create durable and non durable queues/topics + createAllQueues(); + createAllTopicQueues(); + + //reload the virtual host, prompting recovery of the queues/topics + reloadVirtualHost(); + + assertEquals("Incorrect number of queues registered after recovery", + 6, getVirtualHost().getQueues().size()); + + //Validate the non-Durable Queues were not recovered. + assertNull("Non-Durable queue still registered:" + priorityQueueName, + getVirtualHost().getQueue(priorityQueueName)); + assertNull("Non-Durable queue still registered:" + queueName, + getVirtualHost().getQueue(queueName)); + assertNull("Non-Durable queue still registered:" + priorityTopicQueueName, + getVirtualHost().getQueue(priorityTopicQueueName)); + assertNull("Non-Durable queue still registered:" + topicQueueName, + getVirtualHost().getQueue(topicQueueName)); + + //Validate normally expected properties of Queues/Topics + validateDurableQueueProperties(); + + //Update the durable exclusive queue's exclusivity + setQueueExclusivity(false); + validateQueueExclusivityProperty(false); + } + + /** + * Tests queue removal by creating a durable queue, verifying it recovers, and + * then removing it from the store, and ensuring that following the second reload + * process it is not recovered. + */ + public void testDurableQueueRemoval() throws Exception + { + //Register Durable Queue + createQueue(durableQueueName, false, true, false, false); + + assertEquals("Incorrect number of queues registered before recovery", + 1, getVirtualHost().getQueues().size()); + + reloadVirtualHost(); + + assertEquals("Incorrect number of queues registered after first recovery", + 1, getVirtualHost().getQueues().size()); + + //test that removing the queue means it is not recovered next time + final AMQQueue queue = getVirtualHost().getQueue(durableQueueName); + DurableConfigurationStoreHelper.removeQueue(getVirtualHost().getDurableConfigurationStore(),queue); + + reloadVirtualHost(); + + assertEquals("Incorrect number of queues registered after second recovery", + 0, getVirtualHost().getQueues().size()); + assertNull("Durable queue was not removed:" + durableQueueName, + getVirtualHost().getQueue(durableQueueName)); + } + + /** + * Tests exchange persistence by creating a selection of exchanges, both durable + * and non durable, and ensuring that following the recovery process the correct + * durable exchanges are still present. + */ + public void testExchangePersistence() throws Exception + { + int origExchangeCount = getVirtualHost().getExchanges().size(); + + Map> oldExchanges = createExchanges(); + + assertEquals("Incorrect number of exchanges registered before recovery", + origExchangeCount + 3, getVirtualHost().getExchanges().size()); + + reloadVirtualHost(); + + //verify the exchanges present after recovery + validateExchanges(origExchangeCount, oldExchanges); + } + + /** + * Tests exchange removal by creating a durable exchange, verifying it recovers, and + * then removing it from the store, and ensuring that following the second reload + * process it is not recovered. + */ + public void testDurableExchangeRemoval() throws Exception + { + int origExchangeCount = getVirtualHost().getExchanges().size(); + + createExchange(DirectExchange.TYPE, directExchangeName, true); + + assertEquals("Incorrect number of exchanges registered before recovery", + origExchangeCount + 1, getVirtualHost().getExchanges().size()); + + reloadVirtualHost(); + + assertEquals("Incorrect number of exchanges registered after first recovery", + origExchangeCount + 1, getVirtualHost().getExchanges().size()); + + //test that removing the exchange means it is not recovered next time + final ExchangeImpl exchange = getVirtualHost().getExchange(directExchangeName); + DurableConfigurationStoreHelper.removeExchange(getVirtualHost().getDurableConfigurationStore(), exchange); + + reloadVirtualHost(); + + assertEquals("Incorrect number of exchanges registered after second recovery", + origExchangeCount, getVirtualHost().getExchanges().size()); + assertNull("Durable exchange was not removed:" + directExchangeName, + getVirtualHost().getExchange(directExchangeName)); + } + + /** + * Tests binding persistence by creating a selection of queues and exchanges, both durable + * and non durable, then adding bindings with and without selectors before reloading the + * virtual host and verifying that following the recovery process the correct durable + * bindings (those for durable queues to durable exchanges) are still present. + */ + public void testBindingPersistence() throws Exception + { + int origExchangeCount = getVirtualHost().getExchanges().size(); + + createAllQueues(); + createAllTopicQueues(); + + Map> exchanges = createExchanges(); + + ExchangeImpl nonDurableExchange = exchanges.get(nonDurableExchangeName); + ExchangeImpl directExchange = exchanges.get(directExchangeName); + ExchangeImpl topicExchange = exchanges.get(topicExchangeName); + + bindAllQueuesToExchange(nonDurableExchange, directRouting); + bindAllQueuesToExchange(directExchange, directRouting); + bindAllTopicQueuesToExchange(topicExchange, topicRouting); + + assertEquals("Incorrect number of exchanges registered before recovery", + origExchangeCount + 3, getVirtualHost().getExchanges().size()); + + reloadVirtualHost(); + + validateExchanges(origExchangeCount, exchanges); + + validateBindingProperties(); + } + + /** + * Tests binding removal by creating a durable exchange, and queue, binding them together, + * recovering to verify the persistence, then removing it from the store, and ensuring + * that following the second reload process it is not recovered. + */ + public void testDurableBindingRemoval() throws Exception + { + //create durable queue and exchange, bind them + ExchangeImpl exch = createExchange(DirectExchange.TYPE, directExchangeName, true); + createQueue(durableQueueName, false, true, false, false); + bindQueueToExchange(exch, directRouting, getVirtualHost().getQueue(durableQueueName), false); + + assertEquals("Incorrect number of bindings registered before recovery", + 1, getVirtualHost().getQueue(durableQueueName).getBindings().size()); + + //verify binding is actually normally recovered + reloadVirtualHost(); + + assertEquals("Incorrect number of bindings registered after first recovery", + 1, getVirtualHost().getQueue(durableQueueName).getBindings().size()); + + exch = getVirtualHost().getExchange(directExchangeName); + assertNotNull("Exchange was not recovered", exch); + + //remove the binding and verify result after recovery + unbindQueueFromExchange(exch, directRouting, getVirtualHost().getQueue(durableQueueName), false); + + reloadVirtualHost(); + + assertEquals("Incorrect number of bindings registered after second recovery", + 0, getVirtualHost().getQueue(durableQueueName).getBindings().size()); + } + + /** + * Validates that the durable exchanges are still present, the non durable exchange is not, + * and that the new exchanges are not the same objects as the provided list (i.e. that the + * reload actually generated new exchange objects) + */ + private void validateExchanges(int originalNumExchanges, Map> oldExchanges) + { + Collection> exchanges = getVirtualHost().getExchanges(); + Collection exchangeNames = new ArrayList(exchanges.size()); + for(ExchangeImpl exchange : exchanges) + { + exchangeNames.add(exchange.getName()); + } + assertTrue(directExchangeName + " exchange NOT reloaded", + exchangeNames.contains(directExchangeName)); + assertTrue(topicExchangeName + " exchange NOT reloaded", + exchangeNames.contains(topicExchangeName)); + assertTrue(nonDurableExchangeName + " exchange reloaded", + !exchangeNames.contains(nonDurableExchangeName)); + + //check the old exchange objects are not the same as the new exchanges + assertTrue(directExchangeName + " exchange NOT reloaded", + getVirtualHost().getExchange(directExchangeName) != oldExchanges.get(directExchangeName)); + assertTrue(topicExchangeName + " exchange NOT reloaded", + getVirtualHost().getExchange(topicExchangeName) != oldExchanges.get(topicExchangeName)); + + // There should only be the original exchanges + our 2 recovered durable exchanges + assertEquals("Incorrect number of exchanges available", + originalNumExchanges + 2, getVirtualHost().getExchanges().size()); + } + + /** Validates the Durable queues and their properties are as expected following recovery */ + @SuppressWarnings("unchecked") + private void validateBindingProperties() + { + + assertEquals("Incorrect number of (durable) queues following recovery", 6, getVirtualHost().getQueues().size()); + + validateBindingProperties(getVirtualHost().getQueue(durablePriorityQueueName).getBindings(), false); + validateBindingProperties(getVirtualHost().getQueue(durablePriorityTopicQueueName).getBindings(), true); + validateBindingProperties(getVirtualHost().getQueue(durableQueueName).getBindings(), false); + validateBindingProperties(getVirtualHost().getQueue(durableTopicQueueName).getBindings(), true); + validateBindingProperties(getVirtualHost().getQueue(durableExclusiveQueueName).getBindings(), false); + } + + /** + * Validate that each queue is bound only once following recovery (i.e. that bindings for non durable + * queues or to non durable exchanges are not recovered), and if a selector should be present + * that it is and contains the correct value + * + * @param bindings the set of bindings to validate + * @param useSelectors if set, check the binding has a JMS_SELECTOR argument and the correct value for it + */ + private void validateBindingProperties(Collection> bindings, boolean useSelectors) + { + assertEquals("Each queue should only be bound once.", 1, bindings.size()); + + Binding binding = bindings.iterator().next(); + + if (useSelectors) + { + assertTrue("Binding does not contain a Selector argument.", + binding.getArguments().containsKey(AMQPFilterTypes.JMS_SELECTOR.toString())); + assertEquals("The binding selector argument is incorrect", SELECTOR_VALUE, + binding.getArguments().get(AMQPFilterTypes.JMS_SELECTOR.toString()).toString()); + } + } + + private void setQueueExclusivity(boolean exclusive) throws MessageSource.ExistingConsumerPreventsExclusive + { + AMQQueue queue = getVirtualHost().getQueue(durableExclusiveQueueName); + queue.setExclusivityPolicy(exclusive ? ExclusivityPolicy.CONTAINER : ExclusivityPolicy.NONE); + } + + private void validateQueueExclusivityProperty(boolean expected) + { + AMQQueue queue = getVirtualHost().getQueue(durableExclusiveQueueName); + + assertEquals("Queue exclusivity was incorrect", queue.isExclusive(), expected); + } + + + private void validateDurableQueueProperties() + { + validateQueueProperties(getVirtualHost().getQueue(durablePriorityQueueName), true, true, false, false); + validateQueueProperties(getVirtualHost().getQueue(durablePriorityTopicQueueName), true, true, false, false); + validateQueueProperties(getVirtualHost().getQueue(durableQueueName), false, true, false, false); + validateQueueProperties(getVirtualHost().getQueue(durableTopicQueueName), false, true, false, false); + validateQueueProperties(getVirtualHost().getQueue(durableExclusiveQueueName), false, true, true, false); + validateQueueProperties(getVirtualHost().getQueue(durableLastValueQueueName), false, true, true, true); + } + + private void validateQueueProperties(AMQQueue queue, boolean usePriority, boolean durable, boolean exclusive, boolean lastValueQueue) + { + if(usePriority || lastValueQueue) + { + assertNotSame("Queues cant be both Priority and LastValue based", usePriority, lastValueQueue); + } + + if (usePriority) + { + assertEquals("Queue is no longer a Priority Queue", PriorityQueue.class, queue.getClass()); + assertEquals("Priority Queue does not have set priorities", + DEFAULT_PRIORTY_LEVEL, ((PriorityQueue) queue).getPriorities()); + } + else if (lastValueQueue) + { + assertEquals("Queue is no longer a LastValue Queue", ConflationQueue.class, queue.getClass()); + assertEquals("LastValue Queue Key has changed", LVQ_KEY, ((ConflationQueue) queue).getConflationKey()); + } + else + { + assertEquals("Queue is not 'simple'", StandardQueue.class, queue.getClass()); + } + + assertEquals("Queue owner is not as expected for queue " + queue.getName(), exclusive ? queueOwner : null, queue.getOwner()); + assertEquals("Queue durability is not as expected for queue " + queue.getName(), durable, queue.isDurable()); + assertEquals("Queue exclusivity is not as expected for queue " + queue.getName(), exclusive, queue.isExclusive()); + } + + /** + * Delete the Store Environment path + * + * @param environmentPath The configuration that contains the store environment path. + */ + private void cleanup(File environmentPath) + { + if (environmentPath.exists()) + { + FileUtils.delete(environmentPath, true); + } + } + + private void sendMessageOnExchange(ExchangeImpl exchange, String routingKey, boolean deliveryMode) + { + //Set MessagePersistence + BasicContentHeaderProperties properties = new BasicContentHeaderProperties(); + properties.setDeliveryMode(deliveryMode ? Integer.valueOf(2).byteValue() : Integer.valueOf(1).byteValue()); + FieldTable headers = properties.getHeaders(); + headers.setString("Test", "MST"); + properties.setHeaders(headers); + + MessagePublishInfo messageInfo = new TestMessagePublishInfo(exchange, false, false, routingKey); + + ContentHeaderBody headerBody = new ContentHeaderBody(BasicConsumeBodyImpl.CLASS_ID,0,properties,0l); + + MessageMetaData mmd = new MessageMetaData(messageInfo, headerBody, System.currentTimeMillis()); + + final StoredMessage storedMessage = getVirtualHost().getMessageStore().addMessage(mmd); + storedMessage.flushToStore(); + final AMQMessage currentMessage = new AMQMessage(storedMessage); + + + + ServerTransaction trans = new AutoCommitTransaction(getVirtualHost().getMessageStore()); + exchange.send(currentMessage, routingKey, InstanceProperties.EMPTY, trans, null); + + } + + private void createAllQueues() throws Exception + { + //Register Durable Priority Queue + createQueue(durablePriorityQueueName, true, true, false, false); + + //Register Durable Simple Queue + createQueue(durableQueueName, false, true, false, false); + + //Register Durable Exclusive Simple Queue + createQueue(durableExclusiveQueueName, false, true, true, false); + + //Register Durable LastValue Queue + createQueue(durableLastValueQueueName, false, true, true, true); + + //Register NON-Durable Priority Queue + createQueue(priorityQueueName, true, false, false, false); + + //Register NON-Durable Simple Queue + createQueue(queueName, false, false, false, false); + } + + private void createAllTopicQueues() throws Exception + { + //Register Durable Priority Queue + createQueue(durablePriorityTopicQueueName, true, true, false, false); + + //Register Durable Simple Queue + createQueue(durableTopicQueueName, false, true, false, false); + + //Register NON-Durable Priority Queue + createQueue(priorityTopicQueueName, true, false, false, false); + + //Register NON-Durable Simple Queue + createQueue(topicQueueName, false, false, false, false); + } + + private void createQueue(String queueName, boolean usePriority, boolean durable, boolean exclusive, boolean lastValueQueue) + throws Exception + { + + Map queueArguments = new HashMap(); + + if(usePriority || lastValueQueue) + { + assertNotSame("Queues cant be both Priority and LastValue based", usePriority, lastValueQueue); + } + + if (usePriority) + { + queueArguments.put(Queue.PRIORITIES, DEFAULT_PRIORTY_LEVEL); + } + + if (lastValueQueue) + { + queueArguments.put(Queue.LVQ_KEY, LVQ_KEY); + } + + queueArguments.put(Queue.ID, UUIDGenerator.generateRandomUUID()); + queueArguments.put(Queue.NAME, queueName); + queueArguments.put(Queue.DURABLE, durable); + queueArguments.put(Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT); + queueArguments.put(Queue.EXCLUSIVE, exclusive ? ExclusivityPolicy.CONTAINER : ExclusivityPolicy.NONE); + if(exclusive && queueOwner != null) + { + queueArguments.put(Queue.OWNER, queueOwner); + } + AMQQueue queue = null; + + //Ideally we would be able to use the QueueDeclareHandler here. + queue = getVirtualHost().createQueue(queueArguments); + + validateQueueProperties(queue, usePriority, durable, exclusive, lastValueQueue); + } + + private Map> createExchanges() throws Exception + { + Map> exchanges = new HashMap>(); + + //Register non-durable DirectExchange + exchanges.put(nonDurableExchangeName, createExchange(DirectExchange.TYPE, nonDurableExchangeName, false)); + + //Register durable DirectExchange and TopicExchange + exchanges.put(directExchangeName ,createExchange(DirectExchange.TYPE, directExchangeName, true)); + exchanges.put(topicExchangeName,createExchange(TopicExchange.TYPE, topicExchangeName, true)); + + return exchanges; + } + + private ExchangeImpl createExchange(ExchangeType type, String name, boolean durable) throws Exception + { + ExchangeImpl exchange = null; + + Map attributes = new HashMap(); + + attributes.put(org.apache.qpid.server.model.Exchange.NAME, name); + attributes.put(org.apache.qpid.server.model.Exchange.TYPE, type.getType()); + attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, durable); + attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, + durable ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT); + attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null); + exchange = getVirtualHost().createExchange(attributes); + + return exchange; + } + + private void bindAllQueuesToExchange(ExchangeImpl exchange, String routingKey) + { + bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(durablePriorityQueueName), false); + bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(durableQueueName), false); + bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(priorityQueueName), false); + bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(queueName), false); + bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(durableExclusiveQueueName), false); + } + + private void bindAllTopicQueuesToExchange(ExchangeImpl exchange, String routingKey) + { + + bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(durablePriorityTopicQueueName), true); + bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(durableTopicQueueName), true); + bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(priorityTopicQueueName), true); + bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(topicQueueName), true); + } + + + protected void bindQueueToExchange(ExchangeImpl exchange, + String routingKey, + AMQQueue queue, + boolean useSelector) + { + Map bindArguments = new HashMap(); + + if (useSelector) + { + bindArguments.put(AMQPFilterTypes.JMS_SELECTOR.toString(), SELECTOR_VALUE ); + } + + try + { + exchange.addBinding(routingKey, queue, bindArguments); + } + catch (Exception e) + { + fail(e.getMessage()); + } + } + + protected void unbindQueueFromExchange(ExchangeImpl exchange, + String routingKey, + AMQQueue queue, + boolean useSelector) + { + Map bindArguments = new HashMap(); + + if (useSelector) + { + bindArguments.put(AMQPFilterTypes.JMS_SELECTOR.toString(), SELECTOR_VALUE ); + } + + try + { + exchange.deleteBinding(routingKey, queue); + } + catch (Exception e) + { + fail(e.getMessage()); + } + } + + private void validateMessageOnTopics(long messageCount, boolean allQueues) + { + validateMessageOnQueue(durablePriorityTopicQueueName, messageCount); + validateMessageOnQueue(durableTopicQueueName, messageCount); + + if (allQueues) + { + validateMessageOnQueue(priorityTopicQueueName, messageCount); + validateMessageOnQueue(topicQueueName, messageCount); + } + } + + private void validateMessageOnQueues(long messageCount, boolean allQueues) + { + validateMessageOnQueue(durablePriorityQueueName, messageCount); + validateMessageOnQueue(durableQueueName, messageCount); + + if (allQueues) + { + validateMessageOnQueue(priorityQueueName, messageCount); + validateMessageOnQueue(queueName, messageCount); + } + } + + private void validateMessageOnQueue(String queueName, long messageCount) + { + AMQQueue queue = getVirtualHost().getQueue(queueName); + + assertNotNull("Queue(" + queueName + ") not correctly registered:", queue); + + assertEquals("Incorrect Message count on queue:" + queueName, messageCount, queue.getQueueDepthMessages()); + } + + private class TestMessagePublishInfo implements MessagePublishInfo + { + + ExchangeImpl _exchange; + boolean _immediate; + boolean _mandatory; + String _routingKey; + + TestMessagePublishInfo(ExchangeImpl exchange, boolean immediate, boolean mandatory, String routingKey) + { + _exchange = exchange; + _immediate = immediate; + _mandatory = mandatory; + _routingKey = routingKey; + } + + @Override + public AMQShortString getExchange() + { + return new AMQShortString(_exchange.getName()); + } + + @Override + public void setExchange(AMQShortString exchange) + { + //no-op + } + + @Override + public boolean isImmediate() + { + return _immediate; + } + + @Override + public boolean isMandatory() + { + return _mandatory; + } + + @Override + public AMQShortString getRoutingKey() + { + return new AMQShortString(_routingKey); + } + } + +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java index 4c0e2b7ffc..59b4d496fa 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java @@ -45,10 +45,9 @@ import org.apache.qpid.server.model.SystemContext; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.security.access.FileAccessControlProviderConstants; import org.apache.qpid.server.security.group.FileGroupManagerFactory; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.ConfiguredObjectRecordImpl; -import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; public class TestBrokerConfiguration { @@ -191,7 +190,7 @@ public class TestBrokerConfiguration private ConfiguredObjectRecord findObject(final Class category, final String objectName) { final RecordFindingVisitor visitor = new RecordFindingVisitor(category, objectName); - _store.recoverConfigurationStore(visitor); + _store.visitConfiguredObjectRecords(visitor); return visitor.getFoundRecord(); } @@ -235,11 +234,12 @@ public class TestBrokerConfiguration return findObject(category, name).getAttributes(); } - private static class RecordFindingVisitor implements ConfigurationRecoveryHandler + private static class RecordFindingVisitor implements ConfiguredObjectRecordHandler { private final Class _category; private final String _objectName; public ConfiguredObjectRecord _foundRecord; + private int _version; public RecordFindingVisitor(final Class category, final String objectName) { @@ -248,26 +248,28 @@ public class TestBrokerConfiguration } @Override - public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion) + public void begin(final int configVersion) { - + _version = configVersion; } @Override - public void configuredObject(final ConfiguredObjectRecord object) + public boolean handle(final ConfiguredObjectRecord object) { if (object.getType().equals(_category.getSimpleName()) && (_objectName == null || _objectName.equals(object.getAttributes().get(ConfiguredObject.NAME)))) { _foundRecord = object; + return false; } + return true; } @Override - public int completeConfigurationRecovery() + public int end() { - return 0; + return _version; } public ConfiguredObjectRecord getFoundRecord() diff --git a/qpid/java/test-profiles/CPPExcludes b/qpid/java/test-profiles/CPPExcludes index 6f7de94e5d..8e48e77e07 100755 --- a/qpid/java/test-profiles/CPPExcludes +++ b/qpid/java/test-profiles/CPPExcludes @@ -85,14 +85,14 @@ org.apache.qpid.server.store.PersistentStoreTest#* org.apache.qpid.server.store.SplitStoreTest#* // These tests are for the Java broker persistent store modules -org.apache.qpid.server.store.MessageStoreTest#testMessagePersistence -org.apache.qpid.server.store.MessageStoreTest#testMessageRemoval -org.apache.qpid.server.store.MessageStoreTest#testBindingPersistence -org.apache.qpid.server.store.MessageStoreTest#testDurableBindingRemoval -org.apache.qpid.server.store.MessageStoreTest#testQueuePersistence -org.apache.qpid.server.store.MessageStoreTest#testDurableQueueRemoval -org.apache.qpid.server.store.MessageStoreTest#testExchangePersistence -org.apache.qpid.server.store.MessageStoreTest#testDurableExchangeRemoval +org.apache.qpid.server.store.VirtualHostMessageStoreTest#testMessagePersistence +org.apache.qpid.server.store.VirtualHostMessageStoreTest#testMessageRemoval +org.apache.qpid.server.store.VirtualHostMessageStoreTest#testBindingPersistence +org.apache.qpid.server.store.VirtualHostMessageStoreTest#testDurableBindingRemoval +org.apache.qpid.server.store.VirtualHostMessageStoreTest#testQueuePersistence +org.apache.qpid.server.store.VirtualHostMessageStoreTest#testDurableQueueRemoval +org.apache.qpid.server.store.VirtualHostMessageStoreTest#testExchangePersistence +org.apache.qpid.server.store.VirtualHostMessageStoreTest#testDurableExchangeRemoval org.apache.qpid.server.store.DurableConfigurationStoreTest#* // CPP Broker does not follow the same Logging convention as the Java broker diff --git a/qpid/java/test-profiles/JavaBDBExcludes b/qpid/java/test-profiles/JavaBDBExcludes index 0750beb339..969b9272a7 100644 --- a/qpid/java/test-profiles/JavaBDBExcludes +++ b/qpid/java/test-profiles/JavaBDBExcludes @@ -17,7 +17,3 @@ // under the License. // -//This test is subclassed within the bdbstore module to enable it to run and -//also add some bdb-specific tests. It is excluded to prevent running twice. -org.apache.qpid.server.store.MessageStoreTest#* -org.apache.qpid.server.store.DurableConfigurationStoreTest#* diff --git a/qpid/java/test-profiles/JavaTransientExcludes b/qpid/java/test-profiles/JavaTransientExcludes index 0b060051e9..ef98882980 100644 --- a/qpid/java/test-profiles/JavaTransientExcludes +++ b/qpid/java/test-profiles/JavaTransientExcludes @@ -33,19 +33,17 @@ org.apache.qpid.test.unit.xa.TopicTest#testMultiMessagesDurSubCrash org.apache.qpid.test.unit.xa.TopicTest#testDurSubCrash org.apache.qpid.test.unit.xa.TopicTest#testRecover -org.apache.qpid.server.store.MessageStoreTest#testMessagePersistence -org.apache.qpid.server.store.MessageStoreTest#testMessageRemoval -org.apache.qpid.server.store.MessageStoreTest#testBindingPersistence -org.apache.qpid.server.store.MessageStoreTest#testDurableBindingRemoval -org.apache.qpid.server.store.MessageStoreTest#testQueuePersistence -org.apache.qpid.server.store.MessageStoreTest#testDurableQueueRemoval -org.apache.qpid.server.store.MessageStoreTest#testExchangePersistence -org.apache.qpid.server.store.MessageStoreTest#testDurableExchangeRemoval +org.apache.qpid.server.store.VirtualHostMessageStoreTest#testMessagePersistence +org.apache.qpid.server.store.VirtualHostMessageStoreTest#testMessageRemoval +org.apache.qpid.server.store.VirtualHostMessageStoreTest#testBindingPersistence +org.apache.qpid.server.store.VirtualHostMessageStoreTest#testDurableBindingRemoval +org.apache.qpid.server.store.VirtualHostMessageStoreTest#testQueuePersistence +org.apache.qpid.server.store.VirtualHostMessageStoreTest#testDurableQueueRemoval +org.apache.qpid.server.store.VirtualHostMessageStoreTest#testExchangePersistence +org.apache.qpid.server.store.VirtualHostMessageStoreTest#testDurableExchangeRemoval org.apache.qpid.server.store.berkeleydb.* -org.apache.qpid.server.store.DurableConfigurationStoreTest#* - org.apache.qpid.systest.management.jmx.QueueManagementTest#testAlternateExchangeSurvivesRestart org.apache.qpid.systest.management.jmx.QueueManagementTest#testQueueDescriptionSurvivesRestart org.apache.qpid.systest.management.jmx.QueueManagementTest#testMoveMessageBetweenQueuesWithBrokerRestart -- cgit v1.2.1