/**
* Copyright (C) 2015 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand
#include "mongo/platform/basic.h"
#include "mongo/db/catalog/capped_utils.h"
#include "mongo/db/background.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/collection_catalog_entry.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/catalog/index_catalog.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/client.h"
#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/index_builder.h"
#include "mongo/db/operation_context_impl.h"
#include "mongo/db/query/internal_plans.h"
#include "mongo/db/query/plan_yield_policy.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/service_context.h"
#include "mongo/util/scopeguard.h"
namespace mongo {
/**
 * Truncates the collection identified by 'collectionName'.
 *
 * Opens a ScopedTransaction in MODE_IX and acquires the database through AutoGetDb in MODE_X.
 * Returns NotMaster when the operation's writes are replicated and the replication coordinator
 * reports that this node cannot accept writes for the namespace. A missing database or
 * collection trips massert 13429 / 28584 respectively. Otherwise the collection is truncated
 * inside a WriteUnitOfWork; on failure the truncate status is returned without committing, and
 * on success the op observer is notified via onEmptyCapped() before the unit is committed.
 */
Status emptyCapped(OperationContext* txn,
                   const NamespaceString& collectionName) {
    ScopedTransaction globalIntent(txn, MODE_IX);
    AutoGetDb lockedDb(txn, collectionName.db(), MODE_X);

    // Refuse replicated (user-initiated) writes on a node that cannot accept them.
    if (txn->writesAreReplicated() &&
        !repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(collectionName)) {
        return Status(ErrorCodes::NotMaster,
                      str::stream() << "Not primary while truncating collection "
                                    << collectionName.ns());
    }

    Database* database = lockedDb.getDb();
    massert(13429, "no such database", database);

    Collection* target = database->getCollection(collectionName);
    massert(28584, "no such collection", target);

    BackgroundOperation::assertNoBgOpInProgForNs(collectionName.ns());

    WriteUnitOfWork unitOfWork(txn);

    Status truncateResult = target->truncate(txn);
    if (truncateResult.isOK()) {
        // Notify the observer inside the same unit of work, then make everything durable.
        getGlobalServiceContext()->getOpObserver()->onEmptyCapped(txn, target->ns());
        unitOfWork.commit();
        return Status::OK();
    }

    // Leaving without commit() abandons the unit of work.
    return truncateResult;
}
/**
 * Creates the capped collection '<db>.<shortTo>' and copies documents into it from
 * '<db>.<shortFrom>'.
 *
 * @param txn        operation context used for every read and write.
 * @param db         database holding both collections; it is dereferenced unconditionally.
 * @param shortFrom  source collection name without the database prefix.
 * @param shortTo    target collection name without the database prefix.
 * @param size       value stored in the "size" field of the new collection's creation spec.
 * @param temp       when true, the creation spec also carries "temp": true.
 *
 * @return NamespaceNotFound if the source does not exist, NamespaceExists if the target already
 *         exists, the status of userCreateNS() if creation fails, and Status::OK() once the
 *         source scan reaches EOF.
 *
 * Options of the source collection are merged into the creation spec with
 * appendElementsUnique() after "capped", "size" and (optionally) "temp" have been appended.
 * Document validation is disabled for the duration of the copy.
 */
