summaryrefslogtreecommitdiff
path: root/api/leveldb
diff options
context:
space:
mode:
authorMichael Cahill <michael.cahill@wiredtiger.com>2014-07-01 16:19:34 +1000
committerMichael Cahill <michael.cahill@wiredtiger.com>2014-07-01 16:19:34 +1000
commit8655a624b88689f86adbb6b2bf5f3f578cbb4092 (patch)
tree16f2ec111f242289dcc2f2f36631fbea505025ff /api/leveldb
parent4b029ff53b194642205aa417a0211f9ce3f1e614 (diff)
downloadmongo-8655a624b88689f86adbb6b2bf5f3f578cbb4092.tar.gz
Simplify the context for operations in the LevelDB API: always open sessions for snapshots, and duplicate cursors for iterators, so there is no need to track whether the normal versions are in use.
While in the area, always reset cursors after Get operations: the data is copied out, so there is no need for the cursor to stay positioned.
Diffstat (limited to 'api/leveldb')
-rw-r--r--api/leveldb/leveldb_wt.cc170
-rw-r--r--api/leveldb/leveldb_wt.h35
-rw-r--r--api/leveldb/rocks_wt.cc40
3 files changed, 79 insertions, 166 deletions
diff --git a/api/leveldb/leveldb_wt.cc b/api/leveldb/leveldb_wt.cc
index eed510a0be0..10543676a84 100644
--- a/api/leveldb/leveldb_wt.cc
+++ b/api/leveldb/leveldb_wt.cc
@@ -195,41 +195,6 @@ const FilterPolicy *NewBloomFilterPolicy2(int bits_per_key) {
Cache::~Cache() {}
}
-// Return a cursor for the current operation to use. In the "normal" case
-// we will return the cursor opened when the OperationContext was created.
-// If the thread this OperationContext belongs to requires more than one
-// cursor (for example they start a read snapshot while doing updates), we
-// open a new session/cursor for each parallel operation.
-WT_CURSOR *OperationContext::GetCursor()
-{
- int ret;
- if (!in_use_) {
- in_use_ = true;
- return cursor_;
- } else {
- WT_SESSION *session;
- WT_CURSOR *cursor;
- if ((ret = conn_->open_session(
- conn_, NULL, "isolation=snapshot", &session)) != 0)
- return NULL;
- if ((ret = session->open_cursor(
- session, WT_URI, NULL, NULL, &cursor)) != 0)
- return NULL;
- return cursor;
- }
-}
-
-void OperationContext::ReleaseCursor(WT_CURSOR *cursor)
-{
- if (cursor == cursor_)
- in_use_ = false;
- else {
- WT_SESSION *session = cursor->session;
- int ret = session->close(session, NULL);
- assert(ret == 0);
- }
-}
-
int
wtleveldb_create(
WT_CONNECTION *conn, const Options &options, std::string const &uri)
@@ -314,7 +279,7 @@ Status
DbImpl::Put(const WriteOptions& options,
const Slice& key, const Slice& value)
{
- WT_CURSOR *cursor = GetCursor();
+ WT_CURSOR *cursor = GetContext()->GetCursor();
WT_ITEM item;
item.data = key.data();
@@ -324,7 +289,6 @@ DbImpl::Put(const WriteOptions& options,
item.size = value.size();
cursor->set_value(cursor, &item);
int ret = cursor->insert(cursor);
- ReleaseCursor(cursor);
return WiredTigerErrorToStatus(ret, NULL);
}
@@ -335,7 +299,7 @@ DbImpl::Put(const WriteOptions& options,
Status
DbImpl::Delete(const WriteOptions& options, const Slice& key)
{
- WT_CURSOR *cursor = GetCursor();
+ WT_CURSOR *cursor = GetContext()->GetCursor();
WT_ITEM item;
item.data = key.data();
@@ -347,44 +311,45 @@ DbImpl::Delete(const WriteOptions& options, const Slice& key)
// failures on - it's not necessary for correct operation.
int t_ret = cursor->reset(cursor);
assert(t_ret == 0);
- ReleaseCursor(cursor);
return WiredTigerErrorToStatus(ret, NULL);
}
// Implement WriteBatch::Handler
class WriteBatchHandler : public WriteBatch::Handler {
public:
- WriteBatchHandler(WT_CURSOR *cursor) : cursor_(cursor), status_(0) {}
+ WriteBatchHandler(OperationContext *context) : context_(context), status_(0) {}
virtual ~WriteBatchHandler() {}
int GetWiredTigerStatus() { return status_; }
virtual void Put(const Slice& key, const Slice& value) {
+ WT_CURSOR *cursor = context_->GetCursor();
WT_ITEM item;
item.data = key.data();
item.size = key.size();
- cursor_->set_key(cursor_, &item);
+ cursor->set_key(cursor, &item);
item.data = value.data();
item.size = value.size();
- cursor_->set_value(cursor_, &item);
- int ret = cursor_->insert(cursor_);
+ cursor->set_value(cursor, &item);
+ int ret = cursor->insert(cursor);
if (ret != 0 && status_ == 0)
status_ = ret;
}
virtual void Delete(const Slice& key) {
+ WT_CURSOR *cursor = context_->GetCursor();
WT_ITEM item;
item.data = key.data();
item.size = key.size();
- cursor_->set_key(cursor_, &item);
- int ret = cursor_->remove(cursor_);
+ cursor->set_key(cursor, &item);
+ int ret = cursor->remove(cursor);
if (ret != 0 && status_ == 0)
status_ = ret;
}
private:
- WT_CURSOR *cursor_;
+ OperationContext *context_;
int status_;
};
@@ -395,8 +360,8 @@ Status
DbImpl::Write(const WriteOptions& options, WriteBatch* updates)
{
Status status = Status::OK();
- WT_CURSOR *cursor = GetCursor();
- WT_SESSION *session = cursor->session;
+ OperationContext *context = GetContext();
+ WT_SESSION *session = context->GetSession();
const char *errmsg = NULL;
int ret, t_ret;
@@ -406,7 +371,7 @@ DbImpl::Write(const WriteOptions& options, WriteBatch* updates)
goto err;
}
- WriteBatchHandler handler(cursor);
+ WriteBatchHandler handler(context);
status = updates->Iterate(&handler);
if ((ret = handler.GetWiredTigerStatus()) != WT_DEADLOCK)
break;
@@ -423,7 +388,6 @@ DbImpl::Write(const WriteOptions& options, WriteBatch* updates)
ret = session->rollback_transaction(session, NULL);
err:
- ReleaseCursor(cursor);
if (status.ok() && ret != 0)
status = WiredTigerErrorToStatus(ret, NULL);
return status;
@@ -440,20 +404,10 @@ Status
DbImpl::Get(const ReadOptions& options,
const Slice& key, std::string* value)
{
- WT_CURSOR *cursor;
+ WT_CURSOR *cursor = GetContext(options)->GetCursor();
const SnapshotImpl *si = NULL;
const char *errmsg = NULL;
- // Read options can contain a snapshot for us to use
- if (options.snapshot == NULL) {
- cursor = GetCursor();
- } else {
- si = static_cast<const SnapshotImpl *>(options.snapshot);
- if (!si->GetStatus().ok())
- return si->GetStatus();
- cursor = si->GetCursor();
- }
-
WT_ITEM item;
item.data = key.data();
item.size = key.size();
@@ -461,14 +415,14 @@ DbImpl::Get(const ReadOptions& options,
int ret = cursor->search(cursor);
if (ret == 0) {
ret = cursor->get_value(cursor, &item);
- if (ret == 0)
+ if (ret == 0) {
+ // Make a copy of the value to return, then the cursor can be reset
*value = std::string((const char *)item.data, item.size);
+ ret = cursor->reset(cursor);
+ }
} else if (ret == WT_NOTFOUND)
errmsg = "DB::Get key not found";
err:
- // Release the cursor if we are not in a snapshot
- if (si == NULL)
- ReleaseCursor(cursor);
return WiredTigerErrorToStatus(ret, errmsg);
}
@@ -484,35 +438,24 @@ Status
DbImpl::Get(const ReadOptions& options,
const Slice& key, Value* value)
{
- WT_CURSOR *cursor;
- WT_ITEM item;
- const SnapshotImpl *si = NULL;
const char *errmsg = NULL;
- // Read options can contain a snapshot for us to use
- if (options.snapshot == NULL) {
- cursor = GetCursor();
- } else {
- si = static_cast<const SnapshotImpl *>(options.snapshot);
- if (!si->GetStatus().ok())
- return si->GetStatus();
- cursor = si->GetCursor();
- }
-
+ WT_CURSOR *cursor = GetContext(options)->GetCursor();
+ WT_ITEM item;
item.data = key.data();
item.size = key.size();
cursor->set_key(cursor, &item);
int ret = cursor->search(cursor);
if (ret == 0) {
ret = cursor->get_value(cursor, &item);
- if (ret == 0)
+ if (ret == 0) {
+ // This call makes a copy, reset the cursor afterwards.
value->assign((const char *)item.data, item.size);
+ ret = cursor->reset(cursor);
+ }
} else if (ret == WT_NOTFOUND)
errmsg = "DB::Get key not found";
err:
- // Release the cursor if we are not in a snapshot
- if (si == NULL)
- ReleaseCursor(cursor);
return WiredTigerErrorToStatus(ret, errmsg);
}
#endif
@@ -526,7 +469,15 @@ err:
Iterator *
DbImpl::NewIterator(const ReadOptions& options)
{
- return new IteratorImpl(this, options);
+ OperationContext *context = GetContext(options);
+
+ /* Duplicate the normal cursor for the iterator. */
+ WT_SESSION *session = context->GetSession();
+ WT_CURSOR *c = context->GetCursor();
+ WT_CURSOR *iterc;
+ int ret = session->open_cursor(session, NULL, c, NULL, &iterc);
+ assert(ret == 0);
+ return new IteratorImpl(this, iterc);
}
SnapshotImpl::SnapshotImpl(DbImpl *db) :
@@ -542,12 +493,9 @@ const Snapshot *
DbImpl::GetSnapshot()
{
SnapshotImpl *snapshot = new SnapshotImpl(this);
- Status status = snapshot->SetupTransaction();
- if (!status.ok()) {
- delete snapshot;
- // TODO: Flag an error here?
- return NULL;
- }
+ WT_SESSION *session = snapshot->GetContext()->GetSession();
+ int ret = session->begin_transaction(session, NULL);
+ assert(ret == 0);
return snapshot;
}
@@ -559,7 +507,10 @@ DbImpl::ReleaseSnapshot(const Snapshot* snapshot)
SnapshotImpl *si =
static_cast<SnapshotImpl *>(const_cast<Snapshot *>(snapshot));
if (si != NULL) {
- si->ReleaseTransaction();
+ // We started a transaction: we could commit it here, but it will be rolled
+ // back automatically by closing the session, which we have to do anyway.
+ int ret = si->GetContext()->Close();
+ assert(ret == 0);
delete si;
}
}
@@ -619,11 +570,10 @@ DbImpl::CompactRange(const Slice* begin, const Slice* end)
{
// The compact doesn't need a cursor, but the context always opens a
// cursor when opening the session - so grab that, and use the session.
- WT_CURSOR *cursor = GetCursor();
+ WT_CURSOR *cursor = GetContext()->GetCursor();
WT_SESSION *session = cursor->session;
int ret = session->compact(session, WT_URI, NULL);
assert(ret == 0);
- ReleaseCursor(cursor);
}
// Suspends the background compaction thread. This methods
@@ -641,24 +591,12 @@ DbImpl::ResumeCompactions()
/* Not supported */
}
-IteratorImpl::IteratorImpl(DbImpl *db, const ReadOptions &options) :
- cursor_(NULL), db_(db), status_(Status::OK()), valid_(false)
-{
- if (options.snapshot == NULL) {
- cursor_ = db_->GetCursor();
- own_cursor_ = true;
- } else {
- const SnapshotImpl *si =
- static_cast<const SnapshotImpl *>(options.snapshot);
- cursor_ = si->GetCursor();
- own_cursor_ = false;
- }
-}
-
IteratorImpl::~IteratorImpl()
{
- if (own_cursor_)
- db_->ReleaseCursor(cursor_);
+ if (cursor_ != NULL) {
+ int ret = cursor_->close(cursor_);
+ assert(ret == 0);
+ }
}
// Position at the first key in the source. The iterator is Valid()
@@ -841,19 +779,3 @@ IteratorImpl::Prev()
value_ = Slice((const char *)item.data, item.size);
valid_ = true;
}
-
-// Implementation for WiredTiger specific read snapshot
-Status SnapshotImpl::SetupTransaction()
-{
- WT_SESSION *session = context_->GetSession();
- int ret = session->begin_transaction(session, NULL);
- return WiredTigerErrorToStatus(ret, NULL);
-}
-
-Status SnapshotImpl::ReleaseTransaction()
-{
- WT_SESSION *session = context_->GetSession();
- int ret = session->commit_transaction(session, NULL);
-
- return WiredTigerErrorToStatus(ret, NULL);
-}
diff --git a/api/leveldb/leveldb_wt.h b/api/leveldb/leveldb_wt.h
index 768f3854caf..462b1fcc865 100644
--- a/api/leveldb/leveldb_wt.h
+++ b/api/leveldb/leveldb_wt.h
@@ -126,7 +126,7 @@ class DbImpl;
/* Context for operations (including snapshots, write batches, transactions) */
class OperationContext {
public:
- OperationContext(WT_CONNECTION *conn) : conn_(conn), in_use_(false) {
+ OperationContext(WT_CONNECTION *conn) {
int ret = conn->open_session(conn, NULL, "isolation=snapshot", &session_);
assert(ret == 0);
ret = session_->open_cursor(
@@ -136,12 +136,20 @@ public:
~OperationContext() {
#ifdef WANT_SHUTDOWN_RACES
- int ret = session_->close(session_, NULL);
+ int ret = Close();
assert(ret == 0);
#endif
}
- WT_CURSOR *GetCursor();
+ int Close() {
+ int ret = 0;
+ if (session_ != NULL)
+ ret = session_->close(session_, NULL);
+ session_ = NULL;
+ return (ret);
+ }
+
+ WT_CURSOR *GetCursor() { return cursor_; }
#ifdef HAVE_ROCKSDB
WT_CURSOR *GetCursor(int i) {
return (i < cursors_.size()) ? cursors_[i] : NULL;
@@ -153,21 +161,17 @@ public:
}
#endif
WT_SESSION *GetSession() { return session_; }
- void ReleaseCursor(WT_CURSOR *cursor);
private:
- WT_CONNECTION *conn_;
WT_SESSION *session_;
WT_CURSOR *cursor_;
#ifdef HAVE_ROCKSDB
std::vector<WT_CURSOR *> cursors_;
#endif
- bool in_use_;
};
class IteratorImpl : public Iterator {
public:
- IteratorImpl(DbImpl *db, const ReadOptions &options);
IteratorImpl(DbImpl *db, WT_CURSOR *cursor) : db_(db), cursor_(cursor), own_cursor_(true) {}
virtual ~IteratorImpl();
@@ -223,10 +227,8 @@ public:
virtual ~SnapshotImpl() { delete context_; }
protected:
OperationContext *GetContext() const { return context_; }
- WT_CURSOR *GetCursor() const { return context_->GetCursor(); }
Status GetStatus() const { return status_; }
Status SetupTransaction();
- Status ReleaseTransaction();
private:
DbImpl *db_;
OperationContext *context_;
@@ -375,13 +377,20 @@ private:
return (ctx);
}
+ OperationContext *GetContext(const ReadOptions &options) {
+ if (options.snapshot == NULL)
+ return GetContext();
+ else {
+ const SnapshotImpl *si =
+ static_cast<const SnapshotImpl *>(options.snapshot);
+ assert(si->GetStatus().ok());
+ return si->GetContext();
+ }
+ }
+
// No copying allowed
DbImpl(const DbImpl&);
void operator=(const DbImpl&);
-
-protected:
- WT_CURSOR *GetCursor() { return GetContext()->GetCursor(); }
- void ReleaseCursor(WT_CURSOR *cursor) { GetContext()->ReleaseCursor(cursor); }
};
#ifdef HAVE_ROCKSDB
diff --git a/api/leveldb/rocks_wt.cc b/api/leveldb/rocks_wt.cc
index 56a32c7f487..f690e3e8221 100644
--- a/api/leveldb/rocks_wt.cc
+++ b/api/leveldb/rocks_wt.cc
@@ -152,17 +152,7 @@ Status
DbImpl::Get(ReadOptions const &options, ColumnFamilyHandle *cfhp, Slice const &key, std::string *value)
{
const char *errmsg = NULL;
- OperationContext *context = NULL;
- // Read options can contain a snapshot for us to use
- if (options.snapshot == NULL) {
- context = GetContext();
- } else {
- const SnapshotImpl *si =
- reinterpret_cast<const SnapshotImpl *>(options.snapshot);
- if (!si->GetStatus().ok())
- return si->GetStatus();
- context = si->GetContext();
- }
+ OperationContext *context = GetContext(options);
WT_CURSOR *cursor;
int ret = wtrocks_get_cursor(context, cfhp, &cursor);
@@ -198,24 +188,16 @@ DbImpl::MultiGet(ReadOptions const&, std::vector<ColumnFamilyHandle*> const&, st
Iterator *
DbImpl::NewIterator(ReadOptions const &options, ColumnFamilyHandle *cfhp)
{
- OperationContext *context = NULL;
- // Read options can contain a snapshot for us to use
- if (options.snapshot == NULL) {
- context = GetContext();
- } else {
- const SnapshotImpl *si =
- reinterpret_cast<const SnapshotImpl *>(options.snapshot);
- if (!si->GetStatus().ok())
- return NULL;
- context = si->GetContext();
- }
-
- WT_CURSOR *cursor;
- int ret = wtrocks_get_cursor(context, cfhp, &cursor);
- if (ret != 0)
- return NULL;
-
- return new IteratorImpl(this, cursor);
+ OperationContext *context = GetContext(options);
+
+ /* Duplicate the normal cursor for the iterator. */
+ WT_SESSION *session = context->GetSession();
+ WT_CURSOR *c, *iterc;
+ int ret = wtrocks_get_cursor(context, cfhp, &c);
+ assert(ret == 0);
+ ret = session->open_cursor(session, NULL, c, NULL, &iterc);
+ assert(ret == 0);
+ return new IteratorImpl(this, iterc);
}
Status