summaryrefslogtreecommitdiff
path: root/src/mongo/db/op_observer_registry.h
diff options
context:
space:
mode:
authorADAM David Alan Martin <adam.martin@10gen.com>2018-03-01 16:40:51 -0500
committerADAM David Alan Martin <adam.martin@10gen.com>2018-03-01 16:40:51 -0500
commit296fde1259d29e081069fde1c69bb9ae083932b1 (patch)
tree0e0973b0e6e642e09822f59be220be3a8c036466 /src/mongo/db/op_observer_registry.h
parent22b2b828a922a7459b4e1c75860a11c7eb3db630 (diff)
downloadmongo-296fde1259d29e081069fde1c69bb9ae083932b1.tar.gz
SERVER-32843 Allow multiple times in OpObservers
The OpObserverRegistry was introduced as an abstraction to allow decoupling making data modifications from the side effects, which need to happen as a result of these modifications, such as op log writes, retryable writes, etc. Some of the OpObserver's methods currently return the OpTime which resulted from logging the operation to the replication oplog. In addition, in certain cases, the OpTime resulting from an earlier OpObserver might be needed by a later one, which is the case with retryable writes. In order to support these requirements, the OpObserver(s) chain should have access to some common per-operation structure, where runtime information could be persisted.
Diffstat (limited to 'src/mongo/db/op_observer_registry.h')
-rw-r--r--src/mongo/db/op_observer_registry.h79
1 files changed, 48 insertions, 31 deletions
diff --git a/src/mongo/db/op_observer_registry.h b/src/mongo/db/op_observer_registry.h
index 30a1dcd5de5..c233eedfd6e 100644
--- a/src/mongo/db/op_observer_registry.h
+++ b/src/mongo/db/op_observer_registry.h
@@ -53,122 +53,142 @@ public:
// Add 'observer' to the list of observers to call. Observers are called in registration order.
// Registration must be done while no calls to observers are made.
void addObserver(std::unique_ptr<OpObserver> observer) {
- _observers.emplace_back(std::move(observer));
+ _observers.push_back(std::move(observer));
}
- void onCreateIndex(OperationContext* opCtx,
+ void onCreateIndex(OperationContext* const opCtx,
const NamespaceString& nss,
OptionalCollectionUUID uuid,
BSONObj indexDoc,
bool fromMigrate) override {
+ ReservedTimes times{opCtx};
for (auto& o : _observers)
o->onCreateIndex(opCtx, nss, uuid, indexDoc, fromMigrate);
}
- void onInserts(OperationContext* opCtx,
+ void onInserts(OperationContext* const opCtx,
const NamespaceString& nss,
OptionalCollectionUUID uuid,
std::vector<InsertStatement>::const_iterator begin,
std::vector<InsertStatement>::const_iterator end,
bool fromMigrate) override {
+ ReservedTimes times{opCtx};
for (auto& o : _observers)
o->onInserts(opCtx, nss, uuid, begin, end, fromMigrate);
}
- void onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) override {
+ void onUpdate(OperationContext* const opCtx, const OplogUpdateEntryArgs& args) override {
+ ReservedTimes times{opCtx};
for (auto& o : _observers)
o->onUpdate(opCtx, args);
}
- void aboutToDelete(OperationContext* opCtx,
+ void aboutToDelete(OperationContext* const opCtx,
const NamespaceString& nss,
const BSONObj& doc) override {
+ ReservedTimes times{opCtx};
for (auto& o : _observers)
o->aboutToDelete(opCtx, nss, doc);
}
- void onDelete(OperationContext* opCtx,
+ void onDelete(OperationContext* const opCtx,
const NamespaceString& nss,
OptionalCollectionUUID uuid,
StmtId stmtId,
bool fromMigrate,
const boost::optional<BSONObj>& deletedDoc) override {
+ ReservedTimes times{opCtx};
for (auto& o : _observers)
o->onDelete(opCtx, nss, uuid, stmtId, fromMigrate, deletedDoc);
}
- void onInternalOpMessage(OperationContext* opCtx,
+ void onInternalOpMessage(OperationContext* const opCtx,
const NamespaceString& nss,
const boost::optional<UUID> uuid,
const BSONObj& msgObj,
const boost::optional<BSONObj> o2MsgObj) override {
+ ReservedTimes times{opCtx};
for (auto& o : _observers)
o->onInternalOpMessage(opCtx, nss, uuid, msgObj, o2MsgObj);
}
- void onCreateCollection(OperationContext* opCtx,
+ void onCreateCollection(OperationContext* const opCtx,
Collection* coll,
const NamespaceString& collectionName,
const CollectionOptions& options,
const BSONObj& idIndex) override {
+ ReservedTimes times{opCtx};
for (auto& o : _observers)
o->onCreateCollection(opCtx, coll, collectionName, options, idIndex);
}
- void onCollMod(OperationContext* opCtx,
+ void onCollMod(OperationContext* const opCtx,
const NamespaceString& nss,
OptionalCollectionUUID uuid,
const BSONObj& collModCmd,
const CollectionOptions& oldCollOptions,
boost::optional<TTLCollModInfo> ttlInfo) override {
+ ReservedTimes times{opCtx};
for (auto& o : _observers)
o->onCollMod(opCtx, nss, uuid, collModCmd, oldCollOptions, ttlInfo);
}
- void onDropDatabase(OperationContext* opCtx, const std::string& dbName) override {
+ void onDropDatabase(OperationContext* const opCtx, const std::string& dbName) override {
+ ReservedTimes times{opCtx};
for (auto& o : _observers)
o->onDropDatabase(opCtx, dbName);
}
- repl::OpTime onDropCollection(OperationContext* opCtx,
+
+ repl::OpTime onDropCollection(OperationContext* const opCtx,
const NamespaceString& collectionName,
- OptionalCollectionUUID uuid) override {
- return _forEachObserver([&](auto& observer) -> repl::OpTime {
- return observer.onDropCollection(opCtx, collectionName, uuid);
- });
+ const OptionalCollectionUUID uuid) override {
+ ReservedTimes times{opCtx};
+ for (auto& observer : this->_observers) {
+ auto time = observer->onDropCollection(opCtx, collectionName, uuid);
+ invariant(time.isNull());
+ }
+ return _getOpTimeToReturn(times.get().reservedOpTimes);
}
- void onDropIndex(OperationContext* opCtx,
+ void onDropIndex(OperationContext* const opCtx,
const NamespaceString& nss,
OptionalCollectionUUID uuid,
const std::string& indexName,
const BSONObj& idxDescriptor) override {
+ ReservedTimes times{opCtx};
for (auto& o : _observers)
o->onDropIndex(opCtx, nss, uuid, indexName, idxDescriptor);
}
- repl::OpTime onRenameCollection(OperationContext* opCtx,
+ repl::OpTime onRenameCollection(OperationContext* const opCtx,
const NamespaceString& fromCollection,
const NamespaceString& toCollection,
OptionalCollectionUUID uuid,
bool dropTarget,
OptionalCollectionUUID dropTargetUUID,
bool stayTemp) override {
- return _forEachObserver([&](auto& observer) -> repl::OpTime {
- return observer.onRenameCollection(
+ ReservedTimes times{opCtx};
+ for (auto& observer : this->_observers) {
+ const auto time = observer->onRenameCollection(
opCtx, fromCollection, toCollection, uuid, dropTarget, dropTargetUUID, stayTemp);
- });
+ invariant(time.isNull());
+ }
+
+ return _getOpTimeToReturn(times.get().reservedOpTimes);
}
- void onApplyOps(OperationContext* opCtx,
+ void onApplyOps(OperationContext* const opCtx,
const std::string& dbName,
const BSONObj& applyOpCmd) override {
+ ReservedTimes times{opCtx};
for (auto& o : _observers)
o->onApplyOps(opCtx, dbName, applyOpCmd);
}
- void onEmptyCapped(OperationContext* opCtx,
+ void onEmptyCapped(OperationContext* const opCtx,
const NamespaceString& collectionName,
OptionalCollectionUUID uuid) {
+ ReservedTimes times{opCtx};
for (auto& o : _observers)
o->onEmptyCapped(opCtx, collectionName, uuid);
}
@@ -190,17 +210,14 @@ public:
}
private:
- repl::OpTime _forEachObserver(stdx::function<repl::OpTime(OpObserver&)> f) {
- repl::OpTime opTime;
- for (auto& observer : _observers) {
- repl::OpTime newTime = f(*observer);
- if (!newTime.isNull() && newTime != opTime) {
- invariant(opTime.isNull());
- opTime = newTime;
- }
+ static repl::OpTime _getOpTimeToReturn(const std::vector<repl::OpTime>& times) {
+ if (times.empty()) {
+ return repl::OpTime{};
}
- return opTime;
+ invariant(times.size() == 1);
+ return times.front();
}
+
std::vector<std::unique_ptr<OpObserver>> _observers;
};
} // namespace mongo