summaryrefslogtreecommitdiff
path: root/java/bdbstore/src
diff options
context:
space:
mode:
Diffstat (limited to 'java/bdbstore/src')
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java169
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java4
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreFactory.java41
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java4
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java41
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java31
-rw-r--r--java/bdbstore/src/main/resources/META-INF/services/org.apache.qpid.server.store.MessageStoreFactory20
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java2
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java74
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java2
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java62
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java127
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java6
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java36
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java44
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java39
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java47
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java63
-rw-r--r--java/bdbstore/src/test/resources/upgrade/bdbstore-v4/test-store/00000000.jdbbin1361990 -> 1366145 bytes
-rw-r--r--java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000000.jdbbin1361990 -> 1366145 bytes
-rw-r--r--java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000001.jdbbin1333643 -> 1336563 bytes
21 files changed, 537 insertions, 275 deletions
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index 9323111fdd..6e64ea5597 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -54,13 +54,10 @@ import org.apache.qpid.AMQStoreException;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.federation.Bridge;
-import org.apache.qpid.server.federation.BrokerLink;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.*;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
@@ -73,7 +70,6 @@ import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.MessageMetaDataBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.PreparedTransactionBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.QueueEntryBinding;
-import org.apache.qpid.server.store.berkeleydb.tuple.StringMapBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding;
import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader;
@@ -423,8 +419,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
BindingRecoveryHandler brh = qrh.completeQueueRecovery();
_configuredObjectHelper.recoverBindings(brh, configuredObjects);
- BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery();
- recoverBrokerLinks(lrh);
+ brh.completeBindingRecovery();
}
catch (DatabaseException e)
{
@@ -466,66 +461,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
}
- private void recoverBrokerLinks(final ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh)
- {
- Cursor cursor = null;
-
- try
- {
- cursor = _linkDb.openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry value = new DatabaseEntry();
-
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- UUID id = UUIDTupleBinding.getInstance().entryToObject(key);
- long createTime = LongBinding.entryToLong(value);
- Map<String,String> arguments = StringMapBinding.getInstance().entryToObject(value);
-
- ConfigurationRecoveryHandler.BridgeRecoveryHandler brh = lrh.brokerLink(id, createTime, arguments);
-
- recoverBridges(brh, id);
- }
- }
- finally
- {
- closeCursorSafely(cursor);
- }
-
- }
-
- private void recoverBridges(final ConfigurationRecoveryHandler.BridgeRecoveryHandler brh, final UUID linkId)
- {
- Cursor cursor = null;
-
- try
- {
- cursor = _bridgeDb.openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry value = new DatabaseEntry();
-
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- UUID id = UUIDTupleBinding.getInstance().entryToObject(key);
-
- UUID parentId = UUIDTupleBinding.getInstance().entryToObject(value);
- if(parentId.equals(linkId))
- {
-
- long createTime = LongBinding.entryToLong(value);
- Map<String,String> arguments = StringMapBinding.getInstance().entryToObject(value);
- brh.bridge(id,createTime,arguments);
- }
- }
- brh.completeBridgeRecoveryForLink();
- }
- finally
- {
- closeCursorSafely(cursor);
- }
-
- }
-
private void recoverMessages(MessageStoreRecoveryHandler msrh) throws DatabaseException
{
@@ -940,89 +875,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
}
- public void createBrokerLink(final BrokerLink link) throws AMQStoreException
- {
- if (_stateManager.isInState(State.ACTIVE))
- {
- DatabaseEntry key = new DatabaseEntry();
- UUIDTupleBinding.getInstance().objectToEntry(link.getQMFId(), key);
-
- DatabaseEntry value = new DatabaseEntry();
- LongBinding.longToEntry(link.getCreateTime(), value);
- StringMapBinding.getInstance().objectToEntry(link.getArguments(), value);
-
- try
- {
- _linkDb.put(null, key, value);
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error writing Link " + link
- + " to database: " + e.getMessage(), e);
- }
- }
- }
-
- public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException
- {
- DatabaseEntry key = new DatabaseEntry();
- UUIDTupleBinding.getInstance().objectToEntry(link.getQMFId(), key);
- try
- {
- OperationStatus status = _linkDb.delete(null, key);
- if (status == OperationStatus.NOTFOUND)
- {
- throw new AMQStoreException("Link " + link + " not found");
- }
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error deleting the Link " + link + " from database: " + e.getMessage(), e);
- }
- }
-
- public void createBridge(final Bridge bridge) throws AMQStoreException
- {
- if (_stateManager.isInState(State.ACTIVE))
- {
- DatabaseEntry key = new DatabaseEntry();
- UUIDTupleBinding.getInstance().objectToEntry(bridge.getQMFId(), key);
-
- DatabaseEntry value = new DatabaseEntry();
- UUIDTupleBinding.getInstance().objectToEntry(bridge.getLink().getQMFId(),value);
- LongBinding.longToEntry(bridge.getCreateTime(),value);
- StringMapBinding.getInstance().objectToEntry(bridge.getArguments(), value);
-
- try
- {
- _bridgeDb.put(null, key, value);
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error writing Bridge " + bridge
- + " to database: " + e.getMessage(), e);
- }
-
- }
- }
-
- public void deleteBridge(final Bridge bridge) throws AMQStoreException
- {
- DatabaseEntry key = new DatabaseEntry();
- UUIDTupleBinding.getInstance().objectToEntry(bridge.getQMFId(), key);
- try
- {
- OperationStatus status = _bridgeDb.delete(null, key);
- if (status == OperationStatus.NOTFOUND)
- {
- throw new AMQStoreException("Bridge " + bridge + " not found");
- }
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error deleting the Bridge " + bridge + " from database: " + e.getMessage(), e);
- }
- }
/**
* Places a message onto a specified queue, in a given transaction.
@@ -1050,7 +902,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
{
LOGGER.debug("Enqueuing message " + messageId + " on queue "
+ (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + queue.getId()
- + " [Transaction" + tx + "]");
+ + " in transaction " + tx);
}
_deliveryDb.put(tx, key, value);
}
@@ -1204,7 +1056,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("commitTranImpl completed for [Transaction:" + tx + "]");
+ String transactionType = syncCommit ? "synchronous" : "asynchronous";
+ LOGGER.debug("commitTranImpl completed " + transactionType + " transaction " + tx);
}
}
catch (DatabaseException e)
@@ -1226,7 +1079,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
{
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("abortTran called for [Transaction:" + tx + "]");
+ LOGGER.debug("abortTran called for transaction " + tx);
}
try
@@ -1338,7 +1191,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("Storing content for message " + messageId + "[Transaction" + tx + "]");
+ LOGGER.debug("Storing content for message " + messageId + " in transaction " + tx);
}
}
@@ -1363,8 +1216,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore
{
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("public void storeMetaData(Txn tx = " + tx + ", Long messageId = "
- + messageId + ", MessageMetaData messageMetaData = " + messageMetaData + "): called");
+ LOGGER.debug("storeMetaData called for transaction " + tx
+ + ", messageId " + messageId
+ + ", messageMetaData " + messageMetaData);
}
DatabaseEntry key = new DatabaseEntry();
@@ -1378,7 +1232,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
_messageMetaDataDb.put(tx, key, value);
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("Storing message metadata for message id " + messageId + "[Transaction" + tx + "]");
+ LOGGER.debug("Storing message metadata for message id " + messageId + " in transaction " + tx);
}
}
catch (DatabaseException e)
@@ -1680,7 +1534,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore
else
{
ByteBuffer buf = ByteBuffer.allocate(size);
- getContent(offsetInMessage, buf);
+ int length = getContent(offsetInMessage, buf);
+ buf.limit(length);
buf.position(0);
return buf;
}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
index c40f24dbc3..ba111e8091 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
@@ -105,7 +105,7 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
put(ReplicationConfig.LOG_FLUSH_TASK_INTERVAL, "1 min");
}});
- public static final String BDB_HA_STORE_TYPE = "BDB-HA";
+ public static final String TYPE = "BDB-HA";
private String _groupName;
private String _nodeName;
@@ -602,6 +602,6 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
@Override
public String getStoreType()
{
- return BDB_HA_STORE_TYPE;
+ return TYPE;
}
}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreFactory.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreFactory.java
new file mode 100644
index 0000000000..20dce2628d
--- /dev/null
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreFactory.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreFactory;
+
+public class BDBHAMessageStoreFactory implements MessageStoreFactory
+{
+
+ @Override
+ public String getType()
+ {
+ return BDBHAMessageStore.TYPE;
+ }
+
+ @Override
+ public MessageStore createMessageStore()
+ {
+ return new BDBHAMessageStore();
+ }
+
+}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index 82bc3d8564..4028de4b80 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -42,7 +42,7 @@ import com.sleepycat.je.EnvironmentConfig;
public class BDBMessageStore extends AbstractBDBMessageStore
{
private static final Logger LOGGER = Logger.getLogger(BDBMessageStore.class);
- private static final String BDB_STORE_TYPE = "BDB";
+ public static final String TYPE = "BDB";
private CommitThreadWrapper _commitThreadWrapper;
@Override
@@ -108,7 +108,7 @@ public class BDBMessageStore extends AbstractBDBMessageStore
@Override
public String getStoreType()
{
- return BDB_STORE_TYPE;
+ return TYPE;
}
}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java
new file mode 100644
index 0000000000..126bf1928a
--- /dev/null
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreFactory;
+
+public class BDBMessageStoreFactory implements MessageStoreFactory
+{
+
+ @Override
+ public String getType()
+ {
+ return BDBMessageStore.TYPE;
+ }
+
+ @Override
+ public MessageStore createMessageStore()
+ {
+ return new BDBMessageStore();
+ }
+
+}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
index fe1556b5a6..598d20146c 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
@@ -80,7 +80,7 @@ public class CommitThreadWrapper
{
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("public synchronized void complete(): called (Transaction = " + _tx + ")");
+ LOGGER.debug("complete() called for transaction " + _tx);
}
_complete = true;
@@ -101,7 +101,10 @@ public class CommitThreadWrapper
if(!_syncCommit)
{
- LOGGER.debug("CommitAsync was requested, returning immediately.");
+ if(LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("CommitAsync was requested, returning immediately.");
+ }
return;
}
@@ -121,6 +124,12 @@ public class CommitThreadWrapper
public synchronized void waitForCompletion()
{
+ long startTime = 0;
+ if(LOGGER.isDebugEnabled())
+ {
+ startTime = System.currentTimeMillis();
+ }
+
while (!isComplete())
{
_commitThread.explicitNotify();
@@ -133,6 +142,12 @@ public class CommitThreadWrapper
throw new RuntimeException(e);
}
}
+
+ if(LOGGER.isDebugEnabled())
+ {
+ long duration = System.currentTimeMillis() - startTime;
+ LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx);
+ }
}
}
@@ -198,8 +213,20 @@ public class CommitThreadWrapper
try
{
+ long startTime = 0;
+ if(LOGGER.isDebugEnabled())
+ {
+ startTime = System.currentTimeMillis();
+ }
+
_environment.flushLog(true);
+ if(LOGGER.isDebugEnabled())
+ {
+ long duration = System.currentTimeMillis() - startTime;
+ LOGGER.debug("flushLog completed in " + duration + " ms");
+ }
+
for(int i = 0; i < size; i++)
{
BDBCommitFuture commit = _jobQueue.poll();
diff --git a/java/bdbstore/src/main/resources/META-INF/services/org.apache.qpid.server.store.MessageStoreFactory b/java/bdbstore/src/main/resources/META-INF/services/org.apache.qpid.server.store.MessageStoreFactory
new file mode 100644
index 0000000000..0be7035e2e
--- /dev/null
+++ b/java/bdbstore/src/main/resources/META-INF/services/org.apache.qpid.server.store.MessageStoreFactory
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+org.apache.qpid.server.store.berkeleydb.BDBMessageStoreFactory
+org.apache.qpid.server.store.berkeleydb.BDBHAMessageStoreFactory \ No newline at end of file
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java
index 342c185b99..7c04d83e79 100644
--- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java
+++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java
@@ -63,7 +63,7 @@ public class BDBBackupTest extends QpidBrokerTestCase
// It would be preferable to lookup the store path using #getConfigurationStringProperty("virtualhosts...")
// but the config as known to QBTC does not pull-in the virtualhost section from its separate source file
- _backupFromDir = new File(qpidWork + "/bdbstore/" + TEST_VHOST + "-store");
+ _backupFromDir = new File(qpidWork + File.separator + TEST_VHOST + "-store");
boolean fromDirExistsAndIsDir = _backupFromDir.isDirectory();
assertTrue("backupFromDir " + _backupFromDir + " should already exist", fromDirExistsAndIsDir);
}
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java
index a04fb20680..8e32a1d113 100644
--- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java
+++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java
@@ -24,12 +24,8 @@ import java.io.File;
import java.net.InetAddress;
import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.qpid.server.configuration.ServerConfiguration;
-import org.apache.qpid.server.logging.SystemOutMessageLogger;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.TestLogActor;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.util.FileUtils;
@@ -51,6 +47,7 @@ public class BDBHAMessageStoreTest extends QpidTestCase
private int _masterPort;
private String _host;
private XMLConfiguration _configXml;
+ private VirtualHost _virtualHost;
public void setUp() throws Exception
{
@@ -63,41 +60,48 @@ public class BDBHAMessageStoreTest extends QpidTestCase
FileUtils.delete(new File(_workDir), true);
_configXml = new XMLConfiguration();
- }
- public void tearDown() throws Exception
- {
- FileUtils.delete(new File(_workDir), true);
- super.tearDown();
- }
+ BrokerTestHelper.setUp();
+ }
- public void testSetSystemConfiguration() throws Exception
+ public void tearDown() throws Exception
{
- // create virtual host configuration, registry and host instance
- addVirtualHostConfiguration();
- TestApplicationRegistry registry = initialize();
try
{
- VirtualHost virtualhost = registry.getVirtualHostRegistry().getVirtualHost("test" + _masterPort);
- BDBHAMessageStore store = (BDBHAMessageStore) virtualhost.getMessageStore();
-
- // test whether JVM system settings were applied
- Environment env = store.getEnvironment();
- assertEquals("Unexpected number of cleaner threads", TEST_NUMBER_OF_THREADS, env.getConfig().getConfigParam(EnvironmentConfig.CLEANER_THREADS));
- assertEquals("Unexpected log file max", TEST_LOG_FILE_MAX, env.getConfig().getConfigParam(EnvironmentConfig.LOG_FILE_MAX));
-
- ReplicatedEnvironment repEnv = store.getReplicatedEnvironment();
- assertEquals("Unexpected number of elections primary retries", TEST_ELECTION_RETRIES,
- repEnv.getConfig().getConfigParam(ReplicationConfig.ELECTIONS_PRIMARY_RETRIES));
- assertEquals("Unexpected number of elections primary retries", TEST_ENV_CONSISTENCY_TIMEOUT,
- repEnv.getConfig().getConfigParam(ReplicationConfig.ENV_CONSISTENCY_TIMEOUT));
+ FileUtils.delete(new File(_workDir), true);
+ if (_virtualHost != null)
+ {
+ _virtualHost.close();
+ }
}
finally
{
- ApplicationRegistry.remove();
+ BrokerTestHelper.tearDown();
+ super.tearDown();
}
}
+ public void testSetSystemConfiguration() throws Exception
+ {
+ // create virtual host configuration, registry and host instance
+ addVirtualHostConfiguration();
+ String vhostName = "test" + _masterPort;
+ VirtualHostConfiguration configuration = new VirtualHostConfiguration(vhostName, _configXml.subset("virtualhosts.virtualhost." + vhostName), BrokerTestHelper.createBrokerMock());
+ _virtualHost = BrokerTestHelper.createVirtualHost(configuration);
+ BDBHAMessageStore store = (BDBHAMessageStore) _virtualHost.getMessageStore();
+
+ // test whether JVM system settings were applied
+ Environment env = store.getEnvironment();
+ assertEquals("Unexpected number of cleaner threads", TEST_NUMBER_OF_THREADS, env.getConfig().getConfigParam(EnvironmentConfig.CLEANER_THREADS));
+ assertEquals("Unexpected log file max", TEST_LOG_FILE_MAX, env.getConfig().getConfigParam(EnvironmentConfig.LOG_FILE_MAX));
+
+ ReplicatedEnvironment repEnv = store.getReplicatedEnvironment();
+ assertEquals("Unexpected number of elections primary retries", TEST_ELECTION_RETRIES,
+ repEnv.getConfig().getConfigParam(ReplicationConfig.ELECTIONS_PRIMARY_RETRIES));
+ assertEquals("Unexpected number of elections primary retries", TEST_ENV_CONSISTENCY_TIMEOUT,
+ repEnv.getConfig().getConfigParam(ReplicationConfig.ENV_CONSISTENCY_TIMEOUT));
+ }
+
private void addVirtualHostConfiguration() throws Exception
{
int port = findFreePort();
@@ -152,14 +156,4 @@ public class BDBHAMessageStoreTest extends QpidTestCase
}
return _host + ":" + _masterPort;
}
-
- private TestApplicationRegistry initialize() throws Exception
- {
- CurrentActor.set(new TestLogActor(new SystemOutMessageLogger()));
- ServerConfiguration configuration = new ServerConfiguration(_configXml);
- TestApplicationRegistry registry = new TestApplicationRegistry(configuration);
- ApplicationRegistry.initialise(registry);
- registry.getVirtualHostRegistry().setDefaultVirtualHostName("test" + _masterPort);
- return registry;
- }
}
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
index eef9f7eab4..d18c850ecf 100644
--- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
+++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
@@ -225,7 +225,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
messageStore.close();
AbstractBDBMessageStore newStore = new BDBMessageStore();
- newStore.configure("", _config.subset("store"));
+ newStore.configure("", getConfig().subset("store"));
newStore.startWithNoRecover();
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java
index 122f846a2d..390d667db0 100644
--- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java
+++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.server.store.berkeleydb;
+import java.util.HashMap;
+import java.util.Map;
+
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
@@ -35,6 +38,11 @@ import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
import org.apache.qpid.client.AMQConnectionFactory;
import org.apache.qpid.client.AMQDestination;
@@ -62,6 +70,11 @@ public class BDBStoreUpgradeTestPreparer
public static final String QUEUE_NAME="myUpgradeQueue";
public static final String NON_DURABLE_QUEUE_NAME="queue-non-durable";
+ public static final String PRIORITY_QUEUE_NAME="myPriorityQueue";
+ public static final String QUEUE_WITH_DLQ_NAME="myQueueWithDLQ";
+ public static final String NONEXCLUSIVE_WITH_ERRONEOUS_OWNER = "nonexclusive-with-erroneous-owner";
+ public static final String MISUSED_OWNER = "misused-owner-as-description";
+
private static AMQConnectionFactory _connFac;
private static final String CONN_URL =
"amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672'";
@@ -86,10 +99,10 @@ public class BDBStoreUpgradeTestPreparer
{
Connection connection = _connFac.createConnection();
AMQSession<?, ?> session = (AMQSession<?,?>)connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- AMQShortString queueName = AMQShortString.valueOf(NON_DURABLE_QUEUE_NAME);
+ AMQShortString queueName = new AMQShortString(NON_DURABLE_QUEUE_NAME);
AMQDestination destination = (AMQDestination) session.createQueue(NON_DURABLE_QUEUE_NAME);
session.sendCreateQueue(queueName, false, false, false, null);
- session.bindQueue(queueName, queueName, null, AMQShortString.valueOf("amq.direct"), destination);
+ session.bindQueue(queueName, queueName, null, new AMQShortString("amq.direct"), destination);
MessageProducer messageProducer = session.createProducer(destination);
sendMessages(session, messageProducer, destination, DeliveryMode.PERSISTENT, 1024, 3);
connection.close();
@@ -140,11 +153,56 @@ public class BDBStoreUpgradeTestPreparer
// Publish 5 persistent messages which will NOT be committed and so should be 'lost'
sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 1*1024, 5);
+ messageProducer.close();
+ session.close();
+
+ session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ // Create a priority queue on broker
+ final Map<String,Object> priorityQueueArguments = new HashMap<String, Object>();
+ priorityQueueArguments.put("x-qpid-priorities",10);
+ createAndBindQueueOnBroker(session, PRIORITY_QUEUE_NAME, priorityQueueArguments);
+
+ // Create a queue that has a DLQ
+ final Map<String,Object> queueWithDLQArguments = new HashMap<String, Object>();
+ queueWithDLQArguments.put("x-qpid-dlq-enabled", true);
+ queueWithDLQArguments.put("x-qpid-maximum-delivery-count", 2);
+ createAndBindQueueOnBroker(session, QUEUE_WITH_DLQ_NAME, queueWithDLQArguments);
+
+ // Send message to the DLQ
+ Queue dlq = session.createQueue("fanout://" + QUEUE_WITH_DLQ_NAME + "_DLE//does-not-matter");
+ MessageProducer dlqMessageProducer = session.createProducer(dlq);
+ sendMessages(session, dlqMessageProducer, dlq, DeliveryMode.PERSISTENT, 1*1024, 1);
+ session.commit();
+ // Create a queue with JMX specifying an owner, so it can later be moved into description
+ createAndBindQueueOnBrokerWithJMX(NONEXCLUSIVE_WITH_ERRONEOUS_OWNER, MISUSED_OWNER, priorityQueueArguments);
session.close();
connection.close();
}
+ private void createAndBindQueueOnBroker(Session session, String queueName, final Map<String, Object> arguments) throws Exception
+ {
+ ((AMQSession<?,?>) session).createQueue(new AMQShortString(queueName), false, true, false, arguments);
+ Queue queue = (Queue) session.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='true'");
+ ((AMQSession<?,?>) session).declareAndBind((AMQDestination)queue);
+ }
+
+ private void createAndBindQueueOnBrokerWithJMX(String queueName, String owner, final Map<String, Object> arguments) throws Exception
+ {
+ Map<String, Object> environment = new HashMap<String, Object>();
+ environment.put(JMXConnector.CREDENTIALS, new String[] {"admin","admin"});
+ JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:8999/jmxrmi");
+ JMXConnector jmxConnector = JMXConnectorFactory.connect(url, environment);
+ MBeanServerConnection mbsc = jmxConnector.getMBeanServerConnection();
+ ObjectName virtualHost = new ObjectName("org.apache.qpid:type=VirtualHost.VirtualHostManager,VirtualHost=\"test\"");
+
+ Object[] params = new Object[] {queueName, owner, true, arguments};
+ String[] signature = new String[] {String.class.getName(), String.class.getName(), boolean.class.getName(), Map.class.getName()};
+ mbsc.invoke(virtualHost, "createNewQueue", params, signature);
+
+ ObjectName directExchange = new ObjectName("org.apache.qpid:type=VirtualHost.Exchange,VirtualHost=\"test\",name=\"amq.direct\",ExchangeType=direct");
+ mbsc.invoke(directExchange, "createNewBinding", new Object[] {queueName, queueName}, new String[] {String.class.getName(), String.class.getName()});
+ }
/**
* Prepare a DurableSubscription backing queue for use in testing selector
* recovery and queue exclusivity marking during the upgrade process.
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
index 4e201d5473..e4837b212e 100644
--- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
+++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
@@ -22,16 +22,20 @@ package org.apache.qpid.server.store.berkeleydb;
import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.NON_DURABLE_QUEUE_NAME;
+import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.PRIORITY_QUEUE_NAME;
import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_NAME;
+import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_WITH_DLQ_NAME;
import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SELECTOR_SUB_NAME;
import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SELECTOR_TOPIC_NAME;
import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SUB_NAME;
import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.TOPIC_NAME;
import java.io.File;
+import java.io.InputStream;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
@@ -43,7 +47,10 @@ import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularDataSupport;
+import org.apache.qpid.management.common.mbeans.ManagedExchange;
import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.test.utils.JMXTestUtils;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
@@ -70,7 +77,7 @@ public class BDBUpgradeTest extends QpidBrokerTestCase
public void setUp() throws Exception
{
assertNotNull("QPID_WORK must be set", QPID_WORK_ORIG);
- _storeLocation = getWorkDirBaseDir() + "/bdbstore/test-store";
+ _storeLocation = getWorkDirBaseDir() + File.separator + "test-store";
//Clear the two target directories if they exist.
File directory = new File(_storeLocation);
@@ -78,15 +85,13 @@ public class BDBUpgradeTest extends QpidBrokerTestCase
{
FileUtils.delete(directory, true);
}
+ directory.mkdirs();
// copy store files
- String src = getClass().getClassLoader().getResource("upgrade/bdbstore-v4/test-store").toURI().getPath();
- FileUtils.copyRecursive(new File(src), new File(_storeLocation));
-
- //override the broker config used and then start the broker with the updated store
- _configFile = new File("build/etc/config-systests-bdb.xml");
- setConfigurationProperty("management.enabled", "true");
+ InputStream src = getClass().getClassLoader().getResourceAsStream("upgrade/bdbstore-v4/test-store/00000000.jdb");
+ FileUtils.copy(src, new File(_storeLocation, "00000000.jdb"));
+ getBrokerConfiguration().addJmxManagementConfiguration();
super.setUp();
}
@@ -302,11 +307,110 @@ public class BDBUpgradeTest extends QpidBrokerTestCase
Queue queue = session.createQueue(NON_DURABLE_QUEUE_NAME);
MessageConsumer messageConsumer = session.createConsumer(queue);
- for (int i = 0; i < 3; i++)
+ for (int i = 1; i <= 3; i++)
{
Message message = messageConsumer.receive(1000);
assertNotNull("Message was not migrated!", message);
assertTrue("Unexpected message received!", message instanceof TextMessage);
+ assertEquals("ID property did not match", i, message.getIntProperty("ID"));
+ }
+ }
+
+ /**
+ * Tests store upgrade has maintained the priority queue configuration,
+ * such that sending messages with priorities out-of-order and then consuming
+ * them gets the messages back in priority order.
+ */
+ public void testPriorityQueue() throws Exception
+ {
+ // Create a connection and start it
+ Connection connection = getConnection();
+ connection.start();
+
+ // send some messages to the priority queue
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(PRIORITY_QUEUE_NAME);
+ MessageProducer producer = session.createProducer(queue);
+
+ producer.setPriority(4);
+ producer.send(createMessage(1, false, session, producer));
+ producer.setPriority(1);
+ producer.send(createMessage(2, false, session, producer));
+ producer.setPriority(9);
+ producer.send(createMessage(3, false, session, producer));
+ session.close();
+
+ //consume the messages, expected order: msg 3, msg 1, msg 2.
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ Message msg = consumer.receive(1500);
+ assertNotNull("expected message was not received", msg);
+ assertEquals(3, msg.getIntProperty("msg"));
+ msg = consumer.receive(1500);
+ assertNotNull("expected message was not received", msg);
+ assertEquals(1, msg.getIntProperty("msg"));
+ msg = consumer.receive(1500);
+ assertNotNull("expected message was not received", msg);
+ assertEquals(2, msg.getIntProperty("msg"));
+ }
+
+ /**
+ * Test that the queue configured to have a DLQ was recovered and has the alternate exchange
+ * and max delivery count, the DLE exists, the DLQ exists with no max delivery count, the
+ * DLQ is bound to the DLE, and that the DLQ does not itself have a DLQ.
+ *
+ * DLQs are NOT enabled at the virtualhost level, we are testing recovery of the arguments
+ * that turned it on for this specific queue.
+ */
+ public void testRecoveryOfQueueWithDLQ() throws Exception
+ {
+ JMXTestUtils jmxUtils = null;
+ try
+ {
+ jmxUtils = new JMXTestUtils(this, "guest", "guest");
+ jmxUtils.open();
+ }
+ catch (Exception e)
+ {
+ fail("Unable to establish JMX connection, test cannot proceed");
+ }
+
+ try
+ {
+ //verify the DLE exchange exists, has the expected type, and a single binding for the DLQ
+ ManagedExchange exchange = jmxUtils.getManagedExchange(QUEUE_WITH_DLQ_NAME + "_DLE");
+ assertEquals("Wrong exchange type", "fanout", exchange.getExchangeType());
+ TabularDataSupport bindings = (TabularDataSupport) exchange.bindings();
+ assertEquals(1, bindings.size());
+ for(Object o : bindings.values())
+ {
+ CompositeData binding = (CompositeData) o;
+
+ String bindingKey = (String) binding.get(ManagedExchange.BINDING_KEY);
+ String[] queueNames = (String[]) binding.get(ManagedExchange.QUEUE_NAMES);
+
+ //Because its a fanout exchange, we just return a single '*' key with all bound queues
+ assertEquals("unexpected binding key", "*", bindingKey);
+ assertEquals("unexpected number of queues bound", 1, queueNames.length);
+ assertEquals("unexpected queue name", QUEUE_WITH_DLQ_NAME + "_DLQ", queueNames[0]);
+ }
+
+ //verify the queue exists, has the expected alternate exchange and max delivery count
+ ManagedQueue queue = jmxUtils.getManagedQueue(QUEUE_WITH_DLQ_NAME);
+ assertEquals("Queue does not have the expected AlternateExchange", QUEUE_WITH_DLQ_NAME + "_DLE", queue.getAlternateExchange());
+ assertEquals("Unexpected maximum delivery count", Integer.valueOf(2), queue.getMaximumDeliveryCount());
+
+ ManagedQueue dlQqueue = jmxUtils.getManagedQueue(QUEUE_WITH_DLQ_NAME + "_DLQ");
+ assertNull("Queue should not have an AlternateExchange", dlQqueue.getAlternateExchange());
+ assertEquals("Unexpected maximum delivery count", Integer.valueOf(0), dlQqueue.getMaximumDeliveryCount());
+
+ String dlqDlqObjectNameString = jmxUtils.getQueueObjectNameString("test", QUEUE_WITH_DLQ_NAME + "_DLQ" + "_DLQ");
+ assertFalse("a DLQ should not exist for the DLQ itself", jmxUtils.doesManagedObjectExist(dlqDlqObjectNameString));
+ }
+ finally
+ {
+ jmxUtils.close();
}
}
@@ -387,4 +491,11 @@ public class BDBUpgradeTest extends QpidBrokerTestCase
session.close();
}
+ private Message createMessage(int msgId, boolean first, Session producerSession, MessageProducer producer) throws JMSException
+ {
+ Message send = producerSession.createTextMessage("Message: " + msgId);
+ send.setIntProperty("msg", msgId);
+
+ return send;
+ }
}
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java
index c6a9ba8f8b..0e1ef7b25d 100644
--- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java
+++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java
@@ -31,6 +31,7 @@ import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.test.utils.TestUtils;
import com.sleepycat.je.rep.ReplicationConfig;
@@ -134,7 +135,10 @@ public class HAClusterBlackboxTest extends QpidBrokerTestCase
public void assertFailoverOccurs(long delay) throws InterruptedException
{
- _failoverLatch.await(delay, TimeUnit.MILLISECONDS);
+ if (!_failoverLatch.await(delay, TimeUnit.MILLISECONDS))
+ {
+ LOGGER.warn("Test thread dump:\n\n" + TestUtils.dumpThreads() + "\n");
+ }
assertEquals("Failover did not occur", 0, _failoverLatch.getCount());
}
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
index abe13edc32..4c2fa910f5 100644
--- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
+++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
@@ -43,6 +43,7 @@ import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.test.utils.TestBrokerConfiguration;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import org.apache.qpid.url.URLSyntaxException;
@@ -67,7 +68,7 @@ public class HATestClusterCreator
private final Map<Integer, Integer> _brokerPortToBdbPortMap = new HashMap<Integer, Integer>();
private final Map<Integer, BrokerConfigHolder> _brokerConfigurations = new TreeMap<Integer, BrokerConfigHolder>();
private final String _virtualHostName;
- private final String _storeConfigKeyPrefix;
+ private final String _vhostStoreConfigKeyPrefix;
private final String _ipAddressOfBroker;
private final String _groupName ;
@@ -82,7 +83,7 @@ public class HATestClusterCreator
_groupName = "group" + _testcase.getName();
_ipAddressOfBroker = getIpAddressOfBrokerHost();
_numberOfNodes = numberOfNodes;
- _storeConfigKeyPrefix = "virtualhosts.virtualhost." + _virtualHostName + ".store.";
+ _vhostStoreConfigKeyPrefix = "virtualhosts.virtualhost." + _virtualHostName + ".store.";
_bdbHelperPort = 0;
}
@@ -102,7 +103,9 @@ public class HATestClusterCreator
}
configureClusterNode(brokerPort, bdbPort);
- collectConfig(brokerPort, _testcase.getTestConfiguration(), _testcase.getTestVirtualhosts());
+ TestBrokerConfiguration brokerConfiguration = _testcase.getBrokerConfiguration(brokerPort);
+ brokerConfiguration.addJmxManagementConfiguration();
+ collectConfig(brokerPort, brokerConfiguration, _testcase.getTestVirtualhosts());
brokerPort = _testcase.getNextAvailable(bdbPort + 1);
}
@@ -127,7 +130,7 @@ public class HATestClusterCreator
*/
private String getConfigKey(String configKeySuffix)
{
- final String configKey = StringUtils.substringAfter(_storeConfigKeyPrefix + configKeySuffix, "virtualhosts.");
+ final String configKey = StringUtils.substringAfter(_vhostStoreConfigKeyPrefix + configKeySuffix, "virtualhosts.");
return configKey;
}
@@ -135,7 +138,6 @@ public class HATestClusterCreator
{
final BrokerConfigHolder brokerConfigHolder = _brokerConfigurations.get(brokerPortNumber);
- _testcase.setTestConfiguration(brokerConfigHolder.getTestConfiguration());
_testcase.setTestVirtualhosts(brokerConfigHolder.getTestVirtualhosts());
_testcase.startBroker(brokerPortNumber);
@@ -204,7 +206,7 @@ public class HATestClusterCreator
public void stopNode(final int brokerPortNumber)
{
- _testcase.stopBroker(brokerPortNumber);
+ _testcase.killBroker(brokerPortNumber);
}
public void stopCluster() throws Exception
@@ -348,12 +350,12 @@ public class HATestClusterCreator
{
final String nodeName = getNodeNameForNodeAt(bdbPort);
- _testcase.setConfigurationProperty(_storeConfigKeyPrefix + "class", "org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore");
+ _testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "class", "org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore");
- _testcase.setConfigurationProperty(_storeConfigKeyPrefix + "highAvailability.groupName", _groupName);
- _testcase.setConfigurationProperty(_storeConfigKeyPrefix + "highAvailability.nodeName", nodeName);
- _testcase.setConfigurationProperty(_storeConfigKeyPrefix + "highAvailability.nodeHostPort", getNodeHostPortForNodeAt(bdbPort));
- _testcase.setConfigurationProperty(_storeConfigKeyPrefix + "highAvailability.helperHostPort", getHelperHostPort());
+ _testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "highAvailability.groupName", _groupName);
+ _testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "highAvailability.nodeName", nodeName);
+ _testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "highAvailability.nodeHostPort", getNodeHostPortForNodeAt(bdbPort));
+ _testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "highAvailability.helperHostPort", getHelperHostPort());
}
public String getIpAddressOfBrokerHost()
@@ -369,24 +371,24 @@ public class HATestClusterCreator
}
}
- private void collectConfig(final int brokerPortNumber, XMLConfiguration testConfiguration, XMLConfiguration testVirtualhosts)
+ private void collectConfig(final int brokerPortNumber, TestBrokerConfiguration testConfiguration, XMLConfiguration testVirtualhosts)
{
- _brokerConfigurations.put(brokerPortNumber, new BrokerConfigHolder((XMLConfiguration) testConfiguration.clone(),
+ _brokerConfigurations.put(brokerPortNumber, new BrokerConfigHolder(testConfiguration,
(XMLConfiguration) testVirtualhosts.clone()));
}
public class BrokerConfigHolder
{
- private final XMLConfiguration _testConfiguration;
+ private final TestBrokerConfiguration _testConfiguration;
private final XMLConfiguration _testVirtualhosts;
- public BrokerConfigHolder(XMLConfiguration testConfiguration, XMLConfiguration testVirtualhosts)
+ public BrokerConfigHolder(TestBrokerConfiguration testConfiguration, XMLConfiguration testVirtualhosts)
{
_testConfiguration = testConfiguration;
_testVirtualhosts = testVirtualhosts;
}
- public XMLConfiguration getTestConfiguration()
+ public TestBrokerConfiguration getTestConfiguration()
{
return _testConfiguration;
}
@@ -416,7 +418,7 @@ public class HATestClusterCreator
public String getStoreConfigKeyPrefix()
{
- return _storeConfigKeyPrefix;
+ return _vhostStoreConfigKeyPrefix;
}
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java
new file mode 100644
index 0000000000..d33eb868c2
--- /dev/null
+++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreCreator;
+import org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore;
+import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
+import org.apache.qpid.server.store.derby.DerbyMessageStore;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class MessageStoreCreatorTest extends QpidTestCase
+{
+ private static final String[] STORE_TYPES = {MemoryMessageStore.TYPE, DerbyMessageStore.TYPE, BDBMessageStore.TYPE, BDBHAMessageStore.TYPE};
+
+ public void testMessageStoreCreator()
+ {
+ MessageStoreCreator messageStoreCreator = new MessageStoreCreator();
+ for (String type : STORE_TYPES)
+ {
+ MessageStore store = messageStoreCreator.createMessageStore(type);
+ assertNotNull("Store of type " + type + " is not created", store);
+ }
+ }
+}
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java
index cd2654f79f..b2b28b3c2d 100644
--- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java
+++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java
@@ -20,7 +20,14 @@
*/
package org.apache.qpid.server.store.berkeleydb.upgrade;
+import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.NONEXCLUSIVE_WITH_ERRONEOUS_OWNER;
+import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.NON_DURABLE_QUEUE_NAME;
+import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.PRIORITY_QUEUE_NAME;
+import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_NAME;
+import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_WITH_DLQ_NAME;
+
import java.io.File;
+import java.io.InputStream;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.subjects.TestBlankSubject;
@@ -51,15 +58,15 @@ public abstract class AbstractUpgradeTestCase extends QpidTestCase
}
}
- public static final String[] QUEUE_NAMES = { "clientid:myDurSubName", "clientid:mySelectorDurSubName", "myUpgradeQueue",
- "queue-non-durable", "nonexclusive-with-erroneous-owner" };
- public static int[] QUEUE_SIZES = { 1, 1, 10, 3, 0};
- public static int TOTAL_MESSAGE_NUMBER = 15;
+ public static final String[] QUEUE_NAMES = { "clientid:myDurSubName", "clientid:mySelectorDurSubName", QUEUE_NAME, NON_DURABLE_QUEUE_NAME,
+ NONEXCLUSIVE_WITH_ERRONEOUS_OWNER, PRIORITY_QUEUE_NAME, QUEUE_WITH_DLQ_NAME, QUEUE_WITH_DLQ_NAME + "_DLQ" };
+ public static int[] QUEUE_SIZES = { 1, 1, 10, 3, 0, 0, 0, 1};
+ public static int TOTAL_MESSAGE_NUMBER = 16;
protected static final LogSubject LOG_SUBJECT = new TestBlankSubject();
- // one binding per exchange
- protected static final int TOTAL_BINDINGS = QUEUE_NAMES.length * 2;
- protected static final int TOTAL_EXCHANGES = 5;
+ // myQueueWithDLQ_DLQ is not bound to the default exchange
+ protected static final int TOTAL_BINDINGS = QUEUE_NAMES.length * 2 - 1;
+ protected static final int TOTAL_EXCHANGES = 6;
private File _storeLocation;
protected Environment _environment;
@@ -105,10 +112,24 @@ public abstract class AbstractUpgradeTestCase extends QpidTestCase
private File copyStore(String storeDirectoryName) throws Exception
{
- String src = getClass().getClassLoader().getResource("upgrade/" + storeDirectoryName).toURI().getPath();
File storeLocation = new File(new File(TMP_FOLDER), "test-store");
deleteDirectoryIfExists(storeLocation);
- FileUtils.copyRecursive(new File(src), new File(TMP_FOLDER));
+ storeLocation.mkdirs();
+ int index = 0;
+ String prefix = "0000000";
+ String extension = ".jdb";
+ InputStream is = null;
+ do
+ {
+ String fileName = prefix + index + extension;
+ is = getClass().getClassLoader().getResourceAsStream("upgrade/" + storeDirectoryName + "/test-store/" + fileName);
+ if (is != null)
+ {
+ FileUtils.copy(is, new File(storeLocation, fileName));
+ }
+ index++;
+ }
+ while (is != null);
return storeLocation;
}
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java
index 65a8bb03fb..500fb0a919 100644
--- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java
+++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java
@@ -20,6 +20,13 @@
*/
package org.apache.qpid.server.store.berkeleydb.upgrade;
+import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.NONEXCLUSIVE_WITH_ERRONEOUS_OWNER;
+import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.NON_DURABLE_QUEUE_NAME;
+import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_NAME;
+import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_WITH_DLQ_NAME;
+import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SELECTOR_TOPIC_NAME;
+import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.TOPIC_NAME;
+
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@@ -34,7 +41,6 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer;
import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.BindingRecord;
import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.BindingTuple;
import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.MessageContentKey;
@@ -50,9 +56,6 @@ import com.sleepycat.je.Transaction;
public class UpgradeFrom4to5Test extends AbstractUpgradeTestCase
{
- private static final String NON_DURABLE_QUEUE = BDBStoreUpgradeTestPreparer.NON_DURABLE_QUEUE_NAME;
- private static final String DURABLE_QUEUE = BDBStoreUpgradeTestPreparer.QUEUE_NAME;
- private static final String NON_EXCLUSIVE_WITH_ERRONEOUS_OWNER = "nonexclusive-with-erroneous-owner";
private static final String DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR = "clientid:mySelectorDurSubName";
private static final String DURABLE_SUBSCRIPTION_QUEUE = "clientid:myDurSubName";
private static final String EXCHANGE_DB_NAME = "exchangeDb_v5";
@@ -85,15 +88,14 @@ public class UpgradeFrom4to5Test extends AbstractUpgradeTestCase
final List<BindingRecord> queueBindings = loadBindings();
- assertEquals("Unxpected list size", TOTAL_BINDINGS, queueBindings.size());
- assertBindingRecord(queueBindings, DURABLE_SUBSCRIPTION_QUEUE, "amq.topic", BDBStoreUpgradeTestPreparer.TOPIC_NAME, "");
- assertBindingRecord(queueBindings, DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR, "amq.topic",
- BDBStoreUpgradeTestPreparer.SELECTOR_TOPIC_NAME, "testprop='true'");
- assertBindingRecord(queueBindings, DURABLE_QUEUE, "amq.direct", DURABLE_QUEUE, null);
- assertBindingRecord(queueBindings, NON_DURABLE_QUEUE, "amq.direct", NON_DURABLE_QUEUE, null);
- assertBindingRecord(queueBindings, NON_EXCLUSIVE_WITH_ERRONEOUS_OWNER, "amq.direct", NON_EXCLUSIVE_WITH_ERRONEOUS_OWNER, null);
+ assertEquals("Unxpected bindings size", TOTAL_BINDINGS, queueBindings.size());
+ assertBindingRecord(queueBindings, DURABLE_SUBSCRIPTION_QUEUE, "amq.topic", TOPIC_NAME, "");
+ assertBindingRecord(queueBindings, DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR, "amq.topic", SELECTOR_TOPIC_NAME, "testprop='true'");
+ assertBindingRecord(queueBindings, QUEUE_NAME, "amq.direct", QUEUE_NAME, null);
+ assertBindingRecord(queueBindings, NON_DURABLE_QUEUE_NAME, "amq.direct", NON_DURABLE_QUEUE_NAME, null);
+ assertBindingRecord(queueBindings, NONEXCLUSIVE_WITH_ERRONEOUS_OWNER, "amq.direct", NONEXCLUSIVE_WITH_ERRONEOUS_OWNER, null);
- assertQueueHasOwner(NON_EXCLUSIVE_WITH_ERRONEOUS_OWNER, "misused-owner-as-description");
+ assertQueueHasOwner(NONEXCLUSIVE_WITH_ERRONEOUS_OWNER, "misused-owner-as-description");
assertContent();
}
@@ -102,26 +104,29 @@ public class UpgradeFrom4to5Test extends AbstractUpgradeTestCase
{
UpgradeFrom4To5 upgrade = new UpgradeFrom4To5();
upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.NO), getVirtualHostName());
- assertQueues(new HashSet<String>(Arrays.asList(DURABLE_SUBSCRIPTION_QUEUE, DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR, DURABLE_QUEUE, NON_EXCLUSIVE_WITH_ERRONEOUS_OWNER)));
+ HashSet<String> queues = new HashSet<String>(Arrays.asList(QUEUE_NAMES));
+ assertTrue(NON_DURABLE_QUEUE_NAME + " should be in the list of queues" , queues.remove(NON_DURABLE_QUEUE_NAME));
+
+ assertQueues(queues);
- assertDatabaseRecordCount(DELIVERY_DB_NAME, 12);
- assertDatabaseRecordCount(MESSAGE_META_DATA_DB_NAME, 12);
+ assertDatabaseRecordCount(DELIVERY_DB_NAME, 13);
+ assertDatabaseRecordCount(MESSAGE_META_DATA_DB_NAME, 13);
assertDatabaseRecordCount(EXCHANGE_DB_NAME, TOTAL_EXCHANGES);
assertQueueMessages(DURABLE_SUBSCRIPTION_QUEUE, 1);
assertQueueMessages(DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR, 1);
- assertQueueMessages(DURABLE_QUEUE, 10);
+ assertQueueMessages(QUEUE_NAME, 10);
+ assertQueueMessages(QUEUE_WITH_DLQ_NAME + "_DLQ", 1);
final List<BindingRecord> queueBindings = loadBindings();
assertEquals("Unxpected list size", TOTAL_BINDINGS - 2, queueBindings.size());
- assertBindingRecord(queueBindings, DURABLE_SUBSCRIPTION_QUEUE, "amq.topic", BDBStoreUpgradeTestPreparer.TOPIC_NAME,
- "");
+ assertBindingRecord(queueBindings, DURABLE_SUBSCRIPTION_QUEUE, "amq.topic", TOPIC_NAME, "");
assertBindingRecord(queueBindings, DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR, "amq.topic",
- BDBStoreUpgradeTestPreparer.SELECTOR_TOPIC_NAME, "testprop='true'");
- assertBindingRecord(queueBindings, DURABLE_QUEUE, "amq.direct", DURABLE_QUEUE, null);
+ SELECTOR_TOPIC_NAME, "testprop='true'");
+ assertBindingRecord(queueBindings, QUEUE_NAME, "amq.direct", QUEUE_NAME, null);
- assertQueueHasOwner(NON_EXCLUSIVE_WITH_ERRONEOUS_OWNER, "misused-owner-as-description");
+ assertQueueHasOwner(NONEXCLUSIVE_WITH_ERRONEOUS_OWNER, "misused-owner-as-description");
assertContent();
}
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java
index 2d2a6b20a2..c33d427868 100644
--- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java
+++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.store.berkeleydb.upgrade;
+import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.PRIORITY_QUEUE_NAME;
+import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_WITH_DLQ_NAME;
import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CONFIGURED_OBJECTS_DB_NAME;
import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NEW_CONTENT_DB_NAME;
import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NEW_DELIVERY_DB_NAME;
@@ -42,6 +44,7 @@ import java.util.UUID;
import org.apache.log4j.Logger;
import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.AMQQueueFactory;
@@ -50,7 +53,6 @@ import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding;
import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CompoundKey;
import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CompoundKeyBinding;
import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.ConfiguredObjectBinding;
-import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.UpgradeConfiguredObjectRecord;
import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NewDataBinding;
import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NewPreparedTransaction;
import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NewPreparedTransactionBinding;
@@ -60,6 +62,7 @@ import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NewRecord
import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.OldPreparedTransaction;
import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.OldPreparedTransactionBinding;
import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.OldRecordImpl;
+import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.UpgradeConfiguredObjectRecord;
import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.UpgradeUUIDBinding;
import org.apache.qpid.server.util.MapJsonSerializer;
@@ -115,8 +118,8 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase
upgrade.performUpgrade(_environment, discardMessageInteractionHandler, getVirtualHostName());
- assertDatabaseRecordCount(NEW_METADATA_DB_NAME, 11);
- assertDatabaseRecordCount(NEW_CONTENT_DB_NAME, 11);
+ assertDatabaseRecordCount(NEW_METADATA_DB_NAME, 12);
+ assertDatabaseRecordCount(NEW_CONTENT_DB_NAME, 12);
assertConfiguredObjects();
assertQueueEntries();
@@ -264,17 +267,17 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase
private void assertDatabaseRecordCounts()
{
- assertDatabaseRecordCount(CONFIGURED_OBJECTS_DB_NAME, 12);
- assertDatabaseRecordCount(NEW_DELIVERY_DB_NAME, 12);
+ assertDatabaseRecordCount(CONFIGURED_OBJECTS_DB_NAME, 21);
+ assertDatabaseRecordCount(NEW_DELIVERY_DB_NAME, 13);
- assertDatabaseRecordCount(NEW_METADATA_DB_NAME, 12);
- assertDatabaseRecordCount(NEW_CONTENT_DB_NAME, 12);
+ assertDatabaseRecordCount(NEW_METADATA_DB_NAME, 13);
+ assertDatabaseRecordCount(NEW_CONTENT_DB_NAME, 13);
}
private void assertConfiguredObjects()
{
Map<UUID, UpgradeConfiguredObjectRecord> configuredObjects = loadConfiguredObjects();
- assertEquals("Unexpected number of configured objects", 12, configuredObjects.size());
+ assertEquals("Unexpected number of configured objects", 21, configuredObjects.size());
Set<Map<String, Object>> expected = new HashSet<Map<String, Object>>(12);
List<UUID> expectedBindingIDs = new ArrayList<UUID>();
@@ -282,8 +285,26 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase
expected.add(createExpectedQueueMap("myUpgradeQueue", Boolean.FALSE, null, null));
expected.add(createExpectedQueueMap("clientid:mySelectorDurSubName", Boolean.TRUE, "clientid", null));
expected.add(createExpectedQueueMap("clientid:myDurSubName", Boolean.TRUE, "clientid", null));
- expected.add(createExpectedQueueMap("nonexclusive-with-erroneous-owner", Boolean.FALSE, null,
- Collections.singletonMap(AMQQueueFactory.X_QPID_DESCRIPTION, "misused-owner-as-description")));
+
+ final Map<String, Object> queueWithOwnerArguments = new HashMap<String, Object>();
+ queueWithOwnerArguments.put("x-qpid-priorities", 10);
+ queueWithOwnerArguments.put(AMQQueueFactory.X_QPID_DESCRIPTION, "misused-owner-as-description");
+ expected.add(createExpectedQueueMap("nonexclusive-with-erroneous-owner", Boolean.FALSE, null,queueWithOwnerArguments));
+
+ final Map<String, Object> priorityQueueArguments = new HashMap<String, Object>();
+ priorityQueueArguments.put("x-qpid-priorities", 10);
+ expected.add(createExpectedQueueMap(PRIORITY_QUEUE_NAME, Boolean.FALSE, null, priorityQueueArguments));
+
+ final Map<String, Object> queueWithDLQArguments = new HashMap<String, Object>();
+ queueWithDLQArguments.put("x-qpid-dlq-enabled", true);
+ queueWithDLQArguments.put("x-qpid-maximum-delivery-count", 2);
+ expected.add(createExpectedQueueMap(QUEUE_WITH_DLQ_NAME, Boolean.FALSE, null, queueWithDLQArguments));
+
+ final Map<String, Object> dlqArguments = new HashMap<String, Object>();
+ dlqArguments.put("x-qpid-dlq-enabled", false);
+ dlqArguments.put("x-qpid-maximum-delivery-count", 0);
+ expected.add(createExpectedQueueMap(QUEUE_WITH_DLQ_NAME + "_DLQ", Boolean.FALSE, null, dlqArguments));
+ expected.add(createExpectedExchangeMap(QUEUE_WITH_DLQ_NAME + "_DLE", "fanout"));
expected.add(createExpectedQueueBindingMapAndID("myUpgradeQueue","myUpgradeQueue", "<<default>>", null, expectedBindingIDs));
expected.add(createExpectedQueueBindingMapAndID("myUpgradeQueue", "myUpgradeQueue", "amq.direct", null, expectedBindingIDs));
@@ -296,6 +317,13 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase
expected.add(createExpectedQueueBindingMapAndID("nonexclusive-with-erroneous-owner", "nonexclusive-with-erroneous-owner", "amq.direct", null, expectedBindingIDs));
expected.add(createExpectedQueueBindingMapAndID("nonexclusive-with-erroneous-owner","nonexclusive-with-erroneous-owner", "<<default>>", null, expectedBindingIDs));
+ expected.add(createExpectedQueueBindingMapAndID(PRIORITY_QUEUE_NAME, PRIORITY_QUEUE_NAME, "<<default>>", null, expectedBindingIDs));
+ expected.add(createExpectedQueueBindingMapAndID(PRIORITY_QUEUE_NAME, PRIORITY_QUEUE_NAME, "amq.direct", null, expectedBindingIDs));
+
+ expected.add(createExpectedQueueBindingMapAndID(QUEUE_WITH_DLQ_NAME, QUEUE_WITH_DLQ_NAME, "<<default>>", null, expectedBindingIDs));
+ expected.add(createExpectedQueueBindingMapAndID(QUEUE_WITH_DLQ_NAME, QUEUE_WITH_DLQ_NAME, "amq.direct", null, expectedBindingIDs));
+ expected.add(createExpectedQueueBindingMapAndID(QUEUE_WITH_DLQ_NAME + "_DLQ", "dlq", QUEUE_WITH_DLQ_NAME + "_DLE", null, expectedBindingIDs));
+
Set<String> expectedTypes = new HashSet<String>();
expectedTypes.add(Queue.class.getName());
expectedTypes.add(Exchange.class.getName());
@@ -305,7 +333,9 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase
{
UpgradeConfiguredObjectRecord object = entry.getValue();
Map<String, Object> deserialized = jsonSerializer.deserialize(object.getAttributes());
- assertTrue("Unexpected entry:" + object.getAttributes(), expected.remove(deserialized));
+
+ assertTrue("Unexpected entry in a store - json [" + object.getAttributes() + "], map [" + deserialized + "]",
+ expected.remove(deserialized));
String type = object.getType();
assertTrue("Unexpected type:" + type, expectedTypes.contains(type));
UUID key = entry.getKey();
@@ -350,7 +380,7 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase
return expectedQueueBinding;
}
- private Map<String, Object> createExpectedQueueMap(String name, boolean exclusiveFlag, String owner, Map<String, String> argumentMap)
+ private Map<String, Object> createExpectedQueueMap(String name, boolean exclusiveFlag, String owner, Map<String, Object> argumentMap)
{
Map<String, Object> expectedQueueEntry = new HashMap<String, Object>();
expectedQueueEntry.put(Queue.NAME, name);
@@ -363,6 +393,15 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase
return expectedQueueEntry;
}
+ private Map<String, Object> createExpectedExchangeMap(String name, String type)
+ {
+ Map<String, Object> expectedExchnageEntry = new HashMap<String, Object>();
+ expectedExchnageEntry.put(Exchange.NAME, name);
+ expectedExchnageEntry.put(Exchange.TYPE, type);
+ expectedExchnageEntry.put(Exchange.LIFETIME_POLICY, LifetimePolicy.PERMANENT.name());
+ return expectedExchnageEntry;
+ }
+
private Map<UUID, UpgradeConfiguredObjectRecord> loadConfiguredObjects()
{
final Map<UUID, UpgradeConfiguredObjectRecord> configuredObjectsRecords = new HashMap<UUID, UpgradeConfiguredObjectRecord>();
diff --git a/java/bdbstore/src/test/resources/upgrade/bdbstore-v4/test-store/00000000.jdb b/java/bdbstore/src/test/resources/upgrade/bdbstore-v4/test-store/00000000.jdb
index f5ed9aa5a2..cfc1f05d28 100644
--- a/java/bdbstore/src/test/resources/upgrade/bdbstore-v4/test-store/00000000.jdb
+++ b/java/bdbstore/src/test/resources/upgrade/bdbstore-v4/test-store/00000000.jdb
Binary files differ
diff --git a/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000000.jdb b/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000000.jdb
index f5ed9aa5a2..cfc1f05d28 100644
--- a/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000000.jdb
+++ b/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000000.jdb
Binary files differ
diff --git a/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000001.jdb b/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000001.jdb
index d5ae8c1096..4b45ff61e6 100644
--- a/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000001.jdb
+++ b/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000001.jdb
Binary files differ