/**
* Copyright (C) 2017 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include
#include
#include
#include "mongo/base/status.h"
#include "mongo/base/status_with.h"
#include "mongo/base/string_data.h"
#include "mongo/bson/mutable/damage_vector.h"
#include "mongo/bson/timestamp.h"
#include "mongo/db/catalog/coll_mod.h"
#include "mongo/db/catalog/collection_info_cache.h"
#include "mongo/db/catalog/collection_options.h"
#include "mongo/db/catalog/index_consistency.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/cursor_manager.h"
#include "mongo/db/exec/collection_scan_common.h"
#include "mongo/db/logical_session_id.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/op_observer.h"
#include "mongo/db/query/collation/collator_interface.h"
#include "mongo/db/record_id.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/storage/capped_callback.h"
#include "mongo/db/storage/record_store.h"
#include "mongo/db/storage/snapshot.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/functional.h"
#include "mongo/stdx/mutex.h"
namespace mongo {
class CollectionCatalogEntry;
class DatabaseCatalogEntry;
class ExtentManager;
class IndexCatalog;
class IndexDescriptor;
class DatabaseImpl;
class MatchExpression;
class MultiIndexBlock;
class OpDebug;
class OperationContext;
struct OplogUpdateEntryArgs;
class RecordCursor;
class RecordFetcher;
class UpdateDriver;
class UpdateRequest;
struct CompactOptions {
// padding
enum PaddingMode { PRESERVE, NONE, MANUAL } paddingMode = NONE;
// only used if _paddingMode == MANUAL
double paddingFactor = 1; // what to multiple document size by
int paddingBytes = 0; // what to add to ducment size after multiplication
// other
bool validateDocuments = true;
std::string toString() const;
unsigned computeRecordSize(unsigned recordSize) const {
recordSize = static_cast(paddingFactor * recordSize);
recordSize += paddingBytes;
return recordSize;
}
};
struct CompactStats {
long long corruptDocuments = 0;
};
/**
* Queries with the awaitData option use this notifier object to wait for more data to be
* inserted into the capped collection.
*/
class CappedInsertNotifier {
public:
CappedInsertNotifier();
/**
* Wakes up all threads waiting.
*/
void notifyAll();
/**
* Waits until 'deadline', or until notifyAll() is called to indicate that new
* data is available in the capped collection.
*
* NOTE: Waiting threads can be signaled by calling kill or notify* methods.
*/
void waitUntil(uint64_t prevVersion, Date_t deadline) const;
/**
* Returns the version for use as an additional wake condition when used above.
*/
uint64_t getVersion() const {
return _version;
}
/**
* Cancels the notifier if the collection is dropped/invalidated, and wakes all waiting.
*/
void kill();
/**
* Returns true if no new insert notification will occur.
*/
bool isDead();
private:
// Signalled when a successful insert is made into a capped collection.
mutable stdx::condition_variable _notifier;
// Mutex used with '_notifier'. Protects access to '_version'.
mutable stdx::mutex _mutex;
// A counter, incremented on insertion of new data into the capped collection.
//
// The condition which '_cappedNewDataNotifier' is being notified of is an increment of this
// counter. Access to this counter is synchronized with '_mutex'.
uint64_t _version;
// True once the notifier is dead.
bool _dead;
};
/**
* this is NOT safe through a yield right now.
* not sure if it will be, or what yet.
*/
class Collection final : CappedCallback, UpdateNotifier {
public:
enum ValidationAction { WARN, ERROR_V };
enum ValidationLevel { OFF, MODERATE, STRICT_V };
enum class StoreDeletedDoc { Off, On };
class Impl : virtual CappedCallback, virtual UpdateNotifier {
public:
virtual ~Impl() = 0;
virtual void init(OperationContext* opCtx) = 0;
private:
friend Collection;
virtual DatabaseCatalogEntry* dbce() const = 0;
virtual CollectionCatalogEntry* details() const = 0;
virtual Status aboutToDeleteCapped(OperationContext* opCtx,
const RecordId& loc,
RecordData data) = 0;
virtual Status recordStoreGoingToUpdateInPlace(OperationContext* opCtx,
const RecordId& loc) = 0;
public:
virtual bool ok() const = 0;
virtual CollectionCatalogEntry* getCatalogEntry() = 0;
virtual const CollectionCatalogEntry* getCatalogEntry() const = 0;
virtual CollectionInfoCache* infoCache() = 0;
virtual const CollectionInfoCache* infoCache() const = 0;
virtual const NamespaceString& ns() const = 0;
virtual OptionalCollectionUUID uuid() const = 0;
virtual void refreshUUID(OperationContext* opCtx) = 0;
virtual const IndexCatalog* getIndexCatalog() const = 0;
virtual IndexCatalog* getIndexCatalog() = 0;
virtual const RecordStore* getRecordStore() const = 0;
virtual RecordStore* getRecordStore() = 0;
virtual CursorManager* getCursorManager() const = 0;
virtual bool requiresIdIndex() const = 0;
virtual Snapshotted docFor(OperationContext* opCtx, const RecordId& loc) const = 0;
virtual bool findDoc(OperationContext* opCtx,
const RecordId& loc,
Snapshotted* out) const = 0;
virtual std::unique_ptr getCursor(OperationContext* opCtx,
bool forward) const = 0;
virtual std::vector> getManyCursors(
OperationContext* opCtx) const = 0;
virtual void deleteDocument(OperationContext* opCtx,
StmtId stmtId,
const RecordId& loc,
OpDebug* opDebug,
bool fromMigrate,
bool noWarn,
StoreDeletedDoc storeDeletedDoc) = 0;
virtual Status insertDocuments(OperationContext* opCtx,
std::vector::const_iterator begin,
std::vector::const_iterator end,
OpDebug* opDebug,
bool enforceQuota,
bool fromMigrate) = 0;
virtual Status insertDocument(OperationContext* opCtx,
const InsertStatement& doc,
OpDebug* opDebug,
bool enforceQuota,
bool fromMigrate) = 0;
virtual Status insertDocumentsForOplog(OperationContext* opCtx,
const DocWriter* const* docs,
Timestamp* timestamps,
size_t nDocs) = 0;
virtual Status insertDocument(OperationContext* opCtx,
const BSONObj& doc,
const std::vector& indexBlocks,
bool enforceQuota) = 0;
virtual RecordId updateDocument(OperationContext* opCtx,
const RecordId& oldLocation,
const Snapshotted& oldDoc,
const BSONObj& newDoc,
bool enforceQuota,
bool indexesAffected,
OpDebug* opDebug,
OplogUpdateEntryArgs* args) = 0;
virtual bool updateWithDamagesSupported() const = 0;
virtual StatusWith updateDocumentWithDamages(
OperationContext* opCtx,
const RecordId& loc,
const Snapshotted& oldRec,
const char* damageSource,
const mutablebson::DamageVector& damages,
OplogUpdateEntryArgs* args) = 0;
virtual StatusWith compact(OperationContext* opCtx,
const CompactOptions* options) = 0;
virtual Status truncate(OperationContext* opCtx) = 0;
virtual Status validate(OperationContext* opCtx,
ValidateCmdLevel level,
bool background,
std::unique_ptr collLk,
ValidateResults* results,
BSONObjBuilder* output) = 0;
virtual Status touch(OperationContext* opCtx,
bool touchData,
bool touchIndexes,
BSONObjBuilder* output) const = 0;
virtual void cappedTruncateAfter(OperationContext* opCtx, RecordId end, bool inclusive) = 0;
virtual StatusWithMatchExpression parseValidator(
OperationContext* opCtx,
const BSONObj& validator,
MatchExpressionParser::AllowedFeatureSet allowedFeatures,
boost::optional
maxFeatureCompatibilityVersion = boost::none) const = 0;
virtual Status setValidator(OperationContext* opCtx, BSONObj validator) = 0;
virtual Status setValidationLevel(OperationContext* opCtx, StringData newLevel) = 0;
virtual Status setValidationAction(OperationContext* opCtx, StringData newAction) = 0;
virtual StringData getValidationLevel() const = 0;
virtual StringData getValidationAction() const = 0;
virtual Status updateValidator(OperationContext* opCtx,
BSONObj newValidator,
StringData newLevel,
StringData newAction) = 0;
virtual bool isCapped() const = 0;
virtual std::shared_ptr getCappedInsertNotifier() const = 0;
virtual uint64_t numRecords(OperationContext* opCtx) const = 0;
virtual uint64_t dataSize(OperationContext* opCtx) const = 0;
virtual uint64_t getIndexSize(OperationContext* opCtx,
BSONObjBuilder* details,
int scale) = 0;
virtual boost::optional getMinimumVisibleSnapshot() = 0;
virtual void setMinimumVisibleSnapshot(Timestamp name) = 0;
virtual bool haveCappedWaiters() = 0;
virtual void notifyCappedWaitersIfNeeded() = 0;
virtual const CollatorInterface* getDefaultCollator() const = 0;
};
private:
static std::unique_ptr makeImpl(Collection* _this,
OperationContext* opCtx,
StringData fullNS,
OptionalCollectionUUID uuid,
CollectionCatalogEntry* details,
RecordStore* recordStore,
DatabaseCatalogEntry* dbce);
public:
using factory_function_type = decltype(makeImpl);
static void registerFactory(stdx::function factory);
explicit inline Collection(OperationContext* const opCtx,
const StringData fullNS,
OptionalCollectionUUID uuid,
CollectionCatalogEntry* const details, // does not own
RecordStore* const recordStore, // does not own
DatabaseCatalogEntry* const dbce) // does not own
: _pimpl(makeImpl(this, opCtx, fullNS, uuid, details, recordStore, dbce)) {
this->_impl().init(opCtx);
}
// Use this constructor only for testing/mocks
explicit inline Collection(std::unique_ptr mock) : _pimpl(std::move(mock)) {}
inline ~Collection() = default;
inline bool ok() const {
return this->_impl().ok();
}
inline CollectionCatalogEntry* getCatalogEntry() {
return this->_impl().getCatalogEntry();
}
inline const CollectionCatalogEntry* getCatalogEntry() const {
return this->_impl().getCatalogEntry();
}
inline CollectionInfoCache* infoCache() {
return this->_impl().infoCache();
}
inline const CollectionInfoCache* infoCache() const {
return this->_impl().infoCache();
}
inline const NamespaceString& ns() const {
return this->_impl().ns();
}
inline OptionalCollectionUUID uuid() const {
return this->_impl().uuid();
}
inline void refreshUUID(OperationContext* opCtx) {
return this->_impl().refreshUUID(opCtx);
}
inline const IndexCatalog* getIndexCatalog() const {
return this->_impl().getIndexCatalog();
}
inline IndexCatalog* getIndexCatalog() {
return this->_impl().getIndexCatalog();
}
inline const RecordStore* getRecordStore() const {
return this->_impl().getRecordStore();
}
inline RecordStore* getRecordStore() {
return this->_impl().getRecordStore();
}
inline CursorManager* getCursorManager() const {
return this->_impl().getCursorManager();
}
inline bool requiresIdIndex() const {
return this->_impl().requiresIdIndex();
}
inline Snapshotted docFor(OperationContext* const opCtx, const RecordId& loc) const {
return Snapshotted(opCtx->recoveryUnit()->getSnapshotId(),
this->getRecordStore()->dataFor(opCtx, loc).releaseToBson());
}
/**
* @param out - contents set to the right docs if exists, or nothing.
* @return true iff loc exists
*/
inline bool findDoc(OperationContext* const opCtx,
const RecordId& loc,
Snapshotted* const out) const {
return this->_impl().findDoc(opCtx, loc, out);
}
inline std::unique_ptr getCursor(OperationContext* const opCtx,
const bool forward = true) const {
return this->_impl().getCursor(opCtx, forward);
}
/**
* Returns many cursors that partition the Collection into many disjoint sets. Iterating
* all returned cursors is equivalent to iterating the full collection.
*/
inline std::vector> getManyCursors(
OperationContext* const opCtx) const {
return this->_impl().getManyCursors(opCtx);
}
/**
* Deletes the document with the given RecordId from the collection.
*
* 'fromMigrate' indicates whether the delete was induced by a chunk migration, and
* so should be ignored by the user as an internal maintenance operation and not a
* real delete.
* 'loc' key to uniquely identify a record in a collection.
* 'opDebug' Optional argument. When not null, will be used to record operation statistics.
* 'cappedOK' if true, allows deletes on capped collections (Cloner::copyDB uses this).
* 'noWarn' if unindexing the record causes an error, if noWarn is true the error
* will not be logged.
*/
inline void deleteDocument(OperationContext* const opCtx,
StmtId stmtId,
const RecordId& loc,
OpDebug* const opDebug,
const bool fromMigrate = false,
const bool noWarn = false,
StoreDeletedDoc storeDeletedDoc = StoreDeletedDoc::Off) {
return this->_impl().deleteDocument(
opCtx, stmtId, loc, opDebug, fromMigrate, noWarn, storeDeletedDoc);
}
/*
* Inserts all documents inside one WUOW.
* Caller should ensure vector is appropriately sized for this.
* If any errors occur (including WCE), caller should retry documents individually.
*
* 'opDebug' Optional argument. When not null, will be used to record operation statistics.
*/
inline Status insertDocuments(OperationContext* const opCtx,
const std::vector::const_iterator begin,
const std::vector::const_iterator end,
OpDebug* const opDebug,
const bool enforceQuota,
const bool fromMigrate = false) {
return this->_impl().insertDocuments(opCtx, begin, end, opDebug, enforceQuota, fromMigrate);
}
/**
* this does NOT modify the doc before inserting
* i.e. will not add an _id field for documents that are missing it
*
* 'opDebug' Optional argument. When not null, will be used to record operation statistics.
* 'enforceQuota' If false, quotas will be ignored.
*/
inline Status insertDocument(OperationContext* const opCtx,
const InsertStatement& doc,
OpDebug* const opDebug,
const bool enforceQuota,
const bool fromMigrate = false) {
return this->_impl().insertDocument(opCtx, doc, opDebug, enforceQuota, fromMigrate);
}
/**
* Callers must ensure no document validation is performed for this collection when calling
* this method.
*/
inline Status insertDocumentsForOplog(OperationContext* const opCtx,
const DocWriter* const* const docs,
Timestamp* timestamps,
const size_t nDocs) {
return this->_impl().insertDocumentsForOplog(opCtx, docs, timestamps, nDocs);
}
/**
* Inserts a document into the record store and adds it to the MultiIndexBlocks passed in.
*
* NOTE: It is up to caller to commit the indexes.
*/
inline Status insertDocument(OperationContext* const opCtx,
const BSONObj& doc,
const std::vector& indexBlocks,
const bool enforceQuota) {
return this->_impl().insertDocument(opCtx, doc, indexBlocks, enforceQuota);
}
/**
* Updates the document @ oldLocation with newDoc.
*
* If the document fits in the old space, it is put there; if not, it is moved.
* Sets 'args.updatedDoc' to the updated version of the document with damages applied, on
* success.
* 'opDebug' Optional argument. When not null, will be used to record operation statistics.
* @return the post update location of the doc (may or may not be the same as oldLocation)
*/
inline RecordId updateDocument(OperationContext* const opCtx,
const RecordId& oldLocation,
const Snapshotted& oldDoc,
const BSONObj& newDoc,
const bool enforceQuota,
const bool indexesAffected,
OpDebug* const opDebug,
OplogUpdateEntryArgs* const args) {
return this->_impl().updateDocument(
opCtx, oldLocation, oldDoc, newDoc, enforceQuota, indexesAffected, opDebug, args);
}
inline bool updateWithDamagesSupported() const {
return this->_impl().updateWithDamagesSupported();
}
/**
* Not allowed to modify indexes.
* Illegal to call if updateWithDamagesSupported() returns false.
* Sets 'args.updatedDoc' to the updated version of the document with damages applied, on
* success.
* @return the contents of the updated record.
*/
inline StatusWith updateDocumentWithDamages(
OperationContext* const opCtx,
const RecordId& loc,
const Snapshotted& oldRec,
const char* const damageSource,
const mutablebson::DamageVector& damages,
OplogUpdateEntryArgs* const args) {
return this->_impl().updateDocumentWithDamages(
opCtx, loc, oldRec, damageSource, damages, args);
}
// -----------
inline StatusWith compact(OperationContext* const opCtx,
const CompactOptions* const options) {
return this->_impl().compact(opCtx, options);
}
/**
* removes all documents as fast as possible
* indexes before and after will be the same
* as will other characteristics.
*/
inline Status truncate(OperationContext* const opCtx) {
return this->_impl().truncate(opCtx);
}
/**
* @return OK if the validate run successfully
* OK will be returned even if corruption is found
* deatils will be in result.
*/
inline Status validate(OperationContext* const opCtx,
const ValidateCmdLevel level,
bool background,
std::unique_ptr collLk,
ValidateResults* const results,
BSONObjBuilder* const output) {
return this->_impl().validate(opCtx, level, background, std::move(collLk), results, output);
}
/**
* forces data into cache.
*/
inline Status touch(OperationContext* const opCtx,
const bool touchData,
const bool touchIndexes,
BSONObjBuilder* const output) const {
return this->_impl().touch(opCtx, touchData, touchIndexes, output);
}
/**
* Truncate documents newer than the document at 'end' from the capped
* collection. The collection cannot be completely emptied using this
* function. An assertion will be thrown if that is attempted.
* @param inclusive - Truncate 'end' as well iff true
*/
inline void cappedTruncateAfter(OperationContext* const opCtx,
const RecordId end,
const bool inclusive) {
return this->_impl().cappedTruncateAfter(opCtx, end, inclusive);
}
/**
* Returns a non-ok Status if validator is not legal for this collection.
*/
inline StatusWithMatchExpression parseValidator(
OperationContext* opCtx,
const BSONObj& validator,
MatchExpressionParser::AllowedFeatureSet allowedFeatures,
boost::optional
maxFeatureCompatibilityVersion) const {
return this->_impl().parseValidator(
opCtx, validator, allowedFeatures, maxFeatureCompatibilityVersion);
}
static StatusWith parseValidationLevel(StringData);
static StatusWith parseValidationAction(StringData);
static void registerParseValidationLevelImpl(
stdx::function impl);
static void registerParseValidationActionImpl(
stdx::function impl);
/**
* Sets the validator for this collection.
*
* An empty validator removes all validation.
* Requires an exclusive lock on the collection.
*/
inline Status setValidator(OperationContext* const opCtx, const BSONObj validator) {
return this->_impl().setValidator(opCtx, validator);
}
inline Status setValidationLevel(OperationContext* const opCtx, const StringData newLevel) {
return this->_impl().setValidationLevel(opCtx, newLevel);
}
inline Status setValidationAction(OperationContext* const opCtx, const StringData newAction) {
return this->_impl().setValidationAction(opCtx, newAction);
}
inline StringData getValidationLevel() const {
return this->_impl().getValidationLevel();
}
inline StringData getValidationAction() const {
return this->_impl().getValidationAction();
}
inline Status updateValidator(OperationContext* opCtx,
BSONObj newValidator,
StringData newLevel,
StringData newAction) {
return this->_impl().updateValidator(opCtx, newValidator, newLevel, newAction);
}
// -----------
//
// Stats
//
inline bool isCapped() const {
return this->_impl().isCapped();
}
/**
* Get a pointer to a capped insert notifier object. The caller can wait on this object
* until it is notified of a new insert into the capped collection.
*
* It is invalid to call this method unless the collection is capped.
*/
inline std::shared_ptr getCappedInsertNotifier() const {
return this->_impl().getCappedInsertNotifier();
}
inline uint64_t numRecords(OperationContext* const opCtx) const {
return this->_impl().numRecords(opCtx);
}
inline uint64_t dataSize(OperationContext* const opCtx) const {
return this->_impl().dataSize(opCtx);
}
inline int averageObjectSize(OperationContext* const opCtx) const {
uint64_t n = this->numRecords(opCtx);
if (n == 0)
return 5;
return static_cast(this->dataSize(opCtx) / n);
}
inline uint64_t getIndexSize(OperationContext* const opCtx,
BSONObjBuilder* const details = nullptr,
const int scale = 1) {
return this->_impl().getIndexSize(opCtx, details, scale);
}
/**
* If return value is not boost::none, reads with majority read concern using an older snapshot
* must error.
*/
inline boost::optional getMinimumVisibleSnapshot() {
return this->_impl().getMinimumVisibleSnapshot();
}
inline void setMinimumVisibleSnapshot(const Timestamp name) {
return this->_impl().setMinimumVisibleSnapshot(name);
}
inline bool haveCappedWaiters() {
return this->_impl().haveCappedWaiters();
}
/**
* Notify (capped collection) waiters of data changes, like an insert.
*/
inline void notifyCappedWaitersIfNeeded() {
return this->_impl().notifyCappedWaitersIfNeeded();
}
/**
* Get a pointer to the collection's default collator. The pointer must not be used after this
* Collection is destroyed.
*/
inline const CollatorInterface* getDefaultCollator() const {
return this->_impl().getDefaultCollator();
}
private:
inline DatabaseCatalogEntry* dbce() const {
return this->_impl().dbce();
}
inline CollectionCatalogEntry* details() const {
return this->_impl().details();
}
inline Status aboutToDeleteCapped(OperationContext* const opCtx,
const RecordId& loc,
const RecordData data) final {
return this->_impl().aboutToDeleteCapped(opCtx, loc, data);
}
inline Status recordStoreGoingToUpdateInPlace(OperationContext* const opCtx,
const RecordId& loc) final {
return this->_impl().recordStoreGoingToUpdateInPlace(opCtx, loc);
}
// This structure exists to give us a customization point to decide how to force users of this
// class to depend upon the corresponding `collection.cpp` Translation Unit (TU). All public
// forwarding functions call `_impl(), and `_impl` creates an instance of this structure.
struct TUHook {
static void hook() noexcept;
explicit inline TUHook() noexcept {
if (kDebugBuild)
this->hook();
}
};
inline const Impl& _impl() const {
TUHook{};
return *this->_pimpl;
}
inline Impl& _impl() {
TUHook{};
return *this->_pimpl;
}
std::unique_ptr _pimpl;
friend class DatabaseImpl;
friend class IndexCatalogImpl;
};
} // namespace mongo