diff options
author | matt dannenberg <matt.dannenberg@10gen.com> | 2016-03-04 06:00:05 -0500 |
---|---|---|
committer | matt dannenberg <matt.dannenberg@10gen.com> | 2016-03-07 13:19:29 -0500 |
commit | 03eb4777de6cc9bade4041190b837b3c31a88e34 (patch) | |
tree | 8208f193b3d107362c98fbfbd036d829851437be /src/mongo/db/repl/sync_tail.cpp | |
parent | b34164948c4727e1038cb76ce3783570c7f90d15 (diff) | |
download | mongo-03eb4777de6cc9bade4041190b837b3c31a88e34.tar.gz |
SERVER-22965 move OplogEntry into repl/oplog_entry.{cpp,h}
Diffstat (limited to 'src/mongo/db/repl/sync_tail.cpp')
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 40 |
1 files changed, 11 insertions, 29 deletions
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index f6a56395827..cd1e9eace81 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -432,7 +432,7 @@ void prefetchOp(const BSONObj& op) { } // Doles out all the work to the reader pool threads and waits for them to complete -void prefetchOps(const std::deque<SyncTail::OplogEntry>& ops, OldThreadPool* prefetcherPool) { +void prefetchOps(const std::deque<OplogEntry>& ops, OldThreadPool* prefetcherPool) { invariant(prefetcherPool); for (auto&& op : ops) { prefetcherPool->schedule(&prefetchOp, op.raw); @@ -441,12 +441,12 @@ void prefetchOps(const std::deque<SyncTail::OplogEntry>& ops, OldThreadPool* pre } // Doles out all the work to the writer pool threads and waits for them to complete -void applyOps(const std::vector<std::vector<SyncTail::OplogEntry>>& writerVectors, +void applyOps(const std::vector<std::vector<OplogEntry>>& writerVectors, OldThreadPool* writerPool, SyncTail::MultiSyncApplyFunc func, SyncTail* sync) { TimerHolder timer(&applyBatchStats); - for (std::vector<std::vector<SyncTail::OplogEntry>>::const_iterator it = writerVectors.begin(); + for (std::vector<std::vector<OplogEntry>>::const_iterator it = writerVectors.begin(); it != writerVectors.end(); ++it) { if (!it->empty()) { @@ -486,8 +486,8 @@ private: }; void fillWriterVectors(OperationContext* txn, - const std::deque<SyncTail::OplogEntry>& ops, - std::vector<std::vector<SyncTail::OplogEntry>>* writerVectors) { + const std::deque<OplogEntry>& ops, + std::vector<std::vector<OplogEntry>>* writerVectors) { const bool supportsDocLocking = getGlobalServiceContext()->getGlobalStorageEngine()->supportsDocLocking(); const uint32_t numWriters = writerVectors->size(); @@ -525,7 +525,7 @@ void fillWriterVectors(OperationContext* txn, if (op.opType == "i" && isCapped(txn, hashedNs)) { // Mark capped collection ops before storing them to ensure we do not attempt to bulk // insert them. - SyncTail::OplogEntry modifiedOp = op; + OplogEntry modifiedOp = op; modifiedOp.isForCappedCollection = true; (*writerVectors)[hash % numWriters].push_back(modifiedOp); } else { @@ -546,7 +546,7 @@ OpTime SyncTail::multiApply(OperationContext* txn, const OpQueue& ops) { prefetchOps(ops.getDeque(), &_prefetcherPool); } - std::vector<std::vector<SyncTail::OplogEntry>> writerVectors(replWriterThreadCount); + std::vector<std::vector<OplogEntry>> writerVectors(replWriterThreadCount); fillWriterVectors(txn, ops.getDeque(), &writerVectors); LOG(2) << "replication batch size is " << ops.getDeque().size() << endl; @@ -832,23 +832,6 @@ void SyncTail::oplogApplication() { } } -SyncTail::OplogEntry::OplogEntry(const BSONObj& rawInput) : raw(rawInput.getOwned()) { - for (auto elem : raw) { - const auto name = elem.fieldNameStringData(); - if (name == "ns") { - ns = elem.valuestrsafe(); - } else if (name == "op") { - opType = elem.valuestrsafe(); - } else if (name == "o2") { - o2 = elem; - } else if (name == "v") { - version = elem; - } else if (name == "o") { - o = elem; - } - } -} - // Copies ops out of the bgsync queue into the deque passed in as a parameter. // Returns true if the batch should be ended early. // Batch should end early if we encounter a command, or if @@ -1030,8 +1013,8 @@ static void initializeWriterThread() { } // This free function is used by the writer threads to apply each op -void multiSyncApply(const std::vector<SyncTail::OplogEntry>& ops, SyncTail* st) { - using OplogEntry = SyncTail::OplogEntry; +void multiSyncApply(const std::vector<OplogEntry>& ops, SyncTail* st) { + using OplogEntry = OplogEntry; std::vector<OplogEntry> oplogEntries(ops.begin(), ops.end()); std::vector<OplogEntry*> oplogEntryPointers(oplogEntries.size()); @@ -1149,7 +1132,7 @@ void multiSyncApply(const std::vector<SyncTail::OplogEntry>& ops, SyncTail* st) } // This free function is used by the initial sync writer threads to apply each op -void multiInitialSyncApply(const std::vector<SyncTail::OplogEntry>& ops, SyncTail* st) { +void multiInitialSyncApply(const std::vector<OplogEntry>& ops, SyncTail* st) { initializeWriterThread(); OperationContextImpl txn; @@ -1161,8 +1144,7 @@ void multiInitialSyncApply(const std::vector<SyncTail::OplogEntry>& ops, SyncTai bool convertUpdatesToUpserts = false; - for (std::vector<SyncTail::OplogEntry>::const_iterator it = ops.begin(); it != ops.end(); - ++it) { + for (std::vector<OplogEntry>::const_iterator it = ops.begin(); it != ops.end(); ++it) { try { const Status s = SyncTail::syncApply(&txn, it->raw, convertUpdatesToUpserts); if (!s.isOK()) { |