summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp')
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp100
1 files changed, 61 insertions, 39 deletions
diff --git a/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp b/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp
index c246837a8d..7c590713f5 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp
@@ -30,8 +30,6 @@
#include "qpid/linearstore/journal/LinearFileController.h"
#include "qpid/linearstore/journal/utils/file_hdr.h"
-//#include <iostream> // DEBUG
-
namespace qpid {
namespace linearstore {
namespace journal {
@@ -49,7 +47,7 @@ wmgr::wmgr(jcntl* jc,
_deq_busy(false),
_abort_busy(false),
_commit_busy(false),
- _txn_pending_set()
+ _txn_pending_map()
{}
wmgr::wmgr(jcntl* jc,
@@ -67,7 +65,7 @@ wmgr::wmgr(jcntl* jc,
_deq_busy(false),
_abort_busy(false),
_commit_busy(false),
- _txn_pending_set()
+ _txn_pending_map()
{}
wmgr::~wmgr()
@@ -281,6 +279,7 @@ wmgr::dequeue(data_tok* dtokp,
_deq_busy = true;
}
//std::cout << "---+++ wmgr::dequeue() DEQ rid=0x" << std::hex << rid << " drid=0x" << dequeue_rid << " " << std::dec << std::flush; // DEBUG
+ std::string xid((const char*)xid_ptr, xid_len);
bool done = false;
Checksum checksum;
while (!done)
@@ -292,9 +291,27 @@ wmgr::dequeue(data_tok* dtokp,
uint32_t ret = _deq_rec.encode(wptr, data_offs_dblks,
(_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks, checksum);
- // Remember fid which contains the record header in case record is split over several files
if (data_offs_dblks == 0) {
- dtokp->set_fid(_lfc.getCurrentFileSeqNum());
+ uint64_t fid;
+ short eres = _emap.get_pfid(dtokp->dequeue_rid(), fid);
+ if (eres == enq_map::EMAP_OK) {
+ dtokp->set_fid(fid);
+ } else if (xid_len > 0) {
+ txn_data_list_t tdl = _tmap.get_tdata_list(xid);
+ bool found = false;
+ for (tdl_const_itr_t i=tdl.begin(); i!=tdl.end() && !found; ++i) {
+ if (i->rid_ == dtokp->dequeue_rid()) {
+ found = true;
+ dtokp->set_fid(i->pfid_);
+ break;
+ }
+ }
+ if (!found) {
+ throw jexception("rid found in neither emap nor tmap, transactional");
+ }
+ } else {
+ throw jexception("rid not found in emap, non-transactional");
+ }
}
_pg_offset_dblks += ret;
_cached_offset_dblks += ret;
@@ -325,7 +342,7 @@ wmgr::dequeue(data_tok* dtokp,
if (eres == enq_map::EMAP_RID_NOT_FOUND)
{
std::ostringstream oss;
- oss << std::hex << "rid=0x" << rid;
+ oss << std::hex << "emap: rid=0x" << rid;
throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "dequeue");
}
if (eres == enq_map::EMAP_LOCKED)
@@ -335,10 +352,6 @@ wmgr::dequeue(data_tok* dtokp,
throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "dequeue");
}
}
-//std::cout << "[0x" << std::hex << _lfc.getEnqueuedRecordCount(fid) << std::dec << std::flush; // DEBUG
-//try {
- _lfc.decrEnqueuedRecordCount(fid);
-//} catch (std::exception& e) { std::cout << "***OOPS*** " << e.what() << " cfid=" << _lfc.getCurrentFileSeqNum() << " fid=" << fid << std::flush; throw; }
}
done = true;
@@ -427,14 +440,16 @@ wmgr::abort(data_tok* dtokp,
// Delete this txn from tmap, unlock any locked records in emap
std::string xid((const char*)xid_ptr, xid_len);
txn_data_list_t tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
+ fidl_t fidl;
for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++)
{
if (!itr->enq_flag_)
_emap.unlock(itr->drid_); // ignore rid not found error
- if (itr->enq_flag_)
- _lfc.decrEnqueuedRecordCount(itr->pfid_);
+ if (itr->enq_flag_) {
+ fidl.push_back(itr->pfid_);
+ }
}
- std::pair<std::set<std::string>::iterator, bool> res = _txn_pending_set.insert(xid);
+ std::pair<pending_txn_map_itr_t, bool> res = _txn_pending_map.insert(std::pair<std::string, fidl_t>(xid, fidl));
if (!res.second)
{
std::ostringstream oss;
@@ -526,6 +541,7 @@ wmgr::commit(data_tok* dtokp,
// Delete this txn from tmap, process records into emap
std::string xid((const char*)xid_ptr, xid_len);
txn_data_list_t tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
+ fidl_t fidl;
for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++)
{
if (itr->enq_flag_) // txn enqueue
@@ -547,20 +563,20 @@ wmgr::commit(data_tok* dtokp,
if (eres == enq_map::EMAP_RID_NOT_FOUND)
{
std::ostringstream oss;
- oss << std::hex << "rid=0x" << rid;
- throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "dequeue");
+ oss << std::hex << "emap: rid=0x" << itr->drid_;
+ throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "commit");
}
if (eres == enq_map::EMAP_LOCKED)
{
std::ostringstream oss;
- oss << std::hex << "rid=0x" << rid;
- throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "dequeue");
+ oss << std::hex << "rid=0x" << itr->drid_;
+ throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "commit");
}
}
- _lfc.decrEnqueuedRecordCount(fid);
+ fidl.push_back(fid);
}
}
- std::pair<std::set<std::string>::iterator, bool> res = _txn_pending_set.insert(xid);
+ std::pair<pending_txn_map_itr_t, bool> res = _txn_pending_map.insert(std::pair<std::string, fidl_t>(xid, fidl));
if (!res.second)
{
std::ostringstream oss;
@@ -695,7 +711,7 @@ wmgr::get_next_file()
{
_pg_cntr = 0;
//std::cout << "&&&&& wmgr::get_next_file(): " << status_str() << std::flush << std::endl; // DEBUG
- _lfc.pullEmptyFileFromEfp();
+ _lfc.getNextJournalFile();
}
int32_t
@@ -757,7 +773,7 @@ wmgr::get_events(timespec* const timeout,
data_tok* dtokp = pcbp->_pdtokl->at(k);
if (dtokp->decr_pg_cnt() == 0)
{
- std::set<std::string>::iterator it;
+ pending_txn_map_itr_t it;
switch (dtokp->wstate())
{
case data_tok::ENQ_SUBM:
@@ -770,6 +786,9 @@ wmgr::get_events(timespec* const timeout,
_tmap.set_aio_compl(dtokp->xid(), dtokp->rid());
break;
case data_tok::DEQ_SUBM:
+ if (!dtokp->has_xid()) {
+ _lfc.decrEnqueuedRecordCount(dtokp->fid());
+ }
dtokl.push_back(dtokp);
tot_data_toks++;
dtokp->set_wstate(data_tok::DEQ);
@@ -781,31 +800,35 @@ wmgr::get_events(timespec* const timeout,
dtokl.push_back(dtokp);
tot_data_toks++;
dtokp->set_wstate(data_tok::ABORTED);
- it = _txn_pending_set.find(dtokp->xid());
- if (it == _txn_pending_set.end())
+ it = _txn_pending_map.find(dtokp->xid());
+ if (it == _txn_pending_map.end())
{
std::ostringstream oss;
- oss << std::hex << "_txn_pending_set: abort xid=\"";
- oss << dtokp->xid() << "\"";
- throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr",
- "get_events");
+ oss << std::hex << "_txn_pending_set: abort xid=\""
+ << qpid::linearstore::journal::jcntl::str2hexnum(dtokp->xid()) << "\"";
+ throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "get_events");
+ }
+ for (fidl_itr_t i=it->second.begin(); i!=it->second.end(); ++i) {
+ _lfc.decrEnqueuedRecordCount(*i);
}
- _txn_pending_set.erase(it);
+ _txn_pending_map.erase(it);
break;
case data_tok::COMMIT_SUBM:
dtokl.push_back(dtokp);
tot_data_toks++;
dtokp->set_wstate(data_tok::COMMITTED);
- it = _txn_pending_set.find(dtokp->xid());
- if (it == _txn_pending_set.end())
+ it = _txn_pending_map.find(dtokp->xid());
+ if (it == _txn_pending_map.end())
{
std::ostringstream oss;
- oss << std::hex << "_txn_pending_set: commit xid=\"";
- oss << dtokp->xid() << "\"";
- throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr",
- "get_events");
+ oss << std::hex << "_txn_pending_set: commit xid=\""
+ << qpid::linearstore::journal::jcntl::str2hexnum(dtokp->xid()) << "\"";
+ throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "get_events");
+ }
+ for (fidl_itr_t i=it->second.begin(); i!=it->second.end(); ++i) {
+ _lfc.decrEnqueuedRecordCount(*i);
}
- _txn_pending_set.erase(it);
+ _txn_pending_map.erase(it);
break;
case data_tok::ENQ_PART:
case data_tok::DEQ_PART:
@@ -858,8 +881,8 @@ wmgr::is_txn_synced(const std::string& xid)
if (_tmap.is_txn_synced(xid) == txn_map::TMAP_NOT_SYNCED)
return false;
// Check for outstanding commit/aborts
- std::set<std::string>::iterator it = _txn_pending_set.find(xid);
- return it == _txn_pending_set.end();
+ pending_txn_map_itr_t it = _txn_pending_map.find(xid);
+ return it == _txn_pending_map.end();
}
void
@@ -871,7 +894,6 @@ wmgr::initialize(aio_callback* const cbp,
pmgr::initialize(cbp, wcache_pgsize_sblks, wcache_num_pages);
wmgr::clean();
_page_cb_arr[0]._state = IN_USE;
- _ddtokl.clear();
_cached_offset_dblks = 0;
_enq_busy = false;
}