author     costan <costan@google.com>           2019-03-11 13:04:53 -0700
committer  Chris Mumford <cmumford@google.com>  2019-03-11 13:41:25 -0700
commit     7d8e41e49b8fddda66a2c5f0a6a47f1a916e8d26 (patch)
tree       0a5556aaf20ba27bd6ea0ab3792d1366e60b2f81 /db
parent     dd906262fd364c08a652dfa914f9995f6b7608a9 (diff)
download   leveldb-7d8e41e49b8fddda66a2c5f0a6a47f1a916e8d26.tar.gz
leveldb: Replace AtomicPointer with std::atomic.
This CL removes AtomicPointer from leveldb's port interface. Its usage is replaced with std::atomic<> from the C++11 standard library.

AtomicPointer was used to wrap flags, numbers, and pointers, so its instances are replaced with std::atomic<bool>, std::atomic<int>, std::atomic<size_t>, and std::atomic<Node*>.

This CL does not revise the memory ordering. AtomicPointer's methods are replaced mechanically with their std::atomic equivalents, even when the underlying usage is incorrect. (Example: DBImpl::has_imm_ is written using release stores, even though it is always read using relaxed ordering.) Revising the memory ordering is left for future CLs.

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=237865146
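For context, the mechanical translation applied throughout this CL maps AtomicPointer's Acquire_Load/Release_Store pair to std::atomic loads and stores with acquire/release ordering, and the NoBarrier_Load/NoBarrier_Store variants to relaxed ordering. The sketch below is a minimal, self-contained illustration of that before/after pattern; it is not code from this CL, and the FlagHolder type with its members (RequestShutdown, IsShuttingDown, the max_height_ accessors) is invented for the example.

    // Minimal sketch of the mechanical AtomicPointer -> std::atomic mapping.
    // FlagHolder and its members are hypothetical, for illustration only.
    #include <atomic>
    #include <cassert>

    struct FlagHolder {
      // Before: port::AtomicPointer shutting_down_; any non-null value meant "true".
      // After:  a typed atomic flag.
      std::atomic<bool> shutting_down_{false};

      void RequestShutdown() {
        // Before: shutting_down_.Release_Store(this);
        shutting_down_.store(true, std::memory_order_release);
      }

      bool IsShuttingDown() const {
        // Before: shutting_down_.Acquire_Load() != nullptr;
        return shutting_down_.load(std::memory_order_acquire);
      }

      // Before: NoBarrier_Load()/NoBarrier_Store() wrapped an intptr_t cast;
      // after: relaxed loads/stores on a typed atomic integer.
      std::atomic<int> max_height_{1};
      int GetMaxHeight() const { return max_height_.load(std::memory_order_relaxed); }
      void SetMaxHeight(int h) { max_height_.store(h, std::memory_order_relaxed); }
    };

    int main() {
      FlagHolder f;
      assert(!f.IsShuttingDown());
      f.RequestShutdown();
      assert(f.IsShuttingDown());
      f.SetMaxHeight(4);
      assert(f.GetMaxHeight() == 4);
      return 0;
    }

As the commit message notes, this sketch only mirrors the orderings that were already implied by the old API; whether acquire/release is actually required at each call site is deliberately not reconsidered here.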
Diffstat (limited to 'db')
-rw-r--r--  db/db_bench.cc       |  20
-rw-r--r--  db/db_impl.cc        |  33
-rw-r--r--  db/db_impl.h         |   7
-rw-r--r--  db/db_test.cc        | 135
-rw-r--r--  db/skiplist.h        |  77
-rw-r--r--  db/skiplist_test.cc  |  21
6 files changed, 143 insertions(+), 150 deletions(-)
diff --git a/db/db_bench.cc b/db/db_bench.cc
index 115cf45..f9403f4 100644
--- a/db/db_bench.cc
+++ b/db/db_bench.cc
@@ -34,7 +34,6 @@
// seekrandom -- N random seeks
// open -- cost of opening a DB
// crc32c -- repeated crc32c of 4K of data
-// acquireload -- load N*1000 times
// Meta operations:
// compact -- Compact the entire DB
// stats -- Print DB stats
@@ -57,7 +56,6 @@ static const char* FLAGS_benchmarks =
"crc32c,"
"snappycomp,"
"snappyuncomp,"
- "acquireload,"
;
// Number of key/values to place in database
@@ -510,8 +508,6 @@ class Benchmark {
method = &Benchmark::Compact;
} else if (name == Slice("crc32c")) {
method = &Benchmark::Crc32c;
- } else if (name == Slice("acquireload")) {
- method = &Benchmark::AcquireLoad;
} else if (name == Slice("snappycomp")) {
method = &Benchmark::SnappyCompress;
} else if (name == Slice("snappyuncomp")) {
@@ -639,22 +635,6 @@ class Benchmark {
thread->stats.AddMessage(label);
}
- void AcquireLoad(ThreadState* thread) {
- int dummy;
- port::AtomicPointer ap(&dummy);
- int count = 0;
- void *ptr = nullptr;
- thread->stats.AddMessage("(each op is 1000 loads)");
- while (count < 100000) {
- for (int i = 0; i < 1000; i++) {
- ptr = ap.Acquire_Load();
- }
- count++;
- thread->stats.FinishedSingleOp();
- }
- if (ptr == nullptr) exit(1); // Disable unused variable warning.
- }
-
void SnappyCompress(ThreadState* thread) {
RandomGenerator gen;
Slice input = gen.Generate(Options().block_size);
diff --git a/db/db_impl.cc b/db/db_impl.cc
index fefb883..3468862 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -8,6 +8,7 @@
#include <stdio.h>
#include <algorithm>
+#include <atomic>
#include <set>
#include <string>
#include <vector>
@@ -132,10 +133,11 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
dbname_(dbname),
table_cache_(new TableCache(dbname_, options_, TableCacheSize(options_))),
db_lock_(nullptr),
- shutting_down_(nullptr),
+ shutting_down_(false),
background_work_finished_signal_(&mutex_),
mem_(nullptr),
imm_(nullptr),
+ has_imm_(false),
logfile_(nullptr),
logfile_number_(0),
log_(nullptr),
@@ -144,14 +146,12 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
background_compaction_scheduled_(false),
manual_compaction_(nullptr),
versions_(new VersionSet(dbname_, &options_, table_cache_,
- &internal_comparator_)) {
- has_imm_.Release_Store(nullptr);
-}
+ &internal_comparator_)) {}
DBImpl::~DBImpl() {
- // Wait for background work to finish
+ // Wait for background work to finish.
mutex_.Lock();
- shutting_down_.Release_Store(this); // Any non-null value is ok
+ shutting_down_.store(true, std::memory_order_release);
while (background_compaction_scheduled_) {
background_work_finished_signal_.Wait();
}
@@ -547,7 +547,7 @@ void DBImpl::CompactMemTable() {
Status s = WriteLevel0Table(imm_, &edit, base);
base->Unref();
- if (s.ok() && shutting_down_.Acquire_Load()) {
+ if (s.ok() && shutting_down_.load(std::memory_order_acquire)) {
s = Status::IOError("Deleting DB during memtable compaction");
}
@@ -562,7 +562,7 @@ void DBImpl::CompactMemTable() {
// Commit to the new state
imm_->Unref();
imm_ = nullptr;
- has_imm_.Release_Store(nullptr);
+ has_imm_.store(false, std::memory_order_release);
DeleteObsoleteFiles();
} else {
RecordBackgroundError(s);
@@ -610,7 +610,8 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,
}
MutexLock l(&mutex_);
- while (!manual.done && !shutting_down_.Acquire_Load() && bg_error_.ok()) {
+ while (!manual.done && !shutting_down_.load(std::memory_order_acquire) &&
+ bg_error_.ok()) {
if (manual_compaction_ == nullptr) { // Idle
manual_compaction_ = &manual;
MaybeScheduleCompaction();
@@ -652,7 +653,7 @@ void DBImpl::MaybeScheduleCompaction() {
mutex_.AssertHeld();
if (background_compaction_scheduled_) {
// Already scheduled
- } else if (shutting_down_.Acquire_Load()) {
+ } else if (shutting_down_.load(std::memory_order_acquire)) {
// DB is being deleted; no more background compactions
} else if (!bg_error_.ok()) {
// Already got an error; no more changes
@@ -673,7 +674,7 @@ void DBImpl::BGWork(void* db) {
void DBImpl::BackgroundCall() {
MutexLock l(&mutex_);
assert(background_compaction_scheduled_);
- if (shutting_down_.Acquire_Load()) {
+ if (shutting_down_.load(std::memory_order_acquire)) {
// No more background work when shutting down.
} else if (!bg_error_.ok()) {
// No more background work after a background error.
@@ -752,7 +753,7 @@ void DBImpl::BackgroundCompaction() {
if (status.ok()) {
// Done
- } else if (shutting_down_.Acquire_Load()) {
+ } else if (shutting_down_.load(std::memory_order_acquire)) {
// Ignore compaction errors found during shutting down
} else {
Log(options_.info_log,
@@ -919,9 +920,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
std::string current_user_key;
bool has_current_user_key = false;
SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
- for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
+ for (; input->Valid() && !shutting_down_.load(std::memory_order_acquire); ) {
// Prioritize immutable compaction work
- if (has_imm_.NoBarrier_Load() != nullptr) {
+ if (has_imm_.load(std::memory_order_relaxed)) {
const uint64_t imm_start = env_->NowMicros();
mutex_.Lock();
if (imm_ != nullptr) {
@@ -1014,7 +1015,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
input->Next();
}
- if (status.ok() && shutting_down_.Acquire_Load()) {
+ if (status.ok() && shutting_down_.load(std::memory_order_acquire)) {
status = Status::IOError("Deleting DB during compaction");
}
if (status.ok() && compact->builder != nullptr) {
@@ -1378,7 +1379,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
logfile_number_ = new_log_number;
log_ = new log::Writer(lfile);
imm_ = mem_;
- has_imm_.Release_Store(imm_);
+ has_imm_.store(true, std::memory_order_release);
mem_ = new MemTable(internal_comparator_);
mem_->Ref();
force = false; // Do not force another compaction if have room
diff --git a/db/db_impl.h b/db/db_impl.h
index 00e800a..ca00d42 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -5,8 +5,11 @@
#ifndef STORAGE_LEVELDB_DB_DB_IMPL_H_
#define STORAGE_LEVELDB_DB_DB_IMPL_H_
+#include <atomic>
#include <deque>
#include <set>
+#include <string>
+
#include "db/dbformat.h"
#include "db/log_writer.h"
#include "db/snapshot.h"
@@ -136,11 +139,11 @@ class DBImpl : public DB {
// State below is protected by mutex_
port::Mutex mutex_;
- port::AtomicPointer shutting_down_;
+ std::atomic<bool> shutting_down_;
port::CondVar background_work_finished_signal_ GUARDED_BY(mutex_);
MemTable* mem_;
MemTable* imm_ GUARDED_BY(mutex_); // Memtable being compacted
- port::AtomicPointer has_imm_; // So bg thread can detect non-null imm_
+ std::atomic<bool> has_imm_; // So bg thread can detect non-null imm_
WritableFile* logfile_;
uint64_t logfile_number_ GUARDED_BY(mutex_);
log::Writer* log_;
diff --git a/db/db_test.cc b/db/db_test.cc
index 894ed23..e889a74 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -2,6 +2,9 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
+#include <atomic>
+#include <string>
+
#include "leveldb/db.h"
#include "leveldb/filter_policy.h"
#include "db/db_impl.h"
@@ -61,7 +64,7 @@ class AtomicCounter {
void DelayMilliseconds(int millis) {
Env::Default()->SleepForMicroseconds(millis * 1000);
}
-}
+} // namespace
// Test Env to override default Env behavior for testing.
class TestEnv : public EnvWrapper {
@@ -93,45 +96,45 @@ class TestEnv : public EnvWrapper {
bool ignore_dot_files_;
};
-// Special Env used to delay background operations
+// Special Env used to delay background operations.
class SpecialEnv : public EnvWrapper {
public:
// sstable/log Sync() calls are blocked while this pointer is non-null.
- port::AtomicPointer delay_data_sync_;
+ std::atomic<bool> delay_data_sync_;
// sstable/log Sync() calls return an error.
- port::AtomicPointer data_sync_error_;
+ std::atomic<bool> data_sync_error_;
// Simulate no-space errors while this pointer is non-null.
- port::AtomicPointer no_space_;
+ std::atomic<bool> no_space_;
// Simulate non-writable file system while this pointer is non-null.
- port::AtomicPointer non_writable_;
+ std::atomic<bool> non_writable_;
// Force sync of manifest files to fail while this pointer is non-null.
- port::AtomicPointer manifest_sync_error_;
+ std::atomic<bool> manifest_sync_error_;
// Force write to manifest files to fail while this pointer is non-null.
- port::AtomicPointer manifest_write_error_;
+ std::atomic<bool> manifest_write_error_;
bool count_random_reads_;
AtomicCounter random_read_counter_;
- explicit SpecialEnv(Env* base) : EnvWrapper(base) {
- delay_data_sync_.Release_Store(nullptr);
- data_sync_error_.Release_Store(nullptr);
- no_space_.Release_Store(nullptr);
- non_writable_.Release_Store(nullptr);
- count_random_reads_ = false;
- manifest_sync_error_.Release_Store(nullptr);
- manifest_write_error_.Release_Store(nullptr);
+ explicit SpecialEnv(Env* base) : EnvWrapper(base),
+ delay_data_sync_(false),
+ data_sync_error_(false),
+ no_space_(false),
+ non_writable_(false),
+ manifest_sync_error_(false),
+ manifest_write_error_(false),
+ count_random_reads_(false) {
}
Status NewWritableFile(const std::string& f, WritableFile** r) {
class DataFile : public WritableFile {
private:
- SpecialEnv* env_;
- WritableFile* base_;
+ SpecialEnv* const env_;
+ WritableFile* const base_;
public:
DataFile(SpecialEnv* env, WritableFile* base)
@@ -140,7 +143,7 @@ class SpecialEnv : public EnvWrapper {
}
~DataFile() { delete base_; }
Status Append(const Slice& data) {
- if (env_->no_space_.Acquire_Load() != nullptr) {
+ if (env_->no_space_.load(std::memory_order_acquire)) {
// Drop writes on the floor
return Status::OK();
} else {
@@ -150,10 +153,10 @@ class SpecialEnv : public EnvWrapper {
Status Close() { return base_->Close(); }
Status Flush() { return base_->Flush(); }
Status Sync() {
- if (env_->data_sync_error_.Acquire_Load() != nullptr) {
+ if (env_->data_sync_error_.load(std::memory_order_acquire)) {
return Status::IOError("simulated data sync error");
}
- while (env_->delay_data_sync_.Acquire_Load() != nullptr) {
+ while (env_->delay_data_sync_.load(std::memory_order_acquire)) {
DelayMilliseconds(100);
}
return base_->Sync();
@@ -167,7 +170,7 @@ class SpecialEnv : public EnvWrapper {
ManifestFile(SpecialEnv* env, WritableFile* b) : env_(env), base_(b) { }
~ManifestFile() { delete base_; }
Status Append(const Slice& data) {
- if (env_->manifest_write_error_.Acquire_Load() != nullptr) {
+ if (env_->manifest_write_error_.load(std::memory_order_acquire)) {
return Status::IOError("simulated writer error");
} else {
return base_->Append(data);
@@ -176,7 +179,7 @@ class SpecialEnv : public EnvWrapper {
Status Close() { return base_->Close(); }
Status Flush() { return base_->Flush(); }
Status Sync() {
- if (env_->manifest_sync_error_.Acquire_Load() != nullptr) {
+ if (env_->manifest_sync_error_.load(std::memory_order_acquire)) {
return Status::IOError("simulated sync error");
} else {
return base_->Sync();
@@ -184,7 +187,7 @@ class SpecialEnv : public EnvWrapper {
}
};
- if (non_writable_.Acquire_Load() != nullptr) {
+ if (non_writable_.load(std::memory_order_acquire)) {
return Status::IOError("simulated write error");
}
@@ -424,7 +427,7 @@ class DBTest {
ASSERT_TRUE(
db_->GetProperty("leveldb.num-files-at-level" + NumberToString(level),
&property));
- return atoi(property.c_str());
+ return std::stoi(property);
}
int TotalTableFiles() {
@@ -587,11 +590,13 @@ TEST(DBTest, GetFromImmutableLayer) {
ASSERT_OK(Put("foo", "v1"));
ASSERT_EQ("v1", Get("foo"));
- env_->delay_data_sync_.Release_Store(env_); // Block sync calls
- Put("k1", std::string(100000, 'x')); // Fill memtable
- Put("k2", std::string(100000, 'y')); // Trigger compaction
+ // Block sync calls.
+ env_->delay_data_sync_.store(true, std::memory_order_release);
+ Put("k1", std::string(100000, 'x')); // Fill memtable.
+ Put("k2", std::string(100000, 'y')); // Trigger compaction.
ASSERT_EQ("v1", Get("foo"));
- env_->delay_data_sync_.Release_Store(nullptr); // Release sync calls
+ // Release sync calls.
+ env_->delay_data_sync_.store(false, std::memory_order_release);
} while (ChangeOptions());
}
@@ -608,7 +613,7 @@ TEST(DBTest, GetMemUsage) {
ASSERT_OK(Put("foo", "v1"));
std::string val;
ASSERT_TRUE(db_->GetProperty("leveldb.approximate-memory-usage", &val));
- int mem_usage = atoi(val.c_str());
+ int mem_usage = std::stoi(val);
ASSERT_GT(mem_usage, 0);
ASSERT_LT(mem_usage, 5*1024*1024);
} while (ChangeOptions());
@@ -1106,7 +1111,7 @@ TEST(DBTest, RepeatedWritesToSameKey) {
for (int i = 0; i < 5 * kMaxFiles; i++) {
Put("key", value);
ASSERT_LE(TotalTableFiles(), kMaxFiles);
- fprintf(stderr, "after %d: %d files\n", int(i+1), TotalTableFiles());
+ fprintf(stderr, "after %d: %d files\n", i + 1, TotalTableFiles());
}
}
@@ -1271,7 +1276,7 @@ TEST(DBTest, IteratorPinsRef) {
// Write to force compactions
Put("foo", "newvalue1");
for (int i = 0; i < 100; i++) {
- ASSERT_OK(Put(Key(i), Key(i) + std::string(100000, 'v'))); // 100K values
+ ASSERT_OK(Put(Key(i), Key(i) + std::string(100000, 'v'))); // 100K values
}
Put("foo", "newvalue2");
@@ -1459,21 +1464,21 @@ TEST(DBTest, L0_CompactionBug_Issue44_a) {
TEST(DBTest, L0_CompactionBug_Issue44_b) {
Reopen();
- Put("","");
+ Put("", "");
Reopen();
Delete("e");
- Put("","");
+ Put("", "");
Reopen();
Put("c", "cv");
Reopen();
- Put("","");
+ Put("", "");
Reopen();
- Put("","");
+ Put("", "");
DelayMilliseconds(1000); // Wait for compaction to finish
Reopen();
- Put("d","dv");
+ Put("d", "dv");
Reopen();
- Put("","");
+ Put("", "");
Reopen();
Delete("d");
Delete("b");
@@ -1711,13 +1716,14 @@ TEST(DBTest, NoSpace) {
ASSERT_EQ("v1", Get("foo"));
Compact("a", "z");
const int num_files = CountFiles();
- env_->no_space_.Release_Store(env_); // Force out-of-space errors
+ // Force out-of-space errors.
+ env_->no_space_.store(true, std::memory_order_release);
for (int i = 0; i < 10; i++) {
for (int level = 0; level < config::kNumLevels-1; level++) {
dbfull()->TEST_CompactRange(level, nullptr, nullptr);
}
}
- env_->no_space_.Release_Store(nullptr);
+ env_->no_space_.store(false, std::memory_order_release);
ASSERT_LT(CountFiles(), num_files + 3);
}
@@ -1727,7 +1733,8 @@ TEST(DBTest, NonWritableFileSystem) {
options.env = env_;
Reopen(&options);
ASSERT_OK(Put("foo", "v1"));
- env_->non_writable_.Release_Store(env_); // Force errors for new files
+ // Force errors for new files.
+ env_->non_writable_.store(true, std::memory_order_release);
std::string big(100000, 'x');
int errors = 0;
for (int i = 0; i < 20; i++) {
@@ -1738,7 +1745,7 @@ TEST(DBTest, NonWritableFileSystem) {
}
}
ASSERT_GT(errors, 0);
- env_->non_writable_.Release_Store(nullptr);
+ env_->non_writable_.store(false, std::memory_order_release);
}
TEST(DBTest, WriteSyncError) {
@@ -1748,7 +1755,7 @@ TEST(DBTest, WriteSyncError) {
Options options = CurrentOptions();
options.env = env_;
Reopen(&options);
- env_->data_sync_error_.Release_Store(env_);
+ env_->data_sync_error_.store(true, std::memory_order_release);
// (b) Normal write should succeed
WriteOptions w;
@@ -1762,7 +1769,7 @@ TEST(DBTest, WriteSyncError) {
ASSERT_EQ("NOT_FOUND", Get("k2"));
// (d) make sync behave normally
- env_->data_sync_error_.Release_Store(nullptr);
+ env_->data_sync_error_.store(false, std::memory_order_release);
// (e) Do a non-sync write; should fail
w.sync = false;
@@ -1782,7 +1789,7 @@ TEST(DBTest, ManifestWriteError) {
// We iterate twice. In the second iteration, everything is the
// same except the log record never makes it to the MANIFEST file.
for (int iter = 0; iter < 2; iter++) {
- port::AtomicPointer* error_type = (iter == 0)
+ std::atomic<bool>* error_type = (iter == 0)
? &env_->manifest_sync_error_
: &env_->manifest_write_error_;
@@ -1802,12 +1809,12 @@ TEST(DBTest, ManifestWriteError) {
ASSERT_EQ(NumTableFilesAtLevel(last), 1); // foo=>bar is now in last level
// Merging compaction (will fail)
- error_type->Release_Store(env_);
+ error_type->store(true, std::memory_order_release);
dbfull()->TEST_CompactRange(last, nullptr, nullptr); // Should fail
ASSERT_EQ("bar", Get("foo"));
// Recovery: should not lose data
- error_type->Release_Store(nullptr);
+ error_type->store(false, std::memory_order_release);
Reopen(&options);
ASSERT_EQ("bar", Get("foo"));
}
@@ -1878,7 +1885,7 @@ TEST(DBTest, BloomFilter) {
dbfull()->TEST_CompactMemTable();
// Prevent auto compactions triggered by seeks
- env_->delay_data_sync_.Release_Store(env_);
+ env_->delay_data_sync_.store(true, std::memory_order_release);
// Lookup present keys. Should rarely read from small sstable.
env_->random_read_counter_.Reset();
@@ -1899,7 +1906,7 @@ TEST(DBTest, BloomFilter) {
fprintf(stderr, "%d missing => %d reads\n", N, reads);
ASSERT_LE(reads, 3*N/100);
- env_->delay_data_sync_.Release_Store(nullptr);
+ env_->delay_data_sync_.store(false, std::memory_order_release);
Close();
delete options.block_cache;
delete options.filter_policy;
@@ -1914,9 +1921,9 @@ static const int kNumKeys = 1000;
struct MTState {
DBTest* test;
- port::AtomicPointer stop;
- port::AtomicPointer counter[kNumThreads];
- port::AtomicPointer thread_done[kNumThreads];
+ std::atomic<bool> stop;
+ std::atomic<int> counter[kNumThreads];
+ std::atomic<bool> thread_done[kNumThreads];
};
struct MTThread {
@@ -1928,13 +1935,13 @@ static void MTThreadBody(void* arg) {
MTThread* t = reinterpret_cast<MTThread*>(arg);
int id = t->id;
DB* db = t->state->test->db_;
- uintptr_t counter = 0;
+ int counter = 0;
fprintf(stderr, "... starting thread %d\n", id);
Random rnd(1000 + id);
std::string value;
char valbuf[1500];
- while (t->state->stop.Acquire_Load() == nullptr) {
- t->state->counter[id].Release_Store(reinterpret_cast<void*>(counter));
+ while (!t->state->stop.load(std::memory_order_acquire)) {
+ t->state->counter[id].store(counter, std::memory_order_release);
int key = rnd.Uniform(kNumKeys);
char keybuf[20];
@@ -1959,14 +1966,13 @@ static void MTThreadBody(void* arg) {
ASSERT_EQ(k, key);
ASSERT_GE(w, 0);
ASSERT_LT(w, kNumThreads);
- ASSERT_LE(static_cast<uintptr_t>(c), reinterpret_cast<uintptr_t>(
- t->state->counter[w].Acquire_Load()));
+ ASSERT_LE(c, t->state->counter[w].load(std::memory_order_acquire));
}
}
counter++;
}
- t->state->thread_done[id].Release_Store(t);
- fprintf(stderr, "... stopping thread %d after %d ops\n", id, int(counter));
+ t->state->thread_done[id].store(true, std::memory_order_release);
+ fprintf(stderr, "... stopping thread %d after %d ops\n", id, counter);
}
} // namespace
@@ -1976,10 +1982,10 @@ TEST(DBTest, MultiThreaded) {
// Initialize state
MTState mt;
mt.test = this;
- mt.stop.Release_Store(0);
+ mt.stop.store(false, std::memory_order_release);
for (int id = 0; id < kNumThreads; id++) {
- mt.counter[id].Release_Store(0);
- mt.thread_done[id].Release_Store(0);
+ mt.counter[id].store(false, std::memory_order_release);
+ mt.thread_done[id].store(false, std::memory_order_release);
}
// Start threads
@@ -1994,9 +2000,9 @@ TEST(DBTest, MultiThreaded) {
DelayMilliseconds(kTestSeconds * 1000);
// Stop the threads and wait for them to finish
- mt.stop.Release_Store(&mt);
+ mt.stop.store(true, std::memory_order_release);
for (int id = 0; id < kNumThreads; id++) {
- while (mt.thread_done[id].Acquire_Load() == nullptr) {
+ while (!mt.thread_done[id].load(std::memory_order_acquire)) {
DelayMilliseconds(100);
}
}
@@ -2100,6 +2106,7 @@ class ModelDB: public DB {
virtual Slice key() const { return iter_->first; }
virtual Slice value() const { return iter_->second; }
virtual Status status() const { return Status::OK(); }
+
private:
const KVMap* const map_;
const bool owned_; // Do we own map_
diff --git a/db/skiplist.h b/db/skiplist.h
index b806ce0..7ac914b 100644
--- a/db/skiplist.h
+++ b/db/skiplist.h
@@ -27,9 +27,10 @@
//
// ... prev vs. next pointer ordering ...
-#include <assert.h>
-#include <stdlib.h>
-#include "port/port.h"
+#include <atomic>
+#include <cassert>
+#include <cstdlib>
+
#include "util/arena.h"
#include "util/random.h"
@@ -105,11 +106,10 @@ class SkipList {
// Modified only by Insert(). Read racily by readers, but stale
// values are ok.
- port::AtomicPointer max_height_; // Height of the entire list
+ std::atomic<int> max_height_; // Height of the entire list
inline int GetMaxHeight() const {
- return static_cast<int>(
- reinterpret_cast<intptr_t>(max_height_.NoBarrier_Load()));
+ return max_height_.load(std::memory_order_relaxed);
}
// Read/written only by Insert().
@@ -144,7 +144,7 @@ class SkipList {
// Implementation details follow
template<typename Key, class Comparator>
-struct SkipList<Key,Comparator>::Node {
+struct SkipList<Key, Comparator>::Node {
explicit Node(const Key& k) : key(k) { }
Key const key;
@@ -155,63 +155,63 @@ struct SkipList<Key,Comparator>::Node {
assert(n >= 0);
// Use an 'acquire load' so that we observe a fully initialized
// version of the returned Node.
- return reinterpret_cast<Node*>(next_[n].Acquire_Load());
+ return next_[n].load(std::memory_order_acquire);
}
void SetNext(int n, Node* x) {
assert(n >= 0);
// Use a 'release store' so that anybody who reads through this
// pointer observes a fully initialized version of the inserted node.
- next_[n].Release_Store(x);
+ next_[n].store(x, std::memory_order_release);
}
// No-barrier variants that can be safely used in a few locations.
Node* NoBarrier_Next(int n) {
assert(n >= 0);
- return reinterpret_cast<Node*>(next_[n].NoBarrier_Load());
+ return next_[n].load(std::memory_order_relaxed);
}
void NoBarrier_SetNext(int n, Node* x) {
assert(n >= 0);
- next_[n].NoBarrier_Store(x);
+ next_[n].store(x, std::memory_order_relaxed);
}
private:
// Array of length equal to the node height. next_[0] is lowest level link.
- port::AtomicPointer next_[1];
+ std::atomic<Node*> next_[1];
};
template<typename Key, class Comparator>
-typename SkipList<Key,Comparator>::Node*
-SkipList<Key,Comparator>::NewNode(const Key& key, int height) {
- char* mem = arena_->AllocateAligned(
- sizeof(Node) + sizeof(port::AtomicPointer) * (height - 1));
- return new (mem) Node(key);
+typename SkipList<Key, Comparator>::Node*
+SkipList<Key, Comparator>::NewNode(const Key& key, int height) {
+ char* const node_memory = arena_->AllocateAligned(
+ sizeof(Node) + sizeof(std::atomic<Node*>) * (height - 1));
+ return new (node_memory) Node(key);
}
template<typename Key, class Comparator>
-inline SkipList<Key,Comparator>::Iterator::Iterator(const SkipList* list) {
+inline SkipList<Key, Comparator>::Iterator::Iterator(const SkipList* list) {
list_ = list;
node_ = nullptr;
}
template<typename Key, class Comparator>
-inline bool SkipList<Key,Comparator>::Iterator::Valid() const {
+inline bool SkipList<Key, Comparator>::Iterator::Valid() const {
return node_ != nullptr;
}
template<typename Key, class Comparator>
-inline const Key& SkipList<Key,Comparator>::Iterator::key() const {
+inline const Key& SkipList<Key, Comparator>::Iterator::key() const {
assert(Valid());
return node_->key;
}
template<typename Key, class Comparator>
-inline void SkipList<Key,Comparator>::Iterator::Next() {
+inline void SkipList<Key, Comparator>::Iterator::Next() {
assert(Valid());
node_ = node_->Next(0);
}
template<typename Key, class Comparator>
-inline void SkipList<Key,Comparator>::Iterator::Prev() {
+inline void SkipList<Key, Comparator>::Iterator::Prev() {
// Instead of using explicit "prev" links, we just search for the
// last node that falls before key.
assert(Valid());
@@ -222,17 +222,17 @@ inline void SkipList<Key,Comparator>::Iterator::Prev() {
}
template<typename Key, class Comparator>
-inline void SkipList<Key,Comparator>::Iterator::Seek(const Key& target) {
+inline void SkipList<Key, Comparator>::Iterator::Seek(const Key& target) {
node_ = list_->FindGreaterOrEqual(target, nullptr);
}
template<typename Key, class Comparator>
-inline void SkipList<Key,Comparator>::Iterator::SeekToFirst() {
+inline void SkipList<Key, Comparator>::Iterator::SeekToFirst() {
node_ = list_->head_->Next(0);
}
template<typename Key, class Comparator>
-inline void SkipList<Key,Comparator>::Iterator::SeekToLast() {
+inline void SkipList<Key, Comparator>::Iterator::SeekToLast() {
node_ = list_->FindLast();
if (node_ == list_->head_) {
node_ = nullptr;
@@ -240,7 +240,7 @@ inline void SkipList<Key,Comparator>::Iterator::SeekToLast() {
}
template<typename Key, class Comparator>
-int SkipList<Key,Comparator>::RandomHeight() {
+int SkipList<Key, Comparator>::RandomHeight() {
// Increase height with probability 1 in kBranching
static const unsigned int kBranching = 4;
int height = 1;
@@ -253,14 +253,15 @@ int SkipList<Key,Comparator>::RandomHeight() {
}
template<typename Key, class Comparator>
-bool SkipList<Key,Comparator>::KeyIsAfterNode(const Key& key, Node* n) const {
+bool SkipList<Key, Comparator>::KeyIsAfterNode(const Key& key, Node* n) const {
// null n is considered infinite
return (n != nullptr) && (compare_(n->key, key) < 0);
}
template<typename Key, class Comparator>
-typename SkipList<Key,Comparator>::Node* SkipList<Key,Comparator>::FindGreaterOrEqual(const Key& key, Node** prev)
- const {
+typename SkipList<Key, Comparator>::Node*
+SkipList<Key, Comparator>::FindGreaterOrEqual(const Key& key,
+ Node** prev) const {
Node* x = head_;
int level = GetMaxHeight() - 1;
while (true) {
@@ -281,8 +282,8 @@ typename SkipList<Key,Comparator>::Node* SkipList<Key,Comparator>::FindGreaterOr
}
template<typename Key, class Comparator>
-typename SkipList<Key,Comparator>::Node*
-SkipList<Key,Comparator>::FindLessThan(const Key& key) const {
+typename SkipList<Key, Comparator>::Node*
+SkipList<Key, Comparator>::FindLessThan(const Key& key) const {
Node* x = head_;
int level = GetMaxHeight() - 1;
while (true) {
@@ -302,7 +303,7 @@ SkipList<Key,Comparator>::FindLessThan(const Key& key) const {
}
template<typename Key, class Comparator>
-typename SkipList<Key,Comparator>::Node* SkipList<Key,Comparator>::FindLast()
+typename SkipList<Key, Comparator>::Node* SkipList<Key, Comparator>::FindLast()
const {
Node* x = head_;
int level = GetMaxHeight() - 1;
@@ -322,11 +323,11 @@ typename SkipList<Key,Comparator>::Node* SkipList<Key,Comparator>::FindLast()
}
template<typename Key, class Comparator>
-SkipList<Key,Comparator>::SkipList(Comparator cmp, Arena* arena)
+SkipList<Key, Comparator>::SkipList(Comparator cmp, Arena* arena)
: compare_(cmp),
arena_(arena),
head_(NewNode(0 /* any key will do */, kMaxHeight)),
- max_height_(reinterpret_cast<void*>(1)),
+ max_height_(1),
rnd_(0xdeadbeef) {
for (int i = 0; i < kMaxHeight; i++) {
head_->SetNext(i, nullptr);
@@ -334,7 +335,7 @@ SkipList<Key,Comparator>::SkipList(Comparator cmp, Arena* arena)
}
template<typename Key, class Comparator>
-void SkipList<Key,Comparator>::Insert(const Key& key) {
+void SkipList<Key, Comparator>::Insert(const Key& key) {
// TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual()
// here since Insert() is externally synchronized.
Node* prev[kMaxHeight];
@@ -348,8 +349,6 @@ void SkipList<Key,Comparator>::Insert(const Key& key) {
for (int i = GetMaxHeight(); i < height; i++) {
prev[i] = head_;
}
- //fprintf(stderr, "Change height from %d to %d\n", max_height_, height);
-
// It is ok to mutate max_height_ without any synchronization
// with concurrent readers. A concurrent reader that observes
// the new value of max_height_ will see either the old value of
@@ -357,7 +356,7 @@ void SkipList<Key,Comparator>::Insert(const Key& key) {
// the loop below. In the former case the reader will
// immediately drop to the next level since nullptr sorts after all
// keys. In the latter case the reader will use the new node.
- max_height_.NoBarrier_Store(reinterpret_cast<void*>(height));
+ max_height_.store(height, std::memory_order_relaxed);
}
x = NewNode(key, height);
@@ -370,7 +369,7 @@ void SkipList<Key,Comparator>::Insert(const Key& key) {
}
template<typename Key, class Comparator>
-bool SkipList<Key,Comparator>::Contains(const Key& key) const {
+bool SkipList<Key, Comparator>::Contains(const Key& key) const {
Node* x = FindGreaterOrEqual(key, nullptr);
if (x != nullptr && Equal(key, x->key)) {
return true;
diff --git a/db/skiplist_test.cc b/db/skiplist_test.cc
index 24e0887..c4cf146 100644
--- a/db/skiplist_test.cc
+++ b/db/skiplist_test.cc
@@ -3,7 +3,10 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/skiplist.h"
+
+#include <atomic>
#include <set>
+
#include "leveldb/env.h"
#include "port/port.h"
#include "port/thread_annotations.h"
@@ -188,12 +191,12 @@ class ConcurrentTest {
// Per-key generation
struct State {
- port::AtomicPointer generation[K];
- void Set(int k, intptr_t v) {
- generation[k].Release_Store(reinterpret_cast<void*>(v));
+ std::atomic<int> generation[K];
+ void Set(int k, int v) {
+ generation[k].store(v, std::memory_order_release);
}
- intptr_t Get(int k) {
- return reinterpret_cast<intptr_t>(generation[k].Acquire_Load());
+ int Get(int k) {
+ return generation[k].load(std::memory_order_acquire);
}
State() {
@@ -300,7 +303,7 @@ class TestState {
public:
ConcurrentTest t_;
int seed_;
- port::AtomicPointer quit_flag_;
+ std::atomic<bool> quit_flag_;
enum ReaderState {
STARTING,
@@ -310,7 +313,7 @@ class TestState {
explicit TestState(int s)
: seed_(s),
- quit_flag_(nullptr),
+ quit_flag_(false),
state_(STARTING),
state_cv_(&mu_) {}
@@ -340,7 +343,7 @@ static void ConcurrentReader(void* arg) {
Random rnd(state->seed_);
int64_t reads = 0;
state->Change(TestState::RUNNING);
- while (!state->quit_flag_.Acquire_Load()) {
+ while (!state->quit_flag_.load(std::memory_order_acquire)) {
state->t_.ReadStep(&rnd);
++reads;
}
@@ -362,7 +365,7 @@ static void RunConcurrent(int run) {
for (int i = 0; i < kSize; i++) {
state.t_.WriteStep(&rnd);
}
- state.quit_flag_.Release_Store(&state); // Any non-null arg will do
+ state.quit_flag_.store(true, std::memory_order_release);
state.Wait(TestState::DONE);
}
}