summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: matt dannenberg <matt.dannenberg@10gen.com> 2016-01-26 12:40:58 -0500
committer: matt dannenberg <matt.dannenberg@10gen.com> 2016-02-04 11:22:18 -0500
commit: c3d0eecc3cad249849734eae96096b8ad313ab04 (patch)
tree: 4faa4a6bc06cd5a99586f3ee9f603a37acac0b15
parent: 2c2e6a38f559f25559c2b24eff51511c6fbc4a5b (diff)
download: mongo-c3d0eecc3cad249849734eae96096b8ad313ab04.tar.gz
SERVER-22269 make ReadConcern: majority reflect journaled state on PRIMARY
-rw-r--r-- src/mongo/db/repl/optime.cpp | 20
-rw-r--r-- src/mongo/db/repl/optime.h | 20
-rw-r--r-- src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 10
-rw-r--r-- src/mongo/db/repl/replication_coordinator_external_state_impl.h | 8
-rw-r--r-- src/mongo/db/storage/SConscript | 9
-rw-r--r-- src/mongo/db/storage/devnull/devnull_kv_engine.h | 4
-rw-r--r-- src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_engine.h | 4
-rw-r--r-- src/mongo/db/storage/journal_listener.cpp | 35
-rw-r--r-- src/mongo/db/storage/journal_listener.h | 72
-rw-r--r-- src/mongo/db/storage/kv/kv_engine.h | 7
-rw-r--r-- src/mongo/db/storage/kv/kv_storage_engine.cpp | 4
-rw-r--r-- src/mongo/db/storage/kv/kv_storage_engine.h | 3
-rw-r--r-- src/mongo/db/storage/mmap_v1/SConscript | 1
-rw-r--r-- src/mongo/db/storage/mmap_v1/dur.cpp | 17
-rw-r--r-- src/mongo/db/storage/mmap_v1/dur.h | 9
-rw-r--r-- src/mongo/db/storage/mmap_v1/dur_journal_writer.cpp | 4
-rw-r--r-- src/mongo/db/storage/mmap_v1/dur_journal_writer.h | 3
-rw-r--r-- src/mongo/db/storage/mmap_v1/mmap_v1_engine.cpp | 4
-rw-r--r-- src/mongo/db/storage/mmap_v1/mmap_v1_engine.h | 3
-rw-r--r-- src/mongo/db/storage/storage_engine.h | 7
-rw-r--r-- src/mongo/db/storage/wiredtiger/SConscript | 1
-rw-r--r-- src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp | 5
-rw-r--r-- src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h | 3
-rw-r--r-- src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp | 20
-rw-r--r-- src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h | 8
25 files changed, 250 insertions, 31 deletions
diff --git a/src/mongo/db/repl/optime.cpp b/src/mongo/db/repl/optime.cpp
index 8cf08a0afd7..288826821c2 100644
--- a/src/mongo/db/repl/optime.cpp
+++ b/src/mongo/db/repl/optime.cpp
@@ -41,26 +41,6 @@ namespace repl {
const char OpTime::kTimestampFieldName[] = "ts";
const char OpTime::kTermFieldName[] = "t";
-OpTime::OpTime() : _timestamp(Timestamp(0, 0)), _term(OpTime::kUninitializedTerm) {}
-
-OpTime::OpTime(Timestamp ts, long long term) : _timestamp(std::move(ts)), _term(term) {}
-
-Timestamp OpTime::getTimestamp() const {
- return _timestamp;
-}
-
-long long OpTime::getSecs() const {
- return _timestamp.getSecs();
-}
-
-long long OpTime::getTerm() const {
- return _term;
-}
-
-bool OpTime::isNull() const {
- return _timestamp.isNull();
-}
-
void OpTime::append(BSONObjBuilder* builder, const std::string& subObjName) const {
BSONObjBuilder opTimeBuilder(builder->subobjStart(subObjName));
opTimeBuilder.append(kTimestampFieldName, _timestamp);
diff --git a/src/mongo/db/repl/optime.h b/src/mongo/db/repl/optime.h
index b4c647047b9..d606935f340 100644
--- a/src/mongo/db/repl/optime.h
+++ b/src/mongo/db/repl/optime.h
@@ -62,14 +62,20 @@ public:
static const long long kInitialTerm = 0;
// Default OpTime, also the smallest one.
- OpTime();
- OpTime(Timestamp ts, long long term);
+ OpTime() : _timestamp(Timestamp(0, 0)), _term(kUninitializedTerm) {}
+ OpTime(Timestamp ts, long long term) : _timestamp(std::move(ts)), _term(term) {}
- Timestamp getTimestamp() const;
+ Timestamp getTimestamp() const {
+ return _timestamp;
+ }
- long long getSecs() const;
+ long long getSecs() const {
+ return _timestamp.getSecs();
+ }
- long long getTerm() const;
+ long long getTerm() const {
+ return _term;
+ }
/**
* Serializes the contents of this optime to the specified builder in the form:
@@ -83,7 +89,9 @@ public:
std::string toString() const;
// Returns true when this OpTime is not yet initialized.
- bool isNull() const;
+ bool isNull() const {
+ return _timestamp.isNull();
+ }
inline bool operator==(const OpTime& rhs) const {
// Only compare timestamp if either of the two OpTimes is generated by old protocol,
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index ee12989dbbb..c14dae19b8c 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -53,6 +53,7 @@
#include "mongo/db/repl/minvalid.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/repl_settings.h"
+#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/repl/rs_sync.h"
#include "mongo/db/repl/snapshot_thread.h"
#include "mongo/db/server_parameters.h"
@@ -108,6 +109,7 @@ void ReplicationCoordinatorExternalStateImpl::startThreads(const ReplSettings& s
_snapshotThread = SnapshotThread::start(getGlobalServiceContext());
}
_startedThreads = true;
+ getGlobalServiceContext()->getGlobalStorageEngine()->setJournalListener(this);
}
void ReplicationCoordinatorExternalStateImpl::startMasterSlave(OperationContext* txn) {
@@ -430,5 +432,13 @@ bool ReplicationCoordinatorExternalStateImpl::isReadCommittedSupportedByStorageE
return storageEngine->getSnapshotManager();
}
+JournalListener::Token ReplicationCoordinatorExternalStateImpl::getToken() {
+ return repl::getGlobalReplicationCoordinator()->getMyLastAppliedOpTime();
+}
+
+void ReplicationCoordinatorExternalStateImpl::onDurable(const JournalListener::Token& token) {
+ repl::getGlobalReplicationCoordinator()->setMyLastDurableOpTimeForward(token);
+}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
index 051c225c5bc..577c0b2aa4f 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
@@ -34,6 +34,7 @@
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/repl/replication_coordinator_external_state.h"
#include "mongo/db/repl/sync_source_feedback.h"
+#include "mongo/db/storage/journal_listener.h"
#include "mongo/db/storage/snapshot_manager.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/thread.h"
@@ -43,7 +44,8 @@ namespace repl {
class SnapshotThread;
-class ReplicationCoordinatorExternalStateImpl : public ReplicationCoordinatorExternalState {
+class ReplicationCoordinatorExternalStateImpl : public ReplicationCoordinatorExternalState,
+ public JournalListener {
MONGO_DISALLOW_COPYING(ReplicationCoordinatorExternalStateImpl);
public:
@@ -84,6 +86,10 @@ public:
std::string getNextOpContextThreadName();
+ // Methods from JournalListener.
+ virtual JournalListener::Token getToken();
+ virtual void onDurable(const JournalListener::Token& token);
+
private:
// Guards starting threads and setting _startedThreads
stdx::mutex _threadMutex;
diff --git a/src/mongo/db/storage/SConscript b/src/mongo/db/storage/SConscript
index d0468f266e1..eae01a14119 100644
--- a/src/mongo/db/storage/SConscript
+++ b/src/mongo/db/storage/SConscript
@@ -12,6 +12,15 @@ env.SConscript(
env.Library(
+ target='journal_listener',
+ source=[
+ 'journal_listener.cpp',
+ ],
+ LIBDEPS=[
+ ],
+ )
+
+env.Library(
target='index_entry_comparison',
source=[
'index_entry_comparison.cpp',
diff --git a/src/mongo/db/storage/devnull/devnull_kv_engine.h b/src/mongo/db/storage/devnull/devnull_kv_engine.h
index a6d559ef35a..f31b07aa242 100644
--- a/src/mongo/db/storage/devnull/devnull_kv_engine.h
+++ b/src/mongo/db/storage/devnull/devnull_kv_engine.h
@@ -36,6 +36,8 @@
namespace mongo {
+class JournalListener;
+
class DevNullKVEngine : public KVEngine {
public:
virtual ~DevNullKVEngine() {}
@@ -100,6 +102,8 @@ public:
virtual void cleanShutdown(){};
+ void setJournalListener(JournalListener* jl) final {}
+
private:
std::shared_ptr<void> _catalogInfo;
};
diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_engine.h b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_engine.h
index 2c6df27be31..01e322f60bb 100644
--- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_engine.h
+++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_engine.h
@@ -36,6 +36,8 @@
namespace mongo {
+class JournalListener;
+
class EphemeralForTestEngine : public KVEngine {
public:
virtual RecoveryUnit* newRecoveryUnit();
@@ -96,6 +98,8 @@ public:
std::vector<std::string> getAllIdents(OperationContext* opCtx) const;
+ void setJournalListener(JournalListener* jl) final {}
+
private:
typedef StringMap<std::shared_ptr<void>> DataMap;
diff --git a/src/mongo/db/storage/journal_listener.cpp b/src/mongo/db/storage/journal_listener.cpp
new file mode 100644
index 00000000000..ff9cb4cd979
--- /dev/null
+++ b/src/mongo/db/storage/journal_listener.cpp
@@ -0,0 +1,35 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/storage/journal_listener.h"
+
+namespace mongo {
+NoOpJournalListener NoOpJournalListener::instance;
+} // namespace mongo
diff --git a/src/mongo/db/storage/journal_listener.h b/src/mongo/db/storage/journal_listener.h
new file mode 100644
index 00000000000..f0f2d5c2aa7
--- /dev/null
+++ b/src/mongo/db/storage/journal_listener.h
@@ -0,0 +1,72 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/repl/optime.h"
+
+namespace mongo {
+
+/**
+ * This class allows for the storageEngine to alert the rest of the system about journaled write
+ * progress.
+ *
+ * It has two methods. The first, getToken(), returns a token representing the current progress
+ * applied to the node. It should be called just prior to making writes durable (usually, syncing a
+ * journal entry to disk).
+ *
+ * The second method, onDurable(), takes this token as an argument and relays to the rest of the
+ * system that writes through that point have been journaled. All implementations must be prepared
+ * to receive default-constructed Tokens generated by NoOpJournalListener, in case they are
+ * activated while a journal commit is in progress.
+ */
+class JournalListener {
+public:
+ using Token = repl::OpTime;
+ virtual ~JournalListener() = default;
+ virtual Token getToken() = 0;
+ virtual void onDurable(const Token& token) = 0;
+};
+
+/**
+ * The NoOpJournalListener is a trivial implementation of a JournalListener, that does nothing.
+ * NoOpJournalListener::instance exists simply as a default implementation for storage engines to
+ * use until they are passed a JournalListener with greater functionality, allowing us to avoid
+ * checking for JournalListener-nullness.
+ */
+class NoOpJournalListener : public JournalListener {
+public:
+ virtual ~NoOpJournalListener() = default;
+ virtual JournalListener::Token getToken() {
+ return JournalListener::Token();
+ }
+ virtual void onDurable(const Token& token) {}
+ // As this has no state, it is de facto const and can be safely shared freely.
+ static NoOpJournalListener instance;
+};
+}
diff --git a/src/mongo/db/storage/kv/kv_engine.h b/src/mongo/db/storage/kv/kv_engine.h
index 30e8f9f8e65..0e14bdb0c21 100644
--- a/src/mongo/db/storage/kv/kv_engine.h
+++ b/src/mongo/db/storage/kv/kv_engine.h
@@ -40,6 +40,7 @@
namespace mongo {
class IndexDescriptor;
+class JournalListener;
class OperationContext;
class RecordStore;
class RecoveryUnit;
@@ -161,6 +162,12 @@ public:
}
/**
+ * Sets a new JournalListener, which is used to alert the rest of the
+ * system about journaled write progress.
+ */
+ virtual void setJournalListener(JournalListener* jl) = 0;
+
+ /**
* The destructor will never be called from mongod, but may be called from tests.
* Engines may assume that this will only be called in the case of clean shutdown, even if
* cleanShutdown() hasn't been called.
diff --git a/src/mongo/db/storage/kv/kv_storage_engine.cpp b/src/mongo/db/storage/kv/kv_storage_engine.cpp
index 3fb3fa695cc..1cca70a962b 100644
--- a/src/mongo/db/storage/kv/kv_storage_engine.cpp
+++ b/src/mongo/db/storage/kv/kv_storage_engine.cpp
@@ -287,4 +287,8 @@ Status KVStorageEngine::repairRecordStore(OperationContext* txn, const std::stri
_dbs[nsToDatabase(ns)]->reinitCollectionAfterRepair(txn, ns);
return Status::OK();
}
+
+void KVStorageEngine::setJournalListener(JournalListener* jl) {
+ _engine->setJournalListener(jl);
+}
}
diff --git a/src/mongo/db/storage/kv/kv_storage_engine.h b/src/mongo/db/storage/kv/kv_storage_engine.h
index 98216477336..4c520ad1e83 100644
--- a/src/mongo/db/storage/kv/kv_storage_engine.h
+++ b/src/mongo/db/storage/kv/kv_storage_engine.h
@@ -33,6 +33,7 @@
#include <map>
#include <string>
+#include "mongo/db/storage/journal_listener.h"
#include "mongo/db/storage/kv/kv_catalog.h"
#include "mongo/db/storage/record_store.h"
#include "mongo/db/storage/storage_engine.h"
@@ -95,6 +96,8 @@ public:
SnapshotManager* getSnapshotManager() const final;
+ void setJournalListener(JournalListener* jl) final;
+
// ------ kv ------
KVEngine* getEngine() {
diff --git a/src/mongo/db/storage/mmap_v1/SConscript b/src/mongo/db/storage/mmap_v1/SConscript
index 8f8d4343300..99072946ac6 100644
--- a/src/mongo/db/storage/mmap_v1/SConscript
+++ b/src/mongo/db/storage/mmap_v1/SConscript
@@ -36,6 +36,7 @@ env.Library(
'logfile',
'compress',
'$BUILD_DIR/mongo/db/catalog/collection_options',
+ '$BUILD_DIR/mongo/db/storage/journal_listener',
'$BUILD_DIR/mongo/db/storage/paths',
],
LIBDEPS_TAGS=[
diff --git a/src/mongo/db/storage/mmap_v1/dur.cpp b/src/mongo/db/storage/mmap_v1/dur.cpp
index 979fe0ab900..f50be1ce146 100644
--- a/src/mongo/db/storage/mmap_v1/dur.cpp
+++ b/src/mongo/db/storage/mmap_v1/dur.cpp
@@ -400,6 +400,11 @@ void remapPrivateViewImpl(double fraction) {
DurableImpl durableImpl;
NonDurableImpl nonDurableImpl;
+// Notified when we commit to the journal.
+static JournalListener* journalListener = &NoOpJournalListener::instance;
+// Protects journalListener.
+static stdx::mutex journalListenerMutex;
+
} // namespace
@@ -730,6 +735,7 @@ static void durThread() {
// writes (hasWritten == false).
JournalWriter::Buffer* const buffer = journalWriter.newBuffer();
buffer->setNoop();
+ buffer->journalListenerToken = getJournalListener()->getToken();
journalWriter.writeBuffer(buffer, commitNumber);
} else {
@@ -794,6 +800,7 @@ static void durThread() {
autoFlushLock.release();
}
+ buffer->journalListenerToken = getJournalListener()->getToken();
// Request async I/O to the journal. This may block.
journalWriter.writeBuffer(buffer, commitNumber);
@@ -886,5 +893,15 @@ void startup() {
DurableInterface::_impl = &durableImpl;
}
+void setJournalListener(JournalListener* jl) {
+ stdx::unique_lock<stdx::mutex> lk(journalListenerMutex);
+ journalListener = jl;
+}
+
+JournalListener* getJournalListener() {
+ stdx::unique_lock<stdx::mutex> lk(journalListenerMutex);
+ return journalListener;
+}
+
} // namespace dur
} // namespace mongo
diff --git a/src/mongo/db/storage/mmap_v1/dur.h b/src/mongo/db/storage/mmap_v1/dur.h
index 7cfd46fada3..43f2ebe7c23 100644
--- a/src/mongo/db/storage/mmap_v1/dur.h
+++ b/src/mongo/db/storage/mmap_v1/dur.h
@@ -32,6 +32,7 @@
#include <vector>
#include "mongo/base/disallow_copying.h"
+#include "mongo/db/storage/journal_listener.h"
namespace mongo {
@@ -42,7 +43,6 @@ namespace dur {
// a smaller limit is likely better on 32 bit
const unsigned UncommittedBytesLimit = (sizeof(void*) == 4) ? 50 * 1024 * 1024 : 512 * 1024 * 1024;
-
class DurableInterface {
MONGO_DISALLOW_COPYING(DurableInterface);
@@ -148,6 +148,13 @@ private:
*/
void startup();
+// Sets a new JournalListener, which is used to alert the rest of the system about
+// journaled write progress.
+void setJournalListener(JournalListener* jl);
+
+// Expose the JournalListener, needed for the journal writer thread.
+JournalListener* getJournalListener();
+
} // namespace dur
diff --git a/src/mongo/db/storage/mmap_v1/dur_journal_writer.cpp b/src/mongo/db/storage/mmap_v1/dur_journal_writer.cpp
index 9670aa80300..ff24018a0c6 100644
--- a/src/mongo/db/storage/mmap_v1/dur_journal_writer.cpp
+++ b/src/mongo/db/storage/mmap_v1/dur_journal_writer.cpp
@@ -33,6 +33,7 @@
#include "mongo/db/storage/mmap_v1/dur_journal_writer.h"
#include "mongo/db/client.h"
+#include "mongo/db/storage/mmap_v1/dur.h"
#include "mongo/db/storage/mmap_v1/dur_journal.h"
#include "mongo/db/storage/mmap_v1/dur_recover.h"
#include "mongo/db/storage/mmap_v1/dur_stats.h"
@@ -236,7 +237,8 @@ void JournalWriter::_journalWriterThread() {
WRITETOJOURNAL(buffer->_header, buffer->_builder);
// Data is now persisted in the journal, which is sufficient for acknowledging
- // getLastError
+ // durability.
+ dur::getJournalListener()->onDurable(buffer->journalListenerToken);
_commitNotify->notifyAll(buffer->_commitNumber);
// Apply the journal entries on top of the shared view so that when flush is
diff --git a/src/mongo/db/storage/mmap_v1/dur_journal_writer.h b/src/mongo/db/storage/mmap_v1/dur_journal_writer.h
index 2f738cbb380..83ab17fbfbd 100644
--- a/src/mongo/db/storage/mmap_v1/dur_journal_writer.h
+++ b/src/mongo/db/storage/mmap_v1/dur_journal_writer.h
@@ -29,6 +29,7 @@
#pragma once
#include "mongo/base/disallow_copying.h"
+#include "mongo/db/storage/journal_listener.h"
#include "mongo/db/storage/mmap_v1/aligned_builder.h"
#include "mongo/db/storage/mmap_v1/dur_journalformat.h"
#include "mongo/stdx/thread.h"
@@ -68,6 +69,8 @@ public:
_isNoop = true;
}
+ JournalListener::Token journalListenerToken;
+
private:
friend class BufferGuard;
friend class JournalWriter;
diff --git a/src/mongo/db/storage/mmap_v1/mmap_v1_engine.cpp b/src/mongo/db/storage/mmap_v1/mmap_v1_engine.cpp
index da1d7544c98..160c268c6fa 100644
--- a/src/mongo/db/storage/mmap_v1/mmap_v1_engine.cpp
+++ b/src/mongo/db/storage/mmap_v1/mmap_v1_engine.cpp
@@ -359,4 +359,8 @@ void MMAPV1Engine::cleanShutdown() {
MemoryMappedFile::closeAllFiles(ss3);
log() << ss3.str() << endl;
}
+
+void MMAPV1Engine::setJournalListener(JournalListener* jl) {
+ dur::setJournalListener(jl);
+}
}
diff --git a/src/mongo/db/storage/mmap_v1/mmap_v1_engine.h b/src/mongo/db/storage/mmap_v1/mmap_v1_engine.h
index ad672f7d3de..4fac787b4b0 100644
--- a/src/mongo/db/storage/mmap_v1/mmap_v1_engine.h
+++ b/src/mongo/db/storage/mmap_v1/mmap_v1_engine.h
@@ -38,6 +38,7 @@
namespace mongo {
+class JournalListener;
class MMAPV1DatabaseCatalogEntry;
class MMAPV1Engine : public StorageEngine {
@@ -93,6 +94,8 @@ public:
*/
RecordAccessTracker& getRecordAccessTracker();
+ void setJournalListener(JournalListener* jl) final;
+
private:
static void _listDatabases(const std::string& directory, std::vector<std::string>* out);
diff --git a/src/mongo/db/storage/storage_engine.h b/src/mongo/db/storage/storage_engine.h
index b31a296ac89..9b84b32e39d 100644
--- a/src/mongo/db/storage/storage_engine.h
+++ b/src/mongo/db/storage/storage_engine.h
@@ -40,6 +40,7 @@
namespace mongo {
class DatabaseCatalogEntry;
+class JournalListener;
class OperationContext;
class RecoveryUnit;
class SnapshotManager;
@@ -267,6 +268,12 @@ public:
return nullptr;
}
+ /**
+ * Sets a new JournalListener, which is used by the storage engine to alert the rest of the
+ * system about journaled write progress.
+ */
+ virtual void setJournalListener(JournalListener* jl) = 0;
+
protected:
/**
* The destructor will never be called. See cleanShutdown instead.
diff --git a/src/mongo/db/storage/wiredtiger/SConscript b/src/mongo/db/storage/wiredtiger/SConscript
index 26e4435c956..431a89a5703 100644
--- a/src/mongo/db/storage/wiredtiger/SConscript
+++ b/src/mongo/db/storage/wiredtiger/SConscript
@@ -42,6 +42,7 @@ if wiredtiger:
'$BUILD_DIR/mongo/db/index/index_descriptor',
'$BUILD_DIR/mongo/db/service_context',
'$BUILD_DIR/mongo/db/storage/index_entry_comparison',
+ '$BUILD_DIR/mongo/db/storage/journal_listener',
'$BUILD_DIR/mongo/db/storage/key_string',
'$BUILD_DIR/mongo/db/storage/oplog_hack',
'$BUILD_DIR/mongo/db/storage/storage_options',
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
index 8420434cf4e..8619c04aef7 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
@@ -51,6 +51,7 @@
#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/service_context.h"
+#include "mongo/db/storage/journal_listener.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_customization_hooks.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_global_options.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_index.h"
@@ -691,4 +692,8 @@ void WiredTigerKVEngine::_checkIdentPath(StringData ident) {
start = idx + 1;
}
}
+
+void WiredTigerKVEngine::setJournalListener(JournalListener* jl) {
+ return _sessionCache->setJournalListener(jl);
+}
}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
index bc0a5f71301..357f94d7a91 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
@@ -44,6 +44,7 @@
namespace mongo {
+class JournalListener;
class WiredTigerSessionCache;
class WiredTigerSizeStorer;
@@ -121,6 +122,8 @@ public:
return &_sessionCache->snapshotManager();
}
+ void setJournalListener(JournalListener* jl) final;
+
// wiredtiger specific
// Calls WT_CONNECTION::reconfigure on the underlying WT_CONNECTION
// held by this class
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp
index 4b9d3a80c52..eda476c4d8f 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h"
#include "mongo/base/error_codes.h"
+#include "mongo/db/storage/journal_listener.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_util.h"
#include "mongo/stdx/memory.h"
@@ -166,7 +167,12 @@ void WiredTigerSessionCache::waitUntilDurable(bool forceCheckpoint) {
WiredTigerSession* session = getSession();
ON_BLOCK_EXIT([this, session] { releaseSession(session); });
WT_SESSION* s = session->getSession();
- invariantWTOK(s->checkpoint(s, NULL));
+ {
+ stdx::unique_lock<stdx::mutex> lk(_journalListenerMutex);
+ JournalListener::Token token = _journalListener->getToken();
+ invariantWTOK(s->checkpoint(s, NULL));
+ _journalListener->onDurable(token);
+ }
LOG(4) << "created checkpoint (forced)";
return;
}
@@ -189,7 +195,12 @@ void WiredTigerSessionCache::waitUntilDurable(bool forceCheckpoint) {
// Use the journal when available, or a checkpoint otherwise.
if (_engine->isDurable()) {
- invariantWTOK(s->log_flush(s, "sync=on"));
+ {
+ stdx::unique_lock<stdx::mutex> lk(_journalListenerMutex);
+ JournalListener::Token token = _journalListener->getToken();
+ invariantWTOK(s->log_flush(s, "sync=on"));
+ _journalListener->onDurable(token);
+ }
LOG(4) << "flushed journal";
} else {
invariantWTOK(s->checkpoint(s, NULL));
@@ -278,4 +289,9 @@ void WiredTigerSessionCache::releaseSession(WiredTigerSession* session) {
if (_engine && _engine->haveDropsQueued())
_engine->dropAllQueued();
}
+
+void WiredTigerSessionCache::setJournalListener(JournalListener* jl) {
+ stdx::unique_lock<stdx::mutex> lk(_journalListenerMutex);
+ _journalListener = jl;
+}
}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h
index 64c37d743e3..d5354101ac7 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h
@@ -37,6 +37,7 @@
#include <boost/thread/shared_mutex.hpp>
#include <wiredtiger.h>
+#include "mongo/db/storage/journal_listener.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_snapshot_manager.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/stdx/mutex.h"
@@ -171,6 +172,8 @@ public:
return _snapshotManager;
}
+ void setJournalListener(JournalListener* jl);
+
private:
WiredTigerKVEngine* _engine; // not owned, might be NULL
WT_CONNECTION* _conn; // not owned
@@ -192,5 +195,10 @@ private:
// Counter and critical section mutex for waitUntilDurable
AtomicUInt32 _lastSyncTime;
stdx::mutex _lastSyncMutex;
+
+ // Notified when we commit to the journal.
+ JournalListener* _journalListener = &NoOpJournalListener::instance;
+ // Protects _journalListener.
+ stdx::mutex _journalListenerMutex;
};
} // namespace