diff options
author | samantharitter <samantha.ritter@10gen.com> | 2017-08-15 16:17:37 -0400 |
---|---|---|
committer | samantharitter <samantha.ritter@10gen.com> | 2017-08-18 08:49:39 -0400 |
commit | 854cc3ca62115c0296e27c75ff017a11614254c6 (patch) | |
tree | a68e64b99d5fb4ece3c4562aa5463d42cd02c66c /src/mongo/db/sessions_collection.cpp | |
parent | 583127818f1ead21b67a57eb117b9678232e5472 (diff) | |
download | mongo-854cc3ca62115c0296e27c75ff017a11614254c6.tar.gz |
SERVER-29202 Implement SessionsCollectionRS
Diffstat (limited to 'src/mongo/db/sessions_collection.cpp')
-rw-r--r-- | src/mongo/db/sessions_collection.cpp | 122 |
1 files changed, 99 insertions, 23 deletions
diff --git a/src/mongo/db/sessions_collection.cpp b/src/mongo/db/sessions_collection.cpp index 04f73281234..aa43c706ddb 100644 --- a/src/mongo/db/sessions_collection.cpp +++ b/src/mongo/db/sessions_collection.cpp @@ -31,10 +31,15 @@ #include "mongo/db/sessions_collection.h" #include <memory> +#include <vector> #include "mongo/bson/bsonobjbuilder.h" +#include "mongo/client/dbclientinterface.h" #include "mongo/db/logical_session_id.h" #include "mongo/db/ops/write_ops.h" +#include "mongo/db/refresh_sessions_gen.h" +#include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/s/write_ops/batched_command_response.h" #include "mongo/stdx/functional.h" #include "mongo/stdx/memory.h" @@ -69,34 +74,24 @@ BSONObj updateQuery(const LogicalSessionRecord& record, Date_t refreshTime) { return updateBuilder.obj(); } -template <typename InitBatchFn, typename AddLineFn, typename SendBatchFn, typename Container> -Status runBulkCmd(StringData label, - InitBatchFn&& initBatch, - AddLineFn&& addLine, - SendBatchFn&& sendBatch, - const Container& items) { - size_t i = 0; - BufBuilder buf; +template <typename TFactory, typename AddLineFn, typename SendFn, typename Container> +Status runBulkGeneric(TFactory makeT, AddLineFn addLine, SendFn sendBatch, const Container& items) { + using T = decltype(makeT()); - boost::optional<BSONObjBuilder> batchBuilder; - boost::optional<BSONArrayBuilder> entries; + size_t i = 0; + boost::optional<T> thing; - auto setupBatchBuilder = [&] { - buf.reset(); - batchBuilder.emplace(buf); - initBatch(&(batchBuilder.get())); - entries.emplace(batchBuilder->subarrayStart(label)); + auto setupBatch = [&] { + i = 0; + thing.emplace(makeT()); }; - auto sendLocalBatch = [&] { - entries->done(); - return sendBatch(batchBuilder->done()); - }; + auto sendLocalBatch = [&] { return sendBatch(thing.value()); }; - setupBatchBuilder(); + setupBatch(); for (const auto& item : items) { - addLine(&(entries.get()), item); + addLine(*thing, item); if (++i >= write_ops::kMaxWriteBatchSize) { auto res = sendLocalBatch(); @@ -104,8 +99,7 @@ Status runBulkCmd(StringData label, return res; } - setupBatchBuilder(); - i = 0; + setupBatch(); } } @@ -116,6 +110,34 @@ Status runBulkCmd(StringData label, } } +template <typename InitBatchFn, typename AddLineFn, typename SendBatchFn, typename Container> +Status runBulkCmd(StringData label, + InitBatchFn&& initBatch, + AddLineFn&& addLine, + SendBatchFn&& sendBatch, + const Container& items) { + BufBuilder buf; + + boost::optional<BSONObjBuilder> batchBuilder; + boost::optional<BSONArrayBuilder> entries; + + auto makeBatch = [&] { + buf.reset(); + batchBuilder.emplace(buf); + initBatch(&(batchBuilder.get())); + entries.emplace(batchBuilder->subarrayStart(label)); + + return &(entries.get()); + }; + + auto sendLocalBatch = [&](BSONArrayBuilder*) { + entries->done(); + return sendBatch(batchBuilder->done()); + }; + + return runBulkGeneric(makeBatch, addLine, sendLocalBatch, items); +} + } // namespace @@ -126,6 +148,38 @@ constexpr StringData SessionsCollection::kSessionsFullNS; SessionsCollection::~SessionsCollection() = default; +SessionsCollection::SendBatchFn SessionsCollection::makeSendFnForBatchWrite(DBClientBase* client) { + auto send = [client](BSONObj batch) -> Status { + BSONObj res; + if (!client->runCommand(SessionsCollection::kSessionsDb.toString(), batch, res)) { + return getStatusFromCommandResult(res); + } + + BatchedCommandResponse response; + std::string errmsg; + if (!response.parseBSON(res, &errmsg)) { + return {ErrorCodes::FailedToParse, errmsg}; + } + + return response.toStatus(); + }; + + return send; +} + +SessionsCollection::SendBatchFn SessionsCollection::makeSendFnForCommand(DBClientBase* client) { + auto send = [client](BSONObj cmd) -> Status { + BSONObj res; + if (!client->runCommand(SessionsCollection::kSessionsDb.toString(), cmd, res)) { + return getStatusFromCommandResult(res); + } + + return Status::OK(); + }; + + return send; +} + Status SessionsCollection::doRefresh(const LogicalSessionRecordSet& sessions, Date_t refreshTime, SendBatchFn send) { @@ -143,6 +197,23 @@ Status SessionsCollection::doRefresh(const LogicalSessionRecordSet& sessions, return runBulkCmd("updates", init, add, send, sessions); } +Status SessionsCollection::doRefreshExternal(const LogicalSessionRecordSet& sessions, + Date_t refreshTime, + SendBatchFn send) { + auto makeT = [] { return std::vector<LogicalSessionRecord>{}; }; + + auto add = [&refreshTime](std::vector<LogicalSessionRecord>& batch, + const LogicalSessionRecord& record) { batch.push_back(record); }; + + auto sendLocal = [&](std::vector<LogicalSessionRecord>& batch) { + RefreshSessionsCmdFromClusterMember idl; + idl.setRefreshSessionsInternal(batch); + return send(idl.toBSON()); + }; + + return runBulkGeneric(makeT, add, sendLocal, sessions); +} + Status SessionsCollection::doRemove(const LogicalSessionIdSet& sessions, SendBatchFn send) { auto init = [](BSONObjBuilder* batch) { batch->append("delete", kSessionsCollection); @@ -156,4 +227,9 @@ Status SessionsCollection::doRemove(const LogicalSessionIdSet& sessions, SendBat return runBulkCmd("deletes", init, add, send, sessions); } +Status SessionsCollection::doRemoveExternal(const LogicalSessionIdSet& sessions, SendBatchFn send) { + // TODO SERVER-28335 Implement endSessions, with internal counterpart. + return Status::OK(); +} + } // namespace mongo |