diff options
author | Kim van der Riet <kpvdr@apache.org> | 2014-01-15 22:07:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2014-01-15 22:07:30 +0000 |
commit | 810ac4c70346730fb7103e823f26e30e7f68a731 (patch) | |
tree | c29102bd9924d8b9dbefb6321f2f60513824c008 | |
parent | a6e5e1fa9e750c8667cabfb752f0d5174eba70a5 (diff) | |
download | qpid-python-810ac4c70346730fb7103e823f26e30e7f68a731.tar.gz |
QPID-5483: [linearstore] Recovery of journal with partly written record fails with "JERR_JREC_BADRECTAIL: Invalid data record tail" error message
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1558589 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/linearstore/ISSUES | 104 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/linearstore/journal/Checksum.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp | 40 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp | 66 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/linearstore/journal/deq_rec.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp | 73 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/linearstore/journal/enq_rec.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp | 65 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/linearstore/journal/txn_rec.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/linearstore/journal/utils/rec_tail.c | 13 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/linearstore/journal/utils/rec_tail.h | 7 |
11 files changed, 247 insertions, 139 deletions
diff --git a/qpid/cpp/src/qpid/linearstore/ISSUES b/qpid/cpp/src/qpid/linearstore/ISSUES index 412db073df..8b5f8524dc 100644 --- a/qpid/cpp/src/qpid/linearstore/ISSUES +++ b/qpid/cpp/src/qpid/linearstore/ISSUES @@ -17,52 +17,70 @@ # under the License. # -LinearStore issues: +Linear Store issues: -Store: ------- +Current/pending: +================ + Q-JIRA RHBZ Description / Comments + ------ ------- ---------------------- + 5359 - Linearstore: Implement new management schema and wire into store + 5360 - Linearstore: Evaluate and rework logging to produce a consistent log outputConsistent logging + 5361 - Linearstore: No tests for linearstore functionality currently exist + * No existing tests for linearstore: + ** Basic broker-level tests for txn and non-txn recovery + ** Store-level tests which check write boundary conditions + ** EFP tests, including file recovery, error management + ** Unit tests + ** Basic performance tests + 5362 - Linearstore: No store tools exist for examining the journals + svn r.1558888 2014-01-09: WIP checkin for linearstore version of qpid_qls_analyze. Needs testing and tidy-up. + * Store analysis and status + * Recovery/reading of message content + * Empty file pool status and management + 5464 - [linearstore] Incompletely created journal files accumulate in EFP + 5479 1053701 [linearstore] Using recovered store results in "JERR_JNLF_FILEOFFSOVFL: Attempted to increase submitted offset past file size. (JournalFile::submittedDblkCount)" error message + 5480 1053749 [linearstore] Recovery of store failure with "JERR_MAP_NOTFOUND: Key not found in map." error message + - 1035843 Slow performance for producers + - 1036026 [LinearStore] Qpid linear store unable to create durable queue - framing-error: Queue <q-name>: create() failed: jexception 0x0000 + UNABLE TO REPRODUCE - but Frantizek has additional info + - 1039522 Qpid crashes while recovering from linear store around apid::linearstore::journal::JournalFile::getFqFileName() including enq_rec::decode() threw JERR_JREC_BAD_RECTAIL + - 1039525 Qpid crashes while recovering from linear store around apid::linearstore::journal::jexception::format including enq_rec::decode() threw JERR_JREC_BAD_REC_TAIL + * Possible dup of 1039522 -1. (FIXED) Overwrite identity: When recovering a previously used file, if the write boundary coincides with old record - start, no way of discriminating old from new at boundary (used to use OWI). +Fixed/closed: +============= + Q-JIRA RHBZ Description / Comments + ------ ------- ---------------------- + 5357 1052518 Linearstore: Empty file recycling not functional + svn r.1545563 2013-11-26: Propsed fix + 5358 1052727 Linearstore: Checksums not implemented in record tail + svn r.1547601 2013-12-03: Propsed fix + 5387 1036071 Linearstore: Segmentation fault when deleting queue + svn r.1547641 2013-12-03: Propsed fix + 5388 1035802 Linearstore: Segmentation fault when recovering empty queue + svn r.1547921 2013-12-04: Propsed fix +NO-JIRA - Added missing Apache copyright/license text + svn r.1551304 2013-12-16: Propsed fix + 5425 1052445 Linearstore: Transaction Prepared List (TPL) fails with jexception 0x0402 AtomicCounter::addLimit() threw JERR_JNLF_FILEOFFSOVFL + svn r.1551361 2013-12-16: Proposed fix + 5442 1039949 Linearstore: Dtx recover test fails + svn r.1552772 2013-12-20: Proposed fix + 5444 1052775 Linearstore: Recovering from qpid-txtest fails with "Inconsistent TPL 2PC count" error message + svn r.1553148 2013-12-23: Proposed fix + - 1038599 [LinearStore] Abort when deleting used queue after restart + CLOSED-NOTABUG 2014-01-06 + 5460 1051097 [linearstore] Recovery of store which contains prepared but incomplete transactions results in message loss + svn r.1556892 2014-01-09: Proposed fix + 5473 1051924 [linearstore] Recovery of journal in which last logical file contains truncated record causes crash + svn r.1557620 2014-01-12: Proposed fix -2. (FIXED) QPID-5357: Recycling files while in use not working, however, files are recovered to EFP during recovery. Must solve - #1 first. - -3. (FIXED) QPID-5358: Checksum not implemented in record tail, not checked during read. - -4. QPID-5359: Rework qpid management parameters and controls (QMF). - -5. QPID-5360: Consistent logging: rework logging to provide uniform and consistent logging from store (both logging - level and places where logging occurs). - -6. QPID-5361: No tests - * No existing tests for linearstore: - ** Basic broker-level tests for txn and non-txn recovery - ** Store-level tests which check write boundary conditions - ** Unit tests - ** Basic performance tests - -7: QPID-5362: No tools - * Store analysis and status - * Recovery/reading of message content - -8. One journal file lost when queue deleted. All files except for one are recycled back to the EFP. - -9. Complete exceptions - several exceptions thrown using jexception have no exception numbers - -Current bugs and performance issues: ------------------------------------- -1. BZ 1035843 - Slow performance for producers -2. (FIXED) QPID-5387 (BZ 1036071) - Crash when deleting queue -3. (FIXED) QPID-5388 (BZ 1035802) - Segmentation fault when recovering empty queue -4. (UNABLE TO REPRODUCE) BZ 1036026 - Unable to create durable queue - framing error - possibly caused by running both stores at the same time -5. (UNABLE TO REPRODUCE) BZ 1038599 - Abort when deleting used queue after restart - may be dup of QPID-5387 (BZ 1036071) -6. BZ 1039522 - Crash during recovery - JournalFile::getFqFileName() -JERR_JREC_BADRECTAIL -7. BZ 1039525 - Crash during recovery - journal::jexception - JERR_JREC_BADRECTAIL -8. (FIXED) QPID-5442 (BZ 1039949) - DTX test failure - missing XIDs -9. (FIXED) QPID-5460 (BZ 1051097) - Transactional messages lost during recovery -10. QPID-5464 - Incompletely created journal files accumulate in EFP -11. QPID-5473 (BZ 1051924) - Recovery where last record in file is truncated (ie spans files), but following file is uninitialized causes crash +Future: +======= +* One journal file lost when queue deleted. All files except for one are recycled back to the EFP. +* Complete exceptions - several exceptions thrown using jexception have no exception numbers +* Investigate ability of store to detect missing journal files, especially from logical end of a journal +* Investigate ability of store to handle file muddle-ups (ie journal files from EFP which are not zeroed or other journals) +* Look at improving the efficiency of recovery - right now the entire store is read once, and then each recovered record xid and data is read again Code tidy-up ------------ diff --git a/qpid/cpp/src/qpid/linearstore/journal/Checksum.cpp b/qpid/cpp/src/qpid/linearstore/journal/Checksum.cpp index 89d654ce76..eaede12d8e 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/Checksum.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/Checksum.cpp @@ -30,9 +30,11 @@ Checksum::Checksum() : a(1UL), b(0UL), MOD_ADLER(65521UL) {} Checksum::~Checksum() {} void Checksum::addData(const unsigned char* data, const std::size_t len) { - for (uint32_t i = 0; i < len; i++) { - a = (a + data[i]) % MOD_ADLER; - b = (a + b) % MOD_ADLER; + if (data) { + for (uint32_t i = 0; i < len; i++) { + a = (a + data[i]) % MOD_ADLER; + b = (a + b) % MOD_ADLER; + } } } diff --git a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp index e27a239a18..72308cc929 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp @@ -221,28 +221,34 @@ bool RecoveryManager::readNextRemainingRecord(void** const dataPtrPtr, // Check enqueue record checksum Checksum checksum; - checksum.addData((unsigned char*)&enqueueHeader, sizeof(::enq_hdr_t)); + checksum.addData((const unsigned char*)&enqueueHeader, sizeof(::enq_hdr_t)); if (xidSize > 0) { - checksum.addData((unsigned char*)*xidPtrPtr, xidSize); + checksum.addData((const unsigned char*)*xidPtrPtr, xidSize); } if (dataSize > 0) { - checksum.addData((unsigned char*)*dataPtrPtr, dataSize); + checksum.addData((const unsigned char*)*dataPtrPtr, dataSize); } ::rec_tail_t enqueueTail; inFileStream_.read((char*)&enqueueTail, sizeof(::rec_tail_t)); uint32_t cs = checksum.getChecksum(); //std::cout << std::hex << "### rid=0x" << enqueueHeader._rhdr._rid << " rtcs=0x" << enqueueTail._checksum << " cs=0x" << cs << std::dec << std::endl; // DEBUG - int res = ::rec_tail_check(&enqueueTail, &enqueueHeader._rhdr, cs); + uint16_t res = ::rec_tail_check(&enqueueTail, &enqueueHeader._rhdr, cs); if (res != 0) { std::stringstream oss; - switch (res) { - case 1: oss << std::hex << "Magic: expected 0x" << ~enqueueHeader._rhdr._magic << "; found 0x" << enqueueTail._xmagic; break; - case 2: oss << std::hex << "Serial: expected 0x" << enqueueHeader._rhdr._serial << "; found 0x" << enqueueTail._serial; break; - case 3: oss << std::hex << "Record Id: expected 0x" << enqueueHeader._rhdr._rid << "; found 0x" << enqueueTail._rid; break; - case 4: oss << std::hex << "Checksum: expected 0x" << cs << "; found 0x" << enqueueTail._checksum; break; - default: oss << "Unknown error " << res; + oss << "Bad record tail:" << std::hex; + if (res & ::REC_TAIL_MAGIC_ERR_MASK) { + oss << std::endl << " Magic: expected 0x" << ~enqueueHeader._rhdr._magic << "; found 0x" << enqueueTail._xmagic; } - throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "enq_rec", "decode"); // TODO: Don't throw exception, log info + if (res & ::REC_TAIL_SERIAL_ERR_MASK) { + oss << std::endl << " Serial: expected 0x" << enqueueHeader._rhdr._serial << "; found 0x" << enqueueTail._serial; + } + if (res & ::REC_TAIL_RID_ERR_MASK) { + oss << std::endl << " Record Id: expected 0x" << enqueueHeader._rhdr._rid << "; found 0x" << enqueueTail._rid; + } + if (res & ::REC_TAIL_CHECKSUM_ERR_MASK) { + oss << std::endl << " Checksum: expected 0x" << cs << "; found 0x" << enqueueTail._checksum; + } + throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "RecoveryManager", "readNextRemainingRecord"); // TODO: Don't throw exception, log info } // Set data token @@ -472,7 +478,13 @@ bool RecoveryManager::decodeRecord(jrec& record, done = record.decode(headerRecord, &inFileStream_, cumulativeSizeRead); } catch (const jexception& e) { - journalLogRef_.log(JournalLog::LOG_INFO, queueName_, e.what()); + if (e.err_code() == jerrno::JERR_JREC_BADRECTAIL) { + std::ostringstream oss; + oss << jerrno::err_msg(e.err_code()) << e.additional_info(); + journalLogRef_.log(JournalLog::LOG_INFO, queueName_, oss.str()); + } else { + journalLogRef_.log(JournalLog::LOG_INFO, queueName_, e.what()); + } checkJournalAlignment(start_file_offs); return false; } @@ -602,7 +614,6 @@ bool RecoveryManager::getNextRecordHeader() oss << std::hex << "_tmap.set_aio_compl: txn_enq xid=\"" << xid << "\" rid=0x" << h._rid; throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "RecoveryManager", "getNextRecordHeader"); } - std::free(xidp); } else { if (enqueueMapRef_.insert_pfid(h._rid, start_fid, file_pos) < enq_map::EMAP_OK) { // fail // The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID. @@ -641,7 +652,6 @@ bool RecoveryManager::getNextRecordHeader() oss << std::hex << "_tmap.set_aio_compl: txn_deq xid=\"" << xid << "\" rid=0x" << dr.rid(); throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "RecoveryManager", "getNextRecordHeader"); } - std::free(xidp); } else { uint64_t enq_fid; if (enqueueMapRef_.get_remove_pfid(dr.deq_rid(), enq_fid, true) == enq_map::EMAP_OK) { // ignore not found error @@ -675,7 +685,6 @@ bool RecoveryManager::getNextRecordHeader() enqueueMapRef_.unlock(itr->drid_); // ignore not found error } } - std::free(xidp); } break; case QLS_TXC_MAGIC: @@ -711,7 +720,6 @@ bool RecoveryManager::getNextRecordHeader() fileNumberMap_[enq_fid]->decrEnqueuedRecordCount(); } } - std::free(xidp); } break; case QLS_EMPTY_MAGIC: diff --git a/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp b/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp index 8b9e9d7f64..a4882aaa9c 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp @@ -32,7 +32,7 @@ namespace journal { deq_rec::deq_rec(): _xidp(0), - _buff(0) + _xid_buff(0) { ::deq_hdr_init(&_deq_hdr, QLS_DEQ_MAGIC, QLS_JRNL_VERSION, 0, 0, 0, 0, 0); ::rec_tail_copy(&_deq_tail, &_deq_hdr._rhdr, 0); @@ -53,7 +53,7 @@ deq_rec::reset(const uint64_t serial, const uint64_t rid, const uint64_t drid, _deq_hdr._deq_rid = drid; _deq_hdr._xidsize = xidlen; _xidp = xidp; - _buff = 0; + _xid_buff = 0; _deq_tail._serial = serial; _deq_tail._rid = rid; _deq_tail._checksum = 0UL; @@ -192,15 +192,15 @@ deq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) // Read header, allocate (if req'd) for xid if (_deq_hdr._xidsize) { - _buff = std::malloc(_deq_hdr._xidsize); - MALLOC_CHK(_buff, "_buff", "enq_rec", "rcv_decode"); + _xid_buff = std::malloc(_deq_hdr._xidsize); + MALLOC_CHK(_xid_buff, "_buff", "enq_rec", "rcv_decode"); } } if (rec_offs < sizeof(_deq_hdr) + _deq_hdr._xidsize) { // Read xid (or continue reading xid) std::size_t offs = rec_offs - sizeof(_deq_hdr); - ifsp->read((char*)_buff + offs, _deq_hdr._xidsize - offs); + ifsp->read((char*)_xid_buff + offs, _deq_hdr._xidsize - offs); std::size_t size_read = ifsp->gcount(); rec_offs += size_read; if (size_read < _deq_hdr._xidsize - offs) @@ -228,39 +228,22 @@ deq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) assert(!ifsp->fail() && !ifsp->bad()); return false; } + check_rec_tail(); } ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size()); assert(!ifsp->fail() && !ifsp->bad()); - if (_deq_hdr._xidsize) { - Checksum checksum; - checksum.addData((unsigned char*)&_deq_hdr, sizeof(_deq_hdr)); - checksum.addData((unsigned char*)_buff, _deq_hdr._xidsize); - uint32_t cs = checksum.getChecksum(); - int res = ::rec_tail_check(&_deq_tail, &_deq_hdr._rhdr, cs); - if (res != 0) { - std::stringstream oss; - switch (res) { - case 1: oss << std::hex << "Magic: expected 0x" << ~_deq_hdr._rhdr._magic << "; found 0x" << _deq_tail._xmagic; break; - case 2: oss << std::hex << "Serial: expected 0x" << _deq_hdr._rhdr._serial << "; found 0x" << _deq_tail._serial; break; - case 3: oss << std::hex << "Record Id: expected 0x" << _deq_hdr._rhdr._rid << "; found 0x" << _deq_tail._rid; break; - case 4: oss << std::hex << "Checksum: expected 0x" << cs << "; found 0x" << _deq_tail._checksum; break; - default: oss << "Unknown error " << res; - } - throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "deq_rec", "decode"); // TODO: Don't throw exception, log info - } - } return true; } std::size_t deq_rec::get_xid(void** const xidpp) { - if (!_buff) + if (!_xid_buff) { *xidpp = 0; return 0; } - *xidpp = _buff; + *xidpp = _xid_buff; return _deq_hdr._xidsize; } @@ -291,9 +274,40 @@ deq_rec::rec_size() const } void +deq_rec::check_rec_tail() const { + Checksum checksum; + checksum.addData((const unsigned char*)&_deq_hdr, sizeof(::deq_hdr_t)); + if (_deq_hdr._xidsize > 0) { + checksum.addData((const unsigned char*)_xid_buff, _deq_hdr._xidsize); + } + uint32_t cs = checksum.getChecksum(); + uint16_t res = ::rec_tail_check(&_deq_tail, &_deq_hdr._rhdr, cs); + if (res != 0) { + std::stringstream oss; + oss << std::hex; + if (res & ::REC_TAIL_MAGIC_ERR_MASK) { + oss << std::endl << " Magic: expected 0x" << ~_deq_hdr._rhdr._magic << "; found 0x" << _deq_tail._xmagic; + } + if (res & ::REC_TAIL_SERIAL_ERR_MASK) { + oss << std::endl << " Serial: expected 0x" << _deq_hdr._rhdr._serial << "; found 0x" << _deq_tail._serial; + } + if (res & ::REC_TAIL_RID_ERR_MASK) { + oss << std::endl << " Record Id: expected 0x" << _deq_hdr._rhdr._rid << "; found 0x" << _deq_tail._rid; + } + if (res & ::REC_TAIL_CHECKSUM_ERR_MASK) { + oss << std::endl << " Checksum: expected 0x" << cs << "; found 0x" << _deq_tail._checksum; + } + throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "deq_rec", "check_rec_tail"); + } +} + +void deq_rec::clean() { - // clean up allocated memory here + if (_xid_buff) { + std::free(_xid_buff); + _xid_buff = 0; + } } }}} diff --git a/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h b/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h index c7f78e1215..ead0eed72a 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h +++ b/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h @@ -39,7 +39,7 @@ class deq_rec : public jrec private: ::deq_hdr_t _deq_hdr; ///< Local instance of dequeue header struct const void* _xidp; ///< xid pointer for encoding (writing to disk) - void* _buff; ///< Pointer to buffer to receive data read from disk + void* _xid_buff; ///< Pointer to buffer to receive xid read from disk ::rec_tail_t _deq_tail; ///< Local instance of enqueue tail struct, only encoded if XID is present public: @@ -59,6 +59,7 @@ public: inline std::size_t data_size() const { return 0; } // This record never carries data std::size_t xid_size() const; std::size_t rec_size() const; + void check_rec_tail() const; private: virtual void clean(); diff --git a/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp b/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp index a4f19b3a7b..f95a722308 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp @@ -34,7 +34,8 @@ enq_rec::enq_rec(): jrec(), // superclass _xidp(0), _data(0), - _buff(0) + _xid_buff(0), + _data_buff(0) { ::enq_hdr_init(&_enq_hdr, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, 0, 0, 0, 0, false); ::rec_tail_copy(&_enq_tail, &_enq_hdr._rhdr, 0); @@ -57,7 +58,6 @@ enq_rec::reset(const uint64_t serial, const uint64_t rid, const void* const dbuf _enq_hdr._dsize = dlen; _xidp = xidp; _data = dbuf; - _buff = 0; _enq_tail._serial = serial; _enq_tail._rid = rid; } @@ -229,15 +229,20 @@ enq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) rec_offs = sizeof(::enq_hdr_t); if (_enq_hdr._xidsize > 0) { - _buff = std::malloc(_enq_hdr._xidsize); - MALLOC_CHK(_buff, "_buff", "enq_rec", "rcv_decode"); + _xid_buff = std::malloc(_enq_hdr._xidsize); + MALLOC_CHK(_xid_buff, "_xid_buff", "enq_rec", "decode"); + } + if (_enq_hdr._dsize > 0) + { + _data_buff = std::malloc(_enq_hdr._dsize); + MALLOC_CHK(_data_buff, "_data_buff", "enq_rec", "decode") } } if (rec_offs < sizeof(_enq_hdr) + _enq_hdr._xidsize) { // Read xid (or continue reading xid) std::size_t offs = rec_offs - sizeof(_enq_hdr); - ifsp->read((char*)_buff + offs, _enq_hdr._xidsize - offs); + ifsp->read((char*)_xid_buff + offs, _enq_hdr._xidsize - offs); std::size_t size_read = ifsp->gcount(); rec_offs += size_read; if (size_read < _enq_hdr._xidsize - offs) @@ -253,9 +258,9 @@ enq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) { if (rec_offs < sizeof(_enq_hdr) + _enq_hdr._xidsize + _enq_hdr._dsize) { - // Ignore data (or continue ignoring data) + // Read data (or continue reading data) std::size_t offs = rec_offs - sizeof(_enq_hdr) - _enq_hdr._xidsize; - ifsp->ignore(_enq_hdr._dsize - offs); + ifsp->read((char*)_data_buff + offs, _enq_hdr._dsize - offs); std::size_t size_read = ifsp->gcount(); rec_offs += size_read; if (size_read < _enq_hdr._dsize - offs) @@ -286,6 +291,7 @@ enq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) assert(!ifsp->fail() && !ifsp->bad()); return false; } + check_rec_tail(); } ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size()); assert(!ifsp->fail() && !ifsp->bad()); @@ -295,27 +301,25 @@ enq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) std::size_t enq_rec::get_xid(void** const xidpp) { - if (!_buff || !_enq_hdr._xidsize) - { + if (!_xid_buff || !_enq_hdr._xidsize) { *xidpp = 0; return 0; } - *xidpp = _buff; + *xidpp = _xid_buff; return _enq_hdr._xidsize; } std::size_t enq_rec::get_data(void** const datapp) { - if (!_buff) - { + if (!_data_buff) { *datapp = 0; return 0; } if (::is_enq_external(&_enq_hdr)) *datapp = 0; else - *datapp = (void*)((char*)_buff + _enq_hdr._xidsize); + *datapp = _data_buff; return _enq_hdr._dsize; } @@ -348,9 +352,46 @@ enq_rec::rec_size(const std::size_t xidsize, const std::size_t dsize, const bool } void -enq_rec::clean() -{ - // clean up allocated memory here +enq_rec::check_rec_tail() const { + Checksum checksum; + checksum.addData((const unsigned char*)&_enq_hdr, sizeof(::enq_hdr_t)); + if (_enq_hdr._xidsize > 0) { + checksum.addData((const unsigned char*)_xid_buff, _enq_hdr._xidsize); + } + if (_enq_hdr._dsize > 0) { + checksum.addData((const unsigned char*)_data_buff, _enq_hdr._dsize); + } + uint32_t cs = checksum.getChecksum(); + uint16_t res = ::rec_tail_check(&_enq_tail, &_enq_hdr._rhdr, cs); + if (res != 0) { + std::stringstream oss; + oss << std::hex; + if (res & ::REC_TAIL_MAGIC_ERR_MASK) { + oss << std::endl << " Magic: expected 0x" << ~_enq_hdr._rhdr._magic << "; found 0x" << _enq_tail._xmagic; + } + if (res & ::REC_TAIL_SERIAL_ERR_MASK) { + oss << std::endl << " Serial: expected 0x" << _enq_hdr._rhdr._serial << "; found 0x" << _enq_tail._serial; + } + if (res & ::REC_TAIL_RID_ERR_MASK) { + oss << std::endl << " Record Id: expected 0x" << _enq_hdr._rhdr._rid << "; found 0x" << _enq_tail._rid; + } + if (res & ::REC_TAIL_CHECKSUM_ERR_MASK) { + oss << std::endl << " Checksum: expected 0x" << cs << "; found 0x" << _enq_tail._checksum; + } + throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "enq_rec", "check_rec_tail"); + } +} + +void +enq_rec::clean() { + if (_xid_buff) { + std::free(_xid_buff); + _xid_buff = 0; + } + if (_data_buff) { + std::free(_data_buff); + _data_buff = 0; + } } }}} diff --git a/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h b/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h index 439203c052..1655e2cc4d 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h +++ b/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h @@ -40,7 +40,8 @@ private: ::enq_hdr_t _enq_hdr; ///< Local instance of enqueue header struct const void* _xidp; ///< xid pointer for encoding (for writing to disk) const void* _data; ///< Pointer to data to be written to disk - void* _buff; ///< Pointer to buffer to receive data read from disk + void* _xid_buff; + void* _data_buff; ::rec_tail_t _enq_tail; ///< Local instance of enqueue tail struct public: @@ -62,6 +63,7 @@ public: std::size_t rec_size() const; static std::size_t rec_size(const std::size_t xidsize, const std::size_t dsize, const bool external); inline uint64_t rid() const { return _enq_hdr._rhdr._rid; } + void check_rec_tail() const; private: virtual void clean(); diff --git a/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp b/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp index fa5f83cd24..1368fd4be2 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp @@ -32,7 +32,7 @@ namespace journal { txn_rec::txn_rec(): _xidp(0), - _buff(0) + _xid_buff(0) { ::txn_hdr_init(&_txn_hdr, 0, QLS_JRNL_VERSION, 0, 0, 0, 0); ::rec_tail_init(&_txn_tail, 0, 0, 0, 0); @@ -52,7 +52,7 @@ txn_rec::reset(const bool commitFlag, const uint64_t serial, const uint64_t rid _txn_hdr._rhdr._rid = rid; _txn_hdr._xidsize = xidlen; _xidp = xidp; - _buff = 0; + _xid_buff = 0; _txn_tail._xmagic = ~_txn_hdr._rhdr._magic; _txn_tail._serial = serial; _txn_tail._rid = rid; @@ -184,14 +184,14 @@ txn_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) ::rec_hdr_copy(&_txn_hdr._rhdr, &h); ifsp->read((char*)&_txn_hdr._xidsize, sizeof(_txn_hdr._xidsize)); rec_offs = sizeof(::txn_hdr_t); - _buff = std::malloc(_txn_hdr._xidsize); - MALLOC_CHK(_buff, "_buff", "txn_rec", "rcv_decode"); + _xid_buff = std::malloc(_txn_hdr._xidsize); + MALLOC_CHK(_xid_buff, "_buff", "txn_rec", "rcv_decode"); } if (rec_offs < sizeof(txn_hdr_t) + _txn_hdr._xidsize) { // Read xid (or continue reading xid) std::size_t offs = rec_offs - sizeof(txn_hdr_t); - ifsp->read((char*)_buff + offs, _txn_hdr._xidsize - offs); + ifsp->read((char*)_xid_buff + offs, _txn_hdr._xidsize - offs); std::size_t size_read = ifsp->gcount(); rec_offs += size_read; if (size_read < _txn_hdr._xidsize - offs) @@ -218,39 +218,23 @@ txn_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) assert(!ifsp->fail() && !ifsp->bad()); return false; } + check_rec_tail(); } ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size()); assert(!ifsp->fail() && !ifsp->bad()); assert(_txn_hdr._xidsize > 0); - - Checksum checksum; - checksum.addData((unsigned char*)&_txn_hdr, sizeof(_txn_hdr)); - checksum.addData((unsigned char*)_buff, _txn_hdr._xidsize); - uint32_t cs = checksum.getChecksum(); - int res = ::rec_tail_check(&_txn_tail, &_txn_hdr._rhdr, cs); - if (res != 0) { - std::stringstream oss; - switch (res) { - case 1: oss << std::hex << "Magic: expected 0x" << ~_txn_hdr._rhdr._magic << "; found 0x" << _txn_tail._xmagic; break; - case 2: oss << std::hex << "Serial: expected 0x" << _txn_hdr._rhdr._serial << "; found 0x" << _txn_tail._serial; break; - case 3: oss << std::hex << "Record Id: expected 0x" << _txn_hdr._rhdr._rid << "; found 0x" << _txn_tail._rid; break; - case 4: oss << std::hex << "Checksum: expected 0x" << cs << "; found 0x" << _txn_tail._checksum; break; - default: oss << "Unknown error " << res; - } - throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "txn_rec", "decode"); // TODO: Don't throw exception, log info - } return true; } std::size_t txn_rec::get_xid(void** const xidpp) { - if (!_buff) + if (!_xid_buff) { *xidpp = 0; return 0; } - *xidpp = _buff; + *xidpp = _xid_buff; return _txn_hdr._xidsize; } @@ -282,9 +266,40 @@ txn_rec::rec_size() const } void +txn_rec::check_rec_tail() const { + Checksum checksum; + checksum.addData((const unsigned char*)&_txn_hdr, sizeof(::txn_hdr_t)); + if (_txn_hdr._xidsize > 0) { + checksum.addData((const unsigned char*)_xid_buff, _txn_hdr._xidsize); + } + uint32_t cs = checksum.getChecksum(); + uint16_t res = ::rec_tail_check(&_txn_tail, &_txn_hdr._rhdr, cs); + if (res != 0) { + std::stringstream oss; + oss << std::hex; + if (res & ::REC_TAIL_MAGIC_ERR_MASK) { + oss << std::endl << " Magic: expected 0x" << ~_txn_hdr._rhdr._magic << "; found 0x" << _txn_tail._xmagic; + } + if (res & ::REC_TAIL_SERIAL_ERR_MASK) { + oss << std::endl << " Serial: expected 0x" << _txn_hdr._rhdr._serial << "; found 0x" << _txn_tail._serial; + } + if (res & ::REC_TAIL_RID_ERR_MASK) { + oss << std::endl << " Record Id: expected 0x" << _txn_hdr._rhdr._rid << "; found 0x" << _txn_tail._rid; + } + if (res & ::REC_TAIL_CHECKSUM_ERR_MASK) { + oss << std::endl << " Checksum: expected 0x" << cs << "; found 0x" << _txn_tail._checksum; + } + throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "txn_rec", "check_rec_tail"); + } +} + +void txn_rec::clean() { - // clean up allocated memory here + if (_xid_buff) { + std::free(_xid_buff); + _xid_buff = 0; + } } }}} diff --git a/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h b/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h index a9224a4a01..daca16d9d4 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h +++ b/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h @@ -39,7 +39,7 @@ class txn_rec : public jrec private: ::txn_hdr_t _txn_hdr; ///< Local instance of transaction header struct const void* _xidp; ///< xid pointer for encoding (writing to disk) - void* _buff; ///< Pointer to buffer to receive data read from disk + void* _xid_buff; ///< Pointer to buffer to receive xid read from disk ::rec_tail_t _txn_tail; ///< Local instance of enqueue tail struct public: @@ -57,6 +57,7 @@ public: std::size_t xid_size() const; std::size_t rec_size() const; inline uint64_t rid() const { return _txn_hdr._rhdr._rid; } + void check_rec_tail() const; private: virtual void clean(); diff --git a/qpid/cpp/src/qpid/linearstore/journal/utils/rec_tail.c b/qpid/cpp/src/qpid/linearstore/journal/utils/rec_tail.c index 88c68e2b78..7128c96f32 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/utils/rec_tail.c +++ b/qpid/cpp/src/qpid/linearstore/journal/utils/rec_tail.c @@ -36,10 +36,11 @@ void rec_tail_copy(rec_tail_t* dest, const rec_hdr_t* src, const uint32_t checks dest->_rid = src->_rid; } -int rec_tail_check(const rec_tail_t* tail, const rec_hdr_t* header, const uint32_t checksum) { - if (tail->_xmagic != ~header->_magic) return 1; - if (tail->_serial != header->_serial) return 2; - if (tail->_rid != header->_rid) return 3; - if (tail->_checksum != checksum) return 4; - return 0; +uint16_t rec_tail_check(const rec_tail_t* tail, const rec_hdr_t* header, const uint32_t checksum) { + uint16_t err = 0; + if (tail->_xmagic != ~header->_magic) err |= REC_TAIL_MAGIC_ERR_MASK; + if (tail->_serial != header->_serial) err |= REC_TAIL_SERIAL_ERR_MASK; + if (tail->_rid != header->_rid) err |= REC_TAIL_RID_ERR_MASK; + if (tail->_checksum != checksum) err |= REC_TAIL_CHECKSUM_ERR_MASK; + return err; } diff --git a/qpid/cpp/src/qpid/linearstore/journal/utils/rec_tail.h b/qpid/cpp/src/qpid/linearstore/journal/utils/rec_tail.h index 5163580ead..afc71c104a 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/utils/rec_tail.h +++ b/qpid/cpp/src/qpid/linearstore/journal/utils/rec_tail.h @@ -63,10 +63,15 @@ typedef struct rec_tail_t { uint64_t _rid; /**< Record ID (rotating 64-bit counter) */ } rec_tail_t; +static const uint16_t REC_TAIL_MAGIC_ERR_MASK = 0x01; +static const uint16_t REC_TAIL_SERIAL_ERR_MASK = 0x02; +static const uint16_t REC_TAIL_RID_ERR_MASK = 0x04; +static const uint16_t REC_TAIL_CHECKSUM_ERR_MASK = 0x08; + void rec_tail_init(rec_tail_t* dest, const uint32_t xmagic, const uint32_t checksum, const uint64_t serial, const uint64_t rid); void rec_tail_copy(rec_tail_t* dest, const rec_hdr_t* src, const uint32_t checksum); -int rec_tail_check(const rec_tail_t* tail, const rec_hdr_t* header, const uint32_t checksum); +uint16_t rec_tail_check(const rec_tail_t* tail, const rec_hdr_t* header, const uint32_t checksum); #pragma pack() |