summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/bdbstore/src')
-rw-r--r--qpid/java/bdbstore/src/main/java/BDBStoreUpgrade.log4j.xml52
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncoding.java59
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTB.java48
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackup.java344
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java2124
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java1125
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingKey.java62
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java52
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DatabaseVisitor.java44
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java58
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java74
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageContentKey.java42
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java49
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_4.java44
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_5.java44
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/ExchangeRecord.java53
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java66
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/testclient/BackupTestClient.java120
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java25
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java45
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_4.java76
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_4.java47
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_5.java46
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTupleBindingFactory.java45
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_4.java162
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_5.java77
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java43
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java46
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java25
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java46
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_4.java72
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_5.java75
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java35
-rw-r--r--qpid/java/bdbstore/src/resources/backup-log4j.xml65
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncodingTest.java88
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java470
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java232
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java540
-rw-r--r--qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-to-upgrade/test-store/00000000.jdbbin0 -> 1330321 bytes
39 files changed, 6720 insertions, 0 deletions
diff --git a/qpid/java/bdbstore/src/main/java/BDBStoreUpgrade.log4j.xml b/qpid/java/bdbstore/src/main/java/BDBStoreUpgrade.log4j.xml
new file mode 100644
index 0000000000..4d71963ea7
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/BDBStoreUpgrade.log4j.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0"?>
+<!--
+ -
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements. See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership. The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License. You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied. See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -
+ -->
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+
+ <appender name="STDOUT" class="org.apache.log4j.ConsoleAppender">
+
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d %-5p - %m%n"/>
+ </layout>
+ </appender>
+
+ <category name="org.apache.qpid.server.store.berkeleydb.BDBStoreUpgrade">
+ <priority value="info"/>
+ </category>
+
+ <!-- Only show errors from the BDB Store -->
+ <category name="org.apache.qpid.server.store.berkeleydb.berkeleydb.BDBMessageStore">
+ <priority value="error"/>
+ </category>
+
+ <!-- Provide warnings to standard output -->
+ <category name="org.apache.qpid">
+ <priority value="error"/>
+ </category>
+
+ <!-- Log all info events to file -->
+ <root>
+ <priority value="info"/>
+ <appender-ref ref="STDOUT"/>
+ </root>
+
+</log4j:configuration>
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncoding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncoding.java
new file mode 100644
index 0000000000..8b887b1876
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncoding.java
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.apache.qpid.framing.AMQShortString;
+
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+public class AMQShortStringEncoding
+{
+ public static AMQShortString readShortString(TupleInput tupleInput)
+ {
+ int length = (int) tupleInput.readShort();
+ if (length < 0)
+ {
+ return null;
+ }
+ else
+ {
+ byte[] stringBytes = new byte[length];
+ tupleInput.readFast(stringBytes);
+ return new AMQShortString(stringBytes);
+ }
+
+ }
+
+ public static void writeShortString(AMQShortString shortString, TupleOutput tupleOutput)
+ {
+
+ if (shortString == null)
+ {
+ tupleOutput.writeShort(-1);
+ }
+ else
+ {
+ tupleOutput.writeShort(shortString.length());
+ tupleOutput.writeFast(shortString.getBytes(), 0, shortString.length());
+ }
+ }
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTB.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTB.java
new file mode 100644
index 0000000000..81ae315fe2
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTB.java
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import org.apache.log4j.Logger;
+import org.apache.qpid.framing.AMQShortString;
+
+public class AMQShortStringTB extends TupleBinding
+{
+ private static final Logger _log = Logger.getLogger(AMQShortStringTB.class);
+
+
+ public AMQShortStringTB()
+ {
+ }
+
+ public Object entryToObject(TupleInput tupleInput)
+ {
+ return AMQShortStringEncoding.readShortString(tupleInput);
+ }
+
+ public void objectToEntry(Object object, TupleOutput tupleOutput)
+ {
+ AMQShortStringEncoding.writeShortString((AMQShortString)object, tupleOutput);
+ }
+
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackup.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackup.java
new file mode 100644
index 0000000000..c515ca5d8e
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackup.java
@@ -0,0 +1,344 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.util.DbBackup;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.util.CommandLineParser;
+import org.apache.qpid.util.FileUtils;
+
+import java.io.*;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * BDBBackup is a utility for taking hot backups of the current state of a BDB transaction log database.
+ *
+ * <p/>This utility makes the following assumptions/performs the following actions:
+ *
+ * <p/><ul> <li>The from and to directory locations will already exist. This scripts does not create them. <li>If this
+ * script fails to complete in one minute it will terminate. <li>This script always exits with code 1 on error, code 0
+ * on success (standard unix convention). <li>This script will log out at info level, when it starts and ends and a list
+ * of all files backed up. <li>This script logs all errors at error level. <li>This script does not perform regular
+ * backups, wrap its calling script in a cron job or similar to do this. </ul>
+ *
+ * <p/>This utility is build around the BDB provided backup helper utility class, DbBackup. This utility class provides
+ * an ability to force BDB to stop writing to the current log file set, whilst the backup is taken, to ensure that a
+ * consistent snapshot is acquired. Preventing BDB from writing to the current log file set, does not stop BDB from
+ * continuing to run concurrently while the backup is running, it simply moves onto a new set of log files; this
+ * provides a 'hot' backup facility.
+ *
+ * <p/>DbBackup can also help with incremental backups, by providing the number of the last log file backed up.
+ * Subsequent backups can be taken, from later log files only. In a messaging application, messages are not expected to
+ * be long-lived in most cases, so the log files will usually have been completely turned over between backups. This
+ * utility does not support incremental backups for this reason.
+ *
+ * <p/>If the database is locked by BDB, as is required when using transactions, and therefore will always be the case
+ * in Qpid, this utility cannot make use of the DbBackup utility in a seperate process. DbBackup, needs to ensure that
+ * the BDB envinronment used to take the backup has exclusive write access to the log files. This utility can take a
+ * backup as a standalone utility against log files, when a broker is not running, using the {@link #takeBackup(String,
+ *String,com.sleepycat.je.Environment)} method.
+ *
+ * <p/>A seperate backup machanism is provided by the {@link #takeBackupNoLock(String,String)} method which can take a
+ * hot backup against a running broker. This works by finding out the set of files to copy, and then opening them all to
+ * read, and repeating this process until a consistent set of open files is obtained. This is done to avoid the
+ * situation where the BDB cleanup thread deletes a file, between the directory listing and opening of the file to copy.
+ * All consistently opened files are copied. This is the default mechanism the the {@link #main} method of this utility
+ * uses.
+ *
+ * <p/><table id="crc><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations <tr><td> Hot copy all
+ * BDB log files from one directory to another. </table>
+ */
+public class BDBBackup
+{
+ /** Used for debugging. */
+ private static final Logger log = Logger.getLogger(BDBBackup.class);
+
+ /** Used for communicating with the user. */
+ private static final Logger console = Logger.getLogger("Console");
+
+ /** Defines the suffix used to identify BDB log files. */
+ private static final String LOG_FILE_SUFFIX = ".jdb";
+
+ /** Defines the command line format for this utility. */
+ public static final String[][] COMMAND_LINE_SPEC =
+ new String[][]
+ {
+ { "fromdir", "The path to the directory to back the bdb log file from.", "dir", "true" },
+ { "todir", "The path to the directory to save the backed up bdb log files to.", "dir", "true" }
+ };
+
+ /** Defines the timeout to terminate the backup operation on if it fails to complete. One minte. */
+ public static final long TIMEOUT = 60000;
+
+ /**
+ * Runs a backup of the BDB log files in a specified directory, copying the backed up files to another specified
+ * directory.
+ *
+ * <p/>The following arguments must be specified:
+ *
+ * <p/><table><caption>Command Line</caption> <tr><th> Option <th> Comment <tr><td> -fromdir <td> The path to the
+ * directory to back the bdb log file from. <tr><td> -todir <td> The path to the directory to save the backed up
+ * bdb log files to. </table>
+ *
+ * @param args The command line arguments.
+ */
+ public static void main(String[] args)
+ {
+ // Process the command line using standard handling (errors and usage followed by System.exit when it is wrong).
+ Properties options =
+ CommandLineParser.processCommandLine(args, new CommandLineParser(COMMAND_LINE_SPEC), System.getProperties());
+
+ // Extract the from and to directory locations and perform a backup between them.
+ try
+ {
+ String fromDir = options.getProperty("fromdir");
+ String toDir = options.getProperty("todir");
+
+ log.info("BDBBackup Utility: Starting Hot Backup.");
+
+ BDBBackup bdbBackup = new BDBBackup();
+ String[] backedUpFiles = bdbBackup.takeBackupNoLock(fromDir, toDir);
+
+ if (log.isInfoEnabled())
+ {
+ log.info("BDBBackup Utility: Hot Backup Completed. Files backed up: " + backedUpFiles);
+ }
+ }
+ catch (Exception e)
+ {
+ console.info("Backup script encountered an error and has failed: " + e.getMessage());
+ log.error("Backup script got exception: " + e.getMessage(), e);
+ System.exit(1);
+ }
+ }
+
+ /**
+ * Creates a backup of the BDB log files in the source directory, copying them to the destination directory.
+ *
+ * @param fromdir The source directory path.
+ * @param todir The destination directory path.
+ * @param environment An open BDB environment to perform the back up.
+ *
+ * @throws DatabaseException Any underlying execeptions from BDB are allowed to fall through.
+ */
+ public void takeBackup(String fromdir, String todir, Environment environment) throws DatabaseException
+ {
+ DbBackup backupHelper = null;
+
+ try
+ {
+ backupHelper = new DbBackup(environment);
+
+ // Prevent BDB from writing to its log files while the backup it taken.
+ backupHelper.startBackup();
+
+ // Back up the BDB log files to the destination directory.
+ String[] filesForBackup = backupHelper.getLogFilesInBackupSet();
+
+ for (int i = 0; i < filesForBackup.length; i++)
+ {
+ File sourceFile = new File(fromdir + File.separator + filesForBackup[i]);
+ File destFile = new File(todir + File.separator + filesForBackup[i]);
+ FileUtils.copy(sourceFile, destFile);
+ }
+ }
+ finally
+ {
+ // Remember to exit backup mode, or all log files won't be cleaned and disk usage will bloat.
+ if (backupHelper != null)
+ {
+ backupHelper.endBackup();
+ }
+ }
+ }
+
+ /**
+ * Takes a hot backup when another process has locked the BDB database.
+ *
+ * @param fromdir The source directory path.
+ * @param todir The destination directory path.
+ *
+ * @return A list of all of the names of the files succesfully backed up.
+ */
+ public String[] takeBackupNoLock(String fromdir, String todir)
+ {
+ if (log.isDebugEnabled())
+ {
+ log.debug("public void takeBackupNoLock(String fromdir = " + fromdir + ", String todir = " + todir
+ + "): called");
+ }
+
+ File fromDirFile = new File(fromdir);
+
+ if (!fromDirFile.isDirectory())
+ {
+ throw new IllegalArgumentException("The specified fromdir(" + fromdir
+ + ") must be the directory containing your bdbstore.");
+ }
+
+ File toDirFile = new File(todir);
+
+ if (!toDirFile.exists())
+ {
+ // Create directory if it doesn't exist
+ toDirFile.mkdirs();
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("Created backup directory:" + toDirFile);
+ }
+ }
+
+ if (!toDirFile.isDirectory())
+ {
+ throw new IllegalArgumentException("The specified todir(" + todir + ") must be a directory.");
+ }
+
+ // Repeat until manage to open consistent set of files for reading.
+ boolean consistentSet = false;
+ FileInputStream[] fileInputStreams = new FileInputStream[0];
+ File[] fileSet = new File[0];
+ long start = System.currentTimeMillis();
+
+ while (!consistentSet)
+ {
+ // List all .jdb files in the directory.
+ fileSet = fromDirFile.listFiles(new FilenameFilter()
+ {
+ public boolean accept(File dir, String name)
+ {
+ return name.endsWith(LOG_FILE_SUFFIX);
+ }
+ });
+
+ // Open them all for reading.
+ fileInputStreams = new FileInputStream[fileSet.length];
+
+ if (fileSet.length == 0)
+ {
+ throw new RuntimeException("There are no BDB log files to backup in the " + fromdir + " directory.");
+ }
+
+ for (int i = 0; i < fileSet.length; i++)
+ {
+ try
+ {
+ fileInputStreams[i] = new FileInputStream(fileSet[i]);
+ }
+ catch (FileNotFoundException e)
+ {
+ // Close any files opened for reading so far.
+ for (int j = 0; j < i; j++)
+ {
+ if (fileInputStreams[j] != null)
+ {
+ try
+ {
+ fileInputStreams[j].close();
+ }
+ catch (IOException ioEx)
+ {
+ // Rethrow this as a runtime exception, as something strange has happened.
+ throw new RuntimeException(ioEx);
+ }
+ }
+ }
+
+ // Could not open a consistent file set so try again.
+ break;
+ }
+
+ // A consistent set has been opened if all files were sucesfully opened for reading.
+ if (i == (fileSet.length - 1))
+ {
+ consistentSet = true;
+ }
+ }
+
+ // Check that the script has not timed out, and raise an error if it has.
+ long now = System.currentTimeMillis();
+ if ((now - start) > TIMEOUT)
+ {
+ throw new RuntimeException("Hot backup script failed to complete in " + (TIMEOUT / 1000) + " seconds.");
+ }
+ }
+
+ // Copy the consistent set of open files.
+ List<String> backedUpFileNames = new LinkedList<String>();
+
+ for (int j = 0; j < fileSet.length; j++)
+ {
+ File destFile = new File(todir + File.separator + fileSet[j].getName());
+ try
+ {
+ FileUtils.copy(fileSet[j], destFile);
+ }
+ catch (RuntimeException re)
+ {
+ Throwable cause = re.getCause();
+ if ((cause != null) && (cause instanceof IOException))
+ {
+ throw new RuntimeException(re.getMessage() + " fromDir:" + fromdir + " toDir:" + toDirFile, cause);
+ }
+ else
+ {
+ throw re;
+ }
+ }
+
+ backedUpFileNames.add(destFile.getName());
+
+ // Close all of the files.
+ try
+ {
+ fileInputStreams[j].close();
+ }
+ catch (IOException e)
+ {
+ // Rethrow this as a runtime exception, as something strange has happened.
+ throw new RuntimeException(e);
+ }
+ }
+
+ return backedUpFileNames.toArray(new String[backedUpFileNames.size()]);
+ }
+
+ /*
+ * Creates an environment for the bdb log files in the specified directory. This envrinonment can only be used
+ * to backup these files, if they are not locked by another database instance.
+ *
+ * @param fromdir The path to the directory to create the environment for.
+ *
+ * @throws DatabaseException Any underlying exceptions from BDB are allowed to fall through.
+ */
+ private Environment createSourceDirEnvironment(String fromdir) throws DatabaseException
+ {
+ // Initialize the BDB backup utility on the source directory.
+ return new Environment(new File(fromdir), new EnvironmentConfig());
+ }
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
new file mode 100644
index 0000000000..f900159808
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -0,0 +1,2124 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.lang.ref.SoftReference;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
+import org.apache.qpid.server.logging.messages.MessageStoreMessages;
+import org.apache.qpid.server.logging.messages.TransactionLogMessages;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
+import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoredMemoryMessage;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler;
+import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
+import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler;
+import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_5;
+import org.apache.qpid.server.store.berkeleydb.records.ExchangeRecord;
+import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
+import org.apache.qpid.server.store.berkeleydb.tuples.BindingTupleBindingFactory;
+import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTB_5;
+import org.apache.qpid.server.store.berkeleydb.tuples.MessageMetaDataTupleBindingFactory;
+import org.apache.qpid.server.store.berkeleydb.tuples.QueueEntryTB;
+import org.apache.qpid.server.store.berkeleydb.tuples.QueueTupleBindingFactory;
+
+import com.sleepycat.bind.EntryBinding;
+import com.sleepycat.bind.tuple.ByteBinding;
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.je.CheckpointConfig;
+import com.sleepycat.je.Cursor;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
+import com.sleepycat.je.TransactionConfig;
+
+/**
+ * BDBMessageStore implements a persistent {@link MessageStore} using the BDB high performance log.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations <tr><td> Accept
+ * transaction boundary demarcations: Begin, Commit, Abort. <tr><td> Store and remove queues. <tr><td> Store and remove
+ * exchanges. <tr><td> Store and remove messages. <tr><td> Bind and unbind queues to exchanges. <tr><td> Enqueue and
+ * dequeue messages to queues. <tr><td> Generate message identifiers. </table>
+ */
+@SuppressWarnings({"unchecked"})
+public class BDBMessageStore implements MessageStore
+{
+ private static final Logger _log = Logger.getLogger(BDBMessageStore.class);
+
+ static final int DATABASE_FORMAT_VERSION = 5;
+ private static final String DATABASE_FORMAT_VERSION_PROPERTY = "version";
+ public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
+
+ private Environment _environment;
+
+ private String MESSAGEMETADATADB_NAME = "messageMetaDataDb";
+ private String MESSAGECONTENTDB_NAME = "messageContentDb";
+ private String QUEUEBINDINGSDB_NAME = "queueBindingsDb";
+ private String DELIVERYDB_NAME = "deliveryDb";
+ private String EXCHANGEDB_NAME = "exchangeDb";
+ private String QUEUEDB_NAME = "queueDb";
+ private Database _messageMetaDataDb;
+ private Database _messageContentDb;
+ private Database _queueBindingsDb;
+ private Database _deliveryDb;
+ private Database _exchangeDb;
+ private Database _queueDb;
+
+ /* =======
+ * Schema:
+ * =======
+ *
+ * Queue:
+ * name(AMQShortString) - name(AMQShortString), owner(AMQShortString),
+ * arguments(FieldTable encoded as binary), exclusive (boolean)
+ *
+ * Exchange:
+ * name(AMQShortString) - name(AMQShortString), typeName(AMQShortString), autodelete (boolean)
+ *
+ * Binding:
+ * exchangeName(AMQShortString), queueName(AMQShortString), routingKey(AMQShortString),
+ * arguments (FieldTable encoded as binary) - 0 (zero)
+ *
+ * QueueEntry:
+ * queueName(AMQShortString), messageId (long) - 0 (zero)
+ *
+ * Message (MetaData):
+ * messageId (long) - bodySize (integer), metaData (MessageMetaData encoded as binary)
+ *
+ * Message (Content):
+ * messageId (long), byteOffset (integer) - dataLength(integer), data(binary);
+ */
+
+ private LogSubject _logSubject;
+
+ private final AtomicLong _messageId = new AtomicLong(0);
+
+ private final CommitThread _commitThread = new CommitThread("Commit-Thread");
+
+ // Factory Classes to create the TupleBinding objects that reflect the version instance of this BDBStore
+ private MessageMetaDataTupleBindingFactory _metaDataTupleBindingFactory;
+ private QueueTupleBindingFactory _queueTupleBindingFactory;
+ private BindingTupleBindingFactory _bindingTupleBindingFactory;
+
+ /** The data version this store should run with */
+ private int _version;
+ private enum State
+ {
+ INITIAL,
+ CONFIGURING,
+ CONFIGURED,
+ RECOVERING,
+ STARTED,
+ CLOSING,
+ CLOSED
+ }
+
+ private State _state = State.INITIAL;
+
+ private TransactionConfig _transactionConfig = new TransactionConfig();
+
+ private boolean _readOnly = false;
+
+ private boolean _configured;
+
+
+ public BDBMessageStore()
+ {
+ this(DATABASE_FORMAT_VERSION);
+ }
+
+ public BDBMessageStore(int version)
+ {
+ _version = version;
+ }
+
+ private void setDatabaseNames(int version)
+ {
+ if (version > 1)
+ {
+ MESSAGEMETADATADB_NAME += "_v" + version;
+
+ MESSAGECONTENTDB_NAME += "_v" + version;
+
+ QUEUEDB_NAME += "_v" + version;
+
+ DELIVERYDB_NAME += "_v" + version;
+
+ EXCHANGEDB_NAME += "_v" + version;
+
+ QUEUEBINDINGSDB_NAME += "_v" + version;
+ }
+ }
+
+ public void configureConfigStore(String name,
+ ConfigurationRecoveryHandler recoveryHandler,
+ Configuration storeConfiguration,
+ LogSubject logSubject) throws Exception
+ {
+ _logSubject = logSubject;
+ CurrentActor.get().message(_logSubject, ConfigStoreMessages.CREATED(this.getClass().getName()));
+
+ if(_configured)
+ {
+ throw new Exception("ConfigStore already configured");
+ }
+
+ configure(name,storeConfiguration);
+
+ _configured = true;
+ stateTransition(State.CONFIGURING, State.CONFIGURED);
+
+ recover(recoveryHandler);
+ stateTransition(State.RECOVERING, State.STARTED);
+ }
+
+ public void configureMessageStore(String name,
+ MessageStoreRecoveryHandler recoveryHandler,
+ Configuration storeConfiguration,
+ LogSubject logSubject) throws Exception
+ {
+ CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED(this.getClass().getName()));
+
+ if(!_configured)
+ {
+ throw new Exception("ConfigStore not configured");
+ }
+
+ recoverMessages(recoveryHandler);
+ }
+
+ public void configureTransactionLog(String name, TransactionLogRecoveryHandler recoveryHandler,
+ Configuration storeConfiguration, LogSubject logSubject) throws Exception
+ {
+ CurrentActor.get().message(_logSubject, TransactionLogMessages.CREATED(this.getClass().getName()));
+
+ if(!_configured)
+ {
+ throw new Exception("ConfigStore not configured");
+ }
+
+ recoverQueueEntries(recoveryHandler);
+
+
+ }
+
+ public org.apache.qpid.server.store.TransactionLog.Transaction newTransaction()
+ {
+ return new BDBTransaction();
+ }
+
+
+ /**
+ * Called after instantiation in order to configure the message store.
+ *
+ * @param name The name of the virtual host using this store
+ * @return whether a new store environment was created or not (to indicate whether recovery is necessary)
+ *
+ * @throws Exception If any error occurs that means the store is unable to configure itself.
+ */
+ public boolean configure(String name, Configuration storeConfig) throws Exception
+ {
+ File environmentPath = new File(storeConfig.getString(ENVIRONMENT_PATH_PROPERTY,
+ System.getProperty("QPID_WORK") + "/bdbstore/" + name));
+ if (!environmentPath.exists())
+ {
+ if (!environmentPath.mkdirs())
+ {
+ throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. "
+ + "Ensure the path is correct and that the permissions are correct.");
+ }
+ }
+
+ CurrentActor.get().message(_logSubject, MessageStoreMessages.STORE_LOCATION(environmentPath.getAbsolutePath()));
+
+ _version = storeConfig.getInt(DATABASE_FORMAT_VERSION_PROPERTY, DATABASE_FORMAT_VERSION);
+
+ return configure(environmentPath, false);
+ }
+
+ /**
+ * @param environmentPath location for the store to be created in/recovered from
+ * @param readonly if true then don't allow modifications to an existing store, and don't create a new store if none exists
+ * @return whether or not a new store environment was created
+ * @throws AMQStoreException
+ * @throws DatabaseException
+ */
+ protected boolean configure(File environmentPath, boolean readonly) throws AMQStoreException, DatabaseException
+ {
+ _readOnly = readonly;
+ stateTransition(State.INITIAL, State.CONFIGURING);
+
+ _log.info("Configuring BDB message store");
+
+ createTupleBindingFactories(_version);
+
+ setDatabaseNames(_version);
+
+ return setupStore(environmentPath, readonly);
+ }
+
+ private void createTupleBindingFactories(int version)
+ {
+ _bindingTupleBindingFactory = new BindingTupleBindingFactory(version);
+ _queueTupleBindingFactory = new QueueTupleBindingFactory(version);
+ _metaDataTupleBindingFactory = new MessageMetaDataTupleBindingFactory(version);
+ }
+
+ /**
+ * Move the store state from CONFIGURING to STARTED.
+ *
+ * This is required if you do not want to perform recovery of the store data
+ *
+ * @throws AMQStoreException if the store is not in the correct state
+ */
+ public void start() throws AMQStoreException
+ {
+ stateTransition(State.CONFIGURING, State.STARTED);
+ }
+
+ private boolean setupStore(File storePath, boolean readonly) throws DatabaseException, AMQStoreException
+ {
+ checkState(State.CONFIGURING);
+
+ boolean newEnvironment = createEnvironment(storePath, readonly);
+
+ verifyVersionByTables();
+
+ openDatabases(readonly);
+
+ if (!readonly)
+ {
+ _commitThread.start();
+ }
+
+ return newEnvironment;
+ }
+
+ private void verifyVersionByTables() throws DatabaseException
+ {
+ for (String s : _environment.getDatabaseNames())
+ {
+ int versionIndex = s.indexOf("_v");
+
+ // lack of _v index suggests DB is v1
+ // so if _version is not v1 then error
+ if (versionIndex == -1)
+ {
+ if (_version != 1)
+ {
+ closeEnvironment();
+ throw new IllegalArgumentException("Error: Unable to load BDBStore as version " + _version
+ + ". Store on disk contains version 1 data.");
+ }
+ else // DB is v1 and _version is v1
+ {
+ continue;
+ }
+ }
+
+ // Otherwise Check Versions
+ int version = Integer.parseInt(s.substring(versionIndex + 2));
+
+ if (version != _version)
+ {
+ closeEnvironment();
+ throw new IllegalArgumentException("Error: Unable to load BDBStore as version " + _version
+ + ". Store on disk contains version " + version + " data.");
+ }
+ }
+ }
+
+ private synchronized void stateTransition(State requiredState, State newState) throws AMQStoreException
+ {
+ if (_state != requiredState)
+ {
+ throw new AMQStoreException("Cannot transition to the state: " + newState + "; need to be in state: " + requiredState
+ + "; currently in state: " + _state);
+ }
+
+ _state = newState;
+ }
+
+ private void checkState(State requiredState) throws AMQStoreException
+ {
+ if (_state != requiredState)
+ {
+ throw new AMQStoreException("Unexpected state: " + _state + "; required state: " + requiredState);
+ }
+ }
+
+ private boolean createEnvironment(File environmentPath, boolean readonly) throws DatabaseException
+ {
+ _log.info("BDB message store using environment path " + environmentPath.getAbsolutePath());
+ EnvironmentConfig envConfig = new EnvironmentConfig();
+ // This is what allows the creation of the store if it does not already exist.
+ envConfig.setAllowCreate(true);
+ envConfig.setTransactional(true);
+ envConfig.setConfigParam("je.lock.nLockTables", "7");
+
+ // Restore 500,000 default timeout.
+ //envConfig.setLockTimeout(15000);
+
+ // Added to help diagnosis of Deadlock issue
+ // http://www.oracle.com/technology/products/berkeley-db/faq/je_faq.html#23
+ if (Boolean.getBoolean("qpid.bdb.lock.debug"))
+ {
+ envConfig.setConfigParam("je.txn.deadlockStackTrace", "true");
+ envConfig.setConfigParam("je.txn.dumpLocks", "true");
+ }
+
+ // Set transaction mode
+ _transactionConfig.setReadCommitted(true);
+
+ //This prevents background threads running which will potentially update the store.
+ envConfig.setReadOnly(readonly);
+ try
+ {
+ _environment = new Environment(environmentPath, envConfig);
+ return false;
+ }
+ catch (DatabaseException de)
+ {
+ if (de.getMessage().contains("Environment.setAllowCreate is false"))
+ {
+ //Allow the creation this time
+ envConfig.setAllowCreate(true);
+ if (_environment != null )
+ {
+ _environment.cleanLog();
+ _environment.close();
+ }
+ _environment = new Environment(environmentPath, envConfig);
+
+ return true;
+ }
+ else
+ {
+ throw de;
+ }
+ }
+ }
+
+ private void openDatabases(boolean readonly) throws DatabaseException
+ {
+ DatabaseConfig dbConfig = new DatabaseConfig();
+ dbConfig.setTransactional(true);
+ dbConfig.setAllowCreate(true);
+
+ //This is required if we are wanting read only access.
+ dbConfig.setReadOnly(readonly);
+
+ _messageMetaDataDb = _environment.openDatabase(null, MESSAGEMETADATADB_NAME, dbConfig);
+ _queueDb = _environment.openDatabase(null, QUEUEDB_NAME, dbConfig);
+ _exchangeDb = _environment.openDatabase(null, EXCHANGEDB_NAME, dbConfig);
+ _queueBindingsDb = _environment.openDatabase(null, QUEUEBINDINGSDB_NAME, dbConfig);
+ _messageContentDb = _environment.openDatabase(null, MESSAGECONTENTDB_NAME, dbConfig);
+ _deliveryDb = _environment.openDatabase(null, DELIVERYDB_NAME, dbConfig);
+
+ }
+
+ /**
+ * Called to close and cleanup any resources used by the message store.
+ *
+ * @throws Exception If the close fails.
+ */
+ public void close() throws Exception
+ {
+ if (_state != State.STARTED)
+ {
+ return;
+ }
+
+ _state = State.CLOSING;
+
+ _commitThread.close();
+ _commitThread.join();
+
+ if (_messageMetaDataDb != null)
+ {
+ _log.info("Closing message metadata database");
+ _messageMetaDataDb.close();
+ }
+
+ if (_messageContentDb != null)
+ {
+ _log.info("Closing message content database");
+ _messageContentDb.close();
+ }
+
+ if (_exchangeDb != null)
+ {
+ _log.info("Closing exchange database");
+ _exchangeDb.close();
+ }
+
+ if (_queueBindingsDb != null)
+ {
+ _log.info("Closing bindings database");
+ _queueBindingsDb.close();
+ }
+
+ if (_queueDb != null)
+ {
+ _log.info("Closing queue database");
+ _queueDb.close();
+ }
+
+ if (_deliveryDb != null)
+ {
+ _log.info("Close delivery database");
+ _deliveryDb.close();
+ }
+
+ closeEnvironment();
+
+ _state = State.CLOSED;
+
+ CurrentActor.get().message(_logSubject,MessageStoreMessages.CLOSED());
+ }
+
+ private void closeEnvironment() throws DatabaseException
+ {
+ if (_environment != null)
+ {
+ if(!_readOnly)
+ {
+ // Clean the log before closing. This makes sure it doesn't contain
+ // redundant data. Closing without doing this means the cleaner may not
+ // get a chance to finish.
+ _environment.cleanLog();
+ }
+ _environment.close();
+ }
+ }
+
+
+ public void recover(ConfigurationRecoveryHandler recoveryHandler) throws AMQStoreException
+ {
+ stateTransition(State.CONFIGURED, State.RECOVERING);
+
+ CurrentActor.get().message(_logSubject,MessageStoreMessages.RECOVERY_START());
+
+ try
+ {
+ QueueRecoveryHandler qrh = recoveryHandler.begin(this);
+ loadQueues(qrh);
+
+ ExchangeRecoveryHandler erh = qrh.completeQueueRecovery();
+ loadExchanges(erh);
+
+ BindingRecoveryHandler brh = erh.completeExchangeRecovery();
+ recoverBindings(brh);
+
+ brh.completeBindingRecovery();
+ }
+ catch (DatabaseException e)
+ {
+ throw new AMQStoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+
+ }
+
+ private void loadQueues(QueueRecoveryHandler qrh) throws DatabaseException
+ {
+ Cursor cursor = null;
+
+ try
+ {
+ cursor = _queueDb.openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+ TupleBinding binding = _queueTupleBindingFactory.getInstance();
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ QueueRecord queueRecord = (QueueRecord) binding.entryToObject(value);
+
+ String queueName = queueRecord.getNameShortString() == null ? null :
+ queueRecord.getNameShortString().asString();
+ String owner = queueRecord.getOwner() == null ? null :
+ queueRecord.getOwner().asString();
+ boolean exclusive = queueRecord.isExclusive();
+
+ FieldTable arguments = queueRecord.getArguments();
+
+ qrh.queue(queueName, owner, exclusive, arguments);
+ }
+
+ }
+ finally
+ {
+ if (cursor != null)
+ {
+ cursor.close();
+ }
+ }
+ }
+
+
+ private void loadExchanges(ExchangeRecoveryHandler erh) throws DatabaseException
+ {
+ Cursor cursor = null;
+
+ try
+ {
+ cursor = _exchangeDb.openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+ TupleBinding binding = new ExchangeTB();
+
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ ExchangeRecord exchangeRec = (ExchangeRecord) binding.entryToObject(value);
+
+ String exchangeName = exchangeRec.getNameShortString() == null ? null :
+ exchangeRec.getNameShortString().asString();
+ String type = exchangeRec.getType() == null ? null :
+ exchangeRec.getType().asString();
+ boolean autoDelete = exchangeRec.isAutoDelete();
+
+ erh.exchange(exchangeName, type, autoDelete);
+ }
+ }
+ finally
+ {
+ if (cursor != null)
+ {
+ cursor.close();
+ }
+ }
+
+ }
+
+ private void recoverBindings(BindingRecoveryHandler brh) throws DatabaseException
+ {
+ Cursor cursor = null;
+ try
+ {
+ cursor = _queueBindingsDb.openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+ TupleBinding binding = _bindingTupleBindingFactory.getInstance();
+
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ //yes, this is retrieving all the useful information from the key only.
+ //For table compatibility it shall currently be left as is
+ BindingKey bindingRecord = (BindingKey) binding.entryToObject(key);
+
+ String exchangeName = bindingRecord.getExchangeName() == null ? null :
+ bindingRecord.getExchangeName().asString();
+ String queueName = bindingRecord.getQueueName() == null ? null :
+ bindingRecord.getQueueName().asString();
+ String routingKey = bindingRecord.getRoutingKey() == null ? null :
+ bindingRecord.getRoutingKey().asString();
+ ByteBuffer argumentsBB = (bindingRecord.getArguments() == null ? null :
+ java.nio.ByteBuffer.wrap(bindingRecord.getArguments().getDataAsBytes()));
+
+ brh.binding(exchangeName, queueName, routingKey, argumentsBB);
+ }
+ }
+ finally
+ {
+ if (cursor != null)
+ {
+ cursor.close();
+ }
+ }
+
+ }
+
+ private void recoverMessages(MessageStoreRecoveryHandler msrh) throws DatabaseException
+ {
+ StoredMessageRecoveryHandler mrh = msrh.begin();
+
+ Cursor cursor = null;
+ try
+ {
+ cursor = _messageMetaDataDb.openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);;
+
+ DatabaseEntry value = new DatabaseEntry();
+ EntryBinding valueBinding = _metaDataTupleBindingFactory.getInstance();
+
+ long maxId = 0;
+
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ long messageId = (Long) keyBinding.entryToObject(key);
+ StorableMessageMetaData metaData = (StorableMessageMetaData) valueBinding.entryToObject(value);
+
+ StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, false);
+ mrh.message(message);
+
+ maxId = Math.max(maxId, messageId);
+ }
+
+ _messageId.set(maxId);
+ }
+ catch (DatabaseException e)
+ {
+ _log.error("Database Error: " + e.getMessage(), e);
+ throw e;
+ }
+ finally
+ {
+ if (cursor != null)
+ {
+ cursor.close();
+ }
+ }
+ }
+
+ private void recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler)
+ throws DatabaseException
+ {
+ QueueEntryRecoveryHandler qerh = recoveryHandler.begin(this);
+
+ ArrayList<QueueEntryKey> entries = new ArrayList<QueueEntryKey>();
+
+ Cursor cursor = null;
+ try
+ {
+ cursor = _deliveryDb.openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ EntryBinding keyBinding = new QueueEntryTB();
+
+ DatabaseEntry value = new DatabaseEntry();
+
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ QueueEntryKey qek = (QueueEntryKey) keyBinding.entryToObject(key);
+
+ entries.add(qek);
+ }
+
+ try
+ {
+ cursor.close();
+ }
+ finally
+ {
+ cursor = null;
+ }
+
+ for(QueueEntryKey entry : entries)
+ {
+ AMQShortString queueName = entry.getQueueName();
+ long messageId = entry.getMessageId();
+
+ qerh.queueEntry(queueName.asString(),messageId);
+ }
+ }
+ catch (DatabaseException e)
+ {
+ _log.error("Database Error: " + e.getMessage(), e);
+ throw e;
+ }
+ finally
+ {
+ if (cursor != null)
+ {
+ cursor.close();
+ }
+ }
+
+ qerh.completeQueueEntryRecovery();
+ }
+
+ /**
+ * Removes the specified message from the store.
+ *
+ * @param messageId Identifies the message to remove.
+ *
+ * @throws AMQInternalException If the operation fails for any reason.
+ */
+ public void removeMessage(Long messageId) throws AMQStoreException
+ {
+ // _log.debug("public void removeMessage(Long messageId = " + messageId): called");
+
+ com.sleepycat.je.Transaction tx = null;
+
+ Cursor cursor = null;
+ try
+ {
+ tx = _environment.beginTransaction(null, null);
+
+ //remove the message meta data from the store
+ DatabaseEntry key = new DatabaseEntry();
+ EntryBinding metaKeyBindingTuple = TupleBinding.getPrimitiveBinding(Long.class);
+ metaKeyBindingTuple.objectToEntry(messageId, key);
+
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Removing message id " + messageId);
+ }
+
+
+ OperationStatus status = _messageMetaDataDb.delete(tx, key);
+ if (status == OperationStatus.NOTFOUND)
+ {
+ tx.abort();
+
+ throw new AMQStoreException("Message metadata not found for message id " + messageId);
+ }
+
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Deleted metadata for message " + messageId);
+ }
+
+ //now remove the content data from the store if there is any.
+
+ DatabaseEntry contentKeyEntry = new DatabaseEntry();
+ MessageContentKey_5 mck = new MessageContentKey_5(messageId,0);
+
+ TupleBinding<MessageContentKey> contentKeyTupleBinding = new MessageContentKeyTB_5();
+ contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry);
+
+ //Use a partial record for the value to prevent retrieving the
+ //data itself as we only need the key to identify what to remove.
+ DatabaseEntry value = new DatabaseEntry();
+ value.setPartial(0, 0, true);
+
+ cursor = _messageContentDb.openCursor(tx, null);
+
+ status = cursor.getSearchKeyRange(contentKeyEntry, value, LockMode.RMW);
+ while (status == OperationStatus.SUCCESS)
+ {
+ mck = (MessageContentKey_5) contentKeyTupleBinding.entryToObject(contentKeyEntry);
+
+ if(mck.getMessageId() != messageId)
+ {
+ //we have exhausted all chunks for this message id, break
+ break;
+ }
+ else
+ {
+ status = cursor.delete();
+
+ if(status == OperationStatus.NOTFOUND)
+ {
+ cursor.close();
+ cursor = null;
+
+ tx.abort();
+ throw new AMQStoreException("Content chunk offset" + mck.getOffset() + " not found for message " + messageId);
+ }
+
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Deleted content chunk offset " + mck.getOffset() + " for message " + messageId);
+ }
+ }
+
+ status = cursor.getNext(contentKeyEntry, value, LockMode.RMW);
+ }
+
+ cursor.close();
+ cursor = null;
+
+ commit(tx, true);
+ }
+ catch (DatabaseException e)
+ {
+ e.printStackTrace();
+
+ if (tx != null)
+ {
+ try
+ {
+ if(cursor != null)
+ {
+ cursor.close();
+ cursor = null;
+ }
+
+ tx.abort();
+ }
+ catch (DatabaseException e1)
+ {
+ throw new AMQStoreException("Error aborting transaction " + e1, e1);
+ }
+ }
+
+ throw new AMQStoreException("Error removing message with id " + messageId + " from database: " + e.getMessage(), e);
+ }
+ finally
+ {
+ if(cursor != null)
+ {
+ try
+ {
+ cursor.close();
+ }
+ catch (DatabaseException e)
+ {
+ throw new AMQStoreException("Error closing database connection: " + e.getMessage(), e);
+ }
+ }
+ }
+ }
+
+ /**
+ * @see DurableConfigurationStore#createExchange(Exchange)
+ */
+ public void createExchange(Exchange exchange) throws AMQStoreException
+ {
+ if (_state != State.RECOVERING)
+ {
+ ExchangeRecord exchangeRec = new ExchangeRecord(exchange.getNameShortString(),
+ exchange.getTypeShortString(), exchange.isAutoDelete());
+
+ DatabaseEntry key = new DatabaseEntry();
+ EntryBinding keyBinding = new AMQShortStringTB();
+ keyBinding.objectToEntry(exchange.getNameShortString(), key);
+
+ DatabaseEntry value = new DatabaseEntry();
+ TupleBinding exchangeBinding = new ExchangeTB();
+ exchangeBinding.objectToEntry(exchangeRec, value);
+
+ try
+ {
+ _exchangeDb.put(null, key, value);
+ }
+ catch (DatabaseException e)
+ {
+ throw new AMQStoreException("Error writing Exchange with name " + exchange.getName() + " to database: " + e.getMessage(), e);
+ }
+ }
+ }
+
+ /**
+ * @see DurableConfigurationStore#removeExchange(Exchange)
+ */
+ public void removeExchange(Exchange exchange) throws AMQStoreException
+ {
+ DatabaseEntry key = new DatabaseEntry();
+ EntryBinding keyBinding = new AMQShortStringTB();
+ keyBinding.objectToEntry(exchange.getNameShortString(), key);
+ try
+ {
+ OperationStatus status = _exchangeDb.delete(null, key);
+ if (status == OperationStatus.NOTFOUND)
+ {
+ throw new AMQStoreException("Exchange " + exchange.getName() + " not found");
+ }
+ }
+ catch (DatabaseException e)
+ {
+ throw new AMQStoreException("Error writing deleting with name " + exchange.getName() + " from database: " + e.getMessage(), e);
+ }
+ }
+
+
+
+
+ /**
+ * @see DurableConfigurationStore#bindQueue(Exchange, AMQShortString, AMQQueue, FieldTable)
+ */
+ public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
+ {
+ // _log.debug("public void bindQueue(Exchange exchange = " + exchange + ", AMQShortString routingKey = " + routingKey
+ // + ", AMQQueue queue = " + queue + ", FieldTable args = " + args + "): called");
+
+ if (_state != State.RECOVERING)
+ {
+ BindingKey bindingRecord = new BindingKey(exchange.getNameShortString(),
+ queue.getNameShortString(), routingKey, args);
+
+ DatabaseEntry key = new DatabaseEntry();
+ EntryBinding keyBinding = _bindingTupleBindingFactory.getInstance();
+
+ keyBinding.objectToEntry(bindingRecord, key);
+
+ //yes, this is writing out 0 as a value and putting all the
+ //useful info into the key, don't ask me why. For table
+ //compatibility it shall currently be left as is
+ DatabaseEntry value = new DatabaseEntry();
+ ByteBinding.byteToEntry((byte) 0, value);
+
+ try
+ {
+ _queueBindingsDb.put(null, key, value);
+ }
+ catch (DatabaseException e)
+ {
+ throw new AMQStoreException("Error writing binding for AMQQueue with name " + queue.getName() + " to exchange "
+ + exchange.getName() + " to database: " + e.getMessage(), e);
+ }
+ }
+ }
+
+ /**
+ * @see DurableConfigurationStore#unbindQueue(Exchange, AMQShortString, AMQQueue, FieldTable)
+ */
+ public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args)
+ throws AMQStoreException
+ {
+ DatabaseEntry key = new DatabaseEntry();
+ EntryBinding keyBinding = _bindingTupleBindingFactory.getInstance();
+ keyBinding.objectToEntry(new BindingKey(exchange.getNameShortString(), queue.getNameShortString(), routingKey, args), key);
+
+ try
+ {
+ OperationStatus status = _queueBindingsDb.delete(null, key);
+ if (status == OperationStatus.NOTFOUND)
+ {
+ throw new AMQStoreException("Queue binding for queue with name " + queue.getName() + " to exchange "
+ + exchange.getName() + " not found");
+ }
+ }
+ catch (DatabaseException e)
+ {
+ throw new AMQStoreException("Error deleting queue binding for queue with name " + queue.getName() + " to exchange "
+ + exchange.getName() + " from database: " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * @see DurableConfigurationStore#createQueue(AMQQueue)
+ */
+ public void createQueue(AMQQueue queue) throws AMQStoreException
+ {
+ createQueue(queue, null);
+ }
+
+ /**
+ * @see DurableConfigurationStore#createQueue(AMQQueue, FieldTable)
+ */
+ public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("public void createQueue(AMQQueue queue(" + queue.getName() + ") = " + queue + "): called");
+ }
+
+ QueueRecord queueRecord= new QueueRecord(queue.getNameShortString(),
+ queue.getOwner(), queue.isExclusive(), arguments);
+
+ createQueue(queueRecord);
+ }
+
+ /**
+ * Makes the specified queue persistent.
+ *
+ * Only intended for direct use during store upgrades.
+ *
+ * @param queueRecord Details of the queue to store.
+ *
+ * @throws AMQStoreException If the operation fails for any reason.
+ */
+ protected void createQueue(QueueRecord queueRecord) throws AMQStoreException
+ {
+ if (_state != State.RECOVERING)
+ {
+ DatabaseEntry key = new DatabaseEntry();
+ EntryBinding keyBinding = new AMQShortStringTB();
+ keyBinding.objectToEntry(queueRecord.getNameShortString(), key);
+
+ DatabaseEntry value = new DatabaseEntry();
+ TupleBinding queueBinding = _queueTupleBindingFactory.getInstance();
+
+ queueBinding.objectToEntry(queueRecord, value);
+ try
+ {
+ _queueDb.put(null, key, value);
+ }
+ catch (DatabaseException e)
+ {
+ throw new AMQStoreException("Error writing AMQQueue with name " + queueRecord.getNameShortString().asString()
+ + " to database: " + e.getMessage(), e);
+ }
+ }
+ }
+
+ /**
+ * Updates the specified queue in the persistent store, IF it is already present. If the queue
+ * is not present in the store, it will not be added.
+ *
+ * NOTE: Currently only updates the exclusivity.
+ *
+ * @param queue The queue to update the entry for.
+ * @throws AMQStoreException If the operation fails for any reason.
+ */
+ public void updateQueue(final AMQQueue queue) throws AMQStoreException
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Updating queue: " + queue.getName());
+ }
+
+ try
+ {
+ DatabaseEntry key = new DatabaseEntry();
+ EntryBinding keyBinding = new AMQShortStringTB();
+ keyBinding.objectToEntry(queue.getNameShortString(), key);
+
+ DatabaseEntry value = new DatabaseEntry();
+ DatabaseEntry newValue = new DatabaseEntry();
+ TupleBinding queueBinding = _queueTupleBindingFactory.getInstance();
+
+ OperationStatus status = _queueDb.get(null, key, value, LockMode.DEFAULT);
+ if(status == OperationStatus.SUCCESS)
+ {
+ //read the existing record and apply the new exclusivity setting
+ QueueRecord queueRecord = (QueueRecord) queueBinding.entryToObject(value);
+ queueRecord.setExclusive(queue.isExclusive());
+
+ //write the updated entry to the store
+ queueBinding.objectToEntry(queueRecord, newValue);
+
+ _queueDb.put(null, key, newValue);
+ }
+ else if(status != OperationStatus.NOTFOUND)
+ {
+ throw new AMQStoreException("Error updating queue details within the store: " + status);
+ }
+ }
+ catch (DatabaseException e)
+ {
+ throw new AMQStoreException("Error updating queue details within the store: " + e,e);
+ }
+ }
+
+ /**
+ * Removes the specified queue from the persistent store.
+ *
+ * @param queue The queue to remove.
+ *
+ * @throws AMQStoreException If the operation fails for any reason.
+ */
+ public void removeQueue(final AMQQueue queue) throws AMQStoreException
+ {
+ AMQShortString name = queue.getNameShortString();
+
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("public void removeQueue(AMQShortString name = " + name + "): called");
+ }
+
+ DatabaseEntry key = new DatabaseEntry();
+ EntryBinding keyBinding = new AMQShortStringTB();
+ keyBinding.objectToEntry(name, key);
+ try
+ {
+ OperationStatus status = _queueDb.delete(null, key);
+ if (status == OperationStatus.NOTFOUND)
+ {
+ throw new AMQStoreException("Queue " + name + " not found");
+ }
+ }
+ catch (DatabaseException e)
+ {
+ throw new AMQStoreException("Error writing deleting with name " + name + " from database: " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Places a message onto a specified queue, in a given transaction.
+ *
+ * @param tx The transaction for the operation.
+ * @param queue The the queue to place the message on.
+ * @param messageId The message to enqueue.
+ *
+ * @throws AMQStoreException If the operation fails for any reason.
+ */
+ public void enqueueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, Long messageId) throws AMQStoreException
+ {
+ // _log.debug("public void enqueueMessage(Transaction tx = " + tx + ", AMQShortString name = " + name + ", Long messageId): called");
+
+ AMQShortString name = new AMQShortString(queue.getResourceName());
+
+ DatabaseEntry key = new DatabaseEntry();
+ EntryBinding keyBinding = new QueueEntryTB();
+ QueueEntryKey dd = new QueueEntryKey(name, messageId);
+ keyBinding.objectToEntry(dd, key);
+ DatabaseEntry value = new DatabaseEntry();
+ ByteBinding.byteToEntry((byte) 0, value);
+
+ try
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Enqueuing message " + messageId + " on queue " + name + " [Transaction" + tx + "]");
+ }
+ _deliveryDb.put(tx, key, value);
+ }
+ catch (DatabaseException e)
+ {
+ _log.error("Failed to enqueue: " + e.getMessage(), e);
+ throw new AMQStoreException("Error writing enqueued message with id " + messageId + " for queue " + name
+ + " to database", e);
+ }
+ }
+
+ /**
+ * Extracts a message from a specified queue, in a given transaction.
+ *
+ * @param tx The transaction for the operation.
+ * @param queue The name queue to take the message from.
+ * @param messageId The message to dequeue.
+ *
+ * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
+ */
+ public void dequeueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, Long messageId) throws AMQStoreException
+ {
+ AMQShortString name = new AMQShortString(queue.getResourceName());
+
+ DatabaseEntry key = new DatabaseEntry();
+ EntryBinding keyBinding = new QueueEntryTB();
+ QueueEntryKey dd = new QueueEntryKey(name, messageId);
+
+ keyBinding.objectToEntry(dd, key);
+
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Dequeue message id " + messageId);
+ }
+
+ try
+ {
+
+ OperationStatus status = _deliveryDb.delete(tx, key);
+ if (status == OperationStatus.NOTFOUND)
+ {
+ throw new AMQStoreException("Unable to find message with id " + messageId + " on queue " + name);
+ }
+ else if (status != OperationStatus.SUCCESS)
+ {
+ throw new AMQStoreException("Unable to remove message with id " + messageId + " on queue " + name);
+ }
+
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Removed message " + messageId + ", " + name + " from delivery db");
+
+ }
+ }
+ catch (DatabaseException e)
+ {
+
+ _log.error("Failed to dequeue message " + messageId + ": " + e.getMessage(), e);
+ _log.error(tx);
+
+ throw new AMQStoreException("Error accessing database while dequeuing message: " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Commits all operations performed within a given transaction.
+ *
+ * @param tx The transaction to commit all operations for.
+ *
+ * @throws AMQStoreException If the operation fails for any reason.
+ */
+ private StoreFuture commitTranImpl(final com.sleepycat.je.Transaction tx, boolean syncCommit) throws AMQStoreException
+ {
+ //if (_log.isDebugEnabled())
+ //{
+ // _log.debug("public void commitTranImpl() called with (Transaction=" + tx + ", syncCommit= "+ syncCommit + ")");
+ //}
+
+ if (tx == null)
+ {
+ throw new AMQStoreException("Fatal internal error: transactional is null at commitTran");
+ }
+
+ StoreFuture result;
+ try
+ {
+ result = commit(tx, syncCommit);
+
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("commitTranImpl completed for [Transaction:" + tx + "]");
+ }
+ }
+ catch (DatabaseException e)
+ {
+ throw new AMQStoreException("Error commit tx: " + e.getMessage(), e);
+ }
+
+ return result;
+ }
+
+ /**
+ * Abandons all operations performed within a given transaction.
+ *
+ * @param tx The transaction to abandon.
+ *
+ * @throws AMQStoreException If the operation fails for any reason.
+ */
+ public void abortTran(final com.sleepycat.je.Transaction tx) throws AMQStoreException
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("abortTran called for [Transaction:" + tx + "]");
+ }
+
+ try
+ {
+ tx.abort();
+ }
+ catch (DatabaseException e)
+ {
+ throw new AMQStoreException("Error aborting transaction: " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Primarily for testing purposes.
+ *
+ * @param queueName
+ *
+ * @return a list of message ids for messages enqueued for a particular queue
+ */
+ List<Long> getEnqueuedMessages(AMQShortString queueName) throws AMQStoreException
+ {
+ Cursor cursor = null;
+ try
+ {
+ cursor = _deliveryDb.openCursor(null, null);
+
+ DatabaseEntry key = new DatabaseEntry();
+
+ QueueEntryKey dd = new QueueEntryKey(queueName, 0);
+
+ EntryBinding keyBinding = new QueueEntryTB();
+ keyBinding.objectToEntry(dd, key);
+
+ DatabaseEntry value = new DatabaseEntry();
+
+ LinkedList<Long> messageIds = new LinkedList<Long>();
+
+ OperationStatus status = cursor.getSearchKeyRange(key, value, LockMode.DEFAULT);
+ dd = (QueueEntryKey) keyBinding.entryToObject(key);
+
+ while ((status == OperationStatus.SUCCESS) && dd.getQueueName().equals(queueName))
+ {
+
+ messageIds.add(dd.getMessageId());
+ status = cursor.getNext(key, value, LockMode.DEFAULT);
+ if (status == OperationStatus.SUCCESS)
+ {
+ dd = (QueueEntryKey) keyBinding.entryToObject(key);
+ }
+ }
+
+ return messageIds;
+ }
+ catch (DatabaseException e)
+ {
+ throw new AMQStoreException("Database error: " + e.getMessage(), e);
+ }
+ finally
+ {
+ if (cursor != null)
+ {
+ try
+ {
+ cursor.close();
+ }
+ catch (DatabaseException e)
+ {
+ throw new AMQStoreException("Error closing cursor: " + e.getMessage(), e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Return a valid, currently unused message id.
+ *
+ * @return A fresh message id.
+ */
+ public Long getNewMessageId()
+ {
+ return _messageId.incrementAndGet();
+ }
+
+ /**
+ * Stores a chunk of message data.
+ *
+ * @param tx The transaction for the operation.
+ * @param messageId The message to store the data for.
+ * @param offset The offset of the data chunk in the message.
+ * @param contentBody The content of the data chunk.
+ *
+ * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
+ */
+ protected void addContent(final com.sleepycat.je.Transaction tx, Long messageId, int offset,
+ ByteBuffer contentBody) throws AMQStoreException
+ {
+ DatabaseEntry key = new DatabaseEntry();
+ TupleBinding<MessageContentKey> keyBinding = new MessageContentKeyTB_5();
+ keyBinding.objectToEntry(new MessageContentKey_5(messageId, offset), key);
+ DatabaseEntry value = new DatabaseEntry();
+ TupleBinding<ByteBuffer> messageBinding = new ContentTB();
+ messageBinding.objectToEntry(contentBody, value);
+ try
+ {
+ OperationStatus status = _messageContentDb.put(tx, key, value);
+ if (status != OperationStatus.SUCCESS)
+ {
+ throw new AMQStoreException("Error adding content chunk offset" + offset + " for message id " + messageId + ": "
+ + status);
+ }
+
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Storing content chunk offset" + offset + " for message " + messageId + "[Transaction" + tx + "]");
+ }
+ }
+ catch (DatabaseException e)
+ {
+ throw new AMQStoreException("Error writing AMQMessage with id " + messageId + " to database: " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Stores message meta-data.
+ *
+ * @param tx The transaction for the operation.
+ * @param messageId The message to store the data for.
+ * @param messageMetaData The message meta data to store.
+ *
+ * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
+ */
+ private void storeMetaData(final com.sleepycat.je.Transaction tx, Long messageId, StorableMessageMetaData messageMetaData)
+ throws AMQStoreException
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("public void storeMetaData(Txn tx = " + tx + ", Long messageId = "
+ + messageId + ", MessageMetaData messageMetaData = " + messageMetaData + "): called");
+ }
+
+ DatabaseEntry key = new DatabaseEntry();
+ EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);
+ keyBinding.objectToEntry(messageId, key);
+ DatabaseEntry value = new DatabaseEntry();
+
+ TupleBinding messageBinding = _metaDataTupleBindingFactory.getInstance();
+ messageBinding.objectToEntry(messageMetaData, value);
+ try
+ {
+ _messageMetaDataDb.put(tx, key, value);
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Storing message metadata for message id " + messageId + "[Transaction" + tx + "]");
+ }
+ }
+ catch (DatabaseException e)
+ {
+ throw new AMQStoreException("Error writing message metadata with id " + messageId + " to database: " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Retrieves message meta-data.
+ *
+ * @param messageId The message to get the meta-data for.
+ *
+ * @return The message meta data.
+ *
+ * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
+ */
+ public StorableMessageMetaData getMessageMetaData(Long messageId) throws AMQStoreException
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("public MessageMetaData getMessageMetaData(Long messageId = "
+ + messageId + "): called");
+ }
+
+ DatabaseEntry key = new DatabaseEntry();
+ EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);
+ keyBinding.objectToEntry(messageId, key);
+ DatabaseEntry value = new DatabaseEntry();
+ TupleBinding messageBinding = _metaDataTupleBindingFactory.getInstance();
+
+ try
+ {
+ OperationStatus status = _messageMetaDataDb.get(null, key, value, LockMode.READ_UNCOMMITTED);
+ if (status != OperationStatus.SUCCESS)
+ {
+ throw new AMQStoreException("Metadata not found for message with id " + messageId);
+ }
+
+ StorableMessageMetaData mdd = (StorableMessageMetaData) messageBinding.entryToObject(value);
+
+ return mdd;
+ }
+ catch (DatabaseException e)
+ {
+ throw new AMQStoreException("Error reading message metadata for message with id " + messageId + ": " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Fills the provided ByteBuffer with as much content for the specified message as possible, starting
+ * from the specified offset in the message.
+ *
+ * @param messageId The message to get the data for.
+ * @param offset The offset of the data within the message.
+ * @param dst The destination of the content read back
+ *
+ * @return The number of bytes inserted into the destination
+ *
+ * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
+ */
+ public int getContent(Long messageId, int offset, ByteBuffer dst) throws AMQStoreException
+ {
+ DatabaseEntry contentKeyEntry = new DatabaseEntry();
+
+ //Start from 0 offset and search for the starting chunk.
+ MessageContentKey_5 mck = new MessageContentKey_5(messageId, 0);
+ TupleBinding<MessageContentKey> contentKeyTupleBinding = new MessageContentKeyTB_5();
+ contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry);
+ DatabaseEntry value = new DatabaseEntry();
+ TupleBinding<ByteBuffer> contentTupleBinding = new ContentTB();
+
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Message Id: " + messageId + " Getting content body from offset: " + offset);
+ }
+
+ int written = 0;
+ int seenSoFar = 0;
+
+ Cursor cursor = null;
+ try
+ {
+ cursor = _messageContentDb.openCursor(null, null);
+
+ OperationStatus status = cursor.getSearchKeyRange(contentKeyEntry, value, LockMode.READ_UNCOMMITTED);
+
+ while (status == OperationStatus.SUCCESS)
+ {
+ mck = (MessageContentKey_5) contentKeyTupleBinding.entryToObject(contentKeyEntry);
+ long id = mck.getMessageId();
+
+ if(id != messageId)
+ {
+ //we have exhausted all chunks for this message id, break
+ break;
+ }
+
+ int offsetInMessage = mck.getOffset();
+ ByteBuffer buf = (ByteBuffer) contentTupleBinding.entryToObject(value);
+
+ final int size = (int) buf.limit();
+
+ seenSoFar += size;
+
+ if(seenSoFar >= offset)
+ {
+ byte[] dataAsBytes = buf.array();
+
+ int posInArray = offset + written - offsetInMessage;
+ int count = size - posInArray;
+ if(count > dst.remaining())
+ {
+ count = dst.remaining();
+ }
+ dst.put(dataAsBytes,posInArray,count);
+ written+=count;
+
+ if(dst.remaining() == 0)
+ {
+ break;
+ }
+ }
+
+ status = cursor.getNext(contentKeyEntry, value, LockMode.RMW);
+ }
+
+ return written;
+ }
+ catch (DatabaseException e)
+ {
+ throw new AMQStoreException("Error writing AMQMessage with id " + messageId + " to database: " + e.getMessage(), e);
+ }
+ finally
+ {
+ if(cursor != null)
+ {
+ try
+ {
+ cursor.close();
+ }
+ catch (DatabaseException e)
+ {
+ throw new AMQStoreException("Error writing AMQMessage with id " + messageId + " to database: " + e.getMessage(), e);
+ }
+ }
+ }
+ }
+
+ public boolean isPersistent()
+ {
+ return true;
+ }
+
+ public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData)
+ {
+ if(metaData.isPersistent())
+ {
+ return new StoredBDBMessage(getNewMessageId(), metaData);
+ }
+ else
+ {
+ return new StoredMemoryMessage(getNewMessageId(), metaData);
+ }
+ }
+
+
+ //protected getters for the TupleBindingFactories
+
+ protected QueueTupleBindingFactory getQueueTupleBindingFactory()
+ {
+ return _queueTupleBindingFactory;
+ }
+
+ protected BindingTupleBindingFactory getBindingTupleBindingFactory()
+ {
+ return _bindingTupleBindingFactory;
+ }
+
+ protected MessageMetaDataTupleBindingFactory getMetaDataTupleBindingFactory()
+ {
+ return _metaDataTupleBindingFactory;
+ }
+
+ //Package getters for the various databases used by the Store
+
+ Database getMetaDataDb()
+ {
+ return _messageMetaDataDb;
+ }
+
+ Database getContentDb()
+ {
+ return _messageContentDb;
+ }
+
+ Database getQueuesDb()
+ {
+ return _queueDb;
+ }
+
+ Database getDeliveryDb()
+ {
+ return _deliveryDb;
+ }
+
+ Database getExchangesDb()
+ {
+ return _exchangeDb;
+ }
+
+ Database getBindingsDb()
+ {
+ return _queueBindingsDb;
+ }
+
+ void visitMetaDataDb(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException
+ {
+ visitDatabase(_messageMetaDataDb, visitor);
+ }
+
+ void visitContentDb(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException
+ {
+ visitDatabase(_messageContentDb, visitor);
+ }
+
+ void visitQueues(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException
+ {
+ visitDatabase(_queueDb, visitor);
+ }
+
+ void visitDelivery(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException
+ {
+ visitDatabase(_deliveryDb, visitor);
+ }
+
+ void visitExchanges(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException
+ {
+ visitDatabase(_exchangeDb, visitor);
+ }
+
+ void visitBindings(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException
+ {
+ visitDatabase(_queueBindingsDb, visitor);
+ }
+
+ /**
+ * Generic visitDatabase allows iteration through the specified database.
+ *
+ * @param database The database to visit
+ * @param visitor The visitor to give each entry to.
+ *
+ * @throws DatabaseException If there is a problem with the Database structure
+ * @throws AMQStoreException If there is a problem with the Database contents
+ */
+ void visitDatabase(Database database, DatabaseVisitor visitor) throws DatabaseException, AMQStoreException
+ {
+ Cursor cursor = database.openCursor(null, null);
+
+ try
+ {
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ visitor.visit(key, value);
+ }
+ }
+ finally
+ {
+ if (cursor != null)
+ {
+ cursor.close();
+ }
+ }
+ }
+
+ private StoreFuture commit(com.sleepycat.je.Transaction tx, boolean syncCommit) throws DatabaseException
+ {
+ // _log.debug("void commit(Transaction tx = " + tx + ", sync = " + syncCommit + "): called");
+
+ tx.commitNoSync();
+
+ BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit);
+ commitFuture.commit();
+
+ return commitFuture;
+ }
+
+ public void startCommitThread()
+ {
+ _commitThread.start();
+ }
+
+ private static final class BDBCommitFuture implements StoreFuture
+ {
+ // private static final Logger _log = Logger.getLogger(BDBCommitFuture.class);
+
+ private final CommitThread _commitThread;
+ private final com.sleepycat.je.Transaction _tx;
+ private DatabaseException _databaseException;
+ private boolean _complete;
+ private boolean _syncCommit;
+
+ public BDBCommitFuture(CommitThread commitThread, com.sleepycat.je.Transaction tx, boolean syncCommit)
+ {
+ // _log.debug("public Commit(CommitThread commitThread = " + commitThread + ", Transaction tx = " + tx
+ // + "): called");
+
+ _commitThread = commitThread;
+ _tx = tx;
+ _syncCommit = syncCommit;
+ }
+
+ public synchronized void complete()
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("public synchronized void complete(): called (Transaction = " + _tx + ")");
+ }
+
+ _complete = true;
+
+ notifyAll();
+ }
+
+ public synchronized void abort(DatabaseException databaseException)
+ {
+ // _log.debug("public synchronized void abort(DatabaseException databaseException = " + databaseException
+ // + "): called");
+
+ _complete = true;
+ _databaseException = databaseException;
+
+ notifyAll();
+ }
+
+ public void commit() throws DatabaseException
+ {
+ //_log.debug("public void commit(): called");
+
+ _commitThread.addJob(this);
+
+ if(!_syncCommit)
+ {
+ _log.debug("CommitAsync was requested, returning immediately.");
+ return;
+ }
+
+ synchronized (BDBCommitFuture.this)
+ {
+ while (!_complete)
+ {
+ try
+ {
+ wait(250);
+ }
+ catch (InterruptedException e)
+ {
+ // _log.error("Unexpected thread interruption: " + e, e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ // _log.debug("Commit completed, _databaseException = " + _databaseException);
+
+ if (_databaseException != null)
+ {
+ throw _databaseException;
+ }
+ }
+ }
+
+ public synchronized boolean isComplete()
+ {
+ return _complete;
+ }
+
+ public void waitForCompletion()
+ {
+ while (!isComplete())
+ {
+ try
+ {
+ wait(250);
+ }
+ catch (InterruptedException e)
+ {
+ //TODO Should we ignore, or throw a 'StoreException'?
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations
+ * themselves are responsible for adding themselves to the queue and waiting for the commit to happen before
+ * continuing, but it is the responsibility of this thread to tell the commit operations when they have been
+ * completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collarations </table>
+ */
+ private class CommitThread extends Thread
+ {
+ // private final Logger _log = Logger.getLogger(CommitThread.class);
+
+ private final AtomicBoolean _stopped = new AtomicBoolean(false);
+ private final AtomicReference<Queue<BDBCommitFuture>> _jobQueue = new AtomicReference<Queue<BDBCommitFuture>>(new ConcurrentLinkedQueue<BDBCommitFuture>());
+ private final CheckpointConfig _config = new CheckpointConfig();
+ private final Object _lock = new Object();
+
+ public CommitThread(String name)
+ {
+ super(name);
+ _config.setForce(true);
+
+ }
+
+ public void run()
+ {
+ while (!_stopped.get())
+ {
+ synchronized (_lock)
+ {
+ while (!_stopped.get() && !hasJobs())
+ {
+ try
+ {
+ // RHM-7 Periodically wake up and check, just in case we
+ // missed a notification. Don't want to lock the broker hard.
+ _lock.wait(250);
+ }
+ catch (InterruptedException e)
+ {
+ // _log.info(getName() + " interrupted. ");
+ }
+ }
+ }
+ processJobs();
+ }
+ }
+
+ private void processJobs()
+ {
+ // _log.debug("private void processJobs(): called");
+
+ // we replace the old queue atomically with a new one and this avoids any need to
+ // copy elements out of the queue
+ Queue<BDBCommitFuture> jobs = _jobQueue.getAndSet(new ConcurrentLinkedQueue<BDBCommitFuture>());
+
+ try
+ {
+ // _environment.checkpoint(_config);
+ _environment.sync();
+
+ for (BDBCommitFuture commit : jobs)
+ {
+ commit.complete();
+ }
+ }
+ catch (DatabaseException e)
+ {
+ for (BDBCommitFuture commit : jobs)
+ {
+ commit.abort(e);
+ }
+ }
+
+ }
+
+ private boolean hasJobs()
+ {
+ return !_jobQueue.get().isEmpty();
+ }
+
+ public void addJob(BDBCommitFuture commit)
+ {
+ synchronized (_lock)
+ {
+ _jobQueue.get().add(commit);
+ _lock.notifyAll();
+ }
+ }
+
+ public void close()
+ {
+ synchronized (_lock)
+ {
+ _stopped.set(true);
+ _lock.notifyAll();
+ }
+ }
+ }
+
+
+ private class StoredBDBMessage implements StoredMessage
+ {
+
+ private final long _messageId;
+ private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
+ private com.sleepycat.je.Transaction _txn;
+
+ StoredBDBMessage(long messageId, StorableMessageMetaData metaData)
+ {
+ this(messageId, metaData, true);
+ }
+
+
+ StoredBDBMessage(long messageId,
+ StorableMessageMetaData metaData, boolean persist)
+ {
+ try
+ {
+ _messageId = messageId;
+
+ _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
+ if(persist)
+ {
+ _txn = _environment.beginTransaction(null, null);
+ storeMetaData(_txn, messageId, metaData);
+ }
+ }
+ catch (DatabaseException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (AMQStoreException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ public StorableMessageMetaData getMetaData()
+ {
+ StorableMessageMetaData metaData = _metaDataRef.get();
+ if(metaData == null)
+ {
+ try
+ {
+ metaData = BDBMessageStore.this.getMessageMetaData(_messageId);
+ }
+ catch (AMQStoreException e)
+ {
+ throw new RuntimeException(e);
+ }
+ _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
+ }
+
+ return metaData;
+ }
+
+ public long getMessageNumber()
+ {
+ return _messageId;
+ }
+
+ public void addContent(int offsetInMessage, java.nio.ByteBuffer src)
+ {
+ try
+ {
+ BDBMessageStore.this.addContent(_txn, _messageId, offsetInMessage, src);
+ }
+ catch (AMQStoreException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public int getContent(int offsetInMessage, java.nio.ByteBuffer dst)
+ {
+ try
+ {
+ return BDBMessageStore.this.getContent(_messageId, offsetInMessage, dst);
+ }
+ catch (AMQStoreException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public StoreFuture flushToStore()
+ {
+ try
+ {
+ if(_txn != null)
+ {
+ //if(_log.isDebugEnabled())
+ //{
+ // _log.debug("Flushing message " + _messageId + " to store");
+ //}
+ BDBMessageStore.this.commitTranImpl(_txn, true);
+ }
+ }
+ catch (AMQStoreException e)
+ {
+ throw new RuntimeException(e);
+ }
+ finally
+ {
+ _txn = null;
+ }
+ return IMMEDIATE_FUTURE;
+ }
+
+ public void remove()
+ {
+ flushToStore();
+ try
+ {
+ BDBMessageStore.this.removeMessage(_messageId);
+ }
+ catch (AMQStoreException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private class BDBTransaction implements Transaction
+ {
+ private com.sleepycat.je.Transaction _txn;
+
+ private BDBTransaction()
+ {
+ try
+ {
+ _txn = _environment.beginTransaction(null, null);
+ }
+ catch (DatabaseException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+ {
+ BDBMessageStore.this.enqueueMessage(_txn, queue, messageId);
+ }
+
+ public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+ {
+ BDBMessageStore.this.dequeueMessage(_txn, queue, messageId);
+
+ }
+
+ public void commitTran() throws AMQStoreException
+ {
+ BDBMessageStore.this.commitTranImpl(_txn, true);
+ }
+
+ public StoreFuture commitTranAsync() throws AMQStoreException
+ {
+ return BDBMessageStore.this.commitTranImpl(_txn, false);
+ }
+
+ public void abortTran() throws AMQStoreException
+ {
+ BDBMessageStore.this.abortTran(_txn);
+ }
+ }
+
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
new file mode 100644
index 0000000000..211c025dcd
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
@@ -0,0 +1,1125 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
+import org.apache.qpid.server.store.berkeleydb.BindingKey;
+import org.apache.qpid.server.store.berkeleydb.ContentTB;
+import org.apache.qpid.server.store.berkeleydb.DatabaseVisitor;
+import org.apache.qpid.server.store.berkeleydb.ExchangeTB;
+import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_4;
+import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_5;
+import org.apache.qpid.server.store.berkeleydb.records.ExchangeRecord;
+import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
+import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTB_4;
+import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTB_5;
+import org.apache.qpid.server.store.berkeleydb.tuples.QueueEntryTB;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.BrokerActor;
+import org.apache.qpid.server.logging.NullRootMessageLogger;
+import org.apache.qpid.server.message.MessageMetaData;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.util.FileUtils;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+
+import java.io.File;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.IOException;
+import java.io.FileNotFoundException;
+import java.nio.ByteBuffer;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map.Entry;
+
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.bind.tuple.TupleBinding;
+
+/**
+ * This is a simple BerkeleyDB Store upgrade tool that will upgrade a V4 Store to a V5 Store.
+ *
+ * Currently upgrade is fixed from v4 -> v5
+ *
+ * Improvments:
+ * - Add List BDBMessageStore.getDatabases(); This can the be iterated to guard against new DBs being added.
+ * - A version in the store would allow automated upgrade or later with more available versions interactive upgrade.
+ * - Add process logging and disable all Store and Qpid logging.
+ */
+public class BDBStoreUpgrade
+{
+ private static final Logger _logger = LoggerFactory.getLogger(BDBStoreUpgrade.class);
+ /** The Store Directory that needs upgrading */
+ File _fromDir;
+ /** The Directory that will be made to contain the upgraded store */
+ File _toDir;
+ /** The Directory that will be made to backup the original store if required */
+ File _backupDir;
+
+ /** The Old Store */
+ BDBMessageStore _oldMessageStore;
+ /** The New Store */
+ BDBMessageStore _newMessageStore;
+ /** The file ending that is used by BDB Store Files */
+ private static final String BDB_FILE_ENDING = ".jdb";
+
+ static final Options _options = new Options();
+ static CommandLine _commandLine;
+ private boolean _interactive;
+ private boolean _force;
+
+ private static final String VERSION = "3.0";
+ private static final String USER_ABORTED_PROCESS = "User aborted process";
+ private static final int LOWEST_SUPPORTED_STORE_VERSION = 4;
+ private static final String PREVIOUS_STORE_VERSION_UNSUPPORTED = "Store upgrade from version {0} is not supported."
+ + " You must first run the previous store upgrade tool.";
+ private static final String FOLLOWING_STORE_VERSION_UNSUPPORTED = "Store version {0} is newer than this tool supports. "
+ + "You must use a newer version of the store upgrade tool";
+ private static final String STORE_ALREADY_UPGRADED = "Store has already been upgraded to version {0}.";
+
+ private static final String OPTION_INPUT_SHORT = "i";
+ private static final String OPTION_INPUT = "input";
+ private static final String OPTION_OUTPUT_SHORT = "o";
+ private static final String OPTION_OUTPUT = "output";
+ private static final String OPTION_BACKUP_SHORT = "b";
+ private static final String OPTION_BACKUP = "backup";
+ private static final String OPTION_QUIET_SHORT = "q";
+ private static final String OPTION_QUIET = "quiet";
+ private static final String OPTION_FORCE_SHORT = "f";
+ private static final String OPTION_FORCE = "force";
+ private boolean _inplace = false;
+
+ public BDBStoreUpgrade(String fromDir, String toDir, String backupDir, boolean interactive, boolean force)
+ {
+ _interactive = interactive;
+ _force = force;
+
+ _fromDir = new File(fromDir);
+ if (!_fromDir.exists())
+ {
+ throw new IllegalArgumentException("BDBStore path '" + fromDir + "' could not be read. "
+ + "Ensure the path is correct and that the permissions are correct.");
+ }
+
+ if (!isDirectoryAStoreDir(_fromDir))
+ {
+ throw new IllegalArgumentException("Specified directory '" + fromDir + "' does not contain a valid BDBMessageStore.");
+ }
+
+ if (toDir == null)
+ {
+ _inplace = true;
+ _toDir = new File(fromDir+"-Inplace");
+ }
+ else
+ {
+ _toDir = new File(toDir);
+ }
+
+ if (_toDir.exists())
+ {
+ if (_interactive)
+ {
+ if (toDir == null)
+ {
+ System.out.println("Upgrading in place:" + fromDir);
+ }
+ else
+ {
+ System.out.println("Upgrade destination: '" + toDir + "'");
+ }
+
+ if (userInteract("Upgrade destination exists do you wish to replace it?"))
+ {
+ if (!FileUtils.delete(_toDir, true))
+ {
+ throw new IllegalArgumentException("Unable to remove upgrade destination '" + _toDir + "'");
+ }
+ }
+ else
+ {
+ throw new IllegalArgumentException("Upgrade destination '" + _toDir + "' already exists. ");
+ }
+ }
+ else
+ {
+ if (_force)
+ {
+ if (!FileUtils.delete(_toDir, true))
+ {
+ throw new IllegalArgumentException("Unable to remove upgrade destination '" + _toDir + "'");
+ }
+ }
+ else
+ {
+ throw new IllegalArgumentException("Upgrade destination '" + _toDir + "' already exists. ");
+ }
+ }
+ }
+
+ if (!_toDir.mkdirs())
+ {
+ throw new IllegalArgumentException("Upgrade destination '" + _toDir + "' could not be created. "
+ + "Ensure the path is correct and that the permissions are correct.");
+ }
+
+ if (backupDir != null)
+ {
+ if (backupDir.equals(""))
+ {
+ _backupDir = new File(_fromDir.getAbsolutePath().toString() + "-Backup");
+ }
+ else
+ {
+ _backupDir = new File(backupDir);
+ }
+ }
+ else
+ {
+ _backupDir = null;
+ }
+ }
+
+ private static String ANSWER_OPTIONS = " Yes/No/Abort? ";
+ private static String ANSWER_NO = "no";
+ private static String ANSWER_N = "n";
+ private static String ANSWER_YES = "yes";
+ private static String ANSWER_Y = "y";
+ private static String ANSWER_ABORT = "abort";
+ private static String ANSWER_A = "a";
+
+ /**
+ * Interact with the user via System.in and System.out. If the user wishes to Abort then a RuntimeException is thrown.
+ * Otherwise the method will return based on their response true=yes false=no.
+ *
+ * @param message Message to print out
+ *
+ * @return boolean response from user if they wish to proceed
+ */
+ private boolean userInteract(String message)
+ {
+ System.out.print(message + ANSWER_OPTIONS);
+ BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
+
+ String input = "";
+ try
+ {
+ input = br.readLine();
+ }
+ catch (IOException e)
+ {
+ input = "";
+ }
+
+ if (input.equalsIgnoreCase(ANSWER_Y) || input.equalsIgnoreCase(ANSWER_YES))
+ {
+ return true;
+ }
+ else
+ {
+ if (input.equalsIgnoreCase(ANSWER_N) || input.equalsIgnoreCase(ANSWER_NO))
+ {
+ return false;
+ }
+ else
+ {
+ if (input.equalsIgnoreCase(ANSWER_A) || input.equalsIgnoreCase(ANSWER_ABORT))
+ {
+ throw new RuntimeException(USER_ABORTED_PROCESS);
+ }
+ }
+ }
+
+ return userInteract(message);
+ }
+
+ /**
+ * Upgrade a Store of a specified version to the latest version.
+ *
+ * @param version the version of the current store
+ *
+ * @throws Exception
+ */
+ public void upgradeFromVersion(int version) throws Exception
+ {
+ upgradeFromVersion(version, _fromDir, _toDir, _backupDir, _force,
+ _inplace);
+ }
+
+ /**
+ * Upgrade a Store of a specified version to the latest version.
+ *
+ * @param version the version of the current store
+ * @param fromDir the directory with the old Store
+ * @param toDir the directrory to hold the newly Upgraded Store
+ * @param backupDir the directrory to backup to if required
+ * @param force suppress all questions
+ * @param inplace replace the from dir with the upgraded result in toDir
+ *
+ * @throws Exception due to Virtualhost/MessageStore.close() being
+ * rather poor at exception handling
+ * @throws DatabaseException if there is a problem with the store formats
+ * @throws AMQException if there is an issue creating Qpid data structures
+ */
+ public void upgradeFromVersion(int version, File fromDir, File toDir,
+ File backupDir, boolean force,
+ boolean inplace) throws Exception
+ {
+ _logger.info("Located store to upgrade at '" + fromDir + "'");
+
+ // Verify user has created a backup, giving option to perform backup
+ if (_interactive)
+ {
+ if (!userInteract("Have you performed a DB backup of this store."))
+ {
+ File backup;
+ if (backupDir == null)
+ {
+ backup = new File(fromDir.getAbsolutePath().toString() + "-Backup");
+ }
+ else
+ {
+ backup = backupDir;
+ }
+
+ if (userInteract("Do you wish to perform a DB backup now? " +
+ "(Store will be backed up to '" + backup.getName() + "')"))
+ {
+ performDBBackup(fromDir, backup, force);
+ }
+ else
+ {
+ if (!userInteract("Are you sure wish to proceed with DB migration without backup? " +
+ "(For more details of the consequences check the Qpid/BDB Message Store Wiki)."))
+ {
+ throw new IllegalArgumentException("Upgrade stopped at user request as no DB Backup performed.");
+ }
+ }
+ }
+ else
+ {
+ if (!inplace)
+ {
+ _logger.info("Upgrade will create a new store at '" + toDir + "'");
+ }
+
+ _logger.info("Using the contents in the Message Store '" + fromDir + "'");
+
+ if (!userInteract("Do you wish to proceed?"))
+ {
+ throw new IllegalArgumentException("Upgrade stopped as did not wish to proceed");
+ }
+ }
+ }
+ else
+ {
+ if (backupDir != null)
+ {
+ performDBBackup(fromDir, backupDir, force);
+ }
+ }
+
+ CurrentActor.set(new BrokerActor(new NullRootMessageLogger()));
+
+ //Create a new messageStore
+ _newMessageStore = new BDBMessageStore();
+ _newMessageStore.configure(toDir, false);
+ _newMessageStore.start();
+
+ try
+ {
+ //Load the old MessageStore
+ switch (version)
+ {
+ default:
+ case 4:
+ _oldMessageStore = new BDBMessageStore(4);
+ _oldMessageStore.configure(fromDir, true);
+ _oldMessageStore.start();
+ upgradeFromVersion_4();
+ break;
+ case 3:
+ case 2:
+ case 1:
+ throw new IllegalArgumentException(MessageFormat.format(PREVIOUS_STORE_VERSION_UNSUPPORTED,
+ new Object[] { Integer.toString(version) }));
+ }
+ }
+ finally
+ {
+ _newMessageStore.close();
+ if (_oldMessageStore != null)
+ {
+ _oldMessageStore.close();
+ }
+ // if we are running inplace then swap fromDir and toDir
+ if (inplace)
+ {
+ // Remove original copy
+ if (FileUtils.delete(fromDir, true))
+ {
+ // Rename upgraded store
+ toDir.renameTo(fromDir);
+ }
+ else
+ {
+ throw new RuntimeException("Unable to upgrade inplace as " +
+ "unable to delete source '"
+ +fromDir+"', Store upgrade " +
+ "successfully performed to :"
+ +toDir);
+ }
+ }
+ }
+ }
+
+ private void upgradeFromVersion_4() throws AMQException, DatabaseException
+ {
+ _logger.info("Starting store upgrade from version 4");
+
+ //Migrate _exchangeDb;
+ _logger.info("Exchanges");
+
+ moveContents(_oldMessageStore.getExchangesDb(), _newMessageStore.getExchangesDb(), "Exchange");
+
+ final List<AMQShortString> topicExchanges = new ArrayList<AMQShortString>();
+ final TupleBinding exchangeTB = new ExchangeTB();
+
+ DatabaseVisitor exchangeListVisitor = new DatabaseVisitor()
+ {
+ public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
+ {
+ ExchangeRecord exchangeRec = (ExchangeRecord) exchangeTB.entryToObject(value);
+ AMQShortString type = exchangeRec.getType();
+
+ if (ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(type))
+ {
+ topicExchanges.add(exchangeRec.getNameShortString());
+ }
+ }
+ };
+ _oldMessageStore.visitExchanges(exchangeListVisitor);
+
+
+ //Migrate _queueBindingsDb;
+ _logger.info("Queue Bindings");
+ moveContents(_oldMessageStore.getBindingsDb(), _newMessageStore.getBindingsDb(), "Queue Binding");
+
+ //Inspect the bindings to gather a list of queues which are probably durable subscriptions, i.e. those
+ //which have a colon in their name and are bound to the Topic exchanges above
+ final List<AMQShortString> durableSubQueues = new ArrayList<AMQShortString>();
+ final TupleBinding<BindingKey> bindingTB = _oldMessageStore.getBindingTupleBindingFactory().getInstance();
+
+ DatabaseVisitor durSubQueueListVisitor = new DatabaseVisitor()
+ {
+ public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
+ {
+ BindingKey bindingRec = (BindingKey) bindingTB.entryToObject(key);
+ AMQShortString queueName = bindingRec.getQueueName();
+ AMQShortString exchangeName = bindingRec.getExchangeName();
+
+ if (topicExchanges.contains(exchangeName) && queueName.asString().contains(":"))
+ {
+ durableSubQueues.add(queueName);
+ }
+ }
+ };
+ _oldMessageStore.visitBindings(durSubQueueListVisitor);
+
+
+ //Migrate _queueDb;
+ _logger.info("Queues");
+
+ // hold the list of existing queue names
+ final List<AMQShortString> existingQueues = new ArrayList<AMQShortString>();
+
+ final TupleBinding<QueueRecord> queueTupleBinding = _oldMessageStore.getQueueTupleBindingFactory().getInstance();
+
+ DatabaseVisitor queueVisitor = new DatabaseVisitor()
+ {
+ public void visit(DatabaseEntry key, DatabaseEntry value) throws AMQStoreException
+ {
+ QueueRecord queueRec = (QueueRecord) queueTupleBinding.entryToObject(value);
+ AMQShortString queueName = queueRec.getNameShortString();
+
+ //if the queue name is in the gathered list then set its exclusivity true
+ if (durableSubQueues.contains(queueName))
+ {
+ _logger.info("Marking as possible DurableSubscription backing queue: " + queueName);
+ queueRec.setExclusive(true);
+ }
+
+ //The simple call to createQueue with the QueueRecord object is sufficient for a v2->v3 upgrade as
+ //the extra 'exclusive' property in v3 will be defaulted to false in the record creation.
+ _newMessageStore.createQueue(queueRec);
+
+ _count++;
+ existingQueues.add(queueName);
+ }
+ };
+ _oldMessageStore.visitQueues(queueVisitor);
+
+ logCount(queueVisitor.getVisitedCount(), "Queue");
+
+
+ // Look for persistent messages stored for non-durable queues
+ _logger.info("Checking for messages previously sent to non-durable queues");
+
+ // track all message delivery to existing queues
+ final HashSet<Long> queueMessages = new HashSet<Long>();
+
+ // hold all non existing queues and their messages IDs
+ final HashMap<String, HashSet<Long>> phantomMessageQueues = new HashMap<String, HashSet<Long>>();
+
+ // delivery DB visitor to check message delivery and identify non existing queues
+ final QueueEntryTB queueEntryTB = new QueueEntryTB();
+ DatabaseVisitor messageDeliveryCheckVisitor = new DatabaseVisitor()
+ {
+ public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
+ {
+ QueueEntryKey entryKey = (QueueEntryKey) queueEntryTB.entryToObject(key);
+ Long messageId = entryKey.getMessageId();
+ AMQShortString queueName = entryKey.getQueueName();
+ if (!existingQueues.contains(queueName))
+ {
+ String name = queueName.asString();
+ HashSet<Long> messages = phantomMessageQueues.get(name);
+ if (messages == null)
+ {
+ messages = new HashSet<Long>();
+ phantomMessageQueues.put(name, messages);
+ }
+ messages.add(messageId);
+ _count++;
+ }
+ else
+ {
+ queueMessages.add(messageId);
+ }
+ }
+ };
+ _oldMessageStore.visitDelivery(messageDeliveryCheckVisitor);
+
+ if (phantomMessageQueues.isEmpty())
+ {
+ _logger.info("No such messages were found");
+ }
+ else
+ {
+ _logger.info("Found " + messageDeliveryCheckVisitor.getVisitedCount()+ " such messages in total");
+
+ for (Entry<String, HashSet<Long>> phantomQueue : phantomMessageQueues.entrySet())
+ {
+ String queueName = phantomQueue.getKey();
+ HashSet<Long> messages = phantomQueue.getValue();
+
+ _logger.info(MessageFormat.format("There are {0} messages which were previously delivered to non-durable queue ''{1}''",messages.size(), queueName));
+
+ boolean createQueue;
+ if(!_interactive)
+ {
+ createQueue = true;
+ _logger.info("Running in batch-mode, marking queue as durable to ensure retention of the messages.");
+ }
+ else
+ {
+ createQueue = userInteract("Do you want to make this queue durable?\n"
+ + "NOTE: Answering No will result in these messages being discarded!");
+ }
+
+ if (createQueue)
+ {
+ for (Long messageId : messages)
+ {
+ queueMessages.add(messageId);
+ }
+ AMQShortString name = new AMQShortString(queueName);
+ existingQueues.add(name);
+ QueueRecord record = new QueueRecord(name, null, false, null);
+ _newMessageStore.createQueue(record);
+ }
+ }
+ }
+
+
+ //Migrate _messageMetaDataDb;
+ _logger.info("Message MetaData");
+
+ final Database newMetaDataDB = _newMessageStore.getMetaDataDb();
+ final TupleBinding<Object> oldMetaDataTupleBinding = _oldMessageStore.getMetaDataTupleBindingFactory().getInstance();
+ final TupleBinding<Object> newMetaDataTupleBinding = _newMessageStore.getMetaDataTupleBindingFactory().getInstance();
+
+ DatabaseVisitor metaDataVisitor = new DatabaseVisitor()
+ {
+ public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
+ {
+ _count++;
+ MessageMetaData metaData = (MessageMetaData) oldMetaDataTupleBinding.entryToObject(value);
+
+ // get message id
+ Long messageId = TupleBinding.getPrimitiveBinding(Long.class).entryToObject(key);
+
+ // ONLY copy data if message is delivered to existing queue
+ if (!queueMessages.contains(messageId))
+ {
+ return;
+ }
+ DatabaseEntry newValue = new DatabaseEntry();
+ newMetaDataTupleBinding.objectToEntry(metaData, newValue);
+
+ newMetaDataDB.put(null, key, newValue);
+ }
+ };
+ _oldMessageStore.visitMetaDataDb(metaDataVisitor);
+
+ logCount(metaDataVisitor.getVisitedCount(), "Message MetaData");
+
+
+ //Migrate _messageContentDb;
+ _logger.info("Message Contents");
+ final Database newContentDB = _newMessageStore.getContentDb();
+
+ final TupleBinding<MessageContentKey> oldContentKeyTupleBinding = new MessageContentKeyTB_4();
+ final TupleBinding<MessageContentKey> newContentKeyTupleBinding = new MessageContentKeyTB_5();
+ final TupleBinding contentTB = new ContentTB();
+
+ DatabaseVisitor contentVisitor = new DatabaseVisitor()
+ {
+ long _prevMsgId = -1; //Initialise to invalid value
+ int _bytesSeenSoFar = 0;
+
+ public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
+ {
+ _count++;
+
+ //determine the msgId of the current entry
+ MessageContentKey_4 contentKey = (MessageContentKey_4) oldContentKeyTupleBinding.entryToObject(key);
+ long msgId = contentKey.getMessageId();
+
+ // ONLY copy data if message is delivered to existing queue
+ if (!queueMessages.contains(msgId))
+ {
+ return;
+ }
+ //if this is a new message, restart the byte offset count.
+ if(_prevMsgId != msgId)
+ {
+ _bytesSeenSoFar = 0;
+ }
+
+ //determine the content size
+ ByteBuffer content = (ByteBuffer) contentTB.entryToObject(value);
+ int contentSize = content.limit();
+
+ //create the new key: id + previously seen data count
+ MessageContentKey_5 newKey = new MessageContentKey_5(msgId, _bytesSeenSoFar);
+ DatabaseEntry newKeyEntry = new DatabaseEntry();
+ newContentKeyTupleBinding.objectToEntry(newKey, newKeyEntry);
+
+ DatabaseEntry newValueEntry = new DatabaseEntry();
+ contentTB.objectToEntry(content, newValueEntry);
+
+ newContentDB.put(null, newKeyEntry, newValueEntry);
+
+ _prevMsgId = msgId;
+ _bytesSeenSoFar += contentSize;
+ }
+ };
+ _oldMessageStore.visitContentDb(contentVisitor);
+
+ logCount(contentVisitor.getVisitedCount(), "Message Content");
+
+
+ //Migrate _deliveryDb;
+ _logger.info("Delivery Records");
+ final Database deliveryDb =_newMessageStore.getDeliveryDb();
+ DatabaseVisitor deliveryDbVisitor = new DatabaseVisitor()
+ {
+
+ public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
+ {
+ _count++;
+
+ // get message id from entry key
+ QueueEntryKey entryKey = (QueueEntryKey) queueEntryTB.entryToObject(key);
+ AMQShortString queueName = entryKey.getQueueName();
+
+ // ONLY copy data if message queue exists
+ if (existingQueues.contains(queueName))
+ {
+ deliveryDb.put(null, key, value);
+ }
+ }
+ };
+ _oldMessageStore.visitDelivery(deliveryDbVisitor);
+ logCount(contentVisitor.getVisitedCount(), "Delivery Record");
+ }
+
+ /**
+ * Log the specified count for item in a user friendly way.
+ *
+ * @param count of items to log
+ * @param item description of what is being logged.
+ */
+ private void logCount(int count, String item)
+ {
+ _logger.info(" " + count + " " + item + " " + (count == 1 ? "entry" : "entries"));
+ }
+
+ /**
+ * @param oldDatabase The old MessageStoreDB to perform the visit on
+ * @param newDatabase The new MessageStoreDB to copy the data to.
+ * @param contentName The string name of the content for display purposes.
+ *
+ * @throws AMQException Due to createQueue thorwing AMQException
+ * @throws DatabaseException If there is a problem with the loading of the data
+ */
+ private void moveContents(Database oldDatabase, final Database newDatabase, String contentName) throws AMQException, DatabaseException
+ {
+
+ DatabaseVisitor moveVisitor = new DatabaseVisitor()
+ {
+ public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
+ {
+ _count++;
+ newDatabase.put(null, key, value);
+ }
+ };
+
+ _oldMessageStore.visitDatabase(oldDatabase, moveVisitor);
+
+ logCount(moveVisitor.getVisitedCount(), contentName);
+ }
+
+ private static void usage()
+ {
+ System.out.println("usage: BDBStoreUpgrade:\n [-h|--help] [-q|--quiet] [-f|--force] [-b|--backup <Path to backup-db>] " +
+ "-i|--input <Path to input-db> [-o|--output <Path to upgraded-db>]");
+ }
+
+ private static void help()
+ {
+ System.out.println("usage: BDBStoreUpgrade:");
+ System.out.println("Required:");
+ for (Object obj : _options.getOptions())
+ {
+ Option option = (Option) obj;
+ if (option.isRequired())
+ {
+ System.out.println("-" + option.getOpt() + "|--" + option.getLongOpt() + "\t\t-\t" + option.getDescription());
+ }
+ }
+
+ System.out.println("\nOptions:");
+ for (Object obj : _options.getOptions())
+ {
+ Option option = (Option) obj;
+ if (!option.isRequired())
+ {
+ System.out.println("--" + option.getLongOpt() + "|-" + option.getOpt() + "\t\t-\t" + option.getDescription());
+ }
+ }
+ }
+
+ static boolean isDirectoryAStoreDir(File directory)
+ {
+ if (directory.isFile())
+ {
+ return false;
+ }
+
+ for (File file : directory.listFiles())
+ {
+ if (file.isFile())
+ {
+ if (file.getName().endsWith(BDB_FILE_ENDING))
+ {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ static File[] discoverDBStores(File fromDir)
+ {
+ if (!fromDir.exists())
+ {
+ throw new IllegalArgumentException("'" + fromDir + "' does not exist unable to upgrade.");
+ }
+
+ // Ensure we are given a directory
+ if (fromDir.isFile())
+ {
+ throw new IllegalArgumentException("'" + fromDir + "' is not a directory unable to upgrade.");
+ }
+
+ // Check to see if we have been given a single directory
+ if (isDirectoryAStoreDir(fromDir))
+ {
+ return new File[]{fromDir};
+ }
+
+ // Check to see if we have been give a directory containing stores.
+ List<File> stores = new LinkedList<File>();
+
+ for (File directory : fromDir.listFiles())
+ {
+ if (directory.isDirectory())
+ {
+ if (isDirectoryAStoreDir(directory))
+ {
+ stores.add(directory);
+ }
+ }
+ }
+
+ return stores.toArray(new File[stores.size()]);
+ }
+
+ private static void performDBBackup(File source, File backup, boolean force) throws Exception
+ {
+ if (backup.exists())
+ {
+ if (force)
+ {
+ _logger.info("Backup location exists. Forced to remove.");
+ FileUtils.delete(backup, true);
+ }
+ else
+ {
+ throw new IllegalArgumentException("Unable to perform backup a backup already exists.");
+ }
+ }
+
+ try
+ {
+ _logger.info("Backing up '" + source + "' to '" + backup + "'");
+ FileUtils.copyRecursive(source, backup);
+ }
+ catch (FileNotFoundException e)
+ {
+ //Throwing IAE here as this will be reported as a Backup not started
+ throw new IllegalArgumentException("Unable to perform backup:" + e.getMessage());
+ }
+ catch (FileUtils.UnableToCopyException e)
+ {
+ //Throwing exception here as this will be reported as a Failed Backup
+ throw new Exception("Unable to perform backup due to:" + e.getMessage());
+ }
+ }
+
+ public static void main(String[] args) throws ParseException
+ {
+ setOptions(_options);
+
+ final Options helpOptions = new Options();
+ setHelpOptions(helpOptions);
+
+ //Display help
+ boolean displayHelp = false;
+ try
+ {
+ if (new PosixParser().parse(helpOptions, args).hasOption("h"))
+ {
+ showHelp();
+ }
+ }
+ catch (ParseException pe)
+ {
+ displayHelp = true;
+ }
+
+ //Parse commandline for required arguments
+ try
+ {
+ _commandLine = new PosixParser().parse(_options, args);
+ }
+ catch (ParseException mae)
+ {
+ if (displayHelp)
+ {
+ showHelp();
+ }
+ else
+ {
+ fatalError(mae.getMessage());
+ }
+ }
+
+ String fromDir = _commandLine.getOptionValue(OPTION_INPUT_SHORT);
+ String toDir = _commandLine.getOptionValue(OPTION_OUTPUT_SHORT);
+ String backupDir = _commandLine.getOptionValue(OPTION_BACKUP_SHORT);
+
+ if (backupDir == null && _commandLine.hasOption(OPTION_BACKUP_SHORT))
+ {
+ backupDir = "";
+ }
+
+ //Attempt to locate possible Store to upgrade on input path
+ File[] stores = new File[0];
+ try
+ {
+ stores = discoverDBStores(new File(fromDir));
+ }
+ catch (IllegalArgumentException iae)
+ {
+ fatalError(iae.getMessage());
+ }
+
+ boolean interactive = !_commandLine.hasOption(OPTION_QUIET_SHORT);
+ boolean force = _commandLine.hasOption(OPTION_FORCE_SHORT);
+
+ try{
+ for (File store : stores)
+ {
+
+ // if toDir is null then we are upgrading inplace so we don't need
+ // to provide an upgraded toDir when upgrading multiple stores.
+ if (toDir == null ||
+ // Check to see if we are upgrading a store specified in
+ // fromDir or if the directories are nested.
+ (stores.length > 0
+ && stores[0].toString().length() == fromDir.length()))
+ {
+ upgrade(store, toDir, backupDir, interactive, force);
+ }
+ else
+ {
+ // Add the extra part of path from store to the toDir
+ upgrade(store, toDir + File.separator + store.toString().substring(fromDir.length()), backupDir, interactive, force);
+ }
+ }
+ }
+ catch (RuntimeException re)
+ {
+ if (!(USER_ABORTED_PROCESS).equals(re.getMessage()))
+ {
+ re.printStackTrace();
+ _logger.error("Upgrade Failed: " + re.getMessage());
+ }
+ else
+ {
+ _logger.error("Upgrade stopped : User aborted");
+ }
+ }
+
+ }
+
+ @SuppressWarnings("static-access")
+ private static void setOptions(Options options)
+ {
+ Option input =
+ OptionBuilder.isRequired().hasArg().withDescription("Location (Path) of store to upgrade.").withLongOpt(OPTION_INPUT)
+ .create(OPTION_INPUT_SHORT);
+
+ Option output =
+ OptionBuilder.hasArg().withDescription("Location (Path) for the upgraded-db to be written.").withLongOpt(OPTION_OUTPUT)
+ .create(OPTION_OUTPUT_SHORT);
+
+ Option quiet = new Option(OPTION_QUIET_SHORT, OPTION_QUIET, false, "Disable interactive options.");
+
+ Option force = new Option(OPTION_FORCE_SHORT, OPTION_FORCE, false, "Force upgrade removing any existing upgrade target.");
+ Option backup =
+ OptionBuilder.hasOptionalArg().withDescription("Location (Path) for the backup-db to be written.").withLongOpt(OPTION_BACKUP)
+ .create(OPTION_BACKUP_SHORT);
+
+ options.addOption(input);
+ options.addOption(output);
+ options.addOption(quiet);
+ options.addOption(force);
+ options.addOption(backup);
+ setHelpOptions(options);
+ }
+
+ private static void setHelpOptions(Options options)
+ {
+ options.addOption(new Option("h", "help", false, "Show this help."));
+ }
+
+ static void upgrade(File fromDir, String toDir, String backupDir, boolean interactive, boolean force)
+ {
+
+ _logger.info("Running BDB Message Store upgrade tool: v" + VERSION);
+ int version = getStoreVersion(fromDir);
+ if (!isVersionUpgradable(version))
+ {
+ return;
+ }
+ try
+ {
+ new BDBStoreUpgrade(fromDir.toString(), toDir, backupDir, interactive, force).upgradeFromVersion(version);
+
+ _logger.info("Upgrade complete.");
+ }
+ catch (IllegalArgumentException iae)
+ {
+ _logger.error("Upgrade not started due to: " + iae.getMessage());
+ }
+ catch (DatabaseException de)
+ {
+ de.printStackTrace();
+ _logger.error("Upgrade Failed: " + de.getMessage());
+ }
+ catch (RuntimeException re)
+ {
+ if (!(USER_ABORTED_PROCESS).equals(re.getMessage()))
+ {
+ re.printStackTrace();
+ _logger.error("Upgrade Failed: " + re.getMessage());
+ }
+ else
+ {
+ throw re;
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ _logger.error("Upgrade Failed: " + e.getMessage());
+ }
+ }
+
+ /**
+ * Utility method to verify if store of given version can be upgraded.
+ *
+ * @param version
+ * store version to verify
+ * @return true if store can be upgraded, false otherwise
+ */
+ protected static boolean isVersionUpgradable(int version)
+ {
+ boolean storeUpgradable = false;
+ if (version == 0)
+ {
+ _logger.error("Existing store version is undefined!");
+ }
+ else if (version < LOWEST_SUPPORTED_STORE_VERSION)
+ {
+ _logger.error(MessageFormat.format(PREVIOUS_STORE_VERSION_UNSUPPORTED,
+ new Object[] { Integer.toString(version) }));
+ }
+ else if (version == BDBMessageStore.DATABASE_FORMAT_VERSION)
+ {
+ _logger.error(MessageFormat.format(STORE_ALREADY_UPGRADED, new Object[] { Integer.toString(version) }));
+ }
+ else if (version > BDBMessageStore.DATABASE_FORMAT_VERSION)
+ {
+ _logger.error(MessageFormat.format(FOLLOWING_STORE_VERSION_UNSUPPORTED,
+ new Object[] { Integer.toString(version) }));
+ }
+ else
+ {
+ _logger.info("Existing store version is " + version);
+ storeUpgradable = true;
+ }
+ return storeUpgradable;
+ }
+
+ /**
+ * Detects existing store version by checking list of database in store
+ * environment
+ *
+ * @param fromDir
+ * store folder
+ * @return version
+ */
+ public static int getStoreVersion(File fromDir)
+ {
+ int version = 0;
+ EnvironmentConfig envConfig = new EnvironmentConfig();
+ envConfig.setAllowCreate(false);
+ envConfig.setTransactional(false);
+ envConfig.setReadOnly(true);
+ Environment environment = null;
+ try
+ {
+
+ environment = new Environment(fromDir, envConfig);
+ List<String> databases = environment.getDatabaseNames();
+ for (String name : databases)
+ {
+ if (name.startsWith("exchangeDb"))
+ {
+ if (name.startsWith("exchangeDb_v"))
+ {
+ version = Integer.parseInt(name.substring(12));
+ }
+ else
+ {
+ version = 1;
+ }
+ break;
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.error("Failure to open existing database: " + e.getMessage());
+ }
+ finally
+ {
+ if (environment != null)
+ {
+ try
+ {
+ environment.close();
+ }
+ catch (Exception e)
+ {
+ // ignoring. It should never happen.
+ }
+ }
+ }
+ return version;
+ }
+
+ private static void fatalError(String message)
+ {
+ System.out.println(message);
+ usage();
+ System.exit(1);
+ }
+
+ private static void showHelp()
+ {
+ help();
+ System.exit(0);
+ }
+
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingKey.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingKey.java
new file mode 100644
index 0000000000..396f0ed817
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingKey.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+
+public class BindingKey extends Object
+{
+ private final AMQShortString _exchangeName;
+ private final AMQShortString _queueName;
+ private final AMQShortString _routingKey;
+ private final FieldTable _arguments;
+
+ public BindingKey(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey, FieldTable arguments)
+ {
+ _exchangeName = exchangeName;
+ _queueName = queueName;
+ _routingKey = routingKey;
+ _arguments = arguments;
+ }
+
+
+ public AMQShortString getExchangeName()
+ {
+ return _exchangeName;
+ }
+
+ public AMQShortString getQueueName()
+ {
+ return _queueName;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return _routingKey;
+ }
+
+ public FieldTable getArguments()
+ {
+ return _arguments;
+ }
+
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java
new file mode 100644
index 0000000000..5ea3e9c2e8
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.nio.ByteBuffer;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+public class ContentTB extends TupleBinding
+{
+ public Object entryToObject(TupleInput tupleInput)
+ {
+
+ final int size = tupleInput.readInt();
+ byte[] underlying = new byte[size];
+ tupleInput.readFast(underlying);
+ return ByteBuffer.wrap(underlying);
+ }
+
+ public void objectToEntry(Object object, TupleOutput tupleOutput)
+ {
+ ByteBuffer src = (ByteBuffer) object;
+
+ src = src.slice();
+
+ byte[] chunkData = new byte[src.limit()];
+ src.duplicate().get(chunkData);
+
+ tupleOutput.writeInt(chunkData.length);
+ tupleOutput.writeFast(chunkData);
+ }
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DatabaseVisitor.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DatabaseVisitor.java
new file mode 100644
index 0000000000..9bd879025f
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DatabaseVisitor.java
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.apache.qpid.AMQStoreException;
+
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+
+/** Visitor Interface so that each DatabaseEntry for a database can easily be processed. */
+public abstract class DatabaseVisitor
+{
+ protected int _count;
+
+ abstract public void visit(DatabaseEntry entry, DatabaseEntry value) throws AMQStoreException, DatabaseException;
+
+ public int getVisitedCount()
+ {
+ return _count;
+ }
+
+ public void resetVisitCount()
+ {
+ _count = 0;
+ }
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java
new file mode 100644
index 0000000000..f9c7828bef
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import org.apache.log4j.Logger;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.store.berkeleydb.records.ExchangeRecord;
+
+public class ExchangeTB extends TupleBinding
+{
+ private static final Logger _log = Logger.getLogger(ExchangeTB.class);
+
+ public ExchangeTB()
+ {
+ }
+
+ public Object entryToObject(TupleInput tupleInput)
+ {
+
+ AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
+ AMQShortString typeName = AMQShortStringEncoding.readShortString(tupleInput);
+
+ boolean autoDelete = tupleInput.readBoolean();
+
+ return new ExchangeRecord(name, typeName, autoDelete);
+ }
+
+ public void objectToEntry(Object object, TupleOutput tupleOutput)
+ {
+ ExchangeRecord exchange = (ExchangeRecord) object;
+
+ AMQShortStringEncoding.writeShortString(exchange.getNameShortString(), tupleOutput);
+ AMQShortStringEncoding.writeShortString(exchange.getType(), tupleOutput);
+
+ tupleOutput.writeBoolean(exchange.isAutoDelete());
+ }
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java
new file mode 100644
index 0000000000..c09498cce3
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java
@@ -0,0 +1,74 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.apache.qpid.framing.FieldTable;
+
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import com.sleepycat.je.DatabaseException;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+public class FieldTableEncoding
+{
+ public static FieldTable readFieldTable(TupleInput tupleInput) throws DatabaseException
+ {
+ long length = tupleInput.readLong();
+ if (length <= 0)
+ {
+ return null;
+ }
+ else
+ {
+
+ byte[] data = new byte[(int)length];
+ tupleInput.readFast(data);
+
+ try
+ {
+ return new FieldTable(new DataInputStream(new ByteArrayInputStream(data)),length);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ }
+
+ public static void writeFieldTable(FieldTable fieldTable, TupleOutput tupleOutput)
+ {
+
+ if (fieldTable == null)
+ {
+ tupleOutput.writeLong(0);
+ }
+ else
+ {
+ tupleOutput.writeLong(fieldTable.getEncodedSize());
+ tupleOutput.writeFast(fieldTable.getDataAsBytes());
+ }
+ }
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageContentKey.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageContentKey.java
new file mode 100644
index 0000000000..005e8d4604
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageContentKey.java
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+public class MessageContentKey
+{
+ private long _messageId;
+
+ public MessageContentKey(long messageId)
+ {
+ _messageId = messageId;
+ }
+
+
+ public long getMessageId()
+ {
+ return _messageId;
+ }
+
+ public void setMessageId(long messageId)
+ {
+ this._messageId = messageId;
+ }
+} \ No newline at end of file
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java
new file mode 100644
index 0000000000..c274fdec8c
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.apache.qpid.framing.AMQShortString;
+
+public class QueueEntryKey
+{
+ private AMQShortString _queueName;
+ private long _messageId;
+
+
+ public QueueEntryKey(AMQShortString queueName, long messageId)
+ {
+ _queueName = queueName;
+ _messageId = messageId;
+ }
+
+
+ public AMQShortString getQueueName()
+ {
+ return _queueName;
+ }
+
+
+ public long getMessageId()
+ {
+ return _messageId;
+ }
+
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_4.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_4.java
new file mode 100644
index 0000000000..30357c97d4
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_4.java
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.keys;
+
+import org.apache.qpid.server.store.berkeleydb.MessageContentKey;
+
+public class MessageContentKey_4 extends MessageContentKey
+{
+ private int _chunkNum;
+
+ public MessageContentKey_4(long messageId, int chunkNo)
+ {
+ super(messageId);
+ _chunkNum = chunkNo;
+ }
+
+ public int getChunk()
+ {
+ return _chunkNum;
+ }
+
+ public void setChunk(int chunk)
+ {
+ this._chunkNum = chunk;
+ }
+} \ No newline at end of file
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_5.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_5.java
new file mode 100644
index 0000000000..a1a7fe80b5
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_5.java
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.keys;
+
+import org.apache.qpid.server.store.berkeleydb.MessageContentKey;
+
+public class MessageContentKey_5 extends MessageContentKey
+{
+ private int _offset;
+
+ public MessageContentKey_5(long messageId, int chunkNo)
+ {
+ super(messageId);
+ _offset = chunkNo;
+ }
+
+ public int getOffset()
+ {
+ return _offset;
+ }
+
+ public void setOffset(int chunk)
+ {
+ this._offset = chunk;
+ }
+} \ No newline at end of file
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/ExchangeRecord.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/ExchangeRecord.java
new file mode 100644
index 0000000000..f20367e33b
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/ExchangeRecord.java
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.records;
+
+import org.apache.qpid.framing.AMQShortString;
+
+public class ExchangeRecord extends Object
+{
+ private final AMQShortString _exchangeName;
+ private final AMQShortString _exchangeType;
+ private final boolean _autoDelete;
+
+ public ExchangeRecord(AMQShortString exchangeName, AMQShortString exchangeType, boolean autoDelete)
+ {
+ _exchangeName = exchangeName;
+ _exchangeType = exchangeType;
+ _autoDelete = autoDelete;
+ }
+
+ public AMQShortString getNameShortString()
+ {
+ return _exchangeName;
+ }
+
+ public AMQShortString getType()
+ {
+ return _exchangeType;
+ }
+
+ public boolean isAutoDelete()
+ {
+ return _autoDelete;
+ }
+
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java
new file mode 100644
index 0000000000..fbe10433ca
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java
@@ -0,0 +1,66 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.records;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+
+public class QueueRecord extends Object
+{
+ private final AMQShortString _queueName;
+ private final AMQShortString _owner;
+ private final FieldTable _arguments;
+ private boolean _exclusive;
+
+ public QueueRecord(AMQShortString queueName, AMQShortString owner, boolean exclusive, FieldTable arguments)
+ {
+ _queueName = queueName;
+ _owner = owner;
+ _exclusive = exclusive;
+ _arguments = arguments;
+ }
+
+ public AMQShortString getNameShortString()
+ {
+ return _queueName;
+ }
+
+ public AMQShortString getOwner()
+ {
+ return _owner;
+ }
+
+ public boolean isExclusive()
+ {
+ return _exclusive;
+ }
+
+ public void setExclusive(boolean exclusive)
+ {
+ _exclusive = exclusive;
+ }
+
+ public FieldTable getArguments()
+ {
+ return _arguments;
+ }
+
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/testclient/BackupTestClient.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/testclient/BackupTestClient.java
new file mode 100644
index 0000000000..f6344b3d7d
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/testclient/BackupTestClient.java
@@ -0,0 +1,120 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.testclient;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.ping.PingDurableClient;
+import org.apache.qpid.server.store.berkeleydb.BDBBackup;
+import org.apache.qpid.util.CommandLineParser;
+
+import java.util.Properties;
+
+/**
+ * BackupTestClient extends {@link PingDurableClient} with an action that takes a BDB backup when a configurable
+ * message count is reached. This enables a test user to restore this beackup, knowing how many committed but undelivered
+ * messages were in the backup, in order to check that all are re-delivered when the backup is retored.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Perform BDB Backup on configurable message count.
+ * </table>
+ */
+public class BackupTestClient extends PingDurableClient
+{
+ /** Used for debugging. */
+ private static final Logger log = Logger.getLogger(BackupTestClient.class);
+
+ /** Holds the from directory to take backups from. */
+ private String fromDir;
+
+ /** Holds the to directory to store backups in. */
+ private String toDir;
+
+ /**
+ * Default constructor, passes all property overrides to the parent.
+ *
+ * @param overrides Any property overrides to apply to the defaults.
+ *
+ * @throws Exception Any underlying exception is allowed to fall through.
+ */
+ BackupTestClient(Properties overrides) throws Exception
+ {
+ super(overrides);
+ }
+
+ /**
+ * Starts the ping/wait/receive process. From and to directory locations for the BDB backups must be specified
+ * on the command line:
+ *
+ * <p/><table><caption>Command Line</caption>
+ * <tr><th> Option <th> Comment
+ * <tr><td> -fromdir <td> The path to the directory to back the bdb log file from.
+ * <tr><td> -todir <td> The path to the directory to save the backed up bdb log files to.
+ * </table>
+ *
+ * @param args The command line arguments.
+ */
+ public static void main(String[] args)
+ {
+ try
+ {
+ // Use the same command line format as BDBBackup utility, (compulsory from and to directories).
+ Properties options =
+ CommandLineParser.processCommandLine(args, new CommandLineParser(BDBBackup.COMMAND_LINE_SPEC),
+ System.getProperties());
+ BackupTestClient pingProducer = new BackupTestClient(options);
+
+ // Keep the from and to directories for backups.
+ pingProducer.fromDir = options.getProperty("fromdir");
+ pingProducer.toDir = options.getProperty("todir");
+
+ // Create a shutdown hook to terminate the ping-pong producer.
+ Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook());
+
+ // Ensure that the ping pong producer is registered to listen for exceptions on the connection too.
+ // pingProducer.getConnection().setExceptionListener(pingProducer);
+
+ // Run the test procedure.
+ int sent = pingProducer.send();
+ pingProducer.waitForUser("Press return to begin receiving the pings.");
+ pingProducer.receive(sent);
+
+ System.exit(0);
+ }
+ catch (Exception e)
+ {
+ System.err.println(e.getMessage());
+ log.error("Top level handler caught execption.", e);
+ System.exit(1);
+ }
+ }
+
+ /**
+ * Supplies a triggered action extension, based on message count. This action takes a BDB log file backup.
+ */
+ public void takeAction()
+ {
+ BDBBackup backupUtil = new BDBBackup();
+ backupUtil.takeBackupNoLock(fromDir, toDir);
+ System.out.println("Took backup of BDB log files from directory: " + fromDir);
+ }
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java
new file mode 100644
index 0000000000..301ee417c5
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java
@@ -0,0 +1,25 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+public interface BindingTuple
+{
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java
new file mode 100644
index 0000000000..8e17f074d7
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import org.apache.qpid.server.store.berkeleydb.BindingKey;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+
+public class BindingTupleBindingFactory extends TupleBindingFactory<BindingKey>
+{
+ public BindingTupleBindingFactory(int version)
+ {
+ super(version);
+ }
+
+ public TupleBinding<BindingKey> getInstance()
+ {
+ switch (_version)
+ {
+ default:
+ case 5:
+ //no change from v4
+ case 4:
+ return new BindingTuple_4();
+ }
+ }
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_4.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_4.java
new file mode 100644
index 0000000000..52b131a7f2
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_4.java
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
+import org.apache.qpid.server.store.berkeleydb.BindingKey;
+import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import com.sleepycat.je.DatabaseException;
+
+public class BindingTuple_4 extends TupleBinding<BindingKey> implements BindingTuple
+{
+ protected static final Logger _log = Logger.getLogger(BindingTuple.class);
+
+ public BindingTuple_4()
+ {
+ super();
+ }
+
+ public BindingKey entryToObject(TupleInput tupleInput)
+ {
+ AMQShortString exchangeName = AMQShortStringEncoding.readShortString(tupleInput);
+ AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput);
+ AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
+
+ FieldTable arguments;
+
+ // Addition for Version 2 of this table
+ try
+ {
+ arguments = FieldTableEncoding.readFieldTable(tupleInput);
+ }
+ catch (DatabaseException e)
+ {
+ _log.error("Unable to create binding: " + e, e);
+ return null;
+ }
+
+ return new BindingKey(exchangeName, queueName, routingKey, arguments);
+ }
+
+ public void objectToEntry(BindingKey binding, TupleOutput tupleOutput)
+ {
+ AMQShortStringEncoding.writeShortString(binding.getExchangeName(), tupleOutput);
+ AMQShortStringEncoding.writeShortString(binding.getQueueName(), tupleOutput);
+ AMQShortStringEncoding.writeShortString(binding.getRoutingKey(), tupleOutput);
+
+ // Addition for Version 2 of this table
+ FieldTableEncoding.writeFieldTable(binding.getArguments(), tupleOutput);
+ }
+
+} \ No newline at end of file
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_4.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_4.java
new file mode 100644
index 0000000000..f5ba6bbce3
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_4.java
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import org.apache.qpid.server.store.berkeleydb.MessageContentKey;
+import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_4;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+public class MessageContentKeyTB_4 extends TupleBinding<MessageContentKey>
+{
+
+ public MessageContentKey entryToObject(TupleInput tupleInput)
+ {
+ long messageId = tupleInput.readLong();
+ int chunk = tupleInput.readInt();
+ return new MessageContentKey_4(messageId, chunk);
+ }
+
+ public void objectToEntry(MessageContentKey object, TupleOutput tupleOutput)
+ {
+ final MessageContentKey_4 mk = (MessageContentKey_4) object;
+ tupleOutput.writeLong(mk.getMessageId());
+ tupleOutput.writeInt(mk.getChunk());
+ }
+
+} \ No newline at end of file
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_5.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_5.java
new file mode 100644
index 0000000000..e6a2fd23a8
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_5.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import org.apache.qpid.server.store.berkeleydb.MessageContentKey;
+import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_5;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+public class MessageContentKeyTB_5 extends TupleBinding<MessageContentKey>
+{
+ public MessageContentKey entryToObject(TupleInput tupleInput)
+ {
+ long messageId = tupleInput.readLong();
+ int offset = tupleInput.readInt();
+ return new MessageContentKey_5(messageId, offset);
+ }
+
+ public void objectToEntry(MessageContentKey object, TupleOutput tupleOutput)
+ {
+ final MessageContentKey_5 mk = (MessageContentKey_5) object;
+ tupleOutput.writeLong(mk.getMessageId());
+ tupleOutput.writeInt(mk.getOffset());
+ }
+
+} \ No newline at end of file
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTupleBindingFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTupleBindingFactory.java
new file mode 100644
index 0000000000..76ee4f66e4
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTupleBindingFactory.java
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import org.apache.qpid.server.store.berkeleydb.MessageContentKey;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+
+public class MessageContentKeyTupleBindingFactory extends TupleBindingFactory<MessageContentKey>
+{
+ public MessageContentKeyTupleBindingFactory(int version)
+ {
+ super(version);
+ }
+
+ public TupleBinding<MessageContentKey> getInstance()
+ {
+ switch (_version)
+ {
+ default:
+ case 5:
+ return new MessageContentKeyTB_5();
+ case 4:
+ return new MessageContentKeyTB_4();
+ }
+ }
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_4.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_4.java
new file mode 100644
index 0000000000..e26b544e38
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_4.java
@@ -0,0 +1,162 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.message.MessageMetaData;
+import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
+
+import java.io.*;
+
+/**
+ * Handles the mapping to and from 0-8/0-9 message meta data
+ */
+public class MessageMetaDataTB_4 extends TupleBinding<Object>
+{
+ private static final Logger _log = Logger.getLogger(MessageMetaDataTB_4.class);
+
+ public MessageMetaDataTB_4()
+ {
+ }
+
+ public Object entryToObject(TupleInput tupleInput)
+ {
+ try
+ {
+ final MessagePublishInfo publishBody = readMessagePublishInfo(tupleInput);
+ final ContentHeaderBody contentHeaderBody = readContentHeaderBody(tupleInput);
+ final int contentChunkCount = tupleInput.readInt();
+
+ return new MessageMetaData(publishBody, contentHeaderBody, contentChunkCount);
+ }
+ catch (Exception e)
+ {
+ _log.error("Error converting entry to object: " + e, e);
+ // annoyingly just have to return null since we cannot throw
+ return null;
+ }
+ }
+
+ public void objectToEntry(Object object, TupleOutput tupleOutput)
+ {
+ MessageMetaData message = (MessageMetaData) object;
+ try
+ {
+ writeMessagePublishInfo(message.getMessagePublishInfo(), tupleOutput);
+ }
+ catch (AMQException e)
+ {
+ // can't do anything else since the BDB interface precludes throwing any exceptions
+ // in practice we should never get an exception
+ throw new RuntimeException("Error converting object to entry: " + e, e);
+ }
+ writeContentHeader(message.getContentHeaderBody(), tupleOutput);
+ tupleOutput.writeInt(message.getContentChunkCount());
+ }
+
+ private MessagePublishInfo readMessagePublishInfo(TupleInput tupleInput)
+ {
+
+ final AMQShortString exchange = AMQShortStringEncoding.readShortString(tupleInput);
+ final AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
+ final boolean mandatory = tupleInput.readBoolean();
+ final boolean immediate = tupleInput.readBoolean();
+
+ return new MessagePublishInfo()
+ {
+
+ public AMQShortString getExchange()
+ {
+ return exchange;
+ }
+
+ public void setExchange(AMQShortString exchange)
+ {
+
+ }
+
+ public boolean isImmediate()
+ {
+ return immediate;
+ }
+
+ public boolean isMandatory()
+ {
+ return mandatory;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return routingKey;
+ }
+ } ;
+
+ }
+
+ private ContentHeaderBody readContentHeaderBody(TupleInput tupleInput) throws AMQFrameDecodingException, AMQProtocolVersionException
+ {
+ int bodySize = tupleInput.readInt();
+ byte[] underlying = new byte[bodySize];
+ tupleInput.readFast(underlying);
+
+ try
+ {
+ return ContentHeaderBody.createFromBuffer(new DataInputStream(new ByteArrayInputStream(underlying)), bodySize);
+ }
+ catch (IOException e)
+ {
+ throw new AMQFrameDecodingException(null, e.getMessage(), e);
+ }
+ }
+
+ private void writeMessagePublishInfo(MessagePublishInfo publishBody, TupleOutput tupleOutput) throws AMQException
+ {
+
+ AMQShortStringEncoding.writeShortString(publishBody.getExchange(), tupleOutput);
+ AMQShortStringEncoding.writeShortString(publishBody.getRoutingKey(), tupleOutput);
+ tupleOutput.writeBoolean(publishBody.isMandatory());
+ tupleOutput.writeBoolean(publishBody.isImmediate());
+ }
+
+ private void writeContentHeader(ContentHeaderBody headerBody, TupleOutput tupleOutput)
+ {
+ // write out the content header body
+ final int bodySize = headerBody.getSize();
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(bodySize);
+ try
+ {
+ headerBody.writePayload(new DataOutputStream(baos));
+ tupleOutput.writeInt(bodySize);
+ tupleOutput.writeFast(baos.toByteArray());
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ }
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_5.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_5.java
new file mode 100644
index 0000000000..6dc041cb23
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_5.java
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.store.MessageMetaDataType;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+
+/**
+ * Handles the mapping to and from message meta data
+ */
+public class MessageMetaDataTB_5 extends MessageMetaDataTB_4
+{
+ private static final Logger _log = Logger.getLogger(MessageMetaDataTB_5.class);
+
+ @Override
+ public Object entryToObject(TupleInput tupleInput)
+ {
+ try
+ {
+ final int bodySize = tupleInput.readInt();
+ byte[] dataAsBytes = new byte[bodySize];
+ tupleInput.readFast(dataAsBytes);
+
+ java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(dataAsBytes);
+ buf.position(1);
+ buf = buf.slice();
+ MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]];
+ StorableMessageMetaData metaData = type.getFactory().createMetaData(buf);
+
+ return metaData;
+ }
+ catch (Exception e)
+ {
+ _log.error("Error converting entry to object: " + e, e);
+ // annoyingly just have to return null since we cannot throw
+ return null;
+ }
+ }
+
+ @Override
+ public void objectToEntry(Object object, TupleOutput tupleOutput)
+ {
+ StorableMessageMetaData metaData = (StorableMessageMetaData) object;
+
+ final int bodySize = 1 + metaData.getStorableSize();
+ byte[] underlying = new byte[bodySize];
+ underlying[0] = (byte) metaData.getType().ordinal();
+ java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(underlying);
+ buf.position(1);
+ buf = buf.slice();
+
+ metaData.writeToBuffer(0, buf);
+ tupleOutput.writeInt(bodySize);
+ tupleOutput.writeFast(underlying);
+ }
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java
new file mode 100644
index 0000000000..40153c13ea
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+
+public class MessageMetaDataTupleBindingFactory extends TupleBindingFactory<Object>
+{
+ public MessageMetaDataTupleBindingFactory(int version)
+ {
+ super(version);
+ }
+
+ public TupleBinding<Object> getInstance()
+ {
+ switch (_version)
+ {
+ default:
+ case 5:
+ return new MessageMetaDataTB_5();
+ case 4:
+ return new MessageMetaDataTB_4();
+ }
+ }
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java
new file mode 100644
index 0000000000..975e558874
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
+import org.apache.qpid.server.store.berkeleydb.QueueEntryKey;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+public class QueueEntryTB extends TupleBinding<QueueEntryKey>
+{
+ public QueueEntryKey entryToObject(TupleInput tupleInput)
+ {
+ AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput);
+ Long messageId = tupleInput.readLong();
+
+ return new QueueEntryKey(queueName, messageId);
+ }
+
+ public void objectToEntry(QueueEntryKey mk, TupleOutput tupleOutput)
+ {
+ AMQShortStringEncoding.writeShortString(mk.getQueueName(),tupleOutput);
+ tupleOutput.writeLong(mk.getMessageId());
+ }
+} \ No newline at end of file
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java
new file mode 100644
index 0000000000..affa9a271d
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java
@@ -0,0 +1,25 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+public interface QueueTuple
+{
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java
new file mode 100644
index 0000000000..512e319f96
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+
+public class QueueTupleBindingFactory extends TupleBindingFactory<QueueRecord>
+{
+
+ public QueueTupleBindingFactory(int version)
+ {
+ super(version);
+ }
+
+ public TupleBinding<QueueRecord> getInstance()
+ {
+ switch (_version)
+ {
+ default:
+ case 5:
+ return new QueueTuple_5();
+ case 4:
+ return new QueueTuple_4();
+ }
+ }
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_4.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_4.java
new file mode 100644
index 0000000000..347eecf08e
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_4.java
@@ -0,0 +1,72 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import com.sleepycat.je.DatabaseException;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
+import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
+import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+
+public class QueueTuple_4 extends TupleBinding<QueueRecord> implements QueueTuple
+{
+ protected static final Logger _logger = Logger.getLogger(QueueTuple_4.class);
+
+ protected FieldTable _arguments;
+
+ public QueueTuple_4()
+ {
+ super();
+ }
+
+ public QueueRecord entryToObject(TupleInput tupleInput)
+ {
+ try
+ {
+ AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
+ AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput);
+ // Addition for Version 2 of this table, read the queue arguments
+ FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput);
+
+ return new QueueRecord(name, owner, false, arguments);
+ }
+ catch (DatabaseException e)
+ {
+ _logger.error("Unable to create binding: " + e, e);
+ return null;
+ }
+
+ }
+
+ public void objectToEntry(QueueRecord queue, TupleOutput tupleOutput)
+ {
+ AMQShortStringEncoding.writeShortString(queue.getNameShortString(), tupleOutput);
+ AMQShortStringEncoding.writeShortString(queue.getOwner(), tupleOutput);
+ // Addition for Version 2 of this table, store the queue arguments
+ FieldTableEncoding.writeFieldTable(queue.getArguments(), tupleOutput);
+ }
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_5.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_5.java
new file mode 100644
index 0000000000..0f293b79b7
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_5.java
@@ -0,0 +1,75 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import com.sleepycat.je.DatabaseException;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
+import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
+import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+
+public class QueueTuple_5 extends QueueTuple_4
+{
+ protected static final Logger _logger = Logger.getLogger(QueueTuple_5.class);
+
+ protected FieldTable _arguments;
+
+ public QueueTuple_5()
+ {
+ super();
+ }
+
+ public QueueRecord entryToObject(TupleInput tupleInput)
+ {
+ try
+ {
+ AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
+ AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput);
+ // Addition for Version 2 of this table, read the queue arguments
+ FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput);
+ // Addition for Version 3 of this table, read the queue exclusivity
+ boolean exclusive = tupleInput.readBoolean();
+
+ return new QueueRecord(name, owner, exclusive, arguments);
+ }
+ catch (DatabaseException e)
+ {
+ _logger.error("Unable to create binding: " + e, e);
+ return null;
+ }
+
+ }
+
+ public void objectToEntry(QueueRecord queue, TupleOutput tupleOutput)
+ {
+ AMQShortStringEncoding.writeShortString(queue.getNameShortString(), tupleOutput);
+ AMQShortStringEncoding.writeShortString(queue.getOwner(), tupleOutput);
+ // Addition for Version 2 of this table, store the queue arguments
+ FieldTableEncoding.writeFieldTable(queue.getArguments(), tupleOutput);
+ // Addition for Version 3 of this table, store the queue exclusivity
+ tupleOutput.writeBoolean(queue.isExclusive());
+ }
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java
new file mode 100644
index 0000000000..2adac1f9a3
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+
+public abstract class TupleBindingFactory<E>
+{
+ protected int _version;
+
+ public TupleBindingFactory(int version)
+ {
+ _version = version;
+ }
+
+ public abstract TupleBinding<E> getInstance();
+}
diff --git a/qpid/java/bdbstore/src/resources/backup-log4j.xml b/qpid/java/bdbstore/src/resources/backup-log4j.xml
new file mode 100644
index 0000000000..6b0619f0b6
--- /dev/null
+++ b/qpid/java/bdbstore/src/resources/backup-log4j.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ -
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements. See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership. The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License. You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied. See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -
+ -->
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<!-- =============================================================================== -->
+<!-- This is a Log4j configuration specially created for the BDB Backup utility, -->
+<!-- it outputs logging to the console for specifically designated console loggers -->
+<!-- at info level or above only. This avoids spamming the user with any internals -->
+<!-- of the Qpid code. -->
+<!-- Use a different logging set up to capture debugging output to diagnose errors. -->
+<!-- =============================================================================== -->
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="false">
+
+ <!-- ====================================================== -->
+ <!-- Append messages to the console at info level or above. -->
+ <!-- ====================================================== -->
+
+ <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
+ <param name="Target" value="System.out"/>
+ <param name="Threshold" value="info"/>
+
+ <layout class="org.apache.log4j.PatternLayout">
+ <!-- The default pattern: Date Priority [Category] Message\n -->
+ <param name="ConversionPattern" value="%m%n"/>
+ </layout>
+
+ </appender>
+
+ <!-- ================ -->
+ <!-- Limit categories -->
+ <!-- ================ -->
+
+ <category name="org.apache.qpid.server.store.berkeleydb.BDBBackup">
+ <priority value="info"/>
+ </category>
+
+ <!-- ======================= -->
+ <!-- Setup the Root category -->
+ <!-- ======================= -->
+
+ <root>
+ <appender-ref ref="CONSOLE"/>
+ </root>
+
+</log4j:configuration>
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncodingTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncodingTest.java
new file mode 100644
index 0000000000..d076babc61
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncodingTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.apache.qpid.framing.AMQShortString;
+
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+import junit.framework.TestCase;
+
+/**
+ * Tests for {@code AMQShortStringEncoding} including corner cases when string
+ * is null or over 127 characters in length
+ */
+public class AMQShortStringEncodingTest extends TestCase
+{
+
+ public void testWriteReadNullValues()
+ {
+ // write into tuple output
+ TupleOutput tupleOutput = new TupleOutput();
+ AMQShortStringEncoding.writeShortString(null, tupleOutput);
+ byte[] data = tupleOutput.getBufferBytes();
+
+ // read from tuple input
+ TupleInput tupleInput = new TupleInput(data);
+ AMQShortString result = AMQShortStringEncoding.readShortString(tupleInput);
+ assertNull("Expected null but got " + result, result);
+ }
+
+ public void testWriteReadShortStringWithLengthOver127()
+ {
+ AMQShortString value = createString('a', 128);
+
+ // write into tuple output
+ TupleOutput tupleOutput = new TupleOutput();
+ AMQShortStringEncoding.writeShortString(value, tupleOutput);
+ byte[] data = tupleOutput.getBufferBytes();
+
+ // read from tuple input
+ TupleInput tupleInput = new TupleInput(data);
+ AMQShortString result = AMQShortStringEncoding.readShortString(tupleInput);
+ assertEquals("Expected " + value + " but got " + result, value, result);
+ }
+
+ public void testWriteReadShortStringWithLengthLess127()
+ {
+ AMQShortString value = new AMQShortString("test");
+
+ // write into tuple output
+ TupleOutput tupleOutput = new TupleOutput();
+ AMQShortStringEncoding.writeShortString(value, tupleOutput);
+ byte[] data = tupleOutput.getBufferBytes();
+
+ // read from tuple input
+ TupleInput tupleInput = new TupleInput(data);
+ AMQShortString result = AMQShortStringEncoding.readShortString(tupleInput);
+ assertEquals("Expected " + value + " but got " + result, value, result);
+ }
+
+ private AMQShortString createString(char ch, int length)
+ {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < length; i++)
+ {
+ sb.append(ch);
+ }
+ return new AMQShortString(sb.toString());
+ }
+
+}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
new file mode 100644
index 0000000000..ef31b78cfe
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
@@ -0,0 +1,470 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.message.MessageMetaData;
+import org.apache.qpid.server.message.MessageMetaData_0_10;
+import org.apache.qpid.server.store.MessageMetaDataType;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.MessageDeliveryMode;
+import org.apache.qpid.transport.MessageDeliveryPriority;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.MessageTransfer;
+
+/**
+ * Subclass of MessageStoreTest which runs the standard tests from the superclass against
+ * the BDB Store as well as additional tests specific to the DBB store-implementation.
+ */
+public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageStoreTest
+{
+ /**
+ * Tests that message metadata and content are successfully read back from a
+ * store after it has been reloaded. Both 0-8 and 0-10 metadata is used to
+ * verify their ability to co-exist within the store and be successful retrieved.
+ */
+ public void testBDBMessagePersistence() throws Exception
+ {
+ MessageStore store = getVirtualHost().getMessageStore();
+
+ BDBMessageStore bdbStore = assertBDBStore(store);
+
+ // Create content ByteBuffers.
+ // Split the content into 2 chunks for the 0-8 message, as per broker behaviour.
+ // Use a single chunk for the 0-10 message as per broker behaviour.
+ String bodyText = "jfhdjsflsdhfjdshfjdslhfjdslhfsjlhfsjkhfdsjkhfdsjkfhdslkjf";
+
+ ByteBuffer firstContentBytes_0_8 = ByteBuffer.wrap(bodyText.substring(0, 10).getBytes());
+ ByteBuffer secondContentBytes_0_8 = ByteBuffer.wrap(bodyText.substring(10).getBytes());
+
+ ByteBuffer completeContentBody_0_10 = ByteBuffer.wrap(bodyText.getBytes());
+ int bodySize = completeContentBody_0_10.limit();
+
+ /*
+ * Create and insert a 0-8 message (metadata and multi-chunk content)
+ */
+ MessagePublishInfo pubInfoBody_0_8 = createPublishInfoBody_0_8();
+ BasicContentHeaderProperties props_0_8 = createContentHeaderProperties_0_8();
+
+ ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize);
+
+ MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8, 0);
+ StoredMessage<MessageMetaData> storedMessage_0_8 = bdbStore.addMessage(messageMetaData_0_8);
+
+ long origArrivalTime_0_8 = messageMetaData_0_8.getArrivalTime();
+ long messageid_0_8 = storedMessage_0_8.getMessageNumber();
+
+ storedMessage_0_8.addContent(0, firstContentBytes_0_8);
+ storedMessage_0_8.addContent(firstContentBytes_0_8.limit(), secondContentBytes_0_8);
+ storedMessage_0_8.flushToStore();
+
+ /*
+ * Create and insert a 0-10 message (metadata and content)
+ */
+ MessageProperties msgProps_0_10 = createMessageProperties_0_10(bodySize);
+ DeliveryProperties delProps_0_10 = createDeliveryProperties_0_10();
+ Header header_0_10 = new Header(msgProps_0_10, delProps_0_10);
+
+ MessageTransfer xfr_0_10 = new MessageTransfer("destination", MessageAcceptMode.EXPLICIT,
+ MessageAcquireMode.PRE_ACQUIRED, header_0_10, completeContentBody_0_10);
+
+ MessageMetaData_0_10 messageMetaData_0_10 = new MessageMetaData_0_10(xfr_0_10);
+ StoredMessage<MessageMetaData_0_10> storedMessage_0_10 = bdbStore.addMessage(messageMetaData_0_10);
+
+ long origArrivalTime_0_10 = messageMetaData_0_10.getArrivalTime();
+ long messageid_0_10 = storedMessage_0_10.getMessageNumber();
+
+ storedMessage_0_10.addContent(0, completeContentBody_0_10);
+ storedMessage_0_10.flushToStore();
+
+ /*
+ * reload the store only (read-only)
+ */
+ bdbStore = reloadStoreReadOnly(bdbStore);
+
+ /*
+ * Read back and validate the 0-8 message metadata and content
+ */
+ StorableMessageMetaData storeableMMD_0_8 = bdbStore.getMessageMetaData(messageid_0_8);
+
+ assertEquals("Unexpected message type",MessageMetaDataType.META_DATA_0_8, storeableMMD_0_8.getType());
+ assertTrue("Unexpected instance type", storeableMMD_0_8 instanceof MessageMetaData);
+ MessageMetaData returnedMMD_0_8 = (MessageMetaData) storeableMMD_0_8;
+
+ assertEquals("Message arrival time has changed", origArrivalTime_0_8, returnedMMD_0_8.getArrivalTime());
+
+ MessagePublishInfo returnedPubBody_0_8 = returnedMMD_0_8.getMessagePublishInfo();
+ assertEquals("Message exchange has changed", pubInfoBody_0_8.getExchange(), returnedPubBody_0_8.getExchange());
+ assertEquals("Immediate flag has changed", pubInfoBody_0_8.isImmediate(), returnedPubBody_0_8.isImmediate());
+ assertEquals("Mandatory flag has changed", pubInfoBody_0_8.isMandatory(), returnedPubBody_0_8.isMandatory());
+ assertEquals("Routing key has changed", pubInfoBody_0_8.getRoutingKey(), returnedPubBody_0_8.getRoutingKey());
+
+ ContentHeaderBody returnedHeaderBody_0_8 = returnedMMD_0_8.getContentHeaderBody();
+ assertEquals("ContentHeader ClassID has changed", chb_0_8.classId, returnedHeaderBody_0_8.classId);
+ assertEquals("ContentHeader weight has changed", chb_0_8.weight, returnedHeaderBody_0_8.weight);
+ assertEquals("ContentHeader bodySize has changed", chb_0_8.bodySize, returnedHeaderBody_0_8.bodySize);
+
+ BasicContentHeaderProperties returnedProperties_0_8 = (BasicContentHeaderProperties) returnedHeaderBody_0_8.getProperties();
+ assertEquals("Property ContentType has changed", props_0_8.getContentTypeAsString(), returnedProperties_0_8.getContentTypeAsString());
+ assertEquals("Property MessageID has changed", props_0_8.getMessageIdAsString(), returnedProperties_0_8.getMessageIdAsString());
+
+ ByteBuffer recoveredContent_0_8 = ByteBuffer.allocate((int) chb_0_8.bodySize) ;
+ long recoveredCount_0_8 = bdbStore.getContent(messageid_0_8, 0, recoveredContent_0_8);
+ assertEquals("Incorrect amount of payload data recovered", chb_0_8.bodySize, recoveredCount_0_8);
+ String returnedPayloadString_0_8 = new String(recoveredContent_0_8.array());
+ assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_8);
+
+ /*
+ * Read back and validate the 0-10 message metadata and content
+ */
+ StorableMessageMetaData storeableMMD_0_10 = bdbStore.getMessageMetaData(messageid_0_10);
+
+ assertEquals("Unexpected message type",MessageMetaDataType.META_DATA_0_10, storeableMMD_0_10.getType());
+ assertTrue("Unexpected instance type", storeableMMD_0_10 instanceof MessageMetaData_0_10);
+ MessageMetaData_0_10 returnedMMD_0_10 = (MessageMetaData_0_10) storeableMMD_0_10;
+
+ assertEquals("Message arrival time has changed", origArrivalTime_0_10, returnedMMD_0_10.getArrivalTime());
+
+ DeliveryProperties returnedDelProps_0_10 = returnedMMD_0_10.getHeader().get(DeliveryProperties.class);
+ assertNotNull("DeliveryProperties were not returned", returnedDelProps_0_10);
+ assertEquals("Immediate flag has changed", delProps_0_10.getImmediate(), returnedDelProps_0_10.getImmediate());
+ assertEquals("Routing key has changed", delProps_0_10.getRoutingKey(), returnedDelProps_0_10.getRoutingKey());
+ assertEquals("Message exchange has changed", delProps_0_10.getExchange(), returnedDelProps_0_10.getExchange());
+ assertEquals("Message expiration has changed", delProps_0_10.getExpiration(), returnedDelProps_0_10.getExpiration());
+ assertEquals("Message delivery priority has changed", delProps_0_10.getPriority(), returnedDelProps_0_10.getPriority());
+
+ MessageProperties returnedMsgProps = returnedMMD_0_10.getHeader().get(MessageProperties.class);
+ assertNotNull("MessageProperties were not returned", returnedMsgProps);
+ assertTrue("Message correlationID has changed", Arrays.equals(msgProps_0_10.getCorrelationId(), returnedMsgProps.getCorrelationId()));
+ assertEquals("Message content length has changed", msgProps_0_10.getContentLength(), returnedMsgProps.getContentLength());
+ assertEquals("Message content type has changed", msgProps_0_10.getContentType(), returnedMsgProps.getContentType());
+
+ ByteBuffer recoveredContent = ByteBuffer.allocate((int) msgProps_0_10.getContentLength()) ;
+ long recoveredCount = bdbStore.getContent(messageid_0_10, 0, recoveredContent);
+ assertEquals("Incorrect amount of payload data recovered", msgProps_0_10.getContentLength(), recoveredCount);
+
+ String returnedPayloadString_0_10 = new String(recoveredContent.array());
+ assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_10);
+ }
+
+ private DeliveryProperties createDeliveryProperties_0_10()
+ {
+ DeliveryProperties delProps_0_10 = new DeliveryProperties();
+
+ delProps_0_10.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
+ delProps_0_10.setImmediate(true);
+ delProps_0_10.setExchange("exchange12345");
+ delProps_0_10.setRoutingKey("routingKey12345");
+ delProps_0_10.setExpiration(5);
+ delProps_0_10.setPriority(MessageDeliveryPriority.ABOVE_AVERAGE);
+
+ return delProps_0_10;
+ }
+
+ private MessageProperties createMessageProperties_0_10(int bodySize)
+ {
+ MessageProperties msgProps_0_10 = new MessageProperties();
+ msgProps_0_10.setContentLength(bodySize);
+ msgProps_0_10.setCorrelationId("qwerty".getBytes());
+ msgProps_0_10.setContentType("text/html");
+
+ return msgProps_0_10;
+ }
+
+ /**
+ * Close the provided store and create a new (read-only) store to read back the data.
+ *
+ * Use this method instead of reloading the virtual host like other tests in order
+ * to avoid the recovery handler deleting the message for not being on a queue.
+ */
+ private BDBMessageStore reloadStoreReadOnly(BDBMessageStore messageStore) throws Exception
+ {
+ messageStore.close();
+ File storePath = new File(String.valueOf(_config.getProperty("store.environment-path")));
+
+ BDBMessageStore newStore = new BDBMessageStore();
+ newStore.configure(storePath, false);
+ newStore.start();
+
+ return newStore;
+ }
+
+ private MessagePublishInfo createPublishInfoBody_0_8()
+ {
+ return new MessagePublishInfo()
+ {
+ public AMQShortString getExchange()
+ {
+ return new AMQShortString("exchange12345");
+ }
+
+ public void setExchange(AMQShortString exchange)
+ {
+ }
+
+ public boolean isImmediate()
+ {
+ return false;
+ }
+
+ public boolean isMandatory()
+ {
+ return true;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return new AMQShortString("routingKey12345");
+ }
+ };
+
+ }
+
+ private ContentHeaderBody createContentHeaderBody_0_8(BasicContentHeaderProperties props, int length)
+ {
+ MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+ int classForBasic = methodRegistry.createBasicQosOkBody().getClazz();
+ return new ContentHeaderBody(classForBasic, 1, props, length);
+ }
+
+ private BasicContentHeaderProperties createContentHeaderProperties_0_8()
+ {
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+ props.setDeliveryMode(Integer.valueOf(BasicContentHeaderProperties.PERSISTENT).byteValue());
+ props.setContentType("text/html");
+ props.getHeaders().setString("Test", "MST");
+ return props;
+ }
+
+ /**
+ * Tests that messages which are added to the store and then removed using the
+ * public MessageStore interfaces are actually removed from the store by then
+ * interrogating the store with its own implementation methods and verifying
+ * expected exceptions are thrown to indicate the message is not present.
+ */
+ public void testMessageCreationAndRemoval() throws Exception
+ {
+ MessageStore store = getVirtualHost().getMessageStore();
+ BDBMessageStore bdbStore = assertBDBStore(store);
+
+ StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreMultiChunkMessage_0_8(store);
+ long messageid_0_8 = storedMessage_0_8.getMessageNumber();
+
+ //remove the message in the fashion the broker normally would
+ storedMessage_0_8.remove();
+
+ //verify the removal using the BDB store implementation methods directly
+ try
+ {
+ // the next line should throw since the message id should not be found
+ bdbStore.getMessageMetaData(messageid_0_8);
+ fail("No exception thrown when message id not found getting metadata");
+ }
+ catch (AMQStoreException e)
+ {
+ // pass since exception expected
+ }
+
+ //expecting no content, allocate a 1 byte
+ ByteBuffer dst = ByteBuffer.allocate(1);
+
+ assertEquals("Retrieved content when none was expected",
+ 0, bdbStore.getContent(messageid_0_8, 0, dst));
+ }
+
+ private BDBMessageStore assertBDBStore(Object store)
+ {
+ if(!(store instanceof BDBMessageStore))
+ {
+ fail("Test requires an instance of BDBMessageStore to proceed");
+ }
+
+ return (BDBMessageStore) store;
+ }
+
+ private StoredMessage<MessageMetaData> createAndStoreMultiChunkMessage_0_8(MessageStore store)
+ {
+ byte[] body10Bytes = "0123456789".getBytes();
+ byte[] body5Bytes = "01234".getBytes();
+
+ ByteBuffer chunk1 = ByteBuffer.wrap(body10Bytes);
+ ByteBuffer chunk2 = ByteBuffer.wrap(body5Bytes);
+
+ int bodySize = body10Bytes.length + body5Bytes.length;
+
+ //create and store the message using the MessageStore interface
+ MessagePublishInfo pubInfoBody_0_8 = createPublishInfoBody_0_8();
+ BasicContentHeaderProperties props_0_8 = createContentHeaderProperties_0_8();
+
+ ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize);
+
+ MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8, 0);
+ StoredMessage<MessageMetaData> storedMessage_0_8 = store.addMessage(messageMetaData_0_8);
+
+ storedMessage_0_8.addContent(0, chunk1);
+ storedMessage_0_8.addContent(chunk1.limit(), chunk2);
+ storedMessage_0_8.flushToStore();
+
+ return storedMessage_0_8;
+ }
+
+ /**
+ * Tests transaction commit by utilising the enqueue and dequeue methods available
+ * in the TransactionLog interface implemented by the store, and verifying the
+ * behaviour using BDB implementation methods.
+ */
+ public void testTranCommit() throws Exception
+ {
+ TransactionLog log = getVirtualHost().getTransactionLog();
+
+ BDBMessageStore bdbStore = assertBDBStore(log);
+
+ final AMQShortString mockQueueName = new AMQShortString("queueName");
+
+ TransactionLogResource mockQueue = new TransactionLogResource()
+ {
+ public String getResourceName()
+ {
+ return mockQueueName.asString();
+ }
+ };
+
+ TransactionLog.Transaction txn = log.newTransaction();
+
+ txn.enqueueMessage(mockQueue, 1L);
+ txn.enqueueMessage(mockQueue, 5L);
+ txn.commitTran();
+
+ List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName);
+
+ assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
+ Long val = enqueuedIds.get(0);
+ assertEquals("First Message is incorrect", 1L, val.longValue());
+ val = enqueuedIds.get(1);
+ assertEquals("Second Message is incorrect", 5L, val.longValue());
+ }
+
+
+ /**
+ * Tests transaction rollback before a commit has occurred by utilising the
+ * enqueue and dequeue methods available in the TransactionLog interface
+ * implemented by the store, and verifying the behaviour using BDB
+ * implementation methods.
+ */
+ public void testTranRollbackBeforeCommit() throws Exception
+ {
+ TransactionLog log = getVirtualHost().getTransactionLog();
+
+ BDBMessageStore bdbStore = assertBDBStore(log);
+
+ final AMQShortString mockQueueName = new AMQShortString("queueName");
+
+ TransactionLogResource mockQueue = new TransactionLogResource()
+ {
+ public String getResourceName()
+ {
+ return mockQueueName.asString();
+ }
+ };
+
+ TransactionLog.Transaction txn = log.newTransaction();
+
+ txn.enqueueMessage(mockQueue, 21L);
+ txn.abortTran();
+
+ txn = log.newTransaction();
+ txn.enqueueMessage(mockQueue, 22L);
+ txn.enqueueMessage(mockQueue, 23L);
+ txn.commitTran();
+
+ List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName);
+
+ assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
+ Long val = enqueuedIds.get(0);
+ assertEquals("First Message is incorrect", 22L, val.longValue());
+ val = enqueuedIds.get(1);
+ assertEquals("Second Message is incorrect", 23L, val.longValue());
+ }
+
+ /**
+ * Tests transaction rollback after a commit has occurred by utilising the
+ * enqueue and dequeue methods available in the TransactionLog interface
+ * implemented by the store, and verifying the behaviour using BDB
+ * implementation methods.
+ */
+ public void testTranRollbackAfterCommit() throws Exception
+ {
+ TransactionLog log = getVirtualHost().getTransactionLog();
+
+ BDBMessageStore bdbStore = assertBDBStore(log);
+
+ final AMQShortString mockQueueName = new AMQShortString("queueName");
+
+ TransactionLogResource mockQueue = new TransactionLogResource()
+ {
+ public String getResourceName()
+ {
+ return mockQueueName.asString();
+ }
+ };
+
+ TransactionLog.Transaction txn = log.newTransaction();
+
+ txn.enqueueMessage(mockQueue, 30L);
+ txn.commitTran();
+
+ txn = log.newTransaction();
+ txn.enqueueMessage(mockQueue, 31L);
+ txn.abortTran();
+
+ txn = log.newTransaction();
+ txn.enqueueMessage(mockQueue, 32L);
+ txn.commitTran();
+
+ List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName);
+
+ assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
+ Long val = enqueuedIds.get(0);
+ assertEquals("First Message is incorrect", 30L, val.longValue());
+ val = enqueuedIds.get(1);
+ assertEquals("Second Message is incorrect", 32L, val.longValue());
+ }
+
+}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java
new file mode 100644
index 0000000000..cc19bcf5d8
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java
@@ -0,0 +1,232 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.url.URLSyntaxException;
+
+/**
+ * Prepares an older version brokers BDB store with the required
+ * contents for use in the BDBStoreUpgradeTest.
+ *
+ * The store will then be used to verify that the upgraded is
+ * completed properly and that once upgraded it functions as
+ * expected with the new broker.
+ */
+public class BDBStoreUpgradeTestPreparer extends TestCase
+{
+ public static final String TOPIC_NAME="myUpgradeTopic";
+ public static final String SUB_NAME="myDurSubName";
+ public static final String QUEUE_NAME="myUpgradeQueue";
+
+ private static AMQConnectionFactory _connFac;
+ private static final String CONN_URL =
+ "amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672'";
+
+ /**
+ * Create a BDBStoreUpgradeTestPreparer instance
+ */
+ public BDBStoreUpgradeTestPreparer () throws URLSyntaxException
+ {
+ _connFac = new AMQConnectionFactory(CONN_URL);
+ }
+
+ /**
+ * Utility test method to allow running the preparation tool
+ * using the test framework
+ */
+ public void testPrepareBroker() throws Exception
+ {
+ prepareBroker();
+ }
+
+ private void prepareBroker() throws Exception
+ {
+ prepareQueues();
+ prepareDurableSubscription();
+ }
+
+ /**
+ * Prepare a queue for use in testing message and binding recovery
+ * after the upgrade is performed.
+ *
+ * - Create a transacted session on the connection.
+ * - Use a consumer to create the (durable by default) queue.
+ * - Send 5 large messages to test (multi-frame) content recovery.
+ * - Send 1 small message to test (single-frame) content recovery.
+ * - Commit the session.
+ * - Send 5 small messages to test that uncommitted messages are not recovered.
+ * following the upgrade.
+ * - Close the session.
+ */
+ private void prepareQueues() throws Exception
+ {
+ // Create a connection
+ Connection connection = _connFac.createConnection();
+ connection.start();
+ connection.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(JMSException e)
+ {
+ e.printStackTrace();
+ }
+ });
+ // Create a session on the connection, transacted to confirm delivery
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue(QUEUE_NAME);
+ // Create a consumer to ensure the queue gets created
+ // (and enter it into the store, as queues are made durable by default)
+ MessageConsumer messageConsumer = session.createConsumer(queue);
+ messageConsumer.close();
+
+ // Create a Message producer
+ MessageProducer messageProducer = session.createProducer(queue);
+
+ // Publish 5 persistent messages, 256k chars to ensure they are multi-frame
+ sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 256*1024, 5);
+ // Publish 5 persistent messages, 1k chars to ensure they are single-frame
+ sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 1*1024, 5);
+
+ session.commit();
+
+ // Publish 5 persistent messages which will NOT be committed and so should be 'lost'
+ sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 1*1024, 5);
+
+ session.close();
+ connection.close();
+ }
+
+ /**
+ * Prepare a DurableSubscription backing queue for use in testing selector
+ * recovery and queue exclusivity marking during the upgrade process.
+ *
+ * - Create a transacted session on the connection.
+ * - Open and close a DurableSubscription with selector to create the backing queue.
+ * - Send a message which matches the selector.
+ * - Send a message which does not match the selector.
+ * - Send a message which matches the selector but will remain uncommitted.
+ * - Close the session.
+ */
+ private void prepareDurableSubscription() throws Exception
+ {
+
+ // Create a connection
+ TopicConnection connection = _connFac.createTopicConnection();
+ connection.start();
+ connection.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(JMSException e)
+ {
+ e.printStackTrace();
+ }
+ });
+ // Create a session on the connection, transacted to confirm delivery
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Topic topic = session.createTopic(TOPIC_NAME);
+
+ // Create and register a durable subscriber with selector and then close it
+ TopicSubscriber durSub1 = session.createDurableSubscriber(topic, SUB_NAME,"testprop='true'", false);
+ durSub1.close();
+
+ // Create a publisher and send a persistent message which matches the selector
+ // followed by one that does not match, and another which matches but is not
+ // committed and so should be 'lost'
+ TopicSession pubSession = connection.createTopicSession(true, Session.SESSION_TRANSACTED);
+ TopicPublisher publisher = pubSession.createPublisher(topic);
+
+ publishMessages(session, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "true");
+ publishMessages(session, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "false");
+ pubSession.commit();
+ publishMessages(session, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "true");
+
+ publisher.close();
+ pubSession.close();
+
+ }
+
+ public static void sendMessages(Session session, MessageProducer messageProducer,
+ Destination dest, int deliveryMode, int length, int numMesages) throws JMSException
+ {
+ for (int i = 1; i <= numMesages; i++)
+ {
+ Message message = session.createTextMessage(generateString(length));
+ message.setIntProperty("ID", i);
+ messageProducer.send(message, deliveryMode, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+ }
+ }
+
+ public static void publishMessages(Session session, TopicPublisher publisher,
+ Destination dest, int deliveryMode, int length, int numMesages, String selectorProperty) throws JMSException
+ {
+ for (int i = 1; i <= numMesages; i++)
+ {
+ Message message = session.createTextMessage(generateString(length));
+ message.setIntProperty("ID", i);
+ message.setStringProperty("testprop", selectorProperty);
+ publisher.publish(message, deliveryMode, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+ }
+ }
+
+ /**
+ * Generates a string of a given length consisting of the sequence 0,1,2,..,9,0,1,2.
+ *
+ * @param length number of characters in the string
+ * @return string sequence of the given length
+ */
+ public static String generateString(int length)
+ {
+ char[] base_chars = new char[]{'0','1','2','3','4','5','6','7','8','9'};
+ char[] chars = new char[length];
+ for (int i = 0; i < (length); i++)
+ {
+ chars[i] = base_chars[i % 10];
+ }
+ return new String(chars);
+ }
+
+ /**
+ * Run the preparation tool.
+ * @param args Command line arguments.
+ */
+ public static void main(String[] args) throws Exception
+ {
+ BDBStoreUpgradeTestPreparer producer = new BDBStoreUpgradeTestPreparer();
+ producer.prepareBroker();
+ }
+} \ No newline at end of file
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
new file mode 100644
index 0000000000..4861e007af
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
@@ -0,0 +1,540 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_NAME;
+import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SUB_NAME;
+import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.TOPIC_NAME;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.management.common.mbeans.ManagedQueue;
+import org.apache.qpid.server.message.MessageMetaData;
+import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_4;
+import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTupleBindingFactory;
+import org.apache.qpid.server.store.berkeleydb.tuples.MessageMetaDataTupleBindingFactory;
+import org.apache.qpid.test.utils.JMXTestUtils;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.je.DatabaseEntry;
+
+/**
+ * Tests upgrading a BDB store and using it with the new broker
+ * after the required contents are entered into the store using
+ * an old broker with the BDBStoreUpgradeTestPreparer. The store
+ * will then be used to verify that the upgraded is completed
+ * properly and that once upgraded it functions as expected with
+ * the new broker.
+ */
+public class BDBUpgradeTest extends QpidBrokerTestCase
+{
+ protected static final Logger _logger = LoggerFactory.getLogger(BDBUpgradeTest.class);
+
+ private static final String STRING_1024 = BDBStoreUpgradeTestPreparer.generateString(1024);
+ private static final String STRING_1024_256 = BDBStoreUpgradeTestPreparer.generateString(1024*256);
+ private static final String QPID_WORK_ORIG = System.getProperty("QPID_WORK");
+ private static final String QPID_HOME = System.getProperty("QPID_HOME");
+ private static final int VERSION_4 = 4;
+
+ private String _fromDir;
+ private String _toDir;
+ private String _toDirTwice;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ assertNotNull("QPID_WORK must be set", QPID_WORK_ORIG);
+ assertNotNull("QPID_HOME must be set", QPID_HOME);
+
+ if(! isExternalBroker())
+ {
+ //override QPID_WORK to add the InVM port used so the store
+ //output from the upgrade tool can be found by the broker
+ setSystemProperty("QPID_WORK", QPID_WORK_ORIG + "/" + getPort());
+ }
+
+ _fromDir = QPID_HOME + "/bdbstore-to-upgrade/test-store";
+ _toDir = getWorkDirBaseDir() + "/bdbstore/test-store";
+ _toDirTwice = getWorkDirBaseDir() + "/bdbstore-upgraded-twice";
+
+ //Clear the two target directories if they exist.
+ File directory = new File(_toDir);
+ if (directory.exists() && directory.isDirectory())
+ {
+ FileUtils.delete(directory, true);
+ }
+ directory = new File(_toDirTwice);
+ if (directory.exists() && directory.isDirectory())
+ {
+ FileUtils.delete(directory, true);
+ }
+
+ //Upgrade the test store.
+ upgradeBrokerStore(_fromDir, _toDir);
+
+ //override the broker config used and then start the broker with the updated store
+ _configFile = new File("build/etc/config-systests-bdb.xml");
+ setConfigurationProperty("management.enabled", "true");
+
+ super.setUp();
+ }
+
+ private String getWorkDirBaseDir()
+ {
+ return QPID_WORK_ORIG + (isInternalBroker() ? "" : "/" + getPort());
+ }
+
+ /**
+ * Tests that the core upgrade method of the store upgrade tool passes through the exception
+ * from the BDBMessageStore indicating that the data on disk can't be loaded as the previous
+ * version because it has already been upgraded.
+ * @throws Exception
+ */
+ public void testMultipleUpgrades() throws Exception
+ {
+ //stop the broker started by setUp() in order to allow the second upgrade attempt to proceed
+ stopBroker();
+
+ try
+ {
+ new BDBStoreUpgrade(_toDir, _toDirTwice, null, false, true).upgradeFromVersion(VERSION_4);
+ fail("Second Upgrade Succeeded");
+ }
+ catch (Exception e)
+ {
+ System.err.println("Showing stack trace, we are expecting an 'Unable to load BDBStore' error");
+ e.printStackTrace();
+ assertTrue("Incorrect Exception Thrown:" + e.getMessage(),
+ e.getMessage().contains("Unable to load BDBStore as version 4. Store on disk contains version 5 data"));
+ }
+ }
+
+ /**
+ * Test that the selector applied to the DurableSubscription was successfully
+ * transfered to the new store, and functions as expected with continued use
+ * by monitoring message count while sending new messages to the topic.
+ */
+ public void testSelectorDurability() throws Exception
+ {
+ JMXTestUtils jmxUtils = null;
+ try
+ {
+ jmxUtils = new JMXTestUtils(this, "guest", "guest");
+ jmxUtils.open();
+ }
+ catch (Exception e)
+ {
+ fail("Unable to establish JMX connection, test cannot proceed");
+ }
+
+ try
+ {
+ ManagedQueue dursubQueue = jmxUtils.getManagedQueue("clientid" + ":" + SUB_NAME);
+ assertEquals("DurableSubscription backing queue should have 1 message on it initially",
+ new Integer(1), dursubQueue.getMessageCount());
+
+ // Create a connection and start it
+ TopicConnection connection = (TopicConnection) getConnection();
+ connection.start();
+
+ // Send messages which don't match and do match the selector, checking message count
+ TopicSession pubSession = connection.createTopicSession(true, org.apache.qpid.jms.Session.SESSION_TRANSACTED);
+ Topic topic = pubSession.createTopic(TOPIC_NAME);
+ TopicPublisher publisher = pubSession.createPublisher(topic);
+
+ BDBStoreUpgradeTestPreparer.publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "false");
+ pubSession.commit();
+ assertEquals("DurableSubscription backing queue should still have 1 message on it",
+ new Integer(1), dursubQueue.getMessageCount());
+
+ BDBStoreUpgradeTestPreparer.publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "true");
+ pubSession.commit();
+ assertEquals("DurableSubscription backing queue should now have 2 messages on it",
+ new Integer(2), dursubQueue.getMessageCount());
+
+ dursubQueue.clearQueue();
+ pubSession.close();
+ }
+ finally
+ {
+ jmxUtils.close();
+ }
+ }
+
+ /**
+ * Test that the backing queue for the durable subscription created was successfully
+ * detected and set as being exclusive during the upgrade process, and that the
+ * regular queue was not.
+ */
+ public void testQueueExclusivity() throws Exception
+ {
+ JMXTestUtils jmxUtils = null;
+ try
+ {
+ jmxUtils = new JMXTestUtils(this, "guest", "guest");
+ jmxUtils.open();
+ }
+ catch (Exception e)
+ {
+ fail("Unable to establish JMX connection, test cannot proceed");
+ }
+
+ try
+ {
+ ManagedQueue queue = jmxUtils.getManagedQueue(QUEUE_NAME);
+ assertFalse("Queue should not have been marked as Exclusive during upgrade", queue.isExclusive());
+
+ ManagedQueue dursubQueue = jmxUtils.getManagedQueue("clientid" + ":" + SUB_NAME);
+ assertTrue("DurableSubscription backing queue should have been marked as Exclusive during upgrade", dursubQueue.isExclusive());
+ }
+ finally
+ {
+ jmxUtils.close();
+ }
+ }
+
+ /**
+ * Test that the upgraded queue continues to function properly when used
+ * for persistent messaging and restarting the broker.
+ *
+ * Sends the new messages to the queue BEFORE consuming those which were
+ * sent before the upgrade. In doing so, this also serves to test that
+ * the queue bindings were successfully transitioned during the upgrade.
+ */
+ public void testBindingAndMessageDurabability() throws Exception
+ {
+ // Create a connection and start it
+ TopicConnection connection = (TopicConnection) getConnection();
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(QUEUE_NAME);
+ MessageProducer messageProducer = session.createProducer(queue);
+
+ // Send a new message
+ BDBStoreUpgradeTestPreparer.sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 256*1024, 1);
+
+ session.close();
+
+ // Restart the broker
+ restartBroker();
+
+ // Drain the queue of all messages
+ connection = (TopicConnection) getConnection();
+ connection.start();
+ consumeQueueMessages(connection, true);
+ }
+
+ /**
+ * Test that all of the committed persistent messages previously sent to
+ * the broker are properly received following update of the MetaData and
+ * Content entries during the store upgrade process.
+ */
+ public void testConsumptionOfUpgradedMessages() throws Exception
+ {
+ // Create a connection and start it
+ Connection connection = getConnection();
+ connection.start();
+
+ consumeDurableSubscriptionMessages(connection);
+ consumeQueueMessages(connection, false);
+ }
+
+ /**
+ * Tests store migration containing messages for non-existing queue.
+ *
+ * @throws Exception
+ */
+ public void testMigrationOfMessagesForNonExistingQueues() throws Exception
+ {
+ stopBroker();
+
+ // copy store data into a new location for adding of phantom message
+ File storeLocation = new File(_fromDir);
+ File target = new File(_toDirTwice);
+ if (!target.exists())
+ {
+ target.mkdirs();
+ }
+ FileUtils.copyRecursive(storeLocation, target);
+
+ // delete migrated data
+ File directory = new File(_toDir);
+ if (directory.exists() && directory.isDirectory())
+ {
+ FileUtils.delete(directory, true);
+ }
+
+ // test data
+ String nonExistingQueueName = getTestQueueName();
+ String messageText = "Test Phantom Message";
+
+ // add message
+ addMessageForNonExistingQueue(target, VERSION_4, nonExistingQueueName, messageText);
+
+ String[] inputs = { "Yes", "Yes", "Yes" };
+ upgradeBrokerStoreInInterractiveMode(_toDirTwice, _toDir, inputs);
+
+ // start broker
+ startBroker();
+
+ // Create a connection and start it
+ Connection connection = getConnection();
+ connection.start();
+
+ // consume a message for non-existing store
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(nonExistingQueueName);
+ MessageConsumer messageConsumer = session.createConsumer(queue);
+ Message message = messageConsumer.receive(1000);
+
+ // assert consumed message
+ assertNotNull("Message was not migrated!", message);
+ assertTrue("Unexpected message received!", message instanceof TextMessage);
+ String text = ((TextMessage) message).getText();
+ assertEquals("Message migration failed!", messageText, text);
+ }
+
+ /**
+ * An utility method to upgrade broker with simulation user interactions
+ *
+ * @param fromDir
+ * location of the store to migrate
+ * @param toDir
+ * location of where migrated data will be stored
+ * @param inputs
+ * user answers on upgrade tool questions
+ * @throws Exception
+ */
+ private void upgradeBrokerStoreInInterractiveMode(String fromDir, String toDir, final String[] inputs)
+ throws Exception
+ {
+ // save to restore system.in after data migration
+ InputStream stdin = System.in;
+
+ // set fake system in to simulate user interactions
+ // FIXME: it is a quite dirty simulator of system input but it does the job
+ System.setIn(new InputStream()
+ {
+
+ int counter = 0;
+
+ public synchronized int read(byte b[], int off, int len)
+ {
+ byte[] src = (inputs[counter] + "\n").getBytes();
+ System.arraycopy(src, 0, b, off, src.length);
+ counter++;
+ return src.length;
+ }
+
+ @Override
+ public int read() throws IOException
+ {
+ return -1;
+ }
+ });
+
+ try
+ {
+ // Upgrade the test store.
+ new BDBStoreUpgrade(fromDir, toDir, null, true, true).upgradeFromVersion(VERSION_4);
+ }
+ finally
+ {
+ // restore system in
+ System.setIn(stdin);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private void addMessageForNonExistingQueue(File storeLocation, int storeVersion, String nonExistingQueueName,
+ String messageText) throws Exception
+ {
+ final AMQShortString queueName = new AMQShortString(nonExistingQueueName);
+ BDBMessageStore store = new BDBMessageStore(storeVersion);
+ store.configure(storeLocation, false);
+ try
+ {
+ store.start();
+
+ // store message objects
+ ByteBuffer completeContentBody = ByteBuffer.wrap(messageText.getBytes("UTF-8"));
+ long bodySize = completeContentBody.limit();
+ MessagePublishInfo pubInfoBody = new MessagePublishInfoImpl(new AMQShortString("amq.direct"), false,
+ false, queueName);
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+ props.setDeliveryMode(Integer.valueOf(BasicContentHeaderProperties.PERSISTENT).byteValue());
+ props.setContentType("text/plain");
+ props.setType("text/plain");
+ props.setMessageId("whatever");
+ props.setEncoding("UTF-8");
+ props.getHeaders().setString("Test", "MST");
+ MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+ int classForBasic = methodRegistry.createBasicQosOkBody().getClazz();
+ ContentHeaderBody contentHeaderBody = new ContentHeaderBody(classForBasic, 1, props, bodySize);
+
+ // add content entry to database
+ long messageId = store.getNewMessageId();
+ TupleBinding<MessageContentKey> contentKeyTB = new MessageContentKeyTupleBindingFactory(storeVersion).getInstance();
+ MessageContentKey contentKey = null;
+ if (storeVersion == VERSION_4)
+ {
+ contentKey = new MessageContentKey_4(messageId, 0);
+ }
+ else
+ {
+ throw new Exception(storeVersion + " is not supported");
+ }
+ DatabaseEntry key = new DatabaseEntry();
+ contentKeyTB.objectToEntry(contentKey, key);
+ DatabaseEntry data = new DatabaseEntry();
+ ContentTB contentTB = new ContentTB();
+ contentTB.objectToEntry(completeContentBody, data);
+ store.getContentDb().put(null, key, data);
+
+ // add meta data entry to database
+ TupleBinding<Long> longTB = TupleBinding.getPrimitiveBinding(Long.class);
+ TupleBinding<Object> metaDataTB = new MessageMetaDataTupleBindingFactory(storeVersion).getInstance();
+ key = new DatabaseEntry();
+ data = new DatabaseEntry();
+ longTB.objectToEntry(new Long(messageId), key);
+ MessageMetaData metaData = new MessageMetaData(pubInfoBody, contentHeaderBody, 1);
+ metaDataTB.objectToEntry(metaData, data);
+ store.getMetaDataDb().put(null, key, data);
+
+ // add delivery entry to database
+ TransactionLogResource mockQueue = new TransactionLogResource()
+ {
+ public String getResourceName()
+ {
+ return queueName.asString();
+ }
+ };
+ TransactionLog log = (TransactionLog) store;
+ TransactionLog.Transaction txn = log.newTransaction();
+ txn.enqueueMessage(mockQueue, messageId);
+ txn.commitTran();
+ }
+ finally
+ {
+ // close store
+ store.close();
+ }
+ }
+
+ private void consumeDurableSubscriptionMessages(Connection connection) throws Exception
+ {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Topic topic = session.createTopic(TOPIC_NAME);
+
+ TopicSubscriber durSub = session.createDurableSubscriber(topic, SUB_NAME,"testprop='true'", false);
+
+ // Retrieve the matching message
+ Message m = durSub.receive(2000);
+ assertNotNull("Failed to receive an expected message", m);
+ assertEquals("Selector property did not match", "true", m.getStringProperty("testprop"));
+ assertEquals("ID property did not match", 1, m.getIntProperty("ID"));
+ assertEquals("Message content was not as expected",BDBStoreUpgradeTestPreparer.generateString(1024) , ((TextMessage)m).getText());
+
+ // Verify that neither the non-matching or uncommitted message are received
+ m = durSub.receive(1000);
+ assertNull("No more messages should have been recieved", m);
+
+ durSub.close();
+ session.close();
+ }
+
+ private void consumeQueueMessages(Connection connection, boolean extraMessage) throws Exception
+ {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(QUEUE_NAME);
+
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message m;
+
+ // Retrieve the initial pre-upgrade messages
+ for (int i=1; i <= 5 ; i++)
+ {
+ m = consumer.receive(2000);
+ assertNotNull("Failed to receive an expected message", m);
+ assertEquals("ID property did not match", i, m.getIntProperty("ID"));
+ assertEquals("Message content was not as expected", STRING_1024_256, ((TextMessage)m).getText());
+ }
+ for (int i=1; i <= 5 ; i++)
+ {
+ m = consumer.receive(2000);
+ assertNotNull("Failed to receive an expected message", m);
+ assertEquals("ID property did not match", i, m.getIntProperty("ID"));
+ assertEquals("Message content was not as expected", STRING_1024, ((TextMessage)m).getText());
+ }
+
+ if(extraMessage)
+ {
+ //verify that the extra message is received
+ m = consumer.receive(2000);
+ assertNotNull("Failed to receive an expected message", m);
+ assertEquals("ID property did not match", 1, m.getIntProperty("ID"));
+ assertEquals("Message content was not as expected", STRING_1024_256, ((TextMessage)m).getText());
+ }
+
+ // Verify that no more messages are received
+ m = consumer.receive(1000);
+ assertNull("No more messages should have been recieved", m);
+
+ consumer.close();
+ session.close();
+ }
+
+ private void upgradeBrokerStore(String fromDir, String toDir) throws Exception
+ {
+ new BDBStoreUpgrade(_fromDir, _toDir, null, false, true).upgradeFromVersion(VERSION_4);
+ }
+}
diff --git a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-to-upgrade/test-store/00000000.jdb b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-to-upgrade/test-store/00000000.jdb
new file mode 100644
index 0000000000..c4e4e6c306
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-to-upgrade/test-store/00000000.jdb
Binary files differ