/** * Copyright (C) 2012 10gen Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #pragma once #include #include "mongo/base/disallow_copying.h" #include "mongo/base/status_with.h" #include "mongo/db/jsobj.h" #include "mongo/db/repl/data_replicator_external_state.h" #include "mongo/db/repl/oplog_buffer.h" #include "mongo/db/repl/oplog_fetcher.h" #include "mongo/db/repl/optime.h" #include "mongo/db/repl/sync_source_resolver.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/functional.h" #include "mongo/stdx/mutex.h" #include "mongo/stdx/thread.h" #include "mongo/util/net/hostandport.h" namespace mongo { class DBClientBase; class OperationContext; namespace repl { class ReplicationCoordinator; class ReplicationCoordinatorExternalState; class BackgroundSync { MONGO_DISALLOW_COPYING(BackgroundSync); public: BackgroundSync(ReplicationCoordinatorExternalState* replicationCoordinatorExternalState, std::unique_ptr oplogBuffer); // stop syncing (when this node becomes a primary, e.g.) void stop(); /** * Starts oplog buffer, task executor and producer thread, in that order. */ void startup(OperationContext* txn); /** * Signals producer thread to stop. */ void shutdown(OperationContext* txn); /** * Waits for producer thread to stop before shutting down the task executor and oplog buffer. */ void join(OperationContext* txn); /** * Returns true if shutdown() has been called. * Once this returns true, nothing more will be added to the queue and consumers must shutdown. */ bool inShutdown() const; bool isStopped() const; // starts the sync target notifying thread void notifierThread(); HostAndPort getSyncTarget() const; // Interface implementation bool peek(OperationContext* txn, BSONObj* op); void consume(OperationContext* txn); void clearSyncTarget(); void waitForMore(OperationContext* txn); // For monitoring BSONObj getCounters(); // Clears any fetched and buffered oplog entries. void clearBuffer(OperationContext* txn); /** * Cancel existing find/getMore commands on the sync source's oplog collection. */ void cancelFetcher(); /** * Returns true if any of the following is true: * 1) We are shutting down; * 2) We are primary; * 3) We are in drain mode; or * 4) We are stopped. */ bool shouldStopFetching() const; // Testing related stuff void pushTestOpToBuffer(OperationContext* txn, const BSONObj& op); private: bool _inShutdown_inlock() const; /** * Starts the producer thread which runs until shutdown. Upon resolving the current sync source * the producer thread uses the OplogFetcher (which requires the replication coordinator * external state at construction) to fetch oplog entries from the source's oplog via a long * running find query. */ void _run(); // Production thread inner loop. void _runProducer(); void _produce(OperationContext* txn); /** * Signals to the applier that we have no new data, * and are in sync with the applier at this point. * * NOTE: Used after rollback and during draining to transition to Primary role; */ void _signalNoNewDataForApplier(OperationContext* txn); /** * Checks current background sync state before pushing operations into blocking queue and * updating metrics. If the queue is full, might block. */ void _enqueueDocuments(Fetcher::Documents::const_iterator begin, Fetcher::Documents::const_iterator end, const OplogFetcher::DocumentsInfo& info); /** * Executes a rollback. * 'getConnection' returns a connection to the sync source. */ void _rollback(OperationContext* txn, const HostAndPort& source, stdx::function getConnection); // restart syncing void start(OperationContext* txn); long long _readLastAppliedHash(OperationContext* txn); // Production thread std::unique_ptr _oplogBuffer; // A pointer to the replication coordinator running the show. ReplicationCoordinator* _replCoord; // A pointer to the replication coordinator external state. ReplicationCoordinatorExternalState* _replicationCoordinatorExternalState; // Used to determine sync source. // TODO(dannenberg) move into DataReplicator. SyncSourceResolver _syncSourceResolver; // _mutex protects all of the class variables declared below. mutable stdx::mutex _mutex; OpTime _lastOpTimeFetched; // lastFetchedHash is used to match ops to determine if we need to rollback, when // a secondary. long long _lastFetchedHash = 0LL; // Thread running producerThread(). std::unique_ptr _producerThread; // Set to true if shutdown() has been called. bool _inShutdown = false; // if producer thread should not be running bool _stopped = true; HostAndPort _syncSourceHost; // Current oplog fetcher tailing the oplog on the sync source. // Owned by us. std::unique_ptr _oplogFetcher; }; } // namespace repl } // namespace mongo