diff options
author | Michael Cahill <michael.cahill@wiredtiger.com> | 2014-07-01 12:06:40 +1000 |
---|---|---|
committer | Michael Cahill <michael.cahill@wiredtiger.com> | 2014-07-01 12:06:40 +1000 |
commit | 4b029ff53b194642205aa417a0211f9ce3f1e614 (patch) | |
tree | f69f134e4e42dc6f3f6f5526bd30992baacf2d67 /api | |
parent | 26304c00751da9b38fc54a53596b910f5243d10d (diff) | |
download | mongo-4b029ff53b194642205aa417a0211f9ce3f1e614.tar.gz |
Implement most of the RocksDB-specific methods.
Diffstat (limited to 'api')
-rw-r--r-- | api/leveldb/leveldb_wt.cc | 136 | ||||
-rw-r--r-- | api/leveldb/leveldb_wt.h | 66 | ||||
-rw-r--r-- | api/leveldb/rocks_wt.cc | 146 |
3 files changed, 247 insertions, 101 deletions
diff --git a/api/leveldb/leveldb_wt.cc b/api/leveldb/leveldb_wt.cc index 72a4176af8b..eed510a0be0 100644 --- a/api/leveldb/leveldb_wt.cc +++ b/api/leveldb/leveldb_wt.cc @@ -200,7 +200,7 @@ Cache::~Cache() {} // If the thread this OperationContext belongs to requires more than one // cursor (for example they start a read snapshot while doing updates), we // open a new session/cursor for each parallel operation. -WT_CURSOR *OperationContext::getCursor() +WT_CURSOR *OperationContext::GetCursor() { int ret; if (!in_use_) { @@ -219,7 +219,7 @@ WT_CURSOR *OperationContext::getCursor() } } -void OperationContext::releaseCursor(WT_CURSOR *cursor) +void OperationContext::ReleaseCursor(WT_CURSOR *cursor) { if (cursor == cursor_) in_use_ = false; @@ -230,6 +230,38 @@ void OperationContext::releaseCursor(WT_CURSOR *cursor) } } +int +wtleveldb_create( + WT_CONNECTION *conn, const Options &options, std::string const &uri) +{ + int ret; + std::stringstream s_table; + s_table << WT_TABLE_CONFIG; + s_table << "internal_page_max=" << options.block_size << ","; + s_table << "leaf_page_max=" << options.block_size << ","; + if (options.compression == leveldb::kSnappyCompression) + s_table << "block_compressor=snappy,"; + s_table << "lsm=("; + s_table << "chunk_size=" << options.write_buffer_size << ","; + if (options.filter_policy) { + int bits = ((FilterPolicyImpl *)options.filter_policy)->bits_per_key_; + s_table << "bloom_bit_count=" << bits << ","; + // Approximate the optimal number of hashes + s_table << "bloom_hash_count=" << (int)(0.6 * bits) << ","; + } + s_table << "),"; + WT_SESSION *session; + std::string table_config = s_table.str(); + if ((ret = conn->open_session(conn, NULL, NULL, &session)) != 0) + return (ret); + if ((ret = session->create(session, uri.c_str(), table_config.c_str())) != 0) + return (ret); + if ((ret = session->close(session, NULL)) != 0) + return (ret); + + return (0); +} + Status leveldb::DB::Open(const Options &options, const std::string &name, leveldb::DB **dbptr) { @@ -263,31 +295,8 @@ leveldb::DB::Open(const Options &options, const std::string &name, leveldb::DB * else if (ret != 0) return WiredTigerErrorToStatus(ret, NULL); - if (options.create_if_missing) { - std::stringstream s_table; - s_table << WT_TABLE_CONFIG; - s_table << "internal_page_max=" << options.block_size << ","; - s_table << "leaf_page_max=" << options.block_size << ","; - if (options.compression == kSnappyCompression) - s_table << "block_compressor=snappy,"; - s_table << "lsm=("; - s_table << "chunk_size=" << options.write_buffer_size << ","; - if (options.filter_policy) { - int bits = ((FilterPolicyImpl *)options.filter_policy)->bits_per_key_; - s_table << "bloom_bit_count=" << bits << ","; - // Approximate the optimal number of hashes - s_table << "bloom_hash_count=" << (int)(0.6 * bits) << ","; - } - s_table << "),"; - WT_SESSION *session; - std::string table_config = s_table.str(); - if ((ret = conn->open_session(conn, NULL, NULL, &session)) != 0) - goto err; - if ((ret = session->create(session, WT_URI, table_config.c_str())) != 0) - goto err; - if ((ret = session->close(session, NULL)) != 0) - goto err; - } + if (options.create_if_missing) + ret = wtleveldb_create(conn, options, WT_URI); if (ret != 0) { err: @@ -305,7 +314,7 @@ Status DbImpl::Put(const WriteOptions& options, const Slice& key, const Slice& value) { - WT_CURSOR *cursor = getCursor(); + WT_CURSOR *cursor = GetCursor(); WT_ITEM item; item.data = key.data(); @@ -315,7 +324,7 @@ DbImpl::Put(const WriteOptions& options, item.size = value.size(); cursor->set_value(cursor, &item); int ret = cursor->insert(cursor); - releaseCursor(cursor); + ReleaseCursor(cursor); return WiredTigerErrorToStatus(ret, NULL); } @@ -326,7 +335,7 @@ DbImpl::Put(const WriteOptions& options, Status DbImpl::Delete(const WriteOptions& options, const Slice& key) { - WT_CURSOR *cursor = getCursor(); + WT_CURSOR *cursor = GetCursor(); WT_ITEM item; item.data = key.data(); @@ -338,7 +347,7 @@ DbImpl::Delete(const WriteOptions& options, const Slice& key) // failures on - it's not necessary for correct operation. int t_ret = cursor->reset(cursor); assert(t_ret == 0); - releaseCursor(cursor); + ReleaseCursor(cursor); return WiredTigerErrorToStatus(ret, NULL); } @@ -347,7 +356,7 @@ class WriteBatchHandler : public WriteBatch::Handler { public: WriteBatchHandler(WT_CURSOR *cursor) : cursor_(cursor), status_(0) {} virtual ~WriteBatchHandler() {} - int getWiredTigerStatus() { return status_; } + int GetWiredTigerStatus() { return status_; } virtual void Put(const Slice& key, const Slice& value) { WT_ITEM item; @@ -386,7 +395,7 @@ Status DbImpl::Write(const WriteOptions& options, WriteBatch* updates) { Status status = Status::OK(); - WT_CURSOR *cursor = getCursor(); + WT_CURSOR *cursor = GetCursor(); WT_SESSION *session = cursor->session; const char *errmsg = NULL; int ret, t_ret; @@ -399,7 +408,7 @@ DbImpl::Write(const WriteOptions& options, WriteBatch* updates) WriteBatchHandler handler(cursor); status = updates->Iterate(&handler); - if ((ret = handler.getWiredTigerStatus()) != WT_DEADLOCK) + if ((ret = handler.GetWiredTigerStatus()) != WT_DEADLOCK) break; // Roll back the transaction on deadlock so we can try again if ((ret = session->rollback_transaction(session, NULL)) != 0) { @@ -414,7 +423,7 @@ DbImpl::Write(const WriteOptions& options, WriteBatch* updates) ret = session->rollback_transaction(session, NULL); err: - releaseCursor(cursor); + ReleaseCursor(cursor); if (status.ok() && ret != 0) status = WiredTigerErrorToStatus(ret, NULL); return status; @@ -432,20 +441,20 @@ DbImpl::Get(const ReadOptions& options, const Slice& key, std::string* value) { WT_CURSOR *cursor; - WT_ITEM item; const SnapshotImpl *si = NULL; const char *errmsg = NULL; // Read options can contain a snapshot for us to use if (options.snapshot == NULL) { - cursor = getCursor(); + cursor = GetCursor(); } else { si = static_cast<const SnapshotImpl *>(options.snapshot); - if (!si->getStatus().ok()) - return si->getStatus(); - cursor = si->getCursor(); + if (!si->GetStatus().ok()) + return si->GetStatus(); + cursor = si->GetCursor(); } + WT_ITEM item; item.data = key.data(); item.size = key.size(); cursor->set_key(cursor, &item); @@ -459,7 +468,7 @@ DbImpl::Get(const ReadOptions& options, err: // Release the cursor if we are not in a snapshot if (si == NULL) - releaseCursor(cursor); + ReleaseCursor(cursor); return WiredTigerErrorToStatus(ret, errmsg); } @@ -482,12 +491,12 @@ DbImpl::Get(const ReadOptions& options, // Read options can contain a snapshot for us to use if (options.snapshot == NULL) { - cursor = getCursor(); + cursor = GetCursor(); } else { si = static_cast<const SnapshotImpl *>(options.snapshot); - if (!si->getStatus().ok()) - return si->getStatus(); - cursor = si->getCursor(); + if (!si->GetStatus().ok()) + return si->GetStatus(); + cursor = si->GetCursor(); } item.data = key.data(); @@ -503,7 +512,7 @@ DbImpl::Get(const ReadOptions& options, err: // Release the cursor if we are not in a snapshot if (si == NULL) - releaseCursor(cursor); + ReleaseCursor(cursor); return WiredTigerErrorToStatus(ret, errmsg); } #endif @@ -520,6 +529,11 @@ DbImpl::NewIterator(const ReadOptions& options) return new IteratorImpl(this, options); } +SnapshotImpl::SnapshotImpl(DbImpl *db) : + Snapshot(), db_(db), context_(db->NewContext()), status_(Status::OK()) +{ +} + // Return a handle to the current DB state. Iterators created with // this handle will all observe a stable snapshot of the current DB // state. The caller must call ReleaseSnapshot(result) when the @@ -528,7 +542,7 @@ const Snapshot * DbImpl::GetSnapshot() { SnapshotImpl *snapshot = new SnapshotImpl(this); - Status status = snapshot->setupTransaction(); + Status status = snapshot->SetupTransaction(); if (!status.ok()) { delete snapshot; // TODO: Flag an error here? @@ -545,7 +559,7 @@ DbImpl::ReleaseSnapshot(const Snapshot* snapshot) SnapshotImpl *si = static_cast<SnapshotImpl *>(const_cast<Snapshot *>(snapshot)); if (si != NULL) { - si->releaseTransaction(); + si->ReleaseTransaction(); delete si; } } @@ -605,11 +619,11 @@ DbImpl::CompactRange(const Slice* begin, const Slice* end) { // The compact doesn't need a cursor, but the context always opens a // cursor when opening the session - so grab that, and use the session. - WT_CURSOR *cursor = getCursor(); + WT_CURSOR *cursor = GetCursor(); WT_SESSION *session = cursor->session; int ret = session->compact(session, WT_URI, NULL); assert(ret == 0); - releaseCursor(cursor); + ReleaseCursor(cursor); } // Suspends the background compaction thread. This methods @@ -631,20 +645,20 @@ IteratorImpl::IteratorImpl(DbImpl *db, const ReadOptions &options) : cursor_(NULL), db_(db), status_(Status::OK()), valid_(false) { if (options.snapshot == NULL) { - cursor_ = db_->getCursor(); - snapshot_iterator_ = false; + cursor_ = db_->GetCursor(); + own_cursor_ = true; } else { const SnapshotImpl *si = static_cast<const SnapshotImpl *>(options.snapshot); - cursor_ = si->getCursor(); - snapshot_iterator_ = true; + cursor_ = si->GetCursor(); + own_cursor_ = false; } } IteratorImpl::~IteratorImpl() { - if (!snapshot_iterator_) - db_->releaseCursor(cursor_); + if (own_cursor_) + db_->ReleaseCursor(cursor_); } // Position at the first key in the source. The iterator is Valid() @@ -829,19 +843,17 @@ IteratorImpl::Prev() } // Implementation for WiredTiger specific read snapshot -Status SnapshotImpl::setupTransaction() +Status SnapshotImpl::SetupTransaction() { - cursor_ = db_->getCursor(); - WT_SESSION *session = cursor_->session; + WT_SESSION *session = context_->GetSession(); int ret = session->begin_transaction(session, NULL); return WiredTigerErrorToStatus(ret, NULL); } -Status SnapshotImpl::releaseTransaction() +Status SnapshotImpl::ReleaseTransaction() { - WT_SESSION *session = cursor_->session; + WT_SESSION *session = context_->GetSession(); int ret = session->commit_transaction(session, NULL); - db_->releaseCursor(cursor_); return WiredTigerErrorToStatus(ret, NULL); } diff --git a/api/leveldb/leveldb_wt.h b/api/leveldb/leveldb_wt.h index 32f4310dcd1..768f3854caf 100644 --- a/api/leveldb/leveldb_wt.h +++ b/api/leveldb/leveldb_wt.h @@ -107,11 +107,11 @@ public: assert(ret == 0); } - T *get() { + T *Get() { return (T *)(pthread_getspecific(key_)); } - void set(T *value) { + void Set(T *value) { int ret = pthread_setspecific(key_, value); assert(ret == 0); } @@ -141,19 +141,34 @@ public: #endif } - WT_CURSOR *getCursor(); - void releaseCursor(WT_CURSOR *cursor); + WT_CURSOR *GetCursor(); +#ifdef HAVE_ROCKSDB + WT_CURSOR *GetCursor(int i) { + return (i < cursors_.size()) ? cursors_[i] : NULL; + } + void SetCursor(int i, WT_CURSOR *c) { + if (i >= cursors_.size()) + cursors_.resize(i + 1); + cursors_[i] = c; + } +#endif + WT_SESSION *GetSession() { return session_; } + void ReleaseCursor(WT_CURSOR *cursor); private: WT_CONNECTION *conn_; WT_SESSION *session_; WT_CURSOR *cursor_; +#ifdef HAVE_ROCKSDB + std::vector<WT_CURSOR *> cursors_; +#endif bool in_use_; }; class IteratorImpl : public Iterator { public: IteratorImpl(DbImpl *db, const ReadOptions &options); + IteratorImpl(DbImpl *db, WT_CURSOR *cursor) : db_(db), cursor_(cursor), own_cursor_(true) {} virtual ~IteratorImpl(); // An iterator is either positioned at a key/value pair, or @@ -188,7 +203,7 @@ private: Slice key_, value_; Status status_; bool valid_; - bool snapshot_iterator_; + bool own_cursor_; void SetError(int wiredTigerError) { valid_ = false; @@ -204,17 +219,17 @@ class SnapshotImpl : public Snapshot { friend class DbImpl; friend class IteratorImpl; public: - SnapshotImpl(DbImpl *db) : - Snapshot(), db_(db), cursor_(NULL), status_(Status::OK()) {} - virtual ~SnapshotImpl() {} + SnapshotImpl(DbImpl *db); + virtual ~SnapshotImpl() { delete context_; } protected: - WT_CURSOR *getCursor() const { return cursor_; } - Status getStatus() const { return status_; } - Status setupTransaction(); - Status releaseTransaction(); + OperationContext *GetContext() const { return context_; } + WT_CURSOR *GetCursor() const { return context_->GetCursor(); } + Status GetStatus() const { return status_; } + Status SetupTransaction(); + Status ReleaseTransaction(); private: DbImpl *db_; - WT_CURSOR *cursor_; + OperationContext *context_; Status status_; }; @@ -343,12 +358,19 @@ public: private: WT_CONNECTION *conn_; ThreadLocal<OperationContext> *context_; +#ifdef HAVE_ROCKSDB + int numColumns_; +#endif - OperationContext *getContext() { - OperationContext *ctx = context_->get(); + OperationContext *NewContext() { + return new OperationContext(conn_); + } + + OperationContext *GetContext() { + OperationContext *ctx = context_->Get(); if (ctx == NULL) { - ctx = new OperationContext(conn_); - context_->set(ctx); + ctx = NewContext(); + context_->Set(ctx); } return (ctx); } @@ -358,8 +380,8 @@ private: void operator=(const DbImpl&); protected: - WT_CURSOR *getCursor() { return getContext()->getCursor(); } - void releaseCursor(WT_CURSOR *cursor) { getContext()->releaseCursor(cursor); } + WT_CURSOR *GetCursor() { return GetContext()->GetCursor(); } + void ReleaseCursor(WT_CURSOR *cursor) { GetContext()->ReleaseCursor(cursor); } }; #ifdef HAVE_ROCKSDB @@ -368,13 +390,17 @@ protected: // is done using the column family class ColumnFamilyHandleImpl : public ColumnFamilyHandle { public: - ColumnFamilyHandleImpl(DbImpl* db, uint32_t id) : db_(db), id_(id) {} + ColumnFamilyHandleImpl(DbImpl* db, std::string const &name, uint32_t id) : db_(db), id_(id), name_(name) {} virtual ~ColumnFamilyHandleImpl() {} virtual uint32_t GetID() const { return id_; } + std::string const &GetName() const { return name_; } + std::string const GetURI() const { return "table:" + name_; } + private: DbImpl* db_; uint32_t id_; + std::string const name_; }; #endif diff --git a/api/leveldb/rocks_wt.cc b/api/leveldb/rocks_wt.cc index 13fd96b734d..56a32c7f487 100644 --- a/api/leveldb/rocks_wt.cc +++ b/api/leveldb/rocks_wt.cc @@ -32,6 +32,8 @@ #include <sstream> using leveldb::Cache; +using leveldb::DB; +using leveldb::FlushOptions; using leveldb::FilterPolicy; using leveldb::Iterator; using leveldb::Options; @@ -44,15 +46,25 @@ using leveldb::Snapshot; using leveldb::Status; Status -rocksdb::DB::ListColumnFamilies(rocksdb::Options const&, std::string const&, std::vector<std::string, std::allocator<std::string> >*) +DB::ListColumnFamilies(Options const &, std::string const &, std::vector<std::string> *) { return WiredTigerErrorToStatus(ENOTSUP); } Status -rocksdb::DB::Open(rocksdb::Options const&, std::string const&, std::vector<rocksdb::ColumnFamilyDescriptor, std::allocator<rocksdb::ColumnFamilyDescriptor> > const&, std::vector<rocksdb::ColumnFamilyHandle*, std::allocator<rocksdb::ColumnFamilyHandle*> >*, rocksdb::DB**) +DB::Open(Options const &options, std::string const &name, const std::vector<ColumnFamilyDescriptor> &column_families, std::vector<ColumnFamilyHandle*> *handles, DB**dbptr) { - return WiredTigerErrorToStatus(ENOTSUP); + Status status = Open(options, name, dbptr); + if (!status.ok()) + return status; + DbImpl *db = reinterpret_cast<DbImpl *>(dbptr); + std::vector<ColumnFamilyHandle*> cfhandles( + column_families.size()); + for (size_t i = 0; i < column_families.size(); i++) + cfhandles[i] = new ColumnFamilyHandleImpl( + db, column_families[i].name, (int)i); + *handles = cfhandles; + return Status::OK(); } void @@ -66,49 +78,117 @@ WriteBatch::Handler::LogData(const Slice& blob) } Status -DbImpl::Merge(rocksdb::WriteOptions const&, rocksdb::ColumnFamilyHandle*, rocksdb::Slice const&, rocksdb::Slice const&) +DbImpl::Merge(WriteOptions const&, ColumnFamilyHandle*, Slice const&, Slice const&) { return WiredTigerErrorToStatus(ENOTSUP); } Status -DbImpl::CreateColumnFamily(rocksdb::Options const&, std::string const&, rocksdb::ColumnFamilyHandle**) +DbImpl::CreateColumnFamily(Options const &options, std::string const &name, ColumnFamilyHandle **cfhp) { - return WiredTigerErrorToStatus(ENOTSUP); + extern int wtleveldb_create(WT_CONNECTION *, + const Options &, std::string const &uri); + int ret = wtleveldb_create(conn_, options, "table:" + name); + if (ret != 0) + return WiredTigerErrorToStatus(ret); + *cfhp = new ColumnFamilyHandleImpl(this, name, ++numColumns_); + return Status::OK(); } Status -DbImpl::DropColumnFamily(rocksdb::ColumnFamilyHandle*) +DbImpl::DropColumnFamily(ColumnFamilyHandle *cfhp) { - return WiredTigerErrorToStatus(ENOTSUP); + ColumnFamilyHandleImpl *cf = + reinterpret_cast<ColumnFamilyHandleImpl *>(cfhp); + WT_SESSION *session = GetContext()->GetSession(); + int ret = session->drop(session, cf->GetURI().c_str(), NULL); + return WiredTigerErrorToStatus(ret); +} + +static int +wtrocks_get_cursor(OperationContext *context, ColumnFamilyHandle *cfhp, WT_CURSOR **cursorp) +{ + ColumnFamilyHandleImpl *cf = + reinterpret_cast<ColumnFamilyHandleImpl *>(cfhp); + WT_CURSOR *c = context->GetCursor(cf->GetID()); + if (c == NULL) { + WT_SESSION *session = context->GetSession(); + int ret; + if ((ret = session->open_cursor( + session, cf->GetURI().c_str(), NULL, NULL, &c)) != 0) + return (ret); + context->SetCursor(cf->GetID(), c); + } + *cursorp = c; + return (0); } Status -DbImpl::Delete(rocksdb::WriteOptions const&, rocksdb::ColumnFamilyHandle*, rocksdb::Slice const&) +DbImpl::Delete(WriteOptions const &write_options, ColumnFamilyHandle *cfhp, Slice const &key) { - return WiredTigerErrorToStatus(ENOTSUP); + WT_CURSOR *cursor; + int ret = wtrocks_get_cursor(GetContext(), cfhp, &cursor); + if (ret != 0) + return WiredTigerErrorToStatus(ret); + WT_ITEM item; + item.data = key.data(); + item.size = key.size(); + ret = cursor->remove(cursor); + // Reset the WiredTiger cursor so it doesn't keep any pages pinned. + // Track failures in debug builds since we don't expect failure, but + // don't pass failures on - it's not necessary for correct operation. + int t_ret = cursor->reset(cursor); + assert(t_ret == 0); + return WiredTigerErrorToStatus(ret); } Status -DbImpl::Flush(rocksdb::FlushOptions const&, rocksdb::ColumnFamilyHandle*) +DbImpl::Flush(FlushOptions const&, ColumnFamilyHandle*) { return WiredTigerErrorToStatus(ENOTSUP); } Status -DbImpl::Get(rocksdb::ReadOptions const&, rocksdb::ColumnFamilyHandle*, rocksdb::Slice const&, std::string*) +DbImpl::Get(ReadOptions const &options, ColumnFamilyHandle *cfhp, Slice const &key, std::string *value) { - return WiredTigerErrorToStatus(ENOTSUP); + const char *errmsg = NULL; + OperationContext *context = NULL; + // Read options can contain a snapshot for us to use + if (options.snapshot == NULL) { + context = GetContext(); + } else { + const SnapshotImpl *si = + reinterpret_cast<const SnapshotImpl *>(options.snapshot); + if (!si->GetStatus().ok()) + return si->GetStatus(); + context = si->GetContext(); + } + + WT_CURSOR *cursor; + int ret = wtrocks_get_cursor(context, cfhp, &cursor); + if (ret != 0) + return WiredTigerErrorToStatus(ret); + + WT_ITEM item; + item.data = key.data(); + item.size = key.size(); + cursor->set_key(cursor, &item); + if ((ret = cursor->search(cursor)) == 0 && + (ret = cursor->get_value(cursor, &item)) == 0) + *value = std::string((const char *)item.data, item.size); + if (ret == WT_NOTFOUND) + errmsg = "DB::Get key not found"; + return WiredTigerErrorToStatus(ret, errmsg); } bool -DbImpl::GetProperty(rocksdb::ColumnFamilyHandle*, rocksdb::Slice const&, std::string*) +DbImpl::GetProperty(ColumnFamilyHandle*, Slice const&, std::string*) { return false; } std::vector<Status> -DbImpl::MultiGet(rocksdb::ReadOptions const&, std::vector<rocksdb::ColumnFamilyHandle*, std::allocator<rocksdb::ColumnFamilyHandle*> > const&, std::vector<rocksdb::Slice, std::allocator<rocksdb::Slice> > const&, std::vector<std::string, std::allocator<std::string> >*) +DbImpl::MultiGet(ReadOptions const&, std::vector<ColumnFamilyHandle*> const&, std::vector<Slice> const&, std::vector<std::string, std::allocator<std::string> >*) { std::vector<Status> ret; ret.push_back(WiredTigerErrorToStatus(ENOTSUP)); @@ -116,13 +196,41 @@ DbImpl::MultiGet(rocksdb::ReadOptions const&, std::vector<rocksdb::ColumnFamilyH } Iterator * -DbImpl::NewIterator(rocksdb::ReadOptions const&, rocksdb::ColumnFamilyHandle*) +DbImpl::NewIterator(ReadOptions const &options, ColumnFamilyHandle *cfhp) { - return NULL; + OperationContext *context = NULL; + // Read options can contain a snapshot for us to use + if (options.snapshot == NULL) { + context = GetContext(); + } else { + const SnapshotImpl *si = + reinterpret_cast<const SnapshotImpl *>(options.snapshot); + if (!si->GetStatus().ok()) + return NULL; + context = si->GetContext(); + } + + WT_CURSOR *cursor; + int ret = wtrocks_get_cursor(context, cfhp, &cursor); + if (ret != 0) + return NULL; + + return new IteratorImpl(this, cursor); } Status -DbImpl::Put(rocksdb::WriteOptions const&, rocksdb::ColumnFamilyHandle*, rocksdb::Slice const&, rocksdb::Slice const&) +DbImpl::Put(WriteOptions const &options, ColumnFamilyHandle *cfhp, Slice const &key, Slice const &value) { - return WiredTigerErrorToStatus(ENOTSUP); + WT_CURSOR *cursor; + int ret = wtrocks_get_cursor(GetContext(), cfhp, &cursor); + WT_ITEM item; + + item.data = key.data(); + item.size = key.size(); + cursor->set_key(cursor, &item); + item.data = value.data(); + item.size = value.size(); + cursor->set_value(cursor, &item); + ret = cursor->insert(cursor); + return WiredTigerErrorToStatus(ret, NULL); } |