diff options
Diffstat (limited to 'qpid/java/bdbstore/src/main/java')
33 files changed, 5325 insertions, 0 deletions
diff --git a/qpid/java/bdbstore/src/main/java/BDBStoreUpgrade.log4j.xml b/qpid/java/bdbstore/src/main/java/BDBStoreUpgrade.log4j.xml new file mode 100644 index 0000000000..4d71963ea7 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/BDBStoreUpgrade.log4j.xml @@ -0,0 +1,52 @@ +<?xml version="1.0"?> +<!-- + - + - Licensed to the Apache Software Foundation (ASF) under one + - or more contributor license agreements. See the NOTICE file + - distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + - + --> +<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd"> +<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/"> + + <appender name="STDOUT" class="org.apache.log4j.ConsoleAppender"> + + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="%d %-5p - %m%n"/> + </layout> + </appender> + + <category name="org.apache.qpid.server.store.berkeleydb.BDBStoreUpgrade"> + <priority value="info"/> + </category> + + <!-- Only show errors from the BDB Store --> + <category name="org.apache.qpid.server.store.berkeleydb.berkeleydb.BDBMessageStore"> + <priority value="error"/> + </category> + + <!-- Provide warnings to standard output --> + <category name="org.apache.qpid"> + <priority value="error"/> + </category> + + <!-- Log all info events to file --> + <root> + <priority value="info"/> + <appender-ref ref="STDOUT"/> + </root> + +</log4j:configuration> diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncoding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncoding.java new file mode 100644 index 0000000000..8b887b1876 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncoding.java @@ -0,0 +1,59 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import org.apache.qpid.framing.AMQShortString; + +import com.sleepycat.bind.tuple.TupleInput; +import com.sleepycat.bind.tuple.TupleOutput; + +public class AMQShortStringEncoding +{ + public static AMQShortString readShortString(TupleInput tupleInput) + { + int length = (int) tupleInput.readShort(); + if (length < 0) + { + return null; + } + else + { + byte[] stringBytes = new byte[length]; + tupleInput.readFast(stringBytes); + return new AMQShortString(stringBytes); + } + + } + + public static void writeShortString(AMQShortString shortString, TupleOutput tupleOutput) + { + + if (shortString == null) + { + tupleOutput.writeShort(-1); + } + else + { + tupleOutput.writeShort(shortString.length()); + tupleOutput.writeFast(shortString.getBytes(), 0, shortString.length()); + } + } +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTB.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTB.java new file mode 100644 index 0000000000..81ae315fe2 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTB.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import com.sleepycat.bind.tuple.TupleBinding; +import com.sleepycat.bind.tuple.TupleInput; +import com.sleepycat.bind.tuple.TupleOutput; +import org.apache.log4j.Logger; +import org.apache.qpid.framing.AMQShortString; + +public class AMQShortStringTB extends TupleBinding +{ + private static final Logger _log = Logger.getLogger(AMQShortStringTB.class); + + + public AMQShortStringTB() + { + } + + public Object entryToObject(TupleInput tupleInput) + { + return AMQShortStringEncoding.readShortString(tupleInput); + } + + public void objectToEntry(Object object, TupleOutput tupleOutput) + { + AMQShortStringEncoding.writeShortString((AMQShortString)object, tupleOutput); + } + +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackup.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackup.java new file mode 100644 index 0000000000..c515ca5d8e --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackup.java @@ -0,0 +1,344 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import com.sleepycat.je.DatabaseException; +import com.sleepycat.je.Environment; +import com.sleepycat.je.EnvironmentConfig; +import com.sleepycat.je.util.DbBackup; + +import org.apache.log4j.Logger; + +import org.apache.qpid.util.CommandLineParser; +import org.apache.qpid.util.FileUtils; + +import java.io.*; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; + +/** + * BDBBackup is a utility for taking hot backups of the current state of a BDB transaction log database. + * + * <p/>This utility makes the following assumptions/performs the following actions: + * + * <p/><ul> <li>The from and to directory locations will already exist. This scripts does not create them. <li>If this + * script fails to complete in one minute it will terminate. <li>This script always exits with code 1 on error, code 0 + * on success (standard unix convention). <li>This script will log out at info level, when it starts and ends and a list + * of all files backed up. <li>This script logs all errors at error level. <li>This script does not perform regular + * backups, wrap its calling script in a cron job or similar to do this. </ul> + * + * <p/>This utility is build around the BDB provided backup helper utility class, DbBackup. This utility class provides + * an ability to force BDB to stop writing to the current log file set, whilst the backup is taken, to ensure that a + * consistent snapshot is acquired. Preventing BDB from writing to the current log file set, does not stop BDB from + * continuing to run concurrently while the backup is running, it simply moves onto a new set of log files; this + * provides a 'hot' backup facility. + * + * <p/>DbBackup can also help with incremental backups, by providing the number of the last log file backed up. + * Subsequent backups can be taken, from later log files only. In a messaging application, messages are not expected to + * be long-lived in most cases, so the log files will usually have been completely turned over between backups. This + * utility does not support incremental backups for this reason. + * + * <p/>If the database is locked by BDB, as is required when using transactions, and therefore will always be the case + * in Qpid, this utility cannot make use of the DbBackup utility in a seperate process. DbBackup, needs to ensure that + * the BDB envinronment used to take the backup has exclusive write access to the log files. This utility can take a + * backup as a standalone utility against log files, when a broker is not running, using the {@link #takeBackup(String, + *String,com.sleepycat.je.Environment)} method. + * + * <p/>A seperate backup machanism is provided by the {@link #takeBackupNoLock(String,String)} method which can take a + * hot backup against a running broker. This works by finding out the set of files to copy, and then opening them all to + * read, and repeating this process until a consistent set of open files is obtained. This is done to avoid the + * situation where the BDB cleanup thread deletes a file, between the directory listing and opening of the file to copy. + * All consistently opened files are copied. This is the default mechanism the the {@link #main} method of this utility + * uses. + * + * <p/><table id="crc><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations <tr><td> Hot copy all + * BDB log files from one directory to another. </table> + */ +public class BDBBackup +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(BDBBackup.class); + + /** Used for communicating with the user. */ + private static final Logger console = Logger.getLogger("Console"); + + /** Defines the suffix used to identify BDB log files. */ + private static final String LOG_FILE_SUFFIX = ".jdb"; + + /** Defines the command line format for this utility. */ + public static final String[][] COMMAND_LINE_SPEC = + new String[][] + { + { "fromdir", "The path to the directory to back the bdb log file from.", "dir", "true" }, + { "todir", "The path to the directory to save the backed up bdb log files to.", "dir", "true" } + }; + + /** Defines the timeout to terminate the backup operation on if it fails to complete. One minte. */ + public static final long TIMEOUT = 60000; + + /** + * Runs a backup of the BDB log files in a specified directory, copying the backed up files to another specified + * directory. + * + * <p/>The following arguments must be specified: + * + * <p/><table><caption>Command Line</caption> <tr><th> Option <th> Comment <tr><td> -fromdir <td> The path to the + * directory to back the bdb log file from. <tr><td> -todir <td> The path to the directory to save the backed up + * bdb log files to. </table> + * + * @param args The command line arguments. + */ + public static void main(String[] args) + { + // Process the command line using standard handling (errors and usage followed by System.exit when it is wrong). + Properties options = + CommandLineParser.processCommandLine(args, new CommandLineParser(COMMAND_LINE_SPEC), System.getProperties()); + + // Extract the from and to directory locations and perform a backup between them. + try + { + String fromDir = options.getProperty("fromdir"); + String toDir = options.getProperty("todir"); + + log.info("BDBBackup Utility: Starting Hot Backup."); + + BDBBackup bdbBackup = new BDBBackup(); + String[] backedUpFiles = bdbBackup.takeBackupNoLock(fromDir, toDir); + + if (log.isInfoEnabled()) + { + log.info("BDBBackup Utility: Hot Backup Completed. Files backed up: " + backedUpFiles); + } + } + catch (Exception e) + { + console.info("Backup script encountered an error and has failed: " + e.getMessage()); + log.error("Backup script got exception: " + e.getMessage(), e); + System.exit(1); + } + } + + /** + * Creates a backup of the BDB log files in the source directory, copying them to the destination directory. + * + * @param fromdir The source directory path. + * @param todir The destination directory path. + * @param environment An open BDB environment to perform the back up. + * + * @throws DatabaseException Any underlying execeptions from BDB are allowed to fall through. + */ + public void takeBackup(String fromdir, String todir, Environment environment) throws DatabaseException + { + DbBackup backupHelper = null; + + try + { + backupHelper = new DbBackup(environment); + + // Prevent BDB from writing to its log files while the backup it taken. + backupHelper.startBackup(); + + // Back up the BDB log files to the destination directory. + String[] filesForBackup = backupHelper.getLogFilesInBackupSet(); + + for (int i = 0; i < filesForBackup.length; i++) + { + File sourceFile = new File(fromdir + File.separator + filesForBackup[i]); + File destFile = new File(todir + File.separator + filesForBackup[i]); + FileUtils.copy(sourceFile, destFile); + } + } + finally + { + // Remember to exit backup mode, or all log files won't be cleaned and disk usage will bloat. + if (backupHelper != null) + { + backupHelper.endBackup(); + } + } + } + + /** + * Takes a hot backup when another process has locked the BDB database. + * + * @param fromdir The source directory path. + * @param todir The destination directory path. + * + * @return A list of all of the names of the files succesfully backed up. + */ + public String[] takeBackupNoLock(String fromdir, String todir) + { + if (log.isDebugEnabled()) + { + log.debug("public void takeBackupNoLock(String fromdir = " + fromdir + ", String todir = " + todir + + "): called"); + } + + File fromDirFile = new File(fromdir); + + if (!fromDirFile.isDirectory()) + { + throw new IllegalArgumentException("The specified fromdir(" + fromdir + + ") must be the directory containing your bdbstore."); + } + + File toDirFile = new File(todir); + + if (!toDirFile.exists()) + { + // Create directory if it doesn't exist + toDirFile.mkdirs(); + + if (log.isDebugEnabled()) + { + log.debug("Created backup directory:" + toDirFile); + } + } + + if (!toDirFile.isDirectory()) + { + throw new IllegalArgumentException("The specified todir(" + todir + ") must be a directory."); + } + + // Repeat until manage to open consistent set of files for reading. + boolean consistentSet = false; + FileInputStream[] fileInputStreams = new FileInputStream[0]; + File[] fileSet = new File[0]; + long start = System.currentTimeMillis(); + + while (!consistentSet) + { + // List all .jdb files in the directory. + fileSet = fromDirFile.listFiles(new FilenameFilter() + { + public boolean accept(File dir, String name) + { + return name.endsWith(LOG_FILE_SUFFIX); + } + }); + + // Open them all for reading. + fileInputStreams = new FileInputStream[fileSet.length]; + + if (fileSet.length == 0) + { + throw new RuntimeException("There are no BDB log files to backup in the " + fromdir + " directory."); + } + + for (int i = 0; i < fileSet.length; i++) + { + try + { + fileInputStreams[i] = new FileInputStream(fileSet[i]); + } + catch (FileNotFoundException e) + { + // Close any files opened for reading so far. + for (int j = 0; j < i; j++) + { + if (fileInputStreams[j] != null) + { + try + { + fileInputStreams[j].close(); + } + catch (IOException ioEx) + { + // Rethrow this as a runtime exception, as something strange has happened. + throw new RuntimeException(ioEx); + } + } + } + + // Could not open a consistent file set so try again. + break; + } + + // A consistent set has been opened if all files were sucesfully opened for reading. + if (i == (fileSet.length - 1)) + { + consistentSet = true; + } + } + + // Check that the script has not timed out, and raise an error if it has. + long now = System.currentTimeMillis(); + if ((now - start) > TIMEOUT) + { + throw new RuntimeException("Hot backup script failed to complete in " + (TIMEOUT / 1000) + " seconds."); + } + } + + // Copy the consistent set of open files. + List<String> backedUpFileNames = new LinkedList<String>(); + + for (int j = 0; j < fileSet.length; j++) + { + File destFile = new File(todir + File.separator + fileSet[j].getName()); + try + { + FileUtils.copy(fileSet[j], destFile); + } + catch (RuntimeException re) + { + Throwable cause = re.getCause(); + if ((cause != null) && (cause instanceof IOException)) + { + throw new RuntimeException(re.getMessage() + " fromDir:" + fromdir + " toDir:" + toDirFile, cause); + } + else + { + throw re; + } + } + + backedUpFileNames.add(destFile.getName()); + + // Close all of the files. + try + { + fileInputStreams[j].close(); + } + catch (IOException e) + { + // Rethrow this as a runtime exception, as something strange has happened. + throw new RuntimeException(e); + } + } + + return backedUpFileNames.toArray(new String[backedUpFileNames.size()]); + } + + /* + * Creates an environment for the bdb log files in the specified directory. This envrinonment can only be used + * to backup these files, if they are not locked by another database instance. + * + * @param fromdir The path to the directory to create the environment for. + * + * @throws DatabaseException Any underlying exceptions from BDB are allowed to fall through. + */ + private Environment createSourceDirEnvironment(String fromdir) throws DatabaseException + { + // Initialize the BDB backup utility on the source directory. + return new Environment(new File(fromdir), new EnvironmentConfig()); + } +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java new file mode 100644 index 0000000000..f900159808 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -0,0 +1,2124 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.File; +import java.lang.ref.SoftReference; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.commons.configuration.Configuration; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.messages.ConfigStoreMessages; +import org.apache.qpid.server.logging.messages.MessageStoreMessages; +import org.apache.qpid.server.logging.messages.TransactionLogMessages; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.store.ConfigurationRecoveryHandler; +import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreRecoveryHandler; +import org.apache.qpid.server.store.StorableMessageMetaData; +import org.apache.qpid.server.store.StoredMemoryMessage; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.store.TransactionLogRecoveryHandler; +import org.apache.qpid.server.store.TransactionLogResource; +import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler; +import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler; +import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler; +import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; +import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler; +import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_5; +import org.apache.qpid.server.store.berkeleydb.records.ExchangeRecord; +import org.apache.qpid.server.store.berkeleydb.records.QueueRecord; +import org.apache.qpid.server.store.berkeleydb.tuples.BindingTupleBindingFactory; +import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTB_5; +import org.apache.qpid.server.store.berkeleydb.tuples.MessageMetaDataTupleBindingFactory; +import org.apache.qpid.server.store.berkeleydb.tuples.QueueEntryTB; +import org.apache.qpid.server.store.berkeleydb.tuples.QueueTupleBindingFactory; + +import com.sleepycat.bind.EntryBinding; +import com.sleepycat.bind.tuple.ByteBinding; +import com.sleepycat.bind.tuple.TupleBinding; +import com.sleepycat.je.CheckpointConfig; +import com.sleepycat.je.Cursor; +import com.sleepycat.je.Database; +import com.sleepycat.je.DatabaseConfig; +import com.sleepycat.je.DatabaseEntry; +import com.sleepycat.je.DatabaseException; +import com.sleepycat.je.Environment; +import com.sleepycat.je.EnvironmentConfig; +import com.sleepycat.je.LockMode; +import com.sleepycat.je.OperationStatus; +import com.sleepycat.je.TransactionConfig; + +/** + * BDBMessageStore implements a persistent {@link MessageStore} using the BDB high performance log. + * + * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations <tr><td> Accept + * transaction boundary demarcations: Begin, Commit, Abort. <tr><td> Store and remove queues. <tr><td> Store and remove + * exchanges. <tr><td> Store and remove messages. <tr><td> Bind and unbind queues to exchanges. <tr><td> Enqueue and + * dequeue messages to queues. <tr><td> Generate message identifiers. </table> + */ +@SuppressWarnings({"unchecked"}) +public class BDBMessageStore implements MessageStore +{ + private static final Logger _log = Logger.getLogger(BDBMessageStore.class); + + static final int DATABASE_FORMAT_VERSION = 5; + private static final String DATABASE_FORMAT_VERSION_PROPERTY = "version"; + public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path"; + + private Environment _environment; + + private String MESSAGEMETADATADB_NAME = "messageMetaDataDb"; + private String MESSAGECONTENTDB_NAME = "messageContentDb"; + private String QUEUEBINDINGSDB_NAME = "queueBindingsDb"; + private String DELIVERYDB_NAME = "deliveryDb"; + private String EXCHANGEDB_NAME = "exchangeDb"; + private String QUEUEDB_NAME = "queueDb"; + private Database _messageMetaDataDb; + private Database _messageContentDb; + private Database _queueBindingsDb; + private Database _deliveryDb; + private Database _exchangeDb; + private Database _queueDb; + + /* ======= + * Schema: + * ======= + * + * Queue: + * name(AMQShortString) - name(AMQShortString), owner(AMQShortString), + * arguments(FieldTable encoded as binary), exclusive (boolean) + * + * Exchange: + * name(AMQShortString) - name(AMQShortString), typeName(AMQShortString), autodelete (boolean) + * + * Binding: + * exchangeName(AMQShortString), queueName(AMQShortString), routingKey(AMQShortString), + * arguments (FieldTable encoded as binary) - 0 (zero) + * + * QueueEntry: + * queueName(AMQShortString), messageId (long) - 0 (zero) + * + * Message (MetaData): + * messageId (long) - bodySize (integer), metaData (MessageMetaData encoded as binary) + * + * Message (Content): + * messageId (long), byteOffset (integer) - dataLength(integer), data(binary); + */ + + private LogSubject _logSubject; + + private final AtomicLong _messageId = new AtomicLong(0); + + private final CommitThread _commitThread = new CommitThread("Commit-Thread"); + + // Factory Classes to create the TupleBinding objects that reflect the version instance of this BDBStore + private MessageMetaDataTupleBindingFactory _metaDataTupleBindingFactory; + private QueueTupleBindingFactory _queueTupleBindingFactory; + private BindingTupleBindingFactory _bindingTupleBindingFactory; + + /** The data version this store should run with */ + private int _version; + private enum State + { + INITIAL, + CONFIGURING, + CONFIGURED, + RECOVERING, + STARTED, + CLOSING, + CLOSED + } + + private State _state = State.INITIAL; + + private TransactionConfig _transactionConfig = new TransactionConfig(); + + private boolean _readOnly = false; + + private boolean _configured; + + + public BDBMessageStore() + { + this(DATABASE_FORMAT_VERSION); + } + + public BDBMessageStore(int version) + { + _version = version; + } + + private void setDatabaseNames(int version) + { + if (version > 1) + { + MESSAGEMETADATADB_NAME += "_v" + version; + + MESSAGECONTENTDB_NAME += "_v" + version; + + QUEUEDB_NAME += "_v" + version; + + DELIVERYDB_NAME += "_v" + version; + + EXCHANGEDB_NAME += "_v" + version; + + QUEUEBINDINGSDB_NAME += "_v" + version; + } + } + + public void configureConfigStore(String name, + ConfigurationRecoveryHandler recoveryHandler, + Configuration storeConfiguration, + LogSubject logSubject) throws Exception + { + _logSubject = logSubject; + CurrentActor.get().message(_logSubject, ConfigStoreMessages.CREATED(this.getClass().getName())); + + if(_configured) + { + throw new Exception("ConfigStore already configured"); + } + + configure(name,storeConfiguration); + + _configured = true; + stateTransition(State.CONFIGURING, State.CONFIGURED); + + recover(recoveryHandler); + stateTransition(State.RECOVERING, State.STARTED); + } + + public void configureMessageStore(String name, + MessageStoreRecoveryHandler recoveryHandler, + Configuration storeConfiguration, + LogSubject logSubject) throws Exception + { + CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED(this.getClass().getName())); + + if(!_configured) + { + throw new Exception("ConfigStore not configured"); + } + + recoverMessages(recoveryHandler); + } + + public void configureTransactionLog(String name, TransactionLogRecoveryHandler recoveryHandler, + Configuration storeConfiguration, LogSubject logSubject) throws Exception + { + CurrentActor.get().message(_logSubject, TransactionLogMessages.CREATED(this.getClass().getName())); + + if(!_configured) + { + throw new Exception("ConfigStore not configured"); + } + + recoverQueueEntries(recoveryHandler); + + + } + + public org.apache.qpid.server.store.TransactionLog.Transaction newTransaction() + { + return new BDBTransaction(); + } + + + /** + * Called after instantiation in order to configure the message store. + * + * @param name The name of the virtual host using this store + * @return whether a new store environment was created or not (to indicate whether recovery is necessary) + * + * @throws Exception If any error occurs that means the store is unable to configure itself. + */ + public boolean configure(String name, Configuration storeConfig) throws Exception + { + File environmentPath = new File(storeConfig.getString(ENVIRONMENT_PATH_PROPERTY, + System.getProperty("QPID_WORK") + "/bdbstore/" + name)); + if (!environmentPath.exists()) + { + if (!environmentPath.mkdirs()) + { + throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. " + + "Ensure the path is correct and that the permissions are correct."); + } + } + + CurrentActor.get().message(_logSubject, MessageStoreMessages.STORE_LOCATION(environmentPath.getAbsolutePath())); + + _version = storeConfig.getInt(DATABASE_FORMAT_VERSION_PROPERTY, DATABASE_FORMAT_VERSION); + + return configure(environmentPath, false); + } + + /** + * @param environmentPath location for the store to be created in/recovered from + * @param readonly if true then don't allow modifications to an existing store, and don't create a new store if none exists + * @return whether or not a new store environment was created + * @throws AMQStoreException + * @throws DatabaseException + */ + protected boolean configure(File environmentPath, boolean readonly) throws AMQStoreException, DatabaseException + { + _readOnly = readonly; + stateTransition(State.INITIAL, State.CONFIGURING); + + _log.info("Configuring BDB message store"); + + createTupleBindingFactories(_version); + + setDatabaseNames(_version); + + return setupStore(environmentPath, readonly); + } + + private void createTupleBindingFactories(int version) + { + _bindingTupleBindingFactory = new BindingTupleBindingFactory(version); + _queueTupleBindingFactory = new QueueTupleBindingFactory(version); + _metaDataTupleBindingFactory = new MessageMetaDataTupleBindingFactory(version); + } + + /** + * Move the store state from CONFIGURING to STARTED. + * + * This is required if you do not want to perform recovery of the store data + * + * @throws AMQStoreException if the store is not in the correct state + */ + public void start() throws AMQStoreException + { + stateTransition(State.CONFIGURING, State.STARTED); + } + + private boolean setupStore(File storePath, boolean readonly) throws DatabaseException, AMQStoreException + { + checkState(State.CONFIGURING); + + boolean newEnvironment = createEnvironment(storePath, readonly); + + verifyVersionByTables(); + + openDatabases(readonly); + + if (!readonly) + { + _commitThread.start(); + } + + return newEnvironment; + } + + private void verifyVersionByTables() throws DatabaseException + { + for (String s : _environment.getDatabaseNames()) + { + int versionIndex = s.indexOf("_v"); + + // lack of _v index suggests DB is v1 + // so if _version is not v1 then error + if (versionIndex == -1) + { + if (_version != 1) + { + closeEnvironment(); + throw new IllegalArgumentException("Error: Unable to load BDBStore as version " + _version + + ". Store on disk contains version 1 data."); + } + else // DB is v1 and _version is v1 + { + continue; + } + } + + // Otherwise Check Versions + int version = Integer.parseInt(s.substring(versionIndex + 2)); + + if (version != _version) + { + closeEnvironment(); + throw new IllegalArgumentException("Error: Unable to load BDBStore as version " + _version + + ". Store on disk contains version " + version + " data."); + } + } + } + + private synchronized void stateTransition(State requiredState, State newState) throws AMQStoreException + { + if (_state != requiredState) + { + throw new AMQStoreException("Cannot transition to the state: " + newState + "; need to be in state: " + requiredState + + "; currently in state: " + _state); + } + + _state = newState; + } + + private void checkState(State requiredState) throws AMQStoreException + { + if (_state != requiredState) + { + throw new AMQStoreException("Unexpected state: " + _state + "; required state: " + requiredState); + } + } + + private boolean createEnvironment(File environmentPath, boolean readonly) throws DatabaseException + { + _log.info("BDB message store using environment path " + environmentPath.getAbsolutePath()); + EnvironmentConfig envConfig = new EnvironmentConfig(); + // This is what allows the creation of the store if it does not already exist. + envConfig.setAllowCreate(true); + envConfig.setTransactional(true); + envConfig.setConfigParam("je.lock.nLockTables", "7"); + + // Restore 500,000 default timeout. + //envConfig.setLockTimeout(15000); + + // Added to help diagnosis of Deadlock issue + // http://www.oracle.com/technology/products/berkeley-db/faq/je_faq.html#23 + if (Boolean.getBoolean("qpid.bdb.lock.debug")) + { + envConfig.setConfigParam("je.txn.deadlockStackTrace", "true"); + envConfig.setConfigParam("je.txn.dumpLocks", "true"); + } + + // Set transaction mode + _transactionConfig.setReadCommitted(true); + + //This prevents background threads running which will potentially update the store. + envConfig.setReadOnly(readonly); + try + { + _environment = new Environment(environmentPath, envConfig); + return false; + } + catch (DatabaseException de) + { + if (de.getMessage().contains("Environment.setAllowCreate is false")) + { + //Allow the creation this time + envConfig.setAllowCreate(true); + if (_environment != null ) + { + _environment.cleanLog(); + _environment.close(); + } + _environment = new Environment(environmentPath, envConfig); + + return true; + } + else + { + throw de; + } + } + } + + private void openDatabases(boolean readonly) throws DatabaseException + { + DatabaseConfig dbConfig = new DatabaseConfig(); + dbConfig.setTransactional(true); + dbConfig.setAllowCreate(true); + + //This is required if we are wanting read only access. + dbConfig.setReadOnly(readonly); + + _messageMetaDataDb = _environment.openDatabase(null, MESSAGEMETADATADB_NAME, dbConfig); + _queueDb = _environment.openDatabase(null, QUEUEDB_NAME, dbConfig); + _exchangeDb = _environment.openDatabase(null, EXCHANGEDB_NAME, dbConfig); + _queueBindingsDb = _environment.openDatabase(null, QUEUEBINDINGSDB_NAME, dbConfig); + _messageContentDb = _environment.openDatabase(null, MESSAGECONTENTDB_NAME, dbConfig); + _deliveryDb = _environment.openDatabase(null, DELIVERYDB_NAME, dbConfig); + + } + + /** + * Called to close and cleanup any resources used by the message store. + * + * @throws Exception If the close fails. + */ + public void close() throws Exception + { + if (_state != State.STARTED) + { + return; + } + + _state = State.CLOSING; + + _commitThread.close(); + _commitThread.join(); + + if (_messageMetaDataDb != null) + { + _log.info("Closing message metadata database"); + _messageMetaDataDb.close(); + } + + if (_messageContentDb != null) + { + _log.info("Closing message content database"); + _messageContentDb.close(); + } + + if (_exchangeDb != null) + { + _log.info("Closing exchange database"); + _exchangeDb.close(); + } + + if (_queueBindingsDb != null) + { + _log.info("Closing bindings database"); + _queueBindingsDb.close(); + } + + if (_queueDb != null) + { + _log.info("Closing queue database"); + _queueDb.close(); + } + + if (_deliveryDb != null) + { + _log.info("Close delivery database"); + _deliveryDb.close(); + } + + closeEnvironment(); + + _state = State.CLOSED; + + CurrentActor.get().message(_logSubject,MessageStoreMessages.CLOSED()); + } + + private void closeEnvironment() throws DatabaseException + { + if (_environment != null) + { + if(!_readOnly) + { + // Clean the log before closing. This makes sure it doesn't contain + // redundant data. Closing without doing this means the cleaner may not + // get a chance to finish. + _environment.cleanLog(); + } + _environment.close(); + } + } + + + public void recover(ConfigurationRecoveryHandler recoveryHandler) throws AMQStoreException + { + stateTransition(State.CONFIGURED, State.RECOVERING); + + CurrentActor.get().message(_logSubject,MessageStoreMessages.RECOVERY_START()); + + try + { + QueueRecoveryHandler qrh = recoveryHandler.begin(this); + loadQueues(qrh); + + ExchangeRecoveryHandler erh = qrh.completeQueueRecovery(); + loadExchanges(erh); + + BindingRecoveryHandler brh = erh.completeExchangeRecovery(); + recoverBindings(brh); + + brh.completeBindingRecovery(); + } + catch (DatabaseException e) + { + throw new AMQStoreException("Error recovering persistent state: " + e.getMessage(), e); + } + + } + + private void loadQueues(QueueRecoveryHandler qrh) throws DatabaseException + { + Cursor cursor = null; + + try + { + cursor = _queueDb.openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + DatabaseEntry value = new DatabaseEntry(); + TupleBinding binding = _queueTupleBindingFactory.getInstance(); + while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + QueueRecord queueRecord = (QueueRecord) binding.entryToObject(value); + + String queueName = queueRecord.getNameShortString() == null ? null : + queueRecord.getNameShortString().asString(); + String owner = queueRecord.getOwner() == null ? null : + queueRecord.getOwner().asString(); + boolean exclusive = queueRecord.isExclusive(); + + FieldTable arguments = queueRecord.getArguments(); + + qrh.queue(queueName, owner, exclusive, arguments); + } + + } + finally + { + if (cursor != null) + { + cursor.close(); + } + } + } + + + private void loadExchanges(ExchangeRecoveryHandler erh) throws DatabaseException + { + Cursor cursor = null; + + try + { + cursor = _exchangeDb.openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + DatabaseEntry value = new DatabaseEntry(); + TupleBinding binding = new ExchangeTB(); + + while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + ExchangeRecord exchangeRec = (ExchangeRecord) binding.entryToObject(value); + + String exchangeName = exchangeRec.getNameShortString() == null ? null : + exchangeRec.getNameShortString().asString(); + String type = exchangeRec.getType() == null ? null : + exchangeRec.getType().asString(); + boolean autoDelete = exchangeRec.isAutoDelete(); + + erh.exchange(exchangeName, type, autoDelete); + } + } + finally + { + if (cursor != null) + { + cursor.close(); + } + } + + } + + private void recoverBindings(BindingRecoveryHandler brh) throws DatabaseException + { + Cursor cursor = null; + try + { + cursor = _queueBindingsDb.openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + DatabaseEntry value = new DatabaseEntry(); + TupleBinding binding = _bindingTupleBindingFactory.getInstance(); + + while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + //yes, this is retrieving all the useful information from the key only. + //For table compatibility it shall currently be left as is + BindingKey bindingRecord = (BindingKey) binding.entryToObject(key); + + String exchangeName = bindingRecord.getExchangeName() == null ? null : + bindingRecord.getExchangeName().asString(); + String queueName = bindingRecord.getQueueName() == null ? null : + bindingRecord.getQueueName().asString(); + String routingKey = bindingRecord.getRoutingKey() == null ? null : + bindingRecord.getRoutingKey().asString(); + ByteBuffer argumentsBB = (bindingRecord.getArguments() == null ? null : + java.nio.ByteBuffer.wrap(bindingRecord.getArguments().getDataAsBytes())); + + brh.binding(exchangeName, queueName, routingKey, argumentsBB); + } + } + finally + { + if (cursor != null) + { + cursor.close(); + } + } + + } + + private void recoverMessages(MessageStoreRecoveryHandler msrh) throws DatabaseException + { + StoredMessageRecoveryHandler mrh = msrh.begin(); + + Cursor cursor = null; + try + { + cursor = _messageMetaDataDb.openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);; + + DatabaseEntry value = new DatabaseEntry(); + EntryBinding valueBinding = _metaDataTupleBindingFactory.getInstance(); + + long maxId = 0; + + while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + long messageId = (Long) keyBinding.entryToObject(key); + StorableMessageMetaData metaData = (StorableMessageMetaData) valueBinding.entryToObject(value); + + StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, false); + mrh.message(message); + + maxId = Math.max(maxId, messageId); + } + + _messageId.set(maxId); + } + catch (DatabaseException e) + { + _log.error("Database Error: " + e.getMessage(), e); + throw e; + } + finally + { + if (cursor != null) + { + cursor.close(); + } + } + } + + private void recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) + throws DatabaseException + { + QueueEntryRecoveryHandler qerh = recoveryHandler.begin(this); + + ArrayList<QueueEntryKey> entries = new ArrayList<QueueEntryKey>(); + + Cursor cursor = null; + try + { + cursor = _deliveryDb.openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + EntryBinding keyBinding = new QueueEntryTB(); + + DatabaseEntry value = new DatabaseEntry(); + + while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + QueueEntryKey qek = (QueueEntryKey) keyBinding.entryToObject(key); + + entries.add(qek); + } + + try + { + cursor.close(); + } + finally + { + cursor = null; + } + + for(QueueEntryKey entry : entries) + { + AMQShortString queueName = entry.getQueueName(); + long messageId = entry.getMessageId(); + + qerh.queueEntry(queueName.asString(),messageId); + } + } + catch (DatabaseException e) + { + _log.error("Database Error: " + e.getMessage(), e); + throw e; + } + finally + { + if (cursor != null) + { + cursor.close(); + } + } + + qerh.completeQueueEntryRecovery(); + } + + /** + * Removes the specified message from the store. + * + * @param messageId Identifies the message to remove. + * + * @throws AMQInternalException If the operation fails for any reason. + */ + public void removeMessage(Long messageId) throws AMQStoreException + { + // _log.debug("public void removeMessage(Long messageId = " + messageId): called"); + + com.sleepycat.je.Transaction tx = null; + + Cursor cursor = null; + try + { + tx = _environment.beginTransaction(null, null); + + //remove the message meta data from the store + DatabaseEntry key = new DatabaseEntry(); + EntryBinding metaKeyBindingTuple = TupleBinding.getPrimitiveBinding(Long.class); + metaKeyBindingTuple.objectToEntry(messageId, key); + + if (_log.isDebugEnabled()) + { + _log.debug("Removing message id " + messageId); + } + + + OperationStatus status = _messageMetaDataDb.delete(tx, key); + if (status == OperationStatus.NOTFOUND) + { + tx.abort(); + + throw new AMQStoreException("Message metadata not found for message id " + messageId); + } + + if (_log.isDebugEnabled()) + { + _log.debug("Deleted metadata for message " + messageId); + } + + //now remove the content data from the store if there is any. + + DatabaseEntry contentKeyEntry = new DatabaseEntry(); + MessageContentKey_5 mck = new MessageContentKey_5(messageId,0); + + TupleBinding<MessageContentKey> contentKeyTupleBinding = new MessageContentKeyTB_5(); + contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry); + + //Use a partial record for the value to prevent retrieving the + //data itself as we only need the key to identify what to remove. + DatabaseEntry value = new DatabaseEntry(); + value.setPartial(0, 0, true); + + cursor = _messageContentDb.openCursor(tx, null); + + status = cursor.getSearchKeyRange(contentKeyEntry, value, LockMode.RMW); + while (status == OperationStatus.SUCCESS) + { + mck = (MessageContentKey_5) contentKeyTupleBinding.entryToObject(contentKeyEntry); + + if(mck.getMessageId() != messageId) + { + //we have exhausted all chunks for this message id, break + break; + } + else + { + status = cursor.delete(); + + if(status == OperationStatus.NOTFOUND) + { + cursor.close(); + cursor = null; + + tx.abort(); + throw new AMQStoreException("Content chunk offset" + mck.getOffset() + " not found for message " + messageId); + } + + if (_log.isDebugEnabled()) + { + _log.debug("Deleted content chunk offset " + mck.getOffset() + " for message " + messageId); + } + } + + status = cursor.getNext(contentKeyEntry, value, LockMode.RMW); + } + + cursor.close(); + cursor = null; + + commit(tx, true); + } + catch (DatabaseException e) + { + e.printStackTrace(); + + if (tx != null) + { + try + { + if(cursor != null) + { + cursor.close(); + cursor = null; + } + + tx.abort(); + } + catch (DatabaseException e1) + { + throw new AMQStoreException("Error aborting transaction " + e1, e1); + } + } + + throw new AMQStoreException("Error removing message with id " + messageId + " from database: " + e.getMessage(), e); + } + finally + { + if(cursor != null) + { + try + { + cursor.close(); + } + catch (DatabaseException e) + { + throw new AMQStoreException("Error closing database connection: " + e.getMessage(), e); + } + } + } + } + + /** + * @see DurableConfigurationStore#createExchange(Exchange) + */ + public void createExchange(Exchange exchange) throws AMQStoreException + { + if (_state != State.RECOVERING) + { + ExchangeRecord exchangeRec = new ExchangeRecord(exchange.getNameShortString(), + exchange.getTypeShortString(), exchange.isAutoDelete()); + + DatabaseEntry key = new DatabaseEntry(); + EntryBinding keyBinding = new AMQShortStringTB(); + keyBinding.objectToEntry(exchange.getNameShortString(), key); + + DatabaseEntry value = new DatabaseEntry(); + TupleBinding exchangeBinding = new ExchangeTB(); + exchangeBinding.objectToEntry(exchangeRec, value); + + try + { + _exchangeDb.put(null, key, value); + } + catch (DatabaseException e) + { + throw new AMQStoreException("Error writing Exchange with name " + exchange.getName() + " to database: " + e.getMessage(), e); + } + } + } + + /** + * @see DurableConfigurationStore#removeExchange(Exchange) + */ + public void removeExchange(Exchange exchange) throws AMQStoreException + { + DatabaseEntry key = new DatabaseEntry(); + EntryBinding keyBinding = new AMQShortStringTB(); + keyBinding.objectToEntry(exchange.getNameShortString(), key); + try + { + OperationStatus status = _exchangeDb.delete(null, key); + if (status == OperationStatus.NOTFOUND) + { + throw new AMQStoreException("Exchange " + exchange.getName() + " not found"); + } + } + catch (DatabaseException e) + { + throw new AMQStoreException("Error writing deleting with name " + exchange.getName() + " from database: " + e.getMessage(), e); + } + } + + + + + /** + * @see DurableConfigurationStore#bindQueue(Exchange, AMQShortString, AMQQueue, FieldTable) + */ + public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException + { + // _log.debug("public void bindQueue(Exchange exchange = " + exchange + ", AMQShortString routingKey = " + routingKey + // + ", AMQQueue queue = " + queue + ", FieldTable args = " + args + "): called"); + + if (_state != State.RECOVERING) + { + BindingKey bindingRecord = new BindingKey(exchange.getNameShortString(), + queue.getNameShortString(), routingKey, args); + + DatabaseEntry key = new DatabaseEntry(); + EntryBinding keyBinding = _bindingTupleBindingFactory.getInstance(); + + keyBinding.objectToEntry(bindingRecord, key); + + //yes, this is writing out 0 as a value and putting all the + //useful info into the key, don't ask me why. For table + //compatibility it shall currently be left as is + DatabaseEntry value = new DatabaseEntry(); + ByteBinding.byteToEntry((byte) 0, value); + + try + { + _queueBindingsDb.put(null, key, value); + } + catch (DatabaseException e) + { + throw new AMQStoreException("Error writing binding for AMQQueue with name " + queue.getName() + " to exchange " + + exchange.getName() + " to database: " + e.getMessage(), e); + } + } + } + + /** + * @see DurableConfigurationStore#unbindQueue(Exchange, AMQShortString, AMQQueue, FieldTable) + */ + public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) + throws AMQStoreException + { + DatabaseEntry key = new DatabaseEntry(); + EntryBinding keyBinding = _bindingTupleBindingFactory.getInstance(); + keyBinding.objectToEntry(new BindingKey(exchange.getNameShortString(), queue.getNameShortString(), routingKey, args), key); + + try + { + OperationStatus status = _queueBindingsDb.delete(null, key); + if (status == OperationStatus.NOTFOUND) + { + throw new AMQStoreException("Queue binding for queue with name " + queue.getName() + " to exchange " + + exchange.getName() + " not found"); + } + } + catch (DatabaseException e) + { + throw new AMQStoreException("Error deleting queue binding for queue with name " + queue.getName() + " to exchange " + + exchange.getName() + " from database: " + e.getMessage(), e); + } + } + + /** + * @see DurableConfigurationStore#createQueue(AMQQueue) + */ + public void createQueue(AMQQueue queue) throws AMQStoreException + { + createQueue(queue, null); + } + + /** + * @see DurableConfigurationStore#createQueue(AMQQueue, FieldTable) + */ + public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException + { + if (_log.isDebugEnabled()) + { + _log.debug("public void createQueue(AMQQueue queue(" + queue.getName() + ") = " + queue + "): called"); + } + + QueueRecord queueRecord= new QueueRecord(queue.getNameShortString(), + queue.getOwner(), queue.isExclusive(), arguments); + + createQueue(queueRecord); + } + + /** + * Makes the specified queue persistent. + * + * Only intended for direct use during store upgrades. + * + * @param queueRecord Details of the queue to store. + * + * @throws AMQStoreException If the operation fails for any reason. + */ + protected void createQueue(QueueRecord queueRecord) throws AMQStoreException + { + if (_state != State.RECOVERING) + { + DatabaseEntry key = new DatabaseEntry(); + EntryBinding keyBinding = new AMQShortStringTB(); + keyBinding.objectToEntry(queueRecord.getNameShortString(), key); + + DatabaseEntry value = new DatabaseEntry(); + TupleBinding queueBinding = _queueTupleBindingFactory.getInstance(); + + queueBinding.objectToEntry(queueRecord, value); + try + { + _queueDb.put(null, key, value); + } + catch (DatabaseException e) + { + throw new AMQStoreException("Error writing AMQQueue with name " + queueRecord.getNameShortString().asString() + + " to database: " + e.getMessage(), e); + } + } + } + + /** + * Updates the specified queue in the persistent store, IF it is already present. If the queue + * is not present in the store, it will not be added. + * + * NOTE: Currently only updates the exclusivity. + * + * @param queue The queue to update the entry for. + * @throws AMQStoreException If the operation fails for any reason. + */ + public void updateQueue(final AMQQueue queue) throws AMQStoreException + { + if (_log.isDebugEnabled()) + { + _log.debug("Updating queue: " + queue.getName()); + } + + try + { + DatabaseEntry key = new DatabaseEntry(); + EntryBinding keyBinding = new AMQShortStringTB(); + keyBinding.objectToEntry(queue.getNameShortString(), key); + + DatabaseEntry value = new DatabaseEntry(); + DatabaseEntry newValue = new DatabaseEntry(); + TupleBinding queueBinding = _queueTupleBindingFactory.getInstance(); + + OperationStatus status = _queueDb.get(null, key, value, LockMode.DEFAULT); + if(status == OperationStatus.SUCCESS) + { + //read the existing record and apply the new exclusivity setting + QueueRecord queueRecord = (QueueRecord) queueBinding.entryToObject(value); + queueRecord.setExclusive(queue.isExclusive()); + + //write the updated entry to the store + queueBinding.objectToEntry(queueRecord, newValue); + + _queueDb.put(null, key, newValue); + } + else if(status != OperationStatus.NOTFOUND) + { + throw new AMQStoreException("Error updating queue details within the store: " + status); + } + } + catch (DatabaseException e) + { + throw new AMQStoreException("Error updating queue details within the store: " + e,e); + } + } + + /** + * Removes the specified queue from the persistent store. + * + * @param queue The queue to remove. + * + * @throws AMQStoreException If the operation fails for any reason. + */ + public void removeQueue(final AMQQueue queue) throws AMQStoreException + { + AMQShortString name = queue.getNameShortString(); + + if (_log.isDebugEnabled()) + { + _log.debug("public void removeQueue(AMQShortString name = " + name + "): called"); + } + + DatabaseEntry key = new DatabaseEntry(); + EntryBinding keyBinding = new AMQShortStringTB(); + keyBinding.objectToEntry(name, key); + try + { + OperationStatus status = _queueDb.delete(null, key); + if (status == OperationStatus.NOTFOUND) + { + throw new AMQStoreException("Queue " + name + " not found"); + } + } + catch (DatabaseException e) + { + throw new AMQStoreException("Error writing deleting with name " + name + " from database: " + e.getMessage(), e); + } + } + + /** + * Places a message onto a specified queue, in a given transaction. + * + * @param tx The transaction for the operation. + * @param queue The the queue to place the message on. + * @param messageId The message to enqueue. + * + * @throws AMQStoreException If the operation fails for any reason. + */ + public void enqueueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, Long messageId) throws AMQStoreException + { + // _log.debug("public void enqueueMessage(Transaction tx = " + tx + ", AMQShortString name = " + name + ", Long messageId): called"); + + AMQShortString name = new AMQShortString(queue.getResourceName()); + + DatabaseEntry key = new DatabaseEntry(); + EntryBinding keyBinding = new QueueEntryTB(); + QueueEntryKey dd = new QueueEntryKey(name, messageId); + keyBinding.objectToEntry(dd, key); + DatabaseEntry value = new DatabaseEntry(); + ByteBinding.byteToEntry((byte) 0, value); + + try + { + if (_log.isDebugEnabled()) + { + _log.debug("Enqueuing message " + messageId + " on queue " + name + " [Transaction" + tx + "]"); + } + _deliveryDb.put(tx, key, value); + } + catch (DatabaseException e) + { + _log.error("Failed to enqueue: " + e.getMessage(), e); + throw new AMQStoreException("Error writing enqueued message with id " + messageId + " for queue " + name + + " to database", e); + } + } + + /** + * Extracts a message from a specified queue, in a given transaction. + * + * @param tx The transaction for the operation. + * @param queue The name queue to take the message from. + * @param messageId The message to dequeue. + * + * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist. + */ + public void dequeueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, Long messageId) throws AMQStoreException + { + AMQShortString name = new AMQShortString(queue.getResourceName()); + + DatabaseEntry key = new DatabaseEntry(); + EntryBinding keyBinding = new QueueEntryTB(); + QueueEntryKey dd = new QueueEntryKey(name, messageId); + + keyBinding.objectToEntry(dd, key); + + if (_log.isDebugEnabled()) + { + _log.debug("Dequeue message id " + messageId); + } + + try + { + + OperationStatus status = _deliveryDb.delete(tx, key); + if (status == OperationStatus.NOTFOUND) + { + throw new AMQStoreException("Unable to find message with id " + messageId + " on queue " + name); + } + else if (status != OperationStatus.SUCCESS) + { + throw new AMQStoreException("Unable to remove message with id " + messageId + " on queue " + name); + } + + if (_log.isDebugEnabled()) + { + _log.debug("Removed message " + messageId + ", " + name + " from delivery db"); + + } + } + catch (DatabaseException e) + { + + _log.error("Failed to dequeue message " + messageId + ": " + e.getMessage(), e); + _log.error(tx); + + throw new AMQStoreException("Error accessing database while dequeuing message: " + e.getMessage(), e); + } + } + + /** + * Commits all operations performed within a given transaction. + * + * @param tx The transaction to commit all operations for. + * + * @throws AMQStoreException If the operation fails for any reason. + */ + private StoreFuture commitTranImpl(final com.sleepycat.je.Transaction tx, boolean syncCommit) throws AMQStoreException + { + //if (_log.isDebugEnabled()) + //{ + // _log.debug("public void commitTranImpl() called with (Transaction=" + tx + ", syncCommit= "+ syncCommit + ")"); + //} + + if (tx == null) + { + throw new AMQStoreException("Fatal internal error: transactional is null at commitTran"); + } + + StoreFuture result; + try + { + result = commit(tx, syncCommit); + + if (_log.isDebugEnabled()) + { + _log.debug("commitTranImpl completed for [Transaction:" + tx + "]"); + } + } + catch (DatabaseException e) + { + throw new AMQStoreException("Error commit tx: " + e.getMessage(), e); + } + + return result; + } + + /** + * Abandons all operations performed within a given transaction. + * + * @param tx The transaction to abandon. + * + * @throws AMQStoreException If the operation fails for any reason. + */ + public void abortTran(final com.sleepycat.je.Transaction tx) throws AMQStoreException + { + if (_log.isDebugEnabled()) + { + _log.debug("abortTran called for [Transaction:" + tx + "]"); + } + + try + { + tx.abort(); + } + catch (DatabaseException e) + { + throw new AMQStoreException("Error aborting transaction: " + e.getMessage(), e); + } + } + + /** + * Primarily for testing purposes. + * + * @param queueName + * + * @return a list of message ids for messages enqueued for a particular queue + */ + List<Long> getEnqueuedMessages(AMQShortString queueName) throws AMQStoreException + { + Cursor cursor = null; + try + { + cursor = _deliveryDb.openCursor(null, null); + + DatabaseEntry key = new DatabaseEntry(); + + QueueEntryKey dd = new QueueEntryKey(queueName, 0); + + EntryBinding keyBinding = new QueueEntryTB(); + keyBinding.objectToEntry(dd, key); + + DatabaseEntry value = new DatabaseEntry(); + + LinkedList<Long> messageIds = new LinkedList<Long>(); + + OperationStatus status = cursor.getSearchKeyRange(key, value, LockMode.DEFAULT); + dd = (QueueEntryKey) keyBinding.entryToObject(key); + + while ((status == OperationStatus.SUCCESS) && dd.getQueueName().equals(queueName)) + { + + messageIds.add(dd.getMessageId()); + status = cursor.getNext(key, value, LockMode.DEFAULT); + if (status == OperationStatus.SUCCESS) + { + dd = (QueueEntryKey) keyBinding.entryToObject(key); + } + } + + return messageIds; + } + catch (DatabaseException e) + { + throw new AMQStoreException("Database error: " + e.getMessage(), e); + } + finally + { + if (cursor != null) + { + try + { + cursor.close(); + } + catch (DatabaseException e) + { + throw new AMQStoreException("Error closing cursor: " + e.getMessage(), e); + } + } + } + } + + /** + * Return a valid, currently unused message id. + * + * @return A fresh message id. + */ + public Long getNewMessageId() + { + return _messageId.incrementAndGet(); + } + + /** + * Stores a chunk of message data. + * + * @param tx The transaction for the operation. + * @param messageId The message to store the data for. + * @param offset The offset of the data chunk in the message. + * @param contentBody The content of the data chunk. + * + * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist. + */ + protected void addContent(final com.sleepycat.je.Transaction tx, Long messageId, int offset, + ByteBuffer contentBody) throws AMQStoreException + { + DatabaseEntry key = new DatabaseEntry(); + TupleBinding<MessageContentKey> keyBinding = new MessageContentKeyTB_5(); + keyBinding.objectToEntry(new MessageContentKey_5(messageId, offset), key); + DatabaseEntry value = new DatabaseEntry(); + TupleBinding<ByteBuffer> messageBinding = new ContentTB(); + messageBinding.objectToEntry(contentBody, value); + try + { + OperationStatus status = _messageContentDb.put(tx, key, value); + if (status != OperationStatus.SUCCESS) + { + throw new AMQStoreException("Error adding content chunk offset" + offset + " for message id " + messageId + ": " + + status); + } + + if (_log.isDebugEnabled()) + { + _log.debug("Storing content chunk offset" + offset + " for message " + messageId + "[Transaction" + tx + "]"); + } + } + catch (DatabaseException e) + { + throw new AMQStoreException("Error writing AMQMessage with id " + messageId + " to database: " + e.getMessage(), e); + } + } + + /** + * Stores message meta-data. + * + * @param tx The transaction for the operation. + * @param messageId The message to store the data for. + * @param messageMetaData The message meta data to store. + * + * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist. + */ + private void storeMetaData(final com.sleepycat.je.Transaction tx, Long messageId, StorableMessageMetaData messageMetaData) + throws AMQStoreException + { + if (_log.isDebugEnabled()) + { + _log.debug("public void storeMetaData(Txn tx = " + tx + ", Long messageId = " + + messageId + ", MessageMetaData messageMetaData = " + messageMetaData + "): called"); + } + + DatabaseEntry key = new DatabaseEntry(); + EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class); + keyBinding.objectToEntry(messageId, key); + DatabaseEntry value = new DatabaseEntry(); + + TupleBinding messageBinding = _metaDataTupleBindingFactory.getInstance(); + messageBinding.objectToEntry(messageMetaData, value); + try + { + _messageMetaDataDb.put(tx, key, value); + if (_log.isDebugEnabled()) + { + _log.debug("Storing message metadata for message id " + messageId + "[Transaction" + tx + "]"); + } + } + catch (DatabaseException e) + { + throw new AMQStoreException("Error writing message metadata with id " + messageId + " to database: " + e.getMessage(), e); + } + } + + /** + * Retrieves message meta-data. + * + * @param messageId The message to get the meta-data for. + * + * @return The message meta data. + * + * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist. + */ + public StorableMessageMetaData getMessageMetaData(Long messageId) throws AMQStoreException + { + if (_log.isDebugEnabled()) + { + _log.debug("public MessageMetaData getMessageMetaData(Long messageId = " + + messageId + "): called"); + } + + DatabaseEntry key = new DatabaseEntry(); + EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class); + keyBinding.objectToEntry(messageId, key); + DatabaseEntry value = new DatabaseEntry(); + TupleBinding messageBinding = _metaDataTupleBindingFactory.getInstance(); + + try + { + OperationStatus status = _messageMetaDataDb.get(null, key, value, LockMode.READ_UNCOMMITTED); + if (status != OperationStatus.SUCCESS) + { + throw new AMQStoreException("Metadata not found for message with id " + messageId); + } + + StorableMessageMetaData mdd = (StorableMessageMetaData) messageBinding.entryToObject(value); + + return mdd; + } + catch (DatabaseException e) + { + throw new AMQStoreException("Error reading message metadata for message with id " + messageId + ": " + e.getMessage(), e); + } + } + + /** + * Fills the provided ByteBuffer with as much content for the specified message as possible, starting + * from the specified offset in the message. + * + * @param messageId The message to get the data for. + * @param offset The offset of the data within the message. + * @param dst The destination of the content read back + * + * @return The number of bytes inserted into the destination + * + * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist. + */ + public int getContent(Long messageId, int offset, ByteBuffer dst) throws AMQStoreException + { + DatabaseEntry contentKeyEntry = new DatabaseEntry(); + + //Start from 0 offset and search for the starting chunk. + MessageContentKey_5 mck = new MessageContentKey_5(messageId, 0); + TupleBinding<MessageContentKey> contentKeyTupleBinding = new MessageContentKeyTB_5(); + contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry); + DatabaseEntry value = new DatabaseEntry(); + TupleBinding<ByteBuffer> contentTupleBinding = new ContentTB(); + + if (_log.isDebugEnabled()) + { + _log.debug("Message Id: " + messageId + " Getting content body from offset: " + offset); + } + + int written = 0; + int seenSoFar = 0; + + Cursor cursor = null; + try + { + cursor = _messageContentDb.openCursor(null, null); + + OperationStatus status = cursor.getSearchKeyRange(contentKeyEntry, value, LockMode.READ_UNCOMMITTED); + + while (status == OperationStatus.SUCCESS) + { + mck = (MessageContentKey_5) contentKeyTupleBinding.entryToObject(contentKeyEntry); + long id = mck.getMessageId(); + + if(id != messageId) + { + //we have exhausted all chunks for this message id, break + break; + } + + int offsetInMessage = mck.getOffset(); + ByteBuffer buf = (ByteBuffer) contentTupleBinding.entryToObject(value); + + final int size = (int) buf.limit(); + + seenSoFar += size; + + if(seenSoFar >= offset) + { + byte[] dataAsBytes = buf.array(); + + int posInArray = offset + written - offsetInMessage; + int count = size - posInArray; + if(count > dst.remaining()) + { + count = dst.remaining(); + } + dst.put(dataAsBytes,posInArray,count); + written+=count; + + if(dst.remaining() == 0) + { + break; + } + } + + status = cursor.getNext(contentKeyEntry, value, LockMode.RMW); + } + + return written; + } + catch (DatabaseException e) + { + throw new AMQStoreException("Error writing AMQMessage with id " + messageId + " to database: " + e.getMessage(), e); + } + finally + { + if(cursor != null) + { + try + { + cursor.close(); + } + catch (DatabaseException e) + { + throw new AMQStoreException("Error writing AMQMessage with id " + messageId + " to database: " + e.getMessage(), e); + } + } + } + } + + public boolean isPersistent() + { + return true; + } + + public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData) + { + if(metaData.isPersistent()) + { + return new StoredBDBMessage(getNewMessageId(), metaData); + } + else + { + return new StoredMemoryMessage(getNewMessageId(), metaData); + } + } + + + //protected getters for the TupleBindingFactories + + protected QueueTupleBindingFactory getQueueTupleBindingFactory() + { + return _queueTupleBindingFactory; + } + + protected BindingTupleBindingFactory getBindingTupleBindingFactory() + { + return _bindingTupleBindingFactory; + } + + protected MessageMetaDataTupleBindingFactory getMetaDataTupleBindingFactory() + { + return _metaDataTupleBindingFactory; + } + + //Package getters for the various databases used by the Store + + Database getMetaDataDb() + { + return _messageMetaDataDb; + } + + Database getContentDb() + { + return _messageContentDb; + } + + Database getQueuesDb() + { + return _queueDb; + } + + Database getDeliveryDb() + { + return _deliveryDb; + } + + Database getExchangesDb() + { + return _exchangeDb; + } + + Database getBindingsDb() + { + return _queueBindingsDb; + } + + void visitMetaDataDb(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException + { + visitDatabase(_messageMetaDataDb, visitor); + } + + void visitContentDb(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException + { + visitDatabase(_messageContentDb, visitor); + } + + void visitQueues(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException + { + visitDatabase(_queueDb, visitor); + } + + void visitDelivery(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException + { + visitDatabase(_deliveryDb, visitor); + } + + void visitExchanges(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException + { + visitDatabase(_exchangeDb, visitor); + } + + void visitBindings(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException + { + visitDatabase(_queueBindingsDb, visitor); + } + + /** + * Generic visitDatabase allows iteration through the specified database. + * + * @param database The database to visit + * @param visitor The visitor to give each entry to. + * + * @throws DatabaseException If there is a problem with the Database structure + * @throws AMQStoreException If there is a problem with the Database contents + */ + void visitDatabase(Database database, DatabaseVisitor visitor) throws DatabaseException, AMQStoreException + { + Cursor cursor = database.openCursor(null, null); + + try + { + DatabaseEntry key = new DatabaseEntry(); + DatabaseEntry value = new DatabaseEntry(); + while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + visitor.visit(key, value); + } + } + finally + { + if (cursor != null) + { + cursor.close(); + } + } + } + + private StoreFuture commit(com.sleepycat.je.Transaction tx, boolean syncCommit) throws DatabaseException + { + // _log.debug("void commit(Transaction tx = " + tx + ", sync = " + syncCommit + "): called"); + + tx.commitNoSync(); + + BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit); + commitFuture.commit(); + + return commitFuture; + } + + public void startCommitThread() + { + _commitThread.start(); + } + + private static final class BDBCommitFuture implements StoreFuture + { + // private static final Logger _log = Logger.getLogger(BDBCommitFuture.class); + + private final CommitThread _commitThread; + private final com.sleepycat.je.Transaction _tx; + private DatabaseException _databaseException; + private boolean _complete; + private boolean _syncCommit; + + public BDBCommitFuture(CommitThread commitThread, com.sleepycat.je.Transaction tx, boolean syncCommit) + { + // _log.debug("public Commit(CommitThread commitThread = " + commitThread + ", Transaction tx = " + tx + // + "): called"); + + _commitThread = commitThread; + _tx = tx; + _syncCommit = syncCommit; + } + + public synchronized void complete() + { + if (_log.isDebugEnabled()) + { + _log.debug("public synchronized void complete(): called (Transaction = " + _tx + ")"); + } + + _complete = true; + + notifyAll(); + } + + public synchronized void abort(DatabaseException databaseException) + { + // _log.debug("public synchronized void abort(DatabaseException databaseException = " + databaseException + // + "): called"); + + _complete = true; + _databaseException = databaseException; + + notifyAll(); + } + + public void commit() throws DatabaseException + { + //_log.debug("public void commit(): called"); + + _commitThread.addJob(this); + + if(!_syncCommit) + { + _log.debug("CommitAsync was requested, returning immediately."); + return; + } + + synchronized (BDBCommitFuture.this) + { + while (!_complete) + { + try + { + wait(250); + } + catch (InterruptedException e) + { + // _log.error("Unexpected thread interruption: " + e, e); + throw new RuntimeException(e); + } + } + + // _log.debug("Commit completed, _databaseException = " + _databaseException); + + if (_databaseException != null) + { + throw _databaseException; + } + } + } + + public synchronized boolean isComplete() + { + return _complete; + } + + public void waitForCompletion() + { + while (!isComplete()) + { + try + { + wait(250); + } + catch (InterruptedException e) + { + //TODO Should we ignore, or throw a 'StoreException'? + throw new RuntimeException(e); + } + } + } + } + + /** + * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations + * themselves are responsible for adding themselves to the queue and waiting for the commit to happen before + * continuing, but it is the responsibility of this thread to tell the commit operations when they have been + * completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods. + * + * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collarations </table> + */ + private class CommitThread extends Thread + { + // private final Logger _log = Logger.getLogger(CommitThread.class); + + private final AtomicBoolean _stopped = new AtomicBoolean(false); + private final AtomicReference<Queue<BDBCommitFuture>> _jobQueue = new AtomicReference<Queue<BDBCommitFuture>>(new ConcurrentLinkedQueue<BDBCommitFuture>()); + private final CheckpointConfig _config = new CheckpointConfig(); + private final Object _lock = new Object(); + + public CommitThread(String name) + { + super(name); + _config.setForce(true); + + } + + public void run() + { + while (!_stopped.get()) + { + synchronized (_lock) + { + while (!_stopped.get() && !hasJobs()) + { + try + { + // RHM-7 Periodically wake up and check, just in case we + // missed a notification. Don't want to lock the broker hard. + _lock.wait(250); + } + catch (InterruptedException e) + { + // _log.info(getName() + " interrupted. "); + } + } + } + processJobs(); + } + } + + private void processJobs() + { + // _log.debug("private void processJobs(): called"); + + // we replace the old queue atomically with a new one and this avoids any need to + // copy elements out of the queue + Queue<BDBCommitFuture> jobs = _jobQueue.getAndSet(new ConcurrentLinkedQueue<BDBCommitFuture>()); + + try + { + // _environment.checkpoint(_config); + _environment.sync(); + + for (BDBCommitFuture commit : jobs) + { + commit.complete(); + } + } + catch (DatabaseException e) + { + for (BDBCommitFuture commit : jobs) + { + commit.abort(e); + } + } + + } + + private boolean hasJobs() + { + return !_jobQueue.get().isEmpty(); + } + + public void addJob(BDBCommitFuture commit) + { + synchronized (_lock) + { + _jobQueue.get().add(commit); + _lock.notifyAll(); + } + } + + public void close() + { + synchronized (_lock) + { + _stopped.set(true); + _lock.notifyAll(); + } + } + } + + + private class StoredBDBMessage implements StoredMessage + { + + private final long _messageId; + private volatile SoftReference<StorableMessageMetaData> _metaDataRef; + private com.sleepycat.je.Transaction _txn; + + StoredBDBMessage(long messageId, StorableMessageMetaData metaData) + { + this(messageId, metaData, true); + } + + + StoredBDBMessage(long messageId, + StorableMessageMetaData metaData, boolean persist) + { + try + { + _messageId = messageId; + + _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData); + if(persist) + { + _txn = _environment.beginTransaction(null, null); + storeMetaData(_txn, messageId, metaData); + } + } + catch (DatabaseException e) + { + throw new RuntimeException(e); + } + catch (AMQStoreException e) + { + throw new RuntimeException(e); + } + + } + + public StorableMessageMetaData getMetaData() + { + StorableMessageMetaData metaData = _metaDataRef.get(); + if(metaData == null) + { + try + { + metaData = BDBMessageStore.this.getMessageMetaData(_messageId); + } + catch (AMQStoreException e) + { + throw new RuntimeException(e); + } + _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData); + } + + return metaData; + } + + public long getMessageNumber() + { + return _messageId; + } + + public void addContent(int offsetInMessage, java.nio.ByteBuffer src) + { + try + { + BDBMessageStore.this.addContent(_txn, _messageId, offsetInMessage, src); + } + catch (AMQStoreException e) + { + throw new RuntimeException(e); + } + } + + public int getContent(int offsetInMessage, java.nio.ByteBuffer dst) + { + try + { + return BDBMessageStore.this.getContent(_messageId, offsetInMessage, dst); + } + catch (AMQStoreException e) + { + throw new RuntimeException(e); + } + } + + public StoreFuture flushToStore() + { + try + { + if(_txn != null) + { + //if(_log.isDebugEnabled()) + //{ + // _log.debug("Flushing message " + _messageId + " to store"); + //} + BDBMessageStore.this.commitTranImpl(_txn, true); + } + } + catch (AMQStoreException e) + { + throw new RuntimeException(e); + } + finally + { + _txn = null; + } + return IMMEDIATE_FUTURE; + } + + public void remove() + { + flushToStore(); + try + { + BDBMessageStore.this.removeMessage(_messageId); + } + catch (AMQStoreException e) + { + throw new RuntimeException(e); + } + } + } + + private class BDBTransaction implements Transaction + { + private com.sleepycat.je.Transaction _txn; + + private BDBTransaction() + { + try + { + _txn = _environment.beginTransaction(null, null); + } + catch (DatabaseException e) + { + throw new RuntimeException(e); + } + } + + public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException + { + BDBMessageStore.this.enqueueMessage(_txn, queue, messageId); + } + + public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException + { + BDBMessageStore.this.dequeueMessage(_txn, queue, messageId); + + } + + public void commitTran() throws AMQStoreException + { + BDBMessageStore.this.commitTranImpl(_txn, true); + } + + public StoreFuture commitTranAsync() throws AMQStoreException + { + return BDBMessageStore.this.commitTranImpl(_txn, false); + } + + public void abortTran() throws AMQStoreException + { + BDBMessageStore.this.abortTran(_txn); + } + } + +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java new file mode 100644 index 0000000000..211c025dcd --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java @@ -0,0 +1,1125 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.store.berkeleydb.BDBMessageStore; +import org.apache.qpid.server.store.berkeleydb.BindingKey; +import org.apache.qpid.server.store.berkeleydb.ContentTB; +import org.apache.qpid.server.store.berkeleydb.DatabaseVisitor; +import org.apache.qpid.server.store.berkeleydb.ExchangeTB; +import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_4; +import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_5; +import org.apache.qpid.server.store.berkeleydb.records.ExchangeRecord; +import org.apache.qpid.server.store.berkeleydb.records.QueueRecord; +import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTB_4; +import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTB_5; +import org.apache.qpid.server.store.berkeleydb.tuples.QueueEntryTB; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.BrokerActor; +import org.apache.qpid.server.logging.NullRootMessageLogger; +import org.apache.qpid.server.message.MessageMetaData; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.util.FileUtils; +import org.apache.commons.cli.PosixParser; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.ParseException; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; + +import java.io.File; +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.io.IOException; +import java.io.FileNotFoundException; +import java.nio.ByteBuffer; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map.Entry; + +import com.sleepycat.je.DatabaseEntry; +import com.sleepycat.je.DatabaseException; +import com.sleepycat.je.Database; +import com.sleepycat.je.Environment; +import com.sleepycat.je.EnvironmentConfig; +import com.sleepycat.bind.tuple.TupleBinding; + +/** + * This is a simple BerkeleyDB Store upgrade tool that will upgrade a V4 Store to a V5 Store. + * + * Currently upgrade is fixed from v4 -> v5 + * + * Improvments: + * - Add List BDBMessageStore.getDatabases(); This can the be iterated to guard against new DBs being added. + * - A version in the store would allow automated upgrade or later with more available versions interactive upgrade. + * - Add process logging and disable all Store and Qpid logging. + */ +public class BDBStoreUpgrade +{ + private static final Logger _logger = LoggerFactory.getLogger(BDBStoreUpgrade.class); + /** The Store Directory that needs upgrading */ + File _fromDir; + /** The Directory that will be made to contain the upgraded store */ + File _toDir; + /** The Directory that will be made to backup the original store if required */ + File _backupDir; + + /** The Old Store */ + BDBMessageStore _oldMessageStore; + /** The New Store */ + BDBMessageStore _newMessageStore; + /** The file ending that is used by BDB Store Files */ + private static final String BDB_FILE_ENDING = ".jdb"; + + static final Options _options = new Options(); + static CommandLine _commandLine; + private boolean _interactive; + private boolean _force; + + private static final String VERSION = "3.0"; + private static final String USER_ABORTED_PROCESS = "User aborted process"; + private static final int LOWEST_SUPPORTED_STORE_VERSION = 4; + private static final String PREVIOUS_STORE_VERSION_UNSUPPORTED = "Store upgrade from version {0} is not supported." + + " You must first run the previous store upgrade tool."; + private static final String FOLLOWING_STORE_VERSION_UNSUPPORTED = "Store version {0} is newer than this tool supports. " + + "You must use a newer version of the store upgrade tool"; + private static final String STORE_ALREADY_UPGRADED = "Store has already been upgraded to version {0}."; + + private static final String OPTION_INPUT_SHORT = "i"; + private static final String OPTION_INPUT = "input"; + private static final String OPTION_OUTPUT_SHORT = "o"; + private static final String OPTION_OUTPUT = "output"; + private static final String OPTION_BACKUP_SHORT = "b"; + private static final String OPTION_BACKUP = "backup"; + private static final String OPTION_QUIET_SHORT = "q"; + private static final String OPTION_QUIET = "quiet"; + private static final String OPTION_FORCE_SHORT = "f"; + private static final String OPTION_FORCE = "force"; + private boolean _inplace = false; + + public BDBStoreUpgrade(String fromDir, String toDir, String backupDir, boolean interactive, boolean force) + { + _interactive = interactive; + _force = force; + + _fromDir = new File(fromDir); + if (!_fromDir.exists()) + { + throw new IllegalArgumentException("BDBStore path '" + fromDir + "' could not be read. " + + "Ensure the path is correct and that the permissions are correct."); + } + + if (!isDirectoryAStoreDir(_fromDir)) + { + throw new IllegalArgumentException("Specified directory '" + fromDir + "' does not contain a valid BDBMessageStore."); + } + + if (toDir == null) + { + _inplace = true; + _toDir = new File(fromDir+"-Inplace"); + } + else + { + _toDir = new File(toDir); + } + + if (_toDir.exists()) + { + if (_interactive) + { + if (toDir == null) + { + System.out.println("Upgrading in place:" + fromDir); + } + else + { + System.out.println("Upgrade destination: '" + toDir + "'"); + } + + if (userInteract("Upgrade destination exists do you wish to replace it?")) + { + if (!FileUtils.delete(_toDir, true)) + { + throw new IllegalArgumentException("Unable to remove upgrade destination '" + _toDir + "'"); + } + } + else + { + throw new IllegalArgumentException("Upgrade destination '" + _toDir + "' already exists. "); + } + } + else + { + if (_force) + { + if (!FileUtils.delete(_toDir, true)) + { + throw new IllegalArgumentException("Unable to remove upgrade destination '" + _toDir + "'"); + } + } + else + { + throw new IllegalArgumentException("Upgrade destination '" + _toDir + "' already exists. "); + } + } + } + + if (!_toDir.mkdirs()) + { + throw new IllegalArgumentException("Upgrade destination '" + _toDir + "' could not be created. " + + "Ensure the path is correct and that the permissions are correct."); + } + + if (backupDir != null) + { + if (backupDir.equals("")) + { + _backupDir = new File(_fromDir.getAbsolutePath().toString() + "-Backup"); + } + else + { + _backupDir = new File(backupDir); + } + } + else + { + _backupDir = null; + } + } + + private static String ANSWER_OPTIONS = " Yes/No/Abort? "; + private static String ANSWER_NO = "no"; + private static String ANSWER_N = "n"; + private static String ANSWER_YES = "yes"; + private static String ANSWER_Y = "y"; + private static String ANSWER_ABORT = "abort"; + private static String ANSWER_A = "a"; + + /** + * Interact with the user via System.in and System.out. If the user wishes to Abort then a RuntimeException is thrown. + * Otherwise the method will return based on their response true=yes false=no. + * + * @param message Message to print out + * + * @return boolean response from user if they wish to proceed + */ + private boolean userInteract(String message) + { + System.out.print(message + ANSWER_OPTIONS); + BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); + + String input = ""; + try + { + input = br.readLine(); + } + catch (IOException e) + { + input = ""; + } + + if (input.equalsIgnoreCase(ANSWER_Y) || input.equalsIgnoreCase(ANSWER_YES)) + { + return true; + } + else + { + if (input.equalsIgnoreCase(ANSWER_N) || input.equalsIgnoreCase(ANSWER_NO)) + { + return false; + } + else + { + if (input.equalsIgnoreCase(ANSWER_A) || input.equalsIgnoreCase(ANSWER_ABORT)) + { + throw new RuntimeException(USER_ABORTED_PROCESS); + } + } + } + + return userInteract(message); + } + + /** + * Upgrade a Store of a specified version to the latest version. + * + * @param version the version of the current store + * + * @throws Exception + */ + public void upgradeFromVersion(int version) throws Exception + { + upgradeFromVersion(version, _fromDir, _toDir, _backupDir, _force, + _inplace); + } + + /** + * Upgrade a Store of a specified version to the latest version. + * + * @param version the version of the current store + * @param fromDir the directory with the old Store + * @param toDir the directrory to hold the newly Upgraded Store + * @param backupDir the directrory to backup to if required + * @param force suppress all questions + * @param inplace replace the from dir with the upgraded result in toDir + * + * @throws Exception due to Virtualhost/MessageStore.close() being + * rather poor at exception handling + * @throws DatabaseException if there is a problem with the store formats + * @throws AMQException if there is an issue creating Qpid data structures + */ + public void upgradeFromVersion(int version, File fromDir, File toDir, + File backupDir, boolean force, + boolean inplace) throws Exception + { + _logger.info("Located store to upgrade at '" + fromDir + "'"); + + // Verify user has created a backup, giving option to perform backup + if (_interactive) + { + if (!userInteract("Have you performed a DB backup of this store.")) + { + File backup; + if (backupDir == null) + { + backup = new File(fromDir.getAbsolutePath().toString() + "-Backup"); + } + else + { + backup = backupDir; + } + + if (userInteract("Do you wish to perform a DB backup now? " + + "(Store will be backed up to '" + backup.getName() + "')")) + { + performDBBackup(fromDir, backup, force); + } + else + { + if (!userInteract("Are you sure wish to proceed with DB migration without backup? " + + "(For more details of the consequences check the Qpid/BDB Message Store Wiki).")) + { + throw new IllegalArgumentException("Upgrade stopped at user request as no DB Backup performed."); + } + } + } + else + { + if (!inplace) + { + _logger.info("Upgrade will create a new store at '" + toDir + "'"); + } + + _logger.info("Using the contents in the Message Store '" + fromDir + "'"); + + if (!userInteract("Do you wish to proceed?")) + { + throw new IllegalArgumentException("Upgrade stopped as did not wish to proceed"); + } + } + } + else + { + if (backupDir != null) + { + performDBBackup(fromDir, backupDir, force); + } + } + + CurrentActor.set(new BrokerActor(new NullRootMessageLogger())); + + //Create a new messageStore + _newMessageStore = new BDBMessageStore(); + _newMessageStore.configure(toDir, false); + _newMessageStore.start(); + + try + { + //Load the old MessageStore + switch (version) + { + default: + case 4: + _oldMessageStore = new BDBMessageStore(4); + _oldMessageStore.configure(fromDir, true); + _oldMessageStore.start(); + upgradeFromVersion_4(); + break; + case 3: + case 2: + case 1: + throw new IllegalArgumentException(MessageFormat.format(PREVIOUS_STORE_VERSION_UNSUPPORTED, + new Object[] { Integer.toString(version) })); + } + } + finally + { + _newMessageStore.close(); + if (_oldMessageStore != null) + { + _oldMessageStore.close(); + } + // if we are running inplace then swap fromDir and toDir + if (inplace) + { + // Remove original copy + if (FileUtils.delete(fromDir, true)) + { + // Rename upgraded store + toDir.renameTo(fromDir); + } + else + { + throw new RuntimeException("Unable to upgrade inplace as " + + "unable to delete source '" + +fromDir+"', Store upgrade " + + "successfully performed to :" + +toDir); + } + } + } + } + + private void upgradeFromVersion_4() throws AMQException, DatabaseException + { + _logger.info("Starting store upgrade from version 4"); + + //Migrate _exchangeDb; + _logger.info("Exchanges"); + + moveContents(_oldMessageStore.getExchangesDb(), _newMessageStore.getExchangesDb(), "Exchange"); + + final List<AMQShortString> topicExchanges = new ArrayList<AMQShortString>(); + final TupleBinding exchangeTB = new ExchangeTB(); + + DatabaseVisitor exchangeListVisitor = new DatabaseVisitor() + { + public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException + { + ExchangeRecord exchangeRec = (ExchangeRecord) exchangeTB.entryToObject(value); + AMQShortString type = exchangeRec.getType(); + + if (ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(type)) + { + topicExchanges.add(exchangeRec.getNameShortString()); + } + } + }; + _oldMessageStore.visitExchanges(exchangeListVisitor); + + + //Migrate _queueBindingsDb; + _logger.info("Queue Bindings"); + moveContents(_oldMessageStore.getBindingsDb(), _newMessageStore.getBindingsDb(), "Queue Binding"); + + //Inspect the bindings to gather a list of queues which are probably durable subscriptions, i.e. those + //which have a colon in their name and are bound to the Topic exchanges above + final List<AMQShortString> durableSubQueues = new ArrayList<AMQShortString>(); + final TupleBinding<BindingKey> bindingTB = _oldMessageStore.getBindingTupleBindingFactory().getInstance(); + + DatabaseVisitor durSubQueueListVisitor = new DatabaseVisitor() + { + public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException + { + BindingKey bindingRec = (BindingKey) bindingTB.entryToObject(key); + AMQShortString queueName = bindingRec.getQueueName(); + AMQShortString exchangeName = bindingRec.getExchangeName(); + + if (topicExchanges.contains(exchangeName) && queueName.asString().contains(":")) + { + durableSubQueues.add(queueName); + } + } + }; + _oldMessageStore.visitBindings(durSubQueueListVisitor); + + + //Migrate _queueDb; + _logger.info("Queues"); + + // hold the list of existing queue names + final List<AMQShortString> existingQueues = new ArrayList<AMQShortString>(); + + final TupleBinding<QueueRecord> queueTupleBinding = _oldMessageStore.getQueueTupleBindingFactory().getInstance(); + + DatabaseVisitor queueVisitor = new DatabaseVisitor() + { + public void visit(DatabaseEntry key, DatabaseEntry value) throws AMQStoreException + { + QueueRecord queueRec = (QueueRecord) queueTupleBinding.entryToObject(value); + AMQShortString queueName = queueRec.getNameShortString(); + + //if the queue name is in the gathered list then set its exclusivity true + if (durableSubQueues.contains(queueName)) + { + _logger.info("Marking as possible DurableSubscription backing queue: " + queueName); + queueRec.setExclusive(true); + } + + //The simple call to createQueue with the QueueRecord object is sufficient for a v2->v3 upgrade as + //the extra 'exclusive' property in v3 will be defaulted to false in the record creation. + _newMessageStore.createQueue(queueRec); + + _count++; + existingQueues.add(queueName); + } + }; + _oldMessageStore.visitQueues(queueVisitor); + + logCount(queueVisitor.getVisitedCount(), "Queue"); + + + // Look for persistent messages stored for non-durable queues + _logger.info("Checking for messages previously sent to non-durable queues"); + + // track all message delivery to existing queues + final HashSet<Long> queueMessages = new HashSet<Long>(); + + // hold all non existing queues and their messages IDs + final HashMap<String, HashSet<Long>> phantomMessageQueues = new HashMap<String, HashSet<Long>>(); + + // delivery DB visitor to check message delivery and identify non existing queues + final QueueEntryTB queueEntryTB = new QueueEntryTB(); + DatabaseVisitor messageDeliveryCheckVisitor = new DatabaseVisitor() + { + public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException + { + QueueEntryKey entryKey = (QueueEntryKey) queueEntryTB.entryToObject(key); + Long messageId = entryKey.getMessageId(); + AMQShortString queueName = entryKey.getQueueName(); + if (!existingQueues.contains(queueName)) + { + String name = queueName.asString(); + HashSet<Long> messages = phantomMessageQueues.get(name); + if (messages == null) + { + messages = new HashSet<Long>(); + phantomMessageQueues.put(name, messages); + } + messages.add(messageId); + _count++; + } + else + { + queueMessages.add(messageId); + } + } + }; + _oldMessageStore.visitDelivery(messageDeliveryCheckVisitor); + + if (phantomMessageQueues.isEmpty()) + { + _logger.info("No such messages were found"); + } + else + { + _logger.info("Found " + messageDeliveryCheckVisitor.getVisitedCount()+ " such messages in total"); + + for (Entry<String, HashSet<Long>> phantomQueue : phantomMessageQueues.entrySet()) + { + String queueName = phantomQueue.getKey(); + HashSet<Long> messages = phantomQueue.getValue(); + + _logger.info(MessageFormat.format("There are {0} messages which were previously delivered to non-durable queue ''{1}''",messages.size(), queueName)); + + boolean createQueue; + if(!_interactive) + { + createQueue = true; + _logger.info("Running in batch-mode, marking queue as durable to ensure retention of the messages."); + } + else + { + createQueue = userInteract("Do you want to make this queue durable?\n" + + "NOTE: Answering No will result in these messages being discarded!"); + } + + if (createQueue) + { + for (Long messageId : messages) + { + queueMessages.add(messageId); + } + AMQShortString name = new AMQShortString(queueName); + existingQueues.add(name); + QueueRecord record = new QueueRecord(name, null, false, null); + _newMessageStore.createQueue(record); + } + } + } + + + //Migrate _messageMetaDataDb; + _logger.info("Message MetaData"); + + final Database newMetaDataDB = _newMessageStore.getMetaDataDb(); + final TupleBinding<Object> oldMetaDataTupleBinding = _oldMessageStore.getMetaDataTupleBindingFactory().getInstance(); + final TupleBinding<Object> newMetaDataTupleBinding = _newMessageStore.getMetaDataTupleBindingFactory().getInstance(); + + DatabaseVisitor metaDataVisitor = new DatabaseVisitor() + { + public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException + { + _count++; + MessageMetaData metaData = (MessageMetaData) oldMetaDataTupleBinding.entryToObject(value); + + // get message id + Long messageId = TupleBinding.getPrimitiveBinding(Long.class).entryToObject(key); + + // ONLY copy data if message is delivered to existing queue + if (!queueMessages.contains(messageId)) + { + return; + } + DatabaseEntry newValue = new DatabaseEntry(); + newMetaDataTupleBinding.objectToEntry(metaData, newValue); + + newMetaDataDB.put(null, key, newValue); + } + }; + _oldMessageStore.visitMetaDataDb(metaDataVisitor); + + logCount(metaDataVisitor.getVisitedCount(), "Message MetaData"); + + + //Migrate _messageContentDb; + _logger.info("Message Contents"); + final Database newContentDB = _newMessageStore.getContentDb(); + + final TupleBinding<MessageContentKey> oldContentKeyTupleBinding = new MessageContentKeyTB_4(); + final TupleBinding<MessageContentKey> newContentKeyTupleBinding = new MessageContentKeyTB_5(); + final TupleBinding contentTB = new ContentTB(); + + DatabaseVisitor contentVisitor = new DatabaseVisitor() + { + long _prevMsgId = -1; //Initialise to invalid value + int _bytesSeenSoFar = 0; + + public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException + { + _count++; + + //determine the msgId of the current entry + MessageContentKey_4 contentKey = (MessageContentKey_4) oldContentKeyTupleBinding.entryToObject(key); + long msgId = contentKey.getMessageId(); + + // ONLY copy data if message is delivered to existing queue + if (!queueMessages.contains(msgId)) + { + return; + } + //if this is a new message, restart the byte offset count. + if(_prevMsgId != msgId) + { + _bytesSeenSoFar = 0; + } + + //determine the content size + ByteBuffer content = (ByteBuffer) contentTB.entryToObject(value); + int contentSize = content.limit(); + + //create the new key: id + previously seen data count + MessageContentKey_5 newKey = new MessageContentKey_5(msgId, _bytesSeenSoFar); + DatabaseEntry newKeyEntry = new DatabaseEntry(); + newContentKeyTupleBinding.objectToEntry(newKey, newKeyEntry); + + DatabaseEntry newValueEntry = new DatabaseEntry(); + contentTB.objectToEntry(content, newValueEntry); + + newContentDB.put(null, newKeyEntry, newValueEntry); + + _prevMsgId = msgId; + _bytesSeenSoFar += contentSize; + } + }; + _oldMessageStore.visitContentDb(contentVisitor); + + logCount(contentVisitor.getVisitedCount(), "Message Content"); + + + //Migrate _deliveryDb; + _logger.info("Delivery Records"); + final Database deliveryDb =_newMessageStore.getDeliveryDb(); + DatabaseVisitor deliveryDbVisitor = new DatabaseVisitor() + { + + public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException + { + _count++; + + // get message id from entry key + QueueEntryKey entryKey = (QueueEntryKey) queueEntryTB.entryToObject(key); + AMQShortString queueName = entryKey.getQueueName(); + + // ONLY copy data if message queue exists + if (existingQueues.contains(queueName)) + { + deliveryDb.put(null, key, value); + } + } + }; + _oldMessageStore.visitDelivery(deliveryDbVisitor); + logCount(contentVisitor.getVisitedCount(), "Delivery Record"); + } + + /** + * Log the specified count for item in a user friendly way. + * + * @param count of items to log + * @param item description of what is being logged. + */ + private void logCount(int count, String item) + { + _logger.info(" " + count + " " + item + " " + (count == 1 ? "entry" : "entries")); + } + + /** + * @param oldDatabase The old MessageStoreDB to perform the visit on + * @param newDatabase The new MessageStoreDB to copy the data to. + * @param contentName The string name of the content for display purposes. + * + * @throws AMQException Due to createQueue thorwing AMQException + * @throws DatabaseException If there is a problem with the loading of the data + */ + private void moveContents(Database oldDatabase, final Database newDatabase, String contentName) throws AMQException, DatabaseException + { + + DatabaseVisitor moveVisitor = new DatabaseVisitor() + { + public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException + { + _count++; + newDatabase.put(null, key, value); + } + }; + + _oldMessageStore.visitDatabase(oldDatabase, moveVisitor); + + logCount(moveVisitor.getVisitedCount(), contentName); + } + + private static void usage() + { + System.out.println("usage: BDBStoreUpgrade:\n [-h|--help] [-q|--quiet] [-f|--force] [-b|--backup <Path to backup-db>] " + + "-i|--input <Path to input-db> [-o|--output <Path to upgraded-db>]"); + } + + private static void help() + { + System.out.println("usage: BDBStoreUpgrade:"); + System.out.println("Required:"); + for (Object obj : _options.getOptions()) + { + Option option = (Option) obj; + if (option.isRequired()) + { + System.out.println("-" + option.getOpt() + "|--" + option.getLongOpt() + "\t\t-\t" + option.getDescription()); + } + } + + System.out.println("\nOptions:"); + for (Object obj : _options.getOptions()) + { + Option option = (Option) obj; + if (!option.isRequired()) + { + System.out.println("--" + option.getLongOpt() + "|-" + option.getOpt() + "\t\t-\t" + option.getDescription()); + } + } + } + + static boolean isDirectoryAStoreDir(File directory) + { + if (directory.isFile()) + { + return false; + } + + for (File file : directory.listFiles()) + { + if (file.isFile()) + { + if (file.getName().endsWith(BDB_FILE_ENDING)) + { + return true; + } + } + } + return false; + } + + static File[] discoverDBStores(File fromDir) + { + if (!fromDir.exists()) + { + throw new IllegalArgumentException("'" + fromDir + "' does not exist unable to upgrade."); + } + + // Ensure we are given a directory + if (fromDir.isFile()) + { + throw new IllegalArgumentException("'" + fromDir + "' is not a directory unable to upgrade."); + } + + // Check to see if we have been given a single directory + if (isDirectoryAStoreDir(fromDir)) + { + return new File[]{fromDir}; + } + + // Check to see if we have been give a directory containing stores. + List<File> stores = new LinkedList<File>(); + + for (File directory : fromDir.listFiles()) + { + if (directory.isDirectory()) + { + if (isDirectoryAStoreDir(directory)) + { + stores.add(directory); + } + } + } + + return stores.toArray(new File[stores.size()]); + } + + private static void performDBBackup(File source, File backup, boolean force) throws Exception + { + if (backup.exists()) + { + if (force) + { + _logger.info("Backup location exists. Forced to remove."); + FileUtils.delete(backup, true); + } + else + { + throw new IllegalArgumentException("Unable to perform backup a backup already exists."); + } + } + + try + { + _logger.info("Backing up '" + source + "' to '" + backup + "'"); + FileUtils.copyRecursive(source, backup); + } + catch (FileNotFoundException e) + { + //Throwing IAE here as this will be reported as a Backup not started + throw new IllegalArgumentException("Unable to perform backup:" + e.getMessage()); + } + catch (FileUtils.UnableToCopyException e) + { + //Throwing exception here as this will be reported as a Failed Backup + throw new Exception("Unable to perform backup due to:" + e.getMessage()); + } + } + + public static void main(String[] args) throws ParseException + { + setOptions(_options); + + final Options helpOptions = new Options(); + setHelpOptions(helpOptions); + + //Display help + boolean displayHelp = false; + try + { + if (new PosixParser().parse(helpOptions, args).hasOption("h")) + { + showHelp(); + } + } + catch (ParseException pe) + { + displayHelp = true; + } + + //Parse commandline for required arguments + try + { + _commandLine = new PosixParser().parse(_options, args); + } + catch (ParseException mae) + { + if (displayHelp) + { + showHelp(); + } + else + { + fatalError(mae.getMessage()); + } + } + + String fromDir = _commandLine.getOptionValue(OPTION_INPUT_SHORT); + String toDir = _commandLine.getOptionValue(OPTION_OUTPUT_SHORT); + String backupDir = _commandLine.getOptionValue(OPTION_BACKUP_SHORT); + + if (backupDir == null && _commandLine.hasOption(OPTION_BACKUP_SHORT)) + { + backupDir = ""; + } + + //Attempt to locate possible Store to upgrade on input path + File[] stores = new File[0]; + try + { + stores = discoverDBStores(new File(fromDir)); + } + catch (IllegalArgumentException iae) + { + fatalError(iae.getMessage()); + } + + boolean interactive = !_commandLine.hasOption(OPTION_QUIET_SHORT); + boolean force = _commandLine.hasOption(OPTION_FORCE_SHORT); + + try{ + for (File store : stores) + { + + // if toDir is null then we are upgrading inplace so we don't need + // to provide an upgraded toDir when upgrading multiple stores. + if (toDir == null || + // Check to see if we are upgrading a store specified in + // fromDir or if the directories are nested. + (stores.length > 0 + && stores[0].toString().length() == fromDir.length())) + { + upgrade(store, toDir, backupDir, interactive, force); + } + else + { + // Add the extra part of path from store to the toDir + upgrade(store, toDir + File.separator + store.toString().substring(fromDir.length()), backupDir, interactive, force); + } + } + } + catch (RuntimeException re) + { + if (!(USER_ABORTED_PROCESS).equals(re.getMessage())) + { + re.printStackTrace(); + _logger.error("Upgrade Failed: " + re.getMessage()); + } + else + { + _logger.error("Upgrade stopped : User aborted"); + } + } + + } + + @SuppressWarnings("static-access") + private static void setOptions(Options options) + { + Option input = + OptionBuilder.isRequired().hasArg().withDescription("Location (Path) of store to upgrade.").withLongOpt(OPTION_INPUT) + .create(OPTION_INPUT_SHORT); + + Option output = + OptionBuilder.hasArg().withDescription("Location (Path) for the upgraded-db to be written.").withLongOpt(OPTION_OUTPUT) + .create(OPTION_OUTPUT_SHORT); + + Option quiet = new Option(OPTION_QUIET_SHORT, OPTION_QUIET, false, "Disable interactive options."); + + Option force = new Option(OPTION_FORCE_SHORT, OPTION_FORCE, false, "Force upgrade removing any existing upgrade target."); + Option backup = + OptionBuilder.hasOptionalArg().withDescription("Location (Path) for the backup-db to be written.").withLongOpt(OPTION_BACKUP) + .create(OPTION_BACKUP_SHORT); + + options.addOption(input); + options.addOption(output); + options.addOption(quiet); + options.addOption(force); + options.addOption(backup); + setHelpOptions(options); + } + + private static void setHelpOptions(Options options) + { + options.addOption(new Option("h", "help", false, "Show this help.")); + } + + static void upgrade(File fromDir, String toDir, String backupDir, boolean interactive, boolean force) + { + + _logger.info("Running BDB Message Store upgrade tool: v" + VERSION); + int version = getStoreVersion(fromDir); + if (!isVersionUpgradable(version)) + { + return; + } + try + { + new BDBStoreUpgrade(fromDir.toString(), toDir, backupDir, interactive, force).upgradeFromVersion(version); + + _logger.info("Upgrade complete."); + } + catch (IllegalArgumentException iae) + { + _logger.error("Upgrade not started due to: " + iae.getMessage()); + } + catch (DatabaseException de) + { + de.printStackTrace(); + _logger.error("Upgrade Failed: " + de.getMessage()); + } + catch (RuntimeException re) + { + if (!(USER_ABORTED_PROCESS).equals(re.getMessage())) + { + re.printStackTrace(); + _logger.error("Upgrade Failed: " + re.getMessage()); + } + else + { + throw re; + } + } + catch (Exception e) + { + e.printStackTrace(); + _logger.error("Upgrade Failed: " + e.getMessage()); + } + } + + /** + * Utility method to verify if store of given version can be upgraded. + * + * @param version + * store version to verify + * @return true if store can be upgraded, false otherwise + */ + protected static boolean isVersionUpgradable(int version) + { + boolean storeUpgradable = false; + if (version == 0) + { + _logger.error("Existing store version is undefined!"); + } + else if (version < LOWEST_SUPPORTED_STORE_VERSION) + { + _logger.error(MessageFormat.format(PREVIOUS_STORE_VERSION_UNSUPPORTED, + new Object[] { Integer.toString(version) })); + } + else if (version == BDBMessageStore.DATABASE_FORMAT_VERSION) + { + _logger.error(MessageFormat.format(STORE_ALREADY_UPGRADED, new Object[] { Integer.toString(version) })); + } + else if (version > BDBMessageStore.DATABASE_FORMAT_VERSION) + { + _logger.error(MessageFormat.format(FOLLOWING_STORE_VERSION_UNSUPPORTED, + new Object[] { Integer.toString(version) })); + } + else + { + _logger.info("Existing store version is " + version); + storeUpgradable = true; + } + return storeUpgradable; + } + + /** + * Detects existing store version by checking list of database in store + * environment + * + * @param fromDir + * store folder + * @return version + */ + public static int getStoreVersion(File fromDir) + { + int version = 0; + EnvironmentConfig envConfig = new EnvironmentConfig(); + envConfig.setAllowCreate(false); + envConfig.setTransactional(false); + envConfig.setReadOnly(true); + Environment environment = null; + try + { + + environment = new Environment(fromDir, envConfig); + List<String> databases = environment.getDatabaseNames(); + for (String name : databases) + { + if (name.startsWith("exchangeDb")) + { + if (name.startsWith("exchangeDb_v")) + { + version = Integer.parseInt(name.substring(12)); + } + else + { + version = 1; + } + break; + } + } + } + catch (Exception e) + { + _logger.error("Failure to open existing database: " + e.getMessage()); + } + finally + { + if (environment != null) + { + try + { + environment.close(); + } + catch (Exception e) + { + // ignoring. It should never happen. + } + } + } + return version; + } + + private static void fatalError(String message) + { + System.out.println(message); + usage(); + System.exit(1); + } + + private static void showHelp() + { + help(); + System.exit(0); + } + +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingKey.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingKey.java new file mode 100644 index 0000000000..396f0ed817 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingKey.java @@ -0,0 +1,62 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; + +public class BindingKey extends Object +{ + private final AMQShortString _exchangeName; + private final AMQShortString _queueName; + private final AMQShortString _routingKey; + private final FieldTable _arguments; + + public BindingKey(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey, FieldTable arguments) + { + _exchangeName = exchangeName; + _queueName = queueName; + _routingKey = routingKey; + _arguments = arguments; + } + + + public AMQShortString getExchangeName() + { + return _exchangeName; + } + + public AMQShortString getQueueName() + { + return _queueName; + } + + public AMQShortString getRoutingKey() + { + return _routingKey; + } + + public FieldTable getArguments() + { + return _arguments; + } + +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java new file mode 100644 index 0000000000..5ea3e9c2e8 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.nio.ByteBuffer; + +import com.sleepycat.bind.tuple.TupleBinding; +import com.sleepycat.bind.tuple.TupleInput; +import com.sleepycat.bind.tuple.TupleOutput; + +public class ContentTB extends TupleBinding +{ + public Object entryToObject(TupleInput tupleInput) + { + + final int size = tupleInput.readInt(); + byte[] underlying = new byte[size]; + tupleInput.readFast(underlying); + return ByteBuffer.wrap(underlying); + } + + public void objectToEntry(Object object, TupleOutput tupleOutput) + { + ByteBuffer src = (ByteBuffer) object; + + src = src.slice(); + + byte[] chunkData = new byte[src.limit()]; + src.duplicate().get(chunkData); + + tupleOutput.writeInt(chunkData.length); + tupleOutput.writeFast(chunkData); + } +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DatabaseVisitor.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DatabaseVisitor.java new file mode 100644 index 0000000000..9bd879025f --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DatabaseVisitor.java @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import org.apache.qpid.AMQStoreException; + +import com.sleepycat.je.DatabaseEntry; +import com.sleepycat.je.DatabaseException; + +/** Visitor Interface so that each DatabaseEntry for a database can easily be processed. */ +public abstract class DatabaseVisitor +{ + protected int _count; + + abstract public void visit(DatabaseEntry entry, DatabaseEntry value) throws AMQStoreException, DatabaseException; + + public int getVisitedCount() + { + return _count; + } + + public void resetVisitCount() + { + _count = 0; + } +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java new file mode 100644 index 0000000000..f9c7828bef --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import com.sleepycat.bind.tuple.TupleBinding; +import com.sleepycat.bind.tuple.TupleInput; +import com.sleepycat.bind.tuple.TupleOutput; +import org.apache.log4j.Logger; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.store.berkeleydb.records.ExchangeRecord; + +public class ExchangeTB extends TupleBinding +{ + private static final Logger _log = Logger.getLogger(ExchangeTB.class); + + public ExchangeTB() + { + } + + public Object entryToObject(TupleInput tupleInput) + { + + AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput); + AMQShortString typeName = AMQShortStringEncoding.readShortString(tupleInput); + + boolean autoDelete = tupleInput.readBoolean(); + + return new ExchangeRecord(name, typeName, autoDelete); + } + + public void objectToEntry(Object object, TupleOutput tupleOutput) + { + ExchangeRecord exchange = (ExchangeRecord) object; + + AMQShortStringEncoding.writeShortString(exchange.getNameShortString(), tupleOutput); + AMQShortStringEncoding.writeShortString(exchange.getType(), tupleOutput); + + tupleOutput.writeBoolean(exchange.isAutoDelete()); + } +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java new file mode 100644 index 0000000000..c09498cce3 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java @@ -0,0 +1,74 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import org.apache.qpid.framing.FieldTable; + +import com.sleepycat.bind.tuple.TupleInput; +import com.sleepycat.bind.tuple.TupleOutput; +import com.sleepycat.je.DatabaseException; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; + +public class FieldTableEncoding +{ + public static FieldTable readFieldTable(TupleInput tupleInput) throws DatabaseException + { + long length = tupleInput.readLong(); + if (length <= 0) + { + return null; + } + else + { + + byte[] data = new byte[(int)length]; + tupleInput.readFast(data); + + try + { + return new FieldTable(new DataInputStream(new ByteArrayInputStream(data)),length); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + + } + + } + + public static void writeFieldTable(FieldTable fieldTable, TupleOutput tupleOutput) + { + + if (fieldTable == null) + { + tupleOutput.writeLong(0); + } + else + { + tupleOutput.writeLong(fieldTable.getEncodedSize()); + tupleOutput.writeFast(fieldTable.getDataAsBytes()); + } + } +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageContentKey.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageContentKey.java new file mode 100644 index 0000000000..005e8d4604 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageContentKey.java @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +public class MessageContentKey +{ + private long _messageId; + + public MessageContentKey(long messageId) + { + _messageId = messageId; + } + + + public long getMessageId() + { + return _messageId; + } + + public void setMessageId(long messageId) + { + this._messageId = messageId; + } +}
\ No newline at end of file diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java new file mode 100644 index 0000000000..c274fdec8c --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java @@ -0,0 +1,49 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import org.apache.qpid.framing.AMQShortString; + +public class QueueEntryKey +{ + private AMQShortString _queueName; + private long _messageId; + + + public QueueEntryKey(AMQShortString queueName, long messageId) + { + _queueName = queueName; + _messageId = messageId; + } + + + public AMQShortString getQueueName() + { + return _queueName; + } + + + public long getMessageId() + { + return _messageId; + } + +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_4.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_4.java new file mode 100644 index 0000000000..30357c97d4 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_4.java @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.keys; + +import org.apache.qpid.server.store.berkeleydb.MessageContentKey; + +public class MessageContentKey_4 extends MessageContentKey +{ + private int _chunkNum; + + public MessageContentKey_4(long messageId, int chunkNo) + { + super(messageId); + _chunkNum = chunkNo; + } + + public int getChunk() + { + return _chunkNum; + } + + public void setChunk(int chunk) + { + this._chunkNum = chunk; + } +}
\ No newline at end of file diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_5.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_5.java new file mode 100644 index 0000000000..a1a7fe80b5 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_5.java @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.keys; + +import org.apache.qpid.server.store.berkeleydb.MessageContentKey; + +public class MessageContentKey_5 extends MessageContentKey +{ + private int _offset; + + public MessageContentKey_5(long messageId, int chunkNo) + { + super(messageId); + _offset = chunkNo; + } + + public int getOffset() + { + return _offset; + } + + public void setOffset(int chunk) + { + this._offset = chunk; + } +}
\ No newline at end of file diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/ExchangeRecord.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/ExchangeRecord.java new file mode 100644 index 0000000000..f20367e33b --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/ExchangeRecord.java @@ -0,0 +1,53 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.records; + +import org.apache.qpid.framing.AMQShortString; + +public class ExchangeRecord extends Object +{ + private final AMQShortString _exchangeName; + private final AMQShortString _exchangeType; + private final boolean _autoDelete; + + public ExchangeRecord(AMQShortString exchangeName, AMQShortString exchangeType, boolean autoDelete) + { + _exchangeName = exchangeName; + _exchangeType = exchangeType; + _autoDelete = autoDelete; + } + + public AMQShortString getNameShortString() + { + return _exchangeName; + } + + public AMQShortString getType() + { + return _exchangeType; + } + + public boolean isAutoDelete() + { + return _autoDelete; + } + +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java new file mode 100644 index 0000000000..fbe10433ca --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java @@ -0,0 +1,66 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.records; + +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; + +public class QueueRecord extends Object +{ + private final AMQShortString _queueName; + private final AMQShortString _owner; + private final FieldTable _arguments; + private boolean _exclusive; + + public QueueRecord(AMQShortString queueName, AMQShortString owner, boolean exclusive, FieldTable arguments) + { + _queueName = queueName; + _owner = owner; + _exclusive = exclusive; + _arguments = arguments; + } + + public AMQShortString getNameShortString() + { + return _queueName; + } + + public AMQShortString getOwner() + { + return _owner; + } + + public boolean isExclusive() + { + return _exclusive; + } + + public void setExclusive(boolean exclusive) + { + _exclusive = exclusive; + } + + public FieldTable getArguments() + { + return _arguments; + } + +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/testclient/BackupTestClient.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/testclient/BackupTestClient.java new file mode 100644 index 0000000000..f6344b3d7d --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/testclient/BackupTestClient.java @@ -0,0 +1,120 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.testclient; + +import org.apache.log4j.Logger; + +import org.apache.qpid.ping.PingDurableClient; +import org.apache.qpid.server.store.berkeleydb.BDBBackup; +import org.apache.qpid.util.CommandLineParser; + +import java.util.Properties; + +/** + * BackupTestClient extends {@link PingDurableClient} with an action that takes a BDB backup when a configurable + * message count is reached. This enables a test user to restore this beackup, knowing how many committed but undelivered + * messages were in the backup, in order to check that all are re-delivered when the backup is retored. + * + * <p><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Perform BDB Backup on configurable message count. + * </table> + */ +public class BackupTestClient extends PingDurableClient +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(BackupTestClient.class); + + /** Holds the from directory to take backups from. */ + private String fromDir; + + /** Holds the to directory to store backups in. */ + private String toDir; + + /** + * Default constructor, passes all property overrides to the parent. + * + * @param overrides Any property overrides to apply to the defaults. + * + * @throws Exception Any underlying exception is allowed to fall through. + */ + BackupTestClient(Properties overrides) throws Exception + { + super(overrides); + } + + /** + * Starts the ping/wait/receive process. From and to directory locations for the BDB backups must be specified + * on the command line: + * + * <p/><table><caption>Command Line</caption> + * <tr><th> Option <th> Comment + * <tr><td> -fromdir <td> The path to the directory to back the bdb log file from. + * <tr><td> -todir <td> The path to the directory to save the backed up bdb log files to. + * </table> + * + * @param args The command line arguments. + */ + public static void main(String[] args) + { + try + { + // Use the same command line format as BDBBackup utility, (compulsory from and to directories). + Properties options = + CommandLineParser.processCommandLine(args, new CommandLineParser(BDBBackup.COMMAND_LINE_SPEC), + System.getProperties()); + BackupTestClient pingProducer = new BackupTestClient(options); + + // Keep the from and to directories for backups. + pingProducer.fromDir = options.getProperty("fromdir"); + pingProducer.toDir = options.getProperty("todir"); + + // Create a shutdown hook to terminate the ping-pong producer. + Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook()); + + // Ensure that the ping pong producer is registered to listen for exceptions on the connection too. + // pingProducer.getConnection().setExceptionListener(pingProducer); + + // Run the test procedure. + int sent = pingProducer.send(); + pingProducer.waitForUser("Press return to begin receiving the pings."); + pingProducer.receive(sent); + + System.exit(0); + } + catch (Exception e) + { + System.err.println(e.getMessage()); + log.error("Top level handler caught execption.", e); + System.exit(1); + } + } + + /** + * Supplies a triggered action extension, based on message count. This action takes a BDB log file backup. + */ + public void takeAction() + { + BDBBackup backupUtil = new BDBBackup(); + backupUtil.takeBackupNoLock(fromDir, toDir); + System.out.println("Took backup of BDB log files from directory: " + fromDir); + } +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java new file mode 100644 index 0000000000..301ee417c5 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java @@ -0,0 +1,25 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.tuples; + +public interface BindingTuple +{ +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java new file mode 100644 index 0000000000..8e17f074d7 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.tuples; + +import org.apache.qpid.server.store.berkeleydb.BindingKey; + +import com.sleepycat.bind.tuple.TupleBinding; + +public class BindingTupleBindingFactory extends TupleBindingFactory<BindingKey> +{ + public BindingTupleBindingFactory(int version) + { + super(version); + } + + public TupleBinding<BindingKey> getInstance() + { + switch (_version) + { + default: + case 5: + //no change from v4 + case 4: + return new BindingTuple_4(); + } + } +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_4.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_4.java new file mode 100644 index 0000000000..52b131a7f2 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_4.java @@ -0,0 +1,76 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.tuples; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding; +import org.apache.qpid.server.store.berkeleydb.BindingKey; +import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; + +import com.sleepycat.bind.tuple.TupleBinding; +import com.sleepycat.bind.tuple.TupleInput; +import com.sleepycat.bind.tuple.TupleOutput; +import com.sleepycat.je.DatabaseException; + +public class BindingTuple_4 extends TupleBinding<BindingKey> implements BindingTuple +{ + protected static final Logger _log = Logger.getLogger(BindingTuple.class); + + public BindingTuple_4() + { + super(); + } + + public BindingKey entryToObject(TupleInput tupleInput) + { + AMQShortString exchangeName = AMQShortStringEncoding.readShortString(tupleInput); + AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput); + AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput); + + FieldTable arguments; + + // Addition for Version 2 of this table + try + { + arguments = FieldTableEncoding.readFieldTable(tupleInput); + } + catch (DatabaseException e) + { + _log.error("Unable to create binding: " + e, e); + return null; + } + + return new BindingKey(exchangeName, queueName, routingKey, arguments); + } + + public void objectToEntry(BindingKey binding, TupleOutput tupleOutput) + { + AMQShortStringEncoding.writeShortString(binding.getExchangeName(), tupleOutput); + AMQShortStringEncoding.writeShortString(binding.getQueueName(), tupleOutput); + AMQShortStringEncoding.writeShortString(binding.getRoutingKey(), tupleOutput); + + // Addition for Version 2 of this table + FieldTableEncoding.writeFieldTable(binding.getArguments(), tupleOutput); + } + +}
\ No newline at end of file diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_4.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_4.java new file mode 100644 index 0000000000..f5ba6bbce3 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_4.java @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.tuples; + +import org.apache.qpid.server.store.berkeleydb.MessageContentKey; +import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_4; + +import com.sleepycat.bind.tuple.TupleBinding; +import com.sleepycat.bind.tuple.TupleInput; +import com.sleepycat.bind.tuple.TupleOutput; + +public class MessageContentKeyTB_4 extends TupleBinding<MessageContentKey> +{ + + public MessageContentKey entryToObject(TupleInput tupleInput) + { + long messageId = tupleInput.readLong(); + int chunk = tupleInput.readInt(); + return new MessageContentKey_4(messageId, chunk); + } + + public void objectToEntry(MessageContentKey object, TupleOutput tupleOutput) + { + final MessageContentKey_4 mk = (MessageContentKey_4) object; + tupleOutput.writeLong(mk.getMessageId()); + tupleOutput.writeInt(mk.getChunk()); + } + +}
\ No newline at end of file diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_5.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_5.java new file mode 100644 index 0000000000..e6a2fd23a8 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_5.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.tuples; + +import org.apache.qpid.server.store.berkeleydb.MessageContentKey; +import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_5; + +import com.sleepycat.bind.tuple.TupleBinding; +import com.sleepycat.bind.tuple.TupleInput; +import com.sleepycat.bind.tuple.TupleOutput; + +public class MessageContentKeyTB_5 extends TupleBinding<MessageContentKey> +{ + public MessageContentKey entryToObject(TupleInput tupleInput) + { + long messageId = tupleInput.readLong(); + int offset = tupleInput.readInt(); + return new MessageContentKey_5(messageId, offset); + } + + public void objectToEntry(MessageContentKey object, TupleOutput tupleOutput) + { + final MessageContentKey_5 mk = (MessageContentKey_5) object; + tupleOutput.writeLong(mk.getMessageId()); + tupleOutput.writeInt(mk.getOffset()); + } + +}
\ No newline at end of file diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTupleBindingFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTupleBindingFactory.java new file mode 100644 index 0000000000..76ee4f66e4 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTupleBindingFactory.java @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.tuples; + +import org.apache.qpid.server.store.berkeleydb.MessageContentKey; + +import com.sleepycat.bind.tuple.TupleBinding; + +public class MessageContentKeyTupleBindingFactory extends TupleBindingFactory<MessageContentKey> +{ + public MessageContentKeyTupleBindingFactory(int version) + { + super(version); + } + + public TupleBinding<MessageContentKey> getInstance() + { + switch (_version) + { + default: + case 5: + return new MessageContentKeyTB_5(); + case 4: + return new MessageContentKeyTB_4(); + } + } +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_4.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_4.java new file mode 100644 index 0000000000..e26b544e38 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_4.java @@ -0,0 +1,162 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.tuples; + +import com.sleepycat.bind.tuple.TupleBinding; +import com.sleepycat.bind.tuple.TupleInput; +import com.sleepycat.bind.tuple.TupleOutput; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.*; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.message.MessageMetaData; +import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding; + +import java.io.*; + +/** + * Handles the mapping to and from 0-8/0-9 message meta data + */ +public class MessageMetaDataTB_4 extends TupleBinding<Object> +{ + private static final Logger _log = Logger.getLogger(MessageMetaDataTB_4.class); + + public MessageMetaDataTB_4() + { + } + + public Object entryToObject(TupleInput tupleInput) + { + try + { + final MessagePublishInfo publishBody = readMessagePublishInfo(tupleInput); + final ContentHeaderBody contentHeaderBody = readContentHeaderBody(tupleInput); + final int contentChunkCount = tupleInput.readInt(); + + return new MessageMetaData(publishBody, contentHeaderBody, contentChunkCount); + } + catch (Exception e) + { + _log.error("Error converting entry to object: " + e, e); + // annoyingly just have to return null since we cannot throw + return null; + } + } + + public void objectToEntry(Object object, TupleOutput tupleOutput) + { + MessageMetaData message = (MessageMetaData) object; + try + { + writeMessagePublishInfo(message.getMessagePublishInfo(), tupleOutput); + } + catch (AMQException e) + { + // can't do anything else since the BDB interface precludes throwing any exceptions + // in practice we should never get an exception + throw new RuntimeException("Error converting object to entry: " + e, e); + } + writeContentHeader(message.getContentHeaderBody(), tupleOutput); + tupleOutput.writeInt(message.getContentChunkCount()); + } + + private MessagePublishInfo readMessagePublishInfo(TupleInput tupleInput) + { + + final AMQShortString exchange = AMQShortStringEncoding.readShortString(tupleInput); + final AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput); + final boolean mandatory = tupleInput.readBoolean(); + final boolean immediate = tupleInput.readBoolean(); + + return new MessagePublishInfo() + { + + public AMQShortString getExchange() + { + return exchange; + } + + public void setExchange(AMQShortString exchange) + { + + } + + public boolean isImmediate() + { + return immediate; + } + + public boolean isMandatory() + { + return mandatory; + } + + public AMQShortString getRoutingKey() + { + return routingKey; + } + } ; + + } + + private ContentHeaderBody readContentHeaderBody(TupleInput tupleInput) throws AMQFrameDecodingException, AMQProtocolVersionException + { + int bodySize = tupleInput.readInt(); + byte[] underlying = new byte[bodySize]; + tupleInput.readFast(underlying); + + try + { + return ContentHeaderBody.createFromBuffer(new DataInputStream(new ByteArrayInputStream(underlying)), bodySize); + } + catch (IOException e) + { + throw new AMQFrameDecodingException(null, e.getMessage(), e); + } + } + + private void writeMessagePublishInfo(MessagePublishInfo publishBody, TupleOutput tupleOutput) throws AMQException + { + + AMQShortStringEncoding.writeShortString(publishBody.getExchange(), tupleOutput); + AMQShortStringEncoding.writeShortString(publishBody.getRoutingKey(), tupleOutput); + tupleOutput.writeBoolean(publishBody.isMandatory()); + tupleOutput.writeBoolean(publishBody.isImmediate()); + } + + private void writeContentHeader(ContentHeaderBody headerBody, TupleOutput tupleOutput) + { + // write out the content header body + final int bodySize = headerBody.getSize(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(bodySize); + try + { + headerBody.writePayload(new DataOutputStream(baos)); + tupleOutput.writeInt(bodySize); + tupleOutput.writeFast(baos.toByteArray()); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + + } +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_5.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_5.java new file mode 100644 index 0000000000..6dc041cb23 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_5.java @@ -0,0 +1,77 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.tuples; + +import com.sleepycat.bind.tuple.TupleInput; +import com.sleepycat.bind.tuple.TupleOutput; +import org.apache.log4j.Logger; +import org.apache.qpid.server.store.MessageMetaDataType; +import org.apache.qpid.server.store.StorableMessageMetaData; + +/** + * Handles the mapping to and from message meta data + */ +public class MessageMetaDataTB_5 extends MessageMetaDataTB_4 +{ + private static final Logger _log = Logger.getLogger(MessageMetaDataTB_5.class); + + @Override + public Object entryToObject(TupleInput tupleInput) + { + try + { + final int bodySize = tupleInput.readInt(); + byte[] dataAsBytes = new byte[bodySize]; + tupleInput.readFast(dataAsBytes); + + java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(dataAsBytes); + buf.position(1); + buf = buf.slice(); + MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]]; + StorableMessageMetaData metaData = type.getFactory().createMetaData(buf); + + return metaData; + } + catch (Exception e) + { + _log.error("Error converting entry to object: " + e, e); + // annoyingly just have to return null since we cannot throw + return null; + } + } + + @Override + public void objectToEntry(Object object, TupleOutput tupleOutput) + { + StorableMessageMetaData metaData = (StorableMessageMetaData) object; + + final int bodySize = 1 + metaData.getStorableSize(); + byte[] underlying = new byte[bodySize]; + underlying[0] = (byte) metaData.getType().ordinal(); + java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(underlying); + buf.position(1); + buf = buf.slice(); + + metaData.writeToBuffer(0, buf); + tupleOutput.writeInt(bodySize); + tupleOutput.writeFast(underlying); + } +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java new file mode 100644 index 0000000000..40153c13ea --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java @@ -0,0 +1,43 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.tuples; + +import com.sleepycat.bind.tuple.TupleBinding; + +public class MessageMetaDataTupleBindingFactory extends TupleBindingFactory<Object> +{ + public MessageMetaDataTupleBindingFactory(int version) + { + super(version); + } + + public TupleBinding<Object> getInstance() + { + switch (_version) + { + default: + case 5: + return new MessageMetaDataTB_5(); + case 4: + return new MessageMetaDataTB_4(); + } + } +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java new file mode 100644 index 0000000000..975e558874 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.tuples; + +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding; +import org.apache.qpid.server.store.berkeleydb.QueueEntryKey; + +import com.sleepycat.bind.tuple.TupleBinding; +import com.sleepycat.bind.tuple.TupleInput; +import com.sleepycat.bind.tuple.TupleOutput; + +public class QueueEntryTB extends TupleBinding<QueueEntryKey> +{ + public QueueEntryKey entryToObject(TupleInput tupleInput) + { + AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput); + Long messageId = tupleInput.readLong(); + + return new QueueEntryKey(queueName, messageId); + } + + public void objectToEntry(QueueEntryKey mk, TupleOutput tupleOutput) + { + AMQShortStringEncoding.writeShortString(mk.getQueueName(),tupleOutput); + tupleOutput.writeLong(mk.getMessageId()); + } +}
\ No newline at end of file diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java new file mode 100644 index 0000000000..affa9a271d --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java @@ -0,0 +1,25 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.tuples; + +public interface QueueTuple +{ +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java new file mode 100644 index 0000000000..512e319f96 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.tuples; + +import org.apache.qpid.server.store.berkeleydb.records.QueueRecord; + +import com.sleepycat.bind.tuple.TupleBinding; + +public class QueueTupleBindingFactory extends TupleBindingFactory<QueueRecord> +{ + + public QueueTupleBindingFactory(int version) + { + super(version); + } + + public TupleBinding<QueueRecord> getInstance() + { + switch (_version) + { + default: + case 5: + return new QueueTuple_5(); + case 4: + return new QueueTuple_4(); + } + } +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_4.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_4.java new file mode 100644 index 0000000000..347eecf08e --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_4.java @@ -0,0 +1,72 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.tuples; + +import com.sleepycat.bind.tuple.TupleBinding; +import com.sleepycat.bind.tuple.TupleInput; +import com.sleepycat.bind.tuple.TupleOutput; +import com.sleepycat.je.DatabaseException; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding; +import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding; +import org.apache.qpid.server.store.berkeleydb.records.QueueRecord; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; + +public class QueueTuple_4 extends TupleBinding<QueueRecord> implements QueueTuple +{ + protected static final Logger _logger = Logger.getLogger(QueueTuple_4.class); + + protected FieldTable _arguments; + + public QueueTuple_4() + { + super(); + } + + public QueueRecord entryToObject(TupleInput tupleInput) + { + try + { + AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput); + AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput); + // Addition for Version 2 of this table, read the queue arguments + FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput); + + return new QueueRecord(name, owner, false, arguments); + } + catch (DatabaseException e) + { + _logger.error("Unable to create binding: " + e, e); + return null; + } + + } + + public void objectToEntry(QueueRecord queue, TupleOutput tupleOutput) + { + AMQShortStringEncoding.writeShortString(queue.getNameShortString(), tupleOutput); + AMQShortStringEncoding.writeShortString(queue.getOwner(), tupleOutput); + // Addition for Version 2 of this table, store the queue arguments + FieldTableEncoding.writeFieldTable(queue.getArguments(), tupleOutput); + } +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_5.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_5.java new file mode 100644 index 0000000000..0f293b79b7 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_5.java @@ -0,0 +1,75 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.tuples; + +import com.sleepycat.bind.tuple.TupleInput; +import com.sleepycat.bind.tuple.TupleOutput; +import com.sleepycat.je.DatabaseException; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding; +import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding; +import org.apache.qpid.server.store.berkeleydb.records.QueueRecord; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; + +public class QueueTuple_5 extends QueueTuple_4 +{ + protected static final Logger _logger = Logger.getLogger(QueueTuple_5.class); + + protected FieldTable _arguments; + + public QueueTuple_5() + { + super(); + } + + public QueueRecord entryToObject(TupleInput tupleInput) + { + try + { + AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput); + AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput); + // Addition for Version 2 of this table, read the queue arguments + FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput); + // Addition for Version 3 of this table, read the queue exclusivity + boolean exclusive = tupleInput.readBoolean(); + + return new QueueRecord(name, owner, exclusive, arguments); + } + catch (DatabaseException e) + { + _logger.error("Unable to create binding: " + e, e); + return null; + } + + } + + public void objectToEntry(QueueRecord queue, TupleOutput tupleOutput) + { + AMQShortStringEncoding.writeShortString(queue.getNameShortString(), tupleOutput); + AMQShortStringEncoding.writeShortString(queue.getOwner(), tupleOutput); + // Addition for Version 2 of this table, store the queue arguments + FieldTableEncoding.writeFieldTable(queue.getArguments(), tupleOutput); + // Addition for Version 3 of this table, store the queue exclusivity + tupleOutput.writeBoolean(queue.isExclusive()); + } +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java new file mode 100644 index 0000000000..2adac1f9a3 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.tuples; + +import com.sleepycat.bind.tuple.TupleBinding; + +public abstract class TupleBindingFactory<E> +{ + protected int _version; + + public TupleBindingFactory(int version) + { + _version = version; + } + + public abstract TupleBinding<E> getInstance(); +} |