summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2015-05-21 13:53:55 -0400
committerBenety Goh <benety@mongodb.com>2015-05-21 19:43:45 -0400
commitfb26d2b0397e5b3c4bf2e90ba6838a0f5784f35c (patch)
tree240c30e35f82709c606c58a656d1c18e23313d79
parent0ea58cb3bc5ad4d0dbf06c0e8b12e2a1701fb56d (diff)
downloadmongo-fb26d2b0397e5b3c4bf2e90ba6838a0f5784f35c.tar.gz
SERVER-18035 fixed fetcher wait() to block until fetcher is inactive
-rw-r--r--src/mongo/db/repl/fetcher.cpp69
-rw-r--r--src/mongo/db/repl/fetcher.h14
-rw-r--r--src/mongo/db/repl/fetcher_test.cpp20
3 files changed, 63 insertions, 40 deletions
diff --git a/src/mongo/db/repl/fetcher.cpp b/src/mongo/db/repl/fetcher.cpp
index 6ae2419d50d..d89a761cb41 100644
--- a/src/mongo/db/repl/fetcher.cpp
+++ b/src/mongo/db/repl/fetcher.cpp
@@ -30,8 +30,6 @@
#include "mongo/db/repl/fetcher.h"
-#include <boost/thread/lock_guard.hpp>
-
#include "mongo/db/jsobj.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/replication_executor.h"
@@ -161,10 +159,15 @@ namespace {
uassert(ErrorCodes::BadValue, "callback function cannot be null", work);
}
- Fetcher::~Fetcher() { }
+ Fetcher::~Fetcher() {
+ DESTRUCTOR_GUARD(
+ cancel();
+ wait();
+ );
+ }
std::string Fetcher::getDiagnosticString() const {
- boost::lock_guard<boost::mutex> lk(_mutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
str::stream output;
output << "Fetcher";
output << " executor: " << _executor->getDiagnosticString();
@@ -176,19 +179,22 @@ namespace {
}
bool Fetcher::isActive() const {
- boost::lock_guard<boost::mutex> lk(_mutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
return _active;
}
Status Fetcher::schedule() {
- boost::lock_guard<boost::mutex> lk(_mutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ if (_active) {
+ return Status(ErrorCodes::IllegalOperation, "fetcher already scheduled");
+ }
return _schedule_inlock(_cmdObj, kFirstBatchFieldName);
}
void Fetcher::cancel() {
ReplicationExecutor::CallbackHandle remoteCommandCallbackHandle;
{
- boost::lock_guard<boost::mutex> lk(_mutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
if (!_active) {
return;
@@ -202,26 +208,11 @@ namespace {
}
void Fetcher::wait() {
- ReplicationExecutor::CallbackHandle remoteCommandCallbackHandle;
- {
- boost::lock_guard<boost::mutex> lk(_mutex);
-
- if (!_active) {
- return;
- }
-
- remoteCommandCallbackHandle = _remoteCommandCallbackHandle;
- }
-
- invariant(remoteCommandCallbackHandle.isValid());
- _executor->wait(remoteCommandCallbackHandle);
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _condition.wait(lk, [this]() { return !_active; });
}
Status Fetcher::_schedule_inlock(const BSONObj& cmdObj, const char* batchFieldName) {
- if (_active) {
- return Status(ErrorCodes::IllegalOperation, "fetcher already scheduled");
- }
-
StatusWith<ReplicationExecutor::CallbackHandle> scheduleResult =
_executor->scheduleRemoteCommand(
RemoteCommandRequest(_source, _dbname, cmdObj),
@@ -238,13 +229,11 @@ namespace {
void Fetcher::_callback(const ReplicationExecutor::RemoteCommandCallbackData& rcbd,
const char* batchFieldName) {
- boost::lock_guard<boost::mutex> lk(_mutex);
- _active = false;
-
NextAction nextAction = NextAction::kNoAction;
if (!rcbd.response.isOK()) {
_work(StatusWith<Fetcher::BatchData>(rcbd.response.getStatus()), &nextAction);
+ _finishCallback();
return;
}
@@ -252,6 +241,7 @@ namespace {
Status status = getStatusFromCommandResult(cursorReponseObj);
if (!status.isOK()) {
_work(StatusWith<Fetcher::BatchData>(status), &nextAction);
+ _finishCallback();
return;
}
@@ -261,6 +251,7 @@ namespace {
status = parseCursorResponse(cursorReponseObj, batchFieldName, &batchData, &nss);
if (!status.isOK()) {
_work(StatusWith<Fetcher::BatchData>(status), &nextAction);
+ _finishCallback();
return;
}
@@ -272,21 +263,29 @@ namespace {
// Callback function _work may modify nextAction to request the fetcher
// not to schedule a getMore command.
- if (nextAction != NextAction::kContinue) {
+ if (!batchData.cursorId || nextAction != NextAction::kContinue) {
+ _finishCallback();
return;
}
- nextAction = NextAction::kNoAction;
-
- if (batchData.cursorId) {
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
BSONObj getMoreCmdObj = BSON("getMore" << batchData.cursorId <<
"collection" << nss.coll());
status = _schedule_inlock(getMoreCmdObj, kNextBatchFieldName);
- if (!status.isOK()) {
- _work(StatusWith<Fetcher::BatchData>(status), &nextAction);
- return;
- }
}
+ if (!status.isOK()) {
+ nextAction = NextAction::kNoAction;
+ _work(StatusWith<Fetcher::BatchData>(status), &nextAction);
+ _finishCallback();
+ return;
+ }
+ }
+
+ void Fetcher::_finishCallback() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _active = false;
+ _condition.notify_all();
}
} // namespace repl
diff --git a/src/mongo/db/repl/fetcher.h b/src/mongo/db/repl/fetcher.h
index 8057004681f..33eba1b25e4 100644
--- a/src/mongo/db/repl/fetcher.h
+++ b/src/mongo/db/repl/fetcher.h
@@ -28,7 +28,6 @@
#pragma once
-#include <boost/thread/mutex.hpp>
#include <string>
#include <vector>
@@ -39,6 +38,8 @@
#include "mongo/db/clientcursor.h"
#include "mongo/db/repl/replication_executor.h"
#include "mongo/stdx/functional.h"
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/stdx/mutex.h"
#include "mongo/util/net/hostandport.h"
namespace mongo {
@@ -138,7 +139,7 @@ namespace repl {
void cancel();
/**
- * Waits for active remote command request to complete.
+ * Waits for remote command requests to complete.
* Returns immediately if fetcher is not active.
*/
void wait();
@@ -156,6 +157,11 @@ namespace repl {
void _callback(const ReplicationExecutor::RemoteCommandCallbackData& rcbd,
const char* batchFieldName);
+ /**
+ * Sets fetcher state to inactive and notifies waiters.
+ */
+ void _finishCallback();
+
// Not owned by us.
ReplicationExecutor* _executor;
@@ -165,7 +171,9 @@ namespace repl {
CallbackFn _work;
// Protects member data of this Fetcher.
- mutable boost::mutex _mutex;
+ mutable stdx::mutex _mutex;
+
+ mutable stdx::condition_variable _condition;
// _active is true when Fetcher is scheduled to be run by the executor.
bool _active;
diff --git a/src/mongo/db/repl/fetcher_test.cpp b/src/mongo/db/repl/fetcher_test.cpp
index ed60337afab..4cccf217311 100644
--- a/src/mongo/db/repl/fetcher_test.cpp
+++ b/src/mongo/db/repl/fetcher_test.cpp
@@ -138,7 +138,6 @@ namespace {
ASSERT_TRUE(fetcher->isActive());
getNet()->runReadyNetworkOperations();
ASSERT_FALSE(getNet()->hasReadyRequests());
- fetcher->wait();
ASSERT_FALSE(fetcher->isActive());
}
@@ -405,6 +404,24 @@ namespace {
ASSERT_TRUE(Fetcher::NextAction::kNoAction == nextAction);
}
+ TEST_F(FetcherTest, SetNextActionToContinueWhenNextBatchIsNotAvailable) {
+ ASSERT_OK(fetcher->schedule());
+ const BSONObj doc = BSON("_id" << 1);
+ callbackHook = [](const StatusWith<Fetcher::BatchData>& fetchResult,
+ Fetcher::NextAction* nextAction) {
+ *nextAction = Fetcher::NextAction::kContinue;
+ };
+ processNetworkResponse(BSON("cursor" << BSON("id" << 0LL <<
+ "ns" << "db.coll" <<
+ "firstBatch" << BSON_ARRAY(doc)) <<
+ "ok" << 1));
+ ASSERT_OK(status);
+ ASSERT_EQUALS(0, cursorId);
+ ASSERT_EQUALS(1U, documents.size());
+ ASSERT_EQUALS(doc, documents.front());
+ ASSERT_TRUE(Fetcher::NextAction::kNoAction == nextAction);
+ }
+
TEST_F(FetcherTest, FetchMultipleBatches) {
ASSERT_OK(fetcher->schedule());
const BSONObj doc = BSON("_id" << 1);
@@ -446,7 +463,6 @@ namespace {
ASSERT_FALSE(fetcher->isActive());
ASSERT_FALSE(getNet()->hasReadyRequests());
- ASSERT_FALSE(fetcher->isActive());
}
TEST_F(FetcherTest, ScheduleGetMoreAndCancel) {