diff options
author | matt dannenberg <matt.dannenberg@10gen.com> | 2016-01-26 12:40:58 -0500 |
---|---|---|
committer | matt dannenberg <matt.dannenberg@10gen.com> | 2016-02-04 11:22:18 -0500 |
commit | c3d0eecc3cad249849734eae96096b8ad313ab04 (patch) | |
tree | 4faa4a6bc06cd5a99586f3ee9f603a37acac0b15 /src/mongo | |
parent | 2c2e6a38f559f25559c2b24eff51511c6fbc4a5b (diff) | |
download | mongo-c3d0eecc3cad249849734eae96096b8ad313ab04.tar.gz |
SERVER-22269 make ReadConcern: majority reflect journaled state on PRIMARY
Diffstat (limited to 'src/mongo')
25 files changed, 250 insertions, 31 deletions
diff --git a/src/mongo/db/repl/optime.cpp b/src/mongo/db/repl/optime.cpp index 8cf08a0afd7..288826821c2 100644 --- a/src/mongo/db/repl/optime.cpp +++ b/src/mongo/db/repl/optime.cpp @@ -41,26 +41,6 @@ namespace repl { const char OpTime::kTimestampFieldName[] = "ts"; const char OpTime::kTermFieldName[] = "t"; -OpTime::OpTime() : _timestamp(Timestamp(0, 0)), _term(OpTime::kUninitializedTerm) {} - -OpTime::OpTime(Timestamp ts, long long term) : _timestamp(std::move(ts)), _term(term) {} - -Timestamp OpTime::getTimestamp() const { - return _timestamp; -} - -long long OpTime::getSecs() const { - return _timestamp.getSecs(); -} - -long long OpTime::getTerm() const { - return _term; -} - -bool OpTime::isNull() const { - return _timestamp.isNull(); -} - void OpTime::append(BSONObjBuilder* builder, const std::string& subObjName) const { BSONObjBuilder opTimeBuilder(builder->subobjStart(subObjName)); opTimeBuilder.append(kTimestampFieldName, _timestamp); diff --git a/src/mongo/db/repl/optime.h b/src/mongo/db/repl/optime.h index b4c647047b9..d606935f340 100644 --- a/src/mongo/db/repl/optime.h +++ b/src/mongo/db/repl/optime.h @@ -62,14 +62,20 @@ public: static const long long kInitialTerm = 0; // Default OpTime, also the smallest one. - OpTime(); - OpTime(Timestamp ts, long long term); + OpTime() : _timestamp(Timestamp(0, 0)), _term(kUninitializedTerm) {} + OpTime(Timestamp ts, long long term) : _timestamp(std::move(ts)), _term(term) {} - Timestamp getTimestamp() const; + Timestamp getTimestamp() const { + return _timestamp; + } - long long getSecs() const; + long long getSecs() const { + return _timestamp.getSecs(); + } - long long getTerm() const; + long long getTerm() const { + return _term; + } /** * Serializes the contents of this optime to the specified builder in the form: @@ -83,7 +89,9 @@ public: std::string toString() const; // Returns true when this OpTime is not yet initialized. - bool isNull() const; + bool isNull() const { + return _timestamp.isNull(); + } inline bool operator==(const OpTime& rhs) const { // Only compare timestamp if either of the two OpTimes is generated by old protocol, diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index ee12989dbbb..c14dae19b8c 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -53,6 +53,7 @@ #include "mongo/db/repl/minvalid.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/repl_settings.h" +#include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/repl/rs_sync.h" #include "mongo/db/repl/snapshot_thread.h" #include "mongo/db/server_parameters.h" @@ -108,6 +109,7 @@ void ReplicationCoordinatorExternalStateImpl::startThreads(const ReplSettings& s _snapshotThread = SnapshotThread::start(getGlobalServiceContext()); } _startedThreads = true; + getGlobalServiceContext()->getGlobalStorageEngine()->setJournalListener(this); } void ReplicationCoordinatorExternalStateImpl::startMasterSlave(OperationContext* txn) { @@ -430,5 +432,13 @@ bool ReplicationCoordinatorExternalStateImpl::isReadCommittedSupportedByStorageE return storageEngine->getSnapshotManager(); } +JournalListener::Token ReplicationCoordinatorExternalStateImpl::getToken() { + return repl::getGlobalReplicationCoordinator()->getMyLastAppliedOpTime(); +} + +void ReplicationCoordinatorExternalStateImpl::onDurable(const JournalListener::Token& token) { + repl::getGlobalReplicationCoordinator()->setMyLastDurableOpTimeForward(token); +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h index 051c225c5bc..577c0b2aa4f 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h @@ -34,6 +34,7 @@ #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/repl/replication_coordinator_external_state.h" #include "mongo/db/repl/sync_source_feedback.h" +#include "mongo/db/storage/journal_listener.h" #include "mongo/db/storage/snapshot_manager.h" #include "mongo/stdx/mutex.h" #include "mongo/stdx/thread.h" @@ -43,7 +44,8 @@ namespace repl { class SnapshotThread; -class ReplicationCoordinatorExternalStateImpl : public ReplicationCoordinatorExternalState { +class ReplicationCoordinatorExternalStateImpl : public ReplicationCoordinatorExternalState, + public JournalListener { MONGO_DISALLOW_COPYING(ReplicationCoordinatorExternalStateImpl); public: @@ -84,6 +86,10 @@ public: std::string getNextOpContextThreadName(); + // Methods from JournalListener. + virtual JournalListener::Token getToken(); + virtual void onDurable(const JournalListener::Token& token); + private: // Guards starting threads and setting _startedThreads stdx::mutex _threadMutex; diff --git a/src/mongo/db/storage/SConscript b/src/mongo/db/storage/SConscript index d0468f266e1..eae01a14119 100644 --- a/src/mongo/db/storage/SConscript +++ b/src/mongo/db/storage/SConscript @@ -12,6 +12,15 @@ env.SConscript( env.Library( + target='journal_listener', + source=[ + 'journal_listener.cpp', + ], + LIBDEPS=[ + ], + ) + +env.Library( target='index_entry_comparison', source=[ 'index_entry_comparison.cpp', diff --git a/src/mongo/db/storage/devnull/devnull_kv_engine.h b/src/mongo/db/storage/devnull/devnull_kv_engine.h index a6d559ef35a..f31b07aa242 100644 --- a/src/mongo/db/storage/devnull/devnull_kv_engine.h +++ b/src/mongo/db/storage/devnull/devnull_kv_engine.h @@ -36,6 +36,8 @@ namespace mongo { +class JournalListener; + class DevNullKVEngine : public KVEngine { public: virtual ~DevNullKVEngine() {} @@ -100,6 +102,8 @@ public: virtual void cleanShutdown(){}; + void setJournalListener(JournalListener* jl) final {} + private: std::shared_ptr<void> _catalogInfo; }; diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_engine.h b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_engine.h index 2c6df27be31..01e322f60bb 100644 --- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_engine.h +++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_engine.h @@ -36,6 +36,8 @@ namespace mongo { +class JournalListener; + class EphemeralForTestEngine : public KVEngine { public: virtual RecoveryUnit* newRecoveryUnit(); @@ -96,6 +98,8 @@ public: std::vector<std::string> getAllIdents(OperationContext* opCtx) const; + void setJournalListener(JournalListener* jl) final {} + private: typedef StringMap<std::shared_ptr<void>> DataMap; diff --git a/src/mongo/db/storage/journal_listener.cpp b/src/mongo/db/storage/journal_listener.cpp new file mode 100644 index 00000000000..ff9cb4cd979 --- /dev/null +++ b/src/mongo/db/storage/journal_listener.cpp @@ -0,0 +1,35 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/storage/journal_listener.h" + +namespace mongo { +NoOpJournalListener NoOpJournalListener::instance; +} // namespace mongo diff --git a/src/mongo/db/storage/journal_listener.h b/src/mongo/db/storage/journal_listener.h new file mode 100644 index 00000000000..f0f2d5c2aa7 --- /dev/null +++ b/src/mongo/db/storage/journal_listener.h @@ -0,0 +1,72 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/repl/optime.h" + +namespace mongo { + +/** + * This class allows for the storageEngine to alert the rest of the system about journaled write + * progress. + * + * It has two methods. The first, getToken(), returns a token representing the current progress + * applied to the node. It should be called just prior to making writes durable (usually, syncing a + * journal entry to disk). + * + * The second method, onDurable(), takes this token as an argument and relays to the rest of the + * system that writes through that point have been journaled. All implementations must be prepared + * to receive default-constructed Tokens generated by NoOpJournalListener, in case they are + * activated while a journal commit is in progress. + */ +class JournalListener { +public: + using Token = repl::OpTime; + virtual ~JournalListener() = default; + virtual Token getToken() = 0; + virtual void onDurable(const Token& token) = 0; +}; + +/** + * The NoOpJournalListener is a trivial implementation of a JournalListener, that does nothing. + * NoOpJournalListener::instance exists simply as a default implementation for storage engines to + * use until they are passed a JournalListener with greater functionality, allowing us to avoid + * checking for JournalListener-nullness. + */ +class NoOpJournalListener : public JournalListener { +public: + virtual ~NoOpJournalListener() = default; + virtual JournalListener::Token getToken() { + return JournalListener::Token(); + } + virtual void onDurable(const Token& token) {} + // As this has no state, it is de facto const and can be safely shared freely. + static NoOpJournalListener instance; +}; +} diff --git a/src/mongo/db/storage/kv/kv_engine.h b/src/mongo/db/storage/kv/kv_engine.h index 30e8f9f8e65..0e14bdb0c21 100644 --- a/src/mongo/db/storage/kv/kv_engine.h +++ b/src/mongo/db/storage/kv/kv_engine.h @@ -40,6 +40,7 @@ namespace mongo { class IndexDescriptor; +class JournalListener; class OperationContext; class RecordStore; class RecoveryUnit; @@ -161,6 +162,12 @@ public: } /** + * Sets a new JournalListener, which is used to alert the rest of the + * system about journaled write progress. + */ + virtual void setJournalListener(JournalListener* jl) = 0; + + /** * The destructor will never be called from mongod, but may be called from tests. * Engines may assume that this will only be called in the case of clean shutdown, even if * cleanShutdown() hasn't been called. diff --git a/src/mongo/db/storage/kv/kv_storage_engine.cpp b/src/mongo/db/storage/kv/kv_storage_engine.cpp index 3fb3fa695cc..1cca70a962b 100644 --- a/src/mongo/db/storage/kv/kv_storage_engine.cpp +++ b/src/mongo/db/storage/kv/kv_storage_engine.cpp @@ -287,4 +287,8 @@ Status KVStorageEngine::repairRecordStore(OperationContext* txn, const std::stri _dbs[nsToDatabase(ns)]->reinitCollectionAfterRepair(txn, ns); return Status::OK(); } + +void KVStorageEngine::setJournalListener(JournalListener* jl) { + _engine->setJournalListener(jl); +} } diff --git a/src/mongo/db/storage/kv/kv_storage_engine.h b/src/mongo/db/storage/kv/kv_storage_engine.h index 98216477336..4c520ad1e83 100644 --- a/src/mongo/db/storage/kv/kv_storage_engine.h +++ b/src/mongo/db/storage/kv/kv_storage_engine.h @@ -33,6 +33,7 @@ #include <map> #include <string> +#include "mongo/db/storage/journal_listener.h" #include "mongo/db/storage/kv/kv_catalog.h" #include "mongo/db/storage/record_store.h" #include "mongo/db/storage/storage_engine.h" @@ -95,6 +96,8 @@ public: SnapshotManager* getSnapshotManager() const final; + void setJournalListener(JournalListener* jl) final; + // ------ kv ------ KVEngine* getEngine() { diff --git a/src/mongo/db/storage/mmap_v1/SConscript b/src/mongo/db/storage/mmap_v1/SConscript index 8f8d4343300..99072946ac6 100644 --- a/src/mongo/db/storage/mmap_v1/SConscript +++ b/src/mongo/db/storage/mmap_v1/SConscript @@ -36,6 +36,7 @@ env.Library( 'logfile', 'compress', '$BUILD_DIR/mongo/db/catalog/collection_options', + '$BUILD_DIR/mongo/db/storage/journal_listener', '$BUILD_DIR/mongo/db/storage/paths', ], LIBDEPS_TAGS=[ diff --git a/src/mongo/db/storage/mmap_v1/dur.cpp b/src/mongo/db/storage/mmap_v1/dur.cpp index 979fe0ab900..f50be1ce146 100644 --- a/src/mongo/db/storage/mmap_v1/dur.cpp +++ b/src/mongo/db/storage/mmap_v1/dur.cpp @@ -400,6 +400,11 @@ void remapPrivateViewImpl(double fraction) { DurableImpl durableImpl; NonDurableImpl nonDurableImpl; +// Notified when we commit to the journal. +static JournalListener* journalListener = &NoOpJournalListener::instance; +// Protects journalListener. +static stdx::mutex journalListenerMutex; + } // namespace @@ -730,6 +735,7 @@ static void durThread() { // writes (hasWritten == false). JournalWriter::Buffer* const buffer = journalWriter.newBuffer(); buffer->setNoop(); + buffer->journalListenerToken = getJournalListener()->getToken(); journalWriter.writeBuffer(buffer, commitNumber); } else { @@ -794,6 +800,7 @@ static void durThread() { autoFlushLock.release(); } + buffer->journalListenerToken = getJournalListener()->getToken(); // Request async I/O to the journal. This may block. journalWriter.writeBuffer(buffer, commitNumber); @@ -886,5 +893,15 @@ void startup() { DurableInterface::_impl = &durableImpl; } +void setJournalListener(JournalListener* jl) { + stdx::unique_lock<stdx::mutex> lk(journalListenerMutex); + journalListener = jl; +} + +JournalListener* getJournalListener() { + stdx::unique_lock<stdx::mutex> lk(journalListenerMutex); + return journalListener; +} + } // namespace dur } // namespace mongo diff --git a/src/mongo/db/storage/mmap_v1/dur.h b/src/mongo/db/storage/mmap_v1/dur.h index 7cfd46fada3..43f2ebe7c23 100644 --- a/src/mongo/db/storage/mmap_v1/dur.h +++ b/src/mongo/db/storage/mmap_v1/dur.h @@ -32,6 +32,7 @@ #include <vector> #include "mongo/base/disallow_copying.h" +#include "mongo/db/storage/journal_listener.h" namespace mongo { @@ -42,7 +43,6 @@ namespace dur { // a smaller limit is likely better on 32 bit const unsigned UncommittedBytesLimit = (sizeof(void*) == 4) ? 50 * 1024 * 1024 : 512 * 1024 * 1024; - class DurableInterface { MONGO_DISALLOW_COPYING(DurableInterface); @@ -148,6 +148,13 @@ private: */ void startup(); +// Sets a new JournalListener, which is used to alert the rest of the system about +// journaled write progress. +void setJournalListener(JournalListener* jl); + +// Expose the JournalListener, needed for the journal writer thread. +JournalListener* getJournalListener(); + } // namespace dur diff --git a/src/mongo/db/storage/mmap_v1/dur_journal_writer.cpp b/src/mongo/db/storage/mmap_v1/dur_journal_writer.cpp index 9670aa80300..ff24018a0c6 100644 --- a/src/mongo/db/storage/mmap_v1/dur_journal_writer.cpp +++ b/src/mongo/db/storage/mmap_v1/dur_journal_writer.cpp @@ -33,6 +33,7 @@ #include "mongo/db/storage/mmap_v1/dur_journal_writer.h" #include "mongo/db/client.h" +#include "mongo/db/storage/mmap_v1/dur.h" #include "mongo/db/storage/mmap_v1/dur_journal.h" #include "mongo/db/storage/mmap_v1/dur_recover.h" #include "mongo/db/storage/mmap_v1/dur_stats.h" @@ -236,7 +237,8 @@ void JournalWriter::_journalWriterThread() { WRITETOJOURNAL(buffer->_header, buffer->_builder); // Data is now persisted in the journal, which is sufficient for acknowledging - // getLastError + // durability. + dur::getJournalListener()->onDurable(buffer->journalListenerToken); _commitNotify->notifyAll(buffer->_commitNumber); // Apply the journal entries on top of the shared view so that when flush is diff --git a/src/mongo/db/storage/mmap_v1/dur_journal_writer.h b/src/mongo/db/storage/mmap_v1/dur_journal_writer.h index 2f738cbb380..83ab17fbfbd 100644 --- a/src/mongo/db/storage/mmap_v1/dur_journal_writer.h +++ b/src/mongo/db/storage/mmap_v1/dur_journal_writer.h @@ -29,6 +29,7 @@ #pragma once #include "mongo/base/disallow_copying.h" +#include "mongo/db/storage/journal_listener.h" #include "mongo/db/storage/mmap_v1/aligned_builder.h" #include "mongo/db/storage/mmap_v1/dur_journalformat.h" #include "mongo/stdx/thread.h" @@ -68,6 +69,8 @@ public: _isNoop = true; } + JournalListener::Token journalListenerToken; + private: friend class BufferGuard; friend class JournalWriter; diff --git a/src/mongo/db/storage/mmap_v1/mmap_v1_engine.cpp b/src/mongo/db/storage/mmap_v1/mmap_v1_engine.cpp index da1d7544c98..160c268c6fa 100644 --- a/src/mongo/db/storage/mmap_v1/mmap_v1_engine.cpp +++ b/src/mongo/db/storage/mmap_v1/mmap_v1_engine.cpp @@ -359,4 +359,8 @@ void MMAPV1Engine::cleanShutdown() { MemoryMappedFile::closeAllFiles(ss3); log() << ss3.str() << endl; } + +void MMAPV1Engine::setJournalListener(JournalListener* jl) { + dur::setJournalListener(jl); +} } diff --git a/src/mongo/db/storage/mmap_v1/mmap_v1_engine.h b/src/mongo/db/storage/mmap_v1/mmap_v1_engine.h index ad672f7d3de..4fac787b4b0 100644 --- a/src/mongo/db/storage/mmap_v1/mmap_v1_engine.h +++ b/src/mongo/db/storage/mmap_v1/mmap_v1_engine.h @@ -38,6 +38,7 @@ namespace mongo { +class JournalListener; class MMAPV1DatabaseCatalogEntry; class MMAPV1Engine : public StorageEngine { @@ -93,6 +94,8 @@ public: */ RecordAccessTracker& getRecordAccessTracker(); + void setJournalListener(JournalListener* jl) final; + private: static void _listDatabases(const std::string& directory, std::vector<std::string>* out); diff --git a/src/mongo/db/storage/storage_engine.h b/src/mongo/db/storage/storage_engine.h index b31a296ac89..9b84b32e39d 100644 --- a/src/mongo/db/storage/storage_engine.h +++ b/src/mongo/db/storage/storage_engine.h @@ -40,6 +40,7 @@ namespace mongo { class DatabaseCatalogEntry; +class JournalListener; class OperationContext; class RecoveryUnit; class SnapshotManager; @@ -267,6 +268,12 @@ public: return nullptr; } + /** + * Sets a new JournalListener, which is used by the storage engine to alert the rest of the + * system about journaled write progress. + */ + virtual void setJournalListener(JournalListener* jl) = 0; + protected: /** * The destructor will never be called. See cleanShutdown instead. diff --git a/src/mongo/db/storage/wiredtiger/SConscript b/src/mongo/db/storage/wiredtiger/SConscript index 26e4435c956..431a89a5703 100644 --- a/src/mongo/db/storage/wiredtiger/SConscript +++ b/src/mongo/db/storage/wiredtiger/SConscript @@ -42,6 +42,7 @@ if wiredtiger: '$BUILD_DIR/mongo/db/index/index_descriptor', '$BUILD_DIR/mongo/db/service_context', '$BUILD_DIR/mongo/db/storage/index_entry_comparison', + '$BUILD_DIR/mongo/db/storage/journal_listener', '$BUILD_DIR/mongo/db/storage/key_string', '$BUILD_DIR/mongo/db/storage/oplog_hack', '$BUILD_DIR/mongo/db/storage/storage_options', diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp index 8420434cf4e..8619c04aef7 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp @@ -51,6 +51,7 @@ #include "mongo/db/index/index_descriptor.h" #include "mongo/db/server_parameters.h" #include "mongo/db/service_context.h" +#include "mongo/db/storage/journal_listener.h" #include "mongo/db/storage/wiredtiger/wiredtiger_customization_hooks.h" #include "mongo/db/storage/wiredtiger/wiredtiger_global_options.h" #include "mongo/db/storage/wiredtiger/wiredtiger_index.h" @@ -691,4 +692,8 @@ void WiredTigerKVEngine::_checkIdentPath(StringData ident) { start = idx + 1; } } + +void WiredTigerKVEngine::setJournalListener(JournalListener* jl) { + return _sessionCache->setJournalListener(jl); +} } diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h index bc0a5f71301..357f94d7a91 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h @@ -44,6 +44,7 @@ namespace mongo { +class JournalListener; class WiredTigerSessionCache; class WiredTigerSizeStorer; @@ -121,6 +122,8 @@ public: return &_sessionCache->snapshotManager(); } + void setJournalListener(JournalListener* jl) final; + // wiredtiger specific // Calls WT_CONNECTION::reconfigure on the underlying WT_CONNECTION // held by this class diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp index 4b9d3a80c52..eda476c4d8f 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp @@ -34,6 +34,7 @@ #include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h" #include "mongo/base/error_codes.h" +#include "mongo/db/storage/journal_listener.h" #include "mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h" #include "mongo/db/storage/wiredtiger/wiredtiger_util.h" #include "mongo/stdx/memory.h" @@ -166,7 +167,12 @@ void WiredTigerSessionCache::waitUntilDurable(bool forceCheckpoint) { WiredTigerSession* session = getSession(); ON_BLOCK_EXIT([this, session] { releaseSession(session); }); WT_SESSION* s = session->getSession(); - invariantWTOK(s->checkpoint(s, NULL)); + { + stdx::unique_lock<stdx::mutex> lk(_journalListenerMutex); + JournalListener::Token token = _journalListener->getToken(); + invariantWTOK(s->checkpoint(s, NULL)); + _journalListener->onDurable(token); + } LOG(4) << "created checkpoint (forced)"; return; } @@ -189,7 +195,12 @@ void WiredTigerSessionCache::waitUntilDurable(bool forceCheckpoint) { // Use the journal when available, or a checkpoint otherwise. if (_engine->isDurable()) { - invariantWTOK(s->log_flush(s, "sync=on")); + { + stdx::unique_lock<stdx::mutex> lk(_journalListenerMutex); + JournalListener::Token token = _journalListener->getToken(); + invariantWTOK(s->log_flush(s, "sync=on")); + _journalListener->onDurable(token); + } LOG(4) << "flushed journal"; } else { invariantWTOK(s->checkpoint(s, NULL)); @@ -278,4 +289,9 @@ void WiredTigerSessionCache::releaseSession(WiredTigerSession* session) { if (_engine && _engine->haveDropsQueued()) _engine->dropAllQueued(); } + +void WiredTigerSessionCache::setJournalListener(JournalListener* jl) { + stdx::unique_lock<stdx::mutex> lk(_journalListenerMutex); + _journalListener = jl; +} } diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h index 64c37d743e3..d5354101ac7 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h @@ -37,6 +37,7 @@ #include <boost/thread/shared_mutex.hpp> #include <wiredtiger.h> +#include "mongo/db/storage/journal_listener.h" #include "mongo/db/storage/wiredtiger/wiredtiger_snapshot_manager.h" #include "mongo/platform/atomic_word.h" #include "mongo/stdx/mutex.h" @@ -171,6 +172,8 @@ public: return _snapshotManager; } + void setJournalListener(JournalListener* jl); + private: WiredTigerKVEngine* _engine; // not owned, might be NULL WT_CONNECTION* _conn; // not owned @@ -192,5 +195,10 @@ private: // Counter and critical section mutex for waitUntilDurable AtomicUInt32 _lastSyncTime; stdx::mutex _lastSyncMutex; + + // Notified when we commit to the journal. + JournalListener* _journalListener = &NoOpJournalListener::instance; + // Protects _journalListener. + stdx::mutex _journalListenerMutex; }; } // namespace |