diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/s/SConscript | 23 | ||||
-rw-r--r-- | src/mongo/s/batched_command_request.h | 47 | ||||
-rw-r--r-- | src/mongo/s/ns_targeter.h | 152 | ||||
-rw-r--r-- | src/mongo/s/write_op.cpp | 251 | ||||
-rw-r--r-- | src/mongo/s/write_op.h | 207 | ||||
-rw-r--r-- | src/mongo/s/write_op_test.cpp | 31 |
6 files changed, 711 insertions, 0 deletions
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index 55ec7ac01a1..4ab0267132c 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -196,3 +196,26 @@ env.CppUnitTest( ] ) +env.StaticLibrary( + target='write_ops', + source=[ + 'write_op.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/bson', + 'batched_write_ops', + ], +) + +env.CppUnitTest( + target='write_op_test', + source=[ + 'write_op_test.cpp', + ], + LIBDEPS=[ + 'write_ops', + '$BUILD_DIR/mongo/db/common', + ] +) + + diff --git a/src/mongo/s/batched_command_request.h b/src/mongo/s/batched_command_request.h index 8ab4d4f47ba..0764b997835 100644 --- a/src/mongo/s/batched_command_request.h +++ b/src/mongo/s/batched_command_request.h @@ -124,4 +124,51 @@ namespace mongo { }; + /** + * Similar to above, this class wraps the write items of a command request into a generically + * usable type. Very thin wrapper, does not own the write item itself. + * + * TODO: Use in BatchedCommandRequest above + */ + class BatchItemRef { + public: + + explicit BatchItemRef( const BatchItemRef& itemRef ) : + _request( itemRef._request ), _itemIndex( itemRef._itemIndex ) { + } + + BatchItemRef( const BatchedCommandRequest* request, int itemIndex ) : + _request( request ), _itemIndex( itemIndex ) { + } + + const BatchedCommandRequest* getRequest() const { + return _request; + } + + int getItemIndex() const { + return _itemIndex; + } + + BatchedCommandRequest::BatchType getOpType() const { + return _request->getBatchType(); + } + + BSONObj getDocument() const { + return _request->getInsertRequest()->getDocumentsAt( _itemIndex ); + } + + const BatchedUpdateDocument* getUpdate() const { + return _request->getUpdateRequest()->getUpdatesAt( _itemIndex ); + } + + const BatchedDeleteDocument* getDelete() const { + return _request->getDeleteRequest()->getDeletesAt( _itemIndex ); + } + + private: + + const BatchedCommandRequest* _request; + const int _itemIndex; + }; + } // namespace mongo diff --git a/src/mongo/s/ns_targeter.h b/src/mongo/s/ns_targeter.h new file mode 100644 index 00000000000..56395b30b7c --- /dev/null +++ b/src/mongo/s/ns_targeter.h @@ -0,0 +1,152 @@ +/** + * Copyright (C) 2013 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include <string> + +#include "mongo/bson/bsonobj.h" +#include "mongo/base/status.h" +#include "mongo/client/dbclientinterface.h" +#include "mongo/db/namespace_string.h" +#include "mongo/s/chunk_version.h" + +namespace mongo { + + struct ShardEndpoint; + + /** + * The NSTargeter interface is used by a WriteOp to generate and target child write operations + * to a particular collection. + * + * The lifecyle of a NSTargeter is: + * 0. refreshIfNeeded() to get targeting information (this must be the *first* call to the + * targeter, since we may need to load initial information) + * 1. targetDoc/targetQuery as many times as is required + * 1a. On targeting failure, we may need to refresh, note this and goto 0. + * 2. On stale config from a child write operation, note the error + * 3. Goto 0. + * + * The refreshIfNeeded() operation must make progress against noted targeting or stale config + * failures, see comments below. No functions may block for shared resources or network calls + * except refreshIfNeeded(). + * + * Implementers are free to define more specific targeting error codes to allow more complex + * error handling. + * + * Interface must be externally synchronized if used in multiple threads, for now. + * TODO: Determine if we should internally synchronize. + */ + class NSTargeter { + public: + + virtual ~NSTargeter() { + } + + /** + * Returns the namespace targeted. + */ + virtual const NamespaceString& getNS() const = 0; + + /** + * Refreshes the targeting metadata for the namespace if needed, based on previously-noted + * stale responses and targeting failures. + * + * After this function is called, the targeter should be in a state such that the noted + * stale responses are not seen again and if a targeting failure occurred it reloaded - + * it should make progress. + * + * NOTE: This function may block for shared resources or network calls. + * Returns !OK with message if could not refresh + */ + virtual Status refreshIfNeeded() = 0; + + /** + * Returns a ShardEndpoint for a single document write. + * + * Returns ShardKeyNotFound if document does not have a full shard key. + * Returns !OK with message if document could not be targeted for other reasons. + */ + virtual Status targetDoc( const BSONObj& doc, + ShardEndpoint** endpoint ) const = 0; + + /** + * Returns a vector of ShardEndpoints for a potentially multi-shard query. + * + * Returns !OK with message if query could not be targeted. + */ + virtual Status targetQuery( const BSONObj& query, + std::vector<ShardEndpoint*>* endpoints ) const = 0; + + /** + * Informs the targeter of stale config responses for this namespace from an endpoint, with + * further information available in the returned staleInfo. + * + * Any stale responses noted here will be taken into account on the next refresh. + */ + virtual void noteStaleResponse( const ShardEndpoint& endpoint, + const BSONObj& staleInfo ) = 0; + + /** + * Informs the targeter that a remote refresh is needed on the next refresh. + */ + virtual void noteNeedsRefresh() = 0; + + }; + + /** + * A ShardEndpoint represents a destination for a targeted query or document. It contains both + * the logical target (shard name/version/broadcast) and the physical target (host name). + */ + struct ShardEndpoint { + + ShardEndpoint() { + } + + ShardEndpoint( const ShardEndpoint& other ) : + shardName( other.shardName ), + shardVersion( other.shardVersion ), + shardHost( other.shardHost ) { + } + + ShardEndpoint( const string& shardName, + const ChunkVersion& shardVersion, + const ConnectionString& shardHost ) : + shardName( shardName ), shardVersion( shardVersion ), shardHost( shardHost ) { + } + + const std::string shardName; + const ChunkVersion shardVersion; + const ConnectionString shardHost; + + // + // For testing *only* - do not use as part of API + // + + BSONObj toBSON() const { + BSONObjBuilder b; + appendBSON( &b ); + return b.obj(); + } + + void appendBSON( BSONObjBuilder* builder ) const { + builder->append( "shardName", shardName ); + shardVersion.addToBSON( *builder, "shardVersion" ); + builder->append( "shardHost", shardHost.toString() ); + } + }; + +} // namespace mongo diff --git a/src/mongo/s/write_op.cpp b/src/mongo/s/write_op.cpp new file mode 100644 index 00000000000..179f3e70171 --- /dev/null +++ b/src/mongo/s/write_op.cpp @@ -0,0 +1,251 @@ +/** + * Copyright (C) 2013 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "mongo/s/write_op.h" + +#include "mongo/base/owned_pointer_vector.h" +#include "mongo/util/assert_util.h" + +namespace mongo { + + static void clear( vector<ChildWriteOp*>* childOps ) { + for ( vector<ChildWriteOp*>::const_iterator it = childOps->begin(); it != childOps->end(); + ++it ) { + delete *it; + } + childOps->clear(); + } + + WriteOp::~WriteOp() { + clear( &_childOps ); + clear( &_history ); + } + + WriteOpState WriteOp::getWriteState() const { + return _state; + } + + const BatchedErrorDetail& WriteOp::getOpError() const { + dassert( _state == WriteOpState_Error ); + return *_error; + } + + // + // TODO: Mongos targeting checks for updates/deletes go here, i.e. we can only do multi-ops if + // we've got the right flags set. + // + + static Status updateTargetsOk( const WriteOp& writeOp, + const vector<ShardEndpoint*>& endpoints ) { + // TODO: Multi, etc. + return Status::OK(); + } + + static Status deleteTargetsOk( const WriteOp& writeOp, + const vector<ShardEndpoint*>& endpoints ) { + // TODO: Single, etc. + return Status::OK(); + } + + Status WriteOp::targetWrites( const NSTargeter& targeter, + std::vector<TargetedWrite*>* targetedWrites ) { + + bool isUpdate = _itemRef.getOpType() == BatchedCommandRequest::BatchType_Update; + bool isDelete = _itemRef.getOpType() == BatchedCommandRequest::BatchType_Delete; + + // In case of error, don't leak. + OwnedPointerVector<ShardEndpoint> endpointsOwned; + vector<ShardEndpoint*>& endpoints = endpointsOwned.mutableVector(); + + if ( isUpdate || isDelete ) { + + // Updates/deletes targeted by query + + BSONObj queryDoc = + isUpdate ? _itemRef.getUpdate()->getQuery() : _itemRef.getDelete()->getQuery(); + + Status targetStatus = targeter.targetQuery( queryDoc, &endpoints ); + + if ( targetStatus.isOK() ) { + targetStatus = + isUpdate ? + updateTargetsOk( *this, endpoints ) : deleteTargetsOk( *this, endpoints ); + } + + if ( !targetStatus.isOK() ) return targetStatus; + } + else { + dassert( _itemRef.getOpType() == BatchedCommandRequest::BatchType_Insert ); + + // Inserts targeted by doc itself + + ShardEndpoint* endpoint = NULL; + Status targetStatus = targeter.targetDoc( _itemRef.getDocument(), &endpoint ); + + if ( !targetStatus.isOK() ) { + dassert( NULL == endpoint ); + return targetStatus; + } + + dassert( NULL != endpoint ); + endpoints.push_back( endpoint ); + } + + for ( vector<ShardEndpoint*>::iterator it = endpoints.begin(); it != endpoints.end(); ) { + + ShardEndpoint* endpoint = *it; + + _childOps.push_back( new ChildWriteOp( this ) ); + + WriteOpRef ref( _itemRef.getItemIndex(), _childOps.size() - 1 ); + + // For now, multiple endpoints imply no versioning + if ( endpoints.size() == 1u ) { + targetedWrites->push_back( new TargetedWrite( *endpoint, ref ) ); + } + else { + ShardEndpoint broadcastEndpoint( endpoint->shardName, + ChunkVersion::IGNORED(), + endpoint->shardHost ); + targetedWrites->push_back( new TargetedWrite( broadcastEndpoint, ref ) ); + } + + _childOps.back()->pendingWrite = targetedWrites->back(); + _childOps.back()->state = WriteOpState_Pending; + + // Don't cleanup the endpoint, now owned by child op + endpoints.erase( it++ ); + } + + return Status::OK(); + } + + static bool isRetryErrCode( int errCode ) { + return errCode == RecvStaleConfigCode; + } + + // Aggregate a bunch of errors for a single op together + static void combineOpErrors( const vector<ChildWriteOp*>& errOps, BatchedErrorDetail* error ) { + + // Special case single response + if ( errOps.size() == 1 ) { + errOps.front()->error->cloneTo( error ); + return; + } + + // TODO: XXX + error->setErrCode( 99999 ); + + // Generate the multi-error message below + stringstream msg; + msg << "multiple errors for op : "; + + BSONArrayBuilder errB; + for ( vector<ChildWriteOp*>::const_iterator it = errOps.begin(); it != errOps.end(); + ++it ) { + const ChildWriteOp* errOp = *it; + if ( it != errOps.begin() ) msg << " :: and :: "; + msg << errOp->error->getErrMessage(); + errB.append( errOp->error->toBSON() ); + } + + error->setErrInfo( BSON( "causedBy" << errB.arr() ) ); + error->setErrMessage( msg.str() ); + } + + /** + * This is the core function which aggregates all the results of a write operation on multiple + * shards and updates the write operation's state. + */ + void WriteOp::updateOpState() { + + vector<ChildWriteOp*> childErrors; + + bool isRetryError = true; + for ( vector<ChildWriteOp*>::iterator it = _childOps.begin(); it != _childOps.end(); + it++ ) { + + ChildWriteOp* childOp = *it; + + // Don't do anything till we have all the info + if ( childOp->state != WriteOpState_Completed + && childOp->state != WriteOpState_Error ) { + return; + } + + if ( childOp->state == WriteOpState_Error ) { + childErrors.push_back( childOp ); + // Any non-retry error aborts all + if ( !isRetryErrCode( childOp->error->getErrCode() ) ) isRetryError = false; + } + } + + if ( !childErrors.empty() && isRetryError ) { + // Since we're using broadcast mode for multi-shard writes, which cannot SCE + dassert( childErrors.size() == 1u ); + _state = WriteOpState_Ready; + } + else if ( !childErrors.empty() ) { + _error.reset( new BatchedErrorDetail ); + combineOpErrors( childErrors, _error.get() ); + _state = WriteOpState_Error; + } + else { + _state = WriteOpState_Completed; + } + + // Now that we're done with the child ops, do something with them + // TODO: Don't store unlimited history? + dassert( _state != WriteOpState_Pending ); + _history.insert( _history.end(), _childOps.begin(), _childOps.end() ); + _childOps.clear(); + } + + void WriteOp::noteWriteComplete( const TargetedWrite& targetedWrite ) { + + const WriteOpRef& ref = targetedWrite.writeOpRef; + dassert( static_cast<size_t>( ref.second ) < _childOps.size() ); + ChildWriteOp& childOp = *_childOps.at( ref.second ); + + childOp.pendingWrite = NULL; + childOp.endpoint.reset( new ShardEndpoint( targetedWrite.endpoint ) ); + childOp.state = WriteOpState_Completed; + updateOpState(); + } + + void WriteOp::noteWriteError( const TargetedWrite& targetedWrite, + const BatchedErrorDetail& error ) { + + const WriteOpRef& ref = targetedWrite.writeOpRef; + ChildWriteOp& childOp = *_childOps.at( ref.second ); + + childOp.pendingWrite = NULL; + childOp.endpoint.reset( new ShardEndpoint( targetedWrite.endpoint ) ); + childOp.error.reset( new BatchedErrorDetail ); + error.cloneTo( childOp.error.get() ); + childOp.state = WriteOpState_Error; + updateOpState(); + } + + void WriteOp::setOpError( const BatchedErrorDetail& error ) { + dassert( _state == WriteOpState_Ready ); + _error.reset( new BatchedErrorDetail ); + error.cloneTo( _error.get() ); + _state = WriteOpState_Error; + // No need to updateOpState, set directly + } + +} diff --git a/src/mongo/s/write_op.h b/src/mongo/s/write_op.h new file mode 100644 index 00000000000..9cc2ff63455 --- /dev/null +++ b/src/mongo/s/write_op.h @@ -0,0 +1,207 @@ +/** + * Copyright (C) 2013 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include <boost/scoped_ptr.hpp> +#include <vector> + +#include "mongo/bson/bsonobj.h" +#include "mongo/s/batched_error_detail.h" +#include "mongo/s/batched_command_request.h" +#include "mongo/s/ns_targeter.h" + +namespace mongo { + + struct TargetedWrite; + struct ChildWriteOp; + + enum WriteOpState { + + // Item is ready to be targeted + WriteOpState_Ready, + + // Item is targeted and we're waiting for outstanding shard requests to populate + // responses + WriteOpState_Pending, + + // Op was successful, write completed + WriteOpState_Completed, + + // Op failed with some error + WriteOpState_Error, + + // Catch-all error state. + WriteOpState_Unknown + }; + + /** + * State of a single write item in-progress from a client request. + * + * The lifecyle of a write op: + * + * 0. Begins at _Ready, + * + * 1a. Targeted, and a ChildWriteOp created to track the state of each returned TargetedWrite. + * The state is changed to _Pending. + * 1b. If the op cannot be targeted, the error is set directly (_Error), and the write op is + * completed. + * + * 2. TargetedWrites finish successfully and unsuccessfully. + * + * On the last error arriving... + * + * 3a. If the errors allow for retry, the WriteOp is reset to _Ready, previous ChildWriteOps + * are placed in the history, and goto 0. + * 3b. If the errors don't allow for retry, they are combined into a single error and the + * state is changed to _Error. + * 3c. If there are no errors, the state is changed to _Completed. + * + * WriteOps finish in a _Completed or _Error state. + */ + class WriteOp { + public: + + WriteOp( const BatchItemRef& itemRef ) : + _itemRef( itemRef ), _state( WriteOpState_Ready ) { + } + + ~WriteOp(); + + /** + * Returns the op's current state. + */ + WriteOpState getWriteState() const; + + /** + * Returns the op's error. + * + * Can only be used in state _Error + */ + const BatchedErrorDetail& getOpError() const; + + /** + * Creates TargetedWrite operations for every applicable shard, which contain the + * information needed to send the child writes generated from this write item. + * + * The ShardTargeter determines the ShardEndpoints to send child writes to, but is not + * modified by this operation. + * + * Returns !OK if the targeting process itself fails + * (no TargetedWrites will be added, state unchanged) + */ + Status targetWrites( const NSTargeter& targeter, + std::vector<TargetedWrite*>* targetedWrites ); + + /** + * Marks the targeted write as finished for this write op. + * + * One of noteWriteComplete or noteWriteError should be called exactly once for every + * TargetedWrite. + */ + void noteWriteComplete( const TargetedWrite& targetedWrite ); + + /** + * Stores the error response of a TargetedWrite for later use, marks the write as finished. + * + * As above, one of noteWriteComplete or noteWriteError should be called exactly once for + * every TargetedWrite. + */ + void noteWriteError( const TargetedWrite& targetedWrite, const BatchedErrorDetail& error ); + + /** + * Sets the error for this write op directly, and forces the state to _Error. + * + * Should only be used when in state _Ready. + */ + void setOpError( const BatchedErrorDetail& error ); + + private: + + /** + * Updates the op state after new information is received. + */ + void updateOpState(); + + // Owned elsewhere, reference to a batch with a write item + const BatchItemRef _itemRef; + + // What stage of the operation we are at + WriteOpState _state; + + // filled when state == _Pending + std::vector<ChildWriteOp*> _childOps; + + // filled when state == _Error + scoped_ptr<BatchedErrorDetail> _error; + + // Finished child operations, for debugging + std::vector<ChildWriteOp*> _history; + }; + + /** + * State of a write in-progress (to a single shard) which is one part of a larger write + * operation. + * + * As above, the write op may finish in either a successful (_Completed) or unsuccessful + * (_Error) state. + */ + struct ChildWriteOp { + + ChildWriteOp( WriteOp* const parent ) : + parentOp( parent ), state( WriteOpState_Ready ), pendingWrite( NULL ) { + } + + const WriteOp* const parentOp; + WriteOpState state; + + // non-zero when state == _Pending + // Not owned here but tracked for reporting + TargetedWrite* pendingWrite; + + // filled when state > _Pending + scoped_ptr<ShardEndpoint> endpoint; + + // filled when state == _Error + scoped_ptr<BatchedErrorDetail> error; + }; + + // First value is write item index in the batch, second value is child write op index + typedef pair<int, int> WriteOpRef; + + /** + * A write with A) a request targeted at a particular shard endpoint, and B) a response targeted + * at a particular WriteOp. + * + * TargetedWrites are the link between the RPC layer and the in-progress write + * operation. + */ + struct TargetedWrite { + + TargetedWrite( const ShardEndpoint& endpoint, WriteOpRef writeOpRef ) : + endpoint( endpoint ), writeOpRef( writeOpRef ) { + } + + // Where to send the write + ShardEndpoint endpoint; + + // Where to find the write item and put the response + // TODO: Could be a more complex handle, shared between write state and networking code if + // we need to be able to cancel ops. + WriteOpRef writeOpRef; + }; + +} diff --git a/src/mongo/s/write_op_test.cpp b/src/mongo/s/write_op_test.cpp new file mode 100644 index 00000000000..e32f85a7d79 --- /dev/null +++ b/src/mongo/s/write_op_test.cpp @@ -0,0 +1,31 @@ +/** + * Copyright (C) 2013 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "mongo/s/write_op.h" + +#include "mongo/s/batched_command_request.h" +#include "mongo/unittest/unittest.h" + +namespace { + + using namespace mongo; + + TEST(WriteOpTests, Basic) { + WriteOp( BatchItemRef( NULL, 0 ) ); + ASSERT( true ); + } + +} // unnamed namespace |