summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2012-03-27 11:04:02 +0000
committerKeith Wall <kwall@apache.org>2012-03-27 11:04:02 +0000
commit543aefb12560de0fb374c3a25fe3dc0b809a221e (patch)
treea2ed89ff318dc598c4070a6221c03154c569a416
parent30f5a131dfd3b9d628f3b9cb2dc91ada79993b60 (diff)
downloadqpid-python-543aefb12560de0fb374c3a25fe3dc0b809a221e.tar.gz
QPID-3913: Add functionality to upgrade bdbstore automatically on broker start-up. Store message content using single chunk. Change store version to 6. Remove implementations of tuple bindings for previous versions.
Applied patch from Phil Harvey<phil@philharveyonline.com> Oleksandr Rudyy<orudyy@gmail.com> git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1305809 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-xqpid/java/bdbstore/bin/storeUpgrade.sh41
-rw-r--r--qpid/java/bdbstore/build.xml7
-rw-r--r--qpid/java/bdbstore/src/main/java/BDBStoreUpgrade.log4j.xml52
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncoding.java2
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java518
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java1299
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DatabaseVisitor.java49
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/BindingRecord.java (renamed from qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/BindingRecord.java)2
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/ExchangeRecord.java (renamed from qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/ExchangeRecord.java)2
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/PreparedTransaction.java (renamed from qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/PreparedTransaction.java)2
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/QueueEntryKey.java (renamed from qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java)6
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/QueueRecord.java (renamed from qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java)6
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/Xid.java (renamed from qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/Xid.java)2
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_4.java44
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_5.java44
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/AMQShortStringBinding.java (renamed from qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTB.java)26
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ContentBinding.java (renamed from qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java)38
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ExchangeBinding.java (renamed from qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java)29
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java (renamed from qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_5.java)26
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java (renamed from qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/PreparedTransactionTB.java)8
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueBinding.java (renamed from qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_5.java)18
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueBindingTupleBinding.java (renamed from qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_4.java)27
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueEntryBinding.java (renamed from qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java)17
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/StringMapBinding.java (renamed from qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StringMapBinding.java)6
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/UUIDTupleBinding.java (renamed from qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/UUIDTupleBinding.java)8
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/XidBinding.java (renamed from qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/XidTB.java)18
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_4.java47
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_5.java46
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTupleBindingFactory.java45
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_4.java170
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java43
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java46
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_4.java70
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractStoreUpgrade.java83
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/CursorOperation.java89
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/CursorTemplate.java75
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseCallable.java (renamed from qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java)8
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseEntryCallback.java (renamed from qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageContentKey.java)28
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseRunnable.java (renamed from qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java)20
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseTemplate.java114
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java32
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java926
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java372
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeInteractionHandler.java (renamed from qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java)28
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeInteractionResponse.java (renamed from qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java)7
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java178
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java153
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java63
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java330
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java151
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseTemplateTest.java83
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java299
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java141
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java139
-rw-r--r--qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v4/test-store/00000000.jdb (renamed from qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-to-upgrade/test-store/00000000.jdb)bin1346092 -> 1357197 bytes
-rw-r--r--qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/readme.txt5
-rw-r--r--qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000000.jdbbin0 -> 1357227 bytes
-rw-r--r--qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000001.jdbbin0 -> 1332881 bytes
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java16
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/FieldTableSupport.java7
-rw-r--r--qpid/java/module.xml2
61 files changed, 3227 insertions, 2886 deletions
diff --git a/qpid/java/bdbstore/bin/storeUpgrade.sh b/qpid/java/bdbstore/bin/storeUpgrade.sh
deleted file mode 100755
index bf9d09072a..0000000000
--- a/qpid/java/bdbstore/bin/storeUpgrade.sh
+++ /dev/null
@@ -1,41 +0,0 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# Parse arguements taking all - prefixed args as JAVA_OPTS
-declare -a ARGS
-for arg in "$@"; do
- if [[ $arg == -java:* ]]; then
- JAVA_OPTS="${JAVA_OPTS}-`echo $arg|cut -d ':' -f 2` "
- else
- ARGS[${#ARGS[@]}]="$arg"
- fi
-done
-
-if [ -z "${QPID_HOME}" ]; then
- WHEREAMI=`dirname "$0"`
- export QPID_HOME=`cd ${WHEREAMI}/../ && pwd`
-fi
-
-VERSION=0.17
-
-# BDB's je JAR expected to be found in lib/opt
-LIBS="$QPID_HOME/lib/opt/*:$QPID_HOME/lib/qpid-bdbstore-${VERSION}.jar:$QPID_HOME/lib/qpid-all.jar"
-
-java -Xms256m -Dlog4j.configuration=BDBStoreUpgrade.log4j.xml -Xmx256m -Damqj.logging.level=warn ${JAVA_OPTS} -cp "${LIBS}" org.apache.qpid.server.store.berkeleydb.BDBStoreUpgrade "${ARGS[@]}"
diff --git a/qpid/java/bdbstore/build.xml b/qpid/java/bdbstore/build.xml
index af7c108aa9..d2317350ff 100644
--- a/qpid/java/bdbstore/build.xml
+++ b/qpid/java/bdbstore/build.xml
@@ -81,4 +81,11 @@ http://www.oracle.com/technetwork/database/berkeleydb/downloads/jeoslicense-0868
<fileset dir="src/test/resources/upgrade"/>
</copy>
</target>
+
+ <target name="precompile-tests">
+ <mkdir dir="${module.test.resources}"/>
+ <copy todir="${module.test.resources}">
+ <fileset dir="src/test/resources"/>
+ </copy>
+ </target>
</project>
diff --git a/qpid/java/bdbstore/src/main/java/BDBStoreUpgrade.log4j.xml b/qpid/java/bdbstore/src/main/java/BDBStoreUpgrade.log4j.xml
deleted file mode 100644
index 4d71963ea7..0000000000
--- a/qpid/java/bdbstore/src/main/java/BDBStoreUpgrade.log4j.xml
+++ /dev/null
@@ -1,52 +0,0 @@
-<?xml version="1.0"?>
-<!--
- -
- - Licensed to the Apache Software Foundation (ASF) under one
- - or more contributor license agreements. See the NOTICE file
- - distributed with this work for additional information
- - regarding copyright ownership. The ASF licenses this file
- - to you under the Apache License, Version 2.0 (the
- - "License"); you may not use this file except in compliance
- - with the License. You may obtain a copy of the License at
- -
- - http://www.apache.org/licenses/LICENSE-2.0
- -
- - Unless required by applicable law or agreed to in writing,
- - software distributed under the License is distributed on an
- - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- - KIND, either express or implied. See the License for the
- - specific language governing permissions and limitations
- - under the License.
- -
- -->
-<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
-<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
-
- <appender name="STDOUT" class="org.apache.log4j.ConsoleAppender">
-
- <layout class="org.apache.log4j.PatternLayout">
- <param name="ConversionPattern" value="%d %-5p - %m%n"/>
- </layout>
- </appender>
-
- <category name="org.apache.qpid.server.store.berkeleydb.BDBStoreUpgrade">
- <priority value="info"/>
- </category>
-
- <!-- Only show errors from the BDB Store -->
- <category name="org.apache.qpid.server.store.berkeleydb.berkeleydb.BDBMessageStore">
- <priority value="error"/>
- </category>
-
- <!-- Provide warnings to standard output -->
- <category name="org.apache.qpid">
- <priority value="error"/>
- </category>
-
- <!-- Log all info events to file -->
- <root>
- <priority value="info"/>
- <appender-ref ref="STDOUT"/>
- </root>
-
-</log4j:configuration>
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncoding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncoding.java
index 354dba559c..2186597380 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncoding.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncoding.java
@@ -33,7 +33,7 @@ public class AMQShortStringEncoding
public static AMQShortString readShortString(TupleInput tupleInput)
{
- int length = (int) tupleInput.readShort();
+ int length = tupleInput.readShort();
if (length < 0)
{
return null;
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index a91d8f359e..c6d95f7100 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -20,33 +20,29 @@
*/
package org.apache.qpid.server.store.berkeleydb;
-import com.sleepycat.bind.EntryBinding;
-import com.sleepycat.bind.tuple.ByteBinding;
-import com.sleepycat.bind.tuple.IntegerBinding;
-import com.sleepycat.bind.tuple.LongBinding;
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.je.CheckpointConfig;
-import com.sleepycat.je.Cursor;
-import com.sleepycat.je.Database;
-import com.sleepycat.je.DatabaseConfig;
-import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.EnvironmentConfig;
-import com.sleepycat.je.LockConflictException;
-import com.sleepycat.je.LockMode;
-import com.sleepycat.je.OperationStatus;
-import com.sleepycat.je.Transaction;
-import com.sleepycat.je.TransactionConfig;
+import java.io.File;
+import java.lang.ref.SoftReference;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
-
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.federation.Bridge;
import org.apache.qpid.server.federation.BrokerLink;
+import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
@@ -68,33 +64,39 @@ import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler;
import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_5;
-import org.apache.qpid.server.store.berkeleydb.keys.Xid;
-import org.apache.qpid.server.store.berkeleydb.records.BindingRecord;
-import org.apache.qpid.server.store.berkeleydb.records.ExchangeRecord;
-import org.apache.qpid.server.store.berkeleydb.records.PreparedTransaction;
-import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
-import org.apache.qpid.server.store.berkeleydb.tuples.BindingTupleBindingFactory;
-import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTB_5;
-import org.apache.qpid.server.store.berkeleydb.tuples.MessageMetaDataTupleBindingFactory;
-import org.apache.qpid.server.store.berkeleydb.tuples.PreparedTransactionTB;
-import org.apache.qpid.server.store.berkeleydb.tuples.QueueEntryTB;
-import org.apache.qpid.server.store.berkeleydb.tuples.QueueTupleBindingFactory;
-import org.apache.qpid.server.store.berkeleydb.tuples.XidTB;
+import org.apache.qpid.server.store.berkeleydb.entry.BindingRecord;
+import org.apache.qpid.server.store.berkeleydb.entry.ExchangeRecord;
+import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction;
+import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey;
+import org.apache.qpid.server.store.berkeleydb.entry.QueueRecord;
+import org.apache.qpid.server.store.berkeleydb.entry.Xid;
+import org.apache.qpid.server.store.berkeleydb.tuple.AMQShortStringBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.ExchangeBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.MessageMetaDataBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.PreparedTransactionBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.QueueBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.QueueBindingTupleBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.QueueEntryBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.StringMapBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding;
+import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader;
-import java.io.File;
-import java.lang.ref.SoftReference;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
+import com.sleepycat.bind.tuple.ByteBinding;
+import com.sleepycat.bind.tuple.LongBinding;
+import com.sleepycat.je.CheckpointConfig;
+import com.sleepycat.je.Cursor;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.LockConflictException;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
+import com.sleepycat.je.TransactionConfig;
/**
* BDBMessageStore implements a persistent {@link MessageStore} using the BDB high performance log.
@@ -111,21 +113,22 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
private static final int LOCK_RETRY_ATTEMPTS = 5;
- static final int DATABASE_FORMAT_VERSION = 5;
- private static final String DATABASE_FORMAT_VERSION_PROPERTY = "version";
+ public static final int VERSION = 6;
+
public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
private Environment _environment;
- private String MESSAGEMETADATADB_NAME = "messageMetaDataDb";
- private String MESSAGECONTENTDB_NAME = "messageContentDb";
- private String QUEUEBINDINGSDB_NAME = "queueBindingsDb";
- private String DELIVERYDB_NAME = "deliveryDb";
- private String EXCHANGEDB_NAME = "exchangeDb";
- private String QUEUEDB_NAME = "queueDb";
- private String BRIDGEDB_NAME = "bridges";
- private String LINKDB_NAME = "links";
- private String XIDDB_NAME = "xids";
+ private String MESSAGEMETADATADB_NAME = "MESSAGE_METADATA";
+ private String MESSAGECONTENTDB_NAME = "MESSAGE_CONTENT";
+ private String QUEUEBINDINGSDB_NAME = "QUEUE_BINDINGS";
+ private String DELIVERYDB_NAME = "DELIVERIES";
+ private String EXCHANGEDB_NAME = "EXCHANGES";
+ private String QUEUEDB_NAME = "QUEUES";
+ private String BRIDGEDB_NAME = "BRIDGES";
+ private String LINKDB_NAME = "LINKS";
+ private String XIDDB_NAME = "XIDS";
+
private Database _messageMetaDataDb;
private Database _messageContentDb;
@@ -168,13 +171,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
private final CommitThread _commitThread = new CommitThread("Commit-Thread");
- // Factory Classes to create the TupleBinding objects that reflect the version instance of this BDBStore
- private MessageMetaDataTupleBindingFactory _metaDataTupleBindingFactory;
- private QueueTupleBindingFactory _queueTupleBindingFactory;
- private BindingTupleBindingFactory _bindingTupleBindingFactory;
-
- /** The data version this store should run with */
- private int _version;
private enum State
{
INITIAL,
@@ -197,37 +193,8 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
public BDBMessageStore()
{
- this(DATABASE_FORMAT_VERSION);
}
- public BDBMessageStore(int version)
- {
- _version = version;
- }
-
- private void setDatabaseNames(int version)
- {
- if (version > 1)
- {
- MESSAGEMETADATADB_NAME += "_v" + version;
-
- MESSAGECONTENTDB_NAME += "_v" + version;
-
- QUEUEDB_NAME += "_v" + version;
-
- DELIVERYDB_NAME += "_v" + version;
-
- EXCHANGEDB_NAME += "_v" + version;
-
- QUEUEBINDINGSDB_NAME += "_v" + version;
-
- LINKDB_NAME += "_v" + version;
-
- BRIDGEDB_NAME += "_v" + version;
-
- XIDDB_NAME += "_v" + version;
- }
- }
public void configureConfigStore(String name,
ConfigurationRecoveryHandler recoveryHandler,
@@ -281,7 +248,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
recoverQueueEntries(recoveryHandler);
-
+
}
@@ -313,13 +280,16 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
}
- CurrentActor.get().message(_logSubject, MessageStoreMessages.STORE_LOCATION(environmentPath.getAbsolutePath()));
-
- _version = storeConfig.getInt(DATABASE_FORMAT_VERSION_PROPERTY, DATABASE_FORMAT_VERSION);
+ message(MessageStoreMessages.STORE_LOCATION(environmentPath.getAbsolutePath()));
return configure(environmentPath, false);
}
+ void message(final LogMessage message)
+ {
+ CurrentActor.message(_logSubject, message);
+ }
+
/**
* @param environmentPath location for the store to be created in/recovered from
* @param readonly if true then don't allow modifications to an existing store, and don't create a new store if none exists
@@ -334,20 +304,9 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
_log.info("Configuring BDB message store");
- createTupleBindingFactories(_version);
-
- setDatabaseNames(_version);
-
return setupStore(environmentPath, readonly);
}
- private void createTupleBindingFactories(int version)
- {
- _bindingTupleBindingFactory = new BindingTupleBindingFactory(version);
- _queueTupleBindingFactory = new QueueTupleBindingFactory(version);
- _metaDataTupleBindingFactory = new MessageMetaDataTupleBindingFactory(version);
- }
-
/**
* Move the store state from CONFIGURING to STARTED.
*
@@ -366,7 +325,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
boolean newEnvironment = createEnvironment(storePath, readonly);
- verifyVersionByTables();
+ new Upgrader(_environment, _logSubject).upgradeIfNecessary();
openDatabases(readonly);
@@ -378,40 +337,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
return newEnvironment;
}
- private void verifyVersionByTables() throws DatabaseException
- {
- for (String s : _environment.getDatabaseNames())
- {
- int versionIndex = s.indexOf("_v");
-
- // lack of _v index suggests DB is v1
- // so if _version is not v1 then error
- if (versionIndex == -1)
- {
- if (_version != 1)
- {
- closeEnvironment();
- throw new IllegalArgumentException("Error: Unable to load BDBStore as version " + _version
- + ". Store on disk contains version 1 data.");
- }
- else // DB is v1 and _version is v1
- {
- continue;
- }
- }
-
- // Otherwise Check Versions
- int version = Integer.parseInt(s.substring(versionIndex + 2));
-
- if (version != _version)
- {
- closeEnvironment();
- throw new IllegalArgumentException("Error: Unable to load BDBStore as version " + _version
- + ". Store on disk contains version " + version + " data.");
- }
- }
- }
-
private synchronized void stateTransition(State requiredState, State newState) throws AMQStoreException
{
if (_state != requiredState)
@@ -586,7 +511,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
_state = State.CLOSED;
- CurrentActor.get().message(_logSubject,MessageStoreMessages.CLOSED());
+ message(MessageStoreMessages.CLOSED());
}
private void closeEnvironment() throws DatabaseException
@@ -609,7 +534,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
stateTransition(State.CONFIGURED, State.RECOVERING);
- CurrentActor.get().message(_logSubject,MessageStoreMessages.RECOVERY_START());
+ message(MessageStoreMessages.RECOVERY_START());
try
{
@@ -641,10 +566,10 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
cursor = _queueDb.openCursor(null, null);
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry value = new DatabaseEntry();
- TupleBinding binding = _queueTupleBindingFactory.getInstance();
+ QueueBinding binding = QueueBinding.getInstance();
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
- QueueRecord queueRecord = (QueueRecord) binding.entryToObject(value);
+ QueueRecord queueRecord = binding.entryToObject(value);
String queueName = queueRecord.getNameShortString() == null ? null :
queueRecord.getNameShortString().asString();
@@ -677,11 +602,11 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
cursor = _exchangeDb.openCursor(null, null);
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry value = new DatabaseEntry();
- TupleBinding binding = new ExchangeTB();
+ ExchangeBinding binding = ExchangeBinding.getInstance();
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
- ExchangeRecord exchangeRec = (ExchangeRecord) binding.entryToObject(value);
+ ExchangeRecord exchangeRec = binding.entryToObject(value);
String exchangeName = exchangeRec.getNameShortString() == null ? null :
exchangeRec.getNameShortString().asString();
@@ -710,7 +635,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
cursor = _queueBindingsDb.openCursor(null, null);
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry value = new DatabaseEntry();
- TupleBinding<BindingRecord> binding = _bindingTupleBindingFactory.getInstance();
+ QueueBindingTupleBinding binding = QueueBindingTupleBinding.getInstance();
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
@@ -818,14 +743,14 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
cursor = _messageMetaDataDb.openCursor(null, null);
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry value = new DatabaseEntry();
- EntryBinding valueBinding = _metaDataTupleBindingFactory.getInstance();
+ MessageMetaDataBinding valueBinding = MessageMetaDataBinding.getInstance();
long maxId = 0;
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
long messageId = LongBinding.entryToLong(key);
- StorableMessageMetaData metaData = (StorableMessageMetaData) valueBinding.entryToObject(value);
+ StorableMessageMetaData metaData = valueBinding.entryToObject(value);
StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, false);
mrh.message(message);
@@ -861,13 +786,13 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
cursor = _deliveryDb.openCursor(null, null);
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new QueueEntryTB();
+ QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
DatabaseEntry value = new DatabaseEntry();
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
- QueueEntryKey qek = (QueueEntryKey) keyBinding.entryToObject(key);
+ QueueEntryKey qek = keyBinding.entryToObject(key);
entries.add(qek);
}
@@ -902,8 +827,8 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
}
-
-
+
+
TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = qerh.completeQueueEntryRecovery();
cursor = null;
@@ -911,8 +836,8 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
cursor = _xidDb.openCursor(null, null);
DatabaseEntry key = new DatabaseEntry();
- XidTB keyBinding = new XidTB();
- PreparedTransactionTB valueBinding = new PreparedTransactionTB();
+ XidBinding keyBinding = XidBinding.getInstance();
+ PreparedTransactionBinding valueBinding = new PreparedTransactionBinding();
DatabaseEntry value = new DatabaseEntry();
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
@@ -946,7 +871,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
}
-
+
dtxrh.completeDtxRecordRecovery();
}
@@ -1001,35 +926,14 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
//now remove the content data from the store if there is any.
+ DatabaseEntry contentKeyEntry = new DatabaseEntry();
+ LongBinding.longToEntry(messageId, contentKeyEntry);
+ _messageContentDb.delete(tx, contentKeyEntry);
-
-
- int offset = 0;
- do
+ if (_log.isDebugEnabled())
{
- DatabaseEntry contentKeyEntry = new DatabaseEntry();
- MessageContentKey_5 mck = new MessageContentKey_5(messageId,offset);
- TupleBinding<MessageContentKey> contentKeyTupleBinding = new MessageContentKeyTB_5();
- contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry);
- //Use a partial record for the value to prevent retrieving the
- //data itself as we only need the key to identify what to remove.
- DatabaseEntry value = new DatabaseEntry();
- value.setPartial(0, 4, true);
-
- status = _messageContentDb.get(null,contentKeyEntry, value, LockMode.READ_COMMITTED);
-
- if(status == OperationStatus.SUCCESS)
- {
-
- offset += IntegerBinding.entryToInt(value);
- _messageContentDb.delete(tx, contentKeyEntry);
- if (_log.isDebugEnabled())
- {
- _log.debug("Deleted content chunk offset " + mck.getOffset() + " for message " + messageId);
- }
- }
+ _log.debug("Deleted content for message " + messageId);
}
- while (status == OperationStatus.SUCCESS);
commit(tx, sync);
complete = true;
@@ -1128,11 +1032,11 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
exchange.getTypeShortString(), exchange.isAutoDelete());
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new AMQShortStringTB();
+ AMQShortStringBinding keyBinding = AMQShortStringBinding.getInstance();
keyBinding.objectToEntry(exchange.getNameShortString(), key);
DatabaseEntry value = new DatabaseEntry();
- TupleBinding exchangeBinding = new ExchangeTB();
+ ExchangeBinding exchangeBinding = ExchangeBinding.getInstance();
exchangeBinding.objectToEntry(exchangeRec, value);
try
@@ -1152,7 +1056,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
public void removeExchange(Exchange exchange) throws AMQStoreException
{
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new AMQShortStringTB();
+ AMQShortStringBinding keyBinding = AMQShortStringBinding.getInstance();
keyBinding.objectToEntry(exchange.getNameShortString(), key);
try
{
@@ -1182,7 +1086,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
if (_state != State.RECOVERING)
{
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = _bindingTupleBindingFactory.getInstance();
+ QueueBindingTupleBinding keyBinding = QueueBindingTupleBinding.getInstance();
keyBinding.objectToEntry(bindingRecord, key);
@@ -1211,7 +1115,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
throws AMQStoreException
{
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = _bindingTupleBindingFactory.getInstance();
+ QueueBindingTupleBinding keyBinding = QueueBindingTupleBinding.getInstance();
keyBinding.objectToEntry(new BindingRecord(exchange.getNameShortString(), queue.getNameShortString(), routingKey, args), key);
try
@@ -1268,11 +1172,11 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
if (_state != State.RECOVERING)
{
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new AMQShortStringTB();
+ AMQShortStringBinding keyBinding = AMQShortStringBinding.getInstance();
keyBinding.objectToEntry(queueRecord.getNameShortString(), key);
DatabaseEntry value = new DatabaseEntry();
- TupleBinding queueBinding = _queueTupleBindingFactory.getInstance();
+ QueueBinding queueBinding = QueueBinding.getInstance();
queueBinding.objectToEntry(queueRecord, value);
try
@@ -1306,18 +1210,18 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
try
{
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new AMQShortStringTB();
+ AMQShortStringBinding keyBinding = AMQShortStringBinding.getInstance();
keyBinding.objectToEntry(queue.getNameShortString(), key);
DatabaseEntry value = new DatabaseEntry();
DatabaseEntry newValue = new DatabaseEntry();
- TupleBinding queueBinding = _queueTupleBindingFactory.getInstance();
+ QueueBinding queueBinding = QueueBinding.getInstance();
OperationStatus status = _queueDb.get(null, key, value, LockMode.DEFAULT);
if(status == OperationStatus.SUCCESS)
{
//read the existing record and apply the new exclusivity setting
- QueueRecord queueRecord = (QueueRecord) queueBinding.entryToObject(value);
+ QueueRecord queueRecord = queueBinding.entryToObject(value);
queueRecord.setExclusive(queue.isExclusive());
//write the updated entry to the store
@@ -1353,7 +1257,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new AMQShortStringTB();
+ AMQShortStringBinding keyBinding = AMQShortStringBinding.getInstance();
keyBinding.objectToEntry(name, key);
try
{
@@ -1468,7 +1372,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
AMQShortString name = AMQShortString.valueOf(queue.getResourceName());
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new QueueEntryTB();
+ QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
QueueEntryKey dd = new QueueEntryKey(name, messageId);
keyBinding.objectToEntry(dd, key);
DatabaseEntry value = new DatabaseEntry();
@@ -1505,10 +1409,10 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
AMQShortString name = new AMQShortString(queue.getResourceName());
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new QueueEntryTB();
- QueueEntryKey dd = new QueueEntryKey(name, messageId);
+ QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
+ QueueEntryKey queueEntryKey = new QueueEntryKey(name, messageId);
- keyBinding.objectToEntry(dd, key);
+ keyBinding.objectToEntry(queueEntryKey, key);
if (_log.isDebugEnabled())
{
@@ -1545,21 +1449,21 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
- private void recordXid(com.sleepycat.je.Transaction txn,
- long format,
- byte[] globalId,
- byte[] branchId,
+ private void recordXid(com.sleepycat.je.Transaction txn,
+ long format,
+ byte[] globalId,
+ byte[] branchId,
Transaction.Record[] enqueues,
Transaction.Record[] dequeues) throws AMQStoreException
{
DatabaseEntry key = new DatabaseEntry();
Xid xid = new Xid(format, globalId, branchId);
- XidTB keyBinding = new XidTB();
+ XidBinding keyBinding = XidBinding.getInstance();
keyBinding.objectToEntry(xid,key);
DatabaseEntry value = new DatabaseEntry();
PreparedTransaction preparedTransaction = new PreparedTransaction(enqueues, dequeues);
- PreparedTransactionTB valueBinding = new PreparedTransactionTB();
+ PreparedTransactionBinding valueBinding = new PreparedTransactionBinding();
valueBinding.objectToEntry(preparedTransaction, value);
try
@@ -1578,8 +1482,8 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
DatabaseEntry key = new DatabaseEntry();
Xid xid = new Xid(format, globalId, branchId);
- XidTB keyBinding = new XidTB();
-
+ XidBinding keyBinding = XidBinding.getInstance();
+
keyBinding.objectToEntry(xid, key);
@@ -1606,7 +1510,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
throw new AMQStoreException("Error accessing database while removing xid: " + e.getMessage(), e);
}
}
-
+
/**
* Commits all operations performed within a given transaction.
*
@@ -1681,7 +1585,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
QueueEntryKey dd = new QueueEntryKey(queueName, 0);
- EntryBinding keyBinding = new QueueEntryTB();
+ QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
keyBinding.objectToEntry(dd, key);
DatabaseEntry value = new DatabaseEntry();
@@ -1689,7 +1593,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
LinkedList<Long> messageIds = new LinkedList<Long>();
OperationStatus status = cursor.getSearchKeyRange(key, value, LockMode.DEFAULT);
- dd = (QueueEntryKey) keyBinding.entryToObject(key);
+ dd = keyBinding.entryToObject(key);
while ((status == OperationStatus.SUCCESS) && dd.getQueueName().equals(queueName))
{
@@ -1698,7 +1602,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
status = cursor.getNext(key, value, LockMode.DEFAULT);
if (status == OperationStatus.SUCCESS)
{
- dd = (QueueEntryKey) keyBinding.entryToObject(key);
+ dd = keyBinding.entryToObject(key);
}
}
@@ -1739,32 +1643,29 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
*
* @param tx The transaction for the operation.
* @param messageId The message to store the data for.
- * @param offset The offset of the data chunk in the message.
* @param contentBody The content of the data chunk.
*
* @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
*/
- protected void addContent(final com.sleepycat.je.Transaction tx, long messageId, int offset,
+ protected void addContent(final com.sleepycat.je.Transaction tx, long messageId,
ByteBuffer contentBody) throws AMQStoreException
{
DatabaseEntry key = new DatabaseEntry();
- TupleBinding<MessageContentKey> keyBinding = new MessageContentKeyTB_5();
- keyBinding.objectToEntry(new MessageContentKey_5(messageId, offset), key);
+ LongBinding.longToEntry(messageId, key);
DatabaseEntry value = new DatabaseEntry();
- TupleBinding<ByteBuffer> messageBinding = new ContentTB();
- messageBinding.objectToEntry(contentBody, value);
+ ContentBinding messageBinding = ContentBinding.getInstance();
+ messageBinding.objectToEntry(contentBody.array(), value);
try
{
OperationStatus status = _messageContentDb.put(tx, key, value);
if (status != OperationStatus.SUCCESS)
{
- throw new AMQStoreException("Error adding content chunk offset" + offset + " for message id " + messageId + ": "
- + status);
+ throw new AMQStoreException("Error adding content for message id " + messageId + ": " + status);
}
if (_log.isDebugEnabled())
{
- _log.debug("Storing content chunk offset" + offset + " for message " + messageId + "[Transaction" + tx + "]");
+ _log.debug("Storing content for message " + messageId + "[Transaction" + tx + "]");
}
}
catch (DatabaseException e)
@@ -1796,7 +1697,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
LongBinding.longToEntry(messageId, key);
DatabaseEntry value = new DatabaseEntry();
- TupleBinding messageBinding = _metaDataTupleBindingFactory.getInstance();
+ MessageMetaDataBinding messageBinding = MessageMetaDataBinding.getInstance();
messageBinding.objectToEntry(messageMetaData, value);
try
{
@@ -1832,7 +1733,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
DatabaseEntry key = new DatabaseEntry();
LongBinding.longToEntry(messageId, key);
DatabaseEntry value = new DatabaseEntry();
- TupleBinding messageBinding = _metaDataTupleBindingFactory.getInstance();
+ MessageMetaDataBinding messageBinding = MessageMetaDataBinding.getInstance();
try
{
@@ -1842,7 +1743,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
throw new AMQStoreException("Metadata not found for message with id " + messageId);
}
- StorableMessageMetaData mdd = (StorableMessageMetaData) messageBinding.entryToObject(value);
+ StorableMessageMetaData mdd = messageBinding.entryToObject(value);
return mdd;
}
@@ -1868,87 +1769,41 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
DatabaseEntry contentKeyEntry = new DatabaseEntry();
- //Start from 0 offset and search for the starting chunk.
- MessageContentKey_5 mck = new MessageContentKey_5(messageId, 0);
- TupleBinding<MessageContentKey> contentKeyTupleBinding = new MessageContentKeyTB_5();
- contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry);
+ LongBinding.longToEntry(messageId, contentKeyEntry);
DatabaseEntry value = new DatabaseEntry();
- TupleBinding<ByteBuffer> contentTupleBinding = new ContentTB();
+ ContentBinding contentTupleBinding = ContentBinding.getInstance();
if (_log.isDebugEnabled())
{
_log.debug("Message Id: " + messageId + " Getting content body from offset: " + offset);
}
- int written = 0;
- int seenSoFar = 0;
-
- Cursor cursor = null;
try
{
- cursor = _messageContentDb.openCursor(null, null);
-
- OperationStatus status = cursor.getSearchKeyRange(contentKeyEntry, value, LockMode.READ_UNCOMMITTED);
-
- while (status == OperationStatus.SUCCESS)
+ int written = 0;
+ OperationStatus status = _messageContentDb.get(null, contentKeyEntry, value, LockMode.READ_UNCOMMITTED);
+ if (status == OperationStatus.SUCCESS)
{
- mck = (MessageContentKey_5) contentKeyTupleBinding.entryToObject(contentKeyEntry);
- long id = mck.getMessageId();
-
- if(id != messageId)
+ byte[] dataAsBytes = contentTupleBinding.entryToObject(value);
+ int size = dataAsBytes.length;
+ if (offset > size)
{
- //we have exhausted all chunks for this message id, break
- break;
+ throw new RuntimeException("Offset " + offset + " is greater than message size " + size
+ + " for message id " + messageId + "!");
}
- int offsetInMessage = mck.getOffset();
- ByteBuffer buf = (ByteBuffer) contentTupleBinding.entryToObject(value);
-
- final int size = (int) buf.limit();
-
- seenSoFar += size;
-
- if(seenSoFar >= offset)
+ written = size - offset;
+ if(written > dst.remaining())
{
- byte[] dataAsBytes = buf.array();
-
- int posInArray = offset + written - offsetInMessage;
- int count = size - posInArray;
- if(count > dst.remaining())
- {
- count = dst.remaining();
- }
- dst.put(dataAsBytes,posInArray,count);
- written+=count;
-
- if(dst.remaining() == 0)
- {
- break;
- }
+ written = dst.remaining();
}
-
- status = cursor.getNext(contentKeyEntry, value, LockMode.RMW);
+ dst.put(dataAsBytes, offset, written);
}
-
return written;
}
catch (DatabaseException e)
{
- throw new AMQStoreException("Error writing AMQMessage with id " + messageId + " to database: " + e.getMessage(), e);
- }
- finally
- {
- if(cursor != null)
- {
- try
- {
- cursor.close();
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error writing AMQMessage with id " + messageId + " to database: " + e.getMessage(), e);
- }
- }
+ throw new AMQStoreException("Error getting AMQMessage with id " + messageId + " to database: " + e.getMessage(), e);
}
}
@@ -1961,7 +1816,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
if(metaData.isPersistent())
{
- return new StoredBDBMessage(getNewMessageId(), metaData);
+ return (StoredMessage<T>) new StoredBDBMessage(getNewMessageId(), metaData);
}
else
{
@@ -1970,23 +1825,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
- //protected getters for the TupleBindingFactories
-
- protected QueueTupleBindingFactory getQueueTupleBindingFactory()
- {
- return _queueTupleBindingFactory;
- }
-
- protected BindingTupleBindingFactory getBindingTupleBindingFactory()
- {
- return _bindingTupleBindingFactory;
- }
-
- protected MessageMetaDataTupleBindingFactory getMetaDataTupleBindingFactory()
- {
- return _metaDataTupleBindingFactory;
- }
-
//Package getters for the various databases used by the Store
Database getMetaDataDb()
@@ -2019,65 +1857,9 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
return _queueBindingsDb;
}
- void visitMetaDataDb(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException
- {
- visitDatabase(_messageMetaDataDb, visitor);
- }
-
- void visitContentDb(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException
- {
- visitDatabase(_messageContentDb, visitor);
- }
-
- void visitQueues(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException
- {
- visitDatabase(_queueDb, visitor);
- }
-
- void visitDelivery(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException
+ Environment getEnvironment()
{
- visitDatabase(_deliveryDb, visitor);
- }
-
- void visitExchanges(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException
- {
- visitDatabase(_exchangeDb, visitor);
- }
-
- void visitBindings(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException
- {
- visitDatabase(_queueBindingsDb, visitor);
- }
-
- /**
- * Generic visitDatabase allows iteration through the specified database.
- *
- * @param database The database to visit
- * @param visitor The visitor to give each entry to.
- *
- * @throws DatabaseException If there is a problem with the Database structure
- * @throws AMQStoreException If there is a problem with the Database contents
- */
- void visitDatabase(Database database, DatabaseVisitor visitor) throws DatabaseException, AMQStoreException
- {
- Cursor cursor = database.openCursor(null, null);
-
- try
- {
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry value = new DatabaseEntry();
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- visitor.visit(key, value);
- }
- }
- finally
- {
- if (cursor != null)
- {
- cursor.close();
- }
- }
+ return _environment;
}
private StoreFuture commit(com.sleepycat.je.Transaction tx, boolean syncCommit) throws DatabaseException
@@ -2279,7 +2061,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
- private class StoredBDBMessage implements StoredMessage
+ private class StoredBDBMessage implements StoredMessage<StorableMessageMetaData>
{
private final long _messageId;
@@ -2376,6 +2158,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
catch (AMQStoreException e)
{
+ // TODO maybe should throw a checked exception, or at least log before throwing
throw new RuntimeException(e);
}
}
@@ -2406,7 +2189,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
_dataRef = new SoftReference<byte[]>(_data);
BDBMessageStore.this.storeMetaData(txn, _messageId, _metaData);
- BDBMessageStore.this.addContent(txn, _messageId, 0,
+ BDBMessageStore.this.addContent(txn, _messageId,
_data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data));
}
catch(DatabaseException e)
@@ -2486,17 +2269,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
BDBMessageStore.this.dequeueMessage(_txn, queue, message.getMessageNumber());
}
- public void enqueueMessage(TransactionLogResource queue, long messageId) throws AMQStoreException
- {
- BDBMessageStore.this.enqueueMessage(_txn, queue, messageId);
- }
-
- public void dequeueMessage(TransactionLogResource queue, long messageId) throws AMQStoreException
- {
- BDBMessageStore.this.dequeueMessage(_txn, queue, messageId);
-
- }
-
public void commitTran() throws AMQStoreException
{
BDBMessageStore.this.commitTranImpl(_txn, true);
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
deleted file mode 100644
index 817ba2a5f5..0000000000
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
+++ /dev/null
@@ -1,1299 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.je.Database;
-import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.EnvironmentConfig;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.cli.PosixParser;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.logging.NullRootMessageLogger;
-import org.apache.qpid.server.logging.actors.BrokerActor;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.message.MessageMetaData;
-import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_4;
-import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_5;
-import org.apache.qpid.server.store.berkeleydb.records.BindingRecord;
-import org.apache.qpid.server.store.berkeleydb.records.ExchangeRecord;
-import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
-import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTB_4;
-import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTB_5;
-import org.apache.qpid.server.store.berkeleydb.tuples.QueueEntryTB;
-import org.apache.qpid.util.FileUtils;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.nio.ByteBuffer;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
-
-/**
- * This is a simple BerkeleyDB Store upgrade tool that will upgrade a V4 Store to a V5 Store.
- *
- * Currently upgrade is fixed from v4 -> v5
- *
- * Improvments:
- * - Add List BDBMessageStore.getDatabases(); This can the be iterated to guard against new DBs being added.
- * - A version in the store would allow automated upgrade or later with more available versions interactive upgrade.
- * - Add process logging and disable all Store and Qpid logging.
- */
-public class BDBStoreUpgrade
-{
- private static final Logger _logger = LoggerFactory.getLogger(BDBStoreUpgrade.class);
- /** The Store Directory that needs upgrading */
- private File _fromDir;
- /** The Directory that will be made to contain the upgraded store */
- private File _toDir;
- /** The Directory that will be made to backup the original store if required */
- private File _backupDir;
-
- /** The Old Store */
- private BDBMessageStore _oldMessageStore;
- /** The New Store */
- private BDBMessageStore _newMessageStore;
- /** The file ending that is used by BDB Store Files */
- private static final String BDB_FILE_ENDING = ".jdb";
-
- static final Options _options = new Options();
- private static CommandLine _commandLine;
- private boolean _interactive;
- private boolean _force;
-
- private static final String VERSION = "3.0";
- private static final String USER_ABORTED_PROCESS = "User aborted process";
- private static final int LOWEST_SUPPORTED_STORE_VERSION = 4;
- private static final String PREVIOUS_STORE_VERSION_UNSUPPORTED = "Store upgrade from version {0} is not supported."
- + " You must first run the previous store upgrade tool.";
- private static final String FOLLOWING_STORE_VERSION_UNSUPPORTED = "Store version {0} is newer than this tool supports. "
- + "You must use a newer version of the store upgrade tool";
- private static final String STORE_ALREADY_UPGRADED = "Store has already been upgraded to version {0}.";
-
- private static final String OPTION_INPUT_SHORT = "i";
- private static final String OPTION_INPUT = "input";
- private static final String OPTION_OUTPUT_SHORT = "o";
- private static final String OPTION_OUTPUT = "output";
- private static final String OPTION_BACKUP_SHORT = "b";
- private static final String OPTION_BACKUP = "backup";
- private static final String OPTION_QUIET_SHORT = "q";
- private static final String OPTION_QUIET = "quiet";
- private static final String OPTION_FORCE_SHORT = "f";
- private static final String OPTION_FORCE = "force";
- private boolean _inplace = false;
-
- public BDBStoreUpgrade(String fromDir, String toDir, String backupDir, boolean interactive, boolean force)
- {
- _interactive = interactive;
- _force = force;
-
- _fromDir = new File(fromDir);
- if (!_fromDir.exists())
- {
- throw new IllegalArgumentException("BDBStore path '" + fromDir + "' could not be read. "
- + "Ensure the path is correct and that the permissions are correct.");
- }
-
- if (!isDirectoryAStoreDir(_fromDir))
- {
- throw new IllegalArgumentException("Specified directory '" + fromDir + "' does not contain a valid BDBMessageStore.");
- }
-
- if (toDir == null)
- {
- _inplace = true;
- _toDir = new File(fromDir+"-Inplace");
- }
- else
- {
- _toDir = new File(toDir);
- }
-
- if (_toDir.exists())
- {
- if (_interactive)
- {
- if (toDir == null)
- {
- System.out.println("Upgrading in place:" + fromDir);
- }
- else
- {
- System.out.println("Upgrade destination: '" + toDir + "'");
- }
-
- if (userInteract("Upgrade destination exists do you wish to replace it?"))
- {
- if (!FileUtils.delete(_toDir, true))
- {
- throw new IllegalArgumentException("Unable to remove upgrade destination '" + _toDir + "'");
- }
- }
- else
- {
- throw new IllegalArgumentException("Upgrade destination '" + _toDir + "' already exists. ");
- }
- }
- else
- {
- if (_force)
- {
- if (!FileUtils.delete(_toDir, true))
- {
- throw new IllegalArgumentException("Unable to remove upgrade destination '" + _toDir + "'");
- }
- }
- else
- {
- throw new IllegalArgumentException("Upgrade destination '" + _toDir + "' already exists. ");
- }
- }
- }
-
- if (!_toDir.mkdirs())
- {
- throw new IllegalArgumentException("Upgrade destination '" + _toDir + "' could not be created. "
- + "Ensure the path is correct and that the permissions are correct.");
- }
-
- if (backupDir != null)
- {
- if (backupDir.equals(""))
- {
- _backupDir = new File(_fromDir.getAbsolutePath().toString() + "-Backup");
- }
- else
- {
- _backupDir = new File(backupDir);
- }
- }
- else
- {
- _backupDir = null;
- }
- }
-
- private static String ANSWER_OPTIONS = " Yes/No/Abort? ";
- private static String ANSWER_NO = "no";
- private static String ANSWER_N = "n";
- private static String ANSWER_YES = "yes";
- private static String ANSWER_Y = "y";
- private static String ANSWER_ABORT = "abort";
- private static String ANSWER_A = "a";
-
- /**
- * Interact with the user via System.in and System.out. If the user wishes to Abort then a RuntimeException is thrown.
- * Otherwise the method will return based on their response true=yes false=no.
- *
- * @param message Message to print out
- *
- * @return boolean response from user if they wish to proceed
- */
- private boolean userInteract(String message)
- {
- System.out.print(message + ANSWER_OPTIONS);
- BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
-
- String input = "";
- try
- {
- input = br.readLine();
- }
- catch (IOException e)
- {
- input = "";
- }
-
- if (input.equalsIgnoreCase(ANSWER_Y) || input.equalsIgnoreCase(ANSWER_YES))
- {
- return true;
- }
- else
- {
- if (input.equalsIgnoreCase(ANSWER_N) || input.equalsIgnoreCase(ANSWER_NO))
- {
- return false;
- }
- else
- {
- if (input.equalsIgnoreCase(ANSWER_A) || input.equalsIgnoreCase(ANSWER_ABORT))
- {
- throw new RuntimeException(USER_ABORTED_PROCESS);
- }
- }
- }
-
- return userInteract(message);
- }
-
- /**
- * Upgrade a Store of a specified version to the latest version.
- *
- * @param version the version of the current store
- *
- * @throws Exception
- */
- public void upgradeFromVersion(int version) throws Exception
- {
- upgradeFromVersion(version, _fromDir, _toDir, _backupDir, _force,
- _inplace);
- }
-
- /**
- * Upgrade a Store of a specified version to the latest version.
- *
- * @param version the version of the current store
- * @param fromDir the directory with the old Store
- * @param toDir the directrory to hold the newly Upgraded Store
- * @param backupDir the directrory to backup to if required
- * @param force suppress all questions
- * @param inplace replace the from dir with the upgraded result in toDir
- *
- * @throws Exception due to Virtualhost/MessageStore.close() being
- * rather poor at exception handling
- * @throws DatabaseException if there is a problem with the store formats
- * @throws AMQException if there is an issue creating Qpid data structures
- */
- public void upgradeFromVersion(int version, File fromDir, File toDir,
- File backupDir, boolean force,
- boolean inplace) throws Exception
- {
- _logger.info("Located store to upgrade at '" + fromDir + "'");
-
- // Verify user has created a backup, giving option to perform backup
- if (_interactive)
- {
- if (!userInteract("Have you performed a DB backup of this store."))
- {
- File backup;
- if (backupDir == null)
- {
- backup = new File(fromDir.getAbsolutePath().toString() + "-Backup");
- }
- else
- {
- backup = backupDir;
- }
-
- if (userInteract("Do you wish to perform a DB backup now? " +
- "(Store will be backed up to '" + backup.getName() + "')"))
- {
- performDBBackup(fromDir, backup, force);
- }
- else
- {
- if (!userInteract("Are you sure wish to proceed with DB migration without backup? " +
- "(For more details of the consequences check the Qpid/BDB Message Store Wiki)."))
- {
- throw new IllegalArgumentException("Upgrade stopped at user request as no DB Backup performed.");
- }
- }
- }
- else
- {
- if (!inplace)
- {
- _logger.info("Upgrade will create a new store at '" + toDir + "'");
- }
-
- _logger.info("Using the contents in the Message Store '" + fromDir + "'");
-
- if (!userInteract("Do you wish to proceed?"))
- {
- throw new IllegalArgumentException("Upgrade stopped as did not wish to proceed");
- }
- }
- }
- else
- {
- if (backupDir != null)
- {
- performDBBackup(fromDir, backupDir, force);
- }
- }
-
- CurrentActor.set(new BrokerActor(new NullRootMessageLogger()));
-
- //Create a new messageStore
- _newMessageStore = new BDBMessageStore();
- _newMessageStore.configure(toDir, false);
- _newMessageStore.start();
-
- try
- {
- //Load the old MessageStore
- switch (version)
- {
- default:
- case 4:
- _oldMessageStore = new BDBMessageStore(4);
- _oldMessageStore.configure(fromDir, true);
- _oldMessageStore.start();
- upgradeFromVersion_4();
- break;
- case 3:
- case 2:
- case 1:
- throw new IllegalArgumentException(MessageFormat.format(PREVIOUS_STORE_VERSION_UNSUPPORTED,
- new Object[] { Integer.toString(version) }));
- }
- }
- finally
- {
- _newMessageStore.close();
- if (_oldMessageStore != null)
- {
- _oldMessageStore.close();
- }
- // if we are running inplace then swap fromDir and toDir
- if (inplace)
- {
- // Remove original copy
- if (FileUtils.delete(fromDir, true))
- {
- // Rename upgraded store
- toDir.renameTo(fromDir);
- }
- else
- {
- throw new RuntimeException("Unable to upgrade inplace as " +
- "unable to delete source '"
- +fromDir+"', Store upgrade " +
- "successfully performed to :"
- +toDir);
- }
- }
- }
- }
-
- private void upgradeFromVersion_4() throws AMQException, DatabaseException
- {
- _logger.info("Starting store upgrade from version 4");
-
- //Migrate _exchangeDb
- _logger.info("Exchanges");
-
- moveContents(_oldMessageStore.getExchangesDb(), _newMessageStore.getExchangesDb(), "Exchange");
-
-
- TopicExchangeDiscoverer exchangeListVisitor = new TopicExchangeDiscoverer();
- _oldMessageStore.visitExchanges(exchangeListVisitor);
-
- //Inspect the bindings to gather a list of queues which are probably durable subscriptions, i.e. those
- //which have a colon in their name and are bound to the Topic exchanges above
- DurableSubDiscoverer durSubQueueListVisitor =
- new DurableSubDiscoverer(exchangeListVisitor.getTopicExchanges(),
- _oldMessageStore.getBindingTupleBindingFactory().getInstance());
- _oldMessageStore.visitBindings(durSubQueueListVisitor);
-
- final List<AMQShortString> durableSubQueues = durSubQueueListVisitor.getDurableSubQueues();
-
-
- //Migrate _queueBindingsDb
- _logger.info("Queue Bindings");
- BindingsVisitor bindingsVisitor = new BindingsVisitor(durableSubQueues,
- _oldMessageStore.getBindingTupleBindingFactory().getInstance(), _newMessageStore);
- _oldMessageStore.visitBindings(bindingsVisitor);
- logCount(bindingsVisitor.getVisitedCount(), "Queue Binding");
-
- //Migrate _queueDb
- _logger.info("Queues");
-
- // hold the list of existing queue names
-
- final TupleBinding<QueueRecord> queueTupleBinding = _oldMessageStore.getQueueTupleBindingFactory().getInstance();
-
- QueueVisitor queueVisitor = new QueueVisitor(queueTupleBinding, durableSubQueues, _newMessageStore);
- _oldMessageStore.visitQueues(queueVisitor);
- final List<AMQShortString> existingQueues = queueVisitor.getExistingQueues();
-
- logCount(queueVisitor.getVisitedCount(), "Queue");
-
-
- // Look for persistent messages stored for non-durable queues
- _logger.info("Checking for messages previously sent to non-durable queues");
-
- // delivery DB visitor to check message delivery and identify non existing queues
- final QueueEntryTB queueEntryTB = new QueueEntryTB();
- MessageDeliveryCheckVisitor messageDeliveryCheckVisitor =
- new MessageDeliveryCheckVisitor(queueEntryTB, queueVisitor.getExistingQueues());
- _oldMessageStore.visitDelivery(messageDeliveryCheckVisitor);
-
- final Set<Long> queueMessages = messageDeliveryCheckVisitor.getQueueMessages();
-
- if (messageDeliveryCheckVisitor.getPhantomMessageQueues().isEmpty())
- {
- _logger.info("No such messages were found");
- }
- else
- {
- _logger.info("Found " + messageDeliveryCheckVisitor.getVisitedCount()+ " such messages in total");
-
- for (Entry<String, HashSet<Long>> phantomQueue : messageDeliveryCheckVisitor.getPhantomMessageQueues().entrySet())
- {
- String queueName = phantomQueue.getKey();
- HashSet<Long> messages = phantomQueue.getValue();
-
- _logger.info(MessageFormat.format("There are {0} messages which were previously delivered to non-durable queue ''{1}''",messages.size(), queueName));
-
- boolean createQueue;
- if(!_interactive)
- {
- createQueue = true;
- _logger.info("Running in batch-mode, marking queue as durable to ensure retention of the messages.");
- }
- else
- {
- createQueue = userInteract("Do you want to make this queue durable?\n"
- + "NOTE: Answering No will result in these messages being discarded!");
- }
-
- if (createQueue)
- {
- for (Long messageId : messages)
- {
- queueMessages.add(messageId);
- }
- AMQShortString name = new AMQShortString(queueName);
- existingQueues.add(name);
- QueueRecord record = new QueueRecord(name, null, false, null);
- _newMessageStore.createQueue(record);
- }
- }
- }
-
-
- //Migrate _messageMetaDataDb
- _logger.info("Message MetaData");
-
- final Database newMetaDataDB = _newMessageStore.getMetaDataDb();
-
- MetaDataVisitor metaDataVisitor = new MetaDataVisitor(queueMessages, newMetaDataDB,
- _oldMessageStore.getMetaDataTupleBindingFactory().getInstance(),
- _newMessageStore.getMetaDataTupleBindingFactory().getInstance());
- _oldMessageStore.visitMetaDataDb(metaDataVisitor);
-
- logCount(metaDataVisitor.getVisitedCount(), "Message MetaData");
-
-
- //Migrate _messageContentDb
- _logger.info("Message Contents");
- final Database newContentDB = _newMessageStore.getContentDb();
-
- final TupleBinding<MessageContentKey> oldContentKeyTupleBinding = new MessageContentKeyTB_4();
- final TupleBinding<MessageContentKey> newContentKeyTupleBinding = new MessageContentKeyTB_5();
- final TupleBinding contentTB = new ContentTB();
-
- DatabaseVisitor contentVisitor = new ContentVisitor(oldContentKeyTupleBinding, queueMessages,
- contentTB, newContentKeyTupleBinding, newContentDB);
- _oldMessageStore.visitContentDb(contentVisitor);
-
- logCount(contentVisitor.getVisitedCount(), "Message Content");
-
-
- //Migrate _deliveryDb
- _logger.info("Delivery Records");
- final Database deliveryDb =_newMessageStore.getDeliveryDb();
- DatabaseVisitor deliveryDbVisitor = new DeliveryDbVisitor(queueEntryTB, existingQueues, deliveryDb);
- _oldMessageStore.visitDelivery(deliveryDbVisitor);
- logCount(contentVisitor.getVisitedCount(), "Delivery Record");
- }
-
- /**
- * Log the specified count for item in a user friendly way.
- *
- * @param count of items to log
- * @param item description of what is being logged.
- */
- private void logCount(int count, String item)
- {
- _logger.info(" " + count + " " + item + " " + (count == 1 ? "entry" : "entries"));
- }
-
- /**
- * @param oldDatabase The old MessageStoreDB to perform the visit on
- * @param newDatabase The new MessageStoreDB to copy the data to.
- * @param contentName The string name of the content for display purposes.
- *
- * @throws AMQException Due to createQueue thorwing AMQException
- * @throws DatabaseException If there is a problem with the loading of the data
- */
- private void moveContents(Database oldDatabase, final Database newDatabase, String contentName) throws AMQException, DatabaseException
- {
-
- DatabaseVisitor moveVisitor = new DatabaseVisitor()
- {
- public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
- {
- incrementCount();
- newDatabase.put(null, key, value);
- }
- };
-
- _oldMessageStore.visitDatabase(oldDatabase, moveVisitor);
-
- logCount(moveVisitor.getVisitedCount(), contentName);
- }
-
- private static void usage()
- {
- System.out.println("usage: BDBStoreUpgrade:\n [-h|--help] [-q|--quiet] [-f|--force] [-b|--backup <Path to backup-db>] " +
- "-i|--input <Path to input-db> [-o|--output <Path to upgraded-db>]");
- }
-
- private static void help()
- {
- System.out.println("usage: BDBStoreUpgrade:");
- System.out.println("Required:");
- for (Object obj : _options.getOptions())
- {
- Option option = (Option) obj;
- if (option.isRequired())
- {
- System.out.println("-" + option.getOpt() + "|--" + option.getLongOpt() + "\t\t-\t" + option.getDescription());
- }
- }
-
- System.out.println("\nOptions:");
- for (Object obj : _options.getOptions())
- {
- Option option = (Option) obj;
- if (!option.isRequired())
- {
- System.out.println("--" + option.getLongOpt() + "|-" + option.getOpt() + "\t\t-\t" + option.getDescription());
- }
- }
- }
-
- static boolean isDirectoryAStoreDir(File directory)
- {
- if (directory.isFile())
- {
- return false;
- }
-
- for (File file : directory.listFiles())
- {
- if (file.isFile())
- {
- if (file.getName().endsWith(BDB_FILE_ENDING))
- {
- return true;
- }
- }
- }
- return false;
- }
-
- static File[] discoverDBStores(File fromDir)
- {
- if (!fromDir.exists())
- {
- throw new IllegalArgumentException("'" + fromDir + "' does not exist unable to upgrade.");
- }
-
- // Ensure we are given a directory
- if (fromDir.isFile())
- {
- throw new IllegalArgumentException("'" + fromDir + "' is not a directory unable to upgrade.");
- }
-
- // Check to see if we have been given a single directory
- if (isDirectoryAStoreDir(fromDir))
- {
- return new File[]{fromDir};
- }
-
- // Check to see if we have been give a directory containing stores.
- List<File> stores = new LinkedList<File>();
-
- for (File directory : fromDir.listFiles())
- {
- if (directory.isDirectory())
- {
- if (isDirectoryAStoreDir(directory))
- {
- stores.add(directory);
- }
- }
- }
-
- return stores.toArray(new File[stores.size()]);
- }
-
- private static void performDBBackup(File source, File backup, boolean force) throws Exception
- {
- if (backup.exists())
- {
- if (force)
- {
- _logger.info("Backup location exists. Forced to remove.");
- FileUtils.delete(backup, true);
- }
- else
- {
- throw new IllegalArgumentException("Unable to perform backup a backup already exists.");
- }
- }
-
- try
- {
- _logger.info("Backing up '" + source + "' to '" + backup + "'");
- FileUtils.copyRecursive(source, backup);
- }
- catch (FileNotFoundException e)
- {
- //Throwing IAE here as this will be reported as a Backup not started
- throw new IllegalArgumentException("Unable to perform backup:" + e.getMessage());
- }
- catch (FileUtils.UnableToCopyException e)
- {
- //Throwing exception here as this will be reported as a Failed Backup
- throw new Exception("Unable to perform backup due to:" + e.getMessage());
- }
- }
-
- public static void main(String[] args) throws ParseException
- {
- setOptions(_options);
-
- final Options helpOptions = new Options();
- setHelpOptions(helpOptions);
-
- //Display help
- boolean displayHelp = false;
- try
- {
- if (new PosixParser().parse(helpOptions, args).hasOption("h"))
- {
- showHelp();
- }
- }
- catch (ParseException pe)
- {
- displayHelp = true;
- }
-
- //Parse commandline for required arguments
- try
- {
- _commandLine = new PosixParser().parse(_options, args);
- }
- catch (ParseException mae)
- {
- if (displayHelp)
- {
- showHelp();
- }
- else
- {
- fatalError(mae.getMessage());
- }
- }
-
- String fromDir = _commandLine.getOptionValue(OPTION_INPUT_SHORT);
- String toDir = _commandLine.getOptionValue(OPTION_OUTPUT_SHORT);
- String backupDir = _commandLine.getOptionValue(OPTION_BACKUP_SHORT);
-
- if (backupDir == null && _commandLine.hasOption(OPTION_BACKUP_SHORT))
- {
- backupDir = "";
- }
-
- //Attempt to locate possible Store to upgrade on input path
- File[] stores = new File[0];
- try
- {
- stores = discoverDBStores(new File(fromDir));
- }
- catch (IllegalArgumentException iae)
- {
- fatalError(iae.getMessage());
- }
-
- boolean interactive = !_commandLine.hasOption(OPTION_QUIET_SHORT);
- boolean force = _commandLine.hasOption(OPTION_FORCE_SHORT);
-
- try{
- for (File store : stores)
- {
-
- // if toDir is null then we are upgrading inplace so we don't need
- // to provide an upgraded toDir when upgrading multiple stores.
- if (toDir == null ||
- // Check to see if we are upgrading a store specified in
- // fromDir or if the directories are nested.
- (stores.length > 0
- && stores[0].toString().length() == fromDir.length()))
- {
- upgrade(store, toDir, backupDir, interactive, force);
- }
- else
- {
- // Add the extra part of path from store to the toDir
- upgrade(store, toDir + File.separator + store.toString().substring(fromDir.length()), backupDir, interactive, force);
- }
- }
- }
- catch (RuntimeException re)
- {
- if (!(USER_ABORTED_PROCESS).equals(re.getMessage()))
- {
- re.printStackTrace();
- _logger.error("Upgrade Failed: " + re.getMessage());
- }
- else
- {
- _logger.error("Upgrade stopped : User aborted");
- }
- }
-
- }
-
- @SuppressWarnings("static-access")
- private static void setOptions(Options options)
- {
- Option input =
- OptionBuilder.isRequired().hasArg().withDescription("Location (Path) of store to upgrade.").withLongOpt(OPTION_INPUT)
- .create(OPTION_INPUT_SHORT);
-
- Option output =
- OptionBuilder.hasArg().withDescription("Location (Path) for the upgraded-db to be written.").withLongOpt(OPTION_OUTPUT)
- .create(OPTION_OUTPUT_SHORT);
-
- Option quiet = new Option(OPTION_QUIET_SHORT, OPTION_QUIET, false, "Disable interactive options.");
-
- Option force = new Option(OPTION_FORCE_SHORT, OPTION_FORCE, false, "Force upgrade removing any existing upgrade target.");
- Option backup =
- OptionBuilder.hasOptionalArg().withDescription("Location (Path) for the backup-db to be written.").withLongOpt(OPTION_BACKUP)
- .create(OPTION_BACKUP_SHORT);
-
- options.addOption(input);
- options.addOption(output);
- options.addOption(quiet);
- options.addOption(force);
- options.addOption(backup);
- setHelpOptions(options);
- }
-
- private static void setHelpOptions(Options options)
- {
- options.addOption(new Option("h", "help", false, "Show this help."));
- }
-
- static void upgrade(File fromDir, String toDir, String backupDir, boolean interactive, boolean force)
- {
-
- _logger.info("Running BDB Message Store upgrade tool: v" + VERSION);
- int version = getStoreVersion(fromDir);
- if (!isVersionUpgradable(version))
- {
- return;
- }
- try
- {
- new BDBStoreUpgrade(fromDir.toString(), toDir, backupDir, interactive, force).upgradeFromVersion(version);
-
- _logger.info("Upgrade complete.");
- }
- catch (IllegalArgumentException iae)
- {
- _logger.error("Upgrade not started due to: " + iae.getMessage());
- }
- catch (DatabaseException de)
- {
- de.printStackTrace();
- _logger.error("Upgrade Failed: " + de.getMessage());
- }
- catch (RuntimeException re)
- {
- if (!(USER_ABORTED_PROCESS).equals(re.getMessage()))
- {
- re.printStackTrace();
- _logger.error("Upgrade Failed: " + re.getMessage());
- }
- else
- {
- throw re;
- }
- }
- catch (Exception e)
- {
- e.printStackTrace();
- _logger.error("Upgrade Failed: " + e.getMessage());
- }
- }
-
- /**
- * Utility method to verify if store of given version can be upgraded.
- *
- * @param version
- * store version to verify
- * @return true if store can be upgraded, false otherwise
- */
- protected static boolean isVersionUpgradable(int version)
- {
- boolean storeUpgradable = false;
- if (version == 0)
- {
- _logger.error("Existing store version is undefined!");
- }
- else if (version < LOWEST_SUPPORTED_STORE_VERSION)
- {
- _logger.error(MessageFormat.format(PREVIOUS_STORE_VERSION_UNSUPPORTED,
- new Object[] { Integer.toString(version) }));
- }
- else if (version == BDBMessageStore.DATABASE_FORMAT_VERSION)
- {
- _logger.error(MessageFormat.format(STORE_ALREADY_UPGRADED, new Object[] { Integer.toString(version) }));
- }
- else if (version > BDBMessageStore.DATABASE_FORMAT_VERSION)
- {
- _logger.error(MessageFormat.format(FOLLOWING_STORE_VERSION_UNSUPPORTED,
- new Object[] { Integer.toString(version) }));
- }
- else
- {
- _logger.info("Existing store version is " + version);
- storeUpgradable = true;
- }
- return storeUpgradable;
- }
-
- /**
- * Detects existing store version by checking list of database in store
- * environment
- *
- * @param fromDir
- * store folder
- * @return version
- */
- public static int getStoreVersion(File fromDir)
- {
- int version = 0;
- EnvironmentConfig envConfig = new EnvironmentConfig();
- envConfig.setAllowCreate(false);
- envConfig.setTransactional(false);
- envConfig.setReadOnly(true);
- Environment environment = null;
- try
- {
-
- environment = new Environment(fromDir, envConfig);
- List<String> databases = environment.getDatabaseNames();
- for (String name : databases)
- {
- if (name.startsWith("exchangeDb"))
- {
- if (name.startsWith("exchangeDb_v"))
- {
- version = Integer.parseInt(name.substring(12));
- }
- else
- {
- version = 1;
- }
- break;
- }
- }
- }
- catch (Exception e)
- {
- _logger.error("Failure to open existing database: " + e.getMessage());
- }
- finally
- {
- if (environment != null)
- {
- try
- {
- environment.close();
- }
- catch (Exception e)
- {
- // ignoring. It should never happen.
- }
- }
- }
- return version;
- }
-
- private static void fatalError(String message)
- {
- System.out.println(message);
- usage();
- System.exit(1);
- }
-
- private static void showHelp()
- {
- help();
- System.exit(0);
- }
-
- private static class TopicExchangeDiscoverer extends DatabaseVisitor
- {
- private final List<AMQShortString> topicExchanges = new ArrayList<AMQShortString>();
- private final TupleBinding exchangeTB = new ExchangeTB();
-
- public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
- {
- ExchangeRecord exchangeRec = (ExchangeRecord) exchangeTB.entryToObject(value);
- AMQShortString type = exchangeRec.getType();
-
- if (ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(type))
- {
- topicExchanges.add(exchangeRec.getNameShortString());
- }
- }
-
- public List<AMQShortString> getTopicExchanges()
- {
- return topicExchanges;
- }
- }
-
- private static class MessageDeliveryCheckVisitor extends DatabaseVisitor
- {
- private final QueueEntryTB _queueEntryTB;
- private final List<AMQShortString> _existingQueues;
-
- // track all message delivery to existing queues
- private final HashSet<Long> _queueMessages = new HashSet<Long>();
-
- // hold all non existing queues and their messages IDs
- private final HashMap<String, HashSet<Long>> _phantomMessageQueues = new HashMap<String, HashSet<Long>>();
-
-
-
- public MessageDeliveryCheckVisitor(QueueEntryTB queueEntryTB, List<AMQShortString> existingQueues)
- {
- _queueEntryTB = queueEntryTB;
- _existingQueues = existingQueues;
- }
-
- public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
- {
- QueueEntryKey entryKey = (QueueEntryKey) _queueEntryTB.entryToObject(key);
- Long messageId = entryKey.getMessageId();
- AMQShortString queueName = entryKey.getQueueName();
- if (!_existingQueues.contains(queueName))
- {
- String name = queueName.asString();
- HashSet<Long> messages = _phantomMessageQueues.get(name);
- if (messages == null)
- {
- messages = new HashSet<Long>();
- _phantomMessageQueues.put(name, messages);
- }
- messages.add(messageId);
- incrementCount();
- }
- else
- {
- _queueMessages.add(messageId);
- }
- }
-
- public HashSet<Long> getQueueMessages()
- {
- return _queueMessages;
- }
-
- public HashMap<String, HashSet<Long>> getPhantomMessageQueues()
- {
- return _phantomMessageQueues;
- }
- }
-
- private static class ContentVisitor extends DatabaseVisitor
- {
- private long _prevMsgId; //Initialise to invalid value
- private int _bytesSeenSoFar;
- private final TupleBinding<MessageContentKey> _oldContentKeyTupleBinding;
- private final Set<Long> _queueMessages;
- private final TupleBinding _contentTB;
- private final TupleBinding<MessageContentKey> _newContentKeyTupleBinding;
- private final Database _newContentDB;
-
- public ContentVisitor(TupleBinding<MessageContentKey> oldContentKeyTupleBinding, Set<Long> queueMessages, TupleBinding contentTB, TupleBinding<MessageContentKey> newContentKeyTupleBinding, Database newContentDB)
- {
- _oldContentKeyTupleBinding = oldContentKeyTupleBinding;
- _queueMessages = queueMessages;
- _contentTB = contentTB;
- _newContentKeyTupleBinding = newContentKeyTupleBinding;
- _newContentDB = newContentDB;
- _prevMsgId = -1;
- _bytesSeenSoFar = 0;
- }
-
- public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
- {
- incrementCount();
-
- //determine the msgId of the current entry
- MessageContentKey_4 contentKey = (MessageContentKey_4) _oldContentKeyTupleBinding.entryToObject(key);
- long msgId = contentKey.getMessageId();
-
- // ONLY copy data if message is delivered to existing queue
- if (!_queueMessages.contains(msgId))
- {
- return;
- }
- //if this is a new message, restart the byte offset count.
- if(_prevMsgId != msgId)
- {
- _bytesSeenSoFar = 0;
- }
-
- //determine the content size
- ByteBuffer content = (ByteBuffer) _contentTB.entryToObject(value);
- int contentSize = content.limit();
-
- //create the new key: id + previously seen data count
- MessageContentKey_5 newKey = new MessageContentKey_5(msgId, _bytesSeenSoFar);
- DatabaseEntry newKeyEntry = new DatabaseEntry();
- _newContentKeyTupleBinding.objectToEntry(newKey, newKeyEntry);
-
- DatabaseEntry newValueEntry = new DatabaseEntry();
- _contentTB.objectToEntry(content, newValueEntry);
-
- _newContentDB.put(null, newKeyEntry, newValueEntry);
-
- _prevMsgId = msgId;
- _bytesSeenSoFar += contentSize;
- }
- }
-
- private static class DeliveryDbVisitor extends DatabaseVisitor
- {
-
- private final QueueEntryTB _queueEntryTB;
- private final List<AMQShortString> _existingQueues;
- private final Database _deliveryDb;
-
- public DeliveryDbVisitor(QueueEntryTB queueEntryTB, List<AMQShortString> existingQueues, Database deliveryDb)
- {
- _queueEntryTB = queueEntryTB;
- _existingQueues = existingQueues;
- _deliveryDb = deliveryDb;
- }
-
- public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
- {
- incrementCount();
-
- // get message id from entry key
- QueueEntryKey entryKey = (QueueEntryKey) _queueEntryTB.entryToObject(key);
- AMQShortString queueName = entryKey.getQueueName();
-
- // ONLY copy data if message queue exists
- if (_existingQueues.contains(queueName))
- {
- _deliveryDb.put(null, key, value);
- }
- }
- }
-
- private class DurableSubDiscoverer extends DatabaseVisitor
- {
- private final List<AMQShortString> _durableSubQueues;
- private final TupleBinding<BindingRecord> _bindingTB;
- private final List<AMQShortString> _topicExchanges;
-
-
- public DurableSubDiscoverer(List<AMQShortString> topicExchanges, TupleBinding<BindingRecord> bindingTB)
- {
- _durableSubQueues = new ArrayList<AMQShortString>();
- _bindingTB = bindingTB;
- _topicExchanges = topicExchanges;
- }
-
- public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
- {
- BindingRecord bindingRec = _bindingTB.entryToObject(key);
- AMQShortString queueName = bindingRec.getQueueName();
- AMQShortString exchangeName = bindingRec.getExchangeName();
-
- if (_topicExchanges.contains(exchangeName) && queueName.asString().contains(":"))
- {
- _durableSubQueues.add(queueName);
- }
- }
-
- public List<AMQShortString> getDurableSubQueues()
- {
- return _durableSubQueues;
- }
- }
-
- private static class QueueVisitor extends DatabaseVisitor
- {
- private final TupleBinding<QueueRecord> _queueTupleBinding;
- private final List<AMQShortString> _durableSubQueues;
- private final List<AMQShortString> _existingQueues = new ArrayList<AMQShortString>();
- private final BDBMessageStore _newMessageStore;
-
- public QueueVisitor(TupleBinding<QueueRecord> queueTupleBinding,
- List<AMQShortString> durableSubQueues,
- BDBMessageStore newMessageStore)
- {
- _queueTupleBinding = queueTupleBinding;
- _durableSubQueues = durableSubQueues;
- _newMessageStore = newMessageStore;
- }
-
- public void visit(DatabaseEntry key, DatabaseEntry value) throws AMQStoreException
- {
- QueueRecord queueRec = _queueTupleBinding.entryToObject(value);
- AMQShortString queueName = queueRec.getNameShortString();
-
- //if the queue name is in the gathered list then set its exclusivity true
- if (_durableSubQueues.contains(queueName))
- {
- _logger.info("Marking as possible DurableSubscription backing queue: " + queueName);
- queueRec.setExclusive(true);
- }
-
- //The simple call to createQueue with the QueueRecord object is sufficient for a v2->v3 upgrade as
- //the extra 'exclusive' property in v3 will be defaulted to false in the record creation.
- _newMessageStore.createQueue(queueRec);
-
- incrementCount();
- _existingQueues.add(queueName);
- }
-
- public List<AMQShortString> getExistingQueues()
- {
- return _existingQueues;
- }
- }
-
- private static class BindingsVisitor extends DatabaseVisitor
- {
- private final List<AMQShortString> _durableSubQueues;
- private final BDBMessageStore _newMessageStore;
- private final TupleBinding<BindingRecord> _oldBindingTB;
- private AMQShortString _selectorFilterKey;
-
- public BindingsVisitor(List<AMQShortString> durableSubQueues,
- TupleBinding<BindingRecord> oldBindingTB,
- BDBMessageStore newMessageStore)
- {
- _oldBindingTB = oldBindingTB;
- _durableSubQueues = durableSubQueues;
- _newMessageStore = newMessageStore;
- _selectorFilterKey = AMQPFilterTypes.JMS_SELECTOR.getValue();
- }
-
- public void visit(DatabaseEntry key, DatabaseEntry value) throws AMQStoreException
- {
- //All the information required in binding entries is actually in the *key* not value.
- BindingRecord oldBindingRec = _oldBindingTB.entryToObject(key);
-
- AMQShortString queueName = oldBindingRec.getQueueName();
- AMQShortString exchangeName = oldBindingRec.getExchangeName();
- AMQShortString routingKey = oldBindingRec.getRoutingKey();
- FieldTable arguments = oldBindingRec.getArguments();
-
- //if the queue name is in the gathered list then inspect its binding arguments
- if (_durableSubQueues.contains(queueName))
- {
- if(arguments == null)
- {
- arguments = new FieldTable();
- }
-
- if(!arguments.containsKey(_selectorFilterKey))
- {
- //add the empty string (i.e. 'no selector') value for the selector argument
- arguments.put(_selectorFilterKey, "");
- }
- }
-
- //create the binding in the new store
- _newMessageStore.bindQueue(
- new BindingRecord(exchangeName, queueName, routingKey, arguments));
- }
- }
-
- private static class MetaDataVisitor extends DatabaseVisitor
- {
- private final TupleBinding<Object> _oldMetaDataTupleBinding;
- private final TupleBinding<Object> _newMetaDataTupleBinding;
- private final Set<Long> _queueMessages;
- private final Database _newMetaDataDB;
-
- public MetaDataVisitor(Set<Long> queueMessages,
- Database newMetaDataDB,
- TupleBinding<Object> oldMetaDataTupleBinding,
- TupleBinding<Object> newMetaDataTupleBinding)
- {
- _queueMessages = queueMessages;
- _newMetaDataDB = newMetaDataDB;
- _oldMetaDataTupleBinding = oldMetaDataTupleBinding;
- _newMetaDataTupleBinding = newMetaDataTupleBinding;
- }
-
-
- public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
- {
- incrementCount();
- MessageMetaData metaData = (MessageMetaData) _oldMetaDataTupleBinding.entryToObject(value);
-
- // get message id
- Long messageId = TupleBinding.getPrimitiveBinding(Long.class).entryToObject(key);
-
- // ONLY copy data if message is delivered to existing queue
- if (!_queueMessages.contains(messageId))
- {
- return;
- }
- DatabaseEntry newValue = new DatabaseEntry();
- _newMetaDataTupleBinding.objectToEntry(metaData, newValue);
-
- _newMetaDataDB.put(null, key, newValue);
- }
- }
-}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DatabaseVisitor.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DatabaseVisitor.java
deleted file mode 100644
index c6a1372d7e..0000000000
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DatabaseVisitor.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.DatabaseException;
-
-import org.apache.qpid.AMQStoreException;
-
-/** Visitor Interface so that each DatabaseEntry for a database can easily be processed. */
-public abstract class DatabaseVisitor
-{
- private int _count;
-
- abstract public void visit(DatabaseEntry entry, DatabaseEntry value) throws AMQStoreException, DatabaseException;
-
- public final int getVisitedCount()
- {
- return _count;
- }
-
- protected final void incrementCount()
- {
- _count++;
- }
-
- public void resetVisitCount()
- {
- _count = 0;
- }
-}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/BindingRecord.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/BindingRecord.java
index 394a6ea85c..b9d868f909 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/BindingRecord.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/BindingRecord.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.server.store.berkeleydb.records;
+package org.apache.qpid.server.store.berkeleydb.entry;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/ExchangeRecord.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/ExchangeRecord.java
index f20367e33b..180893178d 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/ExchangeRecord.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/ExchangeRecord.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.server.store.berkeleydb.records;
+package org.apache.qpid.server.store.berkeleydb.entry;
import org.apache.qpid.framing.AMQShortString;
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/PreparedTransaction.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/PreparedTransaction.java
index bfd72b9a1f..11ae8b89eb 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/PreparedTransaction.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/PreparedTransaction.java
@@ -19,7 +19,7 @@
*
*/
-package org.apache.qpid.server.store.berkeleydb.records;
+package org.apache.qpid.server.store.berkeleydb.entry;
import org.apache.qpid.server.store.MessageStore;
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/QueueEntryKey.java
index c274fdec8c..a716758da3 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/QueueEntryKey.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.server.store.berkeleydb;
+package org.apache.qpid.server.store.berkeleydb.entry;
import org.apache.qpid.framing.AMQShortString;
@@ -27,23 +27,19 @@ public class QueueEntryKey
private AMQShortString _queueName;
private long _messageId;
-
public QueueEntryKey(AMQShortString queueName, long messageId)
{
_queueName = queueName;
_messageId = messageId;
}
-
public AMQShortString getQueueName()
{
return _queueName;
}
-
public long getMessageId()
{
return _messageId;
}
-
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/QueueRecord.java
index fbe10433ca..5ea82427dc 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/QueueRecord.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.server.store.berkeleydb.records;
+package org.apache.qpid.server.store.berkeleydb.entry;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
@@ -47,12 +47,12 @@ public class QueueRecord extends Object
{
return _owner;
}
-
+
public boolean isExclusive()
{
return _exclusive;
}
-
+
public void setExclusive(boolean exclusive)
{
_exclusive = exclusive;
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/Xid.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/Xid.java
index f74d67b355..bed7575f9a 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/Xid.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/Xid.java
@@ -19,7 +19,7 @@
*
*/
-package org.apache.qpid.server.store.berkeleydb.keys;
+package org.apache.qpid.server.store.berkeleydb.entry;
public class Xid
{
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_4.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_4.java
deleted file mode 100644
index 30357c97d4..0000000000
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_4.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.keys;
-
-import org.apache.qpid.server.store.berkeleydb.MessageContentKey;
-
-public class MessageContentKey_4 extends MessageContentKey
-{
- private int _chunkNum;
-
- public MessageContentKey_4(long messageId, int chunkNo)
- {
- super(messageId);
- _chunkNum = chunkNo;
- }
-
- public int getChunk()
- {
- return _chunkNum;
- }
-
- public void setChunk(int chunk)
- {
- this._chunkNum = chunk;
- }
-} \ No newline at end of file
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_5.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_5.java
deleted file mode 100644
index a1a7fe80b5..0000000000
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_5.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.keys;
-
-import org.apache.qpid.server.store.berkeleydb.MessageContentKey;
-
-public class MessageContentKey_5 extends MessageContentKey
-{
- private int _offset;
-
- public MessageContentKey_5(long messageId, int chunkNo)
- {
- super(messageId);
- _offset = chunkNo;
- }
-
- public int getOffset()
- {
- return _offset;
- }
-
- public void setOffset(int chunk)
- {
- this._offset = chunk;
- }
-} \ No newline at end of file
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTB.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/AMQShortStringBinding.java
index 351b5b4f5b..b57ffb0169 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTB.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/AMQShortStringBinding.java
@@ -18,32 +18,34 @@
* under the License.
*
*/
-package org.apache.qpid.server.store.berkeleydb;
+package org.apache.qpid.server.store.berkeleydb.tuple;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.framing.AMQShortString;
-public class AMQShortStringTB extends TupleBinding
+public class AMQShortStringBinding extends TupleBinding<AMQShortString>
{
- private static final Logger _log = Logger.getLogger(AMQShortStringTB.class);
+ private static final AMQShortStringBinding INSTANCE = new AMQShortStringBinding();
-
- public AMQShortStringTB()
+ public static AMQShortStringBinding getInstance()
{
+ return INSTANCE;
}
- public Object entryToObject(TupleInput tupleInput)
+ /** private constructor forces getInstance instead */
+ private AMQShortStringBinding() { }
+
+ public AMQShortString entryToObject(TupleInput tupleInput)
{
return AMQShortStringEncoding.readShortString(tupleInput);
}
- public void objectToEntry(Object object, TupleOutput tupleOutput)
+ public void objectToEntry(AMQShortString object, TupleOutput tupleOutput)
{
- AMQShortStringEncoding.writeShortString((AMQShortString)object, tupleOutput);
+ AMQShortStringEncoding.writeShortString(object, tupleOutput);
}
-
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ContentBinding.java
index ef9f60b2c4..9154ca114a 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ContentBinding.java
@@ -18,35 +18,35 @@
* under the License.
*
*/
-package org.apache.qpid.server.store.berkeleydb;
+package org.apache.qpid.server.store.berkeleydb.tuple;
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
-import java.nio.ByteBuffer;
-
-public class ContentTB extends TupleBinding
+public class ContentBinding extends TupleBinding<byte[]>
{
- public Object entryToObject(TupleInput tupleInput)
- {
+ private static final ContentBinding INSTANCE = new ContentBinding();
- final int size = tupleInput.readInt();
- byte[] underlying = new byte[size];
- tupleInput.readFast(underlying);
- return ByteBuffer.wrap(underlying);
+ public static ContentBinding getInstance()
+ {
+ return INSTANCE;
}
- public void objectToEntry(Object object, TupleOutput tupleOutput)
- {
- ByteBuffer src = (ByteBuffer) object;
-
- src = src.slice();
+ /** private constructor forces getInstance instead */
+ private ContentBinding() { }
- byte[] chunkData = new byte[src.limit()];
- src.duplicate().get(chunkData);
+ @Override
+ public byte[] entryToObject(final TupleInput input)
+ {
+ byte[] data = new byte[input.available()];
+ input.read(data);
+ return data;
+ }
- tupleOutput.writeInt(chunkData.length);
- tupleOutput.writeFast(chunkData);
+ @Override
+ public void objectToEntry(final byte[] data, final TupleOutput output)
+ {
+ output.write(data);
}
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ExchangeBinding.java
index c7a409f8e1..d4b1475ac7 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ExchangeBinding.java
@@ -18,39 +18,40 @@
* under the License.
*
*/
-package org.apache.qpid.server.store.berkeleydb;
+package org.apache.qpid.server.store.berkeleydb.tuple;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
+import org.apache.qpid.server.store.berkeleydb.entry.ExchangeRecord;
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.store.berkeleydb.records.ExchangeRecord;
-public class ExchangeTB extends TupleBinding
+public class ExchangeBinding extends TupleBinding<ExchangeRecord>
{
- private static final Logger _log = Logger.getLogger(ExchangeTB.class);
+ private static final ExchangeBinding INSTANCE = new ExchangeBinding();
- public ExchangeTB()
+ public static ExchangeBinding getInstance()
{
+ return INSTANCE;
}
- public Object entryToObject(TupleInput tupleInput)
- {
+ /** private constructor forces getInstance instead */
+ private ExchangeBinding() { }
+ public ExchangeRecord entryToObject(TupleInput tupleInput)
+ {
AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
AMQShortString typeName = AMQShortStringEncoding.readShortString(tupleInput);
boolean autoDelete = tupleInput.readBoolean();
-
+
return new ExchangeRecord(name, typeName, autoDelete);
}
- public void objectToEntry(Object object, TupleOutput tupleOutput)
+ public void objectToEntry(ExchangeRecord exchange, TupleOutput tupleOutput)
{
- ExchangeRecord exchange = (ExchangeRecord) object;
-
AMQShortStringEncoding.writeShortString(exchange.getNameShortString(), tupleOutput);
AMQShortStringEncoding.writeShortString(exchange.getType(), tupleOutput);
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_5.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java
index 4e124a03e3..2e6c8d5666 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_5.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java
@@ -18,8 +18,11 @@
* under the License.
*
*/
-package org.apache.qpid.server.store.berkeleydb.tuples;
+package org.apache.qpid.server.store.berkeleydb.tuple;
+import java.nio.ByteBuffer;
+
+import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
@@ -29,17 +32,26 @@ import org.apache.qpid.server.store.StorableMessageMetaData;
/**
* Handles the mapping to and from message meta data
*/
-public class MessageMetaDataTB_5 extends MessageMetaDataTB_4
+public class MessageMetaDataBinding extends TupleBinding<StorableMessageMetaData>
{
+ private static final MessageMetaDataBinding INSTANCE = new MessageMetaDataBinding();
+
+ public static MessageMetaDataBinding getInstance()
+ {
+ return INSTANCE;
+ }
+
+ /** private constructor forces getInstance instead */
+ private MessageMetaDataBinding() { }
@Override
- public Object entryToObject(TupleInput tupleInput)
+ public StorableMessageMetaData entryToObject(TupleInput tupleInput)
{
final int bodySize = tupleInput.readInt();
byte[] dataAsBytes = new byte[bodySize];
tupleInput.readFast(dataAsBytes);
- java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(dataAsBytes);
+ ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
buf.position(1);
buf = buf.slice();
MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]];
@@ -49,14 +61,12 @@ public class MessageMetaDataTB_5 extends MessageMetaDataTB_4
}
@Override
- public void objectToEntry(Object object, TupleOutput tupleOutput)
+ public void objectToEntry(StorableMessageMetaData metaData, TupleOutput tupleOutput)
{
- StorableMessageMetaData metaData = (StorableMessageMetaData) object;
-
final int bodySize = 1 + metaData.getStorableSize();
byte[] underlying = new byte[bodySize];
underlying[0] = (byte) metaData.getType().ordinal();
- java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(underlying);
+ ByteBuffer buf = ByteBuffer.wrap(underlying);
buf.position(1);
buf = buf.slice();
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/PreparedTransactionTB.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java
index 3eb4cb69b5..d85bcd361e 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/PreparedTransactionTB.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java
@@ -19,7 +19,7 @@
*
*/
-package org.apache.qpid.server.store.berkeleydb.tuples;
+package org.apache.qpid.server.store.berkeleydb.tuple;
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
@@ -28,9 +28,9 @@ import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.store.berkeleydb.records.PreparedTransaction;
+import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction;
-public class PreparedTransactionTB extends TupleBinding<PreparedTransaction>
+public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction>
{
@Override
public PreparedTransaction entryToObject(TupleInput input)
@@ -109,7 +109,7 @@ public class PreparedTransactionTB extends TupleBinding<PreparedTransaction>
return true;
}
- public StoredMessage getStoredMessage()
+ public StoredMessage<?> getStoredMessage()
{
throw new UnsupportedOperationException();
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_5.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueBinding.java
index c9094a132d..7e1c63cc28 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_5.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueBinding.java
@@ -18,8 +18,9 @@
* under the License.
*
*/
-package org.apache.qpid.server.store.berkeleydb.tuples;
+package org.apache.qpid.server.store.berkeleydb.tuple;
+import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
import com.sleepycat.je.DatabaseException;
@@ -29,17 +30,22 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
-import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
+import org.apache.qpid.server.store.berkeleydb.entry.QueueRecord;
-public class QueueTuple_5 extends QueueTuple_4
+public class QueueBinding extends TupleBinding<QueueRecord>
{
- private static final Logger _logger = Logger.getLogger(QueueTuple_5.class);
+ private static final Logger _logger = Logger.getLogger(QueueBinding.class);
- public QueueTuple_5()
+ private static final QueueBinding INSTANCE = new QueueBinding();
+
+ public static QueueBinding getInstance()
{
- super();
+ return INSTANCE;
}
+ /** private constructor forces getInstance instead */
+ private QueueBinding() { }
+
public QueueRecord entryToObject(TupleInput tupleInput)
{
try
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_4.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueBindingTupleBinding.java
index c6a5e63bc8..6ba929a541 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_4.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueBindingTupleBinding.java
@@ -18,29 +18,34 @@
* under the License.
*
*/
-package org.apache.qpid.server.store.berkeleydb.tuples;
+package org.apache.qpid.server.store.berkeleydb.tuple;
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-import com.sleepycat.je.DatabaseException;
import org.apache.log4j.Logger;
-
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
-import org.apache.qpid.server.store.berkeleydb.records.BindingRecord;
+import org.apache.qpid.server.store.berkeleydb.entry.BindingRecord;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import com.sleepycat.je.DatabaseException;
-public class BindingTuple_4 extends TupleBinding<BindingRecord> implements BindingTuple
+public class QueueBindingTupleBinding extends TupleBinding<BindingRecord>
{
- protected static final Logger _log = Logger.getLogger(BindingTuple.class);
+ protected static final Logger _log = Logger.getLogger(QueueBindingTupleBinding.class);
+
+ private static final QueueBindingTupleBinding INSTANCE = new QueueBindingTupleBinding();
- public BindingTuple_4()
+ public static QueueBindingTupleBinding getInstance()
{
- super();
+ return INSTANCE;
}
+ /** private constructor forces getInstance instead */
+ private QueueBindingTupleBinding() { }
+
public BindingRecord entryToObject(TupleInput tupleInput)
{
AMQShortString exchangeName = AMQShortStringEncoding.readShortString(tupleInput);
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueEntryBinding.java
index a4ed25c0ed..f65df23706 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueEntryBinding.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.server.store.berkeleydb.tuples;
+package org.apache.qpid.server.store.berkeleydb.tuple;
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
@@ -26,10 +26,21 @@ import com.sleepycat.bind.tuple.TupleOutput;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
-import org.apache.qpid.server.store.berkeleydb.QueueEntryKey;
+import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey;
-public class QueueEntryTB extends TupleBinding<QueueEntryKey>
+public class QueueEntryBinding extends TupleBinding<QueueEntryKey>
{
+
+ private static final QueueEntryBinding INSTANCE = new QueueEntryBinding();
+
+ public static QueueEntryBinding getInstance()
+ {
+ return INSTANCE;
+ }
+
+ /** private constructor forces getInstance instead */
+ private QueueEntryBinding() { }
+
public QueueEntryKey entryToObject(TupleInput tupleInput)
{
AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput);
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StringMapBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/StringMapBinding.java
index f8fd39e127..15f31953f4 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StringMapBinding.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/StringMapBinding.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.server.store.berkeleydb;
+package org.apache.qpid.server.store.berkeleydb.tuple;
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
@@ -29,9 +29,8 @@ import java.util.Map;
public class StringMapBinding extends TupleBinding<Map<String,String>>
{
-
private static final StringMapBinding INSTANCE = new StringMapBinding();
-
+
public Map<String, String> entryToObject(final TupleInput tupleInput)
{
int entries = tupleInput.readInt();
@@ -43,7 +42,6 @@ public class StringMapBinding extends TupleBinding<Map<String,String>>
return map;
}
-
public void objectToEntry(final Map<String, String> stringStringMap, final TupleOutput tupleOutput)
{
tupleOutput.writeInt(stringStringMap.size());
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/UUIDTupleBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/UUIDTupleBinding.java
index c1a5d473f0..f8657cdd49 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/UUIDTupleBinding.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/UUIDTupleBinding.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.server.store.berkeleydb;
+package org.apache.qpid.server.store.berkeleydb.tuple;
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
@@ -29,7 +29,7 @@ import java.util.UUID;
public class UUIDTupleBinding extends TupleBinding<UUID>
{
private static final UUIDTupleBinding INSTANCE = new UUIDTupleBinding();
-
+
public UUID entryToObject(final TupleInput tupleInput)
{
return new UUID(tupleInput.readLong(), tupleInput.readLong());
@@ -38,13 +38,11 @@ public class UUIDTupleBinding extends TupleBinding<UUID>
public void objectToEntry(final UUID uuid, final TupleOutput tupleOutput)
{
tupleOutput.writeLong(uuid.getMostSignificantBits());
- tupleOutput.writeLong(uuid.getLeastSignificantBits());
+ tupleOutput.writeLong(uuid.getLeastSignificantBits());
}
public static UUIDTupleBinding getInstance()
{
return INSTANCE;
}
-
-
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/XidTB.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/XidBinding.java
index 3a5d61b2b6..01a5b75fef 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/XidTB.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/XidBinding.java
@@ -19,15 +19,27 @@
*
*/
-package org.apache.qpid.server.store.berkeleydb.tuples;
+package org.apache.qpid.server.store.berkeleydb.tuple;
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
-import org.apache.qpid.server.store.berkeleydb.keys.Xid;
-public class XidTB extends TupleBinding<Xid>
+import org.apache.qpid.server.store.berkeleydb.entry.Xid;
+
+public class XidBinding extends TupleBinding<Xid>
{
+
+ private static final XidBinding INSTANCE = new XidBinding();
+
+ public static XidBinding getInstance()
+ {
+ return INSTANCE;
+ }
+
+ /** private constructor forces getInstance instead */
+ private XidBinding() { }
+
@Override
public Xid entryToObject(TupleInput input)
{
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_4.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_4.java
deleted file mode 100644
index df857df31a..0000000000
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_4.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.tuples;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-
-import org.apache.qpid.server.store.berkeleydb.MessageContentKey;
-import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_4;
-
-public class MessageContentKeyTB_4 extends TupleBinding<MessageContentKey>
-{
-
- public MessageContentKey entryToObject(TupleInput tupleInput)
- {
- long messageId = tupleInput.readLong();
- int chunk = tupleInput.readInt();
- return new MessageContentKey_4(messageId, chunk);
- }
-
- public void objectToEntry(MessageContentKey object, TupleOutput tupleOutput)
- {
- final MessageContentKey_4 mk = (MessageContentKey_4) object;
- tupleOutput.writeLong(mk.getMessageId());
- tupleOutput.writeInt(mk.getChunk());
- }
-
-} \ No newline at end of file
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_5.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_5.java
deleted file mode 100644
index 17f88e1c2b..0000000000
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_5.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.tuples;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-
-import org.apache.qpid.server.store.berkeleydb.MessageContentKey;
-import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_5;
-
-public class MessageContentKeyTB_5 extends TupleBinding<MessageContentKey>
-{
- public MessageContentKey entryToObject(TupleInput tupleInput)
- {
- long messageId = tupleInput.readLong();
- int offset = tupleInput.readInt();
- return new MessageContentKey_5(messageId, offset);
- }
-
- public void objectToEntry(MessageContentKey object, TupleOutput tupleOutput)
- {
- final MessageContentKey_5 mk = (MessageContentKey_5) object;
- tupleOutput.writeLong(mk.getMessageId());
- tupleOutput.writeInt(mk.getOffset());
- }
-
-} \ No newline at end of file
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTupleBindingFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTupleBindingFactory.java
deleted file mode 100644
index 4a320f49c9..0000000000
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTupleBindingFactory.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.tuples;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-
-import org.apache.qpid.server.store.berkeleydb.MessageContentKey;
-
-public class MessageContentKeyTupleBindingFactory extends TupleBindingFactory<MessageContentKey>
-{
- public MessageContentKeyTupleBindingFactory(int version)
- {
- super(version);
- }
-
- public TupleBinding<MessageContentKey> getInstance()
- {
- switch (getVersion())
- {
- default:
- case 5:
- return new MessageContentKeyTB_5();
- case 4:
- return new MessageContentKeyTB_4();
- }
- }
-}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_4.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_4.java
deleted file mode 100644
index bdd806bb81..0000000000
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_4.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.tuples;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQFrameDecodingException;
-import org.apache.qpid.framing.AMQProtocolVersionException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.message.MessageMetaData;
-import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-/**
- * Handles the mapping to and from 0-8/0-9 message meta data
- */
-public class MessageMetaDataTB_4 extends TupleBinding<Object>
-{
- private static final Logger _log = Logger.getLogger(MessageMetaDataTB_4.class);
-
- public MessageMetaDataTB_4()
- {
- }
-
- public Object entryToObject(TupleInput tupleInput)
- {
- try
- {
- final MessagePublishInfo publishBody = readMessagePublishInfo(tupleInput);
- final ContentHeaderBody contentHeaderBody = readContentHeaderBody(tupleInput);
- final int contentChunkCount = tupleInput.readInt();
-
- return new MessageMetaData(publishBody, contentHeaderBody, contentChunkCount);
- }
- catch (Exception e)
- {
- _log.error("Error converting entry to object: " + e, e);
- // annoyingly just have to return null since we cannot throw
- return null;
- }
- }
-
- public void objectToEntry(Object object, TupleOutput tupleOutput)
- {
- MessageMetaData message = (MessageMetaData) object;
- try
- {
- writeMessagePublishInfo(message.getMessagePublishInfo(), tupleOutput);
- }
- catch (AMQException e)
- {
- // can't do anything else since the BDB interface precludes throwing any exceptions
- // in practice we should never get an exception
- throw new RuntimeException("Error converting object to entry: " + e, e);
- }
- writeContentHeader(message.getContentHeaderBody(), tupleOutput);
- tupleOutput.writeInt(message.getContentChunkCount());
- }
-
- private MessagePublishInfo readMessagePublishInfo(TupleInput tupleInput)
- {
-
- final AMQShortString exchange = AMQShortStringEncoding.readShortString(tupleInput);
- final AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
- final boolean mandatory = tupleInput.readBoolean();
- final boolean immediate = tupleInput.readBoolean();
-
- return new MessagePublishInfo()
- {
-
- public AMQShortString getExchange()
- {
- return exchange;
- }
-
- public void setExchange(AMQShortString exchange)
- {
-
- }
-
- public boolean isImmediate()
- {
- return immediate;
- }
-
- public boolean isMandatory()
- {
- return mandatory;
- }
-
- public AMQShortString getRoutingKey()
- {
- return routingKey;
- }
- } ;
-
- }
-
- private ContentHeaderBody readContentHeaderBody(TupleInput tupleInput) throws AMQFrameDecodingException, AMQProtocolVersionException
- {
- int bodySize = tupleInput.readInt();
- byte[] underlying = new byte[bodySize];
- tupleInput.readFast(underlying);
-
- try
- {
- return ContentHeaderBody.createFromBuffer(new DataInputStream(new ByteArrayInputStream(underlying)), bodySize);
- }
- catch (IOException e)
- {
- throw new AMQFrameDecodingException(null, e.getMessage(), e);
- }
- }
-
- private void writeMessagePublishInfo(MessagePublishInfo publishBody, TupleOutput tupleOutput) throws AMQException
- {
-
- AMQShortStringEncoding.writeShortString(publishBody.getExchange(), tupleOutput);
- AMQShortStringEncoding.writeShortString(publishBody.getRoutingKey(), tupleOutput);
- tupleOutput.writeBoolean(publishBody.isMandatory());
- tupleOutput.writeBoolean(publishBody.isImmediate());
- }
-
- private void writeContentHeader(ContentHeaderBody headerBody, TupleOutput tupleOutput)
- {
- // write out the content header body
- final int bodySize = headerBody.getSize();
- ByteArrayOutputStream baos = new ByteArrayOutputStream(bodySize);
- try
- {
- headerBody.writePayload(new DataOutputStream(baos));
- tupleOutput.writeInt(bodySize);
- tupleOutput.writeFast(baos.toByteArray());
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
-
- }
-}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java
deleted file mode 100644
index cb742e76a1..0000000000
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.tuples;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-
-public class MessageMetaDataTupleBindingFactory extends TupleBindingFactory<Object>
-{
- public MessageMetaDataTupleBindingFactory(int version)
- {
- super(version);
- }
-
- public TupleBinding<Object> getInstance()
- {
- switch (getVersion())
- {
- default:
- case 5:
- return new MessageMetaDataTB_5();
- case 4:
- return new MessageMetaDataTB_4();
- }
- }
-}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java
deleted file mode 100644
index a189786885..0000000000
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.tuples;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-
-import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
-
-public class QueueTupleBindingFactory extends TupleBindingFactory<QueueRecord>
-{
-
- public QueueTupleBindingFactory(int version)
- {
- super(version);
- }
-
- public TupleBinding<QueueRecord> getInstance()
- {
- switch (getVersion())
- {
- default:
- case 5:
- return new QueueTuple_5();
- case 4:
- return new QueueTuple_4();
- }
- }
-}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_4.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_4.java
deleted file mode 100644
index d2ba4dbbca..0000000000
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_4.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.tuples;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-import com.sleepycat.je.DatabaseException;
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
-import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
-import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
-
-public class QueueTuple_4 extends TupleBinding<QueueRecord> implements QueueTuple
-{
- private static final Logger _logger = Logger.getLogger(QueueTuple_4.class);
-
- public QueueTuple_4()
- {
- super();
- }
-
- public QueueRecord entryToObject(TupleInput tupleInput)
- {
- try
- {
- AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
- AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput);
- // Addition for Version 2 of this table, read the queue arguments
- FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput);
-
- return new QueueRecord(name, owner, false, arguments);
- }
- catch (DatabaseException e)
- {
- _logger.error("Unable to create binding: " + e, e);
- return null;
- }
-
- }
-
- public void objectToEntry(QueueRecord queue, TupleOutput tupleOutput)
- {
- AMQShortStringEncoding.writeShortString(queue.getNameShortString(), tupleOutput);
- AMQShortStringEncoding.writeShortString(queue.getOwner(), tupleOutput);
- // Addition for Version 2 of this table, store the queue arguments
- FieldTableEncoding.writeFieldTable(queue.getArguments(), tupleOutput);
- }
-}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractStoreUpgrade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractStoreUpgrade.java
new file mode 100644
index 0000000000..c96c751694
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractStoreUpgrade.java
@@ -0,0 +1,83 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.upgrade;
+
+import java.util.List;
+
+import org.apache.log4j.Logger;
+
+import com.sleepycat.je.Database;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.Transaction;
+
+public abstract class AbstractStoreUpgrade implements StoreUpgrade
+{
+ private static final Logger _logger = Logger.getLogger(AbstractStoreUpgrade.class);
+ protected static final String[] USER_FRIENDLY_NAMES = new String[] { "Exchanges", "Queues", "Queue bindings",
+ "Message deliveries", "Message metadata", "Message content", "Bridges", "Links", "Distributed transactions" };
+
+ protected void reportFinished(Environment environment, String[] databaseNames, String[] userFriendlyNames)
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Upgraded:");
+ List<String> databases = environment.getDatabaseNames();
+ for (int i = 0; i < databaseNames.length; i++)
+ {
+ if (databases.contains(databaseNames[i]))
+ {
+ _logger.info(" " + getRowCount(databaseNames[i], environment) + " rows in " + userFriendlyNames[i]);
+ }
+ }
+ }
+ }
+
+
+ protected void reportStarting(Environment environment, String[] databaseNames, String[] userFriendlyNames)
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Upgrading:");
+ List<String> databases = environment.getDatabaseNames();
+ for (int i = 0; i < databaseNames.length; i++)
+ {
+ if (databases.contains(databaseNames[i]))
+ {
+ _logger.info(" " + getRowCount(databaseNames[i], environment) + " rows from " + userFriendlyNames[i]);
+ }
+ }
+ }
+ }
+
+ private long getRowCount(String databaseName, Environment environment)
+ {
+ DatabaseCallable<Long> operation = new DatabaseCallable<Long>()
+ {
+ @Override
+ public Long call(Database sourceDatabase, Database targetDatabase, Transaction transaction)
+ {
+ return sourceDatabase.count();
+ }
+ };
+ return new DatabaseTemplate(environment, databaseName, null).call(operation);
+ }
+
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/CursorOperation.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/CursorOperation.java
new file mode 100644
index 0000000000..42a3173e21
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/CursorOperation.java
@@ -0,0 +1,89 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.upgrade;
+
+import org.apache.log4j.Logger;
+
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.Transaction;
+
+public abstract class CursorOperation implements DatabaseRunnable
+{
+ private static final Logger _logger = Logger.getLogger(CursorOperation.class);
+
+ private CursorTemplate _template;
+ private long _rowCount;
+ private long _processedRowCount;
+
+ @Override
+ public void run(final Database sourceDatabase, final Database targetDatabase, final Transaction transaction)
+ {
+ _rowCount = sourceDatabase.count();
+ _template = new CursorTemplate(sourceDatabase, transaction, new DatabaseEntryCallback()
+ {
+ @Override
+ public void processEntry(final Database database, final Transaction transaction, final DatabaseEntry key,
+ final DatabaseEntry value)
+ {
+ _processedRowCount++;
+ CursorOperation.this.processEntry(database, targetDatabase, transaction, key, value);
+ if (getProcessedCount() % 1000 == 0)
+ {
+ _logger.info("Processed " + getProcessedCount() + " messages of " + getRowCount() + ".");
+ }
+ }
+
+ });
+ _template.processEntries();
+ }
+
+ public void abort()
+ {
+ if (_template != null)
+ {
+ _template.abort();
+ }
+ }
+
+ public boolean deleteCurrent()
+ {
+ if (_template != null)
+ {
+ return _template.deleteCurrent();
+ }
+ return false;
+ }
+
+ public long getRowCount()
+ {
+ return _rowCount;
+ }
+
+ public long getProcessedCount()
+ {
+ return _processedRowCount;
+ }
+
+ public abstract void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
+ DatabaseEntry key, DatabaseEntry value);
+
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/CursorTemplate.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/CursorTemplate.java
new file mode 100644
index 0000000000..0b14080486
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/CursorTemplate.java
@@ -0,0 +1,75 @@
+package org.apache.qpid.server.store.berkeleydb.upgrade;
+
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+import com.sleepycat.je.Cursor;
+import com.sleepycat.je.CursorConfig;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
+import com.sleepycat.je.Transaction;
+
+public class CursorTemplate
+{
+ private Database _database;
+ private Transaction _transaction;
+ private DatabaseEntryCallback _databaseEntryCallback;
+ private Cursor _cursor;
+ private boolean _iterating;
+
+ public CursorTemplate(Database database, Transaction transaction, DatabaseEntryCallback databaseEntryCallback)
+ {
+ _database = database;
+ _transaction = transaction;
+ _databaseEntryCallback = databaseEntryCallback;
+ }
+
+ public void processEntries()
+ {
+ _cursor = _database.openCursor(_transaction, CursorConfig.READ_COMMITTED);
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+
+ try
+ {
+ _iterating = true;
+ while (_iterating && _cursor.getNext(key, value, LockMode.DEFAULT) == OperationStatus.SUCCESS)
+ {
+ _databaseEntryCallback.processEntry(_database, _transaction, key, value);
+ }
+ }
+ finally
+ {
+ _cursor.close();
+ }
+ }
+
+ public boolean deleteCurrent()
+ {
+ return _cursor.delete() == OperationStatus.SUCCESS;
+ }
+
+ public void abort()
+ {
+ _iterating = false;
+ }
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseCallable.java
index 301ee417c5..bf5462ef48 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseCallable.java
@@ -18,8 +18,12 @@
* under the License.
*
*/
-package org.apache.qpid.server.store.berkeleydb.tuples;
+package org.apache.qpid.server.store.berkeleydb.upgrade;
-public interface BindingTuple
+import com.sleepycat.je.Database;
+import com.sleepycat.je.Transaction;
+
+public interface DatabaseCallable<T>
{
+ public T call(Database sourceDatabase, Database targetDatabase, Transaction transaction);
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageContentKey.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseEntryCallback.java
index 005e8d4604..8ac22e5dfb 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageContentKey.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseEntryCallback.java
@@ -18,25 +18,13 @@
* under the License.
*
*/
-package org.apache.qpid.server.store.berkeleydb;
+package org.apache.qpid.server.store.berkeleydb.upgrade;
-public class MessageContentKey
-{
- private long _messageId;
-
- public MessageContentKey(long messageId)
- {
- _messageId = messageId;
- }
-
-
- public long getMessageId()
- {
- return _messageId;
- }
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.Transaction;
- public void setMessageId(long messageId)
- {
- this._messageId = messageId;
- }
-} \ No newline at end of file
+public interface DatabaseEntryCallback
+{
+ void processEntry(Database database, Transaction transaction, DatabaseEntry key, DatabaseEntry value);
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseRunnable.java
index 97b1398e10..3e9e6a3497 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseRunnable.java
@@ -18,23 +18,13 @@
* under the License.
*
*/
-package org.apache.qpid.server.store.berkeleydb.tuples;
+package org.apache.qpid.server.store.berkeleydb.upgrade;
-import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.Transaction;
-public abstract class TupleBindingFactory<E>
+public interface DatabaseRunnable
{
- private final int _version;
+ public void run(Database sourceDatabase, Database targetDatabase, Transaction transaction);
- public TupleBindingFactory(int version)
- {
- _version = version;
- }
-
- public abstract TupleBinding<E> getInstance();
-
- public int getVersion()
- {
- return _version;
- }
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseTemplate.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseTemplate.java
new file mode 100644
index 0000000000..135158afa4
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseTemplate.java
@@ -0,0 +1,114 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.upgrade;
+
+import org.apache.log4j.Logger;
+
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.Transaction;
+
+public class DatabaseTemplate
+{
+ private static final Logger _logger = Logger.getLogger(DatabaseTemplate.class);
+
+ private Environment _environment;
+ private String _sourceDatabaseName;
+ private String _targetDatabaseName;
+ private Transaction _parentTransaction;
+
+ public DatabaseTemplate(Environment environment, String sourceDatabaseName, Transaction transaction)
+ {
+ this(environment, sourceDatabaseName, null, transaction);
+ }
+
+ public DatabaseTemplate(Environment environment, String sourceDatabaseName, String targetDatabaseName,
+ Transaction parentTransaction)
+ {
+ _environment = environment;
+ _sourceDatabaseName = sourceDatabaseName;
+ _targetDatabaseName = targetDatabaseName;
+ _parentTransaction = parentTransaction;
+ }
+
+ public void run(DatabaseRunnable databaseRunnable)
+ {
+ DatabaseCallable<Void> callable = runnableToCallable(databaseRunnable);
+ call(callable);
+ }
+
+ public <T> T call(DatabaseCallable<T> databaseCallable)
+ {
+ Database sourceDatabase = null;
+ Database targetDatabase = null;
+ try
+ {
+ DatabaseConfig dbConfig = new DatabaseConfig();
+ dbConfig.setTransactional(true);
+ dbConfig.setAllowCreate(true);
+
+ sourceDatabase = _environment.openDatabase(_parentTransaction, _sourceDatabaseName, dbConfig);
+
+ if (_targetDatabaseName != null)
+ {
+ targetDatabase = _environment.openDatabase(_parentTransaction, _targetDatabaseName, dbConfig);
+ }
+
+ return databaseCallable.call(sourceDatabase, targetDatabase, _parentTransaction);
+ }
+ finally
+ {
+ closeDatabase(sourceDatabase);
+ closeDatabase(targetDatabase);
+ }
+ }
+
+ private DatabaseCallable<Void> runnableToCallable(final DatabaseRunnable databaseRunnable)
+ {
+ return new DatabaseCallable<Void>()
+ {
+
+ @Override
+ public Void call(Database sourceDatabase, Database targetDatabase, Transaction transaction)
+ {
+ databaseRunnable.run(sourceDatabase, targetDatabase, transaction);
+ return null;
+ }
+ };
+ }
+
+ private void closeDatabase(Database database)
+ {
+ if (database != null)
+ {
+ try
+ {
+ database.close();
+ }
+ catch (Exception e)
+ {
+ _logger.error("Unable to close database", e);
+ }
+ }
+ }
+
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java
new file mode 100644
index 0000000000..e7a34c524c
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.upgrade;
+
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Environment;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.logging.LogSubject;
+
+public interface StoreUpgrade
+{
+ void performUpgrade(LogSubject logSubject, Environment environment, UpgradeInteractionHandler handler)
+ throws DatabaseException, AMQStoreException;
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java
new file mode 100644
index 0000000000..6b063f5c23
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java
@@ -0,0 +1,926 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.upgrade;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.framing.AMQProtocolVersionException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.message.MessageMetaData;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
+import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
+
+import com.sleepycat.bind.tuple.ByteBinding;
+import com.sleepycat.bind.tuple.LongBinding;
+import com.sleepycat.bind.tuple.TupleBase;
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.Transaction;
+
+public class UpgradeFrom4To5 extends AbstractStoreUpgrade
+{
+ private static final String OLD_DELIVERY_DB = "deliveryDb_v4";
+ private static final String NEW_DELIVERY_DB = "deliveryDb_v5";
+ private static final String EXCHANGE_DB_NAME = "exchangeDb_v4";
+ private static final String OLD_BINDINGS_DB_NAME = "queueBindingsDb_v4";
+ private static final String NEW_BINDINGS_DB_NAME = "queueBindingsDb_v5";
+ private static final String OLD_QUEUE_DB_NAME = "queueDb_v4";
+ private static final String NEW_QUEUE_DB_NAME = "queueDb_v5";
+ private static final String OLD_METADATA_DB_NAME = "messageMetaDataDb_v4";
+ private static final String NEW_METADATA_DB_NAME = "messageMetaDataDb_v5";
+ private static final String OLD_CONTENT_DB_NAME = "messageContentDb_v4";
+ private static final String NEW_CONTENT_DB_NAME = "messageContentDb_v5";
+
+ private static final String[] OLD_DATABASE_NAMES = new String[] { EXCHANGE_DB_NAME, OLD_QUEUE_DB_NAME,
+ OLD_BINDINGS_DB_NAME, OLD_DELIVERY_DB, OLD_METADATA_DB_NAME, OLD_CONTENT_DB_NAME, "bridges_v4", "links_v4",
+ "xids_v4" };
+ private static final String[] NEW_DATABASE_NAMES = new String[] { "exchangeDb_v5", NEW_QUEUE_DB_NAME,
+ NEW_BINDINGS_DB_NAME, NEW_DELIVERY_DB, NEW_METADATA_DB_NAME, NEW_CONTENT_DB_NAME, "bridges_v5", "links_v5",
+ "xids_v5" };
+
+ private static final byte COLON = (byte) ':';
+
+ private static final Logger _logger = Logger.getLogger(UpgradeFrom4To5.class);
+
+ public void performUpgrade(final LogSubject logSubject, final Environment environment,
+ final UpgradeInteractionHandler handler) throws DatabaseException, AMQStoreException
+ {
+ _logger.info("Starting store upgrade from version 4");
+ Transaction transaction = null;
+ try
+ {
+ reportStarting(environment, OLD_DATABASE_NAMES, USER_FRIENDLY_NAMES);
+
+ transaction = environment.beginTransaction(null, null);
+
+ // find all queues which are bound to a topic exchange and which have a colon in their name
+ final List<AMQShortString> potentialDurableSubs = findPotentialDurableSubscriptions(logSubject, environment,
+ transaction);
+
+ Set<String> existingQueues = upgradeQueues(logSubject, environment, handler, potentialDurableSubs, transaction);
+ upgradeQueueBindings(logSubject, environment, handler, potentialDurableSubs, transaction);
+ Set<Long> messagesToDiscard = upgradeDelivery(logSubject, environment, existingQueues, handler, transaction);
+ upgradeContent(logSubject, environment, handler, messagesToDiscard, transaction);
+ upgradeMetaData(logSubject, environment, handler, messagesToDiscard, transaction);
+ renameRemainingDatabases(logSubject, environment, handler, transaction);
+ transaction.commit();
+
+ reportFinished(environment, NEW_DATABASE_NAMES, USER_FRIENDLY_NAMES);
+
+ }
+ catch (Exception e)
+ {
+ transaction.abort();
+ if (e instanceof DatabaseException)
+ {
+ throw (DatabaseException) e;
+ }
+ else if (e instanceof AMQStoreException)
+ {
+ throw (AMQStoreException) e;
+ }
+ else
+ {
+ throw new AMQStoreException("Unexpected exception", e);
+ }
+ }
+ }
+
+ private void upgradeQueueBindings(LogSubject logSubject, Environment environment, UpgradeInteractionHandler handler,
+ final List<AMQShortString> potentialDurableSubs, Transaction transaction)
+ {
+ if (environment.getDatabaseNames().contains(OLD_BINDINGS_DB_NAME))
+ {
+ _logger.info("Queue Bindings");
+ final BindingTuple bindingTuple = new BindingTuple();
+ CursorOperation databaseOperation = new CursorOperation()
+ {
+
+ @Override
+ public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
+ DatabaseEntry key, DatabaseEntry value)
+ {
+ // All the information required in binding entries is actually in the *key* not value.
+ BindingRecord oldBindingRecord = bindingTuple.entryToObject(key);
+
+ AMQShortString queueName = oldBindingRecord.getQueueName();
+ AMQShortString exchangeName = oldBindingRecord.getExchangeName();
+ AMQShortString routingKey = oldBindingRecord.getRoutingKey();
+ FieldTable arguments = oldBindingRecord.getArguments();
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug(String.format(
+ "Processing binding for queue %s, exchange %s, routingKey %s arguments %s", queueName,
+ exchangeName, routingKey, arguments));
+ }
+
+ // if the queue name is in the gathered list then inspect its binding arguments
+ // only topic exchange should have a JMS selector key in binding
+ if (potentialDurableSubs.contains(queueName)
+ && exchangeName.equals(ExchangeDefaults.TOPIC_EXCHANGE_NAME))
+ {
+ if (arguments == null)
+ {
+ arguments = new FieldTable();
+ }
+
+ AMQShortString selectorFilterKey = AMQPFilterTypes.JMS_SELECTOR.getValue();
+ if (!arguments.containsKey(selectorFilterKey))
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.info("adding the empty string (i.e. 'no selector') value for " + queueName
+ + " and exchange " + exchangeName);
+ }
+ arguments.put(selectorFilterKey, "");
+ }
+ }
+ addBindingToDatabase(bindingTuple, targetDatabase, transaction, queueName, exchangeName, routingKey,
+ arguments);
+ }
+ };
+ new DatabaseTemplate(environment, OLD_BINDINGS_DB_NAME, NEW_BINDINGS_DB_NAME, transaction)
+ .run(databaseOperation);
+ environment.removeDatabase(transaction, OLD_BINDINGS_DB_NAME);
+ _logger.info(databaseOperation.getRowCount() + " Queue Binding entries");
+ }
+ }
+
+ private Set<String> upgradeQueues(final LogSubject logSubject, final Environment environment,
+ final UpgradeInteractionHandler handler, List<AMQShortString> potentialDurableSubs, Transaction transaction)
+ {
+ _logger.info("Queues");
+ final Set<String> existingQueues = new HashSet<String>();
+ if (environment.getDatabaseNames().contains(OLD_QUEUE_DB_NAME))
+ {
+ final QueueRecordBinding binding = new QueueRecordBinding(potentialDurableSubs);
+ CursorOperation databaseOperation = new CursorOperation()
+ {
+ @Override
+ public void processEntry(final Database sourceDatabase, final Database targetDatabase,
+ final Transaction transaction, final DatabaseEntry key, final DatabaseEntry value)
+ {
+ QueueRecord record = binding.entryToObject(value);
+ DatabaseEntry newValue = new DatabaseEntry();
+ binding.objectToEntry(record, newValue);
+ targetDatabase.put(transaction, key, newValue);
+ existingQueues.add(record.getNameShortString().asString());
+ sourceDatabase.delete(transaction, key);
+ }
+ };
+ new DatabaseTemplate(environment, OLD_QUEUE_DB_NAME, NEW_QUEUE_DB_NAME, transaction).run(databaseOperation);
+ environment.removeDatabase(transaction, OLD_QUEUE_DB_NAME);
+ _logger.info(databaseOperation.getRowCount() + " Queue entries");
+ }
+ return existingQueues;
+ }
+
+ private List<AMQShortString> findPotentialDurableSubscriptions(final LogSubject logSubject,
+ final Environment environment, Transaction transaction)
+ {
+ final List<AMQShortString> exchangeNames = findTopicExchanges(logSubject, environment);
+ final List<AMQShortString> queues = new ArrayList<AMQShortString>();
+ final PartialBindingRecordBinding binding = new PartialBindingRecordBinding();
+
+ CursorOperation databaseOperation = new CursorOperation()
+ {
+ @Override
+ public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
+ DatabaseEntry key, DatabaseEntry value)
+ {
+ PartialBindingRecord record = binding.entryToObject(key);
+ if (exchangeNames.contains(record.getExchangeName()) && record.getQueueName().contains(COLON))
+ {
+ queues.add(record.getQueueName());
+ }
+ }
+ };
+ new DatabaseTemplate(environment, OLD_BINDINGS_DB_NAME, transaction).run(databaseOperation);
+ return queues;
+ }
+
+ private Set<Long> upgradeDelivery(final LogSubject logSubject, final Environment environment,
+ final Set<String> existingQueues, final UpgradeInteractionHandler handler, Transaction transaction)
+ {
+ final Set<Long> messagesToDiscard = new HashSet<Long>();
+ final Set<String> queuesToDiscard = new HashSet<String>();
+ final QueueEntryKeyBinding queueEntryKeyBinding = new QueueEntryKeyBinding();
+ _logger.info("Delivery Records");
+
+ CursorOperation databaseOperation = new CursorOperation()
+ {
+ @Override
+ public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
+ DatabaseEntry key, DatabaseEntry value)
+ {
+ QueueEntryKey entryKey = queueEntryKeyBinding.entryToObject(key);
+ Long messageId = entryKey.getMessageId();
+ final String queueName = entryKey.getQueueName().asString();
+ if (!existingQueues.contains(queueName))
+ {
+ if (queuesToDiscard.contains(queueName))
+ {
+ messagesToDiscard.add(messageId);
+ }
+ else
+ {
+ String lineSeparator = System.getProperty("line.separator");
+ String question = MessageFormat.format("Found persistent messages for non-durable queue ''{1}''. "
+ + " Do you with to create this queue and move all the messages into it?" + lineSeparator
+ + "NOTE: Answering No will result in these messages being discarded!", queueName);
+ UpgradeInteractionResponse response = handler.requireResponse(question.toString(),
+ UpgradeInteractionResponse.YES, UpgradeInteractionResponse.YES,
+ UpgradeInteractionResponse.NO, UpgradeInteractionResponse.ABORT);
+
+ if (response == UpgradeInteractionResponse.YES)
+ {
+ createQueue(environment, transaction, queueName);
+ existingQueues.add(queueName);
+ }
+ else if (response == UpgradeInteractionResponse.NO)
+ {
+ queuesToDiscard.add(queueName);
+ messagesToDiscard.add(messageId);
+ }
+ else
+ {
+ throw new RuntimeException("Unable is aborted!");
+ }
+ }
+ }
+
+ if (!messagesToDiscard.contains(messageId))
+ {
+ DatabaseEntry newKey = new DatabaseEntry();
+ queueEntryKeyBinding.objectToEntry(entryKey, newKey);
+ targetDatabase.put(transaction, newKey, value);
+
+ }
+ }
+ };
+ new DatabaseTemplate(environment, OLD_DELIVERY_DB, NEW_DELIVERY_DB, transaction).run(databaseOperation);
+
+ if (!messagesToDiscard.isEmpty())
+ {
+ databaseOperation = new CursorOperation()
+ {
+ @Override
+ public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
+ DatabaseEntry key, DatabaseEntry value)
+ {
+ QueueEntryKey entryKey = queueEntryKeyBinding.entryToObject(key);
+ Long messageId = entryKey.getMessageId();
+
+ if (messagesToDiscard.contains(messageId))
+ {
+ messagesToDiscard.remove(messageId);
+ }
+ }
+ };
+ new DatabaseTemplate(environment, NEW_DELIVERY_DB, transaction).run(databaseOperation);
+ }
+ _logger.info(databaseOperation.getRowCount() + " Delivery Records entries ");
+ environment.removeDatabase(transaction, OLD_DELIVERY_DB);
+
+ return messagesToDiscard;
+ }
+
+ protected void createQueue(final Environment environment, Transaction transaction, final String queueName)
+ {
+
+ final QueueRecordBinding binding = new QueueRecordBinding(null);
+ final BindingTuple bindingTuple = new BindingTuple();
+ DatabaseRunnable queueCreateOperation = new DatabaseRunnable()
+ {
+
+ @Override
+ public void run(Database newQueueDatabase, Database newBindingsDatabase, Transaction transaction)
+ {
+ AMQShortString queueNameAMQ = new AMQShortString(queueName);
+ QueueRecord record = new QueueRecord(queueNameAMQ, null, false, null);
+
+ DatabaseEntry key = new DatabaseEntry();
+
+ TupleOutput output = new TupleOutput();
+ AMQShortStringEncoding.writeShortString(record.getNameShortString(), output);
+ TupleBase.outputToEntry(output, key);
+
+ DatabaseEntry newValue = new DatabaseEntry();
+ binding.objectToEntry(record, newValue);
+ newQueueDatabase.put(transaction, key, newValue);
+
+ FieldTable emptyArguments = new FieldTable();
+ addBindingToDatabase(bindingTuple, newBindingsDatabase, transaction, queueNameAMQ,
+ ExchangeDefaults.DIRECT_EXCHANGE_NAME, queueNameAMQ, emptyArguments);
+
+ // TODO QPID-3490 we should not persist a default exchange binding
+ addBindingToDatabase(bindingTuple, newBindingsDatabase, transaction, queueNameAMQ,
+ ExchangeDefaults.DEFAULT_EXCHANGE_NAME, queueNameAMQ, emptyArguments);
+ }
+ };
+ new DatabaseTemplate(environment, NEW_QUEUE_DB_NAME, NEW_BINDINGS_DB_NAME, transaction).run(queueCreateOperation);
+ }
+
+ private List<AMQShortString> findTopicExchanges(final LogSubject logSubject, final Environment environment)
+ {
+ final List<AMQShortString> topicExchanges = new ArrayList<AMQShortString>();
+ final ExchangeRecordBinding binding = new ExchangeRecordBinding();
+ CursorOperation databaseOperation = new CursorOperation()
+ {
+
+ @Override
+ public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
+ DatabaseEntry key, DatabaseEntry value)
+ {
+ ExchangeRecord record = binding.entryToObject(value);
+ if (ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(record.getType()))
+ {
+ topicExchanges.add(record.getName());
+ }
+ }
+ };
+ new DatabaseTemplate(environment, EXCHANGE_DB_NAME, null).run(databaseOperation);
+ return topicExchanges;
+ }
+
+ private void upgradeMetaData(final LogSubject logSubject, final Environment environment,
+ final UpgradeInteractionHandler handler, final Set<Long> messagesToDiscard, Transaction transaction)
+ {
+ _logger.info("Message MetaData");
+ if (environment.getDatabaseNames().contains(OLD_METADATA_DB_NAME))
+ {
+ final MessageMetaDataBinding binding = new MessageMetaDataBinding();
+ CursorOperation databaseOperation = new CursorOperation()
+ {
+
+ @Override
+ public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
+ DatabaseEntry key, DatabaseEntry value)
+ {
+ StorableMessageMetaData metaData = binding.entryToObject(value);
+
+ // get message id
+ Long messageId = LongBinding.entryToLong(key);
+
+ // ONLY copy data if message is delivered to existing queue
+ if (messagesToDiscard.contains(messageId))
+ {
+ return;
+ }
+ DatabaseEntry newValue = new DatabaseEntry();
+ binding.objectToEntry(metaData, newValue);
+
+ targetDatabase.put(transaction, key, newValue);
+ targetDatabase.put(transaction, key, newValue);
+ deleteCurrent();
+
+ }
+ };
+
+ new DatabaseTemplate(environment, OLD_METADATA_DB_NAME, NEW_METADATA_DB_NAME, transaction)
+ .run(databaseOperation);
+ environment.removeDatabase(transaction, OLD_METADATA_DB_NAME);
+ _logger.info(databaseOperation.getRowCount() + " Message MetaData entries");
+ }
+ }
+
+ private void upgradeContent(final LogSubject logSubject, final Environment environment,
+ final UpgradeInteractionHandler handler, final Set<Long> messagesToDiscard, Transaction transaction)
+ {
+ _logger.info("Message Contents");
+ if (environment.getDatabaseNames().contains(OLD_CONTENT_DB_NAME))
+ {
+ final MessageContentKeyBinding keyBinding = new MessageContentKeyBinding();
+ final ContentBinding contentBinding = new ContentBinding();
+ CursorOperation cursorOperation = new CursorOperation()
+ {
+ private long _prevMsgId = -1;
+ private int _bytesSeenSoFar;
+
+ @Override
+ public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
+ DatabaseEntry key, DatabaseEntry value)
+ {
+ // determine the msgId of the current entry
+ MessageContentKey contentKey = keyBinding.entryToObject(key);
+ long msgId = contentKey.getMessageId();
+
+ // ONLY copy data if message is delivered to existing queue
+ if (messagesToDiscard.contains(msgId))
+ {
+ return;
+ }
+ // if this is a new message, restart the byte offset count.
+ if (_prevMsgId != msgId)
+ {
+ _bytesSeenSoFar = 0;
+ }
+
+ // determine the content size
+ ByteBuffer content = contentBinding.entryToObject(value);
+ int contentSize = content.limit();
+
+ // create the new key: id + previously seen data count
+ MessageContentKey newKey = new MessageContentKey(msgId, _bytesSeenSoFar);
+ DatabaseEntry newKeyEntry = new DatabaseEntry();
+ keyBinding.objectToEntry(newKey, newKeyEntry);
+
+ DatabaseEntry newValueEntry = new DatabaseEntry();
+ contentBinding.objectToEntry(content, newValueEntry);
+
+ targetDatabase.put(null, newKeyEntry, newValueEntry);
+
+ _prevMsgId = msgId;
+ _bytesSeenSoFar += contentSize;
+ }
+ };
+ new DatabaseTemplate(environment, OLD_CONTENT_DB_NAME, NEW_CONTENT_DB_NAME, transaction).run(cursorOperation);
+ environment.removeDatabase(transaction, OLD_CONTENT_DB_NAME);
+ _logger.info(cursorOperation.getRowCount() + " Message Content entries");
+ }
+ }
+
+ /**
+ * For all databases which haven't been otherwise upgraded, we still need to
+ * rename them from _v4 to _v5
+ */
+ private void renameRemainingDatabases(final LogSubject logSubject, final Environment environment,
+ final UpgradeInteractionHandler handler, Transaction transaction)
+ {
+ for (String dbName : environment.getDatabaseNames())
+ {
+ if (dbName.endsWith("_v4"))
+ {
+ String newName = dbName.substring(0, dbName.length() - 3) + "_v5";
+ _logger.info("Renaming " + dbName + " into " + newName);
+ environment.renameDatabase(transaction, dbName, newName);
+ }
+ }
+
+ }
+
+ private void addBindingToDatabase(final BindingTuple bindingTuple, Database targetDatabase, Transaction transaction,
+ AMQShortString queueName, AMQShortString exchangeName, AMQShortString routingKey, FieldTable arguments)
+ {
+
+ DatabaseEntry newKey = new DatabaseEntry();
+
+ bindingTuple.objectToEntry(new BindingRecord(exchangeName, queueName, routingKey, arguments), newKey);
+
+ DatabaseEntry newValue = new DatabaseEntry();
+ ByteBinding.byteToEntry((byte) 0, newValue);
+
+ targetDatabase.put(transaction, newKey, newValue);
+ }
+
+ private static final class ExchangeRecord
+ {
+ private final AMQShortString _name;
+ private final AMQShortString _type;
+
+ private ExchangeRecord(final AMQShortString name, final AMQShortString type)
+ {
+ _name = name;
+ _type = type;
+ }
+
+ public AMQShortString getName()
+ {
+ return _name;
+ }
+
+ public AMQShortString getType()
+ {
+ return _type;
+ }
+ }
+
+ private static final class ExchangeRecordBinding extends TupleBinding<ExchangeRecord>
+ {
+
+ @Override
+ public ExchangeRecord entryToObject(final TupleInput input)
+ {
+ return new ExchangeRecord(AMQShortStringEncoding.readShortString(input),
+ AMQShortStringEncoding.readShortString(input));
+ }
+
+ @Override
+ public void objectToEntry(final ExchangeRecord object, final TupleOutput output)
+ {
+ AMQShortStringEncoding.writeShortString(object.getName(), output);
+ AMQShortStringEncoding.writeShortString(object.getType(), output);
+ output.writeBoolean(false);
+ }
+ }
+
+ private static final class PartialBindingRecord
+ {
+ private final AMQShortString _exchangeName;
+ private final AMQShortString _queueName;
+
+ private PartialBindingRecord(final AMQShortString name, final AMQShortString type)
+ {
+ _exchangeName = name;
+ _queueName = type;
+ }
+
+ public AMQShortString getExchangeName()
+ {
+ return _exchangeName;
+ }
+
+ public AMQShortString getQueueName()
+ {
+ return _queueName;
+ }
+ }
+
+ private static final class PartialBindingRecordBinding extends TupleBinding<PartialBindingRecord>
+ {
+
+ @Override
+ public PartialBindingRecord entryToObject(final TupleInput input)
+ {
+ return new PartialBindingRecord(AMQShortStringEncoding.readShortString(input),
+ AMQShortStringEncoding.readShortString(input));
+ }
+
+ @Override
+ public void objectToEntry(final PartialBindingRecord object, final TupleOutput output)
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ static final class QueueRecord
+ {
+ private final AMQShortString _queueName;
+ private final AMQShortString _owner;
+ private final FieldTable _arguments;
+ private final boolean _exclusive;
+
+ public QueueRecord(AMQShortString queueName, AMQShortString owner, boolean exclusive, FieldTable arguments)
+ {
+ _queueName = queueName;
+ _owner = owner;
+ _exclusive = exclusive;
+ _arguments = arguments;
+ }
+
+ public AMQShortString getNameShortString()
+ {
+ return _queueName;
+ }
+
+ public AMQShortString getOwner()
+ {
+ return _owner;
+ }
+
+ public boolean isExclusive()
+ {
+ return _exclusive;
+ }
+
+ public FieldTable getArguments()
+ {
+ return _arguments;
+ }
+ }
+
+ static final class QueueRecordBinding extends TupleBinding<QueueRecord>
+ {
+ private final List<AMQShortString> _durableSubNames;
+
+ QueueRecordBinding(final List<AMQShortString> durableSubNames)
+ {
+ _durableSubNames = durableSubNames;
+ }
+
+ @Override
+ public QueueRecord entryToObject(final TupleInput input)
+ {
+ AMQShortString name = AMQShortStringEncoding.readShortString(input);
+ AMQShortString owner = AMQShortStringEncoding.readShortString(input);
+ FieldTable arguments = FieldTableEncoding.readFieldTable(input);
+ boolean exclusive = input.available() > 0 && input.readBoolean();
+ exclusive = exclusive || _durableSubNames.contains(name);
+
+ return new QueueRecord(name, owner, exclusive, arguments);
+
+ }
+
+ @Override
+ public void objectToEntry(final QueueRecord record, final TupleOutput output)
+ {
+ AMQShortStringEncoding.writeShortString(record.getNameShortString(), output);
+ AMQShortStringEncoding.writeShortString(record.getOwner(), output);
+ FieldTableEncoding.writeFieldTable(record.getArguments(), output);
+ output.writeBoolean(record.isExclusive());
+
+ }
+ }
+
+ static final class MessageMetaDataBinding extends TupleBinding<StorableMessageMetaData>
+ {
+
+ @Override
+ public MessageMetaData entryToObject(final TupleInput input)
+ {
+ try
+ {
+ final MessagePublishInfo publishBody = readMessagePublishInfo(input);
+ final ContentHeaderBody contentHeaderBody = readContentHeaderBody(input);
+ final int contentChunkCount = input.readInt();
+
+ return new MessageMetaData(publishBody, contentHeaderBody, contentChunkCount);
+ }
+ catch (Exception e)
+ {
+ _logger.error("Error converting entry to object: " + e, e);
+ // annoyingly just have to return null since we cannot throw
+ return null;
+ }
+ }
+
+ private MessagePublishInfo readMessagePublishInfo(TupleInput tupleInput)
+ {
+
+ final AMQShortString exchange = AMQShortStringEncoding.readShortString(tupleInput);
+ final AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
+ final boolean mandatory = tupleInput.readBoolean();
+ final boolean immediate = tupleInput.readBoolean();
+
+ return new MessagePublishInfo()
+ {
+
+ public AMQShortString getExchange()
+ {
+ return exchange;
+ }
+
+ public void setExchange(AMQShortString exchange)
+ {
+
+ }
+
+ public boolean isImmediate()
+ {
+ return immediate;
+ }
+
+ public boolean isMandatory()
+ {
+ return mandatory;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return routingKey;
+ }
+ };
+
+ }
+
+ private ContentHeaderBody readContentHeaderBody(TupleInput tupleInput) throws AMQFrameDecodingException,
+ AMQProtocolVersionException
+ {
+ int bodySize = tupleInput.readInt();
+ byte[] underlying = new byte[bodySize];
+ tupleInput.readFast(underlying);
+
+ try
+ {
+ return ContentHeaderBody.createFromBuffer(new DataInputStream(new ByteArrayInputStream(underlying)),
+ bodySize);
+ }
+ catch (IOException e)
+ {
+ throw new AMQFrameDecodingException(null, e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void objectToEntry(final StorableMessageMetaData metaData, final TupleOutput output)
+ {
+ final int bodySize = 1 + metaData.getStorableSize();
+ byte[] underlying = new byte[bodySize];
+ underlying[0] = (byte) metaData.getType().ordinal();
+ java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(underlying);
+ buf.position(1);
+ buf = buf.slice();
+
+ metaData.writeToBuffer(0, buf);
+ output.writeInt(bodySize);
+ output.writeFast(underlying);
+ }
+ }
+
+ static final class MessageContentKey
+ {
+ private long _messageId;
+ private int _chunk;
+
+ public MessageContentKey(long messageId, int chunkNo)
+ {
+ _messageId = messageId;
+ _chunk = chunkNo;
+ }
+
+ public int getChunk()
+ {
+ return _chunk;
+ }
+
+ public long getMessageId()
+ {
+ return _messageId;
+ }
+
+ }
+
+ static final class MessageContentKeyBinding extends TupleBinding<MessageContentKey>
+ {
+
+ public MessageContentKey entryToObject(TupleInput tupleInput)
+ {
+ long messageId = tupleInput.readLong();
+ int chunk = tupleInput.readInt();
+ return new MessageContentKey(messageId, chunk);
+ }
+
+ public void objectToEntry(MessageContentKey object, TupleOutput tupleOutput)
+ {
+ final MessageContentKey mk = object;
+ tupleOutput.writeLong(mk.getMessageId());
+ tupleOutput.writeInt(mk.getChunk());
+ }
+
+ }
+
+ static final class ContentBinding extends TupleBinding<ByteBuffer>
+ {
+ public ByteBuffer entryToObject(TupleInput tupleInput)
+ {
+ final int size = tupleInput.readInt();
+ byte[] underlying = new byte[size];
+ tupleInput.readFast(underlying);
+ return ByteBuffer.wrap(underlying);
+ }
+
+ public void objectToEntry(ByteBuffer src, TupleOutput tupleOutput)
+ {
+ src = src.slice();
+
+ byte[] chunkData = new byte[src.limit()];
+ src.duplicate().get(chunkData);
+
+ tupleOutput.writeInt(chunkData.length);
+ tupleOutput.writeFast(chunkData);
+ }
+ }
+
+ static final class QueueEntryKey
+ {
+ private AMQShortString _queueName;
+ private long _messageId;
+
+ public QueueEntryKey(AMQShortString queueName, long messageId)
+ {
+ _queueName = queueName;
+ _messageId = messageId;
+ }
+
+ public AMQShortString getQueueName()
+ {
+ return _queueName;
+ }
+
+ public long getMessageId()
+ {
+ return _messageId;
+ }
+
+ }
+
+ static final class QueueEntryKeyBinding extends TupleBinding<QueueEntryKey>
+ {
+ public QueueEntryKey entryToObject(TupleInput tupleInput)
+ {
+ AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput);
+ long messageId = tupleInput.readLong();
+ return new QueueEntryKey(queueName, messageId);
+ }
+
+ public void objectToEntry(QueueEntryKey mk, TupleOutput tupleOutput)
+ {
+ AMQShortStringEncoding.writeShortString(mk.getQueueName(), tupleOutput);
+ tupleOutput.writeLong(mk.getMessageId());
+ }
+ }
+
+ static final class BindingRecord extends Object
+ {
+ private final AMQShortString _exchangeName;
+ private final AMQShortString _queueName;
+ private final AMQShortString _routingKey;
+ private final FieldTable _arguments;
+
+ public BindingRecord(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey,
+ FieldTable arguments)
+ {
+ _exchangeName = exchangeName;
+ _queueName = queueName;
+ _routingKey = routingKey;
+ _arguments = arguments;
+ }
+
+ public AMQShortString getExchangeName()
+ {
+ return _exchangeName;
+ }
+
+ public AMQShortString getQueueName()
+ {
+ return _queueName;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return _routingKey;
+ }
+
+ public FieldTable getArguments()
+ {
+ return _arguments;
+ }
+
+ }
+
+ static final class BindingTuple extends TupleBinding<BindingRecord>
+ {
+ public BindingRecord entryToObject(TupleInput tupleInput)
+ {
+ AMQShortString exchangeName = AMQShortStringEncoding.readShortString(tupleInput);
+ AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput);
+ AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
+
+ FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput);
+
+ return new BindingRecord(exchangeName, queueName, routingKey, arguments);
+ }
+
+ public void objectToEntry(BindingRecord binding, TupleOutput tupleOutput)
+ {
+ AMQShortStringEncoding.writeShortString(binding.getExchangeName(), tupleOutput);
+ AMQShortStringEncoding.writeShortString(binding.getQueueName(), tupleOutput);
+ AMQShortStringEncoding.writeShortString(binding.getRoutingKey(), tupleOutput);
+
+ FieldTableEncoding.writeFieldTable(binding.getArguments(), tupleOutput);
+ }
+
+ }
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java
new file mode 100644
index 0000000000..5aaf3675a8
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java
@@ -0,0 +1,372 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.upgrade;
+
+import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeInteractionResponse.ABORT;
+import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeInteractionResponse.NO;
+import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeInteractionResponse.YES;
+
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.logging.LogSubject;
+
+import com.sleepycat.bind.tuple.LongBinding;
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import com.sleepycat.je.Cursor;
+import com.sleepycat.je.CursorConfig;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
+import com.sleepycat.je.Transaction;
+
+public class UpgradeFrom5To6 extends AbstractStoreUpgrade
+{
+
+ private static final Logger _logger = Logger.getLogger(UpgradeFrom5To6.class);
+
+ private static final String OLD_CONTENT_DB_NAME = "messageContentDb_v5";
+ private static final String NEW_CONTENT_DB_NAME = "MESSAGE_CONTENT";
+ private static final String META_DATA_DB_NAME = "messageMetaDataDb_v5";
+
+ private static final String NEW_DB_NAMES[] = { "EXCHANGES", "QUEUES", "QUEUE_BINDINGS", "DELIVERIES",
+ "MESSAGE_METADATA", NEW_CONTENT_DB_NAME, "BRIDGES", "LINKS", "XIDS" };
+ private static final String OLD_DB_NAMES[] = { "exchangeDb_v5", "queueDb_v5", "queueBindingsDb_v5", "deliveryDb_v5",
+ META_DATA_DB_NAME, OLD_CONTENT_DB_NAME, "bridges_v5", "links_v5", "xids_v5" };
+
+ public void performUpgrade(final LogSubject logSubject, final Environment environment,
+ final UpgradeInteractionHandler handler) throws DatabaseException, AMQStoreException
+ {
+ _logger.info("Starting store upgrade from version 5");
+ Transaction transaction = null;
+ try
+ {
+ reportStarting(environment, OLD_DB_NAMES, USER_FRIENDLY_NAMES);
+ transaction = environment.beginTransaction(null, null);
+ performUpgradeInternal(logSubject, environment, handler, transaction);
+ transaction.commit();
+ reportFinished(environment, NEW_DB_NAMES, USER_FRIENDLY_NAMES);
+ }
+ catch (Exception e)
+ {
+ transaction.abort();
+ if (e instanceof DatabaseException)
+ {
+ throw (DatabaseException) e;
+ }
+ else if (e instanceof AMQStoreException)
+ {
+ throw (AMQStoreException) e;
+ }
+ else
+ {
+ throw new AMQStoreException("Unexpected exception", e);
+ }
+ }
+ }
+
+ /**
+ * Upgrades from a v5 database to a v6 database
+ *
+ * v6 is the first "new style" schema where we don't version every table, and the upgrade is re-runnable
+ *
+ * Change in this version:
+ *
+ * Message content is moved from the database messageContentDb_v5 to MESSAGE_CONTENT.
+ * The structure of the database changes from
+ * ( message-id: long, chunk-id: int ) -> ( size: int, byte[] data )
+ * to
+ * ( message-id: long) -> ( byte[] data )
+ *
+ * That is we keep only one record per message, which contains all the message content
+ */
+ public void performUpgradeInternal(final LogSubject logSubject, final Environment environment,
+ final UpgradeInteractionHandler handler, final Transaction transaction) throws AMQStoreException
+ {
+ _logger.info("Message Contents");
+ if (environment.getDatabaseNames().contains(OLD_CONTENT_DB_NAME))
+ {
+ DatabaseRunnable contentOperation = new DatabaseRunnable()
+ {
+ @Override
+ public void run(final Database oldContentDatabase, final Database newContentDatabase,
+ Transaction contentTransaction)
+ {
+ CursorOperation metaDataDatabaseOperation = new CursorOperation()
+ {
+
+ @Override
+ public void processEntry(Database metadataDatabase, Database notUsed,
+ Transaction metaDataTransaction, DatabaseEntry key, DatabaseEntry value)
+ {
+ long messageId = LongBinding.entryToLong(key);
+ upgradeMessage(messageId, oldContentDatabase, newContentDatabase, handler, metaDataTransaction,
+ metadataDatabase);
+ }
+ };
+ new DatabaseTemplate(environment, META_DATA_DB_NAME, contentTransaction).run(metaDataDatabaseOperation);
+ _logger.info(metaDataDatabaseOperation.getRowCount() + " Message Content Entries");
+ }
+ };
+ new DatabaseTemplate(environment, OLD_CONTENT_DB_NAME, NEW_CONTENT_DB_NAME, transaction).run(contentOperation);
+ }
+ renameDatabases(environment, transaction);
+ }
+
+ private void renameDatabases(Environment environment, Transaction transaction)
+ {
+ List<String> databases = environment.getDatabaseNames();
+ for (int i = 0; i < OLD_DB_NAMES.length; i++)
+ {
+ String oldName = OLD_DB_NAMES[i];
+ String newName = NEW_DB_NAMES[i];
+ if (databases.contains(oldName))
+ {
+ _logger.info("Renaming " + oldName + " into " + newName);
+ environment.renameDatabase(transaction, oldName, newName);
+ }
+ }
+ }
+
+ /**
+ * Upgrade an individual message, that is read all the data from the old
+ * database, consolidate it into a single byte[] and then (in a transaction)
+ * remove the record from the old database and add the corresponding record
+ * to the new database
+ */
+ private void upgradeMessage(final long messageId, final Database oldDatabase, final Database newDatabase,
+ final UpgradeInteractionHandler handler, Transaction txn, Database oldMetadataDatabase)
+ {
+ SortedMap<Integer, byte[]> messageData = getMessageData(messageId, oldDatabase);
+ byte[] consolidatedData = new byte[0];
+ for (Map.Entry<Integer, byte[]> entry : messageData.entrySet())
+ {
+ int offset = entry.getKey();
+ if (offset != consolidatedData.length)
+ {
+ String message;
+ if (offset < consolidatedData.length)
+ {
+ message = "Missing data in message id " + messageId + " between offset " + consolidatedData.length
+ + " and " + offset + ". ";
+ }
+ else
+ {
+ message = "Duplicate data in message id " + messageId + " between offset " + offset + " and "
+ + consolidatedData.length + ". ";
+ }
+ UpgradeInteractionResponse action = handler.requireResponse(message
+ + "Do you wish do recover as much of this message as "
+ + "possible (answering NO will delete the message)?", ABORT, YES, NO, ABORT);
+
+ switch (action)
+ {
+ case YES:
+ byte[] oldData = consolidatedData;
+ consolidatedData = new byte[offset];
+ System.arraycopy(oldData, 0, consolidatedData, 0, Math.min(oldData.length, consolidatedData.length));
+ break;
+ case NO:
+ DatabaseEntry key = new DatabaseEntry();
+ LongBinding.longToEntry(messageId, key);
+ oldMetadataDatabase.delete(txn, key);
+ return;
+ case ABORT:
+ _logger.error(message);
+ throw new RuntimeException("Unable to upgrade message " + messageId);
+ }
+
+ }
+ byte[] data = new byte[consolidatedData.length + entry.getValue().length];
+ System.arraycopy(consolidatedData, 0, data, 0, consolidatedData.length);
+ System.arraycopy(entry.getValue(), 0, data, offset, entry.getValue().length);
+ consolidatedData = data;
+ }
+
+ CompoundKeyBinding binding = new CompoundKeyBinding();
+ for (int offset : messageData.keySet())
+ {
+ DatabaseEntry key = new DatabaseEntry();
+ binding.objectToEntry(new CompoundKey(messageId, offset), key);
+ oldDatabase.delete(txn, key);
+ }
+ DatabaseEntry key = new DatabaseEntry();
+ LongBinding.longToEntry(messageId, key);
+ NewDataBinding dataBinding = new NewDataBinding();
+ DatabaseEntry value = new DatabaseEntry();
+ dataBinding.objectToEntry(consolidatedData, value);
+
+ newDatabase.put(txn, key, value);
+ }
+
+ /**
+ * @return a (sorted) map of offset -> data for the given message id
+ */
+ private SortedMap<Integer, byte[]> getMessageData(final long messageId, final Database oldDatabase)
+ {
+ TreeMap<Integer, byte[]> data = new TreeMap<Integer, byte[]>();
+
+ Cursor cursor = oldDatabase.openCursor(null, CursorConfig.READ_COMMITTED);
+ try
+ {
+ DatabaseEntry contentKeyEntry = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+ CompoundKeyBinding binding = new CompoundKeyBinding();
+ binding.objectToEntry(new CompoundKey(messageId, 0), contentKeyEntry);
+
+ OperationStatus status = cursor.getSearchKeyRange(contentKeyEntry, value, LockMode.DEFAULT);
+ OldDataBinding dataBinding = new OldDataBinding();
+
+ while (status == OperationStatus.SUCCESS)
+ {
+ CompoundKey compoundKey = binding.entryToObject(contentKeyEntry);
+ long id = compoundKey.getMessageId();
+
+ if (id != messageId)
+ {
+ // we have exhausted all chunks for this message id, break
+ break;
+ }
+
+ int offsetInMessage = compoundKey.getOffset();
+ OldDataValue dataValue = dataBinding.entryToObject(value);
+ data.put(offsetInMessage, dataValue.getData());
+
+ status = cursor.getNext(contentKeyEntry, value, LockMode.DEFAULT);
+ }
+ }
+ finally
+ {
+ cursor.close();
+ }
+
+ return data;
+ }
+
+ static final class CompoundKey
+ {
+ public final long _messageId;
+ public final int _offset;
+
+ public CompoundKey(final long messageId, final int offset)
+ {
+ _messageId = messageId;
+ _offset = offset;
+ }
+
+ public long getMessageId()
+ {
+ return _messageId;
+ }
+
+ public int getOffset()
+ {
+ return _offset;
+ }
+ }
+
+ static final class CompoundKeyBinding extends TupleBinding<CompoundKey>
+ {
+
+ @Override
+ public CompoundKey entryToObject(final TupleInput input)
+ {
+ return new CompoundKey(input.readLong(), input.readInt());
+ }
+
+ @Override
+ public void objectToEntry(final CompoundKey object, final TupleOutput output)
+ {
+ output.writeLong(object._messageId);
+ output.writeInt(object._offset);
+ }
+ }
+
+ static final class OldDataValue
+ {
+ private final int _size;
+ private final byte[] _data;
+
+ private OldDataValue(final int size, final byte[] data)
+ {
+ _size = size;
+ _data = data;
+ }
+
+ public int getSize()
+ {
+ return _size;
+ }
+
+ public byte[] getData()
+ {
+ return _data;
+ }
+ }
+
+ static final class OldDataBinding extends TupleBinding<OldDataValue>
+ {
+
+ @Override
+ public OldDataValue entryToObject(final TupleInput input)
+ {
+ int size = input.readInt();
+ byte[] data = new byte[size];
+ input.read(data);
+ return new OldDataValue(size, data);
+ }
+
+ @Override
+ public void objectToEntry(OldDataValue value, final TupleOutput output)
+ {
+ output.writeInt(value.getSize());
+ output.write(value.getData());
+ }
+ }
+
+ static final class NewDataBinding extends TupleBinding<byte[]>
+ {
+
+ @Override
+ public byte[] entryToObject(final TupleInput input)
+ {
+ byte[] data = new byte[input.available()];
+ input.read(data);
+ return data;
+ }
+
+ @Override
+ public void objectToEntry(final byte[] data, final TupleOutput output)
+ {
+ output.write(data);
+ }
+ }
+
+} \ No newline at end of file
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeInteractionHandler.java
index 468096ccc5..0cedbd15e0 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeInteractionHandler.java
@@ -18,28 +18,20 @@
* under the License.
*
*/
-package org.apache.qpid.server.store.berkeleydb.tuples;
+package org.apache.qpid.server.store.berkeleydb.upgrade;
-import com.sleepycat.bind.tuple.TupleBinding;
-
-import org.apache.qpid.server.store.berkeleydb.records.BindingRecord;
-
-public class BindingTupleBindingFactory extends TupleBindingFactory<BindingRecord>
+public interface UpgradeInteractionHandler
{
- public BindingTupleBindingFactory(int version)
- {
- super(version);
- }
+ UpgradeInteractionResponse requireResponse(String question, UpgradeInteractionResponse defaultResponse,
+ UpgradeInteractionResponse... possibleResponses);
- public TupleBinding<BindingRecord> getInstance()
+ public static final UpgradeInteractionHandler DEFAULT_HANDLER = new UpgradeInteractionHandler()
{
- switch (getVersion())
+ public UpgradeInteractionResponse requireResponse(final String question,
+ final UpgradeInteractionResponse defaultResponse,
+ final UpgradeInteractionResponse... possibleResponses)
{
- default:
- case 5:
- //no change from v4
- case 4:
- return new BindingTuple_4();
+ return defaultResponse;
}
- }
+ };
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeInteractionResponse.java
index affa9a271d..eb5a049a9a 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeInteractionResponse.java
@@ -18,8 +18,11 @@
* under the License.
*
*/
-package org.apache.qpid.server.store.berkeleydb.tuples;
+package org.apache.qpid.server.store.berkeleydb.upgrade;
-public interface QueueTuple
+public enum UpgradeInteractionResponse
{
+ YES,
+ NO,
+ ABORT
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java
new file mode 100644
index 0000000000..aa165fea2c
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java
@@ -0,0 +1,178 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.upgrade;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
+
+import com.sleepycat.bind.tuple.IntegerBinding;
+import com.sleepycat.bind.tuple.LongBinding;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
+
+public class Upgrader
+{
+ static final String VERSION_DB_NAME = "VERSION";
+
+ private Environment _environment;
+ private LogSubject _logSubject;
+
+ public Upgrader(Environment environment, LogSubject logSubject)
+ {
+ _environment = environment;
+ _logSubject = logSubject;
+ }
+
+ public void upgradeIfNecessary() throws AMQStoreException
+ {
+ boolean isEmpty = _environment.getDatabaseNames().isEmpty();
+ DatabaseConfig dbConfig = new DatabaseConfig();
+ dbConfig.setTransactional(true);
+ dbConfig.setAllowCreate(true);
+
+ Database versionDb = null;
+ try
+ {
+ versionDb = _environment.openDatabase(null, VERSION_DB_NAME, dbConfig);
+
+ if(versionDb.count() == 0L)
+ {
+ int sourceVersion = isEmpty ? BDBMessageStore.VERSION: identifyOldStoreVersion();
+ DatabaseEntry key = new DatabaseEntry();
+ IntegerBinding.intToEntry(sourceVersion, key);
+ DatabaseEntry value = new DatabaseEntry();
+ LongBinding.longToEntry(System.currentTimeMillis(), value);
+
+ versionDb.put(null, key, value);
+ }
+
+ int version = getSourceVersion(versionDb);
+
+ performUpgradeFromVersion(version, versionDb);
+ }
+ finally
+ {
+ if (versionDb != null)
+ {
+ versionDb.close();
+ }
+ }
+ }
+
+ int getSourceVersion(Database versionDb)
+ {
+ int version = BDBMessageStore.VERSION + 1;
+ OperationStatus result;
+
+ do
+ {
+ version--;
+ DatabaseEntry key = new DatabaseEntry();
+ IntegerBinding.intToEntry(version, key);
+ DatabaseEntry value = new DatabaseEntry();
+
+ result = versionDb.get(null, key, value, LockMode.READ_COMMITTED);
+ }
+ while(result == OperationStatus.NOTFOUND);
+ return version;
+ }
+
+ void performUpgradeFromVersion(int sourceVersion, Database versionDb)
+ throws AMQStoreException
+ {
+ while(sourceVersion != BDBMessageStore.VERSION)
+ {
+ upgrade(sourceVersion, ++sourceVersion);
+ DatabaseEntry key = new DatabaseEntry();
+ IntegerBinding.intToEntry(sourceVersion, key);
+ DatabaseEntry value = new DatabaseEntry();
+ LongBinding.longToEntry(System.currentTimeMillis(), value);
+ versionDb.put(null, key, value);
+ }
+ }
+
+ void upgrade(final int fromVersion, final int toVersion) throws AMQStoreException
+ {
+ try
+ {
+ @SuppressWarnings("unchecked")
+ Class<StoreUpgrade> upgradeClass =
+ (Class<StoreUpgrade>) Class.forName("org.apache.qpid.server.store.berkeleydb.upgrade."
+ + "UpgradeFrom"+fromVersion+"To"+toVersion);
+ Constructor<StoreUpgrade> ctr = upgradeClass.getConstructor();
+ StoreUpgrade upgrade = ctr.newInstance();
+ upgrade.performUpgrade(_logSubject, _environment, UpgradeInteractionHandler.DEFAULT_HANDLER);
+ }
+ catch (ClassNotFoundException e)
+ {
+ throw new AMQStoreException("Unable to upgrade BDB data store from version " + fromVersion + " to version"
+ + toVersion, e);
+ }
+ catch (NoSuchMethodException e)
+ {
+ throw new AMQStoreException("Unable to upgrade BDB data store from version " + fromVersion + " to version"
+ + toVersion, e);
+ }
+ catch (InvocationTargetException e)
+ {
+ throw new AMQStoreException("Unable to upgrade BDB data store from version " + fromVersion + " to version"
+ + toVersion, e);
+ }
+ catch (InstantiationException e)
+ {
+ throw new AMQStoreException("Unable to upgrade BDB data store from version " + fromVersion + " to version"
+ + toVersion, e);
+ }
+ catch (IllegalAccessException e)
+ {
+ throw new AMQStoreException("Unable to upgrade BDB data store from version " + fromVersion + " to version"
+ + toVersion, e);
+ }
+ }
+
+ private int identifyOldStoreVersion() throws DatabaseException
+ {
+ int version = 0;
+ for (String databaseName : _environment.getDatabaseNames())
+ {
+ if (databaseName.contains("_v"))
+ {
+ int versionIndex = databaseName.indexOf("_v");
+ if (versionIndex == -1)
+ {
+ versionIndex = 1;
+ }
+ version = Integer.parseInt(databaseName.substring(versionIndex + 2));
+ break;
+ }
+ }
+ return version;
+ }
+}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
index 3d30f02b42..74fba168a9 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
@@ -58,9 +58,11 @@ import java.util.List;
*/
public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageStoreTest
{
+ private static byte[] CONTENT_BYTES = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
+
/**
- * Tests that message metadata and content are successfully read back from a
- * store after it has been reloaded. Both 0-8 and 0-10 metadata is used to
+ * Tests that message metadata and content are successfully read back from a
+ * store after it has been reloaded. Both 0-8 and 0-10 metadata is used to
* verify their ability to co-exist within the store and be successful retrieved.
*/
public void testBDBMessagePersistence() throws Exception
@@ -73,10 +75,10 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
// Split the content into 2 chunks for the 0-8 message, as per broker behaviour.
// Use a single chunk for the 0-10 message as per broker behaviour.
String bodyText = "jfhdjsflsdhfjdshfjdslhfjdslhfsjlhfsjkhfdsjkhfdsjkfhdslkjf";
-
+
ByteBuffer firstContentBytes_0_8 = ByteBuffer.wrap(bodyText.substring(0, 10).getBytes());
ByteBuffer secondContentBytes_0_8 = ByteBuffer.wrap(bodyText.substring(10).getBytes());
-
+
ByteBuffer completeContentBody_0_10 = ByteBuffer.wrap(bodyText.getBytes());
int bodySize = completeContentBody_0_10.limit();
@@ -100,12 +102,12 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
/*
* Create and insert a 0-10 message (metadata and content)
- */
+ */
MessageProperties msgProps_0_10 = createMessageProperties_0_10(bodySize);
DeliveryProperties delProps_0_10 = createDeliveryProperties_0_10();
Header header_0_10 = new Header(delProps_0_10, msgProps_0_10);
- MessageTransfer xfr_0_10 = new MessageTransfer("destination", MessageAcceptMode.EXPLICIT,
+ MessageTransfer xfr_0_10 = new MessageTransfer("destination", MessageAcceptMode.EXPLICIT,
MessageAcquireMode.PRE_ACQUIRED, header_0_10, completeContentBody_0_10);
MessageMetaData_0_10 messageMetaData_0_10 = new MessageMetaData_0_10(xfr_0_10);
@@ -190,14 +192,14 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
private DeliveryProperties createDeliveryProperties_0_10()
{
DeliveryProperties delProps_0_10 = new DeliveryProperties();
-
+
delProps_0_10.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
delProps_0_10.setImmediate(true);
delProps_0_10.setExchange("exchange12345");
delProps_0_10.setRoutingKey("routingKey12345");
delProps_0_10.setExpiration(5);
delProps_0_10.setPriority(MessageDeliveryPriority.ABOVE_AVERAGE);
-
+
return delProps_0_10;
}
@@ -207,14 +209,14 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
msgProps_0_10.setContentLength(bodySize);
msgProps_0_10.setCorrelationId("qwerty".getBytes());
msgProps_0_10.setContentType("text/html");
-
+
return msgProps_0_10;
}
- /**
+ /**
* Close the provided store and create a new (read-only) store to read back the data.
- *
- * Use this method instead of reloading the virtual host like other tests in order
+ *
+ * Use this method instead of reloading the virtual host like other tests in order
* to avoid the recovery handler deleting the message for not being on a queue.
*/
private BDBMessageStore reloadStoreReadOnly(BDBMessageStore messageStore) throws Exception
@@ -275,7 +277,61 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
props.getHeaders().setString("Test", "MST");
return props;
}
-
+
+ public void testGetContentWithOffset() throws Exception
+ {
+ MessageStore store = getVirtualHost().getMessageStore();
+ BDBMessageStore bdbStore = assertBDBStore(store);
+ StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(store);
+ long messageid_0_8 = storedMessage_0_8.getMessageNumber();
+
+ // normal case: offset is 0
+ ByteBuffer dst = ByteBuffer.allocate(10);
+ int length = bdbStore.getContent(messageid_0_8, 0, dst);
+ assertEquals("Unexpected length", CONTENT_BYTES.length, length);
+ byte[] array = dst.array();
+ assertTrue("Unexpected content", Arrays.equals(CONTENT_BYTES, array));
+
+ // offset is in the middle
+ dst = ByteBuffer.allocate(10);
+ length = bdbStore.getContent(messageid_0_8, 5, dst);
+ assertEquals("Unexpected length", 5, length);
+ array = dst.array();
+ byte[] expected = new byte[10];
+ System.arraycopy(CONTENT_BYTES, 5, expected, 0, 5);
+ assertTrue("Unexpected content", Arrays.equals(expected, array));
+
+ // offset beyond the content length
+ dst = ByteBuffer.allocate(10);
+ try
+ {
+ bdbStore.getContent(messageid_0_8, 15, dst);
+ fail("Should fail for the offset greater than message size");
+ }
+ catch (RuntimeException e)
+ {
+ assertEquals("Unexpected exception message", "Offset 15 is greater than message size 10 for message id "
+ + messageid_0_8 + "!", e.getMessage());
+ }
+
+ // buffer is smaller then message size
+ dst = ByteBuffer.allocate(5);
+ length = bdbStore.getContent(messageid_0_8, 0, dst);
+ assertEquals("Unexpected length", 5, length);
+ array = dst.array();
+ expected = new byte[5];
+ System.arraycopy(CONTENT_BYTES, 0, expected, 0, 5);
+ assertTrue("Unexpected content", Arrays.equals(expected, array));
+
+ // buffer is smaller then message size, offset is not 0
+ dst = ByteBuffer.allocate(5);
+ length = bdbStore.getContent(messageid_0_8, 2, dst);
+ assertEquals("Unexpected length", 5, length);
+ array = dst.array();
+ expected = new byte[5];
+ System.arraycopy(CONTENT_BYTES, 2, expected, 0, 5);
+ assertTrue("Unexpected content", Arrays.equals(expected, array));
+ }
/**
* Tests that messages which are added to the store and then removed using the
* public MessageStore interfaces are actually removed from the store by then
@@ -287,11 +343,10 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
MessageStore store = getVirtualHost().getMessageStore();
BDBMessageStore bdbStore = assertBDBStore(store);
- StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreMultiChunkMessage_0_8(store);
+ StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(store);
long messageid_0_8 = storedMessage_0_8.getMessageNumber();
-
- //remove the message in the fashion the broker normally would
- storedMessage_0_8.remove();
+
+ bdbStore.removeMessage(messageid_0_8, true);
//verify the removal using the BDB store implementation methods directly
try
@@ -308,10 +363,10 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
//expecting no content, allocate a 1 byte
ByteBuffer dst = ByteBuffer.allocate(1);
- assertEquals("Retrieved content when none was expected",
+ assertEquals("Retrieved content when none was expected",
0, bdbStore.getContent(messageid_0_8, 0, dst));
}
-
+
private BDBMessageStore assertBDBStore(Object store)
{
if(!(store instanceof BDBMessageStore))
@@ -322,15 +377,11 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
return (BDBMessageStore) store;
}
- private StoredMessage<MessageMetaData> createAndStoreMultiChunkMessage_0_8(MessageStore store)
+ private StoredMessage<MessageMetaData> createAndStoreSingleChunkMessage_0_8(MessageStore store)
{
- byte[] body10Bytes = "0123456789".getBytes();
- byte[] body5Bytes = "01234".getBytes();
-
- ByteBuffer chunk1 = ByteBuffer.wrap(body10Bytes);
- ByteBuffer chunk2 = ByteBuffer.wrap(body5Bytes);
+ ByteBuffer chunk1 = ByteBuffer.wrap(CONTENT_BYTES);
- int bodySize = body10Bytes.length + body5Bytes.length;
+ int bodySize = CONTENT_BYTES.length;
//create and store the message using the MessageStore interface
MessagePublishInfo pubInfoBody_0_8 = createPublishInfoBody_0_8();
@@ -342,7 +393,6 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
StoredMessage<MessageMetaData> storedMessage_0_8 = store.addMessage(messageMetaData_0_8);
storedMessage_0_8.addContent(0, chunk1);
- storedMessage_0_8.addContent(chunk1.limit(), chunk2);
storedMessage_0_8.flushToStore();
return storedMessage_0_8;
@@ -360,7 +410,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
BDBMessageStore bdbStore = assertBDBStore(log);
final AMQShortString mockQueueName = new AMQShortString("queueName");
-
+
TransactionLogResource mockQueue = new TransactionLogResource()
{
public String getResourceName()
@@ -368,27 +418,27 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
return mockQueueName.asString();
}
};
-
+
MessageStore.Transaction txn = log.newTransaction();
-
+
txn.enqueueMessage(mockQueue, new MockMessage(1L));
txn.enqueueMessage(mockQueue, new MockMessage(5L));
txn.commitTran();
List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName);
-
+
assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
Long val = enqueuedIds.get(0);
assertEquals("First Message is incorrect", 1L, val.longValue());
val = enqueuedIds.get(1);
assertEquals("Second Message is incorrect", 5L, val.longValue());
}
-
-
+
+
/**
- * Tests transaction rollback before a commit has occurred by utilising the
- * enqueue and dequeue methods available in the TransactionLog interface
- * implemented by the store, and verifying the behaviour using BDB
+ * Tests transaction rollback before a commit has occurred by utilising the
+ * enqueue and dequeue methods available in the TransactionLog interface
+ * implemented by the store, and verifying the behaviour using BDB
* implementation methods.
*/
public void testTranRollbackBeforeCommit() throws Exception
@@ -398,7 +448,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
BDBMessageStore bdbStore = assertBDBStore(log);
final AMQShortString mockQueueName = new AMQShortString("queueName");
-
+
TransactionLogResource mockQueue = new TransactionLogResource()
{
public String getResourceName()
@@ -406,30 +456,30 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
return mockQueueName.asString();
}
};
-
+
MessageStore.Transaction txn = log.newTransaction();
-
+
txn.enqueueMessage(mockQueue, new MockMessage(21L));
txn.abortTran();
-
+
txn = log.newTransaction();
txn.enqueueMessage(mockQueue, new MockMessage(22L));
txn.enqueueMessage(mockQueue, new MockMessage(23L));
txn.commitTran();
List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName);
-
+
assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
Long val = enqueuedIds.get(0);
assertEquals("First Message is incorrect", 22L, val.longValue());
val = enqueuedIds.get(1);
assertEquals("Second Message is incorrect", 23L, val.longValue());
}
-
+
/**
- * Tests transaction rollback after a commit has occurred by utilising the
- * enqueue and dequeue methods available in the TransactionLog interface
- * implemented by the store, and verifying the behaviour using BDB
+ * Tests transaction rollback after a commit has occurred by utilising the
+ * enqueue and dequeue methods available in the TransactionLog interface
+ * implemented by the store, and verifying the behaviour using BDB
* implementation methods.
*/
public void testTranRollbackAfterCommit() throws Exception
@@ -439,7 +489,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
BDBMessageStore bdbStore = assertBDBStore(log);
final AMQShortString mockQueueName = new AMQShortString("queueName");
-
+
TransactionLogResource mockQueue = new TransactionLogResource()
{
public String getResourceName()
@@ -447,22 +497,22 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
return mockQueueName.asString();
}
};
-
+
MessageStore.Transaction txn = log.newTransaction();
-
+
txn.enqueueMessage(mockQueue, new MockMessage(30L));
txn.commitTran();
txn = log.newTransaction();
txn.enqueueMessage(mockQueue, new MockMessage(31L));
txn.abortTran();
-
+
txn = log.newTransaction();
txn.enqueueMessage(mockQueue, new MockMessage(32L));
txn.commitTran();
-
+
List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName);
-
+
assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
Long val = enqueuedIds.get(0);
assertEquals("First Message is incorrect", 30L, val.longValue());
@@ -470,6 +520,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
assertEquals("Second Message is incorrect", 32L, val.longValue());
}
+ @SuppressWarnings("rawtypes")
private static class MockMessage implements ServerMessage, EnqueableMessage
{
private long _messageId;
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java
index bcbb7d8b72..122f846a2d 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java
@@ -20,19 +20,36 @@
*/
package org.apache.qpid.server.store.berkeleydb;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+
import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.url.URLSyntaxException;
-import javax.jms.*;
-
/**
- * Prepares an older version brokers BDB store with the required
+ * Prepares an older version brokers BDB store with the required
* contents for use in the BDBStoreUpgradeTest.
*
* NOTE: Must be used with the equivalent older version client!
*
- * The store will then be used to verify that the upgraded is
- * completed properly and that once upgraded it functions as
+ * The store will then be used to verify that the upgraded is
+ * completed properly and that once upgraded it functions as
* expected with the new broker.
*
*/
@@ -43,9 +60,10 @@ public class BDBStoreUpgradeTestPreparer
public static final String SELECTOR_SUB_NAME="mySelectorDurSubName";
public static final String SELECTOR_TOPIC_NAME="mySelectorUpgradeTopic";
public static final String QUEUE_NAME="myUpgradeQueue";
+ public static final String NON_DURABLE_QUEUE_NAME="queue-non-durable";
private static AMQConnectionFactory _connFac;
- private static final String CONN_URL =
+ private static final String CONN_URL =
"amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672'";
/**
@@ -59,14 +77,28 @@ public class BDBStoreUpgradeTestPreparer
private void prepareBroker() throws Exception
{
prepareQueues();
+ prepareNonDurableQueue();
prepareDurableSubscriptionWithSelector();
prepareDurableSubscriptionWithoutSelector();
}
+ private void prepareNonDurableQueue() throws Exception
+ {
+ Connection connection = _connFac.createConnection();
+ AMQSession<?, ?> session = (AMQSession<?,?>)connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ AMQShortString queueName = AMQShortString.valueOf(NON_DURABLE_QUEUE_NAME);
+ AMQDestination destination = (AMQDestination) session.createQueue(NON_DURABLE_QUEUE_NAME);
+ session.sendCreateQueue(queueName, false, false, false, null);
+ session.bindQueue(queueName, queueName, null, AMQShortString.valueOf("amq.direct"), destination);
+ MessageProducer messageProducer = session.createProducer(destination);
+ sendMessages(session, messageProducer, destination, DeliveryMode.PERSISTENT, 1024, 3);
+ connection.close();
+ }
+
/**
* Prepare a queue for use in testing message and binding recovery
* after the upgrade is performed.
- *
+ *
* - Create a transacted session on the connection.
* - Use a consumer to create the (durable by default) queue.
* - Send 5 large messages to test (multi-frame) content recovery.
@@ -74,7 +106,7 @@ public class BDBStoreUpgradeTestPreparer
* - Commit the session.
* - Send 5 small messages to test that uncommitted messages are not recovered.
* following the upgrade.
- * - Close the session.
+ * - Close the session.
*/
private void prepareQueues() throws Exception
{
@@ -114,9 +146,9 @@ public class BDBStoreUpgradeTestPreparer
}
/**
- * Prepare a DurableSubscription backing queue for use in testing selector
+ * Prepare a DurableSubscription backing queue for use in testing selector
* recovery and queue exclusivity marking during the upgrade process.
- *
+ *
* - Create a transacted session on the connection.
* - Open and close a DurableSubscription with selector to create the backing queue.
* - Send a message which matches the selector.
@@ -145,7 +177,7 @@ public class BDBStoreUpgradeTestPreparer
TopicSubscriber durSub1 = session.createDurableSubscriber(topic, SELECTOR_SUB_NAME,"testprop='true'", false);
durSub1.close();
- // Create a publisher and send a persistent message which matches the selector
+ // Create a publisher and send a persistent message which matches the selector
// followed by one that does not match, and another which matches but is not
// committed and so should be 'lost'
TopicSession pubSession = connection.createTopicSession(true, Session.SESSION_TRANSACTED);
@@ -202,7 +234,7 @@ public class BDBStoreUpgradeTestPreparer
connection.close();
}
- public static void sendMessages(Session session, MessageProducer messageProducer,
+ public static void sendMessages(Session session, MessageProducer messageProducer,
Destination dest, int deliveryMode, int length, int numMesages) throws JMSException
{
for (int i = 1; i <= numMesages; i++)
@@ -213,7 +245,7 @@ public class BDBStoreUpgradeTestPreparer
}
}
- public static void publishMessages(Session session, TopicPublisher publisher,
+ public static void publishMessages(Session session, TopicPublisher publisher,
Destination dest, int deliveryMode, int length, int numMesages, String selectorProperty) throws JMSException
{
for (int i = 1; i <= numMesages; i++)
@@ -227,8 +259,8 @@ public class BDBStoreUpgradeTestPreparer
/**
* Generates a string of a given length consisting of the sequence 0,1,2,..,9,0,1,2.
- *
- * @param length number of characters in the string
+ *
+ * @param length number of characters in the string
* @return string sequence of the given length
*/
public static String generateString(int length)
@@ -248,6 +280,7 @@ public class BDBStoreUpgradeTestPreparer
*/
public static void main(String[] args) throws Exception
{
+ System.setProperty("qpid.dest_syntax", "BURL");
BDBStoreUpgradeTestPreparer producer = new BDBStoreUpgradeTestPreparer();
producer.prepareBroker();
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
index 55327e3b56..a708ceac1c 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
@@ -20,36 +20,14 @@
*/
package org.apache.qpid.server.store.berkeleydb;
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.je.DatabaseEntry;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
-import org.apache.qpid.management.common.mbeans.ManagedQueue;
-import org.apache.qpid.server.message.EnqueableMessage;
-import org.apache.qpid.server.message.MessageMetaData;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_4;
-import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTupleBindingFactory;
-import org.apache.qpid.server.store.berkeleydb.tuples.MessageMetaDataTupleBindingFactory;
-import org.apache.qpid.test.utils.JMXTestUtils;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.util.FileUtils;
-
+import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.NON_DURABLE_QUEUE_NAME;
import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_NAME;
-import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SUB_NAME;
-import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.TOPIC_NAME;
import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SELECTOR_SUB_NAME;
import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SELECTOR_TOPIC_NAME;
+import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SUB_NAME;
+import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.TOPIC_NAME;
+
+import java.io.File;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
@@ -64,18 +42,18 @@ import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
+
+import org.apache.qpid.management.common.mbeans.ManagedQueue;
+import org.apache.qpid.test.utils.JMXTestUtils;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * Tests upgrading a BDB store and using it with the new broker
- * after the required contents are entered into the store using
- * an old broker with the BDBStoreUpgradeTestPreparer. The store
- * will then be used to verify that the upgraded is completed
- * properly and that once upgraded it functions as expected with
- * the new broker.
+ * Tests upgrading a BDB store on broker startup.
+ * The store will then be used to verify that the upgrade is completed
+ * properly and that once upgraded it functions as expected.
*/
public class BDBUpgradeTest extends QpidBrokerTestCase
{
@@ -84,73 +62,31 @@ public class BDBUpgradeTest extends QpidBrokerTestCase
private static final String STRING_1024 = BDBStoreUpgradeTestPreparer.generateString(1024);
private static final String STRING_1024_256 = BDBStoreUpgradeTestPreparer.generateString(1024*256);
private static final String QPID_WORK_ORIG = System.getProperty("QPID_WORK");
- private static final String QPID_HOME = System.getProperty("QPID_HOME");
- private static final int VERSION_4 = 4;
- private String _fromDir;
- private String _toDir;
- private String _toDirTwice;
+ private String _storeLocation;
@Override
public void setUp() throws Exception
{
assertNotNull("QPID_WORK must be set", QPID_WORK_ORIG);
- assertNotNull("QPID_HOME must be set", QPID_HOME);
-
- _fromDir = QPID_HOME + "/bdbstore-to-upgrade/test-store";
- _toDir = getWorkDirBaseDir() + "/bdbstore/test-store";
- _toDirTwice = getWorkDirBaseDir() + "/bdbstore-upgraded-twice";
+ _storeLocation = QPID_WORK_ORIG + "/bdbstore/test-store";
//Clear the two target directories if they exist.
- File directory = new File(_toDir);
- if (directory.exists() && directory.isDirectory())
- {
- FileUtils.delete(directory, true);
- }
- directory = new File(_toDirTwice);
+ File directory = new File(_storeLocation);
if (directory.exists() && directory.isDirectory())
{
FileUtils.delete(directory, true);
}
- //Upgrade the test store.
- upgradeBrokerStore(_fromDir, _toDir);
+ // copy store files
+ String src = getClass().getClassLoader().getResource("upgrade/bdbstore-v4/test-store").toURI().getPath();
+ FileUtils.copyRecursive(new File(src), new File(_storeLocation));
//override the broker config used and then start the broker with the updated store
_configFile = new File("build/etc/config-systests-bdb.xml");
setConfigurationProperty("management.enabled", "true");
- super.setUp();
- }
-
- private String getWorkDirBaseDir()
- {
- return QPID_WORK_ORIG + (isInternalBroker() ? "" : "/" + getPort());
- }
-
- /**
- * Tests that the core upgrade method of the store upgrade tool passes through the exception
- * from the BDBMessageStore indicating that the data on disk can't be loaded as the previous
- * version because it has already been upgraded.
- * @throws Exception
- */
- public void testMultipleUpgrades() throws Exception
- {
- //stop the broker started by setUp() in order to allow the second upgrade attempt to proceed
- stopBroker();
-
- try
- {
- new BDBStoreUpgrade(_toDir, _toDirTwice, null, false, true).upgradeFromVersion(VERSION_4);
- fail("Second Upgrade Succeeded");
- }
- catch (Exception e)
- {
- System.err.println("Showing stack trace, we are expecting an 'Unable to load BDBStore' error");
- e.printStackTrace();
- assertTrue("Incorrect Exception Thrown:" + e.getMessage(),
- e.getMessage().contains("Unable to load BDBStore as version 4. Store on disk contains version 5 data"));
- }
+ super.setUp();
}
/**
@@ -175,26 +111,26 @@ public class BDBUpgradeTest extends QpidBrokerTestCase
try
{
ManagedQueue dursubQueue = jmxUtils.getManagedQueue("clientid" + ":" + SELECTOR_SUB_NAME);
- assertEquals("DurableSubscription backing queue should have 1 message on it initially",
+ assertEquals("DurableSubscription backing queue should have 1 message on it initially",
new Integer(1), dursubQueue.getMessageCount());
-
+
// Create a connection and start it
TopicConnection connection = (TopicConnection) getConnection();
connection.start();
-
+
// Send messages which don't match and do match the selector, checking message count
- TopicSession pubSession = connection.createTopicSession(true, org.apache.qpid.jms.Session.SESSION_TRANSACTED);
+ TopicSession pubSession = connection.createTopicSession(true, Session.SESSION_TRANSACTED);
Topic topic = pubSession.createTopic(SELECTOR_TOPIC_NAME);
TopicPublisher publisher = pubSession.createPublisher(topic);
-
+
BDBStoreUpgradeTestPreparer.publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "false");
pubSession.commit();
- assertEquals("DurableSubscription backing queue should still have 1 message on it",
+ assertEquals("DurableSubscription backing queue should still have 1 message on it",
Integer.valueOf(1), dursubQueue.getMessageCount());
-
+
BDBStoreUpgradeTestPreparer.publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "true");
pubSession.commit();
- assertEquals("DurableSubscription backing queue should now have 2 messages on it",
+ assertEquals("DurableSubscription backing queue should now have 2 messages on it",
Integer.valueOf(2), dursubQueue.getMessageCount());
TopicSubscriber durSub = pubSession.createDurableSubscriber(topic, SELECTOR_SUB_NAME,"testprop='true'", false);
@@ -240,7 +176,7 @@ public class BDBUpgradeTest extends QpidBrokerTestCase
connection.start();
// Send new message matching the topic, checking message count
- TopicSession session = connection.createTopicSession(true, org.apache.qpid.jms.Session.SESSION_TRANSACTED);
+ TopicSession session = connection.createTopicSession(true, Session.SESSION_TRANSACTED);
Topic topic = session.createTopic(TOPIC_NAME);
TopicPublisher publisher = session.createPublisher(topic);
@@ -298,10 +234,10 @@ public class BDBUpgradeTest extends QpidBrokerTestCase
/**
* Test that the upgraded queue continues to function properly when used
- * for persistent messaging and restarting the broker.
- *
+ * for persistent messaging and restarting the broker.
+ *
* Sends the new messages to the queue BEFORE consuming those which were
- * sent before the upgrade. In doing so, this also serves to test that
+ * sent before the upgrade. In doing so, this also serves to test that
* the queue bindings were successfully transitioned during the upgrade.
*/
public void testBindingAndMessageDurabability() throws Exception
@@ -329,7 +265,7 @@ public class BDBUpgradeTest extends QpidBrokerTestCase
}
/**
- * Test that all of the committed persistent messages previously sent to
+ * Test that all of the committed persistent messages previously sent to
* the broker are properly received following update of the MetaData and
* Content entries during the store upgrade process.
*/
@@ -349,200 +285,22 @@ public class BDBUpgradeTest extends QpidBrokerTestCase
*
* @throws Exception
*/
- public void testMigrationOfMessagesForNonExistingQueues() throws Exception
+ public void testMigrationOfMessagesForNonDurableQueues() throws Exception
{
- stopBroker();
-
- // copy store data into a new location for adding of phantom message
- File storeLocation = new File(_fromDir);
- File target = new File(_toDirTwice);
- if (!target.exists())
- {
- target.mkdirs();
- }
- FileUtils.copyRecursive(storeLocation, target);
-
- // delete migrated data
- File directory = new File(_toDir);
- if (directory.exists() && directory.isDirectory())
- {
- FileUtils.delete(directory, true);
- }
-
- // test data
- String nonExistingQueueName = getTestQueueName();
- String messageText = "Test Phantom Message";
-
- // add message
- addMessageForNonExistingQueue(target, VERSION_4, nonExistingQueueName, messageText);
-
- String[] inputs = { "Yes", "Yes", "Yes" };
- upgradeBrokerStoreInInterractiveMode(_toDirTwice, _toDir, inputs);
-
- // start broker
- startBroker();
-
// Create a connection and start it
Connection connection = getConnection();
connection.start();
// consume a message for non-existing store
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = session.createQueue(nonExistingQueueName);
+ Queue queue = session.createQueue(NON_DURABLE_QUEUE_NAME);
MessageConsumer messageConsumer = session.createConsumer(queue);
- Message message = messageConsumer.receive(1000);
- // assert consumed message
- assertNotNull("Message was not migrated!", message);
- assertTrue("Unexpected message received!", message instanceof TextMessage);
- String text = ((TextMessage) message).getText();
- assertEquals("Message migration failed!", messageText, text);
- }
-
- /**
- * An utility method to upgrade broker with simulation user interactions
- *
- * @param fromDir
- * location of the store to migrate
- * @param toDir
- * location of where migrated data will be stored
- * @param inputs
- * user answers on upgrade tool questions
- * @throws Exception
- */
- private void upgradeBrokerStoreInInterractiveMode(String fromDir, String toDir, final String[] inputs)
- throws Exception
- {
- // save to restore system.in after data migration
- InputStream stdin = System.in;
-
- // set fake system in to simulate user interactions
- // FIXME: it is a quite dirty simulator of system input but it does the job
- System.setIn(new InputStream()
- {
-
- private int counter = 0;
-
- public synchronized int read(byte b[], int off, int len)
- {
- byte[] src = (inputs[counter] + "\n").getBytes();
- System.arraycopy(src, 0, b, off, src.length);
- counter++;
- return src.length;
- }
-
- @Override
- public int read() throws IOException
- {
- return -1;
- }
- });
-
- try
- {
- // Upgrade the test store.
- new BDBStoreUpgrade(fromDir, toDir, null, true, true).upgradeFromVersion(VERSION_4);
- }
- finally
+ for (int i = 0; i < 3; i++)
{
- // restore system in
- System.setIn(stdin);
- }
- }
-
- @SuppressWarnings("unchecked")
- private void addMessageForNonExistingQueue(File storeLocation, int storeVersion, String nonExistingQueueName,
- String messageText) throws Exception
- {
- final AMQShortString queueName = new AMQShortString(nonExistingQueueName);
- BDBMessageStore store = new BDBMessageStore(storeVersion);
- store.configure(storeLocation, false);
- try
- {
- store.start();
-
- // store message objects
- ByteBuffer completeContentBody = ByteBuffer.wrap(messageText.getBytes("UTF-8"));
- long bodySize = completeContentBody.limit();
- MessagePublishInfo pubInfoBody = new MessagePublishInfoImpl(new AMQShortString("amq.direct"), false,
- false, queueName);
- BasicContentHeaderProperties props = new BasicContentHeaderProperties();
- props.setDeliveryMode(Integer.valueOf(BasicContentHeaderProperties.PERSISTENT).byteValue());
- props.setContentType("text/plain");
- props.setType("text/plain");
- props.setMessageId("whatever");
- props.setEncoding("UTF-8");
- props.getHeaders().setString("Test", "MST");
- MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
- int classForBasic = methodRegistry.createBasicQosOkBody().getClazz();
- ContentHeaderBody contentHeaderBody = new ContentHeaderBody(classForBasic, 1, props, bodySize);
-
- // add content entry to database
- final long messageId = store.getNewMessageId();
- TupleBinding<MessageContentKey> contentKeyTB = new MessageContentKeyTupleBindingFactory(storeVersion).getInstance();
- MessageContentKey contentKey = null;
- if (storeVersion == VERSION_4)
- {
- contentKey = new MessageContentKey_4(messageId, 0);
- }
- else
- {
- throw new Exception(storeVersion + " is not supported");
- }
- DatabaseEntry key = new DatabaseEntry();
- contentKeyTB.objectToEntry(contentKey, key);
- DatabaseEntry data = new DatabaseEntry();
- ContentTB contentTB = new ContentTB();
- contentTB.objectToEntry(completeContentBody, data);
- store.getContentDb().put(null, key, data);
-
- // add meta data entry to database
- TupleBinding<Long> longTB = TupleBinding.getPrimitiveBinding(Long.class);
- TupleBinding<Object> metaDataTB = new MessageMetaDataTupleBindingFactory(storeVersion).getInstance();
- key = new DatabaseEntry();
- data = new DatabaseEntry();
- longTB.objectToEntry(new Long(messageId), key);
- MessageMetaData metaData = new MessageMetaData(pubInfoBody, contentHeaderBody, 1);
- metaDataTB.objectToEntry(metaData, data);
- store.getMetaDataDb().put(null, key, data);
-
- // add delivery entry to database
- TransactionLogResource mockQueue = new TransactionLogResource()
- {
- public String getResourceName()
- {
- return queueName.asString();
- }
- };
-
- EnqueableMessage mockMessage = new EnqueableMessage()
- {
-
- public long getMessageNumber()
- {
- return messageId;
- }
-
- public boolean isPersistent()
- {
- return true;
- }
-
- public StoredMessage getStoredMessage()
- {
- return null;
- }
- };
-
- MessageStore log = (MessageStore) store;
- MessageStore.Transaction txn = log.newTransaction();
- txn.enqueueMessage(mockQueue, mockMessage);
- txn.commitTran();
- }
- finally
- {
- // close store
- store.close();
+ Message message = messageConsumer.receive(1000);
+ assertNotNull("Message was not migrated!", message);
+ assertTrue("Unexpected message received!", message instanceof TextMessage);
}
}
@@ -564,7 +322,7 @@ public class BDBUpgradeTest extends QpidBrokerTestCase
}
- // Retrieve the matching message
+ // Retrieve the matching message
Message m = durSub.receive(2000);
assertNotNull("Failed to receive an expected message", m);
if(selector)
@@ -623,8 +381,4 @@ public class BDBUpgradeTest extends QpidBrokerTestCase
session.close();
}
- private void upgradeBrokerStore(String fromDir, String toDir) throws Exception
- {
- new BDBStoreUpgrade(_fromDir, _toDir, null, false, true).upgradeFromVersion(VERSION_4);
- }
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java
new file mode 100644
index 0000000000..6df2f8a8db
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java
@@ -0,0 +1,151 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.upgrade;
+
+import java.io.File;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.subjects.TestBlankSubject;
+import org.apache.qpid.util.FileUtils;
+
+import com.sleepycat.je.Database;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.Transaction;
+
+public abstract class AbstractUpgradeTestCase extends TestCase
+{
+ protected static final class StaticAnswerHandler implements UpgradeInteractionHandler
+ {
+ private UpgradeInteractionResponse _response;
+
+ public StaticAnswerHandler(UpgradeInteractionResponse response)
+ {
+ _response = response;
+ }
+
+ @Override
+ public UpgradeInteractionResponse requireResponse(String question, UpgradeInteractionResponse defaultResponse,
+ UpgradeInteractionResponse... possibleResponses)
+ {
+ return _response;
+ }
+ }
+
+ public static final String[] QUEUE_NAMES = { "clientid:myDurSubName", "clientid:mySelectorDurSubName", "myUpgradeQueue",
+ "queue-non-durable" };
+ public static int[] QUEUE_SIZES = { 1, 1, 10, 3 };
+ public static int TOTAL_MESSAGE_NUMBER = 15;
+ protected static final LogSubject LOG_SUBJECT = new TestBlankSubject();
+ protected static final String TMP_FOLDER = System.getProperty("java.io.tmpdir");
+
+ // one binding per exchange
+ protected static final int TOTAL_BINDINGS = QUEUE_NAMES.length * 2;
+ protected static final int TOTAL_EXCHANGES = 5;
+
+ private File _storeLocation;
+ protected Environment _environment;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ _storeLocation = copyStore(getStoreDirectoryName());
+
+ _environment = createEnvironment(_storeLocation);
+ }
+
+ /** @return eg "bdbstore-v4" - used for copying store */
+ protected abstract String getStoreDirectoryName();
+
+ protected Environment createEnvironment(File storeLocation)
+ {
+ EnvironmentConfig envConfig = new EnvironmentConfig();
+ envConfig.setAllowCreate(true);
+ envConfig.setTransactional(true);
+ envConfig.setConfigParam("je.lock.nLockTables", "7");
+ envConfig.setReadOnly(false);
+ envConfig.setSharedCache(false);
+ envConfig.setCacheSize(0);
+ return new Environment(storeLocation, envConfig);
+ }
+
+ @Override
+ public void tearDown() throws Exception
+ {
+ try
+ {
+ _environment.close();
+ }
+ finally
+ {
+ _environment = null;
+ deleteDirectoryIfExists(_storeLocation);
+ }
+ super.tearDown();
+ }
+
+ private File copyStore(String storeDirectoryName) throws Exception
+ {
+ String src = getClass().getClassLoader().getResource("upgrade/" + storeDirectoryName).toURI().getPath();
+ File storeLocation = new File(new File(TMP_FOLDER), "test-store");
+ deleteDirectoryIfExists(storeLocation);
+ FileUtils.copyRecursive(new File(src), new File(TMP_FOLDER));
+ return storeLocation;
+ }
+
+ protected void deleteDirectoryIfExists(File dir)
+ {
+ if (dir.exists())
+ {
+ assertTrue("The provided file " + dir + " is not a directory", dir.isDirectory());
+
+ boolean deletedSuccessfully = FileUtils.delete(dir, true);
+
+ assertTrue("Files at '" + dir + "' should have been deleted", deletedSuccessfully);
+ }
+ }
+
+ protected void assertDatabaseRecordCount(String databaseName, final long expectedCountNumber)
+ {
+ long count = getDatabaseCount(databaseName);
+ assertEquals("Unexpected database '" + databaseName + "' entry number", expectedCountNumber, count);
+ }
+
+ protected long getDatabaseCount(String databaseName)
+ {
+ DatabaseCallable<Long> operation = new DatabaseCallable<Long>()
+ {
+
+ @Override
+ public Long call(Database sourceDatabase, Database targetDatabase, Transaction transaction)
+ {
+ return new Long(sourceDatabase.count());
+
+ }
+ };
+ Long count = new DatabaseTemplate(_environment, databaseName, null).call(operation);
+ return count.longValue();
+ }
+
+}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseTemplateTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseTemplateTest.java
new file mode 100644
index 0000000000..7ec442b73d
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseTemplateTest.java
@@ -0,0 +1,83 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.upgrade;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import junit.framework.TestCase;
+
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.Transaction;
+
+public class DatabaseTemplateTest extends TestCase
+{
+ private static final String SOURCE_DATABASE = "sourceDatabase";
+ private Environment _environment;
+ private Database _sourceDatabase;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ _environment = mock(Environment.class);
+ _sourceDatabase = mock(Database.class);
+ when(_environment.openDatabase(any(Transaction.class), same(SOURCE_DATABASE), isA(DatabaseConfig.class)))
+ .thenReturn(_sourceDatabase);
+ }
+
+ public void testExecuteWithTwoDatabases()
+ {
+ String targetDatabaseName = "targetDatabase";
+ Database targetDatabase = mock(Database.class);
+
+ Transaction txn = mock(Transaction.class);
+
+ when(_environment.openDatabase(same(txn), same(targetDatabaseName), isA(DatabaseConfig.class)))
+ .thenReturn(targetDatabase);
+
+ DatabaseTemplate databaseTemplate = new DatabaseTemplate(_environment, SOURCE_DATABASE, targetDatabaseName, txn);
+
+ DatabaseRunnable databaseOperation = mock(DatabaseRunnable.class);
+ databaseTemplate.run(databaseOperation);
+
+ verify(databaseOperation).run(_sourceDatabase, targetDatabase, txn);
+ verify(_sourceDatabase).close();
+ verify(targetDatabase).close();
+ }
+
+ public void testExecuteWithOneDatabases()
+ {
+ DatabaseTemplate databaseTemplate = new DatabaseTemplate(_environment, SOURCE_DATABASE, null, null);
+
+ DatabaseRunnable databaseOperation = mock(DatabaseRunnable.class);
+ databaseTemplate.run(databaseOperation);
+
+ verify(databaseOperation).run(_sourceDatabase, (Database)null, (Transaction)null);
+ verify(_sourceDatabase).close();
+ }
+
+}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java
new file mode 100644
index 0000000000..9341022a38
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java
@@ -0,0 +1,299 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.upgrade;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer;
+import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.BindingRecord;
+import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.BindingTuple;
+import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.MessageContentKey;
+import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.MessageContentKeyBinding;
+import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.QueueEntryKey;
+import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.QueueEntryKeyBinding;
+import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.QueueRecord;
+
+import com.sleepycat.bind.tuple.LongBinding;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.Transaction;
+
+public class UpgradeFrom4to5Test extends AbstractUpgradeTestCase
+{
+ private static final String NON_DURABLE_QUEUE = BDBStoreUpgradeTestPreparer.NON_DURABLE_QUEUE_NAME;
+ private static final String DURABLE_QUEUE = BDBStoreUpgradeTestPreparer.QUEUE_NAME;
+ private static final String DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR = "clientid:mySelectorDurSubName";
+ private static final String DURABLE_SUBSCRIPTION_QUEUE = "clientid:myDurSubName";
+ private static final String EXCHANGE_DB_NAME = "exchangeDb_v5";
+ private static final String MESSAGE_META_DATA_DB_NAME = "messageMetaDataDb_v5";
+ private static final String MESSAGE_CONTENT_DB_NAME = "messageContentDb_v5";
+ private static final String DELIVERY_DB_NAME = "deliveryDb_v5";
+ private static final String BINDING_DB_NAME = "queueBindingsDb_v5";
+
+ @Override
+ protected String getStoreDirectoryName()
+ {
+ return "bdbstore-v4";
+ }
+
+ public void testPerformUpgradeWithHandlerAnsweringYes() throws Exception
+ {
+ UpgradeFrom4To5 upgrade = new UpgradeFrom4To5();
+ upgrade.performUpgrade(LOG_SUBJECT, _environment, new StaticAnswerHandler(UpgradeInteractionResponse.YES));
+
+ assertQueues(new HashSet<String>(Arrays.asList(QUEUE_NAMES)));
+
+ assertDatabaseRecordCount(DELIVERY_DB_NAME, TOTAL_MESSAGE_NUMBER);
+ assertDatabaseRecordCount(MESSAGE_META_DATA_DB_NAME, TOTAL_MESSAGE_NUMBER);
+ assertDatabaseRecordCount(EXCHANGE_DB_NAME, TOTAL_EXCHANGES);
+
+ for (int i = 0; i < QUEUE_SIZES.length; i++)
+ {
+ assertQueueMessages(QUEUE_NAMES[i], QUEUE_SIZES[i]);
+ }
+
+ final List<BindingRecord> queueBindings = loadBindings();
+
+ assertEquals("Unxpected list size", TOTAL_BINDINGS, queueBindings.size());
+ assertBindingRecord(queueBindings, DURABLE_SUBSCRIPTION_QUEUE, "amq.topic", BDBStoreUpgradeTestPreparer.TOPIC_NAME, "");
+ assertBindingRecord(queueBindings, DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR, "amq.topic",
+ BDBStoreUpgradeTestPreparer.SELECTOR_TOPIC_NAME, "testprop='true'");
+ assertBindingRecord(queueBindings, DURABLE_QUEUE, "amq.direct", DURABLE_QUEUE, null);
+ assertBindingRecord(queueBindings, NON_DURABLE_QUEUE, "amq.direct", NON_DURABLE_QUEUE, null);
+ assertContent();
+ }
+
+ public void testPerformUpgradeWithHandlerAnsweringNo() throws Exception
+ {
+ UpgradeFrom4To5 upgrade = new UpgradeFrom4To5();
+ upgrade.performUpgrade(LOG_SUBJECT, _environment, new StaticAnswerHandler(UpgradeInteractionResponse.NO));
+ assertQueues(new HashSet<String>(Arrays.asList(DURABLE_SUBSCRIPTION_QUEUE, DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR, DURABLE_QUEUE)));
+
+ assertDatabaseRecordCount(DELIVERY_DB_NAME, 12);
+ assertDatabaseRecordCount(MESSAGE_META_DATA_DB_NAME, 12);
+ assertDatabaseRecordCount(EXCHANGE_DB_NAME, TOTAL_EXCHANGES);
+
+ assertQueueMessages(DURABLE_SUBSCRIPTION_QUEUE, 1);
+ assertQueueMessages(DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR, 1);
+ assertQueueMessages(DURABLE_QUEUE, 10);
+
+ final List<BindingRecord> queueBindings = loadBindings();
+
+ assertEquals("Unxpected list size", TOTAL_BINDINGS - 2, queueBindings.size());
+ assertBindingRecord(queueBindings, DURABLE_SUBSCRIPTION_QUEUE, "amq.topic", BDBStoreUpgradeTestPreparer.TOPIC_NAME,
+ "");
+ assertBindingRecord(queueBindings, DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR, "amq.topic",
+ BDBStoreUpgradeTestPreparer.SELECTOR_TOPIC_NAME, "testprop='true'");
+ assertBindingRecord(queueBindings, DURABLE_QUEUE, "amq.direct", DURABLE_QUEUE, null);
+ assertContent();
+ }
+
+ private List<BindingRecord> loadBindings()
+ {
+ final BindingTuple bindingTuple = new BindingTuple();
+ final List<BindingRecord> queueBindings = new ArrayList<BindingRecord>();
+ CursorOperation databaseOperation = new CursorOperation()
+ {
+
+ @Override
+ public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
+ DatabaseEntry key, DatabaseEntry value)
+ {
+ BindingRecord bindingRecord = bindingTuple.entryToObject(key);
+
+ AMQShortString queueName = bindingRecord.getQueueName();
+ AMQShortString exchangeName = bindingRecord.getExchangeName();
+ AMQShortString routingKey = bindingRecord.getRoutingKey();
+ FieldTable arguments = bindingRecord.getArguments();
+ queueBindings.add(new BindingRecord(exchangeName, queueName, routingKey, arguments));
+ }
+ };
+ new DatabaseTemplate(_environment, BINDING_DB_NAME, null).run(databaseOperation);
+ return queueBindings;
+ }
+
+ private void assertBindingRecord(List<BindingRecord> queueBindings, String queueName, String exchangeName,
+ String routingKey, String selectorKey)
+ {
+ BindingRecord record = null;
+ for (BindingRecord bindingRecord : queueBindings)
+ {
+ if (bindingRecord.getQueueName().asString().equals(queueName)
+ && bindingRecord.getExchangeName().asString().equals(exchangeName))
+ {
+ record = bindingRecord;
+ break;
+ }
+ }
+ assertNotNull("Binding is not found for queue " + queueName + " and exchange " + exchangeName, record);
+ assertEquals("Unexpected routing key", routingKey, record.getRoutingKey().asString());
+
+ if (selectorKey != null)
+ {
+ assertEquals("Unexpected selector key for " + queueName, selectorKey,
+ record.getArguments().get(AMQPFilterTypes.JMS_SELECTOR.getValue()));
+ }
+ }
+
+ private void assertQueueMessages(final String queueName, final int expectedQueueSize)
+ {
+ final Set<Long> messageIdsForQueue = assertDeliveriesForQueue(queueName, expectedQueueSize);
+
+ assertMetadataForQueue(queueName, expectedQueueSize, messageIdsForQueue);
+
+ assertContentForQueue(queueName, expectedQueueSize, messageIdsForQueue);
+ }
+
+ private Set<Long> assertDeliveriesForQueue(final String queueName, final int expectedQueueSize)
+ {
+ final QueueEntryKeyBinding queueEntryKeyBinding = new QueueEntryKeyBinding();
+ final AtomicInteger deliveryCounter = new AtomicInteger();
+ final Set<Long> messagesForQueue = new HashSet<Long>();
+
+ CursorOperation deliveryDatabaseOperation = new CursorOperation()
+ {
+ @Override
+ public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
+ DatabaseEntry key, DatabaseEntry value)
+ {
+ QueueEntryKey entryKey = queueEntryKeyBinding.entryToObject(key);
+ String thisQueueName = entryKey.getQueueName().asString();
+ if (thisQueueName.equals(queueName))
+ {
+ deliveryCounter.incrementAndGet();
+ messagesForQueue.add(entryKey.getMessageId());
+ }
+ }
+ };
+ new DatabaseTemplate(_environment, DELIVERY_DB_NAME, null).run(deliveryDatabaseOperation);
+
+ assertEquals("Unxpected number of entries in delivery db for queue " + queueName, expectedQueueSize,
+ deliveryCounter.get());
+
+ return messagesForQueue;
+ }
+
+ private void assertMetadataForQueue(final String queueName, final int expectedQueueSize,
+ final Set<Long> messageIdsForQueue)
+ {
+ final AtomicInteger metadataCounter = new AtomicInteger();
+ CursorOperation databaseOperation = new CursorOperation()
+ {
+
+ @Override
+ public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
+ DatabaseEntry key, DatabaseEntry value)
+ {
+ Long messageId = LongBinding.entryToLong(key);
+
+ boolean messageIsForTheRightQueue = messageIdsForQueue.contains(messageId);
+ if (messageIsForTheRightQueue)
+ {
+ metadataCounter.incrementAndGet();
+ }
+ }
+ };
+ new DatabaseTemplate(_environment, MESSAGE_META_DATA_DB_NAME, null).run(databaseOperation);
+
+ assertEquals("Unxpected number of entries in metadata db for queue " + queueName, expectedQueueSize,
+ metadataCounter.get());
+ }
+
+ private void assertContentForQueue(String queueName, int expectedQueueSize, final Set<Long> messageIdsForQueue)
+ {
+ final AtomicInteger contentCounter = new AtomicInteger();
+ final MessageContentKeyBinding keyBinding = new MessageContentKeyBinding();
+ CursorOperation cursorOperation = new CursorOperation()
+ {
+ private long _prevMsgId = -1;
+
+ @Override
+ public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
+ DatabaseEntry key, DatabaseEntry value)
+ {
+ MessageContentKey contentKey = keyBinding.entryToObject(key);
+ long msgId = contentKey.getMessageId();
+
+ if (_prevMsgId != msgId && messageIdsForQueue.contains(msgId))
+ {
+ contentCounter.incrementAndGet();
+ }
+
+ _prevMsgId = msgId;
+ }
+ };
+ new DatabaseTemplate(_environment, MESSAGE_CONTENT_DB_NAME, null).run(cursorOperation);
+
+ assertEquals("Unxpected number of entries in content db for queue " + queueName, expectedQueueSize,
+ contentCounter.get());
+ }
+
+ private void assertQueues(Set<String> expectedQueueNames)
+ {
+ List<AMQShortString> durableSubNames = new ArrayList<AMQShortString>();
+ final UpgradeFrom4To5.QueueRecordBinding binding = new UpgradeFrom4To5.QueueRecordBinding(durableSubNames);
+ final Set<String> actualQueueNames = new HashSet<String>();
+
+ CursorOperation queueNameCollector = new CursorOperation()
+ {
+
+ @Override
+ public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
+ DatabaseEntry key, DatabaseEntry value)
+ {
+ QueueRecord record = binding.entryToObject(value);
+ String queueName = record.getNameShortString().asString();
+ actualQueueNames.add(queueName);
+ }
+ };
+ new DatabaseTemplate(_environment, "queueDb_v5", null).run(queueNameCollector);
+
+ assertEquals("Unexpected queue names", expectedQueueNames, actualQueueNames);
+ }
+
+ private void assertContent()
+ {
+ final UpgradeFrom4To5.ContentBinding contentBinding = new UpgradeFrom4To5.ContentBinding();
+ CursorOperation contentCursorOperation = new CursorOperation()
+ {
+
+ @Override
+ public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction, DatabaseEntry key,
+ DatabaseEntry value)
+ {
+ long id = LongBinding.entryToLong(key);
+ assertTrue("Unexpected id", id > 0);
+ ByteBuffer content = contentBinding.entryToObject(value);
+ assertNotNull("Unexpected content", content);
+ }
+ };
+ new DatabaseTemplate(_environment, MESSAGE_CONTENT_DB_NAME, null).run(contentCursorOperation);
+ }
+}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java
new file mode 100644
index 0000000000..cca5923ffd
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java
@@ -0,0 +1,141 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.upgrade;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CompoundKey;
+import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CompoundKeyBinding;
+import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NewDataBinding;
+
+import com.sleepycat.bind.tuple.LongBinding;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.Transaction;
+
+public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase
+{
+ private static final Logger _logger = Logger.getLogger(UpgradeFrom5To6Test.class);
+
+ @Override
+ protected String getStoreDirectoryName()
+ {
+ return "bdbstore-v5";
+ }
+
+ public void testPerformUpgrade() throws Exception
+ {
+ UpgradeFrom5To6 upgrade = new UpgradeFrom5To6();
+ upgrade.performUpgrade(LOG_SUBJECT, _environment, UpgradeInteractionHandler.DEFAULT_HANDLER);
+
+ assertDatabaseRecordCounts();
+ assertContent();
+ }
+
+ public void testPerformUpgradeWithMissingMessageChunkKeepsIncompleteMessage() throws Exception
+ {
+ corruptDatabase();
+
+ UpgradeFrom5To6 upgrade = new UpgradeFrom5To6();
+ upgrade.performUpgrade(LOG_SUBJECT, _environment, new StaticAnswerHandler(UpgradeInteractionResponse.YES));
+
+ assertDatabaseRecordCounts();
+ }
+
+ public void testPerformUpgradeWithMissingMessageChunkDiscardsIncompleteMessage() throws Exception
+ {
+ corruptDatabase();
+
+ UpgradeFrom5To6 upgrade = new UpgradeFrom5To6();
+
+ UpgradeInteractionHandler discardMessageInteractionHandler = new StaticAnswerHandler(UpgradeInteractionResponse.NO);
+
+ upgrade.performUpgrade(LOG_SUBJECT, _environment, discardMessageInteractionHandler);
+
+ assertDatabaseRecordCount("MESSAGE_METADATA", 11);
+ assertDatabaseRecordCount("MESSAGE_CONTENT", 11);
+ }
+
+ /**
+ * modify the chunk offset of a message to be wrong, so we can test logic
+ * that preserves incomplete messages
+ */
+ private void corruptDatabase()
+ {
+ CursorOperation cursorOperation = new CursorOperation()
+ {
+
+ @Override
+ public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
+ DatabaseEntry key, DatabaseEntry value)
+ {
+ CompoundKeyBinding binding = new CompoundKeyBinding();
+ CompoundKey originalCompoundKey = binding.entryToObject(key);
+ int corruptedOffset = originalCompoundKey.getOffset() + 2;
+ CompoundKey corruptedCompoundKey = new CompoundKey(originalCompoundKey.getMessageId(), corruptedOffset);
+ DatabaseEntry newKey = new DatabaseEntry();
+ binding.objectToEntry(corruptedCompoundKey, newKey);
+
+ _logger.info("Deliberately corrupted message id " + originalCompoundKey.getMessageId()
+ + ", changed offset from " + originalCompoundKey.getOffset() + " to "
+ + corruptedCompoundKey.getOffset());
+
+ deleteCurrent();
+ sourceDatabase.put(transaction, newKey, value);
+
+ abort();
+ }
+ };
+
+ Transaction transaction = _environment.beginTransaction(null, null);
+ new DatabaseTemplate(_environment, "messageContentDb_v5", transaction).run(cursorOperation);
+ transaction.commit();
+ }
+
+ private void assertDatabaseRecordCounts()
+ {
+ assertDatabaseRecordCount("EXCHANGES", 5);
+ assertDatabaseRecordCount("QUEUES", 3);
+ assertDatabaseRecordCount("QUEUE_BINDINGS", 6);
+ assertDatabaseRecordCount("DELIVERIES", 12);
+
+ assertDatabaseRecordCount("MESSAGE_METADATA", 12);
+ assertDatabaseRecordCount("MESSAGE_CONTENT", 12);
+ }
+
+ private void assertContent()
+ {
+ final NewDataBinding contentBinding = new NewDataBinding();
+ CursorOperation contentCursorOperation = new CursorOperation()
+ {
+
+ @Override
+ public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction, DatabaseEntry key,
+ DatabaseEntry value)
+ {
+ long id = LongBinding.entryToLong(key);
+ assertTrue("Unexpected id", id > 0);
+ byte[] content = contentBinding.entryToObject(value);
+ assertNotNull("Unexpected content", content);
+ }
+ };
+ new DatabaseTemplate(_environment, "MESSAGE_CONTENT", null).run(contentCursorOperation);
+ }
+}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java
new file mode 100644
index 0000000000..0e9d722089
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java
@@ -0,0 +1,139 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.upgrade;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.qpid.server.logging.subjects.TestBlankSubject;
+import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
+import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding;
+
+import com.sleepycat.bind.tuple.IntegerBinding;
+import com.sleepycat.bind.tuple.LongBinding;
+import com.sleepycat.je.Cursor;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.OperationStatus;
+import com.sleepycat.je.Transaction;
+
+public class UpgraderTest extends AbstractUpgradeTestCase
+{
+ private Upgrader _upgrader;
+
+ @Override
+ protected String getStoreDirectoryName()
+ {
+ return "bdbstore-v4";
+ }
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ _upgrader = new Upgrader(_environment, new TestBlankSubject());
+ }
+
+ private int getStoreVersion()
+ {
+ DatabaseConfig dbConfig = new DatabaseConfig();
+ dbConfig.setTransactional(true);
+ dbConfig.setAllowCreate(true);
+ int storeVersion = -1;
+ Database versionDb = null;
+ Cursor cursor = null;
+ try
+ {
+ versionDb = _environment.openDatabase(null, Upgrader.VERSION_DB_NAME, dbConfig);
+ cursor = versionDb.openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+ while (cursor.getNext(key, value, null) == OperationStatus.SUCCESS)
+ {
+ int version = IntegerBinding.entryToInt(key);
+ if (storeVersion < version)
+ {
+ storeVersion = version;
+ }
+ }
+ }
+ finally
+ {
+ if (cursor != null)
+ {
+ cursor.close();
+ }
+ if (versionDb != null)
+ {
+ versionDb.close();
+ }
+ }
+ return storeVersion;
+ }
+
+ public void testUpgrade() throws Exception
+ {
+ assertEquals("Unexpected store version", -1, getStoreVersion());
+ _upgrader.upgradeIfNecessary();
+ assertEquals("Unexpected store version", BDBMessageStore.VERSION, getStoreVersion());
+ assertContent();
+ }
+
+ public void testEmptyDatabaseUpgradeDoesNothing() throws Exception
+ {
+ File nonExistentStoreLocation = new File(TMP_FOLDER, getName());
+ deleteDirectoryIfExists(nonExistentStoreLocation);
+
+ nonExistentStoreLocation.mkdir();
+ _environment = createEnvironment(nonExistentStoreLocation);
+ _upgrader = new Upgrader(_environment, new TestBlankSubject());
+ _upgrader.upgradeIfNecessary();
+
+ List<String> databaseNames = _environment.getDatabaseNames();
+ List<String> expectedDatabases = new ArrayList<String>();
+ expectedDatabases.add("VERSION");
+ assertEquals("Expectedonly VERSION table in initially empty store after upgrade: ", expectedDatabases, databaseNames);
+ assertEquals("Unexpected store version", BDBMessageStore.VERSION, getStoreVersion());
+
+ nonExistentStoreLocation.delete();
+ }
+
+ private void assertContent()
+ {
+ final ContentBinding contentBinding = ContentBinding.getInstance();
+ CursorOperation contentCursorOperation = new CursorOperation()
+ {
+
+ @Override
+ public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction, DatabaseEntry key,
+ DatabaseEntry value)
+ {
+ long id = LongBinding.entryToLong(key);
+ assertTrue("Unexpected id", id > 0);
+ byte[] content = contentBinding.entryToObject(value);
+ assertNotNull("Unexpected content", content);
+ }
+ };
+ new DatabaseTemplate(_environment, "MESSAGE_CONTENT", null).run(contentCursorOperation);
+ }
+}
diff --git a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-to-upgrade/test-store/00000000.jdb b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v4/test-store/00000000.jdb
index 38158a55e7..167ab7f0ca 100644
--- a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-to-upgrade/test-store/00000000.jdb
+++ b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v4/test-store/00000000.jdb
Binary files differ
diff --git a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/readme.txt b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/readme.txt
new file mode 100644
index 0000000000..a7e754f967
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/readme.txt
@@ -0,0 +1,5 @@
+The bdbstore v5 data were obtained by upgrading the bdbstore v4 data as part of running
+test UpgradeFrom4to5Test#testPerformUpgradeWithHandlerAnsweringNo.
+
+The rationale for not using BDBStoreUpgradeTestPreparer in this case is that we need chunked content.
+Current implementation of BDBMessageStore only stores messages in one chunk. \ No newline at end of file
diff --git a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000000.jdb b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000000.jdb
new file mode 100644
index 0000000000..d44b21a83e
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000000.jdb
Binary files differ
diff --git a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000001.jdb b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000001.jdb
new file mode 100644
index 0000000000..9b85860c19
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000001.jdb
Binary files differ
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java
index feacf35d41..97134515a0 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java
@@ -20,11 +20,13 @@
*/
package org.apache.qpid.server.logging.actors;
-import org.apache.qpid.server.logging.LogActor;
-
import java.util.EmptyStackException;
import java.util.Stack;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogMessage;
+import org.apache.qpid.server.logging.LogSubject;
+
/**
* The CurrentActor is a ThreadLocal wrapper that allows threads in the broker
* to retrieve an actor to perform logging. This approach is used so for two
@@ -126,4 +128,14 @@ public class CurrentActor
{
_defaultActor = defaultActor;
}
+
+ public static void message(LogSubject subject, LogMessage message)
+ {
+ get().message(subject, message);
+ }
+
+ public static void message(LogMessage message)
+ {
+ get().message(message);
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/FieldTableSupport.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/FieldTableSupport.java
index 31a0440b04..bd63cdb5c5 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/FieldTableSupport.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/FieldTableSupport.java
@@ -48,9 +48,12 @@ public class FieldTableSupport
public static Map<String,Object> convertToMap(FieldTable ft)
{
Map<String,Object> map = new HashMap<String,Object>();
- for (AMQShortString key: ft.keySet() )
+ if(ft != null)
{
- map.put(key.asString(), ft.getObject(key));
+ for (AMQShortString key: ft.keySet() )
+ {
+ map.put(key.asString(), ft.getObject(key));
+ }
}
return map;
diff --git a/qpid/java/module.xml b/qpid/java/module.xml
index 3c2be75d8f..438d4fc5a7 100644
--- a/qpid/java/module.xml
+++ b/qpid/java/module.xml
@@ -48,6 +48,7 @@
<property name="module.api" location="${build.api}/${module}/"/>
<property name="module.test.api" location="${build.test.api}/${module}"/>
<property name="module.test.classes" location="${module.build}/test/classes"/>
+ <property name="module.test.resources" location="${module.build}/test/resources"/>
<property name="module.results" location="${build.results}/${module}"/>
<property name="module.failed" location="${module.results}/FAILED"/>
<property name="module.src" location="src/main/java"/>
@@ -179,6 +180,7 @@
<path refid="module.class.path"/>
<pathelement path="${module.test.depends.path}"/>
<path refid="module.test.libs"/>
+ <pathelement path="${module.test.resources}"/>
</path>
<property name="javac.deprecation" value="off"/>