diff options
author | matt dannenberg <matt.dannenberg@10gen.com> | 2014-05-16 09:03:29 -0400 |
---|---|---|
committer | matt dannenberg <matt.dannenberg@10gen.com> | 2014-05-19 05:12:26 -0400 |
commit | 4ab25eac664ff6775bdb57a3689a3a2f4092e6b0 (patch) | |
tree | 70cba59309b94f72278b789e58e40ffb514ac515 /src/mongo/db/repl | |
parent | 869e0a15066618c67cb06ba1869bd0532e258db7 (diff) | |
download | mongo-4ab25eac664ff6775bdb57a3689a3a2f4092e6b0.tar.gz |
SERVER-13645 move multiSyncApply(), multiInitialSyncApply(), and initializeWriterThread() to sync_tail.cpp/.h
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r-- | src/mongo/db/repl/initial_sync.cpp | 68 | ||||
-rw-r--r-- | src/mongo/db/repl/initial_sync.h | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 68 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.h | 4 |
4 files changed, 72 insertions, 72 deletions
diff --git a/src/mongo/db/repl/initial_sync.cpp b/src/mongo/db/repl/initial_sync.cpp index b944653c4d7..7f99106f25a 100644 --- a/src/mongo/db/repl/initial_sync.cpp +++ b/src/mongo/db/repl/initial_sync.cpp @@ -37,74 +37,6 @@ namespace mongo { extern unsigned replSetForceInitialSyncFailure; namespace replset { - static AtomicUInt32 replWriterWorkerId; - void initializeWriterThread() { - // Only do this once per thread - if (!ClientBasic::getCurrent()) { - string threadName = str::stream() << "repl writer worker " - << replWriterWorkerId.addAndFetch(1); - Client::initThread( threadName.c_str() ); - // allow us to get through the magic barrier - Lock::ParallelBatchWriterMode::iAmABatchParticipant(); - replLocalAuth(); - } - } - - // This free function is used by the writer threads to apply each op - void multiSyncApply(const std::vector<BSONObj>& ops, SyncTail* st) { - initializeWriterThread(); - - // convert update operations only for 2.2.1 or greater, because we need guaranteed - // idempotent operations for this to work. See SERVER-6825 - bool convertUpdatesToUpserts = theReplSet->oplogVersion > 1 ? true : false; - - for (std::vector<BSONObj>::const_iterator it = ops.begin(); - it != ops.end(); - ++it) { - try { - if (!st->syncApply(*it, convertUpdatesToUpserts)) { - fassertFailedNoTrace(16359); - } - } catch (const DBException& e) { - error() << "writer worker caught exception: " << causedBy(e) - << " on: " << it->toString() << endl; - fassertFailedNoTrace(16360); - } - } - } - - // This free function is used by the initial sync writer threads to apply each op - void multiInitialSyncApply(const std::vector<BSONObj>& ops, SyncTail* st) { - initializeWriterThread(); - for (std::vector<BSONObj>::const_iterator it = ops.begin(); - it != ops.end(); - ++it) { - try { - if (!st->syncApply(*it)) { - bool status; - { - Lock::GlobalWrite lk; - status = st->shouldRetry(*it); - } - if (status) { - // retry - if (!st->syncApply(*it)) { - fassertFailedNoTrace(15915); - } - } - // If shouldRetry() returns false, fall through. - // This can happen if the document that was moved and missed by Cloner - // subsequently got deleted and no longer exists on the Sync Target at all - } - } - catch (const DBException& e) { - error() << "exception: " << causedBy(e) << " on: " << it->toString() << endl; - fassertFailedNoTrace(16361); - } - } - } - - InitialSync::InitialSync(BackgroundSyncInterface *q) : SyncTail(q) {} diff --git a/src/mongo/db/repl/initial_sync.h b/src/mongo/db/repl/initial_sync.h index 91d53346596..a86c61c56c8 100644 --- a/src/mongo/db/repl/initial_sync.h +++ b/src/mongo/db/repl/initial_sync.h @@ -35,10 +35,6 @@ namespace replset { class BackgroundSyncInterface; - // These free functions are used by the thread pool workers to write ops to the db. - void multiSyncApply(const std::vector<BSONObj>& ops, SyncTail* st); - void multiInitialSyncApply(const std::vector<BSONObj>& ops, SyncTail* st); - /** * Initial clone and sync */ diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index b4c16fa47d7..8ce9744818b 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -526,5 +526,73 @@ namespace replset { } // endif slaveDelay } + static AtomicUInt32 replWriterWorkerId; + + void initializeWriterThread() { + // Only do this once per thread + if (!ClientBasic::getCurrent()) { + string threadName = str::stream() << "repl writer worker " + << replWriterWorkerId.addAndFetch(1); + Client::initThread( threadName.c_str() ); + // allow us to get through the magic barrier + Lock::ParallelBatchWriterMode::iAmABatchParticipant(); + replLocalAuth(); + } + } + + // This free function is used by the writer threads to apply each op + void multiSyncApply(const std::vector<BSONObj>& ops, SyncTail* st) { + initializeWriterThread(); + + // convert update operations only for 2.2.1 or greater, because we need guaranteed + // idempotent operations for this to work. See SERVER-6825 + bool convertUpdatesToUpserts = theReplSet->oplogVersion > 1 ? true : false; + + for (std::vector<BSONObj>::const_iterator it = ops.begin(); + it != ops.end(); + ++it) { + try { + if (!st->syncApply(*it, convertUpdatesToUpserts)) { + fassertFailedNoTrace(16359); + } + } catch (const DBException& e) { + error() << "writer worker caught exception: " << causedBy(e) + << " on: " << it->toString() << endl; + fassertFailedNoTrace(16360); + } + } + } + + // This free function is used by the initial sync writer threads to apply each op + void multiInitialSyncApply(const std::vector<BSONObj>& ops, SyncTail* st) { + initializeWriterThread(); + for (std::vector<BSONObj>::const_iterator it = ops.begin(); + it != ops.end(); + ++it) { + try { + if (!st->syncApply(*it)) { + bool status; + { + Lock::GlobalWrite lk; + status = st->shouldRetry(*it); + } + if (status) { + // retry + if (!st->syncApply(*it)) { + fassertFailedNoTrace(15915); + } + } + // If shouldRetry() returns false, fall through. + // This can happen if the document that was moved and missed by Cloner + // subsequently got deleted and no longer exists on the Sync Target at all + } + } + catch (const DBException& e) { + error() << "exception: " << causedBy(e) << " on: " << it->toString() << endl; + fassertFailedNoTrace(16361); + } + } + } + } // namespace replset } // namespace mongo diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index 415dd54859b..7f3e4567dc7 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -134,5 +134,9 @@ namespace replset { void setOplogVersion(const BSONObj& op); }; + // These free functions are used by the thread pool workers to write ops to the db. + void multiSyncApply(const std::vector<BSONObj>& ops, SyncTail* st); + void multiInitialSyncApply(const std::vector<BSONObj>& ops, SyncTail* st); + } // namespace replset } // namespace mongo |