summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp')
-rw-r--r--qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp141
1 files changed, 73 insertions, 68 deletions
diff --git a/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp b/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp
index f820c3c075..663431657c 100644
--- a/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp
+++ b/qpid/cpp/src/qpid/linearstore/jrnl/deq_rec.cpp
@@ -30,15 +30,17 @@
* \author Kim van der Riet
*/
-#include "qpid/legacystore/jrnl/deq_rec.h"
+#include "qpid/linearstore/jrnl/deq_rec.h"
+#include "qpid/linearstore/jrnl/utils/deq_hdr.h"
+#include "qpid/linearstore/jrnl/utils/rec_tail.h"
#include <cassert>
#include <cerrno>
#include <cstdlib>
#include <cstring>
#include <iomanip>
-#include "qpid/legacystore/jrnl/jerrno.h"
-#include "qpid/legacystore/jrnl/jexception.h"
+#include "qpid/linearstore/jrnl/jerrno.h"
+#include "qpid/linearstore/jrnl/jexception.h"
#include <sstream>
namespace mrg
@@ -47,19 +49,26 @@ namespace journal
{
deq_rec::deq_rec():
- _deq_hdr(RHM_JDAT_DEQ_MAGIC, RHM_JDAT_VERSION, 0, 0, 0, false),
+// _deq_hdr(QLS_DEQ_MAGIC, QLS_JRNL_VERSION, 0, 0, 0, false),
_xidp(0),
- _buff(0),
- _deq_tail(_deq_hdr)
-{}
+ _buff(0)
+// _deq_tail(_deq_hdr)
+{
+ ::deq_hdr_init(&_deq_hdr, QLS_DEQ_MAGIC, QLS_JRNL_VERSION, 0, 0, 0, 0);
+ ::rec_tail_copy(&_deq_tail, &_deq_hdr._rhdr, 0);
+}
-deq_rec::deq_rec(const u_int64_t rid, const u_int64_t drid, const void* const xidp,
- const std::size_t xidlen, const bool owi, const bool txn_coml_commit):
- _deq_hdr(RHM_JDAT_DEQ_MAGIC, RHM_JDAT_VERSION, rid, drid, xidlen, owi, txn_coml_commit),
+deq_rec::deq_rec(const uint64_t rid, const uint64_t drid, const void* const xidp,
+ const std::size_t xidlen, const bool txn_coml_commit):
+// _deq_hdr(QLS_DEQ_MAGIC, QLS_JRNL_VERSION, rid, drid, xidlen, owi, txn_coml_commit),
_xidp(xidp),
- _buff(0),
- _deq_tail(_deq_hdr)
-{}
+ _buff(0)
+// _deq_tail(_deq_hdr)
+{
+ ::deq_hdr_init(&_deq_hdr, QLS_DEQ_MAGIC, QLS_JRNL_VERSION, 0, rid, drid, xidlen);
+ ::rec_tail_copy(&_deq_tail, &_deq_hdr._rhdr, 0);
+ ::set_txn_coml_commit(&_deq_hdr, txn_coml_commit);
+}
deq_rec::~deq_rec()
{
@@ -69,23 +78,23 @@ deq_rec::~deq_rec()
void
deq_rec::reset()
{
- _deq_hdr._rid = 0;
- _deq_hdr.set_owi(false);
- _deq_hdr.set_txn_coml_commit(false);
+ _deq_hdr._rhdr._rid = 0;
+// _deq_hdr.set_owi(false);
+ ::set_txn_coml_commit(&_deq_hdr, false);
_deq_hdr._deq_rid = 0;
_deq_hdr._xidsize = 0;
+ _deq_tail._checksum = 0;
_deq_tail._rid = 0;
_xidp = 0;
_buff = 0;
}
void
-deq_rec::reset(const u_int64_t rid, const u_int64_t drid, const void* const xidp,
- const std::size_t xidlen, const bool owi, const bool txn_coml_commit)
+deq_rec::reset(const uint64_t rid, const uint64_t drid, const void* const xidp,
+ const std::size_t xidlen, const bool txn_coml_commit)
{
- _deq_hdr._rid = rid;
- _deq_hdr.set_owi(owi);
- _deq_hdr.set_txn_coml_commit(txn_coml_commit);
+ _deq_hdr._rhdr._rid = rid;
+ ::set_txn_coml_commit(&_deq_hdr, txn_coml_commit);
_deq_hdr._deq_rid = drid;
_deq_hdr._xidsize = xidlen;
_deq_tail._rid = rid;
@@ -93,8 +102,8 @@ deq_rec::reset(const u_int64_t rid, const u_int64_t drid, const void* const xi
_buff = 0;
}
-u_int32_t
-deq_rec::encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
+uint32_t
+deq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks)
{
assert(wptr != 0);
assert(max_size_dblks > 0);
@@ -205,8 +214,8 @@ deq_rec::encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
return size_dblks(wr_cnt);
}
-u_int32_t
-deq_rec::decode(rec_hdr& h, void* rptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
+uint32_t
+deq_rec::decode(rec_hdr_t& h, void* rptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks)
{
assert(rptr != 0);
assert(max_size_dblks > 0);
@@ -214,18 +223,18 @@ deq_rec::decode(rec_hdr& h, void* rptr, u_int32_t rec_offs_dblks, u_int32_t max_
std::size_t rd_cnt = 0;
if (rec_offs_dblks) // Continuation of record on new page
{
- const u_int32_t hdr_xid_dblks = size_dblks(deq_hdr::size() + _deq_hdr._xidsize);
- const u_int32_t hdr_xid_tail_dblks = size_dblks(deq_hdr::size() + _deq_hdr._xidsize +
- rec_tail::size());
+ const uint32_t hdr_xid_dblks = size_dblks(sizeof(deq_hdr_t) + _deq_hdr._xidsize);
+ const uint32_t hdr_xid_tail_dblks = size_dblks(sizeof(deq_hdr_t) + _deq_hdr._xidsize +
+ sizeof(rec_tail_t));
const std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE;
if (hdr_xid_tail_dblks - rec_offs_dblks <= max_size_dblks)
{
// Remainder of xid fits within this page
- if (rec_offs - deq_hdr::size() < _deq_hdr._xidsize)
+ if (rec_offs - sizeof(deq_hdr_t) < _deq_hdr._xidsize)
{
// Part of xid still outstanding, copy remainder of xid and tail
- const std::size_t xid_offs = rec_offs - deq_hdr::size();
+ const std::size_t xid_offs = rec_offs - sizeof(deq_hdr_t);
const std::size_t xid_rem = _deq_hdr._xidsize - xid_offs;
std::memcpy((char*)_buff + xid_offs, rptr, xid_rem);
rd_cnt = xid_rem;
@@ -236,8 +245,8 @@ deq_rec::decode(rec_hdr& h, void* rptr, u_int32_t rec_offs_dblks, u_int32_t max_
else
{
// Tail or part of tail only outstanding, complete tail
- const std::size_t tail_offs = rec_offs - deq_hdr::size() - _deq_hdr._xidsize;
- const std::size_t tail_rem = rec_tail::size() - tail_offs;
+ const std::size_t tail_offs = rec_offs - sizeof(deq_hdr_t) - _deq_hdr._xidsize;
+ const std::size_t tail_rem = sizeof(rec_tail_t) - tail_offs;
std::memcpy((char*)&_deq_tail + tail_offs, rptr, tail_rem);
chk_tail();
rd_cnt = tail_rem;
@@ -246,7 +255,7 @@ deq_rec::decode(rec_hdr& h, void* rptr, u_int32_t rec_offs_dblks, u_int32_t max_
else if (hdr_xid_dblks - rec_offs_dblks <= max_size_dblks)
{
// Remainder of xid fits within this page, tail split
- const std::size_t xid_offs = rec_offs - deq_hdr::size();
+ const std::size_t xid_offs = rec_offs - sizeof(deq_hdr_t);
const std::size_t xid_rem = _deq_hdr._xidsize - xid_offs;
std::memcpy((char*)_buff + xid_offs, rptr, xid_rem);
rd_cnt += xid_rem;
@@ -261,30 +270,28 @@ deq_rec::decode(rec_hdr& h, void* rptr, u_int32_t rec_offs_dblks, u_int32_t max_
{
// Remainder of xid split
const std::size_t xid_cp_size = (max_size_dblks * JRNL_DBLK_SIZE);
- std::memcpy((char*)_buff + rec_offs - deq_hdr::size(), rptr, xid_cp_size);
+ std::memcpy((char*)_buff + rec_offs - sizeof(deq_hdr_t), rptr, xid_cp_size);
rd_cnt += xid_cp_size;
}
}
else // Start of record
{
// Get and check header
- _deq_hdr.hdr_copy(h);
- rd_cnt = sizeof(rec_hdr);
- _deq_hdr._deq_rid = *(u_int64_t*)((char*)rptr + rd_cnt);
- rd_cnt += sizeof(u_int64_t);
-#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
- rd_cnt += sizeof(u_int32_t); // Filler 0
-#endif
+ //_deq_hdr.hdr_copy(h);
+ ::rec_hdr_copy(&_deq_hdr._rhdr, &h);
+ rd_cnt = sizeof(rec_hdr_t);
+ _deq_hdr._deq_rid = *(uint64_t*)((char*)rptr + rd_cnt);
+ rd_cnt += sizeof(uint64_t);
_deq_hdr._xidsize = *(std::size_t*)((char*)rptr + rd_cnt);
- rd_cnt = _deq_hdr.size();
+ rd_cnt = sizeof(deq_hdr_t);
chk_hdr();
if (_deq_hdr._xidsize)
{
_buff = std::malloc(_deq_hdr._xidsize);
MALLOC_CHK(_buff, "_buff", "deq_rec", "decode");
- const u_int32_t hdr_xid_dblks = size_dblks(deq_hdr::size() + _deq_hdr._xidsize);
- const u_int32_t hdr_xid_tail_dblks = size_dblks(deq_hdr::size() + _deq_hdr._xidsize +
- rec_tail::size());
+ const uint32_t hdr_xid_dblks = size_dblks(sizeof(deq_hdr_t) + _deq_hdr._xidsize);
+ const uint32_t hdr_xid_tail_dblks = size_dblks(sizeof(deq_hdr_t) + _deq_hdr._xidsize +
+ sizeof(rec_tail_t));
// Check if record (header + xid + tail) fits within this page, we can check the
// tail before the expense of copying data to memory
@@ -322,18 +329,16 @@ deq_rec::decode(rec_hdr& h, void* rptr, u_int32_t rec_offs_dblks, u_int32_t max_
}
bool
-deq_rec::rcv_decode(rec_hdr h, std::ifstream* ifsp, std::size_t& rec_offs)
+deq_rec::rcv_decode(rec_hdr_t h, std::ifstream* ifsp, std::size_t& rec_offs)
{
if (rec_offs == 0)
{
- _deq_hdr.hdr_copy(h);
- ifsp->read((char*)&_deq_hdr._deq_rid, sizeof(u_int64_t));
-#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
- ifsp->ignore(sizeof(u_int32_t)); // _filler0
-#endif
+ //_deq_hdr.hdr_copy(h);
+ ::rec_hdr_copy(&_deq_hdr._rhdr, &h);
+ ifsp->read((char*)&_deq_hdr._deq_rid, sizeof(uint64_t));
ifsp->read((char*)&_deq_hdr._xidsize, sizeof(std::size_t));
-#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
- ifsp->ignore(sizeof(u_int32_t)); // _filler0
+#if defined(JRNL_32_BIT)
+ ifsp->ignore(sizeof(uint32_t)); // _filler0
#endif
rec_offs = sizeof(_deq_hdr);
// Read header, allocate (if req'd) for xid
@@ -360,14 +365,14 @@ deq_rec::rcv_decode(rec_hdr h, std::ifstream* ifsp, std::size_t& rec_offs)
}
}
if (rec_offs < sizeof(_deq_hdr) +
- (_deq_hdr._xidsize ? _deq_hdr._xidsize + sizeof(rec_tail) : 0))
+ (_deq_hdr._xidsize ? _deq_hdr._xidsize + sizeof(rec_tail_t) : 0))
{
// Read tail (or continue reading tail)
std::size_t offs = rec_offs - sizeof(_deq_hdr) - _deq_hdr._xidsize;
- ifsp->read((char*)&_deq_tail + offs, sizeof(rec_tail) - offs);
+ ifsp->read((char*)&_deq_tail + offs, sizeof(rec_tail_t) - offs);
std::size_t size_read = ifsp->gcount();
rec_offs += size_read;
- if (size_read < sizeof(rec_tail) - offs)
+ if (size_read < sizeof(rec_tail_t) - offs)
{
assert(ifsp->eof());
// As we may have read past eof, turn off fail bit
@@ -399,9 +404,9 @@ std::string&
deq_rec::str(std::string& str) const
{
std::ostringstream oss;
- oss << "deq_rec: m=" << _deq_hdr._magic;
- oss << " v=" << (int)_deq_hdr._version;
- oss << " rid=" << _deq_hdr._rid;
+ oss << "deq_rec: m=" << _deq_hdr._rhdr._magic;
+ oss << " v=" << (int)_deq_hdr._rhdr._version;
+ oss << " rid=" << _deq_hdr._rhdr._rid;
oss << " drid=" << _deq_hdr._deq_rid;
if (_xidp)
oss << " xid=\"" << _xidp << "\"";
@@ -418,35 +423,35 @@ deq_rec::xid_size() const
std::size_t
deq_rec::rec_size() const
{
- return deq_hdr::size() + (_deq_hdr._xidsize ? _deq_hdr._xidsize + rec_tail::size() : 0);
+ return sizeof(deq_hdr_t) + (_deq_hdr._xidsize ? _deq_hdr._xidsize + sizeof(rec_tail_t) : 0);
}
void
deq_rec::chk_hdr() const
{
- jrec::chk_hdr(_deq_hdr);
- if (_deq_hdr._magic != RHM_JDAT_DEQ_MAGIC)
+ jrec::chk_hdr(_deq_hdr._rhdr);
+ if (_deq_hdr._rhdr._magic != QLS_DEQ_MAGIC)
{
std::ostringstream oss;
oss << std::hex << std::setfill('0');
- oss << "deq magic: rid=0x" << std::setw(16) << _deq_hdr._rid;
- oss << ": expected=0x" << std::setw(8) << RHM_JDAT_DEQ_MAGIC;
- oss << " read=0x" << std::setw(2) << (int)_deq_hdr._magic;
+ oss << "deq magic: rid=0x" << std::setw(16) << _deq_hdr._rhdr._rid;
+ oss << ": expected=0x" << std::setw(8) << QLS_DEQ_MAGIC;
+ oss << " read=0x" << std::setw(2) << (int)_deq_hdr._rhdr._magic;
throw jexception(jerrno::JERR_JREC_BADRECHDR, oss.str(), "deq_rec", "chk_hdr");
}
}
void
-deq_rec::chk_hdr(u_int64_t rid) const
+deq_rec::chk_hdr(uint64_t rid) const
{
chk_hdr();
- jrec::chk_rid(_deq_hdr, rid);
+ jrec::chk_rid(_deq_hdr._rhdr, rid);
}
void
deq_rec::chk_tail() const
{
- jrec::chk_tail(_deq_tail, _deq_hdr);
+ jrec::chk_tail(_deq_tail, _deq_hdr._rhdr);
}
void