diff options
-rw-r--r-- | jstests/sharding/oplog_document_key.js | 154 | ||||
-rw-r--r-- | src/mongo/db/auth/authz_manager_external_state_mock.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/exec/update.cpp | 60 | ||||
-rw-r--r-- | src/mongo/db/op_observer.h | 2 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/s/collection_sharding_state.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/s/metadata_manager.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/update/update_driver.cpp | 19 | ||||
-rw-r--r-- | src/mongo/db/update/update_driver.h | 6 |
9 files changed, 184 insertions, 71 deletions
diff --git a/jstests/sharding/oplog_document_key.js b/jstests/sharding/oplog_document_key.js new file mode 100644 index 00000000000..3e3ebaee215 --- /dev/null +++ b/jstests/sharding/oplog_document_key.js @@ -0,0 +1,154 @@ +/** + * Verify that shard key appears in oplog records + * + * The only records that need to be checked are delete and update records, but updates happen on + * various paths that must all be checked. + */ + +(function() { + "use strict"; + + var st = new ShardingTest({name: 'test', shards: {rs0: {nodes: 1}}}); + var db = st.s.getDB('test'); + + assert.commandWorked(st.s.adminCommand({enableSharding: 'test'})); + + // 'test.un' is left unsharded. + assert.commandWorked(db.adminCommand({shardcollection: 'test.byId', key: {_id: 1}})); + assert.commandWorked(db.adminCommand({shardcollection: 'test.byX', key: {x: 1}})); + assert.commandWorked(db.adminCommand({shardcollection: 'test.byXId', key: {x: 1, _id: 1}})); + assert.commandWorked(db.adminCommand({shardcollection: 'test.byIdX', key: {_id: 1, x: 1}})); + + assert.writeOK(db.un.insert({_id: 10, x: 50, y: 60})); + assert.writeOK(db.un.insert({_id: 30, x: 70, y: 80})); + + assert.writeOK(db.byId.insert({_id: 11, x: 51, y: 61})); + assert.writeOK(db.byId.insert({_id: 31, x: 71, y: 81})); + + assert.writeOK(db.byX.insert({_id: 12, x: 52, y: 62})); + assert.writeOK(db.byX.insert({_id: 32, x: 72, y: 82})); + + assert.writeOK(db.byXId.insert({_id: 13, x: 53, y: 63})); + assert.writeOK(db.byXId.insert({_id: 33, x: 73, y: 83})); + + assert.writeOK(db.byIdX.insert({_id: 14, x: 54, y: 64})); + assert.writeOK(db.byIdX.insert({_id: 34, x: 74, y: 84})); + + var oplog = st.rs0.getPrimary().getDB('local').oplog.rs; + + //////////////////////////////////////////////////////////////////////// + jsTest.log("Test update command on 'un'"); + + assert.writeOK(db.un.update({_id: 10, x: 50}, {$set: {y: 70}})); // in place + assert.writeOK(db.un.update({_id: 30, x: 70}, {y: 75})); // replacement + + // unsharded, only _id appears in o2: + + var a = oplog.findOne({ns: 'test.un', op: 'u', 'o2._id': 10}); + assert.docEq(a.o2, {_id: 10}); + + var b = oplog.findOne({ns: 'test.un', op: 'u', 'o2._id': 30}); + assert.docEq(b.o2, {_id: 30}); + + //////////////////////////////////////////////////////////////////////// + jsTest.log("Test update command on 'byId'"); + + assert.writeOK(db.byId.update({_id: 11}, {$set: {y: 71}})); // in place + assert.writeOK(db.byId.update({_id: 31}, {x: 71, y: 76})); // replacement + + // sharded by {_id: 1}: only _id appears in o2: + + a = oplog.findOne({ns: 'test.byId', op: 'u', 'o2._id': 11}); + assert.docEq(a.o2, {_id: 11}); + + b = oplog.findOne({ns: 'test.byId', op: 'u', 'o2._id': 31}); + assert.docEq(b.o2, {_id: 31}); + + //////////////////////////////////////////////////////////////////////// + jsTest.log("Test update command on 'byX'"); + + assert.writeOK(db.byX.update({x: 52}, {$set: {y: 72}})); // in place + assert.writeOK(db.byX.update({x: 72}, {x: 72, y: 77})); // replacement + + // sharded by {x: 1}: x appears in o2, followed by _id: + + a = oplog.findOne({ns: 'test.byX', op: 'u', 'o2._id': 12}); + assert.docEq(a.o2, {x: 52, _id: 12}); + + b = oplog.findOne({ns: 'test.byX', op: 'u', 'o2._id': 32}); + assert.docEq(b.o2, {x: 72, _id: 32}); + + //////////////////////////////////////////////////////////////////////// + jsTest.log("Test update command on 'byXId'"); + + assert.writeOK(db.byXId.update({_id: 13, x: 53}, {$set: {y: 73}})); // in place + assert.writeOK(db.byXId.update({_id: 33, x: 73}, {x: 73, y: 78})); // replacement + + // sharded by {x: 1, _id: 1}: x appears in o2, followed by _id: + + a = oplog.findOne({ns: 'test.byXId', op: 'u', 'o2._id': 13}); + assert.docEq(a.o2, {x: 53, _id: 13}); + + b = oplog.findOne({ns: 'test.byXId', op: 'u', 'o2._id': 33}); + assert.docEq(b.o2, {x: 73, _id: 33}); + + //////////////////////////////////////////////////////////////////////// + jsTest.log("Test update command on 'byIdX'"); + + assert.writeOK(db.byIdX.update({_id: 14, x: 54}, {$set: {y: 74}})); // in place + assert.writeOK(db.byIdX.update({_id: 34, x: 74}, {x: 74, y: 79})); // replacement + + // sharded by {_id: 1, x: 1}: _id appears in o2, followed by x: + + a = oplog.findOne({ns: 'test.byIdX', op: 'u', 'o2._id': 14}); + assert.docEq(a.o2, {_id: 14, x: 54}); + + b = oplog.findOne({ns: 'test.byIdX', op: 'u', 'o2._id': 34}); + assert.docEq(b.o2, {_id: 34, x: 74}); + + //////////////////////////////////////////////////////////////////////// + jsTest.log("Test remove command: 'un'"); + + assert.writeOK(db.un.remove({_id: 10})); + assert.writeOK(db.un.remove({_id: 30})); + + a = oplog.findOne({ns: 'test.un', op: 'd', 'o._id': 10}); + assert.docEq(a.o, {_id: 10}); + b = oplog.findOne({ns: 'test.un', op: 'd', 'o._id': 30}); + assert.docEq(b.o, {_id: 30}); + + //////////////////////////////////////////////////////////////////////// + jsTest.log("Test remove command: 'byX'"); + + assert.writeOK(db.byX.remove({_id: 12})); + assert.writeOK(db.byX.remove({_id: 32})); + + a = oplog.findOne({ns: 'test.byX', op: 'd', 'o._id': 12}); + assert.docEq(a.o, {x: 52, _id: 12}); + b = oplog.findOne({ns: 'test.byX', op: 'd', 'o._id': 32}); + assert.docEq(b.o, {x: 72, _id: 32}); + + //////////////////////////////////////////////////////////////////////// + jsTest.log("Test remove command: 'byXId'"); + + assert.writeOK(db.byXId.remove({_id: 13})); + assert.writeOK(db.byXId.remove({_id: 33})); + + a = oplog.findOne({ns: 'test.byXId', op: 'd', 'o._id': 13}); + assert.docEq(a.o, {x: 53, _id: 13}); + b = oplog.findOne({ns: 'test.byXId', op: 'd', 'o._id': 33}); + assert.docEq(b.o, {x: 73, _id: 33}); + + //////////////////////////////////////////////////////////////////////// + jsTest.log("Test remove command: 'byIdX'"); + + assert.writeOK(db.byIdX.remove({_id: 14})); + assert.writeOK(db.byIdX.remove({_id: 34})); + + a = oplog.findOne({ns: 'test.byIdX', op: 'd', 'o._id': 14}); + assert.docEq(a.o, {_id: 14, x: 54}); + b = oplog.findOne({ns: 'test.byIdX', op: 'd', 'o._id': 34}); + assert.docEq(b.o, {_id: 34, x: 74}); + + st.stop(); +})(); diff --git a/src/mongo/db/auth/authz_manager_external_state_mock.cpp b/src/mongo/db/auth/authz_manager_external_state_mock.cpp index 1d2c92ecd2a..ca77d4184a7 100644 --- a/src/mongo/db/auth/authz_manager_external_state_mock.cpp +++ b/src/mongo/db/auth/authz_manager_external_state_mock.cpp @@ -203,7 +203,7 @@ Status AuthzManagerExternalStateMock::updateOne(OperationContext* opCtx, return status; BSONObj newObj = document.getObject().copy(); *iter = newObj; - BSONObj idQuery = driver.makeOplogEntryQuery(newObj, false); + BSONObj idQuery = newObj["_id"_sd].Obj(); if (_authzManager) { _authzManager->logOp(opCtx, "u", collectionName, logObj, &idQuery); diff --git a/src/mongo/db/exec/update.cpp b/src/mongo/db/exec/update.cpp index 5125368b612..f3a12e32159 100644 --- a/src/mongo/db/exec/update.cpp +++ b/src/mongo/db/exec/update.cpp @@ -46,6 +46,7 @@ #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/s/collection_metadata.h" #include "mongo/db/s/collection_sharding_state.h" +#include "mongo/db/s/metadata_manager.h" #include "mongo/db/service_context.h" #include "mongo/db/update/storage_validation.h" #include "mongo/stdx/memory.h" @@ -288,34 +289,35 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, Reco WriteUnitOfWork wunit(getOpCtx()); RecordId newRecordId; + OplogUpdateEntryArgs args; + if (!request->isExplain()) { + invariant(_collection); + auto* css = CollectionShardingState::get(getOpCtx(), _collection->ns()); + args.nss = _collection->ns(); + args.uuid = _collection->uuid(); + args.stmtId = request->getStmtId(); + args.update = logObj; + args.criteria = css->getMetadata().extractDocumentKey(newObj); + uassert(16980, + "Multi-update operations require all documents to have an '_id' field", + !request->isMulti() || args.criteria.hasField("_id"_sd)); + args.fromMigrate = request->isFromMigration(); + args.storeDocOption = getStoreDocMode(*request); + if (args.storeDocOption == OplogUpdateEntryArgs::StoreDocOption::PreImage) { + args.preImageDoc = oldObj.value().getOwned(); + } + } if (inPlace) { - // Don't actually do the write if this is an explain. if (!request->isExplain()) { - invariant(_collection); newObj = oldObj.value(); const RecordData oldRec(oldObj.value().objdata(), oldObj.value().objsize()); - BSONObj idQuery = driver->makeOplogEntryQuery(newObj, request->isMulti()); - OplogUpdateEntryArgs args; - args.nss = _collection->ns(); - args.uuid = _collection->uuid(); - args.stmtId = request->getStmtId(); - args.update = logObj; - args.criteria = idQuery; - args.fromMigrate = request->isFromMigration(); - args.storeDocOption = getStoreDocMode(*request); - - if (args.storeDocOption == OplogUpdateEntryArgs::StoreDocOption::PreImage) { - args.preImageDoc = oldObj.value().getOwned(); - } + + Snapshotted<RecordData> snap(oldObj.snapshotId(), oldRec); StatusWith<RecordData> newRecStatus = _collection->updateDocumentWithDamages( - getOpCtx(), - recordId, - Snapshotted<RecordData>(oldObj.snapshotId(), oldRec), - source, - _damages, - &args); + getOpCtx(), recordId, std::move(snap), source, _damages, &args); + newObj = uassertStatusOK(std::move(newRecStatus)).releaseToBson(); } @@ -329,23 +331,7 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, Reco << BSONObjMaxUserSize, newObj.objsize() <= BSONObjMaxUserSize); - // Don't actually do the write if this is an explain. if (!request->isExplain()) { - invariant(_collection); - BSONObj idQuery = driver->makeOplogEntryQuery(newObj, request->isMulti()); - OplogUpdateEntryArgs args; - args.nss = _collection->ns(); - args.uuid = _collection->uuid(); - args.stmtId = request->getStmtId(); - args.update = logObj; - args.criteria = idQuery; - args.fromMigrate = request->isFromMigration(); - args.storeDocOption = getStoreDocMode(*request); - - if (args.storeDocOption == OplogUpdateEntryArgs::StoreDocOption::PreImage) { - args.preImageDoc = oldObj.value().getOwned(); - } - newRecordId = _collection->updateDocument(getOpCtx(), recordId, oldObj, diff --git a/src/mongo/db/op_observer.h b/src/mongo/db/op_observer.h index 3ff05ff561d..53c2a79f0e5 100644 --- a/src/mongo/db/op_observer.h +++ b/src/mongo/db/op_observer.h @@ -72,7 +72,7 @@ struct OplogUpdateEntryArgs { BSONObj criteria; // True if this update comes from a chunk migration. - bool fromMigrate; + bool fromMigrate = false; StoreDocOption storeDocOption = StoreDocOption::None; }; diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index be7b8c9be8c..9a6bd8e5be7 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -177,7 +177,7 @@ repl::OpTime replLogDelete(OperationContext* opCtx, OptionalCollectionUUID uuid, Session* session, StmtId stmtId, - CollectionShardingState::DeleteState deleteState, + const CollectionShardingState::DeleteState& deleteState, bool fromMigrate, const boost::optional<BSONObj>& deletedDoc) { OperationSessionInfo sessionInfo; diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp index a5b6073205d..787b59e73bd 100644 --- a/src/mongo/db/s/collection_sharding_state.cpp +++ b/src/mongo/db/s/collection_sharding_state.cpp @@ -320,14 +320,11 @@ void CollectionShardingState::onUpdateOp(OperationContext* opCtx, } auto CollectionShardingState::makeDeleteState(BSONObj const& doc) -> DeleteState { - BSONObj documentKey = getMetadata().extractDocumentKey(doc).getOwned(); - invariant(documentKey.hasField("_id"_sd)); - return {std::move(documentKey), + return {getMetadata().extractDocumentKey(doc).getOwned(), _sourceMgr && _sourceMgr->getCloner()->isDocumentInMigratingChunk(doc)}; } -void CollectionShardingState::onDeleteOp(OperationContext* opCtx, - const CollectionShardingState::DeleteState& deleteState) { +void CollectionShardingState::onDeleteOp(OperationContext* opCtx, DeleteState const& deleteState) { dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { diff --git a/src/mongo/db/s/metadata_manager.cpp b/src/mongo/db/s/metadata_manager.cpp index c7d5c74052e..9e4e2efd88c 100644 --- a/src/mongo/db/s/metadata_manager.cpp +++ b/src/mongo/db/s/metadata_manager.cpp @@ -293,7 +293,8 @@ BSONObj ScopedCollectionMetadata::extractDocumentKey(BSONObj const& doc) const { if (auto id = doc["_id"_sd]) { return key.isEmpty() ? id.wrap() : BSONObjBuilder(std::move(key)).append(id).obj(); } - return key; + // For legacy documents that lack an _id, use the document itself as its key. + return doc; } ScopedCollectionMetadata::~ScopedCollectionMetadata() { diff --git a/src/mongo/db/update/update_driver.cpp b/src/mongo/db/update/update_driver.cpp index 3c6124e4d8e..cfcc2d891fd 100644 --- a/src/mongo/db/update/update_driver.cpp +++ b/src/mongo/db/update/update_driver.cpp @@ -558,25 +558,6 @@ void UpdateDriver::setCollator(const CollatorInterface* collator) { _modOptions.collator = collator; } -BSONObj UpdateDriver::makeOplogEntryQuery(const BSONObj& doc, bool multi) const { - BSONObjBuilder idPattern; - BSONElement id; - // NOTE: If the matching object lacks an id, we'll log - // with the original pattern. This isn't replay-safe. - // It might make sense to suppress the log instead - // if there's no id. - if (doc.getObjectID(id)) { - idPattern.append(id); - return idPattern.obj(); - } else { - uassert(16980, - str::stream() << "Multi-update operations require all documents to " - "have an '_id' field. " - << doc.toString(), - !multi); - return doc; - } -} void UpdateDriver::clear() { for (vector<ModifierInterface*>::iterator it = _mods.begin(); it != _mods.end(); ++it) { delete *it; diff --git a/src/mongo/db/update/update_driver.h b/src/mongo/db/update/update_driver.h index 823e8eafeeb..3a74b1ca753 100644 --- a/src/mongo/db/update/update_driver.h +++ b/src/mongo/db/update/update_driver.h @@ -98,12 +98,6 @@ public: mutablebson::Document& doc) const; /** - * return a BSONObj with the _id field of the doc passed in, or the doc itself. - * If no _id and multi, error. - */ - BSONObj makeOplogEntryQuery(const BSONObj& doc, bool multi) const; - - /** * Executes the update over 'doc'. If any modifier is positional, use 'matchedField' (index of * the array item matched). If 'doc' allows the modifiers to be applied in place and no index * updating is involved, then the modifiers may be applied "in place" over 'doc'. |