summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBernard Gorman <bernard.gorman@gmail.com>2017-09-19 16:43:18 -0400
committerBernard Gorman <bernard.gorman@gmail.com>2017-09-26 12:56:52 -0400
commit5c29ce7718b423ae23ba0eccf71249cf69d36d37 (patch)
treed5f9ca74111c814b6545cf88696efcaa38498d79
parentb3b44c1ecd30adaf7421ef9c93a237693a1fca06 (diff)
downloadmongo-5c29ce7718b423ae23ba0eccf71249cf69d36d37.tar.gz
SERVER-29141 Update AsyncResultsMerger to merge multiple change streams
-rw-r--r--src/mongo/s/query/async_results_merger.cpp110
-rw-r--r--src/mongo/s/query/async_results_merger.h14
-rw-r--r--src/mongo/s/query/async_results_merger_test.cpp149
3 files changed, 257 insertions, 16 deletions
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index 9801fb898ed..a9434158f40 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -32,6 +32,7 @@
#include "mongo/s/query/async_results_merger.h"
+#include "mongo/bson/simple_bsonobj_comparator.h"
#include "mongo/client/remote_command_targeter.h"
#include "mongo/db/query/cursor_response.h"
#include "mongo/db/query/getmore_request.h"
@@ -49,6 +50,30 @@ namespace {
// Maximum number of retries for network and replication notMaster errors (per host).
const int kMaxNumFailedHostRetryAttempts = 3;
+/**
+ * Returns the sort key out of the $sortKey metadata field in 'obj'. This object is of the form
+ * {'': 'firstSortKey', '': 'secondSortKey', ...}.
+ */
+BSONObj extractSortKey(BSONObj obj) {
+ auto key = obj[ClusterClientCursorParams::kSortKeyField];
+ invariant(key.type() == BSONType::Object);
+ return key.Obj();
+}
+
+/**
+ * Returns an int less than 0 if 'leftSortKey' < 'rightSortKey', 0 if the two are equal, and an int
+ * > 0 if 'leftSortKey' > 'rightSortKey' according to the pattern 'sortKeyPattern'.
+ */
+int compareSortKeys(BSONObj leftSortKey, BSONObj rightSortKey, BSONObj sortKeyPattern) {
+ // This does not need to sort with a collator, since mongod has already mapped strings to their
+ // ICU comparison keys as part of the $sortKey meta projection.
+ const bool considerFieldName = false;
+ return leftSortKey.woCompare(rightSortKey, sortKeyPattern, considerFieldName);
+}
+
+const BSONObj kChangeStreamSortSpec =
+ BSON("_id.clusterTime.ts" << 1 << "_id.uuid" << 1 << "_id.documentKey" << 1);
+
} // namespace
AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx,
@@ -66,7 +91,7 @@ AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx,
// We don't check the return value of _addBatchToBuffer here; if there was an error,
// it will be stored in the remote and the first call to ready() will return true.
- _addBatchToBuffer(WithLock::withoutLock(), remoteIndex, remote.cursorResponse.getBatch());
+ _addBatchToBuffer(WithLock::withoutLock(), remoteIndex, remote.cursorResponse);
++remoteIndex;
}
@@ -152,8 +177,11 @@ bool AsyncResultsMerger::_ready(WithLock lk) {
return hasSort ? _readySorted(lk) : _readyUnsorted(lk);
}
-bool AsyncResultsMerger::_readySorted(WithLock) {
- // Tailable cursors cannot have a sort.
+bool AsyncResultsMerger::_readySorted(WithLock lk) {
+ if (_params->tailableMode == TailableMode::kTailableAndAwaitData) {
+ return _readySortedTailable(lk);
+ }
+ // Tailable non-awaitData cursors cannot have a sort.
invariant(_params->tailableMode == TailableMode::kNormal);
for (const auto& remote : _remotes) {
@@ -165,6 +193,28 @@ bool AsyncResultsMerger::_readySorted(WithLock) {
return true;
}
+bool AsyncResultsMerger::_readySortedTailable(WithLock) {
+ if (_mergeQueue.empty()) {
+ return false;
+ }
+
+ auto smallestRemote = _mergeQueue.top();
+ auto smallestResult = _remotes[smallestRemote].docBuffer.front();
+ auto keyWeWantToReturn = extractSortKey(*smallestResult.getResult());
+ for (const auto& remote : _remotes) {
+ if (!remote.promisedMinSortKey) {
+ // In order to merge sorted tailable cursors, we need this value to be populated.
+ return false;
+ }
+ if (compareSortKeys(keyWeWantToReturn, *remote.promisedMinSortKey, _params->sort) > 0) {
+ // The key we want to return is not guaranteed to be smaller than future results from
+ // this remote, so we can't yet return it.
+ return false;
+ }
+ }
+ return true;
+}
+
bool AsyncResultsMerger::_readyUnsorted(WithLock) {
bool allExhausted = true;
for (const auto& remote : _remotes) {
@@ -201,8 +251,8 @@ StatusWith<ClusterQueryResult> AsyncResultsMerger::nextReady() {
}
ClusterQueryResult AsyncResultsMerger::_nextReadySorted(WithLock) {
- // Tailable cursors cannot have a sort.
- invariant(_params->tailableMode == TailableMode::kNormal);
+ // Tailable non-awaitData cursors cannot have a sort.
+ invariant(_params->tailableMode != TailableMode::kTailable);
if (_mergeQueue.empty()) {
return {};
@@ -374,6 +424,38 @@ StatusWith<CursorResponse> AsyncResultsMerger::_parseCursorResponse(
return std::move(cursorResponse);
}
+void AsyncResultsMerger::updateRemoteMetadata(RemoteCursorData* remote,
+ const CursorResponse& response) {
+ // Update the cursorId; it is sent as '0' when the cursor has been exhausted on the shard.
+ remote->cursorId = response.getCursorId();
+ if (response.getLastOplogTimestamp() && !response.getLastOplogTimestamp()->isNull()) {
+ // We only expect to see this for change streams.
+ invariant(
+ SimpleBSONObjComparator::kInstance.evaluate(_params->sort == kChangeStreamSortSpec));
+
+ // Our new minimum promised sort key is the first key whose timestamp matches the most
+ // recent reported oplog timestamp. It should never be smaller than the previous min sort
+ // key for this remote, if one exists.
+ auto newPromisedMin =
+ BSON("" << *response.getLastOplogTimestamp() << "" << MINKEY << "" << MINKEY);
+ invariant(!remote->promisedMinSortKey ||
+ compareSortKeys(
+ *remote->promisedMinSortKey, newPromisedMin, kChangeStreamSortSpec) <= 0);
+
+ // The promised min sort key should never be smaller than any results returned. If the
+ // last entry in the batch is also the most recent entry in the oplog, then its sort key
+ // of {lastOplogTimestamp, uuid, docID} will be greater than the artificial promised min
+ // sort key of {lastOplogTimestamp, MINKEY, MINKEY}.
+ auto maxSortKeyFromResponse =
+ (response.getBatch().empty() ? BSONObj() : extractSortKey(response.getBatch().back()));
+
+ remote->promisedMinSortKey =
+ (compareSortKeys(newPromisedMin, maxSortKeyFromResponse, kChangeStreamSortSpec) < 0
+ ? maxSortKeyFromResponse.getOwned()
+ : newPromisedMin.getOwned());
+ }
+}
+
void AsyncResultsMerger::_handleBatchResponse(WithLock lk,
CbData const& cbData,
size_t remoteIndex) {
@@ -448,7 +530,7 @@ void AsyncResultsMerger::_processBatchResults(WithLock lk,
remote.cursorId = cursorResponse.getCursorId();
// Save the batch in the remote's buffer.
- if (!_addBatchToBuffer(lk, remoteIndex, cursorResponse.getBatch())) {
+ if (!_addBatchToBuffer(lk, remoteIndex, cursorResponse)) {
return;
}
@@ -470,9 +552,10 @@ void AsyncResultsMerger::_processBatchResults(WithLock lk,
bool AsyncResultsMerger::_addBatchToBuffer(WithLock lk,
size_t remoteIndex,
- std::vector<BSONObj> const& batch) {
+ const CursorResponse& response) {
auto& remote = _remotes[remoteIndex];
- for (const auto& obj : batch) {
+ updateRemoteMetadata(&remote, response);
+ for (const auto& obj : response.getBatch()) {
// If there's a sort, we're expecting the remote node to have given us back a sort key.
if (!_params->sort.isEmpty() &&
obj[ClusterClientCursorParams::kSortKeyField].type() != BSONType::Object) {
@@ -491,7 +574,7 @@ bool AsyncResultsMerger::_addBatchToBuffer(WithLock lk,
// If we're doing a sorted merge, then we have to make sure to put this remote onto the
// merge queue.
- if (!_params->sort.isEmpty() && !batch.empty()) {
+ if (!_params->sort.isEmpty() && !response.getBatch().empty()) {
_mergeQueue.push(remoteIndex);
}
return true;
@@ -611,12 +694,9 @@ bool AsyncResultsMerger::MergingComparator::operator()(const size_t& lhs, const
const ClusterQueryResult& leftDoc = _remotes[lhs].docBuffer.front();
const ClusterQueryResult& rightDoc = _remotes[rhs].docBuffer.front();
- BSONObj leftDocKey = (*leftDoc.getResult())[ClusterClientCursorParams::kSortKeyField].Obj();
- BSONObj rightDocKey = (*rightDoc.getResult())[ClusterClientCursorParams::kSortKeyField].Obj();
-
- // This does not need to sort with a collator, since mongod has already mapped strings to their
- // ICU comparison keys as part of the $sortKey meta projection.
- return leftDocKey.woCompare(rightDocKey, _sort, false /*considerFieldName*/) > 0;
+ return compareSortKeys(extractSortKey(*leftDoc.getResult()),
+ extractSortKey(*rightDoc.getResult()),
+ _sort) > 0;
}
} // namespace mongo
diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h
index 6a1ebb4ee8c..5068efc8db2 100644
--- a/src/mongo/s/query/async_results_merger.h
+++ b/src/mongo/s/query/async_results_merger.h
@@ -228,6 +228,12 @@ private:
*/
std::shared_ptr<Shard> getShard();
+ // Used when merging tailable awaitData cursors in sorted order. In order to return any
+ // result to the client we have to know that no shard will ever return anything that sorts
+ // before it. This object represents a promise from the remote that it will never return a
+ // result with a sort key lower than this.
+ boost::optional<BSONObj> promisedMinSortKey;
+
// The cursor id for the remote cursor. If a remote cursor is not yet exhausted, this member
// will be set to a valid non-zero cursor id. If a remote cursor is now exhausted, this
// member will be set to zero.
@@ -299,6 +305,7 @@ private:
bool _ready(WithLock);
bool _readySorted(WithLock);
+ bool _readySortedTailable(WithLock);
bool _readyUnsorted(WithLock);
//
@@ -339,7 +346,7 @@ private:
* Adds the batch of results to the RemoteCursorData. Returns false if there was an error
* parsing the batch.
*/
- bool _addBatchToBuffer(WithLock, size_t remoteIndex, std::vector<BSONObj> const& batch);
+ bool _addBatchToBuffer(WithLock, size_t remoteIndex, const CursorResponse& response);
/**
* If there is a valid unsignaled event that has been requested via nextReady() and there are
@@ -360,6 +367,11 @@ private:
*/
void _scheduleKillCursors(WithLock, OperationContext* opCtx);
+ /**
+ * Updates 'remote's metadata (e.g. the cursor id) based on information in 'response'.
+ */
+ void updateRemoteMetadata(RemoteCursorData* remote, const CursorResponse& response);
+
OperationContext* _opCtx;
executor::TaskExecutor* _executor;
ClusterClientCursorParams* _params;
diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp
index 682cadc9a6f..1a9a2337095 100644
--- a/src/mongo/s/query/async_results_merger_test.cpp
+++ b/src/mongo/s/query/async_results_merger_test.cpp
@@ -1459,6 +1459,155 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) {
CursorResponse::ResponseType::SubsequentResponse);
}
+TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneOrMoreRemotesHasNoOplogTimestamp) {
+ auto params =
+ stdx::make_unique<ClusterClientCursorParams>(_nss, UserNameIterator(), boost::none);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 456, {}));
+ params->remotes = std::move(cursors);
+ params->tailableMode = TailableMode::kTailableAndAwaitData;
+ params->sort = fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}");
+ arm = stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), params.get());
+
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
+
+ ASSERT_FALSE(arm->ready());
+
+ // Schedule one response with an oplog timestamp in it.
+ std::vector<CursorResponse> responses;
+ std::vector<BSONObj> batch1 = {
+ fromjson("{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: 1, documentKey: {_id: 1}}, "
+ "$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}")};
+ const Timestamp lastObservedFirstCursor = Timestamp(1, 6);
+ responses.emplace_back(_nss, CursorId(123), batch1, boost::none, lastObservedFirstCursor);
+ scheduleNetworkResponses(std::move(responses),
+ CursorResponse::ResponseType::SubsequentResponse);
+
+ // Still shouldn't be ready, we don't have a guarantee from each shard.
+ ASSERT_FALSE(arm->ready());
+
+ // Schedule another response from the other shard.
+ responses.clear();
+ std::vector<BSONObj> batch2 = {
+ fromjson("{_id: {clusterTime: {ts: Timestamp(1, 5)}, uuid: 1, documentKey: {_id: 2}}, "
+ "$sortKey: {'': Timestamp(1, 5), '': 1, '': 2}}")};
+ const Timestamp lastObservedSecondCursor = Timestamp(1, 5);
+ responses.emplace_back(_nss, CursorId(456), batch2, boost::none, lastObservedSecondCursor);
+ scheduleNetworkResponses(std::move(responses),
+ CursorResponse::ResponseType::SubsequentResponse);
+ executor()->waitForEvent(readyEvent);
+ ASSERT_TRUE(arm->ready());
+ ASSERT_BSONOBJ_EQ(
+ fromjson("{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: 1, documentKey: {_id: 1}}, "
+ "$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}"),
+ *unittest::assertGet(arm->nextReady()).getResult());
+ ASSERT_BSONOBJ_EQ(
+ fromjson("{_id: {clusterTime: {ts: Timestamp(1, 5)}, uuid: 1, documentKey: {_id: 2}}, "
+ "$sortKey: {'': Timestamp(1, 5), '': 1, '': 2}}"),
+ *unittest::assertGet(arm->nextReady()).getResult());
+ ASSERT_FALSE(arm->ready());
+
+ readyEvent = unittest::assertGet(arm->nextEvent());
+
+ // Clean up the cursors.
+ responses.clear();
+ std::vector<BSONObj> batch3 = {};
+ responses.emplace_back(_nss, CursorId(0), batch3);
+ scheduleNetworkResponses(std::move(responses),
+ CursorResponse::ResponseType::SubsequentResponse);
+ responses.clear();
+ std::vector<BSONObj> batch4 = {};
+ responses.emplace_back(_nss, CursorId(0), batch4);
+ scheduleNetworkResponses(std::move(responses),
+ CursorResponse::ResponseType::SubsequentResponse);
+}
+
+TEST_F(AsyncResultsMergerTest,
+ SortedTailableCursorNotReadyIfOneOrMoreRemotesHasNullOplogTimestamp) {
+ auto params =
+ stdx::make_unique<ClusterClientCursorParams>(_nss, UserNameIterator(), boost::none);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(
+ kTestShardIds[0],
+ kTestShardHosts[0],
+ CursorResponse(
+ _nss,
+ 123,
+ {fromjson("{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: 1, documentKey: {_id: 1}}, "
+ "$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}")},
+ boost::none,
+ Timestamp(1, 5)));
+ cursors.emplace_back(kTestShardIds[1],
+ kTestShardHosts[1],
+ CursorResponse(_nss, 456, {}, boost::none, Timestamp()));
+ params->remotes = std::move(cursors);
+ params->tailableMode = TailableMode::kTailableAndAwaitData;
+ params->sort = fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}");
+ arm = stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), params.get());
+
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
+
+ ASSERT_FALSE(arm->ready());
+
+ std::vector<CursorResponse> responses;
+ std::vector<BSONObj> batch3 = {};
+ responses.emplace_back(_nss, CursorId(0), batch3, boost::none, Timestamp(1, 8));
+ scheduleNetworkResponses(std::move(responses),
+ CursorResponse::ResponseType::SubsequentResponse);
+ executor()->waitForEvent(unittest::assertGet(arm->nextEvent()));
+ ASSERT_TRUE(arm->ready());
+ ASSERT_BSONOBJ_EQ(
+ fromjson("{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: 1, documentKey: {_id: 1}}, "
+ "$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}"),
+ *unittest::assertGet(arm->nextReady()).getResult());
+ ASSERT_FALSE(arm->ready());
+
+ readyEvent = unittest::assertGet(arm->nextEvent());
+
+ // Clean up.
+ responses.clear();
+ std::vector<BSONObj> batch4 = {};
+ responses.emplace_back(_nss, CursorId(0), batch4);
+ scheduleNetworkResponses(std::move(responses),
+ CursorResponse::ResponseType::SubsequentResponse);
+}
+
+TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneRemoteHasLowerOplogTime) {
+ auto params =
+ stdx::make_unique<ClusterClientCursorParams>(_nss, UserNameIterator(), boost::none);
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ Timestamp tooLow = Timestamp(1, 2);
+ cursors.emplace_back(
+ kTestShardIds[0],
+ kTestShardHosts[0],
+ CursorResponse(
+ _nss,
+ 123,
+ {fromjson("{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: 1, documentKey: {_id: 1}}, "
+ "$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}")},
+ boost::none,
+ Timestamp(1, 5)));
+ cursors.emplace_back(
+ kTestShardIds[1], kTestShardHosts[1], CursorResponse(_nss, 456, {}, boost::none, tooLow));
+ params->remotes = std::move(cursors);
+ params->tailableMode = TailableMode::kTailableAndAwaitData;
+ params->sort = fromjson("{'_id.clusterTime.ts': 1, '_id.uuid': 1, '_id.documentKey': 1}");
+ arm = stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), params.get());
+
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
+
+ ASSERT_FALSE(arm->ready());
+
+ // Clean up the cursors.
+ std::vector<CursorResponse> responses;
+ responses.emplace_back(_nss, CursorId(0), std::vector<BSONObj>{});
+ scheduleNetworkResponses(std::move(responses),
+ CursorResponse::ResponseType::SubsequentResponse);
+ auto killEvent = arm->kill(operationContext());
+ executor()->waitForEvent(killEvent);
+}
+
TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutTailableCantHaveMaxTime) {
BSONObj findCmd = fromjson("{find: 'testcoll'}");
std::vector<ClusterClientCursorParams::RemoteCursor> cursors;