diff options
author | Michael Cahill <michael.cahill@wiredtiger.com> | 2014-06-27 18:38:41 +1000 |
---|---|---|
committer | Michael Cahill <michael.cahill@wiredtiger.com> | 2014-06-27 18:38:41 +1000 |
commit | 7b4f00fdf9bacb2bce4a75ed0080f0756eeaa539 (patch) | |
tree | 23b04c61c66ff11b792151e79d47da8f4203ef1a /api | |
parent | f2842ced64dfb7d4f79df7a70b8a3da6f6467ed4 (diff) | |
download | mongo-7b4f00fdf9bacb2bce4a75ed0080f0756eeaa539.tar.gz |
Add stubs for enough of the RocksDB API that MongoDB builds.
Diffstat (limited to 'api')
-rw-r--r-- | api/leveldb/Makefile.am | 4 | ||||
-rw-r--r-- | api/leveldb/db/dbformat.h | 10 | ||||
-rw-r--r-- | api/leveldb/db/write_batch.cc | 279 | ||||
-rw-r--r-- | api/leveldb/db/write_batch_internal.h | 14 | ||||
-rw-r--r-- | api/leveldb/include/leveldb/cache.h | 6 | ||||
-rw-r--r-- | api/leveldb/include/leveldb/db.h | 131 | ||||
-rw-r--r-- | api/leveldb/include/leveldb/options.h | 32 | ||||
-rw-r--r-- | api/leveldb/include/leveldb/slice.h | 12 | ||||
-rw-r--r-- | api/leveldb/include/leveldb/write_batch.h | 73 | ||||
-rw-r--r-- | api/leveldb/leveldb_wt.cc | 299 | ||||
-rw-r--r-- | api/leveldb/leveldb_wt.h | 338 | ||||
-rw-r--r-- | api/leveldb/rocks_wt.cc | 128 | ||||
-rw-r--r-- | api/leveldb/util/coding.cc | 197 | ||||
-rw-r--r-- | api/leveldb/util/coding.h | 207 |
14 files changed, 1342 insertions, 388 deletions
diff --git a/api/leveldb/Makefile.am b/api/leveldb/Makefile.am index 6f7f471181d..c1aa5916802 100644 --- a/api/leveldb/Makefile.am +++ b/api/leveldb/Makefile.am @@ -48,6 +48,10 @@ libwiredtiger_leveldb_la_SOURCES = \ if HAVE_ELEVELDB libwiredtiger_leveldb_la_SOURCES += util/perf_count.cc endif +if HAVE_ROCKSDB +libwiredtiger_leveldb_la_SOURCES += rocks_wt.cc +endif + leveldb_test_SOURCES = leveldb_test.cc #leveldb_test_LDADD = $(top_builddir)/libwiredtiger.la diff --git a/api/leveldb/db/dbformat.h b/api/leveldb/db/dbformat.h index 37c21ed95e7..2c8a9d5f5a7 100644 --- a/api/leveldb/db/dbformat.h +++ b/api/leveldb/db/dbformat.h @@ -44,6 +44,16 @@ class InternalKey; enum ValueType { kTypeDeletion = 0x0, kTypeValue = 0x1 +#ifdef HAVE_ROCKSDB + ,kTypeMerge = 0x2, + // Following types are used only in write ahead logs. They are not used in + // memtables or sst files: + kTypeLogData = 0x3, + kTypeColumnFamilyDeletion = 0x4, + kTypeColumnFamilyValue = 0x5, + kTypeColumnFamilyMerge = 0x6, + kMaxValue = 0x7F +#endif }; // kValueTypeForSeek defines the ValueType that should be passed when // constructing a ParsedInternalKey object for seeking to a particular diff --git a/api/leveldb/db/write_batch.cc b/api/leveldb/db/write_batch.cc index 6a488a83e3e..3edb187f796 100644 --- a/api/leveldb/db/write_batch.cc +++ b/api/leveldb/db/write_batch.cc @@ -14,8 +14,11 @@ // data: uint8[len] #include "leveldb_wt.h" + #include "db/write_batch_internal.h" +#ifndef HAVE_ROCKSDB + namespace leveldb { // WriteBatch header has an 8-byte sequence number followed by a 4-byte count. @@ -107,3 +110,279 @@ void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) { } } // namespace leveldb + +#else // HAVE_ROCKSDB + +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. 
An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +// +// WriteBatch::rep_ := +// sequence: fixed64 +// count: fixed32 +// data: record[count] +// record := +// kTypeValue varstring varstring +// kTypeMerge varstring varstring +// kTypeDeletion varstring +// kTypeColumnFamilyValue varint32 varstring varstring +// kTypeColumnFamilyMerge varint32 varstring varstring +// kTypeColumnFamilyDeletion varint32 varstring varstring +// varstring := +// len: varint32 +// data: uint8[len] + +#include <stdexcept> + +namespace rocksdb { + +// WriteBatch header has an 8-byte sequence number followed by a 4-byte count. +static const size_t kHeader = 12; + +WriteBatch::WriteBatch(size_t reserved_bytes) { + rep_.reserve((reserved_bytes > kHeader) ? reserved_bytes : kHeader); + Clear(); +} + +WriteBatch::~WriteBatch() { } + +WriteBatch::Handler::~Handler() { } + +void WriteBatch::Handler::Put(const Slice& key, const Slice& value) { + // you need to either implement Put or PutCF + throw std::runtime_error("Handler::Put not implemented!"); +} + +#ifdef NOT_YET +void WriteBatch::Handler::Merge(const Slice& key, const Slice& value) { + throw std::runtime_error("Handler::Merge not implemented!"); +} +#endif + +void WriteBatch::Handler::Delete(const Slice& key) { + // you need to either implement Delete or DeleteCF + throw std::runtime_error("Handler::Delete not implemented!"); +} + +#ifdef NOT_YET +void WriteBatch::Handler::LogData(const Slice& blob) { + // If the user has not specified something to do with blobs, then we ignore + // them. 
+} +#endif + +bool WriteBatch::Handler::Continue() { + return true; +} + +void WriteBatch::Clear() { + rep_.clear(); + rep_.resize(kHeader); +} + +int WriteBatch::Count() const { + return WriteBatchInternal::Count(this); +} + +Status WriteBatch::Iterate(Handler* handler) const { + Slice input(rep_); + if (input.size() < kHeader) { + return Status::Corruption("malformed WriteBatch (too small)"); + } + + input.remove_prefix(kHeader); + Slice key, value, blob; + int found = 0; + Status s; + while (s.ok() && !input.empty() && handler->Continue()) { + char tag = input[0]; + input.remove_prefix(1); + uint32_t column_family = 0; // default + switch (tag) { + case kTypeColumnFamilyValue: + if (!GetVarint32(&input, &column_family)) { + return Status::Corruption("bad WriteBatch Put"); + } + // intentional fallthrough + case kTypeValue: + if (GetLengthPrefixedSlice(&input, &key) && + GetLengthPrefixedSlice(&input, &value)) { + s = handler->PutCF(column_family, key, value); + found++; + } else { + return Status::Corruption("bad WriteBatch Put"); + } + break; + case kTypeColumnFamilyDeletion: + if (!GetVarint32(&input, &column_family)) { + return Status::Corruption("bad WriteBatch Delete"); + } + // intentional fallthrough + case kTypeDeletion: + if (GetLengthPrefixedSlice(&input, &key)) { + s = handler->DeleteCF(column_family, key); + found++; + } else { + return Status::Corruption("bad WriteBatch Delete"); + } + break; + case kTypeColumnFamilyMerge: + if (!GetVarint32(&input, &column_family)) { + return Status::Corruption("bad WriteBatch Merge"); + } + // intentional fallthrough + case kTypeMerge: + if (GetLengthPrefixedSlice(&input, &key) && + GetLengthPrefixedSlice(&input, &value)) { + s = handler->MergeCF(column_family, key, value); + found++; + } else { + return Status::Corruption("bad WriteBatch Merge"); + } + break; + case kTypeLogData: + if (GetLengthPrefixedSlice(&input, &blob)) { + handler->LogData(blob); + } else { + return Status::Corruption("bad WriteBatch Blob"); 
+ } + break; + default: + return Status::Corruption("unknown WriteBatch tag"); + } + } + if (!s.ok()) { + return s; + } + if (found != WriteBatchInternal::Count(this)) { + return Status::Corruption("WriteBatch has wrong count"); + } else { + return Status::OK(); + } +} + +int WriteBatchInternal::Count(const WriteBatch* b) { + return DecodeFixed32(b->rep_.data() + 8); +} + +void WriteBatchInternal::SetCount(WriteBatch* b, int n) { + EncodeFixed32(&b->rep_[8], n); +} + +#ifdef NOT_YET +SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) { + return SequenceNumber(DecodeFixed64(b->rep_.data())); +} + +void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) { + EncodeFixed64(&b->rep_[0], seq); +} +#endif + +void WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id, + const Slice& key, const Slice& value) { + WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); + if (column_family_id == 0) { + b->rep_.push_back(static_cast<char>(kTypeValue)); + } else { + b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue)); + PutVarint32(&b->rep_, column_family_id); + } + PutLengthPrefixedSlice(&b->rep_, key); + PutLengthPrefixedSlice(&b->rep_, value); +} + +namespace { +inline uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) { + uint32_t column_family_id = 0; + if (column_family != NULL) { + ColumnFamilyHandleImpl *cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); + column_family_id = cfh->GetID(); + } + return column_family_id; +} +} // namespace + +void WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value) { + WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key, value); +} + +void WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id, + const SliceParts& key, const SliceParts& value) { + WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); + if (column_family_id == 0) { + 
b->rep_.push_back(static_cast<char>(kTypeValue)); + } else { + b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue)); + PutVarint32(&b->rep_, column_family_id); + } + PutLengthPrefixedSliceParts(&b->rep_, key); + PutLengthPrefixedSliceParts(&b->rep_, value); +} + +void WriteBatch::Put(ColumnFamilyHandle* column_family, const SliceParts& key, + const SliceParts& value) { + WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key, value); +} + +void WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id, + const Slice& key) { + WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); + if (column_family_id == 0) { + b->rep_.push_back(static_cast<char>(kTypeDeletion)); + } else { + b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion)); + PutVarint32(&b->rep_, column_family_id); + } + PutLengthPrefixedSlice(&b->rep_, key); +} + +void WriteBatch::Delete(ColumnFamilyHandle* column_family, const Slice& key) { + WriteBatchInternal::Delete(this, GetColumnFamilyID(column_family), key); +} + +#ifdef NOT_YET +void WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id, + const Slice& key, const Slice& value) { + WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); + if (column_family_id == 0) { + b->rep_.push_back(static_cast<char>(kTypeMerge)); + } else { + b->rep_.push_back(static_cast<char>(kTypeColumnFamilyMerge)); + PutVarint32(&b->rep_, column_family_id); + } + PutLengthPrefixedSlice(&b->rep_, key); + PutLengthPrefixedSlice(&b->rep_, value); +} + +void WriteBatch::Merge(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value) { + WriteBatchInternal::Merge(this, GetColumnFamilyID(column_family), key, value); +} + +void WriteBatch::PutLogData(const Slice& blob) { + rep_.push_back(static_cast<char>(kTypeLogData)); + PutLengthPrefixedSlice(&rep_, blob); +} +#endif + +void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) { + assert(contents.size() >= kHeader); + 
b->rep_.assign(contents.data(), contents.size()); +} + +void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) { + SetCount(dst, Count(dst) + Count(src)); + assert(src->rep_.size() >= kHeader); + dst->rep_.append(src->rep_.data() + kHeader, src->rep_.size() - kHeader); +} + +} // namespace rocksdb + +#endif diff --git a/api/leveldb/db/write_batch_internal.h b/api/leveldb/db/write_batch_internal.h index b4295004e13..c8421cce124 100644 --- a/api/leveldb/db/write_batch_internal.h +++ b/api/leveldb/db/write_batch_internal.h @@ -14,6 +14,20 @@ namespace leveldb { // WriteBatch that we don't want in the public WriteBatch interface. class WriteBatchInternal { public: +#ifdef HAVE_ROCKSDB + // WriteBatch methods with column_family_id instead of ColumnFamilyHandle* + static void Put(WriteBatch* batch, uint32_t column_family_id, + const Slice& key, const Slice& value); + + static void Put(WriteBatch* batch, uint32_t column_family_id, + const SliceParts& key, const SliceParts& value); + + static void Delete(WriteBatch* batch, uint32_t column_family_id, + const Slice& key); + + static void Merge(WriteBatch* batch, uint32_t column_family_id, + const Slice& key, const Slice& value); +#endif // Return the number of entries in the batch. static int Count(const WriteBatch* batch); diff --git a/api/leveldb/include/leveldb/cache.h b/api/leveldb/include/leveldb/cache.h index e671d2ad01a..6ae25122133 100644 --- a/api/leveldb/include/leveldb/cache.h +++ b/api/leveldb/include/leveldb/cache.h @@ -23,6 +23,7 @@ #define leveldb rocksdb #endif +#include <memory> #include <stdint.h> #include "slice.h" @@ -33,6 +34,11 @@ class Cache; // Create a new cache with a fixed size capacity. This implementation // of Cache uses a least-recently-used eviction policy. 
extern Cache* NewLRUCache(size_t capacity); +#ifdef HAVE_ROCKSDB +extern Cache* NewLRUCache(size_t capacity, int numSharedBits); +extern Cache* NewLRUCache(size_t capacity, int numSharedBits, + int removeScanCountLimit); +#endif class Cache { public: diff --git a/api/leveldb/include/leveldb/db.h b/api/leveldb/include/leveldb/db.h index 380fe7a126e..f9cd546fc81 100644 --- a/api/leveldb/include/leveldb/db.h +++ b/api/leveldb/include/leveldb/db.h @@ -10,10 +10,13 @@ #define leveldb rocksdb #endif +#include <memory> #include <stdint.h> #include <stdio.h> +#include <vector> #include "iterator.h" #include "options.h" +#include "write_batch.h" #ifdef HAVE_HYPERLEVELDB #include "replay_iterator.h" #endif @@ -24,11 +27,29 @@ namespace leveldb { static const int kMajorVersion = 1; static const int kMinorVersion = 17; -struct Options; struct ReadOptions; struct WriteOptions; class WriteBatch; +#ifdef HAVE_ROCKSDB +struct FlushOptions; +class ColumnFamilyHandle { + public: + virtual ~ColumnFamilyHandle() {} +}; +extern const std::string kDefaultColumnFamilyName; + +struct ColumnFamilyDescriptor { + std::string name; + ColumnFamilyOptions options; + ColumnFamilyDescriptor() + : name(kDefaultColumnFamilyName), options(ColumnFamilyOptions()) {} + ColumnFamilyDescriptor(const std::string& _name, + const ColumnFamilyOptions& _options) + : name(_name), options(_options) {} +}; +#endif + // Abstract handle to particular state of a DB. // A Snapshot is an immutable object and can therefore be safely // accessed from multiple threads without any external synchronization. @@ -73,6 +94,112 @@ class DB { const std::string& name, DB** dbptr); +#ifdef HAVE_ROCKSDB + // Open DB with column families. + // db_options specify database specific options + // column_families is the vector of all column families in the database, + // containing column family name and options. You need to open ALL column + // families in the database. 
To get the list of column families, you can use + // ListColumnFamilies(). Also, you can open only a subset of column families + // for read-only access. + // The default column family name is 'default' and it's stored + // in rocksdb::kDefaultColumnFamilyName. + // If everything is OK, handles will on return be the same size + // as column_families --- handles[i] will be a handle that you + // will use to operate on column family column_family[i] + static Status Open(const Options& db_options, const std::string& name, + const std::vector<ColumnFamilyDescriptor>& column_families, + std::vector<ColumnFamilyHandle*>* handles, DB** dbptr); + + // ListColumnFamilies will open the DB specified by argument name + // and return the list of all column families in that DB + // through column_families argument. The ordering of + // column families in column_families is unspecified. + static Status ListColumnFamilies(const Options& db_options, + const std::string& name, + std::vector<std::string>* column_families); + + // Create a column_family and return the handle of column family + // through the argument handle. + virtual Status CreateColumnFamily(const Options& options, + const std::string& column_family_name, + ColumnFamilyHandle** handle) = 0; + + // Drop a column family specified by column_family handle. This call + // only records a drop record in the manifest and prevents the column + // family from flushing and compacting. + virtual Status DropColumnFamily(ColumnFamilyHandle* column_family) = 0; + + // Set the database entry for "key" to "value". + // Returns OK on success, and a non-OK status on error. + // Note: consider setting options.sync = true. + virtual Status Put(const WriteOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value) = 0; + + // Remove the database entry (if any) for "key". Returns OK on + // success, and a non-OK status on error. It is not an error if "key" + // did not exist in the database. 
+ // Note: consider setting options.sync = true. + virtual Status Delete(const WriteOptions& options, + ColumnFamilyHandle* column_family, + const Slice& key) = 0; + + // Merge the database entry for "key" with "value". Returns OK on success, + // and a non-OK status on error. The semantics of this operation is + // determined by the user provided merge_operator when opening DB. + // Note: consider setting options.sync = true. + virtual Status Merge(const WriteOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value) = 0; + + // May return some other Status on an error. + virtual Status Get(const ReadOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + std::string* value) = 0; + + // If keys[i] does not exist in the database, then the i'th returned + // status will be one for which Status::IsNotFound() is true, and + // (*values)[i] will be set to some arbitrary value (often ""). Otherwise, + // the i'th returned status will have Status::ok() true, and (*values)[i] + // will store the value associated with keys[i]. + // + // (*values) will always be resized to be the same size as (keys). + // Similarly, the number of returned statuses will be the number of keys. + // Note: keys will not be "de-duplicated". Duplicate keys will return + // duplicate values in order. + virtual std::vector<Status> MultiGet( + const ReadOptions& options, + const std::vector<ColumnFamilyHandle*>& column_family, + const std::vector<Slice>& keys, std::vector<std::string>* values) = 0; + + // If the key definitely does not exist in the database, then this method + // returns false, else true. If the caller wants to obtain value when the key + // is found in memory, a bool for 'value_found' must be passed. 'value_found' + // will be true on return if value has been set properly. + // This check is potentially lighter-weight than invoking DB::Get(). One way + // to make this lighter weight is to avoid doing any IOs. 
+ // Default implementation here returns true and sets 'value_found' to false + virtual bool KeyMayExist(const ReadOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + std::string* value, bool* value_found = NULL) { + if (value_found != NULL) { + *value_found = false; + } + return true; + } + + virtual Iterator* NewIterator(const ReadOptions& options, + ColumnFamilyHandle* column_family) = 0; + + virtual bool GetProperty(ColumnFamilyHandle* column_family, + const Slice& property, std::string* value) = 0; + + // Flush all mem-table data. + virtual Status Flush(const FlushOptions& options, + ColumnFamilyHandle* column_family) = 0; +#endif + DB() { } virtual ~DB(); @@ -212,6 +339,6 @@ Status DestroyDB(const std::string& name, const Options& options); // on a database that contains important information. Status RepairDB(const std::string& dbname, const Options& options); -} // namespace leveldb +}; // namespace leveldb #endif // STORAGE_LEVELDB_INCLUDE_DB_H_ diff --git a/api/leveldb/include/leveldb/options.h b/api/leveldb/include/leveldb/options.h index 77cfac61b33..5ac4e175e27 100644 --- a/api/leveldb/include/leveldb/options.h +++ b/api/leveldb/include/leveldb/options.h @@ -10,6 +10,7 @@ #define leveldb rocksdb #endif +#include <memory> #include <stddef.h> namespace leveldb { @@ -61,6 +62,16 @@ struct Options { // Default: false bool paranoid_checks; +#ifdef HAVE_ROCKSDB + // By default, RocksDB uses only one background thread for flush and + // compaction. Calling this function will set it up such that total of + // `total_threads` is used. Good value for `total_threads` is the number of + // cores. You almost definitely want to call this function if your system is + // bottlenecked by RocksDB. + Options* IncreaseParallelism(int total_threads = 16) { return this; } + Options* OptimizeLevelStyleCompaction() { return this; } +#endif + #if HAVE_ELEVELDB // Riak specific: this variable replaces paranoid_checks at one // one place in the code. 
This variable alone controls whether or not @@ -167,6 +178,16 @@ struct Options { Options(); }; +#ifdef HAVE_ROCKSDB +struct ColumnFamilyOptions : public Options { + ColumnFamilyOptions() : Options() {} +}; + +struct DBOptions : public Options { + DBOptions() : Options() {} +}; +#endif + // Options that control read operations struct ReadOptions { // If true, all data read from underlying storage will be @@ -218,6 +239,17 @@ struct WriteOptions { } }; +#ifdef HAVE_ROCKSDB +// Options that control flush operations +struct FlushOptions { + // If true, the flush will wait until the flush is done. + // Default: true + bool wait; + + FlushOptions() : wait(true) {} +}; +#endif + } // namespace leveldb #endif // STORAGE_LEVELDB_INCLUDE_OPTIONS_H_ diff --git a/api/leveldb/include/leveldb/slice.h b/api/leveldb/include/leveldb/slice.h index c801f783b64..d7c20cfcaac 100644 --- a/api/leveldb/include/leveldb/slice.h +++ b/api/leveldb/include/leveldb/slice.h @@ -89,6 +89,18 @@ class Slice { // Intentionally copyable }; +#ifdef HAVE_ROCKSDB +// A set of Slices that are virtually concatenated together. 'parts' points +// to an array of Slices. The number of elements in the array is 'num_parts'. 
+struct SliceParts { + SliceParts(const Slice* _parts, int _num_parts) : + parts(_parts), num_parts(_num_parts) { } + + const Slice* parts; + int num_parts; +}; +#endif + inline bool operator==(const Slice& x, const Slice& y) { return ((x.size() == y.size()) && (memcmp(x.data(), y.data(), x.size()) == 0)); diff --git a/api/leveldb/include/leveldb/write_batch.h b/api/leveldb/include/leveldb/write_batch.h index 01089bff7c4..9184d42c24c 100644 --- a/api/leveldb/include/leveldb/write_batch.h +++ b/api/leveldb/include/leveldb/write_batch.h @@ -32,10 +32,18 @@ namespace leveldb { class Slice; +#if HAVE_ROCKSDB +class ColumnFamilyHandle; +struct SliceParts; +#endif class WriteBatch { public: +#ifdef HAVE_ROCKSDB + explicit WriteBatch(size_t reserved_bytes = 0); +#else WriteBatch(); +#endif ~WriteBatch(); // Store the mapping "key->value" in the database. @@ -47,15 +55,80 @@ class WriteBatch { // Clear all updates buffered in this batch. void Clear(); +#ifdef HAVE_ROCKSDB + void Put(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value); + + // Variant of Put() that gathers output like writev(2). The key and value + // that will be written to the database are concatenations of arrays of + // slices. + void Put(ColumnFamilyHandle* column_family, const SliceParts& key, + const SliceParts& value); + + void Delete(ColumnFamilyHandle* column_family, const Slice& key); +#endif + // Support for iterating over the contents of a batch. class Handler { public: virtual ~Handler(); +#ifdef HAVE_ROCKSDB + // default implementation will just call Put without column family for + // backwards compatibility. If the column family is not default, + // the function is noop + virtual Status PutCF(uint32_t column_family_id, const Slice& key, + const Slice& value) { + if (column_family_id == 0) { + // Put() historically doesn't return status. We didn't want to be + // backwards incompatible so we didn't change the return status + // (this is a public API). 
We do an ordinary get and return Status::OK() + Put(key, value); + return Status::OK(); + } + return Status::InvalidArgument( + "non-default column family and PutCF not implemented"); + } + // Merge and LogData are not pure virtual. Otherwise, we would break + // existing clients of Handler on a source code level. The default + // implementation of Merge simply throws a runtime exception. + virtual Status MergeCF(uint32_t column_family_id, const Slice& key, + const Slice& value) { + if (column_family_id == 0) { + Merge(key, value); + return Status::OK(); + } + return Status::InvalidArgument( + "non-default column family and MergeCF not implemented"); + } + virtual void Merge(const Slice& key, const Slice& value); + // The default implementation of LogData does nothing. + virtual void LogData(const Slice& blob); + virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) { + if (column_family_id == 0) { + Delete(key); + return Status::OK(); + } + return Status::InvalidArgument( + "non-default column family and DeleteCF not implemented"); + } + // Continue is called by WriteBatch::Iterate. If it returns false, + // iteration is halted. Otherwise, it continues iterating. The default + // implementation always returns true. + virtual bool Continue(); +#endif virtual void Put(const Slice& key, const Slice& value) = 0; virtual void Delete(const Slice& key) = 0; }; Status Iterate(Handler* handler) const; +#ifdef HAVE_ROCKSDB + // Retrieve data size of the batch. 
+ size_t GetDataSize() const { return rep_.size(); } + + // Returns the number of updates in the batch + int Count() const; +#endif + private: friend class WriteBatchInternal; diff --git a/api/leveldb/leveldb_wt.cc b/api/leveldb/leveldb_wt.cc index f156a29c139..72a4176af8b 100644 --- a/api/leveldb/leveldb_wt.cc +++ b/api/leveldb/leveldb_wt.cc @@ -31,19 +31,7 @@ #include <unistd.h> #include <sstream> -using leveldb::Cache; -using leveldb::FilterPolicy; -using leveldb::Iterator; -using leveldb::Options; -using leveldb::ReadOptions; -using leveldb::WriteBatch; -using leveldb::WriteOptions; -using leveldb::Range; -using leveldb::Slice; -using leveldb::Snapshot; -using leveldb::Status; #if HAVE_ELEVELDB -using leveldb::Value; namespace leveldb { Value::~Value() {} @@ -63,14 +51,33 @@ class StringValue : public Value { } #endif -#define WT_URI "table:data" -#define WT_CONN_CONFIG "log=(enabled),checkpoint_sync=false,session_max=8192,"\ - "mmap=false,transaction_sync=(enabled=true,method=none)," -#define WT_TABLE_CONFIG "type=lsm,leaf_page_max=4KB,leaf_item_max=1KB," \ - "internal_page_max=128K,lsm=(chunk_size=100MB," \ - "bloom_config=(leaf_page_max=8MB)," \ - "bloom_bit_count=28,bloom_hash_count=19," \ - "bloom_oldest=true)," +Cache *NewLRUCache(size_t capacity) { + return new CacheImpl(capacity); +} + +Status leveldb::DestroyDB(const std::string& name, const Options& options) { + WT_CONNECTION *conn; + int ret, t_ret; + /* If the database doesn't exist, there is nothing to destroy. 
*/ + if (access((name + "/WiredTiger").c_str(), F_OK) != 0) + return Status::OK(); + if ((ret = ::wiredtiger_open(name.c_str(), NULL, NULL, &conn)) != 0) + return WiredTigerErrorToStatus(ret, NULL); + WT_SESSION *session; + if ((ret = conn->open_session(conn, NULL, NULL, &session)) != 0) + goto cleanup; + if ((ret = session->drop(session, WT_URI, "force")) != 0) + goto cleanup; + +cleanup: + if ((t_ret = conn->close(conn, NULL)) != 0 && ret == 0) + ret = t_ret; + return WiredTigerErrorToStatus(ret, NULL); +} + +Status leveldb::RepairDB(const std::string& dbname, const Options& options) { + return Status::NotSupported("sorry!"); +} /* Destructors required for interfaces. */ leveldb::DB::~DB() {} @@ -186,260 +193,8 @@ const FilterPolicy *NewBloomFilterPolicy2(int bits_per_key) { #endif Cache::~Cache() {} - -class CacheImpl : public Cache { -public: - CacheImpl(size_t capacity) : Cache(), capacity_(capacity) {} - - virtual Handle* Insert(const Slice& key, void* value, size_t charge, - void (*deleter)(const Slice& key, void* value)) { return 0; } - virtual Handle* Lookup(const Slice& key) { return 0; } - virtual void Release(Handle* handle) {} - virtual void* Value(Handle* handle) { return 0; } - virtual void Erase(const Slice& key) {} - virtual uint64_t NewId() { return 0; } - - size_t capacity_; -}; - -Cache *NewLRUCache(size_t capacity) { - return new CacheImpl(capacity); } -Status DestroyDB(const std::string& name, const Options& options) { - WT_CONNECTION *conn; - int ret, t_ret; - /* If the database doesn't exist, there is nothing to destroy. 
*/ - if (access((name + "/WiredTiger").c_str(), F_OK) != 0) - return Status::OK(); - if ((ret = ::wiredtiger_open(name.c_str(), NULL, NULL, &conn)) != 0) - return WiredTigerErrorToStatus(ret, NULL); - WT_SESSION *session; - if ((ret = conn->open_session(conn, NULL, NULL, &session)) != 0) - goto cleanup; - if ((ret = session->drop(session, WT_URI, "force")) != 0) - goto cleanup; - -cleanup: - if ((t_ret = conn->close(conn, NULL)) != 0 && ret == 0) - ret = t_ret; - return WiredTigerErrorToStatus(ret, NULL); -} - -Status RepairDB(const std::string& dbname, const Options& options) { - return Status::NotSupported("sorry!"); -} -} - -/* POSIX thread-local storage */ -template <class T> -class ThreadLocal { -public: - static void cleanup(void *val) { - delete (T *)val; - } - - ThreadLocal() { - int ret = pthread_key_create(&key_, cleanup); - assert(ret == 0); - } - - ~ThreadLocal() { - int ret = pthread_key_delete(key_); - assert(ret == 0); - } - - T *get() { - return (T *)(pthread_getspecific(key_)); - } - - void set(T *value) { - int ret = pthread_setspecific(key_, value); - assert(ret == 0); - } - -private: - pthread_key_t key_; -}; - -/* WiredTiger implementations. 
*/ -class DbImpl; - -/* Context for operations (including snapshots, write batches, transactions) */ -class OperationContext { -public: - OperationContext(WT_CONNECTION *conn) : conn_(conn), in_use_(false) { - int ret = conn->open_session(conn, NULL, "isolation=snapshot", &session_); - assert(ret == 0); - ret = session_->open_cursor( - session_, WT_URI, NULL, NULL, &cursor_); - assert(ret == 0); - } - - ~OperationContext() { -#ifdef WANT_SHUTDOWN_RACES - int ret = session_->close(session_, NULL); - assert(ret == 0); -#endif - } - - WT_CURSOR *getCursor(); - void releaseCursor(WT_CURSOR *cursor); - -private: - WT_CONNECTION *conn_; - WT_SESSION *session_; - WT_CURSOR *cursor_; - bool in_use_; -}; - -class IteratorImpl : public Iterator { -public: - IteratorImpl(DbImpl *db, const ReadOptions &options); - virtual ~IteratorImpl(); - - // An iterator is either positioned at a key/value pair, or - // not valid. This method returns true iff the iterator is valid. - virtual bool Valid() const { return valid_; } - - virtual void SeekToFirst(); - - virtual void SeekToLast(); - - virtual void Seek(const Slice& target); - - virtual void Next(); - - virtual void Prev(); - - virtual Slice key() const { - return key_; - } - - virtual Slice value() const { - return value_; - } - - virtual Status status() const { - return status_; - } - -private: - DbImpl *db_; - WT_CURSOR *cursor_; - Slice key_, value_; - Status status_; - bool valid_; - bool snapshot_iterator_; - - void SetError(int wiredTigerError) { - valid_ = false; - status_ = WiredTigerErrorToStatus(wiredTigerError, NULL); - } - - // No copying allowed - IteratorImpl(const IteratorImpl&); - void operator=(const IteratorImpl&); -}; - -class SnapshotImpl : public Snapshot { -friend class DbImpl; -friend class IteratorImpl; -public: - SnapshotImpl(DbImpl *db) : - Snapshot(), db_(db), cursor_(NULL), status_(Status::OK()) {} - virtual ~SnapshotImpl() {} -protected: - WT_CURSOR *getCursor() const { return cursor_; } - Status 
getStatus() const { return status_; } - Status setupTransaction(); - Status releaseTransaction(); -private: - DbImpl *db_; - WT_CURSOR *cursor_; - Status status_; -}; - -class DbImpl : public leveldb::DB { -friend class IteratorImpl; -friend class SnapshotImpl; -public: - DbImpl(WT_CONNECTION *conn) : - DB(), conn_(conn), context_(new ThreadLocal<OperationContext>) {} - virtual ~DbImpl() { - delete context_; - int ret = conn_->close(conn_, NULL); - assert(ret == 0); - } - - virtual Status Put(const WriteOptions& options, - const Slice& key, - const Slice& value); - - virtual Status Delete(const WriteOptions& options, const Slice& key); - - virtual Status Write(const WriteOptions& options, WriteBatch* updates); - - virtual Status Get(const ReadOptions& options, - const Slice& key, std::string* value); - -#if HAVE_ELEVELDB - virtual Status Get(const ReadOptions& options, - const Slice& key, Value* value); -#endif - -#ifdef HAVE_HYPERLEVELDB - virtual Status LiveBackup(const Slice& name) { - return Status::NotSupported("sorry!"); - } - virtual void GetReplayTimestamp(std::string* timestamp) {} - virtual void AllowGarbageCollectBeforeTimestamp(const std::string& timestamp) {} - virtual bool ValidateTimestamp(const std::string& timestamp) {} - virtual int CompareTimestamps(const std::string& lhs, const std::string& rhs) {} - virtual Status GetReplayIterator(const std::string& timestamp, - leveldb::ReplayIterator** iter) { return Status::NotSupported("sorry!"); } - virtual void ReleaseReplayIterator(leveldb::ReplayIterator* iter) {} -#endif - - virtual Iterator* NewIterator(const ReadOptions& options); - - virtual const Snapshot* GetSnapshot(); - - virtual void ReleaseSnapshot(const Snapshot* snapshot); - - virtual bool GetProperty(const Slice& property, std::string* value); - - virtual void GetApproximateSizes(const Range* range, int n, - uint64_t* sizes); - - virtual void CompactRange(const Slice* begin, const Slice* end); - - virtual void SuspendCompactions(); - - 
virtual void ResumeCompactions(); - -private: - WT_CONNECTION *conn_; - ThreadLocal<OperationContext> *context_; - - OperationContext *getContext() { - OperationContext *ctx = context_->get(); - if (ctx == NULL) { - ctx = new OperationContext(conn_); - context_->set(ctx); - } - return (ctx); - } - - // No copying allowed - DbImpl(const DbImpl&); - void operator=(const DbImpl&); - -protected: - WT_CURSOR *getCursor() { return getContext()->getCursor(); } - void releaseCursor(WT_CURSOR *cursor) { getContext()->releaseCursor(cursor); } -}; - // Return a cursor for the current operation to use. In the "normal" case // we will return the cursor opened when the OperationContext was created. // If the thread this OperationContext belongs to requires more than one diff --git a/api/leveldb/leveldb_wt.h b/api/leveldb/leveldb_wt.h index 08f9523418a..32f4310dcd1 100644 --- a/api/leveldb/leveldb_wt.h +++ b/api/leveldb/leveldb_wt.h @@ -24,6 +24,8 @@ * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR * OTHER DEALINGS IN THE SOFTWARE. 
*/ +#ifndef _INCLUDE_LEVELDB_WT_H +#define _INCLUDE_LEVELDB_WT_H 1 #include "wiredtiger_config.h" @@ -41,3 +43,339 @@ #endif #include "wiredtiger.h" + +#define WT_URI "table:data" +#define WT_CONN_CONFIG "log=(enabled),checkpoint_sync=false,session_max=8192,"\ + "mmap=false,transaction_sync=(enabled=true,method=none)," +#define WT_TABLE_CONFIG "type=lsm,leaf_page_max=4KB,leaf_item_max=1KB," \ + "internal_page_max=128K,lsm=(chunk_size=100MB," \ + "bloom_config=(leaf_page_max=8MB)," \ + "bloom_bit_count=28,bloom_hash_count=19," \ + "bloom_oldest=true)," + +using leveldb::Cache; +using leveldb::FilterPolicy; +using leveldb::Iterator; +using leveldb::Options; +using leveldb::ReadOptions; +using leveldb::WriteBatch; +using leveldb::WriteOptions; +using leveldb::Range; +using leveldb::Slice; +using leveldb::Snapshot; +using leveldb::Status; +#if HAVE_ELEVELDB +using leveldb::Value; +#endif +#if HAVE_ROCKSDB +using leveldb::FlushOptions; +using leveldb::ColumnFamilyHandle; +#endif + +extern Status WiredTigerErrorToStatus(int wiredTigerError, const char *msg = ""); + +class CacheImpl : public Cache { +public: + CacheImpl(size_t capacity) : Cache(), capacity_(capacity) {} + + virtual Handle* Insert(const Slice& key, void* value, size_t charge, + void (*deleter)(const Slice& key, void* value)) { return 0; } + virtual Handle* Lookup(const Slice& key) { return 0; } + virtual void Release(Handle* handle) {} + virtual void* Value(Handle* handle) { return 0; } + virtual void Erase(const Slice& key) {} + virtual uint64_t NewId() { return 0; } + + size_t capacity_; +}; + +/* POSIX thread-local storage */ +template <class T> +class ThreadLocal { +public: + static void cleanup(void *val) { + delete (T *)val; + } + + ThreadLocal() { + int ret = pthread_key_create(&key_, cleanup); + assert(ret == 0); + } + + ~ThreadLocal() { + int ret = pthread_key_delete(key_); + assert(ret == 0); + } + + T *get() { + return (T *)(pthread_getspecific(key_)); + } + + void set(T *value) { + int ret = 
pthread_setspecific(key_, value); + assert(ret == 0); + } + +private: + pthread_key_t key_; +}; + +/* WiredTiger implementations. */ +class DbImpl; + +/* Context for operations (including snapshots, write batches, transactions) */ +class OperationContext { +public: + OperationContext(WT_CONNECTION *conn) : conn_(conn), in_use_(false) { + int ret = conn->open_session(conn, NULL, "isolation=snapshot", &session_); + assert(ret == 0); + ret = session_->open_cursor( + session_, WT_URI, NULL, NULL, &cursor_); + assert(ret == 0); + } + + ~OperationContext() { +#ifdef WANT_SHUTDOWN_RACES + int ret = session_->close(session_, NULL); + assert(ret == 0); +#endif + } + + WT_CURSOR *getCursor(); + void releaseCursor(WT_CURSOR *cursor); + +private: + WT_CONNECTION *conn_; + WT_SESSION *session_; + WT_CURSOR *cursor_; + bool in_use_; +}; + +class IteratorImpl : public Iterator { +public: + IteratorImpl(DbImpl *db, const ReadOptions &options); + virtual ~IteratorImpl(); + + // An iterator is either positioned at a key/value pair, or + // not valid. This method returns true iff the iterator is valid. 
+ virtual bool Valid() const { return valid_; } + + virtual void SeekToFirst(); + + virtual void SeekToLast(); + + virtual void Seek(const Slice& target); + + virtual void Next(); + + virtual void Prev(); + + virtual Slice key() const { + return key_; + } + + virtual Slice value() const { + return value_; + } + + virtual Status status() const { + return status_; + } + +private: + DbImpl *db_; + WT_CURSOR *cursor_; + Slice key_, value_; + Status status_; + bool valid_; + bool snapshot_iterator_; + + void SetError(int wiredTigerError) { + valid_ = false; + status_ = WiredTigerErrorToStatus(wiredTigerError, NULL); + } + + // No copying allowed + IteratorImpl(const IteratorImpl&); + void operator=(const IteratorImpl&); +}; + +class SnapshotImpl : public Snapshot { +friend class DbImpl; +friend class IteratorImpl; +public: + SnapshotImpl(DbImpl *db) : + Snapshot(), db_(db), cursor_(NULL), status_(Status::OK()) {} + virtual ~SnapshotImpl() {} +protected: + WT_CURSOR *getCursor() const { return cursor_; } + Status getStatus() const { return status_; } + Status setupTransaction(); + Status releaseTransaction(); +private: + DbImpl *db_; + WT_CURSOR *cursor_; + Status status_; +}; + +class DbImpl : public leveldb::DB { +friend class IteratorImpl; +friend class SnapshotImpl; +public: + DbImpl(WT_CONNECTION *conn) : + DB(), conn_(conn), context_(new ThreadLocal<OperationContext>) {} + virtual ~DbImpl() { + delete context_; + int ret = conn_->close(conn_, NULL); + assert(ret == 0); + } + + virtual Status Put(const WriteOptions& options, + const Slice& key, + const Slice& value); + + virtual Status Delete(const WriteOptions& options, const Slice& key); + + virtual Status Write(const WriteOptions& options, WriteBatch* updates); + + virtual Status Get(const ReadOptions& options, + const Slice& key, std::string* value); + +#if HAVE_ELEVELDB + virtual Status Get(const ReadOptions& options, + const Slice& key, Value* value); +#endif + +#ifdef HAVE_HYPERLEVELDB + virtual Status 
LiveBackup(const Slice& name) { + return Status::NotSupported("sorry!"); + } + virtual void GetReplayTimestamp(std::string* timestamp) {} + virtual void AllowGarbageCollectBeforeTimestamp(const std::string& timestamp) {} + virtual bool ValidateTimestamp(const std::string& timestamp) {} + virtual int CompareTimestamps(const std::string& lhs, const std::string& rhs) {} + virtual Status GetReplayIterator(const std::string& timestamp, + leveldb::ReplayIterator** iter) { return Status::NotSupported("sorry!"); } + virtual void ReleaseReplayIterator(leveldb::ReplayIterator* iter) {} +#endif + +#ifdef HAVE_ROCKSDB + virtual Status CreateColumnFamily(const Options& options, + const std::string& column_family_name, + ColumnFamilyHandle** handle); + + // Drop a column family specified by column_family handle. This call + // only records a drop record in the manifest and prevents the column + // family from flushing and compacting. + virtual Status DropColumnFamily(ColumnFamilyHandle* column_family); + + // Set the database entry for "key" to "value". + // Returns OK on success, and a non-OK status on error. + // Note: consider setting options.sync = true. + virtual Status Put(const WriteOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value); + + // Remove the database entry (if any) for "key". Returns OK on + // success, and a non-OK status on error. It is not an error if "key" + // did not exist in the database. + // Note: consider setting options.sync = true. + virtual Status Delete(const WriteOptions& options, + ColumnFamilyHandle* column_family, + const Slice& key); + + // Merge the database entry for "key" with "value". Returns OK on success, + // and a non-OK status on error. The semantics of this operation is + // determined by the user provided merge_operator when opening DB. + // Note: consider setting options.sync = true. 
+ virtual Status Merge(const WriteOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value); + + // May return some other Status on an error. + virtual Status Get(const ReadOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + std::string* value); + + // If keys[i] does not exist in the database, then the i'th returned + // status will be one for which Status::IsNotFound() is true, and + // (*values)[i] will be set to some arbitrary value (often ""). Otherwise, + // the i'th returned status will have Status::ok() true, and (*values)[i] + // will store the value associated with keys[i]. + // + // (*values) will always be resized to be the same size as (keys). + // Similarly, the number of returned statuses will be the number of keys. + // Note: keys will not be "de-duplicated". Duplicate keys will return + // duplicate values in order. + virtual std::vector<Status> MultiGet( + const ReadOptions& options, + const std::vector<ColumnFamilyHandle*>& column_family, + const std::vector<Slice>& keys, std::vector<std::string>* values); + + virtual Iterator* NewIterator(const ReadOptions& options, + ColumnFamilyHandle* column_family); + + virtual bool GetProperty(ColumnFamilyHandle* column_family, + const Slice& property, std::string* value); + + // Flush all mem-table data. 
+ virtual Status Flush(const FlushOptions& options, + ColumnFamilyHandle* column_family); +#endif + + virtual Iterator* NewIterator(const ReadOptions& options); + + virtual const Snapshot* GetSnapshot(); + + virtual void ReleaseSnapshot(const Snapshot* snapshot); + + virtual bool GetProperty(const Slice& property, std::string* value); + + virtual void GetApproximateSizes(const Range* range, int n, + uint64_t* sizes); + + virtual void CompactRange(const Slice* begin, const Slice* end); + + virtual void SuspendCompactions(); + + virtual void ResumeCompactions(); + +private: + WT_CONNECTION *conn_; + ThreadLocal<OperationContext> *context_; + + OperationContext *getContext() { + OperationContext *ctx = context_->get(); + if (ctx == NULL) { + ctx = new OperationContext(conn_); + context_->set(ctx); + } + return (ctx); + } + + // No copying allowed + DbImpl(const DbImpl&); + void operator=(const DbImpl&); + +protected: + WT_CURSOR *getCursor() { return getContext()->getCursor(); } + void releaseCursor(WT_CURSOR *cursor) { getContext()->releaseCursor(cursor); } +}; + +#ifdef HAVE_ROCKSDB +// ColumnFamilyHandleImpl is the class that clients use to access different +// column families. It has non-trivial destructor, which gets called when client +// is done using the column family +class ColumnFamilyHandleImpl : public ColumnFamilyHandle { + public: + ColumnFamilyHandleImpl(DbImpl* db, uint32_t id) : db_(db), id_(id) {} + virtual ~ColumnFamilyHandleImpl() {} + virtual uint32_t GetID() const { return id_; } + + private: + DbImpl* db_; + uint32_t id_; +}; +#endif + +#endif diff --git a/api/leveldb/rocks_wt.cc b/api/leveldb/rocks_wt.cc new file mode 100644 index 00000000000..13fd96b734d --- /dev/null +++ b/api/leveldb/rocks_wt.cc @@ -0,0 +1,128 @@ +/*- + * Public Domain 2008-2014 WiredTiger, Inc. + * + * This is free and unencumbered software released into the public domain. 
+ * + * Anyone is free to copy, modify, publish, use, compile, sell, or + * distribute this software, either in source code form or as a compiled + * binary, for any purpose, commercial or non-commercial, and by any + * means. + * + * In jurisdictions that recognize copyright laws, the author or authors + * of this software dedicate any and all copyright interest in the + * software to the public domain. We make this dedication for the benefit + * of the public at large and to the detriment of our heirs and + * successors. We intend this dedication to be an overt act of + * relinquishment in perpetuity of all present and future rights to this + * software under copyright law. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. + * IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR + * OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, + * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. 
+ */ + +#include "leveldb_wt.h" +#include <errno.h> +#include <sys/stat.h> +#include <unistd.h> +#include <sstream> + +using leveldb::Cache; +using leveldb::FilterPolicy; +using leveldb::Iterator; +using leveldb::Options; +using leveldb::ReadOptions; +using leveldb::WriteBatch; +using leveldb::WriteOptions; +using leveldb::Range; +using leveldb::Slice; +using leveldb::Snapshot; +using leveldb::Status; + +Status +rocksdb::DB::ListColumnFamilies(rocksdb::Options const&, std::string const&, std::vector<std::string, std::allocator<std::string> >*) +{ + return WiredTigerErrorToStatus(ENOTSUP); +} + +Status +rocksdb::DB::Open(rocksdb::Options const&, std::string const&, std::vector<rocksdb::ColumnFamilyDescriptor, std::allocator<rocksdb::ColumnFamilyDescriptor> > const&, std::vector<rocksdb::ColumnFamilyHandle*, std::allocator<rocksdb::ColumnFamilyHandle*> >*, rocksdb::DB**) +{ + return WiredTigerErrorToStatus(ENOTSUP); +} + +void +WriteBatch::Handler::Merge(const Slice& key, const Slice& value) +{ +} + +void +WriteBatch::Handler::LogData(const Slice& blob) +{ +} + +Status +DbImpl::Merge(rocksdb::WriteOptions const&, rocksdb::ColumnFamilyHandle*, rocksdb::Slice const&, rocksdb::Slice const&) +{ + return WiredTigerErrorToStatus(ENOTSUP); +} + +Status +DbImpl::CreateColumnFamily(rocksdb::Options const&, std::string const&, rocksdb::ColumnFamilyHandle**) +{ + return WiredTigerErrorToStatus(ENOTSUP); +} + +Status +DbImpl::DropColumnFamily(rocksdb::ColumnFamilyHandle*) +{ + return WiredTigerErrorToStatus(ENOTSUP); +} + +Status +DbImpl::Delete(rocksdb::WriteOptions const&, rocksdb::ColumnFamilyHandle*, rocksdb::Slice const&) +{ + return WiredTigerErrorToStatus(ENOTSUP); +} + +Status +DbImpl::Flush(rocksdb::FlushOptions const&, rocksdb::ColumnFamilyHandle*) +{ + return WiredTigerErrorToStatus(ENOTSUP); +} + +Status +DbImpl::Get(rocksdb::ReadOptions const&, rocksdb::ColumnFamilyHandle*, rocksdb::Slice const&, std::string*) +{ + return WiredTigerErrorToStatus(ENOTSUP); +} + +bool 
+DbImpl::GetProperty(rocksdb::ColumnFamilyHandle*, rocksdb::Slice const&, std::string*) +{ + return false; +} + +std::vector<Status> +DbImpl::MultiGet(rocksdb::ReadOptions const&, std::vector<rocksdb::ColumnFamilyHandle*, std::allocator<rocksdb::ColumnFamilyHandle*> > const&, std::vector<rocksdb::Slice, std::allocator<rocksdb::Slice> > const&, std::vector<std::string, std::allocator<std::string> >*) +{ + std::vector<Status> ret; + ret.push_back(WiredTigerErrorToStatus(ENOTSUP)); + return ret; +} + +Iterator * +DbImpl::NewIterator(rocksdb::ReadOptions const&, rocksdb::ColumnFamilyHandle*) +{ + return NULL; +} + +Status +DbImpl::Put(rocksdb::WriteOptions const&, rocksdb::ColumnFamilyHandle*, rocksdb::Slice const&, rocksdb::Slice const&) +{ + return WiredTigerErrorToStatus(ENOTSUP); +} diff --git a/api/leveldb/util/coding.cc b/api/leveldb/util/coding.cc index dbd7a6545c6..ad1f457a16a 100644 --- a/api/leveldb/util/coding.cc +++ b/api/leveldb/util/coding.cc @@ -6,44 +6,6 @@ namespace leveldb { -void EncodeFixed32(char* buf, uint32_t value) { -#if __BYTE_ORDER == __LITTLE_ENDIAN - memcpy(buf, &value, sizeof(value)); -#else - buf[0] = value & 0xff; - buf[1] = (value >> 8) & 0xff; - buf[2] = (value >> 16) & 0xff; - buf[3] = (value >> 24) & 0xff; -#endif -} - -void EncodeFixed64(char* buf, uint64_t value) { -#if __BYTE_ORDER == __LITTLE_ENDIAN - memcpy(buf, &value, sizeof(value)); -#else - buf[0] = value & 0xff; - buf[1] = (value >> 8) & 0xff; - buf[2] = (value >> 16) & 0xff; - buf[3] = (value >> 24) & 0xff; - buf[4] = (value >> 32) & 0xff; - buf[5] = (value >> 40) & 0xff; - buf[6] = (value >> 48) & 0xff; - buf[7] = (value >> 56) & 0xff; -#endif -} - -void PutFixed32(std::string* dst, uint32_t value) { - char buf[sizeof(value)]; - EncodeFixed32(buf, value); - dst->append(buf, sizeof(buf)); -} - -void PutFixed64(std::string* dst, uint64_t value) { - char buf[sizeof(value)]; - EncodeFixed64(buf, value); - dst->append(buf, sizeof(buf)); -} - char* EncodeVarint32(char* dst, 
uint32_t v) { // Operate on characters as unsigneds unsigned char* ptr = reinterpret_cast<unsigned char*>(dst); @@ -72,43 +34,6 @@ char* EncodeVarint32(char* dst, uint32_t v) { return reinterpret_cast<char*>(ptr); } -void PutVarint32(std::string* dst, uint32_t v) { - char buf[5]; - char* ptr = EncodeVarint32(buf, v); - dst->append(buf, ptr - buf); -} - -char* EncodeVarint64(char* dst, uint64_t v) { - static const int B = 128; - unsigned char* ptr = reinterpret_cast<unsigned char*>(dst); - while (v >= B) { - *(ptr++) = (v & (B-1)) | B; - v >>= 7; - } - *(ptr++) = static_cast<unsigned char>(v); - return reinterpret_cast<char*>(ptr); -} - -void PutVarint64(std::string* dst, uint64_t v) { - char buf[10]; - char* ptr = EncodeVarint64(buf, v); - dst->append(buf, ptr - buf); -} - -void PutLengthPrefixedSlice(std::string* dst, const Slice& value) { - PutVarint32(dst, value.size()); - dst->append(value.data(), value.size()); -} - -int VarintLength(uint64_t v) { - int len = 1; - while (v >= 128) { - v >>= 7; - len++; - } - return len; -} - const char* GetVarint32PtrFallback(const char* p, const char* limit, uint32_t* value) { @@ -128,18 +53,6 @@ const char* GetVarint32PtrFallback(const char* p, return NULL; } -bool GetVarint32(Slice* input, uint32_t* value) { - const char* p = input->data(); - const char* limit = p + input->size(); - const char* q = GetVarint32Ptr(p, limit, value); - if (q == NULL) { - return false; - } else { - *input = Slice(q, limit - q); - return true; - } -} - const char* GetVarint64Ptr(const char* p, const char* limit, uint64_t* value) { uint64_t result = 0; for (uint32_t shift = 0; shift <= 63 && p < limit; shift += 7) { @@ -157,38 +70,94 @@ const char* GetVarint64Ptr(const char* p, const char* limit, uint64_t* value) { return NULL; } -bool GetVarint64(Slice* input, uint64_t* value) { - const char* p = input->data(); - const char* limit = p + input->size(); - const char* q = GetVarint64Ptr(p, limit, value); - if (q == NULL) { - return false; - } else 
{ - *input = Slice(q, limit - q); - return true; +#ifdef HAVE_ROCKSDB +void BitStreamPutInt(char* dst, size_t dstlen, size_t offset, + uint32_t bits, uint64_t value) { + assert((offset + bits + 7)/8 <= dstlen); + assert(bits <= 64); + + unsigned char* ptr = reinterpret_cast<unsigned char*>(dst); + + size_t byteOffset = offset / 8; + size_t bitOffset = offset % 8; + + // This prevents unused variable warnings when compiling. +#ifndef NDEBUG + // Store truncated value. + uint64_t origValue = (bits < 64)?(value & (((uint64_t)1 << bits) - 1)):value; + uint32_t origBits = bits; +#endif + + while (bits > 0) { + size_t bitsToGet = std::min<size_t>(bits, 8 - bitOffset); + unsigned char mask = ((1 << bitsToGet) - 1); + + ptr[byteOffset] = (ptr[byteOffset] & ~(mask << bitOffset)) + + ((value & mask) << bitOffset); + + value >>= bitsToGet; + byteOffset += 1; + bitOffset = 0; + bits -= bitsToGet; } -} -const char* GetLengthPrefixedSlice(const char* p, const char* limit, - Slice* result) { - uint32_t len; - p = GetVarint32Ptr(p, limit, &len); - if (p == NULL) return NULL; - if (p + len > limit) return NULL; - *result = Slice(p, len); - return p + len; + assert(origValue == BitStreamGetInt(dst, dstlen, offset, origBits)); } -bool GetLengthPrefixedSlice(Slice* input, Slice* result) { - uint32_t len; - if (GetVarint32(input, &len) && - input->size() >= len) { - *result = Slice(input->data(), len); - input->remove_prefix(len); - return true; - } else { - return false; +uint64_t BitStreamGetInt(const char* src, size_t srclen, size_t offset, + uint32_t bits) { + assert((offset + bits + 7)/8 <= srclen); + assert(bits <= 64); + + const unsigned char* ptr = reinterpret_cast<const unsigned char*>(src); + + uint64_t result = 0; + + size_t byteOffset = offset / 8; + size_t bitOffset = offset % 8; + size_t shift = 0; + + while (bits > 0) { + size_t bitsToGet = std::min<size_t>(bits, 8 - bitOffset); + unsigned char mask = ((1 << bitsToGet) - 1); + + result += (uint64_t)((ptr[byteOffset] >> 
bitOffset) & mask) << shift; + + shift += bitsToGet; + byteOffset += 1; + bitOffset = 0; + bits -= bitsToGet; + } + + return result; + } + +void BitStreamPutInt(std::string* dst, size_t offset, uint32_t bits, + uint64_t value) { + assert((offset + bits + 7)/8 <= dst->size()); + + const size_t kTmpBufLen = sizeof(value) + 1; + char tmpBuf[kTmpBufLen]; + + // Number of bytes of tmpBuf being used + const size_t kUsedBytes = (offset%8 + bits)/8; + + // Copy relevant parts of dst to tmpBuf + for (size_t idx = 0; idx <= kUsedBytes; ++idx) { + tmpBuf[idx] = (*dst)[offset/8 + idx]; + } + + BitStreamPutInt(tmpBuf, kTmpBufLen, offset%8, bits, value); + + // Copy tmpBuf back to dst + for (size_t idx = 0; idx <= kUsedBytes; ++idx) { + (*dst)[offset/8 + idx] = tmpBuf[idx]; + + // Do the check here too as we are working with a buffer. + assert(((bits < 64)?(value & (((uint64_t)1 << bits) - 1)):value) == + BitStreamGetInt(dst, offset, bits)); } } +#endif } // namespace leveldb diff --git a/api/leveldb/util/coding.h b/api/leveldb/util/coding.h index cba97ec421a..ed56ef4ea2d 100644 --- a/api/leveldb/util/coding.h +++ b/api/leveldb/util/coding.h @@ -10,6 +10,7 @@ #ifndef STORAGE_LEVELDB_UTIL_CODING_H_ #define STORAGE_LEVELDB_UTIL_CODING_H_ +#include <algorithm> #include <stdint.h> #include <string.h> #include <string> @@ -18,6 +19,10 @@ namespace leveldb { +// The maximum length of a varint in bytes for 32 and 64 bits respectively. +const unsigned int kMaxVarint32Length = 5; +const unsigned int kMaxVarint64Length = 10; + // Standard Put... 
routines append to a string extern void PutFixed32(std::string* dst, uint32_t value); extern void PutFixed64(std::string* dst, uint64_t value); @@ -31,6 +36,16 @@ extern bool GetVarint32(Slice* input, uint32_t* value); extern bool GetVarint64(Slice* input, uint64_t* value); extern bool GetLengthPrefixedSlice(Slice* input, Slice* result); +#ifdef HAVE_ROCKSDB +extern void PutLengthPrefixedSliceParts(std::string* dst, + const SliceParts& slice_parts); +extern bool GetFixed64(Slice* input, uint64_t* value); +// This function assumes data is well-formed. +extern Slice GetLengthPrefixedSlice(const char* data); + +extern Slice GetSliceUntil(Slice* slice, char delimiter); +#endif + // Pointer-based variants of GetVarint... These either store a value // in *v and return a pointer just past the parsed value, or return // NULL on error. These routines only look at bytes in the range @@ -99,6 +114,198 @@ inline const char* GetVarint32Ptr(const char* p, return GetVarint32PtrFallback(p, limit, value); } +// Writes an unsigned integer with bits number of bits with its least +// significant bit at offset. +// Bits are numbered from 0 to 7 in the first byte, 8 to 15 in the second and +// so on. +// value is truncated to the bits number of least significant bits. +// REQUIRES: (offset+bits+7)/8 <= dstlen +// REQUIRES: bits <= 64 +extern void BitStreamPutInt(char* dst, size_t dstlen, size_t offset, + uint32_t bits, uint64_t value); + +// Reads an unsigned integer with bits number of bits with its least +// significant bit at offset. +// Bits are numbered in the same way as ByteStreamPutInt(). 
+// REQUIRES: (offset+bits+7)/8 <= srclen +// REQUIRES: bits <= 64 +extern uint64_t BitStreamGetInt(const char* src, size_t srclen, size_t offset, + uint32_t bits); + +// Convenience functions +extern void BitStreamPutInt(std::string* dst, size_t offset, uint32_t bits, + uint64_t value); +extern uint64_t BitStreamGetInt(const std::string* src, size_t offset, + uint32_t bits); +extern uint64_t BitStreamGetInt(const Slice* src, size_t offset, + uint32_t bits); + +// -- Implementation of the functions declared above +inline void EncodeFixed32(char* buf, uint32_t value) { +#if __BYTE_ORDER == __LITTLE_ENDIAN + memcpy(buf, &value, sizeof(value)); +#else + buf[0] = value & 0xff; + buf[1] = (value >> 8) & 0xff; + buf[2] = (value >> 16) & 0xff; + buf[3] = (value >> 24) & 0xff; +#endif +} + +inline void EncodeFixed64(char* buf, uint64_t value) { +#if __BYTE_ORDER == __LITTLE_ENDIAN + memcpy(buf, &value, sizeof(value)); +#else + buf[0] = value & 0xff; + buf[1] = (value >> 8) & 0xff; + buf[2] = (value >> 16) & 0xff; + buf[3] = (value >> 24) & 0xff; + buf[4] = (value >> 32) & 0xff; + buf[5] = (value >> 40) & 0xff; + buf[6] = (value >> 48) & 0xff; + buf[7] = (value >> 56) & 0xff; +#endif +} + +inline void PutFixed32(std::string* dst, uint32_t value) { + char buf[sizeof(value)]; + EncodeFixed32(buf, value); + dst->append(buf, sizeof(buf)); +} + +inline void PutFixed64(std::string* dst, uint64_t value) { + char buf[sizeof(value)]; + EncodeFixed64(buf, value); + dst->append(buf, sizeof(buf)); +} + +inline void PutVarint32(std::string* dst, uint32_t v) { + char buf[5]; + char* ptr = EncodeVarint32(buf, v); + dst->append(buf, ptr - buf); +} + +inline char* EncodeVarint64(char* dst, uint64_t v) { + static const unsigned int B = 128; + unsigned char* ptr = reinterpret_cast<unsigned char*>(dst); + while (v >= B) { + *(ptr++) = (v & (B - 1)) | B; + v >>= 7; + } + *(ptr++) = static_cast<unsigned char>(v); + return reinterpret_cast<char*>(ptr); +} + +inline void PutVarint64(std::string* 
dst, uint64_t v) { + char buf[10]; + char* ptr = EncodeVarint64(buf, v); + dst->append(buf, ptr - buf); +} + +inline void PutLengthPrefixedSlice(std::string* dst, const Slice& value) { + PutVarint32(dst, value.size()); + dst->append(value.data(), value.size()); +} + +#ifdef HAVE_ROCKSDB +inline void PutLengthPrefixedSliceParts(std::string* dst, + const SliceParts& slice_parts) { + uint32_t total_bytes = 0; + for (int i = 0; i < slice_parts.num_parts; ++i) { + total_bytes += slice_parts.parts[i].size(); + } + PutVarint32(dst, total_bytes); + for (int i = 0; i < slice_parts.num_parts; ++i) { + dst->append(slice_parts.parts[i].data(), slice_parts.parts[i].size()); + } +} +#endif + +inline int VarintLength(uint64_t v) { + int len = 1; + while (v >= 128) { + v >>= 7; + len++; + } + return len; +} + +#ifdef HAVE_ROCKSDB +inline bool GetFixed64(Slice* input, uint64_t* value) { + if (input->size() < sizeof(uint64_t)) { + return false; + } + *value = DecodeFixed64(input->data()); + input->remove_prefix(sizeof(uint64_t)); + return true; +} +#endif + +inline bool GetVarint32(Slice* input, uint32_t* value) { + const char* p = input->data(); + const char* limit = p + input->size(); + const char* q = GetVarint32Ptr(p, limit, value); + if (q == NULL) { + return false; + } else { + *input = Slice(q, limit - q); + return true; + } +} + +inline bool GetVarint64(Slice* input, uint64_t* value) { + const char* p = input->data(); + const char* limit = p + input->size(); + const char* q = GetVarint64Ptr(p, limit, value); + if (q == NULL) { + return false; + } else { + *input = Slice(q, limit - q); + return true; + } +} + +inline bool GetLengthPrefixedSlice(Slice* input, Slice* result) { + uint32_t len = 0; + if (GetVarint32(input, &len) && input->size() >= len) { + *result = Slice(input->data(), len); + input->remove_prefix(len); + return true; + } else { + return false; + } +} + +#ifdef HAVE_ROCKSDB +inline Slice GetLengthPrefixedSlice(const char* data) { + uint32_t len = 0; + // +5: we 
assume "data" is not corrupted + const char *p = GetVarint32Ptr(data, data + 5 /* limit */, &len); + return Slice(p, len); +} + +inline Slice GetSliceUntil(Slice* slice, char delimiter) { + uint32_t len = 0; + for (len = 0; len < slice->size() && slice->data()[len] != delimiter; ++len) { + // nothing + } + + Slice ret(slice->data(), len); + slice->remove_prefix(len + ((len < slice->size()) ? 1 : 0)); + return ret; +} +#endif + +inline uint64_t BitStreamGetInt(const std::string* src, size_t offset, + uint32_t bits) { + return BitStreamGetInt(src->data(), src->size(), offset, bits); +} + +inline uint64_t BitStreamGetInt(const Slice* src, size_t offset, + uint32_t bits) { + return BitStreamGetInt(src->data(), src->size(), offset, bits); +} + } // namespace leveldb #endif // STORAGE_LEVELDB_UTIL_CODING_H_ |