summary | refs | log | tree | commit | diff
path: root/src/mongo/s/write_ops
diff options
context:
space:
mode:
author: Greg Studer <greg@10gen.com>, 2013-12-14 16:01:30 -0500
committer: Greg Studer <greg@10gen.com>, 2013-12-16 14:49:01 -0500
commit: b0cc3e092880c00cdbd531d88f38cb9e0a52d881 (patch)
tree: d26701d6c4ed9dc98891ab181d9eded0e8a0c083 /src/mongo/s/write_ops
parent: 99dff054c8b83caf43c42d01b0497dcb0e1ee5bf (diff)
download: mongo-b0cc3e092880c00cdbd531d88f38cb9e0a52d881.tar.gz
SERVER-11681 mongos upconverts all writes by default
Diffstat (limited to 'src/mongo/s/write_ops')
-rw-r--r-- src/mongo/s/write_ops/batch_upconvert.cpp | 80
-rw-r--r-- src/mongo/s/write_ops/batch_upconvert.h | 21
-rw-r--r-- src/mongo/s/write_ops/batch_upconvert_test.cpp | 7
-rw-r--r-- src/mongo/s/write_ops/batch_write_exec.cpp | 43
-rw-r--r-- src/mongo/s/write_ops/batch_write_exec.h | 41
-rw-r--r-- src/mongo/s/write_ops/batch_write_op.cpp | 6
-rw-r--r-- src/mongo/s/write_ops/batched_command_response.cpp | 4
7 files changed, 151 insertions, 51 deletions
diff --git a/src/mongo/s/write_ops/batch_upconvert.cpp b/src/mongo/s/write_ops/batch_upconvert.cpp
index de35c334100..6936bf7e24c 100644
--- a/src/mongo/s/write_ops/batch_upconvert.cpp
+++ b/src/mongo/s/write_ops/batch_upconvert.cpp
@@ -40,52 +40,69 @@
namespace mongo {
using mongoutils::str::stream;
+ using std::vector;
- BatchedCommandRequest* msgToBatchRequest( const Message& msg ) {
+ void msgToBatchRequests( const Message& msg, vector<BatchedCommandRequest*>* requests ) {
int opType = msg.operation();
auto_ptr<BatchedCommandRequest> request;
if ( opType == dbInsert ) {
- request.reset( msgToBatchInsert( msg ) );
+ msgToBatchInserts( msg, requests );
}
else if ( opType == dbUpdate ) {
- request.reset( msgToBatchUpdate( msg ) );
+ requests->push_back( msgToBatchUpdate( msg ) );
}
else {
dassert( opType == dbDelete );
- request.reset( msgToBatchDelete( msg ) );
+ requests->push_back( msgToBatchDelete( msg ) );
}
-
- return request.release();
}
- BatchedCommandRequest* msgToBatchInsert( const Message& insertMsg ) {
+ void msgToBatchInserts( const Message& insertMsg,
+ vector<BatchedCommandRequest*>* insertRequests ) {
// Parsing DbMessage throws
DbMessage dbMsg( insertMsg );
NamespaceString nss( dbMsg.getns() );
- bool coe = dbMsg.reservedField() & Reserved_InsertOption_ContinueOnError;
-
- vector<BSONObj> docs;
- do {
- docs.push_back( dbMsg.nextJsObj() );
- }
- while ( dbMsg.moreJSObjs() );
// Continue-on-error == unordered
+ bool coe = dbMsg.reservedField() & Reserved_InsertOption_ContinueOnError;
bool ordered = !coe;
- // No exceptions from here on
- BatchedCommandRequest* request =
- new BatchedCommandRequest( BatchedCommandRequest::BatchType_Insert );
- request->setNS( nss.ns() );
- for ( vector<BSONObj>::const_iterator it = docs.begin(); it != docs.end(); ++it ) {
- request->getInsertRequest()->addToDocuments( *it );
- }
- request->setOrdered( ordered );
+ while ( insertRequests->empty() || dbMsg.moreJSObjs() ) {
+
+ // Collect docs for next batch, but don't exceed maximum size
+ int totalInsertSize = 0;
+ vector<BSONObj> docs;
+ do {
+ const char* prevObjMark = dbMsg.markGet();
+ BSONObj nextObj = dbMsg.nextJsObj();
+ if ( totalInsertSize + nextObj.objsize() <= BSONObjMaxUserSize ) {
+ docs.push_back( nextObj );
+ totalInsertSize += docs.back().objsize();
+ }
+ else {
+ // Size limit exceeded, rollback to previous insert position
+ dbMsg.markReset( prevObjMark );
+ break;
+ }
+ }
+ while ( dbMsg.moreJSObjs() );
- return request;
+ dassert( !docs.empty() );
+
+ // No exceptions from here on
+ BatchedCommandRequest* request =
+ new BatchedCommandRequest( BatchedCommandRequest::BatchType_Insert );
+ request->setNS( nss.ns() );
+ for ( vector<BSONObj>::const_iterator it = docs.begin(); it != docs.end(); ++it ) {
+ request->getInsertRequest()->addToDocuments( *it );
+ }
+ request->setOrdered( ordered );
+
+ insertRequests->push_back( request );
+ }
}
BatchedCommandRequest* msgToBatchUpdate( const Message& updateMsg ) {
@@ -142,7 +159,7 @@ namespace mongo {
error->setErrMessage( response.getErrMessage() );
}
- void batchErrorToLastError( const BatchedCommandRequest& request,
+ bool batchErrorToLastError( const BatchedCommandRequest& request,
const BatchedCommandResponse& response,
LastError* error ) {
@@ -170,9 +187,10 @@ namespace mongo {
// Record an error if one exists
if ( lastBatchError ) {
+ string errMsg = lastBatchError->getErrMessage();
error->raiseError( lastBatchError->getErrCode(),
- lastBatchError->getErrMessage().c_str() );
- return;
+ errMsg.empty() ? "see code for details" : errMsg.c_str() );
+ return true;
}
// Record write stats otherwise
@@ -198,10 +216,18 @@ namespace mongo {
int numUpdated = response.getN() - numUpserted;
dassert( numUpdated >= 0 );
- error->recordUpdate( numUpdated > 0, response.getN(), upsertedId );
+
+ // Wrap upserted id in "upserted" field
+ BSONObj leUpsertedId;
+ if ( !upsertedId.isEmpty() )
+ leUpsertedId = upsertedId.firstElement().wrap( kUpsertedFieldName );
+
+ error->recordUpdate( numUpdated > 0, response.getN(), leUpsertedId );
}
else if ( request.getBatchType() == BatchedCommandRequest::BatchType_Delete ) {
error->recordDelete( response.getN() );
}
+
+ return false;
}
}
diff --git a/src/mongo/s/write_ops/batch_upconvert.h b/src/mongo/s/write_ops/batch_upconvert.h
index ca2f2c56df6..46b6d4552b6 100644
--- a/src/mongo/s/write_ops/batch_upconvert.h
+++ b/src/mongo/s/write_ops/batch_upconvert.h
@@ -28,6 +28,8 @@
#pragma once
+#include <vector>
+
#include "mongo/db/lasterror.h"
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/s/write_ops/batched_command_response.h"
@@ -40,20 +42,23 @@ namespace mongo {
// NOTE: These functions throw on invalid message format.
//
- BatchedCommandRequest* msgToBatchRequest( const Message& msg );
+ void msgToBatchRequests( const Message& msg, std::vector<BatchedCommandRequest*>* requests );
- BatchedCommandRequest* msgToBatchInsert( const Message& insertMsg );
+ // Batch inserts may get mapped to multiple batch requests, to avoid spilling MaxBSONObjSize
+ void msgToBatchInserts( const Message& insertMsg,
+ std::vector<BatchedCommandRequest*>* insertRequests );
BatchedCommandRequest* msgToBatchUpdate( const Message& updateMsg );
BatchedCommandRequest* msgToBatchDelete( const Message& deleteMsg );
- //
- // Utility function for recording completed batch writes into the LastError object.
- // (Interpreting the response requires the request object as well.)
- //
-
- void batchErrorToLastError( const BatchedCommandRequest& request,
+ /**
+ * Utility function for recording completed batch writes into the LastError object.
+ * (Interpreting the response requires the request object as well.)
+ *
+ * Returns true if an error occurred in the batch.
+ */
+ bool batchErrorToLastError( const BatchedCommandRequest& request,
const BatchedCommandResponse& response,
LastError* error );
diff --git a/src/mongo/s/write_ops/batch_upconvert_test.cpp b/src/mongo/s/write_ops/batch_upconvert_test.cpp
index 880c91129db..0469e6324bf 100644
--- a/src/mongo/s/write_ops/batch_upconvert_test.cpp
+++ b/src/mongo/s/write_ops/batch_upconvert_test.cpp
@@ -28,6 +28,7 @@
#include "mongo/s/write_ops/batch_upconvert.h"
+#include "mongo/base/owned_pointer_vector.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/bson/util/builder.h"
@@ -56,7 +57,11 @@ namespace {
doc.appendSelfToBufBuilder( insertMsgB );
insertMsg.setData( dbInsert, insertMsgB.buf(), insertMsgB.len() );
- auto_ptr<BatchedCommandRequest> request( msgToBatchRequest( insertMsg ) );
+ OwnedPointerVector<BatchedCommandRequest> requestsOwned;
+ vector<BatchedCommandRequest*>& requests = requestsOwned.mutableVector();
+ msgToBatchRequests( insertMsg, &requests );
+
+ BatchedCommandRequest* request = requests.back();
ASSERT_EQUALS( request->getBatchType(), BatchedCommandRequest::BatchType_Insert );
string errMsg;
ASSERT( request->isValid( &errMsg ) );
diff --git a/src/mongo/s/write_ops/batch_write_exec.cpp b/src/mongo/s/write_ops/batch_write_exec.cpp
index de7926f2369..15000036ed3 100644
--- a/src/mongo/s/write_ops/batch_write_exec.cpp
+++ b/src/mongo/s/write_ops/batch_write_exec.cpp
@@ -36,19 +36,22 @@
namespace mongo {
- namespace {
+ BatchWriteExec::BatchWriteExec( NSTargeter* targeter,
+ ShardResolver* resolver,
+ MultiCommandDispatch* dispatcher ) :
+ _targeter( targeter ),
+ _resolver( resolver ),
+ _dispatcher( dispatcher ),
+ _stats( new BatchWriteExecStats ) {
+ }
- struct ConnectionStringComp {
- bool operator()( const ConnectionString& connStrA,
- const ConnectionString& connStrB ) const {
- return connStrA.toString().compare( connStrB.toString() ) < 0;
- }
- };
+ namespace {
//
// Map which allows associating ConnectionString hosts with TargetedWriteBatches
// This is needed since the dispatcher only returns hosts with responses.
//
+
// TODO: Unordered map?
typedef map<ConnectionString, TargetedWriteBatch*, ConnectionStringComp> HostBatchMap;
}
@@ -137,7 +140,8 @@ namespace mongo {
//
size_t numSent = 0;
- while ( numSent != childBatches.size() ) {
+ size_t numToSend = childBatches.size();
+ while ( numSent != numToSend ) {
// Collect batches out on the network, mapped by endpoint
HostBatchMap pendingBatches;
@@ -176,6 +180,7 @@ namespace mongo {
// We're done with this batch
*it = NULL;
+ --numToSend;
continue;
}
@@ -243,6 +248,13 @@ namespace mongo {
noteStaleResponses( staleErrors, _targeter );
++numStaleBatches;
}
+
+ // Remember that we successfully wrote to this shard
+ // NOTE: This will record lastOps for shards where we actually didn't update
+ // or delete any documents, which preserves old behavior but is conservative
+ _stats->noteWriteAt( shardHost,
+ response.isLastOpSet() ?
+ OpTime( response.getLastOp() ) : OpTime() );
}
else {
@@ -258,4 +270,19 @@ namespace mongo {
batchOp.buildClientResponse( clientResponse );
}
+ const BatchWriteExecStats& BatchWriteExec::getStats() {
+ return *_stats;
+ }
+
+ BatchWriteExecStats* BatchWriteExec::releaseStats() {
+ return _stats.release();
+ }
+
+ void BatchWriteExecStats::noteWriteAt( const ConnectionString& host, OpTime opTime ) {
+ _writeOpTimes[host] = opTime;
+ }
+
+ const HostOpTimeMap& BatchWriteExecStats::getWriteOpTimes() const {
+ return _writeOpTimes;
+ }
}
diff --git a/src/mongo/s/write_ops/batch_write_exec.h b/src/mongo/s/write_ops/batch_write_exec.h
index 1bc136f3262..d9324f2f8ae 100644
--- a/src/mongo/s/write_ops/batch_write_exec.h
+++ b/src/mongo/s/write_ops/batch_write_exec.h
@@ -30,7 +30,11 @@
#include <boost/scoped_ptr.hpp>
+#include <map>
+#include <string>
+
#include "mongo/base/disallow_copying.h"
+#include "mongo/bson/optime.h"
#include "mongo/s/ns_targeter.h"
#include "mongo/s/multi_command_dispatch.h"
#include "mongo/s/shard_resolver.h"
@@ -39,6 +43,8 @@
namespace mongo {
+ class BatchWriteExecStats;
+
/**
* The BatchWriteExec is able to execute client batch write requests, resulting in a batch
* response to send back to the client.
@@ -60,9 +66,7 @@ namespace mongo {
BatchWriteExec( NSTargeter* targeter,
ShardResolver* resolver,
- MultiCommandDispatch* dispatcher ) :
- _targeter( targeter ), _resolver( resolver ), _dispatcher( dispatcher ) {
- }
+ MultiCommandDispatch* dispatcher );
/**
* Executes a client batch write request by sending child batches to several shard
@@ -77,6 +81,10 @@ namespace mongo {
void executeBatch( const BatchedCommandRequest& clientRequest,
BatchedCommandResponse* clientResponse );
+ const BatchWriteExecStats& getStats();
+
+ BatchWriteExecStats* releaseStats();
+
private:
// Not owned here
@@ -87,5 +95,32 @@ namespace mongo {
// Not owned here
MultiCommandDispatch* _dispatcher;
+
+ // Stats
+ auto_ptr<BatchWriteExecStats> _stats;
+ };
+
+ // Useful comparator for using connection strings in ordered sets and maps
+ struct ConnectionStringComp {
+ bool operator()( const ConnectionString& connStrA,
+ const ConnectionString& connStrB ) const {
+ return connStrA.toString().compare( connStrB.toString() ) < 0;
+ }
+ };
+
+ typedef std::map<ConnectionString, OpTime, ConnectionStringComp> HostOpTimeMap;
+
+ class BatchWriteExecStats {
+ public:
+
+ // TODO: Other stats can go here
+
+ void noteWriteAt( const ConnectionString& host, OpTime opTime );
+
+ const HostOpTimeMap& getWriteOpTimes() const;
+
+ private:
+
+ HostOpTimeMap _writeOpTimes;
};
}
diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp
index e5f43628405..81843c41df6 100644
--- a/src/mongo/s/write_ops/batch_write_op.cpp
+++ b/src/mongo/s/write_ops/batch_write_op.cpp
@@ -33,7 +33,7 @@
namespace mongo {
BatchWriteStats::BatchWriteStats() :
- numInserted( 0 ), numUpserted( 0 ), numUpdated( 0 ), numDeleted( 0 ) {
+ numInserted( 0 ), numUpserted( 0 ), numUpdated( 0 ), numModified( 0 ), numDeleted( 0 ) {
}
BatchWriteOp::BatchWriteOp() :
@@ -340,7 +340,7 @@ namespace mongo {
numUpserted = 1;
}
stats->numUpdated += ( response.getN() - numUpserted );
- stats->numModified += ( response.getNDocsModified() - numUpserted );
+ stats->numModified += response.getNDocsModified();
stats->numUpserted += numUpserted;
}
else {
@@ -629,6 +629,8 @@ namespace mongo {
int nValue = _stats->numInserted + _stats->numUpserted + _stats->numUpdated
+ _stats->numDeleted;
batchResp->setN( nValue );
+ if ( _clientRequest->getBatchType() == BatchedCommandRequest::BatchType_Update )
+ batchResp->setNDocsModified( _stats->numModified );
batchResp->setOk( !batchResp->isErrCodeSet() );
dassert( batchResp->isValid( NULL ) );
diff --git a/src/mongo/s/write_ops/batched_command_response.cpp b/src/mongo/s/write_ops/batched_command_response.cpp
index e45bfa64494..2238a1773c4 100644
--- a/src/mongo/s/write_ops/batched_command_response.cpp
+++ b/src/mongo/s/write_ops/batched_command_response.cpp
@@ -148,7 +148,7 @@ namespace mongo {
// We're using appendNumber on generation so we'll try a smaller type
// (int) first and then fall back to the original type (long long).
- BSONField<int> fieldN("n");
+ BSONField<int> fieldN(n());
int tempN;
fieldState = FieldParser::extract(source, fieldN, &tempN, errMsg);
if (fieldState == FieldParser::FIELD_INVALID) {
@@ -164,7 +164,7 @@ namespace mongo {
// We're using appendNumber on generation so we'll try a smaller type
// (int) first and then fall back to the original type (long long).
- BSONField<int> fieldNUpdated("nDocumentsModified");
+ BSONField<int> fieldNUpdated(nDocsModified());
int tempNUpdated;
fieldState = FieldParser::extract(source, fieldNUpdated, &tempNUpdated, errMsg);
if (fieldState == FieldParser::FIELD_INVALID) {