summaryrefslogtreecommitdiff
path: root/api/leveldb/leveldb_wt.cc
diff options
context:
space:
mode:
Diffstat (limited to 'api/leveldb/leveldb_wt.cc')
-rw-r--r--api/leveldb/leveldb_wt.cc95
1 files changed, 48 insertions, 47 deletions
diff --git a/api/leveldb/leveldb_wt.cc b/api/leveldb/leveldb_wt.cc
index 10543676a84..2489b549c5e 100644
--- a/api/leveldb/leveldb_wt.cc
+++ b/api/leveldb/leveldb_wt.cc
@@ -239,10 +239,12 @@ leveldb::DB::Open(const Options &options, const std::string &name, leveldb::DB *
}
if (options.error_if_exists)
s_conn << "exclusive,";
+#if 0
if (options.compression == kSnappyCompression)
s_conn << "extensions=[libwiredtiger_snappy.so],";
+#endif
size_t cache_size = 2 * options.write_buffer_size;
- cache_size += options.max_open_files * (4 << 20);
+ cache_size += (size_t)options.max_open_files * (4 << 20);
if (options.block_cache)
cache_size += ((CacheImpl *)options.block_cache)->capacity_;
else
@@ -251,7 +253,7 @@ leveldb::DB::Open(const Options &options, const std::string &name, leveldb::DB *
std::string conn_config = s_conn.str();
WT_CONNECTION *conn;
- fprintf(stderr,"Open: Home %s config %s\r\n",name.c_str(),conn_config.c_str());
+ printf("Open: home %s config %s\r\n",name.c_str(),conn_config.c_str());
int ret = ::wiredtiger_open(name.c_str(), NULL, conn_config.c_str(), &conn);
if (ret == ENOENT)
return Status::NotFound(Slice("Database does not exist."));
@@ -314,44 +316,33 @@ DbImpl::Delete(const WriteOptions& options, const Slice& key)
return WiredTigerErrorToStatus(ret, NULL);
}
-// Implement WriteBatch::Handler
-class WriteBatchHandler : public WriteBatch::Handler {
-public:
- WriteBatchHandler(OperationContext *context) : context_(context), status_(0) {}
- virtual ~WriteBatchHandler() {}
- int GetWiredTigerStatus() { return status_; }
-
- virtual void Put(const Slice& key, const Slice& value) {
- WT_CURSOR *cursor = context_->GetCursor();
- WT_ITEM item;
-
- item.data = key.data();
- item.size = key.size();
- cursor->set_key(cursor, &item);
- item.data = value.data();
- item.size = value.size();
- cursor->set_value(cursor, &item);
- int ret = cursor->insert(cursor);
- if (ret != 0 && status_ == 0)
- status_ = ret;
- }
+void
+WriteBatchHandler::Put(const Slice& key, const Slice& value) {
+ WT_CURSOR *cursor = context_->GetCursor();
+ WT_ITEM item;
- virtual void Delete(const Slice& key) {
- WT_CURSOR *cursor = context_->GetCursor();
- WT_ITEM item;
+ item.data = key.data();
+ item.size = key.size();
+ cursor->set_key(cursor, &item);
+ item.data = value.data();
+ item.size = value.size();
+ cursor->set_value(cursor, &item);
+ int ret = cursor->insert(cursor);
+ if (ret != 0 && status_ == 0)
+ status_ = ret;
+}
- item.data = key.data();
- item.size = key.size();
- cursor->set_key(cursor, &item);
- int ret = cursor->remove(cursor);
- if (ret != 0 && status_ == 0)
- status_ = ret;
- }
+void WriteBatchHandler::Delete(const Slice& key) {
+ WT_CURSOR *cursor = context_->GetCursor();
+ WT_ITEM item;
-private:
- OperationContext *context_;
- int status_;
-};
+ item.data = key.data();
+ item.size = key.size();
+ cursor->set_key(cursor, &item);
+ int ret = cursor->remove(cursor);
+ if (ret != 0 && status_ == 0)
+ status_ = ret;
+}
// Apply the specified updates to the database.
// Returns OK on success, non-OK on failure.
@@ -371,9 +362,14 @@ DbImpl::Write(const WriteOptions& options, WriteBatch* updates)
goto err;
}
- WriteBatchHandler handler(context);
- status = updates->Iterate(&handler);
- if ((ret = handler.GetWiredTigerStatus()) != WT_DEADLOCK)
+ WriteBatchHandler handler(this, context);
+ try {
+ status = updates->Iterate(&handler);
+ } catch(...) {
+ (void)session->rollback_transaction(session, NULL);
+ throw;
+ }
+ if (!status.ok() || (ret = handler.GetWiredTigerStatus()) != WT_DEADLOCK)
break;
// Roll back the transaction on deadlock so we can try again
if ((ret = session->rollback_transaction(session, NULL)) != 0) {
@@ -382,10 +378,13 @@ DbImpl::Write(const WriteOptions& options, WriteBatch* updates)
}
}
- if (status.ok() && ret == 0)
+ if (status.ok() && ret == 0) {
ret = session->commit_transaction(session, NULL);
- else if (ret == 0)
- ret = session->rollback_transaction(session, NULL);
+ } else {
+ t_ret = session->rollback_transaction(session, NULL);
+ if (ret == 0)
+ ret = t_ret;
+ }
err:
if (status.ok() && ret != 0)
@@ -475,7 +474,8 @@ DbImpl::NewIterator(const ReadOptions& options)
WT_SESSION *session = context->GetSession();
WT_CURSOR *c = context->GetCursor();
WT_CURSOR *iterc;
- int ret = session->open_cursor(session, NULL, c, NULL, &iterc);
+ /* XXX would like a fast duplicate for LSM cursors without position. */
+ int ret = session->open_cursor(session, c->uri, NULL, NULL, &iterc);
assert(ret == 0);
return new IteratorImpl(this, iterc);
}
@@ -492,11 +492,11 @@ SnapshotImpl::SnapshotImpl(DbImpl *db) :
const Snapshot *
DbImpl::GetSnapshot()
{
- SnapshotImpl *snapshot = new SnapshotImpl(this);
- WT_SESSION *session = snapshot->GetContext()->GetSession();
+ SnapshotImpl *si = new SnapshotImpl(this);
+ WT_SESSION *session = si->GetContext()->GetSession();
int ret = session->begin_transaction(session, NULL);
assert(ret == 0);
- return snapshot;
+ return si;
}
// Release a previously acquired snapshot. The caller must not
@@ -507,6 +507,7 @@ DbImpl::ReleaseSnapshot(const Snapshot* snapshot)
SnapshotImpl *si =
static_cast<SnapshotImpl *>(const_cast<Snapshot *>(snapshot));
if (si != NULL) {
+ WT_SESSION *session = si->GetContext()->GetSession();
// We started a transaction: we could commit it here, but it will be rolled
// back automatically by closing the session, which we have to do anyway.
int ret = si->GetContext()->Close();