/* @file dur_commitjob.cpp */

/**
 *    Copyright (C) 2009 10gen Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the GNU Affero General Public License, version 3,
 *    as published by the Free Software Foundation.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    GNU Affero General Public License for more details.
 *
 *    You should have received a copy of the GNU Affero General Public License
 *    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include "pch.h"
#include "dur_commitjob.h"
#include "dur_stats.h"
#include "taskqueue.h"
#include "client.h"
#include "../util/concurrency/threadlocal.h"
#include "mongo/util/stacktrace.h"

namespace mongo {

#if defined(_DEBUG) && (defined(_WIN64) || !defined(_WIN32))
#define CHECK_SPOOLING 1
#endif

    namespace dur {

        ThreadLocalIntents::~ThreadLocalIntents() {
            fassert( 16731, intents.size() == 0 );
        }

        void ThreadLocalIntents::push(const WriteIntent& x) {
            if( !commitJob._hasWritten )
                commitJob._hasWritten = true;

            // the local buffer is full: first try to merge overlapping intents;
            // only take the groupCommitMutex (via unspool) if that frees no space
            if( intents.size() == N ) {
                if ( !condense() ) {
                    unspool();
                }
            }

            intents.push_back( x );

#if( CHECK_SPOOLING )
            nSpooled++;
#endif
        }

        void ThreadLocalIntents::_unspool() {
            if ( intents.size() == 0 )
                return;

            for( unsigned j = 0; j < intents.size(); j++ ) {
                commitJob.note(intents[j].start(), intents[j].length());
            }

#if( CHECK_SPOOLING )
            nSpooled.signedAdd( -1 * static_cast<int>(intents.size()) );
#endif

            intents.clear();
        }

        bool ThreadLocalIntents::condense() {
            std::sort( intents.begin(), intents.end() );

            bool didAnything = false;

            // only called when the buffer is full (size() == N), so the unsigned
            // subtraction below cannot underflow
            for ( unsigned x = 0; x < intents.size() - 1 ; x++ ) {
                if ( intents[x].overlaps( intents[x+1] ) ) {
                    intents[x].absorb( intents[x+1] );
                    intents.erase( intents.begin() + x + 1 );
                    x--; // retry this slot against its new neighbor
                    didAnything = true;
#if( CHECK_SPOOLING )
                    nSpooled.signedAdd(-1);
#endif
                }
            }
            return didAnything;
        }

        void ThreadLocalIntents::unspool() {
            if ( intents.size() ) {
                SimpleMutex::scoped_lock lk(commitJob.groupCommitMutex);
                _unspool();
            }
        }

        AtomicUInt ThreadLocalIntents::nSpooled;
    }

    TSP_DECLARE(dur::ThreadLocalIntents,tlIntents)
    TSP_DEFINE(dur::ThreadLocalIntents,tlIntents)

    namespace dur {

        void assertNothingSpooled() {
#if( CHECK_SPOOLING )
            if( ThreadLocalIntents::nSpooled != 0 ) {
                log() << ThreadLocalIntents::nSpooled.get() << endl;
                if( tlIntents.get() )
                    log() << "me:" << tlIntents.get()->n_informational() << endl;
                else
                    log() << "no tlIntent for my thread" << endl;
                verify(false);
            }
#endif
        }

        // invoked when we release our w or W lock
        void unspoolWriteIntents() {
            ThreadLocalIntents *t = tlIntents.get();
            if( t )
                t->unspool();
        }
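        /* Illustrative only: callers normally reach declareWriteIntent() (below)
           through the DurableInterface helpers rather than calling it directly.
           Assuming the dur.h helpers of this era (writing()/writingPtr() -- treat
           the exact helper set as an assumption if you are reading a different
           version), a typical caller looks roughly like:

               Record* r = ...;                   // points into the private (COW) view
               r = getDur().writing(r);           // declares intent for sizeof(Record) bytes
               memcpy(getDur().writingPtr(p, len), src, len);   // arbitrary byte range

           Each helper funnels into declareWriteIntent(), which only appends to a
           thread-local buffer; the groupCommitMutex is taken only when that buffer
           spills (see ThreadLocalIntents::push above).
        */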
        /** Base declare-write-intent function that all the helpers call.
            We batch up our write intents so that we do not have to synchronize too often. */
        void DurableImpl::declareWriteIntent(void *p, unsigned len) {
            cc().writeHappened();
            MemoryMappedFile::makeWritable(p, len);
            ThreadLocalIntents *t = tlIntents.getMake();
            t->push(WriteIntent(p,len));
        }

        // the uncommitted-bytes threshold must leave headroom above the largest
        // single BSON object (more headroom on 64-bit builds)
        BOOST_STATIC_ASSERT( UncommittedBytesLimit > BSONObjMaxInternalSize * 3 );
        BOOST_STATIC_ASSERT( sizeof(void*)==4 || UncommittedBytesLimit > BSONObjMaxInternalSize * 6 );

        void WriteIntent::absorb(const WriteIntent& other) {
            dassert(overlaps(other));

            // WriteIntent stores its *end* pointer in p and derives the start from
            // p and len, so merging keeps the larger end and the smaller start
            void* newStart = min(start(), other.start());
            p = max(p, other.p);
            len = (char*)p - (char*)newStart;

            dassert(contains(other));
        }

        void IntentsAndDurOps::clear() {
            assertLockedForCommitting();
            commitJob.groupCommitMutex.dassertLocked();
            _alreadyNoted.clear();
            _intents.clear();
            _durOps.clear();
#if defined(DEBUG_WRITE_INTENT)
            cout << "_debug clear\n";
            _debug.clear();
#endif
        }

#if defined(DEBUG_WRITE_INTENT)
        void assertAlreadyDeclared(void *p, int len) {
            if( commitJob.wi()._debug[p] >= len )
                return;
            log() << "assertAlreadyDeclared fails " << (void*)p << " len:" << len
                  << ' ' << commitJob.wi()._debug[p] << endl;
            printStackTrace();
            abort();
        }
#endif

        /** note an operation other than a "basic write" */
        void CommitJob::noteOp(shared_ptr<DurOp> p) {
            dassert( cmdLine.dur );

            // DurOps are rare, so the lock cost here is acceptable
            SimpleMutex::scoped_lock lk(groupCommitMutex);
            cc().writeHappened();
            _hasWritten = true;
            _intentsAndDurOps._durOps.push_back(p);
        }

        size_t privateMapBytes = 0; // used by _REMAPPRIVATEVIEW to track how much / how fast to remap

        void CommitJob::commitingBegin() {
            assertLockedForCommitting();
            _commitNumber = _notify.now();
            stats.curr->_commits++;
        }

        void CommitJob::_committingReset() {
            _hasWritten = false;
            _intentsAndDurOps.clear();
            privateMapBytes += _bytes;
            _bytes = 0;
            _nSinceCommitIfNeededCall = 0;
        }

        CommitJob::CommitJob() :
            groupCommitMutex("groupCommit"),
            _hasWritten(false) {
            _commitNumber = 0;
            _bytes = 0;
            _nSinceCommitIfNeededCall = 0;
        }

        void CommitJob::note(void* p, int len) {
            groupCommitMutex.dassertLocked();
            dassert( _hasWritten );

            // from the point of view of the dur module, it would be fine (i think) to only
            // be read locked here.  but we must be at least read locked to avoid a race with
            // remapprivateview

            if( !_intentsAndDurOps._alreadyNoted.checkAndSet(p, len) ) {

                /** tips for debugging:
                        if you have an incorrect diff between data files in different folders
                        (see jstests/dur/quick.js for example), turn this on and see what is
                        logged.  if you have a copy of its output from before the regression,
                        a simple diff of these lines would likely tell you a lot.
                */
#if 0 && defined(_DEBUG)
                {
                    static int n;
                    if( ++n < 10000 ) {
                        size_t ofs;
                        MongoMMF *mmf = privateViews._find(p, ofs);
                        if( mmf ) {
                            log() << "DEBUG note write intent " << p << ' ' << mmf->filename()
                                  << " ofs:" << hex << ofs << " len:" << len << endl;
                        }
                        else {
                            log() << "DEBUG note write intent " << p << ' ' << len
                                  << " NOT FOUND IN privateViews" << endl;
                        }
                    }
                    else if( n == 10000 ) {
                        log() << "DEBUG stopping write intent logging, too much to log" << endl;
                    }
                }
#endif
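                /* Worked example for the page accounting below (4KB pages; the
                   numbers are illustrative): a 100-byte write at 0x10050 rounds to
                   page 0x10000, so lastPos is updated and _bytes grows by 4096.  A
                   second write at 0x10200 rounds to the same page and adds nothing.
                   A later write at 0x12345 rounds to page 0x12000 and adds another
                   4096.  Writes alternating between two pages are counted each
                   time -- hence "a bit over conservative". */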
                // remember the intent; we will journal it in a bit
                _intentsAndDurOps.insertWriteIntent(p, len);

                {
                    // a bit over conservative in counting pagebytes used
                    static size_t lastPos; // note this doesn't reset with each commit, but that is ok -- we aren't being that precise
                    size_t x = ((size_t) p) & ~0xfff; // round off to page address (4KB)
                    if( x != lastPos ) {
                        lastPos = x;
                        unsigned b = (len+4095) & ~0xfff; // round len up to a page multiple
                        _bytes += b;
#if defined(_DEBUG)
                        _nSinceCommitIfNeededCall++;
                        if( _nSinceCommitIfNeededCall >= 80 ) {
                            if( _nSinceCommitIfNeededCall % 40 == 0 ) {
                                log() << "debug nsincecommitifneeded:" << _nSinceCommitIfNeededCall
                                      << " bytes:" << _bytes << endl;
                                if( _nSinceCommitIfNeededCall == 240 || _nSinceCommitIfNeededCall == 1200 ) {
                                    log() << "_DEBUG printing stack given high nsincecommitifneeded number" << endl;
                                    printStackTrace();
                                }
                            }
                        }
#endif
                        if (_bytes > UncommittedBytesLimit * 3) {
                            static time_t lastComplain;
                            static unsigned nComplains;

                            // throttle the logging
                            if( ++nComplains < 100 || time(0) - lastComplain >= 60 ) {
                                lastComplain = time(0);
                                warning() << "DR102 too much data written uncommitted "
                                          << _bytes/1000000.0 << "MB" << endl;
                                if( nComplains < 10 || nComplains % 10 == 0 ) {
                                    // wassert would make getLastError show an error, so we just print a stack trace
                                    printStackTrace();
                                }
                            }
                        }
                    }
                }
            }
        }

    }
}
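/* A minimal standalone sketch (illustrative only, not part of the build) of the
   interval coalescing that ThreadLocalIntents::condense() and WriteIntent::absorb()
   perform above, using plain [start, end) ranges in place of WriteIntent (which
   stores the end pointer and derives the start from it and len):

       #include <algorithm>
       #include <cassert>
       #include <cstddef>
       #include <vector>

       struct Range {
           size_t start, end;                    // end is exclusive
           Range(size_t s, size_t e) : start(s), end(e) {}
           bool operator<(const Range& r) const { return start < r.start; }
       };

       // merge overlapping (or touching) neighbors in place, as condense() does;
       // returns true if anything was merged
       bool condense(std::vector<Range>& v) {
           std::sort(v.begin(), v.end());
           bool didAnything = false;
           for (size_t i = 0; i + 1 < v.size(); ) {
               if (v[i+1].start <= v[i].end) {   // the overlaps() test, given the sort
                   v[i].end = std::max(v[i].end, v[i+1].end);   // absorb()
                   v.erase(v.begin() + i + 1);
                   didAnything = true;
               }
               else {
                   ++i;
               }
           }
           return didAnything;
       }

       int main() {
           std::vector<Range> v;
           v.push_back(Range(0, 8));
           v.push_back(Range(20, 24));
           v.push_back(Range(4, 12));
           assert(condense(v));                  // {0,8} and {4,12} merge into {0,12}
           assert(v.size() == 2);
           return 0;
       }
*/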