summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2014-01-15 22:07:30 +0000
committerKim van der Riet <kpvdr@apache.org>2014-01-15 22:07:30 +0000
commit810ac4c70346730fb7103e823f26e30e7f68a731 (patch)
treec29102bd9924d8b9dbefb6321f2f60513824c008
parenta6e5e1fa9e750c8667cabfb752f0d5174eba70a5 (diff)
downloadqpid-python-810ac4c70346730fb7103e823f26e30e7f68a731.tar.gz
QPID-5483: [linearstore] Recovery of journal with partly written record fails with "JERR_JREC_BADRECTAIL: Invalid data record tail" error message
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1558589 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/linearstore/ISSUES104
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/Checksum.cpp8
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp40
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp66
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/deq_rec.h3
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp73
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/enq_rec.h4
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp65
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/txn_rec.h3
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/utils/rec_tail.c13
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/utils/rec_tail.h7
11 files changed, 247 insertions, 139 deletions
diff --git a/qpid/cpp/src/qpid/linearstore/ISSUES b/qpid/cpp/src/qpid/linearstore/ISSUES
index 412db073df..8b5f8524dc 100644
--- a/qpid/cpp/src/qpid/linearstore/ISSUES
+++ b/qpid/cpp/src/qpid/linearstore/ISSUES
@@ -17,52 +17,70 @@
# under the License.
#
-LinearStore issues:
+Linear Store issues:
-Store:
-------
+Current/pending:
+================
+ Q-JIRA RHBZ Description / Comments
+ ------ ------- ----------------------
+ 5359 - Linearstore: Implement new management schema and wire into store
+ 5360 - Linearstore: Evaluate and rework logging to produce a consistent log outputConsistent logging
+ 5361 - Linearstore: No tests for linearstore functionality currently exist
+ * No existing tests for linearstore:
+ ** Basic broker-level tests for txn and non-txn recovery
+ ** Store-level tests which check write boundary conditions
+ ** EFP tests, including file recovery, error management
+ ** Unit tests
+ ** Basic performance tests
+ 5362 - Linearstore: No store tools exist for examining the journals
+ svn r.1558888 2014-01-09: WIP checkin for linearstore version of qpid_qls_analyze. Needs testing and tidy-up.
+ * Store analysis and status
+ * Recovery/reading of message content
+ * Empty file pool status and management
+ 5464 - [linearstore] Incompletely created journal files accumulate in EFP
+ 5479 1053701 [linearstore] Using recovered store results in "JERR_JNLF_FILEOFFSOVFL: Attempted to increase submitted offset past file size. (JournalFile::submittedDblkCount)" error message
+ 5480 1053749 [linearstore] Recovery of store failure with "JERR_MAP_NOTFOUND: Key not found in map." error message
+ - 1035843 Slow performance for producers
+ - 1036026 [LinearStore] Qpid linear store unable to create durable queue - framing-error: Queue <q-name>: create() failed: jexception 0x0000
+ UNABLE TO REPRODUCE - but Frantizek has additional info
+ - 1039522 Qpid crashes while recovering from linear store around apid::linearstore::journal::JournalFile::getFqFileName() including enq_rec::decode() threw JERR_JREC_BAD_RECTAIL
+ - 1039525 Qpid crashes while recovering from linear store around apid::linearstore::journal::jexception::format including enq_rec::decode() threw JERR_JREC_BAD_REC_TAIL
+ * Possible dup of 1039522
-1. (FIXED) Overwrite identity: When recovering a previously used file, if the write boundary coincides with old record
- start, no way of discriminating old from new at boundary (used to use OWI).
+Fixed/closed:
+=============
+ Q-JIRA RHBZ Description / Comments
+ ------ ------- ----------------------
+ 5357 1052518 Linearstore: Empty file recycling not functional
+ svn r.1545563 2013-11-26: Propsed fix
+ 5358 1052727 Linearstore: Checksums not implemented in record tail
+ svn r.1547601 2013-12-03: Propsed fix
+ 5387 1036071 Linearstore: Segmentation fault when deleting queue
+ svn r.1547641 2013-12-03: Propsed fix
+ 5388 1035802 Linearstore: Segmentation fault when recovering empty queue
+ svn r.1547921 2013-12-04: Propsed fix
+NO-JIRA - Added missing Apache copyright/license text
+ svn r.1551304 2013-12-16: Propsed fix
+ 5425 1052445 Linearstore: Transaction Prepared List (TPL) fails with jexception 0x0402 AtomicCounter::addLimit() threw JERR_JNLF_FILEOFFSOVFL
+ svn r.1551361 2013-12-16: Proposed fix
+ 5442 1039949 Linearstore: Dtx recover test fails
+ svn r.1552772 2013-12-20: Proposed fix
+ 5444 1052775 Linearstore: Recovering from qpid-txtest fails with "Inconsistent TPL 2PC count" error message
+ svn r.1553148 2013-12-23: Proposed fix
+ - 1038599 [LinearStore] Abort when deleting used queue after restart
+ CLOSED-NOTABUG 2014-01-06
+ 5460 1051097 [linearstore] Recovery of store which contains prepared but incomplete transactions results in message loss
+ svn r.1556892 2014-01-09: Proposed fix
+ 5473 1051924 [linearstore] Recovery of journal in which last logical file contains truncated record causes crash
+ svn r.1557620 2014-01-12: Proposed fix
-2. (FIXED) QPID-5357: Recycling files while in use not working, however, files are recovered to EFP during recovery. Must solve
- #1 first.
-
-3. (FIXED) QPID-5358: Checksum not implemented in record tail, not checked during read.
-
-4. QPID-5359: Rework qpid management parameters and controls (QMF).
-
-5. QPID-5360: Consistent logging: rework logging to provide uniform and consistent logging from store (both logging
- level and places where logging occurs).
-
-6. QPID-5361: No tests
- * No existing tests for linearstore:
- ** Basic broker-level tests for txn and non-txn recovery
- ** Store-level tests which check write boundary conditions
- ** Unit tests
- ** Basic performance tests
-
-7: QPID-5362: No tools
- * Store analysis and status
- * Recovery/reading of message content
-
-8. One journal file lost when queue deleted. All files except for one are recycled back to the EFP.
-
-9. Complete exceptions - several exceptions thrown using jexception have no exception numbers
-
-Current bugs and performance issues:
-------------------------------------
-1. BZ 1035843 - Slow performance for producers
-2. (FIXED) QPID-5387 (BZ 1036071) - Crash when deleting queue
-3. (FIXED) QPID-5388 (BZ 1035802) - Segmentation fault when recovering empty queue
-4. (UNABLE TO REPRODUCE) BZ 1036026 - Unable to create durable queue - framing error - possibly caused by running both stores at the same time
-5. (UNABLE TO REPRODUCE) BZ 1038599 - Abort when deleting used queue after restart - may be dup of QPID-5387 (BZ 1036071)
-6. BZ 1039522 - Crash during recovery - JournalFile::getFqFileName() -JERR_JREC_BADRECTAIL
-7. BZ 1039525 - Crash during recovery - journal::jexception - JERR_JREC_BADRECTAIL
-8. (FIXED) QPID-5442 (BZ 1039949) - DTX test failure - missing XIDs
-9. (FIXED) QPID-5460 (BZ 1051097) - Transactional messages lost during recovery
-10. QPID-5464 - Incompletely created journal files accumulate in EFP
-11. QPID-5473 (BZ 1051924) - Recovery where last record in file is truncated (ie spans files), but following file is uninitialized causes crash
+Future:
+=======
+* One journal file lost when queue deleted. All files except for one are recycled back to the EFP.
+* Complete exceptions - several exceptions thrown using jexception have no exception numbers
+* Investigate ability of store to detect missing journal files, especially from logical end of a journal
+* Investigate ability of store to handle file muddle-ups (ie journal files from EFP which are not zeroed or other journals)
+* Look at improving the efficiency of recovery - right now the entire store is read once, and then each recovered record xid and data is read again
Code tidy-up
------------
diff --git a/qpid/cpp/src/qpid/linearstore/journal/Checksum.cpp b/qpid/cpp/src/qpid/linearstore/journal/Checksum.cpp
index 89d654ce76..eaede12d8e 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/Checksum.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/Checksum.cpp
@@ -30,9 +30,11 @@ Checksum::Checksum() : a(1UL), b(0UL), MOD_ADLER(65521UL) {}
Checksum::~Checksum() {}
void Checksum::addData(const unsigned char* data, const std::size_t len) {
- for (uint32_t i = 0; i < len; i++) {
- a = (a + data[i]) % MOD_ADLER;
- b = (a + b) % MOD_ADLER;
+ if (data) {
+ for (uint32_t i = 0; i < len; i++) {
+ a = (a + data[i]) % MOD_ADLER;
+ b = (a + b) % MOD_ADLER;
+ }
}
}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
index e27a239a18..72308cc929 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
@@ -221,28 +221,34 @@ bool RecoveryManager::readNextRemainingRecord(void** const dataPtrPtr,
// Check enqueue record checksum
Checksum checksum;
- checksum.addData((unsigned char*)&enqueueHeader, sizeof(::enq_hdr_t));
+ checksum.addData((const unsigned char*)&enqueueHeader, sizeof(::enq_hdr_t));
if (xidSize > 0) {
- checksum.addData((unsigned char*)*xidPtrPtr, xidSize);
+ checksum.addData((const unsigned char*)*xidPtrPtr, xidSize);
}
if (dataSize > 0) {
- checksum.addData((unsigned char*)*dataPtrPtr, dataSize);
+ checksum.addData((const unsigned char*)*dataPtrPtr, dataSize);
}
::rec_tail_t enqueueTail;
inFileStream_.read((char*)&enqueueTail, sizeof(::rec_tail_t));
uint32_t cs = checksum.getChecksum();
//std::cout << std::hex << "### rid=0x" << enqueueHeader._rhdr._rid << " rtcs=0x" << enqueueTail._checksum << " cs=0x" << cs << std::dec << std::endl; // DEBUG
- int res = ::rec_tail_check(&enqueueTail, &enqueueHeader._rhdr, cs);
+ uint16_t res = ::rec_tail_check(&enqueueTail, &enqueueHeader._rhdr, cs);
if (res != 0) {
std::stringstream oss;
- switch (res) {
- case 1: oss << std::hex << "Magic: expected 0x" << ~enqueueHeader._rhdr._magic << "; found 0x" << enqueueTail._xmagic; break;
- case 2: oss << std::hex << "Serial: expected 0x" << enqueueHeader._rhdr._serial << "; found 0x" << enqueueTail._serial; break;
- case 3: oss << std::hex << "Record Id: expected 0x" << enqueueHeader._rhdr._rid << "; found 0x" << enqueueTail._rid; break;
- case 4: oss << std::hex << "Checksum: expected 0x" << cs << "; found 0x" << enqueueTail._checksum; break;
- default: oss << "Unknown error " << res;
+ oss << "Bad record tail:" << std::hex;
+ if (res & ::REC_TAIL_MAGIC_ERR_MASK) {
+ oss << std::endl << " Magic: expected 0x" << ~enqueueHeader._rhdr._magic << "; found 0x" << enqueueTail._xmagic;
}
- throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "enq_rec", "decode"); // TODO: Don't throw exception, log info
+ if (res & ::REC_TAIL_SERIAL_ERR_MASK) {
+ oss << std::endl << " Serial: expected 0x" << enqueueHeader._rhdr._serial << "; found 0x" << enqueueTail._serial;
+ }
+ if (res & ::REC_TAIL_RID_ERR_MASK) {
+ oss << std::endl << " Record Id: expected 0x" << enqueueHeader._rhdr._rid << "; found 0x" << enqueueTail._rid;
+ }
+ if (res & ::REC_TAIL_CHECKSUM_ERR_MASK) {
+ oss << std::endl << " Checksum: expected 0x" << cs << "; found 0x" << enqueueTail._checksum;
+ }
+ throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "RecoveryManager", "readNextRemainingRecord"); // TODO: Don't throw exception, log info
}
// Set data token
@@ -472,7 +478,13 @@ bool RecoveryManager::decodeRecord(jrec& record,
done = record.decode(headerRecord, &inFileStream_, cumulativeSizeRead);
}
catch (const jexception& e) {
- journalLogRef_.log(JournalLog::LOG_INFO, queueName_, e.what());
+ if (e.err_code() == jerrno::JERR_JREC_BADRECTAIL) {
+ std::ostringstream oss;
+ oss << jerrno::err_msg(e.err_code()) << e.additional_info();
+ journalLogRef_.log(JournalLog::LOG_INFO, queueName_, oss.str());
+ } else {
+ journalLogRef_.log(JournalLog::LOG_INFO, queueName_, e.what());
+ }
checkJournalAlignment(start_file_offs);
return false;
}
@@ -602,7 +614,6 @@ bool RecoveryManager::getNextRecordHeader()
oss << std::hex << "_tmap.set_aio_compl: txn_enq xid=\"" << xid << "\" rid=0x" << h._rid;
throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "RecoveryManager", "getNextRecordHeader");
}
- std::free(xidp);
} else {
if (enqueueMapRef_.insert_pfid(h._rid, start_fid, file_pos) < enq_map::EMAP_OK) { // fail
// The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID.
@@ -641,7 +652,6 @@ bool RecoveryManager::getNextRecordHeader()
oss << std::hex << "_tmap.set_aio_compl: txn_deq xid=\"" << xid << "\" rid=0x" << dr.rid();
throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "RecoveryManager", "getNextRecordHeader");
}
- std::free(xidp);
} else {
uint64_t enq_fid;
if (enqueueMapRef_.get_remove_pfid(dr.deq_rid(), enq_fid, true) == enq_map::EMAP_OK) { // ignore not found error
@@ -675,7 +685,6 @@ bool RecoveryManager::getNextRecordHeader()
enqueueMapRef_.unlock(itr->drid_); // ignore not found error
}
}
- std::free(xidp);
}
break;
case QLS_TXC_MAGIC:
@@ -711,7 +720,6 @@ bool RecoveryManager::getNextRecordHeader()
fileNumberMap_[enq_fid]->decrEnqueuedRecordCount();
}
}
- std::free(xidp);
}
break;
case QLS_EMPTY_MAGIC:
diff --git a/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp b/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp
index 8b9e9d7f64..a4882aaa9c 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp
@@ -32,7 +32,7 @@ namespace journal {
deq_rec::deq_rec():
_xidp(0),
- _buff(0)
+ _xid_buff(0)
{
::deq_hdr_init(&_deq_hdr, QLS_DEQ_MAGIC, QLS_JRNL_VERSION, 0, 0, 0, 0, 0);
::rec_tail_copy(&_deq_tail, &_deq_hdr._rhdr, 0);
@@ -53,7 +53,7 @@ deq_rec::reset(const uint64_t serial, const uint64_t rid, const uint64_t drid,
_deq_hdr._deq_rid = drid;
_deq_hdr._xidsize = xidlen;
_xidp = xidp;
- _buff = 0;
+ _xid_buff = 0;
_deq_tail._serial = serial;
_deq_tail._rid = rid;
_deq_tail._checksum = 0UL;
@@ -192,15 +192,15 @@ deq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs)
// Read header, allocate (if req'd) for xid
if (_deq_hdr._xidsize)
{
- _buff = std::malloc(_deq_hdr._xidsize);
- MALLOC_CHK(_buff, "_buff", "enq_rec", "rcv_decode");
+ _xid_buff = std::malloc(_deq_hdr._xidsize);
+ MALLOC_CHK(_xid_buff, "_buff", "enq_rec", "rcv_decode");
}
}
if (rec_offs < sizeof(_deq_hdr) + _deq_hdr._xidsize)
{
// Read xid (or continue reading xid)
std::size_t offs = rec_offs - sizeof(_deq_hdr);
- ifsp->read((char*)_buff + offs, _deq_hdr._xidsize - offs);
+ ifsp->read((char*)_xid_buff + offs, _deq_hdr._xidsize - offs);
std::size_t size_read = ifsp->gcount();
rec_offs += size_read;
if (size_read < _deq_hdr._xidsize - offs)
@@ -228,39 +228,22 @@ deq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs)
assert(!ifsp->fail() && !ifsp->bad());
return false;
}
+ check_rec_tail();
}
ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size());
assert(!ifsp->fail() && !ifsp->bad());
- if (_deq_hdr._xidsize) {
- Checksum checksum;
- checksum.addData((unsigned char*)&_deq_hdr, sizeof(_deq_hdr));
- checksum.addData((unsigned char*)_buff, _deq_hdr._xidsize);
- uint32_t cs = checksum.getChecksum();
- int res = ::rec_tail_check(&_deq_tail, &_deq_hdr._rhdr, cs);
- if (res != 0) {
- std::stringstream oss;
- switch (res) {
- case 1: oss << std::hex << "Magic: expected 0x" << ~_deq_hdr._rhdr._magic << "; found 0x" << _deq_tail._xmagic; break;
- case 2: oss << std::hex << "Serial: expected 0x" << _deq_hdr._rhdr._serial << "; found 0x" << _deq_tail._serial; break;
- case 3: oss << std::hex << "Record Id: expected 0x" << _deq_hdr._rhdr._rid << "; found 0x" << _deq_tail._rid; break;
- case 4: oss << std::hex << "Checksum: expected 0x" << cs << "; found 0x" << _deq_tail._checksum; break;
- default: oss << "Unknown error " << res;
- }
- throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "deq_rec", "decode"); // TODO: Don't throw exception, log info
- }
- }
return true;
}
std::size_t
deq_rec::get_xid(void** const xidpp)
{
- if (!_buff)
+ if (!_xid_buff)
{
*xidpp = 0;
return 0;
}
- *xidpp = _buff;
+ *xidpp = _xid_buff;
return _deq_hdr._xidsize;
}
@@ -291,9 +274,40 @@ deq_rec::rec_size() const
}
void
+deq_rec::check_rec_tail() const {
+ Checksum checksum;
+ checksum.addData((const unsigned char*)&_deq_hdr, sizeof(::deq_hdr_t));
+ if (_deq_hdr._xidsize > 0) {
+ checksum.addData((const unsigned char*)_xid_buff, _deq_hdr._xidsize);
+ }
+ uint32_t cs = checksum.getChecksum();
+ uint16_t res = ::rec_tail_check(&_deq_tail, &_deq_hdr._rhdr, cs);
+ if (res != 0) {
+ std::stringstream oss;
+ oss << std::hex;
+ if (res & ::REC_TAIL_MAGIC_ERR_MASK) {
+ oss << std::endl << " Magic: expected 0x" << ~_deq_hdr._rhdr._magic << "; found 0x" << _deq_tail._xmagic;
+ }
+ if (res & ::REC_TAIL_SERIAL_ERR_MASK) {
+ oss << std::endl << " Serial: expected 0x" << _deq_hdr._rhdr._serial << "; found 0x" << _deq_tail._serial;
+ }
+ if (res & ::REC_TAIL_RID_ERR_MASK) {
+ oss << std::endl << " Record Id: expected 0x" << _deq_hdr._rhdr._rid << "; found 0x" << _deq_tail._rid;
+ }
+ if (res & ::REC_TAIL_CHECKSUM_ERR_MASK) {
+ oss << std::endl << " Checksum: expected 0x" << cs << "; found 0x" << _deq_tail._checksum;
+ }
+ throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "deq_rec", "check_rec_tail");
+ }
+}
+
+void
deq_rec::clean()
{
- // clean up allocated memory here
+ if (_xid_buff) {
+ std::free(_xid_buff);
+ _xid_buff = 0;
+ }
}
}}}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h b/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h
index c7f78e1215..ead0eed72a 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h
@@ -39,7 +39,7 @@ class deq_rec : public jrec
private:
::deq_hdr_t _deq_hdr; ///< Local instance of dequeue header struct
const void* _xidp; ///< xid pointer for encoding (writing to disk)
- void* _buff; ///< Pointer to buffer to receive data read from disk
+ void* _xid_buff; ///< Pointer to buffer to receive xid read from disk
::rec_tail_t _deq_tail; ///< Local instance of enqueue tail struct, only encoded if XID is present
public:
@@ -59,6 +59,7 @@ public:
inline std::size_t data_size() const { return 0; } // This record never carries data
std::size_t xid_size() const;
std::size_t rec_size() const;
+ void check_rec_tail() const;
private:
virtual void clean();
diff --git a/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp b/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp
index a4f19b3a7b..f95a722308 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp
@@ -34,7 +34,8 @@ enq_rec::enq_rec():
jrec(), // superclass
_xidp(0),
_data(0),
- _buff(0)
+ _xid_buff(0),
+ _data_buff(0)
{
::enq_hdr_init(&_enq_hdr, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, 0, 0, 0, 0, false);
::rec_tail_copy(&_enq_tail, &_enq_hdr._rhdr, 0);
@@ -57,7 +58,6 @@ enq_rec::reset(const uint64_t serial, const uint64_t rid, const void* const dbuf
_enq_hdr._dsize = dlen;
_xidp = xidp;
_data = dbuf;
- _buff = 0;
_enq_tail._serial = serial;
_enq_tail._rid = rid;
}
@@ -229,15 +229,20 @@ enq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs)
rec_offs = sizeof(::enq_hdr_t);
if (_enq_hdr._xidsize > 0)
{
- _buff = std::malloc(_enq_hdr._xidsize);
- MALLOC_CHK(_buff, "_buff", "enq_rec", "rcv_decode");
+ _xid_buff = std::malloc(_enq_hdr._xidsize);
+ MALLOC_CHK(_xid_buff, "_xid_buff", "enq_rec", "decode");
+ }
+ if (_enq_hdr._dsize > 0)
+ {
+ _data_buff = std::malloc(_enq_hdr._dsize);
+ MALLOC_CHK(_data_buff, "_data_buff", "enq_rec", "decode")
}
}
if (rec_offs < sizeof(_enq_hdr) + _enq_hdr._xidsize)
{
// Read xid (or continue reading xid)
std::size_t offs = rec_offs - sizeof(_enq_hdr);
- ifsp->read((char*)_buff + offs, _enq_hdr._xidsize - offs);
+ ifsp->read((char*)_xid_buff + offs, _enq_hdr._xidsize - offs);
std::size_t size_read = ifsp->gcount();
rec_offs += size_read;
if (size_read < _enq_hdr._xidsize - offs)
@@ -253,9 +258,9 @@ enq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs)
{
if (rec_offs < sizeof(_enq_hdr) + _enq_hdr._xidsize + _enq_hdr._dsize)
{
- // Ignore data (or continue ignoring data)
+ // Read data (or continue reading data)
std::size_t offs = rec_offs - sizeof(_enq_hdr) - _enq_hdr._xidsize;
- ifsp->ignore(_enq_hdr._dsize - offs);
+ ifsp->read((char*)_data_buff + offs, _enq_hdr._dsize - offs);
std::size_t size_read = ifsp->gcount();
rec_offs += size_read;
if (size_read < _enq_hdr._dsize - offs)
@@ -286,6 +291,7 @@ enq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs)
assert(!ifsp->fail() && !ifsp->bad());
return false;
}
+ check_rec_tail();
}
ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size());
assert(!ifsp->fail() && !ifsp->bad());
@@ -295,27 +301,25 @@ enq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs)
std::size_t
enq_rec::get_xid(void** const xidpp)
{
- if (!_buff || !_enq_hdr._xidsize)
- {
+ if (!_xid_buff || !_enq_hdr._xidsize) {
*xidpp = 0;
return 0;
}
- *xidpp = _buff;
+ *xidpp = _xid_buff;
return _enq_hdr._xidsize;
}
std::size_t
enq_rec::get_data(void** const datapp)
{
- if (!_buff)
- {
+ if (!_data_buff) {
*datapp = 0;
return 0;
}
if (::is_enq_external(&_enq_hdr))
*datapp = 0;
else
- *datapp = (void*)((char*)_buff + _enq_hdr._xidsize);
+ *datapp = _data_buff;
return _enq_hdr._dsize;
}
@@ -348,9 +352,46 @@ enq_rec::rec_size(const std::size_t xidsize, const std::size_t dsize, const bool
}
void
-enq_rec::clean()
-{
- // clean up allocated memory here
+enq_rec::check_rec_tail() const {
+ Checksum checksum;
+ checksum.addData((const unsigned char*)&_enq_hdr, sizeof(::enq_hdr_t));
+ if (_enq_hdr._xidsize > 0) {
+ checksum.addData((const unsigned char*)_xid_buff, _enq_hdr._xidsize);
+ }
+ if (_enq_hdr._dsize > 0) {
+ checksum.addData((const unsigned char*)_data_buff, _enq_hdr._dsize);
+ }
+ uint32_t cs = checksum.getChecksum();
+ uint16_t res = ::rec_tail_check(&_enq_tail, &_enq_hdr._rhdr, cs);
+ if (res != 0) {
+ std::stringstream oss;
+ oss << std::hex;
+ if (res & ::REC_TAIL_MAGIC_ERR_MASK) {
+ oss << std::endl << " Magic: expected 0x" << ~_enq_hdr._rhdr._magic << "; found 0x" << _enq_tail._xmagic;
+ }
+ if (res & ::REC_TAIL_SERIAL_ERR_MASK) {
+ oss << std::endl << " Serial: expected 0x" << _enq_hdr._rhdr._serial << "; found 0x" << _enq_tail._serial;
+ }
+ if (res & ::REC_TAIL_RID_ERR_MASK) {
+ oss << std::endl << " Record Id: expected 0x" << _enq_hdr._rhdr._rid << "; found 0x" << _enq_tail._rid;
+ }
+ if (res & ::REC_TAIL_CHECKSUM_ERR_MASK) {
+ oss << std::endl << " Checksum: expected 0x" << cs << "; found 0x" << _enq_tail._checksum;
+ }
+ throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "enq_rec", "check_rec_tail");
+ }
+}
+
+void
+enq_rec::clean() {
+ if (_xid_buff) {
+ std::free(_xid_buff);
+ _xid_buff = 0;
+ }
+ if (_data_buff) {
+ std::free(_data_buff);
+ _data_buff = 0;
+ }
}
}}}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h b/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h
index 439203c052..1655e2cc4d 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h
@@ -40,7 +40,8 @@ private:
::enq_hdr_t _enq_hdr; ///< Local instance of enqueue header struct
const void* _xidp; ///< xid pointer for encoding (for writing to disk)
const void* _data; ///< Pointer to data to be written to disk
- void* _buff; ///< Pointer to buffer to receive data read from disk
+ void* _xid_buff;
+ void* _data_buff;
::rec_tail_t _enq_tail; ///< Local instance of enqueue tail struct
public:
@@ -62,6 +63,7 @@ public:
std::size_t rec_size() const;
static std::size_t rec_size(const std::size_t xidsize, const std::size_t dsize, const bool external);
inline uint64_t rid() const { return _enq_hdr._rhdr._rid; }
+ void check_rec_tail() const;
private:
virtual void clean();
diff --git a/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp b/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp
index fa5f83cd24..1368fd4be2 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp
@@ -32,7 +32,7 @@ namespace journal {
txn_rec::txn_rec():
_xidp(0),
- _buff(0)
+ _xid_buff(0)
{
::txn_hdr_init(&_txn_hdr, 0, QLS_JRNL_VERSION, 0, 0, 0, 0);
::rec_tail_init(&_txn_tail, 0, 0, 0, 0);
@@ -52,7 +52,7 @@ txn_rec::reset(const bool commitFlag, const uint64_t serial, const uint64_t rid
_txn_hdr._rhdr._rid = rid;
_txn_hdr._xidsize = xidlen;
_xidp = xidp;
- _buff = 0;
+ _xid_buff = 0;
_txn_tail._xmagic = ~_txn_hdr._rhdr._magic;
_txn_tail._serial = serial;
_txn_tail._rid = rid;
@@ -184,14 +184,14 @@ txn_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs)
::rec_hdr_copy(&_txn_hdr._rhdr, &h);
ifsp->read((char*)&_txn_hdr._xidsize, sizeof(_txn_hdr._xidsize));
rec_offs = sizeof(::txn_hdr_t);
- _buff = std::malloc(_txn_hdr._xidsize);
- MALLOC_CHK(_buff, "_buff", "txn_rec", "rcv_decode");
+ _xid_buff = std::malloc(_txn_hdr._xidsize);
+ MALLOC_CHK(_xid_buff, "_buff", "txn_rec", "rcv_decode");
}
if (rec_offs < sizeof(txn_hdr_t) + _txn_hdr._xidsize)
{
// Read xid (or continue reading xid)
std::size_t offs = rec_offs - sizeof(txn_hdr_t);
- ifsp->read((char*)_buff + offs, _txn_hdr._xidsize - offs);
+ ifsp->read((char*)_xid_buff + offs, _txn_hdr._xidsize - offs);
std::size_t size_read = ifsp->gcount();
rec_offs += size_read;
if (size_read < _txn_hdr._xidsize - offs)
@@ -218,39 +218,23 @@ txn_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs)
assert(!ifsp->fail() && !ifsp->bad());
return false;
}
+ check_rec_tail();
}
ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size());
assert(!ifsp->fail() && !ifsp->bad());
assert(_txn_hdr._xidsize > 0);
-
- Checksum checksum;
- checksum.addData((unsigned char*)&_txn_hdr, sizeof(_txn_hdr));
- checksum.addData((unsigned char*)_buff, _txn_hdr._xidsize);
- uint32_t cs = checksum.getChecksum();
- int res = ::rec_tail_check(&_txn_tail, &_txn_hdr._rhdr, cs);
- if (res != 0) {
- std::stringstream oss;
- switch (res) {
- case 1: oss << std::hex << "Magic: expected 0x" << ~_txn_hdr._rhdr._magic << "; found 0x" << _txn_tail._xmagic; break;
- case 2: oss << std::hex << "Serial: expected 0x" << _txn_hdr._rhdr._serial << "; found 0x" << _txn_tail._serial; break;
- case 3: oss << std::hex << "Record Id: expected 0x" << _txn_hdr._rhdr._rid << "; found 0x" << _txn_tail._rid; break;
- case 4: oss << std::hex << "Checksum: expected 0x" << cs << "; found 0x" << _txn_tail._checksum; break;
- default: oss << "Unknown error " << res;
- }
- throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "txn_rec", "decode"); // TODO: Don't throw exception, log info
- }
return true;
}
std::size_t
txn_rec::get_xid(void** const xidpp)
{
- if (!_buff)
+ if (!_xid_buff)
{
*xidpp = 0;
return 0;
}
- *xidpp = _buff;
+ *xidpp = _xid_buff;
return _txn_hdr._xidsize;
}
@@ -282,9 +266,40 @@ txn_rec::rec_size() const
}
void
+txn_rec::check_rec_tail() const {
+ Checksum checksum;
+ checksum.addData((const unsigned char*)&_txn_hdr, sizeof(::txn_hdr_t));
+ if (_txn_hdr._xidsize > 0) {
+ checksum.addData((const unsigned char*)_xid_buff, _txn_hdr._xidsize);
+ }
+ uint32_t cs = checksum.getChecksum();
+ uint16_t res = ::rec_tail_check(&_txn_tail, &_txn_hdr._rhdr, cs);
+ if (res != 0) {
+ std::stringstream oss;
+ oss << std::hex;
+ if (res & ::REC_TAIL_MAGIC_ERR_MASK) {
+ oss << std::endl << " Magic: expected 0x" << ~_txn_hdr._rhdr._magic << "; found 0x" << _txn_tail._xmagic;
+ }
+ if (res & ::REC_TAIL_SERIAL_ERR_MASK) {
+ oss << std::endl << " Serial: expected 0x" << _txn_hdr._rhdr._serial << "; found 0x" << _txn_tail._serial;
+ }
+ if (res & ::REC_TAIL_RID_ERR_MASK) {
+ oss << std::endl << " Record Id: expected 0x" << _txn_hdr._rhdr._rid << "; found 0x" << _txn_tail._rid;
+ }
+ if (res & ::REC_TAIL_CHECKSUM_ERR_MASK) {
+ oss << std::endl << " Checksum: expected 0x" << cs << "; found 0x" << _txn_tail._checksum;
+ }
+ throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "txn_rec", "check_rec_tail");
+ }
+}
+
+void
txn_rec::clean()
{
- // clean up allocated memory here
+ if (_xid_buff) {
+ std::free(_xid_buff);
+ _xid_buff = 0;
+ }
}
}}}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h b/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h
index a9224a4a01..daca16d9d4 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h
@@ -39,7 +39,7 @@ class txn_rec : public jrec
private:
::txn_hdr_t _txn_hdr; ///< Local instance of transaction header struct
const void* _xidp; ///< xid pointer for encoding (writing to disk)
- void* _buff; ///< Pointer to buffer to receive data read from disk
+ void* _xid_buff; ///< Pointer to buffer to receive xid read from disk
::rec_tail_t _txn_tail; ///< Local instance of enqueue tail struct
public:
@@ -57,6 +57,7 @@ public:
std::size_t xid_size() const;
std::size_t rec_size() const;
inline uint64_t rid() const { return _txn_hdr._rhdr._rid; }
+ void check_rec_tail() const;
private:
virtual void clean();
diff --git a/qpid/cpp/src/qpid/linearstore/journal/utils/rec_tail.c b/qpid/cpp/src/qpid/linearstore/journal/utils/rec_tail.c
index 88c68e2b78..7128c96f32 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/utils/rec_tail.c
+++ b/qpid/cpp/src/qpid/linearstore/journal/utils/rec_tail.c
@@ -36,10 +36,11 @@ void rec_tail_copy(rec_tail_t* dest, const rec_hdr_t* src, const uint32_t checks
dest->_rid = src->_rid;
}
-int rec_tail_check(const rec_tail_t* tail, const rec_hdr_t* header, const uint32_t checksum) {
- if (tail->_xmagic != ~header->_magic) return 1;
- if (tail->_serial != header->_serial) return 2;
- if (tail->_rid != header->_rid) return 3;
- if (tail->_checksum != checksum) return 4;
- return 0;
+uint16_t rec_tail_check(const rec_tail_t* tail, const rec_hdr_t* header, const uint32_t checksum) {
+ uint16_t err = 0;
+ if (tail->_xmagic != ~header->_magic) err |= REC_TAIL_MAGIC_ERR_MASK;
+ if (tail->_serial != header->_serial) err |= REC_TAIL_SERIAL_ERR_MASK;
+ if (tail->_rid != header->_rid) err |= REC_TAIL_RID_ERR_MASK;
+ if (tail->_checksum != checksum) err |= REC_TAIL_CHECKSUM_ERR_MASK;
+ return err;
}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/utils/rec_tail.h b/qpid/cpp/src/qpid/linearstore/journal/utils/rec_tail.h
index 5163580ead..afc71c104a 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/utils/rec_tail.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/utils/rec_tail.h
@@ -63,10 +63,15 @@ typedef struct rec_tail_t {
uint64_t _rid; /**< Record ID (rotating 64-bit counter) */
} rec_tail_t;
+static const uint16_t REC_TAIL_MAGIC_ERR_MASK = 0x01;
+static const uint16_t REC_TAIL_SERIAL_ERR_MASK = 0x02;
+static const uint16_t REC_TAIL_RID_ERR_MASK = 0x04;
+static const uint16_t REC_TAIL_CHECKSUM_ERR_MASK = 0x08;
+
void rec_tail_init(rec_tail_t* dest, const uint32_t xmagic, const uint32_t checksum, const uint64_t serial,
const uint64_t rid);
void rec_tail_copy(rec_tail_t* dest, const rec_hdr_t* src, const uint32_t checksum);
-int rec_tail_check(const rec_tail_t* tail, const rec_hdr_t* header, const uint32_t checksum);
+uint16_t rec_tail_check(const rec_tail_t* tail, const rec_hdr_t* header, const uint32_t checksum);
#pragma pack()