summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2017-05-19 07:09:05 -0400
committerBenety Goh <benety@mongodb.com>2017-05-24 18:45:02 -0400
commitf4cd5a8511fbda4ac06e70ba69eb90ad1d1beb39 (patch)
tree0e5c7c7a165396c25fd297dfbcc841578547f4c1
parenta9023ef65e67ca90018f94b57bb84aaff7484917 (diff)
downloadmongo-f4cd5a8511fbda4ac06e70ba69eb90ad1d1beb39.tar.gz
SERVER-29275 ReplicationCoordinator cleans up drop-pending collection when commit level advances
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp33
1 files changed, 33 insertions, 0 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 6967d6ae8cf..a70df25f4e0 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -52,6 +52,7 @@
#include "mongo/db/op_observer.h"
#include "mongo/db/repair_database.h"
#include "mongo/db/repl/bgsync.h"
+#include "mongo/db/repl/drop_pending_collection_reaper.h"
#include "mongo/db/repl/isself.h"
#include "mongo/db/repl/last_vote.h"
#include "mongo/db/repl/master_slave.h"
@@ -179,6 +180,24 @@ std::unique_ptr<ThreadPool> makeThreadPool() {
return stdx::make_unique<ThreadPool>(threadPoolOptions);
}
+/**
+ * Schedules a task using the executor. This task is always run unless the task executor is shutting
+ * down.
+ */
+void scheduleWork(executor::TaskExecutor* executor,
+ const executor::TaskExecutor::CallbackFn& work) {
+ auto cbh = executor->scheduleWork([work](const executor::TaskExecutor::CallbackArgs& args) {
+ if (args.status == ErrorCodes::CallbackCanceled) {
+ return;
+ }
+ work(args);
+ });
+ if (cbh == ErrorCodes::ShutdownInProgress) {
+ return;
+ }
+ fassertStatusOK(40460, cbh);
+}
+
} // namespace
ReplicationCoordinatorExternalStateImpl::ReplicationCoordinatorExternalStateImpl(
@@ -856,6 +875,20 @@ bool ReplicationCoordinatorExternalStateImpl::snapshotsEnabled() const {
void ReplicationCoordinatorExternalStateImpl::notifyOplogMetadataWaiters(
const OpTime& committedOpTime) {
signalOplogWaiters();
+
+ // Notify the DropPendingCollectionReaper if there are any drop-pending collections with drop
+ // optimes before or at the committed optime.
+ if (auto earliestDropOpTime = _dropPendingCollectionReaper->getEarliestDropOpTime()) {
+ if (committedOpTime >= *earliestDropOpTime) {
+ auto reaper = _dropPendingCollectionReaper;
+ scheduleWork(
+ _taskExecutor.get(),
+ [committedOpTime, reaper](const executor::TaskExecutor::CallbackArgs& args) {
+ auto opCtx = cc().makeOperationContext();
+ reaper->dropCollectionsOlderThan(opCtx.get(), committedOpTime);
+ });
+ }
+ }
}
double ReplicationCoordinatorExternalStateImpl::getElectionTimeoutOffsetLimitFraction() const {