path: root/src
author     Eric Milkie <milkie@10gen.com>  2015-10-05 10:39:54 -0400
committer  Eric Milkie <milkie@10gen.com>  2015-10-06 09:26:21 -0400
commit     a8e6c0ebc70fad95a5e05281076cfa4b6db500c2 (patch)
tree       fded1cf6e1af1957f108cb83f4a9406c9f019423 /src
parent     7e699ba3b1de8b7081d4e7ec8c5e0ebbde8bb178 (diff)
SERVER-20604 unblock AwaitData queries if commit level needs updating downstream.
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/catalog/collection.h | 10
-rw-r--r--  src/mongo/db/commands/getmore_cmd.cpp | 53
-rw-r--r--  src/mongo/db/query/SConscript | 2
-rw-r--r--  src/mongo/db/query/getmore_request.cpp | 22
-rw-r--r--  src/mongo/db/query/getmore_request.h | 6
-rw-r--r--  src/mongo/db/query/getmore_request_test.cpp | 7
-rw-r--r--  src/mongo/db/repl/bgsync.cpp | 1
-rw-r--r--  src/mongo/db/repl/oplog.cpp | 5
-rw-r--r--  src/mongo/db/repl/oplog.h | 6
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state.h | 2
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 5
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_impl.h | 1
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_mock.cpp | 2
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_mock.h | 1
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.cpp | 2
-rw-r--r--  src/mongo/s/query/async_results_merger.cpp | 3
-rw-r--r--  src/mongo/s/strategy.cpp | 2
-rw-r--r--  src/mongo/shell/shardingtest.js | 2
18 files changed, 95 insertions(+), 37 deletions(-)
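
The change in one sentence: an awaitData getMore on the oplog now blocks only while the requester's lastKnownCommittedOpTime still matches this node's last committed optime; if the commit point has advanced, the command returns an empty batch immediately so the downstream node can pick up the new commit level. Below is a minimal, self-contained sketch of that decision, with illustrative types rather than the actual MongoDB classes:

    // Sketch only: illustrative stand-ins for repl::OpTime and the getMore state.
    #include <cstdint>
    #include <tuple>

    struct OpTime {
        std::int64_t term = -1;
        std::uint64_t timestamp = 0;  // packed seconds/increment, for illustration
        bool operator==(const OpTime& other) const {
            return std::tie(term, timestamp) == std::tie(other.term, other.timestamp);
        }
    };

    // Returns true if the awaitData getMore should block on the capped-insert notifier,
    // false if it should return right away so the requester sees the advanced commit level.
    bool shouldBlockAwaitingData(const OpTime& requesterLastKnownCommitted,
                                 const OpTime& serverLastCommitted,
                                 bool hitEOF,
                                 int numResults) {
        if (!hitEOF || numResults > 0) {
            return false;  // there is already a batch to return
        }
        // Block only while the requester is up to date on the commit point.
        return requesterLastKnownCommitted == serverLastCommitted;
    }

The per-file diffs below add the new request field, the immediate-return check in getmore_cmd.cpp, and the wakeup path that fires when the commit point advances.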
diff --git a/src/mongo/db/catalog/collection.h b/src/mongo/db/catalog/collection.h
index 49276f55f1c..d5956382f91 100644
--- a/src/mongo/db/catalog/collection.h
+++ b/src/mongo/db/catalog/collection.h
@@ -407,6 +407,11 @@ public:
_minVisibleSnapshot = name;
}
+ /**
+ * Notify (capped collection) waiters of data changes, like an insert.
+ */
+ void notifyCappedWaitersIfNeeded();
+
private:
/**
* Returns a non-ok Status if document does not pass this collection's validator.
@@ -428,11 +433,6 @@ private:
Status aboutToDeleteCapped(OperationContext* txn, const RecordId& loc, RecordData data);
/**
- * Notify (capped collection) waiters of data changes, like an insert.
- */
- void notifyCappedWaitersIfNeeded();
-
- /**
* same semantics as insertDocument, but doesn't do:
* - some user error checks
* - adjust padding
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp
index 34aa1615810..b349093e212 100644
--- a/src/mongo/db/commands/getmore_cmd.cpp
+++ b/src/mongo/db/commands/getmore_cmd.cpp
@@ -300,27 +300,38 @@ public:
// If this is an await data cursor, and we hit EOF without generating any results, then
// we block waiting for new data to arrive.
if (isCursorAwaitData(cursor) && state == PlanExecutor::IS_EOF && numResults == 0) {
- // Save the PlanExecutor and drop our locks.
- exec->saveState();
- ctx.reset();
-
- // Block waiting for data.
- Microseconds timeout(CurOp::get(txn)->getRemainingMaxTimeMicros());
- notifier->wait(notifierVersion, timeout);
- notifier.reset();
-
- // Set expected latency to match wait time. This makes sure the logs aren't spammed
- // by awaitData queries that exceed slowms due to blocking on the CappedInsertNotifier.
- CurOp::get(txn)->setExpectedLatencyMs(durationCount<Milliseconds>(timeout));
-
- ctx.reset(new AutoGetCollectionForRead(txn, request.nss));
- exec->restoreState();
-
- // We woke up because either the timed_wait expired, or there was more data. Either
- // way, attempt to generate another batch of results.
- batchStatus = generateBatch(cursor, request, &nextBatch, &state, &numResults);
- if (!batchStatus.isOK()) {
- return appendCommandStatus(result, batchStatus);
+ auto replCoord = repl::ReplicationCoordinator::get(txn);
+ // Return immediately if we need to update the commit time.
+ if (request.lastKnownCommittedOpTime == replCoord->getLastCommittedOpTime()) {
+ // Retrieve the notifier which we will wait on until new data arrives. We make sure
+ // to do this in the lock because once we drop the lock it is possible for the
+ // collection to become invalid. The notifier itself will outlive the collection if
+ // the collection is dropped, as we keep a shared_ptr to it.
+ auto notifier = ctx->getCollection()->getCappedInsertNotifier();
+
+ // Save the PlanExecutor and drop our locks.
+ exec->saveState();
+ ctx.reset();
+
+ // Block waiting for data.
+ Microseconds timeout(CurOp::get(txn)->getRemainingMaxTimeMicros());
+ notifier->wait(notifierVersion, timeout);
+ notifier.reset();
+
+ // Set expected latency to match wait time. This makes sure the logs aren't spammed
+ // by awaitData queries that exceed slowms due to blocking on the
+ // CappedInsertNotifier.
+ CurOp::get(txn)->setExpectedLatencyMs(durationCount<Milliseconds>(timeout));
+
+ ctx.reset(new AutoGetCollectionForRead(txn, request.nss));
+ exec->restoreState();
+
+ // We woke up because either the timed_wait expired, or there was more data. Either
+ // way, attempt to generate another batch of results.
+ batchStatus = generateBatch(cursor, request, &nextBatch, &state, &numResults);
+ if (!batchStatus.isOK()) {
+ return appendCommandStatus(result, batchStatus);
+ }
}
}
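
The hunk above keeps the original save-state / drop-locks / wait / restore sequence, but nests it under the new check so the wait only happens while the requester's commit level is current. A rough standard-library model of the capped-insert notifier it waits on (illustrative names, not the real CappedInsertNotifier API):

    #include <chrono>
    #include <condition_variable>
    #include <cstdint>
    #include <mutex>

    class InsertNotifier {
    public:
        // Wake all waiters; called on a capped-collection insert or, after this patch,
        // when the commit level advances and oplog waiters are signaled.
        void notifyAll() {
            std::lock_guard<std::mutex> lk(_mutex);
            ++_version;
            _cond.notify_all();
        }

        std::uint64_t currentVersion() const {
            std::lock_guard<std::mutex> lk(_mutex);
            return _version;
        }

        // Block until the version moves past 'knownVersion' or the timeout expires.
        void wait(std::uint64_t knownVersion, std::chrono::microseconds timeout) const {
            std::unique_lock<std::mutex> lk(_mutex);
            _cond.wait_for(lk, timeout, [&] { return _version != knownVersion; });
        }

    private:
        mutable std::mutex _mutex;
        mutable std::condition_variable _cond;
        std::uint64_t _version = 0;
    };

The caller samples the version while still holding the collection lock (notifierVersion above), drops the lock, and waits; whether the wait times out or is woken, it simply reacquires the collection and tries to generate another batch.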
diff --git a/src/mongo/db/query/SConscript b/src/mongo/db/query/SConscript
index dfcb8f9265c..f4a9a1c7b02 100644
--- a/src/mongo/db/query/SConscript
+++ b/src/mongo/db/query/SConscript
@@ -121,8 +121,10 @@ env.Library(
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/bson/util/bson_extract',
'$BUILD_DIR/mongo/db/common',
'$BUILD_DIR/mongo/db/namespace_string',
+ '$BUILD_DIR/mongo/db/repl/optime',
'$BUILD_DIR/mongo/rpc/command_status',
]
)
diff --git a/src/mongo/db/query/getmore_request.cpp b/src/mongo/db/query/getmore_request.cpp
index 829a3740ea2..9f388d2cbbd 100644
--- a/src/mongo/db/query/getmore_request.cpp
+++ b/src/mongo/db/query/getmore_request.cpp
@@ -34,6 +34,7 @@
#include <boost/optional.hpp>
+#include "mongo/bson/util/bson_extract.h"
#include "mongo/db/namespace_string.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/stringutils.h"
@@ -46,6 +47,7 @@ const char kCollectionField[] = "collection";
const char kBatchSizeField[] = "batchSize";
const char kMaxTimeMSField[] = "maxTimeMS";
const char kTermField[] = "term";
+const char kLastKnownCommittedOpTimeField[] = "lastKnownCommittedOpTime";
} // namespace
@@ -56,8 +58,13 @@ GetMoreRequest::GetMoreRequest() : cursorid(0), batchSize(0) {}
GetMoreRequest::GetMoreRequest(NamespaceString namespaceString,
CursorId id,
boost::optional<long long> sizeOfBatch,
- boost::optional<long long> term)
- : nss(std::move(namespaceString)), cursorid(id), batchSize(sizeOfBatch), term(term) {}
+ boost::optional<long long> term,
+ boost::optional<repl::OpTime> lastKnownCommittedOpTime)
+ : nss(std::move(namespaceString)),
+ cursorid(id),
+ batchSize(sizeOfBatch),
+ term(term),
+ lastKnownCommittedOpTime(lastKnownCommittedOpTime) {}
Status GetMoreRequest::isValid() const {
if (!nss.isValid()) {
@@ -98,6 +105,7 @@ StatusWith<GetMoreRequest> GetMoreRequest::parseFromBSON(const std::string& dbna
// Optional fields.
boost::optional<long long> batchSize;
boost::optional<long long> term;
+ boost::optional<repl::OpTime> lastKnownCommittedOpTime;
for (BSONElement el : cmdObj) {
const char* fieldName = el.fieldName();
@@ -133,6 +141,13 @@ StatusWith<GetMoreRequest> GetMoreRequest::parseFromBSON(const std::string& dbna
str::stream() << "Field 'term' must be of type NumberLong in: " << cmdObj};
}
term = el.Long();
+ } else if (str::equals(fieldName, kLastKnownCommittedOpTimeField)) {
+ repl::OpTime ot;
+ Status status = bsonExtractOpTimeField(el.wrap(), kLastKnownCommittedOpTimeField, &ot);
+ if (!status.isOK()) {
+ return status;
+ }
+ lastKnownCommittedOpTime = ot;
} else if (!str::startsWith(fieldName, "$")) {
return {ErrorCodes::FailedToParse,
str::stream() << "Failed to parse: " << cmdObj << ". "
@@ -150,7 +165,8 @@ StatusWith<GetMoreRequest> GetMoreRequest::parseFromBSON(const std::string& dbna
str::stream() << "Field 'collection' missing in: " << cmdObj};
}
- GetMoreRequest request(NamespaceString(*fullns), *cursorid, batchSize, term);
+ GetMoreRequest request(
+ NamespaceString(*fullns), *cursorid, batchSize, term, lastKnownCommittedOpTime);
Status validStatus = request.isValid();
if (!validStatus.isOK()) {
return validStatus;
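
The parser above treats lastKnownCommittedOpTime as optional, like term, so only replication's internal getMores carry it. A simplified sketch of the resulting request shape, using std::optional (C++17) as a stand-in for boost::optional and illustrative type names:

    #include <cstdint>
    #include <optional>
    #include <string>
    #include <utility>

    struct OpTimeLite {
        std::int64_t term = -1;
        std::uint64_t timestamp = 0;
    };

    struct GetMoreRequestLite {
        std::string ns;
        std::int64_t cursorId = 0;
        std::optional<std::int64_t> batchSize;
        std::optional<std::int64_t> term;                    // replication-only
        std::optional<OpTimeLite> lastKnownCommittedOpTime;  // replication-only, new here
    };

    // Non-replication callers simply omit the two replication-only fields, which is why
    // the mongos and test call sites later in this diff gain an extra boost::none argument.
    GetMoreRequestLite makeClientGetMore(std::string ns,
                                         std::int64_t cursorId,
                                         std::optional<std::int64_t> batchSize) {
        return {std::move(ns), cursorId, batchSize, std::nullopt, std::nullopt};
    }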
diff --git a/src/mongo/db/query/getmore_request.h b/src/mongo/db/query/getmore_request.h
index 972fea15012..56a1f2031b0 100644
--- a/src/mongo/db/query/getmore_request.h
+++ b/src/mongo/db/query/getmore_request.h
@@ -52,7 +52,8 @@ struct GetMoreRequest {
GetMoreRequest(NamespaceString namespaceString,
CursorId id,
boost::optional<long long> sizeOfBatch,
- boost::optional<long long> term);
+ boost::optional<long long> term,
+ boost::optional<repl::OpTime> lastKnownCommittedOpTime);
/**
* Construct a GetMoreRequest from the command specification and db name.
@@ -78,6 +79,9 @@ struct GetMoreRequest {
// Only internal queries from replication will typically have a term.
const boost::optional<long long> term;
+ // Only internal queries from replication will have a last known committed optime.
+ const boost::optional<repl::OpTime> lastKnownCommittedOpTime;
+
private:
/**
* Returns a non-OK status if there are semantic errors in the parsed request
diff --git a/src/mongo/db/query/getmore_request_test.cpp b/src/mongo/db/query/getmore_request_test.cpp
index d370551b4e7..de88e3ffc7f 100644
--- a/src/mongo/db/query/getmore_request_test.cpp
+++ b/src/mongo/db/query/getmore_request_test.cpp
@@ -186,7 +186,7 @@ TEST(GetMoreRequestTest, parseFromBSONIgnoreMaxTimeMS) {
}
TEST(GetMoreRequestTest, toBSONHasBatchSize) {
- GetMoreRequest request(NamespaceString("testdb.testcoll"), 123, 99, boost::none);
+ GetMoreRequest request(NamespaceString("testdb.testcoll"), 123, 99, boost::none, boost::none);
BSONObj requestObj = request.toBSON();
BSONObj expectedRequest = BSON("getMore" << CursorId(123) << "collection"
<< "testcoll"
@@ -195,7 +195,8 @@ TEST(GetMoreRequestTest, toBSONHasBatchSize) {
}
TEST(GetMoreRequestTest, toBSONMissingMatchSize) {
- GetMoreRequest request(NamespaceString("testdb.testcoll"), 123, boost::none, boost::none);
+ GetMoreRequest request(
+ NamespaceString("testdb.testcoll"), 123, boost::none, boost::none, boost::none);
BSONObj requestObj = request.toBSON();
BSONObj expectedRequest = BSON("getMore" << CursorId(123) << "collection"
<< "testcoll");
@@ -203,7 +204,7 @@ TEST(GetMoreRequestTest, toBSONMissingMatchSize) {
}
TEST(GetMoreRequestTest, toBSONHasTerm) {
- GetMoreRequest request(NamespaceString("testdb.testcoll"), 123, 99, 1);
+ GetMoreRequest request(NamespaceString("testdb.testcoll"), 123, 99, 1, boost::none);
BSONObj requestObj = request.toBSON();
BSONObj expectedRequest = BSON("getMore" << CursorId(123) << "collection"
<< "testcoll"
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 40afe9f7356..4e4a69385bd 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -557,6 +557,7 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>&
bob->append("maxTimeMS", durationCount<Milliseconds>(fetcherMaxTimeMS));
if (receivedMetadata) {
bob->append("term", _replCoord->getTerm());
+ _replCoord->getLastCommittedOpTime().append(bob, "lastKnownCommittedOpTime");
}
}
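
With this hunk the oplog fetcher advertises its current view of the commit point on every getMore, next to the existing term field. As a hedged illustration only, the command it builds might render roughly like the following shell-style text (the real code appends to a BSONObjBuilder; the { ts, t } layout of the optime is assumed here):

    #include <cstdint>
    #include <sstream>
    #include <string>

    struct OpTimeFields {
        std::uint32_t tsSecs = 0;  // Timestamp seconds
        std::uint32_t tsInc = 0;   // Timestamp increment
        std::int64_t term = -1;
    };

    // Builds a textual rendering of the fetcher's getMore command, for illustration only.
    std::string renderOplogGetMore(std::int64_t cursorId,
                                   std::int64_t maxTimeMS,
                                   std::int64_t currentTerm,
                                   const OpTimeFields& lastCommitted) {
        std::ostringstream cmd;
        cmd << "{ getMore: " << cursorId << ", collection: \"oplog.rs\""
            << ", maxTimeMS: " << maxTimeMS << ", term: " << currentTerm
            << ", lastKnownCommittedOpTime: { ts: Timestamp(" << lastCommitted.tsSecs << ", "
            << lastCommitted.tsInc << "), t: " << lastCommitted.term << " } }";
        return cmd.str();
    }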
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index ffa20206be2..16c9afe5289 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -920,6 +920,11 @@ void oplogCheckCloseDatabase(OperationContext* txn, Database* db) {
_localOplogCollection = nullptr;
}
+void signalOplogWaiters() {
+ if (_localOplogCollection) {
+ _localOplogCollection->notifyCappedWaitersIfNeeded();
+ }
+}
MONGO_EXPORT_STARTUP_SERVER_PARAMETER(replSnapshotThreadThrottleMicros, int, 1000);
diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h
index cfde2ce8ccd..764d1cfe1c0 100644
--- a/src/mongo/db/repl/oplog.h
+++ b/src/mongo/db/repl/oplog.h
@@ -145,5 +145,11 @@ void setNewTimestamp(const Timestamp& newTime);
* Detects the current replication mode and sets the "_oplogCollectionName" accordingly.
*/
void setOplogCollectionName();
+
+/**
+ * Signal any waiting AwaitData queries on the oplog that there is new data or metadata available.
+ */
+void signalOplogWaiters();
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h
index 4114ba3a18c..4b5364f3424 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state.h
@@ -217,6 +217,8 @@ public:
* Returns whether or not the SnapshotThread is active.
*/
virtual bool snapshotsEnabled() const = 0;
+
+ virtual void notifyOplogMetadataWaiters() = 0;
};
} // namespace repl
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 215e6d96e0a..3aa445c029a 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -388,5 +388,10 @@ void ReplicationCoordinatorExternalStateImpl::forceSnapshotCreation() {
bool ReplicationCoordinatorExternalStateImpl::snapshotsEnabled() const {
return _snapshotThread != nullptr;
}
+
+void ReplicationCoordinatorExternalStateImpl::notifyOplogMetadataWaiters() {
+ signalOplogWaiters();
+}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
index 2b1687507b0..8dce3dbb632 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
@@ -75,6 +75,7 @@ public:
void updateCommittedSnapshot(SnapshotName newCommitPoint) final;
void forceSnapshotCreation() final;
virtual bool snapshotsEnabled() const;
+ virtual void notifyOplogMetadataWaiters();
std::string getNextOpContextThreadName();
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
index c4b4c5458aa..abf8b7ad0b1 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
@@ -214,6 +214,8 @@ bool ReplicationCoordinatorExternalStateMock::snapshotsEnabled() const {
return true;
}
+void ReplicationCoordinatorExternalStateMock::notifyOplogMetadataWaiters() {}
+
void ReplicationCoordinatorExternalStateMock::logTransitionToPrimaryToOplog(OperationContext* txn) {
_lastOpTime = OpTime(Timestamp(1, 0), 1);
}
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
index a6075807c2a..58d66934e77 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
@@ -78,6 +78,7 @@ public:
virtual void updateCommittedSnapshot(SnapshotName newCommitPoint);
virtual void forceSnapshotCreation();
virtual bool snapshotsEnabled() const;
+ virtual void notifyOplogMetadataWaiters();
/**
* Adds "host" to the list of hosts that this mock will match when responding to "isSelf"
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 5436ed31f57..1ef937a4dc8 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -2727,6 +2727,8 @@ void ReplicationCoordinatorImpl::_setLastCommittedOpTime_inlock(const OpTime& co
_lastCommittedOpTime = committedOpTime;
+ _externalState->notifyOplogMetadataWaiters();
+
auto maxSnapshotForOpTime = SnapshotInfo{committedOpTime, SnapshotName::max()};
if (!_uncommittedSnapshots.empty() && _uncommittedSnapshots.front() <= maxSnapshotForOpTime) {
// At least one uncommitted snapshot is ready to be blessed as committed.
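
The one-line addition above closes the loop: advancing the commit point now notifies oplog metadata waiters, which reach the capped-insert notifier through signalOplogWaiters() and the newly public Collection::notifyCappedWaitersIfNeeded(), waking any awaitData getMore blocked upstream. A simplified sketch of that chain, with plain interfaces in place of the real classes and illustrative names:

    #include <memory>
    #include <utility>

    // Stand-in for the oplog Collection and its notifyCappedWaitersIfNeeded().
    struct CappedWaiterSource {
        virtual ~CappedWaiterSource() = default;
        virtual void notifyCappedWaitersIfNeeded() = 0;
    };

    // Stand-in for ReplicationCoordinatorExternalState::notifyOplogMetadataWaiters().
    class ExternalState {
    public:
        explicit ExternalState(std::shared_ptr<CappedWaiterSource> oplog)
            : _oplog(std::move(oplog)) {}

        void notifyOplogMetadataWaiters() {
            if (_oplog) {                               // like signalOplogWaiters(): no-op
                _oplog->notifyCappedWaitersIfNeeded();  // before the oplog collection exists
            }
        }

    private:
        std::shared_ptr<CappedWaiterSource> _oplog;
    };

    // Stand-in for _setLastCommittedOpTime_inlock(): advancing the commit point now also
    // wakes oplog awaitData waiters, even though no new document was inserted.
    template <typename OpTimeT>
    void setLastCommittedOpTime(OpTimeT& lastCommitted,
                                const OpTimeT& committed,
                                ExternalState& ext) {
        lastCommitted = committed;
        ext.notifyOplogMetadataWaiters();
    }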
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index abecbba1a34..d7e8edf0dcd 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -230,7 +230,8 @@ Status AsyncResultsMerger::askForNextBatch_inlock(size_t remoteIndex) {
}
BSONObj cmdObj = remote.cursorId
- ? GetMoreRequest(_params.nsString, *remote.cursorId, adjustedBatchSize, boost::none)
+ ? GetMoreRequest(
+ _params.nsString, *remote.cursorId, adjustedBatchSize, boost::none, boost::none)
.toBSON()
: *remote.cmdObj;
diff --git a/src/mongo/s/strategy.cpp b/src/mongo/s/strategy.cpp
index 3f1b66a7c04..109f4623643 100644
--- a/src/mongo/s/strategy.cpp
+++ b/src/mongo/s/strategy.cpp
@@ -576,7 +576,7 @@ void Strategy::getMore(OperationContext* txn, Request& request) {
if (ntoreturn) {
batchSize = ntoreturn;
}
- GetMoreRequest getMoreRequest(NamespaceString(ns), id, batchSize, boost::none);
+ GetMoreRequest getMoreRequest(NamespaceString(ns), id, batchSize, boost::none, boost::none);
auto cursorResponse = ClusterFind::runGetMore(txn, getMoreRequest);
if (cursorResponse == ErrorCodes::CursorNotFound) {
diff --git a/src/mongo/shell/shardingtest.js b/src/mongo/shell/shardingtest.js
index d2d1eae7f5b..eee765ff745 100644
--- a/src/mongo/shell/shardingtest.js
+++ b/src/mongo/shell/shardingtest.js
@@ -353,9 +353,7 @@ ShardingTest = function( testName , numShards , verboseLevel , numMongos , other
var config = this.configRS.getReplSetConfig();
config.configsvr = true;
- config.protocolVersion = 0; // SERVER-20604
config.settings = config.settings || {};
- config.settings.heartbeatIntervalMillis = 100; // SERVER-20493
this.configRS.initiate(config);
this.configRS.getMaster(); // Wait for master to be elected before starting mongos