summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDianna Hohensee <dianna.hohensee@10gen.com>2015-12-28 19:47:29 -0500
committerDianna Hohensee <dianna.hohensee@10gen.com>2016-01-06 17:52:45 -0500
commite3c8408cce6fe93f0ff6b47deef6152aab083d47 (patch)
tree6baf8c49d4f4ac8d0c878c1572c73646ca5db8df
parent75f38ab0e95f5702c01945aacef81609d55df277 (diff)
downloadmongo-e3c8408cce6fe93f0ff6b47deef6152aab083d47.tar.gz
SERVER-21899 MigrationSourceManager::logOp refactor
-rw-r--r--src/mongo/db/op_observer.cpp25
-rw-r--r--src/mongo/db/s/migration_source_manager.cpp104
-rw-r--r--src/mongo/db/s/migration_source_manager.h35
-rw-r--r--src/mongo/s/d_migrate.cpp40
-rw-r--r--src/mongo/s/d_state.h53
5 files changed, 167 insertions, 90 deletions
diff --git a/src/mongo/db/op_observer.cpp b/src/mongo/db/op_observer.cpp
index b4dacd091c5..a2488ab89a2 100644
--- a/src/mongo/db/op_observer.cpp
+++ b/src/mongo/db/op_observer.cpp
@@ -51,9 +51,9 @@ void OpObserver::onCreateIndex(OperationContext* txn,
BSONObj indexDoc,
bool fromMigrate) {
repl::logOp(txn, "i", ns.c_str(), indexDoc, nullptr, fromMigrate);
-
- getGlobalAuthorizationManager()->logOp(txn, "i", ns.c_str(), indexDoc, nullptr);
- logOpForSharding(txn, "i", ns.c_str(), indexDoc, nullptr, fromMigrate);
+ AuthorizationManager::get(txn->getServiceContext())
+ ->logOp(txn, "i", ns.c_str(), indexDoc, nullptr);
+ logInsertOpForSharding(txn, ns.c_str(), indexDoc, fromMigrate);
logOpForDbHash(txn, ns.c_str());
}
@@ -66,8 +66,8 @@ void OpObserver::onInserts(OperationContext* txn,
const char* ns = nss.ns().c_str();
for (auto it = begin; it != end; it++) {
- getGlobalAuthorizationManager()->logOp(txn, "i", ns, *it, nullptr);
- logOpForSharding(txn, "i", ns, *it, nullptr, fromMigrate);
+ AuthorizationManager::get(txn->getServiceContext())->logOp(txn, "i", ns, *it, nullptr);
+ logInsertOpForSharding(txn, ns, *it, fromMigrate);
}
logOpForDbHash(txn, ns);
@@ -83,9 +83,9 @@ void OpObserver::onUpdate(OperationContext* txn, oplogUpdateEntryArgs args) {
}
repl::logOp(txn, "u", args.ns.c_str(), args.update, &args.criteria, args.fromMigrate);
-
- getGlobalAuthorizationManager()->logOp(txn, "u", args.ns.c_str(), args.update, &args.criteria);
- logOpForSharding(txn, "u", args.ns.c_str(), args.update, &args.criteria, args.fromMigrate);
+ AuthorizationManager::get(txn->getServiceContext())
+ ->logOp(txn, "u", args.ns.c_str(), args.update, &args.criteria);
+ logUpdateOpForSharding(txn, args.ns.c_str(), args.criteria, args.fromMigrate);
logOpForDbHash(txn, args.ns.c_str());
if (strstr(args.ns.c_str(), ".system.js")) {
Scope::storedFuncMod(txn);
@@ -112,15 +112,10 @@ void OpObserver::onDelete(OperationContext* txn,
return;
repl::logOp(txn, "d", ns.ns().c_str(), deleteState.idDoc, nullptr, fromMigrate);
-
AuthorizationManager::get(txn->getServiceContext())
->logOp(txn, "d", ns.ns().c_str(), deleteState.idDoc, nullptr);
- logOpForSharding(txn,
- "d",
- ns.ns().c_str(),
- deleteState.idDoc,
- nullptr,
- fromMigrate || !deleteState.isMigrating);
+ logDeleteOpForSharding(
+ txn, ns.ns().c_str(), deleteState.idDoc, fromMigrate || !deleteState.isMigrating);
logOpForDbHash(txn, ns.ns().c_str());
if (ns.coll() == "system.js") {
Scope::storedFuncMod(txn);
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index 09680bae2a3..882286fdf2b 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -219,73 +219,91 @@ void MigrationSourceManager::done(OperationContext* txn) {
_cloneLocs.clear();
}
-void MigrationSourceManager::logOp(OperationContext* txn,
- const char* opstr,
- const char* ns,
- const BSONObj& obj,
- BSONObj* patt,
- bool notInActiveChunk) {
+void MigrationSourceManager::logInsertOp(OperationContext* txn,
+ const char* ns,
+ const BSONObj& obj,
+ bool notInActiveChunk) {
ensureShardVersionOKOrThrow(txn, ns);
- const char op = opstr[0];
-
- if (notInActiveChunk) {
- // Ignore writes that came from the migration process like cleanup so they
- // won't be transferred to the recipient shard. Also ignore ops from
- // _migrateClone and _transferMods since it is impossible to move a chunk
- // to self.
- // Also ignore out of range deletes when migrating a chunk (is set
- // in OpObserver::onDelete)
+ if (notInActiveChunk)
return;
- }
dassert(txn->lockState()->isWriteLocked()); // Must have Global IX.
- if (!_active)
+ if (!_active || (_nss != ns))
return;
- if (_nss != ns)
+ BSONElement ide = obj["_id"];
+ if (ide.eoo()) {
+ warning() << "logInsertOp got mod with no _id, ignoring obj: " << obj << migrateLog;
return;
+ }
- // no need to log if this is not an insertion, an update, or an actual deletion
- // note: opstr 'db' isn't a deletion but a mention that a database exists
- // (for replication machinery mostly).
- if (op == 'n' || op == 'c' || (op == 'd' && opstr[1] == 'b'))
+ if (!isInRange(obj, _min, _max, _shardKeyPattern)) {
return;
+ }
+
+ BSONObj idObj(ide.wrap());
- BSONElement ide;
- if (patt)
- ide = patt->getField("_id");
- else
- ide = obj["_id"];
+ txn->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idObj, 'i'));
+}
+
+void MigrationSourceManager::logUpdateOp(OperationContext* txn,
+ const char* ns,
+ const BSONObj& pattern,
+ bool notInActiveChunk) {
+ ensureShardVersionOKOrThrow(txn, ns);
+
+ if (notInActiveChunk)
+ return;
+
+ dassert(txn->lockState()->isWriteLocked()); // Must have Global IX.
+
+ if (!_active || (_nss != ns))
+ return;
+ BSONElement ide = pattern.getField("_id");
if (ide.eoo()) {
- warning() << "logOpForSharding got mod with no _id, ignoring obj: " << obj << migrateLog;
+ warning() << "logUpdateOp got mod with no _id, ignoring obj: " << pattern << migrateLog;
return;
}
+ BSONObj idObj(ide.wrap());
- if (op == 'i' && (!isInRange(obj, _min, _max, _shardKeyPattern))) {
+ BSONObj fullDoc;
+ OldClientContext ctx(txn, _nss.ns(), false);
+ if (!Helpers::findById(txn, ctx.db(), _nss.ns().c_str(), idObj, fullDoc)) {
+ warning() << "logUpdateOp couldn't find: " << idObj << " even though should have"
+ << migrateLog;
+ dassert(false); // TODO: Abort the migration.
return;
}
- BSONObj idObj(ide.wrap());
+ if (!isInRange(fullDoc, _min, _max, _shardKeyPattern)) {
+ return;
+ }
- if (op == 'u') {
- BSONObj fullDoc;
- OldClientContext ctx(txn, _nss.ns(), false);
- if (!Helpers::findById(txn, ctx.db(), _nss.ns().c_str(), idObj, fullDoc)) {
- warning() << "logOpForSharding couldn't find: " << idObj << " even though should have"
- << migrateLog;
- dassert(false); // TODO: Abort the migration.
- return;
- }
+ txn->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idObj, 'u'));
+}
- if (!isInRange(fullDoc, _min, _max, _shardKeyPattern)) {
- return;
- }
+void MigrationSourceManager::logDeleteOp(OperationContext* txn,
+ const char* ns,
+ const BSONObj& obj,
+ bool notInActiveChunk) {
+ ensureShardVersionOKOrThrow(txn, ns);
+
+ if (notInActiveChunk)
+ return;
+
+ dassert(txn->lockState()->isWriteLocked()); // Must have Global IX.
+
+ BSONElement ide = obj["_id"];
+ if (ide.eoo()) {
+ warning() << "logDeleteOp got mod with no _id, ignoring obj: " << obj << migrateLog;
+ return;
}
+ BSONObj idObj(ide.wrap());
- txn->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idObj, op));
+ txn->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idObj, 'd'));
}
bool MigrationSourceManager::isInMigratingChunk(const NamespaceString& ns, const BSONObj& doc) {
diff --git a/src/mongo/db/s/migration_source_manager.h b/src/mongo/db/s/migration_source_manager.h
index 076c14fa9f0..f8c5af184cc 100644
--- a/src/mongo/db/s/migration_source_manager.h
+++ b/src/mongo/db/s/migration_source_manager.h
@@ -64,12 +64,35 @@ public:
void done(OperationContext* txn);
- void logOp(OperationContext* txn,
- const char* opstr,
- const char* ns,
- const BSONObj& obj,
- BSONObj* patt,
- bool notInActiveChunk);
+ /**
+ * If a migration for the chunk in 'ns' containing 'obj' is in progress, saves this insert
+ * to the transfer mods log. The entries saved here are later transferred to the receiving
+ * side of the migration.
+ */
+ void logInsertOp(OperationContext* txn,
+ const char* ns,
+ const BSONObj& obj,
+ bool notInActiveChunk);
+
+ /**
+ * If a migration for the chunk in 'ns' containing the document with the _id in 'pattern' is
+ * in progress, saves this update to the transfer mods log. The entries saved here are later
+ * transferred to the receiving side of the migration.
+ */
+ void logUpdateOp(OperationContext* txn,
+ const char* ns,
+ const BSONObj& pattern,
+ bool notInActiveChunk);
+
+ /**
+ * If a migration for the chunk in 'ns' containing 'obj' is in progress, saves this delete
+ * to the transfer mods log. The entries saved here are later transferred to the receiving
+ * side of the migration.
+ */
+ void logDeleteOp(OperationContext* txn,
+ const char* ns,
+ const BSONObj& obj,
+ bool notInActiveChunk);
/**
* Determines whether the given document 'doc' in namespace 'ns' is within the range
diff --git a/src/mongo/s/d_migrate.cpp b/src/mongo/s/d_migrate.cpp
index 9983ccd58df..5fb8c038a5a 100644
--- a/src/mongo/s/d_migrate.cpp
+++ b/src/mongo/s/d_migrate.cpp
@@ -430,25 +430,31 @@ public:
} // namespace
-/**
- * If sharding is enabled, logs the operation for an active migration in the transfer mods log.
- *
- * 'ns' name of the collection in which the operation will occur.
- * 'notInActiveChunk' a true value indicates that either:
- * 1) the delete is coming from a donor shard in a current chunk migration,
- * and so does not need to be entered in this shard's outgoing transfer log.
- * 2) the document is not within this shard's outgoing chunk migration range,
- * and so does not need to be forwarded to the migration recipient via the transfer log.
- */
-void logOpForSharding(OperationContext* txn,
- const char* opstr,
- const char* ns,
- const BSONObj& obj,
- BSONObj* patt,
- bool notInActiveChunk) {
+void logInsertOpForSharding(OperationContext* txn,
+ const char* ns,
+ const BSONObj& obj,
+ bool notInActiveChunk) {
+ ShardingState* shardingState = ShardingState::get(txn);
+ if (shardingState->enabled())
+ shardingState->migrationSourceManager()->logInsertOp(txn, ns, obj, notInActiveChunk);
+}
+
+void logUpdateOpForSharding(OperationContext* txn,
+ const char* ns,
+ const BSONObj& pattern,
+ bool notInActiveChunk) {
+ ShardingState* shardingState = ShardingState::get(txn);
+ if (shardingState->enabled())
+ shardingState->migrationSourceManager()->logUpdateOp(txn, ns, pattern, notInActiveChunk);
+}
+
+void logDeleteOpForSharding(OperationContext* txn,
+ const char* ns,
+ const BSONObj& obj,
+ bool notInActiveChunk) {
ShardingState* shardingState = ShardingState::get(txn);
if (shardingState->enabled())
- shardingState->migrationSourceManager()->logOp(txn, opstr, ns, obj, patt, notInActiveChunk);
+ shardingState->migrationSourceManager()->logDeleteOp(txn, ns, obj, notInActiveChunk);
}
bool isInMigratingChunk(OperationContext* txn, const NamespaceString& ns, const BSONObj& doc) {
diff --git a/src/mongo/s/d_state.h b/src/mongo/s/d_state.h
index ce6136ea17c..7ab53dd5741 100644
--- a/src/mongo/s/d_state.h
+++ b/src/mongo/s/d_state.h
@@ -60,16 +60,51 @@ bool haveLocalShardingInfo(Client* client, const std::string& ns);
void ensureShardVersionOKOrThrow(OperationContext* txn, const std::string& ns);
/**
- * If a migration for the chunk in 'ns' where 'obj' lives is occurring, save this log entry
- * if it's relevant. The entries saved here are later transferred to the receiving side of
- * the migration. A relevant entry is an insertion, a deletion, or an update.
+ * If sharding is enabled, pass the insert along to MigrationSourceManager::logInsertOp
+ * to determine whether the insert should be logged to the migration transfer mods log.
+ *
+ * 'ns' name of the collection in which the operation will occur.
+ * 'obj' document being inserted.
+ * 'notInActiveChunk' if true indicates that the insert is coming from a donor shard
+ * in a current chunk migration, and so does not need to be entered in this shard's
+ * outgoing transfer log.
+ */
+void logInsertOpForSharding(OperationContext* txn,
+ const char* ns,
+ const BSONObj& obj,
+ bool notInActiveChunk);
+
+/**
+ * If sharding is enabled, pass the update along to MigrationSourceManager::logUpdateOp
+ * to determine whether the update should be logged to the migration transfer mods log.
+ *
+ * 'ns' name of the collection in which the operation will occur.
+ * 'pattern' contains the _id value of the doc being updated.
+ * 'notInActiveChunk' if true indicates that the update is coming from a donor shard
+ * in a current chunk migration, and so does not need to be entered in this shard's
+ * outgoing transfer log.
+ */
+void logUpdateOpForSharding(OperationContext* txn,
+ const char* ns,
+ const BSONObj& pattern,
+ bool notInActiveChunk);
+
+/**
+ * If sharding is enabled, pass the delete along to MigrationSourceManager::logDeleteOp
+ * to determine whether the delete should be logged to the migration transfer mods log.
+ *
+ * 'ns' name of the collection in which the operation will occur.
+ * 'obj' contains the _id value of the doc being deleted.
+ * 'notInActiveChunk' a true value indicates that either:
+ * 1) the delete is coming from a donor shard in a current chunk migration,
+ * and so does not need to be entered in this shard's outgoing transfer log.
+ * 2) the document is not within this shard's outgoing chunk migration range,
+ * and so does not need to be forwarded to the migration recipient via the transfer log.
*/
-void logOpForSharding(OperationContext* txn,
- const char* opstr,
- const char* ns,
- const BSONObj& obj,
- BSONObj* patt,
- bool forMigrateCleanup);
+void logDeleteOpForSharding(OperationContext* txn,
+ const char* ns,
+ const BSONObj& obj,
+ bool notInActiveChunk);
/**
* Checks if 'doc' in 'ns' belongs to a currently migrating chunk.