diff options
author | Greg Studer <greg@10gen.com> | 2013-11-07 10:17:57 -0500 |
---|---|---|
committer | Greg Studer <greg@10gen.com> | 2013-11-08 10:11:58 -0500 |
commit | dd0857bd037487c65b12113f042510b88c08bea4 (patch) | |
tree | c0f9b1862beb6f2537c80513c59d80b546de233d /src/mongo/s | |
parent | b8fd0b30946d0e1599cb730c155a01a25e572f24 (diff) | |
download | mongo-dd0857bd037487c65b12113f042510b88c08bea4.tar.gz |
SERVER-10818 fixes for basic batch downconversion
Diffstat (limited to 'src/mongo/s')
-rw-r--r-- | src/mongo/s/batch_downconvert.cpp | 67 | ||||
-rw-r--r-- | src/mongo/s/chunk.cpp | 5 | ||||
-rw-r--r-- | src/mongo/s/chunk.h | 2 | ||||
-rw-r--r-- | src/mongo/s/chunk_manager_targeter.cpp | 55 | ||||
-rw-r--r-- | src/mongo/s/dbclient_multi_command.cpp | 7 | ||||
-rw-r--r-- | src/mongo/s/dbclient_safe_writer.cpp | 12 | ||||
-rw-r--r-- | src/mongo/s/strategy_shard.cpp | 6 |
7 files changed, 119 insertions, 35 deletions
diff --git a/src/mongo/s/batch_downconvert.cpp b/src/mongo/s/batch_downconvert.cpp index 5f0a637514d..ac5910089bc 100644 --- a/src/mongo/s/batch_downconvert.cpp +++ b/src/mongo/s/batch_downconvert.cpp @@ -35,12 +35,11 @@ namespace mongo { void SafeWriter::fillLastError( const BSONObj& gleResult, LastError* error ) { if ( gleResult["code"].isNumber() ) error->code = gleResult["code"].numberInt(); if ( !gleResult["err"].eoo() ) { - if ( gleResult["err"].type() == jstNULL ) { - error->msg = "empty error message"; + if ( gleResult["err"].type() == String ) { + error->msg = gleResult["err"].String(); } else { - dassert( gleResult["err"].type() == String ); - error->msg = gleResult["err"].String(); + dassert( gleResult["err"].type() == jstNULL ); } } if ( gleResult["n"].isNumber() ) error->nObjects = gleResult["n"].numberInt(); @@ -55,6 +54,11 @@ namespace mongo { dassert( gleResult["upserted"].isABSONObj() ); error->upsertedId = gleResult["upserted"].Obj(); } + // Needed b/c we detect stale config on legacy hosts via this field + if ( !gleResult["writeback"].eoo() ) { + dassert( gleResult["writeback"].type() == jstOID ); + error->writebackId = gleResult["writeback"].OID(); + } } bool BatchSafeWriter::isFailedOp( const LastError& error ) { @@ -63,13 +67,24 @@ namespace mongo { BatchedErrorDetail* BatchSafeWriter::lastErrorToBatchError( const LastError& lastError ) { - if ( BatchSafeWriter::isFailedOp( lastError ) ) { + bool isFailedOp = lastError.msg != ""; + bool isStaleOp = lastError.writebackId.isSet(); + dassert( !( isFailedOp && isStaleOp ) ); + + if ( isFailedOp ) { BatchedErrorDetail* batchError = new BatchedErrorDetail; if ( lastError.code != 0 ) batchError->setErrCode( lastError.code ); else batchError->setErrCode( ErrorCodes::UnknownError ); batchError->setErrMessage( lastError.msg ); return batchError; } + else if ( isStaleOp ) { + BatchedErrorDetail* batchError = new BatchedErrorDetail; + batchError->setErrCode( ErrorCodes::StaleShardVersion ); + batchError->setErrInfo( BSON( "downconvert" << true ) ); // For debugging + batchError->setErrMessage( "shard version was stale" ); + return batchError; + } return NULL; } @@ -78,6 +93,9 @@ namespace mongo { const BatchedCommandRequest& request, BatchedCommandResponse* response ) { + // N starts at zero, and we add to it for each item + response->setN( 0 ); + for ( size_t i = 0; i < request.sizeWriteOps(); ++i ) { BatchItemRef itemRef( &request, static_cast<int>( i ) ); @@ -87,13 +105,46 @@ namespace mongo { // Register the error if we need to BatchedErrorDetail* batchError = lastErrorToBatchError( lastError ); - batchError->setIndex( i ); - response->addToErrDetails( batchError ); + if ( batchError ) { + batchError->setIndex( i ); + response->addToErrDetails( batchError ); + } + + response->setN( response->getN() + lastError.nObjects ); - // TODO: Other stats, etc. + if ( !lastError.upsertedId.isEmpty() ) { + BatchedUpsertDetail* upsertedId = new BatchedUpsertDetail; + upsertedId->setIndex( i ); + upsertedId->setUpsertedID( lastError.upsertedId ); + response->addToUpsertDetails( upsertedId ); + } // Break on first error if we're ordered if ( request.getOrdered() && BatchSafeWriter::isFailedOp( lastError ) ) break; } + + if ( request.sizeWriteOps() == 1 && response->isErrDetailsSet() + && !response->isErrCodeSet() ) { + + // Promote single error to batch error + const BatchedErrorDetail* error = response->getErrDetailsAt( 0 ); + response->setErrCode( error->getErrCode() ); + if ( error->isErrInfoSet() ) response->setErrInfo( error->getErrInfo() ); + response->setErrMessage( error->getErrMessage() ); + + response->unsetErrDetails(); + } + + if ( request.sizeWriteOps() == 1 && response->isUpsertDetailsSet() ) { + + // Promote single upsert to batch upsert + const BatchedUpsertDetail* upsertedId = response->getUpsertDetailsAt( 0 ); + response->setSingleUpserted( upsertedId->getUpsertedID() ); + + response->unsetUpsertDetails(); + } + + response->setOk( !response->isErrCodeSet() ); + dassert( response->isValid( NULL ) ); } } diff --git a/src/mongo/s/chunk.cpp b/src/mongo/s/chunk.cpp index ebffb8a5fa0..5ed75c2a53d 100644 --- a/src/mongo/s/chunk.cpp +++ b/src/mongo/s/chunk.cpp @@ -1303,6 +1303,11 @@ namespace mongo { configServer.logChange( "dropCollection" , _ns , BSONObj() ); } + ChunkVersion ChunkManager::getVersion( const StringData& shardName ) const { + // NOTE: The empty-address Shard constructor is needed to avoid triggering a reload + return getVersion( Shard( shardName.toString(), "" ) ); + } + ChunkVersion ChunkManager::getVersion( const Shard& shard ) const { ShardVersionMap::const_iterator i = _shardVersions.find( shard ); if ( i == _shardVersions.end() ) diff --git a/src/mongo/s/chunk.h b/src/mongo/s/chunk.h index 25d36d44e4a..5b5eb7db1f1 100644 --- a/src/mongo/s/chunk.h +++ b/src/mongo/s/chunk.h @@ -30,6 +30,7 @@ #pragma once +#include "mongo/base/string_data.h" #include "mongo/bson/util/atomic_int.h" #include "mongo/client/distlock.h" #include "mongo/s/chunk_version.h" @@ -436,6 +437,7 @@ namespace mongo { string toString() const; + ChunkVersion getVersion( const StringData& shardName ) const; ChunkVersion getVersion( const Shard& shard ) const; ChunkVersion getVersion() const; diff --git a/src/mongo/s/chunk_manager_targeter.cpp b/src/mongo/s/chunk_manager_targeter.cpp index cc31e67b2ca..58c1ee2be46 100644 --- a/src/mongo/s/chunk_manager_targeter.cpp +++ b/src/mongo/s/chunk_manager_targeter.cpp @@ -177,26 +177,6 @@ namespace mongo { return Status::OK(); } - void ChunkManagerTargeter::noteStaleResponse( const ShardEndpoint& endpoint, - const BSONObj& staleInfo ) { - dassert( !_needsTargetingRefresh ); - - ChunkVersion remoteShardVersion = ChunkVersion::fromBSON( staleInfo, "vWanted" ); - - // We assume here that we can't have more than one stale config per-shard - dassert( _remoteShardVersions.find( endpoint.shardName ) == _remoteShardVersions.end() ); - _remoteShardVersions.insert( make_pair( endpoint.shardName, remoteShardVersion ) ); - } - - void ChunkManagerTargeter::noteCouldNotTarget() { - dassert( _remoteShardVersions.empty() ); - _needsTargetingRefresh = true; - } - - const TargeterStats* ChunkManagerTargeter::getStats() const { - return _stats.get(); - } - namespace { // @@ -230,8 +210,7 @@ namespace mongo { else return CompareResult_GTE; } - // NOTE: CAN THROW, since Shard() throws - ChunkVersion getShardVersion( const string& shardName, + ChunkVersion getShardVersion( const StringData& shardName, const ChunkManagerPtr& manager, const ShardPtr& primary ) { @@ -240,7 +219,7 @@ namespace mongo { if ( primary ) return ChunkVersion::UNSHARDED(); - return manager->getVersion( Shard( shardName ) ); + return manager->getVersion( shardName ); } /** @@ -325,6 +304,36 @@ namespace mongo { } } + void ChunkManagerTargeter::noteStaleResponse( const ShardEndpoint& endpoint, + const BSONObj& staleInfo ) { + dassert( !_needsTargetingRefresh ); + + ChunkVersion remoteShardVersion; + if ( staleInfo["vWanted"].eoo() ) { + // If we don't have a vWanted sent, assume the version is higher than our current + // version. + remoteShardVersion = getShardVersion( endpoint.shardName, _manager, _primary ); + remoteShardVersion.incMajor(); + } + else { + remoteShardVersion = ChunkVersion::fromBSON( staleInfo, "vWanted" ); + } + + // We assume here that we can't have more than one stale config per-shard + dassert( _remoteShardVersions.find( endpoint.shardName ) == _remoteShardVersions.end() ); + + _remoteShardVersions.insert( make_pair( endpoint.shardName, remoteShardVersion ) ); + } + + void ChunkManagerTargeter::noteCouldNotTarget() { + dassert( _remoteShardVersions.empty() ); + _needsTargetingRefresh = true; + } + + const TargeterStats* ChunkManagerTargeter::getStats() const { + return _stats.get(); + } + Status ChunkManagerTargeter::refreshIfNeeded() { // diff --git a/src/mongo/s/dbclient_multi_command.cpp b/src/mongo/s/dbclient_multi_command.cpp index ebe2678804f..dd16eab2803 100644 --- a/src/mongo/s/dbclient_multi_command.cpp +++ b/src/mongo/s/dbclient_multi_command.cpp @@ -90,10 +90,9 @@ namespace mongo { BatchedCommandRequest request( getBatchWriteType( cmdRequest ) ); // This should *always* parse correctly - string errMsg; - bool parsed = !request.parseBSON( cmdRequest, &errMsg ); + bool parsed = request.parseBSON( cmdRequest, NULL ); (void) parsed; // for non-debug compile - dassert( parsed && request.isValid( &errMsg ) ); + dassert( parsed && request.isValid( NULL ) ); // Collection name is sent without db to the dispatcher request.setNS( dbName.toString() + "." + request.getNS() ); @@ -104,7 +103,7 @@ namespace mongo { batchSafeWriter.safeWriteBatch( conn, request, &response ); // Back to BSON - dassert( response.isValid( &errMsg ) ); + dassert( response.isValid( NULL ) ); *cmdResponse = response.toBSON(); } } diff --git a/src/mongo/s/dbclient_safe_writer.cpp b/src/mongo/s/dbclient_safe_writer.cpp index 2240d28f982..551ed93c50e 100644 --- a/src/mongo/s/dbclient_safe_writer.cpp +++ b/src/mongo/s/dbclient_safe_writer.cpp @@ -28,6 +28,7 @@ #include "mongo/s/dbclient_safe_writer.h" +#include "mongo/s/version_manager.h" #include "mongo/util/assert_util.h" namespace mongo { @@ -39,6 +40,17 @@ namespace mongo { const BatchedCommandRequest* request = itemRef.getRequest(); try { + + // Default settings for checkShardVersion + const bool authoritative = false; + const int tryNum = 1; + + // We need to set our version using setShardVersion, managed by checkShardVersionCB + versionManager.checkShardVersionCB( conn, + request->getTargetingNS(), + authoritative, + tryNum ); + if ( request->getBatchType() == BatchedCommandRequest::BatchType_Insert ) { conn->insert( request->getNS(), request->getInsertRequest()->getDocumentsAt( itemRef.getItemIndex() ), diff --git a/src/mongo/s/strategy_shard.cpp b/src/mongo/s/strategy_shard.cpp index 7ff7389ca88..25399b38267 100644 --- a/src/mongo/s/strategy_shard.cpp +++ b/src/mongo/s/strategy_shard.cpp @@ -1302,6 +1302,12 @@ namespace mongo { // Valve for turning on upconversion for all batch write commands if ( useClusterWriteCommands ) { + // Ignore all messages from the WBL if we're using cluster write commands + // TODO: We'll have to turn it off at some point, but this is an easy way to ensure + // it has no impact. + bool fromWBL = r.d().reservedField() & Reserved_FromWriteback; + if ( fromWBL ) return; + auto_ptr<BatchedCommandRequest> request( msgToBatchRequest( r.m() ) ); BatchedCommandResponse response; |