author | Randolph Tan <randolph@10gen.com> | 2014-07-15 16:48:59 -0400 |
---|---|---|
committer | Randolph Tan <randolph@10gen.com> | 2014-07-16 13:41:37 -0400 |
commit | 2fd2eebb2656ae65bd6eb63b39f8faeb84e0db4e (patch) | |
tree | e74a56e3d3423d15565d739721f343fafa753798 /src/mongo | |
parent | d134ba27aa7f2c4eb8fc4b1331c8331f5914bfc6 (diff) | |
download | mongo-2fd2eebb2656ae65bd6eb63b39f8faeb84e0db4e.tar.gz | |
SERVER-14041 enhance secondaryThrottle parameter
This reverts commit 37f2a1e3b724dbd9e1f8eafd4ac87c5bf613c048 (undoing an earlier revert, i.e. reapplying the original change).
This reverts commit d60fd22dec1c0bd104622eab463cdbba18bf11a9 (undoing an earlier revert, i.e. reapplying the original change).
Fix Windows compile failure.
Add more defensive checks.
Diffstat (limited to 'src/mongo')
30 files changed, 859 insertions, 229 deletions
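
At a high level, this patch replaces the boolean `secondaryThrottle` flag that was threaded through `cleanupOrphaned`, `moveChunk`, the range deleter, and the balancer with a full `WriteConcernOptions`. Commands still accept `secondaryThrottle: <bool>` (defaulting to true) and now also take an optional `writeConcern: {...}` subdocument; the new `WriteConcernOptions::parseSecondaryThrottle()` distinguishes throttle-off, throttle-on-without-a-write-concern (reported as `WriteConcernNotDefined` so the caller can substitute its own default, e.g. `{ w: "majority", wtimeout: 60000 }` for cleanupOrphaned and `w: 2` for migration), and throttle-on-with-an-explicit-write-concern. The sketch below is a simplified, standalone restatement of that decision table, not the MongoDB source; `ThrottleInput`, `ParseOutcome`, and the function signature are invented purely for illustration.

```cpp
// Standalone sketch of the parseSecondaryThrottle() decision table introduced by
// this commit. The real code operates on BSONObj and WriteConcernOptions; the
// types here are illustrative stand-ins.
#include <iostream>

struct ThrottleInput {
    bool secondaryThrottle = true;   // command field, defaults to true
    bool hasWriteConcernObj = false; // whether a writeConcern: {...} subdocument was sent
};

enum class ParseOutcome {
    NoThrottle,              // w:1, do not wait for secondaries
    WriteConcernNotDefined,  // throttle on, caller substitutes its default
    UseProvidedWriteConcern, // throttle on, parse the supplied writeConcern
    UnsupportedFormat        // writeConcern given while secondaryThrottle:false
};

const char* name(ParseOutcome o) {
    switch (o) {
        case ParseOutcome::NoThrottle:              return "NoThrottle (w:1, no waiting)";
        case ParseOutcome::WriteConcernNotDefined:  return "WriteConcernNotDefined (caller uses its default)";
        case ParseOutcome::UseProvidedWriteConcern: return "UseProvidedWriteConcern";
        case ParseOutcome::UnsupportedFormat:       return "UnsupportedFormat (error)";
    }
    return "unknown";
}

ParseOutcome parseSecondaryThrottle(const ThrottleInput& in) {
    if (!in.secondaryThrottle) {
        // A writeConcern subdocument alongside secondaryThrottle:false is rejected.
        return in.hasWriteConcernObj ? ParseOutcome::UnsupportedFormat
                                     : ParseOutcome::NoThrottle;
    }
    // Throttling is on: use the supplied write concern if there is one,
    // otherwise report that none was defined so the caller can pick its default.
    return in.hasWriteConcernObj ? ParseOutcome::UseProvidedWriteConcern
                                 : ParseOutcome::WriteConcernNotDefined;
}

int main() {
    ThrottleInput defaults;                       // e.g. cleanupOrphaned with no options
    ThrottleInput off;        off.secondaryThrottle = false;
    ThrottleInput explicitWc; explicitWc.hasWriteConcernObj = true;

    std::cout << name(parseSecondaryThrottle(defaults))   << "\n";
    std::cout << name(parseSecondaryThrottle(off))        << "\n";
    std::cout << name(parseSecondaryThrottle(explicitWc)) << "\n";
    return 0;
}
```

Whichever branch is taken, both `cleanupOrphaned` and `moveChunk` additionally clamp a write concern that waits for other nodes but carries `wtimeout: 0` to the 60-second default, so a lagging secondary cannot block the operation indefinitely.
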
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err index 2415fee1c91..95076f74414 100644 --- a/src/mongo/base/error_codes.err +++ b/src/mongo/base/error_codes.err @@ -98,6 +98,8 @@ error_code("NotSecondary", 95) error_code("OperationFailed", 96) error_code("NoProjectionFound", 97) error_code("DBPathInUse", 98) +error_code("WriteConcernNotDefined", 99) +error_code("CannotSatisfyWriteConcern", 100) # Non-sequential error codes (for compatibility only) error_code("NotMaster", 10107) #this comes from assert_util.h diff --git a/src/mongo/db/commands/cleanup_orphaned_cmd.cpp b/src/mongo/db/commands/cleanup_orphaned_cmd.cpp index 89e7f49ab4b..3fd6dbcb9d2 100644 --- a/src/mongo/db/commands/cleanup_orphaned_cmd.cpp +++ b/src/mongo/db/commands/cleanup_orphaned_cmd.cpp @@ -41,11 +41,22 @@ #include "mongo/db/jsobj.h" #include "mongo/db/namespace_string.h" #include "mongo/db/range_deleter_service.h" +#include "mongo/db/repl/repl_coordinator_global.h" #include "mongo/s/collection_metadata.h" #include "mongo/s/d_logic.h" #include "mongo/s/range_arithmetic.h" +#include "mongo/s/type_settings.h" #include "mongo/util/log.h" +namespace { + using mongo::WriteConcernOptions; + + const int kDefaultWTimeoutMs = 60 * 1000; + const WriteConcernOptions DefaultWriteConcern("majority", + WriteConcernOptions::NONE, + kDefaultWTimeoutMs); +} + namespace mongo { MONGO_LOG_DEFAULT_COMPONENT_FILE(::mongo::logger::LogComponent::kCommands); @@ -69,7 +80,7 @@ namespace mongo { CleanupResult cleanupOrphanedData( OperationContext* txn, const NamespaceString& ns, const BSONObj& startingFromKeyConst, - bool secondaryThrottle, + const WriteConcernOptions& secondaryThrottle, BSONObj* stoppedAtKey, string* errMsg ) { @@ -153,6 +164,17 @@ namespace mongo { * the balancer is off. * * Safe to call with the balancer on. + * + * Format: + * + * { + * cleanupOrphaned: <ns>, + * // optional parameters: + * startingAtKey: { <shardKeyValue> }, // defaults to lowest value + * secondaryThrottle: <bool>, // defaults to true + * // defaults to { w: "majority", wtimeout: 60000 }. Applies to individual writes. + * writeConcern: { <writeConcern options> } + * } */ class CleanupOrphanedCommand : public Command { public: @@ -179,7 +201,6 @@ namespace mongo { // Input static BSONField<string> nsField; static BSONField<BSONObj> startingFromKeyField; - static BSONField<bool> secondaryThrottleField; // Output static BSONField<BSONObj> stoppedAtKeyField; @@ -210,12 +231,29 @@ namespace mongo { return false; } - bool secondaryThrottle = true; - if ( !FieldParser::extract( cmdObj, - secondaryThrottleField, - &secondaryThrottle, - &errmsg ) ) { - return false; + WriteConcernOptions writeConcern; + Status status = writeConcern.parseSecondaryThrottle(cmdObj, NULL); + + if (!status.isOK()){ + if (status.code() != ErrorCodes::WriteConcernNotDefined) { + return appendCommandStatus(result, status); + } + + writeConcern = DefaultWriteConcern; + } + else { + repl::ReplicationCoordinator* replCoordinator = + repl::getGlobalReplicationCoordinator(); + Status status = replCoordinator->checkIfWriteConcernCanBeSatisfied(writeConcern); + if (!status.isOK()) { + return appendCommandStatus(result, status); + } + } + + if (writeConcern.shouldWaitForOtherNodes() && + writeConcern.wTimeout == WriteConcernOptions::kNoTimeout) { + // Don't allow no timeout. 
+ writeConcern.wTimeout = kDefaultWTimeoutMs; } if (!shardingState.enabled()) { @@ -225,7 +263,7 @@ namespace mongo { } ChunkVersion shardVersion; - Status status = shardingState.refreshMetadataNow( ns, &shardVersion ); + status = shardingState.refreshMetadataNow( ns, &shardVersion ); if ( !status.isOK() ) { if ( status.code() == ErrorCodes::RemoteChangeDetected ) { warning() << "Shard version in transition detected while refreshing " @@ -242,7 +280,7 @@ namespace mongo { CleanupResult cleanupResult = cleanupOrphanedData( txn, NamespaceString( ns ), startingFromKey, - secondaryThrottle, + writeConcern, &stoppedAtKey, &errmsg ); @@ -263,7 +301,6 @@ namespace mongo { BSONField<string> CleanupOrphanedCommand::nsField( "cleanupOrphaned" ); BSONField<BSONObj> CleanupOrphanedCommand::startingFromKeyField( "startingFromKey" ); - BSONField<bool> CleanupOrphanedCommand::secondaryThrottleField( "secondaryThrottle" ); BSONField<BSONObj> CleanupOrphanedCommand::stoppedAtKeyField( "stoppedAtKey" ); MONGO_INITIALIZER(RegisterCleanupOrphanedCommand)(InitializerContext* context) { diff --git a/src/mongo/db/dbhelpers.cpp b/src/mongo/db/dbhelpers.cpp index 83b5b0b7ec5..8cc0eb0bd1c 100644 --- a/src/mongo/db/dbhelpers.cpp +++ b/src/mongo/db/dbhelpers.cpp @@ -52,6 +52,7 @@ #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/repl_coordinator_global.h" #include "mongo/db/write_concern.h" +#include "mongo/db/write_concern_options.h" #include "mongo/db/operation_context_impl.h" #include "mongo/db/storage_options.h" #include "mongo/db/catalog/collection.h" @@ -305,7 +306,7 @@ namespace mongo { long long Helpers::removeRange( OperationContext* txn, const KeyRange& range, bool maxInclusive, - bool secondaryThrottle, + const WriteConcernOptions& writeConcern, RemoveSaver* callback, bool fromMigrate, bool onlyRemoveOrphanedDocs ) @@ -342,7 +343,7 @@ namespace mongo { Helpers::toKeyFormat( indexKeyPattern.extendRangeBound(range.maxKey,maxInclusive)); LOG(1) << "begin removal of " << min << " to " << max << " in " << ns - << (secondaryThrottle ? 
" (waiting for secondaries)" : "" ) << endl; + << " with write concern: " << writeConcern.toBSON() << endl; Client& c = cc(); @@ -435,10 +436,7 @@ namespace mongo { // TODO remove once the yielding below that references this timer has been removed Timer secondaryThrottleTime; - if ( secondaryThrottle && numDeleted > 0 ) { - WriteConcernOptions writeConcern; - writeConcern.wNumNodes = 2; - writeConcern.wTimeout = 60 * 1000; + if (writeConcern.shouldWaitForOtherNodes() && numDeleted > 0) { repl::ReplicationCoordinator::StatusAndDuration replStatus = repl::getGlobalReplicationCoordinator()->awaitReplication(txn, c.getLastOp(), @@ -454,7 +452,7 @@ namespace mongo { } } - if ( secondaryThrottle ) + if (writeConcern.shouldWaitForOtherNodes()) log() << "Helpers::removeRangeUnlocked time spent waiting for replication: " << millisWaitingForReplication << "ms" << endl; diff --git a/src/mongo/db/dbhelpers.h b/src/mongo/db/dbhelpers.h index be0ca859248..865543c74f9 100644 --- a/src/mongo/db/dbhelpers.h +++ b/src/mongo/db/dbhelpers.h @@ -41,6 +41,7 @@ namespace mongo { class Collection; class Cursor; class OperationContext; + struct WriteConcernOptions; /** * db helpers are helper functions and classes that let us easily manipulate the local @@ -164,8 +165,8 @@ namespace mongo { */ static long long removeRange( OperationContext* txn, const KeyRange& range, - bool maxInclusive = false, - bool secondaryThrottle = false, + bool maxInclusive, + const WriteConcernOptions& secondaryThrottle, RemoveSaver* callback = NULL, bool fromMigrate = false, bool onlyRemoveOrphanedDocs = false ); diff --git a/src/mongo/db/field_parser.cpp b/src/mongo/db/field_parser.cpp index 40c8c9664eb..47b5b8cf1c0 100644 --- a/src/mongo/db/field_parser.cpp +++ b/src/mongo/db/field_parser.cpp @@ -111,7 +111,7 @@ namespace mongo { { if (elem.eoo()) { if (field.hasDefault()) { - *out = field.getDefault(); + *out = field.getDefault().getOwned(); return FIELD_DEFAULT; } else { diff --git a/src/mongo/db/range_deleter.cpp b/src/mongo/db/range_deleter.cpp index d096cab1219..e75ce96f0bd 100644 --- a/src/mongo/db/range_deleter.cpp +++ b/src/mongo/db/range_deleter.cpp @@ -189,7 +189,7 @@ namespace mongo { const BSONObj& min, const BSONObj& max, const BSONObj& shardKeyPattern, - bool secondaryThrottle, + const WriteConcernOptions& writeConcern, Notification* notifyDone, std::string* errMsg) { string dummy; @@ -199,7 +199,7 @@ namespace mongo { min.getOwned(), max.getOwned(), shardKeyPattern.getOwned(), - secondaryThrottle)); + writeConcern)); toDelete->notifyDone = notifyDone; { @@ -243,10 +243,13 @@ namespace mongo { } namespace { - bool _waitForReplication(OperationContext* txn, std::string* errMsg) { - WriteConcernOptions writeConcern; - writeConcern.wMode = "majority"; - writeConcern.wTimeout = 60 * 60 * 1000; + const int kWTimeoutMillis = 60 * 60 * 1000; + + bool _waitForMajority(OperationContext* txn, std::string* errMsg) { + const WriteConcernOptions writeConcern("majority", + WriteConcernOptions::NONE, + kWTimeoutMillis); + repl::ReplicationCoordinator::StatusAndDuration replStatus = repl::getGlobalReplicationCoordinator()->awaitReplicationOfLastOp(txn, writeConcern); @@ -279,7 +282,7 @@ namespace { const BSONObj& min, const BSONObj& max, const BSONObj& shardKeyPattern, - bool secondaryThrottle, + const WriteConcernOptions& writeConcern, string* errMsg) { if (stopRequested()) { *errMsg = "deleter is already stopped."; @@ -314,7 +317,7 @@ namespace { << " cursors in " << ns << " to finish" << endl; } - RangeDeleteEntry taskDetails(ns, 
min, max, shardKeyPattern, secondaryThrottle); + RangeDeleteEntry taskDetails(ns, min, max, shardKeyPattern, writeConcern); taskDetails.stats.queueStartTS = jsTime(); Date_t timeSinceLastLog; @@ -373,7 +376,7 @@ namespace { if (result) { taskDetails.stats.waitForReplStartTS = jsTime(); - result = _waitForReplication(txn, errMsg); + result = _waitForMajority(txn, errMsg); taskDetails.stats.waitForReplEndTS = jsTime(); } @@ -555,7 +558,7 @@ namespace { if (delResult) { nextTask->stats.waitForReplStartTS = jsTime(); - if (!_waitForReplication(txn.get(), &errMsg)) { + if (!_waitForMajority(txn.get(), &errMsg)) { warning() << "Error encountered while waiting for replication: " << errMsg; } @@ -662,12 +665,12 @@ namespace { const BSONObj& min, const BSONObj& max, const BSONObj& shardKey, - bool secondaryThrottle): + const WriteConcernOptions& writeConcern): ns(ns), min(min), max(max), shardKeyPattern(shardKey), - secondaryThrottle(secondaryThrottle), + writeConcern(writeConcern), notifyDone(NULL) { } diff --git a/src/mongo/db/range_deleter.h b/src/mongo/db/range_deleter.h index 022f3c84872..cda4578b02b 100644 --- a/src/mongo/db/range_deleter.h +++ b/src/mongo/db/range_deleter.h @@ -39,6 +39,7 @@ #include "mongo/db/clientcursor.h" #include "mongo/db/jsobj.h" #include "mongo/db/operation_context.h" +#include "mongo/db/write_concern_options.h" #include "mongo/util/concurrency/mutex.h" #include "mongo/util/concurrency/synchronization.h" #include "mongo/util/time_support.h" @@ -138,7 +139,7 @@ namespace mongo { const BSONObj& min, const BSONObj& max, const BSONObj& shardKeyPattern, - bool secondaryThrottle, + const WriteConcernOptions& writeConcern, Notification* notifyDone, std::string* errMsg); @@ -154,7 +155,7 @@ namespace mongo { const BSONObj& min, const BSONObj& max, const BSONObj& shardKeyPattern, - bool secondaryThrottle, + const WriteConcernOptions& writeConcern, std::string* errMsg); /** @@ -309,7 +310,7 @@ namespace mongo { const BSONObj& min, const BSONObj& max, const BSONObj& shardKey, - bool secondaryThrottle); + const WriteConcernOptions& writeConcern); const std::string ns; @@ -324,7 +325,7 @@ namespace mongo { // like hash indexes. const BSONObj shardKeyPattern; - const bool secondaryThrottle; + const WriteConcernOptions writeConcern; // Sets of cursors to wait to close until this can be ready // for deletion. diff --git a/src/mongo/db/range_deleter_db_env.cpp b/src/mongo/db/range_deleter_db_env.cpp index 3658f99cd2c..29e96e4ead4 100644 --- a/src/mongo/db/range_deleter_db_env.cpp +++ b/src/mongo/db/range_deleter_db_env.cpp @@ -63,7 +63,7 @@ namespace mongo { const BSONObj inclusiveLower(taskDetails.min); const BSONObj exclusiveUpper(taskDetails.max); const BSONObj keyPattern(taskDetails.shardKeyPattern); - const bool secondaryThrottle(taskDetails.secondaryThrottle); + const WriteConcernOptions writeConcern(taskDetails.writeConcern); const bool initiallyHaveClient = haveClient(); @@ -85,8 +85,6 @@ namespace mongo { << endl; try { - bool throttle = repl::getGlobalReplicationCoordinator()->getReplicationMode() == - repl::ReplicationCoordinator::modeReplSet ? secondaryThrottle : false; *deletedDocs = Helpers::removeRange(txn, KeyRange(ns, @@ -94,7 +92,7 @@ namespace mongo { exclusiveUpper, keyPattern), false, /*maxInclusive*/ - throttle, + writeConcern, serverGlobalParams.moveParanoia ? 
&removeSaver : NULL, true, /*fromMigrate*/ true); /*onlyRemoveOrphans*/ diff --git a/src/mongo/db/range_deleter_test.cpp b/src/mongo/db/range_deleter_test.cpp index 05ce3918320..c0b50aee619 100644 --- a/src/mongo/db/range_deleter_test.cpp +++ b/src/mongo/db/range_deleter_test.cpp @@ -35,6 +35,7 @@ #include "mongo/db/repl/repl_coordinator_global.h" #include "mongo/db/repl/repl_coordinator_mock.h" #include "mongo/db/repl/repl_settings.h" +#include "mongo/db/write_concern_options.h" #include "mongo/stdx/functional.h" #include "mongo/unittest/unittest.h" @@ -57,6 +58,7 @@ namespace { const int MAX_IMMEDIATE_DELETE_WAIT_SECS = 2; const mongo::repl::ReplSettings replSettings; + const mongo::WriteConcernOptions dummyWriteConcern; // Should not be able to queue deletes if deleter workers were not started. TEST(QueueDelete, CantAfterStop) { @@ -73,7 +75,7 @@ namespace { BSON("x" << 120), BSON("x" << 200), BSON("x" << 1), - true, + dummyWriteConcern, NULL /* notifier not needed */, &errMsg)); ASSERT_FALSE(errMsg.empty()); @@ -93,8 +95,13 @@ namespace { env->addCursorId(ns, 345); Notification notifyDone; - ASSERT_TRUE(deleter.queueDelete(ns, BSON("x" << 0), BSON("x" << 10), BSON("x" << 1), - true, ¬ifyDone, NULL /* errMsg not needed */)); + ASSERT_TRUE(deleter.queueDelete(ns, + BSON("x" << 0), + BSON("x" << 10), + BSON("x" << 1), + dummyWriteConcern, + ¬ifyDone, + NULL /* errMsg not needed */)); env->waitForNthGetCursor(1u); @@ -129,8 +136,13 @@ namespace { env->addCursorId(ns, 345); Notification notifyDone; - ASSERT_TRUE(deleter.queueDelete(ns, BSON("x" << 0), BSON("x" << 10), BSON("x" << 1), - true, ¬ifyDone, NULL /* errMsg not needed */)); + ASSERT_TRUE(deleter.queueDelete(ns, + BSON("x" << 0), + BSON("x" << 10), + BSON("x" << 1), + dummyWriteConcern, + ¬ifyDone, + NULL /* errMsg not needed */)); env->waitForNthGetCursor(1u); @@ -145,9 +157,9 @@ namespace { const BSONObj& min, const BSONObj& max, const BSONObj& shardKeyPattern, - bool secondaryThrottle, + const mongo::WriteConcernOptions& secondaryThrottle, std::string* errMsg) { - deleter->deleteNow(txn, ns, min, max, shardKeyPattern, secondaryThrottle, errMsg); + deleter->deleteNow(txn,ns, min, max, shardKeyPattern, secondaryThrottle, errMsg); } // Should not start delete if the set of cursors that were open when the @@ -171,7 +183,7 @@ namespace { BSON("x" << 0), BSON("x" << 10), BSON("x" << 1), - true, + dummyWriteConcern, &errMsg)); env->waitForNthGetCursor(1u); @@ -220,7 +232,7 @@ namespace { BSON("x" << 0), BSON("x" << 10), BSON("x" << 1), - true, + dummyWriteConcern, &errMsg)); env->waitForNthGetCursor(1u); @@ -262,7 +274,7 @@ namespace { BSON("x" << 10), BSON("x" << 20), BSON("x" << 1), - true, + dummyWriteConcern, ¬ifyDone1, NULL /* don't care errMsg */)); @@ -276,7 +288,7 @@ namespace { BSON("x" << 20), BSON("x" << 30), BSON("x" << 1), - true, + dummyWriteConcern, ¬ifyDone2, NULL /* don't care errMsg */)); @@ -285,7 +297,7 @@ namespace { BSON("x" << 30), BSON("x" << 40), BSON("x" << 1), - true, + dummyWriteConcern, ¬ifyDone3, NULL /* don't care errMsg */)); @@ -358,13 +370,23 @@ namespace { ASSERT_TRUE(errMsg.empty()); errMsg.clear(); - ASSERT_FALSE(deleter.queueDelete(ns, BSON("x" << 120), BSON("x" << 140), BSON("x" << 1), - false, NULL /* notifier not needed */, &errMsg)); + ASSERT_FALSE(deleter.queueDelete(ns, + BSON("x" << 120), + BSON("x" << 140), + BSON("x" << 1), + dummyWriteConcern, + NULL /* notifier not needed */, + &errMsg)); ASSERT_FALSE(errMsg.empty()); errMsg.clear(); - ASSERT_FALSE(deleter.deleteNow(noTxn, ns, BSON("x" << 
120), BSON("x" << 140), - BSON("x" << 1), false, &errMsg)); + ASSERT_FALSE(deleter.deleteNow(noTxn, + ns, + BSON("x" << 120), + BSON("x" << 140), + BSON("x" << 1), + dummyWriteConcern, + &errMsg)); ASSERT_FALSE(errMsg.empty()); ASSERT_FALSE(env->deleteOccured()); @@ -410,8 +432,13 @@ namespace { env->addCursorId(ns, 58); Notification notifyDone; - deleter.queueDelete(ns, BSON("x" << 0), BSON("x" << 10), BSON("x" << 1), - false, ¬ifyDone, NULL /* errMsg not needed */); + deleter.queueDelete(ns, + BSON("x" << 0), + BSON("x" << 10), + BSON("x" << 1), + dummyWriteConcern, + ¬ifyDone, + NULL /* errMsg not needed */); string errMsg; ASSERT_FALSE(deleter.addToBlackList(ns, BSON("x" << 5), BSON("x" << 15), &errMsg)); @@ -448,7 +475,7 @@ namespace { BSON("x" << 64), BSON("x" << 70), BSON("x" << 1), - true, + dummyWriteConcern, &delErrMsg)); env->waitForNthPausedDelete(1u); @@ -484,8 +511,13 @@ namespace { ASSERT_FALSE(deleter.removeFromBlackList(ns, BSON("x" << 1234), BSON("x" << 9000))); // Range should still be blacklisted - ASSERT_FALSE(deleter.deleteNow(noTxn, ns, BSON("x" << 2000), BSON("x" << 4000), BSON("x" << 1), - false, NULL /* errMsg not needed */)); + ASSERT_FALSE(deleter.deleteNow(noTxn, + ns, + BSON("x" << 2000), + BSON("x" << 4000), + BSON("x" << 1), + dummyWriteConcern, + NULL /* errMsg not needed */)); deleter.stopWorkers(); } @@ -503,15 +535,25 @@ namespace { ASSERT_TRUE(errMsg.empty()); errMsg.clear(); - ASSERT_FALSE(deleter.deleteNow(noTxn, ns, BSON("x" << 600), BSON("x" << 700), - BSON("x" << 1), false, &errMsg)); + ASSERT_FALSE(deleter.deleteNow(noTxn, + ns, + BSON("x" << 600), + BSON("x" << 700), + BSON("x" << 1), + dummyWriteConcern, + &errMsg)); ASSERT_FALSE(errMsg.empty()); ASSERT_TRUE(deleter.removeFromBlackList(ns, BSON("x" << 500), BSON("x" << 801))); errMsg.clear(); - ASSERT_TRUE(deleter.deleteNow(noTxn, ns, BSON("x" << 600), BSON("x" << 700), - BSON("x" << 1), false, &errMsg)); + ASSERT_TRUE(deleter.deleteNow(noTxn, + ns, + BSON("x" << 600), + BSON("x" << 700), + BSON("x" << 1), + dummyWriteConcern, + &errMsg)); ASSERT_TRUE(errMsg.empty()); deleter.stopWorkers(); @@ -527,8 +569,13 @@ namespace { deleter.addToBlackList("foo.bar", BSON("x" << 100), BSON("x" << 200), NULL /* errMsg not needed */); - ASSERT_TRUE(deleter.deleteNow(noTxn, "test.user", BSON("x" << 120), BSON("x" << 140), - BSON("x" << 1), true, NULL /* errMsg not needed */)); + ASSERT_TRUE(deleter.deleteNow(noTxn, + "test.user", + BSON("x" << 120), + BSON("x" << 140), + BSON("x" << 1), + dummyWriteConcern, + NULL /* errMsg not needed */)); deleter.stopWorkers(); } diff --git a/src/mongo/db/repl/repl_coordinator.h b/src/mongo/db/repl/repl_coordinator.h index cf9b20630e6..000476fd4e8 100644 --- a/src/mongo/db/repl/repl_coordinator.h +++ b/src/mongo/db/repl/repl_coordinator.h @@ -201,6 +201,17 @@ namespace repl { virtual bool canAcceptWritesForDatabase(const StringData& dbName) = 0; /** + * Checks if the current replica set configuration can satisfy the given write concern. + * + * Things that are taken into consideration include: + * 1. If the set has enough members. + * 2. If the tag exists. + * 3. If there are enough members for the tag specified. + */ + virtual Status checkIfWriteConcernCanBeSatisfied( + const WriteConcernOptions& writeConcern) const = 0; + + /* * Returns Status::OK() if it is valid for this node to serve reads on the given collection * and an errorcode indicating why the node cannot if it cannot. 
*/ diff --git a/src/mongo/db/repl/repl_coordinator_impl.cpp b/src/mongo/db/repl/repl_coordinator_impl.cpp index 9949b10a673..5da1a1deb39 100644 --- a/src/mongo/db/repl/repl_coordinator_impl.cpp +++ b/src/mongo/db/repl/repl_coordinator_impl.cpp @@ -446,5 +446,12 @@ namespace repl { // TODO return std::vector<BSONObj>(); } + + Status ReplicationCoordinatorImpl::checkIfWriteConcernCanBeSatisfied( + const WriteConcernOptions& writeConcern) const { + // TODO + return Status::OK(); + } + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/repl_coordinator_impl.h b/src/mongo/db/repl/repl_coordinator_impl.h index 095c3897a70..62e043f9d0c 100644 --- a/src/mongo/db/repl/repl_coordinator_impl.h +++ b/src/mongo/db/repl/repl_coordinator_impl.h @@ -89,6 +89,9 @@ namespace repl { virtual bool canAcceptWritesForDatabase(const StringData& database); + virtual Status checkIfWriteConcernCanBeSatisfied( + const WriteConcernOptions& writeConcern) const; + virtual Status canServeReadsFor(const NamespaceString& ns, bool slaveOk); virtual bool shouldIgnoreUniqueIndex(const IndexDescriptor* idx); diff --git a/src/mongo/db/repl/repl_coordinator_legacy.cpp b/src/mongo/db/repl/repl_coordinator_legacy.cpp index f4be0d26369..17c30dfb9e0 100644 --- a/src/mongo/db/repl/repl_coordinator_legacy.cpp +++ b/src/mongo/db/repl/repl_coordinator_legacy.cpp @@ -944,5 +944,19 @@ namespace { return repl::getHostsWrittenTo(op); } + Status LegacyReplicationCoordinator::checkIfWriteConcernCanBeSatisfied( + const WriteConcernOptions& writeConcern) const { + // TODO: rewrite this method with the correct version. Note that this just a + // temporary stub for secondary throttle. + + if (getReplicationMode() == ReplicationCoordinator::modeReplSet) { + if (writeConcern.wNumNodes > 1 && theReplSet->config().getMajority() <= 1) { + return Status(ErrorCodes::CannotSatisfyWriteConcern, "not enough nodes"); + } + } + + return Status::OK(); + } + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/repl_coordinator_legacy.h b/src/mongo/db/repl/repl_coordinator_legacy.h index a9095f80a09..f9cf15be96c 100644 --- a/src/mongo/db/repl/repl_coordinator_legacy.h +++ b/src/mongo/db/repl/repl_coordinator_legacy.h @@ -79,6 +79,9 @@ namespace repl { virtual bool canAcceptWritesForDatabase(const StringData& dbName); + virtual Status checkIfWriteConcernCanBeSatisfied( + const WriteConcernOptions& writeConcern) const; + virtual Status canServeReadsFor(const NamespaceString& ns, bool slaveOk); virtual bool shouldIgnoreUniqueIndex(const IndexDescriptor* idx); diff --git a/src/mongo/db/repl/repl_coordinator_mock.cpp b/src/mongo/db/repl/repl_coordinator_mock.cpp index b7b712bd0d4..aab04be4938 100644 --- a/src/mongo/db/repl/repl_coordinator_mock.cpp +++ b/src/mongo/db/repl/repl_coordinator_mock.cpp @@ -223,5 +223,11 @@ namespace repl { // TODO return std::vector<BSONObj>(); } + + Status ReplicationCoordinatorMock::checkIfWriteConcernCanBeSatisfied( + const WriteConcernOptions& writeConcern) const { + return Status::OK(); + } + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/repl_coordinator_mock.h b/src/mongo/db/repl/repl_coordinator_mock.h index 260ed685705..1ac42d4a248 100644 --- a/src/mongo/db/repl/repl_coordinator_mock.h +++ b/src/mongo/db/repl/repl_coordinator_mock.h @@ -82,6 +82,9 @@ namespace repl { virtual bool canAcceptWritesForDatabase(const StringData& dbName); + virtual Status checkIfWriteConcernCanBeSatisfied( + const WriteConcernOptions& writeConcern) const; + virtual Status 
canServeReadsFor(const NamespaceString& ns, bool slaveOk); virtual bool shouldIgnoreUniqueIndex(const IndexDescriptor* idx); diff --git a/src/mongo/db/write_concern_options.cpp b/src/mongo/db/write_concern_options.cpp index 848c4e4464c..90690b87599 100644 --- a/src/mongo/db/write_concern_options.cpp +++ b/src/mongo/db/write_concern_options.cpp @@ -27,7 +27,9 @@ #include "mongo/db/write_concern_options.h" +#include "mongo/bson/bson_field.h" #include "mongo/client/dbclientinterface.h" +#include "mongo/db/field_parser.h" namespace mongo { @@ -36,6 +38,27 @@ namespace mongo { const BSONObj WriteConcernOptions::AllConfigs = BSONObj(); const BSONObj WriteConcernOptions::Unacknowledged(BSON("w" << W_NONE)); + static const BSONField<bool> mongosSecondaryThrottleField("_secondaryThrottle", true); + static const BSONField<bool> secondaryThrottleField("secondaryThrottle", true); + static const BSONField<BSONObj> writeConcernField("writeConcern"); + + WriteConcernOptions::WriteConcernOptions(int numNodes, + SyncMode sync, + int timeout): + syncMode(sync), + wNumNodes(numNodes), + wTimeout(timeout) { + } + + WriteConcernOptions::WriteConcernOptions(const std::string& mode, + SyncMode sync, + int timeout): + syncMode(sync), + wNumNodes(0), + wMode(mode), + wTimeout(timeout) { + } + Status WriteConcernOptions::parse( const BSONObj& obj ) { if ( obj.isEmpty() ) { return Status( ErrorCodes::FailedToParse, "write concern object cannot be empty" ); @@ -86,4 +109,84 @@ namespace mongo { return Status::OK(); } + + Status WriteConcernOptions::parseSecondaryThrottle(const BSONObj& doc, + BSONObj* rawWriteConcernObj) { + string errMsg; + bool isSecondaryThrottle; + FieldParser::FieldState fieldState = FieldParser::extract(doc, + secondaryThrottleField, + &isSecondaryThrottle, + &errMsg); + if (fieldState == FieldParser::FIELD_INVALID) { + return Status(ErrorCodes::FailedToParse, errMsg); + } + + if (fieldState != FieldParser::FIELD_SET) { + fieldState = FieldParser::extract(doc, + mongosSecondaryThrottleField, + &isSecondaryThrottle, + &errMsg); + + if (fieldState == FieldParser::FIELD_INVALID) { + return Status(ErrorCodes::FailedToParse, errMsg); + } + } + + BSONObj dummyBSON; + if (!rawWriteConcernObj) { + rawWriteConcernObj = &dummyBSON; + } + + fieldState = FieldParser::extract(doc, + writeConcernField, + rawWriteConcernObj, + &errMsg); + if (fieldState == FieldParser::FIELD_INVALID) { + return Status(ErrorCodes::FailedToParse, errMsg); + } + + if (!isSecondaryThrottle) { + if (!rawWriteConcernObj->isEmpty()) { + return Status(ErrorCodes::UnsupportedFormat, + "Cannot have write concern when secondary throttle is false"); + } + + wNumNodes = 1; + return Status::OK(); + } + + if (rawWriteConcernObj->isEmpty()) { + return Status(ErrorCodes::WriteConcernNotDefined, + "Secondary throttle is on, but write concern is not specified"); + } + + return parse(*rawWriteConcernObj); + } + + BSONObj WriteConcernOptions::toBSON() const { + BSONObjBuilder builder; + + if (wMode.empty()) { + builder.append("w", wNumNodes); + } + else { + builder.append("w", wMode); + } + + if (syncMode == FSYNC) { + builder.append("fsync", true); + } + else if (syncMode == JOURNAL) { + builder.append("j", true); + } + + builder.append("wtimeout", wTimeout); + + return builder.obj(); + } + + bool WriteConcernOptions::shouldWaitForOtherNodes() const { + return !wMode.empty() || wNumNodes > 1; + } } diff --git a/src/mongo/db/write_concern_options.h b/src/mongo/db/write_concern_options.h index 44c67b2938b..31b0f556c30 100644 --- 
a/src/mongo/db/write_concern_options.h +++ b/src/mongo/db/write_concern_options.h @@ -37,8 +37,11 @@ namespace mongo { struct WriteConcernOptions { public: + enum SyncMode { NONE, FSYNC, JOURNAL }; + static const int kNoTimeout = 0; static const int kNoWaiting = -1; + static const BSONObj Default; static const BSONObj Acknowledged; static const BSONObj AllConfigs; @@ -46,8 +49,43 @@ namespace mongo { WriteConcernOptions() { reset(); } + WriteConcernOptions(int numNodes, + SyncMode sync, + int timeout); + + WriteConcernOptions(const std::string& mode, + SyncMode sync, + int timeout); + Status parse( const BSONObj& obj ); + /** + * Extracts the write concern settings from the BSONObj. The BSON object should have + * the format: + * + * { + * ... + * secondaryThrottle: <bool>, // optional + * _secondaryThrottle: <bool>, // optional + * writeConcern: <BSONObj> // optional + * } + * + * Note: secondaryThrottle takes precedence over _secondaryThrottle. + * + * Also sets output parameter rawWriteConcernObj if the writeCocnern field exists. + * + * Returns OK if the parse was successful. Also returns ErrorCodes::WriteConcernNotDefined + * when secondary throttle is true but write concern was not specified. + */ + Status parseSecondaryThrottle(const BSONObj& doc, + BSONObj* rawWriteConcernObj); + + /** + * Return true if the server needs to wait for other secondary nodes to satisfy this + * write concern setting. Errs on the false positive for non-empty wMode. + */ + bool shouldWaitForOtherNodes() const; + void reset() { syncMode = NONE; wNumNodes = 0; @@ -55,11 +93,18 @@ namespace mongo { wTimeout = 0; } - enum SyncMode { NONE, FSYNC, JOURNAL } syncMode; + // Returns the BSON representation of this object. + // Warning: does not return the same object passed on the last parse() call. + BSONObj toBSON() const; + + SyncMode syncMode; + // The w parameter for this write concern. The wMode represents the string format and + // takes precedence over the numeric format wNumNodes. int wNumNodes; std::string wMode; + // Timeout in milliseconds. 
int wTimeout; }; diff --git a/src/mongo/dbtests/dbhelper_tests.cpp b/src/mongo/dbtests/dbhelper_tests.cpp index 33d190ed34c..5877cfc85a2 100644 --- a/src/mongo/dbtests/dbhelper_tests.cpp +++ b/src/mongo/dbtests/dbhelper_tests.cpp @@ -30,6 +30,7 @@ #include "mongo/db/catalog/collection.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/operation_context_impl.h" +#include "mongo/db/write_concern_options.h" #include "mongo/dbtests/dbtests.h" #include "mongo/unittest/unittest.h" @@ -68,7 +69,8 @@ namespace mongo { BSON( "_id" << _min ), BSON( "_id" << _max ), BSON( "_id" << 1 ) ); - Helpers::removeRange( &txn, range ); + mongo::WriteConcernOptions dummyWriteConcern; + Helpers::removeRange(&txn, range, false, dummyWriteConcern); wunit.commit(); } diff --git a/src/mongo/s/balance.cpp b/src/mongo/s/balance.cpp index b2be36d2113..c4cb7042f79 100644 --- a/src/mongo/s/balance.cpp +++ b/src/mongo/s/balance.cpp @@ -33,6 +33,7 @@ #include "mongo/client/dbclientcursor.h" #include "mongo/db/jsobj.h" #include "mongo/db/write_concern.h" +#include "mongo/db/write_concern_options.h" #include "mongo/s/chunk.h" #include "mongo/s/cluster_write.h" #include "mongo/s/config.h" @@ -61,7 +62,7 @@ namespace mongo { } int Balancer::_moveChunks(const vector<CandidateChunkPtr>* candidateChunks, - bool secondaryThrottle, + const WriteConcernOptions* writeConcern, bool waitForDelete) { int movedCount = 0; @@ -98,7 +99,7 @@ namespace mongo { BSONObj res; if (c->moveAndCommit(Shard::make(chunkInfo.to), Chunk::MaxChunkSize, - secondaryThrottle, + writeConcern, waitForDelete, 0, /* maxTimeMS */ res)) { @@ -456,9 +457,18 @@ namespace mongo { // refresh chunk size (even though another balancer might be active) Chunk::refreshChunkSize(); - BSONObj balancerConfig; + SettingsType balancerConfig; + string errMsg; + + if (!grid.getBalancerSettings(&balancerConfig, &errMsg)) { + warning() << errMsg; + return ; + } + // now make sure we should even be running - if ( ! grid.shouldBalance( "", &balancerConfig ) ) { + if (balancerConfig.isKeySet() && // balancer config doc exists + !grid.shouldBalance(balancerConfig)) { + LOG(1) << "skipping balancing round because balancing is disabled" << endl; // Ping again so scripts can determine if we're active without waiting @@ -494,20 +504,26 @@ namespace mongo { continue; } - LOG(1) << "*** start balancing round" << endl; - - bool waitForDelete = false; - if (balancerConfig["_waitForDelete"].trueValue()) { - waitForDelete = balancerConfig["_waitForDelete"].trueValue(); - } - - bool secondaryThrottle = true; // default to on - if ( balancerConfig[SettingsType::secondaryThrottle()].type() ) { - secondaryThrottle = balancerConfig[SettingsType::secondaryThrottle()].trueValue(); + const bool waitForDelete = (balancerConfig.isWaitForDeleteSet() ? + balancerConfig.getWaitForDelete() : false); + + scoped_ptr<WriteConcernOptions> writeConcern; + if (balancerConfig.isKeySet()) { // if balancer doc exists. + StatusWith<WriteConcernOptions*> extractStatus = + balancerConfig.extractWriteConcern(); + if (extractStatus.isOK()) { + writeConcern.reset(extractStatus.getValue()); + } + else { + warning() << extractStatus.toString(); + } } - LOG(1) << "waitForDelete: " << waitForDelete << endl; - LOG(1) << "secondaryThrottle: " << secondaryThrottle << endl; + LOG(1) << "*** start balancing round. " + << "waitForDelete: " << waitForDelete + << ", secondaryThrottle: " + << (writeConcern.get() ? 
writeConcern->toBSON().toString() : "default") + << endl; vector<CandidateChunkPtr> candidateChunks; _doBalanceRound( conn.conn() , &candidateChunks ); @@ -517,7 +533,7 @@ namespace mongo { } else { _balancedLastTime = _moveChunks(&candidateChunks, - secondaryThrottle, + writeConcern.get(), waitForDelete ); } diff --git a/src/mongo/s/balance.h b/src/mongo/s/balance.h index 4e97c38b4f4..8c3e23711d3 100644 --- a/src/mongo/s/balance.h +++ b/src/mongo/s/balance.h @@ -38,6 +38,8 @@ namespace mongo { + struct WriteConcernOptions; + /** * The balancer is a background task that tries to keep the number of chunks across all servers of the cluster even. Although * every mongos will have one balancer running, only one of them will be active at the any given point in time. The balancer @@ -96,12 +98,12 @@ namespace mongo { * Issues chunk migration request, one at a time. * * @param candidateChunks possible chunks to move - * @param secondaryThrottle wait for secondaries to catch up before pushing more deletes + * @param writeConcern detailed write concern. NULL means the default write concern. * @param waitForDelete wait for deletes to complete after each chunk move * @return number of chunks effectively moved */ int _moveChunks(const std::vector<CandidateChunkPtr>* candidateChunks, - bool secondaryThrottle, + const WriteConcernOptions* writeConcern, bool waitForDelete); /** diff --git a/src/mongo/s/chunk.cpp b/src/mongo/s/chunk.cpp index 4aef8141a88..d6611f4fdd7 100644 --- a/src/mongo/s/chunk.cpp +++ b/src/mongo/s/chunk.cpp @@ -56,6 +56,7 @@ #include "mongo/db/query/query_planner.h" #include "mongo/db/query/query_planner_common.h" #include "mongo/db/query/index_bounds_builder.h" +#include "mongo/db/write_concern_options.h" namespace mongo { @@ -355,37 +356,50 @@ namespace mongo { bool Chunk::moveAndCommit(const Shard& to, long long chunkSize /* bytes */, - bool secondaryThrottle, + const WriteConcernOptions* writeConcern, bool waitForDelete, int maxTimeMS, - BSONObj& res) const - { + BSONObj& res) const { uassert( 10167 , "can't move shard to its current location!" 
, getShard() != to ); - log() << "moving chunk ns: " << _manager->getns() << " moving ( " << toString() << ") " << _shard.toString() << " -> " << to.toString() << endl; + log() << "moving chunk ns: " << _manager->getns() << " moving ( " << toString() << ") " + << _shard.toString() << " -> " << to.toString() << endl; Shard from = _shard; - ScopedDbConnection fromconn(from.getConnString()); - bool worked = fromconn->runCommand( "admin" , - BSON( "moveChunk" << _manager->getns() << - "from" << from.getAddress().toString() << - "to" << to.getAddress().toString() << - // NEEDED FOR 2.0 COMPATIBILITY - "fromShard" << from.getName() << - "toShard" << to.getName() << - /////////////////////////////// - "min" << _min << - "max" << _max << - "maxChunkSizeBytes" << chunkSize << - "shardId" << genID() << - "configdb" << configServer.modelServer() << - "secondaryThrottle" << secondaryThrottle << - "waitForDelete" << waitForDelete << - LiteParsedQuery::cmdOptionMaxTimeMS << maxTimeMS - ) , - res); + BSONObjBuilder builder; + builder.append("moveChunk", _manager->getns()); + builder.append("from", from.getAddress().toString()); + builder.append("to", to.getAddress().toString()); + // NEEDED FOR 2.0 COMPATIBILITY + builder.append("fromShard", from.getName()); + builder.append("toShard", to.getName()); + /////////////////////////////// + builder.append("min", _min); + builder.append("max", _max); + builder.append("maxChunkSizeBytes", chunkSize); + builder.append("shardId", genID()); + builder.append("configdb", configServer.modelServer()); + + // For legacy secondary throttle setting. + bool secondaryThrottle = true; + if (writeConcern && + writeConcern->wNumNodes <= 1 && + writeConcern->wMode.empty()) { + secondaryThrottle = false; + } + + builder.append("secondaryThrottle", secondaryThrottle); + + if (secondaryThrottle && writeConcern) { + builder.append("writeConcern", writeConcern->toBSON()); + } + + builder.append("waitForDelete", waitForDelete); + builder.append(LiteParsedQuery::cmdOptionMaxTimeMS, maxTimeMS); + + bool worked = fromconn->runCommand("admin", builder.done(), res); fromconn.done(); LOG( worked ? 
1 : 0 ) << "moveChunk result: " << res << endl; @@ -450,7 +464,8 @@ namespace mongo { _dataWritten = 0; // we're splitting, so should wait a bit } - bool shouldBalance = grid.shouldBalance( _manager->getns() ); + const bool shouldBalance = grid.getConfigShouldBalance() && + grid.getCollShouldBalance(_manager->getns()); log() << "autosplitted " << _manager->getns() << " shard: " << toString() @@ -489,11 +504,13 @@ namespace mongo { log().stream() << "moving chunk (auto): " << toMove << " to: " << newLocation.toString() << endl; BSONObj res; + + WriteConcernOptions noThrottle; massert( 10412 , str::stream() << "moveAndCommit failed: " << res , toMove->moveAndCommit( newLocation , MaxChunkSize , - false , /* secondaryThrottle - small chunk, no need */ + &noThrottle, /* secondaryThrottle */ false, /* waitForDelete - small chunk, no need */ 0, /* maxTimeMS - don't time out */ res ) ); diff --git a/src/mongo/s/chunk.h b/src/mongo/s/chunk.h index f8f18df1474..5c60137e718 100644 --- a/src/mongo/s/chunk.h +++ b/src/mongo/s/chunk.h @@ -46,6 +46,7 @@ namespace mongo { class ChunkRange; class ChunkManager; class ChunkObjUnitTest; + struct WriteConcernOptions; typedef shared_ptr<const Chunk> ChunkPtr; @@ -172,7 +173,7 @@ namespace mongo { * * @param to shard to move this chunk to * @param chunSize maximum number of bytes beyond which the migrate should no go trhough - * @param secondaryThrottle whether during migrate all writes should block for repl + * @param writeConcern detailed write concern. NULL means the default write concern. * @param waitForDelete whether chunk move should wait for cleanup or return immediately * @param maxTimeMS max time for the migrate request * @param res the object containing details about the migrate execution @@ -180,7 +181,7 @@ namespace mongo { */ bool moveAndCommit(const Shard& to, long long chunkSize, - bool secondaryThrottle, + const WriteConcernOptions* writeConcern, bool waitForDelete, int maxTimeMS, BSONObj& res) const; diff --git a/src/mongo/s/commands_admin.cpp b/src/mongo/s/commands_admin.cpp index 79372173622..aa66197d0fa 100644 --- a/src/mongo/s/commands_admin.cpp +++ b/src/mongo/s/commands_admin.cpp @@ -48,6 +48,7 @@ #include "mongo/db/stats/counters.h" #include "mongo/db/wire_version.h" #include "mongo/db/write_concern.h" +#include "mongo/db/write_concern_options.h" #include "mongo/s/chunk.h" #include "mongo/s/client_info.h" #include "mongo/s/cluster_write.h" @@ -772,8 +773,9 @@ namespace mongo { continue; BSONObj moveResult; + WriteConcernOptions noThrottle; if (!chunk->moveAndCommit(to, Chunk::MaxChunkSize, - false, true, 0, moveResult)) { + &noThrottle, true, 0, moveResult)) { warning().stream() << "Couldn't move chunk " << chunk << " to shard " << to << " while sharding collection " << ns << ". Reason: " @@ -1133,10 +1135,23 @@ namespace mongo { return false; } + scoped_ptr<WriteConcernOptions> writeConcern(new WriteConcernOptions()); + Status status = writeConcern->parseSecondaryThrottle(cmdObj, NULL); + + if (!status.isOK()){ + if (status.code() != ErrorCodes::WriteConcernNotDefined) { + errmsg = status.toString(); + return false; + } + + // Let the shard decide what write concern to use. 
+ writeConcern.reset(); + } + BSONObj res; if (!c->moveAndCommit(to, maxChunkSizeBytes, - cmdObj["_secondaryThrottle"].trueValue(), + writeConcern.get(), cmdObj["_waitForDelete"].trueValue(), maxTimeMS.getValue(), res)) { diff --git a/src/mongo/s/d_migrate.cpp b/src/mongo/s/d_migrate.cpp index 2fb6aa8be8f..414bc337249 100644 --- a/src/mongo/s/d_migrate.cpp +++ b/src/mongo/s/d_migrate.cpp @@ -88,6 +88,31 @@ using namespace std; +namespace { + using mongo::WriteConcernOptions; + using mongo::repl::ReplicationCoordinator; + + const int kDefaultWTimeoutMs = 60 * 1000; + const WriteConcernOptions DefaultWriteConcern(2, WriteConcernOptions::NONE, kDefaultWTimeoutMs); + + /** + * Returns the default write concern for migration cleanup (at donor shard) and + * cloning documents (at recipient shard). + */ + WriteConcernOptions getDefaultWriteConcern() { + ReplicationCoordinator* replCoordinator = + mongo::repl::getGlobalReplicationCoordinator(); + mongo::Status status = + replCoordinator->checkIfWriteConcernCanBeSatisfied(DefaultWriteConcern); + + if (status.isOK()) { + return DefaultWriteConcern; + } + + return WriteConcernOptions(1, WriteConcernOptions::NONE, 0); + } +} + namespace mongo { MONGO_LOG_DEFAULT_COMPONENT_FILE(::mongo::logger::LogComponent::kSharding); @@ -746,6 +771,24 @@ namespace mongo { * called to initial a move * usually by a mongos * this is called on the "from" side + * + * Format: + * { + * moveChunk: "namespace", + * from: "hostAndPort", + * fromShard: "shardName", + * to: "hostAndPort", + * toShard: "shardName", + * min: {}, + * max: {}, + * maxChunkBytes: numeric, + * shardId: "_id of chunk document in config.chunks", + * configdb: "hostAndPort", + * + * // optional + * secondaryThrottle: bool, //defaults to true. + * writeConcern: {} // applies to individual writes. + * } */ class MoveChunkCommand : public Command { public: @@ -802,30 +845,35 @@ namespace mongo { to = cmdObj["toShard"].String(); } - // if we do a w=2 after every write - bool secondaryThrottle = cmdObj["secondaryThrottle"].trueValue(); - if ( secondaryThrottle ) { - const repl::ReplicationCoordinator::Mode replMode = - repl::getGlobalReplicationCoordinator()->getReplicationMode(); - if (replMode == repl::ReplicationCoordinator::modeReplSet) { - if (repl::theReplSet->config().getMajority() <= 1) { - secondaryThrottle = false; - warning() << "not enough nodes in set to use secondaryThrottle: " - << " majority: " << repl::theReplSet->config().getMajority() - << endl; - } - } - else if (replMode == repl::ReplicationCoordinator::modeNone) { - secondaryThrottle = false; - warning() << "secondaryThrottle selected but no replication" << endl; + // Process secondary throttle settings and assign defaults if necessary. 
+ BSONObj secThrottleObj; + WriteConcernOptions writeConcern; + Status status = writeConcern.parseSecondaryThrottle(cmdObj, &secThrottleObj); + + if (!status.isOK()){ + if (status.code() != ErrorCodes::WriteConcernNotDefined) { + warning() << status.toString() << endl; + return appendCommandStatus(result, status); } - else { - // master/slave - secondaryThrottle = false; - warning() << "secondaryThrottle not allowed with master/slave" << endl; + + writeConcern = getDefaultWriteConcern(); + } + else { + repl::ReplicationCoordinator* replCoordinator = + repl::getGlobalReplicationCoordinator(); + Status status = replCoordinator->checkIfWriteConcernCanBeSatisfied(writeConcern); + if (!status.isOK()) { + warning() << status.toString() << endl; + return appendCommandStatus(result, status); } } + if (writeConcern.shouldWaitForOtherNodes() && + writeConcern.wTimeout == WriteConcernOptions::kNoTimeout) { + // Don't allow no timeout. + writeConcern.wTimeout = kDefaultWTimeoutMs; + } + // Do inline deletion bool waitForDelete = cmdObj["waitForDelete"].trueValue(); if (waitForDelete) { @@ -1056,19 +1104,27 @@ namespace mongo { ScopedDbConnection connTo(toShard.getConnString()); BSONObj res; bool ok; + + const bool isSecondaryThrottle(writeConcern.shouldWaitForOtherNodes()); + + BSONObjBuilder recvChunkStartBuilder; + recvChunkStartBuilder.append("_recvChunkStart", ns); + recvChunkStartBuilder.append("from", fromShard.getConnString()); + recvChunkStartBuilder.append("fromShardName", fromShard.getName()); + recvChunkStartBuilder.append("toShardName", toShard.getName()); + recvChunkStartBuilder.append("min", min); + recvChunkStartBuilder.append("max", max); + recvChunkStartBuilder.append("shardKeyPattern", shardKeyPattern); + recvChunkStartBuilder.append("configServer", configServer.modelServer()); + recvChunkStartBuilder.append("secondaryThrottle", isSecondaryThrottle); + + // Follow the same convention in moveChunk. + if (isSecondaryThrottle && !secThrottleObj.isEmpty()) { + recvChunkStartBuilder.append("writeConcern", secThrottleObj); + } + try{ - ok = connTo->runCommand( "admin" , - BSON( "_recvChunkStart" << ns << - "from" << fromShard.getConnString() << - "fromShardName" << fromShard.getName() << - "toShardName" << toShard.getName() << - "min" << min << - "max" << max << - "shardKeyPattern" << shardKeyPattern << - "configServer" << configServer.modelServer() << - "secondaryThrottle" << secondaryThrottle - ) , - res ); + ok = connTo->runCommand("admin", recvChunkStartBuilder.done(), res); } catch( DBException& e ){ errmsg = str::stream() << "moveChunk could not contact to: shard " @@ -1501,7 +1557,7 @@ namespace mongo { min.getOwned(), max.getOwned(), shardKeyPattern.getOwned(), - secondaryThrottle, + writeConcern, &errMsg)) { log() << "Error occured while performing cleanup: " << errMsg << endl; } @@ -1514,7 +1570,7 @@ namespace mongo { min.getOwned(), max.getOwned(), shardKeyPattern.getOwned(), - secondaryThrottle, + writeConcern, NULL, // Don't want to be notified. &errMsg)) { log() << "could not queue migration cleanup: " << errMsg << endl; @@ -1728,7 +1784,7 @@ namespace mongo { long long num = Helpers::removeRange( txn, range, false, /*maxInclusive*/ - secondaryThrottle, /* secondaryThrottle */ + writeConcern, /*callback*/ serverGlobalParams.moveParanoia ? 
&rs : 0, true ); /* flag fromMigrate in oplog */ @@ -1760,7 +1816,7 @@ namespace mongo { State currentState = getState(); if (currentState == FAIL || currentState == ABORT) { string errMsg; - if (!getDeleter()->queueDelete(ns, min, max, shardKeyPattern, secondaryThrottle, + if (!getDeleter()->queueDelete(ns, min, max, shardKeyPattern, writeConcern, NULL /* notifier */, &errMsg)) { warning() << "Failed to queue delete for migrate abort: " << errMsg << endl; } @@ -1820,18 +1876,15 @@ namespace mongo { numCloned++; clonedBytes += o.objsize(); - if ( secondaryThrottle && thisTime > 0 ) { - WriteConcernOptions writeConcern; - writeConcern.wNumNodes = 2; - writeConcern.wTimeout = 60 * 1000; + if (writeConcern.shouldWaitForOtherNodes() && thisTime > 0) { repl::ReplicationCoordinator::StatusAndDuration replStatus = repl::getGlobalReplicationCoordinator()->awaitReplication( txn, cc().getLastOp(), writeConcern); if (replStatus.status.code() == ErrorCodes::ExceededTimeLimit) { - warning() << "secondaryThrottle on, but doc insert timed out " - "after 60 seconds, continuing"; + warning() << "secondaryThrottle on, but doc insert timed out; " + "continuing"; } else { massertStatusOK(replStatus.status); @@ -2044,10 +2097,12 @@ namespace mongo { // TODO: create a better interface to remove objects directly KeyRange range( ns, id, id, idIndexPattern ); + const WriteConcernOptions singleNodeWrite(1, WriteConcernOptions::NONE, + WriteConcernOptions::kNoTimeout); Helpers::removeRange( txn, range , true , /*maxInclusive*/ - false , /* secondaryThrottle */ + singleNodeWrite, serverGlobalParams.moveParanoia ? &rs : 0 , /*callback*/ true ); /*fromMigrate*/ @@ -2212,7 +2267,7 @@ namespace mongo { long long clonedBytes; long long numCatchup; long long numSteady; - bool secondaryThrottle; + WriteConcernOptions writeConcern; int replSetMajorityCount; @@ -2238,6 +2293,25 @@ namespace mongo { cc().shutdown(); } + /** + * Command for initiating the recipient side of the migration to start copying data + * from the donor shard. + * + * { + * _recvChunkStart: "namespace", + * congfigServer: "hostAndPort", + * from: "hostAndPort", + * fromShardName: "shardName", + * toShardName: "shardName", + * min: {}, + * max: {}, + * shardKeyPattern: {}, + * + * // optional + * secondaryThrottle: bool, // defaults to true + * writeConcern: {} // applies to individual writes. + * } + */ class RecvChunkStartCommand : public ChunkCommandHelper { public: void help(stringstream& h) const { h << "internal"; } @@ -2304,7 +2378,37 @@ namespace mongo { migrateStatus.min = min; migrateStatus.max = max; migrateStatus.epoch = currentVersion.epoch(); - migrateStatus.secondaryThrottle = cmdObj["secondaryThrottle"].trueValue(); + + // Process secondary throttle settings and assign defaults if necessary. 
+ WriteConcernOptions writeConcern; + status = writeConcern.parseSecondaryThrottle(cmdObj, NULL); + + if (!status.isOK()){ + if (status.code() != ErrorCodes::WriteConcernNotDefined) { + warning() << status.toString() << endl; + return appendCommandStatus(result, status); + } + + writeConcern = getDefaultWriteConcern(); + } + else { + repl::ReplicationCoordinator* replCoordinator = + repl::getGlobalReplicationCoordinator(); + Status status = replCoordinator->checkIfWriteConcernCanBeSatisfied(writeConcern); + if (!status.isOK()) { + warning() << status.toString() << endl; + return appendCommandStatus(result, status); + } + } + + if (writeConcern.shouldWaitForOtherNodes() && + writeConcern.wTimeout == WriteConcernOptions::kNoTimeout) { + // Don't allow no timeout. + writeConcern.wTimeout = kDefaultWTimeoutMs; + } + + migrateStatus.writeConcern = writeConcern; + if (cmdObj.hasField("shardKeyPattern")) { migrateStatus.shardKeyPattern = cmdObj["shardKeyPattern"].Obj().getOwned(); } else { @@ -2323,12 +2427,6 @@ namespace mongo { migrateStatus.shardKeyPattern = keya.getOwned(); } - if (migrateStatus.secondaryThrottle && - !repl::getGlobalReplicationCoordinator()->isReplEnabled()) { - warning() << "secondaryThrottle asked for, but no replication is enabled" << endl; - migrateStatus.secondaryThrottle = false; - } - // Set the TO-side migration to active migrateStatus.prepare(); diff --git a/src/mongo/s/grid.cpp b/src/mongo/s/grid.cpp index 49804a42359..ae687832cc6 100644 --- a/src/mongo/s/grid.cpp +++ b/src/mongo/s/grid.cpp @@ -498,48 +498,75 @@ namespace mongo { * Returns whether balancing is enabled, with optional namespace "ns" parameter for balancing on a particular * collection. */ - bool Grid::shouldBalance( const string& ns, BSONObj* balancerDocOut ) const { + bool Grid::shouldBalance(const SettingsType& balancerSettings) const { // Allow disabling the balancer for testing - if ( MONGO_FAIL_POINT(neverBalance) ) return false; + if (MONGO_FAIL_POINT(neverBalance)) return false; - ScopedDbConnection conn(configServer.getPrimary().getConnString(), 30); + if (balancerSettings.isBalancerStoppedSet() && balancerSettings.getBalancerStopped()) { + return false; + } + + if (balancerSettings.isBalancerActiveWindowSet()) { + boost::posix_time::ptime now = boost::posix_time::second_clock::local_time(); + return _inBalancingWindow(balancerSettings.getBalancerActiveWindow(), now); + } + + return true; + } + + bool Grid::getBalancerSettings(SettingsType* settings, string* errMsg) const { BSONObj balancerDoc; - BSONObj collDoc; + ScopedDbConnection conn(configServer.getPrimary().getConnString(), 30); try { - // look for the stop balancer marker - balancerDoc = conn->findOne( SettingsType::ConfigNS, - BSON( SettingsType::key("balancer") ) ); - if( ns.size() > 0 ) collDoc = conn->findOne(CollectionType::ConfigNS, - BSON( CollectionType::ns(ns))); + balancerDoc = conn->findOne(SettingsType::ConfigNS, + BSON(SettingsType::key("balancer"))); conn.done(); } - catch( DBException& e ){ - conn.kill(); - warning() << "could not determine whether balancer should be running, error getting" - "config data from " << conn.getHost() << causedBy( e ) << endl; - // if anything goes wrong, we shouldn't try balancing + catch (const DBException& ex) { + *errMsg = str::stream() << "failed to read balancer settings from " << conn.getHost() + << ": " << causedBy(ex); return false; } - if ( balancerDocOut ) - *balancerDocOut = balancerDoc; + return settings->parseBSON(balancerDoc, errMsg); + } + + bool 
Grid::getConfigShouldBalance() const { + SettingsType balSettings; + string errMsg; - boost::posix_time::ptime now = boost::posix_time::second_clock::local_time(); - if ( _balancerStopped( balancerDoc ) || ! _inBalancingWindow( balancerDoc , now ) ) { + if (!getBalancerSettings(&balSettings, &errMsg)) { + warning() << errMsg; return false; } - if( collDoc["noBalance"].trueValue() ) return false; - return true; + if (!balSettings.isKeySet()) { + // Balancer settings doc does not exist. Default to yes. + return true; + } + + return shouldBalance(balSettings); } - bool Grid::_balancerStopped( const BSONObj& balancerDoc ) { - // check the 'stopped' marker maker - // if present, it is a simple bool - BSONElement stoppedElem = balancerDoc[SettingsType::balancerStopped()]; - return stoppedElem.trueValue(); + bool Grid::getCollShouldBalance(const std::string& ns) const { + BSONObj collDoc; + ScopedDbConnection conn(configServer.getPrimary().getConnString(), 30); + + try { + collDoc = conn->findOne(CollectionType::ConfigNS, BSON(CollectionType::ns(ns))); + conn.done(); + } + catch (const DBException& e){ + conn.kill(); + warning() << "could not determine whether balancer should be running, error getting" + << "config data from " << conn.getHost() << causedBy(e) << endl; + // if anything goes wrong, we shouldn't try balancing + return false; + } + + return !collDoc[CollectionType::noBalance()].trueValue(); } bool Grid::_inBalancingWindow( const BSONObj& balancerDoc , const boost::posix_time::ptime& now ) { diff --git a/src/mongo/s/grid.h b/src/mongo/s/grid.h index e68918abce8..672072170dd 100644 --- a/src/mongo/s/grid.h +++ b/src/mongo/s/grid.h @@ -35,7 +35,8 @@ #include "mongo/util/time_support.h" #include "mongo/util/concurrency/mutex.h" -#include "config.h" // DBConfigPtr +#include "mongo/s/config.h" // DBConfigPtr +#include "mongo/s/type_settings.h" namespace mongo { @@ -107,9 +108,29 @@ namespace mongo { bool knowAboutShard( const std::string& name ) const; /** - * @return true if the chunk balancing functionality is enabled + * Returns true if the balancer should be running. Caller is responsible + * for making sure settings has the balancer key. */ - bool shouldBalance( const std::string& ns = "", BSONObj* balancerDocOut = 0 ) const; + bool shouldBalance(const SettingsType& balancerSettings) const; + + /** + * Retrieve the balancer settings from the config server. Returns false if an error + * occurred while retrieving the document. If the balancer settings document does not + * exist, it is not considered as an error, but the "key" property of the settings + * output parameter will not be set. + */ + bool getBalancerSettings(SettingsType* settings, string* errMsg) const; + + /** + * Returns true if the config server settings indicate that the balancer should be active. + */ + bool getConfigShouldBalance() const; + + /** + * Returns true if the given collection can be balanced based on the config.collections + * document. + */ + bool getCollShouldBalance(const std::string& ns) const; /** * @@ -149,14 +170,6 @@ namespace mongo { * @return whether a give dbname is used for shard "local" databases (e.g., admin or local) */ static bool _isSpecialLocalDB( const std::string& dbName ); - - /** - * @param balancerDoc bson that may contain a marker to stop the balancer - * format { ... , stopped: [ "true" | "false" ] , ... 
} - * @return true if the marker is present and is set to true - */ - static bool _balancerStopped( const BSONObj& balancerDoc ); - }; extern Grid grid; diff --git a/src/mongo/s/type_settings.cpp b/src/mongo/s/type_settings.cpp index 08ada19d680..2c2f12bc7bd 100644 --- a/src/mongo/s/type_settings.cpp +++ b/src/mongo/s/type_settings.cpp @@ -28,6 +28,7 @@ #include "mongo/s/type_settings.h" #include "mongo/db/field_parser.h" +#include "mongo/db/write_concern_options.h" #include "mongo/util/mongoutils/str.h" #include "mongo/util/time_support.h" @@ -36,12 +37,16 @@ namespace mongo { using mongoutils::str::stream; const std::string SettingsType::ConfigNS = "config.settings"; + const std::string SettingsType::BalancerDocKey("balancer"); + const std::string SettingsType::ChunkSizeDocKey("chunksize"); const BSONField<std::string> SettingsType::key("_id"); const BSONField<int> SettingsType::chunksize("value"); const BSONField<bool> SettingsType::balancerStopped("stopped"); const BSONField<BSONObj> SettingsType::balancerActiveWindow("activeWindow"); - const BSONField<bool> SettingsType::secondaryThrottle("_secondaryThrottle"); + const BSONField<bool> SettingsType::deprecated_secondaryThrottle("_secondaryThrottle", true); + const BSONField<BSONObj> SettingsType::migrationWriteConcern("_secondaryThrottle"); + const BSONField<bool> SettingsType::waitForDelete("_waitForDelete"); SettingsType::SettingsType() { clear(); @@ -62,7 +67,7 @@ namespace mongo { return false; } - if (_key == "chunksize") { + if (_key == ChunkSizeDocKey) { if (_isChunksizeSet) { if (!(_chunksize > 0)) { *errMsg = stream() << "chunksize specified in " << chunksize.name() << @@ -76,7 +81,7 @@ namespace mongo { } return true; } - else if (_key == "balancer") { + else if (_key == BalancerDocKey) { if (_balancerActiveWindow.nFields() != 0) { // check if both 'start' and 'stop' are present const std::string start = _balancerActiveWindow["start"].str(); @@ -94,6 +99,12 @@ namespace mongo { " format is { start: \"hh:mm\" , stop: \"hh:mm\" }"; return false; } + + if (_isSecondaryThrottleSet && _isMigrationWriteConcernSet) { + *errMsg = stream() << "cannot have both secondary throttle and migration " + << "write concern set at the same time"; + return false; + } } return true; } @@ -112,8 +123,13 @@ namespace mongo { if (_isBalancerActiveWindowSet) { builder.append(balancerActiveWindow(), _balancerActiveWindow); } - if (_isSecondaryThrottleSet) builder.append(secondaryThrottle(), _secondaryThrottle); + if (_isSecondaryThrottleSet) { + builder.append(deprecated_secondaryThrottle(), _secondaryThrottle); + } + if (_isMigrationWriteConcernSet) { + builder.append(migrationWriteConcern(), _migrationWriteConcern); + } return builder.obj(); } @@ -141,9 +157,23 @@ namespace mongo { if (fieldState == FieldParser::FIELD_INVALID) return false; _isBalancerActiveWindowSet = fieldState == FieldParser::FIELD_SET; - fieldState = FieldParser::extract(source, secondaryThrottle, &_secondaryThrottle, errMsg); + fieldState = FieldParser::extract(source, + migrationWriteConcern, + &_migrationWriteConcern, + errMsg); + _isMigrationWriteConcernSet = fieldState == FieldParser::FIELD_SET; + + if (fieldState == FieldParser::FIELD_INVALID) { + fieldState = FieldParser::extract(source, deprecated_secondaryThrottle, + &_secondaryThrottle, errMsg); + if (fieldState == FieldParser::FIELD_INVALID) return false; + errMsg->clear(); // Note: extract method doesn't clear errMsg. 
+ _isSecondaryThrottleSet = fieldState == FieldParser::FIELD_SET; + } + + fieldState = FieldParser::extract(source, waitForDelete, &_waitForDelete, errMsg); if (fieldState == FieldParser::FIELD_INVALID) return false; - _isSecondaryThrottleSet = fieldState == FieldParser::FIELD_SET; + _isWaitForDeleteSet = fieldState == FieldParser::FIELD_SET; return true; } @@ -162,12 +192,14 @@ namespace mongo { _balancerActiveWindow = BSONObj(); _isBalancerActiveWindowSet = false; - _shortBalancerSleep = false; - _isShortBalancerSleepSet = false; - _secondaryThrottle = false; _isSecondaryThrottleSet = false; + _migrationWriteConcern = BSONObj(); + _isMigrationWriteConcernSet= false; + + _waitForDelete = false; + _isWaitForDeleteSet = false; } void SettingsType::cloneTo(SettingsType* other) const { @@ -182,19 +214,49 @@ namespace mongo { other->_balancerStopped = _balancerStopped; other->_isBalancerStoppedSet = _isBalancerStoppedSet; - other->_balancerActiveWindow = _balancerActiveWindow; + other->_balancerActiveWindow = _balancerActiveWindow.copy(); other->_isBalancerActiveWindowSet = _isBalancerActiveWindowSet; - other->_shortBalancerSleep = _shortBalancerSleep; - other->_isShortBalancerSleepSet = _isShortBalancerSleepSet; - other->_secondaryThrottle = _secondaryThrottle; other->_isSecondaryThrottleSet = _isSecondaryThrottleSet; + other->_migrationWriteConcern = _migrationWriteConcern.copy(); + other->_isMigrationWriteConcernSet = _isMigrationWriteConcernSet; + + other->_waitForDelete = _waitForDelete; + other->_isWaitForDeleteSet = _isWaitForDeleteSet; } std::string SettingsType::toString() const { return toBSON().toString(); } + StatusWith<WriteConcernOptions*> SettingsType::extractWriteConcern() const { + dassert(_isKeySet); + dassert(_key == BalancerDocKey); + + const bool isSecondaryThrottle = getSecondaryThrottle(); + if (!isSecondaryThrottle) { + return(StatusWith<WriteConcernOptions*>( + new WriteConcernOptions(1, WriteConcernOptions::NONE, 0))); + } + + const BSONObj migrationWOption(isMigrationWriteConcernSet() ? + getMigrationWriteConcern() : BSONObj()); + + if (migrationWOption.isEmpty()) { + // Default setting. + return StatusWith<WriteConcernOptions*>(NULL); + } + + auto_ptr<WriteConcernOptions> writeConcern(new WriteConcernOptions()); + Status status = writeConcern->parse(migrationWOption); + + if (!status.isOK()) { + return StatusWith<WriteConcernOptions*>(status); + } + + return StatusWith<WriteConcernOptions*>(writeConcern.release()); + } + } // namespace mongo diff --git a/src/mongo/s/type_settings.h b/src/mongo/s/type_settings.h index fa8a76e0480..114e2dfadf5 100644 --- a/src/mongo/s/type_settings.h +++ b/src/mongo/s/type_settings.h @@ -31,11 +31,14 @@ #include <string> #include "mongo/base/disallow_copying.h" +#include "mongo/base/status_with.h" #include "mongo/base/string_data.h" #include "mongo/db/jsobj.h" namespace mongo { + struct WriteConcernOptions; + /** * This class represents the layout and contents of documents contained in the * config.settings collection. All manipulation of documents coming from that @@ -56,6 +59,10 @@ namespace mongo { * } * // use 'exampleType' * + * It is the responsibility of the caller to make sure that this object has + * the right "key" when calling the getter and serialize methods. The key is + * used for identifying the type of document this object represents (for example, + * balancer settings or chunk size settings document). 
*/ class SettingsType { MONGO_DISALLOW_COPYING(SettingsType); @@ -67,13 +74,17 @@ namespace mongo { // Name of the settings collection in the config server. static const std::string ConfigNS; + static const std::string BalancerDocKey; + static const std::string ChunkSizeDocKey; // Field names and types in the settings collection type. static const BSONField<std::string> key; static const BSONField<int> chunksize; static const BSONField<bool> balancerStopped; static const BSONField<BSONObj> balancerActiveWindow; - static const BSONField<bool> secondaryThrottle; + static const BSONField<bool> deprecated_secondaryThrottle; + static const BSONField<BSONObj> migrationWriteConcern; + static const BSONField<bool> waitForDelete; // // settings type methods @@ -149,6 +160,7 @@ namespace mongo { // Calling get*() methods when the member is not set and has no default results in undefined // behavior int getChunksize() const { + dassert(_key == ChunkSizeDocKey); if (_isChunksizeSet) { return _chunksize; } else { @@ -170,6 +182,7 @@ namespace mongo { // Calling get*() methods when the member is not set and has no default results in undefined // behavior bool getBalancerStopped() const { + dassert(_key == BalancerDocKey); if (_isBalancerStoppedSet) { return _balancerStopped; } else { @@ -191,6 +204,7 @@ namespace mongo { // Calling get*() methods when the member is not set and has no default results in undefined // behavior BSONObj getBalancerActiveWindow() const { + dassert(_key == BalancerDocKey); if (_isBalancerActiveWindowSet) { return _balancerActiveWindow; } else { @@ -207,20 +221,72 @@ namespace mongo { void unsetSecondaryThrottle() { _isSecondaryThrottleSet = false; } bool isSecondaryThrottleSet() const { - return _isSecondaryThrottleSet || secondaryThrottle.hasDefault(); + return _isSecondaryThrottleSet || deprecated_secondaryThrottle.hasDefault(); } // Calling get*() methods when the member is not set and has no default results in undefined // behavior bool getSecondaryThrottle() const { + dassert(_key == BalancerDocKey); if (_isSecondaryThrottleSet) { return _secondaryThrottle; } else { - dassert(secondaryThrottle.hasDefault()); - return secondaryThrottle.getDefault(); + dassert(deprecated_secondaryThrottle.hasDefault()); + return deprecated_secondaryThrottle.getDefault(); } } + void setMigrationWriteConcern(const BSONObj& writeConcern) { + _migrationWriteConcern = writeConcern; + _isMigrationWriteConcernSet = true; + } + + void unsetMigrationWriteConcern() { + _isMigrationWriteConcernSet = false; + } + + bool isMigrationWriteConcernSet() const { + return _isMigrationWriteConcernSet; + } + + // Calling get*() methods when the member is not set and has no default results in undefined + // behavior + BSONObj getMigrationWriteConcern() const { + dassert(_key == BalancerDocKey); + dassert (_isMigrationWriteConcernSet); + return _migrationWriteConcern; + } + + void setWaitForDelete(bool waitForDelete) { + _waitForDelete = waitForDelete; + _isWaitForDeleteSet = true; + } + + void unsetWaitForDelete() { + _isWaitForDeleteSet = false; + } + + bool isWaitForDeleteSet() const { + return _isWaitForDeleteSet; + } + + // Calling get*() methods when the member is not set and has no default results in undefined + // behavior + bool getWaitForDelete() const { + dassert(_key == BalancerDocKey); + dassert (_isWaitForDeleteSet); + return _waitForDelete; + } + + // Helper methods + + /** + * Extract the write concern settings from this settings. This is only valid when + * key is "balancer". 
Returns NULL if secondary throttle is true but write + * concern is not specified. + */ + StatusWith<WriteConcernOptions*> extractWriteConcern() const; + private: // Convention: (M)andatory, (O)ptional, (S)pecial rule. std::string _key; // (M) key determining the type of options to use @@ -238,11 +304,19 @@ namespace mongo { // Format: { start: "08:00" , stop: // "19:30" }, strftime format is %H:%M - bool _shortBalancerSleep; // (O) controls how long the balancer sleeps - bool _isShortBalancerSleepSet; // in some situations bool _secondaryThrottle; // (O) only migrate chunks as fast as at least bool _isSecondaryThrottleSet; // one secondary can keep up with + + // (O) detailed write concern for *individual* writes during migration. + // From side: deletes during cleanup. + // To side: deletes to clear the incoming range, deletes to undo migration at abort, + // and writes during cloning. + BSONObj _migrationWriteConcern; + bool _isMigrationWriteConcernSet; + + bool _waitForDelete; // (O) synchronous migration cleanup. + bool _isWaitForDeleteSet; }; } // namespace mongo diff --git a/src/mongo/s/type_settings_test.cpp b/src/mongo/s/type_settings_test.cpp index 9b605464794..651c53d056c 100644 --- a/src/mongo/s/type_settings_test.cpp +++ b/src/mongo/s/type_settings_test.cpp @@ -91,10 +91,10 @@ namespace { ASSERT_EQUALS(settings.getChunksize(), 1); BSONObj objBalancer = BSON(SettingsType::key("balancer") << - SettingsType::balancerStopped(true) << - SettingsType::balancerActiveWindow(BSON("start" << "23:00" << - "stop" << "6:00" )) << - SettingsType::secondaryThrottle(true)); + SettingsType::balancerStopped(true) << + SettingsType::balancerActiveWindow(BSON("start" << "23:00" << + "stop" << "6:00" )) << + SettingsType::migrationWriteConcern(BSON("w" << 2))); ASSERT(settings.parseBSON(objBalancer, &errMsg)); ASSERT_EQUALS(errMsg, ""); ASSERT_TRUE(settings.isValid(NULL)); @@ -102,7 +102,28 @@ namespace { ASSERT_EQUALS(settings.getBalancerStopped(), true); ASSERT_EQUALS(settings.getBalancerActiveWindow(), BSON("start" << "23:00" << "stop" << "6:00" )); - ASSERT_EQUALS(settings.getSecondaryThrottle(), true); + ASSERT(settings.getSecondaryThrottle()); + ASSERT_EQUALS(0, settings.getMigrationWriteConcern().woCompare(BSON("w" << 2))); + } + + TEST(Validity, ValidWithDeprecatedThrottle) { + SettingsType settings; + BSONObj objChunksize = BSON(SettingsType::key("chunksize") << + SettingsType::chunksize(1)); + string errMsg; + ASSERT(settings.parseBSON(objChunksize, &errMsg)); + ASSERT_EQUALS(errMsg, ""); + ASSERT_TRUE(settings.isValid(NULL)); + ASSERT_EQUALS(settings.getKey(), "chunksize"); + ASSERT_EQUALS(settings.getChunksize(), 1); + + BSONObj objBalancer = BSON(SettingsType::key("balancer") << + SettingsType::deprecated_secondaryThrottle(true)); + ASSERT(settings.parseBSON(objBalancer, &errMsg)); + ASSERT_EQUALS(errMsg, ""); + ASSERT_TRUE(settings.isValid(NULL)); + ASSERT_EQUALS(settings.getKey(), "balancer"); + ASSERT(settings.getSecondaryThrottle()); } TEST(Validity, BadType) { |
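The grid.cpp/grid.h hunks above split the old Grid::shouldBalance(ns, &balancerDocOut) check into getBalancerSettings()/shouldBalance(settings), getConfigShouldBalance(), and getCollShouldBalance(ns). The sketch below is illustrative only and is not part of this commit; the helper name chunkShouldMove is made up, while the Grid calls follow the declarations added in grid.h. It shows how a balancer-side caller could combine the global and per-collection checks the way the old single call did.

    #include <string>

    #include "mongo/s/grid.h"

    using namespace mongo;

    // Hypothetical helper, for illustration only: combines the global and
    // per-collection balancer checks introduced above.
    static bool chunkShouldMove(const Grid& g, const std::string& ns) {
        // Global switch: reads the "balancer" settings doc (stopped flag,
        // active window); defaults to true if the doc does not exist and
        // returns false if the config server cannot be read.
        if (!g.getConfigShouldBalance()) {
            return false;
        }

        // Per-collection switch: the "noBalance" flag in config.collections;
        // also returns false if the collection doc cannot be fetched.
        return g.getCollShouldBalance(ns);
    }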
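The SettingsType hunks and tests above accept two shapes for the balancer's secondary throttle in config.settings: the deprecated boolean _secondaryThrottle and a write concern document stored under the same field name, exposed through migrationWriteConcern and extractWriteConcern(). The following sketch is illustrative only and not part of this commit; the function name balancerThrottleExample is made up, while the SettingsType, BSON builder, and WriteConcernOptions calls follow the API shown in the diff. It walks through what a caller could expect back from extractWriteConcern() for each document shape; note that the caller owns the returned pointer.

    #include <memory>
    #include <string>

    #include "mongo/base/status_with.h"
    #include "mongo/db/jsobj.h"
    #include "mongo/db/write_concern_options.h"
    #include "mongo/s/type_settings.h"

    using namespace mongo;

    // Hypothetical walkthrough, for illustration only.
    static void balancerThrottleExample() {
        std::string errMsg;

        // Deprecated form: a plain bool. parseBSON() only falls back to
        // deprecated_secondaryThrottle when the field cannot be read as a BSONObj.
        SettingsType oldStyle;
        if (!oldStyle.parseBSON(BSON(SettingsType::key("balancer") <<
                                     SettingsType::deprecated_secondaryThrottle(true)),
                                &errMsg)) {
            return;  // errMsg describes the parse failure
        }

        // Throttle is on but no write concern document was supplied, so the
        // caller gets back a NULL pointer and must apply its own default.
        StatusWith<WriteConcernOptions*> oldWC = oldStyle.extractWriteConcern();
        // oldWC.isOK() && oldWC.getValue() == NULL here.

        // New form: _secondaryThrottle holds a write concern document.
        SettingsType newStyle;
        if (!newStyle.parseBSON(BSON(SettingsType::key("balancer") <<
                                     SettingsType::migrationWriteConcern(BSON("w" << 2))),
                                &errMsg)) {
            return;
        }

        StatusWith<WriteConcernOptions*> newWC = newStyle.extractWriteConcern();
        if (newWC.isOK() && newWC.getValue() != NULL) {
            // Ownership of the parsed WriteConcernOptions passes to the caller.
            std::auto_ptr<WriteConcernOptions> wc(newWC.getValue());
            // wc now carries the { w: 2 } setting applied to individual migration writes.
        }
    }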