diff options
author | Ian Boros <ian.boros@mongodb.com> | 2020-08-17 15:02:53 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-09-09 03:06:57 +0000 |
commit | cbb82fd1b270f84e544243acbba2cb3fed779c28 (patch) | |
tree | afe0fc762f61d49a67551e8b5a1e990965f9e5de /src/mongo/db/s/shard_server_op_observer.cpp | |
parent | 58fd67fc2232a4ca591ff66443fd22213d4b5cac (diff) | |
download | mongo-cbb82fd1b270f84e544243acbba2cb3fed779c28.tar.gz |
SERVER-50218 Change shard server and config server op observers to support $v:2 update oplog entries
Diffstat (limited to 'src/mongo/db/s/shard_server_op_observer.cpp')
-rw-r--r-- | src/mongo/db/s/shard_server_op_observer.cpp | 74 |
1 files changed, 34 insertions, 40 deletions
diff --git a/src/mongo/db/s/shard_server_op_observer.cpp b/src/mongo/db/s/shard_server_op_observer.cpp index 70d6da97ec1..3cef6d2aebd 100644 --- a/src/mongo/db/s/shard_server_op_observer.cpp +++ b/src/mongo/db/s/shard_server_op_observer.cpp @@ -46,6 +46,7 @@ #include "mongo/db/s/sharding_initialization_mongod.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/s/type_shard_identity.h" +#include "mongo/db/update/update_oplog_entry_serialization.h" #include "mongo/logv2/log.h" #include "mongo/s/balancer_configuration.h" #include "mongo/s/catalog/type_shard_collection.h" @@ -280,7 +281,11 @@ void ShardServerOpObserver::onInserts(OperationContext* opCtx, } void ShardServerOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) { - if (args.nss == NamespaceString::kShardConfigCollectionsNamespace) { + const auto& updateDoc = args.updateArgs.update; + // Most of these handlers do not need to run when the update is a full document replacement. + const bool isReplacementUpdate = (update_oplog_entry::extractUpdateType(updateDoc) == + update_oplog_entry::UpdateType::kReplacement); + if (args.nss == NamespaceString::kShardConfigCollectionsNamespace && !isReplacementUpdate) { // Notification of routing table changes are only needed on secondaries if (isStandaloneOrPrimary(opCtx)) { return; @@ -309,32 +314,27 @@ void ShardServerOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateE return NamespaceString(coll); }()); - // Parse the '$set' update - BSONElement setElement; - Status setStatus = - bsonExtractTypedField(args.updateArgs.update, StringData("$set"), Object, &setElement); - if (setStatus.isOK()) { - BSONObj setField = setElement.Obj(); + auto enterCriticalSectionFieldNewVal = update_oplog_entry::extractNewValueForField( + updateDoc, ShardCollectionType::kEnterCriticalSectionCounterFieldName); + auto refreshingFieldNewVal = update_oplog_entry::extractNewValueForField( + updateDoc, ShardCollectionType::kRefreshingFieldName); - // Need the WUOW to retain the lock for CollectionVersionLogOpHandler::commit() - AutoGetCollection autoColl(opCtx, updatedNss, MODE_IX); - - auto refreshingField = setField.getField(ShardCollectionType::kRefreshingFieldName); - if (refreshingField.isBoolean() && !refreshingField.boolean()) { - opCtx->recoveryUnit()->registerChange( - std::make_unique<CollectionVersionLogOpHandler>(opCtx, updatedNss)); - } + // Need the WUOW to retain the lock for CollectionVersionLogOpHandler::commit(). + AutoGetCollection autoColl(opCtx, updatedNss, MODE_IX); + if (refreshingFieldNewVal.isBoolean() && !refreshingFieldNewVal.boolean()) { + opCtx->recoveryUnit()->registerChange( + std::make_unique<CollectionVersionLogOpHandler>(opCtx, updatedNss)); + } - if (setField.hasField(ShardCollectionType::kEnterCriticalSectionCounterFieldName)) { - // Force subsequent uses of the namespace to refresh the filtering metadata so they - // can synchronize with any work happening on the primary (e.g., migration critical - // section). - CollectionShardingRuntime::get(opCtx, updatedNss)->clearFilteringMetadata(opCtx); - } + if (enterCriticalSectionFieldNewVal.ok()) { + // Force subsequent uses of the namespace to refresh the filtering metadata so they + // can synchronize with any work happening on the primary (e.g., migration critical + // section). + CollectionShardingRuntime::get(opCtx, updatedNss)->clearFilteringMetadata(opCtx); } } - if (args.nss == NamespaceString::kShardConfigDatabasesNamespace) { + if (args.nss == NamespaceString::kShardConfigDatabasesNamespace && !isReplacementUpdate) { // Notification of routing table changes are only needed on secondaries if (isStandaloneOrPrimary(opCtx)) { return; @@ -353,31 +353,25 @@ void ShardServerOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateE 40478, bsonExtractStringField(args.updateArgs.criteria, ShardDatabaseType::name.name(), &db)); - // Parse the '$set' update - BSONElement setElement; - Status setStatus = - bsonExtractTypedField(args.updateArgs.update, StringData("$set"), Object, &setElement); - if (setStatus.isOK()) { - BSONObj setField = setElement.Obj(); - - if (setField.hasField(ShardDatabaseType::enterCriticalSectionCounter.name())) { - AutoGetDb autoDb(opCtx, db, MODE_X); - auto dss = DatabaseShardingState::get(opCtx, db); - auto dssLock = DatabaseShardingState::DSSLock::lockExclusive(opCtx, dss); - dss->setDbVersion(opCtx, boost::none, dssLock); - } + auto enterCriticalSectionCounterFieldNewVal = update_oplog_entry::extractNewValueForField( + updateDoc, ShardDatabaseType::enterCriticalSectionCounter.name()); + + if (enterCriticalSectionCounterFieldNewVal.ok()) { + AutoGetDb autoDb(opCtx, db, MODE_X); + auto dss = DatabaseShardingState::get(opCtx, db); + auto dssLock = DatabaseShardingState::DSSLock::lockExclusive(opCtx, dss); + dss->setDbVersion(opCtx, boost::none, dssLock); } } - if (args.nss == NamespaceString::kRangeDeletionNamespace) { + if (args.nss == NamespaceString::kRangeDeletionNamespace && !isReplacementUpdate) { if (!isStandaloneOrPrimary(opCtx)) return; - BSONElement unsetElement; - Status status = bsonExtractTypedField( - args.updateArgs.update, StringData("$unset"), Object, &unsetElement); + const auto pendingFieldRemovedStatus = + update_oplog_entry::isFieldRemovedByUpdate(args.updateArgs.update, "pending"); - if (unsetElement.Obj().hasField("pending")) { + if (pendingFieldRemovedStatus == update_oplog_entry::FieldRemovedStatus::kFieldRemoved) { auto deletionTask = RangeDeletionTask::parse( IDLParserErrorContext("ShardServerOpObserver"), args.updateArgs.updatedDoc); |