summaryrefslogtreecommitdiff
path: root/src/mongo/executor/connection_pool_test_fixture.cpp
diff options
context:
space:
mode:
authorBen Caimano <ben.caimano@10gen.com>2019-05-23 13:18:38 -0400
committerBen Caimano <ben.caimano@10gen.com>2019-05-29 16:23:44 -0400
commit60008cd952996d5558b9fb4c5f66d1e2d1af0d4d (patch)
tree12bdd4ab9b3a2b31ea17dd619640a0ff668729c2 /src/mongo/executor/connection_pool_test_fixture.cpp
parentffd64883d70c9139d7b56d076e249f3fef77e54e (diff)
downloadmongo-60008cd952996d5558b9fb4c5f66d1e2d1af0d4d.tar.gz
SERVER-41318 Return SemiFutures for ConnectionPool
Diffstat (limited to 'src/mongo/executor/connection_pool_test_fixture.cpp')
-rw-r--r--src/mongo/executor/connection_pool_test_fixture.cpp87
1 files changed, 45 insertions, 42 deletions
diff --git a/src/mongo/executor/connection_pool_test_fixture.cpp b/src/mongo/executor/connection_pool_test_fixture.cpp
index 6c0321fa619..1f6217c6ef3 100644
--- a/src/mongo/executor/connection_pool_test_fixture.cpp
+++ b/src/mongo/executor/connection_pool_test_fixture.cpp
@@ -44,8 +44,6 @@ TimerImpl::~TimerImpl() {
}
void TimerImpl::setTimeout(Milliseconds timeout, TimeoutCallback cb) {
- _timers.erase(this);
-
_cb = std::move(cb);
_expiration = _global->now() + timeout;
@@ -53,9 +51,10 @@ void TimerImpl::setTimeout(Milliseconds timeout, TimeoutCallback cb) {
}
void TimerImpl::cancelTimeout() {
- _timers.erase(this);
TimeoutCallback cb;
_cb.swap(cb);
+
+ _timers.erase(this);
}
void TimerImpl::clear() {
@@ -72,7 +71,12 @@ void TimerImpl::fireIfNecessary() {
for (auto&& x : timers) {
if (_timers.count(x) && (x->_expiration <= now)) {
- x->_cb();
+ auto execCB = [cb = std::move(x->_cb)](auto&&) mutable {
+ std::move(cb)();
+ };
+ auto global = x->_global;
+ _timers.erase(x);
+ global->_executor->schedule(std::move(execCB));
}
}
}
@@ -113,18 +117,24 @@ void ConnectionImpl::clear() {
_pushRefreshQueue.clear();
}
-void ConnectionImpl::pushSetup(PushSetupCallback status) {
- _pushSetupQueue.push_back(status);
-
- if (_setupQueue.size()) {
- auto connPtr = _setupQueue.front();
- auto callback = _pushSetupQueue.front();
- _setupQueue.pop_front();
- _pushSetupQueue.pop_front();
+void ConnectionImpl::processSetup() {
+ auto connPtr = _setupQueue.front();
+ auto callback = std::move(_pushSetupQueue.front());
+ _setupQueue.pop_front();
+ _pushSetupQueue.pop_front();
- auto cb = connPtr->_setupCallback;
+ connPtr->_global->_executor->schedule([ connPtr, callback = std::move(callback) ](auto&&) {
+ auto cb = std::move(connPtr->_setupCallback);
connPtr->indicateUsed();
cb(connPtr, callback());
+ });
+}
+
+void ConnectionImpl::pushSetup(PushSetupCallback status) {
+ _pushSetupQueue.push_back(std::move(status));
+
+ if (_setupQueue.size()) {
+ processSetup();
}
}
@@ -136,19 +146,25 @@ size_t ConnectionImpl::setupQueueDepth() {
return _setupQueue.size();
}
-void ConnectionImpl::pushRefresh(PushRefreshCallback status) {
- _pushRefreshQueue.push_back(status);
-
- if (_refreshQueue.size()) {
- auto connPtr = _refreshQueue.front();
- auto callback = _pushRefreshQueue.front();
+void ConnectionImpl::processRefresh() {
+ auto connPtr = _refreshQueue.front();
+ auto callback = std::move(_pushRefreshQueue.front());
- _refreshQueue.pop_front();
- _pushRefreshQueue.pop_front();
+ _refreshQueue.pop_front();
+ _pushRefreshQueue.pop_front();
- auto cb = connPtr->_refreshCallback;
+ connPtr->_global->_executor->schedule([ connPtr, callback = std::move(callback) ](auto&&) {
+ auto cb = std::move(connPtr->_refreshCallback);
connPtr->indicateUsed();
cb(connPtr, callback());
+ });
+}
+
+void ConnectionImpl::pushRefresh(PushRefreshCallback status) {
+ _pushRefreshQueue.push_back(std::move(status));
+
+ if (_refreshQueue.size()) {
+ processRefresh();
}
}
@@ -161,7 +177,7 @@ size_t ConnectionImpl::refreshQueueDepth() {
}
void ConnectionImpl::setTimeout(Milliseconds timeout, TimeoutCallback cb) {
- _timer.setTimeout(timeout, cb);
+ _timer.setTimeout(timeout, std::move(cb));
}
void ConnectionImpl::cancelTimeout() {
@@ -172,20 +188,14 @@ void ConnectionImpl::setup(Milliseconds timeout, SetupCallback cb) {
_setupCallback = std::move(cb);
_timer.setTimeout(timeout, [this] {
- _setupCallback(this, Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, "timeout"));
+ auto setupCb = std::move(_setupCallback);
+ setupCb(this, Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, "timeout"));
});
_setupQueue.push_back(this);
if (_pushSetupQueue.size()) {
- auto connPtr = _setupQueue.front();
- auto callback = _pushSetupQueue.front();
- _setupQueue.pop_front();
- _pushSetupQueue.pop_front();
-
- auto refreshCb = connPtr->_setupCallback;
- connPtr->indicateUsed();
- refreshCb(connPtr, callback());
+ processSetup();
}
}
@@ -193,21 +203,14 @@ void ConnectionImpl::refresh(Milliseconds timeout, RefreshCallback cb) {
_refreshCallback = std::move(cb);
_timer.setTimeout(timeout, [this] {
- _refreshCallback(this, Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, "timeout"));
+ auto refreshCb = std::move(_refreshCallback);
+ refreshCb(this, Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, "timeout"));
});
_refreshQueue.push_back(this);
if (_pushRefreshQueue.size()) {
- auto connPtr = _refreshQueue.front();
- auto callback = _pushRefreshQueue.front();
-
- _refreshQueue.pop_front();
- _pushRefreshQueue.pop_front();
-
- auto refreshCb = connPtr->_refreshCallback;
- connPtr->indicateUsed();
- refreshCb(connPtr, callback());
+ processRefresh();
}
}