summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorsamantharitter <samantha.ritter@10gen.com>2017-08-15 16:17:37 -0400
committersamantharitter <samantha.ritter@10gen.com>2017-08-18 08:49:39 -0400
commit854cc3ca62115c0296e27c75ff017a11614254c6 (patch)
treea68e64b99d5fb4ece3c4562aa5463d42cd02c66c /src
parent583127818f1ead21b67a57eb117b9678232e5472 (diff)
downloadmongo-854cc3ca62115c0296e27c75ff017a11614254c6.tar.gz
SERVER-29202 Implement SessionsCollectionRS
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/SConscript21
-rw-r--r--src/mongo/db/commands/refresh_logical_session_cache_now.cpp5
-rw-r--r--src/mongo/db/logical_session_cache.cpp19
-rw-r--r--src/mongo/db/logical_session_cache.h5
-rw-r--r--src/mongo/db/logical_session_cache_factory_mongod.cpp5
-rw-r--r--src/mongo/db/logical_session_cache_test.cpp28
-rw-r--r--src/mongo/db/sessions_collection.cpp122
-rw-r--r--src/mongo/db/sessions_collection.h13
-rw-r--r--src/mongo/db/sessions_collection_rs.cpp170
-rw-r--r--src/mongo/db/sessions_collection_rs.h77
-rw-r--r--src/mongo/db/sessions_collection_standalone.cpp19
-rw-r--r--src/mongo/db/sessions_collection_standalone.h3
12 files changed, 418 insertions, 69 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 8a07cca985f..e166bea6c81 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -979,6 +979,8 @@ env.Library(
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/client/clientdriver',
+ '$BUILD_DIR/mongo/s/write_ops/batch_write_types',
'logical_session_id',
],
)
@@ -996,6 +998,22 @@ env.Library(
)
env.Library(
+ target='sessions_collection_rs',
+ source=[
+ 'sessions_collection_rs.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/client/remote_command_targeter',
+ '$BUILD_DIR/mongo/db/auth/authcommon',
+ '$BUILD_DIR/mongo/db/concurrency/lock_manager',
+ '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface',
+ 'dbdirectclient',
+ 'sessions_collection',
+ ],
+)
+
+env.Library(
target='sessions_collection_standalone',
source=[
'sessions_collection_standalone.cpp',
@@ -1051,7 +1069,8 @@ envWithAsio.Library(
LIBDEPS=[
'logical_session_cache',
'service_liason_mongod',
- 'sessions_collection_mock', # TODO SERVER-29202, SERVER-29203
+ 'sessions_collection_mock', # TODO SERVER-29203
+ 'sessions_collection_rs',
'sessions_collection_standalone',
],
)
diff --git a/src/mongo/db/commands/refresh_logical_session_cache_now.cpp b/src/mongo/db/commands/refresh_logical_session_cache_now.cpp
index dad267f9b16..f1b61473cda 100644
--- a/src/mongo/db/commands/refresh_logical_session_cache_now.cpp
+++ b/src/mongo/db/commands/refresh_logical_session_cache_now.cpp
@@ -74,7 +74,10 @@ public:
auto cache = LogicalSessionCache::get(opCtx);
auto client = opCtx->getClient();
- cache->refreshNow(client);
+ auto res = cache->refreshNow(client);
+ if (!res.isOK()) {
+ return appendCommandStatus(result, res);
+ }
return true;
}
diff --git a/src/mongo/db/logical_session_cache.cpp b/src/mongo/db/logical_session_cache.cpp
index 1631842fb60..d76c8040325 100644
--- a/src/mongo/db/logical_session_cache.cpp
+++ b/src/mongo/db/logical_session_cache.cpp
@@ -88,7 +88,7 @@ LogicalSessionCache::LogicalSessionCache(std::unique_ptr<ServiceLiason> service,
_service(std::move(service)),
_sessionsColl(std::move(collection)),
_cache(options.capacity) {
- PeriodicRunner::PeriodicJob job{[this](Client* client) { _refresh(client); },
+ PeriodicRunner::PeriodicJob job{[this](Client* client) { _periodicRefresh(client); },
duration_cast<Milliseconds>(_refreshInterval)};
_service->scheduleJob(std::move(job));
}
@@ -182,7 +182,7 @@ Status LogicalSessionCache::refreshSessions(OperationContext* opCtx,
return _sessionsColl->refreshSessions(opCtx, toRefresh, now());
}
-void LogicalSessionCache::refreshNow(Client* client) {
+Status LogicalSessionCache::refreshNow(Client* client) {
return _refresh(client);
}
@@ -195,7 +195,16 @@ size_t LogicalSessionCache::size() {
return _cache.size();
}
-void LogicalSessionCache::_refresh(Client* client) {
+void LogicalSessionCache::_periodicRefresh(Client* client) {
+ auto res = _refresh(client);
+ if (!res.isOK()) {
+ log() << "Failed to refresh session cache: " << res;
+ }
+
+ return;
+}
+
+Status LogicalSessionCache::_refresh(Client* client) {
LogicalSessionRecordSet activeSessions;
LogicalSessionRecordSet deadSessions;
@@ -267,7 +276,7 @@ void LogicalSessionCache::_refresh(Client* client) {
auto res = _sessionsColl->refreshSessions(opCtx, std::move(activeSessions), time);
if (!res.isOK()) {
// TODO SERVER-29709: handle network errors here.
- return;
+ return res;
}
}
@@ -279,6 +288,8 @@ void LogicalSessionCache::_refresh(Client* client) {
{
// TODO SERVER-29709: handle expiration separately from failure to refresh.
}
+
+ return Status::OK();
}
bool LogicalSessionCache::_isDead(const LogicalSessionRecord& record, Date_t now) const {
diff --git a/src/mongo/db/logical_session_cache.h b/src/mongo/db/logical_session_cache.h
index 0279fe0b7b6..0c903482a47 100644
--- a/src/mongo/db/logical_session_cache.h
+++ b/src/mongo/db/logical_session_cache.h
@@ -152,7 +152,7 @@ public:
* Refreshes the cache synchronously. This flushes all pending refreshes and
* inserts to the sessions collection.
*/
- void refreshNow(Client* client);
+ Status refreshNow(Client* client);
/**
* Returns the current time.
@@ -169,7 +169,8 @@ private:
* Internal methods to handle scheduling and perform refreshes for active
* session records contained within the cache.
*/
- void _refresh(Client* client);
+ void _periodicRefresh(Client* client);
+ Status _refresh(Client* client);
/**
* Returns true if a record has passed its given expiration.
diff --git a/src/mongo/db/logical_session_cache_factory_mongod.cpp b/src/mongo/db/logical_session_cache_factory_mongod.cpp
index 883e9a3da62..52fc1ed1ce3 100644
--- a/src/mongo/db/logical_session_cache_factory_mongod.cpp
+++ b/src/mongo/db/logical_session_cache_factory_mongod.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/service_liason_mongod.h"
#include "mongo/db/sessions_collection_mock.h"
+#include "mongo/db/sessions_collection_rs.h"
#include "mongo/db/sessions_collection_standalone.h"
#include "mongo/stdx/memory.h"
@@ -48,9 +49,7 @@ std::unique_ptr<SessionsCollection> makeSessionsCollection(LogicalSessionCacheSe
return stdx::make_unique<MockSessionsCollection>(
std::make_shared<MockSessionsCollectionImpl>());
case LogicalSessionCacheServer::kReplicaSet:
- // TODO SERVER-29202, replace with SessionsCollectionRS
- return stdx::make_unique<MockSessionsCollection>(
- std::make_shared<MockSessionsCollectionImpl>());
+ return stdx::make_unique<SessionsCollectionRS>();
case LogicalSessionCacheServer::kStandalone:
return stdx::make_unique<SessionsCollectionStandalone>();
default:
diff --git a/src/mongo/db/logical_session_cache_test.cpp b/src/mongo/db/logical_session_cache_test.cpp
index bf4a07e46c9..5fd4ad17bee 100644
--- a/src/mongo/db/logical_session_cache_test.cpp
+++ b/src/mongo/db/logical_session_cache_test.cpp
@@ -177,7 +177,7 @@ TEST_F(LogicalSessionCacheTest, FetchUpdatesLastUse) {
auto start = service()->now();
// Insert the record into the sessions collection with 'start'
- sessions()->add(makeLogicalSessionRecord(lsid, start));
+ ASSERT(cache()->startSession(opCtx(), makeLogicalSessionRecord(lsid, start)).isOK());
// Fast forward time and fetch
service()->fastForward(Milliseconds(500));
@@ -225,7 +225,7 @@ TEST_F(LogicalSessionCacheTest, StartSession) {
// Do refresh, cached records should get flushed to collection.
clearOpCtx();
- cache()->refreshNow(client());
+ ASSERT(cache()->refreshNow(client()).isOK());
ASSERT(sessions()->has(lsid));
// Try to start the same session again, should succeed.
@@ -269,7 +269,7 @@ TEST_F(LogicalSessionCacheTest, CacheRefreshesOwnRecords) {
// Wait for the refresh to happen
clearOpCtx();
service()->fastForward(kForceRefresh);
- cache()->refreshNow(client());
+ ASSERT(cache()->refreshNow(client()).isOK());
refreshFuture.wait();
ASSERT_EQ(refreshFuture.get(), 2);
@@ -294,7 +294,7 @@ TEST_F(LogicalSessionCacheTest, CacheRefreshesOwnRecords) {
clearOpCtx();
service()->fastForward(kSessionTimeout - kForceRefresh + Milliseconds(1));
- cache()->refreshNow(client());
+ ASSERT(cache()->refreshNow(client()).isOK());
refresh2Future.wait();
ASSERT_EQ(refresh2Future.get(), record1.getId());
}
@@ -311,7 +311,7 @@ TEST_F(LogicalSessionCacheTest, BasicSessionExpiration) {
service()->fastForward(Milliseconds(kSessionTimeout.count() + 5));
// Check that it is no longer in the cache
- cache()->refreshNow(client());
+ ASSERT(cache()->refreshNow(client()).isOK());
res = cache()->promote(record.getId());
// TODO SERVER-29709
// ASSERT(!res.isOK());
@@ -337,17 +337,17 @@ TEST_F(LogicalSessionCacheTest, LongRunningQueriesAreRefreshed) {
// Force a refresh, it should refresh our active session
service()->fastForward(kForceRefresh);
- cache()->refreshNow(client());
+ ASSERT(cache()->refreshNow(client()).isOK());
ASSERT_EQ(count, 1);
// Force a session timeout, session is still on the service
service()->fastForward(kSessionTimeout);
- cache()->refreshNow(client());
+ ASSERT(cache()->refreshNow(client()).isOK());
ASSERT_EQ(count, 2);
// Force another refresh, check that it refreshes that active lsid again
service()->fastForward(kForceRefresh);
- cache()->refreshNow(client());
+ ASSERT(cache()->refreshNow(client()).isOK());
ASSERT_EQ(count, 3);
}
@@ -368,7 +368,7 @@ TEST_F(LogicalSessionCacheTest, RefreshCachedAndServiceSignedLsidsTogether) {
// Force a refresh
clearOpCtx();
service()->fastForward(kForceRefresh);
- cache()->refreshNow(client());
+ ASSERT(cache()->refreshNow(client()).isOK());
}
// Test large sets of cache-only session lsids
@@ -388,7 +388,7 @@ TEST_F(LogicalSessionCacheTest, ManySignedLsidsInCacheRefresh) {
// Force a refresh
clearOpCtx();
service()->fastForward(kForceRefresh);
- cache()->refreshNow(client());
+ ASSERT(cache()->refreshNow(client()).isOK());
}
// Test larger sets of service-only session lsids
@@ -408,7 +408,7 @@ TEST_F(LogicalSessionCacheTest, ManyLongRunningSessionsRefresh) {
// Force a refresh
clearOpCtx();
service()->fastForward(kForceRefresh);
- cache()->refreshNow(client());
+ ASSERT(cache()->refreshNow(client()).isOK());
}
// Test larger mixed sets of cache/service active sessions
@@ -433,7 +433,7 @@ TEST_F(LogicalSessionCacheTest, ManySessionsRefreshComboDeluxe) {
// Force a refresh
clearOpCtx();
service()->fastForward(kForceRefresh);
- cache()->refreshNow(client());
+ ASSERT(cache()->refreshNow(client()).isOK());
ASSERT_EQ(nRefreshed, count * 2);
// Remove all of the service sessions, should just refresh the cache entries
@@ -445,14 +445,14 @@ TEST_F(LogicalSessionCacheTest, ManySessionsRefreshComboDeluxe) {
// Force another refresh
service()->fastForward(kForceRefresh);
- cache()->refreshNow(client());
+ ASSERT(cache()->refreshNow(client()).isOK());
// We should not have refreshed any sessions from the service, only the cache
ASSERT_EQ(nRefreshed, count);
// Force a third refresh
service()->fastForward(kForceRefresh);
- cache()->refreshNow(client());
+ ASSERT(cache()->refreshNow(client()).isOK());
// Again, we should have only refreshed sessions from the cache
ASSERT_EQ(nRefreshed, count);
diff --git a/src/mongo/db/sessions_collection.cpp b/src/mongo/db/sessions_collection.cpp
index 04f73281234..aa43c706ddb 100644
--- a/src/mongo/db/sessions_collection.cpp
+++ b/src/mongo/db/sessions_collection.cpp
@@ -31,10 +31,15 @@
#include "mongo/db/sessions_collection.h"
#include <memory>
+#include <vector>
#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/client/dbclientinterface.h"
#include "mongo/db/logical_session_id.h"
#include "mongo/db/ops/write_ops.h"
+#include "mongo/db/refresh_sessions_gen.h"
+#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/s/write_ops/batched_command_response.h"
#include "mongo/stdx/functional.h"
#include "mongo/stdx/memory.h"
@@ -69,34 +74,24 @@ BSONObj updateQuery(const LogicalSessionRecord& record, Date_t refreshTime) {
return updateBuilder.obj();
}
-template <typename InitBatchFn, typename AddLineFn, typename SendBatchFn, typename Container>
-Status runBulkCmd(StringData label,
- InitBatchFn&& initBatch,
- AddLineFn&& addLine,
- SendBatchFn&& sendBatch,
- const Container& items) {
- size_t i = 0;
- BufBuilder buf;
+template <typename TFactory, typename AddLineFn, typename SendFn, typename Container>
+Status runBulkGeneric(TFactory makeT, AddLineFn addLine, SendFn sendBatch, const Container& items) {
+ using T = decltype(makeT());
- boost::optional<BSONObjBuilder> batchBuilder;
- boost::optional<BSONArrayBuilder> entries;
+ size_t i = 0;
+ boost::optional<T> thing;
- auto setupBatchBuilder = [&] {
- buf.reset();
- batchBuilder.emplace(buf);
- initBatch(&(batchBuilder.get()));
- entries.emplace(batchBuilder->subarrayStart(label));
+ auto setupBatch = [&] {
+ i = 0;
+ thing.emplace(makeT());
};
- auto sendLocalBatch = [&] {
- entries->done();
- return sendBatch(batchBuilder->done());
- };
+ auto sendLocalBatch = [&] { return sendBatch(thing.value()); };
- setupBatchBuilder();
+ setupBatch();
for (const auto& item : items) {
- addLine(&(entries.get()), item);
+ addLine(*thing, item);
if (++i >= write_ops::kMaxWriteBatchSize) {
auto res = sendLocalBatch();
@@ -104,8 +99,7 @@ Status runBulkCmd(StringData label,
return res;
}
- setupBatchBuilder();
- i = 0;
+ setupBatch();
}
}
@@ -116,6 +110,34 @@ Status runBulkCmd(StringData label,
}
}
+template <typename InitBatchFn, typename AddLineFn, typename SendBatchFn, typename Container>
+Status runBulkCmd(StringData label,
+ InitBatchFn&& initBatch,
+ AddLineFn&& addLine,
+ SendBatchFn&& sendBatch,
+ const Container& items) {
+ BufBuilder buf;
+
+ boost::optional<BSONObjBuilder> batchBuilder;
+ boost::optional<BSONArrayBuilder> entries;
+
+ auto makeBatch = [&] {
+ buf.reset();
+ batchBuilder.emplace(buf);
+ initBatch(&(batchBuilder.get()));
+ entries.emplace(batchBuilder->subarrayStart(label));
+
+ return &(entries.get());
+ };
+
+ auto sendLocalBatch = [&](BSONArrayBuilder*) {
+ entries->done();
+ return sendBatch(batchBuilder->done());
+ };
+
+ return runBulkGeneric(makeBatch, addLine, sendLocalBatch, items);
+}
+
} // namespace
@@ -126,6 +148,38 @@ constexpr StringData SessionsCollection::kSessionsFullNS;
SessionsCollection::~SessionsCollection() = default;
+SessionsCollection::SendBatchFn SessionsCollection::makeSendFnForBatchWrite(DBClientBase* client) {
+ auto send = [client](BSONObj batch) -> Status {
+ BSONObj res;
+ if (!client->runCommand(SessionsCollection::kSessionsDb.toString(), batch, res)) {
+ return getStatusFromCommandResult(res);
+ }
+
+ BatchedCommandResponse response;
+ std::string errmsg;
+ if (!response.parseBSON(res, &errmsg)) {
+ return {ErrorCodes::FailedToParse, errmsg};
+ }
+
+ return response.toStatus();
+ };
+
+ return send;
+}
+
+SessionsCollection::SendBatchFn SessionsCollection::makeSendFnForCommand(DBClientBase* client) {
+ auto send = [client](BSONObj cmd) -> Status {
+ BSONObj res;
+ if (!client->runCommand(SessionsCollection::kSessionsDb.toString(), cmd, res)) {
+ return getStatusFromCommandResult(res);
+ }
+
+ return Status::OK();
+ };
+
+ return send;
+}
+
Status SessionsCollection::doRefresh(const LogicalSessionRecordSet& sessions,
Date_t refreshTime,
SendBatchFn send) {
@@ -143,6 +197,23 @@ Status SessionsCollection::doRefresh(const LogicalSessionRecordSet& sessions,
return runBulkCmd("updates", init, add, send, sessions);
}
+Status SessionsCollection::doRefreshExternal(const LogicalSessionRecordSet& sessions,
+ Date_t refreshTime,
+ SendBatchFn send) {
+ auto makeT = [] { return std::vector<LogicalSessionRecord>{}; };
+
+ auto add = [&refreshTime](std::vector<LogicalSessionRecord>& batch,
+ const LogicalSessionRecord& record) { batch.push_back(record); };
+
+ auto sendLocal = [&](std::vector<LogicalSessionRecord>& batch) {
+ RefreshSessionsCmdFromClusterMember idl;
+ idl.setRefreshSessionsInternal(batch);
+ return send(idl.toBSON());
+ };
+
+ return runBulkGeneric(makeT, add, sendLocal, sessions);
+}
+
Status SessionsCollection::doRemove(const LogicalSessionIdSet& sessions, SendBatchFn send) {
auto init = [](BSONObjBuilder* batch) {
batch->append("delete", kSessionsCollection);
@@ -156,4 +227,9 @@ Status SessionsCollection::doRemove(const LogicalSessionIdSet& sessions, SendBat
return runBulkCmd("deletes", init, add, send, sessions);
}
+Status SessionsCollection::doRemoveExternal(const LogicalSessionIdSet& sessions, SendBatchFn send) {
+ // TODO SERVER-28335 Implement endSessions, with internal counterpart.
+ return Status::OK();
+}
+
} // namespace mongo
diff --git a/src/mongo/db/sessions_collection.h b/src/mongo/db/sessions_collection.h
index 128898e63de..79a9b23535f 100644
--- a/src/mongo/db/sessions_collection.h
+++ b/src/mongo/db/sessions_collection.h
@@ -35,15 +35,17 @@ namespace mongo {
class BSONArrayBuilder;
class BSONObjBuilder;
+class DBClientBase;
class OperationContext;
/**
* An abstract interface describing the entrypoint into the sessions collection.
*
* Different server deployments (standalone, replica set, sharded cluster) should
- * implement their own class that fulfill this interface.
+ * implement their own classes that fulfill this interface.
*/
class SessionsCollection {
+
public:
virtual ~SessionsCollection();
@@ -77,17 +79,26 @@ public:
virtual Status removeRecords(OperationContext* opCtx, const LogicalSessionIdSet& sessions) = 0;
protected:
+ /**
+ * Makes a send function for the given client.
+ */
using SendBatchFn = stdx::function<Status(BSONObj batch)>;
+ SendBatchFn makeSendFnForCommand(DBClientBase* client);
+ SendBatchFn makeSendFnForBatchWrite(DBClientBase* client);
/**
* Formats and sends batches of refreshes for the given set of sessions.
*/
Status doRefresh(const LogicalSessionRecordSet& sessions, Date_t refreshTime, SendBatchFn send);
+ Status doRefreshExternal(const LogicalSessionRecordSet& sessions,
+ Date_t refreshTime,
+ SendBatchFn send);
/**
* Formats and sends batches of deletes for the given set of sessions.
*/
Status doRemove(const LogicalSessionIdSet& sessions, SendBatchFn send);
+ Status doRemoveExternal(const LogicalSessionIdSet& sessions, SendBatchFn send);
};
} // namespace mongo
diff --git a/src/mongo/db/sessions_collection_rs.cpp b/src/mongo/db/sessions_collection_rs.cpp
new file mode 100644
index 00000000000..58dc3bd0ad0
--- /dev/null
+++ b/src/mongo/db/sessions_collection_rs.cpp
@@ -0,0 +1,170 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/sessions_collection_rs.h"
+
+#include <boost/optional.hpp>
+#include <utility>
+
+#include "mongo/client/connection_string.h"
+#include "mongo/client/dbclientinterface.h"
+#include "mongo/client/query.h"
+#include "mongo/client/read_preference.h"
+#include "mongo/client/remote_command_targeter_factory_impl.h"
+#include "mongo/db/auth/internal_user_auth.h"
+#include "mongo/db/concurrency/d_concurrency.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/repl/repl_set_config.h"
+#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/stdx/memory.h"
+
+namespace mongo {
+
+namespace {
+
+BSONObj lsidQuery(const LogicalSessionId& lsid) {
+ return BSON(LogicalSessionRecord::kIdFieldName << lsid.toBSON());
+}
+
+Status makePrimaryConnection(OperationContext* opCtx, boost::optional<ScopedDbConnection>* conn) {
+ auto coord = mongo::repl::ReplicationCoordinator::get(opCtx);
+ auto config = coord->getConfig();
+ if (!config.isInitialized()) {
+ return {ErrorCodes::NotYetInitialized, "Replication has not yet been configured"};
+ }
+
+ // Find the primary
+ RemoteCommandTargeterFactoryImpl factory;
+ auto targeter = factory.create(config.getConnectionString());
+ auto res = targeter->findHost(opCtx, ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+ if (!res.isOK()) {
+ return res.getStatus();
+ }
+
+ auto hostname = res.getValue().toString();
+
+ // Make a connection to the primary, auth, then send
+ try {
+ conn->emplace(hostname);
+ if (isInternalAuthSet()) {
+ (*conn)->get()->auth(getInternalUserAuthParams());
+ }
+ return Status::OK();
+ } catch (...) {
+ return exceptionToStatus();
+ }
+}
+
+template <typename Callback>
+Status runIfStandaloneOrPrimary(OperationContext* opCtx, Callback callback) {
+ Lock::DBLock lk(opCtx, SessionsCollection::kSessionsDb, MODE_IX);
+ Lock::CollectionLock lock(opCtx->lockState(), SessionsCollection::kSessionsFullNS, MODE_IX);
+
+ auto coord = mongo::repl::ReplicationCoordinator::get(opCtx);
+ if (coord->canAcceptWritesForDatabase(opCtx, SessionsCollection::kSessionsDb)) {
+ return callback();
+ }
+
+ return {ErrorCodes::NotMaster, "Cannot perform a local write"};
+}
+
+} // namespace
+
+StatusWith<LogicalSessionRecord> SessionsCollectionRS::fetchRecord(OperationContext* opCtx,
+ const LogicalSessionId& lsid) {
+
+ DBDirectClient client(opCtx);
+ auto cursor = client.query(kSessionsFullNS.toString(), lsidQuery(lsid), 1);
+ if (!cursor->more()) {
+ return {ErrorCodes::NoSuchSession, "No matching record in the sessions collection"};
+ }
+
+ try {
+ IDLParserErrorContext ctx("LogicalSessionRecord");
+ return LogicalSessionRecord::parse(ctx, cursor->next());
+ } catch (...) {
+ return exceptionToStatus();
+ }
+}
+
+Status SessionsCollectionRS::refreshSessions(OperationContext* opCtx,
+ const LogicalSessionRecordSet& sessions,
+ Date_t refreshTime) {
+ bool ran = false;
+
+ // If we are the primary, write directly to ourself.
+ auto status = runIfStandaloneOrPrimary(opCtx, [&] {
+ ran = true;
+ DBDirectClient client(opCtx);
+ return doRefresh(sessions, refreshTime, makeSendFnForBatchWrite(&client));
+ });
+
+ if (ran) {
+ return status;
+ }
+
+ // If we are not writeable, then send refreshSessions cmd to the primary.
+ boost::optional<ScopedDbConnection> conn;
+ auto res = makePrimaryConnection(opCtx, &conn);
+ if (!res.isOK()) {
+ return res;
+ }
+
+ return doRefreshExternal(sessions, refreshTime, makeSendFnForCommand(conn->get()));
+}
+
+Status SessionsCollectionRS::removeRecords(OperationContext* opCtx,
+ const LogicalSessionIdSet& sessions) {
+ bool ran = false;
+
+ // If we are the primary, write directly to ourself.
+ auto status = runIfStandaloneOrPrimary(opCtx, [&] {
+ ran = true;
+ DBDirectClient client(opCtx);
+ return doRemove(sessions, makeSendFnForBatchWrite(&client));
+ });
+
+ if (ran) {
+ return status;
+ }
+
+ // If we are not writeable, then send endSessions cmd to the primary
+ boost::optional<ScopedDbConnection> conn;
+ auto res = makePrimaryConnection(opCtx, &conn);
+ if (!res.isOK()) {
+ return res;
+ }
+
+ return doRemoveExternal(sessions, makeSendFnForCommand(conn->get()));
+}
+
+
+} // namespace mongo
diff --git a/src/mongo/db/sessions_collection_rs.h b/src/mongo/db/sessions_collection_rs.h
new file mode 100644
index 00000000000..7c08ae2121c
--- /dev/null
+++ b/src/mongo/db/sessions_collection_rs.h
@@ -0,0 +1,77 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "mongo/client/connection_string.h"
+#include "mongo/client/connpool.h"
+#include "mongo/client/remote_command_targeter.h"
+#include "mongo/db/logical_session_id.h"
+#include "mongo/db/sessions_collection.h"
+#include "mongo/util/time_support.h"
+
+namespace mongo {
+
+class DBDirectClient;
+class OperationContext;
+class RemoteCommandTargeter;
+
+/**
+ * Accesses the sessions collection for replica set members.
+ */
+class SessionsCollectionRS : public SessionsCollection {
+public:
+ /**
+ * Constructs a new SessionsCollectionRS.
+ */
+ SessionsCollectionRS() = default;
+
+ /**
+ * Returns a LogicalSessionRecord for the given session id, or an error if
+ * no such record was found.
+ */
+ StatusWith<LogicalSessionRecord> fetchRecord(OperationContext* opCtx,
+ const LogicalSessionId& lsid) override;
+
+ /**
+ * Updates the last-use times on the given sessions to be greater than
+ * or equal to the current time.
+ */
+ Status refreshSessions(OperationContext* opCtx,
+ const LogicalSessionRecordSet& sessions,
+ Date_t refreshTime) override;
+
+ /**
+ * Removes the authoritative records for the specified sessions.
+ */
+ Status removeRecords(OperationContext* opCtx, const LogicalSessionIdSet& sessions) override;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/sessions_collection_standalone.cpp b/src/mongo/db/sessions_collection_standalone.cpp
index 093b61ef959..bbff1db1757 100644
--- a/src/mongo/db/sessions_collection_standalone.cpp
+++ b/src/mongo/db/sessions_collection_standalone.cpp
@@ -64,28 +64,13 @@ Status SessionsCollectionStandalone::refreshSessions(OperationContext* opCtx,
const LogicalSessionRecordSet& sessions,
Date_t refreshTime) {
DBDirectClient client(opCtx);
- return doRefresh(sessions, refreshTime, makeSendFn(&client));
+ return doRefresh(sessions, refreshTime, makeSendFnForBatchWrite(&client));
}
Status SessionsCollectionStandalone::removeRecords(OperationContext* opCtx,
const LogicalSessionIdSet& sessions) {
DBDirectClient client(opCtx);
- return doRemove(sessions, makeSendFn(&client));
+ return doRemove(sessions, makeSendFnForBatchWrite(&client));
}
-SessionsCollection::SendBatchFn SessionsCollectionStandalone::makeSendFn(DBDirectClient* client) {
- auto send = [client](BSONObj batch) -> Status {
- BSONObj res;
- auto ok = client->runCommand(SessionsCollection::kSessionsDb.toString(), batch, res);
- if (!ok) {
- return {ErrorCodes::UnknownError,
- client->getLastError(SessionsCollection::kSessionsDb.toString())};
- }
- return Status::OK();
- };
-
- return send;
-}
-
-
} // namespace mongo
diff --git a/src/mongo/db/sessions_collection_standalone.h b/src/mongo/db/sessions_collection_standalone.h
index 0fee4cbd164..949d7ec2e0a 100644
--- a/src/mongo/db/sessions_collection_standalone.h
+++ b/src/mongo/db/sessions_collection_standalone.h
@@ -61,9 +61,6 @@ public:
* Removes the authoritative records for the specified sessions.
*/
Status removeRecords(OperationContext* opCtx, const LogicalSessionIdSet& sessions) override;
-
-private:
- SessionsCollection::SendBatchFn makeSendFn(DBDirectClient* client);
};
} // namespace mongo