summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/SConscript1
-rw-r--r--src/mongo/db/concurrency/lock_state.cpp1
-rw-r--r--src/mongo/db/range_deleter.cpp11
-rw-r--r--src/mongo/db/range_deleter.h6
-rw-r--r--src/mongo/db/range_deleter_test.cpp30
-rw-r--r--src/mongo/db/s/collection_sharding_state.cpp6
-rw-r--r--src/mongo/db/s/migration_source_manager.cpp28
-rw-r--r--src/mongo/db/s/migration_source_manager.h44
-rw-r--r--src/mongo/db/s/operation_sharding_state.cpp71
-rw-r--r--src/mongo/db/s/operation_sharding_state.h7
-rw-r--r--src/mongo/db/s/set_shard_version_command.cpp16
-rw-r--r--src/mongo/db/s/sharding_state.cpp2
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h1
-rw-r--r--src/mongo/dbtests/threadedtests.cpp55
-rw-r--r--src/mongo/util/SConscript2
-rw-r--r--src/mongo/util/background_job_test.cpp42
-rw-r--r--src/mongo/util/concurrency/SConscript10
-rw-r--r--src/mongo/util/concurrency/notification.h155
-rw-r--r--src/mongo/util/net/SConscript2
-rw-r--r--src/mongo/util/net/message_server_port.cpp2
-rw-r--r--src/mongo/util/net/sock_test.cpp18
-rw-r--r--src/mongo/util/net/thread_idle_callback.cpp (renamed from src/mongo/util/concurrency/synchronization.cpp)32
-rw-r--r--src/mongo/util/net/thread_idle_callback.h (renamed from src/mongo/util/concurrency/synchronization.h)55
-rw-r--r--src/mongo/util/tcmalloc_server_status_section.cpp2
24 files changed, 283 insertions, 316 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 94464e37819..9e53d26a224 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -367,7 +367,6 @@ env.Library(
'$BUILD_DIR/mongo/db/common',
'$BUILD_DIR/mongo/db/repl/repl_coordinator_global',
'$BUILD_DIR/mongo/db/service_context',
- '$BUILD_DIR/mongo/util/concurrency/synchronization',
'range_arithmetic',
],
)
diff --git a/src/mongo/db/concurrency/lock_state.cpp b/src/mongo/db/concurrency/lock_state.cpp
index d16c9f4e11d..2d167091b52 100644
--- a/src/mongo/db/concurrency/lock_state.cpp
+++ b/src/mongo/db/concurrency/lock_state.cpp
@@ -38,7 +38,6 @@
#include "mongo/db/service_context.h"
#include "mongo/platform/compiler.h"
#include "mongo/util/background.h"
-#include "mongo/util/concurrency/synchronization.h"
#include "mongo/util/concurrency/ticketholder.h"
#include "mongo/util/debug_util.h"
#include "mongo/util/log.h"
diff --git a/src/mongo/db/range_deleter.cpp b/src/mongo/db/range_deleter.cpp
index 666c8ee45a9..be280d8aa4a 100644
--- a/src/mongo/db/range_deleter.cpp
+++ b/src/mongo/db/range_deleter.cpp
@@ -39,7 +39,6 @@
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/service_context.h"
#include "mongo/db/write_concern_options.h"
-#include "mongo/util/concurrency/synchronization.h"
#include "mongo/util/exit.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
@@ -203,7 +202,7 @@ void RangeDeleter::stopWorkers() {
bool RangeDeleter::queueDelete(OperationContext* txn,
const RangeDeleterOptions& options,
- Notification* notifyDone,
+ Notification<void>* doneSignal,
std::string* errMsg) {
string dummy;
if (errMsg == NULL)
@@ -214,7 +213,7 @@ bool RangeDeleter::queueDelete(OperationContext* txn,
const BSONObj& max(options.range.maxKey);
unique_ptr<RangeDeleteEntry> toDelete(new RangeDeleteEntry(options));
- toDelete->notifyDone = notifyDone;
+ toDelete->doneSignal = doneSignal;
{
stdx::lock_guard<stdx::mutex> sl(_queueMutex);
@@ -507,8 +506,8 @@ void RangeDeleter::doWork() {
deletePtrElement(&_deleteSet, &setEntry);
_deletesInProgress--;
- if (nextTask->notifyDone) {
- nextTask->notifyDone->notifyOne();
+ if (nextTask->doneSignal) {
+ nextTask->doneSignal->set();
}
}
@@ -563,7 +562,7 @@ void RangeDeleter::recordDelStats(DeleteJobStats* newStat) {
}
RangeDeleteEntry::RangeDeleteEntry(const RangeDeleterOptions& options)
- : options(options), notifyDone(NULL) {}
+ : options(options), doneSignal(nullptr) {}
BSONObj RangeDeleteEntry::toBSON() const {
BSONObjBuilder builder;
diff --git a/src/mongo/db/range_deleter.h b/src/mongo/db/range_deleter.h
index 39f30e17a62..776f8a825fc 100644
--- a/src/mongo/db/range_deleter.h
+++ b/src/mongo/db/range_deleter.h
@@ -42,7 +42,7 @@
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/thread.h"
#include "mongo/util/concurrency/mutex.h"
-#include "mongo/util/concurrency/synchronization.h"
+#include "mongo/util/concurrency/notification.h"
#include "mongo/util/time_support.h"
namespace mongo {
@@ -138,7 +138,7 @@ public:
*/
bool queueDelete(OperationContext* txn,
const RangeDeleterOptions& options,
- Notification* notifyDone,
+ Notification<void>* doneSignal,
std::string* errMsg);
/**
@@ -286,7 +286,7 @@ struct RangeDeleteEntry {
// Not owned here.
// Important invariant: Can only be set and used by one thread.
- Notification* notifyDone;
+ Notification<void>* doneSignal;
// Time since the last time we reported this object.
Date_t lastLoggedTS;
diff --git a/src/mongo/db/range_deleter_test.cpp b/src/mongo/db/range_deleter_test.cpp
index a2fc38dc6c6..34c3a4b5739 100644
--- a/src/mongo/db/range_deleter_test.cpp
+++ b/src/mongo/db/range_deleter_test.cpp
@@ -26,6 +26,8 @@
* it in the license file.
*/
+#include "mongo/platform/basic.h"
+
#include <string>
#include "mongo/db/field_parser.h"
@@ -96,13 +98,13 @@ TEST(QueuedDelete, ShouldWaitCursor) {
env->addCursorId(ns, 345);
- Notification notifyDone;
+ Notification<void> doneSignal;
RangeDeleterOptions deleterOptions(
KeyRange(ns, BSON("x" << 0), BSON("x" << 10), BSON("x" << 1)));
deleterOptions.waitForOpenCursors = true;
ASSERT_TRUE(
- deleter.queueDelete(noTxn, deleterOptions, &notifyDone, NULL /* errMsg not needed */));
+ deleter.queueDelete(noTxn, deleterOptions, &doneSignal, NULL /* errMsg not needed */));
env->waitForNthGetCursor(1u);
@@ -113,7 +115,7 @@ TEST(QueuedDelete, ShouldWaitCursor) {
env->addCursorId(ns, 200);
env->removeCursorId(ns, 345);
- notifyDone.waitToBeNotified();
+ doneSignal.get(noTxn);
ASSERT_TRUE(env->deleteOccured());
const DeletedRange deletedChunk(env->getLastDelete());
@@ -141,12 +143,12 @@ TEST(QueuedDelete, StopWhileWaitingCursor) {
env->addCursorId(ns, 345);
- Notification notifyDone;
+ Notification<void> doneSignal;
RangeDeleterOptions deleterOptions(
KeyRange(ns, BSON("x" << 0), BSON("x" << 10), BSON("x" << 1)));
deleterOptions.waitForOpenCursors = true;
ASSERT_TRUE(
- deleter.queueDelete(noTxn, deleterOptions, &notifyDone, NULL /* errMsg not needed */));
+ deleter.queueDelete(noTxn, deleterOptions, &doneSignal, NULL /* errMsg not needed */));
env->waitForNthGetCursor(1u);
@@ -279,31 +281,31 @@ TEST(MixedDeletes, MultipleDeletes) {
env->addCursorId(blockedNS, 345);
env->pauseDeletes();
- Notification notifyDone1;
+ Notification<void> doneSignal1;
RangeDeleterOptions deleterOption1(
KeyRange(ns, BSON("x" << 10), BSON("x" << 20), BSON("x" << 1)));
deleterOption1.waitForOpenCursors = true;
ASSERT_TRUE(
- deleter.queueDelete(noTxn, deleterOption1, &notifyDone1, NULL /* don't care errMsg */));
+ deleter.queueDelete(noTxn, deleterOption1, &doneSignal1, NULL /* don't care errMsg */));
env->waitForNthPausedDelete(1u);
// Make sure that the delete is already in progress before proceeding.
ASSERT_EQUALS(1U, deleter.getDeletesInProgress());
- Notification notifyDone2;
+ Notification<void> doneSignal2;
RangeDeleterOptions deleterOption2(
KeyRange(blockedNS, BSON("x" << 20), BSON("x" << 30), BSON("x" << 1)));
deleterOption2.waitForOpenCursors = true;
ASSERT_TRUE(
- deleter.queueDelete(noTxn, deleterOption2, &notifyDone2, NULL /* don't care errMsg */));
+ deleter.queueDelete(noTxn, deleterOption2, &doneSignal2, NULL /* don't care errMsg */));
- Notification notifyDone3;
+ Notification<void> doneSignal3;
RangeDeleterOptions deleterOption3(
KeyRange(ns, BSON("x" << 30), BSON("x" << 40), BSON("x" << 1)));
deleterOption3.waitForOpenCursors = true;
ASSERT_TRUE(
- deleter.queueDelete(noTxn, deleterOption3, &notifyDone3, NULL /* don't care errMsg */));
+ deleter.queueDelete(noTxn, deleterOption3, &doneSignal3, NULL /* don't care errMsg */));
// Now, the setup is:
// { x: 10 } => { x: 20 } in progress.
@@ -317,7 +319,7 @@ TEST(MixedDeletes, MultipleDeletes) {
// Let the first delete proceed.
env->resumeOneDelete();
- notifyDone1.waitToBeNotified();
+ doneSignal1.get(noTxn);
ASSERT_TRUE(env->deleteOccured());
@@ -332,7 +334,7 @@ TEST(MixedDeletes, MultipleDeletes) {
// Let the second delete proceed.
env->resumeOneDelete();
- notifyDone3.waitToBeNotified();
+ doneSignal3.get(noTxn);
DeletedRange deleted2(env->getLastDelete());
@@ -347,7 +349,7 @@ TEST(MixedDeletes, MultipleDeletes) {
env->removeCursorId(blockedNS, 345);
// Let the last delete proceed.
env->resumeOneDelete();
- notifyDone2.waitToBeNotified();
+ doneSignal2.get(noTxn);
DeletedRange deleted3(env->getLastDelete());
diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp
index 185b8bab56e..20cc7e2d4d6 100644
--- a/src/mongo/db/s/collection_sharding_state.cpp
+++ b/src/mongo/db/s/collection_sharding_state.cpp
@@ -242,13 +242,13 @@ bool CollectionShardingState::_checkShardVersionOk(OperationContext* txn,
// Set this for error messaging purposes before potentially returning false.
*actualShardVersion = (_metadata ? _metadata->getShardVersion() : ChunkVersion::UNSHARDED());
- if (_sourceMgr && _sourceMgr->getMigrationCriticalSection()) {
+ if (_sourceMgr && _sourceMgr->getMigrationCriticalSectionSignal()) {
*errmsg = str::stream() << "migration commit in progress for " << _nss.ns();
// Set migration critical section on operation sharding state: operation will wait for the
// migration to finish before returning failure and retrying.
- OperationShardingState::get(txn).setMigrationCriticalSection(
- _sourceMgr->getMigrationCriticalSection());
+ OperationShardingState::get(txn).setMigrationCriticalSectionSignal(
+ _sourceMgr->getMigrationCriticalSectionSignal());
return false;
}
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index 49b07775b28..12d0ab201fb 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -253,7 +253,7 @@ Status MigrationSourceManager::enterCriticalSection(OperationContext* txn) {
// IMPORTANT: After this line, the critical section is in place and needs to be rolled back
// if anything fails, which would prevent commit to the config servers.
- _critSec = std::make_shared<CriticalSectionState>();
+ _critSecSignal = std::make_shared<Notification<void>>();
}
log() << "Successfully entered critical section.";
@@ -527,8 +527,7 @@ void MigrationSourceManager::_cleanup(OperationContext* txn) {
// Leave the critical section.
if (_state == kCriticalSection) {
- _critSec->exitCriticalSection();
- _critSec.reset();
+ _critSecSignal->set();
}
}
@@ -546,27 +545,4 @@ void MigrationSourceManager::_cleanup(OperationContext* txn) {
_state = kDone;
}
-MigrationSourceManager::CriticalSectionState::CriticalSectionState() = default;
-
-bool MigrationSourceManager::CriticalSectionState::waitUntilOutOfCriticalSection(
- Microseconds waitTimeout) {
- const auto waitDeadline = Date_t::now() + waitTimeout;
-
- stdx::unique_lock<stdx::mutex> sl(_criticalSectionMutex);
- while (_inCriticalSection) {
- if (stdx::cv_status::timeout ==
- _criticalSectionCV.wait_until(sl, waitDeadline.toSystemTimePoint())) {
- return false;
- }
- }
-
- return true;
-}
-
-void MigrationSourceManager::CriticalSectionState::exitCriticalSection() {
- stdx::unique_lock<stdx::mutex> sl(_criticalSectionMutex);
- _inCriticalSection = false;
- _criticalSectionCV.notify_all();
-}
-
} // namespace mongo
diff --git a/src/mongo/db/s/migration_source_manager.h b/src/mongo/db/s/migration_source_manager.h
index 292f3c52045..16eddc2351b 100644
--- a/src/mongo/db/s/migration_source_manager.h
+++ b/src/mongo/db/s/migration_source_manager.h
@@ -28,14 +28,12 @@
#pragma once
-#include <list>
-#include <set>
#include <string>
#include "mongo/base/disallow_copying.h"
#include "mongo/s/move_chunk_request.h"
#include "mongo/s/shard_key_pattern.h"
-#include "mongo/stdx/condition_variable.h"
+#include "mongo/util/concurrency/notification.h"
#include "mongo/util/timer.h"
namespace mongo {
@@ -73,8 +71,6 @@ class MigrationSourceManager {
MONGO_DISALLOW_COPYING(MigrationSourceManager);
public:
- class CriticalSectionState;
-
/**
* Instantiates a new migration source manager with the specified migration parameters. Must be
* called with the distributed lock acquired in advance (not asserted).
@@ -174,8 +170,8 @@ public:
*
* Must be called with some form of lock on the collection namespace.
*/
- std::shared_ptr<CriticalSectionState> getMigrationCriticalSection() const {
- return _critSec;
+ std::shared_ptr<Notification<void>> getMigrationCriticalSectionSignal() const {
+ return _critSecSignal;
}
private:
@@ -212,39 +208,7 @@ private:
// Whether the source manager is in a critical section. Tracked as a shared pointer so that
// callers don't have to hold collection lock in order to wait on it. Available after the
// critical section stage has completed.
- std::shared_ptr<CriticalSectionState> _critSec;
-};
-
-/**
- * This object is instantiated once the migration logic enters critical section. It contains all
- * the state which is associated with being in a critical section, such as the bumped metadata
- * version (which has not yet been reflected on the config server).
- */
-class MigrationSourceManager::CriticalSectionState {
- MONGO_DISALLOW_COPYING(CriticalSectionState);
-
-public:
- CriticalSectionState();
-
- /**
- * Blocks until the critical section completes. Returns true if the wait succeeded and the
- * critical section is no longer active, or false if the waitTimeout was exceeded.
- */
- bool waitUntilOutOfCriticalSection(Microseconds waitTimeout);
-
- /**
- * To be called when the critical section has completed. Signals any threads sitting blocked in
- * waitUntilOutOfCriticalSection. Must only be used once for the lifetime of this object.
- */
- void exitCriticalSection();
-
-private:
- // Only moves from true to false once. Happens under the critical section mutex and the critical
- // section will be signalled.
- bool _inCriticalSection{true};
-
- stdx::mutex _criticalSectionMutex;
- stdx::condition_variable _criticalSectionCV;
+ std::shared_ptr<Notification<void>> _critSecSignal;
};
} // namespace mongo
diff --git a/src/mongo/db/s/operation_sharding_state.cpp b/src/mongo/db/s/operation_sharding_state.cpp
index c6fe82ff6d0..97991dc27f8 100644
--- a/src/mongo/db/s/operation_sharding_state.cpp
+++ b/src/mongo/db/s/operation_sharding_state.cpp
@@ -1,30 +1,30 @@
-/*
-* Copyright (C) 2015 MongoDB Inc.
-*
-* This program is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License, version 3,
-* as published by the Free Software Foundation.
-*
-* This program is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see <http://www.gnu.org/licenses/>.
-*
-* As a special exception, the copyright holders give permission to link the
-* code of portions of this program with the OpenSSL library under certain
-* conditions as described in each individual source file and distribute
-* linked combinations including the program with the OpenSSL library. You
-* must comply with the GNU Affero General Public License in all respects for
-* all of the code used other than as permitted herein. If you modify file(s)
-* with this exception, you may extend this exception to your version of the
-* file(s), but you are not obligated to do so. If you do not wish to do so,
-* delete this exception statement from your version. If you delete this
-* exception statement from all source files in the program, then also delete
-* it in the license file.
-*/
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
#include "mongo/platform/basic.h"
@@ -96,26 +96,27 @@ void OperationShardingState::setShardVersion(NamespaceString nss, ChunkVersion n
_hasVersion = true;
}
-bool OperationShardingState::waitForMigrationCriticalSection(OperationContext* txn) {
+bool OperationShardingState::waitForMigrationCriticalSectionSignal(OperationContext* txn) {
// Must not block while holding a lock
invariant(!txn->lockState()->isLocked());
- if (_migrationCriticalSection) {
- _migrationCriticalSection->waitUntilOutOfCriticalSection(
+ if (_migrationCriticalSectionSignal) {
+ _migrationCriticalSectionSignal->waitFor(
+ txn,
txn->hasDeadline()
? std::min(txn->getRemainingMaxTimeMicros(), kMaxWaitForMigrationCriticalSection)
: kMaxWaitForMigrationCriticalSection);
- _migrationCriticalSection = nullptr;
+ _migrationCriticalSectionSignal = nullptr;
return true;
}
return false;
}
-void OperationShardingState::setMigrationCriticalSection(
- std::shared_ptr<MigrationSourceManager::CriticalSectionState> critSec) {
- invariant(critSec);
- _migrationCriticalSection = std::move(critSec);
+void OperationShardingState::setMigrationCriticalSectionSignal(
+ std::shared_ptr<Notification<void>> critSecSignal) {
+ invariant(critSecSignal);
+ _migrationCriticalSectionSignal = std::move(critSecSignal);
}
void OperationShardingState::_clear() {
diff --git a/src/mongo/db/s/operation_sharding_state.h b/src/mongo/db/s/operation_sharding_state.h
index f52cbd9f702..0ef397b7e92 100644
--- a/src/mongo/db/s/operation_sharding_state.h
+++ b/src/mongo/db/s/operation_sharding_state.h
@@ -99,15 +99,14 @@ public:
* Returns true if the call actually waited because of migration critical section (regardless if
* whether it timed out or not), false if there was no active migration critical section.
*/
- bool waitForMigrationCriticalSection(OperationContext* txn);
+ bool waitForMigrationCriticalSectionSignal(OperationContext* txn);
/**
* Setting this value indicates that when the version check failed, there was an active
* migration for the namespace and that it would be prudent to wait for the critical section to
* complete before retrying so the router doesn't make wasteful requests.
*/
- void setMigrationCriticalSection(
- std::shared_ptr<MigrationSourceManager::CriticalSectionState> critSec);
+ void setMigrationCriticalSectionSignal(std::shared_ptr<Notification<void>> critSecSignal);
private:
/**
@@ -122,7 +121,7 @@ private:
// This value will only be non-null if version check during the operation execution failed due
// to stale version and there was a migration for that namespace, which was in critical section.
- std::shared_ptr<MigrationSourceManager::CriticalSectionState> _migrationCriticalSection;
+ std::shared_ptr<Notification<void>> _migrationCriticalSectionSignal;
};
/**
diff --git a/src/mongo/db/s/set_shard_version_command.cpp b/src/mongo/db/s/set_shard_version_command.cpp
index acdb419538d..aa0ed124df2 100644
--- a/src/mongo/db/s/set_shard_version_command.cpp
+++ b/src/mongo/db/s/set_shard_version_command.cpp
@@ -245,12 +245,12 @@ public:
if (requestedVersion < collectionShardVersion &&
requestedVersion.epoch() == collectionShardVersion.epoch()) {
if (css->getMigrationSourceManager()) {
- auto critSec =
- css->getMigrationSourceManager()->getMigrationCriticalSection();
- if (critSec) {
+ auto critSecSignal =
+ css->getMigrationSourceManager()->getMigrationCriticalSectionSignal();
+ if (critSecSignal) {
autoColl.reset();
log() << "waiting till out of critical section";
- critSec->waitUntilOutOfCriticalSection(Seconds(10));
+ critSecSignal->waitFor(txn, Seconds(10));
}
}
@@ -267,12 +267,12 @@ public:
// Needed b/c when the last chunk is moved off a shard, the version gets reset
// to zero, which should require a reload.
if (css->getMigrationSourceManager()) {
- auto critSec =
- css->getMigrationSourceManager()->getMigrationCriticalSection();
- if (critSec) {
+ auto critSecSignal =
+ css->getMigrationSourceManager()->getMigrationCriticalSectionSignal();
+ if (critSecSignal) {
autoColl.reset();
log() << "waiting till out of critical section";
- critSec->waitUntilOutOfCriticalSection(Seconds(10));
+ critSecSignal->waitFor(txn, Seconds(10));
}
}
diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp
index 46609016cfa..01514ab3738 100644
--- a/src/mongo/db/s/sharding_state.cpp
+++ b/src/mongo/db/s/sharding_state.cpp
@@ -310,7 +310,7 @@ Status ShardingState::onStaleShardVersion(OperationContext* txn,
// Ensure any ongoing migrations have completed
auto& oss = OperationShardingState::get(txn);
- oss.waitForMigrationCriticalSection(txn);
+ oss.waitForMigrationCriticalSectionSignal(txn);
ChunkVersion collectionShardVersion;
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h
index 6200196cb02..be7a8229a4e 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h
@@ -40,7 +40,6 @@
#include "mongo/db/storage/record_store.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/stdx/mutex.h"
-#include "mongo/util/concurrency/synchronization.h"
#include "mongo/util/fail_point_service.h"
/**
diff --git a/src/mongo/dbtests/threadedtests.cpp b/src/mongo/dbtests/threadedtests.cpp
index 5ab410361eb..d2f8c4a7ea4 100644
--- a/src/mongo/dbtests/threadedtests.cpp
+++ b/src/mongo/dbtests/threadedtests.cpp
@@ -49,7 +49,6 @@
#include "mongo/util/concurrency/old_thread_pool.h"
#include "mongo/util/concurrency/old_thread_pool.h"
#include "mongo/util/concurrency/rwlock.h"
-#include "mongo/util/concurrency/synchronization.h"
#include "mongo/util/concurrency/ticketholder.h"
#include "mongo/util/log.h"
#include "mongo/util/timer.h"
@@ -610,59 +609,6 @@ private:
}
};
-class CondSlack : public ThreadedTest<17> {
- Notification n;
-
-public:
- CondSlack() {
- k = 0;
- done = false;
- a = b = 0;
- locks = 0;
- }
-
-private:
- unsigned a, b;
- virtual void validate() {
- cout << "CondSlack useful work fraction: " << ((double)a) / b << " locks:" << locks << endl;
- }
- unsigned locks;
- volatile int k;
- void watch() {
- while (1) {
- b++;
- if (k) {
- a++;
- }
- sleepmillis(0);
- if (done)
- break;
- }
- }
- volatile bool done;
- virtual void subthread(int x) {
- if (x == 1) {
- n.notifyOne();
- watch();
- return;
- }
- Timer t;
- while (1) {
- n.waitToBeNotified();
- verify(k == 0);
- k = 1;
- // not very long, we'd like to simulate about 100K locks per second
- sleepalittle();
- k = 0;
- locks++;
- n.notifyOne();
- if (done || t.millis() > 1500)
- break;
- }
- done = true;
- }
-};
-
const int WriteLocksAreGreedy_ThreadCount = 3;
class WriteLocksAreGreedy : public ThreadedTest<WriteLocksAreGreedy_ThreadCount> {
public:
@@ -778,7 +724,6 @@ public:
// would have very little slack.
add<Slack<SimpleMutex, stdx::lock_guard<SimpleMutex>>>();
add<Slack<SimpleRWLock, SimpleRWLock::Exclusive>>();
- add<CondSlack>();
add<UpgradableTest>();
diff --git a/src/mongo/util/SConscript b/src/mongo/util/SConscript
index e38ba85ef7d..289989edc04 100644
--- a/src/mongo/util/SConscript
+++ b/src/mongo/util/SConscript
@@ -144,7 +144,6 @@ env.Library(
LIBDEPS=[
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/platform/platform',
- '$BUILD_DIR/mongo/util/concurrency/synchronization',
'$BUILD_DIR/third_party/shim_allocator',
'$BUILD_DIR/third_party/shim_boost',
'$BUILD_DIR/third_party/shim_tz',
@@ -256,7 +255,6 @@ env.CppUnitTest(
],
LIBDEPS=[
"background_job",
- "concurrency/synchronization",
],
)
diff --git a/src/mongo/util/background_job_test.cpp b/src/mongo/util/background_job_test.cpp
index 1bd23d47792..57d403de614 100644
--- a/src/mongo/util/background_job_test.cpp
+++ b/src/mongo/util/background_job_test.cpp
@@ -32,25 +32,18 @@
#include "mongo/stdx/thread.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/background.h"
-#include "mongo/util/concurrency/synchronization.h"
+#include "mongo/util/concurrency/notification.h"
#include "mongo/util/time_support.h"
+namespace mongo {
namespace {
-using mongo::AtomicWord;
-using mongo::BackgroundJob;
-using mongo::MsgAssertionException;
-using mongo::stdx::mutex;
-using mongo::Notification;
-
-namespace stdx = mongo::stdx;
-
-class TestJob final : public mongo::BackgroundJob {
+class TestJob final : public BackgroundJob {
public:
TestJob(bool selfDelete,
AtomicWord<bool>* flag,
- Notification* canProceed = nullptr,
- Notification* destructorInvoked = nullptr)
+ Notification<void>* canProceed = nullptr,
+ Notification<void>* destructorInvoked = nullptr)
: BackgroundJob(selfDelete),
_flag(flag),
_canProceed(canProceed),
@@ -58,7 +51,7 @@ public:
~TestJob() override {
if (_destructorInvoked)
- _destructorInvoked->notifyOne();
+ _destructorInvoked->set();
}
std::string name() const override {
@@ -67,14 +60,14 @@ public:
void run() override {
if (_canProceed)
- _canProceed->waitToBeNotified();
+ _canProceed->get();
_flag->store(true);
}
private:
AtomicWord<bool>* const _flag;
- Notification* const _canProceed;
- Notification* const _destructorInvoked;
+ Notification<void>* const _canProceed;
+ Notification<void>* const _destructorInvoked;
};
TEST(BackgroundJobBasic, NormalCase) {
@@ -87,24 +80,24 @@ TEST(BackgroundJobBasic, NormalCase) {
TEST(BackgroundJobBasic, TimeOutCase) {
AtomicWord<bool> flag(false);
- Notification canProceed;
+ Notification<void> canProceed;
TestJob tj(false, &flag, &canProceed);
tj.go();
ASSERT(!tj.wait(1000));
ASSERT_EQUALS(false, flag.load());
- canProceed.notifyOne();
+ canProceed.set();
ASSERT(tj.wait());
ASSERT_EQUALS(true, flag.load());
}
TEST(BackgroundJobBasic, SelfDeletingCase) {
AtomicWord<bool> flag(false);
- Notification destructorInvoked;
+ Notification<void> destructorInvoked;
// Though it looks like one, this is not a leak since the job is self deleting.
(new TestJob(true, &flag, nullptr, &destructorInvoked))->go();
- destructorInvoked.waitToBeNotified();
+ destructorInvoked.get();
ASSERT_EQUALS(true, flag.load());
}
@@ -124,17 +117,17 @@ TEST(BackgroundJobLifeCycle, Go) {
_hasRun = true;
}
- _n.waitToBeNotified();
+ _n.get();
}
void notify() {
- _n.notifyOne();
+ _n.set();
}
private:
- mutex _mutex;
+ stdx::mutex _mutex;
bool _hasRun;
- Notification _n;
+ Notification<void> _n;
};
Job j;
@@ -155,3 +148,4 @@ TEST(BackgroundJobLifeCycle, Go) {
}
} // namespace
+} // namespace mongo \ No newline at end of file
diff --git a/src/mongo/util/concurrency/SConscript b/src/mongo/util/concurrency/SConscript
index 3bc52f9acce..fb7462fe586 100644
--- a/src/mongo/util/concurrency/SConscript
+++ b/src/mongo/util/concurrency/SConscript
@@ -38,16 +38,6 @@ env.Library('ticketholder',
'$BUILD_DIR/third_party/shim_boost'])
env.Library(
- target='synchronization',
- source=[
- 'synchronization.cpp'
- ],
- LIBDEPS=[
- '$BUILD_DIR/mongo/base',
- ],
-)
-
-env.Library(
target='spin_lock',
source=[
"spin_lock.cpp",
diff --git a/src/mongo/util/concurrency/notification.h b/src/mongo/util/concurrency/notification.h
new file mode 100644
index 00000000000..b8e8c8e0317
--- /dev/null
+++ b/src/mongo/util/concurrency/notification.h
@@ -0,0 +1,155 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "boost/optional.hpp"
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/util/assert_util.h"
+#include "mongo/util/duration.h"
+#include "mongo/util/time_support.h"
+
+namespace mongo {
+
+class OperationContext;
+
+/**
+ * Allows waiting for a result returned from an asynchronous operation.
+ */
+template <class T>
+class Notification {
+ MONGO_DISALLOW_COPYING(Notification);
+
+public:
+ /**
+ * Constructs a default notification in the unset state.
+ */
+ Notification() = default;
+
+ // Allow std::move semantics for notifications
+ Notification(Notification&&) = default;
+ Notification& operator=(Notification&&) = default;
+
+ /**
+ * Returns true if the notification has been set (i.e., the call to get/waitFor would not
+ * block).
+ */
+ explicit operator bool() const {
+ stdx::unique_lock<stdx::mutex> lock(_mutex);
+ return !!_value;
+ }
+
+ /**
+ * If the notification has been set, returns immediately. Otherwise blocks until it becomes set.
+ * If the wait is interrupted, throws an exception.
+ */
+ T& get(OperationContext* txn) {
+ return get();
+ }
+
+ /**
+ * If the notification has been set, returns immediately. Otherwise blocks until it becomes set.
+ * This variant of get cannot be interrupted.
+ */
+ T& get() {
+ stdx::unique_lock<stdx::mutex> lock(_mutex);
+ while (!_value) {
+ _condVar.wait(lock);
+ }
+
+ return _value.get();
+ }
+
+ /**
+ * Sets the notification result and wakes up any threads, which might be blocked in the wait
+ * call. Must only be called once for the lifetime of the notification.
+ */
+ void set(T value) {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ invariant(!_value);
+ _value = std::move(value);
+ _condVar.notify_all();
+ }
+
+ /**
+ * If the notification is not set, blocks either until it becomes set or until the waitTimeout
+ * expires. If the wait is interrupted, throws an exception. Otherwise, returns immediately.
+ */
+ bool waitFor(OperationContext* txn, Microseconds waitTimeout) {
+ const auto waitDeadline = Date_t::now() + waitTimeout;
+
+ stdx::unique_lock<stdx::mutex> lock(_mutex);
+ return _condVar.wait_until(
+ lock, waitDeadline.toSystemTimePoint(), [&]() { return !!_value; });
+ }
+
+private:
+ mutable stdx::mutex _mutex;
+ mutable stdx::condition_variable _condVar;
+
+ // Protected by mutex and only moves from not-set to set once
+ boost::optional<T> _value{boost::none};
+};
+
+template <>
+class Notification<void> {
+ MONGO_DISALLOW_COPYING(Notification);
+
+public:
+ Notification() = default;
+ Notification(Notification&&) = default;
+ Notification& operator=(Notification&&) = default;
+
+ explicit operator bool() const {
+ return _notification.operator bool();
+ }
+
+ void get(OperationContext* txn) {
+ _notification.get(txn);
+ }
+
+ void get() {
+ _notification.get();
+ }
+
+ void set() {
+ _notification.set(true);
+ }
+
+ bool waitFor(OperationContext* txn, Microseconds waitTimeout) {
+ return _notification.waitFor(txn, waitTimeout);
+ }
+
+private:
+ Notification<bool> _notification;
+};
+
+} // namespace mongo
diff --git a/src/mongo/util/net/SConscript b/src/mongo/util/net/SConscript
index 99bd2581fa6..fa8899561e9 100644
--- a/src/mongo/util/net/SConscript
+++ b/src/mongo/util/net/SConscript
@@ -46,6 +46,7 @@ networkEnv.Library(
"ssl_expiration.cpp",
"ssl_manager.cpp",
"ssl_options.cpp",
+ "thread_idle_callback.cpp",
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/server_options_core',
@@ -77,7 +78,6 @@ env.CppUnitTest(
'sock_test.cpp',
],
LIBDEPS=[
- '$BUILD_DIR/mongo/util/concurrency/synchronization',
'network',
],
)
diff --git a/src/mongo/util/net/message_server_port.cpp b/src/mongo/util/net/message_server_port.cpp
index c64c16f3073..95e9a1d5716 100644
--- a/src/mongo/util/net/message_server_port.cpp
+++ b/src/mongo/util/net/message_server_port.cpp
@@ -42,7 +42,6 @@
#include "mongo/stdx/functional.h"
#include "mongo/stdx/memory.h"
#include "mongo/stdx/thread.h"
-#include "mongo/util/concurrency/synchronization.h"
#include "mongo/util/concurrency/thread_name.h"
#include "mongo/util/concurrency/ticketholder.h"
#include "mongo/util/debug_util.h"
@@ -55,6 +54,7 @@
#include "mongo/util/net/message_server.h"
#include "mongo/util/net/socket_exception.h"
#include "mongo/util/net/ssl_manager.h"
+#include "mongo/util/net/thread_idle_callback.h"
#include "mongo/util/quick_exit.h"
#include "mongo/util/scopeguard.h"
diff --git a/src/mongo/util/net/sock_test.cpp b/src/mongo/util/net/sock_test.cpp
index 93e6cb5d3d2..af6ac166361 100644
--- a/src/mongo/util/net/sock_test.cpp
+++ b/src/mongo/util/net/sock_test.cpp
@@ -39,7 +39,7 @@
#include "mongo/db/server_options.h"
#include "mongo/stdx/thread.h"
#include "mongo/unittest/unittest.h"
-#include "mongo/util/concurrency/synchronization.h"
+#include "mongo/util/concurrency/notification.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/net/socket_exception.h"
@@ -59,16 +59,16 @@ SocketPair socketPair(const int type, const int protocol = 0);
#if defined(_WIN32)
namespace detail {
-void awaitAccept(SOCKET* acceptSock, SOCKET listenSock, Notification& notify) {
+void awaitAccept(SOCKET* acceptSock, SOCKET listenSock, Notification<void>& notify) {
*acceptSock = INVALID_SOCKET;
const SOCKET result = ::accept(listenSock, NULL, 0);
if (result != INVALID_SOCKET) {
*acceptSock = result;
}
- notify.notifyOne();
+ notify.set();
}
-void awaitConnect(SOCKET* connectSock, const struct addrinfo& where, Notification& notify) {
+void awaitConnect(SOCKET* connectSock, const struct addrinfo& where, Notification<void>& notify) {
*connectSock = INVALID_SOCKET;
SOCKET newSock = ::socket(where.ai_family, where.ai_socktype, where.ai_protocol);
if (newSock != INVALID_SOCKET) {
@@ -77,7 +77,7 @@ void awaitConnect(SOCKET* connectSock, const struct addrinfo& where, Notificatio
*connectSock = newSock;
}
}
- notify.notifyOne();
+ notify.set();
}
} // namespace detail
@@ -144,17 +144,17 @@ SocketPair socketPair(const int type, const int protocol) {
// I'd prefer to avoid trying to do this non-blocking on Windows. Just spin up some
// threads to do the connect and acccept.
- Notification accepted;
+ Notification<void> accepted;
SOCKET acceptSock = INVALID_SOCKET;
stdx::thread acceptor(
stdx::bind(&detail::awaitAccept, &acceptSock, listenSock, stdx::ref(accepted)));
- Notification connected;
+ Notification<void> connected;
SOCKET connectSock = INVALID_SOCKET;
stdx::thread connector(
stdx::bind(&detail::awaitConnect, &connectSock, *connectRes, stdx::ref(connected)));
- connected.waitToBeNotified();
+ connected.get();
connector.join();
if (connectSock == INVALID_SOCKET) {
closesocket(listenSock);
@@ -165,7 +165,7 @@ SocketPair socketPair(const int type, const int protocol) {
return SocketPair();
}
- accepted.waitToBeNotified();
+ accepted.get();
acceptor.join();
if (acceptSock == INVALID_SOCKET) {
closesocket(listenSock);
diff --git a/src/mongo/util/concurrency/synchronization.cpp b/src/mongo/util/net/thread_idle_callback.cpp
index 2562dbbe89c..f9259d44f5a 100644
--- a/src/mongo/util/concurrency/synchronization.cpp
+++ b/src/mongo/util/net/thread_idle_callback.cpp
@@ -1,6 +1,5 @@
-// synchronization.cpp
-
-/* Copyright 2010 10gen Inc.
+/**
+ * Copyright (C) 2016 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
@@ -29,16 +28,18 @@
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
-#include "synchronization.h"
+#include "mongo/platform/basic.h"
-#include <boost/date_time/posix_time/posix_time.hpp>
+#include "mongo/util/net/thread_idle_callback.h"
+#include "mongo/util/assert_util.h"
#include "mongo/util/log.h"
namespace mongo {
-
namespace {
+
ThreadIdleCallback threadIdleCallback;
+
} // namespace
void registerThreadIdleCallback(ThreadIdleCallback callback) {
@@ -58,23 +59,4 @@ void markThreadIdle() {
}
}
-Notification::Notification() {
- lookFor = 1;
- cur = 0;
-}
-
-void Notification::waitToBeNotified() {
- stdx::unique_lock<stdx::mutex> lock(_mutex);
- while (lookFor != cur)
- _condition.wait(lock);
- lookFor++;
-}
-
-void Notification::notifyOne() {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- verify(cur != lookFor);
- cur++;
- _condition.notify_one();
-}
-
} // namespace mongo
diff --git a/src/mongo/util/concurrency/synchronization.h b/src/mongo/util/net/thread_idle_callback.h
index 2e953976932..65500607be1 100644
--- a/src/mongo/util/concurrency/synchronization.h
+++ b/src/mongo/util/net/thread_idle_callback.h
@@ -1,6 +1,5 @@
-// synchronization.h
-
-/* Copyright 2010 10gen Inc.
+/**
+ * Copyright (C) 2016 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
@@ -29,60 +28,26 @@
#pragma once
-#include "mongo/stdx/condition_variable.h"
-#include "mongo/stdx/mutex.h"
-#include "mongo/util/concurrency/mutex.h"
-
namespace mongo {
/**
- * Type of callback functions that can be invoked when markThreadIdle() runs.
- * These functions *must not throw*.
+ * Type of callback functions that can be invoked when markThreadIdle() runs. These functions *must
+ * not throw*.
*/
typedef void (*ThreadIdleCallback)();
/**
- * Informs the registered listener that this thread believes it may go idle for an extended
- * period. The caller should avoid calling markThreadIdle at a high rate, as it can both be
- * moderately costly itself and in terms of distributed overhead for subsequent malloc/free
- * calls.
+ * Informs the registered listener that this thread believes it may go idle for an extended period.
+ * The caller should avoid calling markThreadIdle at a high rate, as it can both be moderately
+ * costly itself and in terms of distributed overhead for subsequent malloc/free calls.
*/
void markThreadIdle();
/**
- * Allows for registering callbacks for when threads go idle and become active. This is used
- * by TCMalloc to return freed memory to its central freelist at appropriate points, so it
- * won't happen during critical sections while holding locks. Calling this is not thread-safe.
+ * Allows for registering callbacks for when threads go idle and become active. This is used by
+ * TCMalloc to return freed memory to its central freelist at appropriate points, so it won't happen
+ * during critical sections while holding locks. Calling this is not thread-safe.
*/
void registerThreadIdleCallback(ThreadIdleCallback callback);
-/*
- * A class to establish a synchronization point between two threads. One thread is the waiter
- * and one is the notifier. After the notification event, both proceed normally.
- *
- * This class is thread-safe.
- */
-class Notification {
- MONGO_DISALLOW_COPYING(Notification);
-
-public:
- Notification();
-
- /*
- * Blocks until the method 'notifyOne()' is called.
- */
- void waitToBeNotified();
-
- /*
- * Notifies the waiter of '*this' that it can proceed. Can only be called once.
- */
- void notifyOne();
-
-private:
- stdx::mutex _mutex; // protects state below
- unsigned long long lookFor;
- unsigned long long cur;
- stdx::condition_variable _condition; // cond over _notified being true
-};
-
} // namespace mongo
diff --git a/src/mongo/util/tcmalloc_server_status_section.cpp b/src/mongo/util/tcmalloc_server_status_section.cpp
index cc2d6725d9b..7a8eb49e25b 100644
--- a/src/mongo/util/tcmalloc_server_status_section.cpp
+++ b/src/mongo/util/tcmalloc_server_status_section.cpp
@@ -38,9 +38,9 @@
#include "mongo/base/init.h"
#include "mongo/db/commands/server_status.h"
-#include "mongo/util/concurrency/synchronization.h"
#include "mongo/util/log.h"
#include "mongo/util/net/listen.h"
+#include "mongo/util/net/thread_idle_callback.h"
namespace mongo {