diff options
author | Dianna Hohensee <dianna.hohensee@10gen.com> | 2015-12-28 19:47:29 -0500 |
---|---|---|
committer | Dianna Hohensee <dianna.hohensee@10gen.com> | 2016-01-06 17:52:45 -0500 |
commit | e3c8408cce6fe93f0ff6b47deef6152aab083d47 (patch) | |
tree | 6baf8c49d4f4ac8d0c878c1572c73646ca5db8df | |
parent | 75f38ab0e95f5702c01945aacef81609d55df277 (diff) | |
download | mongo-e3c8408cce6fe93f0ff6b47deef6152aab083d47.tar.gz |
SERVER-21899 MigrationSourceManager::logOp refactor
-rw-r--r-- | src/mongo/db/op_observer.cpp | 25 | ||||
-rw-r--r-- | src/mongo/db/s/migration_source_manager.cpp | 104 | ||||
-rw-r--r-- | src/mongo/db/s/migration_source_manager.h | 35 | ||||
-rw-r--r-- | src/mongo/s/d_migrate.cpp | 40 | ||||
-rw-r--r-- | src/mongo/s/d_state.h | 53 |
5 files changed, 167 insertions, 90 deletions
diff --git a/src/mongo/db/op_observer.cpp b/src/mongo/db/op_observer.cpp index b4dacd091c5..a2488ab89a2 100644 --- a/src/mongo/db/op_observer.cpp +++ b/src/mongo/db/op_observer.cpp @@ -51,9 +51,9 @@ void OpObserver::onCreateIndex(OperationContext* txn, BSONObj indexDoc, bool fromMigrate) { repl::logOp(txn, "i", ns.c_str(), indexDoc, nullptr, fromMigrate); - - getGlobalAuthorizationManager()->logOp(txn, "i", ns.c_str(), indexDoc, nullptr); - logOpForSharding(txn, "i", ns.c_str(), indexDoc, nullptr, fromMigrate); + AuthorizationManager::get(txn->getServiceContext()) + ->logOp(txn, "i", ns.c_str(), indexDoc, nullptr); + logInsertOpForSharding(txn, ns.c_str(), indexDoc, fromMigrate); logOpForDbHash(txn, ns.c_str()); } @@ -66,8 +66,8 @@ void OpObserver::onInserts(OperationContext* txn, const char* ns = nss.ns().c_str(); for (auto it = begin; it != end; it++) { - getGlobalAuthorizationManager()->logOp(txn, "i", ns, *it, nullptr); - logOpForSharding(txn, "i", ns, *it, nullptr, fromMigrate); + AuthorizationManager::get(txn->getServiceContext())->logOp(txn, "i", ns, *it, nullptr); + logInsertOpForSharding(txn, ns, *it, fromMigrate); } logOpForDbHash(txn, ns); @@ -83,9 +83,9 @@ void OpObserver::onUpdate(OperationContext* txn, oplogUpdateEntryArgs args) { } repl::logOp(txn, "u", args.ns.c_str(), args.update, &args.criteria, args.fromMigrate); - - getGlobalAuthorizationManager()->logOp(txn, "u", args.ns.c_str(), args.update, &args.criteria); - logOpForSharding(txn, "u", args.ns.c_str(), args.update, &args.criteria, args.fromMigrate); + AuthorizationManager::get(txn->getServiceContext()) + ->logOp(txn, "u", args.ns.c_str(), args.update, &args.criteria); + logUpdateOpForSharding(txn, args.ns.c_str(), args.criteria, args.fromMigrate); logOpForDbHash(txn, args.ns.c_str()); if (strstr(args.ns.c_str(), ".system.js")) { Scope::storedFuncMod(txn); @@ -112,15 +112,10 @@ void OpObserver::onDelete(OperationContext* txn, return; repl::logOp(txn, "d", ns.ns().c_str(), deleteState.idDoc, nullptr, fromMigrate); - AuthorizationManager::get(txn->getServiceContext()) ->logOp(txn, "d", ns.ns().c_str(), deleteState.idDoc, nullptr); - logOpForSharding(txn, - "d", - ns.ns().c_str(), - deleteState.idDoc, - nullptr, - fromMigrate || !deleteState.isMigrating); + logDeleteOpForSharding( + txn, ns.ns().c_str(), deleteState.idDoc, fromMigrate || !deleteState.isMigrating); logOpForDbHash(txn, ns.ns().c_str()); if (ns.coll() == "system.js") { Scope::storedFuncMod(txn); diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp index 09680bae2a3..882286fdf2b 100644 --- a/src/mongo/db/s/migration_source_manager.cpp +++ b/src/mongo/db/s/migration_source_manager.cpp @@ -219,73 +219,91 @@ void MigrationSourceManager::done(OperationContext* txn) { _cloneLocs.clear(); } -void MigrationSourceManager::logOp(OperationContext* txn, - const char* opstr, - const char* ns, - const BSONObj& obj, - BSONObj* patt, - bool notInActiveChunk) { +void MigrationSourceManager::logInsertOp(OperationContext* txn, + const char* ns, + const BSONObj& obj, + bool notInActiveChunk) { ensureShardVersionOKOrThrow(txn, ns); - const char op = opstr[0]; - - if (notInActiveChunk) { - // Ignore writes that came from the migration process like cleanup so they - // won't be transferred to the recipient shard. Also ignore ops from - // _migrateClone and _transferMods since it is impossible to move a chunk - // to self. - // Also ignore out of range deletes when migrating a chunk (is set - // in OpObserver::onDelete) + if (notInActiveChunk) return; - } dassert(txn->lockState()->isWriteLocked()); // Must have Global IX. - if (!_active) + if (!_active || (_nss != ns)) return; - if (_nss != ns) + BSONElement ide = obj["_id"]; + if (ide.eoo()) { + warning() << "logInsertOp got mod with no _id, ignoring obj: " << obj << migrateLog; return; + } - // no need to log if this is not an insertion, an update, or an actual deletion - // note: opstr 'db' isn't a deletion but a mention that a database exists - // (for replication machinery mostly). - if (op == 'n' || op == 'c' || (op == 'd' && opstr[1] == 'b')) + if (!isInRange(obj, _min, _max, _shardKeyPattern)) { return; + } + + BSONObj idObj(ide.wrap()); - BSONElement ide; - if (patt) - ide = patt->getField("_id"); - else - ide = obj["_id"]; + txn->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idObj, 'i')); +} + +void MigrationSourceManager::logUpdateOp(OperationContext* txn, + const char* ns, + const BSONObj& pattern, + bool notInActiveChunk) { + ensureShardVersionOKOrThrow(txn, ns); + + if (notInActiveChunk) + return; + + dassert(txn->lockState()->isWriteLocked()); // Must have Global IX. + + if (!_active || (_nss != ns)) + return; + BSONElement ide = pattern.getField("_id"); if (ide.eoo()) { - warning() << "logOpForSharding got mod with no _id, ignoring obj: " << obj << migrateLog; + warning() << "logUpdateOp got mod with no _id, ignoring obj: " << pattern << migrateLog; return; } + BSONObj idObj(ide.wrap()); - if (op == 'i' && (!isInRange(obj, _min, _max, _shardKeyPattern))) { + BSONObj fullDoc; + OldClientContext ctx(txn, _nss.ns(), false); + if (!Helpers::findById(txn, ctx.db(), _nss.ns().c_str(), idObj, fullDoc)) { + warning() << "logUpdateOp couldn't find: " << idObj << " even though should have" + << migrateLog; + dassert(false); // TODO: Abort the migration. return; } - BSONObj idObj(ide.wrap()); + if (!isInRange(fullDoc, _min, _max, _shardKeyPattern)) { + return; + } - if (op == 'u') { - BSONObj fullDoc; - OldClientContext ctx(txn, _nss.ns(), false); - if (!Helpers::findById(txn, ctx.db(), _nss.ns().c_str(), idObj, fullDoc)) { - warning() << "logOpForSharding couldn't find: " << idObj << " even though should have" - << migrateLog; - dassert(false); // TODO: Abort the migration. - return; - } + txn->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idObj, 'u')); +} - if (!isInRange(fullDoc, _min, _max, _shardKeyPattern)) { - return; - } +void MigrationSourceManager::logDeleteOp(OperationContext* txn, + const char* ns, + const BSONObj& obj, + bool notInActiveChunk) { + ensureShardVersionOKOrThrow(txn, ns); + + if (notInActiveChunk) + return; + + dassert(txn->lockState()->isWriteLocked()); // Must have Global IX. + + BSONElement ide = obj["_id"]; + if (ide.eoo()) { + warning() << "logDeleteOp got mod with no _id, ignoring obj: " << obj << migrateLog; + return; } + BSONObj idObj(ide.wrap()); - txn->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idObj, op)); + txn->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idObj, 'd')); } bool MigrationSourceManager::isInMigratingChunk(const NamespaceString& ns, const BSONObj& doc) { diff --git a/src/mongo/db/s/migration_source_manager.h b/src/mongo/db/s/migration_source_manager.h index 076c14fa9f0..f8c5af184cc 100644 --- a/src/mongo/db/s/migration_source_manager.h +++ b/src/mongo/db/s/migration_source_manager.h @@ -64,12 +64,35 @@ public: void done(OperationContext* txn); - void logOp(OperationContext* txn, - const char* opstr, - const char* ns, - const BSONObj& obj, - BSONObj* patt, - bool notInActiveChunk); + /** + * If a migration for the chunk in 'ns' containing 'obj' is in progress, saves this insert + * to the transfer mods log. The entries saved here are later transferred to the receiving + * side of the migration. + */ + void logInsertOp(OperationContext* txn, + const char* ns, + const BSONObj& obj, + bool notInActiveChunk); + + /** + * If a migration for the chunk in 'ns' containing the document with the _id in 'pattern' is + * in progress, saves this update to the transfer mods log. The entries saved here are later + * transferred to the receiving side of the migration. + */ + void logUpdateOp(OperationContext* txn, + const char* ns, + const BSONObj& pattern, + bool notInActiveChunk); + + /** + * If a migration for the chunk in 'ns' containing 'obj' is in progress, saves this delete + * to the transfer mods log. The entries saved here are later transferred to the receiving + * side of the migration. + */ + void logDeleteOp(OperationContext* txn, + const char* ns, + const BSONObj& obj, + bool notInActiveChunk); /** * Determines whether the given document 'doc' in namespace 'ns' is within the range diff --git a/src/mongo/s/d_migrate.cpp b/src/mongo/s/d_migrate.cpp index 9983ccd58df..5fb8c038a5a 100644 --- a/src/mongo/s/d_migrate.cpp +++ b/src/mongo/s/d_migrate.cpp @@ -430,25 +430,31 @@ public: } // namespace -/** - * If sharding is enabled, logs the operation for an active migration in the transfer mods log. - * - * 'ns' name of the collection in which the operation will occur. - * 'notInActiveChunk' a true value indicates that either: - * 1) the delete is coming from a donor shard in a current chunk migration, - * and so does not need to be entered in this shard's outgoing transfer log. - * 2) the document is not within this shard's outgoing chunk migration range, - * and so does not need to be forwarded to the migration recipient via the transfer log. - */ -void logOpForSharding(OperationContext* txn, - const char* opstr, - const char* ns, - const BSONObj& obj, - BSONObj* patt, - bool notInActiveChunk) { +void logInsertOpForSharding(OperationContext* txn, + const char* ns, + const BSONObj& obj, + bool notInActiveChunk) { + ShardingState* shardingState = ShardingState::get(txn); + if (shardingState->enabled()) + shardingState->migrationSourceManager()->logInsertOp(txn, ns, obj, notInActiveChunk); +} + +void logUpdateOpForSharding(OperationContext* txn, + const char* ns, + const BSONObj& pattern, + bool notInActiveChunk) { + ShardingState* shardingState = ShardingState::get(txn); + if (shardingState->enabled()) + shardingState->migrationSourceManager()->logUpdateOp(txn, ns, pattern, notInActiveChunk); +} + +void logDeleteOpForSharding(OperationContext* txn, + const char* ns, + const BSONObj& obj, + bool notInActiveChunk) { ShardingState* shardingState = ShardingState::get(txn); if (shardingState->enabled()) - shardingState->migrationSourceManager()->logOp(txn, opstr, ns, obj, patt, notInActiveChunk); + shardingState->migrationSourceManager()->logDeleteOp(txn, ns, obj, notInActiveChunk); } bool isInMigratingChunk(OperationContext* txn, const NamespaceString& ns, const BSONObj& doc) { diff --git a/src/mongo/s/d_state.h b/src/mongo/s/d_state.h index ce6136ea17c..7ab53dd5741 100644 --- a/src/mongo/s/d_state.h +++ b/src/mongo/s/d_state.h @@ -60,16 +60,51 @@ bool haveLocalShardingInfo(Client* client, const std::string& ns); void ensureShardVersionOKOrThrow(OperationContext* txn, const std::string& ns); /** - * If a migration for the chunk in 'ns' where 'obj' lives is occurring, save this log entry - * if it's relevant. The entries saved here are later transferred to the receiving side of - * the migration. A relevant entry is an insertion, a deletion, or an update. + * If sharding is enabled, pass the insert along to MigrationSourceManager::logInsertOp + * to determine whether the insert should be logged to the migration transfer mods log. + * + * 'ns' name of the collection in which the operation will occur. + * 'obj' document being inserted. + * 'notInActiveChunk' if true indicates that the insert is coming from a donor shard + * in a current chunk migration, and so does not need to be entered in this shard's + * outgoing transfer log. + */ +void logInsertOpForSharding(OperationContext* txn, + const char* ns, + const BSONObj& obj, + bool notInActiveChunk); + +/** + * If sharding is enabled, pass the update along to MigrationSourceManager::logUpdateOp + * to determine whether the update should be logged to the migration transfer mods log. + * + * 'ns' name of the collection in which the operation will occur. + * 'pattern' contains the _id value of the doc being updated. + * 'notInActiveChunk' if true indicates that the update is coming from a donor shard + * in a current chunk migration, and so does not need to be entered in this shard's + * outgoing transfer log. + */ +void logUpdateOpForSharding(OperationContext* txn, + const char* ns, + const BSONObj& pattern, + bool notInActiveChunk); + +/** + * If sharding is enabled, pass the delete along to MigrationSourceManager::logDeleteOp + * to determine whether the delete should be logged to the migration transfer mods log. + * + * 'ns' name of the collection in which the operation will occur. + * 'obj' contains the _id value of the doc being deleted. + * 'notInActiveChunk' a true value indicates that either: + * 1) the delete is coming from a donor shard in a current chunk migration, + * and so does not need to be entered in this shard's outgoing transfer log. + * 2) the document is not within this shard's outgoing chunk migration range, + * and so does not need to be forwarded to the migration recipient via the transfer log. */ -void logOpForSharding(OperationContext* txn, - const char* opstr, - const char* ns, - const BSONObj& obj, - BSONObj* patt, - bool forMigrateCleanup); +void logDeleteOpForSharding(OperationContext* txn, + const char* ns, + const BSONObj& obj, + bool notInActiveChunk); /** * Checks if 'doc' in 'ns' belongs to a currently migrating chunk. |