diff options
author | Eric Milkie <milkie@10gen.com> | 2015-10-05 10:39:54 -0400 |
---|---|---|
committer | Eric Milkie <milkie@10gen.com> | 2015-10-06 09:26:21 -0400 |
commit | a8e6c0ebc70fad95a5e05281076cfa4b6db500c2 (patch) | |
tree | fded1cf6e1af1957f108cb83f4a9406c9f019423 /src | |
parent | 7e699ba3b1de8b7081d4e7ec8c5e0ebbde8bb178 (diff) | |
download | mongo-a8e6c0ebc70fad95a5e05281076cfa4b6db500c2.tar.gz |
SERVER-20604 unblock AwaitData queries if commit level needs updating downstream.
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/catalog/collection.h | 10 | ||||
-rw-r--r-- | src/mongo/db/commands/getmore_cmd.cpp | 53 | ||||
-rw-r--r-- | src/mongo/db/query/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/db/query/getmore_request.cpp | 22 | ||||
-rw-r--r-- | src/mongo/db/query/getmore_request.h | 6 | ||||
-rw-r--r-- | src/mongo/db/query/getmore_request_test.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.h | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state.h | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_impl.h | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_mock.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_mock.h | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 2 | ||||
-rw-r--r-- | src/mongo/s/query/async_results_merger.cpp | 3 | ||||
-rw-r--r-- | src/mongo/s/strategy.cpp | 2 | ||||
-rw-r--r-- | src/mongo/shell/shardingtest.js | 2 |
18 files changed, 95 insertions, 37 deletions
diff --git a/src/mongo/db/catalog/collection.h b/src/mongo/db/catalog/collection.h index 49276f55f1c..d5956382f91 100644 --- a/src/mongo/db/catalog/collection.h +++ b/src/mongo/db/catalog/collection.h @@ -407,6 +407,11 @@ public: _minVisibleSnapshot = name; } + /** + * Notify (capped collection) waiters of data changes, like an insert. + */ + void notifyCappedWaitersIfNeeded(); + private: /** * Returns a non-ok Status if document does not pass this collection's validator. @@ -428,11 +433,6 @@ private: Status aboutToDeleteCapped(OperationContext* txn, const RecordId& loc, RecordData data); /** - * Notify (capped collection) waiters of data changes, like an insert. - */ - void notifyCappedWaitersIfNeeded(); - - /** * same semantics as insertDocument, but doesn't do: * - some user error checks * - adjust padding diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp index 34aa1615810..b349093e212 100644 --- a/src/mongo/db/commands/getmore_cmd.cpp +++ b/src/mongo/db/commands/getmore_cmd.cpp @@ -300,27 +300,38 @@ public: // If this is an await data cursor, and we hit EOF without generating any results, then // we block waiting for new data to arrive. if (isCursorAwaitData(cursor) && state == PlanExecutor::IS_EOF && numResults == 0) { - // Save the PlanExecutor and drop our locks. - exec->saveState(); - ctx.reset(); - - // Block waiting for data. - Microseconds timeout(CurOp::get(txn)->getRemainingMaxTimeMicros()); - notifier->wait(notifierVersion, timeout); - notifier.reset(); - - // Set expected latency to match wait time. This makes sure the logs aren't spammed - // by awaitData queries that exceed slowms due to blocking on the CappedInsertNotifier. - CurOp::get(txn)->setExpectedLatencyMs(durationCount<Milliseconds>(timeout)); - - ctx.reset(new AutoGetCollectionForRead(txn, request.nss)); - exec->restoreState(); - - // We woke up because either the timed_wait expired, or there was more data. Either - // way, attempt to generate another batch of results. - batchStatus = generateBatch(cursor, request, &nextBatch, &state, &numResults); - if (!batchStatus.isOK()) { - return appendCommandStatus(result, batchStatus); + auto replCoord = repl::ReplicationCoordinator::get(txn); + // Return immediately if we need to update the commit time. + if (request.lastKnownCommittedOpTime == replCoord->getLastCommittedOpTime()) { + // Retrieve the notifier which we will wait on until new data arrives. We make sure + // to do this in the lock because once we drop the lock it is possible for the + // collection to become invalid. The notifier itself will outlive the collection if + // the collection is dropped, as we keep a shared_ptr to it. + auto notifier = ctx->getCollection()->getCappedInsertNotifier(); + + // Save the PlanExecutor and drop our locks. + exec->saveState(); + ctx.reset(); + + // Block waiting for data. + Microseconds timeout(CurOp::get(txn)->getRemainingMaxTimeMicros()); + notifier->wait(notifierVersion, timeout); + notifier.reset(); + + // Set expected latency to match wait time. This makes sure the logs aren't spammed + // by awaitData queries that exceed slowms due to blocking on the + // CappedInsertNotifier. + CurOp::get(txn)->setExpectedLatencyMs(durationCount<Milliseconds>(timeout)); + + ctx.reset(new AutoGetCollectionForRead(txn, request.nss)); + exec->restoreState(); + + // We woke up because either the timed_wait expired, or there was more data. Either + // way, attempt to generate another batch of results. + batchStatus = generateBatch(cursor, request, &nextBatch, &state, &numResults); + if (!batchStatus.isOK()) { + return appendCommandStatus(result, batchStatus); + } } } diff --git a/src/mongo/db/query/SConscript b/src/mongo/db/query/SConscript index dfcb8f9265c..f4a9a1c7b02 100644 --- a/src/mongo/db/query/SConscript +++ b/src/mongo/db/query/SConscript @@ -121,8 +121,10 @@ env.Library( ], LIBDEPS=[ '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/bson/util/bson_extract', '$BUILD_DIR/mongo/db/common', '$BUILD_DIR/mongo/db/namespace_string', + '$BUILD_DIR/mongo/db/repl/optime', '$BUILD_DIR/mongo/rpc/command_status', ] ) diff --git a/src/mongo/db/query/getmore_request.cpp b/src/mongo/db/query/getmore_request.cpp index 829a3740ea2..9f388d2cbbd 100644 --- a/src/mongo/db/query/getmore_request.cpp +++ b/src/mongo/db/query/getmore_request.cpp @@ -34,6 +34,7 @@ #include <boost/optional.hpp> +#include "mongo/bson/util/bson_extract.h" #include "mongo/db/namespace_string.h" #include "mongo/util/assert_util.h" #include "mongo/util/stringutils.h" @@ -46,6 +47,7 @@ const char kCollectionField[] = "collection"; const char kBatchSizeField[] = "batchSize"; const char kMaxTimeMSField[] = "maxTimeMS"; const char kTermField[] = "term"; +const char kLastKnownCommittedOpTimeField[] = "lastKnownCommittedOpTime"; } // namespace @@ -56,8 +58,13 @@ GetMoreRequest::GetMoreRequest() : cursorid(0), batchSize(0) {} GetMoreRequest::GetMoreRequest(NamespaceString namespaceString, CursorId id, boost::optional<long long> sizeOfBatch, - boost::optional<long long> term) - : nss(std::move(namespaceString)), cursorid(id), batchSize(sizeOfBatch), term(term) {} + boost::optional<long long> term, + boost::optional<repl::OpTime> lastKnownCommittedOpTime) + : nss(std::move(namespaceString)), + cursorid(id), + batchSize(sizeOfBatch), + term(term), + lastKnownCommittedOpTime(lastKnownCommittedOpTime) {} Status GetMoreRequest::isValid() const { if (!nss.isValid()) { @@ -98,6 +105,7 @@ StatusWith<GetMoreRequest> GetMoreRequest::parseFromBSON(const std::string& dbna // Optional fields. boost::optional<long long> batchSize; boost::optional<long long> term; + boost::optional<repl::OpTime> lastKnownCommittedOpTime; for (BSONElement el : cmdObj) { const char* fieldName = el.fieldName(); @@ -133,6 +141,13 @@ StatusWith<GetMoreRequest> GetMoreRequest::parseFromBSON(const std::string& dbna str::stream() << "Field 'term' must be of type NumberLong in: " << cmdObj}; } term = el.Long(); + } else if (str::equals(fieldName, kLastKnownCommittedOpTimeField)) { + repl::OpTime ot; + Status status = bsonExtractOpTimeField(el.wrap(), kLastKnownCommittedOpTimeField, &ot); + if (!status.isOK()) { + return status; + } + lastKnownCommittedOpTime = ot; } else if (!str::startsWith(fieldName, "$")) { return {ErrorCodes::FailedToParse, str::stream() << "Failed to parse: " << cmdObj << ". " @@ -150,7 +165,8 @@ StatusWith<GetMoreRequest> GetMoreRequest::parseFromBSON(const std::string& dbna str::stream() << "Field 'collection' missing in: " << cmdObj}; } - GetMoreRequest request(NamespaceString(*fullns), *cursorid, batchSize, term); + GetMoreRequest request( + NamespaceString(*fullns), *cursorid, batchSize, term, lastKnownCommittedOpTime); Status validStatus = request.isValid(); if (!validStatus.isOK()) { return validStatus; diff --git a/src/mongo/db/query/getmore_request.h b/src/mongo/db/query/getmore_request.h index 972fea15012..56a1f2031b0 100644 --- a/src/mongo/db/query/getmore_request.h +++ b/src/mongo/db/query/getmore_request.h @@ -52,7 +52,8 @@ struct GetMoreRequest { GetMoreRequest(NamespaceString namespaceString, CursorId id, boost::optional<long long> sizeOfBatch, - boost::optional<long long> term); + boost::optional<long long> term, + boost::optional<repl::OpTime> lastKnownCommittedOpTime); /** * Construct a GetMoreRequest from the command specification and db name. @@ -78,6 +79,9 @@ struct GetMoreRequest { // Only internal queries from replication will typically have a term. const boost::optional<long long> term; + // Only internal queries from replication will have a last known committed optime. + const boost::optional<repl::OpTime> lastKnownCommittedOpTime; + private: /** * Returns a non-OK status if there are semantic errors in the parsed request diff --git a/src/mongo/db/query/getmore_request_test.cpp b/src/mongo/db/query/getmore_request_test.cpp index d370551b4e7..de88e3ffc7f 100644 --- a/src/mongo/db/query/getmore_request_test.cpp +++ b/src/mongo/db/query/getmore_request_test.cpp @@ -186,7 +186,7 @@ TEST(GetMoreRequestTest, parseFromBSONIgnoreMaxTimeMS) { } TEST(GetMoreRequestTest, toBSONHasBatchSize) { - GetMoreRequest request(NamespaceString("testdb.testcoll"), 123, 99, boost::none); + GetMoreRequest request(NamespaceString("testdb.testcoll"), 123, 99, boost::none, boost::none); BSONObj requestObj = request.toBSON(); BSONObj expectedRequest = BSON("getMore" << CursorId(123) << "collection" << "testcoll" @@ -195,7 +195,8 @@ TEST(GetMoreRequestTest, toBSONHasBatchSize) { } TEST(GetMoreRequestTest, toBSONMissingMatchSize) { - GetMoreRequest request(NamespaceString("testdb.testcoll"), 123, boost::none, boost::none); + GetMoreRequest request( + NamespaceString("testdb.testcoll"), 123, boost::none, boost::none, boost::none); BSONObj requestObj = request.toBSON(); BSONObj expectedRequest = BSON("getMore" << CursorId(123) << "collection" << "testcoll"); @@ -203,7 +204,7 @@ TEST(GetMoreRequestTest, toBSONMissingMatchSize) { } TEST(GetMoreRequestTest, toBSONHasTerm) { - GetMoreRequest request(NamespaceString("testdb.testcoll"), 123, 99, 1); + GetMoreRequest request(NamespaceString("testdb.testcoll"), 123, 99, 1, boost::none); BSONObj requestObj = request.toBSON(); BSONObj expectedRequest = BSON("getMore" << CursorId(123) << "collection" << "testcoll" diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 40afe9f7356..4e4a69385bd 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -557,6 +557,7 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>& bob->append("maxTimeMS", durationCount<Milliseconds>(fetcherMaxTimeMS)); if (receivedMetadata) { bob->append("term", _replCoord->getTerm()); + _replCoord->getLastCommittedOpTime().append(bob, "lastKnownCommittedOpTime"); } } diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index ffa20206be2..16c9afe5289 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -920,6 +920,11 @@ void oplogCheckCloseDatabase(OperationContext* txn, Database* db) { _localOplogCollection = nullptr; } +void signalOplogWaiters() { + if (_localOplogCollection) { + _localOplogCollection->notifyCappedWaitersIfNeeded(); + } +} MONGO_EXPORT_STARTUP_SERVER_PARAMETER(replSnapshotThreadThrottleMicros, int, 1000); diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h index cfde2ce8ccd..764d1cfe1c0 100644 --- a/src/mongo/db/repl/oplog.h +++ b/src/mongo/db/repl/oplog.h @@ -145,5 +145,11 @@ void setNewTimestamp(const Timestamp& newTime); * Detects the current replication mode and sets the "_oplogCollectionName" accordingly. */ void setOplogCollectionName(); + +/** + * Signal any waiting AwaitData queries on the oplog that there is new data or metadata available. + */ +void signalOplogWaiters(); + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h index 4114ba3a18c..4b5364f3424 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state.h +++ b/src/mongo/db/repl/replication_coordinator_external_state.h @@ -217,6 +217,8 @@ public: * Returns whether or not the SnapshotThread is active. */ virtual bool snapshotsEnabled() const = 0; + + virtual void notifyOplogMetadataWaiters() = 0; }; } // namespace repl diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 215e6d96e0a..3aa445c029a 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -388,5 +388,10 @@ void ReplicationCoordinatorExternalStateImpl::forceSnapshotCreation() { bool ReplicationCoordinatorExternalStateImpl::snapshotsEnabled() const { return _snapshotThread != nullptr; } + +void ReplicationCoordinatorExternalStateImpl::notifyOplogMetadataWaiters() { + signalOplogWaiters(); +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h index 2b1687507b0..8dce3dbb632 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h @@ -75,6 +75,7 @@ public: void updateCommittedSnapshot(SnapshotName newCommitPoint) final; void forceSnapshotCreation() final; virtual bool snapshotsEnabled() const; + virtual void notifyOplogMetadataWaiters(); std::string getNextOpContextThreadName(); diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp index c4b4c5458aa..abf8b7ad0b1 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp @@ -214,6 +214,8 @@ bool ReplicationCoordinatorExternalStateMock::snapshotsEnabled() const { return true; } +void ReplicationCoordinatorExternalStateMock::notifyOplogMetadataWaiters() {} + void ReplicationCoordinatorExternalStateMock::logTransitionToPrimaryToOplog(OperationContext* txn) { _lastOpTime = OpTime(Timestamp(1, 0), 1); } diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h index a6075807c2a..58d66934e77 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h @@ -78,6 +78,7 @@ public: virtual void updateCommittedSnapshot(SnapshotName newCommitPoint); virtual void forceSnapshotCreation(); virtual bool snapshotsEnabled() const; + virtual void notifyOplogMetadataWaiters(); /** * Adds "host" to the list of hosts that this mock will match when responding to "isSelf" diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 5436ed31f57..1ef937a4dc8 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -2727,6 +2727,8 @@ void ReplicationCoordinatorImpl::_setLastCommittedOpTime_inlock(const OpTime& co _lastCommittedOpTime = committedOpTime; + _externalState->notifyOplogMetadataWaiters(); + auto maxSnapshotForOpTime = SnapshotInfo{committedOpTime, SnapshotName::max()}; if (!_uncommittedSnapshots.empty() && _uncommittedSnapshots.front() <= maxSnapshotForOpTime) { // At least one uncommitted snapshot is ready to be blessed as committed. diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index abecbba1a34..d7e8edf0dcd 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -230,7 +230,8 @@ Status AsyncResultsMerger::askForNextBatch_inlock(size_t remoteIndex) { } BSONObj cmdObj = remote.cursorId - ? GetMoreRequest(_params.nsString, *remote.cursorId, adjustedBatchSize, boost::none) + ? GetMoreRequest( + _params.nsString, *remote.cursorId, adjustedBatchSize, boost::none, boost::none) .toBSON() : *remote.cmdObj; diff --git a/src/mongo/s/strategy.cpp b/src/mongo/s/strategy.cpp index 3f1b66a7c04..109f4623643 100644 --- a/src/mongo/s/strategy.cpp +++ b/src/mongo/s/strategy.cpp @@ -576,7 +576,7 @@ void Strategy::getMore(OperationContext* txn, Request& request) { if (ntoreturn) { batchSize = ntoreturn; } - GetMoreRequest getMoreRequest(NamespaceString(ns), id, batchSize, boost::none); + GetMoreRequest getMoreRequest(NamespaceString(ns), id, batchSize, boost::none, boost::none); auto cursorResponse = ClusterFind::runGetMore(txn, getMoreRequest); if (cursorResponse == ErrorCodes::CursorNotFound) { diff --git a/src/mongo/shell/shardingtest.js b/src/mongo/shell/shardingtest.js index d2d1eae7f5b..eee765ff745 100644 --- a/src/mongo/shell/shardingtest.js +++ b/src/mongo/shell/shardingtest.js @@ -353,9 +353,7 @@ ShardingTest = function( testName , numShards , verboseLevel , numMongos , other var config = this.configRS.getReplSetConfig(); config.configsvr = true; - config.protocolVersion = 0; // SERVER-20604 config.settings = config.settings || {}; - config.settings.heartbeatIntervalMillis = 100; // SERVER-20493 this.configRS.initiate(config); this.configRS.getMaster(); // Wait for master to be elected before starting mongos |