// extsort.cpp /** * Copyright (C) 2008 10gen Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #include "stdafx.h" #include "extsort.h" #include "namespace.h" #include "../util/file.h" #include #include #include namespace mongo { unsigned long long BSONObjExternalSorter::_compares = 0; BSONObjExternalSorter::BSONObjExternalSorter( const BSONObj & order , long maxFileSize ) : _order( order.getOwned() ) , _maxFilesize( maxFileSize ) , _cur(0), _curSizeSoFar(0), _sorted(0){ stringstream rootpath; rootpath << dbpath; if ( dbpath[dbpath.size()-1] != '/' ) rootpath << "/"; rootpath << "_tmp/esort." << time(0) << "." << rand() << "/"; _root = rootpath.str(); log(1) << "external sort root: " << _root.string() << endl; create_directories( _root ); _compares = 0; } BSONObjExternalSorter::~BSONObjExternalSorter(){ if ( _cur ){ delete _cur; _cur = 0; } unsigned long removed = remove_all( _root ); wassert( removed == 1 + _files.size() ); } void BSONObjExternalSorter::sort(){ uassert( "already sorted" , ! _sorted ); _sorted = true; if ( _cur && _files.size() == 0 ){ _cur->sort( MyCmp( _order ) ); log(1) << "\t\t not using file. size:" << _curSizeSoFar << " _compares:" << _compares << endl; return; } if ( _cur ){ finishMap(); } if ( _cur ){ delete _cur; _cur = 0; } if ( _files.size() == 0 ) return; } void BSONObjExternalSorter::add( const BSONObj& o , const DiskLoc & loc ){ uassert( "sorted already" , ! _sorted ); if ( ! _cur ){ _cur = new InMemory(); } BSONObj toadd = o; if ( toadd.isOwned() ){ // hack to handle buffer problems if ( toadd.objsize() < 100 ) toadd = toadd.copy(); } else { toadd = toadd.getOwned(); } _cur->push_back( pair( o , loc ) ); long size = o.objsize(); _curSizeSoFar += size + sizeof( DiskLoc ) + sizeof( BSONObj ); if ( _curSizeSoFar > _maxFilesize ) finishMap(); } void BSONObjExternalSorter::finishMap(){ uassert( "bad" , _cur ); _curSizeSoFar = 0; if ( _cur->size() == 0 ) return; _cur->sort( MyCmp( _order ) ); stringstream ss; ss << _root.string() << "/file." << _files.size(); string file = ss.str(); ofstream out; out.open( file.c_str() , ios_base::out | ios_base::binary ); uassert( (string)"couldn't open file: " + file , out.good() ); int num = 0; for ( InMemory::iterator i=_cur->begin(); i != _cur->end(); i++ ){ Data p = *i; out.write( p.first.objdata() , p.first.objsize() ); out.write( (char*)(&p.second) , sizeof( DiskLoc ) ); num++; } _cur->clear(); _files.push_back( file ); out.close(); log(2) << "Added file: " << file << " with " << num << "objects for external sort" << endl; } // --------------------------------- BSONObjExternalSorter::Iterator::Iterator( BSONObjExternalSorter * sorter ) : _cmp( sorter->_order ) , _in( 0 ){ for ( list::iterator i=sorter->_files.begin(); i!=sorter->_files.end(); i++ ){ _files.push_back( new FileIterator( *i ) ); _stash.push_back( pair( Data( BSONObj() , DiskLoc() ) , false ) ); } if ( _files.size() == 0 && sorter->_cur ){ _in = sorter->_cur; _it = sorter->_cur->begin(); } } BSONObjExternalSorter::Iterator::~Iterator(){ for ( vector::iterator i=_files.begin(); i!=_files.end(); i++ ) delete *i; _files.clear(); } bool BSONObjExternalSorter::Iterator::more(){ if ( _in ) return _it != _in->end(); for ( vector::iterator i=_files.begin(); i!=_files.end(); i++ ) if ( (*i)->more() ) return true; for ( vector< pair >::iterator i=_stash.begin(); i!=_stash.end(); i++ ) if ( i->second ) return true; return false; } pair BSONObjExternalSorter::Iterator::next(){ if ( _in ){ return *(_it++); } Data best; int slot = -1; for ( unsigned i=0; i<_stash.size(); i++ ){ if ( ! _stash[i].second ){ if ( _files[i]->more() ) _stash[i] = pair( _files[i]->next() , true ); else continue; } if ( slot == -1 || _cmp( best , _stash[i].first ) == 0 ){ best = _stash[i].first; slot = i; } } assert( slot >= 0 ); _stash[slot].second = false; return best; } // ----------------------------------- BSONObjExternalSorter::FileIterator::FileIterator( string file ){ long length; _buf = (char*)_file.map( file.c_str() , length , MemoryMappedFile::SEQUENTIAL ); massert( "mmap failed" , _buf ); assert( (unsigned long)length == file_size( file ) ); _end = _buf + length; } BSONObjExternalSorter::FileIterator::~FileIterator(){ } bool BSONObjExternalSorter::FileIterator::more(){ return _buf < _end; } pair BSONObjExternalSorter::FileIterator::next(){ BSONObj o( _buf ); _buf += o.objsize(); DiskLoc * l = (DiskLoc*)_buf; _buf += 8; return Data( o , *l ); } }