summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/legacystore/jrnl/deq_rec.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/legacystore/jrnl/deq_rec.cpp')
-rw-r--r--cpp/src/qpid/legacystore/jrnl/deq_rec.cpp459
1 files changed, 459 insertions, 0 deletions
diff --git a/cpp/src/qpid/legacystore/jrnl/deq_rec.cpp b/cpp/src/qpid/legacystore/jrnl/deq_rec.cpp
new file mode 100644
index 0000000000..4de412c201
--- /dev/null
+++ b/cpp/src/qpid/legacystore/jrnl/deq_rec.cpp
@@ -0,0 +1,459 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * \file deq_rec.cpp
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * This file contains the code for the mrg::journal::deq_rec (journal dequeue
+ * record) class. See comments in file deq_rec.h for details.
+ *
+ * \author Kim van der Riet
+ */
+
+#include "jrnl/deq_rec.h"
+
+#include <cassert>
+#include <cerrno>
+#include <cstdlib>
+#include <cstring>
+#include <iomanip>
+#include "qpid/legacystore/jrnl/jerrno.h"
+#include "qpid/legacystore/jrnl/jexception.h"
+#include <sstream>
+
+namespace mrg
+{
+namespace journal
+{
+
+deq_rec::deq_rec():
+ _deq_hdr(RHM_JDAT_DEQ_MAGIC, RHM_JDAT_VERSION, 0, 0, 0, false),
+ _xidp(0),
+ _buff(0),
+ _deq_tail(_deq_hdr)
+{}
+
+deq_rec::deq_rec(const u_int64_t rid, const u_int64_t drid, const void* const xidp,
+ const std::size_t xidlen, const bool owi, const bool txn_coml_commit):
+ _deq_hdr(RHM_JDAT_DEQ_MAGIC, RHM_JDAT_VERSION, rid, drid, xidlen, owi, txn_coml_commit),
+ _xidp(xidp),
+ _buff(0),
+ _deq_tail(_deq_hdr)
+{}
+
+deq_rec::~deq_rec()
+{
+ clean();
+}
+
+void
+deq_rec::reset()
+{
+ _deq_hdr._rid = 0;
+ _deq_hdr.set_owi(false);
+ _deq_hdr.set_txn_coml_commit(false);
+ _deq_hdr._deq_rid = 0;
+ _deq_hdr._xidsize = 0;
+ _deq_tail._rid = 0;
+ _xidp = 0;
+ _buff = 0;
+}
+
+void
+deq_rec::reset(const u_int64_t rid, const u_int64_t drid, const void* const xidp,
+ const std::size_t xidlen, const bool owi, const bool txn_coml_commit)
+{
+ _deq_hdr._rid = rid;
+ _deq_hdr.set_owi(owi);
+ _deq_hdr.set_txn_coml_commit(txn_coml_commit);
+ _deq_hdr._deq_rid = drid;
+ _deq_hdr._xidsize = xidlen;
+ _deq_tail._rid = rid;
+ _xidp = xidp;
+ _buff = 0;
+}
+
+u_int32_t
+deq_rec::encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
+{
+ assert(wptr != 0);
+ assert(max_size_dblks > 0);
+ if (_xidp == 0)
+ assert(_deq_hdr._xidsize == 0);
+
+ std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE;
+ std::size_t rem = max_size_dblks * JRNL_DBLK_SIZE;
+ std::size_t wr_cnt = 0;
+ if (rec_offs_dblks) // Continuation of split dequeue record (over 2 or more pages)
+ {
+ if (size_dblks(rec_size()) - rec_offs_dblks > max_size_dblks) // Further split required
+ {
+ rec_offs -= sizeof(_deq_hdr);
+ std::size_t wsize = _deq_hdr._xidsize > rec_offs ? _deq_hdr._xidsize - rec_offs : 0;
+ std::size_t wsize2 = wsize;
+ if (wsize)
+ {
+ if (wsize > rem)
+ wsize = rem;
+ std::memcpy(wptr, (const char*)_xidp + rec_offs, wsize);
+ wr_cnt += wsize;
+ rem -= wsize;
+ }
+ rec_offs -= _deq_hdr._xidsize - wsize2;
+ if (rem)
+ {
+ wsize = sizeof(_deq_tail) > rec_offs ? sizeof(_deq_tail) - rec_offs : 0;
+ wsize2 = wsize;
+ if (wsize)
+ {
+ if (wsize > rem)
+ wsize = rem;
+ std::memcpy((char*)wptr + wr_cnt, (char*)&_deq_tail + rec_offs, wsize);
+ wr_cnt += wsize;
+ rem -= wsize;
+ }
+ rec_offs -= sizeof(_deq_tail) - wsize2;
+ }
+ assert(rem == 0);
+ assert(rec_offs == 0);
+ }
+ else // No further split required
+ {
+ rec_offs -= sizeof(_deq_hdr);
+ std::size_t wsize = _deq_hdr._xidsize > rec_offs ? _deq_hdr._xidsize - rec_offs : 0;
+ if (wsize)
+ {
+ std::memcpy(wptr, (const char*)_xidp + rec_offs, wsize);
+ wr_cnt += wsize;
+ }
+ rec_offs -= _deq_hdr._xidsize - wsize;
+ wsize = sizeof(_deq_tail) > rec_offs ? sizeof(_deq_tail) - rec_offs : 0;
+ if (wsize)
+ {
+ std::memcpy((char*)wptr + wr_cnt, (char*)&_deq_tail + rec_offs, wsize);
+ wr_cnt += wsize;
+#ifdef RHM_CLEAN
+ std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE;
+ std::size_t dblk_rec_size = size_dblks(rec_size() - rec_offs) * JRNL_DBLK_SIZE;
+ std::memset((char*)wptr + wr_cnt, RHM_CLEAN_CHAR, dblk_rec_size - wr_cnt);
+#endif
+ }
+ rec_offs -= sizeof(_deq_tail) - wsize;
+ assert(rec_offs == 0);
+ }
+ }
+ else // Start at beginning of data record
+ {
+ // Assumption: the header will always fit into the first dblk
+ std::memcpy(wptr, (void*)&_deq_hdr, sizeof(_deq_hdr));
+ wr_cnt = sizeof(_deq_hdr);
+ if (size_dblks(rec_size()) > max_size_dblks) // Split required - can only occur with xid
+ {
+ std::size_t wsize;
+ rem -= sizeof(_deq_hdr);
+ if (rem)
+ {
+ wsize = rem >= _deq_hdr._xidsize ? _deq_hdr._xidsize : rem;
+ std::memcpy((char*)wptr + wr_cnt, _xidp, wsize);
+ wr_cnt += wsize;
+ rem -= wsize;
+ }
+ if (rem)
+ {
+ wsize = rem >= sizeof(_deq_tail) ? sizeof(_deq_tail) : rem;
+ std::memcpy((char*)wptr + wr_cnt, (void*)&_deq_tail, wsize);
+ wr_cnt += wsize;
+ rem -= wsize;
+ }
+ assert(rem == 0);
+ }
+ else // No split required
+ {
+ if (_deq_hdr._xidsize)
+ {
+ std::memcpy((char*)wptr + wr_cnt, _xidp, _deq_hdr._xidsize);
+ wr_cnt += _deq_hdr._xidsize;
+ std::memcpy((char*)wptr + wr_cnt, (void*)&_deq_tail, sizeof(_deq_tail));
+ wr_cnt += sizeof(_deq_tail);
+ }
+#ifdef RHM_CLEAN
+ std::size_t dblk_rec_size = size_dblks(rec_size()) * JRNL_DBLK_SIZE;
+ std::memset((char*)wptr + wr_cnt, RHM_CLEAN_CHAR, dblk_rec_size - wr_cnt);
+#endif
+ }
+ }
+ return size_dblks(wr_cnt);
+}
+
+u_int32_t
+deq_rec::decode(rec_hdr& h, void* rptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
+{
+ assert(rptr != 0);
+ assert(max_size_dblks > 0);
+
+ std::size_t rd_cnt = 0;
+ if (rec_offs_dblks) // Continuation of record on new page
+ {
+ const u_int32_t hdr_xid_dblks = size_dblks(deq_hdr::size() + _deq_hdr._xidsize);
+ const u_int32_t hdr_xid_tail_dblks = size_dblks(deq_hdr::size() + _deq_hdr._xidsize +
+ rec_tail::size());
+ const std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE;
+
+ if (hdr_xid_tail_dblks - rec_offs_dblks <= max_size_dblks)
+ {
+ // Remainder of xid fits within this page
+ if (rec_offs - deq_hdr::size() < _deq_hdr._xidsize)
+ {
+ // Part of xid still outstanding, copy remainder of xid and tail
+ const std::size_t xid_offs = rec_offs - deq_hdr::size();
+ const std::size_t xid_rem = _deq_hdr._xidsize - xid_offs;
+ std::memcpy((char*)_buff + xid_offs, rptr, xid_rem);
+ rd_cnt = xid_rem;
+ std::memcpy((void*)&_deq_tail, ((char*)rptr + rd_cnt), sizeof(_deq_tail));
+ chk_tail();
+ rd_cnt += sizeof(_deq_tail);
+ }
+ else
+ {
+ // Tail or part of tail only outstanding, complete tail
+ const std::size_t tail_offs = rec_offs - deq_hdr::size() - _deq_hdr._xidsize;
+ const std::size_t tail_rem = rec_tail::size() - tail_offs;
+ std::memcpy((char*)&_deq_tail + tail_offs, rptr, tail_rem);
+ chk_tail();
+ rd_cnt = tail_rem;
+ }
+ }
+ else if (hdr_xid_dblks - rec_offs_dblks <= max_size_dblks)
+ {
+ // Remainder of xid fits within this page, tail split
+ const std::size_t xid_offs = rec_offs - deq_hdr::size();
+ const std::size_t xid_rem = _deq_hdr._xidsize - xid_offs;
+ std::memcpy((char*)_buff + xid_offs, rptr, xid_rem);
+ rd_cnt += xid_rem;
+ const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE) - rd_cnt;
+ if (tail_rem)
+ {
+ std::memcpy((void*)&_deq_tail, ((char*)rptr + xid_rem), tail_rem);
+ rd_cnt += tail_rem;
+ }
+ }
+ else
+ {
+ // Remainder of xid split
+ const std::size_t xid_cp_size = (max_size_dblks * JRNL_DBLK_SIZE);
+ std::memcpy((char*)_buff + rec_offs - deq_hdr::size(), rptr, xid_cp_size);
+ rd_cnt += xid_cp_size;
+ }
+ }
+ else // Start of record
+ {
+ // Get and check header
+ _deq_hdr.hdr_copy(h);
+ rd_cnt = sizeof(rec_hdr);
+ _deq_hdr._deq_rid = *(u_int64_t*)((char*)rptr + rd_cnt);
+ rd_cnt += sizeof(u_int64_t);
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+ rd_cnt += sizeof(u_int32_t); // Filler 0
+#endif
+ _deq_hdr._xidsize = *(std::size_t*)((char*)rptr + rd_cnt);
+ rd_cnt = _deq_hdr.size();
+ chk_hdr();
+ if (_deq_hdr._xidsize)
+ {
+ _buff = std::malloc(_deq_hdr._xidsize);
+ MALLOC_CHK(_buff, "_buff", "deq_rec", "decode");
+ const u_int32_t hdr_xid_dblks = size_dblks(deq_hdr::size() + _deq_hdr._xidsize);
+ const u_int32_t hdr_xid_tail_dblks = size_dblks(deq_hdr::size() + _deq_hdr._xidsize +
+ rec_tail::size());
+
+ // Check if record (header + xid + tail) fits within this page, we can check the
+ // tail before the expense of copying data to memory
+ if (hdr_xid_tail_dblks <= max_size_dblks)
+ {
+ // Entire header, xid and tail fits within this page
+ std::memcpy(_buff, (char*)rptr + rd_cnt, _deq_hdr._xidsize);
+ rd_cnt += _deq_hdr._xidsize;
+ std::memcpy((void*)&_deq_tail, (char*)rptr + rd_cnt, sizeof(_deq_tail));
+ rd_cnt += sizeof(_deq_tail);
+ chk_tail();
+ }
+ else if (hdr_xid_dblks <= max_size_dblks)
+ {
+ // Entire header and xid fit within this page, tail split
+ std::memcpy(_buff, (char*)rptr + rd_cnt, _deq_hdr._xidsize);
+ rd_cnt += _deq_hdr._xidsize;
+ const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE) - rd_cnt;
+ if (tail_rem)
+ {
+ std::memcpy((void*)&_deq_tail, (char*)rptr + rd_cnt, tail_rem);
+ rd_cnt += tail_rem;
+ }
+ }
+ else
+ {
+ // Header fits within this page, xid split
+ const std::size_t xid_cp_size = (max_size_dblks * JRNL_DBLK_SIZE) - rd_cnt;
+ std::memcpy(_buff, (char*)rptr + rd_cnt, xid_cp_size);
+ rd_cnt += xid_cp_size;
+ }
+ }
+ }
+ return size_dblks(rd_cnt);
+}
+
+bool
+deq_rec::rcv_decode(rec_hdr h, std::ifstream* ifsp, std::size_t& rec_offs)
+{
+ if (rec_offs == 0)
+ {
+ _deq_hdr.hdr_copy(h);
+ ifsp->read((char*)&_deq_hdr._deq_rid, sizeof(u_int64_t));
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+ ifsp->ignore(sizeof(u_int32_t)); // _filler0
+#endif
+ ifsp->read((char*)&_deq_hdr._xidsize, sizeof(std::size_t));
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+ ifsp->ignore(sizeof(u_int32_t)); // _filler0
+#endif
+ rec_offs = sizeof(_deq_hdr);
+ // Read header, allocate (if req'd) for xid
+ if (_deq_hdr._xidsize)
+ {
+ _buff = std::malloc(_deq_hdr._xidsize);
+ MALLOC_CHK(_buff, "_buff", "enq_rec", "rcv_decode");
+ }
+ }
+ if (rec_offs < sizeof(_deq_hdr) + _deq_hdr._xidsize)
+ {
+ // Read xid (or continue reading xid)
+ std::size_t offs = rec_offs - sizeof(_deq_hdr);
+ ifsp->read((char*)_buff + offs, _deq_hdr._xidsize - offs);
+ std::size_t size_read = ifsp->gcount();
+ rec_offs += size_read;
+ if (size_read < _deq_hdr._xidsize - offs)
+ {
+ assert(ifsp->eof());
+ // As we may have read past eof, turn off fail bit
+ ifsp->clear(ifsp->rdstate()&(~std::ifstream::failbit));
+ assert(!ifsp->fail() && !ifsp->bad());
+ return false;
+ }
+ }
+ if (rec_offs < sizeof(_deq_hdr) +
+ (_deq_hdr._xidsize ? _deq_hdr._xidsize + sizeof(rec_tail) : 0))
+ {
+ // Read tail (or continue reading tail)
+ std::size_t offs = rec_offs - sizeof(_deq_hdr) - _deq_hdr._xidsize;
+ ifsp->read((char*)&_deq_tail + offs, sizeof(rec_tail) - offs);
+ std::size_t size_read = ifsp->gcount();
+ rec_offs += size_read;
+ if (size_read < sizeof(rec_tail) - offs)
+ {
+ assert(ifsp->eof());
+ // As we may have read past eof, turn off fail bit
+ ifsp->clear(ifsp->rdstate()&(~std::ifstream::failbit));
+ assert(!ifsp->fail() && !ifsp->bad());
+ return false;
+ }
+ }
+ ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - rec_size());
+ if (_deq_hdr._xidsize)
+ chk_tail(); // Throws if tail invalid or record incomplete
+ assert(!ifsp->fail() && !ifsp->bad());
+ return true;
+}
+
+std::size_t
+deq_rec::get_xid(void** const xidpp)
+{
+ if (!_buff)
+ {
+ *xidpp = 0;
+ return 0;
+ }
+ *xidpp = _buff;
+ return _deq_hdr._xidsize;
+}
+
+std::string&
+deq_rec::str(std::string& str) const
+{
+ std::ostringstream oss;
+ oss << "deq_rec: m=" << _deq_hdr._magic;
+ oss << " v=" << (int)_deq_hdr._version;
+ oss << " rid=" << _deq_hdr._rid;
+ oss << " drid=" << _deq_hdr._deq_rid;
+ if (_xidp)
+ oss << " xid=\"" << _xidp << "\"";
+ str.append(oss.str());
+ return str;
+}
+
+std::size_t
+deq_rec::xid_size() const
+{
+ return _deq_hdr._xidsize;
+}
+
+std::size_t
+deq_rec::rec_size() const
+{
+ return deq_hdr::size() + (_deq_hdr._xidsize ? _deq_hdr._xidsize + rec_tail::size() : 0);
+}
+
+void
+deq_rec::chk_hdr() const
+{
+ jrec::chk_hdr(_deq_hdr);
+ if (_deq_hdr._magic != RHM_JDAT_DEQ_MAGIC)
+ {
+ std::ostringstream oss;
+ oss << std::hex << std::setfill('0');
+ oss << "deq magic: rid=0x" << std::setw(16) << _deq_hdr._rid;
+ oss << ": expected=0x" << std::setw(8) << RHM_JDAT_DEQ_MAGIC;
+ oss << " read=0x" << std::setw(2) << (int)_deq_hdr._magic;
+ throw jexception(jerrno::JERR_JREC_BADRECHDR, oss.str(), "deq_rec", "chk_hdr");
+ }
+}
+
+void
+deq_rec::chk_hdr(u_int64_t rid) const
+{
+ chk_hdr();
+ jrec::chk_rid(_deq_hdr, rid);
+}
+
+void
+deq_rec::chk_tail() const
+{
+ jrec::chk_tail(_deq_tail, _deq_hdr);
+}
+
+void
+deq_rec::clean()
+{
+ // clean up allocated memory here
+}
+
+} // namespace journal
+} // namespace mrg