diff options
Diffstat (limited to 'qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp | 100 |
1 files changed, 61 insertions, 39 deletions
diff --git a/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp b/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp index c246837a8d..7c590713f5 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp @@ -30,8 +30,6 @@ #include "qpid/linearstore/journal/LinearFileController.h" #include "qpid/linearstore/journal/utils/file_hdr.h" -//#include <iostream> // DEBUG - namespace qpid { namespace linearstore { namespace journal { @@ -49,7 +47,7 @@ wmgr::wmgr(jcntl* jc, _deq_busy(false), _abort_busy(false), _commit_busy(false), - _txn_pending_set() + _txn_pending_map() {} wmgr::wmgr(jcntl* jc, @@ -67,7 +65,7 @@ wmgr::wmgr(jcntl* jc, _deq_busy(false), _abort_busy(false), _commit_busy(false), - _txn_pending_set() + _txn_pending_map() {} wmgr::~wmgr() @@ -281,6 +279,7 @@ wmgr::dequeue(data_tok* dtokp, _deq_busy = true; } //std::cout << "---+++ wmgr::dequeue() DEQ rid=0x" << std::hex << rid << " drid=0x" << dequeue_rid << " " << std::dec << std::flush; // DEBUG + std::string xid((const char*)xid_ptr, xid_len); bool done = false; Checksum checksum; while (!done) @@ -292,9 +291,27 @@ wmgr::dequeue(data_tok* dtokp, uint32_t ret = _deq_rec.encode(wptr, data_offs_dblks, (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks, checksum); - // Remember fid which contains the record header in case record is split over several files if (data_offs_dblks == 0) { - dtokp->set_fid(_lfc.getCurrentFileSeqNum()); + uint64_t fid; + short eres = _emap.get_pfid(dtokp->dequeue_rid(), fid); + if (eres == enq_map::EMAP_OK) { + dtokp->set_fid(fid); + } else if (xid_len > 0) { + txn_data_list_t tdl = _tmap.get_tdata_list(xid); + bool found = false; + for (tdl_const_itr_t i=tdl.begin(); i!=tdl.end() && !found; ++i) { + if (i->rid_ == dtokp->dequeue_rid()) { + found = true; + dtokp->set_fid(i->pfid_); + break; + } + } + if (!found) { + throw jexception("rid found in neither emap nor tmap, transactional"); + } + } else { + throw jexception("rid not found in emap, non-transactional"); + } } _pg_offset_dblks += ret; _cached_offset_dblks += ret; @@ -325,7 +342,7 @@ wmgr::dequeue(data_tok* dtokp, if (eres == enq_map::EMAP_RID_NOT_FOUND) { std::ostringstream oss; - oss << std::hex << "rid=0x" << rid; + oss << std::hex << "emap: rid=0x" << rid; throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "dequeue"); } if (eres == enq_map::EMAP_LOCKED) @@ -335,10 +352,6 @@ wmgr::dequeue(data_tok* dtokp, throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "dequeue"); } } -//std::cout << "[0x" << std::hex << _lfc.getEnqueuedRecordCount(fid) << std::dec << std::flush; // DEBUG -//try { - _lfc.decrEnqueuedRecordCount(fid); -//} catch (std::exception& e) { std::cout << "***OOPS*** " << e.what() << " cfid=" << _lfc.getCurrentFileSeqNum() << " fid=" << fid << std::flush; throw; } } done = true; @@ -427,14 +440,16 @@ wmgr::abort(data_tok* dtokp, // Delete this txn from tmap, unlock any locked records in emap std::string xid((const char*)xid_ptr, xid_len); txn_data_list_t tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found + fidl_t fidl; for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++) { if (!itr->enq_flag_) _emap.unlock(itr->drid_); // ignore rid not found error - if (itr->enq_flag_) - _lfc.decrEnqueuedRecordCount(itr->pfid_); + if (itr->enq_flag_) { + fidl.push_back(itr->pfid_); + } } - std::pair<std::set<std::string>::iterator, bool> res = _txn_pending_set.insert(xid); + std::pair<pending_txn_map_itr_t, bool> res = _txn_pending_map.insert(std::pair<std::string, fidl_t>(xid, fidl)); if (!res.second) { std::ostringstream oss; @@ -526,6 +541,7 @@ wmgr::commit(data_tok* dtokp, // Delete this txn from tmap, process records into emap std::string xid((const char*)xid_ptr, xid_len); txn_data_list_t tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found + fidl_t fidl; for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++) { if (itr->enq_flag_) // txn enqueue @@ -547,20 +563,20 @@ wmgr::commit(data_tok* dtokp, if (eres == enq_map::EMAP_RID_NOT_FOUND) { std::ostringstream oss; - oss << std::hex << "rid=0x" << rid; - throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "dequeue"); + oss << std::hex << "emap: rid=0x" << itr->drid_; + throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "commit"); } if (eres == enq_map::EMAP_LOCKED) { std::ostringstream oss; - oss << std::hex << "rid=0x" << rid; - throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "dequeue"); + oss << std::hex << "rid=0x" << itr->drid_; + throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "commit"); } } - _lfc.decrEnqueuedRecordCount(fid); + fidl.push_back(fid); } } - std::pair<std::set<std::string>::iterator, bool> res = _txn_pending_set.insert(xid); + std::pair<pending_txn_map_itr_t, bool> res = _txn_pending_map.insert(std::pair<std::string, fidl_t>(xid, fidl)); if (!res.second) { std::ostringstream oss; @@ -695,7 +711,7 @@ wmgr::get_next_file() { _pg_cntr = 0; //std::cout << "&&&&& wmgr::get_next_file(): " << status_str() << std::flush << std::endl; // DEBUG - _lfc.pullEmptyFileFromEfp(); + _lfc.getNextJournalFile(); } int32_t @@ -757,7 +773,7 @@ wmgr::get_events(timespec* const timeout, data_tok* dtokp = pcbp->_pdtokl->at(k); if (dtokp->decr_pg_cnt() == 0) { - std::set<std::string>::iterator it; + pending_txn_map_itr_t it; switch (dtokp->wstate()) { case data_tok::ENQ_SUBM: @@ -770,6 +786,9 @@ wmgr::get_events(timespec* const timeout, _tmap.set_aio_compl(dtokp->xid(), dtokp->rid()); break; case data_tok::DEQ_SUBM: + if (!dtokp->has_xid()) { + _lfc.decrEnqueuedRecordCount(dtokp->fid()); + } dtokl.push_back(dtokp); tot_data_toks++; dtokp->set_wstate(data_tok::DEQ); @@ -781,31 +800,35 @@ wmgr::get_events(timespec* const timeout, dtokl.push_back(dtokp); tot_data_toks++; dtokp->set_wstate(data_tok::ABORTED); - it = _txn_pending_set.find(dtokp->xid()); - if (it == _txn_pending_set.end()) + it = _txn_pending_map.find(dtokp->xid()); + if (it == _txn_pending_map.end()) { std::ostringstream oss; - oss << std::hex << "_txn_pending_set: abort xid=\""; - oss << dtokp->xid() << "\""; - throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", - "get_events"); + oss << std::hex << "_txn_pending_set: abort xid=\"" + << qpid::linearstore::journal::jcntl::str2hexnum(dtokp->xid()) << "\""; + throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "get_events"); + } + for (fidl_itr_t i=it->second.begin(); i!=it->second.end(); ++i) { + _lfc.decrEnqueuedRecordCount(*i); } - _txn_pending_set.erase(it); + _txn_pending_map.erase(it); break; case data_tok::COMMIT_SUBM: dtokl.push_back(dtokp); tot_data_toks++; dtokp->set_wstate(data_tok::COMMITTED); - it = _txn_pending_set.find(dtokp->xid()); - if (it == _txn_pending_set.end()) + it = _txn_pending_map.find(dtokp->xid()); + if (it == _txn_pending_map.end()) { std::ostringstream oss; - oss << std::hex << "_txn_pending_set: commit xid=\""; - oss << dtokp->xid() << "\""; - throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", - "get_events"); + oss << std::hex << "_txn_pending_set: commit xid=\"" + << qpid::linearstore::journal::jcntl::str2hexnum(dtokp->xid()) << "\""; + throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "get_events"); + } + for (fidl_itr_t i=it->second.begin(); i!=it->second.end(); ++i) { + _lfc.decrEnqueuedRecordCount(*i); } - _txn_pending_set.erase(it); + _txn_pending_map.erase(it); break; case data_tok::ENQ_PART: case data_tok::DEQ_PART: @@ -858,8 +881,8 @@ wmgr::is_txn_synced(const std::string& xid) if (_tmap.is_txn_synced(xid) == txn_map::TMAP_NOT_SYNCED) return false; // Check for outstanding commit/aborts - std::set<std::string>::iterator it = _txn_pending_set.find(xid); - return it == _txn_pending_set.end(); + pending_txn_map_itr_t it = _txn_pending_map.find(xid); + return it == _txn_pending_map.end(); } void @@ -871,7 +894,6 @@ wmgr::initialize(aio_callback* const cbp, pmgr::initialize(cbp, wcache_pgsize_sblks, wcache_num_pages); wmgr::clean(); _page_cb_arr[0]._state = IN_USE; - _ddtokl.clear(); _cached_offset_dblks = 0; _enq_busy = false; } |