diff options
Diffstat (limited to 'cpp/src/qpid/legacystore/jrnl/enq_rec.cpp')
-rw-r--r-- | cpp/src/qpid/legacystore/jrnl/enq_rec.cpp | 638 |
1 files changed, 638 insertions, 0 deletions
diff --git a/cpp/src/qpid/legacystore/jrnl/enq_rec.cpp b/cpp/src/qpid/legacystore/jrnl/enq_rec.cpp new file mode 100644 index 0000000000..468599836b --- /dev/null +++ b/cpp/src/qpid/legacystore/jrnl/enq_rec.cpp @@ -0,0 +1,638 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * \file enq_rec.cpp + * + * Qpid asynchronous store plugin library + * + * This file contains the code for the mrg::journal::enq_rec (journal enqueue + * record) class. See comments in file enq_rec.h for details. + * + * \author Kim van der Riet + */ + +#include "qpid/legacystore/jrnl/enq_rec.h" + +#include <cassert> +#include <cerrno> +#include <cstdlib> +#include <cstring> +#include <iomanip> +#include "qpid/legacystore/jrnl/jerrno.h" +#include "qpid/legacystore/jrnl/jexception.h" +#include <sstream> + +namespace mrg +{ +namespace journal +{ + +// Constructor used for read operations, where buf contains preallocated space to receive data. +enq_rec::enq_rec(): + jrec(), // superclass + _enq_hdr(RHM_JDAT_ENQ_MAGIC, RHM_JDAT_VERSION, 0, 0, 0, false, false), + _xidp(0), + _data(0), + _buff(0), + _enq_tail(_enq_hdr) +{} + +// Constructor used for transactional write operations, where dbuf contains data to be written. +enq_rec::enq_rec(const u_int64_t rid, const void* const dbuf, const std::size_t dlen, + const void* const xidp, const std::size_t xidlen, const bool owi, const bool transient): + jrec(), // superclass + _enq_hdr(RHM_JDAT_ENQ_MAGIC, RHM_JDAT_VERSION, rid, xidlen, dlen, owi, transient), + _xidp(xidp), + _data(dbuf), + _buff(0), + _enq_tail(_enq_hdr) +{} + +enq_rec::~enq_rec() +{ + clean(); +} + +// Prepare instance for use in reading data from journal, where buf contains preallocated space +// to receive data. +void +enq_rec::reset() +{ + _enq_hdr._rid = 0; + _enq_hdr.set_owi(false); + _enq_hdr.set_transient(false); + _enq_hdr._xidsize = 0; + _enq_hdr._dsize = 0; + _xidp = 0; + _data = 0; + _buff = 0; + _enq_tail._rid = 0; +} + +// Prepare instance for use in writing transactional data to journal, where dbuf contains data to +// be written. +void +enq_rec::reset(const u_int64_t rid, const void* const dbuf, const std::size_t dlen, + const void* const xidp, const std::size_t xidlen, const bool owi, const bool transient, + const bool external) +{ + _enq_hdr._rid = rid; + _enq_hdr.set_owi(owi); + _enq_hdr.set_transient(transient); + _enq_hdr.set_external(external); + _enq_hdr._xidsize = xidlen; + _enq_hdr._dsize = dlen; + _xidp = xidp; + _data = dbuf; + _buff = 0; + _enq_tail._rid = rid; +} + +u_int32_t +enq_rec::encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks) +{ + assert(wptr != 0); + assert(max_size_dblks > 0); + if (_xidp == 0) + assert(_enq_hdr._xidsize == 0); + + std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE; + std::size_t rem = max_size_dblks * JRNL_DBLK_SIZE; + std::size_t wr_cnt = 0; + if (rec_offs_dblks) // Continuation of split data record (over 2 or more pages) + { + if (size_dblks(rec_size()) - rec_offs_dblks > max_size_dblks) // Further split required + { + rec_offs -= sizeof(_enq_hdr); + std::size_t wsize = _enq_hdr._xidsize > rec_offs ? _enq_hdr._xidsize - rec_offs : 0; + std::size_t wsize2 = wsize; + if (wsize) + { + if (wsize > rem) + wsize = rem; + std::memcpy(wptr, (const char*)_xidp + rec_offs, wsize); + wr_cnt = wsize; + rem -= wsize; + } + rec_offs -= _enq_hdr._xidsize - wsize2; + if (rem && !_enq_hdr.is_external()) + { + wsize = _enq_hdr._dsize > rec_offs ? _enq_hdr._dsize - rec_offs : 0; + wsize2 = wsize; + if (wsize) + { + if (wsize > rem) + wsize = rem; + std::memcpy((char*)wptr + wr_cnt, (const char*)_data + rec_offs, wsize); + wr_cnt += wsize; + rem -= wsize; + } + rec_offs -= _enq_hdr._dsize - wsize2; + } + if (rem) + { + wsize = sizeof(_enq_tail) > rec_offs ? sizeof(_enq_tail) - rec_offs : 0; + wsize2 = wsize; + if (wsize) + { + if (wsize > rem) + wsize = rem; + std::memcpy((char*)wptr + wr_cnt, (char*)&_enq_tail + rec_offs, wsize); + wr_cnt += wsize; + rem -= wsize; + } + rec_offs -= sizeof(_enq_tail) - wsize2; + } + assert(rem == 0); + assert(rec_offs == 0); + } + else // No further split required + { + rec_offs -= sizeof(_enq_hdr); + std::size_t wsize = _enq_hdr._xidsize > rec_offs ? _enq_hdr._xidsize - rec_offs : 0; + if (wsize) + { + std::memcpy(wptr, (const char*)_xidp + rec_offs, wsize); + wr_cnt += wsize; + } + rec_offs -= _enq_hdr._xidsize - wsize; + wsize = _enq_hdr._dsize > rec_offs ? _enq_hdr._dsize - rec_offs : 0; + if (wsize && !_enq_hdr.is_external()) + { + std::memcpy((char*)wptr + wr_cnt, (const char*)_data + rec_offs, wsize); + wr_cnt += wsize; + } + rec_offs -= _enq_hdr._dsize - wsize; + wsize = sizeof(_enq_tail) > rec_offs ? sizeof(_enq_tail) - rec_offs : 0; + if (wsize) + { + std::memcpy((char*)wptr + wr_cnt, (char*)&_enq_tail + rec_offs, wsize); + wr_cnt += wsize; +#ifdef RHM_CLEAN + std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE; + std::size_t dblk_rec_size = size_dblks(rec_size() - rec_offs) * JRNL_DBLK_SIZE; + std::memset((char*)wptr + wr_cnt, RHM_CLEAN_CHAR, dblk_rec_size - wr_cnt); +#endif + } + rec_offs -= sizeof(_enq_tail) - wsize; + assert(rec_offs == 0); + } + } + else // Start at beginning of data record + { + // Assumption: the header will always fit into the first dblk + std::memcpy(wptr, (void*)&_enq_hdr, sizeof(_enq_hdr)); + wr_cnt = sizeof(_enq_hdr); + if (size_dblks(rec_size()) > max_size_dblks) // Split required + { + std::size_t wsize; + rem -= sizeof(_enq_hdr); + if (rem) + { + wsize = rem >= _enq_hdr._xidsize ? _enq_hdr._xidsize : rem; + std::memcpy((char*)wptr + wr_cnt, _xidp, wsize); + wr_cnt += wsize; + rem -= wsize; + } + if (rem && !_enq_hdr.is_external()) + { + wsize = rem >= _enq_hdr._dsize ? _enq_hdr._dsize : rem; + std::memcpy((char*)wptr + wr_cnt, _data, wsize); + wr_cnt += wsize; + rem -= wsize; + } + if (rem) + { + wsize = rem >= sizeof(_enq_tail) ? sizeof(_enq_tail) : rem; + std::memcpy((char*)wptr + wr_cnt, (void*)&_enq_tail, wsize); + wr_cnt += wsize; + rem -= wsize; + } + assert(rem == 0); + } + else // No split required + { + if (_enq_hdr._xidsize) + { + std::memcpy((char*)wptr + wr_cnt, _xidp, _enq_hdr._xidsize); + wr_cnt += _enq_hdr._xidsize; + } + if (!_enq_hdr.is_external()) + { + std::memcpy((char*)wptr + wr_cnt, _data, _enq_hdr._dsize); + wr_cnt += _enq_hdr._dsize; + } + std::memcpy((char*)wptr + wr_cnt, (void*)&_enq_tail, sizeof(_enq_tail)); + wr_cnt += sizeof(_enq_tail); +#ifdef RHM_CLEAN + std::size_t dblk_rec_size = size_dblks(rec_size()) * JRNL_DBLK_SIZE; + std::memset((char*)wptr + wr_cnt, RHM_CLEAN_CHAR, dblk_rec_size - wr_cnt); +#endif + } + } + return size_dblks(wr_cnt); +} + +u_int32_t +enq_rec::decode(rec_hdr& h, void* rptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks) +{ + assert(rptr != 0); + assert(max_size_dblks > 0); + + std::size_t rd_cnt = 0; + if (rec_offs_dblks) // Continuation of record on new page + { + const u_int32_t hdr_xid_data_size = enq_hdr::size() + _enq_hdr._xidsize + + (_enq_hdr.is_external() ? 0 : _enq_hdr._dsize); + const u_int32_t hdr_xid_data_tail_size = hdr_xid_data_size + rec_tail::size(); + const u_int32_t hdr_data_dblks = size_dblks(hdr_xid_data_size); + const u_int32_t hdr_tail_dblks = size_dblks(hdr_xid_data_tail_size); + const std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE; + const std::size_t offs = rec_offs - enq_hdr::size(); + + if (hdr_tail_dblks - rec_offs_dblks <= max_size_dblks) + { + // Remainder of record fits within this page + if (offs < _enq_hdr._xidsize) + { + // some XID still outstanding, copy remainder of XID, data and tail + const std::size_t rem = _enq_hdr._xidsize + _enq_hdr._dsize - offs; + std::memcpy((char*)_buff + offs, rptr, rem); + rd_cnt += rem; + std::memcpy((void*)&_enq_tail, ((char*)rptr + rd_cnt), sizeof(_enq_tail)); + chk_tail(); + rd_cnt += sizeof(_enq_tail); + } + else if (offs < _enq_hdr._xidsize + _enq_hdr._dsize && !_enq_hdr.is_external()) + { + // some data still outstanding, copy remainder of data and tail + const std::size_t data_offs = offs - _enq_hdr._xidsize; + const std::size_t data_rem = _enq_hdr._dsize - data_offs; + std::memcpy((char*)_buff + offs, rptr, data_rem); + rd_cnt += data_rem; + std::memcpy((void*)&_enq_tail, ((char*)rptr + rd_cnt), sizeof(_enq_tail)); + chk_tail(); + rd_cnt += sizeof(_enq_tail); + } + else + { + // Tail or part of tail only outstanding, complete tail + const std::size_t tail_offs = rec_offs - enq_hdr::size() - _enq_hdr._xidsize - + _enq_hdr._dsize; + const std::size_t tail_rem = rec_tail::size() - tail_offs; + std::memcpy((char*)&_enq_tail + tail_offs, rptr, tail_rem); + chk_tail(); + rd_cnt = tail_rem; + } + } + else if (hdr_data_dblks - rec_offs_dblks <= max_size_dblks) + { + // Remainder of xid & data fits within this page; tail split + + /* + * TODO: This section needs revision. Since it is known that the end of the page falls within the + * tail record, it is only necessary to write from the current offset to the end of the page under + * all circumstances. The multiple if/else combinations may be eliminated, as well as one memcpy() + * operation. + * + * Also note that Coverity has detected a possible memory overwrite in this block. It occurs if + * both the following two if() stmsts (numbered) are false. With rd_cnt = 0, this would result in + * the value of tail_rem > sizeof(tail_rec). Practically, this could only happen if the start and + * end of a page both fall within the same tail record, in which case the tail would have to be + * (much!) larger. However, the logic here does not account for this possibility. + * + * If the optimization above is undertaken, this code would probably be removed. + */ + if (offs < _enq_hdr._xidsize) // 1 + { + // some XID still outstanding, copy remainder of XID and data + const std::size_t rem = _enq_hdr._xidsize + _enq_hdr._dsize - offs; + std::memcpy((char*)_buff + offs, rptr, rem); + rd_cnt += rem; + } + else if (offs < _enq_hdr._xidsize + _enq_hdr._dsize && !_enq_hdr.is_external()) // 2 + { + // some data still outstanding, copy remainder of data + const std::size_t data_offs = offs - _enq_hdr._xidsize; + const std::size_t data_rem = _enq_hdr._dsize - data_offs; + std::memcpy((char*)_buff + offs, rptr, data_rem); + rd_cnt += data_rem; + } + const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE) - rd_cnt; + if (tail_rem) + { + std::memcpy((void*)&_enq_tail, ((char*)rptr + rd_cnt), tail_rem); + rd_cnt += tail_rem; + } + } + else + { + // Since xid and data are contiguous, both fit within current page - copy whole page + const std::size_t data_cp_size = (max_size_dblks * JRNL_DBLK_SIZE); + std::memcpy((char*)_buff + offs, rptr, data_cp_size); + rd_cnt += data_cp_size; + } + } + else // Start of record + { + // Get and check header + _enq_hdr.hdr_copy(h); + rd_cnt = sizeof(rec_hdr); +#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT) + rd_cnt += sizeof(u_int32_t); // Filler 0 +#endif + _enq_hdr._xidsize = *(std::size_t*)((char*)rptr + rd_cnt); + rd_cnt += sizeof(std::size_t); +#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT) + rd_cnt += sizeof(u_int32_t); // Filler 0 +#endif +#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT) + rd_cnt += sizeof(u_int32_t); // Filler 1 +#endif + _enq_hdr._dsize = *(std::size_t*)((char*)rptr + rd_cnt); + rd_cnt = _enq_hdr.size(); + chk_hdr(); + if (_enq_hdr._xidsize + (_enq_hdr.is_external() ? 0 : _enq_hdr._dsize)) + { + _buff = std::malloc(_enq_hdr._xidsize + (_enq_hdr.is_external() ? 0 : _enq_hdr._dsize)); + MALLOC_CHK(_buff, "_buff", "enq_rec", "decode"); + + const u_int32_t hdr_xid_size = enq_hdr::size() + _enq_hdr._xidsize; + const u_int32_t hdr_xid_data_size = hdr_xid_size + (_enq_hdr.is_external() ? 0 : _enq_hdr._dsize); + const u_int32_t hdr_xid_data_tail_size = hdr_xid_data_size + rec_tail::size(); + const u_int32_t hdr_xid_dblks = size_dblks(hdr_xid_size); + const u_int32_t hdr_data_dblks = size_dblks(hdr_xid_data_size); + const u_int32_t hdr_tail_dblks = size_dblks(hdr_xid_data_tail_size); + // Check if record (header + data + tail) fits within this page, we can check the + // tail before the expense of copying data to memory + if (hdr_tail_dblks <= max_size_dblks) + { + // Header, xid, data and tail fits within this page + if (_enq_hdr._xidsize) + { + std::memcpy(_buff, (char*)rptr + rd_cnt, _enq_hdr._xidsize); + rd_cnt += _enq_hdr._xidsize; + } + if (_enq_hdr._dsize && !_enq_hdr.is_external()) + { + std::memcpy((char*)_buff + _enq_hdr._xidsize, (char*)rptr + rd_cnt, + _enq_hdr._dsize); + rd_cnt += _enq_hdr._dsize; + } + std::memcpy((void*)&_enq_tail, (char*)rptr + rd_cnt, sizeof(_enq_tail)); + chk_tail(); + rd_cnt += sizeof(_enq_tail); + } + else if (hdr_data_dblks <= max_size_dblks) + { + // Header, xid and data fit within this page, tail split or separated + if (_enq_hdr._xidsize) + { + std::memcpy(_buff, (char*)rptr + rd_cnt, _enq_hdr._xidsize); + rd_cnt += _enq_hdr._xidsize; + } + if (_enq_hdr._dsize && !_enq_hdr.is_external()) + { + std::memcpy((char*)_buff + _enq_hdr._xidsize, (char*)rptr + rd_cnt, + _enq_hdr._dsize); + rd_cnt += _enq_hdr._dsize; + } + const std::size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE) - rd_cnt; + if (tail_rem) + { + std::memcpy((void*)&_enq_tail, (char*)rptr + rd_cnt, tail_rem); + rd_cnt += tail_rem; + } + } + else if (hdr_xid_dblks <= max_size_dblks) + { + // Header and xid fits within this page, data split or separated + if (_enq_hdr._xidsize) + { + std::memcpy(_buff, (char*)rptr + rd_cnt, _enq_hdr._xidsize); + rd_cnt += _enq_hdr._xidsize; + } + if (_enq_hdr._dsize && !_enq_hdr.is_external()) + { + const std::size_t data_cp_size = (max_size_dblks * JRNL_DBLK_SIZE) - rd_cnt; + std::memcpy((char*)_buff + _enq_hdr._xidsize, (char*)rptr + rd_cnt, data_cp_size); + rd_cnt += data_cp_size; + } + } + else + { + // Header fits within this page, xid split or separated + const std::size_t data_cp_size = (max_size_dblks * JRNL_DBLK_SIZE) - rd_cnt; + std::memcpy(_buff, (char*)rptr + rd_cnt, data_cp_size); + rd_cnt += data_cp_size; + } + } + } + return size_dblks(rd_cnt); +} + +bool +enq_rec::rcv_decode(rec_hdr h, std::ifstream* ifsp, std::size_t& rec_offs) +{ + if (rec_offs == 0) + { + // Read header, allocate (if req'd) for xid + _enq_hdr.hdr_copy(h); +#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT) + ifsp->ignore(sizeof(u_int32_t)); // _filler0 +#endif + ifsp->read((char*)&_enq_hdr._xidsize, sizeof(std::size_t)); +#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT) + ifsp->ignore(sizeof(u_int32_t)); // _filler0 +#endif +#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT) + ifsp->ignore(sizeof(u_int32_t)); // _filler1 +#endif + ifsp->read((char*)&_enq_hdr._dsize, sizeof(std::size_t)); +#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT) + ifsp->ignore(sizeof(u_int32_t)); // _filler1 +#endif + rec_offs = sizeof(_enq_hdr); + if (_enq_hdr._xidsize) + { + _buff = std::malloc(_enq_hdr._xidsize); + MALLOC_CHK(_buff, "_buff", "enq_rec", "rcv_decode"); + } + } + if (rec_offs < sizeof(_enq_hdr) + _enq_hdr._xidsize) + { + // Read xid (or continue reading xid) + std::size_t offs = rec_offs - sizeof(_enq_hdr); + ifsp->read((char*)_buff + offs, _enq_hdr._xidsize - offs); + std::size_t size_read = ifsp->gcount(); + rec_offs += size_read; + if (size_read < _enq_hdr._xidsize - offs) + { + assert(ifsp->eof()); + // As we may have read past eof, turn off fail bit + ifsp->clear(ifsp->rdstate()&(~std::ifstream::failbit)); + assert(!ifsp->fail() && !ifsp->bad()); + return false; + } + } + if (!_enq_hdr.is_external()) + { + if (rec_offs < sizeof(_enq_hdr) + _enq_hdr._xidsize + _enq_hdr._dsize) + { + // Ignore data (or continue ignoring data) + std::size_t offs = rec_offs - sizeof(_enq_hdr) - _enq_hdr._xidsize; + ifsp->ignore(_enq_hdr._dsize - offs); + std::size_t size_read = ifsp->gcount(); + rec_offs += size_read; + if (size_read < _enq_hdr._dsize - offs) + { + assert(ifsp->eof()); + // As we may have read past eof, turn off fail bit + ifsp->clear(ifsp->rdstate()&(~std::ifstream::failbit)); + assert(!ifsp->fail() && !ifsp->bad()); + return false; + } + } + } + if (rec_offs < sizeof(_enq_hdr) + _enq_hdr._xidsize + + (_enq_hdr.is_external() ? 0 : _enq_hdr._dsize) + sizeof(rec_tail)) + { + // Read tail (or continue reading tail) + std::size_t offs = rec_offs - sizeof(_enq_hdr) - _enq_hdr._xidsize; + if (!_enq_hdr.is_external()) + offs -= _enq_hdr._dsize; + ifsp->read((char*)&_enq_tail + offs, sizeof(rec_tail) - offs); + std::size_t size_read = ifsp->gcount(); + rec_offs += size_read; + if (size_read < sizeof(rec_tail) - offs) + { + assert(ifsp->eof()); + // As we may have read past eof, turn off fail bit + ifsp->clear(ifsp->rdstate()&(~std::ifstream::failbit)); + assert(!ifsp->fail() && !ifsp->bad()); + return false; + } + } + ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - rec_size()); + chk_tail(); // Throws if tail invalid or record incomplete + assert(!ifsp->fail() && !ifsp->bad()); + return true; +} + +std::size_t +enq_rec::get_xid(void** const xidpp) +{ + if (!_buff || !_enq_hdr._xidsize) + { + *xidpp = 0; + return 0; + } + *xidpp = _buff; + return _enq_hdr._xidsize; +} + +std::size_t +enq_rec::get_data(void** const datapp) +{ + if (!_buff) + { + *datapp = 0; + return 0; + } + if (_enq_hdr.is_external()) + *datapp = 0; + else + *datapp = (void*)((char*)_buff + _enq_hdr._xidsize); + return _enq_hdr._dsize; +} + +std::string& +enq_rec::str(std::string& str) const +{ + std::ostringstream oss; + oss << "enq_rec: m=" << _enq_hdr._magic; + oss << " v=" << (int)_enq_hdr._version; + oss << " rid=" << _enq_hdr._rid; + if (_xidp) + oss << " xid=\"" << _xidp << "\""; + oss << " len=" << _enq_hdr._dsize; + str.append(oss.str()); + return str; +} + +std::size_t +enq_rec::rec_size() const +{ + return rec_size(_enq_hdr._xidsize, _enq_hdr._dsize, _enq_hdr.is_external()); +} + +std::size_t +enq_rec::rec_size(const std::size_t xidsize, const std::size_t dsize, const bool external) +{ + if (external) + return enq_hdr::size() + xidsize + rec_tail::size(); + return enq_hdr::size() + xidsize + dsize + rec_tail::size(); +} + +void +enq_rec::set_rid(const u_int64_t rid) +{ + _enq_hdr._rid = rid; + _enq_tail._rid = rid; +} + +void +enq_rec::chk_hdr() const +{ + jrec::chk_hdr(_enq_hdr); + if (_enq_hdr._magic != RHM_JDAT_ENQ_MAGIC) + { + std::ostringstream oss; + oss << std::hex << std::setfill('0'); + oss << "enq magic: rid=0x" << std::setw(16) << _enq_hdr._rid; + oss << ": expected=0x" << std::setw(8) << RHM_JDAT_ENQ_MAGIC; + oss << " read=0x" << std::setw(2) << (int)_enq_hdr._magic; + throw jexception(jerrno::JERR_JREC_BADRECHDR, oss.str(), "enq_rec", "chk_hdr"); + } +} + +void +enq_rec::chk_hdr(u_int64_t rid) const +{ + chk_hdr(); + jrec::chk_rid(_enq_hdr, rid); +} + +void +enq_rec::chk_tail() const +{ + jrec::chk_tail(_enq_tail, _enq_hdr); +} + +void +enq_rec::clean() +{ + // clean up allocated memory here +} + +} // namespace journal +} // namespace mrg |