author    Greg Studer <greg@10gen.com>  2014-01-14 16:35:28 -0500
committer Greg Studer <greg@10gen.com>  2014-01-16 11:39:53 -0500
commit    6be5ff60cf3233dff7fff542ded1a06b541076b1 (patch)
tree      ac81d4ebedc235266c999b186b107fd3981efe32
parent    1721db7bddcec2c61c121de961a899622f47c6ec (diff)
download  mongo-6be5ff60cf3233dff7fff542ded1a06b541076b1.tar.gz
SERVER-12274 mongos GLE changes for better backwards compatibility
-rw-r--r--  jstests/sharding/gle_error_message.js                                      159
-rw-r--r--  jstests/sharding/gle_sharded_wc.js                                         154
-rw-r--r--  src/mongo/client/dbclientinterface.h                                        12
-rw-r--r--  src/mongo/s/client_info.cpp                                                 64
-rw-r--r--  src/mongo/s/client_info.h                                                   29
-rw-r--r--  src/mongo/s/commands_admin.cpp                                             128
-rw-r--r--  src/mongo/s/mock_multi_write_command.h (renamed from src/mongo/s/mock_multi_command.h)  2
-rw-r--r--  src/mongo/s/write_ops/batch_downconvert.cpp                                212
-rw-r--r--  src/mongo/s/write_ops/batch_downconvert.h                                   32
-rw-r--r--  src/mongo/s/write_ops/batch_downconvert_test.cpp                           206
-rw-r--r--  src/mongo/s/write_ops/batch_write_exec_test.cpp                              8
-rw-r--r--  src/mongo/s/write_ops/config_coordinator_test.cpp                            4
-rw-r--r--  src/mongo/s/write_ops/dbclient_safe_writer.cpp                               6
-rw-r--r--  src/mongo/s/write_ops/dbclient_safe_writer.h                                 2
14 files changed, 747 insertions, 271 deletions
diff --git a/jstests/sharding/gle_error_message.js b/jstests/sharding/gle_error_message.js
deleted file mode 100644
index d02a9778db3..00000000000
--- a/jstests/sharding/gle_error_message.js
+++ /dev/null
@@ -1,159 +0,0 @@
-//
-// Tests whether sharded GLE fails sanely and correctly reports failures.
-//
-
-function waitForWrite(shardIndex, query, count) {
- var searchText = tojson(query) + " on shard " + shardIndex;
- jsTest.log( "Waiting for " + count + " document(s) with " + searchText );
- var shardDB = connect(shards[shardIndex].host + "/" + jsTestName());
- assert.soon( function() { return shardDB.coll.find( query ).count() == count; },
- "Failed to find document with " + searchText,
- /* timeout */ 10 * 1000,
- /*interval*/ 10 );
-}
-
-jsTest.log( "Starting sharded cluster..." )
-
-var st = new ShardingTest({ shards : 3,
- mongos : 2,
- verbose : 3,
- other : { separateConfig : true } })
-
-st.stopBalancer()
-
-var mongos = st.s0
-var admin = mongos.getDB( "admin" )
-var config = mongos.getDB( "config" )
-var coll = mongos.getCollection( jsTestName() + ".coll" )
-var shards = config.shards.find().toArray()
-
-jsTest.log( "Enabling sharding..." )
-
-printjson( admin.runCommand({ enableSharding : "" + coll.getDB() }) )
-printjson( admin.runCommand({ movePrimary : "" + coll.getDB(), to : shards[0]._id }) )
-printjson( admin.runCommand({ shardCollection : "" + coll, key : { _id : 1 } }) )
-printjson( admin.runCommand({ split : "" + coll, middle : { _id : 0 } }) )
-printjson( admin.runCommand({ moveChunk : "" + coll, find : { _id : 0 }, to : shards[1]._id }) )
-
-st.printShardingStatus()
-
-
-
-jsTest.log( "Testing GLE...")
-
-// insert to two diff shards
-coll.insert({ _id : -1, hello : "world" })
-coll.insert({ _id : 1, hello : "world" })
-
-waitForWrite(0, {_id: -1}, 1);
-waitForWrite(1, {_id: 1}, 1);
-
-jsTest.log( "GLE : " + tojson( coll.getDB().getLastErrorObj() ) )
-
-
-
-jsTest.log( "Testing GLE when writeback host goes down..." )
-
-// insert to two diff shards
-coll.insert({ _id : -2, hello : "world" })
-coll.insert({ _id : 2, hello : "world" })
-
-waitForWrite(0, {_id: -2}, 1);
-waitForWrite(1, {_id: 2}, 1);
-
-MongoRunner.stopMongod( st.shard0 )
-
-jsTest.log( "GLE : " + tojson( coll.getDB().getLastErrorObj() ) )
-
-st.shard0 = MongoRunner.runMongod( st.shard0 )
-
-// NEED TO WAIT 5s so the connection pool knows we're down and up
-sleep( 5 * 1000 );
-
-jsTest.log( "Testing GLE when main host goes down..." )
-
-// insert to two diff shards
-coll.insert({ _id : -3, hello : "world" })
-coll.insert({ _id : 3, hello : "world" })
-
-waitForWrite(0, {_id: -3}, 1);
-waitForWrite(1, {_id: 3}, 1);
-
-MongoRunner.stopMongod( st.shard1 )
-
-try{
- jsTest.log( "Calling GLE! " )
- coll.getDB().getLastErrorObj()
- assert( false )
-}
-catch( e ){
- jsTest.log( "GLE : " + e )
-
- // Stupid string exceptions
- assert( /could not get last error|socket exception/.test( e + "") )
-}
-
-st.shard1 = MongoRunner.runMongod( st.shard1 )
-
-// NEED TO WAIT 5s so the connection pool knows we're down and up
-sleep( 5 * 1000 );
-
-jsTest.log( "Testing multi GLE for multi-host writes..." )
-
-coll.update({ hello : "world" }, { $set : { goodbye : "world" } }, false, true)
-
-waitForWrite(0, {goodbye: "world"}, 3);
-waitForWrite(1, {goodbye: "world"}, 3);
-
-jsTest.log( "GLE : " + tojson( coll.getDB().getLastErrorObj() ) )
-
-jsTest.log( "Testing multi GLE when host goes down..." )
-
-// insert to two diff shards
-coll.update({ hello : "world" }, { $set : { goodnight : "moon" } }, false, true)
-
-waitForWrite(0, {goodnight: "moon"}, 3);
-waitForWrite(1, {goodnight: "moon"}, 3);
-
-MongoRunner.stopMongod( st.shard0 )
-
-try{
- jsTest.log( "Calling GLE! " )
- coll.getDB().getLastErrorObj()
- assert( false )
-}
-catch( e ){
- jsTest.log( "GLE : " + e )
-
- // Stupid string exceptions
- assert( /could not get last error|socket exception/.test( e + "") )
-}
-
-st.shard0 = MongoRunner.runMongod( st.shard0 )
-
-// NEED TO WAIT 5s so the connection pool knows we're down
-sleep( 5 * 1000 );
-
-jsTest.log( "Testing stale version GLE when host goes down..." )
-
-var staleColl = st.s1.getCollection( coll + "" )
-staleColl.findOne()
-
-printjson( admin.runCommand({ connPoolStats : true }) );
-//printjson( admin.runCommand({ connPoolSync : true }) );
-assert( admin.runCommand({ moveChunk : "" + coll, find : { _id : 0 }, to : shards[2]._id }).ok );
-
-waitForWrite(2, {goodnight: "moon"}, 3);
-
-MongoRunner.stopMongod( st.shard2 )
-
-jsTest.log( "Sending stale write..." )
-
-staleColl.insert({ _id : 4, hello : "world" })
-
-assert.neq( null, staleColl.getDB().getLastError() )
-
-
-jsTest.log( "Done!" )
-
-st.stop()
diff --git a/jstests/sharding/gle_sharded_wc.js b/jstests/sharding/gle_sharded_wc.js
new file mode 100644
index 00000000000..6c042a7e630
--- /dev/null
+++ b/jstests/sharding/gle_sharded_wc.js
@@ -0,0 +1,154 @@
+//
+// Tests whether sharded GLE fails sanely and correctly reports failures.
+//
+
+// Options for a cluster with two replica set shards, the first with two nodes the second with one
+// This lets us try a number of GLE scenarios
+var options = { separateConfig : true,
+ rs : true,
+ rsOptions : { nojournal : "" },
+ // Options for each replica set shard
+ rs0 : { nodes : 2 },
+ rs1 : { nodes : 1 } };
+
+var st = new ShardingTest({ shards : 2, mongos : 1, other : options });
+st.stopBalancer();
+
+var mongos = st.s0;
+var admin = mongos.getDB( "admin" );
+var config = mongos.getDB( "config" );
+var coll = mongos.getCollection( jsTestName() + ".coll" );
+var shards = config.shards.find().toArray();
+
+assert.commandWorked( admin.runCommand({ enableSharding : coll.getDB().toString() }) );
+printjson( admin.runCommand({ movePrimary : coll.getDB().toString(), to : shards[0]._id }) );
+assert.commandWorked( admin.runCommand({ shardCollection : coll.toString(), key : { _id : 1 } }) );
+assert.commandWorked( admin.runCommand({ split : coll.toString(), middle : { _id : 0 } }) );
+assert.commandWorked( admin.runCommand({ moveChunk : coll.toString(),
+ find : { _id : 0 },
+ to : shards[1]._id }) );
+
+st.printShardingStatus();
+
+// Don't use write commands
+coll.getMongo().useWriteCommands = function(){ return false; };
+
+var gle = null;
+
+//
+// Successful insert
+coll.remove({});
+coll.insert({ _id : -1 });
+printjson(gle = coll.getDB().runCommand({ getLastError : 1 }));
+assert(gle.ok);
+assert(!gle.err);
+assert.eq(coll.count(), 1);
+
+//
+// Successful upserts
+coll.remove({});
+coll.update({ _id : -1 }, { _id : -1 }, true);
+coll.update({ _id : 1 }, { _id : 1 }, true);
+printjson(gle = coll.getDB().runCommand({ getLastError : 1 }));
+assert(gle.ok);
+assert(!gle.err);
+assert.eq(gle.n, 1);
+assert.eq(gle.upserted, 1);
+assert.eq(coll.count(), 2);
+
+//
+// No journal insert, GLE fails
+coll.remove({});
+coll.insert({ _id : -1 });
+printjson(gle = coll.getDB().runCommand({ getLastError : 1, j : true }));
+assert(!gle.ok);
+assert(gle.errmsg);
+
+//
+// Successful insert, write concern mode invalid
+coll.remove({});
+coll.insert({ _id : -1 });
+printjson(gle = coll.getDB().runCommand({ getLastError : 1, w : 'invalid' }));
+assert(!gle.ok);
+assert(!gle.err);
+assert(gle.errmsg);
+assert.eq(gle.code, 79); // UnknownReplWriteConcern - needed for backwards compatibility
+assert.eq(coll.count(), 1);
+
+//
+// Error on insert (dup key), write concern error not reported
+coll.remove({});
+coll.insert({ _id : -1 });
+coll.insert({ _id : -1 });
+printjson(gle = coll.getDB().runCommand({ getLastError : 1, w : 'invalid' }));
+assert(gle.ok);
+assert(gle.err);
+assert(gle.code);
+assert(!gle.errmsg);
+assert.eq(coll.count(), 1);
+
+//
+// Error on two-hosts during remove
+coll.remove({});
+coll.remove({ $invalid : 'remove' });
+printjson(gle = coll.getDB().runCommand({ getLastError : 1 }));
+assert(gle.ok);
+assert(gle.err);
+assert(gle.code);
+assert(!gle.errmsg);
+assert(gle.shards);
+assert.eq(coll.count(), 0);
+
+//
+// Successful remove on two hosts, write concern timeout on one
+coll.remove({});
+st.rs0.awaitReplication(); // To ensure the first shard won't timeout
+printjson(gle = coll.getDB().runCommand({ getLastError : 1, w : 2, wtimeout : 5 * 1000 }));
+assert(gle.ok);
+assert.eq(gle.err, 'timeout');
+assert(gle.wtimeout);
+assert(gle.shards);
+assert.eq(coll.count(), 0);
+
+//
+// Successful remove on two hosts, write concern timeout on both
+// We don't aggregate two timeouts together
+coll.remove({});
+st.rs0.awaitReplication(); // To ensure the first shard won't timeout
+printjson(gle = coll.getDB().runCommand({ getLastError : 1, w : 3, wtimeout : 5 * 1000 }));
+assert(!gle.ok);
+assert(gle.errmsg);
+assert.eq(gle.code, 64); // WriteConcernFailed - needed for backwards compatibility
+assert(!gle.wtimeout);
+assert(gle.shards);
+assert(gle.errs);
+assert.eq(coll.count(), 0);
+
+//
+// First replica set DOWN
+//
+
+//
+// Successful bulk insert on two hosts, host changes before gle (error contacting host)
+coll.remove({});
+coll.insert([{ _id : 1 }, { _id : -1 }]);
+st.rs0.stop(st.rs0.getPrimary(), true); // wait for stop
+printjson(gle = coll.getDB().runCommand({ getLastError : 1 }));
+assert(!gle.ok);
+assert(gle.errmsg);
+assert.eq(coll.count({ _id : 1 }), 1);
+
+//
+// Failed insert on two hosts, first host down
+// NOTE: This is DIFFERENT from 2.4, since we don't need to contact a host we didn't get
+// successful writes from.
+coll.remove({ _id : 1 });
+coll.insert([{ _id : 1 }, { _id : -1 }]);
+printjson(gle = coll.getDB().runCommand({ getLastError : 1 }));
+assert(gle.ok);
+assert(gle.err);
+assert.eq(coll.count({ _id : 1 }), 1);
+
+jsTest.log( "DONE!" );
+
+st.stop();
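
For reference, the new test above drives mongos through the legacy write path (OP_INSERT/OP_UPDATE followed by a getLastError command) rather than write commands. A minimal shell sketch of that write-then-GLE pattern, mirroring what the test does (the collection and the w/wtimeout values here are illustrative, not taken from a specific assertion):

// Force legacy wire-protocol writes so a separate getLastError call is meaningful,
// exactly as the test does via the useWriteCommands override.
coll.getMongo().useWriteCommands = function() { return false; };

coll.insert({ _id: 1 });

// Ask mongos to enforce a write concern across the shards written to above.
var gle = coll.getDB().runCommand({ getLastError: 1, w: 2, wtimeout: 5000 });
if (!gle.ok || gle.err) {
    print("write or write concern problem: " + tojson(gle));
}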
diff --git a/src/mongo/client/dbclientinterface.h b/src/mongo/client/dbclientinterface.h
index eba2d0bdb43..417b365f1e6 100644
--- a/src/mongo/client/dbclientinterface.h
+++ b/src/mongo/client/dbclientinterface.h
@@ -294,6 +294,18 @@ namespace mongo {
return _connectHook;
}
+ //
+ // FOR TESTING ONLY - useful to be able to directly mock a connection string without
+ // including the entire client library.
+ //
+
+ static ConnectionString mock( const HostAndPort& server ) {
+ ConnectionString connStr;
+ connStr._servers.push_back( server );
+ connStr._string = server.toString( true );
+ return connStr;
+ }
+
private:
void _fillServers( string s );
diff --git a/src/mongo/s/client_info.cpp b/src/mongo/s/client_info.cpp
index 49225ba7a6a..9cf34c31005 100644
--- a/src/mongo/s/client_info.cpp
+++ b/src/mongo/s/client_info.cpp
@@ -39,6 +39,7 @@
#include "mongo/db/dbmessage.h"
#include "mongo/db/stats/counters.h"
#include "mongo/db/stats/timer_stats.h"
+#include "mongo/s/write_ops/batch_downconvert.h"
#include "mongo/s/client_info.h"
#include "mongo/s/config.h"
#include "mongo/s/chunk.h"
@@ -150,69 +151,6 @@ namespace mongo {
static TimerStats gleWtimeStats;
static ServerStatusMetricField<TimerStats> displayGleLatency( "getLastError.wtime", &gleWtimeStats );
- static BSONObj addOpTimeTo( const BSONObj& options, const OpTime& opTime ) {
- BSONObjBuilder builder;
- builder.appendElements( options );
- builder.appendTimestamp( "wOpTime", opTime.asDate() );
- return builder.obj();
- }
-
- // TODO: Break out of ClientInfo when we have a better place for this
- bool ClientInfo::enforceWriteConcern( const string& dbName,
- const BSONObj& options,
- string* errMsg ) {
-
- const map<string, OpTime>& hostOpTimes = getPrevHostOpTimes();
-
- if ( hostOpTimes.empty() ) {
- return true;
- }
-
- for ( map<string, OpTime>::const_iterator it = hostOpTimes.begin(); it != hostOpTimes.end();
- ++it ) {
-
- const string& shardHost = it->first;
- const OpTime& opTime = it->second;
-
- LOG(5) << "enforcing write concern " << options << " on " << shardHost << endl;
-
- BSONObj optionsWithOpTime = addOpTimeTo( options, opTime );
-
- bool ok = false;
-
- boost::scoped_ptr<ScopedDbConnection> connPtr;
- try {
- connPtr.reset( new ScopedDbConnection( shardHost ) );
- ScopedDbConnection& conn = *connPtr;
-
- BSONObj result;
- ok = conn->runCommand( dbName , optionsWithOpTime , result );
- if ( !ok )
- *errMsg = result.toString();
-
- conn.done();
- }
- catch( const DBException& ex ){
- *errMsg = ex.toString();
-
- if ( connPtr )
- connPtr->done();
- }
-
- // Done if anyone fails
- if ( !ok ) {
-
- *errMsg = str::stream() << "could not enforce write concern on " << shardHost
- << causedBy( errMsg );
-
- warning() << *errMsg << endl;
- return false;
- }
- }
-
- return true;
- }
-
boost::thread_specific_ptr<ClientInfo> ClientInfo::_tlInfo;
} // namespace mongo
diff --git a/src/mongo/s/client_info.h b/src/mongo/s/client_info.h
index a6f9e62213b..cd845f2dcf5 100644
--- a/src/mongo/s/client_info.h
+++ b/src/mongo/s/client_info.h
@@ -31,6 +31,10 @@
#include "mongo/pch.h"
+#include <map>
+#include <set>
+#include <vector>
+
#include "mongo/db/client_basic.h"
#include "mongo/s/chunk.h"
#include "mongo/s/writeback_listener.h"
@@ -109,25 +113,6 @@ namespace mongo {
void disableForCommand();
- /**
- * Uses GLE and the shard hosts and opTimes last written by write commands to enforce a
- * write concern.
- *
- * Returns true if write concern was enforced, false with errMsg if not.
- */
- bool enforceWriteConcern( const string& dbName, const BSONObj& options, string* errMsg );
-
- /**
- * calls getLastError
- * resets shards since get last error
- * @return if the command was ok or if there was an error
- */
- bool getLastError( const string& dbName,
- const BSONObj& options ,
- BSONObjBuilder& result ,
- string& errmsg,
- bool fromWriteBackListener = false );
-
/** @return if its ok to auto split from this client */
bool autoSplitOk() const { return _autoSplitOk && Chunk::ShouldAutoSplit; }
@@ -154,8 +139,8 @@ namespace mongo {
hostOpTimes.clear();
}
- set<string> shardHostsWritten;
- map<string, OpTime> hostOpTimes;
+ std::set<string> shardHostsWritten;
+ std::map<string, OpTime> hostOpTimes;
};
// we use _a and _b to store info from the current request and the previous request
@@ -168,7 +153,7 @@ namespace mongo {
RequestInfo* _prev; // ""
- set<string> _sinceLastGetError; // all shards accessed since last getLastError
+ std::set<string> _sinceLastGetError; // all shards accessed since last getLastError
int _lastAccess;
bool _autoSplitOk;
diff --git a/src/mongo/s/commands_admin.cpp b/src/mongo/s/commands_admin.cpp
index 748bac8fa13..2a15f9a1b59 100644
--- a/src/mongo/s/commands_admin.cpp
+++ b/src/mongo/s/commands_admin.cpp
@@ -51,12 +51,15 @@
#include "mongo/s/client_info.h"
#include "mongo/s/cluster_write.h"
#include "mongo/s/config.h"
+#include "mongo/s/dbclient_multi_command.h"
#include "mongo/s/grid.h"
#include "mongo/s/strategy.h"
#include "mongo/s/type_chunk.h"
#include "mongo/s/type_database.h"
#include "mongo/s/type_shard.h"
#include "mongo/s/writeback_listener.h"
+#include "mongo/s/write_ops/batch_downconvert.h"
+#include "mongo/s/write_ops/batch_write_exec.h"
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/util/net/listen.h"
#include "mongo/util/net/message.h"
@@ -1521,27 +1524,132 @@ namespace mongo {
std::vector<Privilege>* out) {} // No auth required
CmdShardingGetLastError() : Command("getLastError" , false , "getlasterror") { }
- virtual bool run(const string& dbName, BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool) {
+ virtual bool run( const string& dbName,
+ BSONObj& cmdObj,
+ int,
+ string& errmsg,
+ BSONObjBuilder& result,
+ bool ) {
+
+ //
+ // Mongos GLE - finicky.
+ //
+ // To emulate mongod, we first append any write errors we had, then try to append
+ // write concern error if there was no write error. We need to contact the previous
+ // shards regardless to maintain 2.4 behavior.
+ //
+ // If there are any unexpected or connectivity errors when calling GLE, fail the
+ // command.
+ //
+ // Finally, report the write concern errors IF we don't already have an error.
+ // If we only get one write concern error back, report that, otherwise report an
+ // aggregated error.
+ //
+ // TODO: Do we need to contact the prev shards regardless - do we care that much
+ // about 2.4 behavior?
+ //
+
LastError *le = lastError.disableForCommand();
verify( le );
-
// Write commands always have the error stored in the mongos last error
+ bool errorOccurred = false;
if ( le->nPrev == 1 ) {
- le->appendSelf( result );
- }
- else {
- result.appendNull( "err" );
+ errorOccurred = le->appendSelf( result, false );
}
- bool wcResult = ClientInfo::get()->enforceWriteConcern( dbName,
- cmdObj,
- &errmsg );
+ // For compatibility with 2.4 sharded GLE, we always enforce the write concern
+ // across all shards.
+
+ DBClientMultiCommand dispatcher;
+ vector<LegacyWCResponse> wcResponses;
+ Status status = enforceLegacyWriteConcern( &dispatcher,
+ dbName,
+ cmdObj,
+ convertMap( ClientInfo::get()
+ ->getPrevHostOpTimes() ),
+ &wcResponses );
// Don't forget about our last hosts, reset the client info
ClientInfo::get()->disableForCommand();
- return wcResult;
+
+ // We're now done contacting all remote servers, just report results
+
+ if ( !status.isOK() ) {
+ // Return immediately if we failed to contact a shard, unexpected GLE issue
+ // Can't return code, since it may have been set above (2.4 compatibility)
+ result.append( "errmsg", status.reason() );
+ return false;
+ }
+
+ // Go through all the write concern responses and find errors
+ BSONArrayBuilder shards;
+ BSONObjBuilder shardRawGLE;
+ BSONArrayBuilder errors;
+ BSONArrayBuilder errorRawGLE;
+
+ int numWCErrors = 0;
+ const LegacyWCResponse* lastErrResponse = NULL;
+
+ for ( vector<LegacyWCResponse>::const_iterator it = wcResponses.begin();
+ it != wcResponses.end(); ++it ) {
+
+ const LegacyWCResponse& wcResponse = *it;
+
+ shards.append( wcResponse.shardHost );
+ shardRawGLE.append( wcResponse.shardHost, wcResponse.gleResponse );
+
+ if ( !wcResponse.errToReport.empty() ) {
+ numWCErrors++;
+ lastErrResponse = &wcResponse;
+ errors.append( wcResponse.errToReport );
+ errorRawGLE.append( wcResponse.gleResponse );
+ }
+ }
+
+ // Always report what we found to match 2.4 behavior and for debugging
+ if ( wcResponses.size() == 1u ) {
+ result.append( "singleShard", wcResponses.front().shardHost );
+ }
+ else {
+ result.append( "shards", shards.arr() );
+ result.append( "shardRawGLE", shardRawGLE.obj() );
+ }
+
+ // Suppress write concern errors if a write error occurred, to match mongod behavior
+ if ( errorOccurred || numWCErrors == 0 )
+ return true;
+
+ if ( numWCErrors == 1 ) {
+
+ // Return the single write concern error we found
+ result.appendElements( lastErrResponse->gleResponse );
+ return lastErrResponse->gleResponse["ok"].trueValue();
+ }
+ else {
+
+ // Return a generic combined WC error message
+ result.append( "errs", errors.arr() );
+ result.append( "errObjects", errorRawGLE.arr() );
+
+ return appendCommandStatus( result,
+ Status( ErrorCodes::WriteConcernFailed,
+ "multiple write concern errors occurred" ) );
+ }
+ }
+
+ private:
+
+ HostOpTimeMap convertMap( const map<string, OpTime>& rawMap ) {
+ HostOpTimeMap parsedMap;
+ for ( map<string, OpTime>::const_iterator it = rawMap.begin(); it != rawMap.end();
+ ++it ) {
+ string errMsg;
+ parsedMap[ConnectionString::parse( it->first, errMsg )] = it->second;
+ }
+ return parsedMap;
}
+
} cmdGetLastError;
}
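
To illustrate the aggregated-error branch at the end of the rewritten CmdShardingGetLastError::run() above: when more than one shard reports a write concern error, mongos now returns a combined document instead of a single shard's GLE. A sketch of that response shape, with illustrative values (not taken from an actual run):

// Aggregated mongos GLE response when two shards both fail the write concern
{
    shards: [ "shardA:1000", "shardB:1000" ],       // every host the write concern was enforced on
    shardRawGLE: { "shardA:1000": { /* raw GLE */ }, "shardB:1000": { /* raw GLE */ } },
    errs: [ "timeout", "timeout" ],                  // one entry per write concern error found
    errObjects: [ /* per-shard GLE docs that carried errors */ ],
    errmsg: "multiple write concern errors occurred",
    code: 64,                                        // ErrorCodes::WriteConcernFailed
    ok: 0
}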
diff --git a/src/mongo/s/mock_multi_command.h b/src/mongo/s/mock_multi_write_command.h
index 41a2d8f7d70..bb8301e4817 100644
--- a/src/mongo/s/mock_multi_command.h
+++ b/src/mongo/s/mock_multi_write_command.h
@@ -64,7 +64,7 @@ namespace mongo {
* If an endpoint isn't registered with a MockEndpoint, just returns BatchedCommandResponses
* with ok : true.
*/
- class MockMultiCommand : public MultiCommandDispatch {
+ class MockMultiWriteCommand : public MultiCommandDispatch {
public:
void init( const std::vector<MockEndpoint*> mockEndpoints ) {
diff --git a/src/mongo/s/write_ops/batch_downconvert.cpp b/src/mongo/s/write_ops/batch_downconvert.cpp
index dc4e723fd17..e3c0d6903b0 100644
--- a/src/mongo/s/write_ops/batch_downconvert.cpp
+++ b/src/mongo/s/write_ops/batch_downconvert.cpp
@@ -287,4 +287,216 @@ namespace mongo {
response->setOk( true );
dassert( response->isValid( NULL ) );
}
+
+ /**
+ * Suppress the "err" and "code" field if they are coming from a previous write error and
+ * are not related to write concern. Also removes any write stats information (e.g. "n")
+ *
+ * Also, In some cases, 2.4 GLE w/ wOpTime can give us duplicate "err" and "code" fields b/c of
+ * reporting a previous error. The later field is what we want - dedup and use later field.
+ *
+ * Returns the stripped GLE response.
+ */
+ BSONObj BatchSafeWriter::stripNonWCInfo( const BSONObj& gleResponse ) {
+
+ BSONObjIterator it( gleResponse );
+ BSONObjBuilder builder;
+
+ BSONElement codeField; // eoo
+ BSONElement errField; // eoo
+
+ while ( it.more() ) {
+ BSONElement el = it.next();
+ StringData fieldName( el.fieldName() );
+ if ( fieldName.compare( "err" ) == 0 ) {
+ errField = el;
+ }
+ else if ( fieldName.compare( "code" ) == 0 ) {
+ codeField = el;
+ }
+ else if ( fieldName.compare( "n" ) == 0 || fieldName.compare( "nModified" ) == 0
+ || fieldName.compare( "upserted" ) == 0
+ || fieldName.compare( "updatedExisting" ) == 0 ) {
+ // Suppress field
+ }
+ else {
+ builder.append( el );
+ }
+ }
+
+ if ( !codeField.eoo() ) {
+ if ( !gleResponse["ok"].trueValue() ) {
+ // The last code will be from the write concern
+ builder.append( codeField );
+ }
+ else {
+ // The code is from a non-wc error on this connection - suppress it
+ }
+ }
+
+ if ( !errField.eoo() ) {
+ string err = errField.str();
+ if ( err == "norepl" || err == "noreplset" || err == "timeout" ) {
+ // Append err if it's from a write concern issue
+ builder.append( errField );
+ }
+ else {
+ // Suppress non-write concern err
+ }
+ }
+
+ return builder.obj();
+ }
+
+ namespace {
+
+ /**
+ * Trivial implementation of a BSON serializable object for backwards-compatibility.
+ *
+ * NOTE: This is not a good example of using BSONSerializable. For anything more complex,
+ * create an implementation with fields defined.
+ */
+ class RawBSONSerializable : public BSONSerializable {
+ MONGO_DISALLOW_COPYING(RawBSONSerializable);
+ public:
+
+ RawBSONSerializable() {
+ }
+
+ RawBSONSerializable( const BSONObj& doc ) :
+ _doc( doc ) {
+ }
+
+ bool isValid( std::string* errMsg ) const {
+ return true;
+ }
+
+ BSONObj toBSON() const {
+ return _doc;
+ }
+
+ bool parseBSON( const BSONObj& source, std::string* errMsg ) {
+ _doc = source.getOwned();
+ return true;
+ }
+
+ void clear() {
+ _doc = BSONObj();
+ }
+
+ string toString() const {
+ return toBSON().toString();
+ }
+
+ private:
+
+ BSONObj _doc;
+ };
+ }
+
+ // Adds a wOpTime field to a set of gle options
+ static BSONObj buildGLECmdWithOpTime( const BSONObj& gleOptions, const OpTime& opTime ) {
+ BSONObjBuilder builder;
+ BSONObjIterator it( gleOptions );
+
+ for ( int i = 0; it.more(); ++i ) {
+ BSONElement el = it.next();
+
+ // Make sure first element is getLastError : 1
+ if ( i == 0 ) {
+ StringData elName( el.fieldName() );
+ if ( !elName.equalCaseInsensitive( "getLastError" ) ) {
+ builder.append( "getLastError", 1 );
+ }
+ }
+
+ builder.append( el );
+ }
+ builder.appendTimestamp( "wOpTime", opTime.asDate() );
+ return builder.obj();
+ }
+
+ Status enforceLegacyWriteConcern( MultiCommandDispatch* dispatcher,
+ const StringData& dbName,
+ const BSONObj& options,
+ const HostOpTimeMap& hostOpTimes,
+ vector<LegacyWCResponse>* legacyWCResponses ) {
+
+ if ( hostOpTimes.empty() ) {
+ return Status::OK();
+ }
+
+ for ( HostOpTimeMap::const_iterator it = hostOpTimes.begin(); it != hostOpTimes.end();
+ ++it ) {
+
+ const ConnectionString& shardEndpoint = it->first;
+ const OpTime& opTime = it->second;
+
+ LOG( 3 ) << "enforcing write concern " << options << " on " << shardEndpoint.toString()
+ << " at opTime " << opTime.toStringPretty() << endl;
+
+ BSONObj gleCmd = buildGLECmdWithOpTime( options, opTime );
+
+ dispatcher->addCommand( shardEndpoint, dbName, RawBSONSerializable( gleCmd ) );
+ }
+
+ dispatcher->sendAll();
+
+ vector<Status> failedStatuses;
+
+ while ( dispatcher->numPending() > 0 ) {
+
+ ConnectionString shardEndpoint;
+ RawBSONSerializable gleResponseSerial;
+
+ Status dispatchStatus = dispatcher->recvAny( &shardEndpoint, &gleResponseSerial );
+ if ( !dispatchStatus.isOK() ) {
+ // We need to get all responses before returning
+ failedStatuses.push_back( dispatchStatus );
+ continue;
+ }
+
+ BSONObj gleResponse = BatchSafeWriter::stripNonWCInfo( gleResponseSerial.toBSON() );
+
+ // Use the downconversion tools to determine if this GLE response is ok, a
+ // write concern error, or an unknown error we should immediately abort for.
+ BatchSafeWriter::GLEErrors errors;
+ Status extractStatus = BatchSafeWriter::extractGLEErrors( gleResponse, &errors );
+ if ( !extractStatus.isOK() ) {
+ failedStatuses.push_back( extractStatus );
+ continue;
+ }
+
+ LegacyWCResponse wcResponse;
+ wcResponse.shardHost = shardEndpoint.toString();
+ wcResponse.gleResponse = gleResponse;
+ if ( errors.wcError.get() ) {
+ wcResponse.errToReport = errors.wcError->getErrMessage();
+ }
+
+ legacyWCResponses->push_back( wcResponse );
+ }
+
+ if ( failedStatuses.empty() ) {
+ return Status::OK();
+ }
+
+ StringBuilder builder;
+ builder << "could not enforce write concern";
+
+ for ( vector<Status>::const_iterator it = failedStatuses.begin();
+ it != failedStatuses.end(); ++it ) {
+ const Status& failedStatus = *it;
+ if ( it == failedStatuses.begin() ) {
+ builder << causedBy( failedStatus.toString() );
+ }
+ else {
+ builder << ":: and ::" << failedStatus.toString();
+ }
+ }
+
+ return Status( failedStatuses.size() == 1u ? failedStatuses.front().code() :
+ ErrorCodes::MultipleErrorsOccurred,
+ builder.str() );
+ }
}
diff --git a/src/mongo/s/write_ops/batch_downconvert.h b/src/mongo/s/write_ops/batch_downconvert.h
index 1c81858c55c..cd5440ae993 100644
--- a/src/mongo/s/write_ops/batch_downconvert.h
+++ b/src/mongo/s/write_ops/batch_downconvert.h
@@ -35,6 +35,8 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/optime.h"
#include "mongo/client/dbclientinterface.h"
+#include "mongo/s/multi_command_dispatch.h"
+#include "mongo/s/write_ops/batch_write_exec.h"
#include "mongo/s/write_ops/batch_write_op.h"
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/s/write_ops/batched_command_response.h"
@@ -67,7 +69,7 @@ namespace mongo {
* write command emulation.
*/
virtual Status enforceWriteConcern( DBClientBase* conn,
- const std::string& dbName,
+ const StringData& dbName,
const BSONObj& writeConcern,
BSONObj* gleResponse ) = 0;
};
@@ -91,10 +93,6 @@ namespace mongo {
const BatchedCommandRequest& request,
BatchedCommandResponse* response );
- //
- // Exposed for testing
- //
-
// Helper that acts as an auto-ptr for write and wc errors
struct GLEErrors {
auto_ptr<WriteErrorDetail> writeError;
@@ -124,9 +122,33 @@ namespace mongo {
*/
static void extractGLEStats( const BSONObj& gleResponse, GLEStats* stats );
+ /**
+ * Given a GLE response, strips out all non-write-concern related information
+ */
+ static BSONObj stripNonWCInfo( const BSONObj& gleResponse );
+
private:
SafeWriter* _safeWriter;
};
+ // Used for reporting legacy write concern responses
+ struct LegacyWCResponse {
+ string shardHost;
+ BSONObj gleResponse;
+ string errToReport;
+ };
+
+ /**
+ * Uses GLE and the shard hosts and opTimes last written by write commands to enforce a
+ * write concern across the previously used shards.
+ *
+ * Returns OK with the LegacyWCResponses containing only write concern error information
+ * Returns !OK if there was an error getting a GLE response
+ */
+ Status enforceLegacyWriteConcern( MultiCommandDispatch* dispatcher,
+ const StringData& dbName,
+ const BSONObj& options,
+ const HostOpTimeMap& hostOpTimes,
+ vector<LegacyWCResponse>* wcResponses );
}
diff --git a/src/mongo/s/write_ops/batch_downconvert_test.cpp b/src/mongo/s/write_ops/batch_downconvert_test.cpp
index e38f941446a..e6b9d7e7874 100644
--- a/src/mongo/s/write_ops/batch_downconvert_test.cpp
+++ b/src/mongo/s/write_ops/batch_downconvert_test.cpp
@@ -33,6 +33,7 @@
#include "mongo/db/jsobj.h"
#include "mongo/db/json.h"
+#include "mongo/s/multi_command_dispatch.h"
#include "mongo/unittest/unittest.h"
namespace {
@@ -237,7 +238,7 @@ namespace {
}
Status enforceWriteConcern( DBClientBase* conn,
- const std::string& dbName,
+ const StringData& dbName,
const BSONObj& writeConcern,
BSONObj* gleResponse ) {
BSONObj response = _gleResponses.front();
@@ -424,5 +425,208 @@ namespace {
ASSERT_EQUALS( response.getLastOp().toStringPretty(), OpTime(20, 0).toStringPretty() );
}
+ //
+ // Tests of processing and suppressing non-WC related fields from legacy GLE responses
+ //
+
+ TEST(LegacyGLESuppress, Basic) {
+
+ const BSONObj gleResponse = fromjson( "{ok: 1.0, err: null}" );
+
+ BSONObj stripped = BatchSafeWriter::stripNonWCInfo( gleResponse );
+ ASSERT_EQUALS( stripped.nFields(), 1 );
+ ASSERT( stripped["ok"].trueValue() );
+ }
+
+ TEST(LegacyGLESuppress, BasicStats) {
+
+ const BSONObj gleResponse =
+ fromjson( "{ok: 0.0, err: 'message',"
+ " n: 1, nModified: 1, upserted: 'abc', updatedExisting: true}" );
+
+ BSONObj stripped = BatchSafeWriter::stripNonWCInfo( gleResponse );
+ ASSERT_EQUALS( stripped.nFields(), 1 );
+ ASSERT( !stripped["ok"].trueValue() );
+ }
+
+ TEST(LegacyGLESuppress, ReplError) {
+
+ const BSONObj gleResponse =
+ fromjson( "{ok: 0.0, err: 'norepl', n: 1, wcField: true}" );
+
+ BSONObj stripped = BatchSafeWriter::stripNonWCInfo( gleResponse );
+ ASSERT_EQUALS( stripped.nFields(), 3 );
+ ASSERT( !stripped["ok"].trueValue() );
+ ASSERT_EQUALS( stripped["err"].str(), "norepl" );
+ ASSERT( stripped["wcField"].trueValue() );
+ }
+
+ TEST(LegacyGLESuppress, StripCode) {
+
+ const BSONObj gleResponse =
+ fromjson( "{ok: 1.0, err: 'message', code: 12345}" );
+
+ BSONObj stripped = BatchSafeWriter::stripNonWCInfo( gleResponse );
+ ASSERT_EQUALS( stripped.nFields(), 1 );
+ ASSERT( stripped["ok"].trueValue() );
+ }
+
+ TEST(LegacyGLESuppress, TimeoutDupError24) {
+
+ const BSONObj gleResponse =
+ BSON( "ok" << 0.0 << "err" << "message" << "code" << 12345
+ << "err" << "timeout" << "code" << 56789 << "wtimeout" << true );
+
+ BSONObj stripped = BatchSafeWriter::stripNonWCInfo( gleResponse );
+ ASSERT_EQUALS( stripped.nFields(), 4 );
+ ASSERT( !stripped["ok"].trueValue() );
+ ASSERT_EQUALS( stripped["err"].str(), "timeout" );
+ ASSERT_EQUALS( stripped["code"].numberInt(), 56789 );
+ ASSERT( stripped["wtimeout"].trueValue() );
+ }
+
+ //
+ // Tests of basic logical dispatching and aggregation for legacy GLE-based write concern
+ //
+
+ class MockCommandDispatch : public MultiCommandDispatch {
+ public:
+
+ MockCommandDispatch( const vector<BSONObj>& gleResponses ) :
+ _gleResponses( gleResponses.begin(), gleResponses.end() ) {
+ }
+
+ virtual ~MockCommandDispatch() {
+ }
+
+ void addCommand( const ConnectionString& endpoint,
+ const StringData& dbName,
+ const BSONSerializable& request ) {
+ _gleHosts.push_back( endpoint );
+ }
+
+ void sendAll() {
+ // No-op
+ }
+
+ /**
+ * Returns the number of sent requests that are still waiting to be recv'd.
+ */
+ int numPending() const {
+ return _gleHosts.size();
+ }
+
+ Status recvAny( ConnectionString* endpoint, BSONSerializable* response ) {
+ *endpoint = _gleHosts.front();
+ response->parseBSON( _gleResponses.front(), NULL );
+ _gleHosts.pop_front();
+ _gleResponses.pop_front();
+ return Status::OK();
+ }
+
+ private:
+
+ deque<ConnectionString> _gleHosts;
+ deque<BSONObj> _gleResponses;
+ };
+
+ TEST(LegacyGLEWriteConcern, Basic) {
+
+ HostOpTimeMap hostOpTimes;
+ hostOpTimes[ConnectionString::mock(HostAndPort("shardA:1000"))] = OpTime();
+ hostOpTimes[ConnectionString::mock(HostAndPort("shardB:1000"))] = OpTime();
+
+ vector<BSONObj> gleResponses;
+ gleResponses.push_back( fromjson( "{ok: 1.0, err: null}" ) );
+ gleResponses.push_back( fromjson( "{ok: 1.0, err: null}" ) );
+
+ MockCommandDispatch dispatcher( gleResponses );
+ vector<LegacyWCResponse> wcResponses;
+
+ Status status = enforceLegacyWriteConcern( &dispatcher,
+ "db",
+ BSONObj(),
+ hostOpTimes,
+ &wcResponses );
+
+ ASSERT_OK( status );
+ ASSERT_EQUALS( wcResponses.size(), 2u );
+ }
+
+ TEST(LegacyGLEWriteConcern, FailGLE) {
+
+ HostOpTimeMap hostOpTimes;
+ hostOpTimes[ConnectionString::mock(HostAndPort("shardA:1000"))] = OpTime();
+ hostOpTimes[ConnectionString::mock(HostAndPort("shardB:1000"))] = OpTime();
+
+ vector<BSONObj> gleResponses;
+ gleResponses.push_back( fromjson( "{ok: 0.0, errmsg: 'something'}" ) );
+ gleResponses.push_back( fromjson( "{ok: 1.0, err: null}" ) );
+
+ MockCommandDispatch dispatcher( gleResponses );
+ vector<LegacyWCResponse> wcResponses;
+
+ Status status = enforceLegacyWriteConcern( &dispatcher,
+ "db",
+ BSONObj(),
+ hostOpTimes,
+ &wcResponses );
+
+ ASSERT_NOT_OK( status );
+ // Ensure we keep getting the rest of the responses
+ ASSERT_EQUALS( wcResponses.size(), 1u );
+ }
+
+ TEST(LegacyGLEWriteConcern, MultiWCErrors) {
+
+ HostOpTimeMap hostOpTimes;
+ hostOpTimes[ConnectionString::mock( HostAndPort( "shardA:1000" ) )] = OpTime();
+ hostOpTimes[ConnectionString::mock( HostAndPort( "shardB:1000" ) )] = OpTime();
+
+ vector<BSONObj> gleResponses;
+ gleResponses.push_back( fromjson( "{ok: 0.0, err: 'norepl'}" ) );
+ gleResponses.push_back( fromjson( "{ok: 0.0, err: 'timeout', wtimeout: true}" ) );
+
+ MockCommandDispatch dispatcher( gleResponses );
+ vector<LegacyWCResponse> wcResponses;
+
+ Status status = enforceLegacyWriteConcern( &dispatcher,
+ "db",
+ BSONObj(),
+ hostOpTimes,
+ &wcResponses );
+
+ ASSERT_OK( status );
+ ASSERT_EQUALS( wcResponses.size(), 2u );
+ ASSERT_EQUALS( wcResponses[0].shardHost, "shardA:1000" );
+ ASSERT_EQUALS( wcResponses[0].gleResponse["err"].str(), "norepl" );
+ ASSERT_EQUALS( wcResponses[0].errToReport, "norepl" );
+ ASSERT_EQUALS( wcResponses[1].shardHost, "shardB:1000" );
+ ASSERT_EQUALS( wcResponses[1].gleResponse["err"].str(), "timeout" );
+ ASSERT_EQUALS( wcResponses[1].errToReport, "timeout" );
+ }
+
+ TEST(LegacyGLEWriteConcern, MultiFailGLE) {
+
+ HostOpTimeMap hostOpTimes;
+ hostOpTimes[ConnectionString::mock(HostAndPort("shardA:1000"))] = OpTime();
+ hostOpTimes[ConnectionString::mock(HostAndPort("shardB:1000"))] = OpTime();
+
+ vector<BSONObj> gleResponses;
+ gleResponses.push_back( fromjson( "{ok: 0.0, errmsg: 'something'}" ) );
+ gleResponses.push_back( fromjson( "{ok: 0.0, errmsg: 'something'}" ) );
+
+ MockCommandDispatch dispatcher( gleResponses );
+ vector<LegacyWCResponse> wcResponses;
+
+ Status status = enforceLegacyWriteConcern( &dispatcher,
+ "db",
+ BSONObj(),
+ hostOpTimes,
+ &wcResponses );
+
+ ASSERT_NOT_OK( status );
+ ASSERT_EQUALS( wcResponses.size(), 0u );
+ }
}
diff --git a/src/mongo/s/write_ops/batch_write_exec_test.cpp b/src/mongo/s/write_ops/batch_write_exec_test.cpp
index 7fa0bbe5cae..acf62ab2027 100644
--- a/src/mongo/s/write_ops/batch_write_exec_test.cpp
+++ b/src/mongo/s/write_ops/batch_write_exec_test.cpp
@@ -29,7 +29,7 @@
#include "mongo/s/write_ops/batch_write_exec.h"
#include "mongo/base/owned_pointer_vector.h"
-#include "mongo/s/mock_multi_command.h"
+#include "mongo/s/mock_multi_write_command.h"
#include "mongo/s/mock_ns_targeter.h"
#include "mongo/s/mock_shard_resolver.h"
#include "mongo/s/write_ops/batched_command_request.h"
@@ -74,7 +74,7 @@ namespace {
MockNSTargeter targeter;
targeter.init( mockRanges );
- MockMultiCommand dispatcher;
+ MockMultiWriteCommand dispatcher;
BatchWriteExec exec( &targeter, &resolver, &dispatcher );
@@ -122,7 +122,7 @@ namespace {
MockNSTargeter targeter;
targeter.init( mockRanges );
- MockMultiCommand dispatcher;
+ MockMultiWriteCommand dispatcher;
dispatcher.init( mockEndpoints );
BatchWriteExec exec( &targeter, &resolver, &dispatcher );
@@ -180,7 +180,7 @@ namespace {
MockNSTargeter targeter;
targeter.init( mockRanges );
- MockMultiCommand dispatcher;
+ MockMultiWriteCommand dispatcher;
dispatcher.init( mockEndpoints );
BatchWriteExec exec( &targeter, &resolver, &dispatcher );
diff --git a/src/mongo/s/write_ops/config_coordinator_test.cpp b/src/mongo/s/write_ops/config_coordinator_test.cpp
index f8120bfedaa..812aec7729e 100644
--- a/src/mongo/s/write_ops/config_coordinator_test.cpp
+++ b/src/mongo/s/write_ops/config_coordinator_test.cpp
@@ -31,7 +31,7 @@
#include <vector>
#include "mongo/client/dbclientinterface.h"
-#include "mongo/s/mock_multi_command.h"
+#include "mongo/s/mock_multi_write_command.h"
#include "mongo/unittest/unittest.h"
namespace {
@@ -44,7 +44,7 @@ namespace {
//
TEST(ConfigCoordinatorTests, Basic) {
- MockMultiCommand dispatcher;
+ MockMultiWriteCommand dispatcher;
vector<ConnectionString> configHosts;
ConfigCoordinator exec( &dispatcher, configHosts );
}
diff --git a/src/mongo/s/write_ops/dbclient_safe_writer.cpp b/src/mongo/s/write_ops/dbclient_safe_writer.cpp
index 1f8177fd951..b3eec6d757e 100644
--- a/src/mongo/s/write_ops/dbclient_safe_writer.cpp
+++ b/src/mongo/s/write_ops/dbclient_safe_writer.cpp
@@ -91,19 +91,19 @@ namespace mongo {
}
Status DBClientSafeWriter::enforceWriteConcern( DBClientBase* conn,
- const string& dbName,
+ const StringData& dbName,
const BSONObj& writeConcern,
BSONObj* gleResponse ) {
try {
BSONObj resetResponse; // ignored, always ok
- conn->runCommand( dbName, BSON( "resetError" << 1 ), resetResponse );
+ conn->runCommand( dbName.toString(), BSON( "resetError" << 1 ), resetResponse );
BSONObjBuilder gleCmdB;
gleCmdB.append( "getLastError", true );
gleCmdB.appendElements( writeConcern );
- conn->runCommand( dbName, gleCmdB.obj(), *gleResponse );
+ conn->runCommand( dbName.toString(), gleCmdB.obj(), *gleResponse );
}
catch ( const DBException& ex ) {
return ex.toStatus();
diff --git a/src/mongo/s/write_ops/dbclient_safe_writer.h b/src/mongo/s/write_ops/dbclient_safe_writer.h
index cb1831c3770..c4656281651 100644
--- a/src/mongo/s/write_ops/dbclient_safe_writer.h
+++ b/src/mongo/s/write_ops/dbclient_safe_writer.h
@@ -53,7 +53,7 @@ namespace mongo {
BSONObj* gleResponse );
Status enforceWriteConcern( DBClientBase* conn,
- const std::string& dbName,
+ const StringData& dbName,
const BSONObj& writeConcern,
BSONObj* gleResponse );