summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp')
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp397
1 files changed, 397 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp b/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp
new file mode 100644
index 0000000000..0fecd90cbf
--- /dev/null
+++ b/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp
@@ -0,0 +1,397 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/linearstore/journal/enq_rec.h"
+
+#include <cassert>
+#include <cstring>
+#include "qpid/linearstore/journal/Checksum.h"
+#include "qpid/linearstore/journal/jexception.h"
+
+namespace qpid {
+namespace linearstore {
+namespace journal {
+
+enq_rec::enq_rec():
+ jrec(), // superclass
+ _xidp(0),
+ _data(0),
+ _xid_buff(0),
+ _data_buff(0)
+{
+ ::enq_hdr_init(&_enq_hdr, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, 0, 0, 0, 0, false);
+ ::rec_tail_copy(&_enq_tail, &_enq_hdr._rhdr, 0);
+}
+
+enq_rec::~enq_rec()
+{
+ clean();
+}
+
+void
+enq_rec::reset(const uint64_t serial, const uint64_t rid, const void* const dbuf, const std::size_t dlen,
+ const void* const xidp, const std::size_t xidlen, const bool transient, const bool external)
+{
+ _enq_hdr._rhdr._serial = serial;
+ _enq_hdr._rhdr._rid = rid;
+ ::set_enq_transient(&_enq_hdr, transient);
+ ::set_enq_external(&_enq_hdr, external);
+ _enq_hdr._xidsize = xidlen;
+ _enq_hdr._dsize = dlen;
+ _xidp = xidp;
+ _data = dbuf;
+ _enq_tail._serial = serial;
+ _enq_tail._rid = rid;
+}
+
+uint32_t
+enq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Checksum& checksum)
+{
+ assert(wptr != 0);
+ assert(max_size_dblks > 0);
+ if (_xidp == 0)
+ assert(_enq_hdr._xidsize == 0);
+
+ std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES;
+ std::size_t rem = max_size_dblks * QLS_DBLK_SIZE_BYTES;
+ std::size_t wr_cnt = 0;
+ if (rec_offs_dblks) // Continuation of split data record (over 2 or more pages)
+ {
+ if (size_dblks(rec_size()) - rec_offs_dblks > max_size_dblks) // Further split required
+ {
+ rec_offs -= sizeof(_enq_hdr);
+ std::size_t wsize = _enq_hdr._xidsize > rec_offs ? _enq_hdr._xidsize - rec_offs : 0;
+ std::size_t wsize2 = wsize;
+ if (wsize)
+ {
+ if (wsize > rem)
+ wsize = rem;
+ std::memcpy(wptr, (const char*)_xidp + rec_offs, wsize);
+ wr_cnt = wsize;
+ rem -= wsize;
+ }
+ rec_offs -= _enq_hdr._xidsize - wsize2;
+ if (rem && !::is_enq_external(&_enq_hdr))
+ {
+ wsize = _enq_hdr._dsize > rec_offs ? _enq_hdr._dsize - rec_offs : 0;
+ wsize2 = wsize;
+ if (wsize)
+ {
+ if (wsize > rem)
+ wsize = rem;
+ std::memcpy((char*)wptr + wr_cnt, (const char*)_data + rec_offs, wsize);
+ wr_cnt += wsize;
+ rem -= wsize;
+ }
+ rec_offs -= _enq_hdr._dsize - wsize2;
+ }
+ checksum.addData((unsigned char*)wptr, wr_cnt);
+ if (rem)
+ {
+ _enq_tail._checksum = checksum.getChecksum();
+ wsize = sizeof(_enq_tail) > rec_offs ? sizeof(_enq_tail) - rec_offs : 0;
+ wsize2 = wsize;
+ if (wsize)
+ {
+ if (wsize > rem)
+ wsize = rem;
+ std::memcpy((char*)wptr + wr_cnt, (char*)&_enq_tail + rec_offs, wsize);
+ wr_cnt += wsize;
+ rem -= wsize;
+ }
+ rec_offs -= sizeof(_enq_tail) - wsize2;
+ }
+ assert(rem == 0);
+ assert(rec_offs == 0);
+ }
+ else // No further split required
+ {
+ rec_offs -= sizeof(_enq_hdr);
+ std::size_t xid_wsize = _enq_hdr._xidsize > rec_offs ? _enq_hdr._xidsize - rec_offs : 0;
+ if (xid_wsize)
+ {
+ std::memcpy(wptr, (const char*)_xidp + rec_offs, xid_wsize);
+ wr_cnt += xid_wsize;
+ }
+ rec_offs -= _enq_hdr._xidsize - xid_wsize;
+ std::size_t data_wsize = _enq_hdr._dsize > rec_offs ? _enq_hdr._dsize - rec_offs : 0;
+ if (data_wsize && !::is_enq_external(&_enq_hdr))
+ {
+ std::memcpy((char*)wptr + wr_cnt, (const char*)_data + rec_offs, data_wsize);
+ wr_cnt += data_wsize;
+ }
+ rec_offs -= _enq_hdr._dsize - data_wsize;
+ if (xid_wsize || data_wsize) {
+ checksum.addData((unsigned char*)wptr, wr_cnt);
+ }
+ _enq_tail._checksum = checksum.getChecksum();
+ std::size_t wsize = sizeof(_enq_tail) > rec_offs ? sizeof(_enq_tail) - rec_offs : 0;
+ if (wsize)
+ {
+ std::memcpy((char*)wptr + wr_cnt, (char*)&_enq_tail + rec_offs, wsize);
+ wr_cnt += wsize;
+#ifdef QLS_CLEAN
+ std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES;
+ std::size_t dblk_rec_size = size_dblks(rec_size() - rec_offs) * QLS_DBLK_SIZE_BYTES;
+ std::memset((char*)wptr + wr_cnt, QLS_CLEAN_CHAR, dblk_rec_size - wr_cnt);
+#endif
+ }
+ rec_offs -= sizeof(_enq_tail) - wsize;
+ assert(rec_offs == 0);
+ }
+ }
+ else // Start at beginning of data record
+ {
+ // Assumption: the header will always fit into the first dblk
+ std::memcpy(wptr, (void*)&_enq_hdr, sizeof(_enq_hdr));
+ wr_cnt = sizeof(_enq_hdr);
+ if (size_dblks(rec_size()) > max_size_dblks) // Split required
+ {
+ std::size_t wsize;
+ rem -= sizeof(_enq_hdr);
+ if (rem)
+ {
+ wsize = rem >= _enq_hdr._xidsize ? _enq_hdr._xidsize : rem;
+ std::memcpy((char*)wptr + wr_cnt, _xidp, wsize);
+ wr_cnt += wsize;
+ rem -= wsize;
+ }
+ if (rem && !::is_enq_external(&_enq_hdr))
+ {
+ wsize = rem >= _enq_hdr._dsize ? _enq_hdr._dsize : rem;
+ std::memcpy((char*)wptr + wr_cnt, _data, wsize);
+ wr_cnt += wsize;
+ rem -= wsize;
+ }
+ checksum.addData((unsigned char*)wptr, wr_cnt);
+ if (rem)
+ {
+ _enq_tail._checksum = checksum.getChecksum();
+ wsize = rem >= sizeof(_enq_tail) ? sizeof(_enq_tail) : rem;
+ std::memcpy((char*)wptr + wr_cnt, (void*)&_enq_tail, wsize);
+ wr_cnt += wsize;
+ rem -= wsize;
+ }
+ assert(rem == 0);
+ }
+ else // No split required
+ {
+ if (_enq_hdr._xidsize)
+ {
+ std::memcpy((char*)wptr + wr_cnt, _xidp, _enq_hdr._xidsize);
+ wr_cnt += _enq_hdr._xidsize;
+ }
+ if (!::is_enq_external(&_enq_hdr))
+ {
+ std::memcpy((char*)wptr + wr_cnt, _data, _enq_hdr._dsize);
+ wr_cnt += _enq_hdr._dsize;
+ }
+ checksum.addData((unsigned char*)wptr, wr_cnt);
+ _enq_tail._checksum = checksum.getChecksum();
+ std::memcpy((char*)wptr + wr_cnt, (void*)&_enq_tail, sizeof(_enq_tail));
+ wr_cnt += sizeof(_enq_tail);
+#ifdef QLS_CLEAN
+ std::size_t dblk_rec_size = size_dblks(rec_size()) * QLS_DBLK_SIZE_BYTES;
+ std::memset((char*)wptr + wr_cnt, QLS_CLEAN_CHAR, dblk_rec_size - wr_cnt);
+#endif
+ }
+ }
+ return size_dblks(wr_cnt);
+}
+
+bool
+enq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs, const std::streampos rec_start)
+{
+ if (rec_offs == 0)
+ {
+ // Read header, allocate (if req'd) for xid
+ ::rec_hdr_copy(&_enq_hdr._rhdr, &h);
+ ifsp->read((char*)&_enq_hdr._xidsize, sizeof(_enq_hdr._xidsize));
+ ifsp->read((char*)&_enq_hdr._dsize, sizeof(_enq_hdr._dsize));
+ rec_offs = sizeof(::enq_hdr_t);
+ if (_enq_hdr._xidsize > 0)
+ {
+ _xid_buff = std::malloc(_enq_hdr._xidsize);
+ MALLOC_CHK(_xid_buff, "_xid_buff", "enq_rec", "decode");
+ }
+ if (_enq_hdr._dsize > 0)
+ {
+ _data_buff = std::malloc(_enq_hdr._dsize);
+ MALLOC_CHK(_data_buff, "_data_buff", "enq_rec", "decode")
+ }
+ }
+ if (rec_offs < sizeof(_enq_hdr) + _enq_hdr._xidsize)
+ {
+ // Read xid (or continue reading xid)
+ std::size_t offs = rec_offs - sizeof(_enq_hdr);
+ ifsp->read((char*)_xid_buff + offs, _enq_hdr._xidsize - offs);
+ std::size_t size_read = ifsp->gcount();
+ rec_offs += size_read;
+ if (size_read < _enq_hdr._xidsize - offs)
+ {
+ assert(ifsp->eof());
+ // As we may have read past eof, turn off fail bit
+ ifsp->clear(ifsp->rdstate()&(~std::ifstream::failbit));
+ assert(!ifsp->fail() && !ifsp->bad());
+ return false;
+ }
+ }
+ if (!::is_enq_external(&_enq_hdr))
+ {
+ if (rec_offs < sizeof(_enq_hdr) + _enq_hdr._xidsize + _enq_hdr._dsize)
+ {
+ // Read data (or continue reading data)
+ std::size_t offs = rec_offs - sizeof(_enq_hdr) - _enq_hdr._xidsize;
+ ifsp->read((char*)_data_buff + offs, _enq_hdr._dsize - offs);
+ std::size_t size_read = ifsp->gcount();
+ rec_offs += size_read;
+ if (size_read < _enq_hdr._dsize - offs)
+ {
+ assert(ifsp->eof());
+ // As we may have read past eof, turn off fail bit
+ ifsp->clear(ifsp->rdstate()&(~std::ifstream::failbit));
+ assert(!ifsp->fail() && !ifsp->bad());
+ return false;
+ }
+ }
+ }
+ if (rec_offs < sizeof(_enq_hdr) + _enq_hdr._xidsize +
+ (::is_enq_external(&_enq_hdr) ? 0 : _enq_hdr._dsize) + sizeof(rec_tail_t))
+ {
+ // Read tail (or continue reading tail)
+ std::size_t offs = rec_offs - sizeof(_enq_hdr) - _enq_hdr._xidsize;
+ if (!::is_enq_external(&_enq_hdr))
+ offs -= _enq_hdr._dsize;
+ ifsp->read((char*)&_enq_tail + offs, sizeof(rec_tail_t) - offs);
+ std::size_t size_read = ifsp->gcount();
+ rec_offs += size_read;
+ if (size_read < sizeof(rec_tail_t) - offs)
+ {
+ assert(ifsp->eof());
+ // As we may have read past eof, turn off fail bit
+ ifsp->clear(ifsp->rdstate()&(~std::ifstream::failbit));
+ assert(!ifsp->fail() && !ifsp->bad());
+ return false;
+ }
+ check_rec_tail(rec_start);
+ }
+ ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size());
+ assert(!ifsp->fail() && !ifsp->bad());
+ return true;
+}
+
+std::size_t
+enq_rec::get_xid(void** const xidpp)
+{
+ if (!_xid_buff || !_enq_hdr._xidsize) {
+ *xidpp = 0;
+ return 0;
+ }
+ *xidpp = _xid_buff;
+ return _enq_hdr._xidsize;
+}
+
+std::size_t
+enq_rec::get_data(void** const datapp)
+{
+ if (!_data_buff) {
+ *datapp = 0;
+ return 0;
+ }
+ if (::is_enq_external(&_enq_hdr))
+ *datapp = 0;
+ else
+ *datapp = _data_buff;
+ return _enq_hdr._dsize;
+}
+
+std::string&
+enq_rec::str(std::string& str) const
+{
+ std::ostringstream oss;
+ oss << "enq_rec: m=" << _enq_hdr._rhdr._magic;
+ oss << " v=" << (int)_enq_hdr._rhdr._version;
+ oss << " rid=" << _enq_hdr._rhdr._rid;
+ if (_xidp)
+ oss << " xid=\"" << _xidp << "\"";
+ oss << " len=" << _enq_hdr._dsize;
+ str.append(oss.str());
+ return str;
+}
+
+std::size_t
+enq_rec::rec_size() const
+{
+ return rec_size(_enq_hdr._xidsize, _enq_hdr._dsize, ::is_enq_external(&_enq_hdr));
+}
+
+std::size_t
+enq_rec::rec_size(const std::size_t xidsize, const std::size_t dsize, const bool external)
+{
+ if (external)
+ return sizeof(enq_hdr_t) + xidsize + sizeof(rec_tail_t);
+ return sizeof(enq_hdr_t) + xidsize + dsize + sizeof(rec_tail_t);
+}
+
+void
+enq_rec::check_rec_tail(const std::streampos rec_start) const {
+ Checksum checksum;
+ checksum.addData((const unsigned char*)&_enq_hdr, sizeof(::enq_hdr_t));
+ if (_enq_hdr._xidsize > 0) {
+ checksum.addData((const unsigned char*)_xid_buff, _enq_hdr._xidsize);
+ }
+ if (_enq_hdr._dsize > 0) {
+ checksum.addData((const unsigned char*)_data_buff, _enq_hdr._dsize);
+ }
+ uint32_t cs = checksum.getChecksum();
+ uint16_t res = ::rec_tail_check(&_enq_tail, &_enq_hdr._rhdr, cs);
+ if (res != 0) {
+ std::stringstream oss;
+ oss << std::endl << " Record offset: 0x" << std::hex << rec_start;
+ if (res & ::REC_TAIL_MAGIC_ERR_MASK) {
+ oss << std::endl << " Magic: expected 0x" << ~_enq_hdr._rhdr._magic << "; found 0x" << _enq_tail._xmagic;
+ }
+ if (res & ::REC_TAIL_SERIAL_ERR_MASK) {
+ oss << std::endl << " Serial: expected 0x" << _enq_hdr._rhdr._serial << "; found 0x" << _enq_tail._serial;
+ }
+ if (res & ::REC_TAIL_RID_ERR_MASK) {
+ oss << std::endl << " Record Id: expected 0x" << _enq_hdr._rhdr._rid << "; found 0x" << _enq_tail._rid;
+ }
+ if (res & ::REC_TAIL_CHECKSUM_ERR_MASK) {
+ oss << std::endl << " Checksum: expected 0x" << cs << "; found 0x" << _enq_tail._checksum;
+ }
+ throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "enq_rec", "check_rec_tail");
+ }
+}
+
+void
+enq_rec::clean() {
+ if (_xid_buff) {
+ std::free(_xid_buff);
+ _xid_buff = 0;
+ }
+ if (_data_buff) {
+ std::free(_data_buff);
+ _data_buff = 0;
+ }
+}
+
+}}}