/**
* Copyright (C) 2017 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#include "mongo/platform/basic.h"
#include "mongo/db/curop.h"
#include "mongo/db/pipeline/document_source_check_resume_token.h"
using boost::intrusive_ptr;
namespace mongo {
namespace {
using ResumeStatus = DocumentSourceEnsureResumeTokenPresent::ResumeStatus;
// Returns ResumeStatus::kFoundToken if the document retrieved from the resumed pipeline satisfies
// the client's resume token, ResumeStatus::kCheckNextDoc if it is older than the client's token,
// and ResumeStatus::kCannotResume if it is more recent than the client's resume token (indicating
// that we will never see the token). If the resume token's documentKey contains only the _id field
// while the pipeline documentKey contains additional fields, then the collection has become
// sharded since the resume token was generated. In that case, we relax the requirements such that
// only the timestamp, version, applyOpsIndex, UUID and documentKey._id need match. This remains
// correct, since the only circumstances under which the resume token omits the shard key is if it
// was generated either (1) before the collection was sharded, (2) after the collection was sharded
// but before the primary shard became aware of that fact, implying that it was before the first
// chunk moved off the shard, or (3) by a malicious client who has constructed their own resume
// token. In the first two cases, we can be guaranteed that the _id is unique and the stream can
// therefore be resumed seamlessly; in the third case, the worst that can happen is that some
// entries are missed or duplicated. Note that the simple collation is used to compare the resume
// tokens, and that we purposefully avoid the user's requested collation if present.
ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionContext>& expCtx,
                                             const Document& documentFromResumedStream,
                                             const ResumeToken& tokenFromClient) {
    // Parse both the stream doc and the client's resume token into comprehensible ResumeTokenData.
    auto tokenDataFromClient = tokenFromClient.getData();
    auto tokenDataFromResumedStream =
        ResumeToken::parse(documentFromResumedStream["_id"].getDocument()).getData();
    // We start the resume with a $gte query on the timestamp, so we never expect it to be lower
    // than our resume token's timestamp.
    invariant(tokenDataFromResumedStream.clusterTime >= tokenDataFromClient.clusterTime);
    // If the clusterTime differs from the client's token, this stream cannot be resumed.
    if (tokenDataFromResumedStream.clusterTime != tokenDataFromClient.clusterTime) {
        return ResumeStatus::kCannotResume;
    }
    if (tokenDataFromResumedStream.applyOpsIndex < tokenDataFromClient.applyOpsIndex) {
        return ResumeStatus::kCheckNextDoc;
    } else if (tokenDataFromResumedStream.applyOpsIndex > tokenDataFromClient.applyOpsIndex) {
        // This could happen if the client provided an applyOpsIndex of 0, yet the 0th document in
        // the applyOps was irrelevant (meaning it was an operation on a collection or DB not being
        // watched). This indicates a corrupt resume token.
        uasserted(50792, "Invalid resumeToken: applyOpsIndex was skipped");
    }
    // It is acceptable for the stream UUID to differ from the client's, if this is a whole-database
    // or cluster-wide stream and we are comparing operations from different shards at the same
    // clusterTime. If the stream UUID sorts after the client's, however, then the stream is not
    // resumable; we are past the point in the stream where the token should have appeared.
    if (tokenDataFromResumedStream.uuid != tokenDataFromClient.uuid) {
        // If we're not in mongos then this must be a replica set deployment, in which case we don't
        // ever expect to see identical timestamps and we reject the resume attempt immediately.
        return !expCtx->inMongos || tokenDataFromResumedStream.uuid > tokenDataFromClient.uuid
            ? ResumeStatus::kCannotResume
            : ResumeStatus::kCheckNextDoc;
    }
    // If all the fields match exactly, then we have found the token. Note that the simple (binary)
    // collation is used deliberately here, never the user's requested collation.
    if (ValueComparator::kInstance.evaluate(tokenDataFromResumedStream.documentKey ==
                                            tokenDataFromClient.documentKey)) {
        return ResumeStatus::kFoundToken;
    }
    // At this point, we know that the tokens differ only by documentKey. The status we return will
    // depend on whether the stream token is logically before or after the client token. If the
    // latter, then we will never see the resume token and the stream cannot be resumed. However,
    // before we can return this value, we need to check the possibility that the resumed stream is
    // on a sharded collection and the client token is from before the collection was sharded.
    const auto defaultResumeStatus =
        ValueComparator::kInstance.evaluate(tokenDataFromResumedStream.documentKey >
                                            tokenDataFromClient.documentKey)
        ? ResumeStatus::kCannotResume
        : ResumeStatus::kCheckNextDoc;
    // If we're not running in a sharded context, we don't need to proceed any further.
    if (!expCtx->needsMerge && !expCtx->inMongos) {
        return defaultResumeStatus;
    }
    // If we reach here, we still need to check the possibility that the collection has become
    // sharded in the time since the client's resume token was generated. If so, then the client
    // token will only have an _id field, while the token from the new pipeline may have additional
    // shard key fields.
    // We expect the documentKey to be an object in both the client and stream tokens. If either is
    // not, then we cannot compare the embedded _id values in each, and so the stream token does not
    // satisfy the client token.
    if (tokenDataFromClient.documentKey.getType() != BSONType::Object ||
        tokenDataFromResumedStream.documentKey.getType() != BSONType::Object) {
        return defaultResumeStatus;
    }
    auto documentKeyFromResumedStream = tokenDataFromResumedStream.documentKey.getDocument();
    auto documentKeyFromClient = tokenDataFromClient.documentKey.getDocument();
    // In order for the relaxed comparison to be applicable, the client token must have a single _id
    // field, and the resumed stream token must have additional fields beyond _id.
    if (!(documentKeyFromClient.size() == 1 && documentKeyFromResumedStream.size() > 1)) {
        return defaultResumeStatus;
    }
    // If the resume token's documentKey only contains the _id field while the pipeline's
    // documentKey contains additional fields, we require only that the _ids match.
    return (!documentKeyFromClient["_id"].missing() &&
                    ValueComparator::kInstance.evaluate(documentKeyFromResumedStream["_id"] ==
                                                        documentKeyFromClient["_id"])
                ? ResumeStatus::kFoundToken
                : defaultResumeStatus);
}
} // namespace
const char* DocumentSourceEnsureResumeTokenPresent::getSourceName() const {
    // The "$_" prefix marks this as an internal-only stage name.
    static constexpr const char kStageName[] = "$_ensureResumeTokenPresent";
    return kStageName;
}
Value DocumentSourceEnsureResumeTokenPresent::serialize(
    boost::optional<ExplainOptions::Verbosity> explain) const {
    // This stage is created by the DocumentSourceChangeStream stage, so serializing it here
    // would result in it being created twice. Returning the empty Value omits the stage from
    // the serialized pipeline.
    return Value();
}
intrusive_ptr<DocumentSourceEnsureResumeTokenPresent>
DocumentSourceEnsureResumeTokenPresent::create(const intrusive_ptr<ExpressionContext>& expCtx,
                                               ResumeToken token) {
    return new DocumentSourceEnsureResumeTokenPresent(expCtx, std::move(token));
}
DocumentSourceEnsureResumeTokenPresent::DocumentSourceEnsureResumeTokenPresent(
    const intrusive_ptr<ExpressionContext>& expCtx, ResumeToken token)
    : DocumentSource(expCtx), _tokenFromClient(std::move(token)) {}
// Scans the resumed stream until the client's resume token is located, swallowing it and every
// earlier document; once found (state kFoundToken), behaves as a transparent pass-through.
// Throws (uassert 40585) if a document logically past the token is seen first.
DocumentSource::GetNextResult DocumentSourceEnsureResumeTokenPresent::getNext() {
    pExpCtx->checkForInterrupt();
    if (_resumeStatus == ResumeStatus::kFoundToken) {
        // We've already verified the resume token is present.
        return pSource->getNext();
    }
    Document documentFromResumedStream;
    // Keep iterating the stream until we see either the resume token we're looking for,
    // or a change with a higher timestamp than our resume token.
    while (_resumeStatus == ResumeStatus::kCheckNextDoc) {
        auto nextInput = pSource->getNext();
        // Propagate pause/EOF results unchanged; the scan resumes on the next getNext() call
        // because _resumeStatus is still kCheckNextDoc.
        if (!nextInput.isAdvanced())
            return nextInput;
        // The incoming documents are sorted on clusterTime, uuid, documentKey. We examine a range
        // of documents that have the same prefix (i.e. clusterTime and uuid). If the user provided
        // token would sort before this received document we cannot resume the change stream.
        _resumeStatus = compareAgainstClientResumeToken(
            pExpCtx, (documentFromResumedStream = nextInput.getDocument()), _tokenFromClient);
    }
    // The loop exited, so the comparison returned either kFoundToken or kCannotResume; the
    // latter is a user-visible error since the resume point is no longer in the stream.
    uassert(40585,
            str::stream()
                << "resume of change stream was not possible, as the resume token was not found. "
                << documentFromResumedStream["_id"].getDocument().toString(),
            _resumeStatus != ResumeStatus::kCannotResume);
    // If we reach this point, then we've seen the resume token.
    invariant(_resumeStatus == ResumeStatus::kFoundToken);
    // Don't return the document which has the token; the user has already seen it.
    return pSource->getNext();
}
const char* DocumentSourceShardCheckResumability::getSourceName() const {
    // The "$_" prefix marks this as an internal-only stage name.
    static constexpr const char kStageName[] = "$_checkShardResumability";
    return kStageName;
}
Value DocumentSourceShardCheckResumability::serialize(
    boost::optional<ExplainOptions::Verbosity> explain) const {
    // This stage is created by the DocumentSourceChangeStream stage, so serializing it here
    // would result in it being created twice. Returning the empty Value omits the stage from
    // the serialized pipeline.
    return Value();
}
intrusive_ptr<DocumentSourceShardCheckResumability> DocumentSourceShardCheckResumability::create(
    const intrusive_ptr<ExpressionContext>& expCtx, Timestamp ts) {
    return new DocumentSourceShardCheckResumability(expCtx, ts);
}
DocumentSourceShardCheckResumability::DocumentSourceShardCheckResumability(
    const intrusive_ptr<ExpressionContext>& expCtx, Timestamp ts)
    : DocumentSource(expCtx), _resumeTimestamp(ts), _verifiedResumability(false) {}
// On the first call only, verifies that this shard can resume from _resumeTimestamp: either the
// first observed event carries exactly that clusterTime, or the oldest oplog entry predates it
// (proving the resume point has not yet rolled off the oplog). Throws uassert 40576 otherwise.
// Subsequent calls are a transparent pass-through.
DocumentSource::GetNextResult DocumentSourceShardCheckResumability::getNext() {
    pExpCtx->checkForInterrupt();
    auto nextInput = pSource->getNext();
    if (_verifiedResumability)
        return nextInput;
    // The check below runs exactly once, regardless of its outcome.
    _verifiedResumability = true;
    if (nextInput.isAdvanced()) {
        auto doc = nextInput.getDocument();
        auto receivedTimestamp = ResumeToken::parse(doc["_id"].getDocument()).getClusterTime();
        if (receivedTimestamp == _resumeTimestamp) {
            // Pass along the document, as the DocumentSourceEnsureResumeTokenPresent stage on the
            // merger will need to see it.
            return nextInput;
        }
    }
    // If we make it here, we need to look up the first document in the oplog and compare it
    // with the resume token.
    auto firstEntryExpCtx = pExpCtx->copyWith(NamespaceString::kRsOplogNamespace);
    auto matchSpec = BSON("$match" << BSONObj());
    auto pipeline = uassertStatusOK(
        pExpCtx->mongoProcessInterface->makePipeline({matchSpec}, firstEntryExpCtx));
    if (auto first = pipeline->getNext()) {
        auto firstOplogEntry = Value(*first);
        // Resumability requires the oldest oplog entry to be strictly older than the resume
        // point; otherwise the resume point may already have fallen off the oplog.
        uassert(40576,
                "Resume of change stream was not possible, as the resume point may no longer "
                "be in the oplog. ",
                firstOplogEntry["ts"].getTimestamp() < _resumeTimestamp);
    } else {
        // Very unusual case: the oplog is empty. We can always resume. It should never be
        // possible that the oplog is empty and we got a document matching the filter, however.
        invariant(nextInput.isEOF());
    }
    // The query on the oplog above will overwrite the namespace in current op which is used in
    // profiling, reset it back to the original namespace. This is a generic problem with any
    // aggregation that involves sub-operations on different namespaces, and is being tracked by
    // SERVER-31098.
    {
        // CurOp mutation requires holding the Client lock.
        stdx::lock_guard<Client> lk(*pExpCtx->opCtx->getClient());
        CurOp::get(pExpCtx->opCtx)->setNS_inlock(pExpCtx->ns.ns());
    }
    return nextInput;
}
} // namespace mongo