summaryrefslogtreecommitdiff
path: root/src/mongo/db/commands/find_and_modify.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/commands/find_and_modify.cpp')
-rw-r--r--src/mongo/db/commands/find_and_modify.cpp51
1 files changed, 28 insertions, 23 deletions
diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp
index 20fd3df1466..0657d640e77 100644
--- a/src/mongo/db/commands/find_and_modify.cpp
+++ b/src/mongo/db/commands/find_and_modify.cpp
@@ -61,8 +61,8 @@
#include "mongo/db/query/plan_executor.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replication_coordinator_global.h"
+#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/write_concern.h"
-#include "mongo/s/d_state.h"
#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"
@@ -257,19 +257,18 @@ public:
// Explain calls of the findAndModify command are read-only, but we take write
// locks so that the timing information is more accurate.
- AutoGetDb autoDb(txn, dbName, MODE_IX);
- Lock::CollectionLock collLock(txn->lockState(), nsString.ns(), MODE_IX);
-
- ensureShardVersionOKOrThrow(txn, nsString.ns());
-
- Collection* collection = nullptr;
- if (autoDb.getDb()) {
- collection = autoDb.getDb()->getCollection(nsString.ns());
- } else {
+ AutoGetCollection autoColl(txn, nsString, MODE_IX);
+ if (!autoColl.getDb()) {
return {ErrorCodes::NamespaceNotFound,
str::stream() << "database " << dbName << " does not exist."};
}
+ auto css = CollectionShardingState::get(txn, nsString);
+ if (css) {
+ css->checkShardVersionOrThrow(txn);
+ }
+
+ Collection* const collection = autoColl.getCollection();
auto statusWithPlanExecutor = getExecutorDelete(txn, collection, &parsedDelete);
if (!statusWithPlanExecutor.isOK()) {
return statusWithPlanExecutor.getStatus();
@@ -293,19 +292,18 @@ public:
// Explain calls of the findAndModify command are read-only, but we take write
// locks so that the timing information is more accurate.
- AutoGetDb autoDb(txn, dbName, MODE_IX);
- Lock::CollectionLock collLock(txn->lockState(), nsString.ns(), MODE_IX);
-
- ensureShardVersionOKOrThrow(txn, nsString.ns());
-
- Collection* collection = nullptr;
- if (autoDb.getDb()) {
- collection = autoDb.getDb()->getCollection(nsString.ns());
- } else {
+ AutoGetCollection autoColl(txn, nsString, MODE_IX);
+ if (!autoColl.getDb()) {
return {ErrorCodes::NamespaceNotFound,
str::stream() << "database " << dbName << " does not exist."};
}
+ auto css = CollectionShardingState::get(txn, nsString);
+ if (css) {
+ css->checkShardVersionOrThrow(txn);
+ }
+
+ Collection* collection = autoColl.getCollection();
auto statusWithPlanExecutor =
getExecutorUpdate(txn, collection, &parsedUpdate, opDebug);
if (!statusWithPlanExecutor.isOK()) {
@@ -376,7 +374,6 @@ public:
AutoGetOrCreateDb autoDb(txn, dbName, MODE_IX);
Lock::CollectionLock collLock(txn->lockState(), nsString.ns(), MODE_IX);
- Collection* collection = autoDb.getDb()->getCollection(nsString.ns());
// Attach the namespace and database profiling level to the current op.
{
@@ -385,13 +382,17 @@ public:
->enter_inlock(nsString.ns().c_str(), autoDb.getDb()->getProfilingLevel());
}
- ensureShardVersionOKOrThrow(txn, nsString.ns());
+ auto css = CollectionShardingState::get(txn, nsString);
+ if (css) {
+ css->checkShardVersionOrThrow(txn);
+ }
Status isPrimary = checkCanAcceptWritesForDatabase(nsString);
if (!isPrimary.isOK()) {
return appendCommandStatus(result, isPrimary);
}
+ Collection* const collection = autoDb.getDb()->getCollection(nsString.ns());
auto statusWithPlanExecutor = getExecutorDelete(txn, collection, &parsedDelete);
if (!statusWithPlanExecutor.isOK()) {
return appendCommandStatus(result, statusWithPlanExecutor.getStatus());
@@ -435,7 +436,6 @@ public:
AutoGetOrCreateDb autoDb(txn, dbName, MODE_IX);
Lock::CollectionLock collLock(txn->lockState(), nsString.ns(), MODE_IX);
- Collection* collection = autoDb.getDb()->getCollection(nsString.ns());
// Attach the namespace and database profiling level to the current op.
{
@@ -444,13 +444,18 @@ public:
->enter_inlock(nsString.ns().c_str(), autoDb.getDb()->getProfilingLevel());
}
- ensureShardVersionOKOrThrow(txn, nsString.ns());
+ auto css = CollectionShardingState::get(txn, nsString);
+ if (css) {
+ css->checkShardVersionOrThrow(txn);
+ }
Status isPrimary = checkCanAcceptWritesForDatabase(nsString);
if (!isPrimary.isOK()) {
return appendCommandStatus(result, isPrimary);
}
+ Collection* collection = autoDb.getDb()->getCollection(nsString.ns());
+
// Create the collection if it does not exist when performing an upsert
// because the update stage does not create its own collection.
if (!collection && args.isUpsert()) {