summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/sync_tail.cpp
diff options
context:
space:
mode:
authormatt dannenberg <matt.dannenberg@10gen.com>2016-03-04 06:00:05 -0500
committermatt dannenberg <matt.dannenberg@10gen.com>2016-03-07 13:19:29 -0500
commit03eb4777de6cc9bade4041190b837b3c31a88e34 (patch)
tree8208f193b3d107362c98fbfbd036d829851437be /src/mongo/db/repl/sync_tail.cpp
parentb34164948c4727e1038cb76ce3783570c7f90d15 (diff)
downloadmongo-03eb4777de6cc9bade4041190b837b3c31a88e34.tar.gz
SERVER-22965 move OplogEntry into repl/oplog_entry.{cpp,h}
Diffstat (limited to 'src/mongo/db/repl/sync_tail.cpp')
-rw-r--r--src/mongo/db/repl/sync_tail.cpp40
1 files changed, 11 insertions, 29 deletions
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index f6a56395827..cd1e9eace81 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -432,7 +432,7 @@ void prefetchOp(const BSONObj& op) {
}
// Doles out all the work to the reader pool threads and waits for them to complete
-void prefetchOps(const std::deque<SyncTail::OplogEntry>& ops, OldThreadPool* prefetcherPool) {
+void prefetchOps(const std::deque<OplogEntry>& ops, OldThreadPool* prefetcherPool) {
invariant(prefetcherPool);
for (auto&& op : ops) {
prefetcherPool->schedule(&prefetchOp, op.raw);
@@ -441,12 +441,12 @@ void prefetchOps(const std::deque<SyncTail::OplogEntry>& ops, OldThreadPool* pre
}
// Doles out all the work to the writer pool threads and waits for them to complete
-void applyOps(const std::vector<std::vector<SyncTail::OplogEntry>>& writerVectors,
+void applyOps(const std::vector<std::vector<OplogEntry>>& writerVectors,
OldThreadPool* writerPool,
SyncTail::MultiSyncApplyFunc func,
SyncTail* sync) {
TimerHolder timer(&applyBatchStats);
- for (std::vector<std::vector<SyncTail::OplogEntry>>::const_iterator it = writerVectors.begin();
+ for (std::vector<std::vector<OplogEntry>>::const_iterator it = writerVectors.begin();
it != writerVectors.end();
++it) {
if (!it->empty()) {
@@ -486,8 +486,8 @@ private:
};
void fillWriterVectors(OperationContext* txn,
- const std::deque<SyncTail::OplogEntry>& ops,
- std::vector<std::vector<SyncTail::OplogEntry>>* writerVectors) {
+ const std::deque<OplogEntry>& ops,
+ std::vector<std::vector<OplogEntry>>* writerVectors) {
const bool supportsDocLocking =
getGlobalServiceContext()->getGlobalStorageEngine()->supportsDocLocking();
const uint32_t numWriters = writerVectors->size();
@@ -525,7 +525,7 @@ void fillWriterVectors(OperationContext* txn,
if (op.opType == "i" && isCapped(txn, hashedNs)) {
// Mark capped collection ops before storing them to ensure we do not attempt to bulk
// insert them.
- SyncTail::OplogEntry modifiedOp = op;
+ OplogEntry modifiedOp = op;
modifiedOp.isForCappedCollection = true;
(*writerVectors)[hash % numWriters].push_back(modifiedOp);
} else {
@@ -546,7 +546,7 @@ OpTime SyncTail::multiApply(OperationContext* txn, const OpQueue& ops) {
prefetchOps(ops.getDeque(), &_prefetcherPool);
}
- std::vector<std::vector<SyncTail::OplogEntry>> writerVectors(replWriterThreadCount);
+ std::vector<std::vector<OplogEntry>> writerVectors(replWriterThreadCount);
fillWriterVectors(txn, ops.getDeque(), &writerVectors);
LOG(2) << "replication batch size is " << ops.getDeque().size() << endl;
@@ -832,23 +832,6 @@ void SyncTail::oplogApplication() {
}
}
-SyncTail::OplogEntry::OplogEntry(const BSONObj& rawInput) : raw(rawInput.getOwned()) {
- for (auto elem : raw) {
- const auto name = elem.fieldNameStringData();
- if (name == "ns") {
- ns = elem.valuestrsafe();
- } else if (name == "op") {
- opType = elem.valuestrsafe();
- } else if (name == "o2") {
- o2 = elem;
- } else if (name == "v") {
- version = elem;
- } else if (name == "o") {
- o = elem;
- }
- }
-}
-
// Copies ops out of the bgsync queue into the deque passed in as a parameter.
// Returns true if the batch should be ended early.
// Batch should end early if we encounter a command, or if
@@ -1030,8 +1013,8 @@ static void initializeWriterThread() {
}
// This free function is used by the writer threads to apply each op
-void multiSyncApply(const std::vector<SyncTail::OplogEntry>& ops, SyncTail* st) {
- using OplogEntry = SyncTail::OplogEntry;
+void multiSyncApply(const std::vector<OplogEntry>& ops, SyncTail* st) {
+ using OplogEntry = OplogEntry;
std::vector<OplogEntry> oplogEntries(ops.begin(), ops.end());
std::vector<OplogEntry*> oplogEntryPointers(oplogEntries.size());
@@ -1149,7 +1132,7 @@ void multiSyncApply(const std::vector<SyncTail::OplogEntry>& ops, SyncTail* st)
}
// This free function is used by the initial sync writer threads to apply each op
-void multiInitialSyncApply(const std::vector<SyncTail::OplogEntry>& ops, SyncTail* st) {
+void multiInitialSyncApply(const std::vector<OplogEntry>& ops, SyncTail* st) {
initializeWriterThread();
OperationContextImpl txn;
@@ -1161,8 +1144,7 @@ void multiInitialSyncApply(const std::vector<SyncTail::OplogEntry>& ops, SyncTai
bool convertUpdatesToUpserts = false;
- for (std::vector<SyncTail::OplogEntry>::const_iterator it = ops.begin(); it != ops.end();
- ++it) {
+ for (std::vector<OplogEntry>::const_iterator it = ops.begin(); it != ops.end(); ++it) {
try {
const Status s = SyncTail::syncApply(&txn, it->raw, convertUpdatesToUpserts);
if (!s.isOK()) {