summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/oplog.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/oplog.cpp')
-rw-r--r--src/mongo/db/repl/oplog.cpp155
1 files changed, 77 insertions, 78 deletions
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 4508ef2621f..a3fc12b7adb 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -116,13 +116,17 @@ std::string masterSlaveOplogName = "local.oplog.$main";
MONGO_FP_DECLARE(disableSnapshotting);
namespace {
-// cached copy...so don't rename, drop, etc.!!!
+/**
+ * The `_localOplogCollection` pointer is always valid (or null) because an
+ * operation must take the global exclusive lock to set the pointer to null when
+ * the Collection instance is destroyed. See `oplogCheckCloseDatabase`.
+ */
Collection* _localOplogCollection = nullptr;
PseudoRandom hashGenerator(std::unique_ptr<SecureRandom>(SecureRandom::create())->nextInt64());
-// Synchronizes the section where a new Timestamp is generated and when it actually
-// appears in the oplog.
+// Synchronizes the section where a new Timestamp is generated and when it is registered in the
+// storage engine.
stdx::mutex newOpMutex;
stdx::condition_variable newTimestampNotifier;
// Remembers that last timestamp generated for creating new oplog entries or last timestamp of
@@ -137,27 +141,16 @@ void checkOplogInsert(Status result) {
massert(17322, str::stream() << "write to oplog failed: " << result.toString(), result.isOK());
}
-struct OplogSlot {
- OpTime opTime;
- int64_t hash = 0;
-};
-
-/**
- * Allocates an optime for a new entry in the oplog, and updates the replication coordinator to
- * reflect that new optime. Returns the new optime and the correct value of the "h" field for
- * the new oplog entry.
- */
-void getNextOpTime(OperationContext* opCtx,
- Collection* oplog,
- ReplicationCoordinator* replCoord,
- ReplicationCoordinator::Mode replicationMode,
- unsigned count,
- OplogSlot* slotsOut) {
- synchronizeOnCappedInFlightResource(opCtx->lockState(), oplog->ns());
+void _getNextOpTimes(OperationContext* opCtx,
+ Collection* oplog,
+ std::size_t count,
+ OplogSlot* slotsOut) {
+ synchronizeOnOplogInFlightResource(opCtx->lockState());
+ auto replCoord = ReplicationCoordinator::get(opCtx);
long long term = OpTime::kUninitializedTerm;
// Fetch term out of the newOpMutex.
- if (replicationMode == ReplicationCoordinator::modeReplSet &&
+ if (replCoord->getReplicationMode() == ReplicationCoordinator::modeReplSet &&
replCoord->isV1ElectionProtocol()) {
// Current term. If we're not a replset of pv=1, it remains kOldProtocolVersionTerm.
term = replCoord->getTerm();
@@ -172,8 +165,8 @@ void getNextOpTime(OperationContext* opCtx,
fassert(28560, oplog->getRecordStore()->oplogDiskLocRegister(opCtx, ts));
// Set hash if we're in replset mode, otherwise it remains 0 in master/slave.
- const bool needHash = (replicationMode == ReplicationCoordinator::modeReplSet);
- for (unsigned i = 0; i < count; i++) {
+ const bool needHash = (replCoord->getReplicationMode() == ReplicationCoordinator::modeReplSet);
+ for (std::size_t i = 0; i < count; i++) {
slotsOut[i].opTime = {Timestamp(ts.asULL() + i), term};
if (needHash) {
slotsOut[i].hash = hashGenerator.nextInt64();
@@ -222,11 +215,18 @@ private:
} // namespace
void setOplogCollectionName() {
- if (getGlobalReplicationCoordinator()->getReplicationMode() ==
- ReplicationCoordinator::modeReplSet) {
- _oplogCollectionName = NamespaceString::kRsOplogNamespace.ns();
- } else {
- _oplogCollectionName = masterSlaveOplogName;
+ switch (getGlobalReplicationCoordinator()->getReplicationMode()) {
+ case ReplicationCoordinator::modeReplSet:
+ _oplogCollectionName = NamespaceString::kRsOplogNamespace.ns();
+ break;
+ case ReplicationCoordinator::modeMasterSlave:
+ _oplogCollectionName = masterSlaveOplogName;
+ break;
+ case ReplicationCoordinator::modeNone:
+ // leave empty.
+ break;
+ default:
+ MONGO_UNREACHABLE;
}
}
@@ -275,21 +275,6 @@ void createIndexForApplyOps(OperationContext* opCtx,
namespace {
-Collection* getLocalOplogCollection(OperationContext* opCtx,
- const std::string& oplogCollectionName) {
- if (_localOplogCollection)
- return _localOplogCollection;
-
- AutoGetCollection autoColl(opCtx, NamespaceString(oplogCollectionName), MODE_IX);
- _localOplogCollection = autoColl.getCollection();
- massert(13347,
- "the oplog collection " + oplogCollectionName +
- " missing. did you drop it? if so, restart the server",
- _localOplogCollection);
-
- return _localOplogCollection;
-}
-
/**
* Attaches the session information of a write to an oplog entry if it exists.
*/
@@ -392,11 +377,9 @@ void _logOpsInner(OperationContext* opCtx,
Timestamp* timestamps,
size_t nDocs,
Collection* oplogCollection,
- ReplicationCoordinator::Mode replicationMode,
OpTime finalOpTime) {
- ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
-
- if (nss.size() && replicationMode == ReplicationCoordinator::modeReplSet &&
+ auto replCoord = ReplicationCoordinator::get(opCtx);
+ if (nss.size() && replCoord->getReplicationMode() == ReplicationCoordinator::modeReplSet &&
!replCoord->canAcceptWritesFor(opCtx, nss)) {
severe() << "logOp() but can't accept write to collection " << nss.ns();
fassertFailed(17405);
@@ -431,12 +414,11 @@ OpTime logOp(OperationContext* opCtx,
Lock::DBLock lk(opCtx, NamespaceString::kLocalDb, MODE_IX);
Lock::CollectionLock lock(opCtx->lockState(), _oplogCollectionName, MODE_IX);
- auto const oplog = getLocalOplogCollection(opCtx, _oplogCollectionName);
- auto const replMode = replCoord->getReplicationMode();
+ auto const oplog = _localOplogCollection;
OplogSlot slot;
WriteUnitOfWork wuow(opCtx);
- getNextOpTime(opCtx, oplog, replCoord, replMode, 1, &slot);
+ _getNextOpTimes(opCtx, oplog, 1, &slot);
auto writer = _logOpWriter(opCtx,
opstr,
@@ -453,7 +435,7 @@ OpTime logOp(OperationContext* opCtx,
oplogLink);
const DocWriter* basePtr = &writer;
auto timestamp = slot.opTime.getTimestamp();
- _logOpsInner(opCtx, nss, &basePtr, &timestamp, 1, oplog, replMode, slot.opTime);
+ _logOpsInner(opCtx, nss, &basePtr, &timestamp, 1, oplog, slot.opTime);
wuow.commit();
return slot.opTime;
}
@@ -476,14 +458,11 @@ repl::OpTime logInsertOps(OperationContext* opCtx,
const size_t count = end - begin;
std::vector<OplogDocWriter> writers;
writers.reserve(count);
- Collection* oplog = getLocalOplogCollection(opCtx, _oplogCollectionName);
+ Collection* oplog = _localOplogCollection;
Lock::DBLock lk(opCtx, "local", MODE_IX);
Lock::CollectionLock lock(opCtx->lockState(), _oplogCollectionName, MODE_IX);
- std::unique_ptr<OplogSlot[]> slots(new OplogSlot[count]);
- auto replMode = replCoord->getReplicationMode();
WriteUnitOfWork wuow(opCtx);
- getNextOpTime(opCtx, oplog, replCoord, replMode, count, slots.get());
auto wallTime = Date_t::now();
OplogLink oplogLink;
@@ -496,39 +475,40 @@ repl::OpTime logInsertOps(OperationContext* opCtx,
}
auto timestamps = stdx::make_unique<Timestamp[]>(count);
+ OpTime lastOpTime;
for (size_t i = 0; i < count; i++) {
- auto insertStatement = begin[i];
+ // Make a mutable copy.
+ auto insertStatementOplogSlot = begin[i].oplogSlot;
+ // Fetch optime now, if not already fetched.
+ if (insertStatementOplogSlot.opTime.isNull()) {
+ _getNextOpTimes(opCtx, oplog, 1, &insertStatementOplogSlot);
+ }
writers.emplace_back(_logOpWriter(opCtx,
"i",
nss,
uuid,
- insertStatement.doc,
+ begin[i].doc,
NULL,
fromMigrate,
- slots[i].opTime,
- slots[i].hash,
+ insertStatementOplogSlot.opTime,
+ insertStatementOplogSlot.hash,
wallTime,
sessionInfo,
- insertStatement.stmtId,
+ begin[i].stmtId,
oplogLink));
- oplogLink.prevTs = slots[i].opTime.getTimestamp();
- timestamps[i] = slots[i].opTime.getTimestamp();
+ oplogLink.prevTs = insertStatementOplogSlot.opTime.getTimestamp();
+ timestamps[i] = oplogLink.prevTs;
+ lastOpTime = insertStatementOplogSlot.opTime;
}
std::unique_ptr<DocWriter const* []> basePtrs(new DocWriter const*[count]);
for (size_t i = 0; i < count; i++) {
basePtrs[i] = &writers[i];
}
- _logOpsInner(opCtx,
- nss,
- basePtrs.get(),
- timestamps.get(),
- count,
- oplog,
- replMode,
- slots[count - 1].opTime);
+ invariant(!lastOpTime.isNull());
+ _logOpsInner(opCtx, nss, basePtrs.get(), timestamps.get(), count, oplog, lastOpTime);
wuow.commit();
- return slots[count - 1].opTime;
+ return lastOpTime;
}
namespace {
@@ -599,7 +579,7 @@ void createOplog(OperationContext* opCtx, const std::string& oplogCollectionName
throw AssertionException(13257, ss.str());
}
}
-
+ acquireOplogCollectionForLogging(opCtx);
if (!isReplSet)
initTimestampFromOplog(opCtx, oplogCollectionName);
return;
@@ -619,6 +599,7 @@ void createOplog(OperationContext* opCtx, const std::string& oplogCollectionName
writeConflictRetry(opCtx, "createCollection", oplogCollectionName, [&] {
WriteUnitOfWork uow(opCtx);
invariant(ctx.db()->createCollection(opCtx, oplogCollectionName, options));
+ acquireOplogCollectionForLogging(opCtx);
if (!isReplSet)
getGlobalServiceContext()->getOpObserver()->onOpMessage(opCtx, BSONObj());
uow.commit();
@@ -627,6 +608,7 @@ void createOplog(OperationContext* opCtx, const std::string& oplogCollectionName
/* sync here so we don't get any surprising lag later when we try to sync */
StorageEngine* storageEngine = getGlobalServiceContext()->getGlobalStorageEngine();
storageEngine->flushAllFiles(opCtx, true);
+
log() << "******" << endl;
}
@@ -636,6 +618,14 @@ void createOplog(OperationContext* opCtx) {
createOplog(opCtx, _oplogCollectionName, isReplSet);
}
+void getNextOpTimes(OperationContext* opCtx, std::size_t count, OplogSlot* slotsOut) {
+ // The local oplog collection pointer must already be established by this point.
+ // We can't establish it here because that would require locking the local database, which would
+ // be a lock order violation.
+ invariant(_localOplogCollection);
+ _getNextOpTimes(opCtx, _localOplogCollection, count, slotsOut);
+}
+
// -------------------------------------
namespace {
@@ -1317,6 +1307,7 @@ void setNewTimestamp(ServiceContext* service, const Timestamp& newTime) {
void initTimestampFromOplog(OperationContext* opCtx, const std::string& oplogNS) {
DBDirectClient c(opCtx);
+ static const BSONObj reverseNaturalObj = BSON("$natural" << -1);
BSONObj lastOp = c.findOne(oplogNS, Query().sort(reverseNaturalObj), NULL, QueryOption_SlaveOk);
if (!lastOp.isEmpty()) {
@@ -1328,8 +1319,18 @@ void initTimestampFromOplog(OperationContext* opCtx, const std::string& oplogNS)
void oplogCheckCloseDatabase(OperationContext* opCtx, Database* db) {
invariant(opCtx->lockState()->isW());
+ if (db->name() == "local") {
+ _localOplogCollection = nullptr;
+ }
+}
+
- _localOplogCollection = nullptr;
+void acquireOplogCollectionForLogging(OperationContext* opCtx) {
+ if (!_oplogCollectionName.empty()) {
+ AutoGetCollection autoColl(opCtx, NamespaceString(_oplogCollectionName), MODE_IX);
+ _localOplogCollection = autoColl.getCollection();
+ fassert(13347, _localOplogCollection);
+ }
}
void signalOplogWaiters() {
@@ -1429,12 +1430,10 @@ void SnapshotThread::run() {
SnapshotName name(0); // assigned real value in block.
{
- // Make sure there are no in-flight capped inserts while we create our snapshot.
+ // Make sure there are no in-flight oplog inserts while we create our snapshot.
// This lock cannot be aquired until all writes holding the resource commit/abort.
- Lock::ResourceLock cappedInsertLockForOtherDb(
- opCtx->lockState(), resourceCappedInFlightForOtherDb, MODE_X);
- Lock::ResourceLock cappedInsertLockForLocalDb(
- opCtx->lockState(), resourceCappedInFlightForLocalDb, MODE_X);
+ Lock::ResourceLock cappedInsertLockForOplog(
+ opCtx->lockState(), resourceInFlightForOplog, MODE_X);
// Reserve the name immediately before we take our snapshot. This ensures that all
// names that compare lower must be from points in time visible to this named