diff options
author | Ben Caimano <ben.caimano@10gen.com> | 2019-05-23 13:18:38 -0400 |
---|---|---|
committer | Ben Caimano <ben.caimano@10gen.com> | 2019-05-29 16:23:44 -0400 |
commit | 60008cd952996d5558b9fb4c5f66d1e2d1af0d4d (patch) | |
tree | 12bdd4ab9b3a2b31ea17dd619640a0ff668729c2 /src/mongo/executor/connection_pool_test_fixture.cpp | |
parent | ffd64883d70c9139d7b56d076e249f3fef77e54e (diff) | |
download | mongo-60008cd952996d5558b9fb4c5f66d1e2d1af0d4d.tar.gz |
SERVER-41318 Return SemiFutures for ConnectionPool
Diffstat (limited to 'src/mongo/executor/connection_pool_test_fixture.cpp')
-rw-r--r-- | src/mongo/executor/connection_pool_test_fixture.cpp | 87 |
1 files changed, 45 insertions, 42 deletions
diff --git a/src/mongo/executor/connection_pool_test_fixture.cpp b/src/mongo/executor/connection_pool_test_fixture.cpp index 6c0321fa619..1f6217c6ef3 100644 --- a/src/mongo/executor/connection_pool_test_fixture.cpp +++ b/src/mongo/executor/connection_pool_test_fixture.cpp @@ -44,8 +44,6 @@ TimerImpl::~TimerImpl() { } void TimerImpl::setTimeout(Milliseconds timeout, TimeoutCallback cb) { - _timers.erase(this); - _cb = std::move(cb); _expiration = _global->now() + timeout; @@ -53,9 +51,10 @@ void TimerImpl::setTimeout(Milliseconds timeout, TimeoutCallback cb) { } void TimerImpl::cancelTimeout() { - _timers.erase(this); TimeoutCallback cb; _cb.swap(cb); + + _timers.erase(this); } void TimerImpl::clear() { @@ -72,7 +71,12 @@ void TimerImpl::fireIfNecessary() { for (auto&& x : timers) { if (_timers.count(x) && (x->_expiration <= now)) { - x->_cb(); + auto execCB = [cb = std::move(x->_cb)](auto&&) mutable { + std::move(cb)(); + }; + auto global = x->_global; + _timers.erase(x); + global->_executor->schedule(std::move(execCB)); } } } @@ -113,18 +117,24 @@ void ConnectionImpl::clear() { _pushRefreshQueue.clear(); } -void ConnectionImpl::pushSetup(PushSetupCallback status) { - _pushSetupQueue.push_back(status); - - if (_setupQueue.size()) { - auto connPtr = _setupQueue.front(); - auto callback = _pushSetupQueue.front(); - _setupQueue.pop_front(); - _pushSetupQueue.pop_front(); +void ConnectionImpl::processSetup() { + auto connPtr = _setupQueue.front(); + auto callback = std::move(_pushSetupQueue.front()); + _setupQueue.pop_front(); + _pushSetupQueue.pop_front(); - auto cb = connPtr->_setupCallback; + connPtr->_global->_executor->schedule([ connPtr, callback = std::move(callback) ](auto&&) { + auto cb = std::move(connPtr->_setupCallback); connPtr->indicateUsed(); cb(connPtr, callback()); + }); +} + +void ConnectionImpl::pushSetup(PushSetupCallback status) { + _pushSetupQueue.push_back(std::move(status)); + + if (_setupQueue.size()) { + processSetup(); } } @@ -136,19 +146,25 @@ size_t ConnectionImpl::setupQueueDepth() { return _setupQueue.size(); } -void ConnectionImpl::pushRefresh(PushRefreshCallback status) { - _pushRefreshQueue.push_back(status); - - if (_refreshQueue.size()) { - auto connPtr = _refreshQueue.front(); - auto callback = _pushRefreshQueue.front(); +void ConnectionImpl::processRefresh() { + auto connPtr = _refreshQueue.front(); + auto callback = std::move(_pushRefreshQueue.front()); - _refreshQueue.pop_front(); - _pushRefreshQueue.pop_front(); + _refreshQueue.pop_front(); + _pushRefreshQueue.pop_front(); - auto cb = connPtr->_refreshCallback; + connPtr->_global->_executor->schedule([ connPtr, callback = std::move(callback) ](auto&&) { + auto cb = std::move(connPtr->_refreshCallback); connPtr->indicateUsed(); cb(connPtr, callback()); + }); +} + +void ConnectionImpl::pushRefresh(PushRefreshCallback status) { + _pushRefreshQueue.push_back(std::move(status)); + + if (_refreshQueue.size()) { + processRefresh(); } } @@ -161,7 +177,7 @@ size_t ConnectionImpl::refreshQueueDepth() { } void ConnectionImpl::setTimeout(Milliseconds timeout, TimeoutCallback cb) { - _timer.setTimeout(timeout, cb); + _timer.setTimeout(timeout, std::move(cb)); } void ConnectionImpl::cancelTimeout() { @@ -172,20 +188,14 @@ void ConnectionImpl::setup(Milliseconds timeout, SetupCallback cb) { _setupCallback = std::move(cb); _timer.setTimeout(timeout, [this] { - _setupCallback(this, Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, "timeout")); + auto setupCb = std::move(_setupCallback); + setupCb(this, Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, "timeout")); }); _setupQueue.push_back(this); if (_pushSetupQueue.size()) { - auto connPtr = _setupQueue.front(); - auto callback = _pushSetupQueue.front(); - _setupQueue.pop_front(); - _pushSetupQueue.pop_front(); - - auto refreshCb = connPtr->_setupCallback; - connPtr->indicateUsed(); - refreshCb(connPtr, callback()); + processSetup(); } } @@ -193,21 +203,14 @@ void ConnectionImpl::refresh(Milliseconds timeout, RefreshCallback cb) { _refreshCallback = std::move(cb); _timer.setTimeout(timeout, [this] { - _refreshCallback(this, Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, "timeout")); + auto refreshCb = std::move(_refreshCallback); + refreshCb(this, Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, "timeout")); }); _refreshQueue.push_back(this); if (_pushRefreshQueue.size()) { - auto connPtr = _refreshQueue.front(); - auto callback = _pushRefreshQueue.front(); - - _refreshQueue.pop_front(); - _pushRefreshQueue.pop_front(); - - auto refreshCb = connPtr->_refreshCallback; - connPtr->indicateUsed(); - refreshCb(connPtr, callback()); + processRefresh(); } } |