summaryrefslogtreecommitdiff
path: root/api
diff options
context:
space:
mode:
authorMichael Cahill <michael.cahill@wiredtiger.com>2014-07-01 12:06:40 +1000
committerMichael Cahill <michael.cahill@wiredtiger.com>2014-07-01 12:06:40 +1000
commit4b029ff53b194642205aa417a0211f9ce3f1e614 (patch)
treef69f134e4e42dc6f3f6f5526bd30992baacf2d67 /api
parent26304c00751da9b38fc54a53596b910f5243d10d (diff)
downloadmongo-4b029ff53b194642205aa417a0211f9ce3f1e614.tar.gz
Implement most of the RocksDB-specific methods.
Diffstat (limited to 'api')
-rw-r--r--api/leveldb/leveldb_wt.cc136
-rw-r--r--api/leveldb/leveldb_wt.h66
-rw-r--r--api/leveldb/rocks_wt.cc146
3 files changed, 247 insertions, 101 deletions
diff --git a/api/leveldb/leveldb_wt.cc b/api/leveldb/leveldb_wt.cc
index 72a4176af8b..eed510a0be0 100644
--- a/api/leveldb/leveldb_wt.cc
+++ b/api/leveldb/leveldb_wt.cc
@@ -200,7 +200,7 @@ Cache::~Cache() {}
// If the thread this OperationContext belongs to requires more than one
// cursor (for example they start a read snapshot while doing updates), we
// open a new session/cursor for each parallel operation.
-WT_CURSOR *OperationContext::getCursor()
+WT_CURSOR *OperationContext::GetCursor()
{
int ret;
if (!in_use_) {
@@ -219,7 +219,7 @@ WT_CURSOR *OperationContext::getCursor()
}
}
-void OperationContext::releaseCursor(WT_CURSOR *cursor)
+void OperationContext::ReleaseCursor(WT_CURSOR *cursor)
{
if (cursor == cursor_)
in_use_ = false;
@@ -230,6 +230,38 @@ void OperationContext::releaseCursor(WT_CURSOR *cursor)
}
}
+int
+wtleveldb_create(
+ WT_CONNECTION *conn, const Options &options, std::string const &uri)
+{
+ int ret;
+ std::stringstream s_table;
+ s_table << WT_TABLE_CONFIG;
+ s_table << "internal_page_max=" << options.block_size << ",";
+ s_table << "leaf_page_max=" << options.block_size << ",";
+ if (options.compression == leveldb::kSnappyCompression)
+ s_table << "block_compressor=snappy,";
+ s_table << "lsm=(";
+ s_table << "chunk_size=" << options.write_buffer_size << ",";
+ if (options.filter_policy) {
+ int bits = ((FilterPolicyImpl *)options.filter_policy)->bits_per_key_;
+ s_table << "bloom_bit_count=" << bits << ",";
+ // Approximate the optimal number of hashes
+ s_table << "bloom_hash_count=" << (int)(0.6 * bits) << ",";
+ }
+ s_table << "),";
+ WT_SESSION *session;
+ std::string table_config = s_table.str();
+ if ((ret = conn->open_session(conn, NULL, NULL, &session)) != 0)
+ return (ret);
+ if ((ret = session->create(session, uri.c_str(), table_config.c_str())) != 0)
+ return (ret);
+ if ((ret = session->close(session, NULL)) != 0)
+ return (ret);
+
+ return (0);
+}
+
Status
leveldb::DB::Open(const Options &options, const std::string &name, leveldb::DB **dbptr)
{
@@ -263,31 +295,8 @@ leveldb::DB::Open(const Options &options, const std::string &name, leveldb::DB *
else if (ret != 0)
return WiredTigerErrorToStatus(ret, NULL);
- if (options.create_if_missing) {
- std::stringstream s_table;
- s_table << WT_TABLE_CONFIG;
- s_table << "internal_page_max=" << options.block_size << ",";
- s_table << "leaf_page_max=" << options.block_size << ",";
- if (options.compression == kSnappyCompression)
- s_table << "block_compressor=snappy,";
- s_table << "lsm=(";
- s_table << "chunk_size=" << options.write_buffer_size << ",";
- if (options.filter_policy) {
- int bits = ((FilterPolicyImpl *)options.filter_policy)->bits_per_key_;
- s_table << "bloom_bit_count=" << bits << ",";
- // Approximate the optimal number of hashes
- s_table << "bloom_hash_count=" << (int)(0.6 * bits) << ",";
- }
- s_table << "),";
- WT_SESSION *session;
- std::string table_config = s_table.str();
- if ((ret = conn->open_session(conn, NULL, NULL, &session)) != 0)
- goto err;
- if ((ret = session->create(session, WT_URI, table_config.c_str())) != 0)
- goto err;
- if ((ret = session->close(session, NULL)) != 0)
- goto err;
- }
+ if (options.create_if_missing)
+ ret = wtleveldb_create(conn, options, WT_URI);
if (ret != 0) {
err:
@@ -305,7 +314,7 @@ Status
DbImpl::Put(const WriteOptions& options,
const Slice& key, const Slice& value)
{
- WT_CURSOR *cursor = getCursor();
+ WT_CURSOR *cursor = GetCursor();
WT_ITEM item;
item.data = key.data();
@@ -315,7 +324,7 @@ DbImpl::Put(const WriteOptions& options,
item.size = value.size();
cursor->set_value(cursor, &item);
int ret = cursor->insert(cursor);
- releaseCursor(cursor);
+ ReleaseCursor(cursor);
return WiredTigerErrorToStatus(ret, NULL);
}
@@ -326,7 +335,7 @@ DbImpl::Put(const WriteOptions& options,
Status
DbImpl::Delete(const WriteOptions& options, const Slice& key)
{
- WT_CURSOR *cursor = getCursor();
+ WT_CURSOR *cursor = GetCursor();
WT_ITEM item;
item.data = key.data();
@@ -338,7 +347,7 @@ DbImpl::Delete(const WriteOptions& options, const Slice& key)
// failures on - it's not necessary for correct operation.
int t_ret = cursor->reset(cursor);
assert(t_ret == 0);
- releaseCursor(cursor);
+ ReleaseCursor(cursor);
return WiredTigerErrorToStatus(ret, NULL);
}
@@ -347,7 +356,7 @@ class WriteBatchHandler : public WriteBatch::Handler {
public:
WriteBatchHandler(WT_CURSOR *cursor) : cursor_(cursor), status_(0) {}
virtual ~WriteBatchHandler() {}
- int getWiredTigerStatus() { return status_; }
+ int GetWiredTigerStatus() { return status_; }
virtual void Put(const Slice& key, const Slice& value) {
WT_ITEM item;
@@ -386,7 +395,7 @@ Status
DbImpl::Write(const WriteOptions& options, WriteBatch* updates)
{
Status status = Status::OK();
- WT_CURSOR *cursor = getCursor();
+ WT_CURSOR *cursor = GetCursor();
WT_SESSION *session = cursor->session;
const char *errmsg = NULL;
int ret, t_ret;
@@ -399,7 +408,7 @@ DbImpl::Write(const WriteOptions& options, WriteBatch* updates)
WriteBatchHandler handler(cursor);
status = updates->Iterate(&handler);
- if ((ret = handler.getWiredTigerStatus()) != WT_DEADLOCK)
+ if ((ret = handler.GetWiredTigerStatus()) != WT_DEADLOCK)
break;
// Roll back the transaction on deadlock so we can try again
if ((ret = session->rollback_transaction(session, NULL)) != 0) {
@@ -414,7 +423,7 @@ DbImpl::Write(const WriteOptions& options, WriteBatch* updates)
ret = session->rollback_transaction(session, NULL);
err:
- releaseCursor(cursor);
+ ReleaseCursor(cursor);
if (status.ok() && ret != 0)
status = WiredTigerErrorToStatus(ret, NULL);
return status;
@@ -432,20 +441,20 @@ DbImpl::Get(const ReadOptions& options,
const Slice& key, std::string* value)
{
WT_CURSOR *cursor;
- WT_ITEM item;
const SnapshotImpl *si = NULL;
const char *errmsg = NULL;
// Read options can contain a snapshot for us to use
if (options.snapshot == NULL) {
- cursor = getCursor();
+ cursor = GetCursor();
} else {
si = static_cast<const SnapshotImpl *>(options.snapshot);
- if (!si->getStatus().ok())
- return si->getStatus();
- cursor = si->getCursor();
+ if (!si->GetStatus().ok())
+ return si->GetStatus();
+ cursor = si->GetCursor();
}
+ WT_ITEM item;
item.data = key.data();
item.size = key.size();
cursor->set_key(cursor, &item);
@@ -459,7 +468,7 @@ DbImpl::Get(const ReadOptions& options,
err:
// Release the cursor if we are not in a snapshot
if (si == NULL)
- releaseCursor(cursor);
+ ReleaseCursor(cursor);
return WiredTigerErrorToStatus(ret, errmsg);
}
@@ -482,12 +491,12 @@ DbImpl::Get(const ReadOptions& options,
// Read options can contain a snapshot for us to use
if (options.snapshot == NULL) {
- cursor = getCursor();
+ cursor = GetCursor();
} else {
si = static_cast<const SnapshotImpl *>(options.snapshot);
- if (!si->getStatus().ok())
- return si->getStatus();
- cursor = si->getCursor();
+ if (!si->GetStatus().ok())
+ return si->GetStatus();
+ cursor = si->GetCursor();
}
item.data = key.data();
@@ -503,7 +512,7 @@ DbImpl::Get(const ReadOptions& options,
err:
// Release the cursor if we are not in a snapshot
if (si == NULL)
- releaseCursor(cursor);
+ ReleaseCursor(cursor);
return WiredTigerErrorToStatus(ret, errmsg);
}
#endif
@@ -520,6 +529,11 @@ DbImpl::NewIterator(const ReadOptions& options)
return new IteratorImpl(this, options);
}
+SnapshotImpl::SnapshotImpl(DbImpl *db) :
+ Snapshot(), db_(db), context_(db->NewContext()), status_(Status::OK())
+{
+}
+
// Return a handle to the current DB state. Iterators created with
// this handle will all observe a stable snapshot of the current DB
// state. The caller must call ReleaseSnapshot(result) when the
@@ -528,7 +542,7 @@ const Snapshot *
DbImpl::GetSnapshot()
{
SnapshotImpl *snapshot = new SnapshotImpl(this);
- Status status = snapshot->setupTransaction();
+ Status status = snapshot->SetupTransaction();
if (!status.ok()) {
delete snapshot;
// TODO: Flag an error here?
@@ -545,7 +559,7 @@ DbImpl::ReleaseSnapshot(const Snapshot* snapshot)
SnapshotImpl *si =
static_cast<SnapshotImpl *>(const_cast<Snapshot *>(snapshot));
if (si != NULL) {
- si->releaseTransaction();
+ si->ReleaseTransaction();
delete si;
}
}
@@ -605,11 +619,11 @@ DbImpl::CompactRange(const Slice* begin, const Slice* end)
{
// The compact doesn't need a cursor, but the context always opens a
// cursor when opening the session - so grab that, and use the session.
- WT_CURSOR *cursor = getCursor();
+ WT_CURSOR *cursor = GetCursor();
WT_SESSION *session = cursor->session;
int ret = session->compact(session, WT_URI, NULL);
assert(ret == 0);
- releaseCursor(cursor);
+ ReleaseCursor(cursor);
}
// Suspends the background compaction thread. This methods
@@ -631,20 +645,20 @@ IteratorImpl::IteratorImpl(DbImpl *db, const ReadOptions &options) :
cursor_(NULL), db_(db), status_(Status::OK()), valid_(false)
{
if (options.snapshot == NULL) {
- cursor_ = db_->getCursor();
- snapshot_iterator_ = false;
+ cursor_ = db_->GetCursor();
+ own_cursor_ = true;
} else {
const SnapshotImpl *si =
static_cast<const SnapshotImpl *>(options.snapshot);
- cursor_ = si->getCursor();
- snapshot_iterator_ = true;
+ cursor_ = si->GetCursor();
+ own_cursor_ = false;
}
}
IteratorImpl::~IteratorImpl()
{
- if (!snapshot_iterator_)
- db_->releaseCursor(cursor_);
+ if (own_cursor_)
+ db_->ReleaseCursor(cursor_);
}
// Position at the first key in the source. The iterator is Valid()
@@ -829,19 +843,17 @@ IteratorImpl::Prev()
}
// Implementation for WiredTiger specific read snapshot
-Status SnapshotImpl::setupTransaction()
+Status SnapshotImpl::SetupTransaction()
{
- cursor_ = db_->getCursor();
- WT_SESSION *session = cursor_->session;
+ WT_SESSION *session = context_->GetSession();
int ret = session->begin_transaction(session, NULL);
return WiredTigerErrorToStatus(ret, NULL);
}
-Status SnapshotImpl::releaseTransaction()
+Status SnapshotImpl::ReleaseTransaction()
{
- WT_SESSION *session = cursor_->session;
+ WT_SESSION *session = context_->GetSession();
int ret = session->commit_transaction(session, NULL);
- db_->releaseCursor(cursor_);
return WiredTigerErrorToStatus(ret, NULL);
}
diff --git a/api/leveldb/leveldb_wt.h b/api/leveldb/leveldb_wt.h
index 32f4310dcd1..768f3854caf 100644
--- a/api/leveldb/leveldb_wt.h
+++ b/api/leveldb/leveldb_wt.h
@@ -107,11 +107,11 @@ public:
assert(ret == 0);
}
- T *get() {
+ T *Get() {
return (T *)(pthread_getspecific(key_));
}
- void set(T *value) {
+ void Set(T *value) {
int ret = pthread_setspecific(key_, value);
assert(ret == 0);
}
@@ -141,19 +141,34 @@ public:
#endif
}
- WT_CURSOR *getCursor();
- void releaseCursor(WT_CURSOR *cursor);
+ WT_CURSOR *GetCursor();
+#ifdef HAVE_ROCKSDB
+ WT_CURSOR *GetCursor(int i) {
+ return (i < cursors_.size()) ? cursors_[i] : NULL;
+ }
+ void SetCursor(int i, WT_CURSOR *c) {
+ if (i >= cursors_.size())
+ cursors_.resize(i + 1);
+ cursors_[i] = c;
+ }
+#endif
+ WT_SESSION *GetSession() { return session_; }
+ void ReleaseCursor(WT_CURSOR *cursor);
private:
WT_CONNECTION *conn_;
WT_SESSION *session_;
WT_CURSOR *cursor_;
+#ifdef HAVE_ROCKSDB
+ std::vector<WT_CURSOR *> cursors_;
+#endif
bool in_use_;
};
class IteratorImpl : public Iterator {
public:
IteratorImpl(DbImpl *db, const ReadOptions &options);
+ IteratorImpl(DbImpl *db, WT_CURSOR *cursor) : db_(db), cursor_(cursor), own_cursor_(true) {}
virtual ~IteratorImpl();
// An iterator is either positioned at a key/value pair, or
@@ -188,7 +203,7 @@ private:
Slice key_, value_;
Status status_;
bool valid_;
- bool snapshot_iterator_;
+ bool own_cursor_;
void SetError(int wiredTigerError) {
valid_ = false;
@@ -204,17 +219,17 @@ class SnapshotImpl : public Snapshot {
friend class DbImpl;
friend class IteratorImpl;
public:
- SnapshotImpl(DbImpl *db) :
- Snapshot(), db_(db), cursor_(NULL), status_(Status::OK()) {}
- virtual ~SnapshotImpl() {}
+ SnapshotImpl(DbImpl *db);
+ virtual ~SnapshotImpl() { delete context_; }
protected:
- WT_CURSOR *getCursor() const { return cursor_; }
- Status getStatus() const { return status_; }
- Status setupTransaction();
- Status releaseTransaction();
+ OperationContext *GetContext() const { return context_; }
+ WT_CURSOR *GetCursor() const { return context_->GetCursor(); }
+ Status GetStatus() const { return status_; }
+ Status SetupTransaction();
+ Status ReleaseTransaction();
private:
DbImpl *db_;
- WT_CURSOR *cursor_;
+ OperationContext *context_;
Status status_;
};
@@ -343,12 +358,19 @@ public:
private:
WT_CONNECTION *conn_;
ThreadLocal<OperationContext> *context_;
+#ifdef HAVE_ROCKSDB
+ int numColumns_;
+#endif
- OperationContext *getContext() {
- OperationContext *ctx = context_->get();
+ OperationContext *NewContext() {
+ return new OperationContext(conn_);
+ }
+
+ OperationContext *GetContext() {
+ OperationContext *ctx = context_->Get();
if (ctx == NULL) {
- ctx = new OperationContext(conn_);
- context_->set(ctx);
+ ctx = NewContext();
+ context_->Set(ctx);
}
return (ctx);
}
@@ -358,8 +380,8 @@ private:
void operator=(const DbImpl&);
protected:
- WT_CURSOR *getCursor() { return getContext()->getCursor(); }
- void releaseCursor(WT_CURSOR *cursor) { getContext()->releaseCursor(cursor); }
+ WT_CURSOR *GetCursor() { return GetContext()->GetCursor(); }
+ void ReleaseCursor(WT_CURSOR *cursor) { GetContext()->ReleaseCursor(cursor); }
};
#ifdef HAVE_ROCKSDB
@@ -368,13 +390,17 @@ protected:
// is done using the column family
class ColumnFamilyHandleImpl : public ColumnFamilyHandle {
public:
- ColumnFamilyHandleImpl(DbImpl* db, uint32_t id) : db_(db), id_(id) {}
+ ColumnFamilyHandleImpl(DbImpl* db, std::string const &name, uint32_t id) : db_(db), id_(id), name_(name) {}
virtual ~ColumnFamilyHandleImpl() {}
virtual uint32_t GetID() const { return id_; }
+ std::string const &GetName() const { return name_; }
+ std::string const GetURI() const { return "table:" + name_; }
+
private:
DbImpl* db_;
uint32_t id_;
+ std::string const name_;
};
#endif
diff --git a/api/leveldb/rocks_wt.cc b/api/leveldb/rocks_wt.cc
index 13fd96b734d..56a32c7f487 100644
--- a/api/leveldb/rocks_wt.cc
+++ b/api/leveldb/rocks_wt.cc
@@ -32,6 +32,8 @@
#include <sstream>
using leveldb::Cache;
+using leveldb::DB;
+using leveldb::FlushOptions;
using leveldb::FilterPolicy;
using leveldb::Iterator;
using leveldb::Options;
@@ -44,15 +46,25 @@ using leveldb::Snapshot;
using leveldb::Status;
Status
-rocksdb::DB::ListColumnFamilies(rocksdb::Options const&, std::string const&, std::vector<std::string, std::allocator<std::string> >*)
+DB::ListColumnFamilies(Options const &, std::string const &, std::vector<std::string> *)
{
return WiredTigerErrorToStatus(ENOTSUP);
}
Status
-rocksdb::DB::Open(rocksdb::Options const&, std::string const&, std::vector<rocksdb::ColumnFamilyDescriptor, std::allocator<rocksdb::ColumnFamilyDescriptor> > const&, std::vector<rocksdb::ColumnFamilyHandle*, std::allocator<rocksdb::ColumnFamilyHandle*> >*, rocksdb::DB**)
+DB::Open(Options const &options, std::string const &name, const std::vector<ColumnFamilyDescriptor> &column_families, std::vector<ColumnFamilyHandle*> *handles, DB**dbptr)
{
- return WiredTigerErrorToStatus(ENOTSUP);
+ Status status = Open(options, name, dbptr);
+ if (!status.ok())
+ return status;
+ DbImpl *db = reinterpret_cast<DbImpl *>(dbptr);
+ std::vector<ColumnFamilyHandle*> cfhandles(
+ column_families.size());
+ for (size_t i = 0; i < column_families.size(); i++)
+ cfhandles[i] = new ColumnFamilyHandleImpl(
+ db, column_families[i].name, (int)i);
+ *handles = cfhandles;
+ return Status::OK();
}
void
@@ -66,49 +78,117 @@ WriteBatch::Handler::LogData(const Slice& blob)
}
Status
-DbImpl::Merge(rocksdb::WriteOptions const&, rocksdb::ColumnFamilyHandle*, rocksdb::Slice const&, rocksdb::Slice const&)
+DbImpl::Merge(WriteOptions const&, ColumnFamilyHandle*, Slice const&, Slice const&)
{
return WiredTigerErrorToStatus(ENOTSUP);
}
Status
-DbImpl::CreateColumnFamily(rocksdb::Options const&, std::string const&, rocksdb::ColumnFamilyHandle**)
+DbImpl::CreateColumnFamily(Options const &options, std::string const &name, ColumnFamilyHandle **cfhp)
{
- return WiredTigerErrorToStatus(ENOTSUP);
+ extern int wtleveldb_create(WT_CONNECTION *,
+ const Options &, std::string const &uri);
+ int ret = wtleveldb_create(conn_, options, "table:" + name);
+ if (ret != 0)
+ return WiredTigerErrorToStatus(ret);
+ *cfhp = new ColumnFamilyHandleImpl(this, name, ++numColumns_);
+ return Status::OK();
}
Status
-DbImpl::DropColumnFamily(rocksdb::ColumnFamilyHandle*)
+DbImpl::DropColumnFamily(ColumnFamilyHandle *cfhp)
{
- return WiredTigerErrorToStatus(ENOTSUP);
+ ColumnFamilyHandleImpl *cf =
+ reinterpret_cast<ColumnFamilyHandleImpl *>(cfhp);
+ WT_SESSION *session = GetContext()->GetSession();
+ int ret = session->drop(session, cf->GetURI().c_str(), NULL);
+ return WiredTigerErrorToStatus(ret);
+}
+
+static int
+wtrocks_get_cursor(OperationContext *context, ColumnFamilyHandle *cfhp, WT_CURSOR **cursorp)
+{
+ ColumnFamilyHandleImpl *cf =
+ reinterpret_cast<ColumnFamilyHandleImpl *>(cfhp);
+ WT_CURSOR *c = context->GetCursor(cf->GetID());
+ if (c == NULL) {
+ WT_SESSION *session = context->GetSession();
+ int ret;
+ if ((ret = session->open_cursor(
+ session, cf->GetURI().c_str(), NULL, NULL, &c)) != 0)
+ return (ret);
+ context->SetCursor(cf->GetID(), c);
+ }
+ *cursorp = c;
+ return (0);
}
Status
-DbImpl::Delete(rocksdb::WriteOptions const&, rocksdb::ColumnFamilyHandle*, rocksdb::Slice const&)
+DbImpl::Delete(WriteOptions const &write_options, ColumnFamilyHandle *cfhp, Slice const &key)
{
- return WiredTigerErrorToStatus(ENOTSUP);
+ WT_CURSOR *cursor;
+ int ret = wtrocks_get_cursor(GetContext(), cfhp, &cursor);
+ if (ret != 0)
+ return WiredTigerErrorToStatus(ret);
+ WT_ITEM item;
+ item.data = key.data();
+ item.size = key.size();
+ ret = cursor->remove(cursor);
+ // Reset the WiredTiger cursor so it doesn't keep any pages pinned.
+ // Track failures in debug builds since we don't expect failure, but
+ // don't pass failures on - it's not necessary for correct operation.
+ int t_ret = cursor->reset(cursor);
+ assert(t_ret == 0);
+ return WiredTigerErrorToStatus(ret);
}
Status
-DbImpl::Flush(rocksdb::FlushOptions const&, rocksdb::ColumnFamilyHandle*)
+DbImpl::Flush(FlushOptions const&, ColumnFamilyHandle*)
{
return WiredTigerErrorToStatus(ENOTSUP);
}
Status
-DbImpl::Get(rocksdb::ReadOptions const&, rocksdb::ColumnFamilyHandle*, rocksdb::Slice const&, std::string*)
+DbImpl::Get(ReadOptions const &options, ColumnFamilyHandle *cfhp, Slice const &key, std::string *value)
{
- return WiredTigerErrorToStatus(ENOTSUP);
+ const char *errmsg = NULL;
+ OperationContext *context = NULL;
+ // Read options can contain a snapshot for us to use
+ if (options.snapshot == NULL) {
+ context = GetContext();
+ } else {
+ const SnapshotImpl *si =
+ reinterpret_cast<const SnapshotImpl *>(options.snapshot);
+ if (!si->GetStatus().ok())
+ return si->GetStatus();
+ context = si->GetContext();
+ }
+
+ WT_CURSOR *cursor;
+ int ret = wtrocks_get_cursor(context, cfhp, &cursor);
+ if (ret != 0)
+ return WiredTigerErrorToStatus(ret);
+
+ WT_ITEM item;
+ item.data = key.data();
+ item.size = key.size();
+ cursor->set_key(cursor, &item);
+ if ((ret = cursor->search(cursor)) == 0 &&
+ (ret = cursor->get_value(cursor, &item)) == 0)
+ *value = std::string((const char *)item.data, item.size);
+ if (ret == WT_NOTFOUND)
+ errmsg = "DB::Get key not found";
+ return WiredTigerErrorToStatus(ret, errmsg);
}
bool
-DbImpl::GetProperty(rocksdb::ColumnFamilyHandle*, rocksdb::Slice const&, std::string*)
+DbImpl::GetProperty(ColumnFamilyHandle*, Slice const&, std::string*)
{
return false;
}
std::vector<Status>
-DbImpl::MultiGet(rocksdb::ReadOptions const&, std::vector<rocksdb::ColumnFamilyHandle*, std::allocator<rocksdb::ColumnFamilyHandle*> > const&, std::vector<rocksdb::Slice, std::allocator<rocksdb::Slice> > const&, std::vector<std::string, std::allocator<std::string> >*)
+DbImpl::MultiGet(ReadOptions const&, std::vector<ColumnFamilyHandle*> const&, std::vector<Slice> const&, std::vector<std::string, std::allocator<std::string> >*)
{
std::vector<Status> ret;
ret.push_back(WiredTigerErrorToStatus(ENOTSUP));
@@ -116,13 +196,41 @@ DbImpl::MultiGet(rocksdb::ReadOptions const&, std::vector<rocksdb::ColumnFamilyH
}
Iterator *
-DbImpl::NewIterator(rocksdb::ReadOptions const&, rocksdb::ColumnFamilyHandle*)
+DbImpl::NewIterator(ReadOptions const &options, ColumnFamilyHandle *cfhp)
{
- return NULL;
+ OperationContext *context = NULL;
+ // Read options can contain a snapshot for us to use
+ if (options.snapshot == NULL) {
+ context = GetContext();
+ } else {
+ const SnapshotImpl *si =
+ reinterpret_cast<const SnapshotImpl *>(options.snapshot);
+ if (!si->GetStatus().ok())
+ return NULL;
+ context = si->GetContext();
+ }
+
+ WT_CURSOR *cursor;
+ int ret = wtrocks_get_cursor(context, cfhp, &cursor);
+ if (ret != 0)
+ return NULL;
+
+ return new IteratorImpl(this, cursor);
}
Status
-DbImpl::Put(rocksdb::WriteOptions const&, rocksdb::ColumnFamilyHandle*, rocksdb::Slice const&, rocksdb::Slice const&)
+DbImpl::Put(WriteOptions const &options, ColumnFamilyHandle *cfhp, Slice const &key, Slice const &value)
{
- return WiredTigerErrorToStatus(ENOTSUP);
+ WT_CURSOR *cursor;
+ int ret = wtrocks_get_cursor(GetContext(), cfhp, &cursor);
+ WT_ITEM item;
+
+ item.data = key.data();
+ item.size = key.size();
+ cursor->set_key(cursor, &item);
+ item.data = value.data();
+ item.size = value.size();
+ cursor->set_value(cursor, &item);
+ ret = cursor->insert(cursor);
+ return WiredTigerErrorToStatus(ret, NULL);
}