diff options
24 files changed, 283 insertions, 316 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 94464e37819..9e53d26a224 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -367,7 +367,6 @@ env.Library( '$BUILD_DIR/mongo/db/common', '$BUILD_DIR/mongo/db/repl/repl_coordinator_global', '$BUILD_DIR/mongo/db/service_context', - '$BUILD_DIR/mongo/util/concurrency/synchronization', 'range_arithmetic', ], ) diff --git a/src/mongo/db/concurrency/lock_state.cpp b/src/mongo/db/concurrency/lock_state.cpp index d16c9f4e11d..2d167091b52 100644 --- a/src/mongo/db/concurrency/lock_state.cpp +++ b/src/mongo/db/concurrency/lock_state.cpp @@ -38,7 +38,6 @@ #include "mongo/db/service_context.h" #include "mongo/platform/compiler.h" #include "mongo/util/background.h" -#include "mongo/util/concurrency/synchronization.h" #include "mongo/util/concurrency/ticketholder.h" #include "mongo/util/debug_util.h" #include "mongo/util/log.h" diff --git a/src/mongo/db/range_deleter.cpp b/src/mongo/db/range_deleter.cpp index 666c8ee45a9..be280d8aa4a 100644 --- a/src/mongo/db/range_deleter.cpp +++ b/src/mongo/db/range_deleter.cpp @@ -39,7 +39,6 @@ #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/service_context.h" #include "mongo/db/write_concern_options.h" -#include "mongo/util/concurrency/synchronization.h" #include "mongo/util/exit.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" @@ -203,7 +202,7 @@ void RangeDeleter::stopWorkers() { bool RangeDeleter::queueDelete(OperationContext* txn, const RangeDeleterOptions& options, - Notification* notifyDone, + Notification<void>* doneSignal, std::string* errMsg) { string dummy; if (errMsg == NULL) @@ -214,7 +213,7 @@ bool RangeDeleter::queueDelete(OperationContext* txn, const BSONObj& max(options.range.maxKey); unique_ptr<RangeDeleteEntry> toDelete(new RangeDeleteEntry(options)); - toDelete->notifyDone = notifyDone; + toDelete->doneSignal = doneSignal; { stdx::lock_guard<stdx::mutex> sl(_queueMutex); @@ -507,8 +506,8 @@ void RangeDeleter::doWork() { deletePtrElement(&_deleteSet, &setEntry); _deletesInProgress--; - if (nextTask->notifyDone) { - nextTask->notifyDone->notifyOne(); + if (nextTask->doneSignal) { + nextTask->doneSignal->set(); } } @@ -563,7 +562,7 @@ void RangeDeleter::recordDelStats(DeleteJobStats* newStat) { } RangeDeleteEntry::RangeDeleteEntry(const RangeDeleterOptions& options) - : options(options), notifyDone(NULL) {} + : options(options), doneSignal(nullptr) {} BSONObj RangeDeleteEntry::toBSON() const { BSONObjBuilder builder; diff --git a/src/mongo/db/range_deleter.h b/src/mongo/db/range_deleter.h index 39f30e17a62..776f8a825fc 100644 --- a/src/mongo/db/range_deleter.h +++ b/src/mongo/db/range_deleter.h @@ -42,7 +42,7 @@ #include "mongo/stdx/mutex.h" #include "mongo/stdx/thread.h" #include "mongo/util/concurrency/mutex.h" -#include "mongo/util/concurrency/synchronization.h" +#include "mongo/util/concurrency/notification.h" #include "mongo/util/time_support.h" namespace mongo { @@ -138,7 +138,7 @@ public: */ bool queueDelete(OperationContext* txn, const RangeDeleterOptions& options, - Notification* notifyDone, + Notification<void>* doneSignal, std::string* errMsg); /** @@ -286,7 +286,7 @@ struct RangeDeleteEntry { // Not owned here. // Important invariant: Can only be set and used by one thread. - Notification* notifyDone; + Notification<void>* doneSignal; // Time since the last time we reported this object. Date_t lastLoggedTS; diff --git a/src/mongo/db/range_deleter_test.cpp b/src/mongo/db/range_deleter_test.cpp index a2fc38dc6c6..34c3a4b5739 100644 --- a/src/mongo/db/range_deleter_test.cpp +++ b/src/mongo/db/range_deleter_test.cpp @@ -26,6 +26,8 @@ * it in the license file. */ +#include "mongo/platform/basic.h" + #include <string> #include "mongo/db/field_parser.h" @@ -96,13 +98,13 @@ TEST(QueuedDelete, ShouldWaitCursor) { env->addCursorId(ns, 345); - Notification notifyDone; + Notification<void> doneSignal; RangeDeleterOptions deleterOptions( KeyRange(ns, BSON("x" << 0), BSON("x" << 10), BSON("x" << 1))); deleterOptions.waitForOpenCursors = true; ASSERT_TRUE( - deleter.queueDelete(noTxn, deleterOptions, ¬ifyDone, NULL /* errMsg not needed */)); + deleter.queueDelete(noTxn, deleterOptions, &doneSignal, NULL /* errMsg not needed */)); env->waitForNthGetCursor(1u); @@ -113,7 +115,7 @@ TEST(QueuedDelete, ShouldWaitCursor) { env->addCursorId(ns, 200); env->removeCursorId(ns, 345); - notifyDone.waitToBeNotified(); + doneSignal.get(noTxn); ASSERT_TRUE(env->deleteOccured()); const DeletedRange deletedChunk(env->getLastDelete()); @@ -141,12 +143,12 @@ TEST(QueuedDelete, StopWhileWaitingCursor) { env->addCursorId(ns, 345); - Notification notifyDone; + Notification<void> doneSignal; RangeDeleterOptions deleterOptions( KeyRange(ns, BSON("x" << 0), BSON("x" << 10), BSON("x" << 1))); deleterOptions.waitForOpenCursors = true; ASSERT_TRUE( - deleter.queueDelete(noTxn, deleterOptions, ¬ifyDone, NULL /* errMsg not needed */)); + deleter.queueDelete(noTxn, deleterOptions, &doneSignal, NULL /* errMsg not needed */)); env->waitForNthGetCursor(1u); @@ -279,31 +281,31 @@ TEST(MixedDeletes, MultipleDeletes) { env->addCursorId(blockedNS, 345); env->pauseDeletes(); - Notification notifyDone1; + Notification<void> doneSignal1; RangeDeleterOptions deleterOption1( KeyRange(ns, BSON("x" << 10), BSON("x" << 20), BSON("x" << 1))); deleterOption1.waitForOpenCursors = true; ASSERT_TRUE( - deleter.queueDelete(noTxn, deleterOption1, ¬ifyDone1, NULL /* don't care errMsg */)); + deleter.queueDelete(noTxn, deleterOption1, &doneSignal1, NULL /* don't care errMsg */)); env->waitForNthPausedDelete(1u); // Make sure that the delete is already in progress before proceeding. ASSERT_EQUALS(1U, deleter.getDeletesInProgress()); - Notification notifyDone2; + Notification<void> doneSignal2; RangeDeleterOptions deleterOption2( KeyRange(blockedNS, BSON("x" << 20), BSON("x" << 30), BSON("x" << 1))); deleterOption2.waitForOpenCursors = true; ASSERT_TRUE( - deleter.queueDelete(noTxn, deleterOption2, ¬ifyDone2, NULL /* don't care errMsg */)); + deleter.queueDelete(noTxn, deleterOption2, &doneSignal2, NULL /* don't care errMsg */)); - Notification notifyDone3; + Notification<void> doneSignal3; RangeDeleterOptions deleterOption3( KeyRange(ns, BSON("x" << 30), BSON("x" << 40), BSON("x" << 1))); deleterOption3.waitForOpenCursors = true; ASSERT_TRUE( - deleter.queueDelete(noTxn, deleterOption3, ¬ifyDone3, NULL /* don't care errMsg */)); + deleter.queueDelete(noTxn, deleterOption3, &doneSignal3, NULL /* don't care errMsg */)); // Now, the setup is: // { x: 10 } => { x: 20 } in progress. @@ -317,7 +319,7 @@ TEST(MixedDeletes, MultipleDeletes) { // Let the first delete proceed. env->resumeOneDelete(); - notifyDone1.waitToBeNotified(); + doneSignal1.get(noTxn); ASSERT_TRUE(env->deleteOccured()); @@ -332,7 +334,7 @@ TEST(MixedDeletes, MultipleDeletes) { // Let the second delete proceed. env->resumeOneDelete(); - notifyDone3.waitToBeNotified(); + doneSignal3.get(noTxn); DeletedRange deleted2(env->getLastDelete()); @@ -347,7 +349,7 @@ TEST(MixedDeletes, MultipleDeletes) { env->removeCursorId(blockedNS, 345); // Let the last delete proceed. env->resumeOneDelete(); - notifyDone2.waitToBeNotified(); + doneSignal2.get(noTxn); DeletedRange deleted3(env->getLastDelete()); diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp index 185b8bab56e..20cc7e2d4d6 100644 --- a/src/mongo/db/s/collection_sharding_state.cpp +++ b/src/mongo/db/s/collection_sharding_state.cpp @@ -242,13 +242,13 @@ bool CollectionShardingState::_checkShardVersionOk(OperationContext* txn, // Set this for error messaging purposes before potentially returning false. *actualShardVersion = (_metadata ? _metadata->getShardVersion() : ChunkVersion::UNSHARDED()); - if (_sourceMgr && _sourceMgr->getMigrationCriticalSection()) { + if (_sourceMgr && _sourceMgr->getMigrationCriticalSectionSignal()) { *errmsg = str::stream() << "migration commit in progress for " << _nss.ns(); // Set migration critical section on operation sharding state: operation will wait for the // migration to finish before returning failure and retrying. - OperationShardingState::get(txn).setMigrationCriticalSection( - _sourceMgr->getMigrationCriticalSection()); + OperationShardingState::get(txn).setMigrationCriticalSectionSignal( + _sourceMgr->getMigrationCriticalSectionSignal()); return false; } diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp index 49b07775b28..12d0ab201fb 100644 --- a/src/mongo/db/s/migration_source_manager.cpp +++ b/src/mongo/db/s/migration_source_manager.cpp @@ -253,7 +253,7 @@ Status MigrationSourceManager::enterCriticalSection(OperationContext* txn) { // IMPORTANT: After this line, the critical section is in place and needs to be rolled back // if anything fails, which would prevent commit to the config servers. - _critSec = std::make_shared<CriticalSectionState>(); + _critSecSignal = std::make_shared<Notification<void>>(); } log() << "Successfully entered critical section."; @@ -527,8 +527,7 @@ void MigrationSourceManager::_cleanup(OperationContext* txn) { // Leave the critical section. if (_state == kCriticalSection) { - _critSec->exitCriticalSection(); - _critSec.reset(); + _critSecSignal->set(); } } @@ -546,27 +545,4 @@ void MigrationSourceManager::_cleanup(OperationContext* txn) { _state = kDone; } -MigrationSourceManager::CriticalSectionState::CriticalSectionState() = default; - -bool MigrationSourceManager::CriticalSectionState::waitUntilOutOfCriticalSection( - Microseconds waitTimeout) { - const auto waitDeadline = Date_t::now() + waitTimeout; - - stdx::unique_lock<stdx::mutex> sl(_criticalSectionMutex); - while (_inCriticalSection) { - if (stdx::cv_status::timeout == - _criticalSectionCV.wait_until(sl, waitDeadline.toSystemTimePoint())) { - return false; - } - } - - return true; -} - -void MigrationSourceManager::CriticalSectionState::exitCriticalSection() { - stdx::unique_lock<stdx::mutex> sl(_criticalSectionMutex); - _inCriticalSection = false; - _criticalSectionCV.notify_all(); -} - } // namespace mongo diff --git a/src/mongo/db/s/migration_source_manager.h b/src/mongo/db/s/migration_source_manager.h index 292f3c52045..16eddc2351b 100644 --- a/src/mongo/db/s/migration_source_manager.h +++ b/src/mongo/db/s/migration_source_manager.h @@ -28,14 +28,12 @@ #pragma once -#include <list> -#include <set> #include <string> #include "mongo/base/disallow_copying.h" #include "mongo/s/move_chunk_request.h" #include "mongo/s/shard_key_pattern.h" -#include "mongo/stdx/condition_variable.h" +#include "mongo/util/concurrency/notification.h" #include "mongo/util/timer.h" namespace mongo { @@ -73,8 +71,6 @@ class MigrationSourceManager { MONGO_DISALLOW_COPYING(MigrationSourceManager); public: - class CriticalSectionState; - /** * Instantiates a new migration source manager with the specified migration parameters. Must be * called with the distributed lock acquired in advance (not asserted). @@ -174,8 +170,8 @@ public: * * Must be called with some form of lock on the collection namespace. */ - std::shared_ptr<CriticalSectionState> getMigrationCriticalSection() const { - return _critSec; + std::shared_ptr<Notification<void>> getMigrationCriticalSectionSignal() const { + return _critSecSignal; } private: @@ -212,39 +208,7 @@ private: // Whether the source manager is in a critical section. Tracked as a shared pointer so that // callers don't have to hold collection lock in order to wait on it. Available after the // critical section stage has completed. - std::shared_ptr<CriticalSectionState> _critSec; -}; - -/** - * This object is instantiated once the migration logic enters critical section. It contains all - * the state which is associated with being in a critical section, such as the bumped metadata - * version (which has not yet been reflected on the config server). - */ -class MigrationSourceManager::CriticalSectionState { - MONGO_DISALLOW_COPYING(CriticalSectionState); - -public: - CriticalSectionState(); - - /** - * Blocks until the critical section completes. Returns true if the wait succeeded and the - * critical section is no longer active, or false if the waitTimeout was exceeded. - */ - bool waitUntilOutOfCriticalSection(Microseconds waitTimeout); - - /** - * To be called when the critical section has completed. Signals any threads sitting blocked in - * waitUntilOutOfCriticalSection. Must only be used once for the lifetime of this object. - */ - void exitCriticalSection(); - -private: - // Only moves from true to false once. Happens under the critical section mutex and the critical - // section will be signalled. - bool _inCriticalSection{true}; - - stdx::mutex _criticalSectionMutex; - stdx::condition_variable _criticalSectionCV; + std::shared_ptr<Notification<void>> _critSecSignal; }; } // namespace mongo diff --git a/src/mongo/db/s/operation_sharding_state.cpp b/src/mongo/db/s/operation_sharding_state.cpp index c6fe82ff6d0..97991dc27f8 100644 --- a/src/mongo/db/s/operation_sharding_state.cpp +++ b/src/mongo/db/s/operation_sharding_state.cpp @@ -1,30 +1,30 @@ -/* -* Copyright (C) 2015 MongoDB Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ #include "mongo/platform/basic.h" @@ -96,26 +96,27 @@ void OperationShardingState::setShardVersion(NamespaceString nss, ChunkVersion n _hasVersion = true; } -bool OperationShardingState::waitForMigrationCriticalSection(OperationContext* txn) { +bool OperationShardingState::waitForMigrationCriticalSectionSignal(OperationContext* txn) { // Must not block while holding a lock invariant(!txn->lockState()->isLocked()); - if (_migrationCriticalSection) { - _migrationCriticalSection->waitUntilOutOfCriticalSection( + if (_migrationCriticalSectionSignal) { + _migrationCriticalSectionSignal->waitFor( + txn, txn->hasDeadline() ? std::min(txn->getRemainingMaxTimeMicros(), kMaxWaitForMigrationCriticalSection) : kMaxWaitForMigrationCriticalSection); - _migrationCriticalSection = nullptr; + _migrationCriticalSectionSignal = nullptr; return true; } return false; } -void OperationShardingState::setMigrationCriticalSection( - std::shared_ptr<MigrationSourceManager::CriticalSectionState> critSec) { - invariant(critSec); - _migrationCriticalSection = std::move(critSec); +void OperationShardingState::setMigrationCriticalSectionSignal( + std::shared_ptr<Notification<void>> critSecSignal) { + invariant(critSecSignal); + _migrationCriticalSectionSignal = std::move(critSecSignal); } void OperationShardingState::_clear() { diff --git a/src/mongo/db/s/operation_sharding_state.h b/src/mongo/db/s/operation_sharding_state.h index f52cbd9f702..0ef397b7e92 100644 --- a/src/mongo/db/s/operation_sharding_state.h +++ b/src/mongo/db/s/operation_sharding_state.h @@ -99,15 +99,14 @@ public: * Returns true if the call actually waited because of migration critical section (regardless if * whether it timed out or not), false if there was no active migration critical section. */ - bool waitForMigrationCriticalSection(OperationContext* txn); + bool waitForMigrationCriticalSectionSignal(OperationContext* txn); /** * Setting this value indicates that when the version check failed, there was an active * migration for the namespace and that it would be prudent to wait for the critical section to * complete before retrying so the router doesn't make wasteful requests. */ - void setMigrationCriticalSection( - std::shared_ptr<MigrationSourceManager::CriticalSectionState> critSec); + void setMigrationCriticalSectionSignal(std::shared_ptr<Notification<void>> critSecSignal); private: /** @@ -122,7 +121,7 @@ private: // This value will only be non-null if version check during the operation execution failed due // to stale version and there was a migration for that namespace, which was in critical section. - std::shared_ptr<MigrationSourceManager::CriticalSectionState> _migrationCriticalSection; + std::shared_ptr<Notification<void>> _migrationCriticalSectionSignal; }; /** diff --git a/src/mongo/db/s/set_shard_version_command.cpp b/src/mongo/db/s/set_shard_version_command.cpp index acdb419538d..aa0ed124df2 100644 --- a/src/mongo/db/s/set_shard_version_command.cpp +++ b/src/mongo/db/s/set_shard_version_command.cpp @@ -245,12 +245,12 @@ public: if (requestedVersion < collectionShardVersion && requestedVersion.epoch() == collectionShardVersion.epoch()) { if (css->getMigrationSourceManager()) { - auto critSec = - css->getMigrationSourceManager()->getMigrationCriticalSection(); - if (critSec) { + auto critSecSignal = + css->getMigrationSourceManager()->getMigrationCriticalSectionSignal(); + if (critSecSignal) { autoColl.reset(); log() << "waiting till out of critical section"; - critSec->waitUntilOutOfCriticalSection(Seconds(10)); + critSecSignal->waitFor(txn, Seconds(10)); } } @@ -267,12 +267,12 @@ public: // Needed b/c when the last chunk is moved off a shard, the version gets reset // to zero, which should require a reload. if (css->getMigrationSourceManager()) { - auto critSec = - css->getMigrationSourceManager()->getMigrationCriticalSection(); - if (critSec) { + auto critSecSignal = + css->getMigrationSourceManager()->getMigrationCriticalSectionSignal(); + if (critSecSignal) { autoColl.reset(); log() << "waiting till out of critical section"; - critSec->waitUntilOutOfCriticalSection(Seconds(10)); + critSecSignal->waitFor(txn, Seconds(10)); } } diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp index 46609016cfa..01514ab3738 100644 --- a/src/mongo/db/s/sharding_state.cpp +++ b/src/mongo/db/s/sharding_state.cpp @@ -310,7 +310,7 @@ Status ShardingState::onStaleShardVersion(OperationContext* txn, // Ensure any ongoing migrations have completed auto& oss = OperationShardingState::get(txn); - oss.waitForMigrationCriticalSection(txn); + oss.waitForMigrationCriticalSectionSignal(txn); ChunkVersion collectionShardVersion; diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h index 6200196cb02..be7a8229a4e 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h @@ -40,7 +40,6 @@ #include "mongo/db/storage/record_store.h" #include "mongo/platform/atomic_word.h" #include "mongo/stdx/mutex.h" -#include "mongo/util/concurrency/synchronization.h" #include "mongo/util/fail_point_service.h" /** diff --git a/src/mongo/dbtests/threadedtests.cpp b/src/mongo/dbtests/threadedtests.cpp index 5ab410361eb..d2f8c4a7ea4 100644 --- a/src/mongo/dbtests/threadedtests.cpp +++ b/src/mongo/dbtests/threadedtests.cpp @@ -49,7 +49,6 @@ #include "mongo/util/concurrency/old_thread_pool.h" #include "mongo/util/concurrency/old_thread_pool.h" #include "mongo/util/concurrency/rwlock.h" -#include "mongo/util/concurrency/synchronization.h" #include "mongo/util/concurrency/ticketholder.h" #include "mongo/util/log.h" #include "mongo/util/timer.h" @@ -610,59 +609,6 @@ private: } }; -class CondSlack : public ThreadedTest<17> { - Notification n; - -public: - CondSlack() { - k = 0; - done = false; - a = b = 0; - locks = 0; - } - -private: - unsigned a, b; - virtual void validate() { - cout << "CondSlack useful work fraction: " << ((double)a) / b << " locks:" << locks << endl; - } - unsigned locks; - volatile int k; - void watch() { - while (1) { - b++; - if (k) { - a++; - } - sleepmillis(0); - if (done) - break; - } - } - volatile bool done; - virtual void subthread(int x) { - if (x == 1) { - n.notifyOne(); - watch(); - return; - } - Timer t; - while (1) { - n.waitToBeNotified(); - verify(k == 0); - k = 1; - // not very long, we'd like to simulate about 100K locks per second - sleepalittle(); - k = 0; - locks++; - n.notifyOne(); - if (done || t.millis() > 1500) - break; - } - done = true; - } -}; - const int WriteLocksAreGreedy_ThreadCount = 3; class WriteLocksAreGreedy : public ThreadedTest<WriteLocksAreGreedy_ThreadCount> { public: @@ -778,7 +724,6 @@ public: // would have very little slack. add<Slack<SimpleMutex, stdx::lock_guard<SimpleMutex>>>(); add<Slack<SimpleRWLock, SimpleRWLock::Exclusive>>(); - add<CondSlack>(); add<UpgradableTest>(); diff --git a/src/mongo/util/SConscript b/src/mongo/util/SConscript index e38ba85ef7d..289989edc04 100644 --- a/src/mongo/util/SConscript +++ b/src/mongo/util/SConscript @@ -144,7 +144,6 @@ env.Library( LIBDEPS=[ '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/platform/platform', - '$BUILD_DIR/mongo/util/concurrency/synchronization', '$BUILD_DIR/third_party/shim_allocator', '$BUILD_DIR/third_party/shim_boost', '$BUILD_DIR/third_party/shim_tz', @@ -256,7 +255,6 @@ env.CppUnitTest( ], LIBDEPS=[ "background_job", - "concurrency/synchronization", ], ) diff --git a/src/mongo/util/background_job_test.cpp b/src/mongo/util/background_job_test.cpp index 1bd23d47792..57d403de614 100644 --- a/src/mongo/util/background_job_test.cpp +++ b/src/mongo/util/background_job_test.cpp @@ -32,25 +32,18 @@ #include "mongo/stdx/thread.h" #include "mongo/unittest/unittest.h" #include "mongo/util/background.h" -#include "mongo/util/concurrency/synchronization.h" +#include "mongo/util/concurrency/notification.h" #include "mongo/util/time_support.h" +namespace mongo { namespace { -using mongo::AtomicWord; -using mongo::BackgroundJob; -using mongo::MsgAssertionException; -using mongo::stdx::mutex; -using mongo::Notification; - -namespace stdx = mongo::stdx; - -class TestJob final : public mongo::BackgroundJob { +class TestJob final : public BackgroundJob { public: TestJob(bool selfDelete, AtomicWord<bool>* flag, - Notification* canProceed = nullptr, - Notification* destructorInvoked = nullptr) + Notification<void>* canProceed = nullptr, + Notification<void>* destructorInvoked = nullptr) : BackgroundJob(selfDelete), _flag(flag), _canProceed(canProceed), @@ -58,7 +51,7 @@ public: ~TestJob() override { if (_destructorInvoked) - _destructorInvoked->notifyOne(); + _destructorInvoked->set(); } std::string name() const override { @@ -67,14 +60,14 @@ public: void run() override { if (_canProceed) - _canProceed->waitToBeNotified(); + _canProceed->get(); _flag->store(true); } private: AtomicWord<bool>* const _flag; - Notification* const _canProceed; - Notification* const _destructorInvoked; + Notification<void>* const _canProceed; + Notification<void>* const _destructorInvoked; }; TEST(BackgroundJobBasic, NormalCase) { @@ -87,24 +80,24 @@ TEST(BackgroundJobBasic, NormalCase) { TEST(BackgroundJobBasic, TimeOutCase) { AtomicWord<bool> flag(false); - Notification canProceed; + Notification<void> canProceed; TestJob tj(false, &flag, &canProceed); tj.go(); ASSERT(!tj.wait(1000)); ASSERT_EQUALS(false, flag.load()); - canProceed.notifyOne(); + canProceed.set(); ASSERT(tj.wait()); ASSERT_EQUALS(true, flag.load()); } TEST(BackgroundJobBasic, SelfDeletingCase) { AtomicWord<bool> flag(false); - Notification destructorInvoked; + Notification<void> destructorInvoked; // Though it looks like one, this is not a leak since the job is self deleting. (new TestJob(true, &flag, nullptr, &destructorInvoked))->go(); - destructorInvoked.waitToBeNotified(); + destructorInvoked.get(); ASSERT_EQUALS(true, flag.load()); } @@ -124,17 +117,17 @@ TEST(BackgroundJobLifeCycle, Go) { _hasRun = true; } - _n.waitToBeNotified(); + _n.get(); } void notify() { - _n.notifyOne(); + _n.set(); } private: - mutex _mutex; + stdx::mutex _mutex; bool _hasRun; - Notification _n; + Notification<void> _n; }; Job j; @@ -155,3 +148,4 @@ TEST(BackgroundJobLifeCycle, Go) { } } // namespace +} // namespace mongo
\ No newline at end of file diff --git a/src/mongo/util/concurrency/SConscript b/src/mongo/util/concurrency/SConscript index 3bc52f9acce..fb7462fe586 100644 --- a/src/mongo/util/concurrency/SConscript +++ b/src/mongo/util/concurrency/SConscript @@ -38,16 +38,6 @@ env.Library('ticketholder', '$BUILD_DIR/third_party/shim_boost']) env.Library( - target='synchronization', - source=[ - 'synchronization.cpp' - ], - LIBDEPS=[ - '$BUILD_DIR/mongo/base', - ], -) - -env.Library( target='spin_lock', source=[ "spin_lock.cpp", diff --git a/src/mongo/util/concurrency/notification.h b/src/mongo/util/concurrency/notification.h new file mode 100644 index 00000000000..b8e8c8e0317 --- /dev/null +++ b/src/mongo/util/concurrency/notification.h @@ -0,0 +1,155 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "boost/optional.hpp" + +#include "mongo/base/disallow_copying.h" +#include "mongo/stdx/condition_variable.h" +#include "mongo/stdx/mutex.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/duration.h" +#include "mongo/util/time_support.h" + +namespace mongo { + +class OperationContext; + +/** + * Allows waiting for a result returned from an asynchronous operation. + */ +template <class T> +class Notification { + MONGO_DISALLOW_COPYING(Notification); + +public: + /** + * Constructs a default notification in the unset state. + */ + Notification() = default; + + // Allow std::move semantics for notifications + Notification(Notification&&) = default; + Notification& operator=(Notification&&) = default; + + /** + * Returns true if the notification has been set (i.e., the call to get/waitFor would not + * block). + */ + explicit operator bool() const { + stdx::unique_lock<stdx::mutex> lock(_mutex); + return !!_value; + } + + /** + * If the notification has been set, returns immediately. Otherwise blocks until it becomes set. + * If the wait is interrupted, throws an exception. + */ + T& get(OperationContext* txn) { + return get(); + } + + /** + * If the notification has been set, returns immediately. Otherwise blocks until it becomes set. + * This variant of get cannot be interrupted. + */ + T& get() { + stdx::unique_lock<stdx::mutex> lock(_mutex); + while (!_value) { + _condVar.wait(lock); + } + + return _value.get(); + } + + /** + * Sets the notification result and wakes up any threads, which might be blocked in the wait + * call. Must only be called once for the lifetime of the notification. + */ + void set(T value) { + stdx::lock_guard<stdx::mutex> lock(_mutex); + invariant(!_value); + _value = std::move(value); + _condVar.notify_all(); + } + + /** + * If the notification is not set, blocks either until it becomes set or until the waitTimeout + * expires. If the wait is interrupted, throws an exception. Otherwise, returns immediately. + */ + bool waitFor(OperationContext* txn, Microseconds waitTimeout) { + const auto waitDeadline = Date_t::now() + waitTimeout; + + stdx::unique_lock<stdx::mutex> lock(_mutex); + return _condVar.wait_until( + lock, waitDeadline.toSystemTimePoint(), [&]() { return !!_value; }); + } + +private: + mutable stdx::mutex _mutex; + mutable stdx::condition_variable _condVar; + + // Protected by mutex and only moves from not-set to set once + boost::optional<T> _value{boost::none}; +}; + +template <> +class Notification<void> { + MONGO_DISALLOW_COPYING(Notification); + +public: + Notification() = default; + Notification(Notification&&) = default; + Notification& operator=(Notification&&) = default; + + explicit operator bool() const { + return _notification.operator bool(); + } + + void get(OperationContext* txn) { + _notification.get(txn); + } + + void get() { + _notification.get(); + } + + void set() { + _notification.set(true); + } + + bool waitFor(OperationContext* txn, Microseconds waitTimeout) { + return _notification.waitFor(txn, waitTimeout); + } + +private: + Notification<bool> _notification; +}; + +} // namespace mongo diff --git a/src/mongo/util/net/SConscript b/src/mongo/util/net/SConscript index 99bd2581fa6..fa8899561e9 100644 --- a/src/mongo/util/net/SConscript +++ b/src/mongo/util/net/SConscript @@ -46,6 +46,7 @@ networkEnv.Library( "ssl_expiration.cpp", "ssl_manager.cpp", "ssl_options.cpp", + "thread_idle_callback.cpp", ], LIBDEPS=[ '$BUILD_DIR/mongo/db/server_options_core', @@ -77,7 +78,6 @@ env.CppUnitTest( 'sock_test.cpp', ], LIBDEPS=[ - '$BUILD_DIR/mongo/util/concurrency/synchronization', 'network', ], ) diff --git a/src/mongo/util/net/message_server_port.cpp b/src/mongo/util/net/message_server_port.cpp index c64c16f3073..95e9a1d5716 100644 --- a/src/mongo/util/net/message_server_port.cpp +++ b/src/mongo/util/net/message_server_port.cpp @@ -42,7 +42,6 @@ #include "mongo/stdx/functional.h" #include "mongo/stdx/memory.h" #include "mongo/stdx/thread.h" -#include "mongo/util/concurrency/synchronization.h" #include "mongo/util/concurrency/thread_name.h" #include "mongo/util/concurrency/ticketholder.h" #include "mongo/util/debug_util.h" @@ -55,6 +54,7 @@ #include "mongo/util/net/message_server.h" #include "mongo/util/net/socket_exception.h" #include "mongo/util/net/ssl_manager.h" +#include "mongo/util/net/thread_idle_callback.h" #include "mongo/util/quick_exit.h" #include "mongo/util/scopeguard.h" diff --git a/src/mongo/util/net/sock_test.cpp b/src/mongo/util/net/sock_test.cpp index 93e6cb5d3d2..af6ac166361 100644 --- a/src/mongo/util/net/sock_test.cpp +++ b/src/mongo/util/net/sock_test.cpp @@ -39,7 +39,7 @@ #include "mongo/db/server_options.h" #include "mongo/stdx/thread.h" #include "mongo/unittest/unittest.h" -#include "mongo/util/concurrency/synchronization.h" +#include "mongo/util/concurrency/notification.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/net/socket_exception.h" @@ -59,16 +59,16 @@ SocketPair socketPair(const int type, const int protocol = 0); #if defined(_WIN32) namespace detail { -void awaitAccept(SOCKET* acceptSock, SOCKET listenSock, Notification& notify) { +void awaitAccept(SOCKET* acceptSock, SOCKET listenSock, Notification<void>& notify) { *acceptSock = INVALID_SOCKET; const SOCKET result = ::accept(listenSock, NULL, 0); if (result != INVALID_SOCKET) { *acceptSock = result; } - notify.notifyOne(); + notify.set(); } -void awaitConnect(SOCKET* connectSock, const struct addrinfo& where, Notification& notify) { +void awaitConnect(SOCKET* connectSock, const struct addrinfo& where, Notification<void>& notify) { *connectSock = INVALID_SOCKET; SOCKET newSock = ::socket(where.ai_family, where.ai_socktype, where.ai_protocol); if (newSock != INVALID_SOCKET) { @@ -77,7 +77,7 @@ void awaitConnect(SOCKET* connectSock, const struct addrinfo& where, Notificatio *connectSock = newSock; } } - notify.notifyOne(); + notify.set(); } } // namespace detail @@ -144,17 +144,17 @@ SocketPair socketPair(const int type, const int protocol) { // I'd prefer to avoid trying to do this non-blocking on Windows. Just spin up some // threads to do the connect and acccept. - Notification accepted; + Notification<void> accepted; SOCKET acceptSock = INVALID_SOCKET; stdx::thread acceptor( stdx::bind(&detail::awaitAccept, &acceptSock, listenSock, stdx::ref(accepted))); - Notification connected; + Notification<void> connected; SOCKET connectSock = INVALID_SOCKET; stdx::thread connector( stdx::bind(&detail::awaitConnect, &connectSock, *connectRes, stdx::ref(connected))); - connected.waitToBeNotified(); + connected.get(); connector.join(); if (connectSock == INVALID_SOCKET) { closesocket(listenSock); @@ -165,7 +165,7 @@ SocketPair socketPair(const int type, const int protocol) { return SocketPair(); } - accepted.waitToBeNotified(); + accepted.get(); acceptor.join(); if (acceptSock == INVALID_SOCKET) { closesocket(listenSock); diff --git a/src/mongo/util/concurrency/synchronization.cpp b/src/mongo/util/net/thread_idle_callback.cpp index 2562dbbe89c..f9259d44f5a 100644 --- a/src/mongo/util/concurrency/synchronization.cpp +++ b/src/mongo/util/net/thread_idle_callback.cpp @@ -1,6 +1,5 @@ -// synchronization.cpp - -/* Copyright 2010 10gen Inc. +/** + * Copyright (C) 2016 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, @@ -29,16 +28,18 @@ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault -#include "synchronization.h" +#include "mongo/platform/basic.h" -#include <boost/date_time/posix_time/posix_time.hpp> +#include "mongo/util/net/thread_idle_callback.h" +#include "mongo/util/assert_util.h" #include "mongo/util/log.h" namespace mongo { - namespace { + ThreadIdleCallback threadIdleCallback; + } // namespace void registerThreadIdleCallback(ThreadIdleCallback callback) { @@ -58,23 +59,4 @@ void markThreadIdle() { } } -Notification::Notification() { - lookFor = 1; - cur = 0; -} - -void Notification::waitToBeNotified() { - stdx::unique_lock<stdx::mutex> lock(_mutex); - while (lookFor != cur) - _condition.wait(lock); - lookFor++; -} - -void Notification::notifyOne() { - stdx::lock_guard<stdx::mutex> lock(_mutex); - verify(cur != lookFor); - cur++; - _condition.notify_one(); -} - } // namespace mongo diff --git a/src/mongo/util/concurrency/synchronization.h b/src/mongo/util/net/thread_idle_callback.h index 2e953976932..65500607be1 100644 --- a/src/mongo/util/concurrency/synchronization.h +++ b/src/mongo/util/net/thread_idle_callback.h @@ -1,6 +1,5 @@ -// synchronization.h - -/* Copyright 2010 10gen Inc. +/** + * Copyright (C) 2016 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, @@ -29,60 +28,26 @@ #pragma once -#include "mongo/stdx/condition_variable.h" -#include "mongo/stdx/mutex.h" -#include "mongo/util/concurrency/mutex.h" - namespace mongo { /** - * Type of callback functions that can be invoked when markThreadIdle() runs. - * These functions *must not throw*. + * Type of callback functions that can be invoked when markThreadIdle() runs. These functions *must + * not throw*. */ typedef void (*ThreadIdleCallback)(); /** - * Informs the registered listener that this thread believes it may go idle for an extended - * period. The caller should avoid calling markThreadIdle at a high rate, as it can both be - * moderately costly itself and in terms of distributed overhead for subsequent malloc/free - * calls. + * Informs the registered listener that this thread believes it may go idle for an extended period. + * The caller should avoid calling markThreadIdle at a high rate, as it can both be moderately + * costly itself and in terms of distributed overhead for subsequent malloc/free calls. */ void markThreadIdle(); /** - * Allows for registering callbacks for when threads go idle and become active. This is used - * by TCMalloc to return freed memory to its central freelist at appropriate points, so it - * won't happen during critical sections while holding locks. Calling this is not thread-safe. + * Allows for registering callbacks for when threads go idle and become active. This is used by + * TCMalloc to return freed memory to its central freelist at appropriate points, so it won't happen + * during critical sections while holding locks. Calling this is not thread-safe. */ void registerThreadIdleCallback(ThreadIdleCallback callback); -/* - * A class to establish a synchronization point between two threads. One thread is the waiter - * and one is the notifier. After the notification event, both proceed normally. - * - * This class is thread-safe. - */ -class Notification { - MONGO_DISALLOW_COPYING(Notification); - -public: - Notification(); - - /* - * Blocks until the method 'notifyOne()' is called. - */ - void waitToBeNotified(); - - /* - * Notifies the waiter of '*this' that it can proceed. Can only be called once. - */ - void notifyOne(); - -private: - stdx::mutex _mutex; // protects state below - unsigned long long lookFor; - unsigned long long cur; - stdx::condition_variable _condition; // cond over _notified being true -}; - } // namespace mongo diff --git a/src/mongo/util/tcmalloc_server_status_section.cpp b/src/mongo/util/tcmalloc_server_status_section.cpp index cc2d6725d9b..7a8eb49e25b 100644 --- a/src/mongo/util/tcmalloc_server_status_section.cpp +++ b/src/mongo/util/tcmalloc_server_status_section.cpp @@ -38,9 +38,9 @@ #include "mongo/base/init.h" #include "mongo/db/commands/server_status.h" -#include "mongo/util/concurrency/synchronization.h" #include "mongo/util/log.h" #include "mongo/util/net/listen.h" +#include "mongo/util/net/thread_idle_callback.h" namespace mongo { |