author    Eric Milkie <milkie@10gen.com>    2017-08-29 13:51:15 -0400
committer Eric Milkie <milkie@10gen.com>    2017-09-12 16:04:57 -0400
commit    6264d36ac6002b296aa41b8dc79400fcc2cbdd74 (patch)
tree      f1c0971779064bbf6c3fde535b7666d8738f236d /src/mongo/db/repl
parent    978521eb3926867b30903781fd89d4acd931f0c4 (diff)
download  mongo-6264d36ac6002b296aa41b8dc79400fcc2cbdd74.tar.gz
SERVER-30827 SERVER-30639 Timestamp bulk writes via changes to optime generator
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r--  src/mongo/db/repl/SConscript                                          38
-rw-r--r--  src/mongo/db/repl/initial_syncer.cpp                                  32
-rw-r--r--  src/mongo/db/repl/initial_syncer.h                                     7
-rw-r--r--  src/mongo/db/repl/initial_syncer_test.cpp                             38
-rw-r--r--  src/mongo/db/repl/oplog.cpp                                          155
-rw-r--r--  src/mongo/db/repl/oplog.h                                             40
-rw-r--r--  src/mongo/db/repl/oplogreader.cpp                                      2
-rw-r--r--  src/mongo/db/repl/oplogreader.h                                        3
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_impl.cpp      3
-rw-r--r--  src/mongo/db/repl/storage_interface.h                                  7
-rw-r--r--  src/mongo/db/repl/storage_interface_impl.cpp                          28
-rw-r--r--  src/mongo/db/repl/storage_interface_impl.h                             2
-rw-r--r--  src/mongo/db/repl/storage_interface_mock.h                             9
13 files changed, 223 insertions, 141 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 4cf37c9f16e..0b1cf62ad90 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -9,25 +9,31 @@ env.Library(
source=[
'apply_ops.cpp',
'oplog.cpp',
- 'oplogreader.cpp',
],
- LIBDEPS=[
- 'dbcheck',
- 'repl_coordinator_interface',
- 'repl_coordinator_global',
+ LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/db/background',
- '$BUILD_DIR/mongo/db/catalog/database',
- '$BUILD_DIR/mongo/db/catalog/database_holder',
- '$BUILD_DIR/mongo/db/concurrency/lock_manager',
+ '$BUILD_DIR/mongo/db/catalog/catalog_helpers',
'$BUILD_DIR/mongo/db/db_raii',
'$BUILD_DIR/mongo/db/dbdirectclient',
'$BUILD_DIR/mongo/db/dbhelpers',
'$BUILD_DIR/mongo/db/index_d',
- '$BUILD_DIR/mongo/db/index/index_descriptor',
- '$BUILD_DIR/mongo/db/write_ops',
- '$BUILD_DIR/mongo/db/catalog/catalog_helpers',
- '$BUILD_DIR/mongo/db/namespace_string',
+ 'dbcheck',
+ 'repl_coordinator_interface',
+ ],
+)
+
+env.Library(
+ target='oplogreader',
+ source=[
+ 'oplogreader.cpp',
+ ],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/client/clientdriver',
+ '$BUILD_DIR/mongo/db/auth/authcommon',
+ '$BUILD_DIR/mongo/db/auth/authorization_manager_global',
+ '$BUILD_DIR/mongo/util/net/network',
],
)
@@ -37,12 +43,12 @@ env.CppUnitTest(
'oplog_test.cpp',
],
LIBDEPS=[
- 'oplog',
- 'oplog_entry',
- 'replmocks',
'$BUILD_DIR/mongo/db/service_context_d_test_fixture',
'$BUILD_DIR/mongo/unittest/concurrency',
'$BUILD_DIR/mongo/util/concurrency/thread_pool',
+ 'oplog',
+ 'oplog_entry',
+ 'replmocks',
],
)
@@ -54,6 +60,7 @@ env.Library(
LIBDEPS=[
'oplog',
'oplog_interface_remote',
+ 'oplogreader',
'repl_coordinator_interface',
'repl_coordinator_global',
'$BUILD_DIR/mongo/base',
@@ -475,6 +482,7 @@ env.Library(
'oplog',
'replication_process',
'roll_back_local_operations',
+ '$BUILD_DIR/mongo/db/catalog/catalog_helpers',
'$BUILD_DIR/mongo/db/catalog/database_holder',
'$BUILD_DIR/mongo/db/s/sharding',
'$BUILD_DIR/mongo/db/write_ops',
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index 2f438ec436c..edd114366a4 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -505,7 +505,7 @@ void InitialSyncer::_chooseSyncSourceCallback(
// There is no need to schedule separate task to create oplog collection since we are already in
// a callback and we are certain there's no existing operation context (required for creating
// collections and dropping user databases) attached to the current thread.
- status = _recreateOplogAndDropReplicatedDatabases();
+ status = _truncateOplogAndDropReplicatedDatabases();
if (!status.isOK()) {
onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
return;
@@ -527,9 +527,9 @@ void InitialSyncer::_chooseSyncSourceCallback(
_getBaseRollbackIdHandle = scheduleResult.getValue();
}
-Status InitialSyncer::_recreateOplogAndDropReplicatedDatabases() {
- // drop/create oplog; drop user databases.
- LOG(1) << "About to drop+create the oplog, if it exists, ns:" << _opts.localOplogNS
+Status InitialSyncer::_truncateOplogAndDropReplicatedDatabases() {
+ // truncate oplog; drop user databases.
+ LOG(1) << "About to truncate the oplog, if it exists, ns:" << _opts.localOplogNS
<< ", and drop all user databases (so that we can clone them).";
auto opCtx = makeOpCtx();
@@ -537,23 +537,21 @@ Status InitialSyncer::_recreateOplogAndDropReplicatedDatabases() {
// We are not replicating nor validating these writes.
UnreplicatedWritesBlock unreplicatedWritesBlock(opCtx.get());
- // 1.) Drop the oplog.
- LOG(2) << "Dropping the existing oplog: " << _opts.localOplogNS;
- auto status = _storage->dropCollection(opCtx.get(), _opts.localOplogNS);
+ // 1.) Truncate the oplog.
+ LOG(2) << "Truncating the existing oplog: " << _opts.localOplogNS;
+ auto status = _storage->truncateCollection(opCtx.get(), _opts.localOplogNS);
if (!status.isOK()) {
- return status;
+ // 1a.) Create the oplog.
+ LOG(2) << "Creating the oplog: " << _opts.localOplogNS;
+ status = _storage->createOplog(opCtx.get(), _opts.localOplogNS);
+ if (!status.isOK()) {
+ return status;
+ }
}
// 2.) Drop user databases.
- LOG(2) << "Dropping user databases";
- status = _storage->dropReplicatedDatabases(opCtx.get());
- if (!status.isOK()) {
- return status;
- }
-
- // 3.) Create the oplog.
- LOG(2) << "Creating the oplog: " << _opts.localOplogNS;
- return _storage->createOplog(opCtx.get(), _opts.localOplogNS);
+ LOG(2) << "Dropping user databases";
+ return _storage->dropReplicatedDatabases(opCtx.get());
}
void InitialSyncer::_rollbackCheckerResetCallback(
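Distilled, the initial-sync preparation step now empties the existing oplog in place and only creates it when truncation fails (for example, when no oplog exists yet), before dropping the user databases. A minimal sketch of that fallback using the same StorageInterface calls as above; the free function and its name are illustrative, not part of this change:

// Sketch only: the truncate-else-create fallback performed by
// _truncateOplogAndDropReplicatedDatabases(). 'prepareForInitialSync' is a
// hypothetical wrapper, not a function introduced by this diff.
Status prepareForInitialSync(OperationContext* opCtx,
                             repl::StorageInterface* storage,
                             const NamespaceString& oplogNss) {
    // 1.) Truncate the oplog in place if it exists.
    Status status = storage->truncateCollection(opCtx, oplogNss);
    if (!status.isOK()) {
        // 1a.) Truncation failed (most likely the collection is missing): create it.
        status = storage->createOplog(opCtx, oplogNss);
        if (!status.isOK()) {
            return status;
        }
    }
    // 2.) Drop all replicated (user) databases so they can be re-cloned.
    return storage->dropReplicatedDatabases(opCtx);
}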
diff --git a/src/mongo/db/repl/initial_syncer.h b/src/mongo/db/repl/initial_syncer.h
index 8ee406d8876..63095486232 100644
--- a/src/mongo/db/repl/initial_syncer.h
+++ b/src/mongo/db/repl/initial_syncer.h
@@ -278,7 +278,7 @@ private:
* |
* |
* V
- * _recreateOplogAndDropReplicatedDatabases()
+ * _truncateOplogAndDropReplicatedDatabases()
* |
* |
* V
@@ -368,11 +368,10 @@ private:
/**
* This function does the following:
- * 1.) Drop oplog.
+ * 1.) Truncate oplog.
* 2.) Drop user databases (replicated dbs).
- * 3.) Create oplog.
*/
- Status _recreateOplogAndDropReplicatedDatabases();
+ Status _truncateOplogAndDropReplicatedDatabases();
/**
* Callback for rollback checker's first replSetGetRBID command before starting data cloning.
diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp
index 274678a10bf..8d765cefc04 100644
--- a/src/mongo/db/repl/initial_syncer_test.cpp
+++ b/src/mongo/db/repl/initial_syncer_test.cpp
@@ -216,6 +216,7 @@ public:
protected:
struct StorageInterfaceResults {
bool createOplogCalled = false;
+ bool truncateCalled = false;
bool insertedOplogEntries = false;
int oplogEntriesInserted = 0;
bool droppedUserDBs = false;
@@ -235,6 +236,12 @@ protected:
_storageInterfaceWorkDone.createOplogCalled = true;
return Status::OK();
};
+ _storageInterface->truncateCollFn = [this](OperationContext* opCtx,
+ const NamespaceString& nss) {
+ LockGuard lock(_storageInterfaceWorkDoneMutex);
+ _storageInterfaceWorkDone.truncateCalled = true;
+ return Status::OK();
+ };
_storageInterface->insertDocumentFn = [this](
OperationContext* opCtx, const NamespaceString& nss, const TimestampedBSONObj& doc) {
LockGuard lock(_storageInterfaceWorkDoneMutex);
@@ -921,14 +928,13 @@ TEST_F(InitialSyncerTest, InitialSyncerResetsOnCompletionCallbackFunctionPointer
ASSERT_TRUE(sharedCallbackStateDestroyed);
}
-TEST_F(InitialSyncerTest, InitialSyncerRecreatesOplogAndDropsReplicatedDatabases) {
- // We are not interested in proceeding beyond the oplog creation stage so we inject a failure
- // after setting '_storageInterfaceWorkDone.createOplogCalled' to true.
- auto oldCreateOplogFn = _storageInterface->createOplogFn;
- _storageInterface->createOplogFn = [oldCreateOplogFn](OperationContext* opCtx,
- const NamespaceString& nss) {
- oldCreateOplogFn(opCtx, nss).transitional_ignore();
- return Status(ErrorCodes::OperationFailed, "oplog creation failed");
+TEST_F(InitialSyncerTest, InitialSyncerTruncatesOplogAndDropsReplicatedDatabases) {
+ // We are not interested in proceeding beyond the dropUserDB stage so we inject a failure
+ // after setting '_storageInterfaceWorkDone.droppedUserDBs' to true.
+ auto oldDropUserDBsFn = _storageInterface->dropUserDBsFn;
+ _storageInterface->dropUserDBsFn = [oldDropUserDBsFn](OperationContext* opCtx) {
+ ASSERT_OK(oldDropUserDBsFn(opCtx));
+ return Status(ErrorCodes::OperationFailed, "drop userdbs failed");
};
auto initialSyncer = &getInitialSyncer();
@@ -941,8 +947,8 @@ TEST_F(InitialSyncerTest, InitialSyncerRecreatesOplogAndDropsReplicatedDatabases
ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
LockGuard lock(_storageInterfaceWorkDoneMutex);
+ ASSERT_TRUE(_storageInterfaceWorkDone.truncateCalled);
ASSERT_TRUE(_storageInterfaceWorkDone.droppedUserDBs);
- ASSERT_TRUE(_storageInterfaceWorkDone.createOplogCalled);
}
TEST_F(InitialSyncerTest, InitialSyncerPassesThroughGetRollbackIdScheduleError) {
@@ -973,13 +979,13 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughGetRollbackIdScheduleError)
TEST_F(
InitialSyncerTest,
InitialSyncerReturnsShutdownInProgressIfSchedulingRollbackCheckerFailedDueToExecutorShutdown) {
- // The rollback id request is sent immediately after oplog creation. We shut the task executor
- // down before returning from createOplog() to make the scheduleRemoteCommand() call for
+ // The rollback id request is sent immediately after oplog truncation. We shut the task executor
+ // down before returning from truncate() to make the scheduleRemoteCommand() call for
// replSetGetRBID fail.
- auto oldCreateOplogFn = _storageInterface->createOplogFn;
- _storageInterface->createOplogFn = [oldCreateOplogFn, this](OperationContext* opCtx,
- const NamespaceString& nss) {
- auto status = oldCreateOplogFn(opCtx, nss);
+ auto oldTruncateCollFn = _storageInterface->truncateCollFn;
+ _storageInterface->truncateCollFn = [oldTruncateCollFn, this](OperationContext* opCtx,
+ const NamespaceString& nss) {
+ auto status = oldTruncateCollFn(opCtx, nss);
getExecutor().shutdown();
return status;
};
@@ -994,7 +1000,7 @@ TEST_F(
ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, _lastApplied);
LockGuard lock(_storageInterfaceWorkDoneMutex);
- ASSERT_TRUE(_storageInterfaceWorkDone.createOplogCalled);
+ ASSERT_TRUE(_storageInterfaceWorkDone.truncateCalled);
}
TEST_F(InitialSyncerTest, InitialSyncerCancelsRollbackCheckerOnShutdown) {
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 4508ef2621f..a3fc12b7adb 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -116,13 +116,17 @@ std::string masterSlaveOplogName = "local.oplog.$main";
MONGO_FP_DECLARE(disableSnapshotting);
namespace {
-// cached copy...so don't rename, drop, etc.!!!
+/**
+ * The `_localOplogCollection` pointer is always valid (or null) because an
+ * operation must take the global exclusive lock to set the pointer to null when
+ * the Collection instance is destroyed. See `oplogCheckCloseDatabase`.
+ */
Collection* _localOplogCollection = nullptr;
PseudoRandom hashGenerator(std::unique_ptr<SecureRandom>(SecureRandom::create())->nextInt64());
-// Synchronizes the section where a new Timestamp is generated and when it actually
-// appears in the oplog.
+// Synchronizes the section where a new Timestamp is generated and when it is registered in the
+// storage engine.
stdx::mutex newOpMutex;
stdx::condition_variable newTimestampNotifier;
// Remembers that last timestamp generated for creating new oplog entries or last timestamp of
@@ -137,27 +141,16 @@ void checkOplogInsert(Status result) {
massert(17322, str::stream() << "write to oplog failed: " << result.toString(), result.isOK());
}
-struct OplogSlot {
- OpTime opTime;
- int64_t hash = 0;
-};
-
-/**
- * Allocates an optime for a new entry in the oplog, and updates the replication coordinator to
- * reflect that new optime. Returns the new optime and the correct value of the "h" field for
- * the new oplog entry.
- */
-void getNextOpTime(OperationContext* opCtx,
- Collection* oplog,
- ReplicationCoordinator* replCoord,
- ReplicationCoordinator::Mode replicationMode,
- unsigned count,
- OplogSlot* slotsOut) {
- synchronizeOnCappedInFlightResource(opCtx->lockState(), oplog->ns());
+void _getNextOpTimes(OperationContext* opCtx,
+ Collection* oplog,
+ std::size_t count,
+ OplogSlot* slotsOut) {
+ synchronizeOnOplogInFlightResource(opCtx->lockState());
+ auto replCoord = ReplicationCoordinator::get(opCtx);
long long term = OpTime::kUninitializedTerm;
// Fetch term out of the newOpMutex.
- if (replicationMode == ReplicationCoordinator::modeReplSet &&
+ if (replCoord->getReplicationMode() == ReplicationCoordinator::modeReplSet &&
replCoord->isV1ElectionProtocol()) {
// Current term. If we're not a replset of pv=1, it remains kOldProtocolVersionTerm.
term = replCoord->getTerm();
@@ -172,8 +165,8 @@ void getNextOpTime(OperationContext* opCtx,
fassert(28560, oplog->getRecordStore()->oplogDiskLocRegister(opCtx, ts));
// Set hash if we're in replset mode, otherwise it remains 0 in master/slave.
- const bool needHash = (replicationMode == ReplicationCoordinator::modeReplSet);
- for (unsigned i = 0; i < count; i++) {
+ const bool needHash = (replCoord->getReplicationMode() == ReplicationCoordinator::modeReplSet);
+ for (std::size_t i = 0; i < count; i++) {
slotsOut[i].opTime = {Timestamp(ts.asULL() + i), term};
if (needHash) {
slotsOut[i].hash = hashGenerator.nextInt64();
@@ -222,11 +215,18 @@ private:
} // namespace
void setOplogCollectionName() {
- if (getGlobalReplicationCoordinator()->getReplicationMode() ==
- ReplicationCoordinator::modeReplSet) {
- _oplogCollectionName = NamespaceString::kRsOplogNamespace.ns();
- } else {
- _oplogCollectionName = masterSlaveOplogName;
+ switch (getGlobalReplicationCoordinator()->getReplicationMode()) {
+ case ReplicationCoordinator::modeReplSet:
+ _oplogCollectionName = NamespaceString::kRsOplogNamespace.ns();
+ break;
+ case ReplicationCoordinator::modeMasterSlave:
+ _oplogCollectionName = masterSlaveOplogName;
+ break;
+ case ReplicationCoordinator::modeNone:
+ // leave empty.
+ break;
+ default:
+ MONGO_UNREACHABLE;
}
}
@@ -275,21 +275,6 @@ void createIndexForApplyOps(OperationContext* opCtx,
namespace {
-Collection* getLocalOplogCollection(OperationContext* opCtx,
- const std::string& oplogCollectionName) {
- if (_localOplogCollection)
- return _localOplogCollection;
-
- AutoGetCollection autoColl(opCtx, NamespaceString(oplogCollectionName), MODE_IX);
- _localOplogCollection = autoColl.getCollection();
- massert(13347,
- "the oplog collection " + oplogCollectionName +
- " missing. did you drop it? if so, restart the server",
- _localOplogCollection);
-
- return _localOplogCollection;
-}
-
/**
* Attaches the session information of a write to an oplog entry if it exists.
*/
@@ -392,11 +377,9 @@ void _logOpsInner(OperationContext* opCtx,
Timestamp* timestamps,
size_t nDocs,
Collection* oplogCollection,
- ReplicationCoordinator::Mode replicationMode,
OpTime finalOpTime) {
- ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
-
- if (nss.size() && replicationMode == ReplicationCoordinator::modeReplSet &&
+ auto replCoord = ReplicationCoordinator::get(opCtx);
+ if (nss.size() && replCoord->getReplicationMode() == ReplicationCoordinator::modeReplSet &&
!replCoord->canAcceptWritesFor(opCtx, nss)) {
severe() << "logOp() but can't accept write to collection " << nss.ns();
fassertFailed(17405);
@@ -431,12 +414,11 @@ OpTime logOp(OperationContext* opCtx,
Lock::DBLock lk(opCtx, NamespaceString::kLocalDb, MODE_IX);
Lock::CollectionLock lock(opCtx->lockState(), _oplogCollectionName, MODE_IX);
- auto const oplog = getLocalOplogCollection(opCtx, _oplogCollectionName);
- auto const replMode = replCoord->getReplicationMode();
+ auto const oplog = _localOplogCollection;
OplogSlot slot;
WriteUnitOfWork wuow(opCtx);
- getNextOpTime(opCtx, oplog, replCoord, replMode, 1, &slot);
+ _getNextOpTimes(opCtx, oplog, 1, &slot);
auto writer = _logOpWriter(opCtx,
opstr,
@@ -453,7 +435,7 @@ OpTime logOp(OperationContext* opCtx,
oplogLink);
const DocWriter* basePtr = &writer;
auto timestamp = slot.opTime.getTimestamp();
- _logOpsInner(opCtx, nss, &basePtr, &timestamp, 1, oplog, replMode, slot.opTime);
+ _logOpsInner(opCtx, nss, &basePtr, &timestamp, 1, oplog, slot.opTime);
wuow.commit();
return slot.opTime;
}
@@ -476,14 +458,11 @@ repl::OpTime logInsertOps(OperationContext* opCtx,
const size_t count = end - begin;
std::vector<OplogDocWriter> writers;
writers.reserve(count);
- Collection* oplog = getLocalOplogCollection(opCtx, _oplogCollectionName);
+ Collection* oplog = _localOplogCollection;
Lock::DBLock lk(opCtx, "local", MODE_IX);
Lock::CollectionLock lock(opCtx->lockState(), _oplogCollectionName, MODE_IX);
- std::unique_ptr<OplogSlot[]> slots(new OplogSlot[count]);
- auto replMode = replCoord->getReplicationMode();
WriteUnitOfWork wuow(opCtx);
- getNextOpTime(opCtx, oplog, replCoord, replMode, count, slots.get());
auto wallTime = Date_t::now();
OplogLink oplogLink;
@@ -496,39 +475,40 @@ repl::OpTime logInsertOps(OperationContext* opCtx,
}
auto timestamps = stdx::make_unique<Timestamp[]>(count);
+ OpTime lastOpTime;
for (size_t i = 0; i < count; i++) {
- auto insertStatement = begin[i];
+ // Make a mutable copy.
+ auto insertStatementOplogSlot = begin[i].oplogSlot;
+ // Fetch optime now, if not already fetched.
+ if (insertStatementOplogSlot.opTime.isNull()) {
+ _getNextOpTimes(opCtx, oplog, 1, &insertStatementOplogSlot);
+ }
writers.emplace_back(_logOpWriter(opCtx,
"i",
nss,
uuid,
- insertStatement.doc,
+ begin[i].doc,
NULL,
fromMigrate,
- slots[i].opTime,
- slots[i].hash,
+ insertStatementOplogSlot.opTime,
+ insertStatementOplogSlot.hash,
wallTime,
sessionInfo,
- insertStatement.stmtId,
+ begin[i].stmtId,
oplogLink));
- oplogLink.prevTs = slots[i].opTime.getTimestamp();
- timestamps[i] = slots[i].opTime.getTimestamp();
+ oplogLink.prevTs = insertStatementOplogSlot.opTime.getTimestamp();
+ timestamps[i] = oplogLink.prevTs;
+ lastOpTime = insertStatementOplogSlot.opTime;
}
std::unique_ptr<DocWriter const* []> basePtrs(new DocWriter const*[count]);
for (size_t i = 0; i < count; i++) {
basePtrs[i] = &writers[i];
}
- _logOpsInner(opCtx,
- nss,
- basePtrs.get(),
- timestamps.get(),
- count,
- oplog,
- replMode,
- slots[count - 1].opTime);
+ invariant(!lastOpTime.isNull());
+ _logOpsInner(opCtx, nss, basePtrs.get(), timestamps.get(), count, oplog, lastOpTime);
wuow.commit();
- return slots[count - 1].opTime;
+ return lastOpTime;
}
namespace {
@@ -599,7 +579,7 @@ void createOplog(OperationContext* opCtx, const std::string& oplogCollectionName
throw AssertionException(13257, ss.str());
}
}
-
+ acquireOplogCollectionForLogging(opCtx);
if (!isReplSet)
initTimestampFromOplog(opCtx, oplogCollectionName);
return;
@@ -619,6 +599,7 @@ void createOplog(OperationContext* opCtx, const std::string& oplogCollectionName
writeConflictRetry(opCtx, "createCollection", oplogCollectionName, [&] {
WriteUnitOfWork uow(opCtx);
invariant(ctx.db()->createCollection(opCtx, oplogCollectionName, options));
+ acquireOplogCollectionForLogging(opCtx);
if (!isReplSet)
getGlobalServiceContext()->getOpObserver()->onOpMessage(opCtx, BSONObj());
uow.commit();
@@ -627,6 +608,7 @@ void createOplog(OperationContext* opCtx, const std::string& oplogCollectionName
/* sync here so we don't get any surprising lag later when we try to sync */
StorageEngine* storageEngine = getGlobalServiceContext()->getGlobalStorageEngine();
storageEngine->flushAllFiles(opCtx, true);
+
log() << "******" << endl;
}
@@ -636,6 +618,14 @@ void createOplog(OperationContext* opCtx) {
createOplog(opCtx, _oplogCollectionName, isReplSet);
}
+void getNextOpTimes(OperationContext* opCtx, std::size_t count, OplogSlot* slotsOut) {
+ // The local oplog collection pointer must already be established by this point.
+ // We can't establish it here because that would require locking the local database, which would
+ // be a lock order violation.
+ invariant(_localOplogCollection);
+ _getNextOpTimes(opCtx, _localOplogCollection, count, slotsOut);
+}
+
// -------------------------------------
namespace {
@@ -1317,6 +1307,7 @@ void setNewTimestamp(ServiceContext* service, const Timestamp& newTime) {
void initTimestampFromOplog(OperationContext* opCtx, const std::string& oplogNS) {
DBDirectClient c(opCtx);
+ static const BSONObj reverseNaturalObj = BSON("$natural" << -1);
BSONObj lastOp = c.findOne(oplogNS, Query().sort(reverseNaturalObj), NULL, QueryOption_SlaveOk);
if (!lastOp.isEmpty()) {
@@ -1328,8 +1319,18 @@ void initTimestampFromOplog(OperationContext* opCtx, const std::string& oplogNS)
void oplogCheckCloseDatabase(OperationContext* opCtx, Database* db) {
invariant(opCtx->lockState()->isW());
+ if (db->name() == "local") {
+ _localOplogCollection = nullptr;
+ }
+}
+
- _localOplogCollection = nullptr;
+void acquireOplogCollectionForLogging(OperationContext* opCtx) {
+ if (!_oplogCollectionName.empty()) {
+ AutoGetCollection autoColl(opCtx, NamespaceString(_oplogCollectionName), MODE_IX);
+ _localOplogCollection = autoColl.getCollection();
+ fassert(13347, _localOplogCollection);
+ }
}
void signalOplogWaiters() {
@@ -1429,12 +1430,10 @@ void SnapshotThread::run() {
SnapshotName name(0); // assigned real value in block.
{
- // Make sure there are no in-flight capped inserts while we create our snapshot.
+ // Make sure there are no in-flight oplog inserts while we create our snapshot.
         // This lock cannot be acquired until all writes holding the resource commit/abort.
- Lock::ResourceLock cappedInsertLockForOtherDb(
- opCtx->lockState(), resourceCappedInFlightForOtherDb, MODE_X);
- Lock::ResourceLock cappedInsertLockForLocalDb(
- opCtx->lockState(), resourceCappedInFlightForLocalDb, MODE_X);
+ Lock::ResourceLock cappedInsertLockForOplog(
+ opCtx->lockState(), resourceInFlightForOplog, MODE_X);
// Reserve the name immediately before we take our snapshot. This ensures that all
// names that compare lower must be from points in time visible to this named
diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h
index 812efb81310..2198bb86c7e 100644
--- a/src/mongo/db/repl/oplog.h
+++ b/src/mongo/db/repl/oplog.h
@@ -34,10 +34,11 @@
#include "mongo/base/status.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/timestamp.h"
-#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/collection_options.h"
+#include "mongo/db/logical_session_id.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/db/storage/snapshot_name.h"
#include "mongo/stdx/functional.h"
namespace mongo {
@@ -48,6 +49,30 @@ class OperationContext;
class OperationSessionInfo;
class Session;
+struct OplogSlot {
+ OplogSlot() {}
+ OplogSlot(repl::OpTime opTime, std::int64_t hash) : opTime(opTime), hash(hash) {}
+ repl::OpTime opTime;
+ std::int64_t hash = 0;
+};
+
+struct InsertStatement {
+public:
+ InsertStatement() = default;
+ explicit InsertStatement(BSONObj toInsert) : doc(toInsert) {}
+
+ InsertStatement(StmtId statementId, BSONObj toInsert) : stmtId(statementId), doc(toInsert) {}
+ InsertStatement(StmtId statementId, BSONObj toInsert, OplogSlot os)
+ : stmtId(statementId), oplogSlot(os), doc(toInsert) {}
+ InsertStatement(BSONObj toInsert, SnapshotName ts)
+ : oplogSlot(repl::OpTime(Timestamp(ts.asU64()), repl::OpTime::kUninitializedTerm), 0),
+ doc(toInsert) {}
+
+ StmtId stmtId = kUninitializedStmtId;
+ OplogSlot oplogSlot;
+ BSONObj doc;
+};
+
namespace repl {
class ReplSettings;
@@ -118,10 +143,15 @@ OpTime logOp(OperationContext* opCtx,
StmtId stmtId,
const OplogLink& oplogLink);
-// Flush out the cached pointers to the local database and oplog.
+// Flush out the cached pointer to the oplog.
// Used by the closeDatabase command to ensure we don't cache closed things.
void oplogCheckCloseDatabase(OperationContext* opCtx, Database* db);
+/**
+ * Establish the cached pointer to the local oplog.
+ */
+void acquireOplogCollectionForLogging(OperationContext* opCtx);
+
using IncrementOpsAppliedStatsFn = stdx::function<void()>;
/**
* Take the object field of a BSONObj, the BSONObj, and the namespace of
@@ -184,5 +214,11 @@ void createIndexForApplyOps(OperationContext* opCtx,
const NamespaceString& indexNss,
IncrementOpsAppliedStatsFn incrementOpsAppliedStats);
+/**
+ * Allocates optimes for new entries in the oplog. Returns an array of OplogSlots, which contain
+ * the new optimes along with their terms and newly calculated hash fields.
+ */
+void getNextOpTimes(OperationContext* opCtx, std::size_t count, OplogSlot* slotsOut);
+
} // namespace repl
} // namespace mongo
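Taken together, the new OplogSlot, the oplogSlot member on InsertStatement, and the public repl::getNextOpTimes() let a caller reserve optimes for a whole batch before the oplog entries are built; logInsertOps() then keeps a pre-assigned slot and only allocates when oplogSlot.opTime is null. A hypothetical caller-side sketch follows; the function, its locking context, and the omitted collection write are assumptions, modeled loosely on logOp() above:

// Sketch only: pre-assigning oplog slots to a batch of inserts so that
// logInsertOps() reuses them instead of generating fresh optimes. Everything
// other than the repl:: calls and the two structs above is illustrative.
void insertBatchSketch(OperationContext* opCtx, const std::vector<BSONObj>& docs) {
    WriteUnitOfWork wuow(opCtx);

    // Reserve one optime (and hash) per document up front.
    // (Real callers hold the local DB / oplog collection locks here, as logOp() does.)
    std::vector<OplogSlot> slots(docs.size());
    repl::getNextOpTimes(opCtx, slots.size(), slots.data());

    std::vector<InsertStatement> stmts;
    stmts.reserve(docs.size());
    for (std::size_t i = 0; i < docs.size(); ++i) {
        // A statement constructed with a slot keeps its timestamp;
        // logInsertOps() only allocates when oplogSlot.opTime.isNull().
        stmts.emplace_back(kUninitializedStmtId, docs[i], slots[i]);
    }

    // The collection writes and the oplog entries can now be stamped with the
    // same, already-known timestamps inside this one unit of work (the actual
    // insert and logInsertOps() calls are omitted here).
    wuow.commit();
}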
diff --git a/src/mongo/db/repl/oplogreader.cpp b/src/mongo/db/repl/oplogreader.cpp
index 744045df7f3..659e061c554 100644
--- a/src/mongo/db/repl/oplogreader.cpp
+++ b/src/mongo/db/repl/oplogreader.cpp
@@ -50,8 +50,6 @@ using std::string;
namespace repl {
-const BSONObj reverseNaturalObj = BSON("$natural" << -1);
-
bool replAuthenticate(DBClientBase* conn) {
if (isInternalAuthSet())
return conn->authenticateInternalUser();
diff --git a/src/mongo/db/repl/oplogreader.h b/src/mongo/db/repl/oplogreader.h
index 7307d14e406..92e224a6498 100644
--- a/src/mongo/db/repl/oplogreader.h
+++ b/src/mongo/db/repl/oplogreader.h
@@ -37,9 +37,6 @@
namespace mongo {
namespace repl {
-// {"$natural": -1 }
-extern const BSONObj reverseNaturalObj;
-
/**
* Authenticates conn using the server's cluster-membership credentials.
*
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 06b78d50ef5..674ec4d5940 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -224,6 +224,9 @@ bool ReplicationCoordinatorExternalStateImpl::isInitialSyncFlagSet(OperationCont
void ReplicationCoordinatorExternalStateImpl::startSteadyStateReplication(
OperationContext* opCtx, ReplicationCoordinator* replCoord) {
+ // Initialize the cached pointer to the oplog collection, for writing to the oplog.
+ acquireOplogCollectionForLogging(opCtx);
+
LockGuard lk(_threadMutex);
invariant(replCoord);
invariant(!_bgSync);
diff --git a/src/mongo/db/repl/storage_interface.h b/src/mongo/db/repl/storage_interface.h
index 840b40e2c16..309b1715aa0 100644
--- a/src/mongo/db/repl/storage_interface.h
+++ b/src/mongo/db/repl/storage_interface.h
@@ -144,11 +144,16 @@ public:
const CollectionOptions& options) = 0;
/**
- * Drops a collection, like the oplog.
+ * Drops a collection.
*/
virtual Status dropCollection(OperationContext* opCtx, const NamespaceString& nss) = 0;
/**
+ * Truncates a collection.
+ */
+ virtual Status truncateCollection(OperationContext* opCtx, const NamespaceString& nss) = 0;
+
+ /**
* Renames a collection from the "fromNS" to the "toNS". Fails if the new collection already
* exists.
*/
diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp
index db090542ed6..e9f6b002cf6 100644
--- a/src/mongo/db/repl/storage_interface_impl.cpp
+++ b/src/mongo/db/repl/storage_interface_impl.cpp
@@ -411,10 +411,32 @@ Status StorageInterfaceImpl::dropCollection(OperationContext* opCtx, const Names
}
WriteUnitOfWork wunit(opCtx);
const auto status = autoDB.getDb()->dropCollectionEvenIfSystem(opCtx, nss);
- if (status.isOK()) {
- wunit.commit();
+ if (!status.isOK()) {
+ return status;
}
- return status;
+ wunit.commit();
+ return Status::OK();
+ });
+}
+
+Status StorageInterfaceImpl::truncateCollection(OperationContext* opCtx,
+ const NamespaceString& nss) {
+ return writeConflictRetry(opCtx, "StorageInterfaceImpl::truncateCollection", nss.ns(), [&] {
+ AutoGetCollection autoColl(opCtx, nss, MODE_X);
+ auto collectionResult =
+ getCollection(autoColl, nss, "The collection must exist before truncating.");
+ if (!collectionResult.isOK()) {
+ return collectionResult.getStatus();
+ }
+ auto collection = collectionResult.getValue();
+
+ WriteUnitOfWork wunit(opCtx);
+ const auto status = collection->truncate(opCtx);
+ if (!status.isOK()) {
+ return status;
+ }
+ wunit.commit();
+ return Status::OK();
});
}
diff --git a/src/mongo/db/repl/storage_interface_impl.h b/src/mongo/db/repl/storage_interface_impl.h
index 2b8ba24f022..dfb367f398d 100644
--- a/src/mongo/db/repl/storage_interface_impl.h
+++ b/src/mongo/db/repl/storage_interface_impl.h
@@ -84,6 +84,8 @@ public:
Status dropCollection(OperationContext* opCtx, const NamespaceString& nss) override;
+ Status truncateCollection(OperationContext* opCtx, const NamespaceString& nss) override;
+
Status renameCollection(OperationContext* opCtx,
const NamespaceString& fromNS,
const NamespaceString& toNS,
diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h
index a0cd570cf8d..18902950699 100644
--- a/src/mongo/db/repl/storage_interface_mock.h
+++ b/src/mongo/db/repl/storage_interface_mock.h
@@ -99,6 +99,8 @@ public:
stdx::function<Status(OperationContext* opCtx, const NamespaceString& nss)>;
using CreateCollectionFn = stdx::function<Status(
OperationContext* opCtx, const NamespaceString& nss, const CollectionOptions& options)>;
+ using TruncateCollectionFn =
+ stdx::function<Status(OperationContext* opCtx, const NamespaceString& nss)>;
using DropCollectionFn =
stdx::function<Status(OperationContext* opCtx, const NamespaceString& nss)>;
using FindDocumentsFn =
@@ -168,6 +170,10 @@ public:
return dropCollFn(opCtx, nss);
};
+ Status truncateCollection(OperationContext* opCtx, const NamespaceString& nss) override {
+ return truncateCollFn(opCtx, nss);
+ }
+
Status renameCollection(OperationContext* opCtx,
const NamespaceString& fromNS,
const NamespaceString& toNS,
@@ -292,6 +298,9 @@ public:
[](OperationContext* opCtx, const NamespaceString& nss, const CollectionOptions& options) {
return Status{ErrorCodes::IllegalOperation, "CreateCollectionFn not implemented."};
};
+ TruncateCollectionFn truncateCollFn = [](OperationContext* opCtx, const NamespaceString& nss) {
+ return Status{ErrorCodes::IllegalOperation, "TruncateCollectionFn not implemented."};
+ };
DropCollectionFn dropCollFn = [](OperationContext* opCtx, const NamespaceString& nss) {
return Status{ErrorCodes::IllegalOperation, "DropCollectionFn not implemented."};
};
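Like the other hooks on the mock, truncateCollFn defaults to an IllegalOperation error, so a test has to install its own lambda before exercising any code path that truncates. A small usage sketch in the style of the initial_syncer_test.cpp changes above; the flag and the injected success are illustrative:

// Sketch only: a test opting in to the new hook and recording that it ran.
// Without this override the default lambda fails with IllegalOperation.
bool truncateCalled = false;
_storageInterface->truncateCollFn = [&truncateCalled](OperationContext* opCtx,
                                                      const NamespaceString& nss) {
    truncateCalled = true;
    return Status::OK();
};
// ... run the code under test, then:
ASSERT_TRUE(truncateCalled);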