/**
* Copyright (C) 2015 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include
#include
#include
#include
#include "mongo/base/disallow_copying.h"
#include "mongo/base/string_data.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/query/index_bounds.h"
#include "mongo/db/repl/collection_bulk_loader.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/service_context.h"
namespace mongo {
class Collection;
struct CollectionOptions;
class OperationContext;
namespace repl {
/**
* Storage interface used by the replication system to interact with storage.
* This interface provides seperation of concerns and a place for mocking out test
* interactions.
*
* The grouping of functionality includes general collection helpers, and more specific replication
* concepts:
* * Create Collection and Oplog
* * Drop database and all user databases
* * Drop a collection
* * Insert documents into a collection
* * Manage minvalid boundaries and initial sync state
*
* ***** MINVALID *****
* This interface provides helper functions for maintaining a single document in the
* local.replset.minvalid collection.
*
* When a member reaches its minValid optime it is in a consistent state. Thus, minValid is
* set as the last step in initial sync. At the beginning of initial sync, doingInitialSync
* is appended onto minValid to indicate that initial sync was started but has not yet
* completed.
*
* The document is also updated during "normal" sync. The optime of the last op in each batch is
* used to set minValid, along with a "begin" field to demark the start and the fact that a batch
* is active. When the batch is done the "begin" field is removed to indicate that we are in a
* consistent state when the batch has been fully applied.
*
* Example of all fields:
* { _id:...,
* doingInitialSync: true // initial sync is active
* ts:..., t:... // end-OpTime
* begin: {ts:..., t:...} // a batch is currently being applied, and not consistent
* }
*/
class StorageInterface {
MONGO_DISALLOW_COPYING(StorageInterface);
public:
// Operation Context binding.
static StorageInterface* get(ServiceContext* service);
static StorageInterface* get(ServiceContext& service);
static StorageInterface* get(OperationContext* opCtx);
static void set(ServiceContext* service, std::unique_ptr storageInterface);
// Constructor and Destructor.
StorageInterface() = default;
virtual ~StorageInterface() = default;
// MinValid and Initial Sync Flag.
/**
* Returns true if initial sync was started but has not not completed.
*/
virtual bool getInitialSyncFlag(OperationContext* opCtx) const = 0;
/**
* Sets the the initial sync flag to record that initial sync has not completed.
*
* This operation is durable and waits for durable writes (which will block on
*journaling/checkpointing).
*/
virtual void setInitialSyncFlag(OperationContext* opCtx) = 0;
/**
* Clears the the initial sync flag to record that initial sync has completed.
*
* This operation is durable and waits for durable writes (which will block on
*journaling/checkpointing).
*/
virtual void clearInitialSyncFlag(OperationContext* opCtx) = 0;
/**
* The minValid value is the earliest (minimum) Timestamp that must be applied in order to
* consider the dataset consistent.
*/
virtual void setMinValid(OperationContext* opCtx, const OpTime& minValid) = 0;
virtual OpTime getMinValid(OperationContext* opCtx) const = 0;
/**
* Sets minValid only if it is not already higher than endOpTime.
* Warning, this compares the term and timestamp independently. Do not use if the current
* minValid could be from the other fork of a rollback.
*/
virtual void setMinValidToAtLeast(OperationContext* opCtx, const OpTime& endOpTime) = 0;
/**
* Rollback ID is an increasing counter of how many rollbacks have occurred on this server.
*/
virtual StatusWith getRollbackID(OperationContext* opCtx) = 0;
virtual Status initializeRollbackID(OperationContext* opCtx) = 0;
virtual Status incrementRollbackID(OperationContext* opCtx) = 0;
/**
* On startup all oplog entries with a value >= the oplog delete from point should be deleted.
* If null, no documents should be deleted.
*/
virtual void setOplogDeleteFromPoint(OperationContext* opCtx, const Timestamp& timestamp) = 0;
virtual Timestamp getOplogDeleteFromPoint(OperationContext* opCtx) = 0;
/**
* The applied through point is a persistent record of where we've applied through. If null, the
* applied through point is the top of the oplog.
*/
virtual void setAppliedThrough(OperationContext* opCtx, const OpTime& optime) = 0;
/**
* You should probably be calling ReplicationCoordinator::getLastAppliedOpTime() instead.
*
* This reads the value from storage which isn't always updated when the ReplicationCoordinator
* is.
*/
virtual OpTime getAppliedThrough(OperationContext* opCtx) = 0;
// Collection creation and population for initial sync.
/**
* Creates a collection with the provided indexes.
*
* Assumes that no database locks have been acquired prior to calling this function.
*/
virtual StatusWith> createCollectionForBulkLoading(
const NamespaceString& nss,
const CollectionOptions& options,
const BSONObj idIndexSpec,
const std::vector& secondaryIndexSpecs) = 0;
/**
* Inserts a document into a collection.
*
* NOTE: If the collection doesn't exist, it will not be created, and instead
* an error is returned.
*/
virtual Status insertDocument(OperationContext* opCtx,
const NamespaceString& nss,
const BSONObj& doc) = 0;
/**
* Inserts the given documents into the collection.
* It is an error to call this function with an empty set of documents.
*/
virtual Status insertDocuments(OperationContext* opCtx,
const NamespaceString& nss,
const std::vector& docs) = 0;
/**
* Creates the initial oplog, errors if it exists.
*/
virtual Status createOplog(OperationContext* opCtx, const NamespaceString& nss) = 0;
/**
* Returns the configured maximum size of the oplog.
*
* Implementations are allowed to be "fuzzy" and delete documents when the actual size is
* slightly above or below this, so callers should not rely on its exact value.
*/
virtual StatusWith getOplogMaxSize(OperationContext* opCtx,
const NamespaceString& nss) = 0;
/**
* Creates a collection.
*/
virtual Status createCollection(OperationContext* opCtx,
const NamespaceString& nss,
const CollectionOptions& options) = 0;
/**
* Drops a collection, like the oplog.
*/
virtual Status dropCollection(OperationContext* opCtx, const NamespaceString& nss) = 0;
/**
* Drops all databases except "local".
*/
virtual Status dropReplicatedDatabases(OperationContext* opCtx) = 0;
/**
* Validates that the admin database is valid during initial sync.
*/
virtual Status isAdminDbValid(OperationContext* opCtx) = 0;
/**
* Finds at most "limit" documents returned by a collection or index scan on the collection in
* the requested direction.
* The documents returned will be copied and buffered. No cursors on the underlying collection
* will be kept open once this function returns.
* If "indexName" is boost::none, a collection scan is used to locate the document.
* Index scan options:
* If "startKey" is not empty, the index scan will start from the given key (instead of
* MinKey/MaxKey).
* Set "boundInclusion" to BoundInclusion::kIncludeStartKeyOnly to include "startKey" in
* the index scan results. Set to BoundInclusion::kIncludeEndKeyOnly to return the key
* immediately following "startKey" from the index.
*/
enum class ScanDirection {
kForward = 1,
kBackward = -1,
};
virtual StatusWith> findDocuments(OperationContext* opCtx,
const NamespaceString& nss,
boost::optional indexName,
ScanDirection scanDirection,
const BSONObj& startKey,
BoundInclusion boundInclusion,
std::size_t limit) = 0;
/**
* Deletes at most "limit" documents returned by a collection or index scan on the collection in
* the requested direction. Returns deleted documents on success.
* The documents returned will be copied and buffered. No cursors on the underlying collection
* will be kept open once this function returns.
* If "indexName" is null, a collection scan is used to locate the document.
*/
virtual StatusWith> deleteDocuments(OperationContext* opCtx,
const NamespaceString& nss,
boost::optional indexName,
ScanDirection scanDirection,
const BSONObj& startKey,
BoundInclusion boundInclusion,
std::size_t limit) = 0;
/**
* Finds a single document in the collection referenced by the specified _id.
*
* Not supported on collections with a default collation.
*/
virtual StatusWith findById(OperationContext* opCtx,
const NamespaceString& nss,
const BSONElement& idKey) = 0;
/**
* Deletes a single document in the collection referenced by the specified _id.
* Returns deleted document on success.
*
* Not supported on collections with a default collation.
*/
virtual StatusWith deleteById(OperationContext* opCtx,
const NamespaceString& nss,
const BSONElement& idKey) = 0;
/**
* Updates a single document in the collection referenced by the specified _id.
* The document is located by looking up "idKey" in the id index.
* "update" represents the replacement document or list of requested modifications to be applied
* to the document.
* If the document is not found, a new document will be created with the requested modifications
* applied.
*/
virtual Status upsertById(OperationContext* opCtx,
const NamespaceString& nss,
const BSONElement& idKey,
const BSONObj& update) = 0;
/**
* Removes all documents that match the "filter" from a collection.
* "filter" specifies the deletion criteria using query operators. Pass in an empty document to
* delete all documents in a collection.
*/
virtual Status deleteByFilter(OperationContext* opCtx,
const NamespaceString& nss,
const BSONObj& filter) = 0;
using CollectionSize = uint64_t;
using CollectionCount = uint64_t;
/**
* Returns the sum of the sizes of documents in the collection in bytes.
*/
virtual StatusWith getCollectionSize(OperationContext* opCtx,
const NamespaceString& nss) = 0;
/**
* Returns the number of documents in the collection.
*/
virtual StatusWith getCollectionCount(OperationContext* opCtx,
const NamespaceString& nss) = 0;
};
} // namespace repl
} // namespace mongo