diff options
author | Greg Studer <greg@10gen.com> | 2013-12-14 16:01:30 -0500 |
---|---|---|
committer | Greg Studer <greg@10gen.com> | 2013-12-16 14:49:01 -0500 |
commit | b0cc3e092880c00cdbd531d88f38cb9e0a52d881 (patch) | |
tree | d26701d6c4ed9dc98891ab181d9eded0e8a0c083 /src/mongo/s/write_ops | |
parent | 99dff054c8b83caf43c42d01b0497dcb0e1ee5bf (diff) | |
download | mongo-b0cc3e092880c00cdbd531d88f38cb9e0a52d881.tar.gz |
SERVER-11681 mongos upconverts all writes by default
Diffstat (limited to 'src/mongo/s/write_ops')
-rw-r--r-- | src/mongo/s/write_ops/batch_upconvert.cpp | 80 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batch_upconvert.h | 21 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batch_upconvert_test.cpp | 7 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batch_write_exec.cpp | 43 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batch_write_exec.h | 41 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batch_write_op.cpp | 6 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batched_command_response.cpp | 4 |
7 files changed, 151 insertions, 51 deletions
diff --git a/src/mongo/s/write_ops/batch_upconvert.cpp b/src/mongo/s/write_ops/batch_upconvert.cpp index de35c334100..6936bf7e24c 100644 --- a/src/mongo/s/write_ops/batch_upconvert.cpp +++ b/src/mongo/s/write_ops/batch_upconvert.cpp @@ -40,52 +40,69 @@ namespace mongo { using mongoutils::str::stream; + using std::vector; - BatchedCommandRequest* msgToBatchRequest( const Message& msg ) { + void msgToBatchRequests( const Message& msg, vector<BatchedCommandRequest*>* requests ) { int opType = msg.operation(); auto_ptr<BatchedCommandRequest> request; if ( opType == dbInsert ) { - request.reset( msgToBatchInsert( msg ) ); + msgToBatchInserts( msg, requests ); } else if ( opType == dbUpdate ) { - request.reset( msgToBatchUpdate( msg ) ); + requests->push_back( msgToBatchUpdate( msg ) ); } else { dassert( opType == dbDelete ); - request.reset( msgToBatchDelete( msg ) ); + requests->push_back( msgToBatchDelete( msg ) ); } - - return request.release(); } - BatchedCommandRequest* msgToBatchInsert( const Message& insertMsg ) { + void msgToBatchInserts( const Message& insertMsg, + vector<BatchedCommandRequest*>* insertRequests ) { // Parsing DbMessage throws DbMessage dbMsg( insertMsg ); NamespaceString nss( dbMsg.getns() ); - bool coe = dbMsg.reservedField() & Reserved_InsertOption_ContinueOnError; - - vector<BSONObj> docs; - do { - docs.push_back( dbMsg.nextJsObj() ); - } - while ( dbMsg.moreJSObjs() ); // Continue-on-error == unordered + bool coe = dbMsg.reservedField() & Reserved_InsertOption_ContinueOnError; bool ordered = !coe; - // No exceptions from here on - BatchedCommandRequest* request = - new BatchedCommandRequest( BatchedCommandRequest::BatchType_Insert ); - request->setNS( nss.ns() ); - for ( vector<BSONObj>::const_iterator it = docs.begin(); it != docs.end(); ++it ) { - request->getInsertRequest()->addToDocuments( *it ); - } - request->setOrdered( ordered ); + while ( insertRequests->empty() || dbMsg.moreJSObjs() ) { + + // Collect docs for next batch, but don't exceed maximum size + int totalInsertSize = 0; + vector<BSONObj> docs; + do { + const char* prevObjMark = dbMsg.markGet(); + BSONObj nextObj = dbMsg.nextJsObj(); + if ( totalInsertSize + nextObj.objsize() <= BSONObjMaxUserSize ) { + docs.push_back( nextObj ); + totalInsertSize += docs.back().objsize(); + } + else { + // Size limit exceeded, rollback to previous insert position + dbMsg.markReset( prevObjMark ); + break; + } + } + while ( dbMsg.moreJSObjs() ); - return request; + dassert( !docs.empty() ); + + // No exceptions from here on + BatchedCommandRequest* request = + new BatchedCommandRequest( BatchedCommandRequest::BatchType_Insert ); + request->setNS( nss.ns() ); + for ( vector<BSONObj>::const_iterator it = docs.begin(); it != docs.end(); ++it ) { + request->getInsertRequest()->addToDocuments( *it ); + } + request->setOrdered( ordered ); + + insertRequests->push_back( request ); + } } BatchedCommandRequest* msgToBatchUpdate( const Message& updateMsg ) { @@ -142,7 +159,7 @@ namespace mongo { error->setErrMessage( response.getErrMessage() ); } - void batchErrorToLastError( const BatchedCommandRequest& request, + bool batchErrorToLastError( const BatchedCommandRequest& request, const BatchedCommandResponse& response, LastError* error ) { @@ -170,9 +187,10 @@ namespace mongo { // Record an error if one exists if ( lastBatchError ) { + string errMsg = lastBatchError->getErrMessage(); error->raiseError( lastBatchError->getErrCode(), - lastBatchError->getErrMessage().c_str() ); - return; + errMsg.empty() ? "see code for details" : errMsg.c_str() ); + return true; } // Record write stats otherwise @@ -198,10 +216,18 @@ namespace mongo { int numUpdated = response.getN() - numUpserted; dassert( numUpdated >= 0 ); - error->recordUpdate( numUpdated > 0, response.getN(), upsertedId ); + + // Wrap upserted id in "upserted" field + BSONObj leUpsertedId; + if ( !upsertedId.isEmpty() ) + leUpsertedId = upsertedId.firstElement().wrap( kUpsertedFieldName ); + + error->recordUpdate( numUpdated > 0, response.getN(), leUpsertedId ); } else if ( request.getBatchType() == BatchedCommandRequest::BatchType_Delete ) { error->recordDelete( response.getN() ); } + + return false; } } diff --git a/src/mongo/s/write_ops/batch_upconvert.h b/src/mongo/s/write_ops/batch_upconvert.h index ca2f2c56df6..46b6d4552b6 100644 --- a/src/mongo/s/write_ops/batch_upconvert.h +++ b/src/mongo/s/write_ops/batch_upconvert.h @@ -28,6 +28,8 @@ #pragma once +#include <vector> + #include "mongo/db/lasterror.h" #include "mongo/s/write_ops/batched_command_request.h" #include "mongo/s/write_ops/batched_command_response.h" @@ -40,20 +42,23 @@ namespace mongo { // NOTE: These functions throw on invalid message format. // - BatchedCommandRequest* msgToBatchRequest( const Message& msg ); + void msgToBatchRequests( const Message& msg, std::vector<BatchedCommandRequest*>* requests ); - BatchedCommandRequest* msgToBatchInsert( const Message& insertMsg ); + // Batch inserts may get mapped to multiple batch requests, to avoid spilling MaxBSONObjSize + void msgToBatchInserts( const Message& insertMsg, + std::vector<BatchedCommandRequest*>* insertRequests ); BatchedCommandRequest* msgToBatchUpdate( const Message& updateMsg ); BatchedCommandRequest* msgToBatchDelete( const Message& deleteMsg ); - // - // Utility function for recording completed batch writes into the LastError object. - // (Interpreting the response requires the request object as well.) - // - - void batchErrorToLastError( const BatchedCommandRequest& request, + /** + * Utility function for recording completed batch writes into the LastError object. + * (Interpreting the response requires the request object as well.) + * + * Returns true if an error occurred in the batch. + */ + bool batchErrorToLastError( const BatchedCommandRequest& request, const BatchedCommandResponse& response, LastError* error ); diff --git a/src/mongo/s/write_ops/batch_upconvert_test.cpp b/src/mongo/s/write_ops/batch_upconvert_test.cpp index 880c91129db..0469e6324bf 100644 --- a/src/mongo/s/write_ops/batch_upconvert_test.cpp +++ b/src/mongo/s/write_ops/batch_upconvert_test.cpp @@ -28,6 +28,7 @@ #include "mongo/s/write_ops/batch_upconvert.h" +#include "mongo/base/owned_pointer_vector.h" #include "mongo/bson/bsonobj.h" #include "mongo/bson/bsonobjbuilder.h" #include "mongo/bson/util/builder.h" @@ -56,7 +57,11 @@ namespace { doc.appendSelfToBufBuilder( insertMsgB ); insertMsg.setData( dbInsert, insertMsgB.buf(), insertMsgB.len() ); - auto_ptr<BatchedCommandRequest> request( msgToBatchRequest( insertMsg ) ); + OwnedPointerVector<BatchedCommandRequest> requestsOwned; + vector<BatchedCommandRequest*>& requests = requestsOwned.mutableVector(); + msgToBatchRequests( insertMsg, &requests ); + + BatchedCommandRequest* request = requests.back(); ASSERT_EQUALS( request->getBatchType(), BatchedCommandRequest::BatchType_Insert ); string errMsg; ASSERT( request->isValid( &errMsg ) ); diff --git a/src/mongo/s/write_ops/batch_write_exec.cpp b/src/mongo/s/write_ops/batch_write_exec.cpp index de7926f2369..15000036ed3 100644 --- a/src/mongo/s/write_ops/batch_write_exec.cpp +++ b/src/mongo/s/write_ops/batch_write_exec.cpp @@ -36,19 +36,22 @@ namespace mongo { - namespace { + BatchWriteExec::BatchWriteExec( NSTargeter* targeter, + ShardResolver* resolver, + MultiCommandDispatch* dispatcher ) : + _targeter( targeter ), + _resolver( resolver ), + _dispatcher( dispatcher ), + _stats( new BatchWriteExecStats ) { + } - struct ConnectionStringComp { - bool operator()( const ConnectionString& connStrA, - const ConnectionString& connStrB ) const { - return connStrA.toString().compare( connStrB.toString() ) < 0; - } - }; + namespace { // // Map which allows associating ConnectionString hosts with TargetedWriteBatches // This is needed since the dispatcher only returns hosts with responses. // + // TODO: Unordered map? typedef map<ConnectionString, TargetedWriteBatch*, ConnectionStringComp> HostBatchMap; } @@ -137,7 +140,8 @@ namespace mongo { // size_t numSent = 0; - while ( numSent != childBatches.size() ) { + size_t numToSend = childBatches.size(); + while ( numSent != numToSend ) { // Collect batches out on the network, mapped by endpoint HostBatchMap pendingBatches; @@ -176,6 +180,7 @@ namespace mongo { // We're done with this batch *it = NULL; + --numToSend; continue; } @@ -243,6 +248,13 @@ namespace mongo { noteStaleResponses( staleErrors, _targeter ); ++numStaleBatches; } + + // Remember that we successfully wrote to this shard + // NOTE: This will record lastOps for shards where we actually didn't update + // or delete any documents, which preserves old behavior but is conservative + _stats->noteWriteAt( shardHost, + response.isLastOpSet() ? + OpTime( response.getLastOp() ) : OpTime() ); } else { @@ -258,4 +270,19 @@ namespace mongo { batchOp.buildClientResponse( clientResponse ); } + const BatchWriteExecStats& BatchWriteExec::getStats() { + return *_stats; + } + + BatchWriteExecStats* BatchWriteExec::releaseStats() { + return _stats.release(); + } + + void BatchWriteExecStats::noteWriteAt( const ConnectionString& host, OpTime opTime ) { + _writeOpTimes[host] = opTime; + } + + const HostOpTimeMap& BatchWriteExecStats::getWriteOpTimes() const { + return _writeOpTimes; + } } diff --git a/src/mongo/s/write_ops/batch_write_exec.h b/src/mongo/s/write_ops/batch_write_exec.h index 1bc136f3262..d9324f2f8ae 100644 --- a/src/mongo/s/write_ops/batch_write_exec.h +++ b/src/mongo/s/write_ops/batch_write_exec.h @@ -30,7 +30,11 @@ #include <boost/scoped_ptr.hpp> +#include <map> +#include <string> + #include "mongo/base/disallow_copying.h" +#include "mongo/bson/optime.h" #include "mongo/s/ns_targeter.h" #include "mongo/s/multi_command_dispatch.h" #include "mongo/s/shard_resolver.h" @@ -39,6 +43,8 @@ namespace mongo { + class BatchWriteExecStats; + /** * The BatchWriteExec is able to execute client batch write requests, resulting in a batch * response to send back to the client. @@ -60,9 +66,7 @@ namespace mongo { BatchWriteExec( NSTargeter* targeter, ShardResolver* resolver, - MultiCommandDispatch* dispatcher ) : - _targeter( targeter ), _resolver( resolver ), _dispatcher( dispatcher ) { - } + MultiCommandDispatch* dispatcher ); /** * Executes a client batch write request by sending child batches to several shard @@ -77,6 +81,10 @@ namespace mongo { void executeBatch( const BatchedCommandRequest& clientRequest, BatchedCommandResponse* clientResponse ); + const BatchWriteExecStats& getStats(); + + BatchWriteExecStats* releaseStats(); + private: // Not owned here @@ -87,5 +95,32 @@ namespace mongo { // Not owned here MultiCommandDispatch* _dispatcher; + + // Stats + auto_ptr<BatchWriteExecStats> _stats; + }; + + // Useful comparator for using connection strings in ordered sets and maps + struct ConnectionStringComp { + bool operator()( const ConnectionString& connStrA, + const ConnectionString& connStrB ) const { + return connStrA.toString().compare( connStrB.toString() ) < 0; + } + }; + + typedef std::map<ConnectionString, OpTime, ConnectionStringComp> HostOpTimeMap; + + class BatchWriteExecStats { + public: + + // TODO: Other stats can go here + + void noteWriteAt( const ConnectionString& host, OpTime opTime ); + + const HostOpTimeMap& getWriteOpTimes() const; + + private: + + HostOpTimeMap _writeOpTimes; }; } diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp index e5f43628405..81843c41df6 100644 --- a/src/mongo/s/write_ops/batch_write_op.cpp +++ b/src/mongo/s/write_ops/batch_write_op.cpp @@ -33,7 +33,7 @@ namespace mongo { BatchWriteStats::BatchWriteStats() : - numInserted( 0 ), numUpserted( 0 ), numUpdated( 0 ), numDeleted( 0 ) { + numInserted( 0 ), numUpserted( 0 ), numUpdated( 0 ), numModified( 0 ), numDeleted( 0 ) { } BatchWriteOp::BatchWriteOp() : @@ -340,7 +340,7 @@ namespace mongo { numUpserted = 1; } stats->numUpdated += ( response.getN() - numUpserted ); - stats->numModified += ( response.getNDocsModified() - numUpserted ); + stats->numModified += response.getNDocsModified(); stats->numUpserted += numUpserted; } else { @@ -629,6 +629,8 @@ namespace mongo { int nValue = _stats->numInserted + _stats->numUpserted + _stats->numUpdated + _stats->numDeleted; batchResp->setN( nValue ); + if ( _clientRequest->getBatchType() == BatchedCommandRequest::BatchType_Update ) + batchResp->setNDocsModified( _stats->numModified ); batchResp->setOk( !batchResp->isErrCodeSet() ); dassert( batchResp->isValid( NULL ) ); diff --git a/src/mongo/s/write_ops/batched_command_response.cpp b/src/mongo/s/write_ops/batched_command_response.cpp index e45bfa64494..2238a1773c4 100644 --- a/src/mongo/s/write_ops/batched_command_response.cpp +++ b/src/mongo/s/write_ops/batched_command_response.cpp @@ -148,7 +148,7 @@ namespace mongo { // We're using appendNumber on generation so we'll try a smaller type // (int) first and then fall back to the original type (long long). - BSONField<int> fieldN("n"); + BSONField<int> fieldN(n()); int tempN; fieldState = FieldParser::extract(source, fieldN, &tempN, errMsg); if (fieldState == FieldParser::FIELD_INVALID) { @@ -164,7 +164,7 @@ namespace mongo { // We're using appendNumber on generation so we'll try a smaller type // (int) first and then fall back to the original type (long long). - BSONField<int> fieldNUpdated("nDocumentsModified"); + BSONField<int> fieldNUpdated(nDocsModified()); int tempNUpdated; fieldState = FieldParser::extract(source, fieldNUpdated, &tempNUpdated, errMsg); if (fieldState == FieldParser::FIELD_INVALID) { |