summaryrefslogtreecommitdiff
path: root/api
diff options
context:
space:
mode:
authorSusan LoVerso <sue@wiredtiger.com>2014-08-07 14:27:20 -0400
committerSusan LoVerso <sue@wiredtiger.com>2014-08-07 14:27:20 -0400
commit8e6f9467947e68bf00ea1998ecec0856ea308df3 (patch)
tree20554c46e80c031350008095dc19b44e4455055d /api
parent2b0508fe2907f109721425dca86a6c7ac7280787 (diff)
downloadmongo-8e6f9467947e68bf00ea1998ecec0856ea308df3.tar.gz
Add replay code to Hyperleveldb port. #1106
Diffstat (limited to 'api')
-rw-r--r--api/leveldb/hyper_wt.cc242
-rw-r--r--api/leveldb/leveldb_wt.h1
2 files changed, 227 insertions, 16 deletions
diff --git a/api/leveldb/hyper_wt.cc b/api/leveldb/hyper_wt.cc
index c2cc4f9f64c..3cce85941d4 100644
--- a/api/leveldb/hyper_wt.cc
+++ b/api/leveldb/hyper_wt.cc
@@ -29,8 +29,8 @@
#include <errno.h>
#include <sstream>
-using leveldb::Status;
using leveldb::ReplayIterator;
+using leveldb::Status;
// Fill in missing methods from the interface
ReplayIterator::ReplayIterator() {}
@@ -38,53 +38,239 @@ ReplayIterator::~ReplayIterator() {}
class ReplayIteratorImpl : public ReplayIterator {
public:
- ReplayIteratorImpl() {}
+ ReplayIteratorImpl(OperationContext *context) : context_(context), cursor_(NULL) {
+ WT_SESSION *session = context_->GetSession();
+ int ret = session->open_cursor(
+ session, "log:", NULL, NULL, &cursor_);
+ status_ = WiredTigerErrorToStatus(ret);
+ valid_ = false;
+ // Position on first record. valid_ will be set appropriately.
+ Next();
+ }
+
+ ReplayIteratorImpl(OperationContext *context, const std::string& timestamp) :
+ context_(context), cursor_(NULL) {
+
+ WT_SESSION *session = context_->GetSession();
+ int ret = session->open_cursor(
+ session, "log:", NULL, NULL, &cursor_);
+ status_ = WiredTigerErrorToStatus(ret);
+ valid_ = false;
+ // Position on requested record. valid_ will be set appropriately.
+ SkipTo(timestamp);
+ }
// An iterator is either positioned at a deleted key, present key/value pair,
// or not valid. This method returns true iff the iterator is valid.
- virtual bool Valid() { return false; }
+ virtual bool Valid() { return valid_; }
// Moves to the next entry in the source. After this call, Valid() is
// true iff the iterator was not positioned at the last entry in the source.
// REQUIRES: Valid()
- virtual void Next() { }
+ virtual void Next();
// Position at the first key in the source that at or past target for this
// pass. Note that this is unlike the Seek call, as the ReplayIterator is
// unsorted.
// The iterator is Valid() after this call iff the source contains
// an entry that comes at or past target.
- virtual void SkipTo(const Slice& target) { }
- virtual void SkipToLast() { }
+ virtual void SkipTo(const Slice& target) {
+ // Assume target data is a timestamp string for the moment.
+ SkipTo(std::string((char *)target.data())); }
+ virtual void SkipTo(const std::string& timestamp);
+ virtual void SkipToLast();
// Return true if the current entry points to a key-value pair. If this
// returns false, it means the current entry is a deleted entry.
- virtual bool HasValue() { return false; }
+ virtual bool HasValue() {
+ assert(Valid());
+ if (optype == WT_LOGOP_ROW_PUT ||
+ optype == WT_LOGOP_COL_PUT)
+ return true;
+ else
+ return false;
+ }
+
+ int Compare(ReplayIteratorImpl* other) {
+ int cmp;
+ assert(Valid());
+ // assert(other->Valid());
+ int ret = cursor_->compare(cursor_, other->cursor_, &cmp);
+ status_ = WiredTigerErrorToStatus(ret);
+ return (cmp);
+ }
// Return the key for the current entry. The underlying storage for
// the returned slice is valid only until the next modification of
// the iterator.
// REQUIRES: Valid()
- virtual Slice key() const { return Slice(); }
+ virtual Slice key() const { return Slice((const char *)key_.data, key_.size); }
// Return the value for the current entry. The underlying storage for
// the returned slice is valid only until the next modification of
// the iterator.
// REQUIRES: !AtEnd() && !AtStart()
- virtual Slice value() const { return Slice(); }
+ virtual Slice value() const { return Slice((const char *)value_.data, value_.size); }
// If an error has occurred, return it. Else return an ok status.
- virtual Status status() const { return Status::NotSupported("ReplayIterator"); }
+ virtual Status status() const { return status_; }
// must be released by giving it back to the DB
- virtual ~ReplayIteratorImpl() { }
+ virtual ~ReplayIteratorImpl() {
+ int ret = Close();
+ assert(ret == 0);
+ }
+
+ std::string GetTimestamp() {
+ char lsn[256];
+ assert(Valid());
+ snprintf(lsn, sizeof(lsn), WT_TIMESTAMP_FORMAT,
+ lsn_.file, lsn_.offset);
+ return (std::string(lsn));
+ }
+
+ int Close() {
+ int ret = 0;
+ if (cursor_ != NULL)
+ ret = cursor_->close(cursor_);
+ status_ = WiredTigerErrorToStatus(ret);
+ valid_ = false;
+ cursor_ = NULL;
+ return (ret);
+ }
private:
+ void SkipTo(WT_LSN *lsn);
// No copying allowed
ReplayIteratorImpl(const ReplayIterator&) { }
void operator=(const ReplayIterator&) { }
+ OperationContext *context_;
+ Status status_;
+ WT_CURSOR *cursor_;
+ WT_ITEM key_, value_;
+ WT_LSN lsn_;
+ bool valid_;
+ uint64_t txnid;
+ uint32_t fileid, opcount, optype, rectype;
};
+void
+ReplayIteratorImpl::Next() {
+ int ret = 0;
+
+ if (cursor_ != NULL) {
+ while ((ret = cursor_->next(cursor_)) == 0) {
+ ret = cursor_->get_key(cursor_,
+ &lsn_.file, &lsn_.offset, &opcount);
+ if (ret != 0)
+ break;
+ ret = cursor_->get_value(cursor_,
+ &txnid, &rectype, &optype, &fileid, &key_, &value_);
+ if (ret != 0)
+ break;
+ // Next() is only interested in modification operations.
+ // Continue for any other type of record.
+ if (optype == WT_LOGOP_COL_PUT ||
+ optype == WT_LOGOP_COL_REMOVE ||
+ optype == WT_LOGOP_ROW_PUT ||
+ optype == WT_LOGOP_ROW_REMOVE) {
+ valid_ = true;
+ break;
+ }
+ }
+ status_ = WiredTigerErrorToStatus(ret);
+ if (ret != 0) {
+ valid_ = false;
+ ret = Close();
+ assert(ret == 0);
+ }
+ }
+}
+
+void
+ReplayIteratorImpl::SkipToLast() {
+ int ret = 0;
+ WT_LSN last_lsn;
+
+ last_lsn.file = 0;
+ if (cursor_ != NULL) {
+ // Walk the log to the end, then set the cursor on the
+ // last valid LSN we saw.
+ while ((ret = cursor_->next(cursor_)) == 0) {
+ ret = cursor_->get_key(cursor_,
+ &lsn_.file, &lsn_.offset, &opcount);
+ if (ret != 0)
+ break;
+ ret = cursor_->get_value(cursor_,
+ &txnid, &rectype, &optype, &fileid, &key_, &value_);
+ if (ret != 0)
+ break;
+ // We're only interested in modification operations.
+ // Continue for any other type of record.
+ if (optype == WT_LOGOP_COL_PUT ||
+ optype == WT_LOGOP_COL_REMOVE ||
+ optype == WT_LOGOP_ROW_PUT ||
+ optype == WT_LOGOP_ROW_REMOVE) {
+ valid_ = true;
+ last_lsn = lsn_;
+ }
+ }
+ // We reached the end of log
+ if (ret != WT_NOTFOUND || last_lsn.file == 0) {
+ valid_ = false;
+ ret = Close();
+ assert(ret == 0);
+ } else
+ SkipTo(&last_lsn);
+ }
+}
+
+void
+ReplayIteratorImpl::SkipTo(const std::string& timestamp) {
+ WT_LSN target_lsn;
+ int ret = 0;
+
+ sscanf(timestamp.c_str(), WT_TIMESTAMP_FORMAT,
+ &target_lsn.file, &target_lsn.offset);
+ SkipTo(&target_lsn);
+}
+
+// Set the cursor on the first modification record at or after the
+// given LSN.
+void
+ReplayIteratorImpl::SkipTo(WT_LSN *target_lsn) {
+ int ret = 0;
+
+ valid_ = false;
+ if (cursor_ != NULL) {
+ cursor_->set_key(cursor_,
+ target_lsn->file, target_lsn->offset, 0, 0);
+ ret = cursor_->search(cursor_);
+ status_ = WiredTigerErrorToStatus(ret);
+ if (ret != 0)
+ return;
+ // If we were successful, set up the info.
+ ret = cursor_->get_key(cursor_,
+ &lsn_.file, &lsn_.offset, &opcount);
+ status_ = WiredTigerErrorToStatus(ret);
+ if (ret != 0)
+ return;
+ ret = cursor_->get_value(cursor_,
+ &txnid, &rectype, &optype, &fileid, &key_, &value_);
+ status_ = WiredTigerErrorToStatus(ret);
+ if (ret != 0)
+ return;
+ valid_ = true;
+ // We're only interested in modification operations.
+ // Continue for any other type of record.
+ if (optype == WT_LOGOP_COL_PUT ||
+ optype == WT_LOGOP_COL_REMOVE ||
+ optype == WT_LOGOP_ROW_PUT ||
+ optype == WT_LOGOP_ROW_REMOVE)
+ Next();
+ }
+}
+
// Create a live backup of a live LevelDB instance.
// The backup is stored in a directory named "backup-<name>" under the top
// level of the open LevelDB database. The implementation is permitted, and
@@ -102,7 +288,12 @@ DbImpl::LiveBackup(const Slice& name)
void
DbImpl::GetReplayTimestamp(std::string* timestamp)
{
- *timestamp = std::string("current lsn");
+ OperationContext *context = GetContext();
+ ReplayIteratorImpl *iter = new ReplayIteratorImpl(context);
+
+ iter->SkipToLast();
+ *timestamp = iter->GetTimestamp();
+ ReleaseReplayIterator(iter);
}
// Set the lower bound for manual garbage collection. This method only takes
@@ -116,14 +307,32 @@ DbImpl::AllowGarbageCollectBeforeTimestamp(const std::string& timestamp)
bool
DbImpl::ValidateTimestamp(const std::string& timestamp)
{
- return false;
+ bool valid;
+ OperationContext *context = GetContext();
+ ReplayIteratorImpl *iter = new ReplayIteratorImpl(context);
+
+ iter->SkipTo(timestamp);
+ valid = iter->Valid();
+ ReleaseReplayIterator(iter);
+ return valid;
}
// Compare two timestamps and return -1, 0, 1 for lt, eq, gt
int
DbImpl::CompareTimestamps(const std::string& lhs, const std::string& rhs)
{
- return 0;
+ OperationContext *context = GetContext();
+ ReplayIteratorImpl *lhiter = new ReplayIteratorImpl(context);
+ ReplayIteratorImpl *rhiter = new ReplayIteratorImpl(context);
+ int cmp = 0;
+
+ lhiter->SkipTo(lhs);
+ rhiter->SkipTo(rhs);
+ if (lhiter->Valid() && rhiter->Valid())
+ cmp = lhiter->Compare(rhiter);
+ ReleaseReplayIterator(lhiter);
+ ReleaseReplayIterator(rhiter);
+ return cmp;
}
// Return a ReplayIterator that returns every write operation performed after
@@ -132,8 +341,9 @@ Status
DbImpl::GetReplayIterator(const std::string& timestamp,
ReplayIterator** iter)
{
- *iter = new ReplayIteratorImpl();
- return Status::NotSupported("DB::GetReplayIterator");
+ OperationContext *context = GetContext();
+ *iter = new ReplayIteratorImpl(context, timestamp);
+ return ((*iter)->status());
}
// Release a previously allocated replay iterator.
diff --git a/api/leveldb/leveldb_wt.h b/api/leveldb/leveldb_wt.h
index 0d169f81782..848efec5f49 100644
--- a/api/leveldb/leveldb_wt.h
+++ b/api/leveldb/leveldb_wt.h
@@ -51,6 +51,7 @@
// Note: LSM doesn't split, build full pages from the start
#define WT_TABLE_CONFIG "type=lsm,split_pct=100,leaf_item_max=1KB," \
"lsm=(chunk_size=100MB,bloom_config=(leaf_page_max=8MB)),"
+#define WT_TIMESTAMP_FORMAT "%d.%llu"
using leveldb::Cache;
using leveldb::FilterPolicy;