diff options
Diffstat (limited to 'src/mongo/db/repl/oplog.cpp')
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 155 |
1 files changed, 77 insertions, 78 deletions
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 4508ef2621f..a3fc12b7adb 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -116,13 +116,17 @@ std::string masterSlaveOplogName = "local.oplog.$main"; MONGO_FP_DECLARE(disableSnapshotting); namespace { -// cached copy...so don't rename, drop, etc.!!! +/** + * The `_localOplogCollection` pointer is always valid (or null) because an + * operation must take the global exclusive lock to set the pointer to null when + * the Collection instance is destroyed. See `oplogCheckCloseDatabase`. + */ Collection* _localOplogCollection = nullptr; PseudoRandom hashGenerator(std::unique_ptr<SecureRandom>(SecureRandom::create())->nextInt64()); -// Synchronizes the section where a new Timestamp is generated and when it actually -// appears in the oplog. +// Synchronizes the section where a new Timestamp is generated and when it is registered in the +// storage engine. stdx::mutex newOpMutex; stdx::condition_variable newTimestampNotifier; // Remembers that last timestamp generated for creating new oplog entries or last timestamp of @@ -137,27 +141,16 @@ void checkOplogInsert(Status result) { massert(17322, str::stream() << "write to oplog failed: " << result.toString(), result.isOK()); } -struct OplogSlot { - OpTime opTime; - int64_t hash = 0; -}; - -/** - * Allocates an optime for a new entry in the oplog, and updates the replication coordinator to - * reflect that new optime. Returns the new optime and the correct value of the "h" field for - * the new oplog entry. - */ -void getNextOpTime(OperationContext* opCtx, - Collection* oplog, - ReplicationCoordinator* replCoord, - ReplicationCoordinator::Mode replicationMode, - unsigned count, - OplogSlot* slotsOut) { - synchronizeOnCappedInFlightResource(opCtx->lockState(), oplog->ns()); +void _getNextOpTimes(OperationContext* opCtx, + Collection* oplog, + std::size_t count, + OplogSlot* slotsOut) { + synchronizeOnOplogInFlightResource(opCtx->lockState()); + auto replCoord = ReplicationCoordinator::get(opCtx); long long term = OpTime::kUninitializedTerm; // Fetch term out of the newOpMutex. - if (replicationMode == ReplicationCoordinator::modeReplSet && + if (replCoord->getReplicationMode() == ReplicationCoordinator::modeReplSet && replCoord->isV1ElectionProtocol()) { // Current term. If we're not a replset of pv=1, it remains kOldProtocolVersionTerm. term = replCoord->getTerm(); @@ -172,8 +165,8 @@ void getNextOpTime(OperationContext* opCtx, fassert(28560, oplog->getRecordStore()->oplogDiskLocRegister(opCtx, ts)); // Set hash if we're in replset mode, otherwise it remains 0 in master/slave. - const bool needHash = (replicationMode == ReplicationCoordinator::modeReplSet); - for (unsigned i = 0; i < count; i++) { + const bool needHash = (replCoord->getReplicationMode() == ReplicationCoordinator::modeReplSet); + for (std::size_t i = 0; i < count; i++) { slotsOut[i].opTime = {Timestamp(ts.asULL() + i), term}; if (needHash) { slotsOut[i].hash = hashGenerator.nextInt64(); @@ -222,11 +215,18 @@ private: } // namespace void setOplogCollectionName() { - if (getGlobalReplicationCoordinator()->getReplicationMode() == - ReplicationCoordinator::modeReplSet) { - _oplogCollectionName = NamespaceString::kRsOplogNamespace.ns(); - } else { - _oplogCollectionName = masterSlaveOplogName; + switch (getGlobalReplicationCoordinator()->getReplicationMode()) { + case ReplicationCoordinator::modeReplSet: + _oplogCollectionName = NamespaceString::kRsOplogNamespace.ns(); + break; + case ReplicationCoordinator::modeMasterSlave: + _oplogCollectionName = masterSlaveOplogName; + break; + case ReplicationCoordinator::modeNone: + // leave empty. + break; + default: + MONGO_UNREACHABLE; } } @@ -275,21 +275,6 @@ void createIndexForApplyOps(OperationContext* opCtx, namespace { -Collection* getLocalOplogCollection(OperationContext* opCtx, - const std::string& oplogCollectionName) { - if (_localOplogCollection) - return _localOplogCollection; - - AutoGetCollection autoColl(opCtx, NamespaceString(oplogCollectionName), MODE_IX); - _localOplogCollection = autoColl.getCollection(); - massert(13347, - "the oplog collection " + oplogCollectionName + - " missing. did you drop it? if so, restart the server", - _localOplogCollection); - - return _localOplogCollection; -} - /** * Attaches the session information of a write to an oplog entry if it exists. */ @@ -392,11 +377,9 @@ void _logOpsInner(OperationContext* opCtx, Timestamp* timestamps, size_t nDocs, Collection* oplogCollection, - ReplicationCoordinator::Mode replicationMode, OpTime finalOpTime) { - ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); - - if (nss.size() && replicationMode == ReplicationCoordinator::modeReplSet && + auto replCoord = ReplicationCoordinator::get(opCtx); + if (nss.size() && replCoord->getReplicationMode() == ReplicationCoordinator::modeReplSet && !replCoord->canAcceptWritesFor(opCtx, nss)) { severe() << "logOp() but can't accept write to collection " << nss.ns(); fassertFailed(17405); @@ -431,12 +414,11 @@ OpTime logOp(OperationContext* opCtx, Lock::DBLock lk(opCtx, NamespaceString::kLocalDb, MODE_IX); Lock::CollectionLock lock(opCtx->lockState(), _oplogCollectionName, MODE_IX); - auto const oplog = getLocalOplogCollection(opCtx, _oplogCollectionName); - auto const replMode = replCoord->getReplicationMode(); + auto const oplog = _localOplogCollection; OplogSlot slot; WriteUnitOfWork wuow(opCtx); - getNextOpTime(opCtx, oplog, replCoord, replMode, 1, &slot); + _getNextOpTimes(opCtx, oplog, 1, &slot); auto writer = _logOpWriter(opCtx, opstr, @@ -453,7 +435,7 @@ OpTime logOp(OperationContext* opCtx, oplogLink); const DocWriter* basePtr = &writer; auto timestamp = slot.opTime.getTimestamp(); - _logOpsInner(opCtx, nss, &basePtr, ×tamp, 1, oplog, replMode, slot.opTime); + _logOpsInner(opCtx, nss, &basePtr, ×tamp, 1, oplog, slot.opTime); wuow.commit(); return slot.opTime; } @@ -476,14 +458,11 @@ repl::OpTime logInsertOps(OperationContext* opCtx, const size_t count = end - begin; std::vector<OplogDocWriter> writers; writers.reserve(count); - Collection* oplog = getLocalOplogCollection(opCtx, _oplogCollectionName); + Collection* oplog = _localOplogCollection; Lock::DBLock lk(opCtx, "local", MODE_IX); Lock::CollectionLock lock(opCtx->lockState(), _oplogCollectionName, MODE_IX); - std::unique_ptr<OplogSlot[]> slots(new OplogSlot[count]); - auto replMode = replCoord->getReplicationMode(); WriteUnitOfWork wuow(opCtx); - getNextOpTime(opCtx, oplog, replCoord, replMode, count, slots.get()); auto wallTime = Date_t::now(); OplogLink oplogLink; @@ -496,39 +475,40 @@ repl::OpTime logInsertOps(OperationContext* opCtx, } auto timestamps = stdx::make_unique<Timestamp[]>(count); + OpTime lastOpTime; for (size_t i = 0; i < count; i++) { - auto insertStatement = begin[i]; + // Make a mutable copy. + auto insertStatementOplogSlot = begin[i].oplogSlot; + // Fetch optime now, if not already fetched. + if (insertStatementOplogSlot.opTime.isNull()) { + _getNextOpTimes(opCtx, oplog, 1, &insertStatementOplogSlot); + } writers.emplace_back(_logOpWriter(opCtx, "i", nss, uuid, - insertStatement.doc, + begin[i].doc, NULL, fromMigrate, - slots[i].opTime, - slots[i].hash, + insertStatementOplogSlot.opTime, + insertStatementOplogSlot.hash, wallTime, sessionInfo, - insertStatement.stmtId, + begin[i].stmtId, oplogLink)); - oplogLink.prevTs = slots[i].opTime.getTimestamp(); - timestamps[i] = slots[i].opTime.getTimestamp(); + oplogLink.prevTs = insertStatementOplogSlot.opTime.getTimestamp(); + timestamps[i] = oplogLink.prevTs; + lastOpTime = insertStatementOplogSlot.opTime; } std::unique_ptr<DocWriter const* []> basePtrs(new DocWriter const*[count]); for (size_t i = 0; i < count; i++) { basePtrs[i] = &writers[i]; } - _logOpsInner(opCtx, - nss, - basePtrs.get(), - timestamps.get(), - count, - oplog, - replMode, - slots[count - 1].opTime); + invariant(!lastOpTime.isNull()); + _logOpsInner(opCtx, nss, basePtrs.get(), timestamps.get(), count, oplog, lastOpTime); wuow.commit(); - return slots[count - 1].opTime; + return lastOpTime; } namespace { @@ -599,7 +579,7 @@ void createOplog(OperationContext* opCtx, const std::string& oplogCollectionName throw AssertionException(13257, ss.str()); } } - + acquireOplogCollectionForLogging(opCtx); if (!isReplSet) initTimestampFromOplog(opCtx, oplogCollectionName); return; @@ -619,6 +599,7 @@ void createOplog(OperationContext* opCtx, const std::string& oplogCollectionName writeConflictRetry(opCtx, "createCollection", oplogCollectionName, [&] { WriteUnitOfWork uow(opCtx); invariant(ctx.db()->createCollection(opCtx, oplogCollectionName, options)); + acquireOplogCollectionForLogging(opCtx); if (!isReplSet) getGlobalServiceContext()->getOpObserver()->onOpMessage(opCtx, BSONObj()); uow.commit(); @@ -627,6 +608,7 @@ void createOplog(OperationContext* opCtx, const std::string& oplogCollectionName /* sync here so we don't get any surprising lag later when we try to sync */ StorageEngine* storageEngine = getGlobalServiceContext()->getGlobalStorageEngine(); storageEngine->flushAllFiles(opCtx, true); + log() << "******" << endl; } @@ -636,6 +618,14 @@ void createOplog(OperationContext* opCtx) { createOplog(opCtx, _oplogCollectionName, isReplSet); } +void getNextOpTimes(OperationContext* opCtx, std::size_t count, OplogSlot* slotsOut) { + // The local oplog collection pointer must already be established by this point. + // We can't establish it here because that would require locking the local database, which would + // be a lock order violation. + invariant(_localOplogCollection); + _getNextOpTimes(opCtx, _localOplogCollection, count, slotsOut); +} + // ------------------------------------- namespace { @@ -1317,6 +1307,7 @@ void setNewTimestamp(ServiceContext* service, const Timestamp& newTime) { void initTimestampFromOplog(OperationContext* opCtx, const std::string& oplogNS) { DBDirectClient c(opCtx); + static const BSONObj reverseNaturalObj = BSON("$natural" << -1); BSONObj lastOp = c.findOne(oplogNS, Query().sort(reverseNaturalObj), NULL, QueryOption_SlaveOk); if (!lastOp.isEmpty()) { @@ -1328,8 +1319,18 @@ void initTimestampFromOplog(OperationContext* opCtx, const std::string& oplogNS) void oplogCheckCloseDatabase(OperationContext* opCtx, Database* db) { invariant(opCtx->lockState()->isW()); + if (db->name() == "local") { + _localOplogCollection = nullptr; + } +} + - _localOplogCollection = nullptr; +void acquireOplogCollectionForLogging(OperationContext* opCtx) { + if (!_oplogCollectionName.empty()) { + AutoGetCollection autoColl(opCtx, NamespaceString(_oplogCollectionName), MODE_IX); + _localOplogCollection = autoColl.getCollection(); + fassert(13347, _localOplogCollection); + } } void signalOplogWaiters() { @@ -1429,12 +1430,10 @@ void SnapshotThread::run() { SnapshotName name(0); // assigned real value in block. { - // Make sure there are no in-flight capped inserts while we create our snapshot. + // Make sure there are no in-flight oplog inserts while we create our snapshot. // This lock cannot be aquired until all writes holding the resource commit/abort. - Lock::ResourceLock cappedInsertLockForOtherDb( - opCtx->lockState(), resourceCappedInFlightForOtherDb, MODE_X); - Lock::ResourceLock cappedInsertLockForLocalDb( - opCtx->lockState(), resourceCappedInFlightForLocalDb, MODE_X); + Lock::ResourceLock cappedInsertLockForOplog( + opCtx->lockState(), resourceInFlightForOplog, MODE_X); // Reserve the name immediately before we take our snapshot. This ensures that all // names that compare lower must be from points in time visible to this named |