summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorADAM David Alan Martin <adam.martin@10gen.com>2018-03-01 16:40:51 -0500
committerADAM David Alan Martin <adam.martin@10gen.com>2018-03-01 16:40:51 -0500
commit296fde1259d29e081069fde1c69bb9ae083932b1 (patch)
tree0e0973b0e6e642e09822f59be220be3a8c036466
parent22b2b828a922a7459b4e1c75860a11c7eb3db630 (diff)
downloadmongo-296fde1259d29e081069fde1c69bb9ae083932b1.tar.gz
SERVER-32843 Allow multiple times in OpObservers
The OpObserverRegistry was introduced as an abstraction to allow decoupling making data modifications from the side effects, which need to happen as a result of these modifications, such as op log writes, retryable writes, etc. Some of the OpObserver's methods currently return the OpTime which resulted from logging the operation to the replication oplog. In addition, in certain cases, the OpTime resulting from an earlier OpObserver might be needed by a later one, which is the case with retryable writes. In order to support these requirements, the OpObserver(s) chain should have access to some common per-operation structure, where runtime information could be persisted.
-rw-r--r--src/mongo/db/SConscript14
-rw-r--r--src/mongo/db/catalog/SConscript1
-rw-r--r--src/mongo/db/catalog/drop_database_test.cpp3
-rw-r--r--src/mongo/db/catalog/rename_collection_test.cpp7
-rw-r--r--src/mongo/db/op_observer.cpp41
-rw-r--r--src/mongo/db/op_observer.h66
-rw-r--r--src/mongo/db/op_observer_impl.cpp388
-rw-r--r--src/mongo/db/op_observer_impl_test.cpp6
-rw-r--r--src/mongo/db/op_observer_registry.h79
-rw-r--r--src/mongo/db/op_observer_registry_test.cpp46
-rw-r--r--src/mongo/db/repl/SConscript1
-rw-r--r--src/mongo/db/repl/oplog.cpp2
-rw-r--r--src/mongo/dbtests/SConscript1
-rw-r--r--src/mongo/s/sharding_mongod_test_fixture.cpp9
14 files changed, 430 insertions, 234 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index ed133252c5b..0145ab406d4 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -120,6 +120,8 @@ env.CppUnitTest(
source= 'op_observer_registry_test.cpp',
LIBDEPS=[
'common',
+ 'op_observer',
+ 'service_context',
],
)
@@ -645,12 +647,24 @@ env.Library(
)
env.Library(
+ target="op_observer",
+ source=[
+ "op_observer.cpp",
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/util/decorable',
+ ],
+)
+
+env.Library(
target="op_observer_d",
source=[
"op_observer_impl.cpp",
],
LIBDEPS=[
'catalog/collection_options',
+ 'op_observer',
'repl/oplog',
'views/views_mongod',
'$BUILD_DIR/mongo/base',
diff --git a/src/mongo/db/catalog/SConscript b/src/mongo/db/catalog/SConscript
index 662f7a22612..db9f6516cfa 100644
--- a/src/mongo/db/catalog/SConscript
+++ b/src/mongo/db/catalog/SConscript
@@ -290,6 +290,7 @@ env.Library(
'$BUILD_DIR/mongo/db/db_raii',
'$BUILD_DIR/mongo/db/index/index_access_method',
'$BUILD_DIR/mongo/db/index_d',
+ '$BUILD_DIR/mongo/db/op_observer',
'$BUILD_DIR/mongo/db/query_exec',
'$BUILD_DIR/mongo/db/repair_database',
'$BUILD_DIR/mongo/db/repl/drop_pending_collection_reaper',
diff --git a/src/mongo/db/catalog/drop_database_test.cpp b/src/mongo/db/catalog/drop_database_test.cpp
index da6bb449432..2c7e6d3e37c 100644
--- a/src/mongo/db/catalog/drop_database_test.cpp
+++ b/src/mongo/db/catalog/drop_database_test.cpp
@@ -98,7 +98,8 @@ repl::OpTime OpObserverMock::onDropCollection(OperationContext* opCtx,
uassert(
ErrorCodes::OperationFailed, "onDropCollection() failed", !onDropCollectionThrowsException);
- return opTime;
+ OpObserver::Times::get(opCtx).reservedOpTimes.push_back(opTime);
+ return {};
}
class DropDatabaseTest : public ServiceContextMongoDTest {
diff --git a/src/mongo/db/catalog/rename_collection_test.cpp b/src/mongo/db/catalog/rename_collection_test.cpp
index b3a919fc8a2..7802ccffdab 100644
--- a/src/mongo/db/catalog/rename_collection_test.cpp
+++ b/src/mongo/db/catalog/rename_collection_test.cpp
@@ -162,7 +162,9 @@ repl::OpTime OpObserverMock::onDropCollection(OperationContext* opCtx,
const NamespaceString& collectionName,
OptionalCollectionUUID uuid) {
_logOp(opCtx, collectionName, "drop");
- return OpObserverNoop::onDropCollection(opCtx, collectionName, uuid);
+ OpObserver::Times::get(opCtx).reservedOpTimes.push_back(
+ OpObserverNoop::onDropCollection(opCtx, collectionName, uuid));
+ return {};
}
repl::OpTime OpObserverMock::onRenameCollection(OperationContext* opCtx,
@@ -173,10 +175,11 @@ repl::OpTime OpObserverMock::onRenameCollection(OperationContext* opCtx,
OptionalCollectionUUID dropTargetUUID,
bool stayTemp) {
_logOp(opCtx, fromCollection, "rename");
+ OpObserver::Times::get(opCtx).reservedOpTimes.push_back(renameOpTime);
OpObserverNoop::onRenameCollection(
opCtx, fromCollection, toCollection, uuid, dropTarget, dropTargetUUID, stayTemp);
onRenameCollectionCalled = true;
- return renameOpTime;
+ return {};
}
void OpObserverMock::_logOp(OperationContext* opCtx,
diff --git a/src/mongo/db/op_observer.cpp b/src/mongo/db/op_observer.cpp
new file mode 100644
index 00000000000..7e0532b2225
--- /dev/null
+++ b/src/mongo/db/op_observer.cpp
@@ -0,0 +1,41 @@
+/**
+ * Copyright 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/op_observer.h"
+
+#include "mongo/db/operation_context.h"
+namespace mongo {
+namespace {
+const auto getOpObserverTimes = OperationContext::declareDecoration<OpObserver::Times>();
+} // namespace
+
+auto OpObserver::Times::get(OperationContext* const opCtx) -> Times& {
+ return getOpObserverTimes(opCtx);
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/op_observer.h b/src/mongo/db/op_observer.h
index 14d200fd458..2d9da73a1a4 100644
--- a/src/mongo/db/op_observer.h
+++ b/src/mongo/db/op_observer.h
@@ -45,7 +45,7 @@ class OperationContext;
namespace repl {
class OpTime;
-} // repl
+} // namespace repl
/**
* Holds document update information used in logging.
@@ -274,6 +274,70 @@ public:
*/
virtual void onReplicationRollback(OperationContext* opCtx,
const RollbackObserverInfo& rbInfo) = 0;
+
+ struct Times;
+
+protected:
+ class ReservedTimes;
};
+/**
+ * This struct is a decoration for `OperationContext` which contains collected `repl::OpTime`
+ * and `Date_t` timestamps of various critical stages of an operation performed by an OpObserver
+ * chain.
+ */
+struct OpObserver::Times {
+ static Times& get(OperationContext*);
+
+ std::vector<repl::OpTime> reservedOpTimes;
+
+private:
+ friend OpObserver::ReservedTimes;
+
+ // Because `OpObserver`s are re-entrant, it is necessary to track the recursion depth to know
+ // when to actually clear the `reservedOpTimes` vector, using the `ReservedTimes` scope object.
+ int _recursionDepth = 0;
+};
+
+/**
+ * This class is an RAII object to manage the state of the `OpObserver::Times` decoration on an
+ * operation context. Upon destruction the list of times in the decoration on the operation
+ * context is cleared. It is intended for use as a scope object in `OpObserverRegistry` to manage
+ * re-entrancy.
+ */
+class OpObserver::ReservedTimes {
+ ReservedTimes(const ReservedTimes&) = delete;
+ ReservedTimes& operator=(const ReservedTimes&) = delete;
+
+public:
+ ~ReservedTimes() {
+ // Every time the `ReservedTimes` guard goes out of scope, this indicates one fewer level of
+ // recursion in the `OpObserver` registered chain.
+ if (!--_times._recursionDepth) {
+ // When the depth hits 0, the `OpObserver` is considered to have finished, and therefore
+ // the `reservedOpTimes` state needs to be reset.
+ _times.reservedOpTimes.clear();
+ }
+ invariant(_times._recursionDepth >= 0);
+ }
+
+ explicit ReservedTimes(OperationContext* const opCtx) : _times(Times::get(opCtx)) {
+ // Every time that a `ReservedTimes` scope object is instantiated, we have to track if there
+ // was a potentially recursive call. When there was no `OpObserver` chain being executed
+ // before this instantiation, we should have an empty `reservedOpTimes` vector.
+ if (!_times._recursionDepth++) {
+ invariant(_times.reservedOpTimes.empty());
+ }
+
+ invariant(_times._recursionDepth > 0);
+ invariant(_times._recursionDepth == 1 || !opCtx->writesAreReplicated());
+ }
+
+ const Times& get() const {
+ return _times;
+ }
+
+private:
+ Times& _times;
+};
} // namespace mongo
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index eea416f7db9..05d6ac0b91c 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -65,6 +65,33 @@ using DeleteState = CollectionShardingState::DeleteState;
const OperationContext::Decoration<DeleteState> getDeleteState =
OperationContext::declareDecoration<DeleteState>();
+repl::OpTime logOperation(OperationContext* opCtx,
+ const char* opstr,
+ const NamespaceString& ns,
+ OptionalCollectionUUID uuid,
+ const BSONObj& obj,
+ const BSONObj* o2,
+ bool fromMigrate,
+ Date_t wallClockTime,
+ const OperationSessionInfo& sessionInfo,
+ StmtId stmtId,
+ const repl::OplogLink& oplogLink) {
+ auto& times = OpObserver::Times::get(opCtx).reservedOpTimes;
+ auto opTime = repl::logOp(opCtx,
+ opstr,
+ ns,
+ uuid,
+ obj,
+ o2,
+ fromMigrate,
+ wallClockTime,
+ sessionInfo,
+ stmtId,
+ oplogLink);
+ times.push_back(opTime);
+ return opTime;
+}
+
/**
* Returns whether we're a master using master-slave replication.
*/
@@ -166,17 +193,17 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx,
opTimes.wallClockTime = getWallClockTimeForOpLog(opCtx);
if (!storeObj.isEmpty() && opCtx->getTxnNumber()) {
- auto noteUpdateOpTime = repl::logOp(opCtx,
- "n",
- args.nss,
- args.uuid,
- storeObj,
- nullptr,
- false,
- opTimes.wallClockTime,
- sessionInfo,
- args.stmtId,
- {});
+ auto noteUpdateOpTime = logOperation(opCtx,
+ "n",
+ args.nss,
+ args.uuid,
+ storeObj,
+ nullptr,
+ false,
+ opTimes.wallClockTime,
+ sessionInfo,
+ args.stmtId,
+ {});
opTimes.prePostImageOpTime = noteUpdateOpTime;
@@ -187,17 +214,17 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx,
}
}
- opTimes.writeOpTime = repl::logOp(opCtx,
- "u",
- args.nss,
- args.uuid,
- args.update,
- &args.criteria,
- args.fromMigrate,
- opTimes.wallClockTime,
- sessionInfo,
- args.stmtId,
- oplogLink);
+ opTimes.writeOpTime = logOperation(opCtx,
+ "u",
+ args.nss,
+ args.uuid,
+ args.update,
+ &args.criteria,
+ args.fromMigrate,
+ opTimes.wallClockTime,
+ sessionInfo,
+ args.stmtId,
+ oplogLink);
return opTimes;
}
@@ -225,33 +252,33 @@ OpTimeBundle replLogDelete(OperationContext* opCtx,
opTimes.wallClockTime = getWallClockTimeForOpLog(opCtx);
if (deletedDoc && opCtx->getTxnNumber()) {
- auto noteOplog = repl::logOp(opCtx,
- "n",
- nss,
- uuid,
- deletedDoc.get(),
- nullptr,
- false,
- opTimes.wallClockTime,
- sessionInfo,
- stmtId,
- {});
- opTimes.prePostImageOpTime = noteOplog;
- oplogLink.preImageOpTime = noteOplog;
- }
-
- CollectionShardingState::DeleteState& deleteState = getDeleteState(opCtx);
- opTimes.writeOpTime = repl::logOp(opCtx,
- "d",
+ auto noteOplog = logOperation(opCtx,
+ "n",
nss,
uuid,
- deleteState.documentKey,
+ deletedDoc.get(),
nullptr,
- fromMigrate,
+ false,
opTimes.wallClockTime,
sessionInfo,
stmtId,
- oplogLink);
+ {});
+ opTimes.prePostImageOpTime = noteOplog;
+ oplogLink.preImageOpTime = noteOplog;
+ }
+
+ CollectionShardingState::DeleteState& deleteState = getDeleteState(opCtx);
+ opTimes.writeOpTime = logOperation(opCtx,
+ "d",
+ nss,
+ uuid,
+ deleteState.documentKey,
+ nullptr,
+ fromMigrate,
+ opTimes.wallClockTime,
+ sessionInfo,
+ stmtId,
+ oplogLink);
return opTimes;
}
@@ -266,17 +293,17 @@ OpTimeBundle replLogApplyOps(OperationContext* opCtx,
const repl::OplogLink& oplogLink) {
OpTimeBundle times;
times.wallClockTime = getWallClockTimeForOpLog(opCtx);
- times.writeOpTime = repl::logOp(opCtx,
- "c",
- cmdNss,
- {},
- applyOpCmd,
- nullptr,
- false,
- times.wallClockTime,
- sessionInfo,
- stmtId,
- oplogLink);
+ times.writeOpTime = logOperation(opCtx,
+ "c",
+ cmdNss,
+ {},
+ applyOpCmd,
+ nullptr,
+ false,
+ times.wallClockTime,
+ sessionInfo,
+ stmtId,
+ oplogLink);
return times;
}
@@ -298,29 +325,29 @@ void OpObserverImpl::onCreateIndex(OperationContext* opCtx,
builder.append(e);
}
- repl::logOp(opCtx,
- "c",
- nss.getCommandNS(),
- uuid,
- builder.done(),
- nullptr,
- fromMigrate,
- getWallClockTimeForOpLog(opCtx),
- {},
- kUninitializedStmtId,
- {});
+ logOperation(opCtx,
+ "c",
+ nss.getCommandNS(),
+ uuid,
+ builder.done(),
+ nullptr,
+ fromMigrate,
+ getWallClockTimeForOpLog(opCtx),
+ {},
+ kUninitializedStmtId,
+ {});
} else {
- repl::logOp(opCtx,
- "i",
- systemIndexes,
- {},
- indexDoc,
- nullptr,
- fromMigrate,
- getWallClockTimeForOpLog(opCtx),
- {},
- kUninitializedStmtId,
- {});
+ logOperation(opCtx,
+ "i",
+ systemIndexes,
+ {},
+ indexDoc,
+ nullptr,
+ fromMigrate,
+ getWallClockTimeForOpLog(opCtx),
+ {},
+ kUninitializedStmtId,
+ {});
}
AuthorizationManager::get(opCtx->getServiceContext())
@@ -335,12 +362,12 @@ void OpObserverImpl::onCreateIndex(OperationContext* opCtx,
void OpObserverImpl::onInserts(OperationContext* opCtx,
const NamespaceString& nss,
OptionalCollectionUUID uuid,
- std::vector<InsertStatement>::const_iterator begin,
- std::vector<InsertStatement>::const_iterator end,
+ std::vector<InsertStatement>::const_iterator first,
+ std::vector<InsertStatement>::const_iterator last,
bool fromMigrate) {
Session* const session = opCtx->getTxnNumber() ? OperationContextSession::get(opCtx) : nullptr;
if (session && opCtx->writesAreReplicated() && session->inMultiDocumentTransaction()) {
- for (auto iter = begin; iter != end; iter++) {
+ for (auto iter = first; iter != last; iter++) {
auto operation = OplogEntry::makeInsertOperation(nss, uuid, iter->doc);
session->addTransactionOperation(opCtx, operation);
}
@@ -350,14 +377,19 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
const auto lastWriteDate = getWallClockTimeForOpLog(opCtx);
const auto opTimeList =
- repl::logInsertOps(opCtx, nss, uuid, session, begin, end, fromMigrate, lastWriteDate);
+ repl::logInsertOps(opCtx, nss, uuid, session, first, last, fromMigrate, lastWriteDate);
+
+ auto& times = OpObserver::Times::get(opCtx).reservedOpTimes;
+ using std::begin;
+ using std::end;
+ times.insert(end(times), begin(opTimeList), end(opTimeList));
auto css = (nss == NamespaceString::kSessionTransactionsTableNamespace || fromMigrate)
? nullptr
: CollectionShardingState::get(opCtx, nss);
size_t index = 0;
- for (auto it = begin; it != end; it++, index++) {
+ for (auto it = first; it != last; it++, index++) {
AuthorizationManager::get(opCtx->getServiceContext())
->logOp(opCtx, "i", nss, it->doc, nullptr);
if (css) {
@@ -372,19 +404,20 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
} else if (nss.coll() == DurableViewCatalog::viewsCollectionName()) {
DurableViewCatalog::onExternalChange(opCtx, nss);
} else if (nss.ns() == FeatureCompatibilityVersion::kCollection) {
- for (auto it = begin; it != end; it++) {
+ for (auto it = first; it != last; it++) {
FeatureCompatibilityVersion::onInsertOrUpdate(opCtx, it->doc);
}
} else if (nss == NamespaceString::kSessionTransactionsTableNamespace && !lastOpTime.isNull()) {
- for (auto it = begin; it != end; it++) {
+ for (auto it = first; it != last; it++) {
SessionCatalog::get(opCtx)->invalidateSessions(opCtx, it->doc);
}
}
std::vector<StmtId> stmtIdsWritten;
- std::transform(begin, end, std::back_inserter(stmtIdsWritten), [](const InsertStatement& stmt) {
- return stmt.stmtId;
- });
+ std::transform(first,
+ last,
+ std::back_inserter(stmtIdsWritten),
+ [](const InsertStatement& stmt) { return stmt.stmtId; });
onWriteOpCompleted(opCtx, nss, session, stmtIdsWritten, lastOpTime, lastWriteDate);
}
@@ -511,17 +544,17 @@ void OpObserverImpl::onInternalOpMessage(OperationContext* opCtx,
const BSONObj& msgObj,
const boost::optional<BSONObj> o2MsgObj) {
const BSONObj* o2MsgPtr = o2MsgObj ? o2MsgObj.get_ptr() : nullptr;
- repl::logOp(opCtx,
- "n",
- nss,
- uuid,
- msgObj,
- o2MsgPtr,
- false,
- getWallClockTimeForOpLog(opCtx),
- {},
- kUninitializedStmtId,
- {});
+ logOperation(opCtx,
+ "n",
+ nss,
+ uuid,
+ msgObj,
+ o2MsgPtr,
+ false,
+ getWallClockTimeForOpLog(opCtx),
+ {},
+ kUninitializedStmtId,
+ {});
}
void OpObserverImpl::onCreateCollection(OperationContext* opCtx,
@@ -554,17 +587,17 @@ void OpObserverImpl::onCreateCollection(OperationContext* opCtx,
if (!collectionName.isSystemDotProfile()) {
// do not replicate system.profile modifications
- repl::logOp(opCtx,
- "c",
- cmdNss,
- options.uuid,
- cmdObj,
- nullptr,
- false,
- getWallClockTimeForOpLog(opCtx),
- {},
- kUninitializedStmtId,
- {});
+ logOperation(opCtx,
+ "c",
+ cmdNss,
+ options.uuid,
+ cmdObj,
+ nullptr,
+ false,
+ getWallClockTimeForOpLog(opCtx),
+ {},
+ kUninitializedStmtId,
+ {});
}
AuthorizationManager::get(opCtx->getServiceContext())
@@ -600,17 +633,17 @@ void OpObserverImpl::onCollMod(OperationContext* opCtx,
if (!nss.isSystemDotProfile()) {
// do not replicate system.profile modifications
- repl::logOp(opCtx,
- "c",
- cmdNss,
- uuid,
- cmdObj,
- &o2Obj,
- false,
- getWallClockTimeForOpLog(opCtx),
- {},
- kUninitializedStmtId,
- {});
+ logOperation(opCtx,
+ "c",
+ cmdNss,
+ uuid,
+ cmdObj,
+ &o2Obj,
+ false,
+ getWallClockTimeForOpLog(opCtx),
+ {},
+ kUninitializedStmtId,
+ {});
}
AuthorizationManager::get(opCtx->getServiceContext())
@@ -634,17 +667,17 @@ void OpObserverImpl::onDropDatabase(OperationContext* opCtx, const std::string&
const NamespaceString cmdNss{dbName, "$cmd"};
const auto cmdObj = BSON("dropDatabase" << 1);
- repl::logOp(opCtx,
- "c",
- cmdNss,
- {},
- cmdObj,
- nullptr,
- false,
- getWallClockTimeForOpLog(opCtx),
- {},
- kUninitializedStmtId,
- {});
+ logOperation(opCtx,
+ "c",
+ cmdNss,
+ {},
+ cmdObj,
+ nullptr,
+ false,
+ getWallClockTimeForOpLog(opCtx),
+ {},
+ kUninitializedStmtId,
+ {});
uassert(50714,
"dropping the admin database is not allowed.",
@@ -666,20 +699,19 @@ repl::OpTime OpObserverImpl::onDropCollection(OperationContext* opCtx,
const auto cmdNss = collectionName.getCommandNS();
const auto cmdObj = BSON("drop" << collectionName.coll());
- repl::OpTime dropOpTime;
if (!collectionName.isSystemDotProfile()) {
// Do not replicate system.profile modifications.
- dropOpTime = repl::logOp(opCtx,
- "c",
- cmdNss,
- uuid,
- cmdObj,
- nullptr,
- false,
- getWallClockTimeForOpLog(opCtx),
- {},
- kUninitializedStmtId,
- {});
+ logOperation(opCtx,
+ "c",
+ cmdNss,
+ uuid,
+ cmdObj,
+ nullptr,
+ false,
+ getWallClockTimeForOpLog(opCtx),
+ {},
+ kUninitializedStmtId,
+ {});
}
uassert(50715,
@@ -701,7 +733,7 @@ repl::OpTime OpObserverImpl::onDropCollection(OperationContext* opCtx,
// Evict namespace entry from the namespace/uuid cache if it exists.
NamespaceUUIDCache::get(opCtx).evictNamespace(collectionName);
- return dropOpTime;
+ return {};
}
void OpObserverImpl::onDropIndex(OperationContext* opCtx,
@@ -712,23 +744,23 @@ void OpObserverImpl::onDropIndex(OperationContext* opCtx,
const auto cmdNss = nss.getCommandNS();
const auto cmdObj = BSON("dropIndexes" << nss.coll() << "index" << indexName);
- repl::logOp(opCtx,
- "c",
- cmdNss,
- uuid,
- cmdObj,
- &indexInfo,
- false,
- getWallClockTimeForOpLog(opCtx),
- {},
- kUninitializedStmtId,
- {});
+ logOperation(opCtx,
+ "c",
+ cmdNss,
+ uuid,
+ cmdObj,
+ &indexInfo,
+ false,
+ getWallClockTimeForOpLog(opCtx),
+ {},
+ kUninitializedStmtId,
+ {});
AuthorizationManager::get(opCtx->getServiceContext())
->logOp(opCtx, "c", cmdNss, cmdObj, &indexInfo);
}
-repl::OpTime OpObserverImpl::onRenameCollection(OperationContext* opCtx,
+repl::OpTime OpObserverImpl::onRenameCollection(OperationContext* const opCtx,
const NamespaceString& fromCollection,
const NamespaceString& toCollection,
OptionalCollectionUUID uuid,
@@ -749,17 +781,17 @@ repl::OpTime OpObserverImpl::onRenameCollection(OperationContext* opCtx,
const auto cmdObj = builder.done();
- const auto renameOpTime = repl::logOp(opCtx,
- "c",
- cmdNss,
- uuid,
- cmdObj,
- nullptr,
- false,
- getWallClockTimeForOpLog(opCtx),
- {},
- kUninitializedStmtId,
- {});
+ logOperation(opCtx,
+ "c",
+ cmdNss,
+ uuid,
+ cmdObj,
+ nullptr,
+ false,
+ getWallClockTimeForOpLog(opCtx),
+ {},
+ kUninitializedStmtId,
+ {});
if (fromCollection.isSystemDotViews())
DurableViewCatalog::onExternalChange(opCtx, fromCollection);
@@ -776,7 +808,7 @@ repl::OpTime OpObserverImpl::onRenameCollection(OperationContext* opCtx,
opCtx->recoveryUnit()->onRollback(
[&cache, toCollection]() { cache.evictNamespace(toCollection); });
- return renameOpTime;
+ return {};
}
void OpObserverImpl::onApplyOps(OperationContext* opCtx,
@@ -797,17 +829,17 @@ void OpObserverImpl::onEmptyCapped(OperationContext* opCtx,
if (!collectionName.isSystemDotProfile()) {
// Do not replicate system.profile modifications
- repl::logOp(opCtx,
- "c",
- cmdNss,
- uuid,
- cmdObj,
- nullptr,
- false,
- getWallClockTimeForOpLog(opCtx),
- {},
- kUninitializedStmtId,
- {});
+ logOperation(opCtx,
+ "c",
+ cmdNss,
+ uuid,
+ cmdObj,
+ nullptr,
+ false,
+ getWallClockTimeForOpLog(opCtx),
+ {},
+ kUninitializedStmtId,
+ {});
}
AuthorizationManager::get(opCtx->getServiceContext())
diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp
index 37449d5f177..cc62f5dfbe1 100644
--- a/src/mongo/db/op_observer_impl_test.cpp
+++ b/src/mongo/db/op_observer_impl_test.cpp
@@ -195,7 +195,8 @@ TEST_F(OpObserverTest, OnDropCollectionReturnsDropOpTime) {
{
AutoGetDb autoDb(opCtx.get(), nss.db(), MODE_X);
WriteUnitOfWork wunit(opCtx.get());
- dropOpTime = opObserver.onDropCollection(opCtx.get(), nss, uuid);
+ opObserver.onDropCollection(opCtx.get(), nss, uuid);
+ dropOpTime = OpObserver::Times::get(opCtx.get()).reservedOpTimes.front();
wunit.commit();
}
@@ -229,8 +230,9 @@ TEST_F(OpObserverTest, OnRenameCollectionReturnsRenameOpTime) {
{
AutoGetDb autoDb(opCtx.get(), sourceNss.db(), MODE_X);
WriteUnitOfWork wunit(opCtx.get());
- renameOpTime = opObserver.onRenameCollection(
+ opObserver.onRenameCollection(
opCtx.get(), sourceNss, targetNss, {}, dropTarget, {}, stayTemp);
+ renameOpTime = OpObserver::Times::get(opCtx.get()).reservedOpTimes.front();
wunit.commit();
}
diff --git a/src/mongo/db/op_observer_registry.h b/src/mongo/db/op_observer_registry.h
index 30a1dcd5de5..c233eedfd6e 100644
--- a/src/mongo/db/op_observer_registry.h
+++ b/src/mongo/db/op_observer_registry.h
@@ -53,122 +53,142 @@ public:
// Add 'observer' to the list of observers to call. Observers are called in registration order.
// Registration must be done while no calls to observers are made.
void addObserver(std::unique_ptr<OpObserver> observer) {
- _observers.emplace_back(std::move(observer));
+ _observers.push_back(std::move(observer));
}
- void onCreateIndex(OperationContext* opCtx,
+ void onCreateIndex(OperationContext* const opCtx,
const NamespaceString& nss,
OptionalCollectionUUID uuid,
BSONObj indexDoc,
bool fromMigrate) override {
+ ReservedTimes times{opCtx};
for (auto& o : _observers)
o->onCreateIndex(opCtx, nss, uuid, indexDoc, fromMigrate);
}
- void onInserts(OperationContext* opCtx,
+ void onInserts(OperationContext* const opCtx,
const NamespaceString& nss,
OptionalCollectionUUID uuid,
std::vector<InsertStatement>::const_iterator begin,
std::vector<InsertStatement>::const_iterator end,
bool fromMigrate) override {
+ ReservedTimes times{opCtx};
for (auto& o : _observers)
o->onInserts(opCtx, nss, uuid, begin, end, fromMigrate);
}
- void onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) override {
+ void onUpdate(OperationContext* const opCtx, const OplogUpdateEntryArgs& args) override {
+ ReservedTimes times{opCtx};
for (auto& o : _observers)
o->onUpdate(opCtx, args);
}
- void aboutToDelete(OperationContext* opCtx,
+ void aboutToDelete(OperationContext* const opCtx,
const NamespaceString& nss,
const BSONObj& doc) override {
+ ReservedTimes times{opCtx};
for (auto& o : _observers)
o->aboutToDelete(opCtx, nss, doc);
}
- void onDelete(OperationContext* opCtx,
+ void onDelete(OperationContext* const opCtx,
const NamespaceString& nss,
OptionalCollectionUUID uuid,
StmtId stmtId,
bool fromMigrate,
const boost::optional<BSONObj>& deletedDoc) override {
+ ReservedTimes times{opCtx};
for (auto& o : _observers)
o->onDelete(opCtx, nss, uuid, stmtId, fromMigrate, deletedDoc);
}
- void onInternalOpMessage(OperationContext* opCtx,
+ void onInternalOpMessage(OperationContext* const opCtx,
const NamespaceString& nss,
const boost::optional<UUID> uuid,
const BSONObj& msgObj,
const boost::optional<BSONObj> o2MsgObj) override {
+ ReservedTimes times{opCtx};
for (auto& o : _observers)
o->onInternalOpMessage(opCtx, nss, uuid, msgObj, o2MsgObj);
}
- void onCreateCollection(OperationContext* opCtx,
+ void onCreateCollection(OperationContext* const opCtx,
Collection* coll,
const NamespaceString& collectionName,
const CollectionOptions& options,
const BSONObj& idIndex) override {
+ ReservedTimes times{opCtx};
for (auto& o : _observers)
o->onCreateCollection(opCtx, coll, collectionName, options, idIndex);
}
- void onCollMod(OperationContext* opCtx,
+ void onCollMod(OperationContext* const opCtx,
const NamespaceString& nss,
OptionalCollectionUUID uuid,
const BSONObj& collModCmd,
const CollectionOptions& oldCollOptions,
boost::optional<TTLCollModInfo> ttlInfo) override {
+ ReservedTimes times{opCtx};
for (auto& o : _observers)
o->onCollMod(opCtx, nss, uuid, collModCmd, oldCollOptions, ttlInfo);
}
- void onDropDatabase(OperationContext* opCtx, const std::string& dbName) override {
+ void onDropDatabase(OperationContext* const opCtx, const std::string& dbName) override {
+ ReservedTimes times{opCtx};
for (auto& o : _observers)
o->onDropDatabase(opCtx, dbName);
}
- repl::OpTime onDropCollection(OperationContext* opCtx,
+
+ repl::OpTime onDropCollection(OperationContext* const opCtx,
const NamespaceString& collectionName,
- OptionalCollectionUUID uuid) override {
- return _forEachObserver([&](auto& observer) -> repl::OpTime {
- return observer.onDropCollection(opCtx, collectionName, uuid);
- });
+ const OptionalCollectionUUID uuid) override {
+ ReservedTimes times{opCtx};
+ for (auto& observer : this->_observers) {
+ auto time = observer->onDropCollection(opCtx, collectionName, uuid);
+ invariant(time.isNull());
+ }
+ return _getOpTimeToReturn(times.get().reservedOpTimes);
}
- void onDropIndex(OperationContext* opCtx,
+ void onDropIndex(OperationContext* const opCtx,
const NamespaceString& nss,
OptionalCollectionUUID uuid,
const std::string& indexName,
const BSONObj& idxDescriptor) override {
+ ReservedTimes times{opCtx};
for (auto& o : _observers)
o->onDropIndex(opCtx, nss, uuid, indexName, idxDescriptor);
}
- repl::OpTime onRenameCollection(OperationContext* opCtx,
+ repl::OpTime onRenameCollection(OperationContext* const opCtx,
const NamespaceString& fromCollection,
const NamespaceString& toCollection,
OptionalCollectionUUID uuid,
bool dropTarget,
OptionalCollectionUUID dropTargetUUID,
bool stayTemp) override {
- return _forEachObserver([&](auto& observer) -> repl::OpTime {
- return observer.onRenameCollection(
+ ReservedTimes times{opCtx};
+ for (auto& observer : this->_observers) {
+ const auto time = observer->onRenameCollection(
opCtx, fromCollection, toCollection, uuid, dropTarget, dropTargetUUID, stayTemp);
- });
+ invariant(time.isNull());
+ }
+
+ return _getOpTimeToReturn(times.get().reservedOpTimes);
}
- void onApplyOps(OperationContext* opCtx,
+ void onApplyOps(OperationContext* const opCtx,
const std::string& dbName,
const BSONObj& applyOpCmd) override {
+ ReservedTimes times{opCtx};
for (auto& o : _observers)
o->onApplyOps(opCtx, dbName, applyOpCmd);
}
- void onEmptyCapped(OperationContext* opCtx,
+ void onEmptyCapped(OperationContext* const opCtx,
const NamespaceString& collectionName,
OptionalCollectionUUID uuid) {
+ ReservedTimes times{opCtx};
for (auto& o : _observers)
o->onEmptyCapped(opCtx, collectionName, uuid);
}
@@ -190,17 +210,14 @@ public:
}
private:
- repl::OpTime _forEachObserver(stdx::function<repl::OpTime(OpObserver&)> f) {
- repl::OpTime opTime;
- for (auto& observer : _observers) {
- repl::OpTime newTime = f(*observer);
- if (!newTime.isNull() && newTime != opTime) {
- invariant(opTime.isNull());
- opTime = newTime;
- }
+ static repl::OpTime _getOpTimeToReturn(const std::vector<repl::OpTime>& times) {
+ if (times.empty()) {
+ return repl::OpTime{};
}
- return opTime;
+ invariant(times.size() == 1);
+ return times.front();
}
+
std::vector<std::unique_ptr<OpObserver>> _observers;
};
} // namespace mongo
diff --git a/src/mongo/db/op_observer_registry_test.cpp b/src/mongo/db/op_observer_registry_test.cpp
index 6447962e323..5589860e1e7 100644
--- a/src/mongo/db/op_observer_registry_test.cpp
+++ b/src/mongo/db/op_observer_registry_test.cpp
@@ -26,8 +26,12 @@
* it in the license file.
*/
+#include "mongo/platform/basic.h"
+
#include "mongo/db/op_observer_registry.h"
+
#include "mongo/db/op_observer_noop.h"
+#include "mongo/db/operation_context_noop.h"
#include "mongo/db/repl/optime.h"
#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
@@ -52,7 +56,8 @@ struct TestObserver : public OpObserverNoop {
const NamespaceString& collectionName,
OptionalCollectionUUID uuid) {
drops++;
- return opTime;
+ OpObserver::Times::get(opCtx).reservedOpTimes.push_back(opTime);
+ return {};
}
repl::OpTime onRenameCollection(OperationContext* opCtx,
const NamespaceString& fromCollection,
@@ -61,7 +66,8 @@ struct TestObserver : public OpObserverNoop {
bool dropTarget,
OptionalCollectionUUID dropTargetUUID,
bool stayTemp) {
- return opTime;
+ OpObserver::Times::get(opCtx).reservedOpTimes.push_back(opTime);
+ return {};
}
};
@@ -81,22 +87,22 @@ struct OpObserverRegistryTest : public unittest::Test {
OpObserverRegistry registry;
/**
* The 'op' function calls an observer method on the registry that returns an OpTime.
- * The method checks that the registry correctly merges the results of the registered observers.
+ * The method checks that the registry correctly returns only the first observer's `OpTime`.
*/
void checkConsistentOpTime(stdx::function<repl::OpTime()> op) {
const repl::OpTime myTime(Timestamp(1, 1), 1);
ASSERT(op() == repl::OpTime());
observer1->opTime = myTime;
ASSERT(op() == myTime);
- observer2->opTime = myTime;
+ observer2->opTime = repl::OpTime(Timestamp(1, 1), 2);
ASSERT(op() == myTime);
observer1->opTime = {};
- ASSERT(op() == myTime);
+ ASSERT(op() == repl::OpTime{});
}
/**
* The 'op' function calls an observer method on the registry that returns an OpTime.
- * The method checks that the registry invariants if the observers return conflicting times.
+ * The method checks that the registry invariants if the observers return multiple times.
*/
void checkInconsistentOpTime(stdx::function<repl::OpTime()> op) {
observer1->opTime = repl::OpTime(Timestamp(1, 1), 1);
@@ -106,67 +112,75 @@ struct OpObserverRegistryTest : public unittest::Test {
};
TEST_F(OpObserverRegistryTest, NoObservers) {
+ OperationContextNoop opCtx;
// Check that it's OK to call observer methods with no observers registered.
- registry.onDropDatabase(nullptr, "test");
+ registry.onDropDatabase(&opCtx, "test");
}
TEST_F(OpObserverRegistryTest, TwoObservers) {
+ OperationContextNoop opCtx;
ASSERT_EQUALS(testObservers, 2);
registry.addObserver(std::move(unique1));
registry.addObserver(std::move(unique2));
- registry.onDropDatabase(nullptr, "test");
+ registry.onDropDatabase(&opCtx, "test");
ASSERT_EQUALS(observer1->drops, 1);
ASSERT_EQUALS(observer2->drops, 1);
}
TEST_F(OpObserverRegistryTest, ThrowingObserver1) {
+ OperationContextNoop opCtx;
unique1 = stdx::make_unique<ThrowingObserver>();
observer1 = unique1.get();
registry.addObserver(std::move(unique1));
registry.addObserver(std::move(unique2));
- ASSERT_THROWS(registry.onDropDatabase(nullptr, "test"), AssertionException);
+ ASSERT_THROWS(registry.onDropDatabase(&opCtx, "test"), AssertionException);
ASSERT_EQUALS(observer1->drops, 1);
ASSERT_EQUALS(observer2->drops, 0);
}
TEST_F(OpObserverRegistryTest, ThrowingObserver2) {
+ OperationContextNoop opCtx;
unique2 = stdx::make_unique<ThrowingObserver>();
observer2 = unique1.get();
registry.addObserver(std::move(unique1));
registry.addObserver(std::move(unique2));
- ASSERT_THROWS(registry.onDropDatabase(nullptr, "test"), AssertionException);
+ ASSERT_THROWS(registry.onDropDatabase(&opCtx, "test"), AssertionException);
ASSERT_EQUALS(observer1->drops, 1);
ASSERT_EQUALS(observer2->drops, 1);
}
TEST_F(OpObserverRegistryTest, OnDropCollectionObserverResultReturnsRightTime) {
+ OperationContextNoop opCtx;
registry.addObserver(std::move(unique1));
- registry.addObserver(std::move(unique2));
- auto op = [&]() -> repl::OpTime { return registry.onDropCollection(nullptr, testNss, {}); };
+ registry.addObserver(std::make_unique<OpObserverNoop>());
+ auto op = [&]() -> repl::OpTime { return registry.onDropCollection(&opCtx, testNss, {}); };
checkConsistentOpTime(op);
}
TEST_F(OpObserverRegistryTest, OnRenameCollectionObserverResultReturnsRightTime) {
+ OperationContextNoop opCtx;
registry.addObserver(std::move(unique1));
- registry.addObserver(std::move(unique2));
+ registry.addObserver(std::make_unique<OpObserverNoop>());
auto op = [&]() -> repl::OpTime {
- return registry.onRenameCollection(nullptr, testNss, testNss, {}, false, {}, false);
+ return registry.onRenameCollection(&opCtx, testNss, testNss, {}, false, {}, false);
};
checkConsistentOpTime(op);
}
DEATH_TEST_F(OpObserverRegistryTest, OnDropCollectionReturnsInconsistentTime, "invariant") {
+ OperationContextNoop opCtx;
registry.addObserver(std::move(unique1));
registry.addObserver(std::move(unique2));
- auto op = [&]() -> repl::OpTime { return registry.onDropCollection(nullptr, testNss, {}); };
+ auto op = [&]() -> repl::OpTime { return registry.onDropCollection(&opCtx, testNss, {}); };
checkInconsistentOpTime(op);
}
DEATH_TEST_F(OpObserverRegistryTest, OnRenameCollectionReturnsInconsistentTime, "invariant") {
+ OperationContextNoop opCtx;
registry.addObserver(std::move(unique1));
registry.addObserver(std::move(unique2));
auto op = [&]() -> repl::OpTime {
- return registry.onRenameCollection(nullptr, testNss, testNss, {}, false, {}, false);
+ return registry.onRenameCollection(&opCtx, testNss, testNss, {}, false, {}, false);
};
checkInconsistentOpTime(op);
}
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 69f284f3af4..670f315d71e 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -23,6 +23,7 @@ env.Library(
'$BUILD_DIR/mongo/db/dbdirectclient',
'$BUILD_DIR/mongo/db/dbhelpers',
'$BUILD_DIR/mongo/db/index_d',
+ '$BUILD_DIR/mongo/db/op_observer',
'$BUILD_DIR/mongo/idl/idl_parser',
],
)
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index a098a5b7b38..4b9158877dd 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -1,5 +1,3 @@
-// @file oplog.cpp
-
/**
* Copyright (C) 2008-2014 MongoDB Inc.
*
diff --git a/src/mongo/dbtests/SConscript b/src/mongo/dbtests/SConscript
index 3ebeb531a43..93e46007afc 100644
--- a/src/mongo/dbtests/SConscript
+++ b/src/mongo/dbtests/SConscript
@@ -25,6 +25,7 @@ env.Library(
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/dbdirectclient',
+ '$BUILD_DIR/mongo/db/op_observer',
'$BUILD_DIR/mongo/db/service_context_d',
'$BUILD_DIR/mongo/db/s/sharding',
'$BUILD_DIR/mongo/s/coreshard',
diff --git a/src/mongo/s/sharding_mongod_test_fixture.cpp b/src/mongo/s/sharding_mongod_test_fixture.cpp
index dea70878919..e4813f6f887 100644
--- a/src/mongo/s/sharding_mongod_test_fixture.cpp
+++ b/src/mongo/s/sharding_mongod_test_fixture.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/commands.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/op_observer_impl.h"
+#include "mongo/db/op_observer_registry.h"
#include "mongo/db/query/cursor_response.h"
#include "mongo/db/query/query_request.h"
#include "mongo/db/repl/drop_pending_collection_reaper.h"
@@ -142,7 +143,13 @@ void ShardingMongodTestFixture::setUp() {
repl::StorageInterface::set(service, std::move(storagePtr));
- service->setOpObserver(stdx::make_unique<OpObserverImpl>());
+ auto makeOpObserver = [&] {
+ auto opObserver = stdx::make_unique<OpObserverRegistry>();
+ opObserver->addObserver(stdx::make_unique<OpObserverImpl>());
+ return opObserver;
+ };
+ service->setOpObserver(makeOpObserver());
+
repl::setOplogCollectionName(service);
repl::createOplog(_opCtx.get());