summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/shard_server_op_observer.cpp
diff options
context:
space:
mode:
authorIan Boros <ian.boros@mongodb.com>2020-08-17 15:02:53 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-09-09 03:06:57 +0000
commitcbb82fd1b270f84e544243acbba2cb3fed779c28 (patch)
treeafe0fc762f61d49a67551e8b5a1e990965f9e5de /src/mongo/db/s/shard_server_op_observer.cpp
parent58fd67fc2232a4ca591ff66443fd22213d4b5cac (diff)
downloadmongo-cbb82fd1b270f84e544243acbba2cb3fed779c28.tar.gz
SERVER-50218 Change shard server and config server op observers to support $v:2 update oplog entries
Diffstat (limited to 'src/mongo/db/s/shard_server_op_observer.cpp')
-rw-r--r--src/mongo/db/s/shard_server_op_observer.cpp74
1 files changed, 34 insertions, 40 deletions
diff --git a/src/mongo/db/s/shard_server_op_observer.cpp b/src/mongo/db/s/shard_server_op_observer.cpp
index 70d6da97ec1..3cef6d2aebd 100644
--- a/src/mongo/db/s/shard_server_op_observer.cpp
+++ b/src/mongo/db/s/shard_server_op_observer.cpp
@@ -46,6 +46,7 @@
#include "mongo/db/s/sharding_initialization_mongod.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/s/type_shard_identity.h"
+#include "mongo/db/update/update_oplog_entry_serialization.h"
#include "mongo/logv2/log.h"
#include "mongo/s/balancer_configuration.h"
#include "mongo/s/catalog/type_shard_collection.h"
@@ -280,7 +281,11 @@ void ShardServerOpObserver::onInserts(OperationContext* opCtx,
}
void ShardServerOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) {
- if (args.nss == NamespaceString::kShardConfigCollectionsNamespace) {
+ const auto& updateDoc = args.updateArgs.update;
+ // Most of these handlers do not need to run when the update is a full document replacement.
+ const bool isReplacementUpdate = (update_oplog_entry::extractUpdateType(updateDoc) ==
+ update_oplog_entry::UpdateType::kReplacement);
+ if (args.nss == NamespaceString::kShardConfigCollectionsNamespace && !isReplacementUpdate) {
// Notification of routing table changes are only needed on secondaries
if (isStandaloneOrPrimary(opCtx)) {
return;
@@ -309,32 +314,27 @@ void ShardServerOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateE
return NamespaceString(coll);
}());
- // Parse the '$set' update
- BSONElement setElement;
- Status setStatus =
- bsonExtractTypedField(args.updateArgs.update, StringData("$set"), Object, &setElement);
- if (setStatus.isOK()) {
- BSONObj setField = setElement.Obj();
+ auto enterCriticalSectionFieldNewVal = update_oplog_entry::extractNewValueForField(
+ updateDoc, ShardCollectionType::kEnterCriticalSectionCounterFieldName);
+ auto refreshingFieldNewVal = update_oplog_entry::extractNewValueForField(
+ updateDoc, ShardCollectionType::kRefreshingFieldName);
- // Need the WUOW to retain the lock for CollectionVersionLogOpHandler::commit()
- AutoGetCollection autoColl(opCtx, updatedNss, MODE_IX);
-
- auto refreshingField = setField.getField(ShardCollectionType::kRefreshingFieldName);
- if (refreshingField.isBoolean() && !refreshingField.boolean()) {
- opCtx->recoveryUnit()->registerChange(
- std::make_unique<CollectionVersionLogOpHandler>(opCtx, updatedNss));
- }
+ // Need the WUOW to retain the lock for CollectionVersionLogOpHandler::commit().
+ AutoGetCollection autoColl(opCtx, updatedNss, MODE_IX);
+ if (refreshingFieldNewVal.isBoolean() && !refreshingFieldNewVal.boolean()) {
+ opCtx->recoveryUnit()->registerChange(
+ std::make_unique<CollectionVersionLogOpHandler>(opCtx, updatedNss));
+ }
- if (setField.hasField(ShardCollectionType::kEnterCriticalSectionCounterFieldName)) {
- // Force subsequent uses of the namespace to refresh the filtering metadata so they
- // can synchronize with any work happening on the primary (e.g., migration critical
- // section).
- CollectionShardingRuntime::get(opCtx, updatedNss)->clearFilteringMetadata(opCtx);
- }
+ if (enterCriticalSectionFieldNewVal.ok()) {
+ // Force subsequent uses of the namespace to refresh the filtering metadata so they
+ // can synchronize with any work happening on the primary (e.g., migration critical
+ // section).
+ CollectionShardingRuntime::get(opCtx, updatedNss)->clearFilteringMetadata(opCtx);
}
}
- if (args.nss == NamespaceString::kShardConfigDatabasesNamespace) {
+ if (args.nss == NamespaceString::kShardConfigDatabasesNamespace && !isReplacementUpdate) {
// Notification of routing table changes are only needed on secondaries
if (isStandaloneOrPrimary(opCtx)) {
return;
@@ -353,31 +353,25 @@ void ShardServerOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateE
40478,
bsonExtractStringField(args.updateArgs.criteria, ShardDatabaseType::name.name(), &db));
- // Parse the '$set' update
- BSONElement setElement;
- Status setStatus =
- bsonExtractTypedField(args.updateArgs.update, StringData("$set"), Object, &setElement);
- if (setStatus.isOK()) {
- BSONObj setField = setElement.Obj();
-
- if (setField.hasField(ShardDatabaseType::enterCriticalSectionCounter.name())) {
- AutoGetDb autoDb(opCtx, db, MODE_X);
- auto dss = DatabaseShardingState::get(opCtx, db);
- auto dssLock = DatabaseShardingState::DSSLock::lockExclusive(opCtx, dss);
- dss->setDbVersion(opCtx, boost::none, dssLock);
- }
+ auto enterCriticalSectionCounterFieldNewVal = update_oplog_entry::extractNewValueForField(
+ updateDoc, ShardDatabaseType::enterCriticalSectionCounter.name());
+
+ if (enterCriticalSectionCounterFieldNewVal.ok()) {
+ AutoGetDb autoDb(opCtx, db, MODE_X);
+ auto dss = DatabaseShardingState::get(opCtx, db);
+ auto dssLock = DatabaseShardingState::DSSLock::lockExclusive(opCtx, dss);
+ dss->setDbVersion(opCtx, boost::none, dssLock);
}
}
- if (args.nss == NamespaceString::kRangeDeletionNamespace) {
+ if (args.nss == NamespaceString::kRangeDeletionNamespace && !isReplacementUpdate) {
if (!isStandaloneOrPrimary(opCtx))
return;
- BSONElement unsetElement;
- Status status = bsonExtractTypedField(
- args.updateArgs.update, StringData("$unset"), Object, &unsetElement);
+ const auto pendingFieldRemovedStatus =
+ update_oplog_entry::isFieldRemovedByUpdate(args.updateArgs.update, "pending");
- if (unsetElement.Obj().hasField("pending")) {
+ if (pendingFieldRemovedStatus == update_oplog_entry::FieldRemovedStatus::kFieldRemoved) {
auto deletionTask = RangeDeletionTask::parse(
IDLParserErrorContext("ShardServerOpObserver"), args.updateArgs.updatedDoc);