summaryrefslogtreecommitdiff
path: root/src/mongo/db/compact.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/compact.cpp')
-rw-r--r--src/mongo/db/compact.cpp376
1 files changed, 376 insertions, 0 deletions
diff --git a/src/mongo/db/compact.cpp b/src/mongo/db/compact.cpp
new file mode 100644
index 00000000000..32931b6c5fd
--- /dev/null
+++ b/src/mongo/db/compact.cpp
@@ -0,0 +1,376 @@
+/** @file compact.cpp
+ compaction of deleted space in pdfiles (datafiles)
+*/
+
+/**
+* Copyright (C) 2010 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "pch.h"
+#include "pdfile.h"
+#include "concurrency.h"
+#include "commands.h"
+#include "curop-inl.h"
+#include "background.h"
+#include "extsort.h"
+#include "compact.h"
+#include "../util/concurrency/task.h"
+#include "../util/timer.h"
+
+namespace mongo {
+
+ // Accumulator for the bytes compactExtent() reads while paging an extent in.
+ // NOTE(review): presumably a global so those reads have an observable effect and
+ // are not optimized away -- confirm.
+ char faux;
+
+ // Helpers used below; they are only declared here, not defined in this file.
+ void addRecordToRecListInExtent(Record *r, DiskLoc loc);
+ DiskLoc allocateSpaceForANewRecord(const char *ns, NamespaceDetails *d, int lenWHdr, bool god);
+ void freeExtents(DiskLoc firstExt, DiskLoc lastExt);
+
+ /* Quantization of padded record lengths.
+    This belongs in record allocation rather than here; it is a deliberately crude
+    first cut.
+    @param x a record length in bytes
+    @return a mask to AND with x: lengths above 4096*20 are cut down to a multiple of
+            4096, lengths of 512 and up to a multiple of 64, and anything smaller is
+            left unchanged (all bits set).
+ */
+ unsigned quantizeMask(unsigned x) {
+     const unsigned coarseStep = 4096;
+     if( x < 512 )
+         return ~0u;
+     return x > coarseStep * 20 ? ~(coarseStep - 1) : ~63u;
+ }
+
+ /** Copy every record of one extent into newly allocated space, then free the extent.
+
+     The extent must be the collection's first extent and must not be its last one
+     (both are asserted). For each copied document the keys of every index are fed to
+     the matching phase1 entry so the indexes can be rebuilt afterwards.
+
+     @param ns         namespace being compacted
+     @param d          details of that namespace
+     @param ext        location of the extent to compact
+     @param n          ordinal of the extent; used for logging only
+     @param indexSpecs specs of the indexes to rebuild (nidx entries)
+     @param phase1     per-index key collectors (nidx entries)
+     @param nidx       number of indexes
+     @param validate   when true, documents failing BSONObj::valid() are skipped, not copied
+     @param pf         padding factor applied to each record length (header included)
+     @param pb         padding bytes added after the factor has been applied
+     @return number of skipped (invalid) documents
+     uasserts 14024 when space for a new record cannot be allocated.
+ */
+ unsigned compactExtent(const char *ns, NamespaceDetails *d, const DiskLoc ext, int n,
+                        const scoped_array<IndexSpec> &indexSpecs,
+                        scoped_array<SortPhaseOne>& phase1, int nidx, bool validate,
+                        double pf, int pb)
+ {
+     log() << "compact extent #" << n << endl;
+     unsigned oldObjSize = 0; // we'll report what the old padding was
+     unsigned oldObjSizeWithPadding = 0;
+
+     Extent *e = ext.ext();
+     e->assertOk();
+     assert( e->validates() );
+     unsigned skipped = 0;
+
+     {
+         // the next/prev pointers within the extent might not be in order so we first page the whole thing in
+         // sequentially
+         log() << "compact paging in len=" << e->length/1000000.0 << "MB" << endl;
+         Timer t;
+         MAdvise adv(e, e->length, MAdvise::Sequential);
+         const char *p = (const char *) e;
+         // read one byte every 4096 bytes; the sum goes into the global faux
+         for( int i = 0; i < e->length; i += 4096 ) {
+             faux += p[i];
+         }
+         int ms = t.millis();
+         if( ms > 1000 ) {
+             // bytes/1000/ms == (bytes/1e6) / (ms/1000), i.e. megabytes per second
+             log() << "compact end paging in " << ms << "ms " << e->length/1000.0/ms << "MB/sec" << endl;
+         }
+     }
+
+     {
+         log() << "compact copying records" << endl;
+         unsigned totalSize = 0;
+         int nrecs = 0;
+         DiskLoc L = e->firstRecord;
+         if( !L.isNull() ) {
+             while( 1 ) {
+                 Record *recOld = L.rec();
+                 // advance before copying: L now refers to the record after recOld
+                 L = recOld->nextInExtent(L);
+                 nrecs++;
+                 BSONObj objOld(recOld);
+
+                 if( !validate || objOld.valid() ) {
+                     unsigned sz = objOld.objsize();
+
+                     oldObjSize += sz;
+                     oldObjSizeWithPadding += recOld->netLength();
+
+                     unsigned lenWHdr = sz + Record::HeaderSize;
+                     unsigned lenWPadding = lenWHdr;
+                     {
+                         lenWPadding = static_cast<unsigned>(pf*lenWPadding);
+                         lenWPadding += pb;
+                         lenWPadding = lenWPadding & quantizeMask(lenWPadding);
+                         // fall back to the unpadded length if quantizing cut into the
+                         // record itself or the padded size got too large
+                         if( lenWPadding < lenWHdr || lenWPadding > BSONObjMaxUserSize / 2 ) {
+                             lenWPadding = lenWHdr;
+                         }
+                     }
+                     totalSize += lenWPadding;
+                     DiskLoc loc = allocateSpaceForANewRecord(ns, d, lenWPadding, false);
+                     uassert(14024, "compact error out of space during compaction", !loc.isNull());
+                     Record *recNew = loc.rec();
+                     recNew = (Record *) getDur().writingPtr(recNew, lenWHdr);
+                     addRecordToRecListInExtent(recNew, loc);
+                     memcpy(recNew->data, objOld.objdata(), sz);
+
+                     {
+                         // extract keys for all indexes we will be rebuilding
+                         for( int x = 0; x < nidx; x++ ) {
+                             phase1[x].addKeys(indexSpecs[x], objOld, loc);
+                         }
+                     }
+                 }
+                 else {
+                     // only the first ten invalid objects are logged; all are counted
+                     if( ++skipped <= 10 )
+                         log() << "compact skipping invalid object" << endl;
+                 }
+
+                 if( L.isNull() ) {
+                     // we just did the very last record from the old extent. it's still pointed to
+                     // by the old extent ext, but that will be fixed below after this loop
+                     break;
+                 }
+
+                 // remove the old records (orphan them) periodically so our commit block doesn't get too large
+                 bool stopping = false;
+                 RARELY stopping = *killCurrentOp.checkForInterruptNoAssert() != 0;
+                 if( stopping || getDur().aCommitIsNeeded() ) {
+                     // make the next uncopied record the head of the old extent
+                     e->firstRecord.writing() = L;
+                     Record *r = L.rec();
+                     getDur().writingInt(r->prevOfs) = DiskLoc::NullOfs;
+                     getDur().commitIfNeeded();
+                     killCurrentOp.checkForInterrupt(false);
+                 }
+             }
+         } // if !L.isNull()
+
+         // unlink the now fully copied extent from the head of the collection and free it
+         assert( d->firstExtent == ext );
+         assert( d->lastExtent != ext );
+         DiskLoc newFirst = e->xnext;
+         d->firstExtent.writing() = newFirst;
+         newFirst.ext()->xprev.writing().Null();
+         getDur().writing(e)->markEmpty();
+         freeExtents(ext,ext);
+         getDur().commitIfNeeded();
+
+         {
+             double op = 1.0;
+             if( oldObjSize )
+                 op = static_cast<double>(oldObjSizeWithPadding)/oldObjSize;
+             // second figure is op cut to two decimals; divide by 100.0 so the
+             // fractional part survives
+             log() << "compact " << nrecs << " documents " << totalSize/1000000.0 << "MB"
+                   << " oldPadding: " << op << ' ' << static_cast<unsigned>(op*100.0)/100.0
+                   << endl;
+         }
+     }
+
+     return skipped;
+ }
+
+ // Defined elsewhere. _compact() points this at one index's collected keys while it
+ // inserts that index's spec, and sets it back to 0 afterwards.
+ // NOTE(review): presumably the index build consumes these keys instead of
+ // rescanning the collection -- confirm.
+ extern SortPhaseOne *precalced;
+
+ /** Compact every extent of a collection and rebuild its indexes.
+  * Steps, in order: commit pending writes; snapshot the extent list; capture each
+  * index's spec (minus its "v" and "background" fields) and create a sorter for it;
+  * null out the deleted-record lists; reset lastExtentSize; check that a record can
+  * be allocated; drop all indexes; compact each extent in turn; then recreate each
+  * index by inserting its spec into <db>.system.indexes with precalced set.
+  * @param ns       namespace to compact
+  * @param d        details of that namespace
+  * @param errmsg   set to "compact drop indexes failed" when the index drop fails
+  * @param validate passed through to compactExtent()
+  * @param result   receives "invalidObjects" when any documents were skipped
+  * @param pf       padding factor, passed through to compactExtent()
+  * @param pb       padding bytes, passed through to compactExtent()
+  * @return false when dropping the indexes failed, true otherwise.
+  *         uasserts 14025 when no space can be allocated.
+  */
+ bool _compact(const char *ns, NamespaceDetails *d, string& errmsg, bool validate, BSONObjBuilder& result, double pf, int pb) {
+ //int les = d->lastExtentSize;
+
+ // this is a big job, so might as well make things tidy before we start just to be nice.
+ getDur().commitNow();
+
+ // take a snapshot of the extent chain now: compactExtent() unlinks and frees each
+ // extent it processes
+ list<DiskLoc> extents;
+ for( DiskLoc L = d->firstExtent; !L.isNull(); L = L.ext()->xnext )
+ extents.push_back(L);
+ log() << "compact " << extents.size() << " extents" << endl;
+
+ ProgressMeterHolder pm( cc().curop()->setMessage( "compact extent" , extents.size() ) );
+
+ // same data, but might perform a little different after compact?
+ NamespaceDetailsTransient::get(ns).clearQueryCache();
+
+ // for every existing index: keep a copy of its spec without the "v" and
+ // "background" fields, and set up a sorter keyed on the spec's "key" pattern
+ int nidx = d->nIndexes;
+ scoped_array<IndexSpec> indexSpecs( new IndexSpec[nidx] );
+ scoped_array<SortPhaseOne> phase1( new SortPhaseOne[nidx] );
+ {
+ NamespaceDetails::IndexIterator ii = d->ii();
+ int x = 0;
+ while( ii.more() ) {
+ BSONObjBuilder b;
+ IndexDetails& idx = ii.next();
+ BSONObj::iterator i(idx.info.obj());
+ while( i.more() ) {
+ BSONElement e = i.next();
+ if( !str::equals(e.fieldName(), "v") && !str::equals(e.fieldName(), "background") ) {
+ b.append(e);
+ }
+ }
+ BSONObj o = b.obj().getOwned();
+ phase1[x].sorter.reset( new BSONObjExternalSorter( idx.idxInterface(), o.getObjectField("key") ) );
+ phase1[x].sorter->hintNumObjects( d->stats.nrecords );
+ indexSpecs[x++].reset(o);
+ }
+ }
+
+ // null out the head of every deleted-record bucket
+ log() << "compact orphan deleted lists" << endl;
+ for( int i = 0; i < Buckets; i++ ) {
+ d->deletedList[i].writing().Null();
+ }
+
+
+
+ // Start over from scratch with our extent sizing and growth
+ d->lastExtentSize=0;
+
+ // before dropping indexes, at least make sure we can allocate one extent!
+ // (only the null-ness of the returned location is looked at)
+ uassert(14025, "compact error no space available to allocate", !allocateSpaceForANewRecord(ns, d, Record::HeaderSize+1, false).isNull());
+
+ // note that the drop indexes call also invalidates all clientcursors for the namespace, which is important and wanted here
+ log() << "compact dropping indexes" << endl;
+ BSONObjBuilder b;
+ if( !dropIndexes(d, ns, "*", errmsg, b, true) ) {
+ // whatever dropIndexes put in errmsg is replaced by this fixed message
+ errmsg = "compact drop indexes failed";
+ log() << errmsg << endl;
+ return false;
+ }
+
+ getDur().commitNow();
+
+ // compact the extents in the order captured above; n is the extent's ordinal for logging
+ long long skipped = 0;
+ int n = 0;
+ for( list<DiskLoc>::iterator i = extents.begin(); i != extents.end(); i++ ) {
+ skipped += compactExtent(ns, d, *i, n++, indexSpecs, phase1, nidx, validate, pf, pb);
+ pm.hit();
+ }
+
+ if( skipped ) {
+ result.append("invalidObjects", skipped);
+ }
+
+ assert( d->firstExtent.ext()->xprev.isNull() );
+
+ // indexes will do their own progress meter?
+ pm.finished();
+
+ // build indexes
+ NamespaceString s(ns);
+ string si = s.db + ".system.indexes";
+ for( int i = 0; i < nidx; i++ ) {
+ killCurrentOp.checkForInterrupt(false);
+ BSONObj info = indexSpecs[i].info;
+ log() << "compact create index " << info["key"].Obj().toString() << endl;
+ // precalced is set only for the duration of the insert and is reset to 0
+ // whether or not the insert throws
+ try {
+ precalced = &phase1[i];
+ theDataFileMgr.insert(si.c_str(), info.objdata(), info.objsize());
+ }
+ catch(...) {
+ precalced = 0;
+ throw;
+ }
+ precalced = 0;
+ }
+
+ return true;
+ }
+
+ /** Compact one collection: check the namespace, take a writelock, and run _compact().
+     masserts when the namespace is malformed (14028), is a system namespace (14027),
+     does not exist (13660) or is capped (13661).
+     @param ns       full namespace of the collection
+     @param errmsg   filled in by _compact() on failure
+     @param validate passed through to _compact()
+     @param result   passed through to _compact()
+     @param pf       padding factor, passed through to _compact()
+     @param pb       padding bytes, passed through to _compact()
+     @return whatever _compact() returned; anything it throws is logged and rethrown
+ */
+ bool compact(const string& ns, string &errmsg, bool validate, BSONObjBuilder& result, double pf, int pb) {
+     const char *nsName = ns.c_str();
+     massert( 14028, "bad ns", NamespaceString::normal(nsName) );
+     // items in system.indexes cannot be moved: there are pointers to those disklocs in NamespaceDetails
+     massert( 14027, "can't compact a system namespace", !str::contains(ns, ".system.") );
+
+     // the lock is held until this function returns
+     writelock lk;
+     BackgroundOperation::assertNoBgOpInProgForNs(nsName);
+     Client::Context ctx(ns);
+     NamespaceDetails *details = nsdetails(nsName);
+     massert( 13660, str::stream() << "namespace " << ns << " does not exist", details );
+     massert( 13661, "cannot compact capped collection", !details->capped );
+
+     log() << "compact " << ns << " begin" << endl;
+     const bool havePaddingSettings = !( pf == 0 && pb == 0 );
+     if( havePaddingSettings ) {
+         log() << "paddingFactor:" << pf << " paddingBytes:" << pb << endl;
+     }
+
+     bool finished;
+     try {
+         finished = _compact(nsName, details, errmsg, validate, result, pf, pb);
+     }
+     catch(...) {
+         log() << "compact " << ns << " end (with error)" << endl;
+         throw;
+     }
+     log() << "compact " << ns << " end" << endl;
+     return finished;
+ }
+
+ // Only declared here; the definition is not in this file.
+ bool isCurrentlyAReplSetPrimary();
+
+ /** The "compact" database command.
+     run() checks its arguments and reports problems through errmsg, then hands
+     <db>.<collection> to compact(). The command declares no lock of its own
+     (locktype NONE); run() and compact() take writelocks themselves.
+ */
+ class CompactCmd : public Command {
+ public:
+     virtual LockType locktype() const { return NONE; }
+     virtual bool adminOnly() const { return false; }
+     virtual bool slaveOk() const { return true; }
+     virtual bool maintenanceMode() const { return true; }
+     virtual bool logTheOp() { return false; }
+     virtual void help( stringstream& help ) const {
+         help << "compact collection\n"
+                 "warning: this operation blocks the server and is slow. you can cancel with cancelOp()\n"
+                 "{ compact : <collection_name>, [force:true], [validate:true] }\n"
+                 "  force - allows to run on a replica set primary\n"
+                 "  validate - check records are noncorrupt before adding to newly compacting extents. slower but safer (default is true in this version)\n";
+     }
+     virtual bool requiresAuth() { return true; }
+     CompactCmd() : Command("compact") { }
+
+     /** Run the command.
+         @param db      database the command was issued against
+         @param cmdObj  command document; its first element names the collection.
+                        Optional fields: force, validate, paddingFactor, paddingBytes
+         @param errmsg  receives the reason whenever false is returned
+         @param result  passed on to compact()
+         @return true when the collection was compacted
+     */
+     virtual bool run(const string& db, BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
+         // valuestrsafe() gives "" for a non-string element, so a command such as
+         // { compact : 1 } is rejected below rather than read as if it held a string
+         string coll = cmdObj.firstElement().valuestrsafe();
+         if( coll.empty() || db.empty() ) {
+             errmsg = "no collection name specified";
+             return false;
+         }
+
+         if( isCurrentlyAReplSetPrimary() && !cmdObj["force"].trueValue() ) {
+             errmsg = "will not run compact on an active replica set primary as this is a slow blocking operation. use force:true to force";
+             return false;
+         }
+
+         string ns = db + '.' + coll;
+         if ( ! NamespaceString::normal(ns.c_str()) ) {
+             errmsg = "bad namespace name";
+             return false;
+         }
+
+         // parameter validation to avoid triggering assertions in compact()
+         if ( str::contains(ns, ".system.") ) {
+             errmsg = "can't compact a system namespace";
+             return false;
+         }
+
+         {
+             // this lock is released again before compact() takes its own
+             writelock lk;
+             Client::Context ctx(ns);
+             NamespaceDetails *d = nsdetails(ns.c_str());
+             if( ! d ) {
+                 errmsg = "namespace does not exist";
+                 return false;
+             }
+
+             if ( d->capped ) {
+                 errmsg = "cannot compact a capped collection";
+                 return false;
+             }
+         }
+
+         double pf = 1.0;
+         int pb = 0;
+         if( cmdObj.hasElement("paddingFactor") ) {
+             pf = cmdObj["paddingFactor"].Number();
+             // written as a negated range test so that NaN is rejected as well
+             if( !( pf >= 1.0 && pf <= 4.0 ) ) {
+                 errmsg = "paddingFactor must be between 1.0 and 4.0";
+                 return false;
+             }
+         }
+         if( cmdObj.hasElement("paddingBytes") ) {
+             // range-check the double before narrowing it; the bounds accept exactly
+             // the values that truncate to an int in [0, 1024*1024]
+             const double requestedBytes = cmdObj["paddingBytes"].Number();
+             if( !( requestedBytes > -1.0 && requestedBytes < 1024 * 1024 + 1.0 ) ) {
+                 errmsg = "paddingBytes must be between 0 and 1048576";
+                 return false;
+             }
+             pb = static_cast<int>(requestedBytes);
+         }
+
+         bool validate = !cmdObj.hasElement("validate") || cmdObj["validate"].trueValue(); // default is true at the moment
+         bool ok = compact(ns, errmsg, validate, result, pf, pb);
+         return ok;
+     }
+ };
+ // The one instance of the command.
+ // NOTE(review): presumably the Command base constructor registers it under the
+ // name "compact" -- confirm.
+ static CompactCmd compactCmd;
+
+}