diff options
author | Michael Cahill <michael.cahill@wiredtiger.com> | 2014-07-02 22:39:14 +1000 |
---|---|---|
committer | Michael Cahill <michael.cahill@wiredtiger.com> | 2014-07-02 22:39:14 +1000 |
commit | dd246265793c035cbfdde034efa61c3014643f4f (patch) | |
tree | e5ad33e1cda7f971a1bfa9973dcdc3bd6111184a /api | |
parent | f64b96f97d1a31720dd0e1e4142b962c8381f413 (diff) | |
download | mongo-dd246265793c035cbfdde034efa61c3014643f4f.tar.gz |
Finish implementation of enough of the RocksDB API to allow MongoDB to run.
Diffstat (limited to 'api')
-rw-r--r-- | api/leveldb/leveldb_wt.cc | 95 | ||||
-rw-r--r-- | api/leveldb/leveldb_wt.h | 114 | ||||
-rw-r--r-- | api/leveldb/rocks_wt.cc | 123 |
3 files changed, 221 insertions, 111 deletions
diff --git a/api/leveldb/leveldb_wt.cc b/api/leveldb/leveldb_wt.cc index 10543676a84..2489b549c5e 100644 --- a/api/leveldb/leveldb_wt.cc +++ b/api/leveldb/leveldb_wt.cc @@ -239,10 +239,12 @@ leveldb::DB::Open(const Options &options, const std::string &name, leveldb::DB * } if (options.error_if_exists) s_conn << "exclusive,"; +#if 0 if (options.compression == kSnappyCompression) s_conn << "extensions=[libwiredtiger_snappy.so],"; +#endif size_t cache_size = 2 * options.write_buffer_size; - cache_size += options.max_open_files * (4 << 20); + cache_size += (size_t)options.max_open_files * (4 << 20); if (options.block_cache) cache_size += ((CacheImpl *)options.block_cache)->capacity_; else @@ -251,7 +253,7 @@ leveldb::DB::Open(const Options &options, const std::string &name, leveldb::DB * std::string conn_config = s_conn.str(); WT_CONNECTION *conn; - fprintf(stderr,"Open: Home %s config %s\r\n",name.c_str(),conn_config.c_str()); + printf("Open: home %s config %s\r\n",name.c_str(),conn_config.c_str()); int ret = ::wiredtiger_open(name.c_str(), NULL, conn_config.c_str(), &conn); if (ret == ENOENT) return Status::NotFound(Slice("Database does not exist.")); @@ -314,44 +316,33 @@ DbImpl::Delete(const WriteOptions& options, const Slice& key) return WiredTigerErrorToStatus(ret, NULL); } -// Implement WriteBatch::Handler -class WriteBatchHandler : public WriteBatch::Handler { -public: - WriteBatchHandler(OperationContext *context) : context_(context), status_(0) {} - virtual ~WriteBatchHandler() {} - int GetWiredTigerStatus() { return status_; } - - virtual void Put(const Slice& key, const Slice& value) { - WT_CURSOR *cursor = context_->GetCursor(); - WT_ITEM item; - - item.data = key.data(); - item.size = key.size(); - cursor->set_key(cursor, &item); - item.data = value.data(); - item.size = value.size(); - cursor->set_value(cursor, &item); - int ret = cursor->insert(cursor); - if (ret != 0 && status_ == 0) - status_ = ret; - } +void +WriteBatchHandler::Put(const Slice& key, const Slice& value) { + WT_CURSOR *cursor = context_->GetCursor(); + WT_ITEM item; - virtual void Delete(const Slice& key) { - WT_CURSOR *cursor = context_->GetCursor(); - WT_ITEM item; + item.data = key.data(); + item.size = key.size(); + cursor->set_key(cursor, &item); + item.data = value.data(); + item.size = value.size(); + cursor->set_value(cursor, &item); + int ret = cursor->insert(cursor); + if (ret != 0 && status_ == 0) + status_ = ret; +} - item.data = key.data(); - item.size = key.size(); - cursor->set_key(cursor, &item); - int ret = cursor->remove(cursor); - if (ret != 0 && status_ == 0) - status_ = ret; - } +void WriteBatchHandler::Delete(const Slice& key) { + WT_CURSOR *cursor = context_->GetCursor(); + WT_ITEM item; -private: - OperationContext *context_; - int status_; -}; + item.data = key.data(); + item.size = key.size(); + cursor->set_key(cursor, &item); + int ret = cursor->remove(cursor); + if (ret != 0 && status_ == 0) + status_ = ret; +} // Apply the specified updates to the database. // Returns OK on success, non-OK on failure. @@ -371,9 +362,14 @@ DbImpl::Write(const WriteOptions& options, WriteBatch* updates) goto err; } - WriteBatchHandler handler(context); - status = updates->Iterate(&handler); - if ((ret = handler.GetWiredTigerStatus()) != WT_DEADLOCK) + WriteBatchHandler handler(this, context); + try { + status = updates->Iterate(&handler); + } catch(...) { + (void)session->rollback_transaction(session, NULL); + throw; + } + if (!status.ok() || (ret = handler.GetWiredTigerStatus()) != WT_DEADLOCK) break; // Roll back the transaction on deadlock so we can try again if ((ret = session->rollback_transaction(session, NULL)) != 0) { @@ -382,10 +378,13 @@ DbImpl::Write(const WriteOptions& options, WriteBatch* updates) } } - if (status.ok() && ret == 0) + if (status.ok() && ret == 0) { ret = session->commit_transaction(session, NULL); - else if (ret == 0) - ret = session->rollback_transaction(session, NULL); + } else { + t_ret = session->rollback_transaction(session, NULL); + if (ret == 0) + ret = t_ret; + } err: if (status.ok() && ret != 0) @@ -475,7 +474,8 @@ DbImpl::NewIterator(const ReadOptions& options) WT_SESSION *session = context->GetSession(); WT_CURSOR *c = context->GetCursor(); WT_CURSOR *iterc; - int ret = session->open_cursor(session, NULL, c, NULL, &iterc); + /* XXX would like a fast duplicate for LSM cursors without position. */ + int ret = session->open_cursor(session, c->uri, NULL, NULL, &iterc); assert(ret == 0); return new IteratorImpl(this, iterc); } @@ -492,11 +492,11 @@ SnapshotImpl::SnapshotImpl(DbImpl *db) : const Snapshot * DbImpl::GetSnapshot() { - SnapshotImpl *snapshot = new SnapshotImpl(this); - WT_SESSION *session = snapshot->GetContext()->GetSession(); + SnapshotImpl *si = new SnapshotImpl(this); + WT_SESSION *session = si->GetContext()->GetSession(); int ret = session->begin_transaction(session, NULL); assert(ret == 0); - return snapshot; + return si; } // Release a previously acquired snapshot. The caller must not @@ -507,6 +507,7 @@ DbImpl::ReleaseSnapshot(const Snapshot* snapshot) SnapshotImpl *si = static_cast<SnapshotImpl *>(const_cast<Snapshot *>(snapshot)); if (si != NULL) { + WT_SESSION *session = si->GetContext()->GetSession(); // We started a transaction: we could commit it here, but it will be rolled // back automatically by closing the session, which we have to do anyway. int ret = si->GetContext()->Close(); diff --git a/api/leveldb/leveldb_wt.h b/api/leveldb/leveldb_wt.h index 462b1fcc865..489240af86f 100644 --- a/api/leveldb/leveldb_wt.h +++ b/api/leveldb/leveldb_wt.h @@ -74,21 +74,6 @@ using leveldb::ColumnFamilyHandle; extern Status WiredTigerErrorToStatus(int wiredTigerError, const char *msg = ""); -class CacheImpl : public Cache { -public: - CacheImpl(size_t capacity) : Cache(), capacity_(capacity) {} - - virtual Handle* Insert(const Slice& key, void* value, size_t charge, - void (*deleter)(const Slice& key, void* value)) { return 0; } - virtual Handle* Lookup(const Slice& key) { return 0; } - virtual void Release(Handle* handle) {} - virtual void* Value(Handle* handle) { return 0; } - virtual void Erase(const Slice& key) {} - virtual uint64_t NewId() { return 0; } - - size_t capacity_; -}; - /* POSIX thread-local storage */ template <class T> class ThreadLocal { @@ -170,6 +155,42 @@ private: #endif }; +class CacheImpl : public Cache { +public: + CacheImpl(size_t capacity) : Cache(), capacity_(capacity) {} + + virtual Handle* Insert(const Slice& key, void* value, size_t charge, + void (*deleter)(const Slice& key, void* value)) { return 0; } + virtual Handle* Lookup(const Slice& key) { return 0; } + virtual void Release(Handle* handle) {} + virtual void* Value(Handle* handle) { return 0; } + virtual void Erase(const Slice& key) {} + virtual uint64_t NewId() { return 0; } + + size_t capacity_; +}; + +#ifdef HAVE_ROCKSDB +// ColumnFamilyHandleImpl is the class that clients use to access different +// column families. It has non-trivial destructor, which gets called when client +// is done using the column family +class ColumnFamilyHandleImpl : public ColumnFamilyHandle { + public: + ColumnFamilyHandleImpl(DbImpl* db, std::string const &name, uint32_t id) : db_(db), id_(id), name_(name) {} + ColumnFamilyHandleImpl(const ColumnFamilyHandleImpl ©from) : db_(copyfrom.db_), id_(copyfrom.id_), name_(copyfrom.name_) {} + virtual ~ColumnFamilyHandleImpl() {} + virtual uint32_t GetID() const { return id_; } + + std::string const &GetName() const { return name_; } + std::string const GetURI() const { return "table:" + name_; } + + private: + DbImpl* db_; + uint32_t id_; + std::string const name_; +}; +#endif + class IteratorImpl : public Iterator { public: IteratorImpl(DbImpl *db, WT_CURSOR *cursor) : db_(db), cursor_(cursor), own_cursor_(true) {} @@ -338,6 +359,13 @@ public: // Flush all mem-table data. virtual Status Flush(const FlushOptions& options, ColumnFamilyHandle* column_family); + + ColumnFamilyHandleImpl *GetCF(uint32_t column_family_id) { + return reinterpret_cast<ColumnFamilyHandleImpl *>(columns_.at(column_family_id)); + } + void SetColumns(std::vector<ColumnFamilyHandle *> &cols) { + columns_ = cols; + } #endif virtual Iterator* NewIterator(const ReadOptions& options); @@ -357,26 +385,26 @@ public: virtual void ResumeCompactions(); + OperationContext *GetContext() { + OperationContext *ctx = context_->Get(); + if (ctx == NULL) { + ctx = NewContext(); + context_->Set(ctx); + } + return (ctx); + } + private: WT_CONNECTION *conn_; ThreadLocal<OperationContext> *context_; #ifdef HAVE_ROCKSDB - int numColumns_; + std::vector<ColumnFamilyHandle*> columns_; #endif OperationContext *NewContext() { return new OperationContext(conn_); } - OperationContext *GetContext() { - OperationContext *ctx = context_->Get(); - if (ctx == NULL) { - ctx = NewContext(); - context_->Set(ctx); - } - return (ctx); - } - OperationContext *GetContext(const ReadOptions &options) { if (options.snapshot == NULL) return GetContext(); @@ -393,24 +421,28 @@ private: void operator=(const DbImpl&); }; -#ifdef HAVE_ROCKSDB -// ColumnFamilyHandleImpl is the class that clients use to access different -// column families. It has non-trivial destructor, which gets called when client -// is done using the column family -class ColumnFamilyHandleImpl : public ColumnFamilyHandle { - public: - ColumnFamilyHandleImpl(DbImpl* db, std::string const &name, uint32_t id) : db_(db), id_(id), name_(name) {} - virtual ~ColumnFamilyHandleImpl() {} - virtual uint32_t GetID() const { return id_; } +// Implemention of WriteBatch::Handler +class WriteBatchHandler : public WriteBatch::Handler { +public: + WriteBatchHandler(DbImpl *db, OperationContext *context) : db_(db), context_(context), status_(0) {} + virtual ~WriteBatchHandler() {} + int GetWiredTigerStatus() { return status_; } - std::string const &GetName() const { return name_; } - std::string const GetURI() const { return "table:" + name_; } + virtual void Put(const Slice& key, const Slice& value); - private: - DbImpl* db_; - uint32_t id_; - std::string const name_; -}; + virtual void Delete(const Slice& key); + +#ifdef HAVE_ROCKSDB + // Implementations are in rocksdb_wt.cc + virtual Status PutCF(uint32_t column_family_id, const Slice& key, + const Slice& value); + virtual Status DeleteCF(uint32_t column_family_id, const Slice& key); #endif +private: + DbImpl *db_; + OperationContext *context_; + int status_; +}; + #endif diff --git a/api/leveldb/rocks_wt.cc b/api/leveldb/rocks_wt.cc index f690e3e8221..37ecc42f524 100644 --- a/api/leveldb/rocks_wt.cc +++ b/api/leveldb/rocks_wt.cc @@ -45,10 +45,69 @@ using leveldb::Slice; using leveldb::Snapshot; using leveldb::Status; +static int +wtrocks_get_cursor(OperationContext *context, ColumnFamilyHandle *cfhp, WT_CURSOR **cursorp) +{ + ColumnFamilyHandleImpl *cf = + reinterpret_cast<ColumnFamilyHandleImpl *>(cfhp); + WT_CURSOR *c = context->GetCursor(cf->GetID()); + if (c == NULL) { + WT_SESSION *session = context->GetSession(); + int ret; + if ((ret = session->open_cursor( + session, cf->GetURI().c_str(), NULL, NULL, &c)) != 0) { + fprintf(stderr, "Failed to open cursor on %s: %s\n", cf->GetURI().c_str(), wiredtiger_strerror(ret)); + return (ret); + } + context->SetCursor(cf->GetID(), c); + } + *cursorp = c; + return (0); +} + Status -DB::ListColumnFamilies(Options const &, std::string const &, std::vector<std::string> *) +DB::ListColumnFamilies( + Options const &options, std::string const &name, + std::vector<std::string> *column_families) { - return WiredTigerErrorToStatus(ENOTSUP); + std::vector<std::string> cf; + DB *dbptr; + Status status = DB::Open(options, name, &dbptr); + if (!status.ok()) + return status; + DbImpl *db = reinterpret_cast<DbImpl *>(dbptr); + OperationContext *context = db->GetContext(); + WT_SESSION *session = context->GetSession(); + WT_CURSOR *c; + int ret = session->open_cursor(session, "metadata:", NULL, NULL, &c); + if (ret != 0) + goto err; + c->set_key(c, "table:"); + /* Position on the first table entry */ + int cmp; + ret = c->search_near(c, &cmp); + if (ret != 0 || (cmp < 0 && (ret = c->next(c)) != 0)) + goto err; + /* Add entries while we are getting "table" URIs. */ + for (; ret == 0; ret = c->next(c)) { + const char *key; + if ((ret = c->get_key(c, &key)) != 0) + goto err; + if (strncmp(key, "table:", strlen("table:")) != 0) + break; + cf.push_back(std::string(key + strlen("table:"))); + } + +err: delete db; + /* + * WT_NOTFOUND is not an error: it just means we got to the end of the + * list of tables. + */ + if (ret == 0 || ret == WT_NOTFOUND) { + *column_families = cf; + ret = 0; + } + return WiredTigerErrorToStatus(ret); } Status @@ -63,7 +122,7 @@ DB::Open(Options const &options, std::string const &name, const std::vector<Colu for (size_t i = 0; i < column_families.size(); i++) cfhandles[i] = new ColumnFamilyHandleImpl( db, column_families[i].name, (int)i); - *handles = cfhandles; + db->SetColumns(*handles = cfhandles); return Status::OK(); } @@ -78,6 +137,40 @@ WriteBatch::Handler::LogData(const Slice& blob) } Status +WriteBatchHandler::PutCF( + uint32_t column_family_id, const Slice& key, const Slice& value) +{ + WT_CURSOR *cursor; + int ret = wtrocks_get_cursor(context_, db_->GetCF(column_family_id), &cursor); + if (ret != 0) + return WiredTigerErrorToStatus(ret); + WT_ITEM item; + item.data = key.data(); + item.size = key.size(); + cursor->set_key(cursor, &item); + item.data = value.data(); + item.size = value.size(); + cursor->set_value(cursor, &item); + ret = cursor->insert(cursor); + return WiredTigerErrorToStatus(ret); +} + +Status +WriteBatchHandler::DeleteCF(uint32_t column_family_id, const Slice& key) +{ + WT_CURSOR *cursor; + int ret = wtrocks_get_cursor(context_, db_->GetCF(column_family_id), &cursor); + if (ret != 0) + return WiredTigerErrorToStatus(ret); + WT_ITEM item; + item.data = key.data(); + item.size = key.size(); + cursor->set_key(cursor, &item); + ret = cursor->remove(cursor); + return WiredTigerErrorToStatus(ret); +} + +Status DbImpl::Merge(WriteOptions const&, ColumnFamilyHandle*, Slice const&, Slice const&) { return WiredTigerErrorToStatus(ENOTSUP); @@ -91,7 +184,8 @@ DbImpl::CreateColumnFamily(Options const &options, std::string const &name, Colu int ret = wtleveldb_create(conn_, options, "table:" + name); if (ret != 0) return WiredTigerErrorToStatus(ret); - *cfhp = new ColumnFamilyHandleImpl(this, name, ++numColumns_); + *cfhp = new ColumnFamilyHandleImpl(this, name, columns_.size()); + columns_.push_back(*cfhp); return Status::OK(); } @@ -105,24 +199,6 @@ DbImpl::DropColumnFamily(ColumnFamilyHandle *cfhp) return WiredTigerErrorToStatus(ret); } -static int -wtrocks_get_cursor(OperationContext *context, ColumnFamilyHandle *cfhp, WT_CURSOR **cursorp) -{ - ColumnFamilyHandleImpl *cf = - reinterpret_cast<ColumnFamilyHandleImpl *>(cfhp); - WT_CURSOR *c = context->GetCursor(cf->GetID()); - if (c == NULL) { - WT_SESSION *session = context->GetSession(); - int ret; - if ((ret = session->open_cursor( - session, cf->GetURI().c_str(), NULL, NULL, &c)) != 0) - return (ret); - context->SetCursor(cf->GetID(), c); - } - *cursorp = c; - return (0); -} - Status DbImpl::Delete(WriteOptions const &write_options, ColumnFamilyHandle *cfhp, Slice const &key) { @@ -195,7 +271,8 @@ DbImpl::NewIterator(ReadOptions const &options, ColumnFamilyHandle *cfhp) WT_CURSOR *c, *iterc; int ret = wtrocks_get_cursor(context, cfhp, &c); assert(ret == 0); - ret = session->open_cursor(session, NULL, c, NULL, &iterc); + /* XXX would like a fast duplicate for LSM cursors without position. */ + ret = session->open_cursor(session, c->uri, NULL, NULL, &iterc); assert(ret == 0); return new IteratorImpl(this, iterc); } |