summary refs log tree commit diff
path: root/api
diff options
context:
space:
mode:
author Michael Cahill <michael.cahill@wiredtiger.com> 2014-07-02 22:39:14 +1000
committer Michael Cahill <michael.cahill@wiredtiger.com> 2014-07-02 22:39:14 +1000
commit dd246265793c035cbfdde034efa61c3014643f4f (patch)
tree e5ad33e1cda7f971a1bfa9973dcdc3bd6111184a /api
parent f64b96f97d1a31720dd0e1e4142b962c8381f413 (diff)
download mongo-dd246265793c035cbfdde034efa61c3014643f4f.tar.gz
Finish implementation of enough of the RocksDB API to allow MongoDB to run.
Diffstat (limited to 'api')
-rw-r--r-- api/leveldb/leveldb_wt.cc 95
-rw-r--r-- api/leveldb/leveldb_wt.h 114
-rw-r--r-- api/leveldb/rocks_wt.cc 123
3 files changed, 221 insertions, 111 deletions
diff --git a/api/leveldb/leveldb_wt.cc b/api/leveldb/leveldb_wt.cc
index 10543676a84..2489b549c5e 100644
--- a/api/leveldb/leveldb_wt.cc
+++ b/api/leveldb/leveldb_wt.cc
@@ -239,10 +239,12 @@ leveldb::DB::Open(const Options &options, const std::string &name, leveldb::DB *
}
if (options.error_if_exists)
s_conn << "exclusive,";
+#if 0
if (options.compression == kSnappyCompression)
s_conn << "extensions=[libwiredtiger_snappy.so],";
+#endif
size_t cache_size = 2 * options.write_buffer_size;
- cache_size += options.max_open_files * (4 << 20);
+ cache_size += (size_t)options.max_open_files * (4 << 20);
if (options.block_cache)
cache_size += ((CacheImpl *)options.block_cache)->capacity_;
else
@@ -251,7 +253,7 @@ leveldb::DB::Open(const Options &options, const std::string &name, leveldb::DB *
std::string conn_config = s_conn.str();
WT_CONNECTION *conn;
- fprintf(stderr,"Open: Home %s config %s\r\n",name.c_str(),conn_config.c_str());
+ printf("Open: home %s config %s\r\n",name.c_str(),conn_config.c_str());
int ret = ::wiredtiger_open(name.c_str(), NULL, conn_config.c_str(), &conn);
if (ret == ENOENT)
return Status::NotFound(Slice("Database does not exist."));
@@ -314,44 +316,33 @@ DbImpl::Delete(const WriteOptions& options, const Slice& key)
return WiredTigerErrorToStatus(ret, NULL);
}
-// Implement WriteBatch::Handler
-class WriteBatchHandler : public WriteBatch::Handler {
-public:
- WriteBatchHandler(OperationContext *context) : context_(context), status_(0) {}
- virtual ~WriteBatchHandler() {}
- int GetWiredTigerStatus() { return status_; }
-
- virtual void Put(const Slice& key, const Slice& value) {
- WT_CURSOR *cursor = context_->GetCursor();
- WT_ITEM item;
-
- item.data = key.data();
- item.size = key.size();
- cursor->set_key(cursor, &item);
- item.data = value.data();
- item.size = value.size();
- cursor->set_value(cursor, &item);
- int ret = cursor->insert(cursor);
- if (ret != 0 && status_ == 0)
- status_ = ret;
- }
+void
+WriteBatchHandler::Put(const Slice& key, const Slice& value) {
+ WT_CURSOR *cursor = context_->GetCursor();
+ WT_ITEM item;
- virtual void Delete(const Slice& key) {
- WT_CURSOR *cursor = context_->GetCursor();
- WT_ITEM item;
+ item.data = key.data();
+ item.size = key.size();
+ cursor->set_key(cursor, &item);
+ item.data = value.data();
+ item.size = value.size();
+ cursor->set_value(cursor, &item);
+ int ret = cursor->insert(cursor);
+ if (ret != 0 && status_ == 0)
+ status_ = ret;
+}
- item.data = key.data();
- item.size = key.size();
- cursor->set_key(cursor, &item);
- int ret = cursor->remove(cursor);
- if (ret != 0 && status_ == 0)
- status_ = ret;
- }
+void WriteBatchHandler::Delete(const Slice& key) {
+ WT_CURSOR *cursor = context_->GetCursor();
+ WT_ITEM item;
-private:
- OperationContext *context_;
- int status_;
-};
+ item.data = key.data();
+ item.size = key.size();
+ cursor->set_key(cursor, &item);
+ int ret = cursor->remove(cursor);
+ if (ret != 0 && status_ == 0)
+ status_ = ret;
+}
// Apply the specified updates to the database.
// Returns OK on success, non-OK on failure.
@@ -371,9 +362,14 @@ DbImpl::Write(const WriteOptions& options, WriteBatch* updates)
goto err;
}
- WriteBatchHandler handler(context);
- status = updates->Iterate(&handler);
- if ((ret = handler.GetWiredTigerStatus()) != WT_DEADLOCK)
+ WriteBatchHandler handler(this, context);
+ try {
+ status = updates->Iterate(&handler);
+ } catch(...) {
+ (void)session->rollback_transaction(session, NULL);
+ throw;
+ }
+ if (!status.ok() || (ret = handler.GetWiredTigerStatus()) != WT_DEADLOCK)
break;
// Roll back the transaction on deadlock so we can try again
if ((ret = session->rollback_transaction(session, NULL)) != 0) {
@@ -382,10 +378,13 @@ DbImpl::Write(const WriteOptions& options, WriteBatch* updates)
}
}
- if (status.ok() && ret == 0)
+ if (status.ok() && ret == 0) {
ret = session->commit_transaction(session, NULL);
- else if (ret == 0)
- ret = session->rollback_transaction(session, NULL);
+ } else {
+ t_ret = session->rollback_transaction(session, NULL);
+ if (ret == 0)
+ ret = t_ret;
+ }
err:
if (status.ok() && ret != 0)
@@ -475,7 +474,8 @@ DbImpl::NewIterator(const ReadOptions& options)
WT_SESSION *session = context->GetSession();
WT_CURSOR *c = context->GetCursor();
WT_CURSOR *iterc;
- int ret = session->open_cursor(session, NULL, c, NULL, &iterc);
+ /* XXX would like a fast duplicate for LSM cursors without position. */
+ int ret = session->open_cursor(session, c->uri, NULL, NULL, &iterc);
assert(ret == 0);
return new IteratorImpl(this, iterc);
}
@@ -492,11 +492,11 @@ SnapshotImpl::SnapshotImpl(DbImpl *db) :
const Snapshot *
DbImpl::GetSnapshot()
{
- SnapshotImpl *snapshot = new SnapshotImpl(this);
- WT_SESSION *session = snapshot->GetContext()->GetSession();
+ SnapshotImpl *si = new SnapshotImpl(this);
+ WT_SESSION *session = si->GetContext()->GetSession();
int ret = session->begin_transaction(session, NULL);
assert(ret == 0);
- return snapshot;
+ return si;
}
// Release a previously acquired snapshot. The caller must not
@@ -507,6 +507,7 @@ DbImpl::ReleaseSnapshot(const Snapshot* snapshot)
SnapshotImpl *si =
static_cast<SnapshotImpl *>(const_cast<Snapshot *>(snapshot));
if (si != NULL) {
+ WT_SESSION *session = si->GetContext()->GetSession();
// We started a transaction: we could commit it here, but it will be rolled
// back automatically by closing the session, which we have to do anyway.
int ret = si->GetContext()->Close();
diff --git a/api/leveldb/leveldb_wt.h b/api/leveldb/leveldb_wt.h
index 462b1fcc865..489240af86f 100644
--- a/api/leveldb/leveldb_wt.h
+++ b/api/leveldb/leveldb_wt.h
@@ -74,21 +74,6 @@ using leveldb::ColumnFamilyHandle;
extern Status WiredTigerErrorToStatus(int wiredTigerError, const char *msg = "");
-class CacheImpl : public Cache {
-public:
- CacheImpl(size_t capacity) : Cache(), capacity_(capacity) {}
-
- virtual Handle* Insert(const Slice& key, void* value, size_t charge,
- void (*deleter)(const Slice& key, void* value)) { return 0; }
- virtual Handle* Lookup(const Slice& key) { return 0; }
- virtual void Release(Handle* handle) {}
- virtual void* Value(Handle* handle) { return 0; }
- virtual void Erase(const Slice& key) {}
- virtual uint64_t NewId() { return 0; }
-
- size_t capacity_;
-};
-
/* POSIX thread-local storage */
template <class T>
class ThreadLocal {
@@ -170,6 +155,42 @@ private:
#endif
};
+class CacheImpl : public Cache {
+public:
+ CacheImpl(size_t capacity) : Cache(), capacity_(capacity) {}
+
+ virtual Handle* Insert(const Slice& key, void* value, size_t charge,
+ void (*deleter)(const Slice& key, void* value)) { return 0; }
+ virtual Handle* Lookup(const Slice& key) { return 0; }
+ virtual void Release(Handle* handle) {}
+ virtual void* Value(Handle* handle) { return 0; }
+ virtual void Erase(const Slice& key) {}
+ virtual uint64_t NewId() { return 0; }
+
+ size_t capacity_;
+};
+
+#ifdef HAVE_ROCKSDB
+// ColumnFamilyHandleImpl is the class that clients use to access different
+// column families. It has non-trivial destructor, which gets called when client
+// is done using the column family
+class ColumnFamilyHandleImpl : public ColumnFamilyHandle {
+ public:
+ ColumnFamilyHandleImpl(DbImpl* db, std::string const &name, uint32_t id) : db_(db), id_(id), name_(name) {}
+ ColumnFamilyHandleImpl(const ColumnFamilyHandleImpl &copyfrom) : db_(copyfrom.db_), id_(copyfrom.id_), name_(copyfrom.name_) {}
+ virtual ~ColumnFamilyHandleImpl() {}
+ virtual uint32_t GetID() const { return id_; }
+
+ std::string const &GetName() const { return name_; }
+ std::string const GetURI() const { return "table:" + name_; }
+
+ private:
+ DbImpl* db_;
+ uint32_t id_;
+ std::string const name_;
+};
+#endif
+
class IteratorImpl : public Iterator {
public:
IteratorImpl(DbImpl *db, WT_CURSOR *cursor) : db_(db), cursor_(cursor), own_cursor_(true) {}
@@ -338,6 +359,13 @@ public:
// Flush all mem-table data.
virtual Status Flush(const FlushOptions& options,
ColumnFamilyHandle* column_family);
+
+ ColumnFamilyHandleImpl *GetCF(uint32_t column_family_id) {
+ return reinterpret_cast<ColumnFamilyHandleImpl *>(columns_.at(column_family_id));
+ }
+ void SetColumns(std::vector<ColumnFamilyHandle *> &cols) {
+ columns_ = cols;
+ }
#endif
virtual Iterator* NewIterator(const ReadOptions& options);
@@ -357,26 +385,26 @@ public:
virtual void ResumeCompactions();
+ OperationContext *GetContext() {
+ OperationContext *ctx = context_->Get();
+ if (ctx == NULL) {
+ ctx = NewContext();
+ context_->Set(ctx);
+ }
+ return (ctx);
+ }
+
private:
WT_CONNECTION *conn_;
ThreadLocal<OperationContext> *context_;
#ifdef HAVE_ROCKSDB
- int numColumns_;
+ std::vector<ColumnFamilyHandle*> columns_;
#endif
OperationContext *NewContext() {
return new OperationContext(conn_);
}
- OperationContext *GetContext() {
- OperationContext *ctx = context_->Get();
- if (ctx == NULL) {
- ctx = NewContext();
- context_->Set(ctx);
- }
- return (ctx);
- }
-
OperationContext *GetContext(const ReadOptions &options) {
if (options.snapshot == NULL)
return GetContext();
@@ -393,24 +421,28 @@ private:
void operator=(const DbImpl&);
};
-#ifdef HAVE_ROCKSDB
-// ColumnFamilyHandleImpl is the class that clients use to access different
-// column families. It has non-trivial destructor, which gets called when client
-// is done using the column family
-class ColumnFamilyHandleImpl : public ColumnFamilyHandle {
- public:
- ColumnFamilyHandleImpl(DbImpl* db, std::string const &name, uint32_t id) : db_(db), id_(id), name_(name) {}
- virtual ~ColumnFamilyHandleImpl() {}
- virtual uint32_t GetID() const { return id_; }
+// Implementation of WriteBatch::Handler
+class WriteBatchHandler : public WriteBatch::Handler {
+public:
+ WriteBatchHandler(DbImpl *db, OperationContext *context) : db_(db), context_(context), status_(0) {}
+ virtual ~WriteBatchHandler() {}
+ int GetWiredTigerStatus() { return status_; }
- std::string const &GetName() const { return name_; }
- std::string const GetURI() const { return "table:" + name_; }
+ virtual void Put(const Slice& key, const Slice& value);
- private:
- DbImpl* db_;
- uint32_t id_;
- std::string const name_;
-};
+ virtual void Delete(const Slice& key);
+
+#ifdef HAVE_ROCKSDB
+ // Implementations are in rocks_wt.cc
+ virtual Status PutCF(uint32_t column_family_id, const Slice& key,
+ const Slice& value);
+ virtual Status DeleteCF(uint32_t column_family_id, const Slice& key);
#endif
+private:
+ DbImpl *db_;
+ OperationContext *context_;
+ int status_;
+};
+
#endif
diff --git a/api/leveldb/rocks_wt.cc b/api/leveldb/rocks_wt.cc
index f690e3e8221..37ecc42f524 100644
--- a/api/leveldb/rocks_wt.cc
+++ b/api/leveldb/rocks_wt.cc
@@ -45,10 +45,69 @@ using leveldb::Slice;
using leveldb::Snapshot;
using leveldb::Status;
+static int
+wtrocks_get_cursor(OperationContext *context, ColumnFamilyHandle *cfhp, WT_CURSOR **cursorp)
+{
+ ColumnFamilyHandleImpl *cf =
+ reinterpret_cast<ColumnFamilyHandleImpl *>(cfhp);
+ WT_CURSOR *c = context->GetCursor(cf->GetID());
+ if (c == NULL) {
+ WT_SESSION *session = context->GetSession();
+ int ret;
+ if ((ret = session->open_cursor(
+ session, cf->GetURI().c_str(), NULL, NULL, &c)) != 0) {
+ fprintf(stderr, "Failed to open cursor on %s: %s\n", cf->GetURI().c_str(), wiredtiger_strerror(ret));
+ return (ret);
+ }
+ context->SetCursor(cf->GetID(), c);
+ }
+ *cursorp = c;
+ return (0);
+}
+
Status
-DB::ListColumnFamilies(Options const &, std::string const &, std::vector<std::string> *)
+DB::ListColumnFamilies(
+ Options const &options, std::string const &name,
+ std::vector<std::string> *column_families)
{
- return WiredTigerErrorToStatus(ENOTSUP);
+ std::vector<std::string> cf;
+ DB *dbptr;
+ Status status = DB::Open(options, name, &dbptr);
+ if (!status.ok())
+ return status;
+ DbImpl *db = reinterpret_cast<DbImpl *>(dbptr);
+ OperationContext *context = db->GetContext();
+ WT_SESSION *session = context->GetSession();
+ WT_CURSOR *c;
+ int ret = session->open_cursor(session, "metadata:", NULL, NULL, &c);
+ if (ret != 0)
+ goto err;
+ c->set_key(c, "table:");
+ /* Position on the first table entry */
+ int cmp;
+ ret = c->search_near(c, &cmp);
+ if (ret != 0 || (cmp < 0 && (ret = c->next(c)) != 0))
+ goto err;
+ /* Add entries while we are getting "table" URIs. */
+ for (; ret == 0; ret = c->next(c)) {
+ const char *key;
+ if ((ret = c->get_key(c, &key)) != 0)
+ goto err;
+ if (strncmp(key, "table:", strlen("table:")) != 0)
+ break;
+ cf.push_back(std::string(key + strlen("table:")));
+ }
+
+err: delete db;
+ /*
+ * WT_NOTFOUND is not an error: it just means we got to the end of the
+ * list of tables.
+ */
+ if (ret == 0 || ret == WT_NOTFOUND) {
+ *column_families = cf;
+ ret = 0;
+ }
+ return WiredTigerErrorToStatus(ret);
}
Status
@@ -63,7 +122,7 @@ DB::Open(Options const &options, std::string const &name, const std::vector<Colu
for (size_t i = 0; i < column_families.size(); i++)
cfhandles[i] = new ColumnFamilyHandleImpl(
db, column_families[i].name, (int)i);
- *handles = cfhandles;
+ db->SetColumns(*handles = cfhandles);
return Status::OK();
}
@@ -78,6 +137,40 @@ WriteBatch::Handler::LogData(const Slice& blob)
}
Status
+WriteBatchHandler::PutCF(
+ uint32_t column_family_id, const Slice& key, const Slice& value)
+{
+ WT_CURSOR *cursor;
+ int ret = wtrocks_get_cursor(context_, db_->GetCF(column_family_id), &cursor);
+ if (ret != 0)
+ return WiredTigerErrorToStatus(ret);
+ WT_ITEM item;
+ item.data = key.data();
+ item.size = key.size();
+ cursor->set_key(cursor, &item);
+ item.data = value.data();
+ item.size = value.size();
+ cursor->set_value(cursor, &item);
+ ret = cursor->insert(cursor);
+ return WiredTigerErrorToStatus(ret);
+}
+
+Status
+WriteBatchHandler::DeleteCF(uint32_t column_family_id, const Slice& key)
+{
+ WT_CURSOR *cursor;
+ int ret = wtrocks_get_cursor(context_, db_->GetCF(column_family_id), &cursor);
+ if (ret != 0)
+ return WiredTigerErrorToStatus(ret);
+ WT_ITEM item;
+ item.data = key.data();
+ item.size = key.size();
+ cursor->set_key(cursor, &item);
+ ret = cursor->remove(cursor);
+ return WiredTigerErrorToStatus(ret);
+}
+
+Status
DbImpl::Merge(WriteOptions const&, ColumnFamilyHandle*, Slice const&, Slice const&)
{
return WiredTigerErrorToStatus(ENOTSUP);
@@ -91,7 +184,8 @@ DbImpl::CreateColumnFamily(Options const &options, std::string const &name, Colu
int ret = wtleveldb_create(conn_, options, "table:" + name);
if (ret != 0)
return WiredTigerErrorToStatus(ret);
- *cfhp = new ColumnFamilyHandleImpl(this, name, ++numColumns_);
+ *cfhp = new ColumnFamilyHandleImpl(this, name, columns_.size());
+ columns_.push_back(*cfhp);
return Status::OK();
}
@@ -105,24 +199,6 @@ DbImpl::DropColumnFamily(ColumnFamilyHandle *cfhp)
return WiredTigerErrorToStatus(ret);
}
-static int
-wtrocks_get_cursor(OperationContext *context, ColumnFamilyHandle *cfhp, WT_CURSOR **cursorp)
-{
- ColumnFamilyHandleImpl *cf =
- reinterpret_cast<ColumnFamilyHandleImpl *>(cfhp);
- WT_CURSOR *c = context->GetCursor(cf->GetID());
- if (c == NULL) {
- WT_SESSION *session = context->GetSession();
- int ret;
- if ((ret = session->open_cursor(
- session, cf->GetURI().c_str(), NULL, NULL, &c)) != 0)
- return (ret);
- context->SetCursor(cf->GetID(), c);
- }
- *cursorp = c;
- return (0);
-}
-
Status
DbImpl::Delete(WriteOptions const &write_options, ColumnFamilyHandle *cfhp, Slice const &key)
{
@@ -195,7 +271,8 @@ DbImpl::NewIterator(ReadOptions const &options, ColumnFamilyHandle *cfhp)
WT_CURSOR *c, *iterc;
int ret = wtrocks_get_cursor(context, cfhp, &c);
assert(ret == 0);
- ret = session->open_cursor(session, NULL, c, NULL, &iterc);
+ /* XXX would like a fast duplicate for LSM cursors without position. */
+ ret = session->open_cursor(session, c->uri, NULL, NULL, &iterc);
assert(ret == 0);
return new IteratorImpl(this, iterc);
}