Status cloneCollectionAsCapped(OperationContext* txn,
                               Database* db,
                               const std::string& shortFrom,
                               const std::string& shortTo,
                               double size,
                               bool temp) {
    std::string fromNs = db->name() + "." + shortFrom;
    std::string toNs = db->name() + "." + shortTo;

    Collection* fromCollection = db->getCollection(fromNs);
    if (!fromCollection)
        return Status(ErrorCodes::NamespaceNotFound,
                      str::stream() << "source collection " << fromNs << " does not exist");

    if (db->getCollection(toNs))
        return Status(ErrorCodes::NamespaceExists, "to collection already exists");

    // create new collection
    MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
        const auto fromOptions = fromCollection->getCatalogEntry()
                                     ->getCollectionOptions(txn)
                                     .toBSON();
        OldClientContext ctx(txn, toNs);
        BSONObjBuilder spec;
        spec.appendBool("capped", true);
        spec.append("size", size);
        if (temp)
            spec.appendBool("temp", true);
        // Appended last so the fields set explicitly above are already present.
        spec.appendElementsUnique(fromOptions);

        WriteUnitOfWork wunit(txn);
        Status status = userCreateNS(txn, ctx.db(), toNs, spec.done());
        if (!status.isOK())
            return status;
        wunit.commit();
    } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "cloneCollectionAsCapped", fromNs);

    Collection* toCollection = db->getCollection(toNs);
    invariant(toCollection);  // we created above

    // how much data to ignore because it won't fit anyway
    // datasize and extentSize can't be compared exactly, so add some padding to 'size'
    long long allocatedSpaceGuess =
        std::max(static_cast<long long>(size * 2),
                 static_cast<long long>(toCollection->getRecordStore()->storageSize(txn) * 2));

    long long excessSize = fromCollection->dataSize(txn) - allocatedSpaceGuess;

    boost::scoped_ptr<PlanExecutor> exec(InternalPlanner::collectionScan(
        txn,
        fromNs,
        fromCollection,
        InternalPlanner::FORWARD));

    exec->setYieldPolicy(PlanExecutor::WRITE_CONFLICT_RETRY_ONLY);

    Snapshotted<BSONObj> objToClone;
    RecordId loc;
    PlanExecutor::ExecState state = PlanExecutor::FAILURE;  // suppress uninitialized warnings

    DisableDocumentValidation validationDisabler(txn);

    int retries = 0;  // non-zero when retrying our last document.
    while (true) {
        // Only advance the scan when the previous document has been dealt with; after a write
        // conflict the same 'objToClone' / 'loc' / 'state' are reused.
        if (!retries) {
            state = exec->getNextSnapshotted(&objToClone, &loc);
        }

        switch (state) {
            case PlanExecutor::IS_EOF:
                return Status::OK();
            case PlanExecutor::ADVANCED: {
                // Skip documents from the front of the scan until the estimated excess has
                // been consumed.
                if (excessSize > 0) {
                    // 4x is for padding, power of 2, etc...
                    excessSize -= (4 * objToClone.value().objsize());
                    continue;
                }
                break;
            }
            default:
                // Unreachable as:
                // 1) We require a read lock (at a minimum) on the "from" collection
                //    and won't yield, preventing collection drop and PlanExecutor::DEAD
                // 2) PlanExecutor::FAILURE is only returned on PlanStage::FAILURE. The
                //    CollectionScan PlanStage does not have a FAILURE scenario.
                // 3) All other PlanExecutor states are handled above
                invariant(false);
        }

        try {
            // Make sure we are working with the latest version of the document.
            if (objToClone.snapshotId() != txn->recoveryUnit()->getSnapshotId()
                && !fromCollection->findDoc(txn, loc, &objToClone)) {
                // doc was deleted so don't clone it.
                retries = 0;
                continue;
            }

            WriteUnitOfWork wunit(txn);
            // NOTE(review): whatever insertDocument() returns is not inspected here, so a
            // failed insert would go unreported — confirm this is intended.
            toCollection->insertDocument(txn,
                                         objToClone.value(),
                                         true,
                                         txn->writesAreReplicated());
            wunit.commit();

            // Go to the next document
            retries = 0;
        }
        catch (const WriteConflictException& wce) {
            CurOp::get(txn)->debug().writeConflicts++;
            retries++;  // logAndBackoff expects this to be 1 on first call.
            wce.logAndBackoff(retries, "cloneCollectionAsCapped", fromNs);

            // Can't use WRITE_CONFLICT_RETRY_LOOP macros since we need to save/restore exec
            // around call to abandonSnapshot.
            exec->saveState();
            txn->recoveryUnit()->abandonSnapshot();
            // NOTE(review): any result of restoreState() is ignored — confirm that is safe.
            exec->restoreState(txn);  // Handles any WCEs internally.
        }
    }

    invariant(false);  // unreachable
}
/**
 * Converts the collection 'collectionName' into a capped collection.
 *
 * The data is first cloned into the temporary collection "tmp.convertToCapped.<coll>" of the
 * same database via cloneCollectionAsCapped(); the original collection is then dropped and the
 * temporary one renamed over it inside a single WriteUnitOfWork.
 *
 * @param txn             operation context used for locking and all writes.
 * @param collectionName  namespace of the collection to convert.
 * @param size            forwarded to cloneCollectionAsCapped() and to the op observer.
 *
 * @return NotMaster when writes are replicated but this node cannot accept writes for the
 *         namespace, DatabaseNotFound when the database does not exist, any failing status
 *         from the drop / clone / rename steps, and Status::OK() on success.
 */
