/**
* Copyright (C) 2013 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
#include "mongo/platform/basic.h"
#include "mongo/s/write_ops/batch_write_exec.h"
#include "mongo/base/error_codes.h"
#include "mongo/base/owned_pointer_map.h"
#include "mongo/base/status.h"
#include "mongo/bson/util/builder.h"
#include "mongo/client/dbclientinterface.h" // ConnectionString (header-only)
#include "mongo/s/client/multi_command_dispatch.h"
#include "mongo/s/write_ops/batch_write_op.h"
#include "mongo/s/write_ops/write_error_detail.h"
#include "mongo/util/log.h"
namespace mongo {
using std::endl;
using std::make_pair;
using std::stringstream;
using std::vector;
BatchWriteExec::BatchWriteExec(NSTargeter* targeter,
ShardResolver* resolver,
MultiCommandDispatch* dispatcher)
: _targeter(targeter),
_resolver(resolver),
_dispatcher(dispatcher),
_stats(new BatchWriteExecStats) {}
namespace {
//
// Map which allows associating ConnectionString hosts with TargetedWriteBatches
// This is needed since the dispatcher only returns hosts with responses.
//
// TODO: Unordered map?
typedef OwnedPointerMap<ConnectionString, TargetedWriteBatch> OwnedHostBatchMap;
}  // namespace
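// Helper to build a WriteErrorDetail from a non-OK status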
static void buildErrorFrom(const Status& status, WriteErrorDetail* error) {
error->setErrCode(status.code());
error->setErrMessage(status.reason());
}
// Helper to note several stale errors from a response
static void noteStaleResponses(const vector<ShardError*>& staleErrors, NSTargeter* targeter) {
for (vector<ShardError*>::const_iterator it = staleErrors.begin(); it != staleErrors.end();
++it) {
const ShardError* error = *it;
targeter->noteStaleResponse(
error->endpoint, error->error.isErrInfoSet() ? error->error.getErrInfo() : BSONObj());
}
}
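// Helper to check whether the most recent stale error reports that the shard is in its
// migration critical section - if so, the remote metadata is actively changing and the
// next round may make progress without a targeter refresh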
static bool isShardMetadataChanging(const vector<ShardError*>& staleErrors) {
if (!staleErrors.empty() && staleErrors.back()->error.isErrInfoSet())
return staleErrors.back()->error.getErrInfo()["inCriticalSection"].trueValue();
return false;
}
// The number of times we'll try to continue a batch op if no progress is being made
// This only applies when no writes are occurring and metadata is not changing on reload
static const int kMaxRoundsWithoutProgress(5);
void BatchWriteExec::executeBatch(const BatchedCommandRequest& clientRequest,
BatchedCommandResponse* clientResponse) {
LOG(4) << "starting execution of write batch of size "
<< static_cast<int>(clientRequest.sizeWriteOps()) << " for " << clientRequest.getNS()
<< endl;
BatchWriteOp batchOp;
batchOp.initClientRequest(&clientRequest);
// Current batch status
bool refreshedTargeter = false;
int rounds = 0;
int numCompletedOps = 0;
int numRoundsWithoutProgress = 0;
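// Each round targets the remaining write ops, dispatches the resulting child batches to
// the shards, and processes the responses - we loop until every write op has a result or
// we abort after too many rounds without progress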
while (!batchOp.isFinished()) {
//
// Get child batches to send using the targeter
//
// Targeting errors can be caused by remote metadata changing (the collection could have
// been dropped and recreated, for example with a new shard key). If a remote metadata
// change occurs *before* a client sends us a batch, we need to make sure that we don't
// error out just because we're staler than the client - otherwise mongos will have
// unpredictable behavior.
//
// (If a metadata change happens *during* or *after* a client sends us a batch, however,
// we make no guarantees about delivery.)
//
// For this reason, we don't record targeting errors until we've refreshed our targeting
// metadata at least once *after* receiving the client batch - at that point, we know:
//
// 1) our new metadata is the same as the metadata when the client sent a batch, and so
// targeting errors are real.
// OR
// 2) our new metadata is a newer version than when the client sent a batch, and so
// the metadata must have changed after the client batch was sent. We don't need to
// deliver in this case, since for all the client knows we may have gotten the batch
// exactly when the metadata changed.
//
OwnedPointerVector<TargetedWriteBatch> childBatchesOwned;
vector<TargetedWriteBatch*>& childBatches = childBatchesOwned.mutableVector();
// If we've already had a targeting error, we've refreshed the metadata once and can
// record target errors definitively.
bool recordTargetErrors = refreshedTargeter;
Status targetStatus = batchOp.targetBatch(*_targeter, recordTargetErrors, &childBatches);
if (!targetStatus.isOK()) {
// Don't do anything until a targeter refresh
_targeter->noteCouldNotTarget();
refreshedTargeter = true;
++_stats->numTargetErrors;
dassert(childBatches.size() == 0u);
}
//
// Send all child batches
//
size_t numSent = 0;
size_t numToSend = childBatches.size();
bool remoteMetadataChanging = false;
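// Dispatch as many child batches as possible on each pass - at most one per host - and
// collect their responses before sending the remainder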
while (numSent != numToSend) {
// Collect batches out on the network, mapped by endpoint
OwnedHostBatchMap ownedPendingBatches;
OwnedHostBatchMap::MapType& pendingBatches = ownedPendingBatches.mutableMap();
//
// Send side
//
// Get as many batches as we can at once
for (vector<TargetedWriteBatch*>::iterator it = childBatches.begin();
it != childBatches.end();
++it) {
//
// Collect the info needed to dispatch our targeted batch
//
TargetedWriteBatch* nextBatch = *it;
// If the batch is NULL, we sent it previously, so skip
if (nextBatch == NULL)
continue;
// Figure out what host we need to dispatch our targeted batch
ConnectionString shardHost;
Status resolveStatus =
_resolver->chooseWriteHost(nextBatch->getEndpoint().shardName, &shardHost);
if (!resolveStatus.isOK()) {
++_stats->numResolveErrors;
// Record a resolve failure
// TODO: It may be necessary to refresh the cache if stale, or maybe just
// cancel and retarget the batch
WriteErrorDetail error;
buildErrorFrom(resolveStatus, &error);
LOG(4) << "unable to send write batch to " << shardHost.toString()
<< causedBy(resolveStatus.toString()) << endl;
batchOp.noteBatchError(*nextBatch, error);
// We're done with this batch
// Clean up when we can't resolve a host
delete *it;
*it = NULL;
--numToSend;
continue;
}
// If we already have a batch for this host, wait until the next time
OwnedHostBatchMap::MapType::iterator pendingIt = pendingBatches.find(shardHost);
if (pendingIt != pendingBatches.end())
continue;
//
// We now have all the info needed to dispatch the batch
//
BatchedCommandRequest request(clientRequest.getBatchType());
batchOp.buildBatchRequest(*nextBatch, &request);
// Internally we use full namespaces for request/response, but we send the
// command to a database with the collection name in the request.
NamespaceString nss(request.getNS());
request.setNS(nss.coll());
LOG(4) << "sending write batch to " << shardHost.toString() << ": "
<< request.toString() << endl;
_dispatcher->addCommand(shardHost, nss.db(), request);
// Indicate we're done by setting the batch to NULL
// We'll only get duplicate hostEndpoints if we have broadcast and non-broadcast
// endpoints for the same host, so this should be pretty efficient without
// moving stuff around.
*it = NULL;
// Recv-side is responsible for cleaning up the nextBatch when used
pendingBatches.insert(make_pair(shardHost, nextBatch));
}
// Send them all out
_dispatcher->sendAll();
numSent += pendingBatches.size();
//
// Recv side
//
while (_dispatcher->numPending() > 0) {
// Get the response
ConnectionString shardHost;
BatchedCommandResponse response;
Status dispatchStatus = _dispatcher->recvAny(&shardHost, &response);
// Get the TargetedWriteBatch to find where to put the response
dassert(pendingBatches.find(shardHost) != pendingBatches.end());
TargetedWriteBatch* batch = pendingBatches.find(shardHost)->second;
if (dispatchStatus.isOK()) {
TrackedErrors trackedErrors;
trackedErrors.startTracking(ErrorCodes::StaleShardVersion);
LOG(4) << "write results received from " << shardHost.toString() << ": "
<< response.toString() << endl;
// Dispatch was ok, note response
batchOp.noteBatchResponse(*batch, response, &trackedErrors);
// Note if anything was stale
const vector<ShardError*>& staleErrors =
trackedErrors.getErrors(ErrorCodes::StaleShardVersion);
if (staleErrors.size() > 0) {
noteStaleResponses(staleErrors, _targeter);
++_stats->numStaleBatches;
}
// Remember if the shard is actively changing metadata right now
if (isShardMetadataChanging(staleErrors)) {
remoteMetadataChanging = true;
}
// Remember that we successfully wrote to this shard
// NOTE: This will record lastOps for shards where we actually didn't update
// or delete any documents, which preserves old behavior but is conservative
_stats->noteWriteAt(shardHost,
response.isLastOpSet() ? response.getLastOp() : Timestamp(),
response.isElectionIdSet() ? response.getElectionId()
: OID());
} else {
// Error occurred dispatching, note it
stringstream msg;
msg << "write results unavailable from " << shardHost.toString()
<< causedBy(dispatchStatus.toString());
WriteErrorDetail error;
buildErrorFrom(Status(ErrorCodes::RemoteResultsUnavailable, msg.str()), &error);
LOG(4) << "unable to receive write results from " << shardHost.toString()
<< causedBy(dispatchStatus.toString()) << endl;
batchOp.noteBatchError(*batch, error);
}
}
}
++rounds;
++_stats->numRounds;
// If we're done, get out
if (batchOp.isFinished())
break;
// MORE WORK TO DO
//
// Refresh the targeter if we need to (no-op if nothing stale)
//
bool targeterChanged = false;
Status refreshStatus = _targeter->refreshIfNeeded(&targeterChanged);
if (!refreshStatus.isOK()) {
// It's okay if we can't refresh, we'll just record errors for the ops if
// needed.
warning() << "could not refresh targeter" << causedBy(refreshStatus.reason()) << endl;
}
//
// Ensure progress is being made toward completing the batch op
//
int currCompletedOps = batchOp.numWriteOpsIn(WriteOpState_Completed);
if (currCompletedOps == numCompletedOps && !targeterChanged && !remoteMetadataChanging) {
++numRoundsWithoutProgress;
} else {
numRoundsWithoutProgress = 0;
}
numCompletedOps = currCompletedOps;
if (numRoundsWithoutProgress > kMaxRoundsWithoutProgress) {
stringstream msg;
msg << "no progress was made executing batch write op in " << clientRequest.getNS()
<< " after " << kMaxRoundsWithoutProgress << " rounds (" << numCompletedOps
<< " ops completed in " << rounds << " rounds total)";
WriteErrorDetail error;
buildErrorFrom(Status(ErrorCodes::NoProgressMade, msg.str()), &error);
batchOp.abortBatch(error);
break;
}
}
batchOp.buildClientResponse(clientResponse);
LOG(4) << "finished execution of write batch"
<< (clientResponse->isErrDetailsSet() ? " with write errors" : "")
<< (clientResponse->isErrDetailsSet() && clientResponse->isWriteConcernErrorSet()
? " and"
: "")
<< (clientResponse->isWriteConcernErrorSet() ? " with write concern error" : "")
<< " for " << clientRequest.getNS() << endl;
}
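// Stats accessors - releaseStats() transfers ownership of the stats to the caller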
const BatchWriteExecStats& BatchWriteExec::getStats() {
return *_stats;
}
BatchWriteExecStats* BatchWriteExec::releaseStats() {
return _stats.release();
}
void BatchWriteExecStats::noteWriteAt(const ConnectionString& host,
Timestamp opTime,
const OID& electionId) {
_writeOpTimes[host] = HostOpTime(opTime, electionId);
}
const HostOpTimeMap& BatchWriteExecStats::getWriteOpTimes() const {
return _writeOpTimes;
}
}  // namespace mongo