summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/commands/write_commands/batch_executor.cpp5
-rw-r--r--src/mongo/db/commands/write_commands/batch_executor.h9
-rw-r--r--src/mongo/s/chunk_manager_targeter.cpp14
-rw-r--r--src/mongo/s/client_info.cpp81
-rw-r--r--src/mongo/s/client_info.h50
-rw-r--r--src/mongo/s/cluster_write.cpp161
-rw-r--r--src/mongo/s/cluster_write.h45
-rw-r--r--src/mongo/s/commands/cluster_write_cmd.cpp31
-rw-r--r--src/mongo/s/commands_admin.cpp19
-rw-r--r--src/mongo/s/dbclient_multi_command.cpp6
-rw-r--r--src/mongo/s/dbclient_multi_command.h2
-rw-r--r--src/mongo/s/dbclient_shard_resolver.cpp10
-rw-r--r--src/mongo/s/request.cpp2
-rw-r--r--src/mongo/s/s_only.cpp2
-rw-r--r--src/mongo/s/server.cpp53
-rw-r--r--src/mongo/s/shardconnection.cpp2
-rw-r--r--src/mongo/s/strategy_shard.cpp67
-rw-r--r--src/mongo/s/write_ops/batch_upconvert.cpp80
-rw-r--r--src/mongo/s/write_ops/batch_upconvert.h21
-rw-r--r--src/mongo/s/write_ops/batch_upconvert_test.cpp7
-rw-r--r--src/mongo/s/write_ops/batch_write_exec.cpp43
-rw-r--r--src/mongo/s/write_ops/batch_write_exec.h41
-rw-r--r--src/mongo/s/write_ops/batch_write_op.cpp6
-rw-r--r--src/mongo/s/write_ops/batched_command_response.cpp4
-rw-r--r--src/mongo/util/assert_util.h10
25 files changed, 562 insertions, 209 deletions
diff --git a/src/mongo/db/commands/write_commands/batch_executor.cpp b/src/mongo/db/commands/write_commands/batch_executor.cpp
index aa09ca08af4..53d4dcdaf02 100644
--- a/src/mongo/db/commands/write_commands/batch_executor.cpp
+++ b/src/mongo/db/commands/write_commands/batch_executor.cpp
@@ -39,6 +39,7 @@
#include "mongo/db/ops/update.h"
#include "mongo/db/ops/update_lifecycle_impl.h"
#include "mongo/db/pagefault.h"
+#include "mongo/db/repl/replication_server_status.h"
#include "mongo/db/stats/counters.h"
#include "mongo/db/write_concern.h"
#include "mongo/s/collection_metadata.h"
@@ -168,6 +169,10 @@ namespace mongo {
}
}
+ // Send opTime in response
+ if ( anyReplEnabled() )
+ response->setLastOp( _client->getLastOp().asDate() );
+
// Apply write concern if we had any successful writes
if ( numItemErrors < numBatchItems ) {
diff --git a/src/mongo/db/commands/write_commands/batch_executor.h b/src/mongo/db/commands/write_commands/batch_executor.h
index f7608bbc094..d8033a7ab1a 100644
--- a/src/mongo/db/commands/write_commands/batch_executor.h
+++ b/src/mongo/db/commands/write_commands/batch_executor.h
@@ -82,15 +82,6 @@ namespace mongo {
int numUpserted;
int numDeleted;
-
- const std::string toString() const {
- return mongoutils::str::stream()
- << "numInserted: " << numInserted
- << " numModified: " << numModified
- << " numUpdated: " << numUpdated
- << " numUpserted: " << numUpserted
- << " numDeleted: " << numDeleted;
- }
};
/**
diff --git a/src/mongo/s/chunk_manager_targeter.cpp b/src/mongo/s/chunk_manager_targeter.cpp
index 08db5fab6cb..b992e7b5fdf 100644
--- a/src/mongo/s/chunk_manager_targeter.cpp
+++ b/src/mongo/s/chunk_manager_targeter.cpp
@@ -574,6 +574,17 @@ namespace mongo {
return Status::OK();
}
+ // To match legacy reload behavior, we have to backoff on config reload per-thread
+ // TODO: Centralize this behavior better by refactoring config reload in mongos
+ static const int maxWaitMillis = 500;
+ static boost::thread_specific_ptr<Backoff> perThreadBackoff;
+
+ static void refreshBackoff() {
+ if ( !perThreadBackoff.get() )
+ perThreadBackoff.reset( new Backoff( maxWaitMillis, maxWaitMillis * 2 ) );
+ perThreadBackoff.get()->nextSleepMillis();
+ }
+
Status ChunkManagerTargeter::refreshNow( RefreshType refreshType ) {
DBConfigPtr config;
@@ -583,6 +594,9 @@ namespace mongo {
return Status( ErrorCodes::DatabaseNotFound, errMsg );
}
+ // Try not to spam the configs
+ refreshBackoff();
+
// TODO: Improve synchronization and make more explicit
if ( refreshType == RefreshType_RefreshChunkManager ) {
try {
diff --git a/src/mongo/s/client_info.cpp b/src/mongo/s/client_info.cpp
index f54e243fc6f..9fe37517b39 100644
--- a/src/mongo/s/client_info.cpp
+++ b/src/mongo/s/client_info.cpp
@@ -64,9 +64,16 @@ namespace mongo {
ClientInfo::~ClientInfo() {
}
- void ClientInfo::addShard( const string& shard ) {
- _cur->insert( shard );
- _sinceLastGetError.insert( shard );
+ void ClientInfo::addShardHost( const string& shardHost ) {
+ _cur->shardHostsWritten.insert( shardHost );
+ _sinceLastGetError.insert( shardHost );
+ }
+
+ void ClientInfo::addHostOpTimes( const HostOpTimeMap& hostOpTimes ) {
+ for ( HostOpTimeMap::const_iterator it = hostOpTimes.begin();
+ it != hostOpTimes.end(); ++it ) {
+ _cur->hostOpTimes[it->first.toString()] = it->second;
+ }
}
void ClientInfo::newPeerRequest( const HostAndPort& peer ) {
@@ -84,7 +91,7 @@ namespace mongo {
void ClientInfo::newRequest() {
_lastAccess = (int) time(0);
- set<string> * temp = _cur;
+ RequestInfo* temp = _cur;
_cur = _prev;
_prev = temp;
_cur->clear();
@@ -175,7 +182,7 @@ namespace mongo {
}
void ClientInfo::disableForCommand() {
- set<string> * temp = _cur;
+ RequestInfo* temp = _cur;
_cur = _prev;
_prev = temp;
}
@@ -183,6 +190,68 @@ namespace mongo {
static TimerStats gleWtimeStats;
static ServerStatusMetricField<TimerStats> displayGleLatency( "getLastError.wtime", &gleWtimeStats );
+ static BSONObj addOpTimeTo( const BSONObj& options, const OpTime& opTime ) {
+ BSONObjBuilder builder;
+ builder.appendElements( options );
+ builder.appendTimestamp( "wOpTime", opTime.asDate() );
+ return builder.obj();
+ }
+
+ bool ClientInfo::enforceWriteConcern( const string& dbName,
+ const BSONObj& options,
+ string* errMsg ) {
+
+ const map<string, OpTime>& hostOpTimes = getPrevHostOpTimes();
+
+ if ( hostOpTimes.empty() ) {
+ return true;
+ }
+
+ for ( map<string, OpTime>::const_iterator it = hostOpTimes.begin(); it != hostOpTimes.end();
+ ++it ) {
+
+ const string& shardHost = it->first;
+ const OpTime& opTime = it->second;
+
+ LOG(5) << "enforcing write concern " << options << " on " << shardHost << endl;
+
+ BSONObj optionsWithOpTime = addOpTimeTo( options, opTime );
+
+ bool ok = false;
+
+ boost::scoped_ptr<ScopedDbConnection> connPtr;
+ try {
+ connPtr.reset( new ScopedDbConnection( shardHost ) );
+ ScopedDbConnection& conn = *connPtr;
+
+ BSONObj result;
+ ok = conn->runCommand( dbName , optionsWithOpTime , result );
+ if ( !ok )
+ *errMsg = result.toString();
+
+ conn.done();
+ }
+ catch( const DBException& ex ){
+ *errMsg = ex.toString();
+
+ if ( connPtr )
+ connPtr->done();
+ }
+
+ // Done if anyone fails
+ if ( !ok ) {
+
+ *errMsg = str::stream() << "could not enforce write concern on " << shardHost
+ << causedBy( errMsg );
+
+ warning() << *errMsg << endl;
+ return false;
+ }
+ }
+
+ return true;
+ }
+
bool ClientInfo::getLastError( const string& dbName,
const BSONObj& options,
BSONObjBuilder& result,
@@ -206,7 +275,7 @@ namespace mongo {
}
- set<string> * shards = getPrev();
+ set<string>* shards = getPrevShardHosts();
if ( shards->size() == 0 ) {
result.appendNull( "err" );
diff --git a/src/mongo/s/client_info.h b/src/mongo/s/client_info.h
index 536e6e0f405..a41268823ff 100644
--- a/src/mongo/s/client_info.h
+++ b/src/mongo/s/client_info.h
@@ -34,6 +34,7 @@
#include "mongo/db/client_basic.h"
#include "mongo/s/chunk.h"
#include "mongo/s/writeback_listener.h"
+#include "mongo/s/write_ops/batch_write_exec.h"
#include "mongo/util/net/hostandport.h"
namespace mongo {
@@ -71,12 +72,24 @@ namespace mongo {
* notes that this client use this shard
* keeps track of all shards accessed this request
*/
- void addShard( const string& shard );
+ void addShardHost( const string& shardHost );
+
+ /**
+ * Notes that this client wrote to these particular hosts with write commands.
+ */
+ void addHostOpTimes( const HostOpTimeMap& hostOpTimes );
/**
* gets shards used on the previous request
*/
- set<string> * getPrev() const { return _prev; };
+ set<string>* getPrevShardHosts() const { return &_prev->shardHostsWritten; }
+
+ /**
+ * Gets the shards, hosts, and opTimes the client last wrote to with write commands.
+ */
+ const map<string, OpTime>& getPrevHostOpTimes() const {
+ return _prev->hostOpTimes;
+ }
/**
* gets all shards we've accessed since the last time we called clearSinceLastGetError
@@ -90,13 +103,21 @@ namespace mongo {
/**
- * resets the list of shards using to process the current request
+ * resets the information stored for the current request
*/
- void clearCurrentShards(){ _cur->clear(); }
+ void clearRequestInfo(){ _cur->clear(); }
void disableForCommand();
/**
+ * Uses GLE and the shard hosts and opTimes last written by write commands to enforce a
+ * write concern.
+ *
+ * Returns true if write concern was enforced, false with errMsg if not.
+ */
+ bool enforceWriteConcern( const string& dbName, const BSONObj& options, string* errMsg );
+
+ /**
* calls getLastError
* resets shards since get last error
* @return if the command was ok or if there was an error
@@ -138,14 +159,25 @@ namespace mongo {
int _id; // unique client id
HostAndPort _remote; // server:port of remote socket end
- // we use _a and _b to store shards we've talked to on the current request and the previous
+ struct RequestInfo {
+
+ void clear() {
+ shardHostsWritten.clear();
+ hostOpTimes.clear();
+ }
+
+ set<string> shardHostsWritten;
+ map<string, OpTime> hostOpTimes;
+ };
+
+ // we use _a and _b to store info from the current request and the previous request
// we use 2 so we can flip for getLastError type operations
- set<string> _a; // actual set for _cur or _prev
- set<string> _b; // "
+ RequestInfo _a; // actual set for _cur or _prev
+ RequestInfo _b; // "
- set<string> * _cur; // pointer to _a or _b depending on state
- set<string> * _prev; // ""
+ RequestInfo* _cur; // pointer to _a or _b depending on state
+ RequestInfo* _prev; // ""
set<string> _sinceLastGetError; // all shards accessed since last getLastError
diff --git a/src/mongo/s/cluster_write.cpp b/src/mongo/s/cluster_write.cpp
index f54718395ee..6297eab2d1d 100644
--- a/src/mongo/s/cluster_write.cpp
+++ b/src/mongo/s/cluster_write.cpp
@@ -160,43 +160,73 @@ namespace mongo {
return configHosts;
}
- static void shardWrite( const BatchedCommandRequest& request,
- BatchedCommandResponse* response,
- bool autoSplit ) {
+ void clusterInsert( const string& ns,
+ const BSONObj& doc,
+ BatchedCommandResponse* response ) {
+ auto_ptr<BatchedInsertRequest> insert( new BatchedInsertRequest() );
+ insert->addToDocuments( doc );
- ChunkManagerTargeter targeter;
- Status targetInitStatus = targeter.init( NamespaceString( request.getTargetingNS() ) );
+ BatchedCommandRequest request( insert.release() );
+ request.setNS( ns );
- if ( !targetInitStatus.isOK() ) {
+ clusterWrite( request, response, false );
+ }
- warning() << "could not initialize targeter for"
- << ( request.isInsertIndexRequest() ? " index" : "" )
- << " write op in collection " << request.getTargetingNS() << endl;
+ void clusterUpdate( const string& ns,
+ const BSONObj& query,
+ const BSONObj& update,
+ bool upsert,
+ bool multi,
+ BatchedCommandResponse* response ) {
+ auto_ptr<BatchedUpdateDocument> updateDoc( new BatchedUpdateDocument() );
+ updateDoc->setQuery( query );
+ updateDoc->setUpdateExpr( update );
+ updateDoc->setUpsert( upsert );
+ updateDoc->setMulti( multi );
- // Errors will be reported in response if we are unable to target
- }
+ auto_ptr<BatchedUpdateRequest> updateRequest( new BatchedUpdateRequest() );
+ updateRequest->addToUpdates( updateDoc.release() );
- DBClientShardResolver resolver;
- DBClientMultiCommand dispatcher;
- BatchWriteExec exec( &targeter, &resolver, &dispatcher );
- exec.executeBatch( request, response );
+ BatchedCommandRequest request( updateRequest.release() );
+ request.setNS( ns );
- if ( autoSplit ) splitIfNeeded( request.getNS(), *targeter.getStats() );
+ clusterWrite( request, response, false );
}
- static void configWrite( const BatchedCommandRequest& request,
- BatchedCommandResponse* response,
- bool fsyncCheck ) {
+ void clusterDelete( const string& ns,
+ const BSONObj& query,
+ int limit,
+ BatchedCommandResponse* response ) {
+ auto_ptr<BatchedDeleteDocument> deleteDoc( new BatchedDeleteDocument );
+ deleteDoc->setQuery( query );
+ deleteDoc->setLimit( limit );
- DBClientMultiCommand dispatcher;
- dispatcher.setTimeoutMillis( ConfigOpTimeoutMillis );
- ConfigCoordinator exec( &dispatcher, getConfigHosts() );
- exec.executeBatch( request, response, fsyncCheck );
+ auto_ptr<BatchedDeleteRequest> deleteRequest( new BatchedDeleteRequest() );
+ deleteRequest->addToDeletes( deleteDoc.release() );
+
+ BatchedCommandRequest request( deleteRequest.release() );
+ request.setNS( ns );
+
+ clusterWrite( request, response, false );
+ }
+
+ void clusterCreateIndex( const string& ns,
+ BSONObj keys,
+ bool unique,
+ BatchedCommandResponse* response) {
+ clusterInsert( NamespaceString( ns ).getSystemIndexesCollection(),
+ createIndexDoc( ns, keys, unique ), response );
}
void clusterWrite( const BatchedCommandRequest& request,
BatchedCommandResponse* response,
bool autoSplit ) {
+ ClusterWriter writer( autoSplit, 0 );
+ writer.write( request, response );
+ }
+
+ void ClusterWriter::write( const BatchedCommandRequest& request,
+ BatchedCommandResponse* response ) {
// App-level validation of a create index insert
if ( request.isInsertIndexRequest() ) {
@@ -255,66 +285,63 @@ namespace mongo {
configWrite( request, response, verboseWC );
}
else {
- shardWrite( request, response, autoSplit );
+ shardWrite( request, response );
}
}
- void clusterInsert( const string& ns,
- const BSONObj& doc,
- BatchedCommandResponse* response ) {
- auto_ptr<BatchedInsertRequest> insert( new BatchedInsertRequest() );
- insert->addToDocuments( doc );
-
- BatchedCommandRequest request( insert.release() );
- request.setNS( ns );
+ ClusterWriter::ClusterWriter( bool autoSplit, int timeoutMillis ) :
+ _autoSplit( autoSplit ), _timeoutMillis( timeoutMillis ), _stats( new ClusterWriterStats ) {
+ }
- clusterWrite( request, response, false );
+ const ClusterWriterStats& ClusterWriter::getStats() {
+ return *_stats;
}
- void clusterUpdate( const string& ns,
- const BSONObj& query,
- const BSONObj& update,
- bool upsert,
- bool multi,
- BatchedCommandResponse* response ) {
- auto_ptr<BatchedUpdateDocument> updateDoc( new BatchedUpdateDocument() );
- updateDoc->setQuery( query );
- updateDoc->setUpdateExpr( update );
- updateDoc->setUpsert( upsert );
- updateDoc->setMulti( multi );
+ void ClusterWriter::shardWrite( const BatchedCommandRequest& request,
+ BatchedCommandResponse* response ) {
- auto_ptr<BatchedUpdateRequest> updateRequest( new BatchedUpdateRequest() );
- updateRequest->addToUpdates( updateDoc.release() );
+ ChunkManagerTargeter targeter;
+ Status targetInitStatus = targeter.init( NamespaceString( request.getTargetingNS() ) );
- BatchedCommandRequest request( updateRequest.release() );
- request.setNS( ns );
+ if ( !targetInitStatus.isOK() ) {
- clusterWrite( request, response, false );
+ warning() << "could not initialize targeter for"
+ << ( request.isInsertIndexRequest() ? " index" : "" )
+ << " write op in collection " << request.getTargetingNS() << endl;
+
+ // Errors will be reported in response if we are unable to target
+ }
+
+ DBClientShardResolver resolver;
+ DBClientMultiCommand dispatcher;
+ BatchWriteExec exec( &targeter, &resolver, &dispatcher );
+ exec.executeBatch( request, response );
+
+ if ( _autoSplit )
+ splitIfNeeded( request.getNS(), *targeter.getStats() );
+
+ _stats->setShardStats( exec.releaseStats() );
}
- void clusterDelete( const string& ns,
- const BSONObj& query,
- int limit,
- BatchedCommandResponse* response ) {
- auto_ptr<BatchedDeleteDocument> deleteDoc( new BatchedDeleteDocument );
- deleteDoc->setQuery( query );
- deleteDoc->setLimit( limit );
+ void ClusterWriter::configWrite( const BatchedCommandRequest& request,
+ BatchedCommandResponse* response,
+ bool fsyncCheck ) {
- auto_ptr<BatchedDeleteRequest> deleteRequest( new BatchedDeleteRequest() );
- deleteRequest->addToDeletes( deleteDoc.release() );
+ DBClientMultiCommand dispatcher;
+ ConfigCoordinator exec( &dispatcher, getConfigHosts() );
+ exec.executeBatch( request, response, fsyncCheck );
+ }
- BatchedCommandRequest request( deleteRequest.release() );
- request.setNS( ns );
+ void ClusterWriterStats::setShardStats( BatchWriteExecStats* shardStats ) {
+ _shardStats.reset( shardStats );
+ }
- clusterWrite( request, response, false );
+ bool ClusterWriterStats::hasShardStats() const {
+ return NULL != _shardStats.get();
}
- void clusterCreateIndex( const string& ns,
- BSONObj keys,
- bool unique,
- BatchedCommandResponse* response) {
- clusterInsert( NamespaceString( ns ).getSystemIndexesCollection(),
- createIndexDoc( ns, keys, unique ), response );
+ const BatchWriteExecStats& ClusterWriterStats::getShardStats() const {
+ return *_shardStats;
}
} // namespace mongo
diff --git a/src/mongo/s/cluster_write.h b/src/mongo/s/cluster_write.h
index 260ab4b5d18..c6cb2a92e22 100644
--- a/src/mongo/s/cluster_write.h
+++ b/src/mongo/s/cluster_write.h
@@ -28,11 +28,56 @@
#pragma once
+#include "mongo/s/write_ops/batch_write_exec.h"
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/s/write_ops/batched_command_response.h"
namespace mongo {
+ class ClusterWriterStats;
+ class BatchWriteExecStats;
+
+ class ClusterWriter {
+ public:
+
+ ClusterWriter( bool autoSplit, int timeoutMillis );
+
+ void write( const BatchedCommandRequest& request, BatchedCommandResponse* response );
+
+ const ClusterWriterStats& getStats();
+
+ private:
+
+ void configWrite( const BatchedCommandRequest& request,
+ BatchedCommandResponse* response,
+ bool fsyncCheck );
+
+ void shardWrite( const BatchedCommandRequest& request,
+ BatchedCommandResponse* response );
+
+ bool _autoSplit;
+ int _timeoutMillis;
+
+ scoped_ptr<ClusterWriterStats> _stats;
+ };
+
+ class ClusterWriterStats {
+ public:
+
+ // Transfers ownership to the cluster write stats
+ void setShardStats( BatchWriteExecStats* _shardStats );
+
+ bool hasShardStats() const;
+
+ const BatchWriteExecStats& getShardStats() const;
+
+ // TODO: When we have ConfigCoordinator stats, put these here too.
+
+ private:
+
+ scoped_ptr<BatchWriteExecStats> _shardStats;
+ };
+
void clusterWrite( const BatchedCommandRequest& request,
BatchedCommandResponse* response,
bool autoSplit );
diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp
index 4f8ec473aca..c6d39a5845c 100644
--- a/src/mongo/s/commands/cluster_write_cmd.cpp
+++ b/src/mongo/s/commands/cluster_write_cmd.cpp
@@ -33,6 +33,7 @@
#include "mongo/db/commands/write_commands/write_commands_common.h"
#include "mongo/s/cluster_write.h"
#include "mongo/db/lasterror.h"
+#include "mongo/s/client_info.h"
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/s/write_ops/batched_command_response.h"
#include "mongo/s/write_ops/batch_upconvert.h"
@@ -69,16 +70,10 @@ namespace mongo {
const std::string& dbname,
const BSONObj& cmdObj ) {
- Status status( auth::checkAuthForWriteCommand( client->getAuthorizationSession(),
- _writeType,
- NamespaceString( parseNs( dbname, cmdObj ) ),
- cmdObj ));
-
- if ( !status.isOK() ) {
- setLastError( status.code(), status.reason().c_str() );
- }
-
- return status;
+ return auth::checkAuthForWriteCommand( client->getAuthorizationSession(),
+ _writeType,
+ NamespaceString( parseNs( dbname, cmdObj ) ),
+ cmdObj );
}
// Cluster write command entry point.
@@ -154,6 +149,7 @@ namespace mongo {
BatchedCommandRequest request( _writeType );
BatchedCommandResponse response;
+ ClusterWriter writer( true /* autosplit */, 0 /* timeout */ );
// TODO: if we do namespace parsing, push this to the type
if ( !request.parseBSON( cmdObj, &errMsg ) || !request.isValid( &errMsg ) ) {
@@ -162,23 +158,22 @@ namespace mongo {
response.setOk( false );
response.setErrCode( ErrorCodes::FailedToParse );
response.setErrMessage( errMsg );
-
- dassert( response.isValid( &errMsg ) );
}
else {
// Fixup the namespace to be a full ns internally
NamespaceString nss( dbName, request.getNS() );
request.setNS( nss.ns() );
- clusterWrite( request, &response, true /* autosplit */ );
+ writer.write( request, &response );
}
- // Populate the lastError object based on the write
dassert( response.isValid( NULL ) );
- LastError* lastErrorForRequest = lastError.get( true /* create */ );
- dassert( lastErrorForRequest );
- lastErrorForRequest->reset();
- batchErrorToLastError( request, response, lastErrorForRequest );
+
+ // Save the last opTimes written on each shard for this client, to allow GLE to work
+ if ( ClientInfo::exists() && writer.getStats().hasShardStats() ) {
+ ClientInfo* clientInfo = ClientInfo::get( NULL );
+ clientInfo->addHostOpTimes( writer.getStats().getShardStats().getWriteOpTimes() );
+ }
// TODO
// There's a pending issue about how to report response here. If we use
diff --git a/src/mongo/s/commands_admin.cpp b/src/mongo/s/commands_admin.cpp
index e805c9fe215..f0c8858b586 100644
--- a/src/mongo/s/commands_admin.cpp
+++ b/src/mongo/s/commands_admin.cpp
@@ -1496,13 +1496,26 @@ namespace mongo {
virtual bool run(const string& dbName, BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool) {
LastError *le = lastError.disableForCommand();
verify( le );
+
+ if ( Strategy::useClusterWriteCommands ) {
+
+ if ( le->nPrev == 1 ) {
+ le->appendSelf( result );
+ }
+ else {
+ result.appendNull( "err" );
+ }
+
+ return ClientInfo::get()->enforceWriteConcern( dbName, cmdObj, &errmsg );
+ }
+
{
- if ( Strategy::useClusterWriteCommands ||
- ( le->msg.size() && le->nPrev == 1 ) ) {
+ if ( le->msg.size() && le->nPrev == 1 ) {
le->appendSelf( result );
return true;
}
}
+
ClientInfo * client = ClientInfo::get();
bool res = client->getLastError( dbName, cmdObj , result, errmsg );
client->disableForCommand();
@@ -1529,7 +1542,7 @@ namespace mongo {
le->reset();
ClientInfo * client = ClientInfo::get();
- set<string> * shards = client->getPrev();
+ set<string> * shards = client->getPrevShardHosts();
for ( set<string>::iterator i = shards->begin(); i != shards->end(); i++ ) {
string theShard = *i;
diff --git a/src/mongo/s/dbclient_multi_command.cpp b/src/mongo/s/dbclient_multi_command.cpp
index dc43c658460..3a400f2692d 100644
--- a/src/mongo/s/dbclient_multi_command.cpp
+++ b/src/mongo/s/dbclient_multi_command.cpp
@@ -148,10 +148,12 @@ namespace mongo {
dassert( NULL == command->conn );
try {
- // TODO: Figure out how to handle repl sets, configs
dassert( command->endpoint.type() == ConnectionString::MASTER ||
command->endpoint.type() == ConnectionString::CUSTOM );
- command->conn = shardConnectionPool.get( command->endpoint, _timeoutMillis / 1000 );
+
+ // TODO: Fix the pool up to take millis directly
+ int timeoutSecs = _timeoutMillis / 1000;
+ command->conn = shardConnectionPool.get( command->endpoint, timeoutSecs );
if ( hasBatchWriteFeature( command->conn )
|| !isBatchWriteCommand( command->cmdObj ) ) {
diff --git a/src/mongo/s/dbclient_multi_command.h b/src/mongo/s/dbclient_multi_command.h
index 684cb6cc3ef..56d8f8adabf 100644
--- a/src/mongo/s/dbclient_multi_command.h
+++ b/src/mongo/s/dbclient_multi_command.h
@@ -44,6 +44,8 @@ namespace mongo {
class DBClientMultiCommand : public MultiCommandDispatch {
public:
+ DBClientMultiCommand() : _timeoutMillis( 0 ) {}
+
~DBClientMultiCommand();
void addCommand( const ConnectionString& endpoint,
diff --git a/src/mongo/s/dbclient_shard_resolver.cpp b/src/mongo/s/dbclient_shard_resolver.cpp
index 9f8c45455bd..99c259ccf4b 100644
--- a/src/mongo/s/dbclient_shard_resolver.cpp
+++ b/src/mongo/s/dbclient_shard_resolver.cpp
@@ -54,7 +54,8 @@ namespace mongo {
// Internally uses our shard cache, does no reload
Shard shard = Shard::findIfExists( shardName );
if ( shard.getName() == "" ) {
- return Status( ErrorCodes::ShardNotFound, "" );
+ return Status( ErrorCodes::ShardNotFound,
+ string("unknown shard name ") + shardName );
}
ConnectionString rawShardHost = ConnectionString::parse( shard.getConnString(), errMsg );
@@ -75,7 +76,8 @@ namespace mongo {
ReplicaSetMonitorPtr replMonitor = ReplicaSetMonitor::get( rawShardHost.getSetName(),
false );
if ( !replMonitor ) {
- return Status( ErrorCodes::ReplicaSetNotFound, "" );
+ return Status( ErrorCodes::ReplicaSetNotFound,
+ string("unknown replica set ") + rawShardHost.getSetName() );
}
try {
@@ -86,7 +88,9 @@ namespace mongo {
return Status::OK();
}
catch ( const DBException& ex ) {
- return Status( ErrorCodes::HostNotFound, "" );
+ return Status( ErrorCodes::HostNotFound,
+ string("could not contact primary for replica set ")
+ + replMonitor->getName() );
}
// Unreachable
diff --git a/src/mongo/s/request.cpp b/src/mongo/s/request.cpp
index f856cd6e4a2..88a0b78bc21 100644
--- a/src/mongo/s/request.cpp
+++ b/src/mongo/s/request.cpp
@@ -91,7 +91,7 @@ namespace mongo {
}
_m.header()->id = _id;
- _clientInfo->clearCurrentShards();
+ _clientInfo->clearRequestInfo();
}
// Deprecated, will move to the strategy itself
diff --git a/src/mongo/s/s_only.cpp b/src/mongo/s/s_only.cpp
index 9c52e0a14f0..a2a8e491fd5 100644
--- a/src/mongo/s/s_only.cpp
+++ b/src/mongo/s/s_only.cpp
@@ -46,7 +46,7 @@ namespace mongo {
* in an operation to be read later by getLastError()
*/
void usingAShardConnection( const string& addr ) {
- ClientInfo::get()->addShard( addr );
+ ClientInfo::get()->addShardHost( addr );
}
TSP_DEFINE(Client,currentClient)
diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp
index 270c67139a2..4a94b94ffc9 100644
--- a/src/mongo/s/server.cpp
+++ b/src/mongo/s/server.cpp
@@ -110,6 +110,16 @@ namespace mongo {
return false;
}
+ static BSONObj buildErrReply( const DBException& ex ) {
+ BSONObjBuilder errB;
+ errB.append( "$err", ex.what() );
+ errB.append( "code", ex.getCode() );
+ if ( !ex._shard.empty() ) {
+ errB.append( "shard", ex._shard );
+ }
+ return errB.obj();
+ }
+
class ShardedMessageHandler : public MessageHandler {
public:
virtual ~ShardedMessageHandler() {}
@@ -135,37 +145,38 @@ namespace mongo {
ShardConnection::releaseMyConnections();
}
}
- catch ( AssertionException & e ) {
- LOG( e.isUserAssertion() ? 1 : 0 ) << "AssertionException while processing op type : " << m.operation() << " to : " << r.getns() << causedBy(e) << endl;
-
- le->raiseError( e.getCode() , e.what() );
+ catch ( const AssertionException& ex ) {
- m.header()->id = r.id();
+ LOG( ex.isUserAssertion() ? 1 : 0 ) << "Assertion failed"
+ << " while processing " << opToString( m.operation() ) << " op"
+ << " for " << r.getns() << causedBy( ex ) << endl;
if ( r.expectResponse() ) {
- BSONObj err = BSON( "$err" << e.what() << "code" << e.getCode() );
- replyToQuery( ResultFlag_ErrSet, p , m , err );
+ m.header()->id = r.id();
+ replyToQuery( ResultFlag_ErrSet, p , m , buildErrReply( ex ) );
+ }
+ else {
+ le->raiseError( ex.getCode() , ex.what() );
}
}
- catch ( DBException& e ) {
- // note that e.toString() is more detailed on a SocketException than
- // e.what(). we should think about what is the right level of detail both
- // for logging and return code.
- log() << "DBException in process: " << e.what() << endl;
-
- le->raiseError( e.getCode() , e.what() );
+ catch ( const DBException& ex ) {
- m.header()->id = r.id();
+ log() << "Exception thrown"
+ << " while processing " << opToString( m.operation() ) << " op"
+ << " for " << r.getns() << causedBy( ex ) << endl;
if ( r.expectResponse() ) {
- BSONObjBuilder b;
- b.append("$err",e.what()).append("code",e.getCode());
- if( !e._shard.empty() ) {
- b.append("shard",e._shard);
- }
- replyToQuery( ResultFlag_ErrSet, p , m , b.obj() );
+ m.header()->id = r.id();
+ replyToQuery( ResultFlag_ErrSet, p , m , buildErrReply( ex ) );
+ }
+ else {
+ le->raiseError( ex.getCode() , ex.what() );
}
}
+
+ // Clear out the last error for GLE unless it's been explicitly disabled
+ if ( r.expectResponse() && !le->disabled )
+ le->reset();
}
virtual void disconnected( AbstractMessagingPort* p ) {
diff --git a/src/mongo/s/shardconnection.cpp b/src/mongo/s/shardconnection.cpp
index 920a30383fc..286ff95c608 100644
--- a/src/mongo/s/shardconnection.cpp
+++ b/src/mongo/s/shardconnection.cpp
@@ -494,7 +494,7 @@ namespace mongo {
}
}
- bool ShardConnection::releaseConnectionsAfterResponse( false );
+ bool ShardConnection::releaseConnectionsAfterResponse( true );
ExportedServerParameter<bool> ReleaseConnectionsAfterResponse(
ServerParameterSet::getGlobal(),
diff --git a/src/mongo/s/strategy_shard.cpp b/src/mongo/s/strategy_shard.cpp
index a24f6268017..2aac2516409 100644
--- a/src/mongo/s/strategy_shard.cpp
+++ b/src/mongo/s/strategy_shard.cpp
@@ -31,6 +31,7 @@
#include "mongo/pch.h"
#include "mongo/base/status.h"
+#include "mongo/base/owned_pointer_vector.h"
#include "mongo/bson/util/builder.h"
#include "mongo/client/connpool.h"
#include "mongo/client/dbclientcursor.h"
@@ -59,7 +60,7 @@
namespace mongo {
- bool Strategy::useClusterWriteCommands = false;
+ bool Strategy::useClusterWriteCommands = true;
ExportedServerParameter<bool> _useClusterWriteCommands( ServerParameterSet::getGlobal(),
"useClusterWriteCommands",
&Strategy::useClusterWriteCommands,
@@ -1312,20 +1313,62 @@ namespace mongo {
bool fromWBL = r.d().reservedField() & Reserved_FromWriteback;
if ( fromWBL ) return;
- auto_ptr<BatchedCommandRequest> request( msgToBatchRequest( r.m() ) );
+ // make sure we have a last error
+ dassert( lastError.get( false /* don't create */) );
- // Adjust namespaces for command
- NamespaceString fullNS( request->getNS() );
- string cmdNS = fullNS.getCommandNS();
- // We only pass in collection name to command
- request->setNS( fullNS.coll() );
+ OwnedPointerVector<BatchedCommandRequest> requestsOwned;
+ vector<BatchedCommandRequest*>& requests = requestsOwned.mutableVector();
- BSONObjBuilder builder;
- BSONObj requestBSON = request->toBSON();
- Command::runAgainstRegistered( cmdNS.c_str(), requestBSON, builder, 0 );
- // We purposely don't do anything with the response, lastError is populated in
- // the command itself.
+ msgToBatchRequests( r.m(), &requests );
+ for ( vector<BatchedCommandRequest*>::iterator it = requests.begin();
+ it != requests.end(); ++it ) {
+
+ // Multiple commands registered to last error as multiple requests
+ if ( it != requests.begin() )
+ lastError.startRequest( r.m(), lastError.get( false ) );
+
+ BatchedCommandRequest* request = *it;
+
+ // Adjust namespaces for command
+ NamespaceString fullNS( request->getNS() );
+ string cmdNS = fullNS.getCommandNS();
+ // We only pass in collection name to command
+ request->setNS( fullNS.coll() );
+
+ BSONObjBuilder builder;
+ BSONObj requestBSON = request->toBSON();
+
+ {
+ // Disable the last error object for the duration of the write cmd
+ LastError::Disabled disableLastError( lastError.get( false ) );
+ Command::runAgainstRegistered( cmdNS.c_str(), requestBSON, builder, 0 );
+ }
+
+ BatchedCommandResponse response;
+ bool parsed = response.parseBSON( builder.done(), NULL );
+ (void) parsed; // for compile
+ dassert( parsed && response.isValid( NULL ) );
+
+ // Populate the lastError object based on the write
+ lastError.get( false )->reset();
+ bool hadError = batchErrorToLastError( *request,
+ response,
+ lastError.get( false ) );
+
+ // Need to specially count inserts
+ if ( op == dbInsert ) {
+ for( int i = 0; i < response.getN(); ++i )
+ r.gotInsert();
+ }
+
+ // If this is an ordered batch and we had a non-write-concern error, we should
+ // stop sending.
+ if ( request->getOrdered() && hadError )
+ break;
+ }
+
+ // Cut off the legacy writes
return;
}
diff --git a/src/mongo/s/write_ops/batch_upconvert.cpp b/src/mongo/s/write_ops/batch_upconvert.cpp
index de35c334100..6936bf7e24c 100644
--- a/src/mongo/s/write_ops/batch_upconvert.cpp
+++ b/src/mongo/s/write_ops/batch_upconvert.cpp
@@ -40,52 +40,69 @@
namespace mongo {
using mongoutils::str::stream;
+ using std::vector;
- BatchedCommandRequest* msgToBatchRequest( const Message& msg ) {
+ void msgToBatchRequests( const Message& msg, vector<BatchedCommandRequest*>* requests ) {
int opType = msg.operation();
auto_ptr<BatchedCommandRequest> request;
if ( opType == dbInsert ) {
- request.reset( msgToBatchInsert( msg ) );
+ msgToBatchInserts( msg, requests );
}
else if ( opType == dbUpdate ) {
- request.reset( msgToBatchUpdate( msg ) );
+ requests->push_back( msgToBatchUpdate( msg ) );
}
else {
dassert( opType == dbDelete );
- request.reset( msgToBatchDelete( msg ) );
+ requests->push_back( msgToBatchDelete( msg ) );
}
-
- return request.release();
}
- BatchedCommandRequest* msgToBatchInsert( const Message& insertMsg ) {
+ void msgToBatchInserts( const Message& insertMsg,
+ vector<BatchedCommandRequest*>* insertRequests ) {
// Parsing DbMessage throws
DbMessage dbMsg( insertMsg );
NamespaceString nss( dbMsg.getns() );
- bool coe = dbMsg.reservedField() & Reserved_InsertOption_ContinueOnError;
-
- vector<BSONObj> docs;
- do {
- docs.push_back( dbMsg.nextJsObj() );
- }
- while ( dbMsg.moreJSObjs() );
// Continue-on-error == unordered
+ bool coe = dbMsg.reservedField() & Reserved_InsertOption_ContinueOnError;
bool ordered = !coe;
- // No exceptions from here on
- BatchedCommandRequest* request =
- new BatchedCommandRequest( BatchedCommandRequest::BatchType_Insert );
- request->setNS( nss.ns() );
- for ( vector<BSONObj>::const_iterator it = docs.begin(); it != docs.end(); ++it ) {
- request->getInsertRequest()->addToDocuments( *it );
- }
- request->setOrdered( ordered );
+ while ( insertRequests->empty() || dbMsg.moreJSObjs() ) {
+
+ // Collect docs for next batch, but don't exceed maximum size
+ int totalInsertSize = 0;
+ vector<BSONObj> docs;
+ do {
+ const char* prevObjMark = dbMsg.markGet();
+ BSONObj nextObj = dbMsg.nextJsObj();
+ if ( totalInsertSize + nextObj.objsize() <= BSONObjMaxUserSize ) {
+ docs.push_back( nextObj );
+ totalInsertSize += docs.back().objsize();
+ }
+ else {
+ // Size limit exceeded, rollback to previous insert position
+ dbMsg.markReset( prevObjMark );
+ break;
+ }
+ }
+ while ( dbMsg.moreJSObjs() );
- return request;
+ dassert( !docs.empty() );
+
+ // No exceptions from here on
+ BatchedCommandRequest* request =
+ new BatchedCommandRequest( BatchedCommandRequest::BatchType_Insert );
+ request->setNS( nss.ns() );
+ for ( vector<BSONObj>::const_iterator it = docs.begin(); it != docs.end(); ++it ) {
+ request->getInsertRequest()->addToDocuments( *it );
+ }
+ request->setOrdered( ordered );
+
+ insertRequests->push_back( request );
+ }
}
BatchedCommandRequest* msgToBatchUpdate( const Message& updateMsg ) {
@@ -142,7 +159,7 @@ namespace mongo {
error->setErrMessage( response.getErrMessage() );
}
- void batchErrorToLastError( const BatchedCommandRequest& request,
+ bool batchErrorToLastError( const BatchedCommandRequest& request,
const BatchedCommandResponse& response,
LastError* error ) {
@@ -170,9 +187,10 @@ namespace mongo {
// Record an error if one exists
if ( lastBatchError ) {
+ string errMsg = lastBatchError->getErrMessage();
error->raiseError( lastBatchError->getErrCode(),
- lastBatchError->getErrMessage().c_str() );
- return;
+ errMsg.empty() ? "see code for details" : errMsg.c_str() );
+ return true;
}
// Record write stats otherwise
@@ -198,10 +216,18 @@ namespace mongo {
int numUpdated = response.getN() - numUpserted;
dassert( numUpdated >= 0 );
- error->recordUpdate( numUpdated > 0, response.getN(), upsertedId );
+
+ // Wrap upserted id in "upserted" field
+ BSONObj leUpsertedId;
+ if ( !upsertedId.isEmpty() )
+ leUpsertedId = upsertedId.firstElement().wrap( kUpsertedFieldName );
+
+ error->recordUpdate( numUpdated > 0, response.getN(), leUpsertedId );
}
else if ( request.getBatchType() == BatchedCommandRequest::BatchType_Delete ) {
error->recordDelete( response.getN() );
}
+
+ return false;
}
}
diff --git a/src/mongo/s/write_ops/batch_upconvert.h b/src/mongo/s/write_ops/batch_upconvert.h
index ca2f2c56df6..46b6d4552b6 100644
--- a/src/mongo/s/write_ops/batch_upconvert.h
+++ b/src/mongo/s/write_ops/batch_upconvert.h
@@ -28,6 +28,8 @@
#pragma once
+#include <vector>
+
#include "mongo/db/lasterror.h"
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/s/write_ops/batched_command_response.h"
@@ -40,20 +42,23 @@ namespace mongo {
// NOTE: These functions throw on invalid message format.
//
- BatchedCommandRequest* msgToBatchRequest( const Message& msg );
+ void msgToBatchRequests( const Message& msg, std::vector<BatchedCommandRequest*>* requests );
- BatchedCommandRequest* msgToBatchInsert( const Message& insertMsg );
+ // Batch inserts may get mapped to multiple batch requests, to avoid spilling MaxBSONObjSize
+ void msgToBatchInserts( const Message& insertMsg,
+ std::vector<BatchedCommandRequest*>* insertRequests );
BatchedCommandRequest* msgToBatchUpdate( const Message& updateMsg );
BatchedCommandRequest* msgToBatchDelete( const Message& deleteMsg );
- //
- // Utility function for recording completed batch writes into the LastError object.
- // (Interpreting the response requires the request object as well.)
- //
-
- void batchErrorToLastError( const BatchedCommandRequest& request,
+ /**
+ * Utility function for recording completed batch writes into the LastError object.
+ * (Interpreting the response requires the request object as well.)
+ *
+ * Returns true if an error occurred in the batch.
+ */
+ bool batchErrorToLastError( const BatchedCommandRequest& request,
const BatchedCommandResponse& response,
LastError* error );
diff --git a/src/mongo/s/write_ops/batch_upconvert_test.cpp b/src/mongo/s/write_ops/batch_upconvert_test.cpp
index 880c91129db..0469e6324bf 100644
--- a/src/mongo/s/write_ops/batch_upconvert_test.cpp
+++ b/src/mongo/s/write_ops/batch_upconvert_test.cpp
@@ -28,6 +28,7 @@
#include "mongo/s/write_ops/batch_upconvert.h"
+#include "mongo/base/owned_pointer_vector.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/bson/util/builder.h"
@@ -56,7 +57,11 @@ namespace {
doc.appendSelfToBufBuilder( insertMsgB );
insertMsg.setData( dbInsert, insertMsgB.buf(), insertMsgB.len() );
- auto_ptr<BatchedCommandRequest> request( msgToBatchRequest( insertMsg ) );
+ OwnedPointerVector<BatchedCommandRequest> requestsOwned;
+ vector<BatchedCommandRequest*>& requests = requestsOwned.mutableVector();
+ msgToBatchRequests( insertMsg, &requests );
+
+ BatchedCommandRequest* request = requests.back();
ASSERT_EQUALS( request->getBatchType(), BatchedCommandRequest::BatchType_Insert );
string errMsg;
ASSERT( request->isValid( &errMsg ) );
diff --git a/src/mongo/s/write_ops/batch_write_exec.cpp b/src/mongo/s/write_ops/batch_write_exec.cpp
index de7926f2369..15000036ed3 100644
--- a/src/mongo/s/write_ops/batch_write_exec.cpp
+++ b/src/mongo/s/write_ops/batch_write_exec.cpp
@@ -36,19 +36,22 @@
namespace mongo {
- namespace {
+ BatchWriteExec::BatchWriteExec( NSTargeter* targeter,
+ ShardResolver* resolver,
+ MultiCommandDispatch* dispatcher ) :
+ _targeter( targeter ),
+ _resolver( resolver ),
+ _dispatcher( dispatcher ),
+ _stats( new BatchWriteExecStats ) {
+ }
- struct ConnectionStringComp {
- bool operator()( const ConnectionString& connStrA,
- const ConnectionString& connStrB ) const {
- return connStrA.toString().compare( connStrB.toString() ) < 0;
- }
- };
+ namespace {
//
// Map which allows associating ConnectionString hosts with TargetedWriteBatches
// This is needed since the dispatcher only returns hosts with responses.
//
+
// TODO: Unordered map?
typedef map<ConnectionString, TargetedWriteBatch*, ConnectionStringComp> HostBatchMap;
}
@@ -137,7 +140,8 @@ namespace mongo {
//
size_t numSent = 0;
- while ( numSent != childBatches.size() ) {
+ size_t numToSend = childBatches.size();
+ while ( numSent != numToSend ) {
// Collect batches out on the network, mapped by endpoint
HostBatchMap pendingBatches;
@@ -176,6 +180,7 @@ namespace mongo {
// We're done with this batch
*it = NULL;
+ --numToSend;
continue;
}
@@ -243,6 +248,13 @@ namespace mongo {
noteStaleResponses( staleErrors, _targeter );
++numStaleBatches;
}
+
+ // Remember that we successfully wrote to this shard
+ // NOTE: This will record lastOps for shards where we actually didn't update
+ // or delete any documents, which preserves old behavior but is conservative
+ _stats->noteWriteAt( shardHost,
+ response.isLastOpSet() ?
+ OpTime( response.getLastOp() ) : OpTime() );
}
else {
@@ -258,4 +270,19 @@ namespace mongo {
batchOp.buildClientResponse( clientResponse );
}
+ const BatchWriteExecStats& BatchWriteExec::getStats() {
+ return *_stats;
+ }
+
+ BatchWriteExecStats* BatchWriteExec::releaseStats() {
+ return _stats.release();
+ }
+
+ void BatchWriteExecStats::noteWriteAt( const ConnectionString& host, OpTime opTime ) {
+ _writeOpTimes[host] = opTime;
+ }
+
+ const HostOpTimeMap& BatchWriteExecStats::getWriteOpTimes() const {
+ return _writeOpTimes;
+ }
}
diff --git a/src/mongo/s/write_ops/batch_write_exec.h b/src/mongo/s/write_ops/batch_write_exec.h
index 1bc136f3262..d9324f2f8ae 100644
--- a/src/mongo/s/write_ops/batch_write_exec.h
+++ b/src/mongo/s/write_ops/batch_write_exec.h
@@ -30,7 +30,11 @@
#include <boost/scoped_ptr.hpp>
+#include <map>
+#include <string>
+
#include "mongo/base/disallow_copying.h"
+#include "mongo/bson/optime.h"
#include "mongo/s/ns_targeter.h"
#include "mongo/s/multi_command_dispatch.h"
#include "mongo/s/shard_resolver.h"
@@ -39,6 +43,8 @@
namespace mongo {
+ class BatchWriteExecStats;
+
/**
* The BatchWriteExec is able to execute client batch write requests, resulting in a batch
* response to send back to the client.
@@ -60,9 +66,7 @@ namespace mongo {
BatchWriteExec( NSTargeter* targeter,
ShardResolver* resolver,
- MultiCommandDispatch* dispatcher ) :
- _targeter( targeter ), _resolver( resolver ), _dispatcher( dispatcher ) {
- }
+ MultiCommandDispatch* dispatcher );
/**
* Executes a client batch write request by sending child batches to several shard
@@ -77,6 +81,10 @@ namespace mongo {
void executeBatch( const BatchedCommandRequest& clientRequest,
BatchedCommandResponse* clientResponse );
+ const BatchWriteExecStats& getStats();
+
+ BatchWriteExecStats* releaseStats();
+
private:
// Not owned here
@@ -87,5 +95,32 @@ namespace mongo {
// Not owned here
MultiCommandDispatch* _dispatcher;
+
+ // Stats
+ auto_ptr<BatchWriteExecStats> _stats;
+ };
+
+ // Useful comparator for using connection strings in ordered sets and maps
+ struct ConnectionStringComp {
+ bool operator()( const ConnectionString& connStrA,
+ const ConnectionString& connStrB ) const {
+ return connStrA.toString().compare( connStrB.toString() ) < 0;
+ }
+ };
+
+ typedef std::map<ConnectionString, OpTime, ConnectionStringComp> HostOpTimeMap;
+
+ class BatchWriteExecStats {
+ public:
+
+ // TODO: Other stats can go here
+
+ void noteWriteAt( const ConnectionString& host, OpTime opTime );
+
+ const HostOpTimeMap& getWriteOpTimes() const;
+
+ private:
+
+ HostOpTimeMap _writeOpTimes;
};
}
diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp
index e5f43628405..81843c41df6 100644
--- a/src/mongo/s/write_ops/batch_write_op.cpp
+++ b/src/mongo/s/write_ops/batch_write_op.cpp
@@ -33,7 +33,7 @@
namespace mongo {
BatchWriteStats::BatchWriteStats() :
- numInserted( 0 ), numUpserted( 0 ), numUpdated( 0 ), numDeleted( 0 ) {
+ numInserted( 0 ), numUpserted( 0 ), numUpdated( 0 ), numModified( 0 ), numDeleted( 0 ) {
}
BatchWriteOp::BatchWriteOp() :
@@ -340,7 +340,7 @@ namespace mongo {
numUpserted = 1;
}
stats->numUpdated += ( response.getN() - numUpserted );
- stats->numModified += ( response.getNDocsModified() - numUpserted );
+ stats->numModified += response.getNDocsModified();
stats->numUpserted += numUpserted;
}
else {
@@ -629,6 +629,8 @@ namespace mongo {
int nValue = _stats->numInserted + _stats->numUpserted + _stats->numUpdated
+ _stats->numDeleted;
batchResp->setN( nValue );
+ if ( _clientRequest->getBatchType() == BatchedCommandRequest::BatchType_Update )
+ batchResp->setNDocsModified( _stats->numModified );
batchResp->setOk( !batchResp->isErrCodeSet() );
dassert( batchResp->isValid( NULL ) );
diff --git a/src/mongo/s/write_ops/batched_command_response.cpp b/src/mongo/s/write_ops/batched_command_response.cpp
index e45bfa64494..2238a1773c4 100644
--- a/src/mongo/s/write_ops/batched_command_response.cpp
+++ b/src/mongo/s/write_ops/batched_command_response.cpp
@@ -148,7 +148,7 @@ namespace mongo {
// We're using appendNumber on generation so we'll try a smaller type
// (int) first and then fall back to the original type (long long).
- BSONField<int> fieldN("n");
+ BSONField<int> fieldN(n());
int tempN;
fieldState = FieldParser::extract(source, fieldN, &tempN, errMsg);
if (fieldState == FieldParser::FIELD_INVALID) {
@@ -164,7 +164,7 @@ namespace mongo {
// We're using appendNumber on generation so we'll try a smaller type
// (int) first and then fall back to the original type (long long).
- BSONField<int> fieldNUpdated("nDocumentsModified");
+ BSONField<int> fieldNUpdated(nDocsModified());
int tempNUpdated;
fieldState = FieldParser::extract(source, fieldNUpdated, &tempNUpdated, errMsg);
if (fieldState == FieldParser::FIELD_INVALID) {
diff --git a/src/mongo/util/assert_util.h b/src/mongo/util/assert_util.h
index b141611a6f6..e7ab0fd6777 100644
--- a/src/mongo/util/assert_util.h
+++ b/src/mongo/util/assert_util.h
@@ -141,8 +141,8 @@ namespace mongo {
virtual ~AssertionException() throw() { }
- virtual bool severe() { return true; }
- virtual bool isUserAssertion() { return false; }
+ virtual bool severe() const { return true; }
+ virtual bool isUserAssertion() const { return false; }
/* true if an interrupted exception - see KillCurrentOp */
bool interrupted() {
@@ -155,8 +155,8 @@ namespace mongo {
class MONGO_CLIENT_API UserException : public AssertionException {
public:
UserException(int c , const std::string& m) : AssertionException( m , c ) {}
- virtual bool severe() { return false; }
- virtual bool isUserAssertion() { return true; }
+ virtual bool severe() const { return false; }
+ virtual bool isUserAssertion() const { return true; }
virtual void appendPrefix( std::stringstream& ss ) const;
};
@@ -164,7 +164,7 @@ namespace mongo {
public:
MsgAssertionException( const ExceptionInfo& ei ) : AssertionException( ei ) {}
MsgAssertionException(int c, const std::string& m) : AssertionException( m , c ) {}
- virtual bool severe() { return false; }
+ virtual bool severe() const { return false; }
virtual void appendPrefix( std::stringstream& ss ) const;
};