Status convertToCapped(OperationContext* txn,
                       const NamespaceString& collectionName,
                       double size) {
    StringData dbname = collectionName.db();
    StringData shortSource = collectionName.coll();

    ScopedTransaction transaction(txn, MODE_IX);
    AutoGetDb autoDb(txn, collectionName.db(), MODE_X);

    bool userInitiatedWritesAndNotPrimary = txn->writesAreReplicated() &&
        !repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(collectionName);

    if (userInitiatedWritesAndNotPrimary) {
        return Status(ErrorCodes::NotMaster,
                      str::stream() << "Not primary while converting "
                                    << collectionName.ns() << " to a capped collection");
    }

    Database* const db = autoDb.getDb();
    if (!db) {
        return Status(ErrorCodes::DatabaseNotFound,
                      str::stream() << "database " << dbname << " not found");
    }

    BackgroundOperation::assertNoBgOpInProgForDb(dbname);

    std::string shortTmpName = str::stream() << "tmp.convertToCapped." << shortSource;
    std::string longTmpName = str::stream() << dbname << "." << shortTmpName;

    // Remove a leftover temporary collection so the clone below can create it afresh.
    if (db->getCollection(longTmpName)) {
        WriteUnitOfWork wunit(txn);
        Status status = db->dropCollection(txn, longTmpName);
        if (!status.isOK())
            return status;
        // Without this commit the unit of work is abandoned at the end of the scope, the drop
        // does not take effect, and cloneCollectionAsCapped() then reports NamespaceExists.
        wunit.commit();
    }

    // The clone runs with replicated writes switched off; the guard restores the caller's
    // setting on every exit path, including early returns.
    const bool shouldReplicateWrites = txn->writesAreReplicated();
    txn->setReplicatedWrites(false);
    ON_BLOCK_EXIT(&OperationContext::setReplicatedWrites, txn, shouldReplicateWrites);

    Status status = cloneCollectionAsCapped(txn,
                                            db,
                                            shortSource.toString(),
                                            shortTmpName,
                                            size,
                                            true);

    if (!status.isOK()) {
        return status;
    }

    verify(db->getCollection(longTmpName));

    {
        // Drop + rename + observer notification form one unit; any early return below leaves
        // it uncommitted.
        WriteUnitOfWork wunit(txn);
        status = db->dropCollection(txn, collectionName.ns());
        // Restore the caller's replication setting before the rename and the
        // onConvertToCapped() notification.
        txn->setReplicatedWrites(shouldReplicateWrites);
        if (!status.isOK())
            return status;

        status = db->renameCollection(txn, longTmpName, collectionName.ns(), false);
        if (!status.isOK())
            return status;

        getGlobalServiceContext()->getOpObserver()->onConvertToCapped(
            txn,
            NamespaceString(collectionName),
            size);

        wunit.commit();
    }
    return Status::OK();
}
} // namespace mongo