diff options
author | Greg Studer <greg@10gen.com> | 2014-01-14 16:35:28 -0500 |
---|---|---|
committer | Greg Studer <greg@10gen.com> | 2014-01-16 11:39:53 -0500 |
commit | 6be5ff60cf3233dff7fff542ded1a06b541076b1 (patch) | |
tree | ac81d4ebedc235266c999b186b107fd3981efe32 | |
parent | 1721db7bddcec2c61c121de961a899622f47c6ec (diff) | |
download | mongo-6be5ff60cf3233dff7fff542ded1a06b541076b1.tar.gz |
SERVER-12274 mongos GLE changes for better backwards compatibility
-rw-r--r-- | jstests/sharding/gle_error_message.js | 159 | ||||
-rw-r--r-- | jstests/sharding/gle_sharded_wc.js | 154 | ||||
-rw-r--r-- | src/mongo/client/dbclientinterface.h | 12 | ||||
-rw-r--r-- | src/mongo/s/client_info.cpp | 64 | ||||
-rw-r--r-- | src/mongo/s/client_info.h | 29 | ||||
-rw-r--r-- | src/mongo/s/commands_admin.cpp | 128 | ||||
-rw-r--r-- | src/mongo/s/mock_multi_write_command.h (renamed from src/mongo/s/mock_multi_command.h) | 2 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batch_downconvert.cpp | 212 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batch_downconvert.h | 32 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batch_downconvert_test.cpp | 206 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batch_write_exec_test.cpp | 8 | ||||
-rw-r--r-- | src/mongo/s/write_ops/config_coordinator_test.cpp | 4 | ||||
-rw-r--r-- | src/mongo/s/write_ops/dbclient_safe_writer.cpp | 6 | ||||
-rw-r--r-- | src/mongo/s/write_ops/dbclient_safe_writer.h | 2 |
14 files changed, 747 insertions, 271 deletions
diff --git a/jstests/sharding/gle_error_message.js b/jstests/sharding/gle_error_message.js deleted file mode 100644 index d02a9778db3..00000000000 --- a/jstests/sharding/gle_error_message.js +++ /dev/null @@ -1,159 +0,0 @@ -// -// Tests whether sharded GLE fails sanely and correctly reports failures. -// - -function waitForWrite(shardIndex, query, count) { - var searchText = tojson(query) + " on shard " + shardIndex; - jsTest.log( "Waiting for " + count + " document(s) with " + searchText ); - var shardDB = connect(shards[shardIndex].host + "/" + jsTestName()); - assert.soon( function() { return shardDB.coll.find( query ).count() == count; }, - "Failed to find document with " + searchText, - /* timeout */ 10 * 1000, - /*interval*/ 10 ); -} - -jsTest.log( "Starting sharded cluster..." ) - -var st = new ShardingTest({ shards : 3, - mongos : 2, - verbose : 3, - other : { separateConfig : true } }) - -st.stopBalancer() - -var mongos = st.s0 -var admin = mongos.getDB( "admin" ) -var config = mongos.getDB( "config" ) -var coll = mongos.getCollection( jsTestName() + ".coll" ) -var shards = config.shards.find().toArray() - -jsTest.log( "Enabling sharding..." ) - -printjson( admin.runCommand({ enableSharding : "" + coll.getDB() }) ) -printjson( admin.runCommand({ movePrimary : "" + coll.getDB(), to : shards[0]._id }) ) -printjson( admin.runCommand({ shardCollection : "" + coll, key : { _id : 1 } }) ) -printjson( admin.runCommand({ split : "" + coll, middle : { _id : 0 } }) ) -printjson( admin.runCommand({ moveChunk : "" + coll, find : { _id : 0 }, to : shards[1]._id }) ) - -st.printShardingStatus() - - - -jsTest.log( "Testing GLE...") - -// insert to two diff shards -coll.insert({ _id : -1, hello : "world" }) -coll.insert({ _id : 1, hello : "world" }) - -waitForWrite(0, {_id: -1}, 1); -waitForWrite(1, {_id: 1}, 1); - -jsTest.log( "GLE : " + tojson( coll.getDB().getLastErrorObj() ) ) - - - -jsTest.log( "Testing GLE when writeback host goes down..." ) - -// insert to two diff shards -coll.insert({ _id : -2, hello : "world" }) -coll.insert({ _id : 2, hello : "world" }) - -waitForWrite(0, {_id: -2}, 1); -waitForWrite(1, {_id: 2}, 1); - -MongoRunner.stopMongod( st.shard0 ) - -jsTest.log( "GLE : " + tojson( coll.getDB().getLastErrorObj() ) ) - -st.shard0 = MongoRunner.runMongod( st.shard0 ) - -// NEED TO WAIT 5s so the connection pool knows we're down and up -sleep( 5 * 1000 ); - -jsTest.log( "Testing GLE when main host goes down..." ) - -// insert to two diff shards -coll.insert({ _id : -3, hello : "world" }) -coll.insert({ _id : 3, hello : "world" }) - -waitForWrite(0, {_id: -3}, 1); -waitForWrite(1, {_id: 3}, 1); - -MongoRunner.stopMongod( st.shard1 ) - -try{ - jsTest.log( "Calling GLE! " ) - coll.getDB().getLastErrorObj() - assert( false ) -} -catch( e ){ - jsTest.log( "GLE : " + e ) - - // Stupid string exceptions - assert( /could not get last error|socket exception/.test( e + "") ) -} - -st.shard1 = MongoRunner.runMongod( st.shard1 ) - -// NEED TO WAIT 5s so the connection pool knows we're down and up -sleep( 5 * 1000 ); - -jsTest.log( "Testing multi GLE for multi-host writes..." ) - -coll.update({ hello : "world" }, { $set : { goodbye : "world" } }, false, true) - -waitForWrite(0, {goodbye: "world"}, 3); -waitForWrite(1, {goodbye: "world"}, 3); - -jsTest.log( "GLE : " + tojson( coll.getDB().getLastErrorObj() ) ) - -jsTest.log( "Testing multi GLE when host goes down..." ) - -// insert to two diff shards -coll.update({ hello : "world" }, { $set : { goodnight : "moon" } }, false, true) - -waitForWrite(0, {goodnight: "moon"}, 3); -waitForWrite(1, {goodnight: "moon"}, 3); - -MongoRunner.stopMongod( st.shard0 ) - -try{ - jsTest.log( "Calling GLE! " ) - coll.getDB().getLastErrorObj() - assert( false ) -} -catch( e ){ - jsTest.log( "GLE : " + e ) - - // Stupid string exceptions - assert( /could not get last error|socket exception/.test( e + "") ) -} - -st.shard0 = MongoRunner.runMongod( st.shard0 ) - -// NEED TO WAIT 5s so the connection pool knows we're down -sleep( 5 * 1000 ); - -jsTest.log( "Testing stale version GLE when host goes down..." ) - -var staleColl = st.s1.getCollection( coll + "" ) -staleColl.findOne() - -printjson( admin.runCommand({ connPoolStats : true }) ); -//printjson( admin.runCommand({ connPoolSync : true }) ); -assert( admin.runCommand({ moveChunk : "" + coll, find : { _id : 0 }, to : shards[2]._id }).ok ); - -waitForWrite(2, {goodnight: "moon"}, 3); - -MongoRunner.stopMongod( st.shard2 ) - -jsTest.log( "Sending stale write..." ) - -staleColl.insert({ _id : 4, hello : "world" }) - -assert.neq( null, staleColl.getDB().getLastError() ) - - -jsTest.log( "Done!" ) - -st.stop() diff --git a/jstests/sharding/gle_sharded_wc.js b/jstests/sharding/gle_sharded_wc.js new file mode 100644 index 00000000000..6c042a7e630 --- /dev/null +++ b/jstests/sharding/gle_sharded_wc.js @@ -0,0 +1,154 @@ +// +// Tests whether sharded GLE fails sanely and correctly reports failures. +// + +// Options for a cluster with two replica set shards, the first with two nodes the second with one +// This lets us try a number of GLE scenarios +var options = { separateConfig : true, + rs : true, + rsOptions : { nojournal : "" }, + // Options for each replica set shard + rs0 : { nodes : 2 }, + rs1 : { nodes : 1 } }; + +var st = new ShardingTest({ shards : 2, mongos : 1, other : options }); +st.stopBalancer(); + +var mongos = st.s0; +var admin = mongos.getDB( "admin" ); +var config = mongos.getDB( "config" ); +var coll = mongos.getCollection( jsTestName() + ".coll" ); +var shards = config.shards.find().toArray(); + +assert.commandWorked( admin.runCommand({ enableSharding : coll.getDB().toString() }) ); +printjson( admin.runCommand({ movePrimary : coll.getDB().toString(), to : shards[0]._id }) ); +assert.commandWorked( admin.runCommand({ shardCollection : coll.toString(), key : { _id : 1 } }) ); +assert.commandWorked( admin.runCommand({ split : coll.toString(), middle : { _id : 0 } }) ); +assert.commandWorked( admin.runCommand({ moveChunk : coll.toString(), + find : { _id : 0 }, + to : shards[1]._id }) ); + +st.printShardingStatus(); + +// Don't use write commands +coll.getMongo().useWriteCommands = function(){ return false; }; + +var gle = null; + +// +// Successful insert +coll.remove({}); +coll.insert({ _id : -1 }); +printjson(gle = coll.getDB().runCommand({ getLastError : 1 })); +assert(gle.ok); +assert(!gle.err); +assert.eq(coll.count(), 1); + +// +// Successful upserts +coll.remove({}); +coll.update({ _id : -1 }, { _id : -1 }, true); +coll.update({ _id : 1 }, { _id : 1 }, true); +printjson(gle = coll.getDB().runCommand({ getLastError : 1 })); +assert(gle.ok); +assert(!gle.err); +assert.eq(gle.n, 1); +assert.eq(gle.upserted, 1); +assert.eq(coll.count(), 2); + +// +// No journal insert, GLE fails +coll.remove({}); +coll.insert({ _id : -1 }); +printjson(gle = coll.getDB().runCommand({ getLastError : 1, j : true })); +assert(!gle.ok); +assert(gle.errmsg); + +// +// Successful insert, write concern mode invalid +coll.remove({}); +coll.insert({ _id : -1 }); +printjson(gle = coll.getDB().runCommand({ getLastError : 1, w : 'invalid' })); +assert(!gle.ok); +assert(!gle.err); +assert(gle.errmsg); +assert.eq(gle.code, 79); // UnknownReplWriteConcern - needed for backwards compatibility +assert.eq(coll.count(), 1); + +// +// Error on insert (dup key), write concern error not reported +coll.remove({}); +coll.insert({ _id : -1 }); +coll.insert({ _id : -1 }); +printjson(gle = coll.getDB().runCommand({ getLastError : 1, w : 'invalid' })); +assert(gle.ok); +assert(gle.err); +assert(gle.code); +assert(!gle.errmsg); +assert.eq(coll.count(), 1); + +// +// Error on two-hosts during remove +coll.remove({}); +coll.remove({ $invalid : 'remove' }); +printjson(gle = coll.getDB().runCommand({ getLastError : 1 })); +assert(gle.ok); +assert(gle.err); +assert(gle.code); +assert(!gle.errmsg); +assert(gle.shards); +assert.eq(coll.count(), 0); + +// +// Successful remove on two hosts, write concern timeout on one +coll.remove({}); +st.rs0.awaitReplication(); // To ensure the first shard won't timeout +printjson(gle = coll.getDB().runCommand({ getLastError : 1, w : 2, wtimeout : 5 * 1000 })); +assert(gle.ok); +assert.eq(gle.err, 'timeout'); +assert(gle.wtimeout); +assert(gle.shards); +assert.eq(coll.count(), 0); + +// +// Successful remove on two hosts, write concern timeout on both +// We don't aggregate two timeouts together +coll.remove({}); +st.rs0.awaitReplication(); // To ensure the first shard won't timeout +printjson(gle = coll.getDB().runCommand({ getLastError : 1, w : 3, wtimeout : 5 * 1000 })); +assert(!gle.ok); +assert(gle.errmsg); +assert.eq(gle.code, 64); // WriteConcernFailed - needed for backwards compatibility +assert(!gle.wtimeout); +assert(gle.shards); +assert(gle.errs); +assert.eq(coll.count(), 0); + +// +// First replica set DOWN +// + +// +// Successful bulk insert on two hosts, host changes before gle (error contacting host) +coll.remove({}); +coll.insert([{ _id : 1 }, { _id : -1 }]); +st.rs0.stop(st.rs0.getPrimary(), true); // wait for stop +printjson(gle = coll.getDB().runCommand({ getLastError : 1 })); +assert(!gle.ok); +assert(gle.errmsg); +assert.eq(coll.count({ _id : 1 }), 1); + +// +// Failed insert on two hosts, first host down +// NOTE: This is DIFFERENT from 2.4, since we don't need to contact a host we didn't get +// successful writes from. +coll.remove({ _id : 1 }); +coll.insert([{ _id : 1 }, { _id : -1 }]); +printjson(gle = coll.getDB().runCommand({ getLastError : 1 })); +assert(gle.ok); +assert(gle.err); +assert.eq(coll.count({ _id : 1 }), 1); + +jsTest.log( "DONE!" ); + +st.stop(); diff --git a/src/mongo/client/dbclientinterface.h b/src/mongo/client/dbclientinterface.h index eba2d0bdb43..417b365f1e6 100644 --- a/src/mongo/client/dbclientinterface.h +++ b/src/mongo/client/dbclientinterface.h @@ -294,6 +294,18 @@ namespace mongo { return _connectHook; } + // + // FOR TESTING ONLY - useful to be able to directly mock a connection string without + // including the entire client library. + // + + static ConnectionString mock( const HostAndPort& server ) { + ConnectionString connStr; + connStr._servers.push_back( server ); + connStr._string = server.toString( true ); + return connStr; + } + private: void _fillServers( string s ); diff --git a/src/mongo/s/client_info.cpp b/src/mongo/s/client_info.cpp index 49225ba7a6a..9cf34c31005 100644 --- a/src/mongo/s/client_info.cpp +++ b/src/mongo/s/client_info.cpp @@ -39,6 +39,7 @@ #include "mongo/db/dbmessage.h" #include "mongo/db/stats/counters.h" #include "mongo/db/stats/timer_stats.h" +#include "mongo/s/write_ops/batch_downconvert.h" #include "mongo/s/client_info.h" #include "mongo/s/config.h" #include "mongo/s/chunk.h" @@ -150,69 +151,6 @@ namespace mongo { static TimerStats gleWtimeStats; static ServerStatusMetricField<TimerStats> displayGleLatency( "getLastError.wtime", &gleWtimeStats ); - static BSONObj addOpTimeTo( const BSONObj& options, const OpTime& opTime ) { - BSONObjBuilder builder; - builder.appendElements( options ); - builder.appendTimestamp( "wOpTime", opTime.asDate() ); - return builder.obj(); - } - - // TODO: Break out of ClientInfo when we have a better place for this - bool ClientInfo::enforceWriteConcern( const string& dbName, - const BSONObj& options, - string* errMsg ) { - - const map<string, OpTime>& hostOpTimes = getPrevHostOpTimes(); - - if ( hostOpTimes.empty() ) { - return true; - } - - for ( map<string, OpTime>::const_iterator it = hostOpTimes.begin(); it != hostOpTimes.end(); - ++it ) { - - const string& shardHost = it->first; - const OpTime& opTime = it->second; - - LOG(5) << "enforcing write concern " << options << " on " << shardHost << endl; - - BSONObj optionsWithOpTime = addOpTimeTo( options, opTime ); - - bool ok = false; - - boost::scoped_ptr<ScopedDbConnection> connPtr; - try { - connPtr.reset( new ScopedDbConnection( shardHost ) ); - ScopedDbConnection& conn = *connPtr; - - BSONObj result; - ok = conn->runCommand( dbName , optionsWithOpTime , result ); - if ( !ok ) - *errMsg = result.toString(); - - conn.done(); - } - catch( const DBException& ex ){ - *errMsg = ex.toString(); - - if ( connPtr ) - connPtr->done(); - } - - // Done if anyone fails - if ( !ok ) { - - *errMsg = str::stream() << "could not enforce write concern on " << shardHost - << causedBy( errMsg ); - - warning() << *errMsg << endl; - return false; - } - } - - return true; - } - boost::thread_specific_ptr<ClientInfo> ClientInfo::_tlInfo; } // namespace mongo diff --git a/src/mongo/s/client_info.h b/src/mongo/s/client_info.h index a6f9e62213b..cd845f2dcf5 100644 --- a/src/mongo/s/client_info.h +++ b/src/mongo/s/client_info.h @@ -31,6 +31,10 @@ #include "mongo/pch.h" +#include <map> +#include <set> +#include <vector> + #include "mongo/db/client_basic.h" #include "mongo/s/chunk.h" #include "mongo/s/writeback_listener.h" @@ -109,25 +113,6 @@ namespace mongo { void disableForCommand(); - /** - * Uses GLE and the shard hosts and opTimes last written by write commands to enforce a - * write concern. - * - * Returns true if write concern was enforced, false with errMsg if not. - */ - bool enforceWriteConcern( const string& dbName, const BSONObj& options, string* errMsg ); - - /** - * calls getLastError - * resets shards since get last error - * @return if the command was ok or if there was an error - */ - bool getLastError( const string& dbName, - const BSONObj& options , - BSONObjBuilder& result , - string& errmsg, - bool fromWriteBackListener = false ); - /** @return if its ok to auto split from this client */ bool autoSplitOk() const { return _autoSplitOk && Chunk::ShouldAutoSplit; } @@ -154,8 +139,8 @@ namespace mongo { hostOpTimes.clear(); } - set<string> shardHostsWritten; - map<string, OpTime> hostOpTimes; + std::set<string> shardHostsWritten; + std::map<string, OpTime> hostOpTimes; }; // we use _a and _b to store info from the current request and the previous request @@ -168,7 +153,7 @@ namespace mongo { RequestInfo* _prev; // "" - set<string> _sinceLastGetError; // all shards accessed since last getLastError + std::set<string> _sinceLastGetError; // all shards accessed since last getLastError int _lastAccess; bool _autoSplitOk; diff --git a/src/mongo/s/commands_admin.cpp b/src/mongo/s/commands_admin.cpp index 748bac8fa13..2a15f9a1b59 100644 --- a/src/mongo/s/commands_admin.cpp +++ b/src/mongo/s/commands_admin.cpp @@ -51,12 +51,15 @@ #include "mongo/s/client_info.h" #include "mongo/s/cluster_write.h" #include "mongo/s/config.h" +#include "mongo/s/dbclient_multi_command.h" #include "mongo/s/grid.h" #include "mongo/s/strategy.h" #include "mongo/s/type_chunk.h" #include "mongo/s/type_database.h" #include "mongo/s/type_shard.h" #include "mongo/s/writeback_listener.h" +#include "mongo/s/write_ops/batch_downconvert.h" +#include "mongo/s/write_ops/batch_write_exec.h" #include "mongo/s/write_ops/batched_command_request.h" #include "mongo/util/net/listen.h" #include "mongo/util/net/message.h" @@ -1521,27 +1524,132 @@ namespace mongo { std::vector<Privilege>* out) {} // No auth required CmdShardingGetLastError() : Command("getLastError" , false , "getlasterror") { } - virtual bool run(const string& dbName, BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool) { + virtual bool run( const string& dbName, + BSONObj& cmdObj, + int, + string& errmsg, + BSONObjBuilder& result, + bool ) { + + // + // Mongos GLE - finicky. + // + // To emulate mongod, we first append any write errors we had, then try to append + // write concern error if there was no write error. We need to contact the previous + // shards regardless to maintain 2.4 behavior. + // + // If there are any unexpected or connectivity errors when calling GLE, fail the + // command. + // + // Finally, report the write concern errors IF we don't already have an error. + // If we only get one write concern error back, report that, otherwise report an + // aggregated error. + // + // TODO: Do we need to contact the prev shards regardless - do we care that much + // about 2.4 behavior? + // + LastError *le = lastError.disableForCommand(); verify( le ); - // Write commands always have the error stored in the mongos last error + bool errorOccurred = false; if ( le->nPrev == 1 ) { - le->appendSelf( result ); - } - else { - result.appendNull( "err" ); + errorOccurred = le->appendSelf( result, false ); } - bool wcResult = ClientInfo::get()->enforceWriteConcern( dbName, - cmdObj, - &errmsg ); + // For compatibility with 2.4 sharded GLE, we always enforce the write concern + // across all shards. + + DBClientMultiCommand dispatcher; + vector<LegacyWCResponse> wcResponses; + Status status = enforceLegacyWriteConcern( &dispatcher, + dbName, + cmdObj, + convertMap( ClientInfo::get() + ->getPrevHostOpTimes() ), + &wcResponses ); // Don't forget about our last hosts, reset the client info ClientInfo::get()->disableForCommand(); - return wcResult; + + // We're now done contacting all remote servers, just report results + + if ( !status.isOK() ) { + // Return immediately if we failed to contact a shard, unexpected GLE issue + // Can't return code, since it may have been set above (2.4 compatibility) + result.append( "errmsg", status.reason() ); + return false; + } + + // Go through all the write concern responses and find errors + BSONArrayBuilder shards; + BSONObjBuilder shardRawGLE; + BSONArrayBuilder errors; + BSONArrayBuilder errorRawGLE; + + int numWCErrors = 0; + const LegacyWCResponse* lastErrResponse = NULL; + + for ( vector<LegacyWCResponse>::const_iterator it = wcResponses.begin(); + it != wcResponses.end(); ++it ) { + + const LegacyWCResponse& wcResponse = *it; + + shards.append( wcResponse.shardHost ); + shardRawGLE.append( wcResponse.shardHost, wcResponse.gleResponse ); + + if ( !wcResponse.errToReport.empty() ) { + numWCErrors++; + lastErrResponse = &wcResponse; + errors.append( wcResponse.errToReport ); + errorRawGLE.append( wcResponse.gleResponse ); + } + } + + // Always report what we found to match 2.4 behavior and for debugging + if ( wcResponses.size() == 1u ) { + result.append( "singleShard", wcResponses.front().shardHost ); + } + else { + result.append( "shards", shards.arr() ); + result.append( "shardRawGLE", shardRawGLE.obj() ); + } + + // Suppress write concern errors if a write error occurred, to match mongod behavior + if ( errorOccurred || numWCErrors == 0 ) + return true; + + if ( numWCErrors == 1 ) { + + // Return the single write concern error we found + result.appendElements( lastErrResponse->gleResponse ); + return lastErrResponse->gleResponse["ok"].trueValue(); + } + else { + + // Return a generic combined WC error message + result.append( "errs", errors.arr() ); + result.append( "errObjects", errorRawGLE.arr() ); + + return appendCommandStatus( result, + Status( ErrorCodes::WriteConcernFailed, + "multiple write concern errors occurred" ) ); + } + } + + private: + + HostOpTimeMap convertMap( const map<string, OpTime>& rawMap ) { + HostOpTimeMap parsedMap; + for ( map<string, OpTime>::const_iterator it = rawMap.begin(); it != rawMap.end(); + ++it ) { + string errMsg; + parsedMap[ConnectionString::parse( it->first, errMsg )] = it->second; + } + return parsedMap; } + } cmdGetLastError; } diff --git a/src/mongo/s/mock_multi_command.h b/src/mongo/s/mock_multi_write_command.h index 41a2d8f7d70..bb8301e4817 100644 --- a/src/mongo/s/mock_multi_command.h +++ b/src/mongo/s/mock_multi_write_command.h @@ -64,7 +64,7 @@ namespace mongo { * If an endpoint isn't registered with a MockEndpoint, just returns BatchedCommandResponses * with ok : true. */ - class MockMultiCommand : public MultiCommandDispatch { + class MockMultiWriteCommand : public MultiCommandDispatch { public: void init( const std::vector<MockEndpoint*> mockEndpoints ) { diff --git a/src/mongo/s/write_ops/batch_downconvert.cpp b/src/mongo/s/write_ops/batch_downconvert.cpp index dc4e723fd17..e3c0d6903b0 100644 --- a/src/mongo/s/write_ops/batch_downconvert.cpp +++ b/src/mongo/s/write_ops/batch_downconvert.cpp @@ -287,4 +287,216 @@ namespace mongo { response->setOk( true ); dassert( response->isValid( NULL ) ); } + + /** + * Suppress the "err" and "code" field if they are coming from a previous write error and + * are not related to write concern. Also removes any write stats information (e.g. "n") + * + * Also, In some cases, 2.4 GLE w/ wOpTime can give us duplicate "err" and "code" fields b/c of + * reporting a previous error. The later field is what we want - dedup and use later field. + * + * Returns the stripped GLE response. + */ + BSONObj BatchSafeWriter::stripNonWCInfo( const BSONObj& gleResponse ) { + + BSONObjIterator it( gleResponse ); + BSONObjBuilder builder; + + BSONElement codeField; // eoo + BSONElement errField; // eoo + + while ( it.more() ) { + BSONElement el = it.next(); + StringData fieldName( el.fieldName() ); + if ( fieldName.compare( "err" ) == 0 ) { + errField = el; + } + else if ( fieldName.compare( "code" ) == 0 ) { + codeField = el; + } + else if ( fieldName.compare( "n" ) == 0 || fieldName.compare( "nModified" ) == 0 + || fieldName.compare( "upserted" ) == 0 + || fieldName.compare( "updatedExisting" ) == 0 ) { + // Suppress field + } + else { + builder.append( el ); + } + } + + if ( !codeField.eoo() ) { + if ( !gleResponse["ok"].trueValue() ) { + // The last code will be from the write concern + builder.append( codeField ); + } + else { + // The code is from a non-wc error on this connection - suppress it + } + } + + if ( !errField.eoo() ) { + string err = errField.str(); + if ( err == "norepl" || err == "noreplset" || err == "timeout" ) { + // Append err if it's from a write concern issue + builder.append( errField ); + } + else { + // Suppress non-write concern err + } + } + + return builder.obj(); + } + + namespace { + + /** + * Trivial implementation of a BSON serializable object for backwards-compatibility. + * + * NOTE: This is not a good example of using BSONSerializable. For anything more complex, + * create an implementation with fields defined. + */ + class RawBSONSerializable : public BSONSerializable { + MONGO_DISALLOW_COPYING(RawBSONSerializable); + public: + + RawBSONSerializable() { + } + + RawBSONSerializable( const BSONObj& doc ) : + _doc( doc ) { + } + + bool isValid( std::string* errMsg ) const { + return true; + } + + BSONObj toBSON() const { + return _doc; + } + + bool parseBSON( const BSONObj& source, std::string* errMsg ) { + _doc = source.getOwned(); + return true; + } + + void clear() { + _doc = BSONObj(); + } + + string toString() const { + return toBSON().toString(); + } + + private: + + BSONObj _doc; + }; + } + + // Adds a wOpTime field to a set of gle options + static BSONObj buildGLECmdWithOpTime( const BSONObj& gleOptions, const OpTime& opTime ) { + BSONObjBuilder builder; + BSONObjIterator it( gleOptions ); + + for ( int i = 0; it.more(); ++i ) { + BSONElement el = it.next(); + + // Make sure first element is getLastError : 1 + if ( i == 0 ) { + StringData elName( el.fieldName() ); + if ( !elName.equalCaseInsensitive( "getLastError" ) ) { + builder.append( "getLastError", 1 ); + } + } + + builder.append( el ); + } + builder.appendTimestamp( "wOpTime", opTime.asDate() ); + return builder.obj(); + } + + Status enforceLegacyWriteConcern( MultiCommandDispatch* dispatcher, + const StringData& dbName, + const BSONObj& options, + const HostOpTimeMap& hostOpTimes, + vector<LegacyWCResponse>* legacyWCResponses ) { + + if ( hostOpTimes.empty() ) { + return Status::OK(); + } + + for ( HostOpTimeMap::const_iterator it = hostOpTimes.begin(); it != hostOpTimes.end(); + ++it ) { + + const ConnectionString& shardEndpoint = it->first; + const OpTime& opTime = it->second; + + LOG( 3 ) << "enforcing write concern " << options << " on " << shardEndpoint.toString() + << " at opTime " << opTime.toStringPretty() << endl; + + BSONObj gleCmd = buildGLECmdWithOpTime( options, opTime ); + + dispatcher->addCommand( shardEndpoint, dbName, RawBSONSerializable( gleCmd ) ); + } + + dispatcher->sendAll(); + + vector<Status> failedStatuses; + + while ( dispatcher->numPending() > 0 ) { + + ConnectionString shardEndpoint; + RawBSONSerializable gleResponseSerial; + + Status dispatchStatus = dispatcher->recvAny( &shardEndpoint, &gleResponseSerial ); + if ( !dispatchStatus.isOK() ) { + // We need to get all responses before returning + failedStatuses.push_back( dispatchStatus ); + continue; + } + + BSONObj gleResponse = BatchSafeWriter::stripNonWCInfo( gleResponseSerial.toBSON() ); + + // Use the downconversion tools to determine if this GLE response is ok, a + // write concern error, or an unknown error we should immediately abort for. + BatchSafeWriter::GLEErrors errors; + Status extractStatus = BatchSafeWriter::extractGLEErrors( gleResponse, &errors ); + if ( !extractStatus.isOK() ) { + failedStatuses.push_back( extractStatus ); + continue; + } + + LegacyWCResponse wcResponse; + wcResponse.shardHost = shardEndpoint.toString(); + wcResponse.gleResponse = gleResponse; + if ( errors.wcError.get() ) { + wcResponse.errToReport = errors.wcError->getErrMessage(); + } + + legacyWCResponses->push_back( wcResponse ); + } + + if ( failedStatuses.empty() ) { + return Status::OK(); + } + + StringBuilder builder; + builder << "could not enforce write concern"; + + for ( vector<Status>::const_iterator it = failedStatuses.begin(); + it != failedStatuses.end(); ++it ) { + const Status& failedStatus = *it; + if ( it == failedStatuses.begin() ) { + builder << causedBy( failedStatus.toString() ); + } + else { + builder << ":: and ::" << failedStatus.toString(); + } + } + + return Status( failedStatuses.size() == 1u ? failedStatuses.front().code() : + ErrorCodes::MultipleErrorsOccurred, + builder.str() ); + } } diff --git a/src/mongo/s/write_ops/batch_downconvert.h b/src/mongo/s/write_ops/batch_downconvert.h index 1c81858c55c..cd5440ae993 100644 --- a/src/mongo/s/write_ops/batch_downconvert.h +++ b/src/mongo/s/write_ops/batch_downconvert.h @@ -35,6 +35,8 @@ #include "mongo/bson/bsonobj.h" #include "mongo/bson/optime.h" #include "mongo/client/dbclientinterface.h" +#include "mongo/s/multi_command_dispatch.h" +#include "mongo/s/write_ops/batch_write_exec.h" #include "mongo/s/write_ops/batch_write_op.h" #include "mongo/s/write_ops/batched_command_request.h" #include "mongo/s/write_ops/batched_command_response.h" @@ -67,7 +69,7 @@ namespace mongo { * write command emulation. */ virtual Status enforceWriteConcern( DBClientBase* conn, - const std::string& dbName, + const StringData& dbName, const BSONObj& writeConcern, BSONObj* gleResponse ) = 0; }; @@ -91,10 +93,6 @@ namespace mongo { const BatchedCommandRequest& request, BatchedCommandResponse* response ); - // - // Exposed for testing - // - // Helper that acts as an auto-ptr for write and wc errors struct GLEErrors { auto_ptr<WriteErrorDetail> writeError; @@ -124,9 +122,33 @@ namespace mongo { */ static void extractGLEStats( const BSONObj& gleResponse, GLEStats* stats ); + /** + * Given a GLE response, strips out all non-write-concern related information + */ + static BSONObj stripNonWCInfo( const BSONObj& gleResponse ); + private: SafeWriter* _safeWriter; }; + // Used for reporting legacy write concern responses + struct LegacyWCResponse { + string shardHost; + BSONObj gleResponse; + string errToReport; + }; + + /** + * Uses GLE and the shard hosts and opTimes last written by write commands to enforce a + * write concern across the previously used shards. + * + * Returns OK with the LegacyWCResponses containing only write concern error information + * Returns !OK if there was an error getting a GLE response + */ + Status enforceLegacyWriteConcern( MultiCommandDispatch* dispatcher, + const StringData& dbName, + const BSONObj& options, + const HostOpTimeMap& hostOpTimes, + vector<LegacyWCResponse>* wcResponses ); } diff --git a/src/mongo/s/write_ops/batch_downconvert_test.cpp b/src/mongo/s/write_ops/batch_downconvert_test.cpp index e38f941446a..e6b9d7e7874 100644 --- a/src/mongo/s/write_ops/batch_downconvert_test.cpp +++ b/src/mongo/s/write_ops/batch_downconvert_test.cpp @@ -33,6 +33,7 @@ #include "mongo/db/jsobj.h" #include "mongo/db/json.h" +#include "mongo/s/multi_command_dispatch.h" #include "mongo/unittest/unittest.h" namespace { @@ -237,7 +238,7 @@ namespace { } Status enforceWriteConcern( DBClientBase* conn, - const std::string& dbName, + const StringData& dbName, const BSONObj& writeConcern, BSONObj* gleResponse ) { BSONObj response = _gleResponses.front(); @@ -424,5 +425,208 @@ namespace { ASSERT_EQUALS( response.getLastOp().toStringPretty(), OpTime(20, 0).toStringPretty() ); } + // + // Tests of processing and suppressing non-WC related fields from legacy GLE responses + // + + TEST(LegacyGLESuppress, Basic) { + + const BSONObj gleResponse = fromjson( "{ok: 1.0, err: null}" ); + + BSONObj stripped = BatchSafeWriter::stripNonWCInfo( gleResponse ); + ASSERT_EQUALS( stripped.nFields(), 1 ); + ASSERT( stripped["ok"].trueValue() ); + } + + TEST(LegacyGLESuppress, BasicStats) { + + const BSONObj gleResponse = + fromjson( "{ok: 0.0, err: 'message'," + " n: 1, nModified: 1, upserted: 'abc', updatedExisting: true}" ); + + BSONObj stripped = BatchSafeWriter::stripNonWCInfo( gleResponse ); + ASSERT_EQUALS( stripped.nFields(), 1 ); + ASSERT( !stripped["ok"].trueValue() ); + } + + TEST(LegacyGLESuppress, ReplError) { + + const BSONObj gleResponse = + fromjson( "{ok: 0.0, err: 'norepl', n: 1, wcField: true}" ); + + BSONObj stripped = BatchSafeWriter::stripNonWCInfo( gleResponse ); + ASSERT_EQUALS( stripped.nFields(), 3 ); + ASSERT( !stripped["ok"].trueValue() ); + ASSERT_EQUALS( stripped["err"].str(), "norepl" ); + ASSERT( stripped["wcField"].trueValue() ); + } + + TEST(LegacyGLESuppress, StripCode) { + + const BSONObj gleResponse = + fromjson( "{ok: 1.0, err: 'message', code: 12345}" ); + + BSONObj stripped = BatchSafeWriter::stripNonWCInfo( gleResponse ); + ASSERT_EQUALS( stripped.nFields(), 1 ); + ASSERT( stripped["ok"].trueValue() ); + } + + TEST(LegacyGLESuppress, TimeoutDupError24) { + + const BSONObj gleResponse = + BSON( "ok" << 0.0 << "err" << "message" << "code" << 12345 + << "err" << "timeout" << "code" << 56789 << "wtimeout" << true ); + + BSONObj stripped = BatchSafeWriter::stripNonWCInfo( gleResponse ); + ASSERT_EQUALS( stripped.nFields(), 4 ); + ASSERT( !stripped["ok"].trueValue() ); + ASSERT_EQUALS( stripped["err"].str(), "timeout" ); + ASSERT_EQUALS( stripped["code"].numberInt(), 56789 ); + ASSERT( stripped["wtimeout"].trueValue() ); + } + + // + // Tests of basic logical dispatching and aggregation for legacy GLE-based write concern + // + + class MockCommandDispatch : public MultiCommandDispatch { + public: + + MockCommandDispatch( const vector<BSONObj>& gleResponses ) : + _gleResponses( gleResponses.begin(), gleResponses.end() ) { + } + + virtual ~MockCommandDispatch() { + } + + void addCommand( const ConnectionString& endpoint, + const StringData& dbName, + const BSONSerializable& request ) { + _gleHosts.push_back( endpoint ); + } + + void sendAll() { + // No-op + } + + /** + * Returns the number of sent requests that are still waiting to be recv'd. + */ + int numPending() const { + return _gleHosts.size(); + } + + Status recvAny( ConnectionString* endpoint, BSONSerializable* response ) { + *endpoint = _gleHosts.front(); + response->parseBSON( _gleResponses.front(), NULL ); + _gleHosts.pop_front(); + _gleResponses.pop_front(); + return Status::OK(); + } + + private: + + deque<ConnectionString> _gleHosts; + deque<BSONObj> _gleResponses; + }; + + TEST(LegacyGLEWriteConcern, Basic) { + + HostOpTimeMap hostOpTimes; + hostOpTimes[ConnectionString::mock(HostAndPort("shardA:1000"))] = OpTime(); + hostOpTimes[ConnectionString::mock(HostAndPort("shardB:1000"))] = OpTime(); + + vector<BSONObj> gleResponses; + gleResponses.push_back( fromjson( "{ok: 1.0, err: null}" ) ); + gleResponses.push_back( fromjson( "{ok: 1.0, err: null}" ) ); + + MockCommandDispatch dispatcher( gleResponses ); + vector<LegacyWCResponse> wcResponses; + + Status status = enforceLegacyWriteConcern( &dispatcher, + "db", + BSONObj(), + hostOpTimes, + &wcResponses ); + + ASSERT_OK( status ); + ASSERT_EQUALS( wcResponses.size(), 2u ); + } + + TEST(LegacyGLEWriteConcern, FailGLE) { + + HostOpTimeMap hostOpTimes; + hostOpTimes[ConnectionString::mock(HostAndPort("shardA:1000"))] = OpTime(); + hostOpTimes[ConnectionString::mock(HostAndPort("shardB:1000"))] = OpTime(); + + vector<BSONObj> gleResponses; + gleResponses.push_back( fromjson( "{ok: 0.0, errmsg: 'something'}" ) ); + gleResponses.push_back( fromjson( "{ok: 1.0, err: null}" ) ); + + MockCommandDispatch dispatcher( gleResponses ); + vector<LegacyWCResponse> wcResponses; + + Status status = enforceLegacyWriteConcern( &dispatcher, + "db", + BSONObj(), + hostOpTimes, + &wcResponses ); + + ASSERT_NOT_OK( status ); + // Ensure we keep getting the rest of the responses + ASSERT_EQUALS( wcResponses.size(), 1u ); + } + + TEST(LegacyGLEWriteConcern, MultiWCErrors) { + + HostOpTimeMap hostOpTimes; + hostOpTimes[ConnectionString::mock( HostAndPort( "shardA:1000" ) )] = OpTime(); + hostOpTimes[ConnectionString::mock( HostAndPort( "shardB:1000" ) )] = OpTime(); + + vector<BSONObj> gleResponses; + gleResponses.push_back( fromjson( "{ok: 0.0, err: 'norepl'}" ) ); + gleResponses.push_back( fromjson( "{ok: 0.0, err: 'timeout', wtimeout: true}" ) ); + + MockCommandDispatch dispatcher( gleResponses ); + vector<LegacyWCResponse> wcResponses; + + Status status = enforceLegacyWriteConcern( &dispatcher, + "db", + BSONObj(), + hostOpTimes, + &wcResponses ); + + ASSERT_OK( status ); + ASSERT_EQUALS( wcResponses.size(), 2u ); + ASSERT_EQUALS( wcResponses[0].shardHost, "shardA:1000" ); + ASSERT_EQUALS( wcResponses[0].gleResponse["err"].str(), "norepl" ); + ASSERT_EQUALS( wcResponses[0].errToReport, "norepl" ); + ASSERT_EQUALS( wcResponses[1].shardHost, "shardB:1000" ); + ASSERT_EQUALS( wcResponses[1].gleResponse["err"].str(), "timeout" ); + ASSERT_EQUALS( wcResponses[1].errToReport, "timeout" ); + } + + TEST(LegacyGLEWriteConcern, MultiFailGLE) { + + HostOpTimeMap hostOpTimes; + hostOpTimes[ConnectionString::mock(HostAndPort("shardA:1000"))] = OpTime(); + hostOpTimes[ConnectionString::mock(HostAndPort("shardB:1000"))] = OpTime(); + + vector<BSONObj> gleResponses; + gleResponses.push_back( fromjson( "{ok: 0.0, errmsg: 'something'}" ) ); + gleResponses.push_back( fromjson( "{ok: 0.0, errmsg: 'something'}" ) ); + + MockCommandDispatch dispatcher( gleResponses ); + vector<LegacyWCResponse> wcResponses; + + Status status = enforceLegacyWriteConcern( &dispatcher, + "db", + BSONObj(), + hostOpTimes, + &wcResponses ); + + ASSERT_NOT_OK( status ); + ASSERT_EQUALS( wcResponses.size(), 0u ); + } } diff --git a/src/mongo/s/write_ops/batch_write_exec_test.cpp b/src/mongo/s/write_ops/batch_write_exec_test.cpp index 7fa0bbe5cae..acf62ab2027 100644 --- a/src/mongo/s/write_ops/batch_write_exec_test.cpp +++ b/src/mongo/s/write_ops/batch_write_exec_test.cpp @@ -29,7 +29,7 @@ #include "mongo/s/write_ops/batch_write_exec.h" #include "mongo/base/owned_pointer_vector.h" -#include "mongo/s/mock_multi_command.h" +#include "mongo/s/mock_multi_write_command.h" #include "mongo/s/mock_ns_targeter.h" #include "mongo/s/mock_shard_resolver.h" #include "mongo/s/write_ops/batched_command_request.h" @@ -74,7 +74,7 @@ namespace { MockNSTargeter targeter; targeter.init( mockRanges ); - MockMultiCommand dispatcher; + MockMultiWriteCommand dispatcher; BatchWriteExec exec( &targeter, &resolver, &dispatcher ); @@ -122,7 +122,7 @@ namespace { MockNSTargeter targeter; targeter.init( mockRanges ); - MockMultiCommand dispatcher; + MockMultiWriteCommand dispatcher; dispatcher.init( mockEndpoints ); BatchWriteExec exec( &targeter, &resolver, &dispatcher ); @@ -180,7 +180,7 @@ namespace { MockNSTargeter targeter; targeter.init( mockRanges ); - MockMultiCommand dispatcher; + MockMultiWriteCommand dispatcher; dispatcher.init( mockEndpoints ); BatchWriteExec exec( &targeter, &resolver, &dispatcher ); diff --git a/src/mongo/s/write_ops/config_coordinator_test.cpp b/src/mongo/s/write_ops/config_coordinator_test.cpp index f8120bfedaa..812aec7729e 100644 --- a/src/mongo/s/write_ops/config_coordinator_test.cpp +++ b/src/mongo/s/write_ops/config_coordinator_test.cpp @@ -31,7 +31,7 @@ #include <vector> #include "mongo/client/dbclientinterface.h" -#include "mongo/s/mock_multi_command.h" +#include "mongo/s/mock_multi_write_command.h" #include "mongo/unittest/unittest.h" namespace { @@ -44,7 +44,7 @@ namespace { // TEST(ConfigCoordinatorTests, Basic) { - MockMultiCommand dispatcher; + MockMultiWriteCommand dispatcher; vector<ConnectionString> configHosts; ConfigCoordinator exec( &dispatcher, configHosts ); } diff --git a/src/mongo/s/write_ops/dbclient_safe_writer.cpp b/src/mongo/s/write_ops/dbclient_safe_writer.cpp index 1f8177fd951..b3eec6d757e 100644 --- a/src/mongo/s/write_ops/dbclient_safe_writer.cpp +++ b/src/mongo/s/write_ops/dbclient_safe_writer.cpp @@ -91,19 +91,19 @@ namespace mongo { } Status DBClientSafeWriter::enforceWriteConcern( DBClientBase* conn, - const string& dbName, + const StringData& dbName, const BSONObj& writeConcern, BSONObj* gleResponse ) { try { BSONObj resetResponse; // ignored, always ok - conn->runCommand( dbName, BSON( "resetError" << 1 ), resetResponse ); + conn->runCommand( dbName.toString(), BSON( "resetError" << 1 ), resetResponse ); BSONObjBuilder gleCmdB; gleCmdB.append( "getLastError", true ); gleCmdB.appendElements( writeConcern ); - conn->runCommand( dbName, gleCmdB.obj(), *gleResponse ); + conn->runCommand( dbName.toString(), gleCmdB.obj(), *gleResponse ); } catch ( const DBException& ex ) { return ex.toStatus(); diff --git a/src/mongo/s/write_ops/dbclient_safe_writer.h b/src/mongo/s/write_ops/dbclient_safe_writer.h index cb1831c3770..c4656281651 100644 --- a/src/mongo/s/write_ops/dbclient_safe_writer.h +++ b/src/mongo/s/write_ops/dbclient_safe_writer.h @@ -53,7 +53,7 @@ namespace mongo { BSONObj* gleResponse ); Status enforceWriteConcern( DBClientBase* conn, - const std::string& dbName, + const StringData& dbName, const BSONObj& writeConcern, BSONObj* gleResponse ); |