summaryrefslogtreecommitdiff
path: root/src/mongo/db/sessions_collection.cpp
diff options
context:
space:
mode:
authorsamantharitter <samantha.ritter@10gen.com>2017-08-15 16:17:37 -0400
committersamantharitter <samantha.ritter@10gen.com>2017-08-18 08:49:39 -0400
commit854cc3ca62115c0296e27c75ff017a11614254c6 (patch)
treea68e64b99d5fb4ece3c4562aa5463d42cd02c66c /src/mongo/db/sessions_collection.cpp
parent583127818f1ead21b67a57eb117b9678232e5472 (diff)
downloadmongo-854cc3ca62115c0296e27c75ff017a11614254c6.tar.gz
SERVER-29202 Implement SessionsCollectionRS
Diffstat (limited to 'src/mongo/db/sessions_collection.cpp')
-rw-r--r--src/mongo/db/sessions_collection.cpp122
1 files changed, 99 insertions, 23 deletions
diff --git a/src/mongo/db/sessions_collection.cpp b/src/mongo/db/sessions_collection.cpp
index 04f73281234..aa43c706ddb 100644
--- a/src/mongo/db/sessions_collection.cpp
+++ b/src/mongo/db/sessions_collection.cpp
@@ -31,10 +31,15 @@
#include "mongo/db/sessions_collection.h"
#include <memory>
+#include <vector>
#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/client/dbclientinterface.h"
#include "mongo/db/logical_session_id.h"
#include "mongo/db/ops/write_ops.h"
+#include "mongo/db/refresh_sessions_gen.h"
+#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/s/write_ops/batched_command_response.h"
#include "mongo/stdx/functional.h"
#include "mongo/stdx/memory.h"
@@ -69,34 +74,24 @@ BSONObj updateQuery(const LogicalSessionRecord& record, Date_t refreshTime) {
return updateBuilder.obj();
}
-template <typename InitBatchFn, typename AddLineFn, typename SendBatchFn, typename Container>
-Status runBulkCmd(StringData label,
- InitBatchFn&& initBatch,
- AddLineFn&& addLine,
- SendBatchFn&& sendBatch,
- const Container& items) {
- size_t i = 0;
- BufBuilder buf;
+template <typename TFactory, typename AddLineFn, typename SendFn, typename Container>
+Status runBulkGeneric(TFactory makeT, AddLineFn addLine, SendFn sendBatch, const Container& items) {
+ using T = decltype(makeT());
- boost::optional<BSONObjBuilder> batchBuilder;
- boost::optional<BSONArrayBuilder> entries;
+ size_t i = 0;
+ boost::optional<T> thing;
- auto setupBatchBuilder = [&] {
- buf.reset();
- batchBuilder.emplace(buf);
- initBatch(&(batchBuilder.get()));
- entries.emplace(batchBuilder->subarrayStart(label));
+ auto setupBatch = [&] {
+ i = 0;
+ thing.emplace(makeT());
};
- auto sendLocalBatch = [&] {
- entries->done();
- return sendBatch(batchBuilder->done());
- };
+ auto sendLocalBatch = [&] { return sendBatch(thing.value()); };
- setupBatchBuilder();
+ setupBatch();
for (const auto& item : items) {
- addLine(&(entries.get()), item);
+ addLine(*thing, item);
if (++i >= write_ops::kMaxWriteBatchSize) {
auto res = sendLocalBatch();
@@ -104,8 +99,7 @@ Status runBulkCmd(StringData label,
return res;
}
- setupBatchBuilder();
- i = 0;
+ setupBatch();
}
}
@@ -116,6 +110,34 @@ Status runBulkCmd(StringData label,
}
}
+template <typename InitBatchFn, typename AddLineFn, typename SendBatchFn, typename Container>
+Status runBulkCmd(StringData label,
+ InitBatchFn&& initBatch,
+ AddLineFn&& addLine,
+ SendBatchFn&& sendBatch,
+ const Container& items) {
+ BufBuilder buf;
+
+ boost::optional<BSONObjBuilder> batchBuilder;
+ boost::optional<BSONArrayBuilder> entries;
+
+ auto makeBatch = [&] {
+ buf.reset();
+ batchBuilder.emplace(buf);
+ initBatch(&(batchBuilder.get()));
+ entries.emplace(batchBuilder->subarrayStart(label));
+
+ return &(entries.get());
+ };
+
+ auto sendLocalBatch = [&](BSONArrayBuilder*) {
+ entries->done();
+ return sendBatch(batchBuilder->done());
+ };
+
+ return runBulkGeneric(makeBatch, addLine, sendLocalBatch, items);
+}
+
} // namespace
@@ -126,6 +148,38 @@ constexpr StringData SessionsCollection::kSessionsFullNS;
SessionsCollection::~SessionsCollection() = default;
+SessionsCollection::SendBatchFn SessionsCollection::makeSendFnForBatchWrite(DBClientBase* client) {
+ auto send = [client](BSONObj batch) -> Status {
+ BSONObj res;
+ if (!client->runCommand(SessionsCollection::kSessionsDb.toString(), batch, res)) {
+ return getStatusFromCommandResult(res);
+ }
+
+ BatchedCommandResponse response;
+ std::string errmsg;
+ if (!response.parseBSON(res, &errmsg)) {
+ return {ErrorCodes::FailedToParse, errmsg};
+ }
+
+ return response.toStatus();
+ };
+
+ return send;
+}
+
+SessionsCollection::SendBatchFn SessionsCollection::makeSendFnForCommand(DBClientBase* client) {
+ auto send = [client](BSONObj cmd) -> Status {
+ BSONObj res;
+ if (!client->runCommand(SessionsCollection::kSessionsDb.toString(), cmd, res)) {
+ return getStatusFromCommandResult(res);
+ }
+
+ return Status::OK();
+ };
+
+ return send;
+}
+
Status SessionsCollection::doRefresh(const LogicalSessionRecordSet& sessions,
Date_t refreshTime,
SendBatchFn send) {
@@ -143,6 +197,23 @@ Status SessionsCollection::doRefresh(const LogicalSessionRecordSet& sessions,
return runBulkCmd("updates", init, add, send, sessions);
}
+Status SessionsCollection::doRefreshExternal(const LogicalSessionRecordSet& sessions,
+ Date_t refreshTime,
+ SendBatchFn send) {
+ auto makeT = [] { return std::vector<LogicalSessionRecord>{}; };
+
+ auto add = [&refreshTime](std::vector<LogicalSessionRecord>& batch,
+ const LogicalSessionRecord& record) { batch.push_back(record); };
+
+ auto sendLocal = [&](std::vector<LogicalSessionRecord>& batch) {
+ RefreshSessionsCmdFromClusterMember idl;
+ idl.setRefreshSessionsInternal(batch);
+ return send(idl.toBSON());
+ };
+
+ return runBulkGeneric(makeT, add, sendLocal, sessions);
+}
+
Status SessionsCollection::doRemove(const LogicalSessionIdSet& sessions, SendBatchFn send) {
auto init = [](BSONObjBuilder* batch) {
batch->append("delete", kSessionsCollection);
@@ -156,4 +227,9 @@ Status SessionsCollection::doRemove(const LogicalSessionIdSet& sessions, SendBat
return runBulkCmd("deletes", init, add, send, sessions);
}
+Status SessionsCollection::doRemoveExternal(const LogicalSessionIdSet& sessions, SendBatchFn send) {
+ // TODO SERVER-28335 Implement endSessions, with internal counterpart.
+ return Status::OK();
+}
+
} // namespace mongo