summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorcostan <costan@google.com>2019-03-11 13:04:53 -0700
committerChris Mumford <cmumford@google.com>2019-03-11 13:41:25 -0700
commit7d8e41e49b8fddda66a2c5f0a6a47f1a916e8d26 (patch)
tree0a5556aaf20ba27bd6ea0ab3792d1366e60b2f81
parentdd906262fd364c08a652dfa914f9995f6b7608a9 (diff)
downloadleveldb-7d8e41e49b8fddda66a2c5f0a6a47f1a916e8d26.tar.gz
leveldb: Replace AtomicPointer with std::atomic.
This CL removes AtomicPointer from leveldb's port interface. Its usage is replaced with std::atomic<> from the C++11 standard library. AtomicPointer was used to wrap flags, numbers, and pointers, so its instances are replaced with std::atomic<bool>, std::atomic<int>, std::atomic<size_t> and std::atomic<Node*>. This CL does not revise the memory ordering. AtomicPointer's methods are replaced mechanically with their std::atomic equivalents, even when the underlying usage is incorrect. (Example: DBImpl::has_imm_ is written using release stores, even though it is always read using relaxed ordering.) Revising the memory ordering is left for future CLs. ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=237865146
-rw-r--r--db/db_bench.cc20
-rw-r--r--db/db_impl.cc33
-rw-r--r--db/db_impl.h7
-rw-r--r--db/db_test.cc135
-rw-r--r--db/skiplist.h77
-rw-r--r--db/skiplist_test.cc21
-rw-r--r--port/atomic_pointer.h171
-rw-r--r--port/port_example.h29
-rw-r--r--port/port_stdcxx.h1
-rw-r--r--util/arena.cc5
-rw-r--r--util/arena.h17
-rw-r--r--util/env_test.cc57
-rw-r--r--util/env_windows.cc56
13 files changed, 210 insertions, 419 deletions
diff --git a/db/db_bench.cc b/db/db_bench.cc
index 115cf45..f9403f4 100644
--- a/db/db_bench.cc
+++ b/db/db_bench.cc
@@ -34,7 +34,6 @@
// seekrandom -- N random seeks
// open -- cost of opening a DB
// crc32c -- repeated crc32c of 4K of data
-// acquireload -- load N*1000 times
// Meta operations:
// compact -- Compact the entire DB
// stats -- Print DB stats
@@ -57,7 +56,6 @@ static const char* FLAGS_benchmarks =
"crc32c,"
"snappycomp,"
"snappyuncomp,"
- "acquireload,"
;
// Number of key/values to place in database
@@ -510,8 +508,6 @@ class Benchmark {
method = &Benchmark::Compact;
} else if (name == Slice("crc32c")) {
method = &Benchmark::Crc32c;
- } else if (name == Slice("acquireload")) {
- method = &Benchmark::AcquireLoad;
} else if (name == Slice("snappycomp")) {
method = &Benchmark::SnappyCompress;
} else if (name == Slice("snappyuncomp")) {
@@ -639,22 +635,6 @@ class Benchmark {
thread->stats.AddMessage(label);
}
- void AcquireLoad(ThreadState* thread) {
- int dummy;
- port::AtomicPointer ap(&dummy);
- int count = 0;
- void *ptr = nullptr;
- thread->stats.AddMessage("(each op is 1000 loads)");
- while (count < 100000) {
- for (int i = 0; i < 1000; i++) {
- ptr = ap.Acquire_Load();
- }
- count++;
- thread->stats.FinishedSingleOp();
- }
- if (ptr == nullptr) exit(1); // Disable unused variable warning.
- }
-
void SnappyCompress(ThreadState* thread) {
RandomGenerator gen;
Slice input = gen.Generate(Options().block_size);
diff --git a/db/db_impl.cc b/db/db_impl.cc
index fefb883..3468862 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -8,6 +8,7 @@
#include <stdio.h>
#include <algorithm>
+#include <atomic>
#include <set>
#include <string>
#include <vector>
@@ -132,10 +133,11 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
dbname_(dbname),
table_cache_(new TableCache(dbname_, options_, TableCacheSize(options_))),
db_lock_(nullptr),
- shutting_down_(nullptr),
+ shutting_down_(false),
background_work_finished_signal_(&mutex_),
mem_(nullptr),
imm_(nullptr),
+ has_imm_(false),
logfile_(nullptr),
logfile_number_(0),
log_(nullptr),
@@ -144,14 +146,12 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
background_compaction_scheduled_(false),
manual_compaction_(nullptr),
versions_(new VersionSet(dbname_, &options_, table_cache_,
- &internal_comparator_)) {
- has_imm_.Release_Store(nullptr);
-}
+ &internal_comparator_)) {}
DBImpl::~DBImpl() {
- // Wait for background work to finish
+ // Wait for background work to finish.
mutex_.Lock();
- shutting_down_.Release_Store(this); // Any non-null value is ok
+ shutting_down_.store(true, std::memory_order_release);
while (background_compaction_scheduled_) {
background_work_finished_signal_.Wait();
}
@@ -547,7 +547,7 @@ void DBImpl::CompactMemTable() {
Status s = WriteLevel0Table(imm_, &edit, base);
base->Unref();
- if (s.ok() && shutting_down_.Acquire_Load()) {
+ if (s.ok() && shutting_down_.load(std::memory_order_acquire)) {
s = Status::IOError("Deleting DB during memtable compaction");
}
@@ -562,7 +562,7 @@ void DBImpl::CompactMemTable() {
// Commit to the new state
imm_->Unref();
imm_ = nullptr;
- has_imm_.Release_Store(nullptr);
+ has_imm_.store(false, std::memory_order_release);
DeleteObsoleteFiles();
} else {
RecordBackgroundError(s);
@@ -610,7 +610,8 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,
}
MutexLock l(&mutex_);
- while (!manual.done && !shutting_down_.Acquire_Load() && bg_error_.ok()) {
+ while (!manual.done && !shutting_down_.load(std::memory_order_acquire) &&
+ bg_error_.ok()) {
if (manual_compaction_ == nullptr) { // Idle
manual_compaction_ = &manual;
MaybeScheduleCompaction();
@@ -652,7 +653,7 @@ void DBImpl::MaybeScheduleCompaction() {
mutex_.AssertHeld();
if (background_compaction_scheduled_) {
// Already scheduled
- } else if (shutting_down_.Acquire_Load()) {
+ } else if (shutting_down_.load(std::memory_order_acquire)) {
// DB is being deleted; no more background compactions
} else if (!bg_error_.ok()) {
// Already got an error; no more changes
@@ -673,7 +674,7 @@ void DBImpl::BGWork(void* db) {
void DBImpl::BackgroundCall() {
MutexLock l(&mutex_);
assert(background_compaction_scheduled_);
- if (shutting_down_.Acquire_Load()) {
+ if (shutting_down_.load(std::memory_order_acquire)) {
// No more background work when shutting down.
} else if (!bg_error_.ok()) {
// No more background work after a background error.
@@ -752,7 +753,7 @@ void DBImpl::BackgroundCompaction() {
if (status.ok()) {
// Done
- } else if (shutting_down_.Acquire_Load()) {
+ } else if (shutting_down_.load(std::memory_order_acquire)) {
// Ignore compaction errors found during shutting down
} else {
Log(options_.info_log,
@@ -919,9 +920,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
std::string current_user_key;
bool has_current_user_key = false;
SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
- for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
+ for (; input->Valid() && !shutting_down_.load(std::memory_order_acquire); ) {
// Prioritize immutable compaction work
- if (has_imm_.NoBarrier_Load() != nullptr) {
+ if (has_imm_.load(std::memory_order_relaxed)) {
const uint64_t imm_start = env_->NowMicros();
mutex_.Lock();
if (imm_ != nullptr) {
@@ -1014,7 +1015,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
input->Next();
}
- if (status.ok() && shutting_down_.Acquire_Load()) {
+ if (status.ok() && shutting_down_.load(std::memory_order_acquire)) {
status = Status::IOError("Deleting DB during compaction");
}
if (status.ok() && compact->builder != nullptr) {
@@ -1378,7 +1379,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
logfile_number_ = new_log_number;
log_ = new log::Writer(lfile);
imm_ = mem_;
- has_imm_.Release_Store(imm_);
+ has_imm_.store(true, std::memory_order_release);
mem_ = new MemTable(internal_comparator_);
mem_->Ref();
force = false; // Do not force another compaction if have room
diff --git a/db/db_impl.h b/db/db_impl.h
index 00e800a..ca00d42 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -5,8 +5,11 @@
#ifndef STORAGE_LEVELDB_DB_DB_IMPL_H_
#define STORAGE_LEVELDB_DB_DB_IMPL_H_
+#include <atomic>
#include <deque>
#include <set>
+#include <string>
+
#include "db/dbformat.h"
#include "db/log_writer.h"
#include "db/snapshot.h"
@@ -136,11 +139,11 @@ class DBImpl : public DB {
// State below is protected by mutex_
port::Mutex mutex_;
- port::AtomicPointer shutting_down_;
+ std::atomic<bool> shutting_down_;
port::CondVar background_work_finished_signal_ GUARDED_BY(mutex_);
MemTable* mem_;
MemTable* imm_ GUARDED_BY(mutex_); // Memtable being compacted
- port::AtomicPointer has_imm_; // So bg thread can detect non-null imm_
+ std::atomic<bool> has_imm_; // So bg thread can detect non-null imm_
WritableFile* logfile_;
uint64_t logfile_number_ GUARDED_BY(mutex_);
log::Writer* log_;
diff --git a/db/db_test.cc b/db/db_test.cc
index 894ed23..e889a74 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -2,6 +2,9 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
+#include <atomic>
+#include <string>
+
#include "leveldb/db.h"
#include "leveldb/filter_policy.h"
#include "db/db_impl.h"
@@ -61,7 +64,7 @@ class AtomicCounter {
void DelayMilliseconds(int millis) {
Env::Default()->SleepForMicroseconds(millis * 1000);
}
-}
+} // namespace
// Test Env to override default Env behavior for testing.
class TestEnv : public EnvWrapper {
@@ -93,45 +96,45 @@ class TestEnv : public EnvWrapper {
bool ignore_dot_files_;
};
-// Special Env used to delay background operations
+// Special Env used to delay background operations.
class SpecialEnv : public EnvWrapper {
public:
// sstable/log Sync() calls are blocked while this pointer is non-null.
- port::AtomicPointer delay_data_sync_;
+ std::atomic<bool> delay_data_sync_;
// sstable/log Sync() calls return an error.
- port::AtomicPointer data_sync_error_;
+ std::atomic<bool> data_sync_error_;
// Simulate no-space errors while this pointer is non-null.
- port::AtomicPointer no_space_;
+ std::atomic<bool> no_space_;
// Simulate non-writable file system while this pointer is non-null.
- port::AtomicPointer non_writable_;
+ std::atomic<bool> non_writable_;
// Force sync of manifest files to fail while this pointer is non-null.
- port::AtomicPointer manifest_sync_error_;
+ std::atomic<bool> manifest_sync_error_;
// Force write to manifest files to fail while this pointer is non-null.
- port::AtomicPointer manifest_write_error_;
+ std::atomic<bool> manifest_write_error_;
bool count_random_reads_;
AtomicCounter random_read_counter_;
- explicit SpecialEnv(Env* base) : EnvWrapper(base) {
- delay_data_sync_.Release_Store(nullptr);
- data_sync_error_.Release_Store(nullptr);
- no_space_.Release_Store(nullptr);
- non_writable_.Release_Store(nullptr);
- count_random_reads_ = false;
- manifest_sync_error_.Release_Store(nullptr);
- manifest_write_error_.Release_Store(nullptr);
+ explicit SpecialEnv(Env* base) : EnvWrapper(base),
+ delay_data_sync_(false),
+ data_sync_error_(false),
+ no_space_(false),
+ non_writable_(false),
+ manifest_sync_error_(false),
+ manifest_write_error_(false),
+ count_random_reads_(false) {
}
Status NewWritableFile(const std::string& f, WritableFile** r) {
class DataFile : public WritableFile {
private:
- SpecialEnv* env_;
- WritableFile* base_;
+ SpecialEnv* const env_;
+ WritableFile* const base_;
public:
DataFile(SpecialEnv* env, WritableFile* base)
@@ -140,7 +143,7 @@ class SpecialEnv : public EnvWrapper {
}
~DataFile() { delete base_; }
Status Append(const Slice& data) {
- if (env_->no_space_.Acquire_Load() != nullptr) {
+ if (env_->no_space_.load(std::memory_order_acquire)) {
// Drop writes on the floor
return Status::OK();
} else {
@@ -150,10 +153,10 @@ class SpecialEnv : public EnvWrapper {
Status Close() { return base_->Close(); }
Status Flush() { return base_->Flush(); }
Status Sync() {
- if (env_->data_sync_error_.Acquire_Load() != nullptr) {
+ if (env_->data_sync_error_.load(std::memory_order_acquire)) {
return Status::IOError("simulated data sync error");
}
- while (env_->delay_data_sync_.Acquire_Load() != nullptr) {
+ while (env_->delay_data_sync_.load(std::memory_order_acquire)) {
DelayMilliseconds(100);
}
return base_->Sync();
@@ -167,7 +170,7 @@ class SpecialEnv : public EnvWrapper {
ManifestFile(SpecialEnv* env, WritableFile* b) : env_(env), base_(b) { }
~ManifestFile() { delete base_; }
Status Append(const Slice& data) {
- if (env_->manifest_write_error_.Acquire_Load() != nullptr) {
+ if (env_->manifest_write_error_.load(std::memory_order_acquire)) {
return Status::IOError("simulated writer error");
} else {
return base_->Append(data);
@@ -176,7 +179,7 @@ class SpecialEnv : public EnvWrapper {
Status Close() { return base_->Close(); }
Status Flush() { return base_->Flush(); }
Status Sync() {
- if (env_->manifest_sync_error_.Acquire_Load() != nullptr) {
+ if (env_->manifest_sync_error_.load(std::memory_order_acquire)) {
return Status::IOError("simulated sync error");
} else {
return base_->Sync();
@@ -184,7 +187,7 @@ class SpecialEnv : public EnvWrapper {
}
};
- if (non_writable_.Acquire_Load() != nullptr) {
+ if (non_writable_.load(std::memory_order_acquire)) {
return Status::IOError("simulated write error");
}
@@ -424,7 +427,7 @@ class DBTest {
ASSERT_TRUE(
db_->GetProperty("leveldb.num-files-at-level" + NumberToString(level),
&property));
- return atoi(property.c_str());
+ return std::stoi(property);
}
int TotalTableFiles() {
@@ -587,11 +590,13 @@ TEST(DBTest, GetFromImmutableLayer) {
ASSERT_OK(Put("foo", "v1"));
ASSERT_EQ("v1", Get("foo"));
- env_->delay_data_sync_.Release_Store(env_); // Block sync calls
- Put("k1", std::string(100000, 'x')); // Fill memtable
- Put("k2", std::string(100000, 'y')); // Trigger compaction
+ // Block sync calls.
+ env_->delay_data_sync_.store(true, std::memory_order_release);
+ Put("k1", std::string(100000, 'x')); // Fill memtable.
+ Put("k2", std::string(100000, 'y')); // Trigger compaction.
ASSERT_EQ("v1", Get("foo"));
- env_->delay_data_sync_.Release_Store(nullptr); // Release sync calls
+ // Release sync calls.
+ env_->delay_data_sync_.store(false, std::memory_order_release);
} while (ChangeOptions());
}
@@ -608,7 +613,7 @@ TEST(DBTest, GetMemUsage) {
ASSERT_OK(Put("foo", "v1"));
std::string val;
ASSERT_TRUE(db_->GetProperty("leveldb.approximate-memory-usage", &val));
- int mem_usage = atoi(val.c_str());
+ int mem_usage = std::stoi(val);
ASSERT_GT(mem_usage, 0);
ASSERT_LT(mem_usage, 5*1024*1024);
} while (ChangeOptions());
@@ -1106,7 +1111,7 @@ TEST(DBTest, RepeatedWritesToSameKey) {
for (int i = 0; i < 5 * kMaxFiles; i++) {
Put("key", value);
ASSERT_LE(TotalTableFiles(), kMaxFiles);
- fprintf(stderr, "after %d: %d files\n", int(i+1), TotalTableFiles());
+ fprintf(stderr, "after %d: %d files\n", i + 1, TotalTableFiles());
}
}
@@ -1271,7 +1276,7 @@ TEST(DBTest, IteratorPinsRef) {
// Write to force compactions
Put("foo", "newvalue1");
for (int i = 0; i < 100; i++) {
- ASSERT_OK(Put(Key(i), Key(i) + std::string(100000, 'v'))); // 100K values
+ ASSERT_OK(Put(Key(i), Key(i) + std::string(100000, 'v'))); // 100K values
}
Put("foo", "newvalue2");
@@ -1459,21 +1464,21 @@ TEST(DBTest, L0_CompactionBug_Issue44_a) {
TEST(DBTest, L0_CompactionBug_Issue44_b) {
Reopen();
- Put("","");
+ Put("", "");
Reopen();
Delete("e");
- Put("","");
+ Put("", "");
Reopen();
Put("c", "cv");
Reopen();
- Put("","");
+ Put("", "");
Reopen();
- Put("","");
+ Put("", "");
DelayMilliseconds(1000); // Wait for compaction to finish
Reopen();
- Put("d","dv");
+ Put("d", "dv");
Reopen();
- Put("","");
+ Put("", "");
Reopen();
Delete("d");
Delete("b");
@@ -1711,13 +1716,14 @@ TEST(DBTest, NoSpace) {
ASSERT_EQ("v1", Get("foo"));
Compact("a", "z");
const int num_files = CountFiles();
- env_->no_space_.Release_Store(env_); // Force out-of-space errors
+ // Force out-of-space errors.
+ env_->no_space_.store(true, std::memory_order_release);
for (int i = 0; i < 10; i++) {
for (int level = 0; level < config::kNumLevels-1; level++) {
dbfull()->TEST_CompactRange(level, nullptr, nullptr);
}
}
- env_->no_space_.Release_Store(nullptr);
+ env_->no_space_.store(false, std::memory_order_release);
ASSERT_LT(CountFiles(), num_files + 3);
}
@@ -1727,7 +1733,8 @@ TEST(DBTest, NonWritableFileSystem) {
options.env = env_;
Reopen(&options);
ASSERT_OK(Put("foo", "v1"));
- env_->non_writable_.Release_Store(env_); // Force errors for new files
+ // Force errors for new files.
+ env_->non_writable_.store(true, std::memory_order_release);
std::string big(100000, 'x');
int errors = 0;
for (int i = 0; i < 20; i++) {
@@ -1738,7 +1745,7 @@ TEST(DBTest, NonWritableFileSystem) {
}
}
ASSERT_GT(errors, 0);
- env_->non_writable_.Release_Store(nullptr);
+ env_->non_writable_.store(false, std::memory_order_release);
}
TEST(DBTest, WriteSyncError) {
@@ -1748,7 +1755,7 @@ TEST(DBTest, WriteSyncError) {
Options options = CurrentOptions();
options.env = env_;
Reopen(&options);
- env_->data_sync_error_.Release_Store(env_);
+ env_->data_sync_error_.store(true, std::memory_order_release);
// (b) Normal write should succeed
WriteOptions w;
@@ -1762,7 +1769,7 @@ TEST(DBTest, WriteSyncError) {
ASSERT_EQ("NOT_FOUND", Get("k2"));
// (d) make sync behave normally
- env_->data_sync_error_.Release_Store(nullptr);
+ env_->data_sync_error_.store(false, std::memory_order_release);
// (e) Do a non-sync write; should fail
w.sync = false;
@@ -1782,7 +1789,7 @@ TEST(DBTest, ManifestWriteError) {
// We iterate twice. In the second iteration, everything is the
// same except the log record never makes it to the MANIFEST file.
for (int iter = 0; iter < 2; iter++) {
- port::AtomicPointer* error_type = (iter == 0)
+ std::atomic<bool>* error_type = (iter == 0)
? &env_->manifest_sync_error_
: &env_->manifest_write_error_;
@@ -1802,12 +1809,12 @@ TEST(DBTest, ManifestWriteError) {
ASSERT_EQ(NumTableFilesAtLevel(last), 1); // foo=>bar is now in last level
// Merging compaction (will fail)
- error_type->Release_Store(env_);
+ error_type->store(true, std::memory_order_release);
dbfull()->TEST_CompactRange(last, nullptr, nullptr); // Should fail
ASSERT_EQ("bar", Get("foo"));
// Recovery: should not lose data
- error_type->Release_Store(nullptr);
+ error_type->store(false, std::memory_order_release);
Reopen(&options);
ASSERT_EQ("bar", Get("foo"));
}
@@ -1878,7 +1885,7 @@ TEST(DBTest, BloomFilter) {
dbfull()->TEST_CompactMemTable();
// Prevent auto compactions triggered by seeks
- env_->delay_data_sync_.Release_Store(env_);
+ env_->delay_data_sync_.store(true, std::memory_order_release);
// Lookup present keys. Should rarely read from small sstable.
env_->random_read_counter_.Reset();
@@ -1899,7 +1906,7 @@ TEST(DBTest, BloomFilter) {
fprintf(stderr, "%d missing => %d reads\n", N, reads);
ASSERT_LE(reads, 3*N/100);
- env_->delay_data_sync_.Release_Store(nullptr);
+ env_->delay_data_sync_.store(false, std::memory_order_release);
Close();
delete options.block_cache;
delete options.filter_policy;
@@ -1914,9 +1921,9 @@ static const int kNumKeys = 1000;
struct MTState {
DBTest* test;
- port::AtomicPointer stop;
- port::AtomicPointer counter[kNumThreads];
- port::AtomicPointer thread_done[kNumThreads];
+ std::atomic<bool> stop;
+ std::atomic<int> counter[kNumThreads];
+ std::atomic<bool> thread_done[kNumThreads];
};
struct MTThread {
@@ -1928,13 +1935,13 @@ static void MTThreadBody(void* arg) {
MTThread* t = reinterpret_cast<MTThread*>(arg);
int id = t->id;
DB* db = t->state->test->db_;
- uintptr_t counter = 0;
+ int counter = 0;
fprintf(stderr, "... starting thread %d\n", id);
Random rnd(1000 + id);
std::string value;
char valbuf[1500];
- while (t->state->stop.Acquire_Load() == nullptr) {
- t->state->counter[id].Release_Store(reinterpret_cast<void*>(counter));
+ while (!t->state->stop.load(std::memory_order_acquire)) {
+ t->state->counter[id].store(counter, std::memory_order_release);
int key = rnd.Uniform(kNumKeys);
char keybuf[20];
@@ -1959,14 +1966,13 @@ static void MTThreadBody(void* arg) {
ASSERT_EQ(k, key);
ASSERT_GE(w, 0);
ASSERT_LT(w, kNumThreads);
- ASSERT_LE(static_cast<uintptr_t>(c), reinterpret_cast<uintptr_t>(
- t->state->counter[w].Acquire_Load()));
+ ASSERT_LE(c, t->state->counter[w].load(std::memory_order_acquire));
}
}
counter++;
}
- t->state->thread_done[id].Release_Store(t);
- fprintf(stderr, "... stopping thread %d after %d ops\n", id, int(counter));
+ t->state->thread_done[id].store(true, std::memory_order_release);
+ fprintf(stderr, "... stopping thread %d after %d ops\n", id, counter);
}
} // namespace
@@ -1976,10 +1982,10 @@ TEST(DBTest, MultiThreaded) {
// Initialize state
MTState mt;
mt.test = this;
- mt.stop.Release_Store(0);
+ mt.stop.store(false, std::memory_order_release);
for (int id = 0; id < kNumThreads; id++) {
- mt.counter[id].Release_Store(0);
- mt.thread_done[id].Release_Store(0);
+ mt.counter[id].store(false, std::memory_order_release);
+ mt.thread_done[id].store(false, std::memory_order_release);
}
// Start threads
@@ -1994,9 +2000,9 @@ TEST(DBTest, MultiThreaded) {
DelayMilliseconds(kTestSeconds * 1000);
// Stop the threads and wait for them to finish
- mt.stop.Release_Store(&mt);
+ mt.stop.store(true, std::memory_order_release);
for (int id = 0; id < kNumThreads; id++) {
- while (mt.thread_done[id].Acquire_Load() == nullptr) {
+ while (!mt.thread_done[id].load(std::memory_order_acquire)) {
DelayMilliseconds(100);
}
}
@@ -2100,6 +2106,7 @@ class ModelDB: public DB {
virtual Slice key() const { return iter_->first; }
virtual Slice value() const { return iter_->second; }
virtual Status status() const { return Status::OK(); }
+
private:
const KVMap* const map_;
const bool owned_; // Do we own map_
diff --git a/db/skiplist.h b/db/skiplist.h
index b806ce0..7ac914b 100644
--- a/db/skiplist.h
+++ b/db/skiplist.h
@@ -27,9 +27,10 @@
//
// ... prev vs. next pointer ordering ...
-#include <assert.h>
-#include <stdlib.h>
-#include "port/port.h"
+#include <atomic>
+#include <cassert>
+#include <cstdlib>
+
#include "util/arena.h"
#include "util/random.h"
@@ -105,11 +106,10 @@ class SkipList {
// Modified only by Insert(). Read racily by readers, but stale
// values are ok.
- port::AtomicPointer max_height_; // Height of the entire list
+ std::atomic<int> max_height_; // Height of the entire list
inline int GetMaxHeight() const {
- return static_cast<int>(
- reinterpret_cast<intptr_t>(max_height_.NoBarrier_Load()));
+ return max_height_.load(std::memory_order_relaxed);
}
// Read/written only by Insert().
@@ -144,7 +144,7 @@ class SkipList {
// Implementation details follow
template<typename Key, class Comparator>
-struct SkipList<Key,Comparator>::Node {
+struct SkipList<Key, Comparator>::Node {
explicit Node(const Key& k) : key(k) { }
Key const key;
@@ -155,63 +155,63 @@ struct SkipList<Key,Comparator>::Node {
assert(n >= 0);
// Use an 'acquire load' so that we observe a fully initialized
// version of the returned Node.
- return reinterpret_cast<Node*>(next_[n].Acquire_Load());
+ return next_[n].load(std::memory_order_acquire);
}
void SetNext(int n, Node* x) {
assert(n >= 0);
// Use a 'release store' so that anybody who reads through this
// pointer observes a fully initialized version of the inserted node.
- next_[n].Release_Store(x);
+ next_[n].store(x, std::memory_order_release);
}
// No-barrier variants that can be safely used in a few locations.
Node* NoBarrier_Next(int n) {
assert(n >= 0);
- return reinterpret_cast<Node*>(next_[n].NoBarrier_Load());
+ return next_[n].load(std::memory_order_relaxed);
}
void NoBarrier_SetNext(int n, Node* x) {
assert(n >= 0);
- next_[n].NoBarrier_Store(x);
+ next_[n].store(x, std::memory_order_relaxed);
}
private:
// Array of length equal to the node height. next_[0] is lowest level link.
- port::AtomicPointer next_[1];
+ std::atomic<Node*> next_[1];
};
template<typename Key, class Comparator>
-typename SkipList<Key,Comparator>::Node*
-SkipList<Key,Comparator>::NewNode(const Key& key, int height) {
- char* mem = arena_->AllocateAligned(
- sizeof(Node) + sizeof(port::AtomicPointer) * (height - 1));
- return new (mem) Node(key);
+typename SkipList<Key, Comparator>::Node*
+SkipList<Key, Comparator>::NewNode(const Key& key, int height) {
+ char* const node_memory = arena_->AllocateAligned(
+ sizeof(Node) + sizeof(std::atomic<Node*>) * (height - 1));
+ return new (node_memory) Node(key);
}
template<typename Key, class Comparator>
-inline SkipList<Key,Comparator>::Iterator::Iterator(const SkipList* list) {
+inline SkipList<Key, Comparator>::Iterator::Iterator(const SkipList* list) {
list_ = list;
node_ = nullptr;
}
template<typename Key, class Comparator>
-inline bool SkipList<Key,Comparator>::Iterator::Valid() const {
+inline bool SkipList<Key, Comparator>::Iterator::Valid() const {
return node_ != nullptr;
}
template<typename Key, class Comparator>
-inline const Key& SkipList<Key,Comparator>::Iterator::key() const {
+inline const Key& SkipList<Key, Comparator>::Iterator::key() const {
assert(Valid());
return node_->key;
}
template<typename Key, class Comparator>
-inline void SkipList<Key,Comparator>::Iterator::Next() {
+inline void SkipList<Key, Comparator>::Iterator::Next() {
assert(Valid());
node_ = node_->Next(0);
}
template<typename Key, class Comparator>
-inline void SkipList<Key,Comparator>::Iterator::Prev() {
+inline void SkipList<Key, Comparator>::Iterator::Prev() {
// Instead of using explicit "prev" links, we just search for the
// last node that falls before key.
assert(Valid());
@@ -222,17 +222,17 @@ inline void SkipList<Key,Comparator>::Iterator::Prev() {
}
template<typename Key, class Comparator>
-inline void SkipList<Key,Comparator>::Iterator::Seek(const Key& target) {
+inline void SkipList<Key, Comparator>::Iterator::Seek(const Key& target) {
node_ = list_->FindGreaterOrEqual(target, nullptr);
}
template<typename Key, class Comparator>
-inline void SkipList<Key,Comparator>::Iterator::SeekToFirst() {
+inline void SkipList<Key, Comparator>::Iterator::SeekToFirst() {
node_ = list_->head_->Next(0);
}
template<typename Key, class Comparator>
-inline void SkipList<Key,Comparator>::Iterator::SeekToLast() {
+inline void SkipList<Key, Comparator>::Iterator::SeekToLast() {
node_ = list_->FindLast();
if (node_ == list_->head_) {
node_ = nullptr;
@@ -240,7 +240,7 @@ inline void SkipList<Key,Comparator>::Iterator::SeekToLast() {
}
template<typename Key, class Comparator>
-int SkipList<Key,Comparator>::RandomHeight() {
+int SkipList<Key, Comparator>::RandomHeight() {
// Increase height with probability 1 in kBranching
static const unsigned int kBranching = 4;
int height = 1;
@@ -253,14 +253,15 @@ int SkipList<Key,Comparator>::RandomHeight() {
}
template<typename Key, class Comparator>
-bool SkipList<Key,Comparator>::KeyIsAfterNode(const Key& key, Node* n) const {
+bool SkipList<Key, Comparator>::KeyIsAfterNode(const Key& key, Node* n) const {
// null n is considered infinite
return (n != nullptr) && (compare_(n->key, key) < 0);
}
template<typename Key, class Comparator>
-typename SkipList<Key,Comparator>::Node* SkipList<Key,Comparator>::FindGreaterOrEqual(const Key& key, Node** prev)
- const {
+typename SkipList<Key, Comparator>::Node*
+SkipList<Key, Comparator>::FindGreaterOrEqual(const Key& key,
+ Node** prev) const {
Node* x = head_;
int level = GetMaxHeight() - 1;
while (true) {
@@ -281,8 +282,8 @@ typename SkipList<Key,Comparator>::Node* SkipList<Key,Comparator>::FindGreaterOr
}
template<typename Key, class Comparator>
-typename SkipList<Key,Comparator>::Node*
-SkipList<Key,Comparator>::FindLessThan(const Key& key) const {
+typename SkipList<Key, Comparator>::Node*
+SkipList<Key, Comparator>::FindLessThan(const Key& key) const {
Node* x = head_;
int level = GetMaxHeight() - 1;
while (true) {
@@ -302,7 +303,7 @@ SkipList<Key,Comparator>::FindLessThan(const Key& key) const {
}
template<typename Key, class Comparator>
-typename SkipList<Key,Comparator>::Node* SkipList<Key,Comparator>::FindLast()
+typename SkipList<Key, Comparator>::Node* SkipList<Key, Comparator>::FindLast()
const {
Node* x = head_;
int level = GetMaxHeight() - 1;
@@ -322,11 +323,11 @@ typename SkipList<Key,Comparator>::Node* SkipList<Key,Comparator>::FindLast()
}
template<typename Key, class Comparator>
-SkipList<Key,Comparator>::SkipList(Comparator cmp, Arena* arena)
+SkipList<Key, Comparator>::SkipList(Comparator cmp, Arena* arena)
: compare_(cmp),
arena_(arena),
head_(NewNode(0 /* any key will do */, kMaxHeight)),
- max_height_(reinterpret_cast<void*>(1)),
+ max_height_(1),
rnd_(0xdeadbeef) {
for (int i = 0; i < kMaxHeight; i++) {
head_->SetNext(i, nullptr);
@@ -334,7 +335,7 @@ SkipList<Key,Comparator>::SkipList(Comparator cmp, Arena* arena)
}
template<typename Key, class Comparator>
-void SkipList<Key,Comparator>::Insert(const Key& key) {
+void SkipList<Key, Comparator>::Insert(const Key& key) {
// TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual()
// here since Insert() is externally synchronized.
Node* prev[kMaxHeight];
@@ -348,8 +349,6 @@ void SkipList<Key,Comparator>::Insert(const Key& key) {
for (int i = GetMaxHeight(); i < height; i++) {
prev[i] = head_;
}
- //fprintf(stderr, "Change height from %d to %d\n", max_height_, height);
-
// It is ok to mutate max_height_ without any synchronization
// with concurrent readers. A concurrent reader that observes
// the new value of max_height_ will see either the old value of
@@ -357,7 +356,7 @@ void SkipList<Key,Comparator>::Insert(const Key& key) {
// the loop below. In the former case the reader will
// immediately drop to the next level since nullptr sorts after all
// keys. In the latter case the reader will use the new node.
- max_height_.NoBarrier_Store(reinterpret_cast<void*>(height));
+ max_height_.store(height, std::memory_order_relaxed);
}
x = NewNode(key, height);
@@ -370,7 +369,7 @@ void SkipList<Key,Comparator>::Insert(const Key& key) {
}
template<typename Key, class Comparator>
-bool SkipList<Key,Comparator>::Contains(const Key& key) const {
+bool SkipList<Key, Comparator>::Contains(const Key& key) const {
Node* x = FindGreaterOrEqual(key, nullptr);
if (x != nullptr && Equal(key, x->key)) {
return true;
diff --git a/db/skiplist_test.cc b/db/skiplist_test.cc
index 24e0887..c4cf146 100644
--- a/db/skiplist_test.cc
+++ b/db/skiplist_test.cc
@@ -3,7 +3,10 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/skiplist.h"
+
+#include <atomic>
#include <set>
+
#include "leveldb/env.h"
#include "port/port.h"
#include "port/thread_annotations.h"
@@ -188,12 +191,12 @@ class ConcurrentTest {
// Per-key generation
struct State {
- port::AtomicPointer generation[K];
- void Set(int k, intptr_t v) {
- generation[k].Release_Store(reinterpret_cast<void*>(v));
+ std::atomic<int> generation[K];
+ void Set(int k, int v) {
+ generation[k].store(v, std::memory_order_release);
}
- intptr_t Get(int k) {
- return reinterpret_cast<intptr_t>(generation[k].Acquire_Load());
+ int Get(int k) {
+ return generation[k].load(std::memory_order_acquire);
}
State() {
@@ -300,7 +303,7 @@ class TestState {
public:
ConcurrentTest t_;
int seed_;
- port::AtomicPointer quit_flag_;
+ std::atomic<bool> quit_flag_;
enum ReaderState {
STARTING,
@@ -310,7 +313,7 @@ class TestState {
explicit TestState(int s)
: seed_(s),
- quit_flag_(nullptr),
+ quit_flag_(false),
state_(STARTING),
state_cv_(&mu_) {}
@@ -340,7 +343,7 @@ static void ConcurrentReader(void* arg) {
Random rnd(state->seed_);
int64_t reads = 0;
state->Change(TestState::RUNNING);
- while (!state->quit_flag_.Acquire_Load()) {
+ while (!state->quit_flag_.load(std::memory_order_acquire)) {
state->t_.ReadStep(&rnd);
++reads;
}
@@ -362,7 +365,7 @@ static void RunConcurrent(int run) {
for (int i = 0; i < kSize; i++) {
state.t_.WriteStep(&rnd);
}
- state.quit_flag_.Release_Store(&state); // Any non-null arg will do
+ state.quit_flag_.store(true, std::memory_order_release);
state.Wait(TestState::DONE);
}
}
diff --git a/port/atomic_pointer.h b/port/atomic_pointer.h
deleted file mode 100644
index d906f63..0000000
--- a/port/atomic_pointer.h
+++ /dev/null
@@ -1,171 +0,0 @@
-// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file. See the AUTHORS file for names of contributors.
-
-// AtomicPointer provides storage for a lock-free pointer.
-// Platform-dependent implementation of AtomicPointer:
-// - If the platform provides a cheap barrier, we use it with raw pointers
-// - If <atomic> is present (on newer versions of gcc, it is), we use
-// a <atomic>-based AtomicPointer. However we prefer the memory
-// barrier based version, because at least on a gcc 4.4 32-bit build
-// on linux, we have encountered a buggy <atomic> implementation.
-// Also, some <atomic> implementations are much slower than a memory-barrier
-// based implementation (~16ns for <atomic> based acquire-load vs. ~1ns for
-// a barrier based acquire-load).
-// This code is based on atomicops-internals-* in Google's perftools:
-// http://code.google.com/p/google-perftools/source/browse/#svn%2Ftrunk%2Fsrc%2Fbase
-
-#ifndef PORT_ATOMIC_POINTER_H_
-#define PORT_ATOMIC_POINTER_H_
-
-#include <stdint.h>
-
-#include <atomic>
-
-#if defined(_M_X64) || defined(__x86_64__)
-#define ARCH_CPU_X86_FAMILY 1
-#elif defined(_M_IX86) || defined(__i386__) || defined(__i386)
-#define ARCH_CPU_X86_FAMILY 1
-#elif defined(__ARMEL__)
-#define ARCH_CPU_ARM_FAMILY 1
-#elif defined(__aarch64__)
-#define ARCH_CPU_ARM64_FAMILY 1
-#elif defined(__ppc__) || defined(__powerpc__) || defined(__powerpc64__)
-#define ARCH_CPU_PPC_FAMILY 1
-#elif defined(__mips__)
-#define ARCH_CPU_MIPS_FAMILY 1
-#endif
-
-namespace leveldb {
-namespace port {
-
-// Define MemoryBarrier() if available
-// Windows on x86
-#if defined(OS_WIN) && defined(COMPILER_MSVC) && defined(ARCH_CPU_X86_FAMILY)
-// windows.h already provides a MemoryBarrier(void) macro
-// http://msdn.microsoft.com/en-us/library/ms684208(v=vs.85).aspx
-#define LEVELDB_HAVE_MEMORY_BARRIER
-
-// Mac OS
-#elif defined(__APPLE__)
-inline void MemoryBarrier() {
- std::atomic_thread_fence(std::memory_order_seq_cst);
-}
-#define LEVELDB_HAVE_MEMORY_BARRIER
-
-// Gcc on x86
-#elif defined(ARCH_CPU_X86_FAMILY) && defined(__GNUC__)
-inline void MemoryBarrier() {
- // See http://gcc.gnu.org/ml/gcc/2003-04/msg01180.html for a discussion on
- // this idiom. Also see http://en.wikipedia.org/wiki/Memory_ordering.
- __asm__ __volatile__("" : : : "memory");
-}
-#define LEVELDB_HAVE_MEMORY_BARRIER
-
-// Sun Studio
-#elif defined(ARCH_CPU_X86_FAMILY) && defined(__SUNPRO_CC)
-inline void MemoryBarrier() {
- // See http://gcc.gnu.org/ml/gcc/2003-04/msg01180.html for a discussion on
- // this idiom. Also see http://en.wikipedia.org/wiki/Memory_ordering.
- asm volatile("" : : : "memory");
-}
-#define LEVELDB_HAVE_MEMORY_BARRIER
-
-// ARM Linux
-#elif defined(ARCH_CPU_ARM_FAMILY) && defined(__linux__)
-typedef void (*LinuxKernelMemoryBarrierFunc)(void);
-// The Linux ARM kernel provides a highly optimized device-specific memory
-// barrier function at a fixed memory address that is mapped in every
-// user-level process.
-//
-// This beats using CPU-specific instructions which are, on single-core
-// devices, un-necessary and very costly (e.g. ARMv7-A "dmb" takes more
-// than 180ns on a Cortex-A8 like the one on a Nexus One). Benchmarking
-// shows that the extra function call cost is completely negligible on
-// multi-core devices.
-//
-inline void MemoryBarrier() {
- (*(LinuxKernelMemoryBarrierFunc)0xffff0fa0)();
-}
-#define LEVELDB_HAVE_MEMORY_BARRIER
-
-// ARM64
-#elif defined(ARCH_CPU_ARM64_FAMILY)
-inline void MemoryBarrier() {
- asm volatile("dmb sy" : : : "memory");
-}
-#define LEVELDB_HAVE_MEMORY_BARRIER
-
-// PPC
-#elif defined(ARCH_CPU_PPC_FAMILY) && defined(__GNUC__)
-inline void MemoryBarrier() {
- // TODO for some powerpc expert: is there a cheaper suitable variant?
- // Perhaps by having separate barriers for acquire and release ops.
- asm volatile("sync" : : : "memory");
-}
-#define LEVELDB_HAVE_MEMORY_BARRIER
-
-// MIPS
-#elif defined(ARCH_CPU_MIPS_FAMILY) && defined(__GNUC__)
-inline void MemoryBarrier() {
- __asm__ __volatile__("sync" : : : "memory");
-}
-#define LEVELDB_HAVE_MEMORY_BARRIER
-
-#endif
-
-// AtomicPointer built using platform-specific MemoryBarrier().
-#if defined(LEVELDB_HAVE_MEMORY_BARRIER)
-class AtomicPointer {
- private:
- void* rep_;
- public:
- AtomicPointer() { }
- explicit AtomicPointer(void* p) : rep_(p) {}
- inline void* NoBarrier_Load() const { return rep_; }
- inline void NoBarrier_Store(void* v) { rep_ = v; }
- inline void* Acquire_Load() const {
- void* result = rep_;
- MemoryBarrier();
- return result;
- }
- inline void Release_Store(void* v) {
- MemoryBarrier();
- rep_ = v;
- }
-};
-
-// AtomicPointer based on C++11 <atomic>.
-#else
-class AtomicPointer {
- private:
- std::atomic<void*> rep_;
- public:
- AtomicPointer() { }
- explicit AtomicPointer(void* v) : rep_(v) { }
- inline void* Acquire_Load() const {
- return rep_.load(std::memory_order_acquire);
- }
- inline void Release_Store(void* v) {
- rep_.store(v, std::memory_order_release);
- }
- inline void* NoBarrier_Load() const {
- return rep_.load(std::memory_order_relaxed);
- }
- inline void NoBarrier_Store(void* v) {
- rep_.store(v, std::memory_order_relaxed);
- }
-};
-
-#endif
-
-#undef LEVELDB_HAVE_MEMORY_BARRIER
-#undef ARCH_CPU_X86_FAMILY
-#undef ARCH_CPU_ARM_FAMILY
-#undef ARCH_CPU_ARM64_FAMILY
-#undef ARCH_CPU_PPC_FAMILY
-
-} // namespace port
-} // namespace leveldb
-
-#endif // PORT_ATOMIC_POINTER_H_
diff --git a/port/port_example.h b/port/port_example.h
index 9c648c3..1a8fca2 100644
--- a/port/port_example.h
+++ b/port/port_example.h
@@ -62,35 +62,6 @@ class CondVar {
void SignallAll();
};
-// A type that holds a pointer that can be read or written atomically
-// (i.e., without word-tearing.)
-class AtomicPointer {
- private:
- intptr_t rep_;
- public:
- // Initialize to arbitrary value
- AtomicPointer();
-
- // Initialize to hold v
- explicit AtomicPointer(void* v) : rep_(v) { }
-
- // Read and return the stored pointer with the guarantee that no
- // later memory access (read or write) by this thread can be
- // reordered ahead of this read.
- void* Acquire_Load() const;
-
- // Set v as the stored pointer with the guarantee that no earlier
- // memory access (read or write) by this thread can be reordered
- // after this store.
- void Release_Store(void* v);
-
- // Read the stored pointer with no ordering guarantees.
- void* NoBarrier_Load() const;
-
- // Set va as the stored pointer with no ordering guarantees.
- void NoBarrier_Store(void* v);
-};
-
// ------------------ Compression -------------------
// Store the snappy compression of "input[0,input_length-1]" in *output.
diff --git a/port/port_stdcxx.h b/port/port_stdcxx.h
index 4713e26..e21fa70 100644
--- a/port/port_stdcxx.h
+++ b/port/port_stdcxx.h
@@ -35,7 +35,6 @@
#include <condition_variable> // NOLINT
#include <mutex> // NOLINT
#include <string>
-#include "port/atomic_pointer.h"
#include "port/thread_annotations.h"
namespace leveldb {
diff --git a/util/arena.cc b/util/arena.cc
index a0338bf..a496ad0 100644
--- a/util/arena.cc
+++ b/util/arena.cc
@@ -3,7 +3,6 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "util/arena.h"
-#include <assert.h>
namespace leveldb {
@@ -60,8 +59,8 @@ char* Arena::AllocateAligned(size_t bytes) {
char* Arena::AllocateNewBlock(size_t block_bytes) {
char* result = new char[block_bytes];
blocks_.push_back(result);
- memory_usage_.NoBarrier_Store(
- reinterpret_cast<void*>(MemoryUsage() + block_bytes + sizeof(char*)));
+ memory_usage_.fetch_add(block_bytes + sizeof(char*),
+ std::memory_order_relaxed);
return result;
}
diff --git a/util/arena.h b/util/arena.h
index 48bab33..624e262 100644
--- a/util/arena.h
+++ b/util/arena.h
@@ -5,11 +5,11 @@
#ifndef STORAGE_LEVELDB_UTIL_ARENA_H_
#define STORAGE_LEVELDB_UTIL_ARENA_H_
+#include <atomic>
+#include <cassert>
+#include <cstddef>
+#include <cstdint>
#include <vector>
-#include <assert.h>
-#include <stddef.h>
-#include <stdint.h>
-#include "port/port.h"
namespace leveldb {
@@ -21,13 +21,13 @@ class Arena {
// Return a pointer to a newly allocated memory block of "bytes" bytes.
char* Allocate(size_t bytes);
- // Allocate memory with the normal alignment guarantees provided by malloc
+ // Allocate memory with the normal alignment guarantees provided by malloc.
char* AllocateAligned(size_t bytes);
// Returns an estimate of the total memory usage of data allocated
// by the arena.
size_t MemoryUsage() const {
- return reinterpret_cast<uintptr_t>(memory_usage_.NoBarrier_Load());
+ return memory_usage_.load(std::memory_order_relaxed);
}
private:
@@ -42,7 +42,10 @@ class Arena {
std::vector<char*> blocks_;
// Total memory usage of the arena.
- port::AtomicPointer memory_usage_;
+ //
+ // TODO(costan): This member is accessed via atomics, but the others are
+ // accessed without any locking. Is this OK?
+ std::atomic<size_t> memory_usage_;
// No copying allowed
Arena(const Arena&);
diff --git a/util/env_test.cc b/util/env_test.cc
index 070109b..b204089 100644
--- a/util/env_test.cc
+++ b/util/env_test.cc
@@ -5,6 +5,7 @@
#include "leveldb/env.h"
#include <algorithm>
+#include <atomic>
#include "port/port.h"
#include "port/thread_annotations.h"
@@ -24,10 +25,15 @@ class EnvTest {
EnvTest() : env_(Env::Default()) { }
};
-static void SetBool(void* ptr) {
- reinterpret_cast<port::AtomicPointer*>(ptr)->NoBarrier_Store(ptr);
+namespace {
+
+static void SetAtomicBool(void* atomic_bool_ptr) {
+ std::atomic<bool>* atomic_bool =
+ reinterpret_cast<std::atomic<bool>*>(atomic_bool_ptr);
+ atomic_bool->store(true, std::memory_order_relaxed);
}
+} // namespace
TEST(EnvTest, ReadWrite) {
Random rnd(test::RandomSeed());
@@ -77,42 +83,41 @@ TEST(EnvTest, ReadWrite) {
}
TEST(EnvTest, RunImmediately) {
- port::AtomicPointer called(nullptr);
- env_->Schedule(&SetBool, &called);
+ std::atomic<bool> called(false);
+ env_->Schedule(&SetAtomicBool, &called);
env_->SleepForMicroseconds(kDelayMicros);
- ASSERT_TRUE(called.NoBarrier_Load() != nullptr);
+ ASSERT_TRUE(called.load(std::memory_order_relaxed));
}
TEST(EnvTest, RunMany) {
- port::AtomicPointer last_id(nullptr);
+ std::atomic<int> last_id(0);
- struct CB {
- port::AtomicPointer* last_id_ptr; // Pointer to shared slot
- uintptr_t id; // Order# for the execution of this callback
+ struct Callback {
+ std::atomic<int>* const last_id_ptr_; // Pointer to shared state.
+ const int id_; // Order# for the execution of this callback.
- CB(port::AtomicPointer* p, int i) : last_id_ptr(p), id(i) { }
+ Callback(std::atomic<int>* last_id_ptr, int id)
+ : last_id_ptr_(last_id_ptr), id_(id) { }
- static void Run(void* v) {
- CB* cb = reinterpret_cast<CB*>(v);
- void* cur = cb->last_id_ptr->NoBarrier_Load();
- ASSERT_EQ(cb->id-1, reinterpret_cast<uintptr_t>(cur));
- cb->last_id_ptr->Release_Store(reinterpret_cast<void*>(cb->id));
+ static void Run(void* arg) {
+ Callback* callback = reinterpret_cast<Callback*>(arg);
+ int current_id = callback->last_id_ptr_->load(std::memory_order_relaxed);
+ ASSERT_EQ(callback->id_ - 1, current_id);
+ callback->last_id_ptr_->store(callback->id_, std::memory_order_relaxed);
}
};
- // Schedule in different order than start time
- CB cb1(&last_id, 1);
- CB cb2(&last_id, 2);
- CB cb3(&last_id, 3);
- CB cb4(&last_id, 4);
- env_->Schedule(&CB::Run, &cb1);
- env_->Schedule(&CB::Run, &cb2);
- env_->Schedule(&CB::Run, &cb3);
- env_->Schedule(&CB::Run, &cb4);
+ Callback callback1(&last_id, 1);
+ Callback callback2(&last_id, 2);
+ Callback callback3(&last_id, 3);
+ Callback callback4(&last_id, 4);
+ env_->Schedule(&Callback::Run, &callback1);
+ env_->Schedule(&Callback::Run, &callback2);
+ env_->Schedule(&Callback::Run, &callback3);
+ env_->Schedule(&Callback::Run, &callback4);
env_->SleepForMicroseconds(kDelayMicros);
- void* cur = last_id.Acquire_Load();
- ASSERT_EQ(4, reinterpret_cast<uintptr_t>(cur));
+ ASSERT_EQ(4, last_id.load(std::memory_order_relaxed));
}
struct State {
diff --git a/util/env_windows.cc b/util/env_windows.cc
index 57932bb..3b4496b 100644
--- a/util/env_windows.cc
+++ b/util/env_windows.cc
@@ -8,6 +8,7 @@
#include <windows.h>
#include <algorithm>
+#include <atomic>
#include <chrono>
#include <condition_variable>
#include <deque>
@@ -105,51 +106,42 @@ class ScopedHandle {
};
// Helper class to limit resource usage to avoid exhaustion.
-// Currently used to limit mmap file usage so that we do not end
-// up running out virtual memory, or running into kernel performance
-// problems for very large databases.
+// Currently used to limit read-only file descriptors and mmap file usage
+// so that we do not run out of file descriptors or virtual memory, or run into
+// kernel performance problems for very large databases.
class Limiter {
public:
- // Limit maximum number of resources to |n|.
- Limiter(intptr_t n) { SetAllowed(n); }
+ // Limit maximum number of resources to |max_acquires|.
+ Limiter(int max_acquires) : acquires_allowed_(max_acquires) {}
+
+ Limiter(const Limiter&) = delete;
+ Limiter operator=(const Limiter&) = delete;
// If another resource is available, acquire it and return true.
// Else return false.
- bool Acquire() LOCKS_EXCLUDED(mu_) {
- if (GetAllowed() <= 0) {
- return false;
- }
- MutexLock l(&mu_);
- intptr_t x = GetAllowed();
- if (x <= 0) {
- return false;
- } else {
- SetAllowed(x - 1);
+ bool Acquire() {
+ int old_acquires_allowed =
+ acquires_allowed_.fetch_sub(1, std::memory_order_relaxed);
+
+ if (old_acquires_allowed > 0)
return true;
- }
+
+ acquires_allowed_.fetch_add(1, std::memory_order_relaxed);
+ return false;
}
// Release a resource acquired by a previous call to Acquire() that returned
// true.
- void Release() LOCKS_EXCLUDED(mu_) {
- MutexLock l(&mu_);
- SetAllowed(GetAllowed() + 1);
+ void Release() {
+ acquires_allowed_.fetch_add(1, std::memory_order_relaxed);
}
private:
- port::Mutex mu_;
- port::AtomicPointer allowed_;
-
- intptr_t GetAllowed() const {
- return reinterpret_cast<intptr_t>(allowed_.Acquire_Load());
- }
-
- void SetAllowed(intptr_t v) EXCLUSIVE_LOCKS_REQUIRED(mu_) {
- allowed_.Release_Store(reinterpret_cast<void*>(v));
- }
-
- Limiter(const Limiter&);
- void operator=(const Limiter&);
+ // The number of available resources.
+ //
+ // This is a counter and is not tied to the invariants of any other class, so
+ // it can be operated on safely using std::memory_order_relaxed.
+ std::atomic<int> acquires_allowed_;
};
class WindowsSequentialFile : public SequentialFile {