diff options
author | samantharitter <samantha.ritter@10gen.com> | 2017-08-15 16:17:37 -0400 |
---|---|---|
committer | samantharitter <samantha.ritter@10gen.com> | 2017-08-18 08:49:39 -0400 |
commit | 854cc3ca62115c0296e27c75ff017a11614254c6 (patch) | |
tree | a68e64b99d5fb4ece3c4562aa5463d42cd02c66c /src | |
parent | 583127818f1ead21b67a57eb117b9678232e5472 (diff) | |
download | mongo-854cc3ca62115c0296e27c75ff017a11614254c6.tar.gz |
SERVER-29202 Implement SessionsCollectionRS
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/SConscript | 21 | ||||
-rw-r--r-- | src/mongo/db/commands/refresh_logical_session_cache_now.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/logical_session_cache.cpp | 19 | ||||
-rw-r--r-- | src/mongo/db/logical_session_cache.h | 5 | ||||
-rw-r--r-- | src/mongo/db/logical_session_cache_factory_mongod.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/logical_session_cache_test.cpp | 28 | ||||
-rw-r--r-- | src/mongo/db/sessions_collection.cpp | 122 | ||||
-rw-r--r-- | src/mongo/db/sessions_collection.h | 13 | ||||
-rw-r--r-- | src/mongo/db/sessions_collection_rs.cpp | 170 | ||||
-rw-r--r-- | src/mongo/db/sessions_collection_rs.h | 77 | ||||
-rw-r--r-- | src/mongo/db/sessions_collection_standalone.cpp | 19 | ||||
-rw-r--r-- | src/mongo/db/sessions_collection_standalone.h | 3 |
12 files changed, 418 insertions, 69 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 8a07cca985f..e166bea6c81 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -979,6 +979,8 @@ env.Library( ], LIBDEPS=[ '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/client/clientdriver', + '$BUILD_DIR/mongo/s/write_ops/batch_write_types', 'logical_session_id', ], ) @@ -996,6 +998,22 @@ env.Library( ) env.Library( + target='sessions_collection_rs', + source=[ + 'sessions_collection_rs.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/client/remote_command_targeter', + '$BUILD_DIR/mongo/db/auth/authcommon', + '$BUILD_DIR/mongo/db/concurrency/lock_manager', + '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface', + 'dbdirectclient', + 'sessions_collection', + ], +) + +env.Library( target='sessions_collection_standalone', source=[ 'sessions_collection_standalone.cpp', @@ -1051,7 +1069,8 @@ envWithAsio.Library( LIBDEPS=[ 'logical_session_cache', 'service_liason_mongod', - 'sessions_collection_mock', # TODO SERVER-29202, SERVER-29203 + 'sessions_collection_mock', # TODO SERVER-29203 + 'sessions_collection_rs', 'sessions_collection_standalone', ], ) diff --git a/src/mongo/db/commands/refresh_logical_session_cache_now.cpp b/src/mongo/db/commands/refresh_logical_session_cache_now.cpp index dad267f9b16..f1b61473cda 100644 --- a/src/mongo/db/commands/refresh_logical_session_cache_now.cpp +++ b/src/mongo/db/commands/refresh_logical_session_cache_now.cpp @@ -74,7 +74,10 @@ public: auto cache = LogicalSessionCache::get(opCtx); auto client = opCtx->getClient(); - cache->refreshNow(client); + auto res = cache->refreshNow(client); + if (!res.isOK()) { + return appendCommandStatus(result, res); + } return true; } diff --git a/src/mongo/db/logical_session_cache.cpp b/src/mongo/db/logical_session_cache.cpp index 1631842fb60..d76c8040325 100644 --- a/src/mongo/db/logical_session_cache.cpp +++ b/src/mongo/db/logical_session_cache.cpp @@ -88,7 +88,7 @@ LogicalSessionCache::LogicalSessionCache(std::unique_ptr<ServiceLiason> service, _service(std::move(service)), _sessionsColl(std::move(collection)), _cache(options.capacity) { - PeriodicRunner::PeriodicJob job{[this](Client* client) { _refresh(client); }, + PeriodicRunner::PeriodicJob job{[this](Client* client) { _periodicRefresh(client); }, duration_cast<Milliseconds>(_refreshInterval)}; _service->scheduleJob(std::move(job)); } @@ -182,7 +182,7 @@ Status LogicalSessionCache::refreshSessions(OperationContext* opCtx, return _sessionsColl->refreshSessions(opCtx, toRefresh, now()); } -void LogicalSessionCache::refreshNow(Client* client) { +Status LogicalSessionCache::refreshNow(Client* client) { return _refresh(client); } @@ -195,7 +195,16 @@ size_t LogicalSessionCache::size() { return _cache.size(); } -void LogicalSessionCache::_refresh(Client* client) { +void LogicalSessionCache::_periodicRefresh(Client* client) { + auto res = _refresh(client); + if (!res.isOK()) { + log() << "Failed to refresh session cache: " << res; + } + + return; +} + +Status LogicalSessionCache::_refresh(Client* client) { LogicalSessionRecordSet activeSessions; LogicalSessionRecordSet deadSessions; @@ -267,7 +276,7 @@ void LogicalSessionCache::_refresh(Client* client) { auto res = _sessionsColl->refreshSessions(opCtx, std::move(activeSessions), time); if (!res.isOK()) { // TODO SERVER-29709: handle network errors here. - return; + return res; } } @@ -279,6 +288,8 @@ void LogicalSessionCache::_refresh(Client* client) { { // TODO SERVER-29709: handle expiration separately from failure to refresh. } + + return Status::OK(); } bool LogicalSessionCache::_isDead(const LogicalSessionRecord& record, Date_t now) const { diff --git a/src/mongo/db/logical_session_cache.h b/src/mongo/db/logical_session_cache.h index 0279fe0b7b6..0c903482a47 100644 --- a/src/mongo/db/logical_session_cache.h +++ b/src/mongo/db/logical_session_cache.h @@ -152,7 +152,7 @@ public: * Refreshes the cache synchronously. This flushes all pending refreshes and * inserts to the sessions collection. */ - void refreshNow(Client* client); + Status refreshNow(Client* client); /** * Returns the current time. @@ -169,7 +169,8 @@ private: * Internal methods to handle scheduling and perform refreshes for active * session records contained within the cache. */ - void _refresh(Client* client); + void _periodicRefresh(Client* client); + Status _refresh(Client* client); /** * Returns true if a record has passed its given expiration. diff --git a/src/mongo/db/logical_session_cache_factory_mongod.cpp b/src/mongo/db/logical_session_cache_factory_mongod.cpp index 883e9a3da62..52fc1ed1ce3 100644 --- a/src/mongo/db/logical_session_cache_factory_mongod.cpp +++ b/src/mongo/db/logical_session_cache_factory_mongod.cpp @@ -34,6 +34,7 @@ #include "mongo/db/service_liason_mongod.h" #include "mongo/db/sessions_collection_mock.h" +#include "mongo/db/sessions_collection_rs.h" #include "mongo/db/sessions_collection_standalone.h" #include "mongo/stdx/memory.h" @@ -48,9 +49,7 @@ std::unique_ptr<SessionsCollection> makeSessionsCollection(LogicalSessionCacheSe return stdx::make_unique<MockSessionsCollection>( std::make_shared<MockSessionsCollectionImpl>()); case LogicalSessionCacheServer::kReplicaSet: - // TODO SERVER-29202, replace with SessionsCollectionRS - return stdx::make_unique<MockSessionsCollection>( - std::make_shared<MockSessionsCollectionImpl>()); + return stdx::make_unique<SessionsCollectionRS>(); case LogicalSessionCacheServer::kStandalone: return stdx::make_unique<SessionsCollectionStandalone>(); default: diff --git a/src/mongo/db/logical_session_cache_test.cpp b/src/mongo/db/logical_session_cache_test.cpp index bf4a07e46c9..5fd4ad17bee 100644 --- a/src/mongo/db/logical_session_cache_test.cpp +++ b/src/mongo/db/logical_session_cache_test.cpp @@ -177,7 +177,7 @@ TEST_F(LogicalSessionCacheTest, FetchUpdatesLastUse) { auto start = service()->now(); // Insert the record into the sessions collection with 'start' - sessions()->add(makeLogicalSessionRecord(lsid, start)); + ASSERT(cache()->startSession(opCtx(), makeLogicalSessionRecord(lsid, start)).isOK()); // Fast forward time and fetch service()->fastForward(Milliseconds(500)); @@ -225,7 +225,7 @@ TEST_F(LogicalSessionCacheTest, StartSession) { // Do refresh, cached records should get flushed to collection. clearOpCtx(); - cache()->refreshNow(client()); + ASSERT(cache()->refreshNow(client()).isOK()); ASSERT(sessions()->has(lsid)); // Try to start the same session again, should succeed. @@ -269,7 +269,7 @@ TEST_F(LogicalSessionCacheTest, CacheRefreshesOwnRecords) { // Wait for the refresh to happen clearOpCtx(); service()->fastForward(kForceRefresh); - cache()->refreshNow(client()); + ASSERT(cache()->refreshNow(client()).isOK()); refreshFuture.wait(); ASSERT_EQ(refreshFuture.get(), 2); @@ -294,7 +294,7 @@ TEST_F(LogicalSessionCacheTest, CacheRefreshesOwnRecords) { clearOpCtx(); service()->fastForward(kSessionTimeout - kForceRefresh + Milliseconds(1)); - cache()->refreshNow(client()); + ASSERT(cache()->refreshNow(client()).isOK()); refresh2Future.wait(); ASSERT_EQ(refresh2Future.get(), record1.getId()); } @@ -311,7 +311,7 @@ TEST_F(LogicalSessionCacheTest, BasicSessionExpiration) { service()->fastForward(Milliseconds(kSessionTimeout.count() + 5)); // Check that it is no longer in the cache - cache()->refreshNow(client()); + ASSERT(cache()->refreshNow(client()).isOK()); res = cache()->promote(record.getId()); // TODO SERVER-29709 // ASSERT(!res.isOK()); @@ -337,17 +337,17 @@ TEST_F(LogicalSessionCacheTest, LongRunningQueriesAreRefreshed) { // Force a refresh, it should refresh our active session service()->fastForward(kForceRefresh); - cache()->refreshNow(client()); + ASSERT(cache()->refreshNow(client()).isOK()); ASSERT_EQ(count, 1); // Force a session timeout, session is still on the service service()->fastForward(kSessionTimeout); - cache()->refreshNow(client()); + ASSERT(cache()->refreshNow(client()).isOK()); ASSERT_EQ(count, 2); // Force another refresh, check that it refreshes that active lsid again service()->fastForward(kForceRefresh); - cache()->refreshNow(client()); + ASSERT(cache()->refreshNow(client()).isOK()); ASSERT_EQ(count, 3); } @@ -368,7 +368,7 @@ TEST_F(LogicalSessionCacheTest, RefreshCachedAndServiceSignedLsidsTogether) { // Force a refresh clearOpCtx(); service()->fastForward(kForceRefresh); - cache()->refreshNow(client()); + ASSERT(cache()->refreshNow(client()).isOK()); } // Test large sets of cache-only session lsids @@ -388,7 +388,7 @@ TEST_F(LogicalSessionCacheTest, ManySignedLsidsInCacheRefresh) { // Force a refresh clearOpCtx(); service()->fastForward(kForceRefresh); - cache()->refreshNow(client()); + ASSERT(cache()->refreshNow(client()).isOK()); } // Test larger sets of service-only session lsids @@ -408,7 +408,7 @@ TEST_F(LogicalSessionCacheTest, ManyLongRunningSessionsRefresh) { // Force a refresh clearOpCtx(); service()->fastForward(kForceRefresh); - cache()->refreshNow(client()); + ASSERT(cache()->refreshNow(client()).isOK()); } // Test larger mixed sets of cache/service active sessions @@ -433,7 +433,7 @@ TEST_F(LogicalSessionCacheTest, ManySessionsRefreshComboDeluxe) { // Force a refresh clearOpCtx(); service()->fastForward(kForceRefresh); - cache()->refreshNow(client()); + ASSERT(cache()->refreshNow(client()).isOK()); ASSERT_EQ(nRefreshed, count * 2); // Remove all of the service sessions, should just refresh the cache entries @@ -445,14 +445,14 @@ TEST_F(LogicalSessionCacheTest, ManySessionsRefreshComboDeluxe) { // Force another refresh service()->fastForward(kForceRefresh); - cache()->refreshNow(client()); + ASSERT(cache()->refreshNow(client()).isOK()); // We should not have refreshed any sessions from the service, only the cache ASSERT_EQ(nRefreshed, count); // Force a third refresh service()->fastForward(kForceRefresh); - cache()->refreshNow(client()); + ASSERT(cache()->refreshNow(client()).isOK()); // Again, we should have only refreshed sessions from the cache ASSERT_EQ(nRefreshed, count); diff --git a/src/mongo/db/sessions_collection.cpp b/src/mongo/db/sessions_collection.cpp index 04f73281234..aa43c706ddb 100644 --- a/src/mongo/db/sessions_collection.cpp +++ b/src/mongo/db/sessions_collection.cpp @@ -31,10 +31,15 @@ #include "mongo/db/sessions_collection.h" #include <memory> +#include <vector> #include "mongo/bson/bsonobjbuilder.h" +#include "mongo/client/dbclientinterface.h" #include "mongo/db/logical_session_id.h" #include "mongo/db/ops/write_ops.h" +#include "mongo/db/refresh_sessions_gen.h" +#include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/s/write_ops/batched_command_response.h" #include "mongo/stdx/functional.h" #include "mongo/stdx/memory.h" @@ -69,34 +74,24 @@ BSONObj updateQuery(const LogicalSessionRecord& record, Date_t refreshTime) { return updateBuilder.obj(); } -template <typename InitBatchFn, typename AddLineFn, typename SendBatchFn, typename Container> -Status runBulkCmd(StringData label, - InitBatchFn&& initBatch, - AddLineFn&& addLine, - SendBatchFn&& sendBatch, - const Container& items) { - size_t i = 0; - BufBuilder buf; +template <typename TFactory, typename AddLineFn, typename SendFn, typename Container> +Status runBulkGeneric(TFactory makeT, AddLineFn addLine, SendFn sendBatch, const Container& items) { + using T = decltype(makeT()); - boost::optional<BSONObjBuilder> batchBuilder; - boost::optional<BSONArrayBuilder> entries; + size_t i = 0; + boost::optional<T> thing; - auto setupBatchBuilder = [&] { - buf.reset(); - batchBuilder.emplace(buf); - initBatch(&(batchBuilder.get())); - entries.emplace(batchBuilder->subarrayStart(label)); + auto setupBatch = [&] { + i = 0; + thing.emplace(makeT()); }; - auto sendLocalBatch = [&] { - entries->done(); - return sendBatch(batchBuilder->done()); - }; + auto sendLocalBatch = [&] { return sendBatch(thing.value()); }; - setupBatchBuilder(); + setupBatch(); for (const auto& item : items) { - addLine(&(entries.get()), item); + addLine(*thing, item); if (++i >= write_ops::kMaxWriteBatchSize) { auto res = sendLocalBatch(); @@ -104,8 +99,7 @@ Status runBulkCmd(StringData label, return res; } - setupBatchBuilder(); - i = 0; + setupBatch(); } } @@ -116,6 +110,34 @@ Status runBulkCmd(StringData label, } } +template <typename InitBatchFn, typename AddLineFn, typename SendBatchFn, typename Container> +Status runBulkCmd(StringData label, + InitBatchFn&& initBatch, + AddLineFn&& addLine, + SendBatchFn&& sendBatch, + const Container& items) { + BufBuilder buf; + + boost::optional<BSONObjBuilder> batchBuilder; + boost::optional<BSONArrayBuilder> entries; + + auto makeBatch = [&] { + buf.reset(); + batchBuilder.emplace(buf); + initBatch(&(batchBuilder.get())); + entries.emplace(batchBuilder->subarrayStart(label)); + + return &(entries.get()); + }; + + auto sendLocalBatch = [&](BSONArrayBuilder*) { + entries->done(); + return sendBatch(batchBuilder->done()); + }; + + return runBulkGeneric(makeBatch, addLine, sendLocalBatch, items); +} + } // namespace @@ -126,6 +148,38 @@ constexpr StringData SessionsCollection::kSessionsFullNS; SessionsCollection::~SessionsCollection() = default; +SessionsCollection::SendBatchFn SessionsCollection::makeSendFnForBatchWrite(DBClientBase* client) { + auto send = [client](BSONObj batch) -> Status { + BSONObj res; + if (!client->runCommand(SessionsCollection::kSessionsDb.toString(), batch, res)) { + return getStatusFromCommandResult(res); + } + + BatchedCommandResponse response; + std::string errmsg; + if (!response.parseBSON(res, &errmsg)) { + return {ErrorCodes::FailedToParse, errmsg}; + } + + return response.toStatus(); + }; + + return send; +} + +SessionsCollection::SendBatchFn SessionsCollection::makeSendFnForCommand(DBClientBase* client) { + auto send = [client](BSONObj cmd) -> Status { + BSONObj res; + if (!client->runCommand(SessionsCollection::kSessionsDb.toString(), cmd, res)) { + return getStatusFromCommandResult(res); + } + + return Status::OK(); + }; + + return send; +} + Status SessionsCollection::doRefresh(const LogicalSessionRecordSet& sessions, Date_t refreshTime, SendBatchFn send) { @@ -143,6 +197,23 @@ Status SessionsCollection::doRefresh(const LogicalSessionRecordSet& sessions, return runBulkCmd("updates", init, add, send, sessions); } +Status SessionsCollection::doRefreshExternal(const LogicalSessionRecordSet& sessions, + Date_t refreshTime, + SendBatchFn send) { + auto makeT = [] { return std::vector<LogicalSessionRecord>{}; }; + + auto add = [&refreshTime](std::vector<LogicalSessionRecord>& batch, + const LogicalSessionRecord& record) { batch.push_back(record); }; + + auto sendLocal = [&](std::vector<LogicalSessionRecord>& batch) { + RefreshSessionsCmdFromClusterMember idl; + idl.setRefreshSessionsInternal(batch); + return send(idl.toBSON()); + }; + + return runBulkGeneric(makeT, add, sendLocal, sessions); +} + Status SessionsCollection::doRemove(const LogicalSessionIdSet& sessions, SendBatchFn send) { auto init = [](BSONObjBuilder* batch) { batch->append("delete", kSessionsCollection); @@ -156,4 +227,9 @@ Status SessionsCollection::doRemove(const LogicalSessionIdSet& sessions, SendBat return runBulkCmd("deletes", init, add, send, sessions); } +Status SessionsCollection::doRemoveExternal(const LogicalSessionIdSet& sessions, SendBatchFn send) { + // TODO SERVER-28335 Implement endSessions, with internal counterpart. + return Status::OK(); +} + } // namespace mongo diff --git a/src/mongo/db/sessions_collection.h b/src/mongo/db/sessions_collection.h index 128898e63de..79a9b23535f 100644 --- a/src/mongo/db/sessions_collection.h +++ b/src/mongo/db/sessions_collection.h @@ -35,15 +35,17 @@ namespace mongo { class BSONArrayBuilder; class BSONObjBuilder; +class DBClientBase; class OperationContext; /** * An abstract interface describing the entrypoint into the sessions collection. * * Different server deployments (standalone, replica set, sharded cluster) should - * implement their own class that fulfill this interface. + * implement their own classes that fulfill this interface. */ class SessionsCollection { + public: virtual ~SessionsCollection(); @@ -77,17 +79,26 @@ public: virtual Status removeRecords(OperationContext* opCtx, const LogicalSessionIdSet& sessions) = 0; protected: + /** + * Makes a send function for the given client. + */ using SendBatchFn = stdx::function<Status(BSONObj batch)>; + SendBatchFn makeSendFnForCommand(DBClientBase* client); + SendBatchFn makeSendFnForBatchWrite(DBClientBase* client); /** * Formats and sends batches of refreshes for the given set of sessions. */ Status doRefresh(const LogicalSessionRecordSet& sessions, Date_t refreshTime, SendBatchFn send); + Status doRefreshExternal(const LogicalSessionRecordSet& sessions, + Date_t refreshTime, + SendBatchFn send); /** * Formats and sends batches of deletes for the given set of sessions. */ Status doRemove(const LogicalSessionIdSet& sessions, SendBatchFn send); + Status doRemoveExternal(const LogicalSessionIdSet& sessions, SendBatchFn send); }; } // namespace mongo diff --git a/src/mongo/db/sessions_collection_rs.cpp b/src/mongo/db/sessions_collection_rs.cpp new file mode 100644 index 00000000000..58dc3bd0ad0 --- /dev/null +++ b/src/mongo/db/sessions_collection_rs.cpp @@ -0,0 +1,170 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/sessions_collection_rs.h" + +#include <boost/optional.hpp> +#include <utility> + +#include "mongo/client/connection_string.h" +#include "mongo/client/dbclientinterface.h" +#include "mongo/client/query.h" +#include "mongo/client/read_preference.h" +#include "mongo/client/remote_command_targeter_factory_impl.h" +#include "mongo/db/auth/internal_user_auth.h" +#include "mongo/db/concurrency/d_concurrency.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/repl/repl_set_config.h" +#include "mongo/db/repl/replication_coordinator.h" +#include "mongo/stdx/memory.h" + +namespace mongo { + +namespace { + +BSONObj lsidQuery(const LogicalSessionId& lsid) { + return BSON(LogicalSessionRecord::kIdFieldName << lsid.toBSON()); +} + +Status makePrimaryConnection(OperationContext* opCtx, boost::optional<ScopedDbConnection>* conn) { + auto coord = mongo::repl::ReplicationCoordinator::get(opCtx); + auto config = coord->getConfig(); + if (!config.isInitialized()) { + return {ErrorCodes::NotYetInitialized, "Replication has not yet been configured"}; + } + + // Find the primary + RemoteCommandTargeterFactoryImpl factory; + auto targeter = factory.create(config.getConnectionString()); + auto res = targeter->findHost(opCtx, ReadPreferenceSetting(ReadPreference::PrimaryOnly)); + if (!res.isOK()) { + return res.getStatus(); + } + + auto hostname = res.getValue().toString(); + + // Make a connection to the primary, auth, then send + try { + conn->emplace(hostname); + if (isInternalAuthSet()) { + (*conn)->get()->auth(getInternalUserAuthParams()); + } + return Status::OK(); + } catch (...) { + return exceptionToStatus(); + } +} + +template <typename Callback> +Status runIfStandaloneOrPrimary(OperationContext* opCtx, Callback callback) { + Lock::DBLock lk(opCtx, SessionsCollection::kSessionsDb, MODE_IX); + Lock::CollectionLock lock(opCtx->lockState(), SessionsCollection::kSessionsFullNS, MODE_IX); + + auto coord = mongo::repl::ReplicationCoordinator::get(opCtx); + if (coord->canAcceptWritesForDatabase(opCtx, SessionsCollection::kSessionsDb)) { + return callback(); + } + + return {ErrorCodes::NotMaster, "Cannot perform a local write"}; +} + +} // namespace + +StatusWith<LogicalSessionRecord> SessionsCollectionRS::fetchRecord(OperationContext* opCtx, + const LogicalSessionId& lsid) { + + DBDirectClient client(opCtx); + auto cursor = client.query(kSessionsFullNS.toString(), lsidQuery(lsid), 1); + if (!cursor->more()) { + return {ErrorCodes::NoSuchSession, "No matching record in the sessions collection"}; + } + + try { + IDLParserErrorContext ctx("LogicalSessionRecord"); + return LogicalSessionRecord::parse(ctx, cursor->next()); + } catch (...) { + return exceptionToStatus(); + } +} + +Status SessionsCollectionRS::refreshSessions(OperationContext* opCtx, + const LogicalSessionRecordSet& sessions, + Date_t refreshTime) { + bool ran = false; + + // If we are the primary, write directly to ourself. + auto status = runIfStandaloneOrPrimary(opCtx, [&] { + ran = true; + DBDirectClient client(opCtx); + return doRefresh(sessions, refreshTime, makeSendFnForBatchWrite(&client)); + }); + + if (ran) { + return status; + } + + // If we are not writeable, then send refreshSessions cmd to the primary. + boost::optional<ScopedDbConnection> conn; + auto res = makePrimaryConnection(opCtx, &conn); + if (!res.isOK()) { + return res; + } + + return doRefreshExternal(sessions, refreshTime, makeSendFnForCommand(conn->get())); +} + +Status SessionsCollectionRS::removeRecords(OperationContext* opCtx, + const LogicalSessionIdSet& sessions) { + bool ran = false; + + // If we are the primary, write directly to ourself. + auto status = runIfStandaloneOrPrimary(opCtx, [&] { + ran = true; + DBDirectClient client(opCtx); + return doRemove(sessions, makeSendFnForBatchWrite(&client)); + }); + + if (ran) { + return status; + } + + // If we are not writeable, then send endSessions cmd to the primary + boost::optional<ScopedDbConnection> conn; + auto res = makePrimaryConnection(opCtx, &conn); + if (!res.isOK()) { + return res; + } + + return doRemoveExternal(sessions, makeSendFnForCommand(conn->get())); +} + + +} // namespace mongo diff --git a/src/mongo/db/sessions_collection_rs.h b/src/mongo/db/sessions_collection_rs.h new file mode 100644 index 00000000000..7c08ae2121c --- /dev/null +++ b/src/mongo/db/sessions_collection_rs.h @@ -0,0 +1,77 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <memory> + +#include "mongo/client/connection_string.h" +#include "mongo/client/connpool.h" +#include "mongo/client/remote_command_targeter.h" +#include "mongo/db/logical_session_id.h" +#include "mongo/db/sessions_collection.h" +#include "mongo/util/time_support.h" + +namespace mongo { + +class DBDirectClient; +class OperationContext; +class RemoteCommandTargeter; + +/** + * Accesses the sessions collection for replica set members. + */ +class SessionsCollectionRS : public SessionsCollection { +public: + /** + * Constructs a new SessionsCollectionRS. + */ + SessionsCollectionRS() = default; + + /** + * Returns a LogicalSessionRecord for the given session id, or an error if + * no such record was found. + */ + StatusWith<LogicalSessionRecord> fetchRecord(OperationContext* opCtx, + const LogicalSessionId& lsid) override; + + /** + * Updates the last-use times on the given sessions to be greater than + * or equal to the current time. + */ + Status refreshSessions(OperationContext* opCtx, + const LogicalSessionRecordSet& sessions, + Date_t refreshTime) override; + + /** + * Removes the authoritative records for the specified sessions. + */ + Status removeRecords(OperationContext* opCtx, const LogicalSessionIdSet& sessions) override; +}; + +} // namespace mongo diff --git a/src/mongo/db/sessions_collection_standalone.cpp b/src/mongo/db/sessions_collection_standalone.cpp index 093b61ef959..bbff1db1757 100644 --- a/src/mongo/db/sessions_collection_standalone.cpp +++ b/src/mongo/db/sessions_collection_standalone.cpp @@ -64,28 +64,13 @@ Status SessionsCollectionStandalone::refreshSessions(OperationContext* opCtx, const LogicalSessionRecordSet& sessions, Date_t refreshTime) { DBDirectClient client(opCtx); - return doRefresh(sessions, refreshTime, makeSendFn(&client)); + return doRefresh(sessions, refreshTime, makeSendFnForBatchWrite(&client)); } Status SessionsCollectionStandalone::removeRecords(OperationContext* opCtx, const LogicalSessionIdSet& sessions) { DBDirectClient client(opCtx); - return doRemove(sessions, makeSendFn(&client)); + return doRemove(sessions, makeSendFnForBatchWrite(&client)); } -SessionsCollection::SendBatchFn SessionsCollectionStandalone::makeSendFn(DBDirectClient* client) { - auto send = [client](BSONObj batch) -> Status { - BSONObj res; - auto ok = client->runCommand(SessionsCollection::kSessionsDb.toString(), batch, res); - if (!ok) { - return {ErrorCodes::UnknownError, - client->getLastError(SessionsCollection::kSessionsDb.toString())}; - } - return Status::OK(); - }; - - return send; -} - - } // namespace mongo diff --git a/src/mongo/db/sessions_collection_standalone.h b/src/mongo/db/sessions_collection_standalone.h index 0fee4cbd164..949d7ec2e0a 100644 --- a/src/mongo/db/sessions_collection_standalone.h +++ b/src/mongo/db/sessions_collection_standalone.h @@ -61,9 +61,6 @@ public: * Removes the authoritative records for the specified sessions. */ Status removeRecords(OperationContext* opCtx, const LogicalSessionIdSet& sessions) override; - -private: - SessionsCollection::SendBatchFn makeSendFn(DBDirectClient* client); }; } // namespace mongo |