diff options
author | Kim van der Riet <kpvdr@apache.org> | 2015-09-25 14:16:41 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2015-09-25 14:16:41 +0000 |
commit | 541f2bb32a936033f959cd1891c902cc6219a77a (patch) | |
tree | 4a5589dab35e9c45e575e523c972c69eed565da9 | |
parent | 18699a7ba7680b4a614f09fd96310bd96f0bbf68 (diff) | |
download | qpid-python-541f2bb32a936033f959cd1891c902cc6219a77a.tar.gz |
QPID-6765 [linearstore] Add qpid-txtest mode to qpid-qls-analyze which extracts message number from message body
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1705310 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/tools/src/py/qlslibs/analyze.py | 71 | ||||
-rw-r--r-- | qpid/tools/src/py/qlslibs/jrnl.py | 26 | ||||
-rw-r--r-- | qpid/tools/src/py/qlslibs/utils.py | 30 | ||||
-rwxr-xr-x | qpid/tools/src/py/qpid-qls-analyze | 12 |
4 files changed, 72 insertions, 67 deletions
diff --git a/qpid/tools/src/py/qlslibs/analyze.py b/qpid/tools/src/py/qlslibs/analyze.py index a67e17e426..8c5de05b9e 100644 --- a/qpid/tools/src/py/qlslibs/analyze.py +++ b/qpid/tools/src/py/qlslibs/analyze.py @@ -52,12 +52,12 @@ class JournalRecoveryManager(object): self.journals = {} self.high_rid_counter = HighCounter() self.prepared_list = None - def report(self, print_stats_flag): + def report(self): self._reconcile_transactions(self.prepared_list, self.args.txn) if self.tpl is not None: - self.tpl.report(print_stats_flag, self.args.show_recovered_recs) + self.tpl.report(self.args) for queue_name in sorted(self.journals.keys()): - self.journals[queue_name].report(print_stats_flag, self.args.show_recovered_recs) + self.journals[queue_name].report(self.args) def run(self): tpl_dir = os.path.join(self.directory, JournalRecoveryManager.TPL_DIR_NAME) if os.path.exists(tpl_dir): @@ -78,7 +78,7 @@ class JournalRecoveryManager(object): def _reconcile_transactions(self, prepared_list, txn_flag): print 'Transaction reconciliation report:' print '==================================' - print len(prepared_list), 'open transaction(s) found in prepared transaction list:' + print 'Transaction Prepared List (TPL) contains %d open transaction(s):' % len(prepared_list) for xid in prepared_list.keys(): commit_flag = prepared_list[xid] if commit_flag is None: @@ -97,18 +97,22 @@ class JournalRecoveryManager(object): enqueue_record.record_id, xid, None) if txn_flag: self.tpl.add_record(dequeue_record) + print + print 'Open transactions found in queues:' + print '----------------------------------' for queue_name in sorted(self.journals.keys()): self.journals[queue_name].reconcile_transactions(prepared_list, txn_flag) - if len(prepared_list) > 0: - print 'Completing prepared transactions in prepared transaction list:' - for xid in prepared_list.keys(): - print ' ', qlslibs.utils.format_xid(xid) - transaction_record = qlslibs.utils.create_record(qlslibs.jrnl.TransactionRecord.MAGIC_COMMIT, 0, \ - self.tpl.current_journal_file, \ - self.high_rid_counter.get_next(), None, xid, None) - if txn_flag: - self.tpl.add_record(transaction_record) print + if len(prepared_list) > 0: + print 'Creating commit records for the following prepared transactions in TPL:' + for xid in prepared_list.keys(): + print ' ', qlslibs.utils.format_xid(xid) + transaction_record = qlslibs.utils.create_record(qlslibs.jrnl.TransactionRecord.MAGIC_COMMIT, 0, \ + self.tpl.current_journal_file, \ + self.high_rid_counter.get_next(), None, xid, None) + if txn_flag: + self.tpl.add_record(transaction_record) + print class EnqueueMap(object): """ @@ -139,22 +143,21 @@ class EnqueueMap(object): if dequeue_record.dequeue_record_id not in self.enq_map: raise qlslibs.err.RecordIdNotFoundError(journal_file.file_header, dequeue_record) self.enq_map[dequeue_record.dequeue_record_id][2] = True - def report_str(self, _, show_records): + def report_str(self, args): """Return a string containing a text report for all records in the map""" if len(self.enq_map) == 0: return 'No enqueued records found.' rstr = '%d enqueued records found' % len(self.enq_map) - if show_records: + if args.show_recovered_recs: rstr += ":" rid_list = self.enq_map.keys() rid_list.sort() for rid in rid_list: journal_file, record, locked_flag = self.enq_map[rid] + rstr += '\n 0x%x:' % journal_file.file_header.file_num + rstr += record.to_string(args.show_xids, args.show_data, args.txtest) if locked_flag: - lock_str = '[LOCKED]' - else: - lock_str = '' - rstr += '\n 0x%x:%s %s' % (journal_file.file_header.file_num, record, lock_str) + rstr += ' [LOCKED]' else: rstr += '.' return rstr @@ -248,17 +251,18 @@ class TransactionMap(object): return prepared_list def get_xid_list(self): return self.txn_map.keys() - def report_str(self, _, show_records): + def report_str(self, args): """Return a string containing a text report for all records in the map""" if len(self.txn_map) == 0: return 'No outstanding transactions found.' rstr = '%d outstanding transaction(s)' % len(self.txn_map) - if show_records: + if args.show_recovered_recs: rstr += ':' for xid, op_list in self.txn_map.iteritems(): rstr += '\n %s containing %d operations:' % (qlslibs.utils.format_xid(xid), len(op_list)) for journal_file, record, _ in op_list: - rstr += '\n 0x%x:%s' % (journal_file.file_header.file_num, record) + rstr += '\n 0x%x:' % journal_file.file_header.file_num + rstr += record.to_string(args.show_xids, args.show_data, args.txtest) else: rstr += '.' return rstr @@ -334,6 +338,7 @@ class Journal(object): self.num_filler_records_required = None # TODO: Move into JournalFile self.fill_to_offset = None def add_record(self, record): + """Used for reconciling transactions only - called from JournalRecoveryManager._reconcile_transactions()""" if isinstance(record, qlslibs.jrnl.EnqueueRecord) or isinstance(record, qlslibs.jrnl.DequeueRecord): if record.xid_size > 0: self.txn_map.add(self.current_journal_file, record) @@ -385,16 +390,16 @@ class Journal(object): if txn_flag: self.txn_map.abort(xid) else: - print ' ', qlslibs.utils.format_xid(xid), '- Ignoring, not in prepared transaction list' + print ' ', qlslibs.utils.format_xid(xid), '- Ignoring, not in prepared transaction list' if txn_flag: self.txn_map.abort(xid) - def report(self, print_stats_flag, show_recovered_records): + def report(self, args): print 'Journal "%s":' % self.queue_name print '=' * (11 + len(self.queue_name)) - if print_stats_flag: + if args.stats: print str(self.statistics) - print self.enq_map.report_str(True, show_recovered_records) - print self.txn_map.report_str(True, show_recovered_records) + print self.enq_map.report_str(args) + print self.txn_map.report_str(args) JournalFile.report_header() for file_num in sorted(self.files.keys()): self.files[file_num].report() @@ -485,17 +490,17 @@ class Journal(object): high_rid_counter.check(this_record.record_id) if self.args.show_recovery_recs or self.args.show_all_recs: print '0x%x:%s' % (start_journal_file.file_header.file_num, \ - this_record.to_string(self.args.show_xids, self.args.show_data)) + this_record.to_string(self.args.show_xids, self.args.show_data, self.args.txtest)) elif isinstance(this_record, qlslibs.jrnl.DequeueRecord): ok_flag = self._handle_dequeue_record(this_record, start_journal_file) high_rid_counter.check(this_record.record_id) if self.args.show_recovery_recs or self.args.show_all_recs: - print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record.to_string(self.args.show_xids)) + print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record.to_string(self.args.show_xids, None, None)) elif isinstance(this_record, qlslibs.jrnl.TransactionRecord): ok_flag = self._handle_transaction_record(this_record, start_journal_file) high_rid_counter.check(this_record.record_id) if self.args.show_recovery_recs or self.args.show_all_recs: - print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record.to_string(self.args.show_xids)) + print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record.to_string(self.args.show_xids, None, None)) else: self.statistics.filler_record_count += 1 ok_flag = True @@ -555,10 +560,12 @@ class Journal(object): return False if not transaction_record.is_valid(start_journal_file): return False - if transaction_record.magic[-1] == 'a': + if transaction_record.magic[-1] == 'a': # Abort self.statistics.transaction_abort_count += 1 - else: + elif transaction_record.magic[-1] == 'c': # Commit self.statistics.transaction_commit_count += 1 + else: + raise InvalidRecordTypeError('Unknown transaction record magic \'%s\'' % transaction_record.magic) if self.txn_map.contains(transaction_record.xid): self.txn_map.delete(self.current_journal_file, transaction_record) else: diff --git a/qpid/tools/src/py/qlslibs/jrnl.py b/qpid/tools/src/py/qlslibs/jrnl.py index ee25015220..5e65890393 100644 --- a/qpid/tools/src/py/qlslibs/jrnl.py +++ b/qpid/tools/src/py/qlslibs/jrnl.py @@ -78,9 +78,6 @@ class RecordHeader(object): for warn in self.warnings: warn_str += '<%s>' % warn return warn_str - def __str__(self): - """Return string representation of this header""" - return self.to_rh_string() class RecordTail(object): FORMAT = '<4sL2Q' @@ -119,9 +116,6 @@ class RecordTail(object): magic = qlslibs.utils.inv_str(self.xmagic) magic_char = magic[-1].upper() if magic[-1] in string.printable else '?' return '[%c cs=0x%08x rid=0x%x]' % (magic_char, self.checksum, self.record_id) - def __str__(self): - """Return a string representation of the this RecordTail instance""" - return self.to_string() class FileHeader(RecordHeader): FORMAT = '<2H4x5QH' @@ -185,9 +179,6 @@ class FileHeader(RecordHeader): self.first_record_offset, self.partition_num, self.efp_data_size_kb, self.timestamp_str(), self._get_warnings()) - def __str__(self): - """Return a string representation of the this FileHeader instance""" - return self.to_string() class EnqueueRecord(RecordHeader): FORMAT = '<2Q' @@ -249,7 +240,7 @@ class EnqueueRecord(RecordHeader): else: return True return False - def to_string(self, show_xid_flag, show_data_flag): + def to_string(self, show_xid_flag, show_data_flag, txtest_flag): """Return a string representation of the this EnqueueRecord instance""" if self.truncated_flag: return '%s xid(%d) data(%d) [Truncated, no more files in journal]' % (RecordHeader.__str__(self), @@ -260,7 +251,7 @@ class EnqueueRecord(RecordHeader): record_tail_str = self.record_tail.to_string() return '%s %s %s %s %s %s' % (self.to_rh_string(), qlslibs.utils.format_xid(self.xid, self.xid_size, show_xid_flag), - qlslibs.utils.format_data(self.data, self.data_size, show_data_flag), + qlslibs.utils.format_data(self.data, self.data_size, show_data_flag, txtest_flag), record_tail_str, self._print_flags(), self._get_warnings()) def _print_flags(self): """Utility function to decode the flags field in the header and print a string representation""" @@ -275,9 +266,6 @@ class EnqueueRecord(RecordHeader): if len(fstr) > 0: fstr += ']' return fstr - def __str__(self): - """Return a string representation of the this EnqueueRecord instance""" - return self.to_string(False, False) class DequeueRecord(RecordHeader): FORMAT = '<2Q' @@ -323,7 +311,7 @@ class DequeueRecord(RecordHeader): else: return True return False - def to_string(self, show_xid_flag): + def to_string(self, show_xid_flag, _u1, _u2): """Return a string representation of the this DequeueRecord instance""" if self.truncated_flag: return '%s xid(%d) drid=0x%x [Truncated, no more files in journal]' % (RecordHeader.__str__(self), @@ -344,9 +332,6 @@ class DequeueRecord(RecordHeader): else: return '[ABORT]' return '' - def __str__(self): - """Return a string representation of the this DequeueRecord instance""" - return self.to_string(False) class TransactionRecord(RecordHeader): FORMAT = '<Q' @@ -384,7 +369,7 @@ class TransactionRecord(RecordHeader): else: return True return False - def to_string(self, show_xid_flag): + def to_string(self, show_xid_flag, _u1, _u2): """Return a string representation of the this TransactionRecord instance""" if self.truncated_flag: return '%s xid(%d) [Truncated, no more files in journal]' % (RecordHeader.__str__(self), self.xid_size) @@ -395,9 +380,6 @@ class TransactionRecord(RecordHeader): return '%s %s %s %s' % (self.to_rh_string(), qlslibs.utils.format_xid(self.xid, self.xid_size, show_xid_flag), record_tail_str, self._get_warnings()) - def __str__(self): - """Return a string representation of the this TransactionRecord instance""" - return self.to_string(False) # ============================================================================= diff --git a/qpid/tools/src/py/qlslibs/utils.py b/qpid/tools/src/py/qlslibs/utils.py index c32f8c7abb..dfa760a839 100644 --- a/qpid/tools/src/py/qlslibs/utils.py +++ b/qpid/tools/src/py/qlslibs/utils.py @@ -73,13 +73,13 @@ def efp_directory_size(directory_name): pass return 0 -def format_data(data, data_size=None, show_data_flag=True): +def format_data(data, data_size=None, show_data_flag=True, txtest_flag=False): """Format binary data for printing""" - return _format_binary(data, data_size, show_data_flag, 'data', qlslibs.err.DataSizeError, False) + return _format_binary(data, data_size, show_data_flag, 'data', qlslibs.err.DataSizeError, False, txtest_flag) def format_xid(xid, xid_size=None, show_xid_flag=True): """Format binary XID for printing""" - return _format_binary(xid, xid_size, show_xid_flag, 'xid', qlslibs.err.XidSizeError, True) + return _format_binary(xid, xid_size, show_xid_flag, 'xid', qlslibs.err.XidSizeError, True, False) def get_avail_disk_space(path): df_proc = subprocess.Popen(["df", path], stdout=subprocess.PIPE) @@ -133,7 +133,7 @@ def skip(file_handle, boundary): #--- protected functions --- -def _format_binary(bin_str, bin_size, show_bin_flag, prefix, err_class, hex_num_flag): +def _format_binary(bin_str, bin_size, show_bin_flag, prefix, err_class, hex_num_flag, txtest_flag): """Format binary XID for printing""" if bin_str is None and bin_size is not None: if bin_size > 0: @@ -144,14 +144,16 @@ def _format_binary(bin_str, bin_size, show_bin_flag, prefix, err_class, hex_num_ elif bin_size != len(bin_str): raise err_class(bin_size, len(bin_str), bin_str) out_str = '%s(%d)' % (prefix, bin_size) - if show_bin_flag: + if txtest_flag: + out_str += '=\'%s\'' % _txtest_msg_str(bin_str) + elif show_bin_flag: if _is_printable(bin_str): binstr = '"%s"' % _split_str(bin_str) elif hex_num_flag: binstr = '0x%s' % _str_to_hex_num(bin_str) else: - binstr = _hex_split_str(bin_str) - out_str += '=%s' % binstr + binstr = _hex_split_str(bin_str, 50, 10, 10) + out_str += '=\'%s\'' % binstr return out_str def _hex_str(in_str, begin, end): @@ -164,12 +166,20 @@ def _hex_str(in_str, begin, end): hstr += '\\%02x' % ord(in_str[index]) return hstr -def _hex_split_str(in_str, split_size = 50): +def _hex_split_str(in_str, split_size, head_size, tail_size): """Split a hex string into two parts separated by an ellipsis""" if len(in_str) <= split_size: return _hex_str(in_str, 0, len(in_str)) - return _hex_str(in_str, 0, 10) + ' ... ' + _hex_str(in_str, len(in_str)-10, len(in_str)) - #return ''.join(x.encode('hex') for x in reversed(in_str)) + return _hex_str(in_str, 0, head_size) + ' ... ' + _hex_str(in_str, len(in_str)-tail_size, len(in_str)) + +def _txtest_msg_str(bin_str): + """Extract the message number used in qpid-txtest""" + msg_index = bin_str.find('msg') + if msg_index >= 0: + end_index = bin_str.find('\x00', msg_index) + assert end_index >= 0 + return bin_str[msg_index:end_index] + return None def _is_printable(in_str): """Return True if in_str in printable; False otherwise.""" diff --git a/qpid/tools/src/py/qpid-qls-analyze b/qpid/tools/src/py/qpid-qls-analyze index 35b370f4a3..7fbf6b1bb2 100755 --- a/qpid/tools/src/py/qpid-qls-analyze +++ b/qpid/tools/src/py/qpid-qls-analyze @@ -52,11 +52,17 @@ class QlsAnalyzerArgParser(argparse.ArgumentParser): self.add_argument('--show-all-recs', action='store_true', help='Show all records (including fillers) found during recovery') self.add_argument('--show-xids', action='store_true', - help='Show xid as hex number, otherwise show only xid length') + help='Show xid as hex number, otherwise show only xid length. Only has effect when records are shown') +# TODO: Add ability to show xid as an index rather than a value, helps analysis when xid is a long value with +# small differences which cannot easily be seen when looking at an output. Also prints a table of indeces vs xid values. +# self.add_argument('--show-xid-index', action='store_true', +# help='Show xids by index rather than by their value. Useful for long xids. Prints xid index table') self.add_argument('--show-data', action='store_true', - help='Show data, otherwise show only data length') + help='Show data, otherwise show only data length. Only has effect when records are shown') self.add_argument('--stats', action='store_true', help='Print journal record stats') + self.add_argument('--txtest', action='store_true', + help='Show qpid-txtest message number as the message content when viewing records. Only has effect when records are shown') self.add_argument('--txn', action='store_true', help='Reconcile incomplete transactions') self.add_argument('--version', action='version', @@ -91,7 +97,7 @@ class QqpdLinearStoreAnalyzer(object): """ Create a report on the linear store previously analyzed using analyze() """ if self.args.efp: self.efp_manager.report() - self.jrnl_recovery_mgr.report(self.args.stats) + self.jrnl_recovery_mgr.report() def run(self): """ Run the analyzer, which reads and analyzes the linear store """ if self.args.efp: |