summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorRandolph Tan <randolph@10gen.com>2014-06-17 17:45:53 -0400
committerRandolph Tan <randolph@10gen.com>2014-07-15 13:33:25 -0400
commitfa1233fbe4a48ef0675820f381987f1df4f42f75 (patch)
tree2f16f64be178a8aa417414dabb5e23f72c13e276 /src/mongo
parenta78c439ba94e08ab6079e5575f1959c39c9beb87 (diff)
downloadmongo-fa1233fbe4a48ef0675820f381987f1df4f42f75.tar.gz
SERVER-14041 enhance secondaryThrottle parameter
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/base/error_codes.err2
-rw-r--r--src/mongo/db/commands/cleanup_orphaned_cmd.cpp59
-rw-r--r--src/mongo/db/dbhelpers.cpp12
-rw-r--r--src/mongo/db/dbhelpers.h5
-rw-r--r--src/mongo/db/field_parser.cpp2
-rw-r--r--src/mongo/db/range_deleter.cpp27
-rw-r--r--src/mongo/db/range_deleter.h9
-rw-r--r--src/mongo/db/range_deleter_db_env.cpp6
-rw-r--r--src/mongo/db/range_deleter_test.cpp101
-rw-r--r--src/mongo/db/repl/repl_coordinator.h11
-rw-r--r--src/mongo/db/repl/repl_coordinator_hybrid.cpp7
-rw-r--r--src/mongo/db/repl/repl_coordinator_hybrid.h3
-rw-r--r--src/mongo/db/repl/repl_coordinator_impl.cpp7
-rw-r--r--src/mongo/db/repl/repl_coordinator_impl.h3
-rw-r--r--src/mongo/db/repl/repl_coordinator_legacy.cpp14
-rw-r--r--src/mongo/db/repl/repl_coordinator_legacy.h3
-rw-r--r--src/mongo/db/repl/repl_coordinator_mock.cpp6
-rw-r--r--src/mongo/db/repl/repl_coordinator_mock.h3
-rw-r--r--src/mongo/db/write_concern_options.cpp103
-rw-r--r--src/mongo/db/write_concern_options.h47
-rw-r--r--src/mongo/dbtests/dbhelper_tests.cpp4
-rw-r--r--src/mongo/s/balance.cpp40
-rw-r--r--src/mongo/s/balance.h6
-rw-r--r--src/mongo/s/chunk.cpp67
-rw-r--r--src/mongo/s/chunk.h5
-rw-r--r--src/mongo/s/commands_admin.cpp19
-rw-r--r--src/mongo/s/d_migrate.cpp200
-rw-r--r--src/mongo/s/grid.cpp72
-rw-r--r--src/mongo/s/grid.h30
-rw-r--r--src/mongo/s/type_settings.cpp82
-rw-r--r--src/mongo/s/type_settings.h72
-rw-r--r--src/mongo/s/type_settings_test.cpp31
32 files changed, 834 insertions, 224 deletions
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err
index 2415fee1c91..95076f74414 100644
--- a/src/mongo/base/error_codes.err
+++ b/src/mongo/base/error_codes.err
@@ -98,6 +98,8 @@ error_code("NotSecondary", 95)
error_code("OperationFailed", 96)
error_code("NoProjectionFound", 97)
error_code("DBPathInUse", 98)
+error_code("WriteConcernNotDefined", 99)
+error_code("CannotSatisfyWriteConcern", 100)
# Non-sequential error codes (for compatibility only)
error_code("NotMaster", 10107) #this comes from assert_util.h
diff --git a/src/mongo/db/commands/cleanup_orphaned_cmd.cpp b/src/mongo/db/commands/cleanup_orphaned_cmd.cpp
index 89e7f49ab4b..3fd6dbcb9d2 100644
--- a/src/mongo/db/commands/cleanup_orphaned_cmd.cpp
+++ b/src/mongo/db/commands/cleanup_orphaned_cmd.cpp
@@ -41,11 +41,22 @@
#include "mongo/db/jsobj.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/range_deleter_service.h"
+#include "mongo/db/repl/repl_coordinator_global.h"
#include "mongo/s/collection_metadata.h"
#include "mongo/s/d_logic.h"
#include "mongo/s/range_arithmetic.h"
+#include "mongo/s/type_settings.h"
#include "mongo/util/log.h"
+namespace {
+ using mongo::WriteConcernOptions;
+
+ const int kDefaultWTimeoutMs = 60 * 1000;
+ const WriteConcernOptions DefaultWriteConcern("majority",
+ WriteConcernOptions::NONE,
+ kDefaultWTimeoutMs);
+}
+
namespace mongo {
MONGO_LOG_DEFAULT_COMPONENT_FILE(::mongo::logger::LogComponent::kCommands);
@@ -69,7 +80,7 @@ namespace mongo {
CleanupResult cleanupOrphanedData( OperationContext* txn,
const NamespaceString& ns,
const BSONObj& startingFromKeyConst,
- bool secondaryThrottle,
+ const WriteConcernOptions& secondaryThrottle,
BSONObj* stoppedAtKey,
string* errMsg ) {
@@ -153,6 +164,17 @@ namespace mongo {
* the balancer is off.
*
* Safe to call with the balancer on.
+ *
+ * Format:
+ *
+ * {
+ * cleanupOrphaned: <ns>,
+ * // optional parameters:
+ * startingAtKey: { <shardKeyValue> }, // defaults to lowest value
+ * secondaryThrottle: <bool>, // defaults to true
+ * // defaults to { w: "majority", wtimeout: 60000 }. Applies to individual writes.
+ * writeConcern: { <writeConcern options> }
+ * }
*/
class CleanupOrphanedCommand : public Command {
public:
@@ -179,7 +201,6 @@ namespace mongo {
// Input
static BSONField<string> nsField;
static BSONField<BSONObj> startingFromKeyField;
- static BSONField<bool> secondaryThrottleField;
// Output
static BSONField<BSONObj> stoppedAtKeyField;
@@ -210,12 +231,29 @@ namespace mongo {
return false;
}
- bool secondaryThrottle = true;
- if ( !FieldParser::extract( cmdObj,
- secondaryThrottleField,
- &secondaryThrottle,
- &errmsg ) ) {
- return false;
+ WriteConcernOptions writeConcern;
+ Status status = writeConcern.parseSecondaryThrottle(cmdObj, NULL);
+
+ if (!status.isOK()){
+ if (status.code() != ErrorCodes::WriteConcernNotDefined) {
+ return appendCommandStatus(result, status);
+ }
+
+ writeConcern = DefaultWriteConcern;
+ }
+ else {
+ repl::ReplicationCoordinator* replCoordinator =
+ repl::getGlobalReplicationCoordinator();
+ Status status = replCoordinator->checkIfWriteConcernCanBeSatisfied(writeConcern);
+ if (!status.isOK()) {
+ return appendCommandStatus(result, status);
+ }
+ }
+
+ if (writeConcern.shouldWaitForOtherNodes() &&
+ writeConcern.wTimeout == WriteConcernOptions::kNoTimeout) {
+ // Don't allow no timeout.
+ writeConcern.wTimeout = kDefaultWTimeoutMs;
}
if (!shardingState.enabled()) {
@@ -225,7 +263,7 @@ namespace mongo {
}
ChunkVersion shardVersion;
- Status status = shardingState.refreshMetadataNow( ns, &shardVersion );
+ status = shardingState.refreshMetadataNow( ns, &shardVersion );
if ( !status.isOK() ) {
if ( status.code() == ErrorCodes::RemoteChangeDetected ) {
warning() << "Shard version in transition detected while refreshing "
@@ -242,7 +280,7 @@ namespace mongo {
CleanupResult cleanupResult = cleanupOrphanedData( txn,
NamespaceString( ns ),
startingFromKey,
- secondaryThrottle,
+ writeConcern,
&stoppedAtKey,
&errmsg );
@@ -263,7 +301,6 @@ namespace mongo {
BSONField<string> CleanupOrphanedCommand::nsField( "cleanupOrphaned" );
BSONField<BSONObj> CleanupOrphanedCommand::startingFromKeyField( "startingFromKey" );
- BSONField<bool> CleanupOrphanedCommand::secondaryThrottleField( "secondaryThrottle" );
BSONField<BSONObj> CleanupOrphanedCommand::stoppedAtKeyField( "stoppedAtKey" );
MONGO_INITIALIZER(RegisterCleanupOrphanedCommand)(InitializerContext* context) {
diff --git a/src/mongo/db/dbhelpers.cpp b/src/mongo/db/dbhelpers.cpp
index 83b5b0b7ec5..8cc0eb0bd1c 100644
--- a/src/mongo/db/dbhelpers.cpp
+++ b/src/mongo/db/dbhelpers.cpp
@@ -52,6 +52,7 @@
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/repl_coordinator_global.h"
#include "mongo/db/write_concern.h"
+#include "mongo/db/write_concern_options.h"
#include "mongo/db/operation_context_impl.h"
#include "mongo/db/storage_options.h"
#include "mongo/db/catalog/collection.h"
@@ -305,7 +306,7 @@ namespace mongo {
long long Helpers::removeRange( OperationContext* txn,
const KeyRange& range,
bool maxInclusive,
- bool secondaryThrottle,
+ const WriteConcernOptions& writeConcern,
RemoveSaver* callback,
bool fromMigrate,
bool onlyRemoveOrphanedDocs )
@@ -342,7 +343,7 @@ namespace mongo {
Helpers::toKeyFormat( indexKeyPattern.extendRangeBound(range.maxKey,maxInclusive));
LOG(1) << "begin removal of " << min << " to " << max << " in " << ns
- << (secondaryThrottle ? " (waiting for secondaries)" : "" ) << endl;
+ << " with write concern: " << writeConcern.toBSON() << endl;
Client& c = cc();
@@ -435,10 +436,7 @@ namespace mongo {
// TODO remove once the yielding below that references this timer has been removed
Timer secondaryThrottleTime;
- if ( secondaryThrottle && numDeleted > 0 ) {
- WriteConcernOptions writeConcern;
- writeConcern.wNumNodes = 2;
- writeConcern.wTimeout = 60 * 1000;
+ if (writeConcern.shouldWaitForOtherNodes() && numDeleted > 0) {
repl::ReplicationCoordinator::StatusAndDuration replStatus =
repl::getGlobalReplicationCoordinator()->awaitReplication(txn,
c.getLastOp(),
@@ -454,7 +452,7 @@ namespace mongo {
}
}
- if ( secondaryThrottle )
+ if (writeConcern.shouldWaitForOtherNodes())
log() << "Helpers::removeRangeUnlocked time spent waiting for replication: "
<< millisWaitingForReplication << "ms" << endl;
diff --git a/src/mongo/db/dbhelpers.h b/src/mongo/db/dbhelpers.h
index be0ca859248..865543c74f9 100644
--- a/src/mongo/db/dbhelpers.h
+++ b/src/mongo/db/dbhelpers.h
@@ -41,6 +41,7 @@ namespace mongo {
class Collection;
class Cursor;
class OperationContext;
+ struct WriteConcernOptions;
/**
* db helpers are helper functions and classes that let us easily manipulate the local
@@ -164,8 +165,8 @@ namespace mongo {
*/
static long long removeRange( OperationContext* txn,
const KeyRange& range,
- bool maxInclusive = false,
- bool secondaryThrottle = false,
+ bool maxInclusive,
+ const WriteConcernOptions& secondaryThrottle,
RemoveSaver* callback = NULL,
bool fromMigrate = false,
bool onlyRemoveOrphanedDocs = false );
diff --git a/src/mongo/db/field_parser.cpp b/src/mongo/db/field_parser.cpp
index 40c8c9664eb..47b5b8cf1c0 100644
--- a/src/mongo/db/field_parser.cpp
+++ b/src/mongo/db/field_parser.cpp
@@ -111,7 +111,7 @@ namespace mongo {
{
if (elem.eoo()) {
if (field.hasDefault()) {
- *out = field.getDefault();
+ *out = field.getDefault().getOwned();
return FIELD_DEFAULT;
}
else {
diff --git a/src/mongo/db/range_deleter.cpp b/src/mongo/db/range_deleter.cpp
index d096cab1219..e75ce96f0bd 100644
--- a/src/mongo/db/range_deleter.cpp
+++ b/src/mongo/db/range_deleter.cpp
@@ -189,7 +189,7 @@ namespace mongo {
const BSONObj& min,
const BSONObj& max,
const BSONObj& shardKeyPattern,
- bool secondaryThrottle,
+ const WriteConcernOptions& writeConcern,
Notification* notifyDone,
std::string* errMsg) {
string dummy;
@@ -199,7 +199,7 @@ namespace mongo {
min.getOwned(),
max.getOwned(),
shardKeyPattern.getOwned(),
- secondaryThrottle));
+ writeConcern));
toDelete->notifyDone = notifyDone;
{
@@ -243,10 +243,13 @@ namespace mongo {
}
namespace {
- bool _waitForReplication(OperationContext* txn, std::string* errMsg) {
- WriteConcernOptions writeConcern;
- writeConcern.wMode = "majority";
- writeConcern.wTimeout = 60 * 60 * 1000;
+ const int kWTimeoutMillis = 60 * 60 * 1000;
+
+ bool _waitForMajority(OperationContext* txn, std::string* errMsg) {
+ const WriteConcernOptions writeConcern("majority",
+ WriteConcernOptions::NONE,
+ kWTimeoutMillis);
+
repl::ReplicationCoordinator::StatusAndDuration replStatus =
repl::getGlobalReplicationCoordinator()->awaitReplicationOfLastOp(txn,
writeConcern);
@@ -279,7 +282,7 @@ namespace {
const BSONObj& min,
const BSONObj& max,
const BSONObj& shardKeyPattern,
- bool secondaryThrottle,
+ const WriteConcernOptions& writeConcern,
string* errMsg) {
if (stopRequested()) {
*errMsg = "deleter is already stopped.";
@@ -314,7 +317,7 @@ namespace {
<< " cursors in " << ns << " to finish" << endl;
}
- RangeDeleteEntry taskDetails(ns, min, max, shardKeyPattern, secondaryThrottle);
+ RangeDeleteEntry taskDetails(ns, min, max, shardKeyPattern, writeConcern);
taskDetails.stats.queueStartTS = jsTime();
Date_t timeSinceLastLog;
@@ -373,7 +376,7 @@ namespace {
if (result) {
taskDetails.stats.waitForReplStartTS = jsTime();
- result = _waitForReplication(txn, errMsg);
+ result = _waitForMajority(txn, errMsg);
taskDetails.stats.waitForReplEndTS = jsTime();
}
@@ -555,7 +558,7 @@ namespace {
if (delResult) {
nextTask->stats.waitForReplStartTS = jsTime();
- if (!_waitForReplication(txn.get(), &errMsg)) {
+ if (!_waitForMajority(txn.get(), &errMsg)) {
warning() << "Error encountered while waiting for replication: " << errMsg;
}
@@ -662,12 +665,12 @@ namespace {
const BSONObj& min,
const BSONObj& max,
const BSONObj& shardKey,
- bool secondaryThrottle):
+ const WriteConcernOptions& writeConcern):
ns(ns),
min(min),
max(max),
shardKeyPattern(shardKey),
- secondaryThrottle(secondaryThrottle),
+ writeConcern(writeConcern),
notifyDone(NULL) {
}
diff --git a/src/mongo/db/range_deleter.h b/src/mongo/db/range_deleter.h
index 022f3c84872..cda4578b02b 100644
--- a/src/mongo/db/range_deleter.h
+++ b/src/mongo/db/range_deleter.h
@@ -39,6 +39,7 @@
#include "mongo/db/clientcursor.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/write_concern_options.h"
#include "mongo/util/concurrency/mutex.h"
#include "mongo/util/concurrency/synchronization.h"
#include "mongo/util/time_support.h"
@@ -138,7 +139,7 @@ namespace mongo {
const BSONObj& min,
const BSONObj& max,
const BSONObj& shardKeyPattern,
- bool secondaryThrottle,
+ const WriteConcernOptions& writeConcern,
Notification* notifyDone,
std::string* errMsg);
@@ -154,7 +155,7 @@ namespace mongo {
const BSONObj& min,
const BSONObj& max,
const BSONObj& shardKeyPattern,
- bool secondaryThrottle,
+ const WriteConcernOptions& writeConcern,
std::string* errMsg);
/**
@@ -309,7 +310,7 @@ namespace mongo {
const BSONObj& min,
const BSONObj& max,
const BSONObj& shardKey,
- bool secondaryThrottle);
+ const WriteConcernOptions& writeConcern);
const std::string ns;
@@ -324,7 +325,7 @@ namespace mongo {
// like hash indexes.
const BSONObj shardKeyPattern;
- const bool secondaryThrottle;
+ const WriteConcernOptions writeConcern;
// Sets of cursors to wait to close until this can be ready
// for deletion.
diff --git a/src/mongo/db/range_deleter_db_env.cpp b/src/mongo/db/range_deleter_db_env.cpp
index 3658f99cd2c..29e96e4ead4 100644
--- a/src/mongo/db/range_deleter_db_env.cpp
+++ b/src/mongo/db/range_deleter_db_env.cpp
@@ -63,7 +63,7 @@ namespace mongo {
const BSONObj inclusiveLower(taskDetails.min);
const BSONObj exclusiveUpper(taskDetails.max);
const BSONObj keyPattern(taskDetails.shardKeyPattern);
- const bool secondaryThrottle(taskDetails.secondaryThrottle);
+ const WriteConcernOptions writeConcern(taskDetails.writeConcern);
const bool initiallyHaveClient = haveClient();
@@ -85,8 +85,6 @@ namespace mongo {
<< endl;
try {
- bool throttle = repl::getGlobalReplicationCoordinator()->getReplicationMode() ==
- repl::ReplicationCoordinator::modeReplSet ? secondaryThrottle : false;
*deletedDocs =
Helpers::removeRange(txn,
KeyRange(ns,
@@ -94,7 +92,7 @@ namespace mongo {
exclusiveUpper,
keyPattern),
false, /*maxInclusive*/
- throttle,
+ writeConcern,
serverGlobalParams.moveParanoia ? &removeSaver : NULL,
true, /*fromMigrate*/
true); /*onlyRemoveOrphans*/
diff --git a/src/mongo/db/range_deleter_test.cpp b/src/mongo/db/range_deleter_test.cpp
index 05ce3918320..c0b50aee619 100644
--- a/src/mongo/db/range_deleter_test.cpp
+++ b/src/mongo/db/range_deleter_test.cpp
@@ -35,6 +35,7 @@
#include "mongo/db/repl/repl_coordinator_global.h"
#include "mongo/db/repl/repl_coordinator_mock.h"
#include "mongo/db/repl/repl_settings.h"
+#include "mongo/db/write_concern_options.h"
#include "mongo/stdx/functional.h"
#include "mongo/unittest/unittest.h"
@@ -57,6 +58,7 @@ namespace {
const int MAX_IMMEDIATE_DELETE_WAIT_SECS = 2;
const mongo::repl::ReplSettings replSettings;
+ const mongo::WriteConcernOptions dummyWriteConcern;
// Should not be able to queue deletes if deleter workers were not started.
TEST(QueueDelete, CantAfterStop) {
@@ -73,7 +75,7 @@ namespace {
BSON("x" << 120),
BSON("x" << 200),
BSON("x" << 1),
- true,
+ dummyWriteConcern,
NULL /* notifier not needed */,
&errMsg));
ASSERT_FALSE(errMsg.empty());
@@ -93,8 +95,13 @@ namespace {
env->addCursorId(ns, 345);
Notification notifyDone;
- ASSERT_TRUE(deleter.queueDelete(ns, BSON("x" << 0), BSON("x" << 10), BSON("x" << 1),
- true, &notifyDone, NULL /* errMsg not needed */));
+ ASSERT_TRUE(deleter.queueDelete(ns,
+ BSON("x" << 0),
+ BSON("x" << 10),
+ BSON("x" << 1),
+ dummyWriteConcern,
+ &notifyDone,
+ NULL /* errMsg not needed */));
env->waitForNthGetCursor(1u);
@@ -129,8 +136,13 @@ namespace {
env->addCursorId(ns, 345);
Notification notifyDone;
- ASSERT_TRUE(deleter.queueDelete(ns, BSON("x" << 0), BSON("x" << 10), BSON("x" << 1),
- true, &notifyDone, NULL /* errMsg not needed */));
+ ASSERT_TRUE(deleter.queueDelete(ns,
+ BSON("x" << 0),
+ BSON("x" << 10),
+ BSON("x" << 1),
+ dummyWriteConcern,
+ &notifyDone,
+ NULL /* errMsg not needed */));
env->waitForNthGetCursor(1u);
@@ -145,9 +157,9 @@ namespace {
const BSONObj& min,
const BSONObj& max,
const BSONObj& shardKeyPattern,
- bool secondaryThrottle,
+ const mongo::WriteConcernOptions& secondaryThrottle,
std::string* errMsg) {
- deleter->deleteNow(txn, ns, min, max, shardKeyPattern, secondaryThrottle, errMsg);
+ deleter->deleteNow(txn,ns, min, max, shardKeyPattern, secondaryThrottle, errMsg);
}
// Should not start delete if the set of cursors that were open when the
@@ -171,7 +183,7 @@ namespace {
BSON("x" << 0),
BSON("x" << 10),
BSON("x" << 1),
- true,
+ dummyWriteConcern,
&errMsg));
env->waitForNthGetCursor(1u);
@@ -220,7 +232,7 @@ namespace {
BSON("x" << 0),
BSON("x" << 10),
BSON("x" << 1),
- true,
+ dummyWriteConcern,
&errMsg));
env->waitForNthGetCursor(1u);
@@ -262,7 +274,7 @@ namespace {
BSON("x" << 10),
BSON("x" << 20),
BSON("x" << 1),
- true,
+ dummyWriteConcern,
&notifyDone1,
NULL /* don't care errMsg */));
@@ -276,7 +288,7 @@ namespace {
BSON("x" << 20),
BSON("x" << 30),
BSON("x" << 1),
- true,
+ dummyWriteConcern,
&notifyDone2,
NULL /* don't care errMsg */));
@@ -285,7 +297,7 @@ namespace {
BSON("x" << 30),
BSON("x" << 40),
BSON("x" << 1),
- true,
+ dummyWriteConcern,
&notifyDone3,
NULL /* don't care errMsg */));
@@ -358,13 +370,23 @@ namespace {
ASSERT_TRUE(errMsg.empty());
errMsg.clear();
- ASSERT_FALSE(deleter.queueDelete(ns, BSON("x" << 120), BSON("x" << 140), BSON("x" << 1),
- false, NULL /* notifier not needed */, &errMsg));
+ ASSERT_FALSE(deleter.queueDelete(ns,
+ BSON("x" << 120),
+ BSON("x" << 140),
+ BSON("x" << 1),
+ dummyWriteConcern,
+ NULL /* notifier not needed */,
+ &errMsg));
ASSERT_FALSE(errMsg.empty());
errMsg.clear();
- ASSERT_FALSE(deleter.deleteNow(noTxn, ns, BSON("x" << 120), BSON("x" << 140),
- BSON("x" << 1), false, &errMsg));
+ ASSERT_FALSE(deleter.deleteNow(noTxn,
+ ns,
+ BSON("x" << 120),
+ BSON("x" << 140),
+ BSON("x" << 1),
+ dummyWriteConcern,
+ &errMsg));
ASSERT_FALSE(errMsg.empty());
ASSERT_FALSE(env->deleteOccured());
@@ -410,8 +432,13 @@ namespace {
env->addCursorId(ns, 58);
Notification notifyDone;
- deleter.queueDelete(ns, BSON("x" << 0), BSON("x" << 10), BSON("x" << 1),
- false, &notifyDone, NULL /* errMsg not needed */);
+ deleter.queueDelete(ns,
+ BSON("x" << 0),
+ BSON("x" << 10),
+ BSON("x" << 1),
+ dummyWriteConcern,
+ &notifyDone,
+ NULL /* errMsg not needed */);
string errMsg;
ASSERT_FALSE(deleter.addToBlackList(ns, BSON("x" << 5), BSON("x" << 15), &errMsg));
@@ -448,7 +475,7 @@ namespace {
BSON("x" << 64),
BSON("x" << 70),
BSON("x" << 1),
- true,
+ dummyWriteConcern,
&delErrMsg));
env->waitForNthPausedDelete(1u);
@@ -484,8 +511,13 @@ namespace {
ASSERT_FALSE(deleter.removeFromBlackList(ns, BSON("x" << 1234), BSON("x" << 9000)));
// Range should still be blacklisted
- ASSERT_FALSE(deleter.deleteNow(noTxn, ns, BSON("x" << 2000), BSON("x" << 4000), BSON("x" << 1),
- false, NULL /* errMsg not needed */));
+ ASSERT_FALSE(deleter.deleteNow(noTxn,
+ ns,
+ BSON("x" << 2000),
+ BSON("x" << 4000),
+ BSON("x" << 1),
+ dummyWriteConcern,
+ NULL /* errMsg not needed */));
deleter.stopWorkers();
}
@@ -503,15 +535,25 @@ namespace {
ASSERT_TRUE(errMsg.empty());
errMsg.clear();
- ASSERT_FALSE(deleter.deleteNow(noTxn, ns, BSON("x" << 600), BSON("x" << 700),
- BSON("x" << 1), false, &errMsg));
+ ASSERT_FALSE(deleter.deleteNow(noTxn,
+ ns,
+ BSON("x" << 600),
+ BSON("x" << 700),
+ BSON("x" << 1),
+ dummyWriteConcern,
+ &errMsg));
ASSERT_FALSE(errMsg.empty());
ASSERT_TRUE(deleter.removeFromBlackList(ns, BSON("x" << 500), BSON("x" << 801)));
errMsg.clear();
- ASSERT_TRUE(deleter.deleteNow(noTxn, ns, BSON("x" << 600), BSON("x" << 700),
- BSON("x" << 1), false, &errMsg));
+ ASSERT_TRUE(deleter.deleteNow(noTxn,
+ ns,
+ BSON("x" << 600),
+ BSON("x" << 700),
+ BSON("x" << 1),
+ dummyWriteConcern,
+ &errMsg));
ASSERT_TRUE(errMsg.empty());
deleter.stopWorkers();
@@ -527,8 +569,13 @@ namespace {
deleter.addToBlackList("foo.bar", BSON("x" << 100), BSON("x" << 200),
NULL /* errMsg not needed */);
- ASSERT_TRUE(deleter.deleteNow(noTxn, "test.user", BSON("x" << 120), BSON("x" << 140),
- BSON("x" << 1), true, NULL /* errMsg not needed */));
+ ASSERT_TRUE(deleter.deleteNow(noTxn,
+ "test.user",
+ BSON("x" << 120),
+ BSON("x" << 140),
+ BSON("x" << 1),
+ dummyWriteConcern,
+ NULL /* errMsg not needed */));
deleter.stopWorkers();
}
diff --git a/src/mongo/db/repl/repl_coordinator.h b/src/mongo/db/repl/repl_coordinator.h
index cf9b20630e6..000476fd4e8 100644
--- a/src/mongo/db/repl/repl_coordinator.h
+++ b/src/mongo/db/repl/repl_coordinator.h
@@ -201,6 +201,17 @@ namespace repl {
virtual bool canAcceptWritesForDatabase(const StringData& dbName) = 0;
/**
+ * Checks if the current replica set configuration can satisfy the given write concern.
+ *
+ * Things that are taken into consideration include:
+ * 1. If the set has enough members.
+ * 2. If the tag exists.
+ * 3. If there are enough members for the tag specified.
+ */
+ virtual Status checkIfWriteConcernCanBeSatisfied(
+ const WriteConcernOptions& writeConcern) const = 0;
+
+ /*
* Returns Status::OK() if it is valid for this node to serve reads on the given collection
* and an errorcode indicating why the node cannot if it cannot.
*/
diff --git a/src/mongo/db/repl/repl_coordinator_hybrid.cpp b/src/mongo/db/repl/repl_coordinator_hybrid.cpp
index c065008e033..7ca34597f20 100644
--- a/src/mongo/db/repl/repl_coordinator_hybrid.cpp
+++ b/src/mongo/db/repl/repl_coordinator_hybrid.cpp
@@ -275,5 +275,12 @@ namespace repl {
return legacyResponse;
}
+ Status HybridReplicationCoordinator::checkIfWriteConcernCanBeSatisfied(
+ const WriteConcernOptions& writeConcern) const {
+ Status legacyStatus = _legacy.checkIfWriteConcernCanBeSatisfied(writeConcern);
+ Status implStatus = _impl.checkIfWriteConcernCanBeSatisfied(writeConcern);
+ return legacyStatus;
+ }
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/repl_coordinator_hybrid.h b/src/mongo/db/repl/repl_coordinator_hybrid.h
index 8a339e556d5..bf375e0a0ee 100644
--- a/src/mongo/db/repl/repl_coordinator_hybrid.h
+++ b/src/mongo/db/repl/repl_coordinator_hybrid.h
@@ -83,6 +83,9 @@ namespace repl {
virtual bool canAcceptWritesForDatabase(const StringData& dbName);
+ virtual Status checkIfWriteConcernCanBeSatisfied(
+ const WriteConcernOptions& writeConcern) const;
+
virtual Status canServeReadsFor(const NamespaceString& ns, bool slaveOk);
virtual bool shouldIgnoreUniqueIndex(const IndexDescriptor* idx);
diff --git a/src/mongo/db/repl/repl_coordinator_impl.cpp b/src/mongo/db/repl/repl_coordinator_impl.cpp
index 9949b10a673..5da1a1deb39 100644
--- a/src/mongo/db/repl/repl_coordinator_impl.cpp
+++ b/src/mongo/db/repl/repl_coordinator_impl.cpp
@@ -446,5 +446,12 @@ namespace repl {
// TODO
return std::vector<BSONObj>();
}
+
+ Status ReplicationCoordinatorImpl::checkIfWriteConcernCanBeSatisfied(
+ const WriteConcernOptions& writeConcern) const {
+ // TODO
+ return Status::OK();
+ }
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/repl_coordinator_impl.h b/src/mongo/db/repl/repl_coordinator_impl.h
index 095c3897a70..62e043f9d0c 100644
--- a/src/mongo/db/repl/repl_coordinator_impl.h
+++ b/src/mongo/db/repl/repl_coordinator_impl.h
@@ -89,6 +89,9 @@ namespace repl {
virtual bool canAcceptWritesForDatabase(const StringData& database);
+ virtual Status checkIfWriteConcernCanBeSatisfied(
+ const WriteConcernOptions& writeConcern) const;
+
virtual Status canServeReadsFor(const NamespaceString& ns, bool slaveOk);
virtual bool shouldIgnoreUniqueIndex(const IndexDescriptor* idx);
diff --git a/src/mongo/db/repl/repl_coordinator_legacy.cpp b/src/mongo/db/repl/repl_coordinator_legacy.cpp
index f4be0d26369..17c30dfb9e0 100644
--- a/src/mongo/db/repl/repl_coordinator_legacy.cpp
+++ b/src/mongo/db/repl/repl_coordinator_legacy.cpp
@@ -944,5 +944,19 @@ namespace {
return repl::getHostsWrittenTo(op);
}
+ Status LegacyReplicationCoordinator::checkIfWriteConcernCanBeSatisfied(
+ const WriteConcernOptions& writeConcern) const {
+ // TODO: rewrite this method with the correct version. Note that this just a
+ // temporary stub for secondary throttle.
+
+ if (getReplicationMode() == ReplicationCoordinator::modeReplSet) {
+ if (writeConcern.wNumNodes > 1 && theReplSet->config().getMajority() <= 1) {
+ return Status(ErrorCodes::CannotSatisfyWriteConcern, "not enough nodes");
+ }
+ }
+
+ return Status::OK();
+ }
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/repl_coordinator_legacy.h b/src/mongo/db/repl/repl_coordinator_legacy.h
index a9095f80a09..f9cf15be96c 100644
--- a/src/mongo/db/repl/repl_coordinator_legacy.h
+++ b/src/mongo/db/repl/repl_coordinator_legacy.h
@@ -79,6 +79,9 @@ namespace repl {
virtual bool canAcceptWritesForDatabase(const StringData& dbName);
+ virtual Status checkIfWriteConcernCanBeSatisfied(
+ const WriteConcernOptions& writeConcern) const;
+
virtual Status canServeReadsFor(const NamespaceString& ns, bool slaveOk);
virtual bool shouldIgnoreUniqueIndex(const IndexDescriptor* idx);
diff --git a/src/mongo/db/repl/repl_coordinator_mock.cpp b/src/mongo/db/repl/repl_coordinator_mock.cpp
index b7b712bd0d4..aab04be4938 100644
--- a/src/mongo/db/repl/repl_coordinator_mock.cpp
+++ b/src/mongo/db/repl/repl_coordinator_mock.cpp
@@ -223,5 +223,11 @@ namespace repl {
// TODO
return std::vector<BSONObj>();
}
+
+ Status ReplicationCoordinatorMock::checkIfWriteConcernCanBeSatisfied(
+ const WriteConcernOptions& writeConcern) const {
+ return Status::OK();
+ }
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/repl_coordinator_mock.h b/src/mongo/db/repl/repl_coordinator_mock.h
index 260ed685705..1ac42d4a248 100644
--- a/src/mongo/db/repl/repl_coordinator_mock.h
+++ b/src/mongo/db/repl/repl_coordinator_mock.h
@@ -82,6 +82,9 @@ namespace repl {
virtual bool canAcceptWritesForDatabase(const StringData& dbName);
+ virtual Status checkIfWriteConcernCanBeSatisfied(
+ const WriteConcernOptions& writeConcern) const;
+
virtual Status canServeReadsFor(const NamespaceString& ns, bool slaveOk);
virtual bool shouldIgnoreUniqueIndex(const IndexDescriptor* idx);
diff --git a/src/mongo/db/write_concern_options.cpp b/src/mongo/db/write_concern_options.cpp
index 848c4e4464c..90690b87599 100644
--- a/src/mongo/db/write_concern_options.cpp
+++ b/src/mongo/db/write_concern_options.cpp
@@ -27,7 +27,9 @@
#include "mongo/db/write_concern_options.h"
+#include "mongo/bson/bson_field.h"
#include "mongo/client/dbclientinterface.h"
+#include "mongo/db/field_parser.h"
namespace mongo {
@@ -36,6 +38,27 @@ namespace mongo {
const BSONObj WriteConcernOptions::AllConfigs = BSONObj();
const BSONObj WriteConcernOptions::Unacknowledged(BSON("w" << W_NONE));
+ static const BSONField<bool> mongosSecondaryThrottleField("_secondaryThrottle", true);
+ static const BSONField<bool> secondaryThrottleField("secondaryThrottle", true);
+ static const BSONField<BSONObj> writeConcernField("writeConcern");
+
+ WriteConcernOptions::WriteConcernOptions(int numNodes,
+ SyncMode sync,
+ int timeout):
+ syncMode(sync),
+ wNumNodes(numNodes),
+ wTimeout(timeout) {
+ }
+
+ WriteConcernOptions::WriteConcernOptions(const std::string& mode,
+ SyncMode sync,
+ int timeout):
+ syncMode(sync),
+ wNumNodes(0),
+ wMode(mode),
+ wTimeout(timeout) {
+ }
+
Status WriteConcernOptions::parse( const BSONObj& obj ) {
if ( obj.isEmpty() ) {
return Status( ErrorCodes::FailedToParse, "write concern object cannot be empty" );
@@ -86,4 +109,84 @@ namespace mongo {
return Status::OK();
}
+
+ Status WriteConcernOptions::parseSecondaryThrottle(const BSONObj& doc,
+ BSONObj* rawWriteConcernObj) {
+ string errMsg;
+ bool isSecondaryThrottle;
+ FieldParser::FieldState fieldState = FieldParser::extract(doc,
+ secondaryThrottleField,
+ &isSecondaryThrottle,
+ &errMsg);
+ if (fieldState == FieldParser::FIELD_INVALID) {
+ return Status(ErrorCodes::FailedToParse, errMsg);
+ }
+
+ if (fieldState != FieldParser::FIELD_SET) {
+ fieldState = FieldParser::extract(doc,
+ mongosSecondaryThrottleField,
+ &isSecondaryThrottle,
+ &errMsg);
+
+ if (fieldState == FieldParser::FIELD_INVALID) {
+ return Status(ErrorCodes::FailedToParse, errMsg);
+ }
+ }
+
+ BSONObj dummyBSON;
+ if (!rawWriteConcernObj) {
+ rawWriteConcernObj = &dummyBSON;
+ }
+
+ fieldState = FieldParser::extract(doc,
+ writeConcernField,
+ rawWriteConcernObj,
+ &errMsg);
+ if (fieldState == FieldParser::FIELD_INVALID) {
+ return Status(ErrorCodes::FailedToParse, errMsg);
+ }
+
+ if (!isSecondaryThrottle) {
+ if (!rawWriteConcernObj->isEmpty()) {
+ return Status(ErrorCodes::UnsupportedFormat,
+ "Cannot have write concern when secondary throttle is false");
+ }
+
+ wNumNodes = 1;
+ return Status::OK();
+ }
+
+ if (rawWriteConcernObj->isEmpty()) {
+ return Status(ErrorCodes::WriteConcernNotDefined,
+ "Secondary throttle is on, but write concern is not specified");
+ }
+
+ return parse(*rawWriteConcernObj);
+ }
+
+ BSONObj WriteConcernOptions::toBSON() const {
+ BSONObjBuilder builder;
+
+ if (wMode.empty()) {
+ builder.append("w", wNumNodes);
+ }
+ else {
+ builder.append("w", wMode);
+ }
+
+ if (syncMode == FSYNC) {
+ builder.append("fsync", true);
+ }
+ else if (syncMode == JOURNAL) {
+ builder.append("j", true);
+ }
+
+ builder.append("wtimeout", wTimeout);
+
+ return builder.obj();
+ }
+
+ bool WriteConcernOptions::shouldWaitForOtherNodes() const {
+ return !wMode.empty() || wNumNodes > 1;
+ }
}
diff --git a/src/mongo/db/write_concern_options.h b/src/mongo/db/write_concern_options.h
index 44c67b2938b..31b0f556c30 100644
--- a/src/mongo/db/write_concern_options.h
+++ b/src/mongo/db/write_concern_options.h
@@ -37,8 +37,11 @@ namespace mongo {
struct WriteConcernOptions {
public:
+ enum SyncMode { NONE, FSYNC, JOURNAL };
+
static const int kNoTimeout = 0;
static const int kNoWaiting = -1;
+
static const BSONObj Default;
static const BSONObj Acknowledged;
static const BSONObj AllConfigs;
@@ -46,8 +49,43 @@ namespace mongo {
WriteConcernOptions() { reset(); }
+ WriteConcernOptions(int numNodes,
+ SyncMode sync,
+ int timeout);
+
+ WriteConcernOptions(const std::string& mode,
+ SyncMode sync,
+ int timeout);
+
Status parse( const BSONObj& obj );
+ /**
+ * Extracts the write concern settings from the BSONObj. The BSON object should have
+ * the format:
+ *
+ * {
+ * ...
+ * secondaryThrottle: <bool>, // optional
+ * _secondaryThrottle: <bool>, // optional
+ * writeConcern: <BSONObj> // optional
+ * }
+ *
+ * Note: secondaryThrottle takes precedence over _secondaryThrottle.
+ *
+ * Also sets output parameter rawWriteConcernObj if the writeCocnern field exists.
+ *
+ * Returns OK if the parse was successful. Also returns ErrorCodes::WriteConcernNotDefined
+ * when secondary throttle is true but write concern was not specified.
+ */
+ Status parseSecondaryThrottle(const BSONObj& doc,
+ BSONObj* rawWriteConcernObj);
+
+ /**
+ * Return true if the server needs to wait for other secondary nodes to satisfy this
+ * write concern setting. Errs on the false positive for non-empty wMode.
+ */
+ bool shouldWaitForOtherNodes() const;
+
void reset() {
syncMode = NONE;
wNumNodes = 0;
@@ -55,11 +93,18 @@ namespace mongo {
wTimeout = 0;
}
- enum SyncMode { NONE, FSYNC, JOURNAL } syncMode;
+ // Returns the BSON representation of this object.
+ // Warning: does not return the same object passed on the last parse() call.
+ BSONObj toBSON() const;
+
+ SyncMode syncMode;
+ // The w parameter for this write concern. The wMode represents the string format and
+ // takes precedence over the numeric format wNumNodes.
int wNumNodes;
std::string wMode;
+ // Timeout in milliseconds.
int wTimeout;
};
diff --git a/src/mongo/dbtests/dbhelper_tests.cpp b/src/mongo/dbtests/dbhelper_tests.cpp
index 33d190ed34c..5877cfc85a2 100644
--- a/src/mongo/dbtests/dbhelper_tests.cpp
+++ b/src/mongo/dbtests/dbhelper_tests.cpp
@@ -30,6 +30,7 @@
#include "mongo/db/catalog/collection.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/operation_context_impl.h"
+#include "mongo/db/write_concern_options.h"
#include "mongo/dbtests/dbtests.h"
#include "mongo/unittest/unittest.h"
@@ -68,7 +69,8 @@ namespace mongo {
BSON( "_id" << _min ),
BSON( "_id" << _max ),
BSON( "_id" << 1 ) );
- Helpers::removeRange( &txn, range );
+ mongo::WriteConcernOptions dummyWriteConcern;
+ Helpers::removeRange(&txn, range, false, dummyWriteConcern);
wunit.commit();
}
diff --git a/src/mongo/s/balance.cpp b/src/mongo/s/balance.cpp
index b2be36d2113..d4fbc5cdb79 100644
--- a/src/mongo/s/balance.cpp
+++ b/src/mongo/s/balance.cpp
@@ -33,6 +33,7 @@
#include "mongo/client/dbclientcursor.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/write_concern.h"
+#include "mongo/db/write_concern_options.h"
#include "mongo/s/chunk.h"
#include "mongo/s/cluster_write.h"
#include "mongo/s/config.h"
@@ -61,7 +62,7 @@ namespace mongo {
}
int Balancer::_moveChunks(const vector<CandidateChunkPtr>* candidateChunks,
- bool secondaryThrottle,
+ const WriteConcernOptions* writeConcern,
bool waitForDelete)
{
int movedCount = 0;
@@ -98,7 +99,7 @@ namespace mongo {
BSONObj res;
if (c->moveAndCommit(Shard::make(chunkInfo.to),
Chunk::MaxChunkSize,
- secondaryThrottle,
+ writeConcern,
waitForDelete,
0, /* maxTimeMS */
res)) {
@@ -456,9 +457,16 @@ namespace mongo {
// refresh chunk size (even though another balancer might be active)
Chunk::refreshChunkSize();
- BSONObj balancerConfig;
+ SettingsType balancerConfig;
+ string errMsg;
+
+ if (!grid.getBalancerSettings(&balancerConfig, &errMsg)) {
+ warning() << errMsg;
+ return ;
+ }
+
// now make sure we should even be running
- if ( ! grid.shouldBalance( "", &balancerConfig ) ) {
+ if (!grid.shouldBalance(balancerConfig)) {
LOG(1) << "skipping balancing round because balancing is disabled" << endl;
// Ping again so scripts can determine if we're active without waiting
@@ -494,20 +502,22 @@ namespace mongo {
continue;
}
- LOG(1) << "*** start balancing round" << endl;
+ const bool waitForDelete = (balancerConfig.isWaitForDeleteSet() ?
+ balancerConfig.getWaitForDelete() : false);
- bool waitForDelete = false;
- if (balancerConfig["_waitForDelete"].trueValue()) {
- waitForDelete = balancerConfig["_waitForDelete"].trueValue();
+ StatusWith<WriteConcernOptions*> extractStatus =
+ balancerConfig.extractWriteConcern();
+ if (!extractStatus.isOK()) {
+ warning() << extractStatus.toString();
}
- bool secondaryThrottle = true; // default to on
- if ( balancerConfig[SettingsType::secondaryThrottle()].type() ) {
- secondaryThrottle = balancerConfig[SettingsType::secondaryThrottle()].trueValue();
- }
+ scoped_ptr<WriteConcernOptions> writeConcern(extractStatus.getValue());
- LOG(1) << "waitForDelete: " << waitForDelete << endl;
- LOG(1) << "secondaryThrottle: " << secondaryThrottle << endl;
+ LOG(1) << "*** start balancing round. "
+ << "waitForDelete: " << waitForDelete
+ << ", secondaryThrottle: "
+ << (writeConcern.get() ? writeConcern->toBSON().toString() : "default")
+ << endl;
vector<CandidateChunkPtr> candidateChunks;
_doBalanceRound( conn.conn() , &candidateChunks );
@@ -517,7 +527,7 @@ namespace mongo {
}
else {
_balancedLastTime = _moveChunks(&candidateChunks,
- secondaryThrottle,
+ writeConcern.get(),
waitForDelete );
}
diff --git a/src/mongo/s/balance.h b/src/mongo/s/balance.h
index 4e97c38b4f4..87504bd2033 100644
--- a/src/mongo/s/balance.h
+++ b/src/mongo/s/balance.h
@@ -38,6 +38,8 @@
namespace mongo {
+ class WriteConcernOptions;
+
/**
* The balancer is a background task that tries to keep the number of chunks across all servers of the cluster even. Although
* every mongos will have one balancer running, only one of them will be active at the any given point in time. The balancer
@@ -96,12 +98,12 @@ namespace mongo {
* Issues chunk migration request, one at a time.
*
* @param candidateChunks possible chunks to move
- * @param secondaryThrottle wait for secondaries to catch up before pushing more deletes
+ * @param writeConcern detailed write concern. NULL means the default write concern.
* @param waitForDelete wait for deletes to complete after each chunk move
* @return number of chunks effectively moved
*/
int _moveChunks(const std::vector<CandidateChunkPtr>* candidateChunks,
- bool secondaryThrottle,
+ const WriteConcernOptions* writeConcern,
bool waitForDelete);
/**
diff --git a/src/mongo/s/chunk.cpp b/src/mongo/s/chunk.cpp
index 4aef8141a88..d6611f4fdd7 100644
--- a/src/mongo/s/chunk.cpp
+++ b/src/mongo/s/chunk.cpp
@@ -56,6 +56,7 @@
#include "mongo/db/query/query_planner.h"
#include "mongo/db/query/query_planner_common.h"
#include "mongo/db/query/index_bounds_builder.h"
+#include "mongo/db/write_concern_options.h"
namespace mongo {
@@ -355,37 +356,50 @@ namespace mongo {
bool Chunk::moveAndCommit(const Shard& to,
long long chunkSize /* bytes */,
- bool secondaryThrottle,
+ const WriteConcernOptions* writeConcern,
bool waitForDelete,
int maxTimeMS,
- BSONObj& res) const
- {
+ BSONObj& res) const {
uassert( 10167 , "can't move shard to its current location!" , getShard() != to );
- log() << "moving chunk ns: " << _manager->getns() << " moving ( " << toString() << ") " << _shard.toString() << " -> " << to.toString() << endl;
+ log() << "moving chunk ns: " << _manager->getns() << " moving ( " << toString() << ") "
+ << _shard.toString() << " -> " << to.toString() << endl;
Shard from = _shard;
-
ScopedDbConnection fromconn(from.getConnString());
- bool worked = fromconn->runCommand( "admin" ,
- BSON( "moveChunk" << _manager->getns() <<
- "from" << from.getAddress().toString() <<
- "to" << to.getAddress().toString() <<
- // NEEDED FOR 2.0 COMPATIBILITY
- "fromShard" << from.getName() <<
- "toShard" << to.getName() <<
- ///////////////////////////////
- "min" << _min <<
- "max" << _max <<
- "maxChunkSizeBytes" << chunkSize <<
- "shardId" << genID() <<
- "configdb" << configServer.modelServer() <<
- "secondaryThrottle" << secondaryThrottle <<
- "waitForDelete" << waitForDelete <<
- LiteParsedQuery::cmdOptionMaxTimeMS << maxTimeMS
- ) ,
- res);
+ BSONObjBuilder builder;
+ builder.append("moveChunk", _manager->getns());
+ builder.append("from", from.getAddress().toString());
+ builder.append("to", to.getAddress().toString());
+ // NEEDED FOR 2.0 COMPATIBILITY
+ builder.append("fromShard", from.getName());
+ builder.append("toShard", to.getName());
+ ///////////////////////////////
+ builder.append("min", _min);
+ builder.append("max", _max);
+ builder.append("maxChunkSizeBytes", chunkSize);
+ builder.append("shardId", genID());
+ builder.append("configdb", configServer.modelServer());
+
+ // For legacy secondary throttle setting.
+ bool secondaryThrottle = true;
+ if (writeConcern &&
+ writeConcern->wNumNodes <= 1 &&
+ writeConcern->wMode.empty()) {
+ secondaryThrottle = false;
+ }
+
+ builder.append("secondaryThrottle", secondaryThrottle);
+
+ if (secondaryThrottle && writeConcern) {
+ builder.append("writeConcern", writeConcern->toBSON());
+ }
+
+ builder.append("waitForDelete", waitForDelete);
+ builder.append(LiteParsedQuery::cmdOptionMaxTimeMS, maxTimeMS);
+
+ bool worked = fromconn->runCommand("admin", builder.done(), res);
fromconn.done();
LOG( worked ? 1 : 0 ) << "moveChunk result: " << res << endl;
@@ -450,7 +464,8 @@ namespace mongo {
_dataWritten = 0; // we're splitting, so should wait a bit
}
- bool shouldBalance = grid.shouldBalance( _manager->getns() );
+ const bool shouldBalance = grid.getConfigShouldBalance() &&
+ grid.getCollShouldBalance(_manager->getns());
log() << "autosplitted " << _manager->getns()
<< " shard: " << toString()
@@ -489,11 +504,13 @@ namespace mongo {
log().stream() << "moving chunk (auto): " << toMove << " to: " << newLocation.toString() << endl;
BSONObj res;
+
+ WriteConcernOptions noThrottle;
massert( 10412 ,
str::stream() << "moveAndCommit failed: " << res ,
toMove->moveAndCommit( newLocation ,
MaxChunkSize ,
- false , /* secondaryThrottle - small chunk, no need */
+ &noThrottle, /* secondaryThrottle */
false, /* waitForDelete - small chunk, no need */
0, /* maxTimeMS - don't time out */
res ) );
diff --git a/src/mongo/s/chunk.h b/src/mongo/s/chunk.h
index f8f18df1474..71baafad061 100644
--- a/src/mongo/s/chunk.h
+++ b/src/mongo/s/chunk.h
@@ -46,6 +46,7 @@ namespace mongo {
class ChunkRange;
class ChunkManager;
class ChunkObjUnitTest;
+ class WriteConcernOptions;
typedef shared_ptr<const Chunk> ChunkPtr;
@@ -172,7 +173,7 @@ namespace mongo {
*
* @param to shard to move this chunk to
* @param chunSize maximum number of bytes beyond which the migrate should no go trhough
- * @param secondaryThrottle whether during migrate all writes should block for repl
+ * @param writeConcern detailed write concern. NULL means the default write concern.
* @param waitForDelete whether chunk move should wait for cleanup or return immediately
* @param maxTimeMS max time for the migrate request
* @param res the object containing details about the migrate execution
@@ -180,7 +181,7 @@ namespace mongo {
*/
bool moveAndCommit(const Shard& to,
long long chunkSize,
- bool secondaryThrottle,
+ const WriteConcernOptions* writeConcern,
bool waitForDelete,
int maxTimeMS,
BSONObj& res) const;
diff --git a/src/mongo/s/commands_admin.cpp b/src/mongo/s/commands_admin.cpp
index 79372173622..aa66197d0fa 100644
--- a/src/mongo/s/commands_admin.cpp
+++ b/src/mongo/s/commands_admin.cpp
@@ -48,6 +48,7 @@
#include "mongo/db/stats/counters.h"
#include "mongo/db/wire_version.h"
#include "mongo/db/write_concern.h"
+#include "mongo/db/write_concern_options.h"
#include "mongo/s/chunk.h"
#include "mongo/s/client_info.h"
#include "mongo/s/cluster_write.h"
@@ -772,8 +773,9 @@ namespace mongo {
continue;
BSONObj moveResult;
+ WriteConcernOptions noThrottle;
if (!chunk->moveAndCommit(to, Chunk::MaxChunkSize,
- false, true, 0, moveResult)) {
+ &noThrottle, true, 0, moveResult)) {
warning().stream()
<< "Couldn't move chunk " << chunk << " to shard " << to
<< " while sharding collection " << ns << ". Reason: "
@@ -1133,10 +1135,23 @@ namespace mongo {
return false;
}
+ scoped_ptr<WriteConcernOptions> writeConcern(new WriteConcernOptions());
+ Status status = writeConcern->parseSecondaryThrottle(cmdObj, NULL);
+
+ if (!status.isOK()){
+ if (status.code() != ErrorCodes::WriteConcernNotDefined) {
+ errmsg = status.toString();
+ return false;
+ }
+
+ // Let the shard decide what write concern to use.
+ writeConcern.reset();
+ }
+
BSONObj res;
if (!c->moveAndCommit(to,
maxChunkSizeBytes,
- cmdObj["_secondaryThrottle"].trueValue(),
+ writeConcern.get(),
cmdObj["_waitForDelete"].trueValue(),
maxTimeMS.getValue(),
res)) {
diff --git a/src/mongo/s/d_migrate.cpp b/src/mongo/s/d_migrate.cpp
index 2fb6aa8be8f..414bc337249 100644
--- a/src/mongo/s/d_migrate.cpp
+++ b/src/mongo/s/d_migrate.cpp
@@ -88,6 +88,31 @@
using namespace std;
+namespace {
+ using mongo::WriteConcernOptions;
+ using mongo::repl::ReplicationCoordinator;
+
+ const int kDefaultWTimeoutMs = 60 * 1000;
+ const WriteConcernOptions DefaultWriteConcern(2, WriteConcernOptions::NONE, kDefaultWTimeoutMs);
+
+ /**
+ * Returns the default write concern for migration cleanup (at donor shard) and
+ * cloning documents (at recipient shard).
+ */
+ WriteConcernOptions getDefaultWriteConcern() {
+ ReplicationCoordinator* replCoordinator =
+ mongo::repl::getGlobalReplicationCoordinator();
+ mongo::Status status =
+ replCoordinator->checkIfWriteConcernCanBeSatisfied(DefaultWriteConcern);
+
+ if (status.isOK()) {
+ return DefaultWriteConcern;
+ }
+
+ return WriteConcernOptions(1, WriteConcernOptions::NONE, 0);
+ }
+}
+
namespace mongo {
MONGO_LOG_DEFAULT_COMPONENT_FILE(::mongo::logger::LogComponent::kSharding);
@@ -746,6 +771,24 @@ namespace mongo {
* called to initial a move
* usually by a mongos
* this is called on the "from" side
+ *
+ * Format:
+ * {
+ * moveChunk: "namespace",
+ * from: "hostAndPort",
+ * fromShard: "shardName",
+ * to: "hostAndPort",
+ * toShard: "shardName",
+ * min: {},
+ * max: {},
+ * maxChunkBytes: numeric,
+ * shardId: "_id of chunk document in config.chunks",
+ * configdb: "hostAndPort",
+ *
+ * // optional
+ * secondaryThrottle: bool, //defaults to true.
+ * writeConcern: {} // applies to individual writes.
+ * }
*/
class MoveChunkCommand : public Command {
public:
@@ -802,30 +845,35 @@ namespace mongo {
to = cmdObj["toShard"].String();
}
- // if we do a w=2 after every write
- bool secondaryThrottle = cmdObj["secondaryThrottle"].trueValue();
- if ( secondaryThrottle ) {
- const repl::ReplicationCoordinator::Mode replMode =
- repl::getGlobalReplicationCoordinator()->getReplicationMode();
- if (replMode == repl::ReplicationCoordinator::modeReplSet) {
- if (repl::theReplSet->config().getMajority() <= 1) {
- secondaryThrottle = false;
- warning() << "not enough nodes in set to use secondaryThrottle: "
- << " majority: " << repl::theReplSet->config().getMajority()
- << endl;
- }
- }
- else if (replMode == repl::ReplicationCoordinator::modeNone) {
- secondaryThrottle = false;
- warning() << "secondaryThrottle selected but no replication" << endl;
+ // Process secondary throttle settings and assign defaults if necessary.
+ BSONObj secThrottleObj;
+ WriteConcernOptions writeConcern;
+ Status status = writeConcern.parseSecondaryThrottle(cmdObj, &secThrottleObj);
+
+ if (!status.isOK()){
+ if (status.code() != ErrorCodes::WriteConcernNotDefined) {
+ warning() << status.toString() << endl;
+ return appendCommandStatus(result, status);
}
- else {
- // master/slave
- secondaryThrottle = false;
- warning() << "secondaryThrottle not allowed with master/slave" << endl;
+
+ writeConcern = getDefaultWriteConcern();
+ }
+ else {
+ repl::ReplicationCoordinator* replCoordinator =
+ repl::getGlobalReplicationCoordinator();
+ Status status = replCoordinator->checkIfWriteConcernCanBeSatisfied(writeConcern);
+ if (!status.isOK()) {
+ warning() << status.toString() << endl;
+ return appendCommandStatus(result, status);
}
}
+ if (writeConcern.shouldWaitForOtherNodes() &&
+ writeConcern.wTimeout == WriteConcernOptions::kNoTimeout) {
+ // Don't allow no timeout.
+ writeConcern.wTimeout = kDefaultWTimeoutMs;
+ }
+
// Do inline deletion
bool waitForDelete = cmdObj["waitForDelete"].trueValue();
if (waitForDelete) {
@@ -1056,19 +1104,27 @@ namespace mongo {
ScopedDbConnection connTo(toShard.getConnString());
BSONObj res;
bool ok;
+
+ const bool isSecondaryThrottle(writeConcern.shouldWaitForOtherNodes());
+
+ BSONObjBuilder recvChunkStartBuilder;
+ recvChunkStartBuilder.append("_recvChunkStart", ns);
+ recvChunkStartBuilder.append("from", fromShard.getConnString());
+ recvChunkStartBuilder.append("fromShardName", fromShard.getName());
+ recvChunkStartBuilder.append("toShardName", toShard.getName());
+ recvChunkStartBuilder.append("min", min);
+ recvChunkStartBuilder.append("max", max);
+ recvChunkStartBuilder.append("shardKeyPattern", shardKeyPattern);
+ recvChunkStartBuilder.append("configServer", configServer.modelServer());
+ recvChunkStartBuilder.append("secondaryThrottle", isSecondaryThrottle);
+
+ // Follow the same convention in moveChunk.
+ if (isSecondaryThrottle && !secThrottleObj.isEmpty()) {
+ recvChunkStartBuilder.append("writeConcern", secThrottleObj);
+ }
+
try{
- ok = connTo->runCommand( "admin" ,
- BSON( "_recvChunkStart" << ns <<
- "from" << fromShard.getConnString() <<
- "fromShardName" << fromShard.getName() <<
- "toShardName" << toShard.getName() <<
- "min" << min <<
- "max" << max <<
- "shardKeyPattern" << shardKeyPattern <<
- "configServer" << configServer.modelServer() <<
- "secondaryThrottle" << secondaryThrottle
- ) ,
- res );
+ ok = connTo->runCommand("admin", recvChunkStartBuilder.done(), res);
}
catch( DBException& e ){
errmsg = str::stream() << "moveChunk could not contact to: shard "
@@ -1501,7 +1557,7 @@ namespace mongo {
min.getOwned(),
max.getOwned(),
shardKeyPattern.getOwned(),
- secondaryThrottle,
+ writeConcern,
&errMsg)) {
log() << "Error occured while performing cleanup: " << errMsg << endl;
}
@@ -1514,7 +1570,7 @@ namespace mongo {
min.getOwned(),
max.getOwned(),
shardKeyPattern.getOwned(),
- secondaryThrottle,
+ writeConcern,
NULL, // Don't want to be notified.
&errMsg)) {
log() << "could not queue migration cleanup: " << errMsg << endl;
@@ -1728,7 +1784,7 @@ namespace mongo {
long long num = Helpers::removeRange( txn,
range,
false, /*maxInclusive*/
- secondaryThrottle, /* secondaryThrottle */
+ writeConcern,
/*callback*/
serverGlobalParams.moveParanoia ? &rs : 0,
true ); /* flag fromMigrate in oplog */
@@ -1760,7 +1816,7 @@ namespace mongo {
State currentState = getState();
if (currentState == FAIL || currentState == ABORT) {
string errMsg;
- if (!getDeleter()->queueDelete(ns, min, max, shardKeyPattern, secondaryThrottle,
+ if (!getDeleter()->queueDelete(ns, min, max, shardKeyPattern, writeConcern,
NULL /* notifier */, &errMsg)) {
warning() << "Failed to queue delete for migrate abort: " << errMsg << endl;
}
@@ -1820,18 +1876,15 @@ namespace mongo {
numCloned++;
clonedBytes += o.objsize();
- if ( secondaryThrottle && thisTime > 0 ) {
- WriteConcernOptions writeConcern;
- writeConcern.wNumNodes = 2;
- writeConcern.wTimeout = 60 * 1000;
+ if (writeConcern.shouldWaitForOtherNodes() && thisTime > 0) {
repl::ReplicationCoordinator::StatusAndDuration replStatus =
repl::getGlobalReplicationCoordinator()->awaitReplication(
txn,
cc().getLastOp(),
writeConcern);
if (replStatus.status.code() == ErrorCodes::ExceededTimeLimit) {
- warning() << "secondaryThrottle on, but doc insert timed out "
- "after 60 seconds, continuing";
+ warning() << "secondaryThrottle on, but doc insert timed out; "
+ "continuing";
}
else {
massertStatusOK(replStatus.status);
@@ -2044,10 +2097,12 @@ namespace mongo {
// TODO: create a better interface to remove objects directly
KeyRange range( ns, id, id, idIndexPattern );
+ const WriteConcernOptions singleNodeWrite(1, WriteConcernOptions::NONE,
+ WriteConcernOptions::kNoTimeout);
Helpers::removeRange( txn,
range ,
true , /*maxInclusive*/
- false , /* secondaryThrottle */
+ singleNodeWrite,
serverGlobalParams.moveParanoia ? &rs : 0 , /*callback*/
true ); /*fromMigrate*/
@@ -2212,7 +2267,7 @@ namespace mongo {
long long clonedBytes;
long long numCatchup;
long long numSteady;
- bool secondaryThrottle;
+ WriteConcernOptions writeConcern;
int replSetMajorityCount;
@@ -2238,6 +2293,25 @@ namespace mongo {
cc().shutdown();
}
+ /**
+ * Command for initiating the recipient side of the migration to start copying data
+ * from the donor shard.
+ *
+ * {
+ * _recvChunkStart: "namespace",
+ * congfigServer: "hostAndPort",
+ * from: "hostAndPort",
+ * fromShardName: "shardName",
+ * toShardName: "shardName",
+ * min: {},
+ * max: {},
+ * shardKeyPattern: {},
+ *
+ * // optional
+ * secondaryThrottle: bool, // defaults to true
+ * writeConcern: {} // applies to individual writes.
+ * }
+ */
class RecvChunkStartCommand : public ChunkCommandHelper {
public:
void help(stringstream& h) const { h << "internal"; }
@@ -2304,7 +2378,37 @@ namespace mongo {
migrateStatus.min = min;
migrateStatus.max = max;
migrateStatus.epoch = currentVersion.epoch();
- migrateStatus.secondaryThrottle = cmdObj["secondaryThrottle"].trueValue();
+
+ // Process secondary throttle settings and assign defaults if necessary.
+ WriteConcernOptions writeConcern;
+ status = writeConcern.parseSecondaryThrottle(cmdObj, NULL);
+
+ if (!status.isOK()){
+ if (status.code() != ErrorCodes::WriteConcernNotDefined) {
+ warning() << status.toString() << endl;
+ return appendCommandStatus(result, status);
+ }
+
+ writeConcern = getDefaultWriteConcern();
+ }
+ else {
+ repl::ReplicationCoordinator* replCoordinator =
+ repl::getGlobalReplicationCoordinator();
+ Status status = replCoordinator->checkIfWriteConcernCanBeSatisfied(writeConcern);
+ if (!status.isOK()) {
+ warning() << status.toString() << endl;
+ return appendCommandStatus(result, status);
+ }
+ }
+
+ if (writeConcern.shouldWaitForOtherNodes() &&
+ writeConcern.wTimeout == WriteConcernOptions::kNoTimeout) {
+ // Don't allow no timeout.
+ writeConcern.wTimeout = kDefaultWTimeoutMs;
+ }
+
+ migrateStatus.writeConcern = writeConcern;
+
if (cmdObj.hasField("shardKeyPattern")) {
migrateStatus.shardKeyPattern = cmdObj["shardKeyPattern"].Obj().getOwned();
} else {
@@ -2323,12 +2427,6 @@ namespace mongo {
migrateStatus.shardKeyPattern = keya.getOwned();
}
- if (migrateStatus.secondaryThrottle &&
- !repl::getGlobalReplicationCoordinator()->isReplEnabled()) {
- warning() << "secondaryThrottle asked for, but no replication is enabled" << endl;
- migrateStatus.secondaryThrottle = false;
- }
-
// Set the TO-side migration to active
migrateStatus.prepare();
diff --git a/src/mongo/s/grid.cpp b/src/mongo/s/grid.cpp
index 49804a42359..93b3e0a3f77 100644
--- a/src/mongo/s/grid.cpp
+++ b/src/mongo/s/grid.cpp
@@ -498,48 +498,70 @@ namespace mongo {
* Returns whether balancing is enabled, with optional namespace "ns" parameter for balancing on a particular
* collection.
*/
- bool Grid::shouldBalance( const string& ns, BSONObj* balancerDocOut ) const {
+ bool Grid::shouldBalance(const SettingsType& balancerSettings) const {
// Allow disabling the balancer for testing
- if ( MONGO_FAIL_POINT(neverBalance) ) return false;
+ if (MONGO_FAIL_POINT(neverBalance)) return false;
- ScopedDbConnection conn(configServer.getPrimary().getConnString(), 30);
+ if (balancerSettings.isBalancerStoppedSet() && balancerSettings.getBalancerStopped()) {
+ return false;
+ }
+
+ if (balancerSettings.isBalancerActiveWindowSet()) {
+ boost::posix_time::ptime now = boost::posix_time::second_clock::local_time();
+ return _inBalancingWindow(balancerSettings.getBalancerActiveWindow(), now);
+ }
+
+ return true;
+ }
+
+ bool Grid::getBalancerSettings(SettingsType* settings, string* errMsg) const {
BSONObj balancerDoc;
- BSONObj collDoc;
+ ScopedDbConnection conn(configServer.getPrimary().getConnString(), 30);
try {
- // look for the stop balancer marker
- balancerDoc = conn->findOne( SettingsType::ConfigNS,
- BSON( SettingsType::key("balancer") ) );
- if( ns.size() > 0 ) collDoc = conn->findOne(CollectionType::ConfigNS,
- BSON( CollectionType::ns(ns)));
+ balancerDoc = conn->findOne(SettingsType::ConfigNS,
+ BSON(SettingsType::key("balancer")));
conn.done();
}
- catch( DBException& e ){
- conn.kill();
- warning() << "could not determine whether balancer should be running, error getting"
- "config data from " << conn.getHost() << causedBy( e ) << endl;
- // if anything goes wrong, we shouldn't try balancing
+ catch (const DBException& ex) {
+ *errMsg = str::stream() << "failed to read balancer settings from " << conn.getHost()
+ << ": " << causedBy(ex);
return false;
}
- if ( balancerDocOut )
- *balancerDocOut = balancerDoc;
+ return settings->parseBSON(balancerDoc, errMsg);
+ }
- boost::posix_time::ptime now = boost::posix_time::second_clock::local_time();
- if ( _balancerStopped( balancerDoc ) || ! _inBalancingWindow( balancerDoc , now ) ) {
+ bool Grid::getConfigShouldBalance() const {
+ SettingsType balSettings;
+ string errMsg;
+
+ if (!getBalancerSettings(&balSettings, &errMsg)) {
+ warning() << errMsg;
return false;
}
- if( collDoc["noBalance"].trueValue() ) return false;
- return true;
+ return shouldBalance(balSettings);
}
- bool Grid::_balancerStopped( const BSONObj& balancerDoc ) {
- // check the 'stopped' marker maker
- // if present, it is a simple bool
- BSONElement stoppedElem = balancerDoc[SettingsType::balancerStopped()];
- return stoppedElem.trueValue();
+ bool Grid::getCollShouldBalance(const std::string& ns) const {
+ BSONObj collDoc;
+ ScopedDbConnection conn(configServer.getPrimary().getConnString(), 30);
+
+ try {
+ collDoc = conn->findOne(CollectionType::ConfigNS, BSON(CollectionType::ns(ns)));
+ conn.done();
+ }
+ catch (const DBException& e){
+ conn.kill();
+ warning() << "could not determine whether balancer should be running, error getting"
+ << "config data from " << conn.getHost() << causedBy(e) << endl;
+ // if anything goes wrong, we shouldn't try balancing
+ return false;
+ }
+
+ return !collDoc[CollectionType::noBalance()].trueValue();
}
bool Grid::_inBalancingWindow( const BSONObj& balancerDoc , const boost::posix_time::ptime& now ) {
diff --git a/src/mongo/s/grid.h b/src/mongo/s/grid.h
index e68918abce8..7d4e2a575d5 100644
--- a/src/mongo/s/grid.h
+++ b/src/mongo/s/grid.h
@@ -35,7 +35,8 @@
#include "mongo/util/time_support.h"
#include "mongo/util/concurrency/mutex.h"
-#include "config.h" // DBConfigPtr
+#include "mongo/s/config.h" // DBConfigPtr
+#include "mongo/s/type_settings.h"
namespace mongo {
@@ -107,9 +108,24 @@ namespace mongo {
bool knowAboutShard( const std::string& name ) const;
/**
- * @return true if the chunk balancing functionality is enabled
+ * Returns true if the balancer should be running.
*/
- bool shouldBalance( const std::string& ns = "", BSONObj* balancerDocOut = 0 ) const;
+ bool shouldBalance(const SettingsType& balancerSettings) const;
+
+ /**
+ * Retrieve the balancer settings from the config server.
+ */
+ bool getBalancerSettings(SettingsType* settings, string* errMsg) const;
+
+ /**
+ * Returns true if the config server settings indicate that the balancer should be active.
+ */
+ bool getConfigShouldBalance() const;
+
+ /**
+ * Returns true if the given collection can be balanced.
+ */
+ bool getCollShouldBalance(const std::string& ns) const;
/**
*
@@ -149,14 +165,6 @@ namespace mongo {
* @return whether a give dbname is used for shard "local" databases (e.g., admin or local)
*/
static bool _isSpecialLocalDB( const std::string& dbName );
-
- /**
- * @param balancerDoc bson that may contain a marker to stop the balancer
- * format { ... , stopped: [ "true" | "false" ] , ... }
- * @return true if the marker is present and is set to true
- */
- static bool _balancerStopped( const BSONObj& balancerDoc );
-
};
extern Grid grid;
diff --git a/src/mongo/s/type_settings.cpp b/src/mongo/s/type_settings.cpp
index 08ada19d680..437c4b9f230 100644
--- a/src/mongo/s/type_settings.cpp
+++ b/src/mongo/s/type_settings.cpp
@@ -28,6 +28,7 @@
#include "mongo/s/type_settings.h"
#include "mongo/db/field_parser.h"
+#include "mongo/db/write_concern_options.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/time_support.h"
@@ -41,7 +42,9 @@ namespace mongo {
const BSONField<int> SettingsType::chunksize("value");
const BSONField<bool> SettingsType::balancerStopped("stopped");
const BSONField<BSONObj> SettingsType::balancerActiveWindow("activeWindow");
- const BSONField<bool> SettingsType::secondaryThrottle("_secondaryThrottle");
+ const BSONField<bool> SettingsType::deprecated_secondaryThrottle("_secondaryThrottle", true);
+ const BSONField<BSONObj> SettingsType::migrationWriteConcern("_secondaryThrottle");
+ const BSONField<bool> SettingsType::waitForDelete("_waitForDelete");
SettingsType::SettingsType() {
clear();
@@ -94,6 +97,12 @@ namespace mongo {
" format is { start: \"hh:mm\" , stop: \"hh:mm\" }";
return false;
}
+
+ if (_isSecondaryThrottleSet && _isMigrationWriteConcernSet) {
+ *errMsg = stream() << "cannot have both secondary throttle and migration "
+ << "write concern set at the same time";
+ return false;
+ }
}
return true;
}
@@ -112,8 +121,13 @@ namespace mongo {
if (_isBalancerActiveWindowSet) {
builder.append(balancerActiveWindow(), _balancerActiveWindow);
}
- if (_isSecondaryThrottleSet) builder.append(secondaryThrottle(), _secondaryThrottle);
+ if (_isSecondaryThrottleSet) {
+ builder.append(deprecated_secondaryThrottle(), _secondaryThrottle);
+ }
+ if (_isMigrationWriteConcernSet) {
+ builder.append(migrationWriteConcern(), _migrationWriteConcern);
+ }
return builder.obj();
}
@@ -141,9 +155,23 @@ namespace mongo {
if (fieldState == FieldParser::FIELD_INVALID) return false;
_isBalancerActiveWindowSet = fieldState == FieldParser::FIELD_SET;
- fieldState = FieldParser::extract(source, secondaryThrottle, &_secondaryThrottle, errMsg);
+ fieldState = FieldParser::extract(source,
+ migrationWriteConcern,
+ &_migrationWriteConcern,
+ errMsg);
+ _isMigrationWriteConcernSet = fieldState == FieldParser::FIELD_SET;
+
+ if (fieldState == FieldParser::FIELD_INVALID) {
+ fieldState = FieldParser::extract(source, deprecated_secondaryThrottle,
+ &_secondaryThrottle, errMsg);
+ if (fieldState == FieldParser::FIELD_INVALID) return false;
+ errMsg->clear(); // Note: extract method doesn't clear errMsg.
+ _isSecondaryThrottleSet = fieldState == FieldParser::FIELD_SET;
+ }
+
+ fieldState = FieldParser::extract(source, waitForDelete, &_waitForDelete, errMsg);
if (fieldState == FieldParser::FIELD_INVALID) return false;
- _isSecondaryThrottleSet = fieldState == FieldParser::FIELD_SET;
+ _isWaitForDeleteSet = fieldState == FieldParser::FIELD_SET;
return true;
}
@@ -162,12 +190,14 @@ namespace mongo {
_balancerActiveWindow = BSONObj();
_isBalancerActiveWindowSet = false;
- _shortBalancerSleep = false;
- _isShortBalancerSleepSet = false;
-
_secondaryThrottle = false;
_isSecondaryThrottleSet = false;
+ _migrationWriteConcern = BSONObj();
+ _isMigrationWriteConcernSet= false;
+
+ _waitForDelete = false;
+ _isWaitForDeleteSet = false;
}
void SettingsType::cloneTo(SettingsType* other) const {
@@ -182,19 +212,49 @@ namespace mongo {
other->_balancerStopped = _balancerStopped;
other->_isBalancerStoppedSet = _isBalancerStoppedSet;
- other->_balancerActiveWindow = _balancerActiveWindow;
+ other->_balancerActiveWindow = _balancerActiveWindow.copy();
other->_isBalancerActiveWindowSet = _isBalancerActiveWindowSet;
- other->_shortBalancerSleep = _shortBalancerSleep;
- other->_isShortBalancerSleepSet = _isShortBalancerSleepSet;
-
other->_secondaryThrottle = _secondaryThrottle;
other->_isSecondaryThrottleSet = _isSecondaryThrottleSet;
+ other->_migrationWriteConcern = _migrationWriteConcern.copy();
+ other->_isMigrationWriteConcernSet = _isMigrationWriteConcernSet;
+
+ other->_waitForDelete = _waitForDelete;
+ other->_isWaitForDeleteSet = _isWaitForDeleteSet;
}
std::string SettingsType::toString() const {
return toBSON().toString();
}
+ StatusWith<WriteConcernOptions*> SettingsType::extractWriteConcern() const {
+ dassert(_isKeySet);
+ dassert(_key == "balancer");
+
+ const bool isSecondaryThrottle = getSecondaryThrottle();
+ if (!isSecondaryThrottle) {
+ return(StatusWith<WriteConcernOptions*>(
+ new WriteConcernOptions(1, WriteConcernOptions::NONE, 0)));
+ }
+
+ const BSONObj migrationWOption(isMigrationWriteConcernSet() ?
+ getMigrationWriteConcern() : BSONObj());
+
+ if (migrationWOption.isEmpty()) {
+ // Default setting.
+ return StatusWith<WriteConcernOptions*>(NULL);
+ }
+
+ auto_ptr<WriteConcernOptions> writeConcern(new WriteConcernOptions());
+ Status status = writeConcern->parse(migrationWOption);
+
+ if (!status.isOK()) {
+ return StatusWith<WriteConcernOptions*>(status);
+ }
+
+ return StatusWith<WriteConcernOptions*>(writeConcern.release());
+ }
+
} // namespace mongo
diff --git a/src/mongo/s/type_settings.h b/src/mongo/s/type_settings.h
index fa8a76e0480..f3d2b4c4739 100644
--- a/src/mongo/s/type_settings.h
+++ b/src/mongo/s/type_settings.h
@@ -31,11 +31,14 @@
#include <string>
#include "mongo/base/disallow_copying.h"
+#include "mongo/base/status_with.h"
#include "mongo/base/string_data.h"
#include "mongo/db/jsobj.h"
namespace mongo {
+ struct WriteConcernOptions;
+
/**
* This class represents the layout and contents of documents contained in the
* config.settings collection. All manipulation of documents coming from that
@@ -73,7 +76,9 @@ namespace mongo {
static const BSONField<int> chunksize;
static const BSONField<bool> balancerStopped;
static const BSONField<BSONObj> balancerActiveWindow;
- static const BSONField<bool> secondaryThrottle;
+ static const BSONField<bool> deprecated_secondaryThrottle;
+ static const BSONField<BSONObj> migrationWriteConcern;
+ static const BSONField<bool> waitForDelete;
//
// settings type methods
@@ -207,7 +212,7 @@ namespace mongo {
void unsetSecondaryThrottle() { _isSecondaryThrottleSet = false; }
bool isSecondaryThrottleSet() const {
- return _isSecondaryThrottleSet || secondaryThrottle.hasDefault();
+ return _isSecondaryThrottleSet || deprecated_secondaryThrottle.hasDefault();
}
// Calling get*() methods when the member is not set and has no default results in undefined
@@ -217,10 +222,59 @@ namespace mongo {
return _secondaryThrottle;
} else {
dassert(secondaryThrottle.hasDefault());
- return secondaryThrottle.getDefault();
+ return deprecated_secondaryThrottle.getDefault();
}
}
+ void setMigrationWriteConcern(const BSONObj& writeConcern) {
+ _migrationWriteConcern = writeConcern;
+ _isMigrationWriteConcernSet = true;
+ }
+
+ void unsetMigrationWriteConcern() {
+ _isMigrationWriteConcernSet = false;
+ }
+
+ bool isMigrationWriteConcernSet() const {
+ return _isMigrationWriteConcernSet;
+ }
+
+ // Calling get*() methods when the member is not set and has no default results in undefined
+ // behavior
+ BSONObj getMigrationWriteConcern() const {
+ dassert (_isBalancerWriteConcernSet);
+ return _migrationWriteConcern;
+ }
+
+ void setWaitForDelete(bool waitForDelete) {
+ _waitForDelete = waitForDelete;
+ _isWaitForDeleteSet = true;
+ }
+
+ void unsetWaitForDelete() {
+ _isWaitForDeleteSet = false;
+ }
+
+ bool isWaitForDeleteSet() const {
+ return _isWaitForDeleteSet;
+ }
+
+ // Calling get*() methods when the member is not set and has no default results in undefined
+ // behavior
+ bool getWaitForDelete() const {
+ dassert (_isWaitForDeleteSet);
+ return _waitForDelete;
+ }
+
+ // Helper methods
+
+ /**
+ * Extract the write concern settings from this settings. This is only valid when
+ * key is "balancer". Returns NULL if secondary throttle is true but write
+ * concern is not specified.
+ */
+ StatusWith<WriteConcernOptions*> extractWriteConcern() const;
+
private:
// Convention: (M)andatory, (O)ptional, (S)pecial rule.
std::string _key; // (M) key determining the type of options to use
@@ -238,11 +292,19 @@ namespace mongo {
// Format: { start: "08:00" , stop:
// "19:30" }, strftime format is %H:%M
- bool _shortBalancerSleep; // (O) controls how long the balancer sleeps
- bool _isShortBalancerSleepSet; // in some situations
bool _secondaryThrottle; // (O) only migrate chunks as fast as at least
bool _isSecondaryThrottleSet; // one secondary can keep up with
+
+ // (O) detailed write concern for *individual* writes during migration.
+ // From side: deletes during cleanup.
+ // To side: deletes to clear the incoming range, deletes to undo migration at abort,
+ // and writes during cloning.
+ BSONObj _migrationWriteConcern;
+ bool _isMigrationWriteConcernSet;
+
+ bool _waitForDelete; // (O) synchronous migration cleanup.
+ bool _isWaitForDeleteSet;
};
} // namespace mongo
diff --git a/src/mongo/s/type_settings_test.cpp b/src/mongo/s/type_settings_test.cpp
index 9b605464794..651c53d056c 100644
--- a/src/mongo/s/type_settings_test.cpp
+++ b/src/mongo/s/type_settings_test.cpp
@@ -91,10 +91,10 @@ namespace {
ASSERT_EQUALS(settings.getChunksize(), 1);
BSONObj objBalancer = BSON(SettingsType::key("balancer") <<
- SettingsType::balancerStopped(true) <<
- SettingsType::balancerActiveWindow(BSON("start" << "23:00" <<
- "stop" << "6:00" )) <<
- SettingsType::secondaryThrottle(true));
+ SettingsType::balancerStopped(true) <<
+ SettingsType::balancerActiveWindow(BSON("start" << "23:00" <<
+ "stop" << "6:00" )) <<
+ SettingsType::migrationWriteConcern(BSON("w" << 2)));
ASSERT(settings.parseBSON(objBalancer, &errMsg));
ASSERT_EQUALS(errMsg, "");
ASSERT_TRUE(settings.isValid(NULL));
@@ -102,7 +102,28 @@ namespace {
ASSERT_EQUALS(settings.getBalancerStopped(), true);
ASSERT_EQUALS(settings.getBalancerActiveWindow(), BSON("start" << "23:00" <<
"stop" << "6:00" ));
- ASSERT_EQUALS(settings.getSecondaryThrottle(), true);
+ ASSERT(settings.getSecondaryThrottle());
+ ASSERT_EQUALS(0, settings.getMigrationWriteConcern().woCompare(BSON("w" << 2)));
+ }
+
+ TEST(Validity, ValidWithDeprecatedThrottle) {
+ SettingsType settings;
+ BSONObj objChunksize = BSON(SettingsType::key("chunksize") <<
+ SettingsType::chunksize(1));
+ string errMsg;
+ ASSERT(settings.parseBSON(objChunksize, &errMsg));
+ ASSERT_EQUALS(errMsg, "");
+ ASSERT_TRUE(settings.isValid(NULL));
+ ASSERT_EQUALS(settings.getKey(), "chunksize");
+ ASSERT_EQUALS(settings.getChunksize(), 1);
+
+ BSONObj objBalancer = BSON(SettingsType::key("balancer") <<
+ SettingsType::deprecated_secondaryThrottle(true));
+ ASSERT(settings.parseBSON(objBalancer, &errMsg));
+ ASSERT_EQUALS(errMsg, "");
+ ASSERT_TRUE(settings.isValid(NULL));
+ ASSERT_EQUALS(settings.getKey(), "balancer");
+ ASSERT(settings.getSecondaryThrottle());
}
TEST(Validity, BadType) {