summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-04-04 22:10:24 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-04-04 22:10:24 +0000
commit676b0069419e84024417ea72f46487ae003dfd4c (patch)
tree563df3e3637b1868857d4e30e6ac7e356c8ad344
parent0a64e2b857dc189a621d51dffedf573746bd08e5 (diff)
downloadqpid-python-java-broker-config-store-changes.tar.gz
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-config-store-changes@1584926 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java17
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java542
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/XidBinding.java2
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java (renamed from qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java)352
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/BrokerStoreUpgrader.java21
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/JsonConfigurationEntryStore.java27
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandler.java34
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java25
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java589
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java297
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecordRecoveverAndUpgrader.java60
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java18
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java14
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java18
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStoreRecoveryHandler.java33
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java34
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java42
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Xid.java102
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/ConfiguredObjectRecordHandler.java41
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/DistributedTransactionHandler.java (renamed from qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/Xid.java)30
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/MessageHandler.java30
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/MessageInstanceHandler.java29
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java357
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java18
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java350
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java81
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java60
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreConfigurationTest.java6
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java57
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java4
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java404
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java21
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java15
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java64
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestRecord.java91
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java149
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java18
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java3
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MessageStoreRecovererTest.java414
-rw-r--r--qpid/java/broker-core/src/test/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory1
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java6
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java6
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java4
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java9
-rw-r--r--qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java6
-rw-r--r--qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java9
-rw-r--r--qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java6
-rw-r--r--qpid/java/broker-plugins/memory-store/pom.xml16
-rw-r--r--qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java5
-rw-r--r--qpid/java/broker-plugins/memory-store/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java (renamed from qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java)18
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java8
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java42
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java (renamed from qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java)96
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java20
-rwxr-xr-xqpid/java/test-profiles/CPPExcludes16
-rw-r--r--qpid/java/test-profiles/JavaBDBExcludes4
-rw-r--r--qpid/java/test-profiles/JavaTransientExcludes18
59 files changed, 2803 insertions, 1962 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
index aae0a56a40..a58bc274a9 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
@@ -29,15 +29,15 @@ import org.apache.qpid.server.logging.messages.MessageStoreMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.stats.StatisticsGatherer;
-import org.apache.qpid.server.store.DurableConfigurationRecoverer;
+import org.apache.qpid.server.store.ConfiguredObjectRecordRecoveverAndUpgrader;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
-import org.apache.qpid.server.virtualhost.DefaultUpgraderProvider;
+import org.apache.qpid.server.virtualhost.MessageStoreRecoverer;
import org.apache.qpid.server.virtualhost.State;
-import org.apache.qpid.server.virtualhost.VirtualHostConfigRecoveryHandler;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import com.sleepycat.je.rep.StateChangeEvent;
@@ -98,17 +98,12 @@ public class BDBHAVirtualHost extends AbstractVirtualHost
{
_messageStore.getEnvironmentFacade().getEnvironment().flushLog(true);
- DefaultUpgraderProvider upgraderProvider = new DefaultUpgraderProvider(this);
-
- DurableConfigurationRecoverer configRecoverer =
- new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(),
- upgraderProvider, getEventLogger());
- _messageStore.recoverConfigurationStore(configRecoverer);
+ ConfiguredObjectRecordHandler upgraderRecoverer = new ConfiguredObjectRecordRecoveverAndUpgrader(this, getDurableConfigurationRecoverers());
+ _messageStore.visitConfiguredObjectRecords(upgraderRecoverer);
initialiseModel();
- VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(BDBHAVirtualHost.this, getMessageStoreLogSubject());
- _messageStore.recoverMessageStore(recoveryHandler, recoveryHandler);
+ new MessageStoreRecoverer(this, getMessageStoreLogSubject()).recover();
attainActivation();
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index c64bc43066..652e4c135d 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -27,8 +27,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
@@ -38,29 +36,23 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
import org.apache.qpid.server.store.EventManager;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMemoryMessage;
import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
-import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler;
import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.store.Xid;
import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory.EnvironmentFacadeTask;
import org.apache.qpid.server.store.berkeleydb.entry.HierarchyKey;
import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction;
import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey;
-import org.apache.qpid.server.store.berkeleydb.entry.Xid;
import org.apache.qpid.server.store.berkeleydb.tuple.ConfiguredObjectBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.HierarchyKeyBinding;
@@ -70,6 +62,10 @@ import org.apache.qpid.server.store.berkeleydb.tuple.QueueEntryBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding;
import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.util.FileUtils;
@@ -129,7 +125,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
private long _persistentSizeHighThreshold;
private final EventManager _eventManager = new EventManager();
- private final String _type;
private final EnvironmentFacadeFactory _environmentFacadeFactory;
@@ -143,7 +138,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
public BDBMessageStore(EnvironmentFacadeFactory environmentFacadeFactory)
{
- _type = environmentFacadeFactory.getType();
_environmentFacadeFactory = environmentFacadeFactory;
}
@@ -160,18 +154,19 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
if (_environmentFacade == null)
{
- String[] databaseNames = null;
+ EnvironmentFacadeTask[] initialisationTasks = null;
if (MapValueConverter.getBooleanAttribute(IS_MESSAGE_STORE_TOO, storeSettings, false))
{
- databaseNames = new String[CONFIGURATION_STORE_DATABASE_NAMES.length + MESSAGE_STORE_DATABASE_NAMES.length];
+ String[] databaseNames = new String[CONFIGURATION_STORE_DATABASE_NAMES.length + MESSAGE_STORE_DATABASE_NAMES.length];
System.arraycopy(CONFIGURATION_STORE_DATABASE_NAMES, 0, databaseNames, 0, CONFIGURATION_STORE_DATABASE_NAMES.length);
System.arraycopy(MESSAGE_STORE_DATABASE_NAMES, 0, databaseNames, CONFIGURATION_STORE_DATABASE_NAMES.length, MESSAGE_STORE_DATABASE_NAMES.length);
+ initialisationTasks = new EnvironmentFacadeTask[]{new UpgradeTask(parent), new OpenDatabasesTask(databaseNames), new DiskSpaceTask(), new MaxMessageIdTask() };
}
else
{
- databaseNames = CONFIGURATION_STORE_DATABASE_NAMES;
+ initialisationTasks = new EnvironmentFacadeTask[]{new UpgradeTask(parent), new OpenDatabasesTask(CONFIGURATION_STORE_DATABASE_NAMES)};
}
- _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(storeSettings, new UpgradeTask(parent), new OpenDatabasesTask(databaseNames));
+ _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(storeSettings, initialisationTasks);
}
else
{
@@ -181,11 +176,88 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
@Override
- public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler)
+ public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler)
{
checkConfigurationStoreOpen();
- recoverConfig(recoveryHandler);
+ try
+ {
+ int configVersion = getConfigVersion();
+
+ handler.begin(configVersion);
+ doVisitAllConfiguredObjectRecords(handler);
+
+ int newConfigVersion = handler.end();
+ if(newConfigVersion != configVersion)
+ {
+ updateConfigVersion(newConfigVersion);
+ }
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Cannot visit configured object records", e);
+ }
+
+ }
+
+ private void doVisitAllConfiguredObjectRecords(ConfiguredObjectRecordHandler handler)
+ {
+ Map<UUID, BDBConfiguredObjectRecord> configuredObjects = new HashMap<UUID, BDBConfiguredObjectRecord>();
+ Cursor objectsCursor = null;
+ Cursor hierarchyCursor = null;
+ try
+ {
+ objectsCursor = getConfiguredObjectsDb().openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+
+
+ while (objectsCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ UUID id = UUIDTupleBinding.getInstance().entryToObject(key);
+
+ BDBConfiguredObjectRecord configuredObject =
+ (BDBConfiguredObjectRecord) new ConfiguredObjectBinding(id).entryToObject(value);
+ configuredObjects.put(configuredObject.getId(), configuredObject);
+ }
+
+ // set parents
+ hierarchyCursor = getConfiguredObjectHierarchyDb().openCursor(null, null);
+ while (hierarchyCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ HierarchyKey hk = HierarchyKeyBinding.getInstance().entryToObject(key);
+ UUID parentId = UUIDTupleBinding.getInstance().entryToObject(value);
+ BDBConfiguredObjectRecord child = configuredObjects.get(hk.getChildId());
+ if(child != null)
+ {
+ ConfiguredObjectRecord parent = configuredObjects.get(parentId);
+ if(parent != null)
+ {
+ child.addParent(hk.getParentType(), parent);
+ }
+ else if(hk.getParentType().equals("Exchange"))
+ {
+ // TODO - remove this hack for the pre-defined exchanges
+ child.addParent(hk.getParentType(), new BDBConfiguredObjectRecord(parentId, "Exchange", Collections.<String,Object>emptyMap()));
+ }
+ }
+ }
+ }
+ finally
+ {
+ closeCursorSafely(objectsCursor);
+ closeCursorSafely(hierarchyCursor);
+ }
+
+ for (ConfiguredObjectRecord record : configuredObjects.values())
+ {
+ boolean shoudlContinue = handler.handle(record);
+ if (!shoudlContinue)
+ {
+ break;
+ }
+ }
+
}
@Override
@@ -209,7 +281,8 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
if (_environmentFacade == null)
{
- _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(messageStoreSettings, new UpgradeTask(parent), new OpenDatabasesTask(MESSAGE_STORE_DATABASE_NAMES), new DiskSpaceTask());
+ _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(messageStoreSettings,
+ new UpgradeTask(parent), new OpenDatabasesTask(MESSAGE_STORE_DATABASE_NAMES), new DiskSpaceTask(), new MaxMessageIdTask());
}
_committer = _environmentFacade.createCommitter(parent.getName());
@@ -218,21 +291,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
@Override
- public synchronized void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler) throws StoreException
- {
- checkMessageStoreOpen();
-
- if(messageRecoveryHandler != null)
- {
- recoverMessages(messageRecoveryHandler);
- }
- if(transactionLogRecoveryHandler != null)
- {
- recoverQueueEntries(transactionLogRecoveryHandler);
- }
- }
-
- @Override
public org.apache.qpid.server.store.Transaction newTransaction() throws StoreException
{
checkMessageStoreOpen();
@@ -314,27 +372,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
}
- private void recoverConfig(ConfigurationRecoveryHandler recoveryHandler) throws StoreException
- {
- try
- {
- final int configVersion = getConfigVersion();
- recoveryHandler.beginConfigurationRecovery(this, configVersion);
- loadConfiguredObjects(recoveryHandler);
-
- final int newConfigVersion = recoveryHandler.completeConfigurationRecovery();
- if(newConfigVersion != configVersion)
- {
- updateConfigVersion(newConfigVersion);
- }
- }
- catch (DatabaseException e)
- {
- throw _environmentFacade.handleDatabaseException("Error recovering persistent state: " + e.getMessage(), e);
- }
-
- }
-
@SuppressWarnings("resource")
private void updateConfigVersion(int newConfigVersion) throws StoreException
{
@@ -399,62 +436,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
}
- private void loadConfiguredObjects(ConfigurationRecoveryHandler crh) throws DatabaseException, StoreException
- {
- Cursor objectsCursor = null;
- Cursor hierarchyCursor = null;
- try
- {
- objectsCursor = getConfiguredObjectsDb().openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry value = new DatabaseEntry();
-
- Map<UUID, BDBConfiguredObjectRecord> configuredObjects =
- new HashMap<UUID, BDBConfiguredObjectRecord>();
-
- while (objectsCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- UUID id = UUIDTupleBinding.getInstance().entryToObject(key);
-
- BDBConfiguredObjectRecord configuredObject =
- (BDBConfiguredObjectRecord) new ConfiguredObjectBinding(id).entryToObject(value);
- configuredObjects.put(configuredObject.getId(), configuredObject);
- }
-
- // set parents
- hierarchyCursor = getConfiguredObjectHierarchyDb().openCursor(null, null);
- while (hierarchyCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- HierarchyKey hk = HierarchyKeyBinding.getInstance().entryToObject(key);
- UUID parentId = UUIDTupleBinding.getInstance().entryToObject(value);
- BDBConfiguredObjectRecord child = configuredObjects.get(hk.getChildId());
- if(child != null)
- {
- ConfiguredObjectRecord parent = configuredObjects.get(parentId);
- if(parent != null)
- {
- child.addParent(hk.getParentType(), parent);
- }
- else if(hk.getParentType().equals("Exchange"))
- {
- // TODO - remove this hack for the pre-defined exchanges
- child.addParent(hk.getParentType(), new BDBConfiguredObjectRecord(parentId, "Exchange", Collections.<String,Object>emptyMap()));
- }
- }
- }
-
- for (ConfiguredObjectRecord record : configuredObjects.values())
- {
- crh.configuredObject(record);
- }
- }
- finally
- {
- closeCursorSafely(objectsCursor);
- closeCursorSafely(hierarchyCursor);
- }
- }
-
private void closeCursorSafely(Cursor cursor) throws StoreException
{
if (cursor != null)
@@ -470,124 +451,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
}
- private void recoverMessages(MessageStoreRecoveryHandler msrh) throws StoreException
- {
- StoredMessageRecoveryHandler mrh = msrh.begin();
-
- Cursor cursor = null;
- try
- {
- cursor = getMessageMetaDataDb().openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry value = new DatabaseEntry();
- MessageMetaDataBinding valueBinding = MessageMetaDataBinding.getInstance();
-
- long maxId = 0;
-
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- long messageId = LongBinding.entryToLong(key);
- StorableMessageMetaData metaData = valueBinding.entryToObject(value);
-
- StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true);
-
- mrh.message(message);
-
- maxId = Math.max(maxId, messageId);
- }
-
- _messageId.set(maxId);
- mrh.completeMessageRecovery();
- }
- catch (DatabaseException e)
- {
- throw _environmentFacade.handleDatabaseException("Cannot recover messages", e);
- }
- finally
- {
- closeCursorSafely(cursor);
- }
- }
-
- private void recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler)
- throws StoreException
- {
- QueueEntryRecoveryHandler qerh = recoveryHandler.begin(this);
-
- ArrayList<QueueEntryKey> entries = new ArrayList<QueueEntryKey>();
-
- Cursor cursor = null;
- try
- {
- cursor = getDeliveryDb().openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
-
- DatabaseEntry value = new DatabaseEntry();
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- QueueEntryKey qek = keyBinding.entryToObject(key);
-
- entries.add(qek);
- }
-
- try
- {
- cursor.close();
- }
- finally
- {
- cursor = null;
- }
-
- for(QueueEntryKey entry : entries)
- {
- UUID queueId = entry.getQueueId();
- long messageId = entry.getMessageId();
- qerh.queueEntry(queueId, messageId);
- }
- }
- catch (DatabaseException e)
- {
- throw _environmentFacade.handleDatabaseException("Cannot recover queue entries", e);
- }
- finally
- {
- closeCursorSafely(cursor);
- }
-
- TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = qerh.completeQueueEntryRecovery();
-
- cursor = null;
- try
- {
- cursor = getXidDb().openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- XidBinding keyBinding = XidBinding.getInstance();
- PreparedTransactionBinding valueBinding = new PreparedTransactionBinding();
- DatabaseEntry value = new DatabaseEntry();
-
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- Xid xid = keyBinding.entryToObject(key);
- PreparedTransaction preparedTransaction = valueBinding.entryToObject(value);
- dtxrh.dtxRecord(xid.getFormat(),xid.getGlobalId(),xid.getBranchId(),
- preparedTransaction.getEnqueues(),preparedTransaction.getDequeues());
- }
-
- }
- catch (DatabaseException e)
- {
- throw _environmentFacade.handleDatabaseException("Cannot recover transactions", e);
- }
- finally
- {
- closeCursorSafely(cursor);
- }
-
-
- dtxrh.completeDtxRecordRecovery();
- }
void removeMessage(long messageId, boolean sync) throws StoreException
{
@@ -738,6 +601,12 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
public void create(ConfiguredObjectRecord configuredObject) throws StoreException
{
checkConfigurationStoreOpen();
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Create " + configuredObject);
+ }
+
com.sleepycat.je.Transaction txn = null;
try
{
@@ -831,7 +700,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("Updating " + record.getType() + ", id: " + record.getId());
+ LOGGER.debug("Updating, creating " + createIfNecessary + " : " + record);
}
DatabaseEntry key = new DatabaseEntry();
@@ -889,8 +758,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Enqueuing message " + messageId + " on queue "
- + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + queue.getId()
- + " in transaction " + tx);
+ + queue.getName() + " with id " + queue.getId() + " in transaction " + tx);
}
getDeliveryDb().put(tx, key, value);
}
@@ -898,8 +766,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
LOGGER.error("Failed to enqueue: " + e.getMessage(), e);
throw _environmentFacade.handleDatabaseException("Error writing enqueued message with id " + messageId + " for queue "
- + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + queue.getId()
- + " to database", e);
+ + queue.getName() + " with id " + queue.getId() + " to database", e);
}
}
@@ -924,7 +791,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Dequeue message id " + messageId + " from queue "
- + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id);
+ + queue.getName() + " with id " + id);
}
try
@@ -934,19 +801,18 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
if (status == OperationStatus.NOTFOUND)
{
throw new StoreException("Unable to find message with id " + messageId + " on queue "
- + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id);
+ + queue.getName() + " with id " + id);
}
else if (status != OperationStatus.SUCCESS)
{
throw new StoreException("Unable to remove message with id " + messageId + " on queue"
- + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id);
+ + queue.getName() + " with id " + id);
}
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Removed message " + messageId + " on queue "
- + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id
- + " from delivery db");
+ + queue.getName() + " with id " + id);
}
}
@@ -1072,57 +938,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
/**
- * Primarily for testing purposes.
- *
- * @param queueId
- *
- * @return a list of message ids for messages enqueued for a particular queue
- */
- List<Long> getEnqueuedMessages(UUID queueId) throws StoreException
- {
- Cursor cursor = null;
- try
- {
- cursor = getDeliveryDb().openCursor(null, null);
-
- DatabaseEntry key = new DatabaseEntry();
-
- QueueEntryKey dd = new QueueEntryKey(queueId, 0);
-
- QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
- keyBinding.objectToEntry(dd, key);
-
- DatabaseEntry value = new DatabaseEntry();
-
- LinkedList<Long> messageIds = new LinkedList<Long>();
-
- OperationStatus status = cursor.getSearchKeyRange(key, value, LockMode.DEFAULT);
- dd = keyBinding.entryToObject(key);
-
- while ((status == OperationStatus.SUCCESS) && dd.getQueueId().equals(queueId))
- {
-
- messageIds.add(dd.getMessageId());
- status = cursor.getNext(key, value, LockMode.DEFAULT);
- if (status == OperationStatus.SUCCESS)
- {
- dd = keyBinding.entryToObject(key);
- }
- }
-
- return messageIds;
- }
- catch (DatabaseException e)
- {
- throw new StoreException("Database error: " + e.getMessage(), e);
- }
- finally
- {
- closeCursorSafely(cursor);
- }
- }
-
- /**
* Return a valid, currently unused message id.
*
* @return A fresh message id.
@@ -1792,12 +1607,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
}
- @Override
- public String getStoreType()
- {
- return _type;
- }
-
private Database getConfiguredObjectsDb()
{
return _environmentFacade.getOpenDatabase(CONFIGURED_OBJECTS_DB_NAME);
@@ -1901,4 +1710,147 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
}
+
+ public class MaxMessageIdTask implements EnvironmentFacadeTask, MessageHandler
+ {
+ private long _maxId;
+
+ @Override
+ public void execute(EnvironmentFacade facade)
+ {
+ visitMessagesInternal(this, facade);
+ _messageId.set(_maxId);
+ }
+
+ @Override
+ public boolean handle(StoredMessage<?> storedMessage)
+ {
+ long id = storedMessage.getMessageNumber();
+ if (_maxId<id)
+ {
+ _maxId = id;
+ }
+ return true;
+ }
+
+ }
+
+ @Override
+ public void visitMessages(MessageHandler handler) throws StoreException
+ {
+ checkMessageStoreOpen();
+ visitMessagesInternal(handler, _environmentFacade);
+ }
+
+ private void visitMessagesInternal(MessageHandler handler, EnvironmentFacade environmentFacade)
+ {
+ Cursor cursor = null;
+ try
+ {
+ cursor = environmentFacade.getOpenDatabase(MESSAGE_META_DATA_DB_NAME).openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+ MessageMetaDataBinding valueBinding = MessageMetaDataBinding.getInstance();
+
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ long messageId = LongBinding.entryToLong(key);
+ StorableMessageMetaData metaData = valueBinding.entryToObject(value);
+ StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true);
+
+ if (!handler.handle(message))
+ {
+ break;
+ }
+ }
+ }
+ catch (DatabaseException e)
+ {
+ throw environmentFacade.handleDatabaseException("Cannot recover messages", e);
+ }
+ finally
+ {
+ if (cursor != null)
+ {
+ try
+ {
+ cursor.close();
+ }
+ catch(DatabaseException e)
+ {
+ throw environmentFacade.handleDatabaseException("Cannot close cursor", e);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
+ {
+ checkMessageStoreOpen();
+
+ Cursor cursor = null;
+ try
+ {
+ cursor = getDeliveryDb().openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
+
+ DatabaseEntry value = new DatabaseEntry();
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ QueueEntryKey entry = keyBinding.entryToObject(key);
+ UUID queueId = entry.getQueueId();
+ long messageId = entry.getMessageId();
+ if (!handler.handle(queueId, messageId))
+ {
+ break;
+ }
+ }
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Cannot visit message instances", e);
+ }
+ finally
+ {
+ closeCursorSafely(cursor);
+ }
+ }
+
+ @Override
+ public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
+ {
+ checkMessageStoreOpen();
+
+ Cursor cursor = null;
+ try
+ {
+ cursor = getXidDb().openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ XidBinding keyBinding = XidBinding.getInstance();
+ PreparedTransactionBinding valueBinding = new PreparedTransactionBinding();
+ DatabaseEntry value = new DatabaseEntry();
+
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ Xid xid = keyBinding.entryToObject(key);
+ PreparedTransaction preparedTransaction = valueBinding.entryToObject(value);
+ if (!handler.handle(xid.getFormat(),xid.getGlobalId(),xid.getBranchId(),
+ preparedTransaction.getEnqueues(),preparedTransaction.getDequeues()))
+ {
+ break;
+ }
+ }
+
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Cannot recover distributed transactions", e);
+ }
+ finally
+ {
+ closeCursorSafely(cursor);
+ }
+ }
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/XidBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/XidBinding.java
index 01a5b75fef..5918e5ab54 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/XidBinding.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/XidBinding.java
@@ -25,7 +25,7 @@ import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
-import org.apache.qpid.server.store.berkeleydb.entry.Xid;
+import org.apache.qpid.server.store.Xid;
public class XidBinding extends TupleBinding<Xid>
{
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
index 465c49e0c4..6fba1b215e 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
@@ -20,14 +20,11 @@
*/
package org.apache.qpid.server.store.berkeleydb;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
import java.io.File;
import java.nio.ByteBuffer;
import java.util.Arrays;
-import java.util.List;
-import java.util.UUID;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -35,25 +32,15 @@ import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.message.EnqueueableMessage;
-import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.protocol.v0_10.MessageMetaDataType_0_10;
import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10;
import org.apache.qpid.server.protocol.v0_8.MessageMetaData;
import org.apache.qpid.server.protocol.v0_8.MessageMetaDataType_0_8;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
-import org.apache.qpid.server.store.MessageStoreTest;
+import org.apache.qpid.server.store.MessageStoreTestCase;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.Transaction;
-import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageAcceptMode;
@@ -62,15 +49,31 @@ import org.apache.qpid.transport.MessageDeliveryMode;
import org.apache.qpid.transport.MessageDeliveryPriority;
import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.MessageTransfer;
+import org.apache.qpid.util.FileUtils;
/**
- * Subclass of MessageStoreTest which runs the standard tests from the superclass against
+ * Subclass of MessageStoreTestCase which runs the standard tests from the superclass against
* the BDB Store as well as additional tests specific to the BDB store-implementation.
*/
-public class BDBMessageStoreTest extends MessageStoreTest
+public class BDBMessageStoreTest extends MessageStoreTestCase
{
private static byte[] CONTENT_BYTES = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
+ private String _storeLocation;
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ super.tearDown();
+ }
+ finally
+ {
+ deleteStoreIfExists();
+ }
+ }
+
/**
* Tests that message metadata and content are successfully read back from a
* store after it has been reloaded. Both 0-8 and 0-10 metadata is used to
@@ -78,9 +81,7 @@ public class BDBMessageStoreTest extends MessageStoreTest
*/
public void testBDBMessagePersistence() throws Exception
{
- MessageStore store = getVirtualHost().getMessageStore();
-
- BDBMessageStore bdbStore = assertBDBStore(store);
+ BDBMessageStore bdbStore = (BDBMessageStore)getStore();
// Create content ByteBuffers.
// Split the content into 2 chunks for the 0-8 message, as per broker behaviour.
@@ -133,12 +134,13 @@ public class BDBMessageStoreTest extends MessageStoreTest
/*
* reload the store only (read-only)
*/
- BDBMessageStore readOnlyStore = reloadStore(bdbStore);
+ reopenStore();
/*
* Read back and validate the 0-8 message metadata and content
*/
- StorableMessageMetaData storeableMMD_0_8 = readOnlyStore.getMessageMetaData(messageid_0_8);
+ BDBMessageStore reopenedBdbStore = (BDBMessageStore) getStore();
+ StorableMessageMetaData storeableMMD_0_8 = reopenedBdbStore.getMessageMetaData(messageid_0_8);
assertEquals("Unexpected message type", MessageMetaDataType_0_8.TYPE, storeableMMD_0_8.getType().ordinal());
assertTrue("Unexpected instance type", storeableMMD_0_8 instanceof MessageMetaData);
@@ -162,7 +164,7 @@ public class BDBMessageStoreTest extends MessageStoreTest
assertEquals("Property MessageID has changed", props_0_8.getMessageIdAsString(), returnedProperties_0_8.getMessageIdAsString());
ByteBuffer recoveredContent_0_8 = ByteBuffer.allocate((int) chb_0_8.getBodySize()) ;
- long recoveredCount_0_8 = readOnlyStore.getContent(messageid_0_8, 0, recoveredContent_0_8);
+ long recoveredCount_0_8 = reopenedBdbStore.getContent(messageid_0_8, 0, recoveredContent_0_8);
assertEquals("Incorrect amount of payload data recovered", chb_0_8.getBodySize(), recoveredCount_0_8);
String returnedPayloadString_0_8 = new String(recoveredContent_0_8.array());
assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_8);
@@ -170,7 +172,7 @@ public class BDBMessageStoreTest extends MessageStoreTest
/*
* Read back and validate the 0-10 message metadata and content
*/
- StorableMessageMetaData storeableMMD_0_10 = readOnlyStore.getMessageMetaData(messageid_0_10);
+ StorableMessageMetaData storeableMMD_0_10 = reopenedBdbStore.getMessageMetaData(messageid_0_10);
assertEquals("Unexpected message type", MessageMetaDataType_0_10.TYPE, storeableMMD_0_10.getType().ordinal());
assertTrue("Unexpected instance type", storeableMMD_0_10 instanceof MessageMetaData_0_10);
@@ -193,13 +195,13 @@ public class BDBMessageStoreTest extends MessageStoreTest
assertEquals("Message content type has changed", msgProps_0_10.getContentType(), returnedMsgProps.getContentType());
ByteBuffer recoveredContent = ByteBuffer.allocate((int) msgProps_0_10.getContentLength()) ;
- long recoveredCount = readOnlyStore.getContent(messageid_0_10, 0, recoveredContent);
+ long recoveredCount = reopenedBdbStore.getContent(messageid_0_10, 0, recoveredContent);
assertEquals("Incorrect amount of payload data recovered", msgProps_0_10.getContentLength(), recoveredCount);
String returnedPayloadString_0_10 = new String(recoveredContent.array());
assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_10);
- readOnlyStore.closeMessageStore();
+ reopenedBdbStore.closeMessageStore();
}
private DeliveryProperties createDeliveryProperties_0_10()
@@ -226,28 +228,6 @@ public class BDBMessageStoreTest extends MessageStoreTest
return msgProps_0_10;
}
- /**
- * Close the provided store and create a new (read-only) store to read back the data.
- *
- * Use this method instead of reloading the virtual host like other tests in order
- * to avoid the recovery handler deleting the message for not being on a queue.
- */
- private BDBMessageStore reloadStore(BDBMessageStore messageStore) throws Exception
- {
- messageStore.closeMessageStore();
-
-
- BDBMessageStore newStore = new BDBMessageStore();
-
- MessageStoreRecoveryHandler recoveryHandler = mock(MessageStoreRecoveryHandler.class);
- when(recoveryHandler.begin()).thenReturn(mock(StoredMessageRecoveryHandler.class));
- VirtualHost<?> virtualHost = getVirtualHostModel();
- newStore.openMessageStore(virtualHost, virtualHost.getMessageStoreSettings());
-
- newStore.recoverMessageStore(recoveryHandler, null);
-
- return newStore;
- }
private MessagePublishInfo createPublishInfoBody_0_8()
{
@@ -258,20 +238,24 @@ public class BDBMessageStoreTest extends MessageStoreTest
return new AMQShortString("exchange12345");
}
+ @Override
public void setExchange(AMQShortString exchange)
{
}
+ @Override
public boolean isImmediate()
{
return false;
}
+ @Override
public boolean isMandatory()
{
return true;
}
+ @Override
public AMQShortString getRoutingKey()
{
return new AMQShortString("routingKey12345");
@@ -298,9 +282,8 @@ public class BDBMessageStoreTest extends MessageStoreTest
public void testGetContentWithOffset() throws Exception
{
- MessageStore store = getVirtualHost().getMessageStore();
- BDBMessageStore bdbStore = assertBDBStore(store);
- StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(store);
+ BDBMessageStore bdbStore = (BDBMessageStore) getStore();
+ StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(bdbStore);
long messageid_0_8 = storedMessage_0_8.getMessageNumber();
// normal case: offset is 0
@@ -350,6 +333,7 @@ public class BDBMessageStoreTest extends MessageStoreTest
System.arraycopy(CONTENT_BYTES, 2, expected, 0, 5);
assertTrue("Unexpected content", Arrays.equals(expected, array));
}
+
/**
* Tests that messages which are added to the store and then removed using the
* public MessageStore interfaces are actually removed from the store by then
@@ -358,10 +342,9 @@ public class BDBMessageStoreTest extends MessageStoreTest
*/
public void testMessageCreationAndRemoval() throws Exception
{
- MessageStore store = getVirtualHost().getMessageStore();
- BDBMessageStore bdbStore = assertBDBStore(store);
+ BDBMessageStore bdbStore = (BDBMessageStore)getStore();
- StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(store);
+ StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(bdbStore);
long messageid_0_8 = storedMessage_0_8.getMessageNumber();
bdbStore.removeMessage(messageid_0_8, true);
@@ -384,13 +367,6 @@ public class BDBMessageStoreTest extends MessageStoreTest
assertEquals("Retrieved content when none was expected",
0, bdbStore.getContent(messageid_0_8, 0, dst));
}
- private BDBMessageStore assertBDBStore(MessageStore store)
- {
-
- assertEquals("Test requires an instance of BDBMessageStore to proceed", BDBMessageStore.class, store.getClass());
-
- return (BDBMessageStore) store;
- }
private StoredMessage<MessageMetaData> createAndStoreSingleChunkMessage_0_8(MessageStore store)
{
@@ -413,254 +389,48 @@ public class BDBMessageStoreTest extends MessageStoreTest
return storedMessage_0_8;
}
- /**
- * Tests transaction commit by utilising the enqueue and dequeue methods available
- * in the TransactionLog interface implemented by the store, and verifying the
- * behaviour using BDB implementation methods.
- */
- public void testTranCommit() throws Exception
- {
- MessageStore log = getVirtualHost().getMessageStore();
-
- BDBMessageStore bdbStore = assertBDBStore(log);
-
- final UUID mockQueueId = UUIDGenerator.generateRandomUUID();
- TransactionLogResource mockQueue = new TransactionLogResource()
- {
- @Override
- public String getName()
- {
- return getId().toString();
- }
-
- @Override
- public UUID getId()
- {
- return mockQueueId;
- }
-
- @Override
- public boolean isDurable()
- {
- return true;
- }
- };
-
- Transaction txn = log.newTransaction();
-
- txn.enqueueMessage(mockQueue, new MockMessage(1L));
- txn.enqueueMessage(mockQueue, new MockMessage(5L));
- txn.commitTran();
-
- List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueId);
-
- assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
- Long val = enqueuedIds.get(0);
- assertEquals("First Message is incorrect", 1L, val.longValue());
- val = enqueuedIds.get(1);
- assertEquals("Second Message is incorrect", 5L, val.longValue());
- }
-
-
- /**
- * Tests transaction rollback before a commit has occurred by utilising the
- * enqueue and dequeue methods available in the TransactionLog interface
- * implemented by the store, and verifying the behaviour using BDB
- * implementation methods.
- */
- public void testTranRollbackBeforeCommit() throws Exception
- {
- MessageStore log = getVirtualHost().getMessageStore();
-
- BDBMessageStore bdbStore = assertBDBStore(log);
-
- final UUID mockQueueId = UUIDGenerator.generateRandomUUID();
- TransactionLogResource mockQueue = new TransactionLogResource()
- {
- @Override
- public String getName()
- {
- return getId().toString();
- }
-
- @Override
- public UUID getId()
- {
- return mockQueueId;
- }
-
- @Override
- public boolean isDurable()
- {
- return true;
- }
- };
-
- Transaction txn = log.newTransaction();
-
- txn.enqueueMessage(mockQueue, new MockMessage(21L));
- txn.abortTran();
-
- txn = log.newTransaction();
- txn.enqueueMessage(mockQueue, new MockMessage(22L));
- txn.enqueueMessage(mockQueue, new MockMessage(23L));
- txn.commitTran();
-
- List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueId);
-
- assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
- Long val = enqueuedIds.get(0);
- assertEquals("First Message is incorrect", 22L, val.longValue());
- val = enqueuedIds.get(1);
- assertEquals("Second Message is incorrect", 23L, val.longValue());
- }
-
public void testOnDelete() throws Exception
{
- MessageStore log = getVirtualHost().getMessageStore();
- BDBMessageStore bdbStore = assertBDBStore(log);
- String storeLocation = bdbStore.getStoreLocation();
+ String storeLocation = getStore().getStoreLocation();
File location = new File(storeLocation);
assertTrue("Store does not exist at " + storeLocation, location.exists());
- bdbStore.closeMessageStore();
+ getStore().closeMessageStore();
assertTrue("Store does not exist at " + storeLocation, location.exists());
- bdbStore.onDelete();
+ getStore().onDelete();
assertFalse("Store exists at " + storeLocation, location.exists());
}
- /**
- * Tests transaction rollback after a commit has occurred by utilising the
- * enqueue and dequeue methods available in the TransactionLog interface
- * implemented by the store, and verifying the behaviour using BDB
- * implementation methods.
- */
- public void testTranRollbackAfterCommit() throws Exception
+
+ @Override
+ protected Map<String, Object> getStoreSettings() throws Exception
{
- MessageStore log = getVirtualHost().getMessageStore();
+ _storeLocation = TMP_FOLDER + File.separator + getTestName();
+ deleteStoreIfExists();
+ Map<String, Object> messageStoreSettings = new HashMap<String, Object>();
+ messageStoreSettings.put(MessageStore.STORE_PATH, _storeLocation);
+ return messageStoreSettings;
- BDBMessageStore bdbStore = assertBDBStore(log);
+ }
- final UUID mockQueueId = UUIDGenerator.generateRandomUUID();
- TransactionLogResource mockQueue = new TransactionLogResource()
+ private void deleteStoreIfExists()
+ {
+ if (_storeLocation != null)
{
- @Override
- public String getName()
+ File location = new File(_storeLocation);
+ if (location.exists())
{
- return getId().toString();
+ FileUtils.delete(location, true);
}
-
- @Override
- public UUID getId()
- {
- return mockQueueId;
- }
-
- @Override
- public boolean isDurable()
- {
- return true;
- }
- };
-
- Transaction txn = log.newTransaction();
-
- txn.enqueueMessage(mockQueue, new MockMessage(30L));
- txn.commitTran();
-
- txn = log.newTransaction();
- txn.enqueueMessage(mockQueue, new MockMessage(31L));
- txn.abortTran();
-
- txn = log.newTransaction();
- txn.enqueueMessage(mockQueue, new MockMessage(32L));
- txn.commitTran();
-
- List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueId);
-
- assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
- Long val = enqueuedIds.get(0);
- assertEquals("First Message is incorrect", 30L, val.longValue());
- val = enqueuedIds.get(1);
- assertEquals("Second Message is incorrect", 32L, val.longValue());
+ }
}
- @SuppressWarnings("rawtypes")
- private static class MockMessage implements ServerMessage, EnqueueableMessage
+ @Override
+ protected MessageStore createMessageStore()
{
- private long _messageId;
-
- public MockMessage(long messageId)
- {
- _messageId = messageId;
- }
-
- public String getInitialRoutingAddress()
- {
- return null;
- }
-
- public AMQMessageHeader getMessageHeader()
- {
- return null;
- }
-
- public StoredMessage getStoredMessage()
- {
- return null;
- }
-
- public boolean isPersistent()
- {
- return true;
- }
-
- public long getSize()
- {
- return 0;
- }
-
- public boolean isImmediate()
- {
- return false;
- }
-
- public long getExpiration()
- {
- return 0;
- }
-
- public MessageReference newReference()
- {
- return null;
- }
-
- public long getMessageNumber()
- {
- return _messageId;
- }
-
- public long getArrivalTime()
- {
- return 0;
- }
-
- public int getContent(ByteBuffer buf, int offset)
- {
- return 0;
- }
-
- public ByteBuffer getContent(int offset, int length)
- {
- return null;
- }
-
- @Override
- public Object getConnectionReference()
- {
- return null;
- }
+ return new BDBMessageStore();
}
+
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java
index 0460b1ce4c..717534a6b8 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java
@@ -48,7 +48,7 @@ import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.QueueArgumentsConverter;
-import org.apache.qpid.server.store.berkeleydb.entry.Xid;
+import org.apache.qpid.server.store.Xid;
import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding;
import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CompoundKey;
import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CompoundKeyBinding;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/BrokerStoreUpgrader.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/BrokerStoreUpgrader.java
index 1446cca156..b7b672fd58 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/BrokerStoreUpgrader.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/BrokerStoreUpgrader.java
@@ -35,12 +35,12 @@ import org.apache.qpid.server.configuration.store.StoreConfigurationChangeListen
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Model;
import org.apache.qpid.server.model.SystemContext;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.ConfiguredObjectRecordImpl;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.DurableConfigurationStoreUpgrader;
import org.apache.qpid.server.store.NonNullUpgrader;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
public class BrokerStoreUpgrader
{
@@ -583,17 +583,17 @@ public class BrokerStoreUpgrader
public Broker upgrade(DurableConfigurationStore store)
{
- final BrokerStoreRecoveryHandler recoveryHandler = new BrokerStoreRecoveryHandler(_systemContext);
+ final BrokerStoreRecoveryHandler recoveryHandler = new BrokerStoreRecoveryHandler(_systemContext, store);
store.openConfigurationStore(_systemContext, Collections.<String,Object>emptyMap());
- store.recoverConfigurationStore(recoveryHandler);
+ store.visitConfiguredObjectRecords(recoveryHandler);
return recoveryHandler.getBroker();
}
- private static class BrokerStoreRecoveryHandler implements ConfigurationRecoveryHandler
+ private static class BrokerStoreRecoveryHandler implements ConfiguredObjectRecordHandler
{
- private static Logger LOGGER = Logger.getLogger(ConfigurationRecoveryHandler.class);
+ private static Logger LOGGER = Logger.getLogger(BrokerStoreRecoveryHandler.class);
private DurableConfigurationStoreUpgrader _upgrader;
private DurableConfigurationStore _store;
@@ -601,27 +601,28 @@ public class BrokerStoreUpgrader
private int _version;
private final SystemContext _systemContext;
- private BrokerStoreRecoveryHandler(final SystemContext systemContext)
+ private BrokerStoreRecoveryHandler(final SystemContext systemContext, DurableConfigurationStore store)
{
_systemContext = systemContext;
+ _store = store;
}
@Override
- public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion)
+ public void begin(final int configVersion)
{
- _store = store;
_version = configVersion;
}
@Override
- public void configuredObject(final ConfiguredObjectRecord object)
+ public boolean handle(final ConfiguredObjectRecord object)
{
_records.put(object.getId(), object);
+ return true;
}
@Override
- public int completeConfigurationRecovery()
+ public int end()
{
String version = getCurrentVersion();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/JsonConfigurationEntryStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/JsonConfigurationEntryStore.java
index 7024068099..59f248c9f5 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/JsonConfigurationEntryStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/JsonConfigurationEntryStore.java
@@ -20,14 +20,6 @@
*/
package org.apache.qpid.server.configuration.store;
-import org.apache.qpid.server.configuration.ConfigurationEntry;
-import org.apache.qpid.server.configuration.IllegalConfigurationException;
-import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
-import org.apache.qpid.server.store.ConfiguredObjectRecord;
-import org.apache.qpid.server.store.DurableConfigurationStore;
-import org.apache.qpid.server.store.StoreException;
-
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
@@ -35,6 +27,14 @@ import java.util.Collections;
import java.util.Map;
import java.util.UUID;
+import org.apache.qpid.server.configuration.ConfigurationEntry;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
+import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+
public class JsonConfigurationEntryStore extends MemoryConfigurationEntryStore
{
public static final String STORE_TYPE = "json";
@@ -124,30 +124,31 @@ public class JsonConfigurationEntryStore extends MemoryConfigurationEntryStore
else
{
final Collection<ConfiguredObjectRecord> records = new ArrayList<ConfiguredObjectRecord>();
- final ConfigurationRecoveryHandler replayHandler = new ConfigurationRecoveryHandler()
+ final ConfiguredObjectRecordHandler replayHandler = new ConfiguredObjectRecordHandler()
{
private int _configVersion;
@Override
- public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion)
+ public void begin(final int configVersion)
{
_configVersion = configVersion;
}
@Override
- public void configuredObject(ConfiguredObjectRecord record)
+ public boolean handle(ConfiguredObjectRecord record)
{
records.add(record);
+ return true;
}
@Override
- public int completeConfigurationRecovery()
+ public int end()
{
return _configVersion;
}
};
initialStore.openConfigurationStore(_parentObject, Collections.<String,Object>emptyMap());
- initialStore.recoverConfigurationStore(replayHandler);
+ initialStore.visitConfiguredObjectRecords(replayHandler);
update(true, records.toArray(new ConfiguredObjectRecord[records.size()]));
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandler.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandler.java
index 21fffea80f..cdf44822ef 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandler.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandler.java
@@ -38,11 +38,11 @@ import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.ConfiguredObjectRecordImpl;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
import org.apache.qpid.server.util.MapValueConverter;
public class ManagementModeStoreHandler implements DurableConfigurationStore
@@ -80,20 +80,21 @@ public class ManagementModeStoreHandler implements DurableConfigurationStore
_records = new HashMap<UUID, ConfiguredObjectRecord>();
- final ConfigurationRecoveryHandler localRecoveryHandler = new ConfigurationRecoveryHandler()
+ final ConfiguredObjectRecordHandler localRecoveryHandler = new ConfiguredObjectRecordHandler()
{
private int _version;
private boolean _quiesceRmiPort = _options.getManagementModeRmiPortOverride() > 0;
private boolean _quiesceJmxPort = _options.getManagementModeJmxPortOverride() > 0;
private boolean _quiesceHttpPort = _options.getManagementModeHttpPortOverride() > 0;
+
@Override
- public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion)
+ public void begin(final int configVersion)
{
_version = configVersion;
}
@Override
- public void configuredObject(final ConfiguredObjectRecord object)
+ public boolean handle(final ConfiguredObjectRecord object)
{
String entryType = object.getType();
Map<String, Object> attributes = object.getAttributes();
@@ -153,11 +154,12 @@ public class ManagementModeStoreHandler implements DurableConfigurationStore
{
_records.put(object.getId(), object);
}
+ return true;
}
@Override
- public int completeConfigurationRecovery()
+ public int end()
{
return _version;
}
@@ -166,7 +168,7 @@ public class ManagementModeStoreHandler implements DurableConfigurationStore
- _store.recoverConfigurationStore(localRecoveryHandler);
+ _store.visitConfiguredObjectRecords(localRecoveryHandler);
_cliEntries = createPortsFromCommandLineOptions(_options);
@@ -179,17 +181,20 @@ public class ManagementModeStoreHandler implements DurableConfigurationStore
}
@Override
- public void recoverConfigurationStore(final ConfigurationRecoveryHandler recoveryHandler) throws StoreException
+ public void visitConfiguredObjectRecords(final ConfiguredObjectRecordHandler recoveryHandler) throws StoreException
{
- recoveryHandler.beginConfigurationRecovery(this,0);
+ recoveryHandler.begin(0);
for(ConfiguredObjectRecord record : _records.values())
{
- recoveryHandler.configuredObject(record);
+ if(!recoveryHandler.handle(record))
+ {
+ break;
+ }
}
- recoveryHandler.completeConfigurationRecovery();
+ recoveryHandler.end();
}
@@ -357,16 +362,16 @@ public class ManagementModeStoreHandler implements DurableConfigurationStore
final int managementModeJmxPortOverride = options.getManagementModeJmxPortOverride();
final int managementModeHttpPortOverride = options.getManagementModeHttpPortOverride();
- _store.recoverConfigurationStore(new ConfigurationRecoveryHandler()
+ _store.visitConfiguredObjectRecords(new ConfiguredObjectRecordHandler()
{
@Override
- public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion)
+ public void begin(final int configVersion)
{
}
@Override
- public void configuredObject(final ConfiguredObjectRecord entry)
+ public boolean handle(final ConfiguredObjectRecord entry)
{
String entryType = entry.getType();
Map<String, Object> attributes = entry.getAttributes();
@@ -417,11 +422,12 @@ public class ManagementModeStoreHandler implements DurableConfigurationStore
// save original state
quiescedEntries.put(entry.getId(), attributes.get(ATTRIBUTE_STATE));
}
+ return true;
}
@Override
- public int completeConfigurationRecovery()
+ public int end()
{
return 0;
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java
index b4f095b51e..d534814410 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java
@@ -58,10 +58,9 @@ import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Model;
import org.apache.qpid.server.model.SystemContext;
import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
-import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
import org.apache.qpid.util.Strings;
import org.apache.qpid.util.Strings.ChainedResolver;
@@ -128,30 +127,31 @@ public class MemoryConfigurationEntryStore implements ConfigurationEntryStore
_storeLocation = initialStore.getStoreLocation();
}
final Collection<ConfiguredObjectRecord> records = new ArrayList<ConfiguredObjectRecord>();
- final ConfigurationRecoveryHandler replayHandler = new ConfigurationRecoveryHandler()
+ final ConfiguredObjectRecordHandler replayHandler = new ConfiguredObjectRecordHandler()
{
private int _configVersion;
@Override
- public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion)
+ public void begin(final int configVersion)
{
_configVersion = configVersion;
}
@Override
- public void configuredObject(ConfiguredObjectRecord record)
+ public boolean handle(ConfiguredObjectRecord record)
{
records.add(record);
+ return true;
}
@Override
- public int completeConfigurationRecovery()
+ public int end()
{
return _configVersion;
}
};
initialStore.openConfigurationStore(parentObject, Collections.<String,Object>emptyMap());
- initialStore.recoverConfigurationStore(replayHandler);
+ initialStore.visitConfiguredObjectRecords(replayHandler);
update(true, records.toArray(new ConfiguredObjectRecord[records.size()]));
@@ -365,10 +365,10 @@ public class MemoryConfigurationEntryStore implements ConfigurationEntryStore
}
@Override
- public void recoverConfigurationStore(final ConfigurationRecoveryHandler recoveryHandler) throws StoreException
+ public void visitConfiguredObjectRecords(final ConfiguredObjectRecordHandler recoveryHandler) throws StoreException
{
- recoveryHandler.beginConfigurationRecovery(this,0);
+ recoveryHandler.begin(0);
final Map<UUID,Map<String,UUID>> parentMap = new HashMap<UUID, Map<String, UUID>>();
@@ -435,9 +435,12 @@ public class MemoryConfigurationEntryStore implements ConfigurationEntryStore
}
for(ConfiguredObjectRecord record : records.values())
{
- recoveryHandler.configuredObject(record);
+ if(!recoveryHandler.handle(record))
+ {
+ break;
+ }
}
- recoveryHandler.completeConfigurationRecovery();
+ recoveryHandler.end();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index e7b6adaf7a..6be5460d5f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -49,7 +49,10 @@ import org.apache.log4j.Logger;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.plugin.MessageMetaDataType;
-import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.JsonParseException;
@@ -221,19 +224,125 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
@Override
- public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler)
+ public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler)
{
checkConfigurationStoreOpen();
try
{
- recoveryHandler.beginConfigurationRecovery(this, getConfigVersion());
- loadConfiguredObjects(recoveryHandler);
- setConfigVersion(recoveryHandler.completeConfigurationRecovery());
+ int configVersion = getConfigVersion();
+
+ handler.begin(configVersion);
+ doVisitAllConfiguredObjectRecords(handler);
+
+ int newConfigVersion = handler.end();
+ if(newConfigVersion != configVersion)
+ {
+ setConfigVersion(newConfigVersion);
+ }
}
catch (SQLException e)
{
- throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+ throw new StoreException("Cannot visit configured object records", e);
+ }
+
+ }
+
+ private void doVisitAllConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws SQLException
+ {
+ Connection conn = newAutoCommitConnection();
+ Map<UUID, ConfiguredObjectRecordImpl> configuredObjects = new HashMap<UUID, ConfiguredObjectRecordImpl>();
+ final ObjectMapper objectMapper = new ObjectMapper();
+ try
+ {
+ PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS);
+ try
+ {
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+ while (rs.next())
+ {
+ String id = rs.getString(1);
+ String objectType = rs.getString(2);
+ String attributes = getBlobAsString(rs, 3);
+ final ConfiguredObjectRecordImpl configuredObjectRecord =
+ new ConfiguredObjectRecordImpl(UUID.fromString(id), objectType,
+ objectMapper.readValue(attributes, Map.class));
+ configuredObjects.put(configuredObjectRecord.getId(),configuredObjectRecord);
+
+ }
+ }
+ catch (JsonMappingException e)
+ {
+ throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+ catch (JsonParseException e)
+ {
+ throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+ catch (IOException e)
+ {
+ throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+ stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECT_HIERARCHY);
+ try
+ {
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+ while (rs.next())
+ {
+ UUID childId = UUID.fromString(rs.getString(1));
+ String parentType = rs.getString(2);
+ UUID parentId = UUID.fromString(rs.getString(3));
+
+ ConfiguredObjectRecordImpl child = configuredObjects.get(childId);
+ ConfiguredObjectRecordImpl parent = configuredObjects.get(parentId);
+
+ if(child != null && parent != null)
+ {
+ child.addParent(parentType, parent);
+ }
+ else if(child != null && child.getType().endsWith("Binding") && parentType.equals("Exchange"))
+ {
+ // TODO - remove this hack for amq. exchanges
+ child.addParent(parentType, new ConfiguredObjectRecordImpl(parentId, parentType, Collections.<String,Object>emptyMap()));
+ }
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ }
+ finally
+ {
+ conn.close();
+ }
+
+ for(ConfiguredObjectRecord record : configuredObjects.values())
+ {
+ boolean shoudlContinue = handler.handle(record);
+ if (!shoudlContinue)
+ {
+ break;
+ }
}
}
@@ -282,44 +391,25 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
{
createOrOpenMessageStoreDatabase();
upgradeIfNecessary(parent);
- }
- catch (SQLException e)
- {
- throw new StoreException("Unable to activate message store ", e);
- }
- }
- }
- @Override
- public void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler)
- {
- checkMessageStoreOpen();
-
- if(messageRecoveryHandler != null)
- {
- try
- {
- recoverMessages(messageRecoveryHandler);
- }
- catch (SQLException e)
- {
- throw new StoreException("Error encountered when restoring message data from " +
- "persistent store ", e);
- }
- }
- if(transactionLogRecoveryHandler != null)
- {
- try
- {
- TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = recoverQueueEntries(transactionLogRecoveryHandler);
- recoverXids(dtxrh);
+ visitMessages(new MessageHandler()
+ {
+ @Override
+ public boolean handle(StoredMessage<?> storedMessage)
+ {
+ long id = storedMessage.getMessageNumber();
+ if (_messageId.get() < id)
+ {
+ _messageId.set(id);
+ }
+ return true;
+ }
+ });
}
catch (SQLException e)
{
- throw new StoreException("Error encountered when restoring distributed transaction " +
- "data from persistent store ", e);
+ throw new StoreException("Unable to activate message store ", e);
}
-
}
}
@@ -1043,11 +1133,9 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
getLogger().debug("Enqueuing message "
+ messageId
+ " on queue "
- + (queue instanceof AMQQueue
- ? ((AMQQueue) queue).getName()
- : "")
- + queue.getId()
- + "[Connection"
+ + queue.getName()
+ + " with id " + queue.getId()
+ + " [Connection"
+ conn
+ "]");
}
@@ -1068,7 +1156,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
catch (SQLException e)
{
getLogger().error("Failed to enqueue: " + e.getMessage(), e);
- throw new StoreException("Error writing enqueued message with id " + messageId + " for queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" ) + " with id " + queue.getId()
+ throw new StoreException("Error writing enqueued message with id " + messageId + " for queue " + queue.getName() + " with id " + queue.getId()
+ " to database", e);
}
@@ -1093,15 +1181,13 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
if(results != 1)
{
- throw new StoreException("Unable to find message with id " + messageId + " on queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" )
+ throw new StoreException("Unable to find message with id " + messageId + " on queue " + queue.getName()
+ " with id " + queue.getId());
}
if (getLogger().isDebugEnabled())
{
- getLogger().debug("Dequeuing message " + messageId + " on queue " + (queue instanceof AMQQueue
- ? ((AMQQueue) queue).getName()
- : "")
+ getLogger().debug("Dequeuing message " + messageId + " on queue " + queue.getName()
+ " with id " + queue.getId());
}
}
@@ -1114,7 +1200,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
catch (SQLException e)
{
getLogger().error("Failed to dequeue: " + e.getMessage(), e);
- throw new StoreException("Error deleting enqueued message with id " + messageId + " for queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" )
+ throw new StoreException("Error deleting enqueued message with id " + messageId + " for queue " + queue.getName()
+ " with id " + queue.getId() + " from database", e);
}
@@ -1363,131 +1449,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
- private void recoverMessages(MessageStoreRecoveryHandler recoveryHandler) throws SQLException
- {
- Connection conn = newAutoCommitConnection();
- try
- {
- MessageStoreRecoveryHandler.StoredMessageRecoveryHandler messageHandler = recoveryHandler.begin();
-
- Statement stmt = conn.createStatement();
- try
- {
- ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_META_DATA);
- try
- {
-
- long maxId = 0;
-
- while(rs.next())
- {
-
- long messageId = rs.getLong(1);
- if(messageId > maxId)
- {
- maxId = messageId;
- }
-
- byte[] dataAsBytes = getBlobAsBytes(rs, 2);
-
- ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
- buf.position(1);
- buf = buf.slice();
- MessageMetaDataType type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]);
- StorableMessageMetaData metaData = type.createMetaData(buf);
- StoredJDBCMessage message = new StoredJDBCMessage(messageId, metaData, true);
- messageHandler.message(message);
- }
-
- _messageId.set(maxId);
-
- messageHandler.completeMessageRecovery();
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
- }
- finally
- {
- conn.close();
- }
- }
-
-
- private TransactionLogRecoveryHandler.DtxRecordRecoveryHandler recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) throws SQLException
- {
- Connection conn = newAutoCommitConnection();
- try
- {
- TransactionLogRecoveryHandler.QueueEntryRecoveryHandler queueEntryHandler = recoveryHandler.begin(this);
-
- Statement stmt = conn.createStatement();
- try
- {
- ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY);
- try
- {
- while(rs.next())
- {
-
- String id = rs.getString(1);
- long messageId = rs.getLong(2);
- queueEntryHandler.queueEntry(UUID.fromString(id), messageId);
- }
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
-
- return queueEntryHandler.completeQueueEntryRecovery();
- }
- finally
- {
- conn.close();
- }
- }
-
- private static final class Xid
- {
-
- private final long _format;
- private final byte[] _globalId;
- private final byte[] _branchId;
-
- public Xid(long format, byte[] globalId, byte[] branchId)
- {
- _format = format;
- _globalId = globalId;
- _branchId = branchId;
- }
-
- public long getFormat()
- {
- return _format;
- }
-
- public byte[] getGlobalId()
- {
- return _globalId;
- }
-
- public byte[] getBranchId()
- {
- return _branchId;
- }
- }
private static class RecordImpl implements Transaction.Record, TransactionLogResource, EnqueueableMessage
{
@@ -1550,93 +1511,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
}
- private void recoverXids(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh) throws SQLException
- {
- Connection conn = newAutoCommitConnection();
- try
- {
- List<Xid> xids = new ArrayList<Xid>();
-
- Statement stmt = conn.createStatement();
- try
- {
- ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_XIDS);
- try
- {
- while(rs.next())
- {
-
- long format = rs.getLong(1);
- byte[] globalId = rs.getBytes(2);
- byte[] branchId = rs.getBytes(3);
- xids.add(new Xid(format, globalId, branchId));
- }
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
-
-
-
- for(Xid xid : xids)
- {
- List<RecordImpl> enqueues = new ArrayList<RecordImpl>();
- List<RecordImpl> dequeues = new ArrayList<RecordImpl>();
-
- PreparedStatement pstmt = conn.prepareStatement(SELECT_ALL_FROM_XID_ACTIONS);
-
- try
- {
- pstmt.setLong(1, xid.getFormat());
- pstmt.setBytes(2, xid.getGlobalId());
- pstmt.setBytes(3, xid.getBranchId());
-
- ResultSet rs = pstmt.executeQuery();
- try
- {
- while(rs.next())
- {
-
- String actionType = rs.getString(1);
- UUID queueId = UUID.fromString(rs.getString(2));
- long messageId = rs.getLong(3);
-
- RecordImpl record = new RecordImpl(queueId, messageId);
- List<RecordImpl> records = "E".equals(actionType) ? enqueues : dequeues;
- records.add(record);
- }
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- pstmt.close();
- }
-
- dtxrh.dtxRecord(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(),
- enqueues.toArray(new RecordImpl[enqueues.size()]),
- dequeues.toArray(new RecordImpl[dequeues.size()]));
- }
-
-
- dtxrh.completeDtxRecordRecovery();
- }
- finally
- {
- conn.close();
- }
-
- }
-
private StorableMessageMetaData getMetaData(long messageId) throws SQLException
{
@@ -2357,43 +2231,81 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
}
- private void loadConfiguredObjects(ConfigurationRecoveryHandler recoveryHandler) throws SQLException,
- StoreException
+ @Override
+ public void visitMessages(MessageHandler handler) throws StoreException
{
- Connection conn = newAutoCommitConnection();
- Map<UUID, ConfiguredObjectRecordImpl> configuredObjects = new HashMap<UUID, ConfiguredObjectRecordImpl>();
- final ObjectMapper objectMapper = new ObjectMapper();
+ checkMessageStoreOpen();
+
+ Connection conn = null;
try
{
- PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS);
+ conn = newAutoCommitConnection();
+ Statement stmt = conn.createStatement();
try
{
- ResultSet rs = stmt.executeQuery();
+ ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_META_DATA);
try
{
while (rs.next())
{
- String id = rs.getString(1);
- String objectType = rs.getString(2);
- String attributes = getBlobAsString(rs, 3);
- final ConfiguredObjectRecordImpl configuredObjectRecord =
- new ConfiguredObjectRecordImpl(UUID.fromString(id), objectType,
- objectMapper.readValue(attributes, Map.class));
- configuredObjects.put(configuredObjectRecord.getId(),configuredObjectRecord);
-
+ long messageId = rs.getLong(1);
+ byte[] dataAsBytes = getBlobAsBytes(rs, 2);
+ ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
+ buf.position(1);
+ buf = buf.slice();
+ MessageMetaDataType<?> type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]);
+ StorableMessageMetaData metaData = type.createMetaData(buf);
+ StoredJDBCMessage message = new StoredJDBCMessage(messageId, metaData, true);
+ if (!handler.handle(message))
+ {
+ break;
+ }
}
}
- catch (JsonMappingException e)
- {
- throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
- }
- catch (JsonParseException e)
+ finally
{
- throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+ rs.close();
}
- catch (IOException e)
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Error encountered when visiting messages", e);
+ }
+ finally
+ {
+ closeConnection(conn);
+ }
+ }
+
+ @Override
+ public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
+ {
+ checkMessageStoreOpen();
+
+ Connection conn = null;
+ try
+ {
+ conn = newAutoCommitConnection();
+ Statement stmt = conn.createStatement();
+ try
+ {
+ ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY);
+ try
{
- throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+ while(rs.next())
+ {
+ String id = rs.getString(1);
+ long messageId = rs.getLong(2);
+ if (!handler.handle(UUID.fromString(id), messageId))
+ {
+ break;
+ }
+ }
}
finally
{
@@ -2404,31 +2316,41 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
{
stmt.close();
}
- stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECT_HIERARCHY);
+ }
+ catch(SQLException e)
+ {
+ throw new StoreException("Error encountered when visiting message instances", e);
+ }
+ finally
+ {
+ closeConnection(conn);
+ }
+ }
+
+ @Override
+ public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
+ {
+ checkMessageStoreOpen();
+
+ Connection conn = null;
+ try
+ {
+ conn = newAutoCommitConnection();
+ List<Xid> xids = new ArrayList<Xid>();
+
+ Statement stmt = conn.createStatement();
try
{
- ResultSet rs = stmt.executeQuery();
+ ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_XIDS);
try
{
- while (rs.next())
+ while(rs.next())
{
- UUID childId = UUID.fromString(rs.getString(1));
- String parentType = rs.getString(2);
- UUID parentId = UUID.fromString(rs.getString(3));
-
- ConfiguredObjectRecordImpl child = configuredObjects.get(childId);
- ConfiguredObjectRecordImpl parent = configuredObjects.get(parentId);
-
- if(child != null && parent != null)
- {
- child.addParent(parentType, parent);
- }
- else if(child != null && child.getType().endsWith("Binding") && parentType.equals("Exchange"))
- {
- // TODO - remove this hack for amq. exchanges
- child.addParent(parentType, new ConfiguredObjectRecordImpl(parentId, parentType, Collections.<String,Object>emptyMap()));
- }
+ long format = rs.getLong(1);
+ byte[] globalId = rs.getBytes(2);
+ byte[] branchId = rs.getBytes(3);
+ xids.add(new Xid(format, globalId, branchId));
}
}
finally
@@ -2441,18 +2363,67 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
stmt.close();
}
+
+
+ for(Xid xid : xids)
+ {
+ List<RecordImpl> enqueues = new ArrayList<RecordImpl>();
+ List<RecordImpl> dequeues = new ArrayList<RecordImpl>();
+
+ PreparedStatement pstmt = conn.prepareStatement(SELECT_ALL_FROM_XID_ACTIONS);
+
+ try
+ {
+ pstmt.setLong(1, xid.getFormat());
+ pstmt.setBytes(2, xid.getGlobalId());
+ pstmt.setBytes(3, xid.getBranchId());
+
+ ResultSet rs = pstmt.executeQuery();
+ try
+ {
+ while(rs.next())
+ {
+
+ String actionType = rs.getString(1);
+ UUID queueId = UUID.fromString(rs.getString(2));
+ long messageId = rs.getLong(3);
+
+ RecordImpl record = new RecordImpl(queueId, messageId);
+ List<RecordImpl> records = "E".equals(actionType) ? enqueues : dequeues;
+ records.add(record);
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ pstmt.close();
+ }
+
+ if (!handler.handle(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(),
+ enqueues.toArray(new RecordImpl[enqueues.size()]),
+ dequeues.toArray(new RecordImpl[dequeues.size()])))
+ {
+ break;
+ }
+ }
+
}
- finally
+ catch (SQLException e)
{
- conn.close();
- }
+ throw new StoreException("Error encountered when visiting distributed transactions", e);
- for(ConfiguredObjectRecord record : configuredObjects.values())
+ }
+ finally
{
- recoveryHandler.configuredObject(record);
+ closeConnection(conn);
}
}
+
protected abstract String getBlobAsString(ResultSet rs, int col) throws SQLException;
protected abstract void storedSizeChange(int storeSizeIncrease);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java
index a7e9ef2ab6..99785c48a9 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java
@@ -20,17 +20,36 @@
*/
package org.apache.qpid.server.store;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.Transaction.Record;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
/** A simple message store that stores the messages in a thread-safe structure in memory. */
-abstract public class AbstractMemoryMessageStore extends NullMessageStore
+abstract class AbstractMemoryMessageStore implements MessageStore, DurableConfigurationStore
{
- private final AtomicLong _messageId = new AtomicLong(1);
-
- private static final Transaction IN_MEMORY_TRANSACTION = new Transaction()
+ private final class MemoryMessageStoreTransaction implements Transaction
{
+ private Map<UUID, Set<Long>> _localEnqueueMap = new HashMap<UUID, Set<Long>>();
+ private Map<UUID, Set<Long>> _localDequeueMap = new HashMap<UUID, Set<Long>>();
+
+ private Map<Xid, DistributedTransactionRecords> _localDistributedTransactionsRecords = new HashMap<Xid, DistributedTransactionRecords>();
+ private Set<Xid> _localDistributedTransactionsRemoves = new HashSet<Xid>();
+
@Override
public StoreFuture commitTranAsync()
{
@@ -40,50 +59,145 @@ abstract public class AbstractMemoryMessageStore extends NullMessageStore
@Override
public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message)
{
+ Set<Long> messageIds = _localEnqueueMap.get(queue.getId());
+ if (messageIds == null)
+ {
+ messageIds = new HashSet<Long>();
+ _localEnqueueMap.put(queue.getId(), messageIds);
+ }
+ messageIds.add(message.getMessageNumber());
}
@Override
- public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message)
+ public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message)
{
+ Set<Long> messageIds = _localDequeueMap.get(queue.getId());
+ if (messageIds == null)
+ {
+ messageIds = new HashSet<Long>();
+ _localDequeueMap.put(queue.getId(), messageIds);
+ }
+ messageIds.add(message.getMessageNumber());
}
@Override
public void commitTran()
{
+ commitTransactionInternal(this);
+ _localEnqueueMap.clear();
+ _localDequeueMap.clear();
}
@Override
public void abortTran()
{
+ _localEnqueueMap.clear();
+ _localDequeueMap.clear();
}
@Override
public void removeXid(long format, byte[] globalId, byte[] branchId)
{
+ _localDistributedTransactionsRemoves.add(new Xid(format, globalId, branchId));
}
@Override
public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
{
+ _localDistributedTransactionsRecords.put(new Xid(format, globalId, branchId), new DistributedTransactionRecords(enqueues, dequeues));
}
- };
+ }
+
+ private final AtomicLong _messageId = new AtomicLong(1);
- private final EventManager _eventManager = new EventManager();
+ private final ConcurrentHashMap<UUID, ConfiguredObjectRecord> _configuredObjectRecords = new ConcurrentHashMap<UUID, ConfiguredObjectRecord>();
+ protected ConcurrentHashMap<Long, StoredMemoryMessage> _messages = new ConcurrentHashMap<Long, StoredMemoryMessage>();
+ private Object _transactionLock = new Object();
+ private Map<UUID, Set<Long>> _messageInstances = new HashMap<UUID, Set<Long>>();
+ private Map<Xid, DistributedTransactionRecords> _distributedTransactions = new HashMap<Xid, DistributedTransactionRecords>();
+
+ @SuppressWarnings("unchecked")
@Override
- public StoredMessage addMessage(StorableMessageMetaData metaData)
+ public StoredMessage<StorableMessageMetaData> addMessage(final StorableMessageMetaData metaData)
{
- final long id = _messageId.getAndIncrement();
- StoredMemoryMessage message = new StoredMemoryMessage(id, metaData);
+ long id = _messageId.getAndIncrement();
+
+ if(metaData.isPersistent())
+ {
+ return new StoredMemoryMessage(id, metaData)
+ {
+
+ @Override
+ public StoreFuture flushToStore()
+ {
+ _messages.putIfAbsent(getMessageNumber(), this) ;
+ return super.flushToStore();
+ }
+
+ @Override
+ public void remove()
+ {
+ _messages.remove(getMessageNumber());
+ super.remove();
+ }
+
+ };
+ }
+ else
+ {
+ return new StoredMemoryMessage(id, metaData);
+ }
+ }
+
+ private void commitTransactionInternal(MemoryMessageStoreTransaction transaction)
+ {
+ synchronized (_transactionLock )
+ {
+ for (Map.Entry<UUID, Set<Long>> loacalEnqueuedEntry : transaction._localEnqueueMap.entrySet())
+ {
+ Set<Long> messageIds = _messageInstances.get(loacalEnqueuedEntry.getKey());
+ if (messageIds == null)
+ {
+ messageIds = new HashSet<Long>();
+ _messageInstances.put(loacalEnqueuedEntry.getKey(), messageIds);
+ }
+ messageIds.addAll(loacalEnqueuedEntry.getValue());
+ }
+
+ for (Map.Entry<UUID, Set<Long>> loacalDequeueEntry : transaction._localDequeueMap.entrySet())
+ {
+ Set<Long> messageIds = _messageInstances.get(loacalDequeueEntry.getKey());
+ if (messageIds != null)
+ {
+ messageIds.removeAll(loacalDequeueEntry.getValue());
+ if (messageIds.isEmpty())
+ {
+ _messageInstances.remove(loacalDequeueEntry.getKey());
+ }
+ }
+ }
+
+ for (Map.Entry<Xid, DistributedTransactionRecords> entry : transaction._localDistributedTransactionsRecords.entrySet())
+ {
+ _distributedTransactions.put(entry.getKey(), entry.getValue());
+ }
+
+ for (Xid removed : transaction._localDistributedTransactionsRemoves)
+ {
+ _distributedTransactions.remove(removed);
+ }
+
+ }
+
- return message;
}
@Override
public Transaction newTransaction()
{
- return IN_MEMORY_TRANSACTION;
+ return new MemoryMessageStoreTransaction();
}
@Override
@@ -95,7 +209,164 @@ abstract public class AbstractMemoryMessageStore extends NullMessageStore
@Override
public void addEventListener(EventListener eventListener, Event... events)
{
- _eventManager.addEventListener(eventListener, events);
}
+ @Override
+ public void create(ConfiguredObjectRecord record)
+ {
+ if (_configuredObjectRecords.putIfAbsent(record.getId(), record) != null)
+ {
+ throw new StoreException("Record with id " + record.getId() + " is already present");
+ }
+ }
+
+ @Override
+ public void update(boolean createIfNecessary, ConfiguredObjectRecord... records)
+ {
+ for (ConfiguredObjectRecord record : records)
+ {
+ ConfiguredObjectRecord previousValue = _configuredObjectRecords.replace(record.getId(), record);
+ if (previousValue == null && !createIfNecessary)
+ {
+ throw new StoreException("Record with id " + record.getId() + " does not exist");
+ }
+ }
+ }
+
+ @Override
+ public UUID[] remove(final ConfiguredObjectRecord... objects)
+ {
+ List<UUID> removed = new ArrayList<UUID>();
+ for (ConfiguredObjectRecord record : objects)
+ {
+ if (_configuredObjectRecords.remove(record.getId()) != null)
+ {
+ removed.add(record.getId());
+ }
+ }
+ return removed.toArray(new UUID[removed.size()]);
+ }
+
+ @Override
+ public void closeConfigurationStore()
+ {
+ _configuredObjectRecords.clear();
+ }
+
+ @Override
+ public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings)
+ {
+ }
+
+ @Override
+ public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws StoreException
+ {
+ handler.begin(VirtualHost.CURRENT_CONFIG_VERSION);
+ for (ConfiguredObjectRecord record : _configuredObjectRecords.values())
+ {
+ if (!handler.handle(record))
+ {
+ break;
+ }
+ }
+ handler.end();
+ }
+
+ @Override
+ public void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings)
+ {
+ }
+
+ @Override
+ public void closeMessageStore()
+ {
+ _messages.clear();
+ synchronized (_transactionLock)
+ {
+ _messageInstances.clear();
+ _distributedTransactions.clear();
+ }
+ }
+
+ @Override
+ public String getStoreLocation()
+ {
+ return null;
+ }
+
+ @Override
+ public void onDelete()
+ {
+ }
+
+ @Override
+ public void visitMessages(MessageHandler handler) throws StoreException
+ {
+ for (StoredMemoryMessage message : _messages.values())
+ {
+ if(!handler.handle(message))
+ {
+ break;
+ }
+ }
+ }
+
+ @Override
+ public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
+ {
+ synchronized (_transactionLock)
+ {
+ for (Map.Entry<UUID, Set<Long>> enqueuedEntry : _messageInstances.entrySet())
+ {
+ UUID resourceId = enqueuedEntry.getKey();
+ for (Long messageId : enqueuedEntry.getValue())
+ {
+ if (!handler.handle(resourceId, messageId))
+ {
+ return;
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
+ {
+ synchronized (_transactionLock)
+ {
+ for (Map.Entry<Xid, DistributedTransactionRecords> entry : _distributedTransactions.entrySet())
+ {
+ Xid xid = entry.getKey();
+ DistributedTransactionRecords records = entry.getValue();
+ if (!handler.handle(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(), records.getEnqueues(), records.getDequeues()))
+ {
+ break;
+ }
+ }
+ }
+ }
+
+ private static final class DistributedTransactionRecords
+ {
+ private Record[] _enqueues;
+ private Record[] _dequeues;
+
+ public DistributedTransactionRecords(Record[] enqueues, Record[] dequeues)
+ {
+ super();
+ _enqueues = enqueues;
+ _dequeues = dequeues;
+ }
+
+ public Record[] getEnqueues()
+ {
+ return _enqueues;
+ }
+
+ public Record[] getDequeues()
+ {
+ return _dequeues;
+ }
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecordRecoveverAndUpgrader.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecordRecoveverAndUpgrader.java
new file mode 100644
index 0000000000..85265d986e
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecordRecoveverAndUpgrader.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.util.Map;
+
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+import org.apache.qpid.server.virtualhost.DefaultUpgraderProvider;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class ConfiguredObjectRecordRecoveverAndUpgrader implements ConfiguredObjectRecordHandler
+{
+ private DurableConfigurationRecoverer _configRecoverer;
+ private DurableConfigurationStore _store;
+
+ public ConfiguredObjectRecordRecoveverAndUpgrader(VirtualHost virtualHost, Map<String, DurableConfiguredObjectRecoverer> recoverers)
+ {
+ DefaultUpgraderProvider upgraderProvider = new DefaultUpgraderProvider(virtualHost);
+ _configRecoverer = new DurableConfigurationRecoverer(virtualHost.getName(), recoverers, upgraderProvider, virtualHost.getEventLogger());
+ _store = virtualHost.getDurableConfigurationStore();
+ }
+
+ @Override
+ public void begin(int configVersion)
+ {
+ _configRecoverer.beginConfigurationRecovery(_store, configVersion);
+ }
+
+ @Override
+ public boolean handle(ConfiguredObjectRecord record)
+ {
+ _configRecoverer.configuredObject(record);
+ return true;
+ }
+
+ @Override
+ public int end()
+ {
+ return _configRecoverer.completeConfigurationRecovery();
+ }
+
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
index 9f610b0199..7d93f18906 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
@@ -20,11 +20,12 @@
*/
package org.apache.qpid.server.store;
-import org.apache.qpid.server.model.ConfiguredObject;
-
import java.util.Map;
import java.util.UUID;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+
public interface DurableConfigurationStore
{
String STORE_TYPE = "storeType";
@@ -47,12 +48,6 @@ public interface DurableConfigurationStore
void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings) throws StoreException;
/**
- * Recovers configuration from the store using given recovery handler
- * @param recoveryHandler recovery handler
- */
- void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler) throws StoreException;
-
- /**
* Makes the specified object persistent.
*
* @param object The object to persist.
@@ -85,4 +80,11 @@ public interface DurableConfigurationStore
void closeConfigurationStore() throws StoreException;
+ /**
+ * Visit all configured object records with given handler.
+ *
+ * @param handler a handler to invoke on each configured object record
+ * @throws StoreException
+ */
+ void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws StoreException;
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
index 819da86ca0..a5ace16cfa 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
@@ -31,6 +31,7 @@ import java.util.*;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Model;
import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.JsonProcessingException;
@@ -97,22 +98,27 @@ public class JsonFileConfigStore implements DurableConfigurationStore
}
@Override
- public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler)
+ public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler)
{
- recoveryHandler.beginConfigurationRecovery(this,_configVersion);
+ handler.begin(_configVersion);
List<ConfiguredObjectRecord> records = new ArrayList<ConfiguredObjectRecord>(_objectsById.values());
for(ConfiguredObjectRecord record : records)
{
- recoveryHandler.configuredObject(record);
+ boolean shouldContinue = handler.handle(record);
+ if (!shouldContinue)
+ {
+ break;
+ }
}
int oldConfigVersion = _configVersion;
- _configVersion = recoveryHandler.completeConfigurationRecovery();
+ _configVersion = handler.end();
if(oldConfigVersion != _configVersion)
{
save();
}
}
+
private void setup(final Map<String, Object> configurationStoreSettings)
{
Object storePathAttr = configurationStoreSettings.get(DurableConfigurationStore.STORE_PATH);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
index 69f9073f6e..433f618d1a 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
@@ -23,6 +23,9 @@ package org.apache.qpid.server.store;
import java.util.Map;
import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
/**
* MessageStore defines the interface to a storage area, which can be used to preserve the state of messages.
@@ -43,13 +46,6 @@ public interface MessageStore
*/
void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings);
- /**
- * Called after opening to recover messages and transactions with given recovery handlers
- * @param messageRecoveryHandler
- * @param transactionLogRecoveryHandler
- */
- void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler);
-
public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData);
@@ -71,8 +67,10 @@ public interface MessageStore
String getStoreLocation();
- // TODO dead method - remove??
- String getStoreType();
-
void onDelete();
+
+ void visitMessages(MessageHandler handler) throws StoreException;
+ void visitMessageInstances(MessageInstanceHandler handler) throws StoreException;
+ void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException;
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStoreRecoveryHandler.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStoreRecoveryHandler.java
deleted file mode 100755
index ba65b8e1ec..0000000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStoreRecoveryHandler.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-package org.apache.qpid.server.store;
-
-public interface MessageStoreRecoveryHandler
-{
- StoredMessageRecoveryHandler begin();
-
- public static interface StoredMessageRecoveryHandler
- {
- void message(StoredMessage message);
-
- void completeMessageRecovery();
- }
-}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
index 59b4530014..a3ed4bea05 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
@@ -23,6 +23,10 @@ import java.util.Map;
import java.util.UUID;
import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
public abstract class NullMessageStore implements MessageStore, DurableConfigurationStore
{
@@ -33,11 +37,6 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura
}
@Override
- public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler)
- {
- }
-
- @Override
public void update(boolean createIfNecessary, ConfiguredObjectRecord... records)
{
}
@@ -92,11 +91,6 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura
}
@Override
- public void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler)
- {
- }
-
- @Override
public void addEventListener(EventListener eventListener, Event... events)
{
}
@@ -112,4 +106,24 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura
{
}
+ @Override
+ public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws StoreException
+ {
+ }
+
+ @Override
+ public void visitMessages(MessageHandler handler) throws StoreException
+ {
+ }
+
+ @Override
+ public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
+ {
+ }
+
+ @Override
+ public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
+ {
+ }
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java
deleted file mode 100755
index bd4da648f9..0000000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-package org.apache.qpid.server.store;
-
-import java.util.UUID;
-
-public interface TransactionLogRecoveryHandler
-{
- QueueEntryRecoveryHandler begin(MessageStore log);
-
- public static interface QueueEntryRecoveryHandler
- {
- DtxRecordRecoveryHandler completeQueueEntryRecovery();
-
- void queueEntry(UUID queueId, long messageId);
- }
-
- public static interface DtxRecordRecoveryHandler
- {
- void dtxRecord(long format, byte[] globalId, byte[] branchId, Transaction.Record[] enqueues, Transaction.Record[] dequeues);
-
- void completeDtxRecordRecovery();
- }
-}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Xid.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Xid.java
new file mode 100644
index 0000000000..4db33861a4
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Xid.java
@@ -0,0 +1,102 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.util.Arrays;
+
+public final class Xid
+{
+ private final long _format;
+ private final byte[] _globalId;
+ private final byte[] _branchId;
+
+ public Xid(long format, byte[] globalId, byte[] branchId)
+ {
+ _format = format;
+ _globalId = globalId;
+ _branchId = branchId;
+ }
+
+ public long getFormat()
+ {
+ return _format;
+ }
+
+ public byte[] getGlobalId()
+ {
+ return _globalId;
+ }
+
+ public byte[] getBranchId()
+ {
+ return _branchId;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + Arrays.hashCode(_branchId);
+ result = prime * result + (int) (_format ^ (_format >>> 32));
+ result = prime * result + Arrays.hashCode(_globalId);
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj)
+ {
+ return true;
+ }
+
+ if (obj == null)
+ {
+ return false;
+ }
+
+ if (getClass() != obj.getClass())
+ {
+ return false;
+ }
+
+ Xid other = (Xid) obj;
+
+ if (!Arrays.equals(_branchId, other._branchId))
+ {
+ return false;
+ }
+
+ if (_format != other._format)
+ {
+ return false;
+ }
+
+ if (!Arrays.equals(_globalId, other._globalId))
+ {
+ return false;
+ }
+ return true;
+ }
+
+
+} \ No newline at end of file
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/ConfiguredObjectRecordHandler.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/ConfiguredObjectRecordHandler.java
new file mode 100644
index 0000000000..747c735ff1
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/ConfiguredObjectRecordHandler.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.handler;
+
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
+
+public interface ConfiguredObjectRecordHandler
+{
+ // TODO configVersion argument will be removed.
+ void begin(int configVersion);
+
+ /**
+ * Handles the given record.
+ *
+ * @param record
+ * @return false is returned if the handler does not wish to handle other record, true otherwise
+ */
+ boolean handle(ConfiguredObjectRecord record);
+
+ //TODO: return should be void
+ // temporarily returning new config version
+ int end();
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/Xid.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/DistributedTransactionHandler.java
index bed7575f9a..733c93355b 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/Xid.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/DistributedTransactionHandler.java
@@ -18,35 +18,13 @@
* under the License.
*
*/
+package org.apache.qpid.server.store.handler;
-package org.apache.qpid.server.store.berkeleydb.entry;
+import org.apache.qpid.server.store.Transaction.Record;
-public class Xid
+public interface DistributedTransactionHandler
{
- private final long _format;
- private final byte[] _globalId;
- private final byte[] _branchId;
+ boolean handle(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues);
- public Xid(long format, byte[] globalId, byte[] branchId)
- {
- _format = format;
- _globalId = globalId;
- _branchId = branchId;
- }
-
- public long getFormat()
- {
- return _format;
- }
-
- public byte[] getGlobalId()
- {
- return _globalId;
- }
-
- public byte[] getBranchId()
- {
- return _branchId;
- }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/MessageHandler.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/MessageHandler.java
new file mode 100644
index 0000000000..30c1f7b450
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/MessageHandler.java
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.handler;
+
+import org.apache.qpid.server.store.StoredMessage;
+
+public interface MessageHandler
+{
+
+ boolean handle(StoredMessage<?> storedMessage);
+
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/MessageInstanceHandler.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/MessageInstanceHandler.java
new file mode 100644
index 0000000000..3775ec4fee
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/MessageInstanceHandler.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.handler;
+
+import java.util.UUID;
+
+public interface MessageInstanceHandler
+{
+ boolean handle(UUID queueId, long messageId);
+
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
index 7e0562afec..46b5dbb9fc 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
@@ -78,6 +78,10 @@ public class DefaultUpgraderProvider implements UpgraderProvider
public DurableConfigurationStoreUpgrader getUpgrader(final int configVersion, DurableConfigurationRecoverer recoverer)
{
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Getting upgrader for configVersion: " + configVersion);
+ }
DurableConfigurationStoreUpgrader currentUpgrader = null;
switch(configVersion)
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java
new file mode 100644
index 0000000000..df47c85f64
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java
@@ -0,0 +1,357 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.logging.messages.MessageStoreMessages;
+import org.apache.qpid.server.logging.messages.TransactionLogMessages;
+import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
+import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.plugin.MessageMetaDataType;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.Transaction;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.store.Transaction.Record;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
+import org.apache.qpid.server.txn.DtxBranch;
+import org.apache.qpid.server.txn.DtxRegistry;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.transport.Xid;
+import org.apache.qpid.transport.util.Functions;
+
+public class MessageStoreRecoverer
+{
+ private static final Logger _logger = Logger.getLogger(MessageStoreRecoverer.class);
+
+ private final VirtualHost _virtualHost;
+
+ private final Map<String, Integer> _queueRecoveries = new TreeMap<String, Integer>();
+ private final Map<Long, ServerMessage<?>> _recoveredMessages = new HashMap<Long, ServerMessage<?>>();
+ private final Map<Long, StoredMessage<?>> _unusedMessages = new HashMap<Long, StoredMessage<?>>();
+ private final EventLogger _eventLogger;
+
+ private final MessageStoreLogSubject _logSubject;
+ private final MessageStore _store;
+
+
+ public MessageStoreRecoverer(VirtualHost virtualHost, MessageStoreLogSubject logSubject)
+ {
+ super();
+ _virtualHost = virtualHost;
+ _eventLogger = virtualHost.getEventLogger();
+ _logSubject = logSubject;
+ _store = virtualHost.getMessageStore();
+ }
+
+
+ public void recover()
+ {
+ _eventLogger.message(_logSubject, MessageStoreMessages.RECOVERY_START());
+ _store.visitMessages(messageVisitor);
+
+ _eventLogger.message(_logSubject, TransactionLogMessages.RECOVERY_START(null, false));
+ _store.visitMessageInstances(messageAndMessageInstanceRecoverer);
+
+ for(Map.Entry<String,Integer> entry : _queueRecoveries.entrySet())
+ {
+ _eventLogger.message(_logSubject, TransactionLogMessages.RECOVERED(entry.getValue(), entry.getKey()));
+ _eventLogger.message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(entry.getKey(), true));
+ }
+
+ _store.visitDistributedTransactions(distributedTransactionRecoverer);
+
+
+
+ for(StoredMessage<?> m : _unusedMessages.values())
+ {
+ _logger.warn("Message id " + m.getMessageNumber() + " in store, but not in any queue - removing....");
+ m.remove();
+ }
+ _eventLogger.message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false));
+
+ _eventLogger.message(_logSubject, MessageStoreMessages.RECOVERED(_recoveredMessages.size() - _unusedMessages.size()));
+ _eventLogger.message(_logSubject, MessageStoreMessages.RECOVERY_COMPLETE());
+
+
+ }
+
+ MessageHandler messageVisitor = new MessageHandler()
+ {
+
+ @Override
+ public boolean handle(StoredMessage<?> message)
+ {
+ StorableMessageMetaData metaData = message.getMetaData();
+
+ @SuppressWarnings("rawtypes")
+ MessageMetaDataType type = metaData.getType();
+
+ @SuppressWarnings("unchecked")
+ ServerMessage<?> serverMessage = type.createMessage(message);
+
+ _recoveredMessages.put(message.getMessageNumber(), serverMessage);
+ _unusedMessages.put(message.getMessageNumber(), message);
+ return true;
+ }
+
+ };
+
+ MessageInstanceHandler messageAndMessageInstanceRecoverer = new MessageInstanceHandler()
+ {
+ @Override
+ public boolean handle(final UUID queueId, long messageId)
+ {
+ AMQQueue<?> queue = _virtualHost.getQueue(queueId);
+ if(queue != null)
+ {
+ String queueName = queue.getName();
+ ServerMessage<?> message = _recoveredMessages.get(messageId);
+ _unusedMessages.remove(messageId);
+
+ if(message != null)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("On recovery, delivering " + message.getMessageNumber() + " to " + queueName);
+ }
+
+ Integer count = _queueRecoveries.get(queueName);
+ if (count == null)
+ {
+ count = 0;
+ }
+
+ queue.enqueue(message,null);
+
+ _queueRecoveries.put(queueName, ++count);
+ }
+ else
+ {
+ _logger.warn("Message id " + messageId + " referenced in log as enqueued in queue " + queueName + " is unknown, entry will be discarded");
+ Transaction txn = _store.newTransaction();
+ txn.dequeueMessage(queue, new DummyMessage(messageId));
+ txn.commitTranAsync();
+ }
+ }
+ else
+ {
+ _logger.warn("Message id " + messageId + " in log references queue with id " + queueId + " which is not in the configuration, entry will be discarded");
+ Transaction txn = _store.newTransaction();
+ TransactionLogResource mockQueue =
+ new TransactionLogResource()
+ {
+ @Override
+ public String getName()
+ {
+ return "<<UNKNOWN>>";
+ }
+
+ @Override
+ public UUID getId()
+ {
+ return queueId;
+ }
+
+ @Override
+ public boolean isDurable()
+ {
+ return false;
+ }
+ };
+ txn.dequeueMessage(mockQueue, new DummyMessage(messageId));
+ txn.commitTranAsync();
+ }
+ return true;
+ }
+ };
+
+ private DistributedTransactionHandler distributedTransactionRecoverer = new DistributedTransactionHandler()
+ {
+
+ @Override
+ public boolean handle(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
+ {
+ Xid id = new Xid(format, globalId, branchId);
+ DtxRegistry dtxRegistry = _virtualHost.getDtxRegistry();
+ DtxBranch branch = dtxRegistry.getBranch(id);
+ if(branch == null)
+ {
+ branch = new DtxBranch(id, _store, _virtualHost);
+ dtxRegistry.registerBranch(branch);
+ }
+ for(Transaction.Record record : enqueues)
+ {
+ final AMQQueue<?> queue = _virtualHost.getQueue(record.getResource().getId());
+ if(queue != null)
+ {
+ final long messageId = record.getMessage().getMessageNumber();
+ final ServerMessage<?> message = _recoveredMessages.get(messageId);
+ _unusedMessages.remove(messageId);
+
+ if(message != null)
+ {
+ final MessageReference<?> ref = message.newReference();
+
+ branch.enqueue(queue,message);
+
+ branch.addPostTransactionAction(new ServerTransaction.Action()
+ {
+
+ public void postCommit()
+ {
+ queue.enqueue(message, null);
+ ref.release();
+ }
+
+ public void onRollback()
+ {
+ ref.release();
+ }
+ });
+ }
+ else
+ {
+ StringBuilder xidString = xidAsString(id);
+ _eventLogger.message(_logSubject,
+ TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(),
+ Long.toString(messageId)));
+ }
+ }
+ else
+ {
+ StringBuilder xidString = xidAsString(id);
+ _eventLogger.message(_logSubject,
+ TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
+ record.getResource().getId().toString()));
+
+ }
+ }
+ for(Transaction.Record record : dequeues)
+ {
+ final AMQQueue<?> queue = _virtualHost.getQueue(record.getResource().getId());
+ if(queue != null)
+ {
+ final long messageId = record.getMessage().getMessageNumber();
+ final ServerMessage<?> message = _recoveredMessages.get(messageId);
+ _unusedMessages.remove(messageId);
+
+ if(message != null)
+ {
+ final QueueEntry entry = queue.getMessageOnTheQueue(messageId);
+
+ entry.acquire();
+
+ branch.dequeue(queue, message);
+
+ branch.addPostTransactionAction(new ServerTransaction.Action()
+ {
+
+ public void postCommit()
+ {
+ entry.delete();
+ }
+
+ public void onRollback()
+ {
+ entry.release();
+ }
+ });
+ }
+ else
+ {
+ StringBuilder xidString = xidAsString(id);
+ _eventLogger.message(_logSubject,
+ TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(),
+ Long.toString(messageId)));
+
+ }
+
+ }
+ else
+ {
+ StringBuilder xidString = xidAsString(id);
+ _eventLogger.message(_logSubject,
+ TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
+ record.getResource().getId().toString()));
+ }
+
+ }
+
+ branch.setState(DtxBranch.State.PREPARED);
+ branch.prePrepareTransaction();
+ return true;
+ }
+
+ private StringBuilder xidAsString(Xid id)
+ {
+ return new StringBuilder("(")
+ .append(id.getFormat())
+ .append(',')
+ .append(Functions.str(id.getGlobalId()))
+ .append(',')
+ .append(Functions.str(id.getBranchId()))
+ .append(')');
+ }
+
+
+ };
+
+
+ private static class DummyMessage implements EnqueueableMessage
+ {
+
+ private final long _messageId;
+
+ public DummyMessage(long messageId)
+ {
+ _messageId = messageId;
+ }
+
+ public long getMessageNumber()
+ {
+ return _messageId;
+ }
+
+ public boolean isPersistent()
+ {
+ return true;
+ }
+
+ public StoredMessage getStoredMessage()
+ {
+ return null;
+ }
+ }
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
index e3fd938225..14849aea1e 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
@@ -29,11 +29,11 @@ import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.plugin.MessageStoreFactory;
import org.apache.qpid.server.stats.StatisticsGatherer;
-
-import org.apache.qpid.server.store.DurableConfigurationRecoverer;
+import org.apache.qpid.server.store.ConfiguredObjectRecordRecoveverAndUpgrader;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.DurableConfigurationStoreCreator;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
public class StandardVirtualHost extends AbstractVirtualHost
{
@@ -107,18 +107,22 @@ public class StandardVirtualHost extends AbstractVirtualHost
if (_configurationStoreLogSubject != null)
{
getEventLogger().message(_configurationStoreLogSubject, ConfigStoreMessages.STORE_LOCATION(configurationStoreSettings.toString()));
+ getEventLogger().message(_configurationStoreLogSubject, ConfigStoreMessages.RECOVERY_START());
}
- DurableConfigurationRecoverer configRecoverer = new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(),
- new DefaultUpgraderProvider(this), getEventLogger());
+ ConfiguredObjectRecordHandler upgraderRecoverer = new ConfiguredObjectRecordRecoveverAndUpgrader(this, getDurableConfigurationRecoverers());
+
+ _durableConfigurationStore.visitConfiguredObjectRecords(upgraderRecoverer);
- _durableConfigurationStore.recoverConfigurationStore(configRecoverer);
+ if (_configurationStoreLogSubject != null)
+ {
+ getEventLogger().message(_configurationStoreLogSubject, ConfigStoreMessages.RECOVERY_COMPLETE());
+ }
// If store does not have entries for standard exchanges (amq.*), the following will create them.
initialiseModel();
- VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this, getMessageStoreLogSubject());
- _messageStore.recoverMessageStore(recoveryHandler, recoveryHandler);
+ new MessageStoreRecoverer(this, getMessageStoreLogSubject()).recover();
attainActivation();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
deleted file mode 100755
index 3216115967..0000000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
+++ /dev/null
@@ -1,350 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-package org.apache.qpid.server.virtualhost;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.UUID;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.logging.EventLogger;
-import org.apache.qpid.server.logging.messages.MessageStoreMessages;
-import org.apache.qpid.server.logging.messages.TransactionLogMessages;
-import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
-import org.apache.qpid.server.message.EnqueueableMessage;
-import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.Transaction;
-import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
-import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.txn.DtxBranch;
-import org.apache.qpid.server.txn.DtxRegistry;
-import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.transport.Xid;
-import org.apache.qpid.transport.util.Functions;
-
-public class VirtualHostConfigRecoveryHandler implements
- MessageStoreRecoveryHandler,
- MessageStoreRecoveryHandler.StoredMessageRecoveryHandler,
- TransactionLogRecoveryHandler,
- TransactionLogRecoveryHandler.QueueEntryRecoveryHandler,
- TransactionLogRecoveryHandler.DtxRecordRecoveryHandler
-{
- private static final Logger _logger = Logger.getLogger(VirtualHostConfigRecoveryHandler.class);
-
- private final VirtualHost _virtualHost;
-
- private final Map<String, Integer> _queueRecoveries = new TreeMap<String, Integer>();
- private final Map<Long, ServerMessage> _recoveredMessages = new HashMap<Long, ServerMessage>();
- private final Map<Long, StoredMessage> _unusedMessages = new HashMap<Long, StoredMessage>();
- private final EventLogger _eventLogger;
-
- private final MessageStoreLogSubject _logSubject;
- private MessageStore _store;
-
- public VirtualHostConfigRecoveryHandler(VirtualHost virtualHost, MessageStoreLogSubject logSubject)
- {
- _virtualHost = virtualHost;
- _eventLogger = virtualHost.getEventLogger();
- _logSubject = logSubject;
- }
-
- public VirtualHostConfigRecoveryHandler begin(MessageStore store)
- {
- _store = store;
- _eventLogger.message(_logSubject, TransactionLogMessages.RECOVERY_START(null, false));
- return this;
- }
-
- public StoredMessageRecoveryHandler begin()
- {
- _eventLogger.message(_logSubject, MessageStoreMessages.RECOVERY_START());
- return this;
- }
-
- public void message(StoredMessage message)
- {
- ServerMessage serverMessage = message.getMetaData().getType().createMessage(message);
-
- _recoveredMessages.put(message.getMessageNumber(), serverMessage);
- _unusedMessages.put(message.getMessageNumber(), message);
- }
-
- public void completeMessageRecovery()
- {
- }
-
- public void dtxRecord(long format, byte[] globalId, byte[] branchId,
- Transaction.Record[] enqueues,
- Transaction.Record[] dequeues)
- {
- Xid id = new Xid(format, globalId, branchId);
- DtxRegistry dtxRegistry = _virtualHost.getDtxRegistry();
- DtxBranch branch = dtxRegistry.getBranch(id);
- if(branch == null)
- {
- branch = new DtxBranch(id, _store, _virtualHost);
- dtxRegistry.registerBranch(branch);
- }
- for(Transaction.Record record : enqueues)
- {
- final AMQQueue queue = _virtualHost.getQueue(record.getResource().getId());
- if(queue != null)
- {
- final long messageId = record.getMessage().getMessageNumber();
- final ServerMessage message = _recoveredMessages.get(messageId);
- _unusedMessages.remove(messageId);
-
- if(message != null)
- {
- final MessageReference ref = message.newReference();
-
-
- branch.enqueue(queue,message);
-
- branch.addPostTransactionAction(new ServerTransaction.Action()
- {
-
- public void postCommit()
- {
- queue.enqueue(message, null);
- ref.release();
- }
-
- public void onRollback()
- {
- ref.release();
- }
- });
- }
- else
- {
- StringBuilder xidString = xidAsString(id);
- _eventLogger.message(_logSubject,
- TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(),
- Long.toString(messageId)));
-
- }
-
- }
- else
- {
- StringBuilder xidString = xidAsString(id);
- _eventLogger.message(_logSubject,
- TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
- record.getResource().getId().toString()));
-
- }
- }
- for(Transaction.Record record : dequeues)
- {
- final AMQQueue queue = _virtualHost.getQueue(record.getResource().getId());
- if(queue != null)
- {
- final long messageId = record.getMessage().getMessageNumber();
- final ServerMessage message = _recoveredMessages.get(messageId);
- _unusedMessages.remove(messageId);
-
- if(message != null)
- {
- final QueueEntry entry = queue.getMessageOnTheQueue(messageId);
-
- entry.acquire();
-
- branch.dequeue(queue, message);
-
- branch.addPostTransactionAction(new ServerTransaction.Action()
- {
-
- public void postCommit()
- {
- entry.delete();
- }
-
- public void onRollback()
- {
- entry.release();
- }
- });
- }
- else
- {
- StringBuilder xidString = xidAsString(id);
- _eventLogger.message(_logSubject,
- TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(),
- Long.toString(messageId)));
-
- }
-
- }
- else
- {
- StringBuilder xidString = xidAsString(id);
- _eventLogger.message(_logSubject,
- TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
- record.getResource().getId().toString()));
- }
-
- }
-
- branch.setState(DtxBranch.State.PREPARED);
- branch.prePrepareTransaction();
- }
-
- private static StringBuilder xidAsString(Xid id)
- {
- return new StringBuilder("(")
- .append(id.getFormat())
- .append(',')
- .append(Functions.str(id.getGlobalId()))
- .append(',')
- .append(Functions.str(id.getBranchId()))
- .append(')');
- }
-
- public void completeDtxRecordRecovery()
- {
- for(StoredMessage m : _unusedMessages.values())
- {
- _logger.warn("Message id " + m.getMessageNumber() + " in store, but not in any queue - removing....");
- m.remove();
- }
- _eventLogger.message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false));
-
- _eventLogger.message(_logSubject, MessageStoreMessages.RECOVERED(_recoveredMessages.size() - _unusedMessages.size()));
- _eventLogger.message(_logSubject, MessageStoreMessages.RECOVERY_COMPLETE());
- }
-
- public void queueEntry(final UUID queueId, long messageId)
- {
- AMQQueue queue = _virtualHost.getQueue(queueId);
- if(queue != null)
- {
- String queueName = queue.getName();
- ServerMessage message = _recoveredMessages.get(messageId);
- _unusedMessages.remove(messageId);
-
- if(message != null)
- {
-
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("On recovery, delivering " + message.getMessageNumber() + " to " + queueName);
- }
-
- Integer count = _queueRecoveries.get(queueName);
- if (count == null)
- {
- count = 0;
- }
-
- queue.enqueue(message,null);
-
- _queueRecoveries.put(queueName, ++count);
- }
- else
- {
- _logger.warn("Message id " + messageId + " referenced in log as enqueued in queue " + queueName + " is unknown, entry will be discarded");
- Transaction txn = _store.newTransaction();
- txn.dequeueMessage(queue, new DummyMessage(messageId));
- txn.commitTranAsync();
- }
- }
- else
- {
- _logger.warn("Message id " + messageId + " in log references queue with id " + queueId + " which is not in the configuration, entry will be discarded");
- Transaction txn = _store.newTransaction();
- TransactionLogResource mockQueue =
- new TransactionLogResource()
- {
- @Override
- public String getName()
- {
- return "<<UNKNOWN>>";
- }
-
- @Override
- public UUID getId()
- {
- return queueId;
- }
-
- @Override
- public boolean isDurable()
- {
- return false;
- }
- };
- txn.dequeueMessage(mockQueue, new DummyMessage(messageId));
- txn.commitTranAsync();
- }
- }
-
- public DtxRecordRecoveryHandler completeQueueEntryRecovery()
- {
-
- for(Map.Entry<String,Integer> entry : _queueRecoveries.entrySet())
- {
- _eventLogger.message(_logSubject, TransactionLogMessages.RECOVERED(entry.getValue(), entry.getKey()));
-
- _eventLogger.message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(entry.getKey(), true));
- }
-
- return this;
- }
-
- private static class DummyMessage implements EnqueueableMessage
- {
-
-
- private final long _messageId;
-
- public DummyMessage(long messageId)
- {
- _messageId = messageId;
- }
-
- public long getMessageNumber()
- {
- return _messageId;
- }
-
-
- public boolean isPersistent()
- {
- return true;
- }
-
-
- public StoredMessage getStoredMessage()
- {
- return null;
- }
- }
-
-}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java
index 0fe9d1ac49..1de857d224 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java
@@ -50,10 +50,10 @@ import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.SystemContext;
import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.ConfiguredObjectRecordImpl;
import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
import org.apache.qpid.test.utils.QpidTestCase;
public class ManagementModeStoreHandlerTest extends QpidTestCase
@@ -89,20 +89,22 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase
when(_portEntry.getParents()).thenReturn(Collections.singletonMap(Broker.class.getSimpleName(), _root));
when(_portEntry.getType()).thenReturn(Port.class.getSimpleName());
- final ArgumentCaptor<ConfigurationRecoveryHandler> recovererArgumentCaptor = ArgumentCaptor.forClass(ConfigurationRecoveryHandler.class);
+ final ArgumentCaptor<ConfiguredObjectRecordHandler> recovererArgumentCaptor = ArgumentCaptor.forClass(ConfiguredObjectRecordHandler.class);
doAnswer(
new Answer()
{
@Override
public Object answer(final InvocationOnMock invocation) throws Throwable
{
- ConfigurationRecoveryHandler recoverer = recovererArgumentCaptor.getValue();
- recoverer.configuredObject(_root);
- recoverer.configuredObject(_portEntry);
+ ConfiguredObjectRecordHandler recoverer = recovererArgumentCaptor.getValue();
+ if(recoverer.handle(_root))
+ {
+ recoverer.handle(_portEntry);
+ }
return null;
}
}
- ).when(_store).recoverConfigurationStore(recovererArgumentCaptor.capture());
+ ).when(_store).visitConfiguredObjectRecords(recovererArgumentCaptor.capture());
_options = new BrokerOptions();
_handler = new ManagementModeStoreHandler(_store, _options);
@@ -112,21 +114,21 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase
private ConfiguredObjectRecord getRootEntry()
{
BrokerFinder brokerFinder = new BrokerFinder();
- _handler.recoverConfigurationStore(brokerFinder);
+ _handler.visitConfiguredObjectRecords(brokerFinder);
return brokerFinder.getBrokerRecord();
}
private ConfiguredObjectRecord getEntry(UUID id)
{
RecordFinder recordFinder = new RecordFinder(id);
- _handler.recoverConfigurationStore(recordFinder);
+ _handler.visitConfiguredObjectRecords(recordFinder);
return recordFinder.getFoundRecord();
}
private Collection<UUID> getChildrenIds(ConfiguredObjectRecord record)
{
ChildFinder childFinder = new ChildFinder(record);
- _handler.recoverConfigurationStore(childFinder);
+ _handler.visitConfiguredObjectRecords(childFinder);
return childFinder.getChildIds();
}
@@ -288,21 +290,25 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase
attributes.put(VirtualHost.TYPE, "STANDARD");
final ConfiguredObjectRecord virtualHost = new ConfiguredObjectRecordImpl(virtualHostId, VirtualHost.class.getSimpleName(), attributes, Collections.singletonMap(Broker.class.getSimpleName(), _root));
- final ArgumentCaptor<ConfigurationRecoveryHandler> recovererArgumentCaptor = ArgumentCaptor.forClass(ConfigurationRecoveryHandler.class);
+ final ArgumentCaptor<ConfiguredObjectRecordHandler> recovererArgumentCaptor = ArgumentCaptor.forClass(ConfiguredObjectRecordHandler.class);
doAnswer(
new Answer()
{
@Override
public Object answer(final InvocationOnMock invocation) throws Throwable
{
- ConfigurationRecoveryHandler recoverer = recovererArgumentCaptor.getValue();
- recoverer.configuredObject(_root);
- recoverer.configuredObject(_portEntry);
- recoverer.configuredObject(virtualHost);
+ ConfiguredObjectRecordHandler recoverer = recovererArgumentCaptor.getValue();
+ if(recoverer.handle(_root))
+ {
+ if(recoverer.handle(_portEntry))
+ {
+ recoverer.handle(virtualHost);
+ }
+ }
return null;
}
}
- ).when(_store).recoverConfigurationStore(recovererArgumentCaptor.capture());
+ ).when(_store).visitConfiguredObjectRecords(recovererArgumentCaptor.capture());
State expectedState = mmQuiesceVhosts ? State.QUIESCED : null;
if(mmQuiesceVhosts)
@@ -457,28 +463,32 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase
}
- private class BrokerFinder implements ConfigurationRecoveryHandler
+ private class BrokerFinder implements ConfiguredObjectRecordHandler
{
private ConfiguredObjectRecord _brokerRecord;
+ private int _version;
+
@Override
- public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion)
+ public void begin(final int configVersion)
{
-
+ _version = configVersion;
}
@Override
- public void configuredObject(final ConfiguredObjectRecord object)
+ public boolean handle(final ConfiguredObjectRecord object)
{
if(object.getType().equals(Broker.class.getSimpleName()))
{
_brokerRecord = object;
+ return false;
}
+ return true;
}
@Override
- public int completeConfigurationRecovery()
+ public int end()
{
- return 0;
+ return _version;
}
public ConfiguredObjectRecord getBrokerRecord()
@@ -487,10 +497,11 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase
}
}
- private class RecordFinder implements ConfigurationRecoveryHandler
+ private class RecordFinder implements ConfiguredObjectRecordHandler
{
private final UUID _id;
private ConfiguredObjectRecord _foundRecord;
+ private int _version;
private RecordFinder(final UUID id)
{
@@ -498,24 +509,26 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase
}
@Override
- public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion)
+ public void begin(final int configVersion)
{
-
+ _version = configVersion;
}
@Override
- public void configuredObject(final ConfiguredObjectRecord object)
+ public boolean handle(final ConfiguredObjectRecord object)
{
if(object.getId().equals(_id))
{
_foundRecord = object;
+ return false;
}
+ return true;
}
@Override
- public int completeConfigurationRecovery()
+ public int end()
{
- return 0;
+ return _version;
}
public ConfiguredObjectRecord getFoundRecord()
@@ -524,10 +537,11 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase
}
}
- private class ChildFinder implements ConfigurationRecoveryHandler
+ private class ChildFinder implements ConfiguredObjectRecordHandler
{
private final Collection<UUID> _childIds = new HashSet<UUID>();
private final ConfiguredObjectRecord _parent;
+ private int _version;
private ChildFinder(final ConfiguredObjectRecord parent)
{
@@ -535,13 +549,13 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase
}
@Override
- public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion)
+ public void begin(final int configVersion)
{
-
+ _version = configVersion;
}
@Override
- public void configuredObject(final ConfiguredObjectRecord object)
+ public boolean handle(final ConfiguredObjectRecord object)
{
if(object.getParents() != null)
@@ -555,12 +569,13 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase
}
}
+ return true;
}
@Override
- public int completeConfigurationRecovery()
+ public int end()
{
- return 0;
+ return _version;
}
public Collection<UUID> getChildIds()
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
index 83052110a1..b38d9d7bd2 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
@@ -21,9 +21,7 @@
package org.apache.qpid.server.store;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.argThat;
-import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
@@ -49,6 +47,7 @@ import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.util.FileUtils;
import org.mockito.ArgumentCaptor;
@@ -71,9 +70,8 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
private String _storePath;
private String _storeName;
- private ConfigurationRecoveryHandler _recoveryHandler;
+ private ConfiguredObjectRecordHandler _handler;
- private ExchangeImpl _exchange = mock(ExchangeImpl.class);
private static final String ROUTING_KEY = "routingKey";
private static final String QUEUE_NAME = "queueName";
private Map<String,Object> _bindingArgs;
@@ -96,16 +94,8 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
FileUtils.delete(new File(_storePath), true);
setTestSystemProperty("QPID_WORK", TMP_FOLDER);
- _recoveryHandler = mock(ConfigurationRecoveryHandler.class);
- when(_exchange.getName()).thenReturn(EXCHANGE_NAME);
- when(_exchange.getId()).thenReturn(_exchangeId);
- when(_exchange.getExchangeType()).thenReturn(mock(ExchangeType.class));
- when(_exchange.getEventLogger()).thenReturn(new EventLogger());
-
- ConfiguredObjectRecord exchangeRecord = mock(ConfiguredObjectRecord.class);
- when(exchangeRecord.getId()).thenReturn(_exchangeId);
- when(exchangeRecord.getType()).thenReturn(Exchange.class.getSimpleName());
- when(_exchange.asObjectRecord()).thenReturn(exchangeRecord);
+ _handler = mock(ConfiguredObjectRecordHandler.class);
+ when(_handler.handle(any(ConfiguredObjectRecord.class))).thenReturn(true);
_bindingArgs = new HashMap<String, Object>();
String argKey = AMQPFilterTypes.JMS_SELECTOR.toString();
@@ -134,7 +124,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
DurableConfigurationStoreHelper.createExchange(_configStore, exchange);
reopenStore();
- verify(_recoveryHandler).configuredObject(matchesRecord(_exchangeId, EXCHANGE,
+ verify(_handler).handle(matchesRecord(_exchangeId, EXCHANGE,
map( org.apache.qpid.server.model.Exchange.NAME, getName(),
org.apache.qpid.server.model.Exchange.TYPE, getName()+"Type",
org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS.name())));
@@ -168,14 +158,16 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
DurableConfigurationStoreHelper.removeExchange(_configStore, exchange);
reopenStore();
- verify(_recoveryHandler, never()).configuredObject(any(ConfiguredObjectRecord.class));
+ verify(_handler, never()).handle(any(ConfiguredObjectRecord.class));
}
public void testBindQueue() throws Exception
{
+ ExchangeImpl<?> exchange = createTestExchange();
AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false, null);
BindingImpl binding = new BindingImpl(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue,
- _exchange, _bindingArgs);
+ exchange, _bindingArgs);
+ DurableConfigurationStoreHelper.createExchange(_configStore, exchange);
DurableConfigurationStoreHelper.createQueue(_configStore, queue);
DurableConfigurationStoreHelper.createBinding(_configStore, binding);
@@ -187,10 +179,10 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
Map<String,UUID> parents = new HashMap<String, UUID>();
- parents.put(Exchange.class.getSimpleName(), _exchange.getId());
+ parents.put(Exchange.class.getSimpleName(), exchange.getId());
parents.put(Queue.class.getSimpleName(), queue.getId());
- verify(_recoveryHandler).configuredObject(matchesRecord(binding.getId(), BINDING, map, parents));
+ verify(_handler).handle(matchesRecord(binding.getId(), BINDING, map, parents));
}
@@ -260,15 +252,18 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
public void testUnbindQueue() throws Exception
{
+ ExchangeImpl<?> exchange = createTestExchange();
+ DurableConfigurationStoreHelper.createExchange(_configStore, exchange);
+
AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false, null);
BindingImpl binding = new BindingImpl(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue,
- _exchange, _bindingArgs);
+ exchange, _bindingArgs);
DurableConfigurationStoreHelper.createBinding(_configStore, binding);
DurableConfigurationStoreHelper.removeBinding(_configStore, binding);
reopenStore();
- verify(_recoveryHandler, never()).configuredObject(matchesRecord(ANY_UUID, BINDING,
+ verify(_handler, never()).handle(matchesRecord(ANY_UUID, BINDING,
ANY_MAP));
}
@@ -282,7 +277,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
queueAttributes.put(Queue.NAME, getName());
queueAttributes.put(Queue.OWNER, getName()+"Owner");
queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name());
- verify(_recoveryHandler).configuredObject(matchesRecord(_queueId, QUEUE, queueAttributes));
+ verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes));
}
public void testCreateQueueAMQQueueFieldTable() throws Exception
@@ -304,7 +299,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name());
queueAttributes.putAll(attributes);
- verify(_recoveryHandler).configuredObject(matchesRecord(_queueId, QUEUE, queueAttributes));
+ verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes));
}
public void testCreateQueueAMQQueueWithAlternateExchange() throws Exception
@@ -322,7 +317,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name());
queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString());
- verify(_recoveryHandler).configuredObject(matchesRecord(_queueId, QUEUE, queueAttributes));
+ verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes));
}
private ExchangeImpl createTestAlternateExchange()
@@ -355,7 +350,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
queueAttributes.put(Queue.NAME, getName());
queueAttributes.putAll(attributes);
- verify(_recoveryHandler).configuredObject(matchesRecord(_queueId, QUEUE, queueAttributes));
+ verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes));
}
@@ -382,7 +377,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
queueAttributes.putAll(attributes);
queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString());
- verify(_recoveryHandler).configuredObject(matchesRecord(_queueId, QUEUE, queueAttributes));
+ verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes));
}
public void testRemoveQueue() throws Exception
@@ -397,7 +392,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
// remove queue
DurableConfigurationStoreHelper.removeQueue(_configStore,queue);
reopenStore();
- verify(_recoveryHandler, never()).configuredObject(any(ConfiguredObjectRecord.class));
+ verify(_handler, never()).handle(any(ConfiguredObjectRecord.class));
}
private AMQQueue createTestQueue(String queueName,
@@ -463,11 +458,9 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
{
ExchangeImpl exchange = mock(ExchangeImpl.class);
Map<String,Object> actualAttributes = new HashMap<String, Object>();
- actualAttributes.put("id", _exchangeId);
actualAttributes.put("name", getName());
actualAttributes.put("type", getName() + "Type");
actualAttributes.put("lifetimePolicy", LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS);
- when(exchange.getActualAttributes()).thenReturn(actualAttributes);
when(exchange.getName()).thenReturn(getName());
when(exchange.getTypeName()).thenReturn(getName() + "Type");
when(exchange.isAutoDelete()).thenReturn(true);
@@ -475,11 +468,10 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
ConfiguredObjectRecord exchangeRecord = mock(ConfiguredObjectRecord.class);
when(exchangeRecord.getId()).thenReturn(_exchangeId);
when(exchangeRecord.getType()).thenReturn(Exchange.class.getSimpleName());
- Map<String,Object> actualAttributesExceptId = new HashMap<String, Object>(actualAttributes);
- actualAttributesExceptId.remove("id");
- when(exchangeRecord.getAttributes()).thenReturn(actualAttributesExceptId);
+ when(exchangeRecord.getAttributes()).thenReturn(actualAttributes);
when(exchange.asObjectRecord()).thenReturn(exchangeRecord);
-
+ when(exchange.getExchangeType()).thenReturn(mock(ExchangeType.class));
+ when(exchange.getEventLogger()).thenReturn(new EventLogger());
return exchange;
}
@@ -491,7 +483,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
ConfiguredObject<?> parent = mock(ConfiguredObject.class);
when(parent.getName()).thenReturn("testName");
_configStore.openConfigurationStore(parent, _configurationStoreSettings);
- _configStore.recoverConfigurationStore(_recoveryHandler);
+ _configStore.visitConfiguredObjectRecords(_handler);
}
protected abstract DurableConfigurationStore createConfigStore() throws Exception;
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreConfigurationTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreConfigurationTest.java
index 8f2d0029f6..2400a68c93 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreConfigurationTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreConfigurationTest.java
@@ -27,10 +27,4 @@ public class JsonFileConfigStoreConfigurationTest extends AbstractDurableConfigu
{
return new JsonFileConfigStore();
}
-
- @Override
- public void testBindQueue() throws Exception
- {
- // TODO: Temporarily disable the test as it is already fixed on trunk
- }
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java
index 1de24e371d..6907898a6c 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java
@@ -28,6 +28,7 @@ import java.util.UUID;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.test.utils.TestFileUtils;
@@ -43,15 +44,15 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.times;
public class JsonFileConfigStoreTest extends QpidTestCase
{
- private final ConfigurationRecoveryHandler _recoveryHandler = mock(ConfigurationRecoveryHandler.class);
-
private JsonFileConfigStore _store;
private HashMap<String, Object> _configurationStoreSettings;
private ConfiguredObject<?> _virtualHost;
private File _storeLocation;
+ private ConfiguredObjectRecordHandler _handler;
private static final UUID ANY_UUID = UUID.randomUUID();
@@ -69,6 +70,9 @@ public class JsonFileConfigStoreTest extends QpidTestCase
_configurationStoreSettings.put(JsonFileConfigStore.STORE_TYPE, JsonFileConfigStore.TYPE);
_configurationStoreSettings.put(JsonFileConfigStore.STORE_PATH, _storeLocation.getAbsolutePath());
_store = new JsonFileConfigStore();
+
+ _handler = mock(ConfiguredObjectRecordHandler.class);
+ when(_handler.handle(any(ConfiguredObjectRecord.class))).thenReturn(true);
}
@Override
@@ -113,35 +117,35 @@ public class JsonFileConfigStoreTest extends QpidTestCase
}
}
- public void testStartFromNoStore() throws Exception
+ public void testVisitEmptyStore()
{
_store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
- _store.recoverConfigurationStore(_recoveryHandler);
- InOrder inorder = inOrder(_recoveryHandler);
- inorder.verify(_recoveryHandler).beginConfigurationRecovery(eq(_store), eq(0));
- inorder.verify(_recoveryHandler,never()).configuredObject(any(ConfiguredObjectRecord.class));
- inorder.verify(_recoveryHandler).completeConfigurationRecovery();
+ _store.visitConfiguredObjectRecords(_handler);
+ InOrder inorder = inOrder(_handler);
+ inorder.verify(_handler).begin(eq(0));
+ inorder.verify(_handler,never()).handle(any(ConfiguredObjectRecord.class));
+ inorder.verify(_handler).end();
_store.closeConfigurationStore();
}
public void testUpdatedConfigVersionIsRetained() throws Exception
{
final int NEW_CONFIG_VERSION = 42;
- when(_recoveryHandler.completeConfigurationRecovery()).thenReturn(NEW_CONFIG_VERSION);
+ when(_handler.end()).thenReturn(NEW_CONFIG_VERSION);
_store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
- _store.recoverConfigurationStore(_recoveryHandler);
+ _store.visitConfiguredObjectRecords(_handler);
_store.closeConfigurationStore();
_store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
- _store.recoverConfigurationStore(_recoveryHandler);
- InOrder inorder = inOrder(_recoveryHandler);
+ _store.visitConfiguredObjectRecords(_handler);
+ InOrder inorder = inOrder(_handler);
// first time the config version should be the initial version - 0
- inorder.verify(_recoveryHandler).beginConfigurationRecovery(eq(_store), eq(0));
+ inorder.verify(_handler).begin(eq(0));
// second time the config version should be the updated version
- inorder.verify(_recoveryHandler).beginConfigurationRecovery(eq(_store), eq(NEW_CONFIG_VERSION));
+ inorder.verify(_handler).begin(eq(NEW_CONFIG_VERSION));
_store.closeConfigurationStore();
}
@@ -157,8 +161,9 @@ public class JsonFileConfigStoreTest extends QpidTestCase
_store.closeConfigurationStore();
_store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
- _store.recoverConfigurationStore(_recoveryHandler);
- verify(_recoveryHandler).configuredObject(matchesRecord(queueId, queueType, queueAttr));
+
+ _store.visitConfiguredObjectRecords(_handler);
+ verify(_handler, times(1)).handle(matchesRecord(queueId, queueType, queueAttr));
_store.closeConfigurationStore();
}
@@ -179,8 +184,8 @@ public class JsonFileConfigStoreTest extends QpidTestCase
_store.closeConfigurationStore();
_store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
- _store.recoverConfigurationStore(_recoveryHandler);
- verify(_recoveryHandler).configuredObject(matchesRecord(queueId, queueType, queueAttr));
+ _store.visitConfiguredObjectRecords(_handler);
+ verify(_handler, times(1)).handle(matchesRecord(queueId, queueType, queueAttr));
_store.closeConfigurationStore();
}
@@ -201,8 +206,8 @@ public class JsonFileConfigStoreTest extends QpidTestCase
_store.closeConfigurationStore();
_store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
- _store.recoverConfigurationStore(_recoveryHandler);
- verify(_recoveryHandler, never()).configuredObject(any(ConfiguredObjectRecord.class));
+ _store.visitConfiguredObjectRecords(_handler);
+ verify(_handler, never()).handle(any(ConfiguredObjectRecord.class));
_store.closeConfigurationStore();
}
@@ -311,12 +316,12 @@ public class JsonFileConfigStoreTest extends QpidTestCase
_store.update(true, bindingRecord, binding2Record);
_store.closeConfigurationStore();
_store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
- _store.recoverConfigurationStore(_recoveryHandler);
- verify(_recoveryHandler).configuredObject(matchesRecord(queueId, "Queue", EMPTY_ATTR));
- verify(_recoveryHandler).configuredObject(matchesRecord(queue2Id, "Queue", EMPTY_ATTR));
- verify(_recoveryHandler).configuredObject(matchesRecord(exchangeId, "Exchange", EMPTY_ATTR));
- verify(_recoveryHandler).configuredObject(matchesRecord(bindingId, "Binding", EMPTY_ATTR));
- verify(_recoveryHandler).configuredObject(matchesRecord(binding2Id, "Binding", EMPTY_ATTR));
+ _store.visitConfiguredObjectRecords(_handler);
+ verify(_handler).handle(matchesRecord(queueId, "Queue", EMPTY_ATTR));
+ verify(_handler).handle(matchesRecord(queue2Id, "Queue", EMPTY_ATTR));
+ verify(_handler).handle(matchesRecord(exchangeId, "Exchange", EMPTY_ATTR));
+ verify(_handler).handle(matchesRecord(bindingId, "Binding", EMPTY_ATTR));
+ verify(_handler).handle(matchesRecord(binding2Id, "Binding", EMPTY_ATTR));
_store.closeConfigurationStore();
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
index 451a2744c3..89fef15e7e 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
@@ -33,7 +33,6 @@ import java.util.UUID;
import org.apache.log4j.Logger;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.util.FileUtils;
@@ -66,12 +65,9 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple
_store = createStore();
- MessageStoreRecoveryHandler recoveryHandler = mock(MessageStoreRecoveryHandler.class);
- when(recoveryHandler.begin()).thenReturn(mock(StoredMessageRecoveryHandler.class));
ConfiguredObject<?> parent = mock(ConfiguredObject.class);
when(parent.getName()).thenReturn("test");
_store.openMessageStore(parent, storeSettings);
- _store.recoverMessageStore(recoveryHandler, null);
_transactionResource = UUID.randomUUID();
_events = new ArrayList<Event>();
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
index 51d3fc15d2..8bf981bd7b 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
@@ -21,29 +21,31 @@
package org.apache.qpid.server.store;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
import org.apache.qpid.server.store.Transaction.Record;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
import org.apache.qpid.test.utils.QpidTestCase;
+import org.mockito.ArgumentMatcher;
public abstract class MessageStoreTestCase extends QpidTestCase
{
- private MessageStoreRecoveryHandler _messageStoreRecoveryHandler;
- private StoredMessageRecoveryHandler _storedMessageRecoveryHandler;
- private TransactionLogRecoveryHandler _logRecoveryHandler;
- private TransactionLogRecoveryHandler.QueueEntryRecoveryHandler _queueEntryRecoveryHandler;
- private TransactionLogRecoveryHandler.DtxRecordRecoveryHandler _dtxRecordRecoveryHandler;
-
private MessageStore _store;
private Map<String, Object> _storeSettings;
private ConfiguredObject<?> _parent;
@@ -55,35 +57,34 @@ public abstract class MessageStoreTestCase extends QpidTestCase
_parent = mock(ConfiguredObject.class);
when(_parent.getName()).thenReturn("test");
- _storedMessageRecoveryHandler = mock(StoredMessageRecoveryHandler.class);
- _logRecoveryHandler = mock(TransactionLogRecoveryHandler.class);
- _messageStoreRecoveryHandler = mock(MessageStoreRecoveryHandler.class);
- _queueEntryRecoveryHandler = mock(TransactionLogRecoveryHandler.QueueEntryRecoveryHandler.class);
- _dtxRecordRecoveryHandler = mock(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler.class);
-
- when(_messageStoreRecoveryHandler.begin()).thenReturn(_storedMessageRecoveryHandler);
- when(_logRecoveryHandler.begin(any(MessageStore.class))).thenReturn(_queueEntryRecoveryHandler);
- when(_queueEntryRecoveryHandler.completeQueueEntryRecovery()).thenReturn(_dtxRecordRecoveryHandler);
-
_storeSettings = getStoreSettings();
_store = createMessageStore();
_store.openMessageStore(_parent, _storeSettings);
- _store.recoverMessageStore(_messageStoreRecoveryHandler, _logRecoveryHandler);
+
}
protected abstract Map<String, Object> getStoreSettings() throws Exception;
protected abstract MessageStore createMessageStore();
- public MessageStore getStore()
+ protected MessageStore getStore()
{
return _store;
}
- public void testRecordXid() throws Exception
+ protected void reopenStore() throws Exception
{
+ _store.closeMessageStore();
+
+ _store = createMessageStore();
+ _store.openMessageStore(_parent, _storeSettings);
+ }
+
+ public void testAddAndRemoveRecordXid() throws Exception
+ {
+ long format = 1l;
Record enqueueRecord = getTestRecord(1);
Record dequeueRecord = getTestRecord(2);
Record[] enqueues = { enqueueRecord };
@@ -92,27 +93,287 @@ public abstract class MessageStoreTestCase extends QpidTestCase
byte[] branchId = new byte[] { 2 };
Transaction transaction = _store.newTransaction();
- transaction.recordXid(1l, globalId, branchId, enqueues, dequeues);
+ transaction.recordXid(format, globalId, branchId, enqueues, dequeues);
transaction.commitTran();
+
reopenStore();
- verify(_dtxRecordRecoveryHandler).dtxRecord(1l, globalId, branchId, enqueues, dequeues);
+
+ DistributedTransactionHandler handler = mock(DistributedTransactionHandler.class);
+ _store.visitDistributedTransactions(handler);
+ verify(handler, times(1)).handle(format,globalId, branchId, enqueues, dequeues);
transaction = _store.newTransaction();
transaction.removeXid(1l, globalId, branchId);
transaction.commitTran();
reopenStore();
- verify(_dtxRecordRecoveryHandler, times(1)).dtxRecord(1l, globalId, branchId, enqueues, dequeues);
+
+ handler = mock(DistributedTransactionHandler.class);
+ _store.visitDistributedTransactions(handler);
+ verify(handler, never()).handle(format,globalId, branchId, enqueues, dequeues);
}
- private void reopenStore() throws Exception
+ public void testVisitMessages() throws Exception
{
- _store.closeMessageStore();
+ long messageId = 1;
+ int contentSize = 0;
+ final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize));
+ StoreFuture flushFuture = message.flushToStore();
+ flushFuture.waitForCompletion();
- _store = createMessageStore();
- _store.openMessageStore(_parent, _storeSettings);
- _store.recoverMessageStore(_messageStoreRecoveryHandler, _logRecoveryHandler);
+ MessageHandler handler = mock(MessageHandler.class);
+ _store.visitMessages(handler);
+
+ verify(handler, times(1)).handle(argThat(new MessageMetaDataMatcher(messageId)));
+
+ }
+
+ public void testVisitMessagesAborted() throws Exception
+ {
+ int contentSize = 0;
+ for (int i = 0; i < 3; i++)
+ {
+ final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(i + 1, contentSize));
+ StoreFuture flushFuture = message.flushToStore();
+ flushFuture.waitForCompletion();
+ }
+
+ MessageHandler handler = mock(MessageHandler.class);
+ when(handler.handle(any(StoredMessage.class))).thenReturn(true, false);
+
+ _store.visitMessages(handler);
+
+ verify(handler, times(2)).handle(any(StoredMessage.class));
+ }
+
+ public void testReopenedMessageStoreUsesLastMessageId() throws Exception
+ {
+ int contentSize = 0;
+ for (int i = 0; i < 3; i++)
+ {
+ final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(i + 1, contentSize));
+ StoreFuture flushFuture = message.flushToStore();
+ flushFuture.waitForCompletion();
+ }
+
+ reopenStore();
+
+ final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(4, contentSize));
+
+ StoreFuture flushFuture = message.flushToStore();
+ flushFuture.waitForCompletion();
+
+ assertEquals("Unexpected message id", 4, message.getMessageNumber());
+ }
+
+ public void testVisitMessageInstances() throws Exception
+ {
+ long messageId = 1;
+ int contentSize = 0;
+ final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize));
+ StoreFuture flushFuture = message.flushToStore();
+ flushFuture.waitForCompletion();
+
+ EnqueueableMessage enqueueableMessage = createMockEnqueueableMessage(messageId, message);
+
+ UUID queueId = UUID.randomUUID();
+ TransactionLogResource queue = createTransactionLogResource(queueId);
+
+ Transaction transaction = _store.newTransaction();
+ transaction.enqueueMessage(queue, enqueueableMessage);
+ transaction.commitTran();
+
+ MessageInstanceHandler handler = mock(MessageInstanceHandler.class);
+ _store.visitMessageInstances(handler);
+
+ verify(handler, times(1)).handle(queueId, messageId);
+ }
+
+ public void testVisitDistributedTransactions() throws Exception
+ {
+ long format = 1l;
+ byte[] branchId = new byte[] { 2 };
+ byte[] globalId = new byte[] { 1 };
+ Record enqueueRecord = getTestRecord(1);
+ Record dequeueRecord = getTestRecord(2);
+ Record[] enqueues = { enqueueRecord };
+ Record[] dequeues = { dequeueRecord };
+
+ Transaction transaction = _store.newTransaction();
+ transaction.recordXid(format, globalId, branchId, enqueues, dequeues);
+ transaction.commitTran();
+
+ DistributedTransactionHandler handler = mock(DistributedTransactionHandler.class);
+ _store.visitDistributedTransactions(handler);
+
+ verify(handler, times(1)).handle(format,globalId, branchId, enqueues, dequeues);
+
+ }
+
+ public void testCommitTransaction() throws Exception
+ {
+ final UUID mockQueueId = UUIDGenerator.generateRandomUUID();
+ TransactionLogResource mockQueue = createTransactionLogResource(mockQueueId);
+
+ Transaction txn = getStore().newTransaction();
+
+ long messageId1 = 1L;
+ long messageId2 = 5L;
+ final EnqueueableMessage enqueueableMessage1 = createEnqueueableMessage(messageId1);
+ final EnqueueableMessage enqueueableMessage2 = createEnqueueableMessage(messageId2);
+
+ txn.enqueueMessage(mockQueue, enqueueableMessage1);
+ txn.enqueueMessage(mockQueue, enqueueableMessage2);
+ txn.commitTran();
+
+ QueueFilteringMessageInstanceHandler filter = new QueueFilteringMessageInstanceHandler(mockQueueId);
+ getStore().visitMessageInstances(filter);
+ Set<Long> enqueuedIds = filter.getEnqueuedIds();
+
+ assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
+ assertTrue("Message with id " + messageId1 + " is not found", enqueuedIds.contains(messageId1));
+ assertTrue("Message with id " + messageId2 + " is not found", enqueuedIds.contains(messageId2));
}
+
+ public void testRollbackTransactionBeforeCommit() throws Exception
+ {
+ final UUID mockQueueId = UUIDGenerator.generateRandomUUID();
+ TransactionLogResource mockQueue = createTransactionLogResource(mockQueueId);
+
+ long messageId1 = 21L;
+ long messageId2 = 22L;
+ long messageId3 = 23L;
+ final EnqueueableMessage enqueueableMessage1 = createEnqueueableMessage(messageId1);
+ final EnqueueableMessage enqueueableMessage2 = createEnqueueableMessage(messageId2);
+ final EnqueueableMessage enqueueableMessage3 = createEnqueueableMessage(messageId3);
+
+ Transaction txn = getStore().newTransaction();
+
+ txn.enqueueMessage(mockQueue, enqueueableMessage1);
+ txn.abortTran();
+
+ txn = getStore().newTransaction();
+ txn.enqueueMessage(mockQueue, enqueueableMessage2);
+ txn.enqueueMessage(mockQueue, enqueueableMessage3);
+ txn.commitTran();
+
+ QueueFilteringMessageInstanceHandler filter = new QueueFilteringMessageInstanceHandler(mockQueueId);
+ getStore().visitMessageInstances(filter);
+ Set<Long> enqueuedIds = filter.getEnqueuedIds();
+
+ assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
+ assertTrue("Message with id " + messageId2 + " is not found", enqueuedIds.contains(messageId2));
+ assertTrue("Message with id " + messageId3 + " is not found", enqueuedIds.contains(messageId3));
+ }
+
+ public void testRollbackTransactionAfterCommit() throws Exception
+ {
+ final UUID mockQueueId = UUIDGenerator.generateRandomUUID();
+ TransactionLogResource mockQueue = createTransactionLogResource(mockQueueId);
+
+ long messageId1 = 30L;
+ long messageId2 = 31L;
+ long messageId3 = 32L;
+
+ final EnqueueableMessage enqueueableMessage1 = createEnqueueableMessage(messageId1);
+ final EnqueueableMessage enqueueableMessage2 = createEnqueueableMessage(messageId2);
+ final EnqueueableMessage enqueueableMessage3 = createEnqueueableMessage(messageId3);
+
+ Transaction txn = getStore().newTransaction();
+
+ txn.enqueueMessage(mockQueue, enqueueableMessage1);
+ txn.commitTran();
+
+ txn = getStore().newTransaction();
+ txn.enqueueMessage(mockQueue, enqueueableMessage2);
+ txn.abortTran();
+
+ txn = getStore().newTransaction();
+ txn.enqueueMessage(mockQueue, enqueueableMessage3);
+ txn.commitTran();
+
+ QueueFilteringMessageInstanceHandler filter = new QueueFilteringMessageInstanceHandler(mockQueueId);
+ getStore().visitMessageInstances(filter);
+ Set<Long> enqueuedIds = filter.getEnqueuedIds();
+
+ assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
+ assertTrue("Message with id " + messageId1 + " is not found", enqueuedIds.contains(messageId1));
+ assertTrue("Message with id " + messageId3 + " is not found", enqueuedIds.contains(messageId3));
+ }
+
+ public void testStoreIgnoresTransientMessage() throws Exception
+ {
+ long messageId = 1;
+ int contentSize = 0;
+ final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize, false));
+ StoreFuture flushFuture = message.flushToStore();
+ flushFuture.waitForCompletion();
+
+ MessageHandler handler = mock(MessageHandler.class);
+ _store.visitMessages(handler);
+
+ verify(handler, times(0)).handle(argThat(new MessageMetaDataMatcher(messageId)));
+ }
+
+ public void testAddAndRemoveMessageWithoutContent() throws Exception
+ {
+ long messageId = 1;
+ int contentSize = 0;
+ final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize));
+ StoreFuture flushFuture = message.flushToStore();
+ flushFuture.waitForCompletion();
+
+ final AtomicReference<StoredMessage<?>> retrievedMessageRef = new AtomicReference<StoredMessage<?>>();
+ _store.visitMessages(new MessageHandler()
+ {
+
+ @Override
+ public boolean handle(StoredMessage<?> storedMessage)
+ {
+ retrievedMessageRef.set(storedMessage);
+ return true;
+ }
+ });
+
+ StoredMessage<?> retrievedMessage = retrievedMessageRef.get();
+ assertNotNull("Message was not found", retrievedMessageRef);
+ assertEquals("Unexpected retreived message", message.getMessageNumber(), retrievedMessage.getMessageNumber());
+
+ retrievedMessage.remove();
+
+ retrievedMessageRef.set(null);
+ _store.visitMessages(new MessageHandler()
+ {
+
+ @Override
+ public boolean handle(StoredMessage<?> storedMessage)
+ {
+ retrievedMessageRef.set(storedMessage);
+ return true;
+ }
+ });
+ assertNull(retrievedMessageRef.get());
+ }
+
+
+ private TransactionLogResource createTransactionLogResource(UUID queueId)
+ {
+ TransactionLogResource queue = mock(TransactionLogResource.class);
+ when(queue.getId()).thenReturn(queueId);
+ when(queue.getName()).thenReturn("testQueue");
+ when(queue.isDurable()).thenReturn(true);
+ return queue;
+ }
+
+ private EnqueueableMessage createMockEnqueueableMessage(long messageId, final StoredMessage<TestMessageMetaData> message)
+ {
+ EnqueueableMessage enqueueableMessage = mock(EnqueueableMessage.class);
+ when(enqueueableMessage.isPersistent()).thenReturn(true);
+ when(enqueueableMessage.getMessageNumber()).thenReturn(messageId);
+ when(enqueueableMessage.getStoredMessage()).thenReturn(message);
+ return enqueueableMessage;
+ }
+
private Record getTestRecord(long messageNumber)
{
UUID queueId1 = UUIDGenerator.generateRandomUUID();
@@ -121,77 +382,66 @@ public abstract class MessageStoreTestCase extends QpidTestCase
EnqueueableMessage message1 = mock(EnqueueableMessage.class);
when(message1.isPersistent()).thenReturn(true);
when(message1.getMessageNumber()).thenReturn(messageNumber);
- final StoredMessage storedMessage = mock(StoredMessage.class);
+ final StoredMessage<?> storedMessage = mock(StoredMessage.class);
when(storedMessage.getMessageNumber()).thenReturn(messageNumber);
when(message1.getStoredMessage()).thenReturn(storedMessage);
Record enqueueRecord = new TestRecord(queue1, message1);
return enqueueRecord;
}
- private static class TestRecord implements Record
+ private EnqueueableMessage createEnqueueableMessage(long messageId1)
{
- private TransactionLogResource _queue;
- private EnqueueableMessage _message;
+ final StoredMessage<TestMessageMetaData> message1 = _store.addMessage(new TestMessageMetaData(messageId1, 0));
+ StoreFuture flushFuture = message1.flushToStore();
+ flushFuture.waitForCompletion();
+ EnqueueableMessage enqueueableMessage1 = createMockEnqueueableMessage(messageId1, message1);
+ return enqueueableMessage1;
+ }
- public TestRecord(TransactionLogResource queue, EnqueueableMessage message)
+ private class MessageMetaDataMatcher extends ArgumentMatcher<StoredMessage<?>>
+ {
+ private long _messageNumber;
+
+ public MessageMetaDataMatcher(long messageNumber)
{
super();
- _queue = queue;
- _message = message;
+ _messageNumber = messageNumber;
}
- @Override
- public TransactionLogResource getResource()
+ public boolean matches(Object obj)
{
- return _queue;
+ return obj instanceof StoredMessage && ((StoredMessage<?>)obj).getMessageNumber() == _messageNumber;
}
+ }
- @Override
- public EnqueueableMessage getMessage()
- {
- return _message;
- }
+ private class QueueFilteringMessageInstanceHandler implements MessageInstanceHandler
+ {
+ private final UUID _queueId;
+ private final Set<Long> _enqueuedIds = new HashSet<Long>();
- @Override
- public int hashCode()
+ public QueueFilteringMessageInstanceHandler(UUID queueId)
{
- final int prime = 31;
- int result = 1;
- result = prime * result + ((_message == null) ? 0 : new Long(_message.getMessageNumber()).hashCode());
- result = prime * result + ((_queue == null) ? 0 : _queue.getId().hashCode());
- return result;
+ _queueId = queueId;
}
@Override
- public boolean equals(Object obj)
+ public boolean handle(UUID queueId, long messageId)
{
- if (this == obj)
- {
- return true;
- }
- if (obj == null)
- {
- return false;
- }
- if (!(obj instanceof Record))
+ if (queueId.equals(_queueId))
{
- return false;
+ if (_enqueuedIds.contains(messageId))
+ {
+ fail("Queue with id " + _queueId + " contains duplicate message ids");
+ }
+ _enqueuedIds.add(messageId);
}
- Record other = (Record) obj;
- if (_message == null && other.getMessage() != null)
- {
- return false;
- }
- if (_queue == null && other.getResource() != null)
- {
- return false;
- }
- if (_message.getMessageNumber() != other.getMessage().getMessageNumber())
- {
- return false;
- }
- return _queue.getId().equals(other.getResource().getId());
+ return true;
}
+ public Set<Long> getEnqueuedIds()
+ {
+ return _enqueuedIds;
+ }
}
+
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
index 32df355c07..bfa4e1d52e 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
@@ -20,15 +20,30 @@
*/
package org.apache.qpid.server.store;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.qpid.server.store.handler.MessageHandler;
+
/** A simple message store that stores the messages in a thread-safe structure in memory. */
public class TestMemoryMessageStore extends AbstractMemoryMessageStore
{
public static final String TYPE = "TestMemory";
- @Override
- public String getStoreType()
+ public int getMessageCount()
{
- return TYPE;
+ final AtomicInteger counter = new AtomicInteger();
+ visitMessages(new MessageHandler()
+ {
+
+ @Override
+ public boolean handle(StoredMessage<?> storedMessage)
+ {
+ counter.incrementAndGet();
+ return true;
+ }
+ });
+ return counter.get();
}
+
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java
index e14b41b221..6e55b468a6 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java
@@ -34,13 +34,20 @@ public class TestMessageMetaData implements StorableMessageMetaData
private static final TestMessageMetaDataType TYPE = new TestMessageMetaDataType();
- private int _contentSize;
- private long _messageId;
+ private final int _contentSize;
+ private final long _messageId;
+ private final boolean _persistent;
public TestMessageMetaData(long messageId, int contentSize)
{
+ this(messageId, contentSize, true);
+ }
+
+ public TestMessageMetaData(long messageId, int contentSize, boolean persistent)
+ {
_contentSize = contentSize;
_messageId = messageId;
+ _persistent = persistent;
}
@Override
@@ -59,7 +66,7 @@ public class TestMessageMetaData implements StorableMessageMetaData
}
@Override
- public MessageMetaDataType getType()
+ public MessageMetaDataType<TestMessageMetaData> getType()
{
return TYPE;
}
@@ -67,7 +74,7 @@ public class TestMessageMetaData implements StorableMessageMetaData
@Override
public boolean isPersistent()
{
- return true;
+ return _persistent;
}
@Override
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
index 5622383f3f..e5c94cf66b 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
@@ -70,7 +70,22 @@ public class TestMessageMetaDataType implements MessageMetaDataType<TestMessageM
private static class TestServerMessage implements ServerMessage<TestMessageMetaData>
{
- private StoredMessage<TestMessageMetaData> _storedMsg;
+ private final StoredMessage<TestMessageMetaData> _storedMsg;
+
+ private final MessageReference<ServerMessage> _messageReference = new MessageReference<ServerMessage>()
+ {
+
+ @Override
+ public ServerMessage getMessage()
+ {
+ return TestServerMessage.this;
+ }
+
+ @Override
+ public void release()
+ {
+ }
+ };
public TestServerMessage(StoredMessage<TestMessageMetaData> storedMsg)
{
@@ -115,7 +130,7 @@ public class TestMessageMetaDataType implements MessageMetaDataType<TestMessageM
@Override
public long getMessageNumber()
{
- return 0;
+ return _storedMsg.getMessageNumber();
}
@Override
@@ -140,13 +155,54 @@ public class TestMessageMetaDataType implements MessageMetaDataType<TestMessageM
@Override
public boolean isPersistent()
{
- return false;
+ return _storedMsg.getMetaData().isPersistent();
}
@Override
public MessageReference newReference()
{
- return null;
+ return _messageReference;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((_storedMsg == null) ? 0 : _storedMsg.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj)
+ {
+ return true;
+ }
+ if (obj == null)
+ {
+ return false;
+ }
+ if (getClass() != obj.getClass())
+ {
+ return false;
+ }
+ TestServerMessage other = (TestServerMessage) obj;
+ if (_storedMsg == null)
+ {
+ if (other._storedMsg != null)
+ {
+ return false;
+ }
+ }
+ else if (!_storedMsg.equals(other._storedMsg))
+ {
+ return false;
+ }
+ return true;
}
+
+
}
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestRecord.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestRecord.java
new file mode 100644
index 0000000000..668d9d5242
--- /dev/null
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestRecord.java
@@ -0,0 +1,91 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.store.Transaction.Record;
+
+public class TestRecord implements Record
+{
+ private TransactionLogResource _queue;
+ private EnqueueableMessage _message;
+
+ public TestRecord(TransactionLogResource queue, EnqueueableMessage message)
+ {
+ super();
+ _queue = queue;
+ _message = message;
+ }
+
+ @Override
+ public TransactionLogResource getResource()
+ {
+ return _queue;
+ }
+
+ @Override
+ public EnqueueableMessage getMessage()
+ {
+ return _message;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((_message == null) ? 0 : new Long(_message.getMessageNumber()).hashCode());
+ result = prime * result + ((_queue == null) ? 0 : _queue.getId().hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj)
+ {
+ return true;
+ }
+ if (obj == null)
+ {
+ return false;
+ }
+ if (!(obj instanceof Record))
+ {
+ return false;
+ }
+ Record other = (Record) obj;
+ if (_message == null && other.getMessage() != null)
+ {
+ return false;
+ }
+ if (_queue == null && other.getResource() != null)
+ {
+ return false;
+ }
+ if (_message.getMessageNumber() != other.getMessage().getMessageNumber())
+ {
+ return false;
+ }
+ return _queue.getId().equals(other.getResource().getId());
+ }
+
+} \ No newline at end of file
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
deleted file mode 100644
index 7d4dcd0280..0000000000
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store;
-
-import org.apache.qpid.server.message.EnqueueableMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Adds some extra methods to the memory message store for testing purposes.
- */
-public class TestableMemoryMessageStore extends TestMemoryMessageStore
-{
- public static final String TYPE = "TestableMemory";
- private final Map<Long, AMQQueue> _messages = new HashMap<Long, AMQQueue>();
- private final AtomicInteger _messageCount = new AtomicInteger(0);
-
- @Override
- public StoredMessage addMessage(StorableMessageMetaData metaData)
- {
- return new TestableStoredMessage(super.addMessage(metaData));
- }
-
- public int getMessageCount()
- {
- return _messageCount.get();
- }
-
- public Map<Long, AMQQueue> getMessages()
- {
- return _messages;
- }
-
- private class TestableTransaction implements Transaction
- {
- @Override
- public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message)
- {
- getMessages().put(message.getMessageNumber(), (AMQQueue)queue);
- }
-
- @Override
- public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message)
- {
- getMessages().remove(message.getMessageNumber());
- }
-
- @Override
- public void commitTran()
- {
- }
-
- @Override
- public StoreFuture commitTranAsync()
- {
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
- public void abortTran()
- {
- }
-
- public void removeXid(long format, byte[] globalId, byte[] branchId)
- {
- }
-
- public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
- {
- }
- }
-
-
- @Override
- public Transaction newTransaction()
- {
- return new TestableTransaction();
- }
-
-
- private class TestableStoredMessage implements StoredMessage
- {
- private final StoredMessage _storedMessage;
-
- public TestableStoredMessage(StoredMessage storedMessage)
- {
- _messageCount.incrementAndGet();
- _storedMessage = storedMessage;
- }
-
- public StorableMessageMetaData getMetaData()
- {
- return _storedMessage.getMetaData();
- }
-
- public long getMessageNumber()
- {
- return _storedMessage.getMessageNumber();
- }
-
- public void addContent(int offsetInMessage, ByteBuffer src)
- {
- _storedMessage.addContent(offsetInMessage, src);
- }
-
- public int getContent(int offsetInMessage, ByteBuffer dst)
- {
- return _storedMessage.getContent(offsetInMessage, dst);
- }
-
-
- public ByteBuffer getContent(int offsetInMessage, int size)
- {
- return _storedMessage.getContent(offsetInMessage, size);
- }
-
- public StoreFuture flushToStore()
- {
- return _storedMessage.flushToStore();
- }
-
- public void remove()
- {
- _storedMessage.remove();
- _messageCount.decrementAndGet();
- }
- }
-}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
index ab18c8f41d..da868a01f1 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
@@ -33,14 +33,14 @@ import org.apache.qpid.server.util.ServerScopedRuntimeException;
* Mock implementation of a (Store) Transaction allow its state to be observed.
* Also provide a factory method to produce TestTransactionLog objects suitable
* for unit test use.
- *
+ *
*/
class MockStoreTransaction implements Transaction
{
enum TransactionState {NOT_STARTED, STARTED, COMMITTED, ABORTED};
private TransactionState _state = TransactionState.NOT_STARTED;
-
+
private int _numberOfEnqueuedMessages = 0;
private int _numberOfDequeuedMessages = 0;
private boolean _throwExceptionOnQueueOp;
@@ -52,7 +52,7 @@ class MockStoreTransaction implements Transaction
public void setState(TransactionState state)
{
- _state = state;
+ _state = state;
}
public TransactionState getState()
@@ -64,10 +64,10 @@ class MockStoreTransaction implements Transaction
{
if (_throwExceptionOnQueueOp)
{
-
+
throw new ServerScopedRuntimeException("Mocked exception");
}
-
+
_numberOfEnqueuedMessages++;
}
@@ -87,7 +87,7 @@ class MockStoreTransaction implements Transaction
{
throw new ServerScopedRuntimeException("Mocked exception");
}
-
+
_numberOfDequeuedMessages++;
}
@@ -124,12 +124,6 @@ class MockStoreTransaction implements Transaction
storeTransaction.setState(TransactionState.STARTED);
return storeTransaction;
}
-
- @Override
- public String getStoreType()
- {
- return "TEST";
- }
};
}
} \ No newline at end of file
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
index fd56f3fa1c..1b131a18e1 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
@@ -44,7 +44,6 @@ import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestMemoryMessageStore;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.plugin.PluggableFactoryLoader;
@@ -110,7 +109,7 @@ public class BrokerTestHelper
when(virtualHost.getAttribute(org.apache.qpid.server.model.VirtualHost.TYPE)).thenReturn(StandardVirtualHostFactory.TYPE);
Map<String, Object> messageStoreSettings = new HashMap<String, Object>();
- messageStoreSettings.put(MessageStore.STORE_TYPE, TestableMemoryMessageStore.TYPE);
+ messageStoreSettings.put(MessageStore.STORE_TYPE, TestMemoryMessageStore.TYPE);
when(virtualHost.getMessageStoreSettings()).thenReturn(messageStoreSettings);
when(virtualHost.getName()).thenReturn(name);
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MessageStoreRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MessageStoreRecovererTest.java
new file mode 100644
index 0000000000..ce5616b9ca
--- /dev/null
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MessageStoreRecovererTest.java
@@ -0,0 +1,414 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.UUID;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
+import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.NullMessageStore;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TestMessageMetaData;
+import org.apache.qpid.server.store.Transaction;
+import org.apache.qpid.server.store.Transaction.Record;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
+import org.apache.qpid.server.txn.DtxBranch;
+import org.apache.qpid.server.txn.DtxRegistry;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.transport.Xid;
+import org.mockito.ArgumentMatcher;
+
+public class MessageStoreRecovererTest extends TestCase
+{
+ private VirtualHost _virtualHost;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ _virtualHost = mock(VirtualHost.class);
+ when(_virtualHost.getEventLogger()).thenReturn(new EventLogger());
+
+ }
+
+ @SuppressWarnings("unchecked")
+ public void testRecoveryOfSingleMessageOnSingleQueue()
+ {
+ final AMQQueue<?> queue = createRegisteredMockQueue();
+
+ final long messageId = 1;
+ final StoredMessage<StorableMessageMetaData> storedMessage = createMockStoredMessage(messageId);
+
+ MessageStore store = new NullMessageStore()
+ {
+ @Override
+ public void visitMessages(MessageHandler handler) throws StoreException
+ {
+ handler.handle(storedMessage);
+ }
+
+ @Override
+ public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
+ {
+ handler.handle(queue.getId(), messageId);
+ }
+ };
+
+ when(_virtualHost.getMessageStore()).thenReturn(store);
+
+ MessageStoreRecoverer recoverer = new MessageStoreRecoverer(_virtualHost, mock(MessageStoreLogSubject.class));
+ recoverer.recover();
+
+ ServerMessage<?> message = storedMessage.getMetaData().getType().createMessage(storedMessage);
+ verify(queue, times(1)).enqueue(eq(message), (Action<? super MessageInstance>)isNull());
+ }
+
+ @SuppressWarnings("unchecked")
+ public void testRecoveryOfMessageInstanceForNonExistingMessage()
+ {
+ final AMQQueue<?> queue = createRegisteredMockQueue();
+
+ final long messageId = 1;
+ final Transaction transaction = mock(Transaction.class);
+
+ MessageStore store = new NullMessageStore()
+ {
+ @Override
+ public void visitMessages(MessageHandler handler) throws StoreException
+ {
+ // no message to visit
+ }
+
+ @Override
+ public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
+ {
+ handler.handle(queue.getId(), messageId);
+ }
+
+ @Override
+ public Transaction newTransaction()
+ {
+ return transaction;
+ }
+ };
+
+ when(_virtualHost.getMessageStore()).thenReturn(store);
+
+ MessageStoreRecoverer recoverer = new MessageStoreRecoverer(_virtualHost, mock(MessageStoreLogSubject.class));
+ recoverer.recover();
+
+ verify(queue, never()).enqueue(any(ServerMessage.class), any(Action.class));
+ verify(transaction).dequeueMessage(same(queue), argThat(new MessageNumberMatcher(messageId)));
+ verify(transaction, times(1)).commitTranAsync();
+ }
+
+ public void testRecoveryOfMessageInstanceForNonExistingQueue()
+ {
+ final UUID queueId = UUID.randomUUID();
+ final Transaction transaction = mock(Transaction.class);
+ final long messageId = 1;
+ final StoredMessage<StorableMessageMetaData> storedMessage = createMockStoredMessage(messageId);
+
+ MessageStore store = new NullMessageStore()
+ {
+ @Override
+ public void visitMessages(MessageHandler handler) throws StoreException
+ {
+ handler.handle(storedMessage);
+ }
+
+ @Override
+ public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
+ {
+ handler.handle(queueId, messageId);
+ }
+
+ @Override
+ public Transaction newTransaction()
+ {
+ return transaction;
+ }
+ };
+
+ when(_virtualHost.getMessageStore()).thenReturn(store);
+
+ MessageStoreRecoverer recoverer = new MessageStoreRecoverer(_virtualHost, mock(MessageStoreLogSubject.class));
+ recoverer.recover();
+
+ verify(transaction).dequeueMessage(argThat(new QueueIdMatcher(queueId)), argThat(new MessageNumberMatcher(messageId)));
+ verify(transaction, times(1)).commitTranAsync();
+ }
+
+ public void testRecoveryDeletesOrphanMessages()
+ {
+
+ final long messageId = 1;
+ final StoredMessage<StorableMessageMetaData> storedMessage = createMockStoredMessage(messageId);
+
+ MessageStore store = new NullMessageStore()
+ {
+ @Override
+ public void visitMessages(MessageHandler handler) throws StoreException
+ {
+ handler.handle(storedMessage);
+ }
+
+ @Override
+ public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
+ {
+ // No messages instances
+ }
+ };
+
+ when(_virtualHost.getMessageStore()).thenReturn(store);
+
+ MessageStoreRecoverer recoverer = new MessageStoreRecoverer(_virtualHost, mock(MessageStoreLogSubject.class));
+ recoverer.recover();
+
+ verify(storedMessage, times(1)).remove();
+ }
+
+ @SuppressWarnings("unchecked")
+ public void testRecoveryOfSingleEnqueueWithDistributedTransaction()
+ {
+ AMQQueue<?> queue = createRegisteredMockQueue();
+
+ final Transaction transaction = mock(Transaction.class);
+
+ final StoredMessage<StorableMessageMetaData> storedMessage = createMockStoredMessage(1);
+ long messageId = storedMessage.getMessageNumber();
+
+ EnqueueableMessage enqueueableMessage = createMockEnqueueableMessage(messageId, storedMessage);
+ Record enqueueRecord = createMockRecord(queue, enqueueableMessage);
+
+ final long format = 1;
+ final byte[] globalId = new byte[] {0};
+ final byte[] branchId = new byte[] {0};
+ final Record[] enqueues = { enqueueRecord };
+ final Record[] dequeues = {};
+
+ MessageStore store = new NullMessageStore()
+ {
+ @Override
+ public void visitMessages(MessageHandler handler) throws StoreException
+ {
+ handler.handle(storedMessage);
+ }
+
+ @Override
+ public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
+ {
+ // No messages instances
+ }
+
+ @Override
+ public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
+ {
+ handler.handle(format, globalId, branchId, enqueues, dequeues);
+ }
+
+ @Override
+ public Transaction newTransaction()
+ {
+ return transaction;
+ }
+ };
+
+ DtxRegistry dtxRegistry = new DtxRegistry();
+
+ when(_virtualHost.getMessageStore()).thenReturn(store);
+ when(_virtualHost.getDtxRegistry()).thenReturn(dtxRegistry);
+
+ MessageStoreRecoverer recoverer = new MessageStoreRecoverer(_virtualHost, mock(MessageStoreLogSubject.class));
+ recoverer.recover();
+
+ DtxBranch branch = dtxRegistry.getBranch(new Xid(format, globalId, branchId));
+ assertNotNull("Expected dtx branch to be created", branch);
+ branch.commit();
+
+ ServerMessage<?> message = storedMessage.getMetaData().getType().createMessage(storedMessage);
+ verify(queue, times(1)).enqueue(eq(message), (Action<? super MessageInstance>)isNull());
+ verify(transaction).commitTran();
+ }
+
+ public void testRecoveryOfSingleDequeueWithDistributedTransaction()
+ {
+ final AMQQueue<?> queue = createRegisteredMockQueue();
+
+
+ final Transaction transaction = mock(Transaction.class);
+
+ final StoredMessage<StorableMessageMetaData> storedMessage = createMockStoredMessage(1);
+ final long messageId = storedMessage.getMessageNumber();
+
+ EnqueueableMessage enqueueableMessage = createMockEnqueueableMessage(messageId, storedMessage);
+ Record dequeueRecord = createMockRecord(queue, enqueueableMessage);
+
+ QueueEntry queueEntry = mock(QueueEntry.class);
+ when(queue.getMessageOnTheQueue(messageId)).thenReturn(queueEntry);
+
+ final long format = 1;
+ final byte[] globalId = new byte[] {0};
+ final byte[] branchId = new byte[] {0};
+ final Record[] enqueues = {};
+ final Record[] dequeues = { dequeueRecord };
+
+ MessageStore store = new NullMessageStore()
+ {
+ @Override
+ public void visitMessages(MessageHandler handler) throws StoreException
+ {
+ handler.handle(storedMessage);
+ }
+
+ @Override
+ public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
+ {
+ // We need the message to be enqueued onto the queue so that later the distributed transaction
+ // can dequeue it.
+ handler.handle(queue.getId(), messageId);
+ }
+
+ @Override
+ public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
+ {
+ handler.handle(format, globalId, branchId, enqueues, dequeues);
+ }
+
+ @Override
+ public Transaction newTransaction()
+ {
+ return transaction;
+ }
+ };
+
+ DtxRegistry dtxRegistry = new DtxRegistry();
+
+ when(_virtualHost.getMessageStore()).thenReturn(store);
+ when(_virtualHost.getDtxRegistry()).thenReturn(dtxRegistry);
+
+ MessageStoreRecoverer recoverer = new MessageStoreRecoverer(_virtualHost, mock(MessageStoreLogSubject.class));
+ recoverer.recover();
+
+ DtxBranch branch = dtxRegistry.getBranch(new Xid(format, globalId, branchId));
+ assertNotNull("Expected dtx branch to be created", branch);
+ branch.commit();
+
+ verify(queueEntry, times(1)).delete();
+ verify(transaction).commitTran();
+ }
+
+
+ protected Record createMockRecord(AMQQueue<?> queue, EnqueueableMessage enqueueableMessage)
+ {
+ Record enqueueRecord = mock(Record.class);
+ when(enqueueRecord.getMessage()).thenReturn(enqueueableMessage);
+ when(enqueueRecord.getResource()).thenReturn(queue);
+ return enqueueRecord;
+ }
+
+ protected EnqueueableMessage createMockEnqueueableMessage(long messageId,
+ final StoredMessage<StorableMessageMetaData> storedMessage)
+ {
+ EnqueueableMessage enqueueableMessage = mock(EnqueueableMessage.class);
+ when(enqueueableMessage.getMessageNumber()).thenReturn(messageId);
+ when(enqueueableMessage.getStoredMessage()).thenReturn(storedMessage);
+ return enqueueableMessage;
+ }
+
+ private StoredMessage<StorableMessageMetaData> createMockStoredMessage(final long messageId)
+ {
+ TestMessageMetaData metaData = new TestMessageMetaData(messageId, 0);
+
+ @SuppressWarnings("unchecked")
+ final StoredMessage<StorableMessageMetaData> storedMessage = mock(StoredMessage.class);
+ when(storedMessage.getMessageNumber()).thenReturn(messageId);
+ when(storedMessage.getMetaData()).thenReturn(metaData);
+ return storedMessage;
+ }
+
+ private AMQQueue<?> createRegisteredMockQueue()
+ {
+ AMQQueue<?> queue = mock(AMQQueue.class);
+ final UUID queueId = UUID.randomUUID();
+ when(queue.getId()).thenReturn(queueId);
+ when(queue.getName()).thenReturn("test-queue");
+ when(_virtualHost.getQueue(queueId)).thenReturn(queue);
+ return queue;
+ }
+
+
+ private final class QueueIdMatcher extends ArgumentMatcher<TransactionLogResource>
+ {
+ private UUID _queueId;
+ public QueueIdMatcher(UUID queueId)
+ {
+ _queueId = queueId;
+ }
+
+ @Override
+ public boolean matches(Object argument)
+ {
+ return argument instanceof TransactionLogResource && _queueId.equals( ((TransactionLogResource)argument).getId() );
+ }
+ }
+
+ private final class MessageNumberMatcher extends ArgumentMatcher<EnqueueableMessage>
+ {
+ private final long _messageId;
+
+ private MessageNumberMatcher(long messageId)
+ {
+ _messageId = messageId;
+ }
+
+ @Override
+ public boolean matches(Object argument)
+ {
+ return argument instanceof EnqueueableMessage && ((EnqueueableMessage)argument).getMessageNumber() == _messageId;
+ }
+ }
+}
diff --git a/qpid/java/broker-core/src/test/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory b/qpid/java/broker-core/src/test/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory
index 48241614d8..9512fb8117 100644
--- a/qpid/java/broker-core/src/test/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory
+++ b/qpid/java/broker-core/src/test/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory
@@ -17,4 +17,3 @@
# under the License.
#
org.apache.qpid.server.store.TestMemoryMessageStoreFactory
-org.apache.qpid.server.store.TestableMemoryMessageStoreFactory \ No newline at end of file
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
index e5cfced4e2..6b697f8221 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
@@ -32,7 +32,7 @@ import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.TestMemoryMessageStore;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.BrokerTestHelper;
@@ -53,7 +53,7 @@ public class AckTest extends QpidTestCase
private AMQProtocolSession _protocolSession;
- private TestableMemoryMessageStore _messageStore;
+ private TestMemoryMessageStore _messageStore;
private AMQChannel _channel;
@@ -71,7 +71,7 @@ public class AckTest extends QpidTestCase
_protocolSession = _channel.getProtocolSession();
_virtualHost = _protocolSession.getVirtualHost();
_queue = BrokerTestHelper.createQueue(getTestName(), _virtualHost);
- _messageStore = (TestableMemoryMessageStore)_virtualHost.getMessageStore();
+ _messageStore = (TestMemoryMessageStore)_virtualHost.getMessageStore();
}
@Override
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
index 9e551c9c01..399564f5a4 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
@@ -20,13 +20,11 @@
*/
package org.apache.qpid.server.protocol.v0_8;
-
-import org.apache.qpid.AMQException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.TestMemoryMessageStore;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -175,7 +173,7 @@ public class AcknowledgeTest extends QpidTestCase
private void checkStoreContents(int messageCount)
{
- assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getMessageCount());
+ assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestMemoryMessageStore) _messageStore).getMessageCount());
}
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
index e213aa8a20..520e35fe39 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
@@ -27,7 +27,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.TestMemoryMessageStore;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -133,7 +133,7 @@ public class QueueBrowserUsesNoAckTest extends QpidTestCase
private void checkStoreContents(int messageCount)
{
- assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getMessageCount());
+ assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestMemoryMessageStore) _messageStore).getMessageCount());
}
private AMQShortString browse(AMQChannel channel, AMQQueue queue) throws Exception
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java
index 227e9794da..e9c37e7b42 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java
@@ -27,7 +27,7 @@ import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.TestMemoryMessageStore;
import org.apache.qpid.test.utils.QpidTestCase;
/**
@@ -35,12 +35,12 @@ import org.apache.qpid.test.utils.QpidTestCase;
*/
public class ReferenceCountingTest extends QpidTestCase
{
- private TestableMemoryMessageStore _store;
+ private TestMemoryMessageStore _store;
protected void setUp() throws Exception
{
- _store = new TestableMemoryMessageStore();
+ _store = new TestMemoryMessageStore();
}
/**
@@ -83,7 +83,7 @@ public class ReferenceCountingTest extends QpidTestCase
MessageMetaData mmd = new MessageMetaData(info, chb);
StoredMessage storedMessage = _store.addMessage(mmd);
-
+ storedMessage.flushToStore();
AMQMessage message = new AMQMessage(storedMessage);
@@ -141,6 +141,7 @@ public class ReferenceCountingTest extends QpidTestCase
MessageMetaData mmd = new MessageMetaData(info, chb);
StoredMessage storedMessage = _store.addMessage(mmd);
+ storedMessage.flushToStore();
AMQMessage message = new AMQMessage(storedMessage);
diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
index 9202672ea6..d682076350 100644
--- a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
+++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
@@ -417,12 +417,6 @@ public class DerbyMessageStore extends AbstractJDBCMessageStore implements Messa
}
@Override
- public String getStoreType()
- {
- return TYPE;
- }
-
- @Override
public void onDelete()
{
if (_logger.isDebugEnabled())
diff --git a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java
index 20de4ea339..9a2d945494 100644
--- a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java
+++ b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java
@@ -70,10 +70,13 @@ public class DerbyMessageStoreTest extends MessageStoreTestCase
private void deleteStoreIfExists()
{
- File location = new File(_storeLocation);
- if (location.exists())
+ if (_storeLocation != null)
{
- FileUtils.delete(location, true);
+ File location = new File(_storeLocation);
+ if (location.exists())
+ {
+ FileUtils.delete(location, true);
+ }
}
}
diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java
index 4ca9cb2395..509184d243 100644
--- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java
+++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java
@@ -338,12 +338,6 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag
}
@Override
- public String getStoreType()
- {
- return TYPE;
- }
-
- @Override
protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException
{
if(_useBytesMethodsForBlob)
diff --git a/qpid/java/broker-plugins/memory-store/pom.xml b/qpid/java/broker-plugins/memory-store/pom.xml
index b71574384e..8bec7ef981 100644
--- a/qpid/java/broker-plugins/memory-store/pom.xml
+++ b/qpid/java/broker-plugins/memory-store/pom.xml
@@ -36,6 +36,22 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
+
+ <!-- test dependencies -->
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-test-utils</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-broker-core</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index 61fef91e83..c8dd2e6e61 100644
--- a/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -26,9 +26,4 @@ public class MemoryMessageStore extends AbstractMemoryMessageStore
{
public static final String TYPE = "Memory";
- @Override
- public String getStoreType()
- {
- return TYPE;
- }
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java b/qpid/java/broker-plugins/memory-store/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java
index ba9b7c155e..8fd3cbb1fe 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java
+++ b/qpid/java/broker-plugins/memory-store/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java
@@ -18,30 +18,30 @@
* under the License.
*
*/
-
package org.apache.qpid.server.store;
+import java.util.Collections;
import java.util.Map;
-import org.apache.qpid.server.plugin.MessageStoreFactory;
-
-public class TestableMemoryMessageStoreFactory implements MessageStoreFactory
+public class MemoryMessageStoreTest extends MessageStoreTestCase
{
+
@Override
- public String getType()
+ protected Map<String, Object> getStoreSettings() throws Exception
{
- return TestableMemoryMessageStore.TYPE;
+ return Collections.<String, Object>emptyMap();
}
@Override
- public MessageStore createMessageStore()
+ protected MessageStore createMessageStore()
{
- return new TestableMemoryMessageStore();
+ return new MemoryMessageStore();
}
@Override
- public void validateAttributes(Map<String, Object> attributes)
+ protected void reopenStore() throws Exception
{
+ // cannot re-open memory message store as it is not persistent
}
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java
index 406a20d557..0993783e54 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java
@@ -27,7 +27,7 @@ import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.message.MessageContentSource;
import org.apache.qpid.server.model.ConfiguredObject;
-public class QuotaMessageStore extends NullMessageStore
+public class QuotaMessageStore extends AbstractMemoryMessageStore
{
public static final String TYPE = "QuotaMessageStore";
private final AtomicLong _messageId = new AtomicLong(1);
@@ -155,10 +155,4 @@ public class QuotaMessageStore extends NullMessageStore
}
}
}
-
- @Override
- public String getStoreType()
- {
- return TYPE;
- }
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
index e20196c98d..95bffa89aa 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
@@ -31,6 +31,10 @@ import org.apache.log4j.Logger;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.plugin.MessageStoreFactory;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
public class SlowMessageStore implements MessageStore, DurableConfigurationStore
{
@@ -63,12 +67,6 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
}
}
- @Override
- public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler)
- {
- _realDurableConfigurationStore.recoverConfigurationStore(recoveryHandler);
- }
-
private void configureDelays(Map<String, Object> delays)
{
@@ -294,12 +292,6 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
}
@Override
- public void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler)
- {
- _realMessageStore.recoverMessageStore(messageRecoveryHandler, transactionLogRecoveryHandler);
- }
-
- @Override
public void addEventListener(EventListener eventListener, Event... events)
{
if (_realMessageStore == null)
@@ -319,15 +311,33 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
}
@Override
- public String getStoreType()
+ public void onDelete()
{
- return TYPE;
+ _realMessageStore.onDelete();
}
@Override
- public void onDelete()
+ public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws StoreException
{
- _realMessageStore.onDelete();
+ _realDurableConfigurationStore.visitConfiguredObjectRecords(handler);
+ }
+
+ @Override
+ public void visitMessages(MessageHandler handler) throws StoreException
+ {
+ _realMessageStore.visitMessages(handler);
+ }
+
+ @Override
+ public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
+ {
+ _realMessageStore.visitMessageInstances(handler);
+ }
+
+ @Override
+ public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
+ {
+ _realMessageStore.visitDistributedTransactions(handler);
}
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
index d89f5cc66e..7db8210753 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.store;
-
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -46,7 +45,6 @@ import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.Binding;
-import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ExclusivityPolicy;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
@@ -68,14 +66,16 @@ import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.util.FileUtils;
/**
- * This tests the MessageStores by using the available interfaces.
+ *
+ * Virtualhost/store integration test. Tests for correct behaviour of the message store
+ * when exercised via the higher level functions of the store.
*
* For persistent stores, it validates that Exchanges, Queues, Bindings and
* Messages are persisted and recovered correctly.
*/
-public class MessageStoreTest extends QpidTestCase
+public class VirtualHostMessageStoreTest extends QpidTestCase
{
- private static final Logger _logger = Logger.getLogger(MessageStoreTest.class);
+ private static final Logger _logger = Logger.getLogger(VirtualHostMessageStoreTest.class);
public static final int DEFAULT_PRIORTY_LEVEL = 5;
public static final String SELECTOR_VALUE = "Test = 'MST'";
@@ -103,8 +103,7 @@ public class MessageStoreTest extends QpidTestCase
private String queueOwner = "MST";
private VirtualHost _virtualHost;
- private org.apache.qpid.server.model.VirtualHost _virtualHostModel;
- private Broker _broker;
+ private org.apache.qpid.server.model.VirtualHost<?> _virtualHostModel;
private String _storePath;
public void setUp() throws Exception
@@ -120,6 +119,7 @@ public class MessageStoreTest extends QpidTestCase
messageStoreSettings.put(MessageStore.STORE_TYPE, getTestProfileMessageStoreType());
_virtualHostModel = mock(org.apache.qpid.server.model.VirtualHost.class);
+
when(_virtualHostModel.getMessageStoreSettings()).thenReturn(messageStoreSettings);
when(_virtualHostModel.getAttribute(eq(org.apache.qpid.server.model.VirtualHost.TYPE))).thenReturn(StandardVirtualHostFactory.TYPE);
when(_virtualHostModel.getAttribute(eq(org.apache.qpid.server.model.VirtualHost.NAME))).thenReturn(hostName);
@@ -128,8 +128,6 @@ public class MessageStoreTest extends QpidTestCase
cleanup(new File(_storePath));
- _broker = BrokerTestHelper.createBrokerMock();
-
reloadVirtualHost();
}
@@ -201,10 +199,6 @@ public class MessageStoreTest extends QpidTestCase
assertTrue("Virtualhost has not changed, reload was not successful", original != getVirtualHost());
}
- /**
- * Old MessageStoreTest segment which runs against both persistent and non-persistent stores
- * creating queues, exchanges and bindings and then verifying message delivery to them.
- */
public void testQueueExchangeAndBindingCreation() throws Exception
{
assertEquals("Should not be any existing queues", 0, getVirtualHost().getQueues().size());
@@ -213,15 +207,15 @@ public class MessageStoreTest extends QpidTestCase
createAllTopicQueues();
//Register Non-Durable DirectExchange
- ExchangeImpl nonDurableExchange = createExchange(DirectExchange.TYPE, nonDurableExchangeName, false);
+ ExchangeImpl<?> nonDurableExchange = createExchange(DirectExchange.TYPE, nonDurableExchangeName, false);
bindAllQueuesToExchange(nonDurableExchange, directRouting);
//Register DirectExchange
- ExchangeImpl directExchange = createExchange(DirectExchange.TYPE, directExchangeName, true);
+ ExchangeImpl<?> directExchange = createExchange(DirectExchange.TYPE, directExchangeName, true);
bindAllQueuesToExchange(directExchange, directRouting);
//Register TopicExchange
- ExchangeImpl topicExchange = createExchange(TopicExchange.TYPE, topicExchangeName, true);
+ ExchangeImpl<?> topicExchange = createExchange(TopicExchange.TYPE, topicExchangeName, true);
bindAllTopicQueuesToExchange(topicExchange, topicRouting);
//Send Message To NonDurable direct Exchange = persistent
@@ -248,12 +242,6 @@ public class MessageStoreTest extends QpidTestCase
10, getVirtualHost().getQueues().size());
}
- /**
- * Tests message persistence by running the testQueueExchangeAndBindingCreation() method above
- * before reloading the virtual host and ensuring that the persistent messages were restored.
- *
- * More specific testing of message persistence is left to store-specific unit testing.
- */
public void testMessagePersistence() throws Exception
{
testQueueExchangeAndBindingCreation();
@@ -346,7 +334,7 @@ public class MessageStoreTest extends QpidTestCase
1, getVirtualHost().getQueues().size());
//test that removing the queue means it is not recovered next time
- final AMQQueue queue = getVirtualHost().getQueue(durableQueueName);
+ final AMQQueue<?> queue = getVirtualHost().getQueue(durableQueueName);
DurableConfigurationStoreHelper.removeQueue(getVirtualHost().getDurableConfigurationStore(),queue);
reloadVirtualHost();
@@ -397,7 +385,7 @@ public class MessageStoreTest extends QpidTestCase
origExchangeCount + 1, getVirtualHost().getExchanges().size());
//test that removing the exchange means it is not recovered next time
- final ExchangeImpl exchange = getVirtualHost().getExchange(directExchangeName);
+ final ExchangeImpl<?> exchange = getVirtualHost().getExchange(directExchangeName);
DurableConfigurationStoreHelper.removeExchange(getVirtualHost().getDurableConfigurationStore(), exchange);
reloadVirtualHost();
@@ -423,9 +411,9 @@ public class MessageStoreTest extends QpidTestCase
Map<String, ExchangeImpl<?>> exchanges = createExchanges();
- ExchangeImpl nonDurableExchange = exchanges.get(nonDurableExchangeName);
- ExchangeImpl directExchange = exchanges.get(directExchangeName);
- ExchangeImpl topicExchange = exchanges.get(topicExchangeName);
+ ExchangeImpl<?> nonDurableExchange = exchanges.get(nonDurableExchangeName);
+ ExchangeImpl<?> directExchange = exchanges.get(directExchangeName);
+ ExchangeImpl<?> topicExchange = exchanges.get(topicExchangeName);
bindAllQueuesToExchange(nonDurableExchange, directRouting);
bindAllQueuesToExchange(directExchange, directRouting);
@@ -449,7 +437,7 @@ public class MessageStoreTest extends QpidTestCase
public void testDurableBindingRemoval() throws Exception
{
//create durable queue and exchange, bind them
- ExchangeImpl exch = createExchange(DirectExchange.TYPE, directExchangeName, true);
+ ExchangeImpl<?> exch = createExchange(DirectExchange.TYPE, directExchangeName, true);
createQueue(durableQueueName, false, true, false, false);
bindQueueToExchange(exch, directRouting, getVirtualHost().getQueue(durableQueueName), false);
@@ -482,7 +470,7 @@ public class MessageStoreTest extends QpidTestCase
private void validateExchanges(int originalNumExchanges, Map<String, ExchangeImpl<?>> oldExchanges)
{
Collection<ExchangeImpl<?>> exchanges = getVirtualHost().getExchanges();
- Collection<String> exchangeNames = new ArrayList(exchanges.size());
+ Collection<String> exchangeNames = new ArrayList<String>(exchanges.size());
for(ExchangeImpl<?> exchange : exchanges)
{
exchangeNames.add(exchange.getName());
@@ -506,6 +494,7 @@ public class MessageStoreTest extends QpidTestCase
}
/** Validates the Durable queues and their properties are as expected following recovery */
+ @SuppressWarnings("unchecked")
private void validateBindingProperties()
{
@@ -526,11 +515,11 @@ public class MessageStoreTest extends QpidTestCase
* @param bindings the set of bindings to validate
* @param useSelectors if set, check the binding has a JMS_SELECTOR argument and the correct value for it
*/
- private void validateBindingProperties(Collection<? extends Binding> bindings, boolean useSelectors)
+ private void validateBindingProperties(Collection<? extends Binding<?>> bindings, boolean useSelectors)
{
assertEquals("Each queue should only be bound once.", 1, bindings.size());
- Binding binding = bindings.iterator().next();
+ Binding<?> binding = bindings.iterator().next();
if (useSelectors)
{
@@ -543,13 +532,13 @@ public class MessageStoreTest extends QpidTestCase
private void setQueueExclusivity(boolean exclusive) throws MessageSource.ExistingConsumerPreventsExclusive
{
- AMQQueue queue = getVirtualHost().getQueue(durableExclusiveQueueName);
+ AMQQueue<?> queue = getVirtualHost().getQueue(durableExclusiveQueueName);
queue.setExclusivityPolicy(exclusive ? ExclusivityPolicy.CONTAINER : ExclusivityPolicy.NONE);
}
private void validateQueueExclusivityProperty(boolean expected)
{
- AMQQueue queue = getVirtualHost().getQueue(durableExclusiveQueueName);
+ AMQQueue<?> queue = getVirtualHost().getQueue(durableExclusiveQueueName);
assertEquals("Queue exclusivity was incorrect", queue.isExclusive(), expected);
}
@@ -565,7 +554,7 @@ public class MessageStoreTest extends QpidTestCase
validateQueueProperties(getVirtualHost().getQueue(durableLastValueQueueName), false, true, true, true);
}
- private void validateQueueProperties(AMQQueue queue, boolean usePriority, boolean durable, boolean exclusive, boolean lastValueQueue)
+ private void validateQueueProperties(AMQQueue<?> queue, boolean usePriority, boolean durable, boolean exclusive, boolean lastValueQueue)
{
if(usePriority || lastValueQueue)
{
@@ -588,9 +577,9 @@ public class MessageStoreTest extends QpidTestCase
assertEquals("Queue is not 'simple'", StandardQueue.class, queue.getClass());
}
- assertEquals("Queue owner is not as expected", exclusive ? queueOwner : null, queue.getOwner());
- assertEquals("Queue durability is not as expected", durable, queue.isDurable());
- assertEquals("Queue exclusivity is not as expected", exclusive, queue.isExclusive());
+ assertEquals("Queue owner is not as expected for queue " + queue.getName(), exclusive ? queueOwner : null, queue.getOwner());
+ assertEquals("Queue durability is not as expected for queue " + queue.getName(), durable, queue.isDurable());
+ assertEquals("Queue exclusivity is not as expected for queue " + queue.getName(), exclusive, queue.isExclusive());
}
/**
@@ -606,7 +595,7 @@ public class MessageStoreTest extends QpidTestCase
}
}
- private void sendMessageOnExchange(ExchangeImpl exchange, String routingKey, boolean deliveryMode)
+ private void sendMessageOnExchange(ExchangeImpl<?> exchange, String routingKey, boolean deliveryMode)
{
//Set MessagePersistence
BasicContentHeaderProperties properties = new BasicContentHeaderProperties();
@@ -698,15 +687,12 @@ public class MessageStoreTest extends QpidTestCase
{
queueArguments.put(Queue.OWNER, queueOwner);
}
- AMQQueue queue = null;
+ AMQQueue<?> queue = null;
//Ideally we would be able to use the QueueDeclareHandler here.
queue = getVirtualHost().createQueue(queueArguments);
validateQueueProperties(queue, usePriority, durable, exclusive, lastValueQueue);
-
-
-
}
private Map<String, ExchangeImpl<?>> createExchanges() throws Exception
@@ -733,14 +719,14 @@ public class MessageStoreTest extends QpidTestCase
attributes.put(org.apache.qpid.server.model.Exchange.TYPE, type.getType());
attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, durable);
attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY,
- false ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
+ durable ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null);
exchange = getVirtualHost().createExchange(attributes);
return exchange;
}
- private void bindAllQueuesToExchange(ExchangeImpl exchange, String routingKey)
+ private void bindAllQueuesToExchange(ExchangeImpl<?> exchange, String routingKey)
{
bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(durablePriorityQueueName), false);
bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(durableQueueName), false);
@@ -749,7 +735,7 @@ public class MessageStoreTest extends QpidTestCase
bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(durableExclusiveQueueName), false);
}
- private void bindAllTopicQueuesToExchange(ExchangeImpl exchange, String routingKey)
+ private void bindAllTopicQueuesToExchange(ExchangeImpl<?> exchange, String routingKey)
{
bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(durablePriorityTopicQueueName), true);
@@ -759,9 +745,9 @@ public class MessageStoreTest extends QpidTestCase
}
- protected void bindQueueToExchange(ExchangeImpl exchange,
+ protected void bindQueueToExchange(ExchangeImpl<?> exchange,
String routingKey,
- AMQQueue queue,
+ AMQQueue<?> queue,
boolean useSelector)
{
Map<String,Object> bindArguments = new HashMap<String, Object>();
@@ -781,9 +767,9 @@ public class MessageStoreTest extends QpidTestCase
}
}
- protected void unbindQueueFromExchange(ExchangeImpl exchange,
+ protected void unbindQueueFromExchange(ExchangeImpl<?> exchange,
String routingKey,
- AMQQueue queue,
+ AMQQueue<?> queue,
boolean useSelector)
{
Map<String,Object> bindArguments = new HashMap<String, Object>();
@@ -829,7 +815,7 @@ public class MessageStoreTest extends QpidTestCase
private void validateMessageOnQueue(String queueName, long messageCount)
{
- AMQQueue queue = getVirtualHost().getQueue(queueName);
+ AMQQueue<?> queue = getVirtualHost().getQueue(queueName);
assertNotNull("Queue(" + queueName + ") not correctly registered:", queue);
@@ -839,12 +825,12 @@ public class MessageStoreTest extends QpidTestCase
private class TestMessagePublishInfo implements MessagePublishInfo
{
- ExchangeImpl _exchange;
+ ExchangeImpl<?> _exchange;
boolean _immediate;
boolean _mandatory;
String _routingKey;
- TestMessagePublishInfo(ExchangeImpl exchange, boolean immediate, boolean mandatory, String routingKey)
+ TestMessagePublishInfo(ExchangeImpl<?> exchange, boolean immediate, boolean mandatory, String routingKey)
{
_exchange = exchange;
_immediate = immediate;
@@ -852,29 +838,35 @@ public class MessageStoreTest extends QpidTestCase
_routingKey = routingKey;
}
+ @Override
public AMQShortString getExchange()
{
return new AMQShortString(_exchange.getName());
}
+ @Override
public void setExchange(AMQShortString exchange)
{
//no-op
}
+ @Override
public boolean isImmediate()
{
return _immediate;
}
+ @Override
public boolean isMandatory()
{
return _mandatory;
}
+ @Override
public AMQShortString getRoutingKey()
{
return new AMQShortString(_routingKey);
}
}
+
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java
index 4c0e2b7ffc..59b4d496fa 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java
@@ -45,10 +45,9 @@ import org.apache.qpid.server.model.SystemContext;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.security.access.FileAccessControlProviderConstants;
import org.apache.qpid.server.security.group.FileGroupManagerFactory;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.ConfiguredObjectRecordImpl;
-import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
public class TestBrokerConfiguration
{
@@ -191,7 +190,7 @@ public class TestBrokerConfiguration
private ConfiguredObjectRecord findObject(final Class<? extends ConfiguredObject> category, final String objectName)
{
final RecordFindingVisitor visitor = new RecordFindingVisitor(category, objectName);
- _store.recoverConfigurationStore(visitor);
+ _store.visitConfiguredObjectRecords(visitor);
return visitor.getFoundRecord();
}
@@ -235,11 +234,12 @@ public class TestBrokerConfiguration
return findObject(category, name).getAttributes();
}
- private static class RecordFindingVisitor implements ConfigurationRecoveryHandler
+ private static class RecordFindingVisitor implements ConfiguredObjectRecordHandler
{
private final Class<? extends ConfiguredObject> _category;
private final String _objectName;
public ConfiguredObjectRecord _foundRecord;
+ private int _version;
public RecordFindingVisitor(final Class<? extends ConfiguredObject> category, final String objectName)
{
@@ -248,26 +248,28 @@ public class TestBrokerConfiguration
}
@Override
- public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion)
+ public void begin(final int configVersion)
{
-
+ _version = configVersion;
}
@Override
- public void configuredObject(final ConfiguredObjectRecord object)
+ public boolean handle(final ConfiguredObjectRecord object)
{
if (object.getType().equals(_category.getSimpleName())
&& (_objectName == null
|| _objectName.equals(object.getAttributes().get(ConfiguredObject.NAME))))
{
_foundRecord = object;
+ return false;
}
+ return true;
}
@Override
- public int completeConfigurationRecovery()
+ public int end()
{
- return 0;
+ return _version;
}
public ConfiguredObjectRecord getFoundRecord()
diff --git a/qpid/java/test-profiles/CPPExcludes b/qpid/java/test-profiles/CPPExcludes
index 6f7de94e5d..8e48e77e07 100755
--- a/qpid/java/test-profiles/CPPExcludes
+++ b/qpid/java/test-profiles/CPPExcludes
@@ -85,14 +85,14 @@ org.apache.qpid.server.store.PersistentStoreTest#*
org.apache.qpid.server.store.SplitStoreTest#*
// These tests are for the Java broker persistent store modules
-org.apache.qpid.server.store.MessageStoreTest#testMessagePersistence
-org.apache.qpid.server.store.MessageStoreTest#testMessageRemoval
-org.apache.qpid.server.store.MessageStoreTest#testBindingPersistence
-org.apache.qpid.server.store.MessageStoreTest#testDurableBindingRemoval
-org.apache.qpid.server.store.MessageStoreTest#testQueuePersistence
-org.apache.qpid.server.store.MessageStoreTest#testDurableQueueRemoval
-org.apache.qpid.server.store.MessageStoreTest#testExchangePersistence
-org.apache.qpid.server.store.MessageStoreTest#testDurableExchangeRemoval
+org.apache.qpid.server.store.VirtualHostMessageStoreTest#testMessagePersistence
+org.apache.qpid.server.store.VirtualHostMessageStoreTest#testMessageRemoval
+org.apache.qpid.server.store.VirtualHostMessageStoreTest#testBindingPersistence
+org.apache.qpid.server.store.VirtualHostMessageStoreTest#testDurableBindingRemoval
+org.apache.qpid.server.store.VirtualHostMessageStoreTest#testQueuePersistence
+org.apache.qpid.server.store.VirtualHostMessageStoreTest#testDurableQueueRemoval
+org.apache.qpid.server.store.VirtualHostMessageStoreTest#testExchangePersistence
+org.apache.qpid.server.store.VirtualHostMessageStoreTest#testDurableExchangeRemoval
org.apache.qpid.server.store.DurableConfigurationStoreTest#*
// CPP Broker does not follow the same Logging convention as the Java broker
diff --git a/qpid/java/test-profiles/JavaBDBExcludes b/qpid/java/test-profiles/JavaBDBExcludes
index 0750beb339..969b9272a7 100644
--- a/qpid/java/test-profiles/JavaBDBExcludes
+++ b/qpid/java/test-profiles/JavaBDBExcludes
@@ -17,7 +17,3 @@
// under the License.
//
-//This test is subclassed within the bdbstore module to enable it to run and
-//also add some bdb-specific tests. It is excluded to prevent running twice.
-org.apache.qpid.server.store.MessageStoreTest#*
-org.apache.qpid.server.store.DurableConfigurationStoreTest#*
diff --git a/qpid/java/test-profiles/JavaTransientExcludes b/qpid/java/test-profiles/JavaTransientExcludes
index 0b060051e9..ef98882980 100644
--- a/qpid/java/test-profiles/JavaTransientExcludes
+++ b/qpid/java/test-profiles/JavaTransientExcludes
@@ -33,19 +33,17 @@ org.apache.qpid.test.unit.xa.TopicTest#testMultiMessagesDurSubCrash
org.apache.qpid.test.unit.xa.TopicTest#testDurSubCrash
org.apache.qpid.test.unit.xa.TopicTest#testRecover
-org.apache.qpid.server.store.MessageStoreTest#testMessagePersistence
-org.apache.qpid.server.store.MessageStoreTest#testMessageRemoval
-org.apache.qpid.server.store.MessageStoreTest#testBindingPersistence
-org.apache.qpid.server.store.MessageStoreTest#testDurableBindingRemoval
-org.apache.qpid.server.store.MessageStoreTest#testQueuePersistence
-org.apache.qpid.server.store.MessageStoreTest#testDurableQueueRemoval
-org.apache.qpid.server.store.MessageStoreTest#testExchangePersistence
-org.apache.qpid.server.store.MessageStoreTest#testDurableExchangeRemoval
+org.apache.qpid.server.store.VirtualHostMessageStoreTest#testMessagePersistence
+org.apache.qpid.server.store.VirtualHostMessageStoreTest#testMessageRemoval
+org.apache.qpid.server.store.VirtualHostMessageStoreTest#testBindingPersistence
+org.apache.qpid.server.store.VirtualHostMessageStoreTest#testDurableBindingRemoval
+org.apache.qpid.server.store.VirtualHostMessageStoreTest#testQueuePersistence
+org.apache.qpid.server.store.VirtualHostMessageStoreTest#testDurableQueueRemoval
+org.apache.qpid.server.store.VirtualHostMessageStoreTest#testExchangePersistence
+org.apache.qpid.server.store.VirtualHostMessageStoreTest#testDurableExchangeRemoval
org.apache.qpid.server.store.berkeleydb.*
-org.apache.qpid.server.store.DurableConfigurationStoreTest#*
-
org.apache.qpid.systest.management.jmx.QueueManagementTest#testAlternateExchangeSurvivesRestart
org.apache.qpid.systest.management.jmx.QueueManagementTest#testQueueDescriptionSurvivesRestart
org.apache.qpid.systest.management.jmx.QueueManagementTest#testMoveMessageBetweenQueuesWithBrokerRestart