diff options
author | Kim van der Riet <kpvdr@apache.org> | 2014-02-05 15:34:42 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2014-02-05 15:34:42 +0000 |
commit | cfae4006fc5f91a693c5edef075ab3c9d848a111 (patch) | |
tree | 90b2240036a494d1dc2dfe6d3d139563a2d7fc7c | |
parent | d7aae95e552fa9e06d8332a4ec8732a2cc667a0b (diff) | |
download | qpid-python-cfae4006fc5f91a693c5edef075ab3c9d848a111.tar.gz |
QPID-5362: Bugfixes and enhancements for qpid_qls_analyze
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1564808 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/tools/src/py/qls/efp.py | 3 | ||||
-rw-r--r-- | qpid/tools/src/py/qls/err.py | 10 | ||||
-rw-r--r-- | qpid/tools/src/py/qls/jrnl.py | 194 | ||||
-rwxr-xr-x | qpid/tools/src/py/qpid_qls_analyze.py | 8 |
4 files changed, 150 insertions, 65 deletions
diff --git a/qpid/tools/src/py/qls/efp.py b/qpid/tools/src/py/qls/efp.py index 3ad8104faa..abf289dc12 100644 --- a/qpid/tools/src/py/qls/efp.py +++ b/qpid/tools/src/py/qls/efp.py @@ -26,10 +26,11 @@ class EfpManager(object): Top level class to analyze the Qpid Linear Store (QLS) directory for the partitions that make up the Empty File Pool (EFP). """ - def __init__(self, directory): + def __init__(self, directory, args): if not os.path.exists(directory): raise qls.err.InvalidQlsDirectoryNameError(directory) self.directory = directory + self.args = args self.partitions = [] def report(self): print 'Found', len(self.partitions), 'partition(s).' diff --git a/qpid/tools/src/py/qls/err.py b/qpid/tools/src/py/qls/err.py index d56d739915..702fbb9520 100644 --- a/qpid/tools/src/py/qls/err.py +++ b/qpid/tools/src/py/qls/err.py @@ -30,6 +30,16 @@ class QlsRecordError(QlsError): QlsError.__init__(self) self.file_header = file_header self.record = record + def get_expected_fro(self): + return self.file_header.first_record_offset + def get_file_number(self): + return self.file_header.file_num + def get_queue_name(self): + return self.file_header.queue_name + def get_record_id(self): + return self.record.record_id + def get_record_offset(self): + return self.record.file_offset def __str__(self): return 'queue="%s" file_id=0x%x record_offset=0x%x record_id=0x%x' % \ (self.file_header.queue_name, self.file_header.file_num, self.record.file_offset, self.record.record_id) diff --git a/qpid/tools/src/py/qls/jrnl.py b/qpid/tools/src/py/qls/jrnl.py index ffd200ddba..5bce78bfad 100644 --- a/qpid/tools/src/py/qls/jrnl.py +++ b/qpid/tools/src/py/qls/jrnl.py @@ -23,6 +23,7 @@ import qls.err import string import struct from time import gmtime, strftime +import zlib class HighCounter(object): def __init__(self): @@ -39,10 +40,11 @@ class HighCounter(object): class JournalRecoveryManager(object): TPL_DIR_NAME = 'tpl' JRNL_DIR_NAME = 'jrnl' - def __init__(self, directory): + def __init__(self, directory, args): if not os.path.exists(directory): raise qls.err.InvalidQlsDirectoryNameError(directory) self.directory = directory + self.args = args self.tpl = None self.journals = {} self.high_rid_counter = HighCounter() @@ -54,20 +56,21 @@ class JournalRecoveryManager(object): def run(self, args): tpl_dir = os.path.join(self.directory, JournalRecoveryManager.TPL_DIR_NAME) if os.path.exists(tpl_dir): - self.tpl = Journal(tpl_dir, None) + self.tpl = Journal(tpl_dir, None, self.args) self.tpl.recover(self.high_rid_counter) print jrnl_dir = os.path.join(self.directory, JournalRecoveryManager.JRNL_DIR_NAME) - prepared_list = self.tpl.txn_map.get_prepared_list() + prepared_list = self.tpl.txn_map.get_prepared_list() if self.tpl is not None else {} if os.path.exists(jrnl_dir): for dir_entry in sorted(os.listdir(jrnl_dir)): - jrnl = Journal(os.path.join(jrnl_dir, dir_entry), prepared_list) + jrnl = Journal(os.path.join(jrnl_dir, dir_entry), prepared_list, self.args) jrnl.recover(self.high_rid_counter) self.journals[jrnl.get_queue_name()] = jrnl print self._reconcile_transactions(prepared_list, args.txn) def _reconcile_transactions(self, prepared_list, txn_flag): print 'Transaction reconciliation report:' + print '==================================' print len(prepared_list), 'open transaction(s) found in prepared transaction list:' for xid in prepared_list.keys(): commit_flag = prepared_list[xid] @@ -79,7 +82,7 @@ class JournalRecoveryManager(object): status = '[Prepared, but interrupted during abort phase]' print ' ', Utils.format_xid(xid), status if prepared_list[xid] is None: # Prepared, but not committed or aborted - enqueue_record = self.tpl.get_txn_map_record(xid) + enqueue_record = self.tpl.get_txn_map_record(xid)[0][1] dequeue_record = Utils.create_record('QLSd', DequeueRecord.TXN_COMPLETE_COMMIT_FLAG, \ self.tpl.current_journal_file, self.high_rid_counter.get_next(), \ enqueue_record.record_id, xid, None) @@ -141,7 +144,7 @@ class EnqueueMap(object): lock_str = '[LOCKED]' else: lock_str = '' - rstr += '\n %d:%s %s' % (journal_file.file_header.file_num, record, lock_str) + rstr += '\n 0x%x:%s %s' % (journal_file.file_header.file_num, record, lock_str) else: rstr += '.' return rstr @@ -245,7 +248,7 @@ class TransactionMap(object): for xid, op_list in self.txn_map.iteritems(): rstr += '\n %s containing %d operations:' % (Utils.format_xid(xid), len(op_list)) for journal_file, record, _ in op_list: - rstr += '\n %d:%s' % (journal_file.file_header.file_num, record) + rstr += '\n 0x%x:%s' % (journal_file.file_header.file_num, record) else: rstr += '.' return rstr @@ -303,7 +306,7 @@ class Journal(object): """ Instance of a Qpid Linear Store (QLS) journal. """ - def __init__(self, directory, xid_prepared_list): + def __init__(self, directory, xid_prepared_list, args): self.directory = directory self.queue_name = os.path.basename(directory) self.files = {} @@ -315,6 +318,10 @@ class Journal(object): self.first_rec_flag = None self.statistics = JournalStatistics() self.xid_prepared_list = xid_prepared_list # This is None for the TPL instance only + self.args = args + self.last_record_offset = None # TODO: Move into JournalFile + self.num_filler_records_required = None # TODO: Move into JournalFile + self.fill_to_offset = None def add_record(self, record): if isinstance(record, EnqueueRecord) or isinstance(record, DequeueRecord): if record.xid_size > 0: @@ -334,15 +341,18 @@ class Journal(object): def get_queue_name(self): return self.queue_name def recover(self, high_rid_counter): - print 'Recovering', self.queue_name #DEBUG + print 'Recovering %s' % self.queue_name self._analyze_files() try: while self._get_next_record(high_rid_counter): pass + self._check_alignment() except qls.err.NoMoreFilesInJournalError: - #print '[No more files in journal]' # DEBUG - #print #DEBUG - pass + print 'No more files in journal' + except qls.err.FirstRecordOffsetMismatchError as err: + print '0x%08x: **** FRO ERROR: queue=\"%s\" fid=0x%x fro actual=0x%08x expected=0x%08x' % \ + (err.get_expected_fro(), err.get_queue_name(), err.get_file_number(), err.get_record_offset(), + err.get_expected_fro()) def reconcile_transactions(self, prepared_list, txn_flag): xid_list = self.txn_map.get_xid_list() if len(xid_list) > 0: @@ -363,11 +373,12 @@ class Journal(object): if txn_flag: self.txn_map.abort(xid) else: - print ' ', Utils.format_xid(xid), '- Aborting, not in prepared transaction list' + print ' ', Utils.format_xid(xid), '- Ignoring, not in prepared transaction list' if txn_flag: self.txn_map.abort(xid) def report(self, print_stats_flag): print 'Journal "%s":' % self.queue_name + print '=' * (11 + len(self.queue_name)) if print_stats_flag: print str(self.statistics) print self.enq_map.report_str(True, True) @@ -375,6 +386,11 @@ class Journal(object): JournalFile.report_header() for file_num in sorted(self.files.keys()): self.files[file_num].report() + #TODO: move this to JournalFile, append to file info + if self.num_filler_records_required is not None and self.fill_to_offset is not None: + print '0x%x:0x%08x: %d filler records required for DBLK alignment to 0x%08x' % \ + (self.current_journal_file.file_header.file_num, self.last_record_offset, + self.num_filler_records_required, self.fill_to_offset) print #--- protected functions --- def _analyze_files(self): @@ -386,21 +402,35 @@ class Journal(object): args = Utils.load_args(file_handle, RecordHeader) file_hdr = FileHeader(*args) file_hdr.init(file_handle, *Utils.load_args(file_handle, FileHeader)) - if not file_hdr.is_header_valid(file_hdr): - break - file_hdr.load(file_handle) - if not file_hdr.is_valid(): - break - Utils.skip(file_handle, file_hdr.file_header_size_sblks * Utils.SBLK_SIZE) - self.files[file_hdr.file_num] = JournalFile(file_hdr) + if file_hdr.is_header_valid(file_hdr): + file_hdr.load(file_handle) + if file_hdr.is_valid(): + Utils.skip(file_handle, file_hdr.file_header_size_sblks * Utils.SBLK_SIZE) + self.files[file_hdr.file_num] = JournalFile(file_hdr) self.file_num_list = sorted(self.files.keys()) self.file_num_itr = iter(self.file_num_list) + def _check_alignment(self): # TODO: Move into JournalFile + remaining_sblks = self.last_record_offset % Utils.SBLK_SIZE + if remaining_sblks == 0: + self.num_filler_records_required = 0 + else: + self.num_filler_records_required = (Utils.SBLK_SIZE - remaining_sblks) / Utils.DBLK_SIZE + self.fill_to_offset = self.last_record_offset + (self.num_filler_records_required * Utils.DBLK_SIZE) + if self.args.show_recs or self.args.show_all_recs: + print '0x%x:0x%08x: %d filler records required for DBLK alignment to 0x%08x' % \ + (self.current_journal_file.file_header.file_num, self.last_record_offset, + self.num_filler_records_required, self.fill_to_offset) def _check_file(self): - if self.current_journal_file is not None and not self.current_journal_file.file_header.is_end_of_file(): - return - self._get_next_file() + if self.current_journal_file is not None: + if not self.current_journal_file.file_header.is_end_of_file(): + return True + if self.current_journal_file.file_header.is_end_of_file(): + self.last_record_offset = self.current_journal_file.file_header.file_handle.tell() + if not self._get_next_file(): + return False fhdr = self.current_journal_file.file_header fhdr.file_handle.seek(fhdr.first_record_offset) + return True def _get_next_file(self): if self.current_journal_file is not None: file_handle = self.current_journal_file.file_header.file_handle @@ -413,13 +443,16 @@ class Journal(object): except StopIteration: pass if file_num == 0: - raise qls.err.NoMoreFilesInJournalError(self.queue_name) + return False self.current_journal_file = self.files[file_num] self.first_rec_flag = True - print self.current_journal_file.file_header - #print '[file_num=0x%x]' % self.current_journal_file.file_num #DEBUG + if self.args.show_recs or self.args.show_all_recs: + print '0x%x:%s' % (self.current_journal_file.file_header.file_num, self.current_journal_file.file_header) + return True def _get_next_record(self, high_rid_counter): - self._check_file() + if not self._check_file(): + return False + self.last_record_offset = self.current_journal_file.file_header.file_handle.tell() this_record = Utils.load(self.current_journal_file.file_header.file_handle, RecordHeader) if not this_record.is_header_valid(self.current_journal_file.file_header): return False @@ -427,32 +460,42 @@ class Journal(object): if this_record.file_offset != self.current_journal_file.file_header.first_record_offset: raise qls.err.FirstRecordOffsetMismatchError(self.current_journal_file.file_header, this_record) self.first_rec_flag = False - high_rid_counter.check(this_record.record_id) self.statistics.total_record_count += 1 + start_journal_file = self.current_journal_file if isinstance(this_record, EnqueueRecord): - self._handle_enqueue_record(this_record) - print this_record + ok_flag = self._handle_enqueue_record(this_record, start_journal_file) + high_rid_counter.check(this_record.record_id) + if self.args.show_recs or self.args.show_all_recs: + print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record) elif isinstance(this_record, DequeueRecord): - self._handle_dequeue_record(this_record) - print this_record + ok_flag = self._handle_dequeue_record(this_record, start_journal_file) + high_rid_counter.check(this_record.record_id) + if self.args.show_recs or self.args.show_all_recs: + print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record) elif isinstance(this_record, TransactionRecord): - self._handle_transaction_record(this_record) - print this_record + ok_flag = self._handle_transaction_record(this_record, start_journal_file) + high_rid_counter.check(this_record.record_id) + if self.args.show_recs or self.args.show_all_recs: + print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record) else: self.statistics.filler_record_count += 1 + ok_flag = True + if self.args.show_all_recs: + print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record) Utils.skip(self.current_journal_file.file_header.file_handle, Utils.DBLK_SIZE) - return True - def _handle_enqueue_record(self, enqueue_record): - start_journal_file = self.current_journal_file + return ok_flag + def _handle_enqueue_record(self, enqueue_record, start_journal_file): while enqueue_record.load(self.current_journal_file.file_header.file_handle): - self._get_next_file() - if not enqueue_record.is_valid(self.current_journal_file): - return + if not self._get_next_file(): + enqueue_record.truncated_flag = True + return False + if not enqueue_record.is_valid(start_journal_file): + return False if enqueue_record.is_external() and enqueue_record.data != None: raise qls.err.ExternalDataError(self.current_journal_file.file_header, enqueue_record) if enqueue_record.is_transient(): self.statistics.transient_record_count += 1 - return + return True if enqueue_record.xid_size > 0: self.txn_map.add(start_journal_file, enqueue_record) self.statistics.transaction_operation_count += 1 @@ -462,13 +505,14 @@ class Journal(object): self.enq_map.add(start_journal_file, enqueue_record, False) start_journal_file.incr_enq_cnt() self.statistics.enqueue_count += 1 - #print enqueue_record, # DEBUG - def _handle_dequeue_record(self, dequeue_record): - start_journal_file = self.current_journal_file + return True + def _handle_dequeue_record(self, dequeue_record, start_journal_file): while dequeue_record.load(self.current_journal_file.file_header.file_handle): - self._get_next_file() - if not dequeue_record.is_valid(self.current_journal_file): - return + if not self._get_next_file(): + dequeue_record.truncated_flag = True + return False + if not dequeue_record.is_valid(start_journal_file): + return False if dequeue_record.xid_size > 0: if self.xid_prepared_list is None: # ie this is the TPL dequeue_record.transaction_prepared_list_flag = True @@ -484,12 +528,14 @@ class Journal(object): except qls.err.RecordIdNotFoundError: dequeue_record.warnings.append('NOT IN EMAP') self.statistics.dequeue_count += 1 - #print dequeue_record, # DEBUG - def _handle_transaction_record(self, transaction_record): + return True + def _handle_transaction_record(self, transaction_record, start_journal_file): while transaction_record.load(self.current_journal_file.file_header.file_handle): - self._get_next_file() - if not transaction_record.is_valid(self.current_journal_file): - return + if not self._get_next_file(): + transaction_record.truncated_flag = True + return False + if not transaction_record.is_valid(start_journal_file): + return False if transaction_record.magic[-1] == 'a': self.statistics.transaction_abort_count += 1 else: @@ -501,7 +547,7 @@ class Journal(object): # if transaction_record.magic[-1] == 'c': # commits only # self._txn_obj_list[hdr.xid] = hdr self.statistics.transaction_record_count += 1 - #print transaction_record, # DEBUG + return True def _load_data(self, record): while not record.is_complete: record.load(self.current_journal_file.file_handle) @@ -511,6 +557,7 @@ class JournalFile(object): self.file_header = file_header self.enq_cnt = 0 self.deq_cnt = 0 + self.num_filler_records_required = None def incr_enq_cnt(self): self.enq_cnt += 1 def decr_enq_cnt(self, record): @@ -527,7 +574,8 @@ class JournalFile(object): print '-------- ------- ---- ----- ------------' def report(self): comment = '<uninitialized>' if self.file_header.file_num == 0 else '' - print '%8d %7d %4d %4dk %s %s' % (self.file_header.file_num, self.get_enq_cnt(), self.file_header.partition_num, + file_num_str = '0x%x' % self.file_header.file_num + print '%8s %7d %4d %4dk %s %s' % (file_num_str, self.get_enq_cnt(), self.file_header.partition_num, self.file_header.efp_data_size_kb, os.path.basename(self.file_header.file_handle.name), comment) @@ -541,6 +589,9 @@ class RecordHeader(object): self.serial = serial self.record_id = record_id self.warnings = [] + self.truncated_flag = False + def checksum_encode(self): + return struct.pack(RecordHeader.FORMAT, self.magic, self.version, self.user_flags, self.serial, self.record_id) def load(self, file_handle): pass @staticmethod @@ -560,8 +611,6 @@ class RecordHeader(object): if self.version != Utils.RECORD_VERSION: raise qls.err.InvalidRecordVersionError(file_header, self, Utils.RECORD_VERSION) if self.serial != file_header.serial: - #print '[serial mismatch at 0x%x]' % self.file_offset #DEBUG - #print #DEBUG return False return True def _get_warnings(self): @@ -606,8 +655,8 @@ class RecordTail(object): return False self.valid_flag = Utils.inv_str(self.xmagic) == record.magic and \ self.serial == record.serial and \ - self.record_id == record.record_id - # TODO: When we can verify the checksum, add this here + self.record_id == record.record_id and \ + Utils.adler32(record.checksum_encode()) == self.checksum return self.valid_flag def __str__(self): """Return a string representation of the this RecordTail instance""" @@ -660,7 +709,7 @@ class FileHeader(RecordHeader): return strftime(fstr, time) def __str__(self): """Return a string representation of the this FileHeader instance""" - return '%s fnum=%d fro=0x%08x p=%d s=%dk t=%s %s' % (RecordHeader.__str__(self), self.file_num, + return '%s fnum=0x%x fro=0x%08x p=%d s=%dk t=%s %s' % (RecordHeader.__str__(self), self.file_num, self.first_record_offset, self.partition_num, self.efp_data_size_kb, self.timestamp_str(), self._get_warnings()) @@ -677,6 +726,9 @@ class EnqueueRecord(RecordHeader): self.data = None self.data_complete = False self.record_tail = None + def checksum_encode(self): + return RecordHeader.checksum_encode(self) + struct.pack(self.FORMAT, self.xid_size, self.data_size) + \ + self.xid + self.data def is_external(self): return self.user_flags & EnqueueRecord.EXTERNAL_FLAG_MASK > 0 def is_transient(self): @@ -732,6 +784,9 @@ class EnqueueRecord(RecordHeader): return fstr def __str__(self): """Return a string representation of the this EnqueueRecord instance""" + if self.truncated_flag: + return '%s xid(%d) data(%d) [Truncated, no more files in journal]' % (RecordHeader.__str__(self), + self.xid_size, self.data_size) if self.record_tail is None: record_tail_str = '' else: @@ -750,6 +805,9 @@ class DequeueRecord(RecordHeader): self.xid = None self.xid_complete = False self.record_tail = None + def checksum_encode(self): + return RecordHeader.checksum_encode(self) + struct.pack(self.FORMAT, self.dequeue_record_id, self.xid_size) + \ + self.xid def is_transaction_complete_commit(self): return self.user_flags & DequeueRecord.TXN_COMPLETE_COMMIT_FLAG > 0 def is_valid(self, journal_file): @@ -790,6 +848,10 @@ class DequeueRecord(RecordHeader): return '' def __str__(self): """Return a string representation of the this DequeueRecord instance""" + if self.truncated_flag: + return '%s xid(%d) drid=0x%x [Truncated, no more files in journal]' % (RecordHeader.__str__(self), + self.xid_size, + self.dequeue_record_id) if self.record_tail is None: record_tail_str = '' else: @@ -805,6 +867,8 @@ class TransactionRecord(RecordHeader): self.xid = None self.xid_complete = False self.record_tail = None + def checksum_encode(self): + return RecordHeader.checksum_encode(self) + struct.pack(self.FORMAT, self.xid_size) + self.xid def is_valid(self, journal_file): if not RecordHeader.is_header_valid(self, journal_file.file_header): return False @@ -832,6 +896,8 @@ class TransactionRecord(RecordHeader): return False def __str__(self): """Return a string representation of the this TransactionRecord instance""" + if self.truncated_flag: + return '%s xid(%d) [Truncated, no more files in journal]' % (RecordHeader.__str__(self), self.xid_size) if self.record_tail is None: record_tail_str = '' else: @@ -845,6 +911,9 @@ class Utils(object): RECORD_VERSION = 2 SBLK_SIZE = 4096 @staticmethod + def adler32(data): + return zlib.adler32(data) & 0xffffffff + @staticmethod def create_record(magic, uflags, journal_file, record_id, dequeue_record_id, xid, data): record_class = _CLASSES.get(magic[-1]) record = record_class(0, magic, Utils.RECORD_VERSION, uflags, journal_file.file_header.serial, record_id) @@ -866,7 +935,7 @@ class Utils(object): record.data_complete = True record.record_tail = RecordTail(None) record.record_tail.xmagic = Utils.inv_str(magic) - record.record_tail.checksum = 0 # TODO: when we can calculate checksums, add this here + record.record_tail.checksum = Utils.adler32(record.checksum_encode()) record.record_tail.serial = record.serial record.record_tail.record_id = record.record_id return record @@ -878,7 +947,7 @@ class Utils(object): # << DEBUG >> begin = data.find('msg') end = data.find('\0', begin) - return 'data="%s"' % data[begin:end] + return 'data(%d)="%s"' % (dsize, data[begin:end]) # << END DEBUG if Utils._is_printable(data): datastr = Utils._split_str(data) @@ -941,7 +1010,8 @@ class Utils(object): @staticmethod def skip(file_handle, boundary): """Read and discard disk bytes until the next multiple of boundary""" - file_handle.read(Utils._rem_bytes_in_block(file_handle, boundary)) + if not file_handle.closed: + file_handle.read(Utils._rem_bytes_in_block(file_handle, boundary)) #--- protected functions --- @staticmethod def _hex_str(in_str, begin, end): diff --git a/qpid/tools/src/py/qpid_qls_analyze.py b/qpid/tools/src/py/qpid_qls_analyze.py index 165d41fe95..a540587547 100755 --- a/qpid/tools/src/py/qpid_qls_analyze.py +++ b/qpid/tools/src/py/qpid_qls_analyze.py @@ -37,8 +37,8 @@ class QqpdLinearStoreAnalyzer(object): self.args = None self._process_args() self.qls_dir = os.path.abspath(self.args.qls_dir) - self.efp_manager = efp.EfpManager(self.qls_dir) - self.jrnl_recovery_mgr = jrnl.JournalRecoveryManager(self.qls_dir) + self.efp_manager = efp.EfpManager(self.qls_dir, self.args) + self.jrnl_recovery_mgr = jrnl.JournalRecoveryManager(self.qls_dir, self.args) def _analyze_efp(self): self.efp_manager.run(self.args) def _analyze_journals(self): @@ -49,6 +49,10 @@ class QqpdLinearStoreAnalyzer(object): help='Qpid Linear Store (QLS) directory to be analyzed') parser.add_argument('--efp', action='store_true', help='Analyze the Emtpy File Pool (EFP) and show stats') + parser.add_argument('--show-recs', action='store_true', + help='Show material records found during recovery') + parser.add_argument('--show-all-recs', action='store_true', + help='Show all records (including fillers) found during recovery') parser.add_argument('--stats', action='store_true', help='Print journal record stats') parser.add_argument('--txn', action='store_true', |