summaryrefslogtreecommitdiff
path: root/src/mongo/s
diff options
context:
space:
mode:
authorGreg Studer <greg@10gen.com>2013-11-07 10:17:57 -0500
committerGreg Studer <greg@10gen.com>2013-11-08 10:11:58 -0500
commitdd0857bd037487c65b12113f042510b88c08bea4 (patch)
treec0f9b1862beb6f2537c80513c59d80b546de233d /src/mongo/s
parentb8fd0b30946d0e1599cb730c155a01a25e572f24 (diff)
downloadmongo-dd0857bd037487c65b12113f042510b88c08bea4.tar.gz
SERVER-10818 fixes for basic batch downconversion
Diffstat (limited to 'src/mongo/s')
-rw-r--r--src/mongo/s/batch_downconvert.cpp67
-rw-r--r--src/mongo/s/chunk.cpp5
-rw-r--r--src/mongo/s/chunk.h2
-rw-r--r--src/mongo/s/chunk_manager_targeter.cpp55
-rw-r--r--src/mongo/s/dbclient_multi_command.cpp7
-rw-r--r--src/mongo/s/dbclient_safe_writer.cpp12
-rw-r--r--src/mongo/s/strategy_shard.cpp6
7 files changed, 119 insertions, 35 deletions
diff --git a/src/mongo/s/batch_downconvert.cpp b/src/mongo/s/batch_downconvert.cpp
index 5f0a637514d..ac5910089bc 100644
--- a/src/mongo/s/batch_downconvert.cpp
+++ b/src/mongo/s/batch_downconvert.cpp
@@ -35,12 +35,11 @@ namespace mongo {
void SafeWriter::fillLastError( const BSONObj& gleResult, LastError* error ) {
if ( gleResult["code"].isNumber() ) error->code = gleResult["code"].numberInt();
if ( !gleResult["err"].eoo() ) {
- if ( gleResult["err"].type() == jstNULL ) {
- error->msg = "empty error message";
+ if ( gleResult["err"].type() == String ) {
+ error->msg = gleResult["err"].String();
}
else {
- dassert( gleResult["err"].type() == String );
- error->msg = gleResult["err"].String();
+ dassert( gleResult["err"].type() == jstNULL );
}
}
if ( gleResult["n"].isNumber() ) error->nObjects = gleResult["n"].numberInt();
@@ -55,6 +54,11 @@ namespace mongo {
dassert( gleResult["upserted"].isABSONObj() );
error->upsertedId = gleResult["upserted"].Obj();
}
+ // Needed b/c we detect stale config on legacy hosts via this field
+ if ( !gleResult["writeback"].eoo() ) {
+ dassert( gleResult["writeback"].type() == jstOID );
+ error->writebackId = gleResult["writeback"].OID();
+ }
}
bool BatchSafeWriter::isFailedOp( const LastError& error ) {
@@ -63,13 +67,24 @@ namespace mongo {
BatchedErrorDetail* BatchSafeWriter::lastErrorToBatchError( const LastError& lastError ) {
- if ( BatchSafeWriter::isFailedOp( lastError ) ) {
+ bool isFailedOp = lastError.msg != "";
+ bool isStaleOp = lastError.writebackId.isSet();
+ dassert( !( isFailedOp && isStaleOp ) );
+
+ if ( isFailedOp ) {
BatchedErrorDetail* batchError = new BatchedErrorDetail;
if ( lastError.code != 0 ) batchError->setErrCode( lastError.code );
else batchError->setErrCode( ErrorCodes::UnknownError );
batchError->setErrMessage( lastError.msg );
return batchError;
}
+ else if ( isStaleOp ) {
+ BatchedErrorDetail* batchError = new BatchedErrorDetail;
+ batchError->setErrCode( ErrorCodes::StaleShardVersion );
+ batchError->setErrInfo( BSON( "downconvert" << true ) ); // For debugging
+ batchError->setErrMessage( "shard version was stale" );
+ return batchError;
+ }
return NULL;
}
@@ -78,6 +93,9 @@ namespace mongo {
const BatchedCommandRequest& request,
BatchedCommandResponse* response ) {
+ // N starts at zero, and we add to it for each item
+ response->setN( 0 );
+
for ( size_t i = 0; i < request.sizeWriteOps(); ++i ) {
BatchItemRef itemRef( &request, static_cast<int>( i ) );
@@ -87,13 +105,46 @@ namespace mongo {
// Register the error if we need to
BatchedErrorDetail* batchError = lastErrorToBatchError( lastError );
- batchError->setIndex( i );
- response->addToErrDetails( batchError );
+ if ( batchError ) {
+ batchError->setIndex( i );
+ response->addToErrDetails( batchError );
+ }
+
+ response->setN( response->getN() + lastError.nObjects );
- // TODO: Other stats, etc.
+ if ( !lastError.upsertedId.isEmpty() ) {
+ BatchedUpsertDetail* upsertedId = new BatchedUpsertDetail;
+ upsertedId->setIndex( i );
+ upsertedId->setUpsertedID( lastError.upsertedId );
+ response->addToUpsertDetails( upsertedId );
+ }
// Break on first error if we're ordered
if ( request.getOrdered() && BatchSafeWriter::isFailedOp( lastError ) ) break;
}
+
+ if ( request.sizeWriteOps() == 1 && response->isErrDetailsSet()
+ && !response->isErrCodeSet() ) {
+
+ // Promote single error to batch error
+ const BatchedErrorDetail* error = response->getErrDetailsAt( 0 );
+ response->setErrCode( error->getErrCode() );
+ if ( error->isErrInfoSet() ) response->setErrInfo( error->getErrInfo() );
+ response->setErrMessage( error->getErrMessage() );
+
+ response->unsetErrDetails();
+ }
+
+ if ( request.sizeWriteOps() == 1 && response->isUpsertDetailsSet() ) {
+
+ // Promote single upsert to batch upsert
+ const BatchedUpsertDetail* upsertedId = response->getUpsertDetailsAt( 0 );
+ response->setSingleUpserted( upsertedId->getUpsertedID() );
+
+ response->unsetUpsertDetails();
+ }
+
+ response->setOk( !response->isErrCodeSet() );
+ dassert( response->isValid( NULL ) );
}
}
diff --git a/src/mongo/s/chunk.cpp b/src/mongo/s/chunk.cpp
index ebffb8a5fa0..5ed75c2a53d 100644
--- a/src/mongo/s/chunk.cpp
+++ b/src/mongo/s/chunk.cpp
@@ -1303,6 +1303,11 @@ namespace mongo {
configServer.logChange( "dropCollection" , _ns , BSONObj() );
}
+ ChunkVersion ChunkManager::getVersion( const StringData& shardName ) const {
+ // NOTE: The empty-address Shard constructor is needed to avoid triggering a reload
+ return getVersion( Shard( shardName.toString(), "" ) );
+ }
+
ChunkVersion ChunkManager::getVersion( const Shard& shard ) const {
ShardVersionMap::const_iterator i = _shardVersions.find( shard );
if ( i == _shardVersions.end() )
diff --git a/src/mongo/s/chunk.h b/src/mongo/s/chunk.h
index 25d36d44e4a..5b5eb7db1f1 100644
--- a/src/mongo/s/chunk.h
+++ b/src/mongo/s/chunk.h
@@ -30,6 +30,7 @@
#pragma once
+#include "mongo/base/string_data.h"
#include "mongo/bson/util/atomic_int.h"
#include "mongo/client/distlock.h"
#include "mongo/s/chunk_version.h"
@@ -436,6 +437,7 @@ namespace mongo {
string toString() const;
+ ChunkVersion getVersion( const StringData& shardName ) const;
ChunkVersion getVersion( const Shard& shard ) const;
ChunkVersion getVersion() const;
diff --git a/src/mongo/s/chunk_manager_targeter.cpp b/src/mongo/s/chunk_manager_targeter.cpp
index cc31e67b2ca..58c1ee2be46 100644
--- a/src/mongo/s/chunk_manager_targeter.cpp
+++ b/src/mongo/s/chunk_manager_targeter.cpp
@@ -177,26 +177,6 @@ namespace mongo {
return Status::OK();
}
- void ChunkManagerTargeter::noteStaleResponse( const ShardEndpoint& endpoint,
- const BSONObj& staleInfo ) {
- dassert( !_needsTargetingRefresh );
-
- ChunkVersion remoteShardVersion = ChunkVersion::fromBSON( staleInfo, "vWanted" );
-
- // We assume here that we can't have more than one stale config per-shard
- dassert( _remoteShardVersions.find( endpoint.shardName ) == _remoteShardVersions.end() );
- _remoteShardVersions.insert( make_pair( endpoint.shardName, remoteShardVersion ) );
- }
-
- void ChunkManagerTargeter::noteCouldNotTarget() {
- dassert( _remoteShardVersions.empty() );
- _needsTargetingRefresh = true;
- }
-
- const TargeterStats* ChunkManagerTargeter::getStats() const {
- return _stats.get();
- }
-
namespace {
//
@@ -230,8 +210,7 @@ namespace mongo {
else return CompareResult_GTE;
}
- // NOTE: CAN THROW, since Shard() throws
- ChunkVersion getShardVersion( const string& shardName,
+ ChunkVersion getShardVersion( const StringData& shardName,
const ChunkManagerPtr& manager,
const ShardPtr& primary ) {
@@ -240,7 +219,7 @@ namespace mongo {
if ( primary ) return ChunkVersion::UNSHARDED();
- return manager->getVersion( Shard( shardName ) );
+ return manager->getVersion( shardName );
}
/**
@@ -325,6 +304,36 @@ namespace mongo {
}
}
+ void ChunkManagerTargeter::noteStaleResponse( const ShardEndpoint& endpoint,
+ const BSONObj& staleInfo ) {
+ dassert( !_needsTargetingRefresh );
+
+ ChunkVersion remoteShardVersion;
+ if ( staleInfo["vWanted"].eoo() ) {
+ // If we don't have a vWanted sent, assume the version is higher than our current
+ // version.
+ remoteShardVersion = getShardVersion( endpoint.shardName, _manager, _primary );
+ remoteShardVersion.incMajor();
+ }
+ else {
+ remoteShardVersion = ChunkVersion::fromBSON( staleInfo, "vWanted" );
+ }
+
+ // We assume here that we can't have more than one stale config per-shard
+ dassert( _remoteShardVersions.find( endpoint.shardName ) == _remoteShardVersions.end() );
+
+ _remoteShardVersions.insert( make_pair( endpoint.shardName, remoteShardVersion ) );
+ }
+
+ void ChunkManagerTargeter::noteCouldNotTarget() {
+ dassert( _remoteShardVersions.empty() );
+ _needsTargetingRefresh = true;
+ }
+
+ const TargeterStats* ChunkManagerTargeter::getStats() const {
+ return _stats.get();
+ }
+
Status ChunkManagerTargeter::refreshIfNeeded() {
//
diff --git a/src/mongo/s/dbclient_multi_command.cpp b/src/mongo/s/dbclient_multi_command.cpp
index ebe2678804f..dd16eab2803 100644
--- a/src/mongo/s/dbclient_multi_command.cpp
+++ b/src/mongo/s/dbclient_multi_command.cpp
@@ -90,10 +90,9 @@ namespace mongo {
BatchedCommandRequest request( getBatchWriteType( cmdRequest ) );
// This should *always* parse correctly
- string errMsg;
- bool parsed = !request.parseBSON( cmdRequest, &errMsg );
+ bool parsed = request.parseBSON( cmdRequest, NULL );
(void) parsed; // for non-debug compile
- dassert( parsed && request.isValid( &errMsg ) );
+ dassert( parsed && request.isValid( NULL ) );
// Collection name is sent without db to the dispatcher
request.setNS( dbName.toString() + "." + request.getNS() );
@@ -104,7 +103,7 @@ namespace mongo {
batchSafeWriter.safeWriteBatch( conn, request, &response );
// Back to BSON
- dassert( response.isValid( &errMsg ) );
+ dassert( response.isValid( NULL ) );
*cmdResponse = response.toBSON();
}
}
diff --git a/src/mongo/s/dbclient_safe_writer.cpp b/src/mongo/s/dbclient_safe_writer.cpp
index 2240d28f982..551ed93c50e 100644
--- a/src/mongo/s/dbclient_safe_writer.cpp
+++ b/src/mongo/s/dbclient_safe_writer.cpp
@@ -28,6 +28,7 @@
#include "mongo/s/dbclient_safe_writer.h"
+#include "mongo/s/version_manager.h"
#include "mongo/util/assert_util.h"
namespace mongo {
@@ -39,6 +40,17 @@ namespace mongo {
const BatchedCommandRequest* request = itemRef.getRequest();
try {
+
+ // Default settings for checkShardVersion
+ const bool authoritative = false;
+ const int tryNum = 1;
+
+ // We need to set our version using setShardVersion, managed by checkShardVersionCB
+ versionManager.checkShardVersionCB( conn,
+ request->getTargetingNS(),
+ authoritative,
+ tryNum );
+
if ( request->getBatchType() == BatchedCommandRequest::BatchType_Insert ) {
conn->insert( request->getNS(),
request->getInsertRequest()->getDocumentsAt( itemRef.getItemIndex() ),
diff --git a/src/mongo/s/strategy_shard.cpp b/src/mongo/s/strategy_shard.cpp
index 7ff7389ca88..25399b38267 100644
--- a/src/mongo/s/strategy_shard.cpp
+++ b/src/mongo/s/strategy_shard.cpp
@@ -1302,6 +1302,12 @@ namespace mongo {
// Valve for turning on upconversion for all batch write commands
if ( useClusterWriteCommands ) {
+ // Ignore all messages from the WBL if we're using cluster write commands
+ // TODO: We'll have to turn it off at some point, but this is an easy way to ensure
+ // it has no impact.
+ bool fromWBL = r.d().reservedField() & Reserved_FromWriteback;
+ if ( fromWBL ) return;
+
auto_ptr<BatchedCommandRequest> request( msgToBatchRequest( r.m() ) );
BatchedCommandResponse response;