diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2017-09-19 16:43:18 -0400 |
---|---|---|
committer | Bernard Gorman <bernard.gorman@gmail.com> | 2017-09-26 12:56:52 -0400 |
commit | 5c29ce7718b423ae23ba0eccf71249cf69d36d37 (patch) | |
tree | d5f9ca74111c814b6545cf88696efcaa38498d79 | |
parent | b3b44c1ecd30adaf7421ef9c93a237693a1fca06 (diff) | |
download | mongo-5c29ce7718b423ae23ba0eccf71249cf69d36d37.tar.gz |
SERVER-29141 Update AsyncResultsMerger to merge multiple change streams
-rw-r--r-- | src/mongo/s/query/async_results_merger.cpp | 110 | ||||
-rw-r--r-- | src/mongo/s/query/async_results_merger.h | 14 | ||||
-rw-r--r-- | src/mongo/s/query/async_results_merger_test.cpp | 149 |
3 files changed, 257 insertions, 16 deletions
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index 9801fb898ed..a9434158f40 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -32,6 +32,7 @@ #include "mongo/s/query/async_results_merger.h" +#include "mongo/bson/simple_bsonobj_comparator.h" #include "mongo/client/remote_command_targeter.h" #include "mongo/db/query/cursor_response.h" #include "mongo/db/query/getmore_request.h" @@ -49,6 +50,30 @@ namespace { // Maximum number of retries for network and replication notMaster errors (per host). const int kMaxNumFailedHostRetryAttempts = 3; +/** + * Returns the sort key out of the $sortKey metadata field in 'obj'. This object is of the form + * {'': 'firstSortKey', '': 'secondSortKey', ...}. + */ +BSONObj extractSortKey(BSONObj obj) { + auto key = obj[ClusterClientCursorParams::kSortKeyField]; + invariant(key.type() == BSONType::Object); + return key.Obj(); +} + +/** + * Returns an int less than 0 if 'leftSortKey' < 'rightSortKey', 0 if the two are equal, and an int + * > 0 if 'leftSortKey' > 'rightSortKey' according to the pattern 'sortKeyPattern'. + */ +int compareSortKeys(BSONObj leftSortKey, BSONObj rightSortKey, BSONObj sortKeyPattern) { + // This does not need to sort with a collator, since mongod has already mapped strings to their + // ICU comparison keys as part of the $sortKey meta projection. + const bool considerFieldName = false; + return leftSortKey.woCompare(rightSortKey, sortKeyPattern, considerFieldName); +} + +const BSONObj kChangeStreamSortSpec = + BSON("_id.clusterTime.ts" << 1 << "_id.uuid" << 1 << "_id.documentKey" << 1); + } // namespace AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx, @@ -66,7 +91,7 @@ AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx, // We don't check the return value of _addBatchToBuffer here; if there was an error, // it will be stored in the remote and the first call to ready() will return true. - _addBatchToBuffer(WithLock::withoutLock(), remoteIndex, remote.cursorResponse.getBatch()); + _addBatchToBuffer(WithLock::withoutLock(), remoteIndex, remote.cursorResponse); ++remoteIndex; } @@ -152,8 +177,11 @@ bool AsyncResultsMerger::_ready(WithLock lk) { return hasSort ? _readySorted(lk) : _readyUnsorted(lk); } -bool AsyncResultsMerger::_readySorted(WithLock) { - // Tailable cursors cannot have a sort. +bool AsyncResultsMerger::_readySorted(WithLock lk) { + if (_params->tailableMode == TailableMode::kTailableAndAwaitData) { + return _readySortedTailable(lk); + } + // Tailable non-awaitData cursors cannot have a sort. invariant(_params->tailableMode == TailableMode::kNormal); for (const auto& remote : _remotes) { @@ -165,6 +193,28 @@ bool AsyncResultsMerger::_readySorted(WithLock) { return true; } +bool AsyncResultsMerger::_readySortedTailable(WithLock) { + if (_mergeQueue.empty()) { + return false; + } + + auto smallestRemote = _mergeQueue.top(); + auto smallestResult = _remotes[smallestRemote].docBuffer.front(); + auto keyWeWantToReturn = extractSortKey(*smallestResult.getResult()); + for (const auto& remote : _remotes) { + if (!remote.promisedMinSortKey) { + // In order to merge sorted tailable cursors, we need this value to be populated. + return false; + } + if (compareSortKeys(keyWeWantToReturn, *remote.promisedMinSortKey, _params->sort) > 0) { + // The key we want to return is not guaranteed to be smaller than future results from + // this remote, so we can't yet return it. + return false; + } + } + return true; +} + bool AsyncResultsMerger::_readyUnsorted(WithLock) { bool allExhausted = true; for (const auto& remote : _remotes) { @@ -201,8 +251,8 @@ StatusWith<ClusterQueryResult> AsyncResultsMerger::nextReady() { } ClusterQueryResult AsyncResultsMerger::_nextReadySorted(WithLock) { - // Tailable cursors cannot have a sort. - invariant(_params->tailableMode == TailableMode::kNormal); + // Tailable non-awaitData cursors cannot have a sort. + invariant(_params->tailableMode != TailableMode::kTailable); if (_mergeQueue.empty()) { return {}; @@ -374,6 +424,38 @@ StatusWith<CursorResponse> AsyncResultsMerger::_parseCursorResponse( return std::move(cursorResponse); } +void AsyncResultsMerger::updateRemoteMetadata(RemoteCursorData* remote, + const CursorResponse& response) { + // Update the cursorId; it is sent as '0' when the cursor has been exhausted on the shard. + remote->cursorId = response.getCursorId(); + if (response.getLastOplogTimestamp() && !response.getLastOplogTimestamp()->isNull()) { + // We only expect to see this for change streams. + invariant( + SimpleBSONObjComparator::kInstance.evaluate(_params->sort == kChangeStreamSortSpec)); + + // Our new minimum promised sort key is the first key whose timestamp matches the most + // recent reported oplog timestamp. It should never be smaller than the previous min sort + // key for this remote, if one exists. + auto newPromisedMin = + BSON("" << *response.getLastOplogTimestamp() << "" << MINKEY << "" << MINKEY); + invariant(!remote->promisedMinSortKey || + compareSortKeys( + *remote->promisedMinSortKey, newPromisedMin, kChangeStreamSortSpec) <= 0); + + // The promised min sort key should never be smaller than any results returned. If the + // last entry in the batch is also the most recent entry in the oplog, then its sort key + // of {lastOplogTimestamp, uuid, docID} will be greater than the artificial promised min + // sort key of {lastOplogTimestamp, MINKEY, MINKEY}. + auto maxSortKeyFromResponse = + (response.getBatch().empty() ? BSONObj() : extractSortKey(response.getBatch().back())); + + remote->promisedMinSortKey = + (compareSortKeys(newPromisedMin, maxSortKeyFromResponse, kChangeStreamSortSpec) < 0 + ? maxSortKeyFromResponse.getOwned() + : newPromisedMin.getOwned()); + } +} + void AsyncResultsMerger::_handleBatchResponse(WithLock lk, CbData const& cbData, size_t remoteIndex) { @@ -448,7 +530,7 @@ void AsyncResultsMerger::_processBatchResults(WithLock lk, remote.cursorId = cursorResponse.getCursorId(); // Save the batch in the remote's buffer. - if (!_addBatchToBuffer(lk, remoteIndex, cursorResponse.getBatch())) { + if (!_addBatchToBuffer(lk, remoteIndex, cursorResponse)) { return; } @@ -470,9 +552,10 @@ void AsyncResultsMerger::_processBatchResults(WithLock lk, bool AsyncResultsMerger::_addBatchToBuffer(WithLock lk, size_t remoteIndex, - std::vector<BSONObj> const& batch) { + const CursorResponse& response) { auto& remote = _remotes[remoteIndex]; - for (const auto& obj : batch) { + updateRemoteMetadata(&remote, response); + for (const auto& obj : response.getBatch()) { // If there's a sort, we're expecting the remote node to have given us back a sort key. if (!_params->sort.isEmpty() && obj[ClusterClientCursorParams::kSortKeyField].type() != BSONType::Object) { @@ -491,7 +574,7 @@ bool AsyncResultsMerger::_addBatchToBuffer(WithLock lk, // If we're doing a sorted merge, then we have to make sure to put this remote onto the // merge queue. - if (!_params->sort.isEmpty() && !batch.empty()) { + if (!_params->sort.isEmpty() && !response.getBatch().empty()) { _mergeQueue.push(remoteIndex); } return true; @@ -611,12 +694,9 @@ bool AsyncResultsMerger::MergingComparator::operator()(const size_t& lhs, const const ClusterQueryResult& leftDoc = _remotes[lhs].docBuffer.front(); const ClusterQueryResult& rightDoc = _remotes[rhs].docBuffer.front(); - BSONObj leftDocKey = (*leftDoc.getResult())[ClusterClientCursorParams::kSortKeyField].Obj(); - BSONObj rightDocKey = (*rightDoc.getResult())[ClusterClientCursorParams::kSortKeyField].Obj(); - - // This does not need to sort with a collator, since mongod has already mapped strings to their - // ICU comparison keys as part of the $sortKey meta projection. - return leftDocKey.woCompare(rightDocKey, _sort, false /*considerFieldName*/) > 0; + return compareSortKeys(extractSortKey(*leftDoc.getResult()), + extractSortKey(*rightDoc.getResult()), + _sort) > 0; } } // namespace mongo diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h index 6a1ebb4ee8c..5068efc8db2 100644 --- a/src/mongo/s/query/async_results_merger.h +++ b/src/mongo/s/query/async_results_merger.h @@ -228,6 +228,12 @@ private: */ std::shared_ptr<Shard> getShard(); + // Used when merging tailable awaitData cursors in sorted order. In order to return any + // result to the client we have to know that no shard will ever return anything that sorts + // before it. This object represents a promise from the remote that it will never return a + // result with a sort key lower than this. + boost::optional<BSONObj> promisedMinSortKey; + // The cursor id for the remote cursor. If a remote cursor is not yet exhausted, this member // will be set to a valid non-zero cursor id. If a remote cursor is now exhausted, this // member will be set to zero. @@ -299,6 +305,7 @@ private: bool _ready(WithLock); bool _readySorted(WithLock); + bool _readySortedTailable(WithLock); bool _readyUnsorted(WithLock); // @@ -339,7 +346,7 @@ private: * Adds the batch of results to the RemoteCursorData. Returns false if there was an error * parsing the batch. */ - bool _addBatchToBuffer(WithLock, size_t remoteIndex, std::vector<BSONObj> const& batch); + bool _addBatchToBuffer(WithLock, size_t remoteIndex, const CursorResponse& response); /** * If there is a valid unsignaled event that has been requested via nextReady() and there are @@ -360,6 +367,11 @@ private: */ void _scheduleKillCursors(WithLock, OperationContext* opCtx); + /** + * Updates 'remote's metadata (e.g. the cursor id) based on information in 'response'. + */ + void updateRemoteMetadata(RemoteCursorData* remote, const CursorResponse& response); + OperationContext* _opCtx; executor::TaskExecutor* _executor; ClusterClientCursorParams* _params; diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp index 682cadc9a6f..1a9a2337095 100644 --- a/src/mongo/s/query/async_results_merger_test.cpp +++ b/src/mongo/s/query/async_results_merger_test.cpp @@ -1459,6 +1459,155 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) { CursorResponse::ResponseType::SubsequentResponse); } +TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneOrMoreRemotesHasNoOplogTimestamp) { + auto params = + stdx::make_unique<ClusterClientCursorParams>(_nss, UserNameIterator(), boost::none); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {})); + cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 456, {})); + params->remotes = std::move(cursors); + params->tailableMode = TailableMode::kTailableAndAwaitData; + params->sort = fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}"); + arm = stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), params.get()); + + auto readyEvent = unittest::assertGet(arm->nextEvent()); + + ASSERT_FALSE(arm->ready()); + + // Schedule one response with an oplog timestamp in it. + std::vector<CursorResponse> responses; + std::vector<BSONObj> batch1 = { + fromjson("{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: 1, documentKey: {_id: 1}}, " + "$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}")}; + const Timestamp lastObservedFirstCursor = Timestamp(1, 6); + responses.emplace_back(_nss, CursorId(123), batch1, boost::none, lastObservedFirstCursor); + scheduleNetworkResponses(std::move(responses), + CursorResponse::ResponseType::SubsequentResponse); + + // Still shouldn't be ready, we don't have a guarantee from each shard. + ASSERT_FALSE(arm->ready()); + + // Schedule another response from the other shard. + responses.clear(); + std::vector<BSONObj> batch2 = { + fromjson("{_id: {clusterTime: {ts: Timestamp(1, 5)}, uuid: 1, documentKey: {_id: 2}}, " + "$sortKey: {'': Timestamp(1, 5), '': 1, '': 2}}")}; + const Timestamp lastObservedSecondCursor = Timestamp(1, 5); + responses.emplace_back(_nss, CursorId(456), batch2, boost::none, lastObservedSecondCursor); + scheduleNetworkResponses(std::move(responses), + CursorResponse::ResponseType::SubsequentResponse); + executor()->waitForEvent(readyEvent); + ASSERT_TRUE(arm->ready()); + ASSERT_BSONOBJ_EQ( + fromjson("{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: 1, documentKey: {_id: 1}}, " + "$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}"), + *unittest::assertGet(arm->nextReady()).getResult()); + ASSERT_BSONOBJ_EQ( + fromjson("{_id: {clusterTime: {ts: Timestamp(1, 5)}, uuid: 1, documentKey: {_id: 2}}, " + "$sortKey: {'': Timestamp(1, 5), '': 1, '': 2}}"), + *unittest::assertGet(arm->nextReady()).getResult()); + ASSERT_FALSE(arm->ready()); + + readyEvent = unittest::assertGet(arm->nextEvent()); + + // Clean up the cursors. + responses.clear(); + std::vector<BSONObj> batch3 = {}; + responses.emplace_back(_nss, CursorId(0), batch3); + scheduleNetworkResponses(std::move(responses), + CursorResponse::ResponseType::SubsequentResponse); + responses.clear(); + std::vector<BSONObj> batch4 = {}; + responses.emplace_back(_nss, CursorId(0), batch4); + scheduleNetworkResponses(std::move(responses), + CursorResponse::ResponseType::SubsequentResponse); +} + +TEST_F(AsyncResultsMergerTest, + SortedTailableCursorNotReadyIfOneOrMoreRemotesHasNullOplogTimestamp) { + auto params = + stdx::make_unique<ClusterClientCursorParams>(_nss, UserNameIterator(), boost::none); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back( + kTestShardIds[0], + kTestShardHosts[0], + CursorResponse( + _nss, + 123, + {fromjson("{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: 1, documentKey: {_id: 1}}, " + "$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}")}, + boost::none, + Timestamp(1, 5))); + cursors.emplace_back(kTestShardIds[1], + kTestShardHosts[1], + CursorResponse(_nss, 456, {}, boost::none, Timestamp())); + params->remotes = std::move(cursors); + params->tailableMode = TailableMode::kTailableAndAwaitData; + params->sort = fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}"); + arm = stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), params.get()); + + auto readyEvent = unittest::assertGet(arm->nextEvent()); + + ASSERT_FALSE(arm->ready()); + + std::vector<CursorResponse> responses; + std::vector<BSONObj> batch3 = {}; + responses.emplace_back(_nss, CursorId(0), batch3, boost::none, Timestamp(1, 8)); + scheduleNetworkResponses(std::move(responses), + CursorResponse::ResponseType::SubsequentResponse); + executor()->waitForEvent(unittest::assertGet(arm->nextEvent())); + ASSERT_TRUE(arm->ready()); + ASSERT_BSONOBJ_EQ( + fromjson("{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: 1, documentKey: {_id: 1}}, " + "$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}"), + *unittest::assertGet(arm->nextReady()).getResult()); + ASSERT_FALSE(arm->ready()); + + readyEvent = unittest::assertGet(arm->nextEvent()); + + // Clean up. + responses.clear(); + std::vector<BSONObj> batch4 = {}; + responses.emplace_back(_nss, CursorId(0), batch4); + scheduleNetworkResponses(std::move(responses), + CursorResponse::ResponseType::SubsequentResponse); +} + +TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneRemoteHasLowerOplogTime) { + auto params = + stdx::make_unique<ClusterClientCursorParams>(_nss, UserNameIterator(), boost::none); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + Timestamp tooLow = Timestamp(1, 2); + cursors.emplace_back( + kTestShardIds[0], + kTestShardHosts[0], + CursorResponse( + _nss, + 123, + {fromjson("{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: 1, documentKey: {_id: 1}}, " + "$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}")}, + boost::none, + Timestamp(1, 5))); + cursors.emplace_back( + kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 456, {}, boost::none, tooLow)); + params->remotes = std::move(cursors); + params->tailableMode = TailableMode::kTailableAndAwaitData; + params->sort = fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}"); + arm = stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), params.get()); + + auto readyEvent = unittest::assertGet(arm->nextEvent()); + + ASSERT_FALSE(arm->ready()); + + // Clean up the cursors. + std::vector<CursorResponse> responses; + responses.emplace_back(_nss, CursorId(0), std::vector<BSONObj>{}); + scheduleNetworkResponses(std::move(responses), + CursorResponse::ResponseType::SubsequentResponse); + auto killEvent = arm->kill(operationContext()); + executor()->waitForEvent(killEvent); +} + TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutTailableCantHaveMaxTime) { BSONObj findCmd = fromjson("{find: 'testcoll'}"); std::vector<ClusterClientCursorParams::RemoteCursor> cursors; |