summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJustin Seyster <justin.seyster@mongodb.com>2017-11-06 14:03:50 -0500
committerJustin Seyster <justin.seyster@mongodb.com>2017-11-06 14:26:42 -0500
commitdf03dc67fa491b944dd1e6c366f4a19cf6b38032 (patch)
tree08b8479c4c19ecd2553526d8b84f9ab8ae13b58c
parent3eb5a53f7d4ad457e21c2ba0a1497c41724bdb8b (diff)
downloadmongo-df03dc67fa491b944dd1e6c366f4a19cf6b38032.tar.gz
SERVER-31648 Perform sets/unsets to session tags atomically.
As described in the SERVER ticket, there is a window in CmdIsMaster::run() during which there is no flag set to prevent a session from being closed by an FCV update (even when the session is one that should stay open through the upgrade). This change closes that window. As part of the fix, this change also replaces the replaceTags() function with functions that can atomically update the session tags.
-rw-r--r--src/mongo/db/repl/repl_set_commands.cpp8
-rw-r--r--src/mongo/db/repl/repl_set_request_votes.cpp8
-rw-r--r--src/mongo/db/repl/replication_info.cpp53
-rw-r--r--src/mongo/transport/session.cpp21
-rw-r--r--src/mongo/transport/session.h29
5 files changed, 78 insertions, 41 deletions
diff --git a/src/mongo/db/repl/repl_set_commands.cpp b/src/mongo/db/repl/repl_set_commands.cpp
index fa5b43b5457..76873a1dd9b 100644
--- a/src/mongo/db/repl/repl_set_commands.cpp
+++ b/src/mongo/db/repl/repl_set_commands.cpp
@@ -744,17 +744,15 @@ public:
/* we want to keep heartbeat connections open when relinquishing primary.
tag them here. */
- transport::Session::TagMask originalTag = 0;
auto session = opCtx->getClient()->session();
if (session) {
- originalTag = session->getTags();
- session->replaceTags(originalTag | transport::Session::kKeepOpen);
+ session->setTags(transport::Session::kKeepOpen);
}
// Unset the tag on block exit
- ON_BLOCK_EXIT([session, originalTag]() {
+ ON_BLOCK_EXIT([session]() {
if (session) {
- session->replaceTags(originalTag);
+ session->unsetTags(transport::Session::kKeepOpen);
}
});
diff --git a/src/mongo/db/repl/repl_set_request_votes.cpp b/src/mongo/db/repl/repl_set_request_votes.cpp
index 57bf4a9a537..e0c28def452 100644
--- a/src/mongo/db/repl/repl_set_request_votes.cpp
+++ b/src/mongo/db/repl/repl_set_request_votes.cpp
@@ -64,17 +64,15 @@ private:
// We want to keep request vote connection open when relinquishing primary.
// Tag it here.
- transport::Session::TagMask originalTag = 0;
auto session = opCtx->getClient()->session();
if (session) {
- originalTag = session->getTags();
- session->replaceTags(originalTag | transport::Session::kKeepOpen);
+ session->setTags(transport::Session::kKeepOpen);
}
// Untag the connection on exit.
- ON_BLOCK_EXIT([session, originalTag]() {
+ ON_BLOCK_EXIT([session]() {
if (session) {
- session->replaceTags(originalTag);
+ session->unsetTags(transport::Session::kKeepOpen);
}
});
diff --git a/src/mongo/db/repl/replication_info.cpp b/src/mongo/db/repl/replication_info.cpp
index 5d116d1fe2e..192d7c67b22 100644
--- a/src/mongo/db/repl/replication_info.cpp
+++ b/src/mongo/db/repl/replication_info.cpp
@@ -243,14 +243,13 @@ public:
LastError::get(opCtx->getClient()).disable();
}
+ transport::Session::TagMask sessionTagsToSet = 0;
+ transport::Session::TagMask sessionTagsToUnset = 0;
+
// Tag connections to avoid closing them on stepdown.
auto hangUpElement = cmdObj["hangUpOnStepDown"];
if (!hangUpElement.eoo() && !hangUpElement.trueValue()) {
- auto session = opCtx->getClient()->session();
- if (session) {
- session->replaceTags(session->getTags() |
- executor::NetworkInterface::kMessagingPortKeepOpen);
- }
+ sessionTagsToSet |= transport::Session::kKeepOpen;
}
auto& clientMetadataIsMasterState = ClientMetadataIsMasterState::get(opCtx->getClient());
@@ -286,10 +285,7 @@ public:
// mongod and mongos.
auto internalClientElement = cmdObj["internalClient"];
if (internalClientElement) {
- auto session = opCtx->getClient()->session();
- if (session) {
- session->replaceTags(session->getTags() | transport::Session::kInternalClient);
- }
+ sessionTagsToSet |= transport::Session::kInternalClient;
uassert(ErrorCodes::TypeMismatch,
str::stream() << "'internalClient' must be of type Object, but was of type "
@@ -314,17 +310,11 @@ public:
// All incoming connections from mongod/mongos of earlier versions should be
// closed if the featureCompatibilityVersion is bumped to 3.6.
if (elem.numberInt() >= WireSpec::instance().incoming.maxWireVersion) {
- if (session) {
- session->replaceTags(
- session->getTags() |
- transport::Session::kLatestVersionInternalClientKeepOpen);
- }
+ sessionTagsToSet |=
+ transport::Session::kLatestVersionInternalClientKeepOpen;
} else {
- if (session) {
- session->replaceTags(
- session->getTags() &
- ~transport::Session::kLatestVersionInternalClientKeepOpen);
- }
+ sessionTagsToUnset |=
+ transport::Session::kLatestVersionInternalClientKeepOpen;
}
} else {
uasserted(ErrorCodes::BadValue,
@@ -338,11 +328,26 @@ public:
"Missing required field 'maxWireVersion' of 'internalClient'",
foundMaxWireVersion);
} else {
- auto session = opCtx->getClient()->session();
- if (session && !(session->getTags() & transport::Session::kInternalClient)) {
- session->replaceTags(session->getTags() |
- transport::Session::kExternalClientKeepOpen);
- }
+ sessionTagsToUnset |= (transport::Session::kInternalClient |
+ transport::Session::kLatestVersionInternalClientKeepOpen);
+ sessionTagsToSet |= transport::Session::kExternalClientKeepOpen;
+ }
+
+ auto session = opCtx->getClient()->session();
+ if (session) {
+ session->mutateTags(
+ [sessionTagsToSet, sessionTagsToUnset](transport::Session::TagMask originalTags) {
+ // After a mongos sends the initial "isMaster" command with its mongos client
+ // information, it sometimes sends another "isMaster" command that is forwarded
+ // from its client. Once kInternalClient has been set, we assume that any future
+ // "isMaster" commands are forwarded in this manner, and we do not update the
+ // session tags.
+ if ((originalTags & transport::Session::kInternalClient) == 0) {
+ return (originalTags | sessionTagsToSet) & ~sessionTagsToUnset;
+ } else {
+ return originalTags;
+ }
+ });
}
appendReplicationInfo(opCtx, result, 0);
diff --git a/src/mongo/transport/session.cpp b/src/mongo/transport/session.cpp
index 5cf18eeb035..ad2eceae039 100644
--- a/src/mongo/transport/session.cpp
+++ b/src/mongo/transport/session.cpp
@@ -53,10 +53,23 @@ Ticket Session::sinkMessage(const Message& message, Date_t expiration) {
return getTransportLayer()->sinkMessage(shared_from_this(), message, expiration);
}
-void Session::replaceTags(TagMask tags) {
- // Don't allow explicit assignment of the pending tag, it's only used to describe a new session
- // until the tags gets assigned.
- _tags.store(tags & ~kPending);
+void Session::setTags(TagMask tagsToSet) {
+ mutateTags([tagsToSet](TagMask originalTags) { return (originalTags | tagsToSet); });
+}
+
+void Session::unsetTags(TagMask tagsToUnset) {
+ mutateTags([tagsToUnset](TagMask originalTags) { return (originalTags & ~tagsToUnset); });
+}
+
+void Session::mutateTags(const stdx::function<TagMask(TagMask)>& mutateFunc) {
+ TagMask oldValue, newValue;
+ do {
+ oldValue = _tags.load();
+ newValue = mutateFunc(oldValue);
+
+ // Any change to the session tags automatically clears kPending status.
+ newValue &= ~kPending;
+ } while (_tags.compareAndSwap(oldValue, newValue) != oldValue);
}
Session::TagMask Session::getTags() const {
diff --git a/src/mongo/transport/session.h b/src/mongo/transport/session.h
index c5844ade1b4..b2160117807 100644
--- a/src/mongo/transport/session.h
+++ b/src/mongo/transport/session.h
@@ -119,10 +119,33 @@ public:
virtual const HostAndPort& local() const = 0;
/**
- * Set this session's tags. This Session will register
- * its new tags with its TransportLayer.
+ * Atomically set all of the session tags specified in the 'tagsToSet' bit field. If the
+ * 'kPending' tag is set, indicating that no tags have yet been specified for the session, this
+ * function also clears that tag as part of the same atomic operation.
+ *
+ * The 'kPending' tag is only for new sessions; callers should not set it directly.
+ */
+ virtual void setTags(TagMask tagsToSet);
+
+ /**
+ * Atomically clears all of the session tags specified in the 'tagsToUnset' bit field. If the
+ * 'kPending' tag is set, indicating that no tags have yet been specified for the session, this
+ * function also clears that tag as part of the same atomic operation.
+ */
+ virtual void unsetTags(TagMask tagsToUnset);
+
+ /**
+ * Loads the session tags, passes them to 'mutateFunc' and then stores the result of that call
+ * as the new session tags, all in one atomic operation.
+ *
+ * In order to ensure atomicity, 'mutateFunc' may get called multiple times, so it should not
+ * perform expensive computations or operations with side effects.
+ *
+ * If the 'kPending' tag is set originally, mutateTags() will unset it regardless of the result
+ * of the 'mutateFunc' call. The 'kPending' tag is only for new sessions; callers should never
+ * try to set it.
*/
- virtual void replaceTags(TagMask tags);
+ virtual void mutateTags(const stdx::function<TagMask(TagMask)>& mutateFunc);
/**
* Get this session's tags.