summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/catalog/collection.cpp10
-rw-r--r--src/mongo/db/op_observer.cpp25
-rw-r--r--src/mongo/db/op_observer.h9
-rw-r--r--src/mongo/db/repl/oplog.cpp138
-rw-r--r--src/mongo/db/repl/oplog.h14
5 files changed, 127 insertions, 69 deletions
diff --git a/src/mongo/db/catalog/collection.cpp b/src/mongo/db/catalog/collection.cpp
index a7e2087f643..2c45a6057b7 100644
--- a/src/mongo/db/catalog/collection.cpp
+++ b/src/mongo/db/catalog/collection.cpp
@@ -360,11 +360,7 @@ Status Collection::insertDocuments(OperationContext* txn,
return status;
invariant(sid == txn->recoveryUnit()->getSnapshotId());
- int inserted = 0;
- for (vector<BSONObj>::iterator it = begin; it != end; it++) { // TODO: vectorize
- getGlobalServiceContext()->getOpObserver()->onInsert(txn, ns(), *it, fromMigrate);
- inserted++;
- }
+ getGlobalServiceContext()->getOpObserver()->onInserts(txn, ns(), begin, end, fromMigrate);
txn->recoveryUnit()->onCommit([this]() { notifyCappedWaitersIfNeeded(); });
@@ -405,7 +401,9 @@ Status Collection::insertDocument(OperationContext* txn,
if (!status.isOK())
return status;
- getGlobalServiceContext()->getOpObserver()->onInsert(txn, ns(), doc);
+ vector<BSONObj> docs;
+ docs.push_back(doc);
+ getGlobalServiceContext()->getOpObserver()->onInserts(txn, ns(), docs.begin(), docs.end());
txn->recoveryUnit()->onCommit([this]() { notifyCappedWaitersIfNeeded(); });
diff --git a/src/mongo/db/op_observer.cpp b/src/mongo/db/op_observer.cpp
index 0563482a99a..720d26ca9f2 100644
--- a/src/mongo/db/op_observer.cpp
+++ b/src/mongo/db/op_observer.cpp
@@ -43,6 +43,8 @@
namespace mongo {
+using std::vector;
+
void OpObserver::onCreateIndex(OperationContext* txn,
const std::string& ns,
BSONObj indexDoc,
@@ -54,16 +56,21 @@ void OpObserver::onCreateIndex(OperationContext* txn,
logOpForDbHash(txn, ns.c_str());
}
-void OpObserver::onInsert(OperationContext* txn,
- const NamespaceString& ns,
- BSONObj doc,
- bool fromMigrate) {
- repl::logOp(txn, "i", ns.ns().c_str(), doc, nullptr, fromMigrate);
+void OpObserver::onInserts(OperationContext* txn,
+ const NamespaceString& nss,
+ vector<BSONObj>::iterator begin,
+ vector<BSONObj>::iterator end,
+ bool fromMigrate) {
+ repl::logOps(txn, "i", nss, begin, end, fromMigrate);
+
+ const char* ns = nss.ns().c_str();
+ for (auto it = begin; it != end; it++) {
+ getGlobalAuthorizationManager()->logOp(txn, "i", ns, *it, nullptr);
+ logOpForSharding(txn, "i", ns, *it, nullptr, fromMigrate);
+ }
- getGlobalAuthorizationManager()->logOp(txn, "i", ns.ns().c_str(), doc, nullptr);
- logOpForSharding(txn, "i", ns.ns().c_str(), doc, nullptr, fromMigrate);
- logOpForDbHash(txn, ns.ns().c_str());
- if (strstr(ns.ns().c_str(), ".system.js")) {
+ logOpForDbHash(txn, ns);
+ if (strstr(ns, ".system.js")) {
Scope::storedFuncMod(txn);
}
}
diff --git a/src/mongo/db/op_observer.h b/src/mongo/db/op_observer.h
index 4ecf7df20d0..2ef3a191f0a 100644
--- a/src/mongo/db/op_observer.h
+++ b/src/mongo/db/op_observer.h
@@ -55,10 +55,11 @@ public:
const std::string& ns,
BSONObj indexDoc,
bool fromMigrate = false);
- void onInsert(OperationContext* txn,
- const NamespaceString& ns,
- BSONObj doc,
- bool fromMigrate = false);
+ void onInserts(OperationContext* txn,
+ const NamespaceString& ns,
+ std::vector<BSONObj>::iterator begin,
+ std::vector<BSONObj>::iterator end,
+ bool fromMigrate = false);
void onUpdate(OperationContext* txn, oplogUpdateEntryArgs args);
void onDelete(OperationContext* txn,
const std::string& ns,
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 629a2e91f41..c82b8ee6e27 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -100,6 +100,7 @@ using std::endl;
using std::string;
using std::stringstream;
using std::unique_ptr;
+using std::vector;
namespace repl {
std::string rsOplogName = "local.oplog.rs";
@@ -127,6 +128,11 @@ void checkOplogInsert(Status result) {
massert(17322, str::stream() << "write to oplog failed: " << result.toString(), result.isOK());
}
+struct OplogSlot {
+ OpTime opTime;
+ int64_t hash;
+};
+
/**
* Allocates an optime for a new entry in the oplog, and updates the replication coordinator to
* reflect that new optime. Returns the new optime and the correct value of the "h" field for
@@ -137,14 +143,14 @@ void checkOplogInsert(Status result) {
* function registers the new optime with the storage system and the replication coordinator,
* and provides no facility to revert those registrations on rollback.
*/
-std::pair<OpTime, long long> getNextOpTime(OperationContext* txn,
- Collection* oplog,
- ReplicationCoordinator* replCoord,
- const char* opstr,
- ReplicationCoordinator::Mode replicationMode) {
+OplogSlot getNextOpTime(OperationContext* txn,
+ Collection* oplog,
+ ReplicationCoordinator* replCoord,
+ ReplicationCoordinator::Mode replicationMode) {
synchronizeOnCappedInFlightResource(txn->lockState(), oplog->ns());
- long long hashNew = 0;
+ OplogSlot slot;
+ slot.hash = 0;
long long term = OpTime::kUninitializedTerm;
// Fetch term out of the newOpMutex.
@@ -162,11 +168,11 @@ std::pair<OpTime, long long> getNextOpTime(OperationContext* txn,
// Set hash if we're in replset mode, otherwise it remains 0 in master/slave.
if (replicationMode == ReplicationCoordinator::modeReplSet) {
- hashNew = hashGenerator.nextInt64();
+ slot.hash = hashGenerator.nextInt64();
}
- OpTime opTime(ts, term);
- return std::pair<OpTime, long long>(opTime, hashNew);
+ slot.opTime = OpTime(ts, term);
+ return slot;
}
/**
@@ -234,7 +240,10 @@ void setOplogCollectionName() {
}
namespace {
-void cacheOplogCollection(OperationContext* txn, const std::string& oplogCollectionName) {
+
+Collection* getLocalOplogCollection(OperationContext* txn, const std::string& oplogCollectionName) {
+ if (_localOplogCollection)
+ return _localOplogCollection;
Lock::DBLock lk(txn->lockState(), "local", MODE_IX);
Lock::CollectionLock lk2(txn->lockState(), oplogCollectionName, MODE_IX);
@@ -246,11 +255,12 @@ void cacheOplogCollection(OperationContext* txn, const std::string& oplogCollect
"the oplog collection " + oplogCollectionName +
" missing. did you drop it? if so, restart the server",
_localOplogCollection);
+ return _localOplogCollection;
}
bool oplogDisabled(OperationContext* txn,
ReplicationCoordinator::Mode replicationMode,
- NamespaceString& nss) {
+ const NamespaceString& nss) {
if (replicationMode == ReplicationCoordinator::modeNone)
return true;
@@ -270,7 +280,7 @@ bool oplogDisabled(OperationContext* txn,
OplogDocWriter _logOpWriter(OperationContext* txn,
const char* opstr,
- NamespaceString& nss,
+ const NamespaceString& nss,
const BSONObj& obj,
BSONObj* o2,
bool fromMigrate,
@@ -339,6 +349,7 @@ void truncateOplogTo(OperationContext* txn, Timestamp truncateTimestamp) {
oplogCollection->temp_cappedTruncateAfter(txn, lastRecordId, false);
}
}
+
/* we write to local.oplog.rs:
{ ts : ..., h: ..., v: ..., op: ..., etc }
ts: an OpTime timestamp
@@ -356,24 +367,15 @@ void truncateOplogTo(OperationContext* txn, Timestamp truncateTimestamp) {
if not null, specifies a boolean to pass along to the other side as b: param.
used for "justOne" or "upsert" flags on 'd', 'u'
*/
-void _logOp(OperationContext* txn,
- const char* opstr,
- const char* ns,
- const BSONObj& obj,
- BSONObj* o2,
- bool fromMigrate,
- const std::string& oplogCollectionName,
- ReplicationCoordinator::Mode replicationMode,
- bool updateReplOpTime) {
- NamespaceString nss(ns);
- if (oplogDisabled(txn, replicationMode, nss))
- return;
-
- if (_localOplogCollection == nullptr)
- cacheOplogCollection(txn, oplogCollectionName);
-
- Lock::DBLock lk(txn->lockState(), "local", MODE_IX);
-
+void _logOpsInner(OperationContext* txn,
+ const char* opstr,
+ const NamespaceString& nss,
+ vector<OplogDocWriter> writers,
+ bool fromMigrate,
+ Collection* oplogCollection,
+ ReplicationCoordinator::Mode replicationMode,
+ bool updateReplOpTime,
+ OpTime finalOpTime) {
ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
if (nss.size() && replicationMode == ReplicationCoordinator::modeReplSet &&
@@ -381,37 +383,79 @@ void _logOp(OperationContext* txn,
severe() << "logOp() but can't accept write to collection " << nss.ns();
fassertFailed(17405);
}
- Lock::CollectionLock lk2(txn->lockState(), oplogCollectionName, MODE_IX);
- auto slot = getNextOpTime(txn, _localOplogCollection, replCoord, opstr, replicationMode);
// we jump through a bunch of hoops here to avoid copying the obj buffer twice --
// instead we do a single copy to the destination in the record store.
- auto writer = _logOpWriter(txn, opstr, nss, obj, o2, fromMigrate, slot.first, slot.second);
- checkOplogInsert(_localOplogCollection->insertDocument(txn, &writer, false));
+ for (auto it = writers.begin(); it != writers.end(); it++)
+ checkOplogInsert(oplogCollection->insertDocument(txn, &(*it), false));
// Set replCoord last optime only after we're sure the WUOW didn't abort and roll back.
- if (updateReplOpTime) {
- txn->recoveryUnit()->registerChange(new UpdateReplOpTimeChange(slot.first, replCoord));
- }
+ if (updateReplOpTime)
+ txn->recoveryUnit()->registerChange(new UpdateReplOpTimeChange(finalOpTime, replCoord));
+
+ ReplClientInfo::forClient(txn->getClient()).setLastOp(finalOpTime);
+}
- ReplClientInfo::forClient(txn->getClient()).setLastOp(slot.first);
+void _logOp(OperationContext* txn,
+ const char* opstr,
+ const char* ns,
+ const BSONObj& obj,
+ BSONObj* o2,
+ bool fromMigrate,
+ const std::string& oplogName,
+ ReplicationCoordinator::Mode replMode,
+ bool updateOpTime) {
+ NamespaceString nss(ns);
+ if (oplogDisabled(txn, replMode, nss))
+ return;
+
+ ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
+ vector<OplogDocWriter> writers;
+ Collection* oplog = getLocalOplogCollection(txn, oplogName);
+ Lock::DBLock lk(txn->lockState(), "local", MODE_IX);
+ Lock::CollectionLock lock(txn->lockState(), oplogName, MODE_IX);
+ auto slot = getNextOpTime(txn, oplog, replCoord, replMode);
+ auto writer = _logOpWriter(txn, opstr, nss, obj, o2, fromMigrate, slot.opTime, slot.hash);
+ writers.push_back(writer);
+ _logOpsInner(txn, opstr, nss, writers, fromMigrate, oplog, replMode, updateOpTime, slot.opTime);
+}
+
+void logOps(OperationContext* txn,
+ const char* opstr,
+ const NamespaceString& nss,
+ std::vector<BSONObj>::iterator begin,
+ std::vector<BSONObj>::iterator end,
+ bool fromMigrate) {
+ ReplicationCoordinator::Mode replMode = ReplicationCoordinator::get(txn)->getReplicationMode();
+
+ invariant(begin != end);
+ if (oplogDisabled(txn, replMode, nss))
+ return;
+
+ vector<OplogDocWriter> writers;
+ OpTime finalOpTime;
+ ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
+ Collection* oplog = getLocalOplogCollection(txn, _oplogCollectionName);
+ Lock::DBLock lk(txn->lockState(), "local", MODE_IX);
+ Lock::CollectionLock lock(txn->lockState(), _oplogCollectionName, MODE_IX);
+ for (auto it = begin; it != end; it++) {
+ auto slot = getNextOpTime(txn, oplog, replCoord, replMode);
+ finalOpTime = slot.opTime;
+ auto writer = _logOpWriter(txn, opstr, nss, *it, NULL, fromMigrate, slot.opTime, slot.hash);
+ writers.emplace_back(writer);
+ }
+ _logOpsInner(txn, opstr, nss, writers, fromMigrate, oplog, replMode, true, finalOpTime);
}
+
void logOp(OperationContext* txn,
const char* opstr,
const char* ns,
const BSONObj& obj,
BSONObj* o2,
bool fromMigrate) {
- _logOp(txn,
- opstr,
- ns,
- obj,
- o2,
- fromMigrate,
- _oplogCollectionName,
- ReplicationCoordinator::get(txn)->getReplicationMode(),
- true);
+ ReplicationCoordinator::Mode replMode = ReplicationCoordinator::get(txn)->getReplicationMode();
+ _logOp(txn, opstr, ns, obj, o2, fromMigrate, _oplogCollectionName, replMode, true);
}
OpTime writeOpsToOplog(OperationContext* txn, const std::deque<BSONObj>& ops) {
diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h
index bd47fbc0a1b..613c02edbcf 100644
--- a/src/mongo/db/repl/oplog.h
+++ b/src/mongo/db/repl/oplog.h
@@ -81,7 +81,7 @@ extern std::string masterSlaveOplogName;
extern int OPLOG_VERSION;
-/** Log an operation to the local oplog
+/* Log operation(s) to the local oplog
*
* @param opstr
* "i" insert
@@ -90,8 +90,16 @@ extern int OPLOG_VERSION;
* "c" db cmd
* "n" no-op
* "db" declares presence of a database (ns is set to the db name + '.')
- *
- * For 'u' records, 'obj' captures the mutation made to the object but not
+ */
+
+void logOps(OperationContext* txn,
+ const char* opstr,
+ const NamespaceString& nss,
+ std::vector<BSONObj>::iterator begin,
+ std::vector<BSONObj>::iterator end,
+ bool fromMigrate);
+
+/* For 'u' records, 'obj' captures the mutation made to the object but not
* the object itself. 'o2' captures the the criteria for the object that will be modified.
*
* Sets replCoord last optime if 'updateReplOpTime' is true.