diff options
Diffstat (limited to 'qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp | 141 |
1 files changed, 73 insertions, 68 deletions
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp index f820c3c075..663431657c 100644 --- a/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp +++ b/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp @@ -30,15 +30,17 @@ * \author Kim van der Riet */ -#include "qpid/legacystore/jrnl/deq_rec.h" +#include "qpid/linearstore/jrnl/deq_rec.h" +#include "qpid/linearstore/jrnl/utils/deq_hdr.h" +#include "qpid/linearstore/jrnl/utils/rec_tail.h" #include <cassert> #include <cerrno> #include <cstdlib> #include <cstring> #include <iomanip> -#include "qpid/legacystore/jrnl/jerrno.h" -#include "qpid/legacystore/jrnl/jexception.h" +#include "qpid/linearstore/jrnl/jerrno.h" +#include "qpid/linearstore/jrnl/jexception.h" #include <sstream> namespace mrg @@ -47,19 +49,26 @@ namespace journal { deq_rec::deq_rec(): - _deq_hdr(RHM_JDAT_DEQ_MAGIC, RHM_JDAT_VERSION, 0, 0, 0, false), +// _deq_hdr(QLS_DEQ_MAGIC, QLS_JRNL_VERSION, 0, 0, 0, false), _xidp(0), - _buff(0), - _deq_tail(_deq_hdr) -{} + _buff(0) +// _deq_tail(_deq_hdr) +{ + ::deq_hdr_init(&_deq_hdr, QLS_DEQ_MAGIC, QLS_JRNL_VERSION, 0, 0, 0, 0); + ::rec_tail_copy(&_deq_tail, &_deq_hdr._rhdr, 0); +} -deq_rec::deq_rec(const u_int64_t rid, const u_int64_t drid, const void* const xidp, - const std::size_t xidlen, const bool owi, const bool txn_coml_commit): - _deq_hdr(RHM_JDAT_DEQ_MAGIC, RHM_JDAT_VERSION, rid, drid, xidlen, owi, txn_coml_commit), +deq_rec::deq_rec(const uint64_t rid, const uint64_t drid, const void* const xidp, + const std::size_t xidlen, const bool txn_coml_commit): +// _deq_hdr(QLS_DEQ_MAGIC, QLS_JRNL_VERSION, rid, drid, xidlen, owi, txn_coml_commit), _xidp(xidp), - _buff(0), - _deq_tail(_deq_hdr) -{} + _buff(0) +// _deq_tail(_deq_hdr) +{ + ::deq_hdr_init(&_deq_hdr, QLS_DEQ_MAGIC, QLS_JRNL_VERSION, 0, rid, drid, xidlen); + ::rec_tail_copy(&_deq_tail, &_deq_hdr._rhdr, 0); + ::set_txn_coml_commit(&_deq_hdr, txn_coml_commit); +} deq_rec::~deq_rec() { @@ -69,23 +78,23 @@ deq_rec::~deq_rec() void deq_rec::reset() { - _deq_hdr._rid = 0; - _deq_hdr.set_owi(false); - _deq_hdr.set_txn_coml_commit(false); + _deq_hdr._rhdr._rid = 0; +// _deq_hdr.set_owi(false); + ::set_txn_coml_commit(&_deq_hdr, false); _deq_hdr._deq_rid = 0; _deq_hdr._xidsize = 0; + _deq_tail._checksum = 0; _deq_tail._rid = 0; _xidp = 0; _buff = 0; } void -deq_rec::reset(const u_int64_t rid, const u_int64_t drid, const void* const xidp, - const std::size_t xidlen, const bool owi, const bool txn_coml_commit) +deq_rec::reset(const uint64_t rid, const uint64_t drid, const void* const xidp, + const std::size_t xidlen, const bool txn_coml_commit) { - _deq_hdr._rid = rid; - _deq_hdr.set_owi(owi); - _deq_hdr.set_txn_coml_commit(txn_coml_commit); + _deq_hdr._rhdr._rid = rid; + ::set_txn_coml_commit(&_deq_hdr, txn_coml_commit); _deq_hdr._deq_rid = drid; _deq_hdr._xidsize = xidlen; _deq_tail._rid = rid; @@ -93,8 +102,8 @@ deq_rec::reset(const u_int64_t rid, const u_int64_t drid, const void* const xi _buff = 0; } -u_int32_t -deq_rec::encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks) +uint32_t +deq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) { assert(wptr != 0); assert(max_size_dblks > 0); @@ -205,8 +214,8 @@ deq_rec::encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks) return size_dblks(wr_cnt); } -u_int32_t -deq_rec::decode(rec_hdr& h, void* rptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks) +uint32_t +deq_rec::decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) { assert(rptr != 0); assert(max_size_dblks > 0); @@ -214,18 +223,18 @@ deq_rec::decode(rec_hdr& h, void* rptr, u_int32_t rec_offs_dblks, u_int32_t max_ std::size_t rd_cnt = 0; if (rec_offs_dblks) // Continuation of record on new page { - const u_int32_t hdr_xid_dblks = size_dblks(deq_hdr::size() + _deq_hdr._xidsize); - const u_int32_t hdr_xid_tail_dblks = size_dblks(deq_hdr::size() + _deq_hdr._xidsize + - rec_tail::size()); + const uint32_t hdr_xid_dblks = size_dblks(sizeof(deq_hdr_t) + _deq_hdr._xidsize); + const uint32_t hdr_xid_tail_dblks = size_dblks(sizeof(deq_hdr_t) + _deq_hdr._xidsize + + sizeof(rec_tail_t)); const std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE; if (hdr_xid_tail_dblks - rec_offs_dblks <= max_size_dblks) { // Remainder of xid fits within this page - if (rec_offs - deq_hdr::size() < _deq_hdr._xidsize) + if (rec_offs - sizeof(deq_hdr_t) < _deq_hdr._xidsize) { // Part of xid still outstanding, copy remainder of xid and tail - const std::size_t xid_offs = rec_offs - deq_hdr::size(); + const std::size_t xid_offs = rec_offs - sizeof(deq_hdr_t); const std::size_t xid_rem = _deq_hdr._xidsize - xid_offs; std::memcpy((char*)_buff + xid_offs, rptr, xid_rem); rd_cnt = xid_rem; @@ -236,8 +245,8 @@ deq_rec::decode(rec_hdr& h, void* rptr, u_int32_t rec_offs_dblks, u_int32_t max_ else { // Tail or part of tail only outstanding, complete tail - const std::size_t tail_offs = rec_offs - deq_hdr::size() - _deq_hdr._xidsize; - const std::size_t tail_rem = rec_tail::size() - tail_offs; + const std::size_t tail_offs = rec_offs - sizeof(deq_hdr_t) - _deq_hdr._xidsize; + const std::size_t tail_rem = sizeof(rec_tail_t) - tail_offs; std::memcpy((char*)&_deq_tail + tail_offs, rptr, tail_rem); chk_tail(); rd_cnt = tail_rem; @@ -246,7 +255,7 @@ deq_rec::decode(rec_hdr& h, void* rptr, u_int32_t rec_offs_dblks, u_int32_t max_ else if (hdr_xid_dblks - rec_offs_dblks <= max_size_dblks) { // Remainder of xid fits within this page, tail split - const std::size_t xid_offs = rec_offs - deq_hdr::size(); + const std::size_t xid_offs = rec_offs - sizeof(deq_hdr_t); const std::size_t xid_rem = _deq_hdr._xidsize - xid_offs; std::memcpy((char*)_buff + xid_offs, rptr, xid_rem); rd_cnt += xid_rem; @@ -261,30 +270,28 @@ deq_rec::decode(rec_hdr& h, void* rptr, u_int32_t rec_offs_dblks, u_int32_t max_ { // Remainder of xid split const std::size_t xid_cp_size = (max_size_dblks * JRNL_DBLK_SIZE); - std::memcpy((char*)_buff + rec_offs - deq_hdr::size(), rptr, xid_cp_size); + std::memcpy((char*)_buff + rec_offs - sizeof(deq_hdr_t), rptr, xid_cp_size); rd_cnt += xid_cp_size; } } else // Start of record { // Get and check header - _deq_hdr.hdr_copy(h); - rd_cnt = sizeof(rec_hdr); - _deq_hdr._deq_rid = *(u_int64_t*)((char*)rptr + rd_cnt); - rd_cnt += sizeof(u_int64_t); -#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT) - rd_cnt += sizeof(u_int32_t); // Filler 0 -#endif + //_deq_hdr.hdr_copy(h); + ::rec_hdr_copy(&_deq_hdr._rhdr, &h); + rd_cnt = sizeof(rec_hdr_t); + _deq_hdr._deq_rid = *(uint64_t*)((char*)rptr + rd_cnt); + rd_cnt += sizeof(uint64_t); _deq_hdr._xidsize = *(std::size_t*)((char*)rptr + rd_cnt); - rd_cnt = _deq_hdr.size(); + rd_cnt = sizeof(deq_hdr_t); chk_hdr(); if (_deq_hdr._xidsize) { _buff = std::malloc(_deq_hdr._xidsize); MALLOC_CHK(_buff, "_buff", "deq_rec", "decode"); - const u_int32_t hdr_xid_dblks = size_dblks(deq_hdr::size() + _deq_hdr._xidsize); - const u_int32_t hdr_xid_tail_dblks = size_dblks(deq_hdr::size() + _deq_hdr._xidsize + - rec_tail::size()); + const uint32_t hdr_xid_dblks = size_dblks(sizeof(deq_hdr_t) + _deq_hdr._xidsize); + const uint32_t hdr_xid_tail_dblks = size_dblks(sizeof(deq_hdr_t) + _deq_hdr._xidsize + + sizeof(rec_tail_t)); // Check if record (header + xid + tail) fits within this page, we can check the // tail before the expense of copying data to memory @@ -322,18 +329,16 @@ deq_rec::decode(rec_hdr& h, void* rptr, u_int32_t rec_offs_dblks, u_int32_t max_ } bool -deq_rec::rcv_decode(rec_hdr h, std::ifstream* ifsp, std::size_t& rec_offs) +deq_rec::rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs) { if (rec_offs == 0) { - _deq_hdr.hdr_copy(h); - ifsp->read((char*)&_deq_hdr._deq_rid, sizeof(u_int64_t)); -#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT) - ifsp->ignore(sizeof(u_int32_t)); // _filler0 -#endif + //_deq_hdr.hdr_copy(h); + ::rec_hdr_copy(&_deq_hdr._rhdr, &h); + ifsp->read((char*)&_deq_hdr._deq_rid, sizeof(uint64_t)); ifsp->read((char*)&_deq_hdr._xidsize, sizeof(std::size_t)); -#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT) - ifsp->ignore(sizeof(u_int32_t)); // _filler0 +#if defined(JRNL_32_BIT) + ifsp->ignore(sizeof(uint32_t)); // _filler0 #endif rec_offs = sizeof(_deq_hdr); // Read header, allocate (if req'd) for xid @@ -360,14 +365,14 @@ deq_rec::rcv_decode(rec_hdr h, std::ifstream* ifsp, std::size_t& rec_offs) } } if (rec_offs < sizeof(_deq_hdr) + - (_deq_hdr._xidsize ? _deq_hdr._xidsize + sizeof(rec_tail) : 0)) + (_deq_hdr._xidsize ? _deq_hdr._xidsize + sizeof(rec_tail_t) : 0)) { // Read tail (or continue reading tail) std::size_t offs = rec_offs - sizeof(_deq_hdr) - _deq_hdr._xidsize; - ifsp->read((char*)&_deq_tail + offs, sizeof(rec_tail) - offs); + ifsp->read((char*)&_deq_tail + offs, sizeof(rec_tail_t) - offs); std::size_t size_read = ifsp->gcount(); rec_offs += size_read; - if (size_read < sizeof(rec_tail) - offs) + if (size_read < sizeof(rec_tail_t) - offs) { assert(ifsp->eof()); // As we may have read past eof, turn off fail bit @@ -399,9 +404,9 @@ std::string& deq_rec::str(std::string& str) const { std::ostringstream oss; - oss << "deq_rec: m=" << _deq_hdr._magic; - oss << " v=" << (int)_deq_hdr._version; - oss << " rid=" << _deq_hdr._rid; + oss << "deq_rec: m=" << _deq_hdr._rhdr._magic; + oss << " v=" << (int)_deq_hdr._rhdr._version; + oss << " rid=" << _deq_hdr._rhdr._rid; oss << " drid=" << _deq_hdr._deq_rid; if (_xidp) oss << " xid=\"" << _xidp << "\""; @@ -418,35 +423,35 @@ deq_rec::xid_size() const std::size_t deq_rec::rec_size() const { - return deq_hdr::size() + (_deq_hdr._xidsize ? _deq_hdr._xidsize + rec_tail::size() : 0); + return sizeof(deq_hdr_t) + (_deq_hdr._xidsize ? _deq_hdr._xidsize + sizeof(rec_tail_t) : 0); } void deq_rec::chk_hdr() const { - jrec::chk_hdr(_deq_hdr); - if (_deq_hdr._magic != RHM_JDAT_DEQ_MAGIC) + jrec::chk_hdr(_deq_hdr._rhdr); + if (_deq_hdr._rhdr._magic != QLS_DEQ_MAGIC) { std::ostringstream oss; oss << std::hex << std::setfill('0'); - oss << "deq magic: rid=0x" << std::setw(16) << _deq_hdr._rid; - oss << ": expected=0x" << std::setw(8) << RHM_JDAT_DEQ_MAGIC; - oss << " read=0x" << std::setw(2) << (int)_deq_hdr._magic; + oss << "deq magic: rid=0x" << std::setw(16) << _deq_hdr._rhdr._rid; + oss << ": expected=0x" << std::setw(8) << QLS_DEQ_MAGIC; + oss << " read=0x" << std::setw(2) << (int)_deq_hdr._rhdr._magic; throw jexception(jerrno::JERR_JREC_BADRECHDR, oss.str(), "deq_rec", "chk_hdr"); } } void -deq_rec::chk_hdr(u_int64_t rid) const +deq_rec::chk_hdr(uint64_t rid) const { chk_hdr(); - jrec::chk_rid(_deq_hdr, rid); + jrec::chk_rid(_deq_hdr._rhdr, rid); } void deq_rec::chk_tail() const { - jrec::chk_tail(_deq_tail, _deq_hdr); + jrec::chk_tail(_deq_tail, _deq_hdr._rhdr); } void |