diff options
Diffstat (limited to 'src/mongo/client/fetcher.cpp')
-rw-r--r-- | src/mongo/client/fetcher.cpp | 457 |
1 files changed, 226 insertions, 231 deletions
diff --git a/src/mongo/client/fetcher.cpp b/src/mongo/client/fetcher.cpp index f11ced0ce2f..01da06bc357 100644 --- a/src/mongo/client/fetcher.cpp +++ b/src/mongo/client/fetcher.cpp @@ -40,272 +40,267 @@ namespace mongo { namespace { - const char* kCursorFieldName = "cursor"; - const char* kCursorIdFieldName = "id"; - const char* kNamespaceFieldName = "ns"; - - const char* kFirstBatchFieldName = "firstBatch"; - const char* kNextBatchFieldName = "nextBatch"; - - /** - * Parses cursor response in command result for cursor ID, namespace and documents. - * 'batchFieldName' will be 'firstBatch' for the initial remote command invocation and - * 'nextBatch' for getMore. - */ - Status parseCursorResponse(const BSONObj& obj, - const std::string& batchFieldName, - Fetcher::QueryResponse* batchData) { - invariant(batchFieldName == kFirstBatchFieldName || batchFieldName == kNextBatchFieldName); - invariant(batchData); - - BSONElement cursorElement = obj.getField(kCursorFieldName); - if (cursorElement.eoo()) { - return Status(ErrorCodes::FailedToParse, str::stream() << - "cursor response must contain '" << kCursorFieldName << - "' field: " << obj); - } - if (!cursorElement.isABSONObj()) { - return Status(ErrorCodes::FailedToParse, str::stream() << - "'" << kCursorFieldName << "' field must be an object: " << obj); - } - BSONObj cursorObj = cursorElement.Obj(); - - BSONElement cursorIdElement = cursorObj.getField(kCursorIdFieldName); - if (cursorIdElement.eoo()) { - return Status(ErrorCodes::FailedToParse, str::stream() << - "cursor response must contain '" << kCursorFieldName << "." << - kCursorIdFieldName << "' field: " << obj); - } - if (!(cursorIdElement.type() == mongo::NumberLong || - cursorIdElement.type() == mongo::NumberInt)) { - return Status(ErrorCodes::FailedToParse, str::stream() << - "'" << kCursorFieldName << "." << kCursorIdFieldName << - "' field must be a integral number of type 'int' or 'long' but was a '" - << typeName(cursorIdElement.type()) << "': " << obj); - } - batchData->cursorId = cursorIdElement.numberLong(); - - BSONElement namespaceElement = cursorObj.getField(kNamespaceFieldName); - if (namespaceElement.eoo()) { - return Status(ErrorCodes::FailedToParse, str::stream() << - "cursor response must contain " << - "'" << kCursorFieldName << "." << kNamespaceFieldName << "' field: " << - obj); - } - if (namespaceElement.type() != mongo::String) { - return Status(ErrorCodes::FailedToParse, str::stream() << - "'" << kCursorFieldName << "." << kNamespaceFieldName << - "' field must be a string: " << obj); - } - NamespaceString tempNss(namespaceElement.valuestrsafe()); - if (!tempNss.isValid()) { - return Status(ErrorCodes::BadValue, str::stream() << - "'" << kCursorFieldName << "." << kNamespaceFieldName << - "' contains an invalid namespace: " << obj); - } - batchData->nss = tempNss; +const char* kCursorFieldName = "cursor"; +const char* kCursorIdFieldName = "id"; +const char* kNamespaceFieldName = "ns"; - BSONElement batchElement = cursorObj.getField(batchFieldName); - if (batchElement.eoo()) { - return Status(ErrorCodes::FailedToParse, str::stream() << - "cursor response must contain '" << kCursorFieldName << "." << - batchFieldName << "' field: " << obj); - } - if (!batchElement.isABSONObj()) { - return Status(ErrorCodes::FailedToParse, str::stream() << - "'" << kCursorFieldName << "." << batchFieldName << - "' field must be an array: " << obj); - } - BSONObj batchObj = batchElement.Obj(); - for (auto itemElement : batchObj) { - if (!itemElement.isABSONObj()) { - return Status(ErrorCodes::FailedToParse, str::stream() << - "found non-object " << itemElement << " in " << - "'" << kCursorFieldName << "." << batchFieldName << "' field: " << - obj); - } - batchData->documents.push_back(itemElement.Obj().getOwned()); - } +const char* kFirstBatchFieldName = "firstBatch"; +const char* kNextBatchFieldName = "nextBatch"; - return Status::OK(); +/** + * Parses cursor response in command result for cursor ID, namespace and documents. + * 'batchFieldName' will be 'firstBatch' for the initial remote command invocation and + * 'nextBatch' for getMore. + */ +Status parseCursorResponse(const BSONObj& obj, + const std::string& batchFieldName, + Fetcher::QueryResponse* batchData) { + invariant(batchFieldName == kFirstBatchFieldName || batchFieldName == kNextBatchFieldName); + invariant(batchData); + + BSONElement cursorElement = obj.getField(kCursorFieldName); + if (cursorElement.eoo()) { + return Status(ErrorCodes::FailedToParse, + str::stream() << "cursor response must contain '" << kCursorFieldName + << "' field: " << obj); } + if (!cursorElement.isABSONObj()) { + return Status(ErrorCodes::FailedToParse, + str::stream() << "'" << kCursorFieldName + << "' field must be an object: " << obj); + } + BSONObj cursorObj = cursorElement.Obj(); - Status parseReplResponse(const BSONObj& obj) { - return Status::OK(); + BSONElement cursorIdElement = cursorObj.getField(kCursorIdFieldName); + if (cursorIdElement.eoo()) { + return Status(ErrorCodes::FailedToParse, + str::stream() << "cursor response must contain '" << kCursorFieldName << "." + << kCursorIdFieldName << "' field: " << obj); } - -} // namespace - - Fetcher::QueryResponse::QueryResponse(CursorId theCursorId, - const NamespaceString& theNss, - Documents theDocuments) - : cursorId(theCursorId), - nss(theNss), - documents(theDocuments) { } - - Fetcher::Fetcher(executor::TaskExecutor* executor, - const HostAndPort& source, - const std::string& dbname, - const BSONObj& findCmdObj, - const CallbackFn& work) - : _executor(executor), - _source(source), - _dbname(dbname), - _cmdObj(findCmdObj.getOwned()), - _work(work), - _active(false), - _remoteCommandCallbackHandle() { - - uassert(ErrorCodes::BadValue, "null replication executor", executor); - uassert(ErrorCodes::BadValue, "database name cannot be empty", !dbname.empty()); - uassert(ErrorCodes::BadValue, "command object cannot be empty", !findCmdObj.isEmpty()); - uassert(ErrorCodes::BadValue, "callback function cannot be null", work); + if (!(cursorIdElement.type() == mongo::NumberLong || + cursorIdElement.type() == mongo::NumberInt)) { + return Status(ErrorCodes::FailedToParse, + str::stream() + << "'" << kCursorFieldName << "." << kCursorIdFieldName + << "' field must be a integral number of type 'int' or 'long' but was a '" + << typeName(cursorIdElement.type()) << "': " << obj); } - - Fetcher::~Fetcher() { - DESTRUCTOR_GUARD( - cancel(); - wait(); - ); + batchData->cursorId = cursorIdElement.numberLong(); + + BSONElement namespaceElement = cursorObj.getField(kNamespaceFieldName); + if (namespaceElement.eoo()) { + return Status(ErrorCodes::FailedToParse, + str::stream() << "cursor response must contain " + << "'" << kCursorFieldName << "." << kNamespaceFieldName + << "' field: " << obj); } - - std::string Fetcher::getDiagnosticString() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); - str::stream output; - output << "Fetcher"; - output << " executor: " << _executor->getDiagnosticString(); - output << " source: " << _source.toString(); - output << " database: " << _dbname; - output << " query: " << _cmdObj; - output << " active: " << _active; - return output; + if (namespaceElement.type() != mongo::String) { + return Status(ErrorCodes::FailedToParse, + str::stream() << "'" << kCursorFieldName << "." << kNamespaceFieldName + << "' field must be a string: " << obj); } - - bool Fetcher::isActive() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); - return _active; + NamespaceString tempNss(namespaceElement.valuestrsafe()); + if (!tempNss.isValid()) { + return Status(ErrorCodes::BadValue, + str::stream() << "'" << kCursorFieldName << "." << kNamespaceFieldName + << "' contains an invalid namespace: " << obj); } + batchData->nss = tempNss; - Status Fetcher::schedule() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - if (_active) { - return Status(ErrorCodes::IllegalOperation, "fetcher already scheduled"); + BSONElement batchElement = cursorObj.getField(batchFieldName); + if (batchElement.eoo()) { + return Status(ErrorCodes::FailedToParse, + str::stream() << "cursor response must contain '" << kCursorFieldName << "." + << batchFieldName << "' field: " << obj); + } + if (!batchElement.isABSONObj()) { + return Status(ErrorCodes::FailedToParse, + str::stream() << "'" << kCursorFieldName << "." << batchFieldName + << "' field must be an array: " << obj); + } + BSONObj batchObj = batchElement.Obj(); + for (auto itemElement : batchObj) { + if (!itemElement.isABSONObj()) { + return Status(ErrorCodes::FailedToParse, + str::stream() << "found non-object " << itemElement << " in " + << "'" << kCursorFieldName << "." << batchFieldName + << "' field: " << obj); } - return _schedule_inlock(_cmdObj, kFirstBatchFieldName); + batchData->documents.push_back(itemElement.Obj().getOwned()); } - void Fetcher::cancel() { - executor::TaskExecutor::CallbackHandle remoteCommandCallbackHandle; - { - stdx::lock_guard<stdx::mutex> lk(_mutex); + return Status::OK(); +} + +Status parseReplResponse(const BSONObj& obj) { + return Status::OK(); +} + +} // namespace + +Fetcher::QueryResponse::QueryResponse(CursorId theCursorId, + const NamespaceString& theNss, + Documents theDocuments) + : cursorId(theCursorId), nss(theNss), documents(theDocuments) {} + +Fetcher::Fetcher(executor::TaskExecutor* executor, + const HostAndPort& source, + const std::string& dbname, + const BSONObj& findCmdObj, + const CallbackFn& work) + : _executor(executor), + _source(source), + _dbname(dbname), + _cmdObj(findCmdObj.getOwned()), + _work(work), + _active(false), + _remoteCommandCallbackHandle() { + uassert(ErrorCodes::BadValue, "null replication executor", executor); + uassert(ErrorCodes::BadValue, "database name cannot be empty", !dbname.empty()); + uassert(ErrorCodes::BadValue, "command object cannot be empty", !findCmdObj.isEmpty()); + uassert(ErrorCodes::BadValue, "callback function cannot be null", work); +} + +Fetcher::~Fetcher() { + DESTRUCTOR_GUARD(cancel(); wait();); +} + +std::string Fetcher::getDiagnosticString() const { + stdx::lock_guard<stdx::mutex> lk(_mutex); + str::stream output; + output << "Fetcher"; + output << " executor: " << _executor->getDiagnosticString(); + output << " source: " << _source.toString(); + output << " database: " << _dbname; + output << " query: " << _cmdObj; + output << " active: " << _active; + return output; +} + +bool Fetcher::isActive() const { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _active; +} + +Status Fetcher::schedule() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + if (_active) { + return Status(ErrorCodes::IllegalOperation, "fetcher already scheduled"); + } + return _schedule_inlock(_cmdObj, kFirstBatchFieldName); +} - if (!_active) { - return; - } +void Fetcher::cancel() { + executor::TaskExecutor::CallbackHandle remoteCommandCallbackHandle; + { + stdx::lock_guard<stdx::mutex> lk(_mutex); - remoteCommandCallbackHandle = _remoteCommandCallbackHandle; + if (!_active) { + return; } - invariant(remoteCommandCallbackHandle.isValid()); - _executor->cancel(remoteCommandCallbackHandle); + remoteCommandCallbackHandle = _remoteCommandCallbackHandle; } - void Fetcher::wait() { - stdx::unique_lock<stdx::mutex> lk(_mutex); - _condition.wait(lk, [this]() { return !_active; }); - } + invariant(remoteCommandCallbackHandle.isValid()); + _executor->cancel(remoteCommandCallbackHandle); +} - Status Fetcher::_schedule_inlock(const BSONObj& cmdObj, const char* batchFieldName) { - StatusWith<executor::TaskExecutor::CallbackHandle> scheduleResult = - _executor->scheduleRemoteCommand( - RemoteCommandRequest(_source, _dbname, cmdObj), - stdx::bind(&Fetcher::_callback, this, stdx::placeholders::_1, batchFieldName)); +void Fetcher::wait() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + _condition.wait(lk, [this]() { return !_active; }); +} - if (!scheduleResult.isOK()) { - return scheduleResult.getStatus(); - } +Status Fetcher::_schedule_inlock(const BSONObj& cmdObj, const char* batchFieldName) { + StatusWith<executor::TaskExecutor::CallbackHandle> scheduleResult = + _executor->scheduleRemoteCommand( + RemoteCommandRequest(_source, _dbname, cmdObj), + stdx::bind(&Fetcher::_callback, this, stdx::placeholders::_1, batchFieldName)); - _active = true; - _remoteCommandCallbackHandle = scheduleResult.getValue(); - return Status::OK(); + if (!scheduleResult.isOK()) { + return scheduleResult.getStatus(); } - void Fetcher::_callback(const executor::TaskExecutor::RemoteCommandCallbackArgs& rcbd, - const char* batchFieldName) { - - if (!rcbd.response.isOK()) { - _work(StatusWith<Fetcher::QueryResponse>(rcbd.response.getStatus()), nullptr, nullptr); - _finishCallback(); - return; - } - - const BSONObj& queryResponseObj = rcbd.response.getValue().data; - Status status = getStatusFromCommandResult(queryResponseObj); - if (!status.isOK()) { - _work(StatusWith<Fetcher::QueryResponse>(status), nullptr, nullptr); - _finishCallback(); - return; - } + _active = true; + _remoteCommandCallbackHandle = scheduleResult.getValue(); + return Status::OK(); +} + +void Fetcher::_callback(const executor::TaskExecutor::RemoteCommandCallbackArgs& rcbd, + const char* batchFieldName) { + if (!rcbd.response.isOK()) { + _work(StatusWith<Fetcher::QueryResponse>(rcbd.response.getStatus()), nullptr, nullptr); + _finishCallback(); + return; + } - status = parseReplResponse(queryResponseObj); - if (!status.isOK()) { - _work(StatusWith<Fetcher::QueryResponse>(status), nullptr, nullptr); - _finishCallback(); - return; - } + const BSONObj& queryResponseObj = rcbd.response.getValue().data; + Status status = getStatusFromCommandResult(queryResponseObj); + if (!status.isOK()) { + _work(StatusWith<Fetcher::QueryResponse>(status), nullptr, nullptr); + _finishCallback(); + return; + } - QueryResponse batchData; - status = parseCursorResponse(queryResponseObj, batchFieldName, &batchData); - if (!status.isOK()) { - _work(StatusWith<Fetcher::QueryResponse>(status), nullptr, nullptr); - _finishCallback(); - return; - } + status = parseReplResponse(queryResponseObj); + if (!status.isOK()) { + _work(StatusWith<Fetcher::QueryResponse>(status), nullptr, nullptr); + _finishCallback(); + return; + } - NextAction nextAction = NextAction::kNoAction; + QueryResponse batchData; + status = parseCursorResponse(queryResponseObj, batchFieldName, &batchData); + if (!status.isOK()) { + _work(StatusWith<Fetcher::QueryResponse>(status), nullptr, nullptr); + _finishCallback(); + return; + } - if (!batchData.cursorId) { - _work(StatusWith<QueryResponse>(batchData), &nextAction, nullptr); - _finishCallback(); - return; - } + NextAction nextAction = NextAction::kNoAction; - nextAction = NextAction::kGetMore; + if (!batchData.cursorId) { + _work(StatusWith<QueryResponse>(batchData), &nextAction, nullptr); + _finishCallback(); + return; + } - BSONObjBuilder bob; - _work(StatusWith<QueryResponse>(batchData), &nextAction, &bob); + nextAction = NextAction::kGetMore; - // Callback function _work may modify nextAction to request the fetcher - // not to schedule a getMore command. - if (nextAction != NextAction::kGetMore) { - _finishCallback(); - return; - } + BSONObjBuilder bob; + _work(StatusWith<QueryResponse>(batchData), &nextAction, &bob); - // Callback function may also disable the fetching of additional data by not filling in the - // BSONObjBuilder for the getMore command. - auto cmdObj = bob.obj(); - if (cmdObj.isEmpty()) { - _finishCallback(); - return; - } + // Callback function _work may modify nextAction to request the fetcher + // not to schedule a getMore command. + if (nextAction != NextAction::kGetMore) { + _finishCallback(); + return; + } - { - stdx::lock_guard<stdx::mutex> lk(_mutex); - status = _schedule_inlock(cmdObj, kNextBatchFieldName); - } - if (!status.isOK()) { - nextAction = NextAction::kNoAction; - _work(StatusWith<Fetcher::QueryResponse>(status), nullptr, nullptr); - _finishCallback(); - return; - } + // Callback function may also disable the fetching of additional data by not filling in the + // BSONObjBuilder for the getMore command. + auto cmdObj = bob.obj(); + if (cmdObj.isEmpty()) { + _finishCallback(); + return; } - void Fetcher::_finishCallback() { + { stdx::lock_guard<stdx::mutex> lk(_mutex); - _active = false; - _condition.notify_all(); + status = _schedule_inlock(cmdObj, kNextBatchFieldName); + } + if (!status.isOK()) { + nextAction = NextAction::kNoAction; + _work(StatusWith<Fetcher::QueryResponse>(status), nullptr, nullptr); + _finishCallback(); + return; } +} + +void Fetcher::_finishCallback() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _active = false; + _condition.notify_all(); +} -} // namespace mongo +} // namespace mongo |