diff options
Diffstat (limited to 'api/leveldb/leveldb_wt.cc')
-rw-r--r-- | api/leveldb/leveldb_wt.cc | 95 |
1 files changed, 48 insertions, 47 deletions
diff --git a/api/leveldb/leveldb_wt.cc b/api/leveldb/leveldb_wt.cc index 10543676a84..2489b549c5e 100644 --- a/api/leveldb/leveldb_wt.cc +++ b/api/leveldb/leveldb_wt.cc @@ -239,10 +239,12 @@ leveldb::DB::Open(const Options &options, const std::string &name, leveldb::DB * } if (options.error_if_exists) s_conn << "exclusive,"; +#if 0 if (options.compression == kSnappyCompression) s_conn << "extensions=[libwiredtiger_snappy.so],"; +#endif size_t cache_size = 2 * options.write_buffer_size; - cache_size += options.max_open_files * (4 << 20); + cache_size += (size_t)options.max_open_files * (4 << 20); if (options.block_cache) cache_size += ((CacheImpl *)options.block_cache)->capacity_; else @@ -251,7 +253,7 @@ leveldb::DB::Open(const Options &options, const std::string &name, leveldb::DB * std::string conn_config = s_conn.str(); WT_CONNECTION *conn; - fprintf(stderr,"Open: Home %s config %s\r\n",name.c_str(),conn_config.c_str()); + printf("Open: home %s config %s\r\n",name.c_str(),conn_config.c_str()); int ret = ::wiredtiger_open(name.c_str(), NULL, conn_config.c_str(), &conn); if (ret == ENOENT) return Status::NotFound(Slice("Database does not exist.")); @@ -314,44 +316,33 @@ DbImpl::Delete(const WriteOptions& options, const Slice& key) return WiredTigerErrorToStatus(ret, NULL); } -// Implement WriteBatch::Handler -class WriteBatchHandler : public WriteBatch::Handler { -public: - WriteBatchHandler(OperationContext *context) : context_(context), status_(0) {} - virtual ~WriteBatchHandler() {} - int GetWiredTigerStatus() { return status_; } - - virtual void Put(const Slice& key, const Slice& value) { - WT_CURSOR *cursor = context_->GetCursor(); - WT_ITEM item; - - item.data = key.data(); - item.size = key.size(); - cursor->set_key(cursor, &item); - item.data = value.data(); - item.size = value.size(); - cursor->set_value(cursor, &item); - int ret = cursor->insert(cursor); - if (ret != 0 && status_ == 0) - status_ = ret; - } +void +WriteBatchHandler::Put(const Slice& key, const Slice& value) { + WT_CURSOR *cursor = context_->GetCursor(); + WT_ITEM item; - virtual void Delete(const Slice& key) { - WT_CURSOR *cursor = context_->GetCursor(); - WT_ITEM item; + item.data = key.data(); + item.size = key.size(); + cursor->set_key(cursor, &item); + item.data = value.data(); + item.size = value.size(); + cursor->set_value(cursor, &item); + int ret = cursor->insert(cursor); + if (ret != 0 && status_ == 0) + status_ = ret; +} - item.data = key.data(); - item.size = key.size(); - cursor->set_key(cursor, &item); - int ret = cursor->remove(cursor); - if (ret != 0 && status_ == 0) - status_ = ret; - } +void WriteBatchHandler::Delete(const Slice& key) { + WT_CURSOR *cursor = context_->GetCursor(); + WT_ITEM item; -private: - OperationContext *context_; - int status_; -}; + item.data = key.data(); + item.size = key.size(); + cursor->set_key(cursor, &item); + int ret = cursor->remove(cursor); + if (ret != 0 && status_ == 0) + status_ = ret; +} // Apply the specified updates to the database. // Returns OK on success, non-OK on failure. @@ -371,9 +362,14 @@ DbImpl::Write(const WriteOptions& options, WriteBatch* updates) goto err; } - WriteBatchHandler handler(context); - status = updates->Iterate(&handler); - if ((ret = handler.GetWiredTigerStatus()) != WT_DEADLOCK) + WriteBatchHandler handler(this, context); + try { + status = updates->Iterate(&handler); + } catch(...) { + (void)session->rollback_transaction(session, NULL); + throw; + } + if (!status.ok() || (ret = handler.GetWiredTigerStatus()) != WT_DEADLOCK) break; // Roll back the transaction on deadlock so we can try again if ((ret = session->rollback_transaction(session, NULL)) != 0) { @@ -382,10 +378,13 @@ DbImpl::Write(const WriteOptions& options, WriteBatch* updates) } } - if (status.ok() && ret == 0) + if (status.ok() && ret == 0) { ret = session->commit_transaction(session, NULL); - else if (ret == 0) - ret = session->rollback_transaction(session, NULL); + } else { + t_ret = session->rollback_transaction(session, NULL); + if (ret == 0) + ret = t_ret; + } err: if (status.ok() && ret != 0) @@ -475,7 +474,8 @@ DbImpl::NewIterator(const ReadOptions& options) WT_SESSION *session = context->GetSession(); WT_CURSOR *c = context->GetCursor(); WT_CURSOR *iterc; - int ret = session->open_cursor(session, NULL, c, NULL, &iterc); + /* XXX would like a fast duplicate for LSM cursors without position. */ + int ret = session->open_cursor(session, c->uri, NULL, NULL, &iterc); assert(ret == 0); return new IteratorImpl(this, iterc); } @@ -492,11 +492,11 @@ SnapshotImpl::SnapshotImpl(DbImpl *db) : const Snapshot * DbImpl::GetSnapshot() { - SnapshotImpl *snapshot = new SnapshotImpl(this); - WT_SESSION *session = snapshot->GetContext()->GetSession(); + SnapshotImpl *si = new SnapshotImpl(this); + WT_SESSION *session = si->GetContext()->GetSession(); int ret = session->begin_transaction(session, NULL); assert(ret == 0); - return snapshot; + return si; } // Release a previously acquired snapshot. The caller must not @@ -507,6 +507,7 @@ DbImpl::ReleaseSnapshot(const Snapshot* snapshot) SnapshotImpl *si = static_cast<SnapshotImpl *>(const_cast<Snapshot *>(snapshot)); if (si != NULL) { + WT_SESSION *session = si->GetContext()->GetSession(); // We started a transaction: we could commit it here, but it will be rolled // back automatically by closing the session, which we have to do anyway. int ret = si->GetContext()->Close(); |