diff options
Diffstat (limited to 'qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp | 256 |
1 files changed, 167 insertions, 89 deletions
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp index f61da49a1b..a4ed6cbf19 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp @@ -26,20 +26,24 @@ #include <cstdlib> #include <cstring> #include "qpid/linearstore/jrnl/utils/file_hdr.h" +#include "qpid/linearstore/jrnl/jcfg.h" #include "qpid/linearstore/jrnl/jcntl.h" #include "qpid/linearstore/jrnl/jerrno.h" #include "qpid/linearstore/jrnl/JournalFile.h" #include <sstream> #include <stdint.h> -//#include <iostream> // DEBUG +#include <iostream> // DEBUG namespace qpid { namespace qls_jrnl { -wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, LinearFileController& lfc): +wmgr::wmgr(jcntl* jc, + enq_map& emap, + txn_map& tmap, + LinearFileController& lfc): pmgr(jc, emap, tmap), _lfc(lfc), _max_dtokpp(0), @@ -52,8 +56,13 @@ wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, LinearFileController& lfc): _txn_pending_set() {} -wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, LinearFileController& lfc, const uint32_t max_dtokpp, const uint32_t max_iowait_us): - pmgr(jc, emap, tmap /* , dtoklp */), +wmgr::wmgr(jcntl* jc, + enq_map& emap, + txn_map& tmap, + LinearFileController& lfc, + const uint32_t max_dtokpp, + const uint32_t max_iowait_us): + pmgr(jc, emap, tmap), _lfc(lfc), _max_dtokpp(max_dtokpp), _max_io_wait_us(max_iowait_us), @@ -71,9 +80,12 @@ wmgr::~wmgr() } void -wmgr::initialize(aio_callback* const cbp, const uint32_t wcache_pgsize_sblks, - const uint16_t wcache_num_pages, const uint32_t max_dtokpp, const uint32_t max_iowait_us, - std::size_t eo) +wmgr::initialize(aio_callback* const cbp, + const uint32_t wcache_pgsize_sblks, + const uint16_t wcache_num_pages, + const uint32_t max_dtokpp, + const uint32_t max_iowait_us, + std::size_t eo) { _enq_busy = false; _deq_busy = false; @@ -86,26 +98,38 @@ wmgr::initialize(aio_callback* const cbp, const uint32_t wcache_pgsize_sblks, if (eo) { - const uint32_t wr_pg_size_dblks = _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS; - uint32_t data_dblks = (eo / JRNL_DBLK_SIZE_BYTES) - 4; // 4 dblks for file hdr + const uint32_t wr_pg_size_dblks = _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS; + uint32_t data_dblks = (eo / QLS_DBLK_SIZE_BYTES) - 4; // 4 dblks for file hdr _pg_cntr = data_dblks / wr_pg_size_dblks; _pg_offset_dblks = data_dblks - (_pg_cntr * wr_pg_size_dblks); } } iores -wmgr::enqueue(const void* const data_buff, const std::size_t tot_data_len, - const std::size_t this_data_len, data_tok* dtokp, const void* const xid_ptr, - const std::size_t xid_len, const bool transient, const bool external) +wmgr::enqueue(const void* const data_buff, + const std::size_t tot_data_len, + const std::size_t this_data_len, + data_tok* dtokp, + const void* const xid_ptr, + const std::size_t xid_len, + const bool transient, + const bool external) { if (xid_len) assert(xid_ptr != 0); - if (_deq_busy || _abort_busy || _commit_busy) - return RHM_IORES_BUSY; + if (_deq_busy || _abort_busy || _commit_busy) { + std::ostringstream oss; + oss << "RHM_IORES_BUSY: enqueue while part way through another op:"; + oss << " _deq_busy=" << (_deq_busy?"T":"F"); + oss << " _abort_busy=" << (_abort_busy?"T":"F"); + oss << " _commit_busy=" << (_commit_busy?"T":"F"); + throw jexception(oss.str()); // TODO: complete exception + } - if (this_data_len != tot_data_len && !external) - return RHM_IORES_NOTIMPL; + if (this_data_len != tot_data_len && !external) { + throw jexception("RHM_IORES_NOTIMPL: partial enqueues not implemented"); // TODO: complete exception; + } iores res = pre_write_check(WMGR_ENQUEUE, dtokp, xid_len, tot_data_len, external); if (res != RHM_IORES_SUCCESS) @@ -124,9 +148,8 @@ wmgr::enqueue(const void* const data_buff, const std::size_t tot_data_len, } } - uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : _lfc.getNextRecordId();/*_wrfc.get_incr_rid()*/ - _enq_rec.reset(rid, data_buff, tot_data_len, xid_ptr, xid_len/*, _wrfc.owi()*/, transient, - external); + uint64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : _lfc.getNextRecordId(); + _enq_rec.reset(rid, data_buff, tot_data_len, xid_ptr, xid_len, transient, external); if (!cont) { dtokp->set_rid(rid); @@ -137,14 +160,16 @@ wmgr::enqueue(const void* const data_buff, const std::size_t tot_data_len, dtokp->clear_xid(); _enq_busy = true; } +//std::cout << "---+++ wmgr::enqueue() ENQ rid=0x" << std::hex << rid << " " << std::dec << std::flush; // DEBUG bool done = false; while (!done) { - assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS); - void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE_BYTES); +//std::cout << "*" << std::flush; // DEBUG + assert(_pg_offset_dblks < _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS); + void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * QLS_DBLK_SIZE_BYTES); uint32_t data_offs_dblks = dtokp->dblocks_written(); uint32_t ret = _enq_rec.encode(wptr, data_offs_dblks, - (_cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) - _pg_offset_dblks); + (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks); // Remember fid which contains the record header in case record is split over several files if (data_offs_dblks == 0) { @@ -159,6 +184,7 @@ wmgr::enqueue(const void* const data_buff, const std::size_t tot_data_len, // Is the encoding of this record complete? if (dtokp->dblocks_written() >= _enq_rec.rec_size_dblks()) { +//std::cout << "!" << std::flush; // DEBUG // TODO: Incorrect - must set state to ENQ_CACHED; ENQ_SUBM is set when AIO returns. dtokp->set_wstate(data_tok::ENQ_SUBM); dtokp->set_dsize(tot_data_len); @@ -166,7 +192,8 @@ wmgr::enqueue(const void* const data_buff, const std::size_t tot_data_len, // long multi-page messages have their token on the page containing the END of the // message. AIO callbacks will then only process this token when entire message is // enqueued. - _lfc.incrEnqueuedRecordCount(); + _lfc.incrEnqueuedRecordCount(dtokp->fid()); +//std::cout << "[0x" << std::hex << _lfc.getEnqueuedRecordCount() << std::dec << std::flush; // DEBUG if (xid_len) // If part of transaction, add to transaction map { @@ -185,26 +212,37 @@ wmgr::enqueue(const void* const data_buff, const std::size_t tot_data_len, } done = true; - } - else + } else { +//std::cout << "$" << std::endl << std::flush; // DEBUG dtokp->set_wstate(data_tok::ENQ_PART); + } file_header_check(rid, cont, _enq_rec.rec_size_dblks() - data_offs_dblks); - flush_check(res, cont, done); + flush_check(res, cont, done, rid); } if (dtokp->wstate() >= data_tok::ENQ_SUBM) _enq_busy = false; +//std::cout << " res=" << iores_str(res) << " _enq_busy=" << (_enq_busy?"T":"F") << std::endl << std::flush; // DEBUG return res; } iores -wmgr::dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len, const bool txn_coml_commit) +wmgr::dequeue(data_tok* dtokp, + const void* const xid_ptr, + const std::size_t xid_len, + const bool txn_coml_commit) { if (xid_len) assert(xid_ptr != 0); - if (_enq_busy || _abort_busy || _commit_busy) - return RHM_IORES_BUSY; + if (_enq_busy || _abort_busy || _commit_busy) { + std::ostringstream oss; + oss << "RHM_IORES_BUSY: dequeue while part way through another op:"; + oss << " _enq_busy=" << (_enq_busy?"T":"F"); + oss << " _abort_busy=" << (_abort_busy?"T":"F"); + oss << " _commit_busy=" << (_commit_busy?"T":"F"); + throw jexception(oss.str()); // TODO: complete exception + } iores res = pre_write_check(WMGR_DEQUEUE, dtokp); if (res != RHM_IORES_SUCCESS) @@ -224,7 +262,7 @@ wmgr::dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_ } const bool ext_rid = dtokp->external_rid(); - uint64_t rid = (ext_rid | cont) ? dtokp->rid() : /*_wrfc.get_incr_rid()*/0; // TODO: replace for linearstore: _wrfc + uint64_t rid = (ext_rid | cont) ? dtokp->rid() : _lfc.getNextRecordId(); uint64_t dequeue_rid = (ext_rid | cont) ? dtokp->dequeue_rid() : dtokp->rid(); _deq_rec.reset(rid, dequeue_rid, xid_ptr, xid_len/*, _wrfc.owi()*/, txn_coml_commit); if (!cont) @@ -242,14 +280,16 @@ wmgr::dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_ dtokp->set_dblocks_written(0); // Reset dblks_written from previous op _deq_busy = true; } +//std::cout << "---+++ wmgr::dequeue() DEQ rid=0x" << std::hex << rid << " drid=0x" << dequeue_rid << " " << std::dec << std::flush; // DEBUG bool done = false; while (!done) { - assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS); - void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE_BYTES); +//std::cout << "*" << std::flush; // DEBUG + assert(_pg_offset_dblks < _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS); + void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * QLS_DBLK_SIZE_BYTES); uint32_t data_offs_dblks = dtokp->dblocks_written(); uint32_t ret = _deq_rec.encode(wptr, data_offs_dblks, - (_cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) - _pg_offset_dblks); + (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks); // Remember fid which contains the record header in case record is split over several files if (data_offs_dblks == 0) { @@ -264,6 +304,7 @@ wmgr::dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_ // Is the encoding of this record complete? if (dtokp->dblocks_written() >= _deq_rec.rec_size_dblks()) { +//std::cout << "!" << std::flush; // DEBUG // TODO: Incorrect - must set state to ENQ_CACHED; ENQ_SUBM is set when AIO returns. dtokp->set_wstate(data_tok::DEQ_SUBM); @@ -276,7 +317,7 @@ wmgr::dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_ } else { - int16_t fid; + uint64_t fid; short eres = _emap.get_remove_pfid(dtokp->dequeue_rid(), fid); if (eres < enq_map::EMAP_OK) // fail { @@ -293,30 +334,43 @@ wmgr::dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_ throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "dequeue"); } } - _lfc.decrEnqueuedRecordCount(); +//std::cout << "[0x" << std::hex << _lfc.getEnqueuedRecordCount(fid) << std::dec << std::flush; // DEBUG +//try { + _lfc.decrEnqueuedRecordCount(fid); +//} catch (std::exception& e) { std::cout << "***OOPS*** " << e.what() << " cfid=" << _lfc.getCurrentFileSeqNum() << " fid=" << fid << std::flush; throw; } } done = true; - } - else + } else { +//std::cout << "$" << std::flush; // DEBUG dtokp->set_wstate(data_tok::DEQ_PART); + } file_header_check(rid, cont, _deq_rec.rec_size_dblks() - data_offs_dblks); - flush_check(res, cont, done); + flush_check(res, cont, done, rid); } if (dtokp->wstate() >= data_tok::DEQ_SUBM) _deq_busy = false; +//std::cout << " res=" << iores_str(res) << " _deq_busy=" << (_deq_busy?"T":"F") << std::endl << std::flush; // DEBUG return res; } iores -wmgr::abort(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len) +wmgr::abort(data_tok* dtokp, + const void* const xid_ptr, + const std::size_t xid_len) { // commit and abort MUST have a valid xid assert(xid_ptr != 0 && xid_len > 0); - if (_enq_busy || _deq_busy || _commit_busy) - return RHM_IORES_BUSY; + if (_enq_busy || _deq_busy || _commit_busy) { + std::ostringstream oss; + oss << "RHM_IORES_BUSY: abort while part way through another op:"; + oss << " _enq_busy=" << (_enq_busy?"T":"F"); + oss << " _deq_busy=" << (_deq_busy?"T":"F"); + oss << " _commit_busy=" << (_commit_busy?"T":"F"); + throw jexception(oss.str()); // TODO: complete exception + } iores res = pre_write_check(WMGR_ABORT, dtokp); if (res != RHM_IORES_SUCCESS) @@ -348,11 +402,11 @@ wmgr::abort(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_le bool done = false; while (!done) { - assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS); - void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE_BYTES); + assert(_pg_offset_dblks < _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS); + void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * QLS_DBLK_SIZE_BYTES); uint32_t data_offs_dblks = dtokp->dblocks_written(); uint32_t ret = _txn_rec.encode(wptr, data_offs_dblks, - (_cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) - _pg_offset_dblks); + (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks); // Remember fid which contains the record header in case record is split over several files if (data_offs_dblks == 0) @@ -392,7 +446,7 @@ wmgr::abort(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_le dtokp->set_wstate(data_tok::ABORT_PART); file_header_check(rid, cont, _txn_rec.rec_size_dblks() - data_offs_dblks); - flush_check(res, cont, done); + flush_check(res, cont, done, rid); } if (dtokp->wstate() >= data_tok::ABORT_SUBM) _abort_busy = false; @@ -400,13 +454,21 @@ wmgr::abort(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_le } iores -wmgr::commit(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len) +wmgr::commit(data_tok* dtokp, + const void* const xid_ptr, + const std::size_t xid_len) { // commit and abort MUST have a valid xid assert(xid_ptr != 0 && xid_len > 0); - if (_enq_busy || _deq_busy || _abort_busy) - return RHM_IORES_BUSY; + if (_enq_busy || _deq_busy || _abort_busy) { + std::ostringstream oss; + oss << "RHM_IORES_BUSY: commit while part way through another op:"; + oss << " _enq_busy=" << (_enq_busy?"T":"F"); + oss << " _deq_busy=" << (_deq_busy?"T":"F"); + oss << " _abort_busy=" << (_abort_busy?"T":"F"); + throw jexception(oss.str()); // TODO: complete exception + } iores res = pre_write_check(WMGR_COMMIT, dtokp); if (res != RHM_IORES_SUCCESS) @@ -438,11 +500,11 @@ wmgr::commit(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_l bool done = false; while (!done) { - assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS); - void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE_BYTES); + assert(_pg_offset_dblks < _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS); + void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * QLS_DBLK_SIZE_BYTES); uint32_t data_offs_dblks = dtokp->dblocks_written(); uint32_t ret = _txn_rec.encode(wptr, data_offs_dblks, - (_cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) - _pg_offset_dblks); + (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks); // Remember fid which contains the record header in case record is split over several files if (data_offs_dblks == 0) @@ -475,7 +537,7 @@ wmgr::commit(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_l } else // txn dequeue { - int16_t fid; + uint64_t fid; short eres = _emap.get_remove_pfid(itr->_drid, fid, true); if (eres < enq_map::EMAP_OK) // fail { @@ -509,7 +571,7 @@ wmgr::commit(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_l dtokp->set_wstate(data_tok::COMMIT_PART); file_header_check(rid, cont, _txn_rec.rec_size_dblks() - data_offs_dblks); - flush_check(res, cont, done); + flush_check(res, cont, done, rid); } if (dtokp->wstate() >= data_tok::COMMIT_SUBM) _commit_busy = false; @@ -517,29 +579,34 @@ wmgr::commit(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_l } void -wmgr::file_header_check(const uint64_t rid, const bool cont, const uint32_t rec_dblks_rem) +wmgr::file_header_check(const uint64_t rid, + const bool cont, + const uint32_t rec_dblks_rem) { if (_lfc.isEmpty()) // File never written (i.e. no header or data) { std::size_t fro = 0; if (cont) { - bool file_fit = rec_dblks_rem <= _lfc.dataSize_sblks() * JRNL_SBLK_SIZE_DBLKS; // Will fit within this journal file - bool file_full = rec_dblks_rem == _lfc.dataSize_sblks() * JRNL_SBLK_SIZE_DBLKS; // Will exactly fill this journal file + bool file_fit = rec_dblks_rem <= _lfc.dataSize_sblks() * QLS_SBLK_SIZE_DBLKS; // Will fit within this journal file + bool file_full = rec_dblks_rem == _lfc.dataSize_sblks() * QLS_SBLK_SIZE_DBLKS; // Will exactly fill this journal file if (file_fit && !file_full) { - fro = (rec_dblks_rem + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_DBLKS)) * JRNL_DBLK_SIZE_BYTES; + fro = (rec_dblks_rem + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_DBLKS)) * QLS_DBLK_SIZE_BYTES; } } else { - fro = QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_BYTES; + fro = QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES; } _lfc.asyncFileHeaderWrite(_ioctx, 0, rid, fro); + _aio_evt_rem++; } } void -wmgr::flush_check(iores& res, bool& cont, bool& done) +wmgr::flush_check(iores& res, + bool& cont, + bool& done, const uint64_t /*rid*/) // DEBUG { // Is page is full, flush - if (_pg_offset_dblks >= _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) + if (_pg_offset_dblks >= _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) { res = write_flush(); assert(res == RHM_IORES_SUCCESS); @@ -558,6 +625,7 @@ wmgr::flush_check(iores& res, bool& cont, bool& done) if (!done) { cont = true; } +//std::cout << "***** wmgr::flush_check(): GET NEXT FILE: rid=0x" << std::hex << rid << std::dec << " res=" << iores_str(res) << " cont=" << (cont?"T":"F") << " done=" << (done?"T":"F") << std::endl; // DEBUG } } } @@ -580,28 +648,28 @@ wmgr::write_flush() // Don't bother flushing an empty page or one that is still in state AIO_PENDING if (_cached_offset_dblks) { - if (_page_cb_arr[_pg_index]._state == AIO_PENDING) + if (_page_cb_arr[_pg_index]._state == AIO_PENDING) { +//std::cout << "#"; // DEBUG res = RHM_IORES_PAGE_AIOWAIT; - else - { + } else { if (_page_cb_arr[_pg_index]._state != IN_USE) { std::ostringstream oss; oss << "pg_index=" << _pg_index << " state=" << _page_cb_arr[_pg_index].state_str(); - throw jexception(jerrno::JERR_WMGR_BADPGSTATE, oss.str(), "wmgr", - "write_flush"); + throw jexception(jerrno::JERR_WMGR_BADPGSTATE, oss.str(), "wmgr", "write_flush"); } // Send current page using AIO - // In manual flushes, dblks may not coincide with sblks, add filler records ("RHMx") - // if necessary. + // In manual flushes, dblks may not coincide with sblks, add filler records ("RHMx") if necessary. dblk_roundup(); - std::size_t pg_offs = (_pg_offset_dblks - _cached_offset_dblks) * JRNL_DBLK_SIZE_BYTES; + std::size_t pg_offs = (_pg_offset_dblks - _cached_offset_dblks) * QLS_DBLK_SIZE_BYTES; aio_cb* aiocbp = &_aio_cb_arr[_pg_index]; _lfc.asyncPageWrite(_ioctx, aiocbp, (char*)_page_ptr_arr[_pg_index] + pg_offs, _cached_offset_dblks); + _page_cb_arr[_pg_index]._state = AIO_PENDING; _aio_evt_rem++; +//std::cout << "." << _aio_evt_rem << std::flush; // DEBUG _cached_offset_dblks = 0; _jc->instr_incr_outstanding_aio_cnt(); @@ -610,7 +678,7 @@ wmgr::write_flush() _page_cb_arr[_pg_index]._state = IN_USE; } } - get_events(UNUSED, 0); + get_events(0, false); if (_page_cb_arr[_pg_index]._state == UNUSED) _page_cb_arr[_pg_index]._state = IN_USE; return res; @@ -620,17 +688,19 @@ void wmgr::get_next_file() { _pg_cntr = 0; +//std::cout << "&&&&& wmgr::get_next_file(): " << status_str() << std::endl; // DEBUG _lfc.pullEmptyFileFromEfp(); } int32_t -wmgr::get_events(page_state state, timespec* const timeout, bool flush) +wmgr::get_events(timespec* const timeout, + bool flush) { if (_aio_evt_rem == 0) // no events to get return 0; int ret = 0; - if ((ret = aio::getevents(_ioctx, flush ? _aio_evt_rem : 1, _aio_evt_rem/*_cache_num_pages + _jc->num_jfiles()*/, _aio_event_arr, timeout)) < 0) + if ((ret = aio::getevents(_ioctx, flush ? _aio_evt_rem : 1, _aio_evt_rem, _aio_event_arr, timeout)) < 0) { if (ret == -EINTR) // Interrupted by signal return 0; @@ -652,6 +722,7 @@ wmgr::get_events(page_state state, timespec* const timeout, bool flush) throw jexception(jerrno::JERR__UNDERFLOW, oss.str(), "wmgr", "get_events"); } _aio_evt_rem--; +//std::cout << "'" << _aio_evt_rem; // DEBUG aio_cb* aiocbp = _aio_event_arr[i].obj; // This I/O control block (iocb) page_cb* pcbp = (page_cb*)(aiocbp->data); // This page control block (pcb) long aioret = (long)_aio_event_arr[i].res; @@ -671,6 +742,7 @@ wmgr::get_events(page_state state, timespec* const timeout, bool flush) } if (pcbp) // Page writes have pcb { +//std::cout << "p"; // DEBUG uint32_t s = pcbp->_pdtokl->size(); std::vector<data_tok*> dtokl; dtokl.reserve(s); @@ -754,7 +826,8 @@ wmgr::get_events(page_state state, timespec* const timeout, bool flush) // Clean up this pcb's data_tok list pcbp->_pdtokl->clear(); - pcbp->_state = state; + pcbp->_state = UNUSED; +//std::cout << "c" << pcbp->_index << pcbp->state_str(); // DEBUG // Perform AIO return callback if (_cbp && tot_data_toks) @@ -762,10 +835,10 @@ wmgr::get_events(page_state state, timespec* const timeout, bool flush) } else // File header writes have no pcb { +//std::cout << "f"; // DEBUG file_hdr_t* fhp = (file_hdr_t*)aiocbp->u.c.buf; - _lfc.addWriteCompletedDblkCount(fhp->_file_number, QLS_JRNL_FHDR_RES_SIZE_SBLKS * JRNL_SBLK_SIZE_DBLKS); + _lfc.addWriteCompletedDblkCount(fhp->_file_number, QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_DBLKS); _lfc.decrOutstandingAioOperationCount(fhp->_file_number); - //fcntlp->set_wr_fhdr_aio_outstanding(false); // TODO: Do we need this? } } @@ -784,7 +857,9 @@ wmgr::is_txn_synced(const std::string& xid) } void -wmgr::initialize(aio_callback* const cbp, const uint32_t wcache_pgsize_sblks, const uint16_t wcache_num_pages) +wmgr::initialize(aio_callback* const cbp, + const uint32_t wcache_pgsize_sblks, + const uint16_t wcache_num_pages) { pmgr::initialize(cbp, wcache_pgsize_sblks, wcache_num_pages); @@ -796,9 +871,11 @@ wmgr::initialize(aio_callback* const cbp, const uint32_t wcache_pgsize_sblks, co } iores -wmgr::pre_write_check(const _op_type op, const data_tok* const dtokp, - const std::size_t /*xidsize*/, const std::size_t /*dsize*/, const bool /*external*/ - ) const +wmgr::pre_write_check(const _op_type op, + const data_tok* const dtokp, + const std::size_t /*xidsize*/, + const std::size_t /*dsize*/, + const bool /*external*/) const { // Check status of current file // TODO: Replace for LFC @@ -861,11 +938,12 @@ wmgr::pre_write_check(const _op_type op, const data_tok* const dtokp, } void -wmgr::dequeue_check(const std::string& xid, const uint64_t drid) +wmgr::dequeue_check(const std::string& xid, + const uint64_t drid) { // First check emap bool found = false; - int16_t fid; + uint64_t fid; short eres = _emap.get_pfid(drid, fid); if (eres < enq_map::EMAP_OK) { // fail if (eres == enq_map::EMAP_RID_NOT_FOUND) { @@ -891,13 +969,13 @@ void wmgr::dblk_roundup() { const uint32_t xmagic = QLS_EMPTY_MAGIC; - uint32_t wdblks = jrec::size_blks(_cached_offset_dblks, JRNL_SBLK_SIZE_DBLKS) * JRNL_SBLK_SIZE_DBLKS; + uint32_t wdblks = jrec::size_blks(_cached_offset_dblks, QLS_SBLK_SIZE_DBLKS) * QLS_SBLK_SIZE_DBLKS; while (_cached_offset_dblks < wdblks) { - void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE_BYTES); + void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * QLS_DBLK_SIZE_BYTES); std::memcpy(wptr, (const void*)&xmagic, sizeof(xmagic)); -#ifdef RHM_CLEAN - std::memset((char*)wptr + sizeof(xmagic), RHM_CLEAN_CHAR, JRNL_DBLK_SIZE_BYTES - sizeof(xmagic)); +#ifdef QLS_CLEAN + std::memset((char*)wptr + sizeof(xmagic), QLS_CLEAN_CHAR, QLS_DBLK_SIZE_BYTES - sizeof(xmagic)); #endif _pg_offset_dblks++; _cached_offset_dblks++; @@ -907,14 +985,15 @@ wmgr::dblk_roundup() void wmgr::rotate_page() { - _page_cb_arr[_pg_index]._state = AIO_PENDING; - if (_pg_offset_dblks >= _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) +//std::cout << "^^^^^ wmgr::rotate_page() " << status_str() << " pi=" << _pg_index; // DEBUG + if (_pg_offset_dblks >= _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) { _pg_offset_dblks = 0; _pg_cntr++; } if (++_pg_index >= _cache_num_pages) _pg_index = 0; +//std::cout << "->" << _pg_index << std::endl; // DEBUG } void @@ -928,7 +1007,7 @@ wmgr::status_str() const std::ostringstream oss; oss << "wmgr: pi=" << _pg_index << " pc=" << _pg_cntr; oss << " po=" << _pg_offset_dblks << " aer=" << _aio_evt_rem; - oss << " edac:" << (_enq_busy?"T":"F") << (_deq_busy?"T":"F"); + oss << " edac=" << (_enq_busy?"T":"F") << (_deq_busy?"T":"F"); oss << (_abort_busy?"T":"F") << (_commit_busy?"T":"F"); oss << " ps=["; for (int i=0; i<_cache_num_pages; i++) @@ -938,11 +1017,10 @@ wmgr::status_str() const case UNUSED: oss << "-"; break; case IN_USE: oss << "U"; break; case AIO_PENDING: oss << "A"; break; - case AIO_COMPLETE: oss << "*"; break; default: oss << _page_cb_arr[i]._state; } } - oss << "] " << _lfc.status(0); + oss << "] "; return oss.str(); } |