From 8e6f9467947e68bf00ea1998ecec0856ea308df3 Mon Sep 17 00:00:00 2001 From: Susan LoVerso Date: Thu, 7 Aug 2014 14:27:20 -0400 Subject: Add replay code to Hyperleveldb port. #1106 --- api/leveldb/hyper_wt.cc | 242 +++++++++++++++++++++++++++++++++++++++++++---- api/leveldb/leveldb_wt.h | 1 + 2 files changed, 227 insertions(+), 16 deletions(-) (limited to 'api') diff --git a/api/leveldb/hyper_wt.cc b/api/leveldb/hyper_wt.cc index c2cc4f9f64c..3cce85941d4 100644 --- a/api/leveldb/hyper_wt.cc +++ b/api/leveldb/hyper_wt.cc @@ -29,8 +29,8 @@ #include #include -using leveldb::Status; using leveldb::ReplayIterator; +using leveldb::Status; // Fill in missing methods from the interface ReplayIterator::ReplayIterator() {} @@ -38,53 +38,239 @@ ReplayIterator::~ReplayIterator() {} class ReplayIteratorImpl : public ReplayIterator { public: - ReplayIteratorImpl() {} + ReplayIteratorImpl(OperationContext *context) : context_(context), cursor_(NULL) { + WT_SESSION *session = context_->GetSession(); + int ret = session->open_cursor( + session, "log:", NULL, NULL, &cursor_); + status_ = WiredTigerErrorToStatus(ret); + valid_ = false; + // Position on first record. valid_ will be set appropriately. + Next(); + } + + ReplayIteratorImpl(OperationContext *context, const std::string& timestamp) : + context_(context), cursor_(NULL) { + + WT_SESSION *session = context_->GetSession(); + int ret = session->open_cursor( + session, "log:", NULL, NULL, &cursor_); + status_ = WiredTigerErrorToStatus(ret); + valid_ = false; + // Position on requested record. valid_ will be set appropriately. + SkipTo(timestamp); + } // An iterator is either positioned at a deleted key, present key/value pair, // or not valid. This method returns true iff the iterator is valid. - virtual bool Valid() { return false; } + virtual bool Valid() { return valid_; } // Moves to the next entry in the source. After this call, Valid() is // true iff the iterator was not positioned at the last entry in the source. // REQUIRES: Valid() - virtual void Next() { } + virtual void Next(); // Position at the first key in the source that at or past target for this // pass. Note that this is unlike the Seek call, as the ReplayIterator is // unsorted. // The iterator is Valid() after this call iff the source contains // an entry that comes at or past target. - virtual void SkipTo(const Slice& target) { } - virtual void SkipToLast() { } + virtual void SkipTo(const Slice& target) { + // Assume target data is a timestamp string for the moment. + SkipTo(std::string((char *)target.data())); } + virtual void SkipTo(const std::string& timestamp); + virtual void SkipToLast(); // Return true if the current entry points to a key-value pair. If this // returns false, it means the current entry is a deleted entry. - virtual bool HasValue() { return false; } + virtual bool HasValue() { + assert(Valid()); + if (optype == WT_LOGOP_ROW_PUT || + optype == WT_LOGOP_COL_PUT) + return true; + else + return false; + } + + int Compare(ReplayIteratorImpl* other) { + int cmp; + assert(Valid()); + // assert(other->Valid()); + int ret = cursor_->compare(cursor_, other->cursor_, &cmp); + status_ = WiredTigerErrorToStatus(ret); + return (cmp); + } // Return the key for the current entry. The underlying storage for // the returned slice is valid only until the next modification of // the iterator. // REQUIRES: Valid() - virtual Slice key() const { return Slice(); } + virtual Slice key() const { return Slice((const char *)key_.data, key_.size); } // Return the value for the current entry. The underlying storage for // the returned slice is valid only until the next modification of // the iterator. // REQUIRES: !AtEnd() && !AtStart() - virtual Slice value() const { return Slice(); } + virtual Slice value() const { return Slice((const char *)value_.data, value_.size); } // If an error has occurred, return it. Else return an ok status. - virtual Status status() const { return Status::NotSupported("ReplayIterator"); } + virtual Status status() const { return status_; } // must be released by giving it back to the DB - virtual ~ReplayIteratorImpl() { } + virtual ~ReplayIteratorImpl() { + int ret = Close(); + assert(ret == 0); + } + + std::string GetTimestamp() { + char lsn[256]; + assert(Valid()); + snprintf(lsn, sizeof(lsn), WT_TIMESTAMP_FORMAT, + lsn_.file, lsn_.offset); + return (std::string(lsn)); + } + + int Close() { + int ret = 0; + if (cursor_ != NULL) + ret = cursor_->close(cursor_); + status_ = WiredTigerErrorToStatus(ret); + valid_ = false; + cursor_ = NULL; + return (ret); + } private: + void SkipTo(WT_LSN *lsn); // No copying allowed ReplayIteratorImpl(const ReplayIterator&) { } void operator=(const ReplayIterator&) { } + OperationContext *context_; + Status status_; + WT_CURSOR *cursor_; + WT_ITEM key_, value_; + WT_LSN lsn_; + bool valid_; + uint64_t txnid; + uint32_t fileid, opcount, optype, rectype; }; +void +ReplayIteratorImpl::Next() { + int ret = 0; + + if (cursor_ != NULL) { + while ((ret = cursor_->next(cursor_)) == 0) { + ret = cursor_->get_key(cursor_, + &lsn_.file, &lsn_.offset, &opcount); + if (ret != 0) + break; + ret = cursor_->get_value(cursor_, + &txnid, &rectype, &optype, &fileid, &key_, &value_); + if (ret != 0) + break; + // Next() is only interested in modification operations. + // Continue for any other type of record. + if (optype == WT_LOGOP_COL_PUT || + optype == WT_LOGOP_COL_REMOVE || + optype == WT_LOGOP_ROW_PUT || + optype == WT_LOGOP_ROW_REMOVE) { + valid_ = true; + break; + } + } + status_ = WiredTigerErrorToStatus(ret); + if (ret != 0) { + valid_ = false; + ret = Close(); + assert(ret == 0); + } + } +} + +void +ReplayIteratorImpl::SkipToLast() { + int ret = 0; + WT_LSN last_lsn; + + last_lsn.file = 0; + if (cursor_ != NULL) { + // Walk the log to the end, then set the cursor on the + // last valid LSN we saw. + while ((ret = cursor_->next(cursor_)) == 0) { + ret = cursor_->get_key(cursor_, + &lsn_.file, &lsn_.offset, &opcount); + if (ret != 0) + break; + ret = cursor_->get_value(cursor_, + &txnid, &rectype, &optype, &fileid, &key_, &value_); + if (ret != 0) + break; + // We're only interested in modification operations. + // Continue for any other type of record. + if (optype == WT_LOGOP_COL_PUT || + optype == WT_LOGOP_COL_REMOVE || + optype == WT_LOGOP_ROW_PUT || + optype == WT_LOGOP_ROW_REMOVE) { + valid_ = true; + last_lsn = lsn_; + } + } + // We reached the end of log + if (ret != WT_NOTFOUND || last_lsn.file == 0) { + valid_ = false; + ret = Close(); + assert(ret == 0); + } else + SkipTo(&last_lsn); + } +} + +void +ReplayIteratorImpl::SkipTo(const std::string& timestamp) { + WT_LSN target_lsn; + int ret = 0; + + sscanf(timestamp.c_str(), WT_TIMESTAMP_FORMAT, + &target_lsn.file, &target_lsn.offset); + SkipTo(&target_lsn); +} + +// Set the cursor on the first modification record at or after the +// given LSN. +void +ReplayIteratorImpl::SkipTo(WT_LSN *target_lsn) { + int ret = 0; + + valid_ = false; + if (cursor_ != NULL) { + cursor_->set_key(cursor_, + target_lsn->file, target_lsn->offset, 0, 0); + ret = cursor_->search(cursor_); + status_ = WiredTigerErrorToStatus(ret); + if (ret != 0) + return; + // If we were successful, set up the info. + ret = cursor_->get_key(cursor_, + &lsn_.file, &lsn_.offset, &opcount); + status_ = WiredTigerErrorToStatus(ret); + if (ret != 0) + return; + ret = cursor_->get_value(cursor_, + &txnid, &rectype, &optype, &fileid, &key_, &value_); + status_ = WiredTigerErrorToStatus(ret); + if (ret != 0) + return; + valid_ = true; + // We're only interested in modification operations. + // Continue for any other type of record. + if (optype == WT_LOGOP_COL_PUT || + optype == WT_LOGOP_COL_REMOVE || + optype == WT_LOGOP_ROW_PUT || + optype == WT_LOGOP_ROW_REMOVE) + Next(); + } +} + // Create a live backup of a live LevelDB instance. // The backup is stored in a directory named "backup-" under the top // level of the open LevelDB database. The implementation is permitted, and @@ -102,7 +288,12 @@ DbImpl::LiveBackup(const Slice& name) void DbImpl::GetReplayTimestamp(std::string* timestamp) { - *timestamp = std::string("current lsn"); + OperationContext *context = GetContext(); + ReplayIteratorImpl *iter = new ReplayIteratorImpl(context); + + iter->SkipToLast(); + *timestamp = iter->GetTimestamp(); + ReleaseReplayIterator(iter); } // Set the lower bound for manual garbage collection. This method only takes @@ -116,14 +307,32 @@ DbImpl::AllowGarbageCollectBeforeTimestamp(const std::string& timestamp) bool DbImpl::ValidateTimestamp(const std::string& timestamp) { - return false; + bool valid; + OperationContext *context = GetContext(); + ReplayIteratorImpl *iter = new ReplayIteratorImpl(context); + + iter->SkipTo(timestamp); + valid = iter->Valid(); + ReleaseReplayIterator(iter); + return valid; } // Compare two timestamps and return -1, 0, 1 for lt, eq, gt int DbImpl::CompareTimestamps(const std::string& lhs, const std::string& rhs) { - return 0; + OperationContext *context = GetContext(); + ReplayIteratorImpl *lhiter = new ReplayIteratorImpl(context); + ReplayIteratorImpl *rhiter = new ReplayIteratorImpl(context); + int cmp = 0; + + lhiter->SkipTo(lhs); + rhiter->SkipTo(rhs); + if (lhiter->Valid() && rhiter->Valid()) + cmp = lhiter->Compare(rhiter); + ReleaseReplayIterator(lhiter); + ReleaseReplayIterator(rhiter); + return cmp; } // Return a ReplayIterator that returns every write operation performed after @@ -132,8 +341,9 @@ Status DbImpl::GetReplayIterator(const std::string& timestamp, ReplayIterator** iter) { - *iter = new ReplayIteratorImpl(); - return Status::NotSupported("DB::GetReplayIterator"); + OperationContext *context = GetContext(); + *iter = new ReplayIteratorImpl(context, timestamp); + return ((*iter)->status()); } // Release a previously allocated replay iterator. diff --git a/api/leveldb/leveldb_wt.h b/api/leveldb/leveldb_wt.h index 0d169f81782..848efec5f49 100644 --- a/api/leveldb/leveldb_wt.h +++ b/api/leveldb/leveldb_wt.h @@ -51,6 +51,7 @@ // Note: LSM doesn't split, build full pages from the start #define WT_TABLE_CONFIG "type=lsm,split_pct=100,leaf_item_max=1KB," \ "lsm=(chunk_size=100MB,bloom_config=(leaf_page_max=8MB))," +#define WT_TIMESTAMP_FORMAT "%d.%llu" using leveldb::Cache; using leveldb::FilterPolicy; -- cgit v1.2.1