summaryrefslogtreecommitdiff
path: root/java/bdbstore/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'java/bdbstore/src/test')
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncodingTest.java88
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java470
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java232
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java540
-rw-r--r--java/bdbstore/src/test/resources/upgrade/bdbstore-to-upgrade/test-store/00000000.jdbbin0 -> 1330321 bytes
5 files changed, 1330 insertions, 0 deletions
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncodingTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncodingTest.java
new file mode 100644
index 0000000000..d076babc61
--- /dev/null
+++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncodingTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.apache.qpid.framing.AMQShortString;
+
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+import junit.framework.TestCase;
+
+/**
+ * Tests for {@code AMQShortStringEncoding} including corner cases when string
+ * is null or over 127 characters in length
+ */
+public class AMQShortStringEncodingTest extends TestCase
+{
+
+ public void testWriteReadNullValues()
+ {
+ // write into tuple output
+ TupleOutput tupleOutput = new TupleOutput();
+ AMQShortStringEncoding.writeShortString(null, tupleOutput);
+ byte[] data = tupleOutput.getBufferBytes();
+
+ // read from tuple input
+ TupleInput tupleInput = new TupleInput(data);
+ AMQShortString result = AMQShortStringEncoding.readShortString(tupleInput);
+ assertNull("Expected null but got " + result, result);
+ }
+
+ public void testWriteReadShortStringWithLengthOver127()
+ {
+ AMQShortString value = createString('a', 128);
+
+ // write into tuple output
+ TupleOutput tupleOutput = new TupleOutput();
+ AMQShortStringEncoding.writeShortString(value, tupleOutput);
+ byte[] data = tupleOutput.getBufferBytes();
+
+ // read from tuple input
+ TupleInput tupleInput = new TupleInput(data);
+ AMQShortString result = AMQShortStringEncoding.readShortString(tupleInput);
+ assertEquals("Expected " + value + " but got " + result, value, result);
+ }
+
+ public void testWriteReadShortStringWithLengthLess127()
+ {
+ AMQShortString value = new AMQShortString("test");
+
+ // write into tuple output
+ TupleOutput tupleOutput = new TupleOutput();
+ AMQShortStringEncoding.writeShortString(value, tupleOutput);
+ byte[] data = tupleOutput.getBufferBytes();
+
+ // read from tuple input
+ TupleInput tupleInput = new TupleInput(data);
+ AMQShortString result = AMQShortStringEncoding.readShortString(tupleInput);
+ assertEquals("Expected " + value + " but got " + result, value, result);
+ }
+
+ private AMQShortString createString(char ch, int length)
+ {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < length; i++)
+ {
+ sb.append(ch);
+ }
+ return new AMQShortString(sb.toString());
+ }
+
+}
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
new file mode 100644
index 0000000000..ef31b78cfe
--- /dev/null
+++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
@@ -0,0 +1,470 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.message.MessageMetaData;
+import org.apache.qpid.server.message.MessageMetaData_0_10;
+import org.apache.qpid.server.store.MessageMetaDataType;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.MessageDeliveryMode;
+import org.apache.qpid.transport.MessageDeliveryPriority;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.MessageTransfer;
+
+/**
+ * Subclass of MessageStoreTest which runs the standard tests from the superclass against
+ * the BDB Store as well as additional tests specific to the DBB store-implementation.
+ */
+public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageStoreTest
+{
+ /**
+ * Tests that message metadata and content are successfully read back from a
+ * store after it has been reloaded. Both 0-8 and 0-10 metadata is used to
+ * verify their ability to co-exist within the store and be successful retrieved.
+ */
+ public void testBDBMessagePersistence() throws Exception
+ {
+ MessageStore store = getVirtualHost().getMessageStore();
+
+ BDBMessageStore bdbStore = assertBDBStore(store);
+
+ // Create content ByteBuffers.
+ // Split the content into 2 chunks for the 0-8 message, as per broker behaviour.
+ // Use a single chunk for the 0-10 message as per broker behaviour.
+ String bodyText = "jfhdjsflsdhfjdshfjdslhfjdslhfsjlhfsjkhfdsjkhfdsjkfhdslkjf";
+
+ ByteBuffer firstContentBytes_0_8 = ByteBuffer.wrap(bodyText.substring(0, 10).getBytes());
+ ByteBuffer secondContentBytes_0_8 = ByteBuffer.wrap(bodyText.substring(10).getBytes());
+
+ ByteBuffer completeContentBody_0_10 = ByteBuffer.wrap(bodyText.getBytes());
+ int bodySize = completeContentBody_0_10.limit();
+
+ /*
+ * Create and insert a 0-8 message (metadata and multi-chunk content)
+ */
+ MessagePublishInfo pubInfoBody_0_8 = createPublishInfoBody_0_8();
+ BasicContentHeaderProperties props_0_8 = createContentHeaderProperties_0_8();
+
+ ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize);
+
+ MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8, 0);
+ StoredMessage<MessageMetaData> storedMessage_0_8 = bdbStore.addMessage(messageMetaData_0_8);
+
+ long origArrivalTime_0_8 = messageMetaData_0_8.getArrivalTime();
+ long messageid_0_8 = storedMessage_0_8.getMessageNumber();
+
+ storedMessage_0_8.addContent(0, firstContentBytes_0_8);
+ storedMessage_0_8.addContent(firstContentBytes_0_8.limit(), secondContentBytes_0_8);
+ storedMessage_0_8.flushToStore();
+
+ /*
+ * Create and insert a 0-10 message (metadata and content)
+ */
+ MessageProperties msgProps_0_10 = createMessageProperties_0_10(bodySize);
+ DeliveryProperties delProps_0_10 = createDeliveryProperties_0_10();
+ Header header_0_10 = new Header(msgProps_0_10, delProps_0_10);
+
+ MessageTransfer xfr_0_10 = new MessageTransfer("destination", MessageAcceptMode.EXPLICIT,
+ MessageAcquireMode.PRE_ACQUIRED, header_0_10, completeContentBody_0_10);
+
+ MessageMetaData_0_10 messageMetaData_0_10 = new MessageMetaData_0_10(xfr_0_10);
+ StoredMessage<MessageMetaData_0_10> storedMessage_0_10 = bdbStore.addMessage(messageMetaData_0_10);
+
+ long origArrivalTime_0_10 = messageMetaData_0_10.getArrivalTime();
+ long messageid_0_10 = storedMessage_0_10.getMessageNumber();
+
+ storedMessage_0_10.addContent(0, completeContentBody_0_10);
+ storedMessage_0_10.flushToStore();
+
+ /*
+ * reload the store only (read-only)
+ */
+ bdbStore = reloadStoreReadOnly(bdbStore);
+
+ /*
+ * Read back and validate the 0-8 message metadata and content
+ */
+ StorableMessageMetaData storeableMMD_0_8 = bdbStore.getMessageMetaData(messageid_0_8);
+
+ assertEquals("Unexpected message type",MessageMetaDataType.META_DATA_0_8, storeableMMD_0_8.getType());
+ assertTrue("Unexpected instance type", storeableMMD_0_8 instanceof MessageMetaData);
+ MessageMetaData returnedMMD_0_8 = (MessageMetaData) storeableMMD_0_8;
+
+ assertEquals("Message arrival time has changed", origArrivalTime_0_8, returnedMMD_0_8.getArrivalTime());
+
+ MessagePublishInfo returnedPubBody_0_8 = returnedMMD_0_8.getMessagePublishInfo();
+ assertEquals("Message exchange has changed", pubInfoBody_0_8.getExchange(), returnedPubBody_0_8.getExchange());
+ assertEquals("Immediate flag has changed", pubInfoBody_0_8.isImmediate(), returnedPubBody_0_8.isImmediate());
+ assertEquals("Mandatory flag has changed", pubInfoBody_0_8.isMandatory(), returnedPubBody_0_8.isMandatory());
+ assertEquals("Routing key has changed", pubInfoBody_0_8.getRoutingKey(), returnedPubBody_0_8.getRoutingKey());
+
+ ContentHeaderBody returnedHeaderBody_0_8 = returnedMMD_0_8.getContentHeaderBody();
+ assertEquals("ContentHeader ClassID has changed", chb_0_8.classId, returnedHeaderBody_0_8.classId);
+ assertEquals("ContentHeader weight has changed", chb_0_8.weight, returnedHeaderBody_0_8.weight);
+ assertEquals("ContentHeader bodySize has changed", chb_0_8.bodySize, returnedHeaderBody_0_8.bodySize);
+
+ BasicContentHeaderProperties returnedProperties_0_8 = (BasicContentHeaderProperties) returnedHeaderBody_0_8.getProperties();
+ assertEquals("Property ContentType has changed", props_0_8.getContentTypeAsString(), returnedProperties_0_8.getContentTypeAsString());
+ assertEquals("Property MessageID has changed", props_0_8.getMessageIdAsString(), returnedProperties_0_8.getMessageIdAsString());
+
+ ByteBuffer recoveredContent_0_8 = ByteBuffer.allocate((int) chb_0_8.bodySize) ;
+ long recoveredCount_0_8 = bdbStore.getContent(messageid_0_8, 0, recoveredContent_0_8);
+ assertEquals("Incorrect amount of payload data recovered", chb_0_8.bodySize, recoveredCount_0_8);
+ String returnedPayloadString_0_8 = new String(recoveredContent_0_8.array());
+ assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_8);
+
+ /*
+ * Read back and validate the 0-10 message metadata and content
+ */
+ StorableMessageMetaData storeableMMD_0_10 = bdbStore.getMessageMetaData(messageid_0_10);
+
+ assertEquals("Unexpected message type",MessageMetaDataType.META_DATA_0_10, storeableMMD_0_10.getType());
+ assertTrue("Unexpected instance type", storeableMMD_0_10 instanceof MessageMetaData_0_10);
+ MessageMetaData_0_10 returnedMMD_0_10 = (MessageMetaData_0_10) storeableMMD_0_10;
+
+ assertEquals("Message arrival time has changed", origArrivalTime_0_10, returnedMMD_0_10.getArrivalTime());
+
+ DeliveryProperties returnedDelProps_0_10 = returnedMMD_0_10.getHeader().get(DeliveryProperties.class);
+ assertNotNull("DeliveryProperties were not returned", returnedDelProps_0_10);
+ assertEquals("Immediate flag has changed", delProps_0_10.getImmediate(), returnedDelProps_0_10.getImmediate());
+ assertEquals("Routing key has changed", delProps_0_10.getRoutingKey(), returnedDelProps_0_10.getRoutingKey());
+ assertEquals("Message exchange has changed", delProps_0_10.getExchange(), returnedDelProps_0_10.getExchange());
+ assertEquals("Message expiration has changed", delProps_0_10.getExpiration(), returnedDelProps_0_10.getExpiration());
+ assertEquals("Message delivery priority has changed", delProps_0_10.getPriority(), returnedDelProps_0_10.getPriority());
+
+ MessageProperties returnedMsgProps = returnedMMD_0_10.getHeader().get(MessageProperties.class);
+ assertNotNull("MessageProperties were not returned", returnedMsgProps);
+ assertTrue("Message correlationID has changed", Arrays.equals(msgProps_0_10.getCorrelationId(), returnedMsgProps.getCorrelationId()));
+ assertEquals("Message content length has changed", msgProps_0_10.getContentLength(), returnedMsgProps.getContentLength());
+ assertEquals("Message content type has changed", msgProps_0_10.getContentType(), returnedMsgProps.getContentType());
+
+ ByteBuffer recoveredContent = ByteBuffer.allocate((int) msgProps_0_10.getContentLength()) ;
+ long recoveredCount = bdbStore.getContent(messageid_0_10, 0, recoveredContent);
+ assertEquals("Incorrect amount of payload data recovered", msgProps_0_10.getContentLength(), recoveredCount);
+
+ String returnedPayloadString_0_10 = new String(recoveredContent.array());
+ assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_10);
+ }
+
+ private DeliveryProperties createDeliveryProperties_0_10()
+ {
+ DeliveryProperties delProps_0_10 = new DeliveryProperties();
+
+ delProps_0_10.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
+ delProps_0_10.setImmediate(true);
+ delProps_0_10.setExchange("exchange12345");
+ delProps_0_10.setRoutingKey("routingKey12345");
+ delProps_0_10.setExpiration(5);
+ delProps_0_10.setPriority(MessageDeliveryPriority.ABOVE_AVERAGE);
+
+ return delProps_0_10;
+ }
+
+ private MessageProperties createMessageProperties_0_10(int bodySize)
+ {
+ MessageProperties msgProps_0_10 = new MessageProperties();
+ msgProps_0_10.setContentLength(bodySize);
+ msgProps_0_10.setCorrelationId("qwerty".getBytes());
+ msgProps_0_10.setContentType("text/html");
+
+ return msgProps_0_10;
+ }
+
+ /**
+ * Close the provided store and create a new (read-only) store to read back the data.
+ *
+ * Use this method instead of reloading the virtual host like other tests in order
+ * to avoid the recovery handler deleting the message for not being on a queue.
+ */
+ private BDBMessageStore reloadStoreReadOnly(BDBMessageStore messageStore) throws Exception
+ {
+ messageStore.close();
+ File storePath = new File(String.valueOf(_config.getProperty("store.environment-path")));
+
+ BDBMessageStore newStore = new BDBMessageStore();
+ newStore.configure(storePath, false);
+ newStore.start();
+
+ return newStore;
+ }
+
+ private MessagePublishInfo createPublishInfoBody_0_8()
+ {
+ return new MessagePublishInfo()
+ {
+ public AMQShortString getExchange()
+ {
+ return new AMQShortString("exchange12345");
+ }
+
+ public void setExchange(AMQShortString exchange)
+ {
+ }
+
+ public boolean isImmediate()
+ {
+ return false;
+ }
+
+ public boolean isMandatory()
+ {
+ return true;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return new AMQShortString("routingKey12345");
+ }
+ };
+
+ }
+
+ private ContentHeaderBody createContentHeaderBody_0_8(BasicContentHeaderProperties props, int length)
+ {
+ MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+ int classForBasic = methodRegistry.createBasicQosOkBody().getClazz();
+ return new ContentHeaderBody(classForBasic, 1, props, length);
+ }
+
+ private BasicContentHeaderProperties createContentHeaderProperties_0_8()
+ {
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+ props.setDeliveryMode(Integer.valueOf(BasicContentHeaderProperties.PERSISTENT).byteValue());
+ props.setContentType("text/html");
+ props.getHeaders().setString("Test", "MST");
+ return props;
+ }
+
+ /**
+ * Tests that messages which are added to the store and then removed using the
+ * public MessageStore interfaces are actually removed from the store by then
+ * interrogating the store with its own implementation methods and verifying
+ * expected exceptions are thrown to indicate the message is not present.
+ */
+ public void testMessageCreationAndRemoval() throws Exception
+ {
+ MessageStore store = getVirtualHost().getMessageStore();
+ BDBMessageStore bdbStore = assertBDBStore(store);
+
+ StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreMultiChunkMessage_0_8(store);
+ long messageid_0_8 = storedMessage_0_8.getMessageNumber();
+
+ //remove the message in the fashion the broker normally would
+ storedMessage_0_8.remove();
+
+ //verify the removal using the BDB store implementation methods directly
+ try
+ {
+ // the next line should throw since the message id should not be found
+ bdbStore.getMessageMetaData(messageid_0_8);
+ fail("No exception thrown when message id not found getting metadata");
+ }
+ catch (AMQStoreException e)
+ {
+ // pass since exception expected
+ }
+
+ //expecting no content, allocate a 1 byte
+ ByteBuffer dst = ByteBuffer.allocate(1);
+
+ assertEquals("Retrieved content when none was expected",
+ 0, bdbStore.getContent(messageid_0_8, 0, dst));
+ }
+
+ private BDBMessageStore assertBDBStore(Object store)
+ {
+ if(!(store instanceof BDBMessageStore))
+ {
+ fail("Test requires an instance of BDBMessageStore to proceed");
+ }
+
+ return (BDBMessageStore) store;
+ }
+
+ private StoredMessage<MessageMetaData> createAndStoreMultiChunkMessage_0_8(MessageStore store)
+ {
+ byte[] body10Bytes = "0123456789".getBytes();
+ byte[] body5Bytes = "01234".getBytes();
+
+ ByteBuffer chunk1 = ByteBuffer.wrap(body10Bytes);
+ ByteBuffer chunk2 = ByteBuffer.wrap(body5Bytes);
+
+ int bodySize = body10Bytes.length + body5Bytes.length;
+
+ //create and store the message using the MessageStore interface
+ MessagePublishInfo pubInfoBody_0_8 = createPublishInfoBody_0_8();
+ BasicContentHeaderProperties props_0_8 = createContentHeaderProperties_0_8();
+
+ ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize);
+
+ MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8, 0);
+ StoredMessage<MessageMetaData> storedMessage_0_8 = store.addMessage(messageMetaData_0_8);
+
+ storedMessage_0_8.addContent(0, chunk1);
+ storedMessage_0_8.addContent(chunk1.limit(), chunk2);
+ storedMessage_0_8.flushToStore();
+
+ return storedMessage_0_8;
+ }
+
+ /**
+ * Tests transaction commit by utilising the enqueue and dequeue methods available
+ * in the TransactionLog interface implemented by the store, and verifying the
+ * behaviour using BDB implementation methods.
+ */
+ public void testTranCommit() throws Exception
+ {
+ TransactionLog log = getVirtualHost().getTransactionLog();
+
+ BDBMessageStore bdbStore = assertBDBStore(log);
+
+ final AMQShortString mockQueueName = new AMQShortString("queueName");
+
+ TransactionLogResource mockQueue = new TransactionLogResource()
+ {
+ public String getResourceName()
+ {
+ return mockQueueName.asString();
+ }
+ };
+
+ TransactionLog.Transaction txn = log.newTransaction();
+
+ txn.enqueueMessage(mockQueue, 1L);
+ txn.enqueueMessage(mockQueue, 5L);
+ txn.commitTran();
+
+ List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName);
+
+ assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
+ Long val = enqueuedIds.get(0);
+ assertEquals("First Message is incorrect", 1L, val.longValue());
+ val = enqueuedIds.get(1);
+ assertEquals("Second Message is incorrect", 5L, val.longValue());
+ }
+
+
+ /**
+ * Tests transaction rollback before a commit has occurred by utilising the
+ * enqueue and dequeue methods available in the TransactionLog interface
+ * implemented by the store, and verifying the behaviour using BDB
+ * implementation methods.
+ */
+ public void testTranRollbackBeforeCommit() throws Exception
+ {
+ TransactionLog log = getVirtualHost().getTransactionLog();
+
+ BDBMessageStore bdbStore = assertBDBStore(log);
+
+ final AMQShortString mockQueueName = new AMQShortString("queueName");
+
+ TransactionLogResource mockQueue = new TransactionLogResource()
+ {
+ public String getResourceName()
+ {
+ return mockQueueName.asString();
+ }
+ };
+
+ TransactionLog.Transaction txn = log.newTransaction();
+
+ txn.enqueueMessage(mockQueue, 21L);
+ txn.abortTran();
+
+ txn = log.newTransaction();
+ txn.enqueueMessage(mockQueue, 22L);
+ txn.enqueueMessage(mockQueue, 23L);
+ txn.commitTran();
+
+ List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName);
+
+ assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
+ Long val = enqueuedIds.get(0);
+ assertEquals("First Message is incorrect", 22L, val.longValue());
+ val = enqueuedIds.get(1);
+ assertEquals("Second Message is incorrect", 23L, val.longValue());
+ }
+
+ /**
+ * Tests transaction rollback after a commit has occurred by utilising the
+ * enqueue and dequeue methods available in the TransactionLog interface
+ * implemented by the store, and verifying the behaviour using BDB
+ * implementation methods.
+ */
+ public void testTranRollbackAfterCommit() throws Exception
+ {
+ TransactionLog log = getVirtualHost().getTransactionLog();
+
+ BDBMessageStore bdbStore = assertBDBStore(log);
+
+ final AMQShortString mockQueueName = new AMQShortString("queueName");
+
+ TransactionLogResource mockQueue = new TransactionLogResource()
+ {
+ public String getResourceName()
+ {
+ return mockQueueName.asString();
+ }
+ };
+
+ TransactionLog.Transaction txn = log.newTransaction();
+
+ txn.enqueueMessage(mockQueue, 30L);
+ txn.commitTran();
+
+ txn = log.newTransaction();
+ txn.enqueueMessage(mockQueue, 31L);
+ txn.abortTran();
+
+ txn = log.newTransaction();
+ txn.enqueueMessage(mockQueue, 32L);
+ txn.commitTran();
+
+ List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName);
+
+ assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
+ Long val = enqueuedIds.get(0);
+ assertEquals("First Message is incorrect", 30L, val.longValue());
+ val = enqueuedIds.get(1);
+ assertEquals("Second Message is incorrect", 32L, val.longValue());
+ }
+
+}
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java
new file mode 100644
index 0000000000..cc19bcf5d8
--- /dev/null
+++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java
@@ -0,0 +1,232 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.url.URLSyntaxException;
+
+/**
+ * Prepares an older version brokers BDB store with the required
+ * contents for use in the BDBStoreUpgradeTest.
+ *
+ * The store will then be used to verify that the upgraded is
+ * completed properly and that once upgraded it functions as
+ * expected with the new broker.
+ */
+public class BDBStoreUpgradeTestPreparer extends TestCase
+{
+ public static final String TOPIC_NAME="myUpgradeTopic";
+ public static final String SUB_NAME="myDurSubName";
+ public static final String QUEUE_NAME="myUpgradeQueue";
+
+ private static AMQConnectionFactory _connFac;
+ private static final String CONN_URL =
+ "amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672'";
+
+ /**
+ * Create a BDBStoreUpgradeTestPreparer instance
+ */
+ public BDBStoreUpgradeTestPreparer () throws URLSyntaxException
+ {
+ _connFac = new AMQConnectionFactory(CONN_URL);
+ }
+
+ /**
+ * Utility test method to allow running the preparation tool
+ * using the test framework
+ */
+ public void testPrepareBroker() throws Exception
+ {
+ prepareBroker();
+ }
+
+ private void prepareBroker() throws Exception
+ {
+ prepareQueues();
+ prepareDurableSubscription();
+ }
+
+ /**
+ * Prepare a queue for use in testing message and binding recovery
+ * after the upgrade is performed.
+ *
+ * - Create a transacted session on the connection.
+ * - Use a consumer to create the (durable by default) queue.
+ * - Send 5 large messages to test (multi-frame) content recovery.
+ * - Send 1 small message to test (single-frame) content recovery.
+ * - Commit the session.
+ * - Send 5 small messages to test that uncommitted messages are not recovered.
+ * following the upgrade.
+ * - Close the session.
+ */
+ private void prepareQueues() throws Exception
+ {
+ // Create a connection
+ Connection connection = _connFac.createConnection();
+ connection.start();
+ connection.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(JMSException e)
+ {
+ e.printStackTrace();
+ }
+ });
+ // Create a session on the connection, transacted to confirm delivery
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue(QUEUE_NAME);
+ // Create a consumer to ensure the queue gets created
+ // (and enter it into the store, as queues are made durable by default)
+ MessageConsumer messageConsumer = session.createConsumer(queue);
+ messageConsumer.close();
+
+ // Create a Message producer
+ MessageProducer messageProducer = session.createProducer(queue);
+
+ // Publish 5 persistent messages, 256k chars to ensure they are multi-frame
+ sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 256*1024, 5);
+ // Publish 5 persistent messages, 1k chars to ensure they are single-frame
+ sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 1*1024, 5);
+
+ session.commit();
+
+ // Publish 5 persistent messages which will NOT be committed and so should be 'lost'
+ sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 1*1024, 5);
+
+ session.close();
+ connection.close();
+ }
+
+ /**
+ * Prepare a DurableSubscription backing queue for use in testing selector
+ * recovery and queue exclusivity marking during the upgrade process.
+ *
+ * - Create a transacted session on the connection.
+ * - Open and close a DurableSubscription with selector to create the backing queue.
+ * - Send a message which matches the selector.
+ * - Send a message which does not match the selector.
+ * - Send a message which matches the selector but will remain uncommitted.
+ * - Close the session.
+ */
+ private void prepareDurableSubscription() throws Exception
+ {
+
+ // Create a connection
+ TopicConnection connection = _connFac.createTopicConnection();
+ connection.start();
+ connection.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(JMSException e)
+ {
+ e.printStackTrace();
+ }
+ });
+ // Create a session on the connection, transacted to confirm delivery
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Topic topic = session.createTopic(TOPIC_NAME);
+
+ // Create and register a durable subscriber with selector and then close it
+ TopicSubscriber durSub1 = session.createDurableSubscriber(topic, SUB_NAME,"testprop='true'", false);
+ durSub1.close();
+
+ // Create a publisher and send a persistent message which matches the selector
+ // followed by one that does not match, and another which matches but is not
+ // committed and so should be 'lost'
+ TopicSession pubSession = connection.createTopicSession(true, Session.SESSION_TRANSACTED);
+ TopicPublisher publisher = pubSession.createPublisher(topic);
+
+ publishMessages(session, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "true");
+ publishMessages(session, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "false");
+ pubSession.commit();
+ publishMessages(session, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "true");
+
+ publisher.close();
+ pubSession.close();
+
+ }
+
+ public static void sendMessages(Session session, MessageProducer messageProducer,
+ Destination dest, int deliveryMode, int length, int numMesages) throws JMSException
+ {
+ for (int i = 1; i <= numMesages; i++)
+ {
+ Message message = session.createTextMessage(generateString(length));
+ message.setIntProperty("ID", i);
+ messageProducer.send(message, deliveryMode, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+ }
+ }
+
+ public static void publishMessages(Session session, TopicPublisher publisher,
+ Destination dest, int deliveryMode, int length, int numMesages, String selectorProperty) throws JMSException
+ {
+ for (int i = 1; i <= numMesages; i++)
+ {
+ Message message = session.createTextMessage(generateString(length));
+ message.setIntProperty("ID", i);
+ message.setStringProperty("testprop", selectorProperty);
+ publisher.publish(message, deliveryMode, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+ }
+ }
+
+ /**
+ * Generates a string of a given length consisting of the sequence 0,1,2,..,9,0,1,2.
+ *
+ * @param length number of characters in the string
+ * @return string sequence of the given length
+ */
+ public static String generateString(int length)
+ {
+ char[] base_chars = new char[]{'0','1','2','3','4','5','6','7','8','9'};
+ char[] chars = new char[length];
+ for (int i = 0; i < (length); i++)
+ {
+ chars[i] = base_chars[i % 10];
+ }
+ return new String(chars);
+ }
+
+ /**
+ * Run the preparation tool.
+ * @param args Command line arguments.
+ */
+ public static void main(String[] args) throws Exception
+ {
+ BDBStoreUpgradeTestPreparer producer = new BDBStoreUpgradeTestPreparer();
+ producer.prepareBroker();
+ }
+} \ No newline at end of file
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
new file mode 100644
index 0000000000..4861e007af
--- /dev/null
+++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
@@ -0,0 +1,540 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_NAME;
+import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SUB_NAME;
+import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.TOPIC_NAME;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.management.common.mbeans.ManagedQueue;
+import org.apache.qpid.server.message.MessageMetaData;
+import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_4;
+import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTupleBindingFactory;
+import org.apache.qpid.server.store.berkeleydb.tuples.MessageMetaDataTupleBindingFactory;
+import org.apache.qpid.test.utils.JMXTestUtils;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.je.DatabaseEntry;
+
+/**
+ * Tests upgrading a BDB store and using it with the new broker
+ * after the required contents are entered into the store using
+ * an old broker with the BDBStoreUpgradeTestPreparer. The store
+ * will then be used to verify that the upgraded is completed
+ * properly and that once upgraded it functions as expected with
+ * the new broker.
+ */
+public class BDBUpgradeTest extends QpidBrokerTestCase
+{
+ protected static final Logger _logger = LoggerFactory.getLogger(BDBUpgradeTest.class);
+
+ private static final String STRING_1024 = BDBStoreUpgradeTestPreparer.generateString(1024);
+ private static final String STRING_1024_256 = BDBStoreUpgradeTestPreparer.generateString(1024*256);
+ private static final String QPID_WORK_ORIG = System.getProperty("QPID_WORK");
+ private static final String QPID_HOME = System.getProperty("QPID_HOME");
+ private static final int VERSION_4 = 4;
+
+ private String _fromDir;
+ private String _toDir;
+ private String _toDirTwice;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ assertNotNull("QPID_WORK must be set", QPID_WORK_ORIG);
+ assertNotNull("QPID_HOME must be set", QPID_HOME);
+
+ if(! isExternalBroker())
+ {
+ //override QPID_WORK to add the InVM port used so the store
+ //output from the upgrade tool can be found by the broker
+ setSystemProperty("QPID_WORK", QPID_WORK_ORIG + "/" + getPort());
+ }
+
+ _fromDir = QPID_HOME + "/bdbstore-to-upgrade/test-store";
+ _toDir = getWorkDirBaseDir() + "/bdbstore/test-store";
+ _toDirTwice = getWorkDirBaseDir() + "/bdbstore-upgraded-twice";
+
+ //Clear the two target directories if they exist.
+ File directory = new File(_toDir);
+ if (directory.exists() && directory.isDirectory())
+ {
+ FileUtils.delete(directory, true);
+ }
+ directory = new File(_toDirTwice);
+ if (directory.exists() && directory.isDirectory())
+ {
+ FileUtils.delete(directory, true);
+ }
+
+ //Upgrade the test store.
+ upgradeBrokerStore(_fromDir, _toDir);
+
+ //override the broker config used and then start the broker with the updated store
+ _configFile = new File("build/etc/config-systests-bdb.xml");
+ setConfigurationProperty("management.enabled", "true");
+
+ super.setUp();
+ }
+
+ private String getWorkDirBaseDir()
+ {
+ return QPID_WORK_ORIG + (isInternalBroker() ? "" : "/" + getPort());
+ }
+
+ /**
+ * Tests that the core upgrade method of the store upgrade tool passes through the exception
+ * from the BDBMessageStore indicating that the data on disk can't be loaded as the previous
+ * version because it has already been upgraded.
+ * @throws Exception
+ */
+ public void testMultipleUpgrades() throws Exception
+ {
+ //stop the broker started by setUp() in order to allow the second upgrade attempt to proceed
+ stopBroker();
+
+ try
+ {
+ new BDBStoreUpgrade(_toDir, _toDirTwice, null, false, true).upgradeFromVersion(VERSION_4);
+ fail("Second Upgrade Succeeded");
+ }
+ catch (Exception e)
+ {
+ System.err.println("Showing stack trace, we are expecting an 'Unable to load BDBStore' error");
+ e.printStackTrace();
+ assertTrue("Incorrect Exception Thrown:" + e.getMessage(),
+ e.getMessage().contains("Unable to load BDBStore as version 4. Store on disk contains version 5 data"));
+ }
+ }
+
+ /**
+ * Test that the selector applied to the DurableSubscription was successfully
+ * transfered to the new store, and functions as expected with continued use
+ * by monitoring message count while sending new messages to the topic.
+ */
+ public void testSelectorDurability() throws Exception
+ {
+ JMXTestUtils jmxUtils = null;
+ try
+ {
+ jmxUtils = new JMXTestUtils(this, "guest", "guest");
+ jmxUtils.open();
+ }
+ catch (Exception e)
+ {
+ fail("Unable to establish JMX connection, test cannot proceed");
+ }
+
+ try
+ {
+ ManagedQueue dursubQueue = jmxUtils.getManagedQueue("clientid" + ":" + SUB_NAME);
+ assertEquals("DurableSubscription backing queue should have 1 message on it initially",
+ new Integer(1), dursubQueue.getMessageCount());
+
+ // Create a connection and start it
+ TopicConnection connection = (TopicConnection) getConnection();
+ connection.start();
+
+ // Send messages which don't match and do match the selector, checking message count
+ TopicSession pubSession = connection.createTopicSession(true, org.apache.qpid.jms.Session.SESSION_TRANSACTED);
+ Topic topic = pubSession.createTopic(TOPIC_NAME);
+ TopicPublisher publisher = pubSession.createPublisher(topic);
+
+ BDBStoreUpgradeTestPreparer.publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "false");
+ pubSession.commit();
+ assertEquals("DurableSubscription backing queue should still have 1 message on it",
+ new Integer(1), dursubQueue.getMessageCount());
+
+ BDBStoreUpgradeTestPreparer.publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "true");
+ pubSession.commit();
+ assertEquals("DurableSubscription backing queue should now have 2 messages on it",
+ new Integer(2), dursubQueue.getMessageCount());
+
+ dursubQueue.clearQueue();
+ pubSession.close();
+ }
+ finally
+ {
+ jmxUtils.close();
+ }
+ }
+
+ /**
+ * Test that the backing queue for the durable subscription created was successfully
+ * detected and set as being exclusive during the upgrade process, and that the
+ * regular queue was not.
+ */
+ public void testQueueExclusivity() throws Exception
+ {
+ JMXTestUtils jmxUtils = null;
+ try
+ {
+ jmxUtils = new JMXTestUtils(this, "guest", "guest");
+ jmxUtils.open();
+ }
+ catch (Exception e)
+ {
+ fail("Unable to establish JMX connection, test cannot proceed");
+ }
+
+ try
+ {
+ ManagedQueue queue = jmxUtils.getManagedQueue(QUEUE_NAME);
+ assertFalse("Queue should not have been marked as Exclusive during upgrade", queue.isExclusive());
+
+ ManagedQueue dursubQueue = jmxUtils.getManagedQueue("clientid" + ":" + SUB_NAME);
+ assertTrue("DurableSubscription backing queue should have been marked as Exclusive during upgrade", dursubQueue.isExclusive());
+ }
+ finally
+ {
+ jmxUtils.close();
+ }
+ }
+
+ /**
+ * Test that the upgraded queue continues to function properly when used
+ * for persistent messaging and restarting the broker.
+ *
+ * Sends the new messages to the queue BEFORE consuming those which were
+ * sent before the upgrade. In doing so, this also serves to test that
+ * the queue bindings were successfully transitioned during the upgrade.
+ */
+ public void testBindingAndMessageDurabability() throws Exception
+ {
+ // Create a connection and start it
+ TopicConnection connection = (TopicConnection) getConnection();
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(QUEUE_NAME);
+ MessageProducer messageProducer = session.createProducer(queue);
+
+ // Send a new message
+ BDBStoreUpgradeTestPreparer.sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 256*1024, 1);
+
+ session.close();
+
+ // Restart the broker
+ restartBroker();
+
+ // Drain the queue of all messages
+ connection = (TopicConnection) getConnection();
+ connection.start();
+ consumeQueueMessages(connection, true);
+ }
+
+ /**
+ * Test that all of the committed persistent messages previously sent to
+ * the broker are properly received following update of the MetaData and
+ * Content entries during the store upgrade process.
+ */
+ public void testConsumptionOfUpgradedMessages() throws Exception
+ {
+ // Create a connection and start it
+ Connection connection = getConnection();
+ connection.start();
+
+ consumeDurableSubscriptionMessages(connection);
+ consumeQueueMessages(connection, false);
+ }
+
+ /**
+ * Tests store migration containing messages for non-existing queue.
+ *
+ * @throws Exception
+ */
+ public void testMigrationOfMessagesForNonExistingQueues() throws Exception
+ {
+ stopBroker();
+
+ // copy store data into a new location for adding of phantom message
+ File storeLocation = new File(_fromDir);
+ File target = new File(_toDirTwice);
+ if (!target.exists())
+ {
+ target.mkdirs();
+ }
+ FileUtils.copyRecursive(storeLocation, target);
+
+ // delete migrated data
+ File directory = new File(_toDir);
+ if (directory.exists() && directory.isDirectory())
+ {
+ FileUtils.delete(directory, true);
+ }
+
+ // test data
+ String nonExistingQueueName = getTestQueueName();
+ String messageText = "Test Phantom Message";
+
+ // add message
+ addMessageForNonExistingQueue(target, VERSION_4, nonExistingQueueName, messageText);
+
+ String[] inputs = { "Yes", "Yes", "Yes" };
+ upgradeBrokerStoreInInterractiveMode(_toDirTwice, _toDir, inputs);
+
+ // start broker
+ startBroker();
+
+ // Create a connection and start it
+ Connection connection = getConnection();
+ connection.start();
+
+ // consume a message for non-existing store
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(nonExistingQueueName);
+ MessageConsumer messageConsumer = session.createConsumer(queue);
+ Message message = messageConsumer.receive(1000);
+
+ // assert consumed message
+ assertNotNull("Message was not migrated!", message);
+ assertTrue("Unexpected message received!", message instanceof TextMessage);
+ String text = ((TextMessage) message).getText();
+ assertEquals("Message migration failed!", messageText, text);
+ }
+
+ /**
+ * An utility method to upgrade broker with simulation user interactions
+ *
+ * @param fromDir
+ * location of the store to migrate
+ * @param toDir
+ * location of where migrated data will be stored
+ * @param inputs
+ * user answers on upgrade tool questions
+ * @throws Exception
+ */
+ private void upgradeBrokerStoreInInterractiveMode(String fromDir, String toDir, final String[] inputs)
+ throws Exception
+ {
+ // save to restore system.in after data migration
+ InputStream stdin = System.in;
+
+ // set fake system in to simulate user interactions
+ // FIXME: it is a quite dirty simulator of system input but it does the job
+ System.setIn(new InputStream()
+ {
+
+ int counter = 0;
+
+ public synchronized int read(byte b[], int off, int len)
+ {
+ byte[] src = (inputs[counter] + "\n").getBytes();
+ System.arraycopy(src, 0, b, off, src.length);
+ counter++;
+ return src.length;
+ }
+
+ @Override
+ public int read() throws IOException
+ {
+ return -1;
+ }
+ });
+
+ try
+ {
+ // Upgrade the test store.
+ new BDBStoreUpgrade(fromDir, toDir, null, true, true).upgradeFromVersion(VERSION_4);
+ }
+ finally
+ {
+ // restore system in
+ System.setIn(stdin);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private void addMessageForNonExistingQueue(File storeLocation, int storeVersion, String nonExistingQueueName,
+ String messageText) throws Exception
+ {
+ final AMQShortString queueName = new AMQShortString(nonExistingQueueName);
+ BDBMessageStore store = new BDBMessageStore(storeVersion);
+ store.configure(storeLocation, false);
+ try
+ {
+ store.start();
+
+ // store message objects
+ ByteBuffer completeContentBody = ByteBuffer.wrap(messageText.getBytes("UTF-8"));
+ long bodySize = completeContentBody.limit();
+ MessagePublishInfo pubInfoBody = new MessagePublishInfoImpl(new AMQShortString("amq.direct"), false,
+ false, queueName);
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+ props.setDeliveryMode(Integer.valueOf(BasicContentHeaderProperties.PERSISTENT).byteValue());
+ props.setContentType("text/plain");
+ props.setType("text/plain");
+ props.setMessageId("whatever");
+ props.setEncoding("UTF-8");
+ props.getHeaders().setString("Test", "MST");
+ MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+ int classForBasic = methodRegistry.createBasicQosOkBody().getClazz();
+ ContentHeaderBody contentHeaderBody = new ContentHeaderBody(classForBasic, 1, props, bodySize);
+
+ // add content entry to database
+ long messageId = store.getNewMessageId();
+ TupleBinding<MessageContentKey> contentKeyTB = new MessageContentKeyTupleBindingFactory(storeVersion).getInstance();
+ MessageContentKey contentKey = null;
+ if (storeVersion == VERSION_4)
+ {
+ contentKey = new MessageContentKey_4(messageId, 0);
+ }
+ else
+ {
+ throw new Exception(storeVersion + " is not supported");
+ }
+ DatabaseEntry key = new DatabaseEntry();
+ contentKeyTB.objectToEntry(contentKey, key);
+ DatabaseEntry data = new DatabaseEntry();
+ ContentTB contentTB = new ContentTB();
+ contentTB.objectToEntry(completeContentBody, data);
+ store.getContentDb().put(null, key, data);
+
+ // add meta data entry to database
+ TupleBinding<Long> longTB = TupleBinding.getPrimitiveBinding(Long.class);
+ TupleBinding<Object> metaDataTB = new MessageMetaDataTupleBindingFactory(storeVersion).getInstance();
+ key = new DatabaseEntry();
+ data = new DatabaseEntry();
+ longTB.objectToEntry(new Long(messageId), key);
+ MessageMetaData metaData = new MessageMetaData(pubInfoBody, contentHeaderBody, 1);
+ metaDataTB.objectToEntry(metaData, data);
+ store.getMetaDataDb().put(null, key, data);
+
+ // add delivery entry to database
+ TransactionLogResource mockQueue = new TransactionLogResource()
+ {
+ public String getResourceName()
+ {
+ return queueName.asString();
+ }
+ };
+ TransactionLog log = (TransactionLog) store;
+ TransactionLog.Transaction txn = log.newTransaction();
+ txn.enqueueMessage(mockQueue, messageId);
+ txn.commitTran();
+ }
+ finally
+ {
+ // close store
+ store.close();
+ }
+ }
+
+ private void consumeDurableSubscriptionMessages(Connection connection) throws Exception
+ {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Topic topic = session.createTopic(TOPIC_NAME);
+
+ TopicSubscriber durSub = session.createDurableSubscriber(topic, SUB_NAME,"testprop='true'", false);
+
+ // Retrieve the matching message
+ Message m = durSub.receive(2000);
+ assertNotNull("Failed to receive an expected message", m);
+ assertEquals("Selector property did not match", "true", m.getStringProperty("testprop"));
+ assertEquals("ID property did not match", 1, m.getIntProperty("ID"));
+ assertEquals("Message content was not as expected",BDBStoreUpgradeTestPreparer.generateString(1024) , ((TextMessage)m).getText());
+
+ // Verify that neither the non-matching or uncommitted message are received
+ m = durSub.receive(1000);
+ assertNull("No more messages should have been recieved", m);
+
+ durSub.close();
+ session.close();
+ }
+
+ private void consumeQueueMessages(Connection connection, boolean extraMessage) throws Exception
+ {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(QUEUE_NAME);
+
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message m;
+
+ // Retrieve the initial pre-upgrade messages
+ for (int i=1; i <= 5 ; i++)
+ {
+ m = consumer.receive(2000);
+ assertNotNull("Failed to receive an expected message", m);
+ assertEquals("ID property did not match", i, m.getIntProperty("ID"));
+ assertEquals("Message content was not as expected", STRING_1024_256, ((TextMessage)m).getText());
+ }
+ for (int i=1; i <= 5 ; i++)
+ {
+ m = consumer.receive(2000);
+ assertNotNull("Failed to receive an expected message", m);
+ assertEquals("ID property did not match", i, m.getIntProperty("ID"));
+ assertEquals("Message content was not as expected", STRING_1024, ((TextMessage)m).getText());
+ }
+
+ if(extraMessage)
+ {
+ //verify that the extra message is received
+ m = consumer.receive(2000);
+ assertNotNull("Failed to receive an expected message", m);
+ assertEquals("ID property did not match", 1, m.getIntProperty("ID"));
+ assertEquals("Message content was not as expected", STRING_1024_256, ((TextMessage)m).getText());
+ }
+
+ // Verify that no more messages are received
+ m = consumer.receive(1000);
+ assertNull("No more messages should have been recieved", m);
+
+ consumer.close();
+ session.close();
+ }
+
+ private void upgradeBrokerStore(String fromDir, String toDir) throws Exception
+ {
+ new BDBStoreUpgrade(_fromDir, _toDir, null, false, true).upgradeFromVersion(VERSION_4);
+ }
+}
diff --git a/java/bdbstore/src/test/resources/upgrade/bdbstore-to-upgrade/test-store/00000000.jdb b/java/bdbstore/src/test/resources/upgrade/bdbstore-to-upgrade/test-store/00000000.jdb
new file mode 100644
index 0000000000..c4e4e6c306
--- /dev/null
+++ b/java/bdbstore/src/test/resources/upgrade/bdbstore-to-upgrade/test-store/00000000.jdb
Binary files differ