summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2015-09-25 14:16:41 +0000
committerKim van der Riet <kpvdr@apache.org>2015-09-25 14:16:41 +0000
commit541f2bb32a936033f959cd1891c902cc6219a77a (patch)
tree4a5589dab35e9c45e575e523c972c69eed565da9
parent18699a7ba7680b4a614f09fd96310bd96f0bbf68 (diff)
downloadqpid-python-541f2bb32a936033f959cd1891c902cc6219a77a.tar.gz
QPID-6765 [linearstore] Add qpid-txtest mode to qpid-qls-analyze which extracts message number from message body
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1705310 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/tools/src/py/qlslibs/analyze.py71
-rw-r--r--qpid/tools/src/py/qlslibs/jrnl.py26
-rw-r--r--qpid/tools/src/py/qlslibs/utils.py30
-rwxr-xr-xqpid/tools/src/py/qpid-qls-analyze12
4 files changed, 72 insertions, 67 deletions
diff --git a/qpid/tools/src/py/qlslibs/analyze.py b/qpid/tools/src/py/qlslibs/analyze.py
index a67e17e426..8c5de05b9e 100644
--- a/qpid/tools/src/py/qlslibs/analyze.py
+++ b/qpid/tools/src/py/qlslibs/analyze.py
@@ -52,12 +52,12 @@ class JournalRecoveryManager(object):
self.journals = {}
self.high_rid_counter = HighCounter()
self.prepared_list = None
- def report(self, print_stats_flag):
+ def report(self):
self._reconcile_transactions(self.prepared_list, self.args.txn)
if self.tpl is not None:
- self.tpl.report(print_stats_flag, self.args.show_recovered_recs)
+ self.tpl.report(self.args)
for queue_name in sorted(self.journals.keys()):
- self.journals[queue_name].report(print_stats_flag, self.args.show_recovered_recs)
+ self.journals[queue_name].report(self.args)
def run(self):
tpl_dir = os.path.join(self.directory, JournalRecoveryManager.TPL_DIR_NAME)
if os.path.exists(tpl_dir):
@@ -78,7 +78,7 @@ class JournalRecoveryManager(object):
def _reconcile_transactions(self, prepared_list, txn_flag):
print 'Transaction reconciliation report:'
print '=================================='
- print len(prepared_list), 'open transaction(s) found in prepared transaction list:'
+ print 'Transaction Prepared List (TPL) contains %d open transaction(s):' % len(prepared_list)
for xid in prepared_list.keys():
commit_flag = prepared_list[xid]
if commit_flag is None:
@@ -97,18 +97,22 @@ class JournalRecoveryManager(object):
enqueue_record.record_id, xid, None)
if txn_flag:
self.tpl.add_record(dequeue_record)
+ print
+ print 'Open transactions found in queues:'
+ print '----------------------------------'
for queue_name in sorted(self.journals.keys()):
self.journals[queue_name].reconcile_transactions(prepared_list, txn_flag)
- if len(prepared_list) > 0:
- print 'Completing prepared transactions in prepared transaction list:'
- for xid in prepared_list.keys():
- print ' ', qlslibs.utils.format_xid(xid)
- transaction_record = qlslibs.utils.create_record(qlslibs.jrnl.TransactionRecord.MAGIC_COMMIT, 0, \
- self.tpl.current_journal_file, \
- self.high_rid_counter.get_next(), None, xid, None)
- if txn_flag:
- self.tpl.add_record(transaction_record)
print
+ if len(prepared_list) > 0:
+ print 'Creating commit records for the following prepared transactions in TPL:'
+ for xid in prepared_list.keys():
+ print ' ', qlslibs.utils.format_xid(xid)
+ transaction_record = qlslibs.utils.create_record(qlslibs.jrnl.TransactionRecord.MAGIC_COMMIT, 0, \
+ self.tpl.current_journal_file, \
+ self.high_rid_counter.get_next(), None, xid, None)
+ if txn_flag:
+ self.tpl.add_record(transaction_record)
+ print
class EnqueueMap(object):
"""
@@ -139,22 +143,21 @@ class EnqueueMap(object):
if dequeue_record.dequeue_record_id not in self.enq_map:
raise qlslibs.err.RecordIdNotFoundError(journal_file.file_header, dequeue_record)
self.enq_map[dequeue_record.dequeue_record_id][2] = True
- def report_str(self, _, show_records):
+ def report_str(self, args):
"""Return a string containing a text report for all records in the map"""
if len(self.enq_map) == 0:
return 'No enqueued records found.'
rstr = '%d enqueued records found' % len(self.enq_map)
- if show_records:
+ if args.show_recovered_recs:
rstr += ":"
rid_list = self.enq_map.keys()
rid_list.sort()
for rid in rid_list:
journal_file, record, locked_flag = self.enq_map[rid]
+ rstr += '\n 0x%x:' % journal_file.file_header.file_num
+ rstr += record.to_string(args.show_xids, args.show_data, args.txtest)
if locked_flag:
- lock_str = '[LOCKED]'
- else:
- lock_str = ''
- rstr += '\n 0x%x:%s %s' % (journal_file.file_header.file_num, record, lock_str)
+ rstr += ' [LOCKED]'
else:
rstr += '.'
return rstr
@@ -248,17 +251,18 @@ class TransactionMap(object):
return prepared_list
def get_xid_list(self):
return self.txn_map.keys()
- def report_str(self, _, show_records):
+ def report_str(self, args):
"""Return a string containing a text report for all records in the map"""
if len(self.txn_map) == 0:
return 'No outstanding transactions found.'
rstr = '%d outstanding transaction(s)' % len(self.txn_map)
- if show_records:
+ if args.show_recovered_recs:
rstr += ':'
for xid, op_list in self.txn_map.iteritems():
rstr += '\n %s containing %d operations:' % (qlslibs.utils.format_xid(xid), len(op_list))
for journal_file, record, _ in op_list:
- rstr += '\n 0x%x:%s' % (journal_file.file_header.file_num, record)
+ rstr += '\n 0x%x:' % journal_file.file_header.file_num
+ rstr += record.to_string(args.show_xids, args.show_data, args.txtest)
else:
rstr += '.'
return rstr
@@ -334,6 +338,7 @@ class Journal(object):
self.num_filler_records_required = None # TODO: Move into JournalFile
self.fill_to_offset = None
def add_record(self, record):
+ """Used for reconciling transactions only - called from JournalRecoveryManager._reconcile_transactions()"""
if isinstance(record, qlslibs.jrnl.EnqueueRecord) or isinstance(record, qlslibs.jrnl.DequeueRecord):
if record.xid_size > 0:
self.txn_map.add(self.current_journal_file, record)
@@ -385,16 +390,16 @@ class Journal(object):
if txn_flag:
self.txn_map.abort(xid)
else:
- print ' ', qlslibs.utils.format_xid(xid), '- Ignoring, not in prepared transaction list'
+ print ' ', qlslibs.utils.format_xid(xid), '- Ignoring, not in prepared transaction list'
if txn_flag:
self.txn_map.abort(xid)
- def report(self, print_stats_flag, show_recovered_records):
+ def report(self, args):
print 'Journal "%s":' % self.queue_name
print '=' * (11 + len(self.queue_name))
- if print_stats_flag:
+ if args.stats:
print str(self.statistics)
- print self.enq_map.report_str(True, show_recovered_records)
- print self.txn_map.report_str(True, show_recovered_records)
+ print self.enq_map.report_str(args)
+ print self.txn_map.report_str(args)
JournalFile.report_header()
for file_num in sorted(self.files.keys()):
self.files[file_num].report()
@@ -485,17 +490,17 @@ class Journal(object):
high_rid_counter.check(this_record.record_id)
if self.args.show_recovery_recs or self.args.show_all_recs:
print '0x%x:%s' % (start_journal_file.file_header.file_num, \
- this_record.to_string(self.args.show_xids, self.args.show_data))
+ this_record.to_string(self.args.show_xids, self.args.show_data, self.args.txtest))
elif isinstance(this_record, qlslibs.jrnl.DequeueRecord):
ok_flag = self._handle_dequeue_record(this_record, start_journal_file)
high_rid_counter.check(this_record.record_id)
if self.args.show_recovery_recs or self.args.show_all_recs:
- print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record.to_string(self.args.show_xids))
+ print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record.to_string(self.args.show_xids, None, None))
elif isinstance(this_record, qlslibs.jrnl.TransactionRecord):
ok_flag = self._handle_transaction_record(this_record, start_journal_file)
high_rid_counter.check(this_record.record_id)
if self.args.show_recovery_recs or self.args.show_all_recs:
- print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record.to_string(self.args.show_xids))
+ print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record.to_string(self.args.show_xids, None, None))
else:
self.statistics.filler_record_count += 1
ok_flag = True
@@ -555,10 +560,12 @@ class Journal(object):
return False
if not transaction_record.is_valid(start_journal_file):
return False
- if transaction_record.magic[-1] == 'a':
+ if transaction_record.magic[-1] == 'a': # Abort
self.statistics.transaction_abort_count += 1
- else:
+ elif transaction_record.magic[-1] == 'c': # Commit
self.statistics.transaction_commit_count += 1
+ else:
+ raise InvalidRecordTypeError('Unknown transaction record magic \'%s\'' % transaction_record.magic)
if self.txn_map.contains(transaction_record.xid):
self.txn_map.delete(self.current_journal_file, transaction_record)
else:
diff --git a/qpid/tools/src/py/qlslibs/jrnl.py b/qpid/tools/src/py/qlslibs/jrnl.py
index ee25015220..5e65890393 100644
--- a/qpid/tools/src/py/qlslibs/jrnl.py
+++ b/qpid/tools/src/py/qlslibs/jrnl.py
@@ -78,9 +78,6 @@ class RecordHeader(object):
for warn in self.warnings:
warn_str += '<%s>' % warn
return warn_str
- def __str__(self):
- """Return string representation of this header"""
- return self.to_rh_string()
class RecordTail(object):
FORMAT = '<4sL2Q'
@@ -119,9 +116,6 @@ class RecordTail(object):
magic = qlslibs.utils.inv_str(self.xmagic)
magic_char = magic[-1].upper() if magic[-1] in string.printable else '?'
return '[%c cs=0x%08x rid=0x%x]' % (magic_char, self.checksum, self.record_id)
- def __str__(self):
- """Return a string representation of the this RecordTail instance"""
- return self.to_string()
class FileHeader(RecordHeader):
FORMAT = '<2H4x5QH'
@@ -185,9 +179,6 @@ class FileHeader(RecordHeader):
self.first_record_offset, self.partition_num,
self.efp_data_size_kb, self.timestamp_str(),
self._get_warnings())
- def __str__(self):
- """Return a string representation of the this FileHeader instance"""
- return self.to_string()
class EnqueueRecord(RecordHeader):
FORMAT = '<2Q'
@@ -249,7 +240,7 @@ class EnqueueRecord(RecordHeader):
else:
return True
return False
- def to_string(self, show_xid_flag, show_data_flag):
+ def to_string(self, show_xid_flag, show_data_flag, txtest_flag):
"""Return a string representation of the this EnqueueRecord instance"""
if self.truncated_flag:
return '%s xid(%d) data(%d) [Truncated, no more files in journal]' % (RecordHeader.__str__(self),
@@ -260,7 +251,7 @@ class EnqueueRecord(RecordHeader):
record_tail_str = self.record_tail.to_string()
return '%s %s %s %s %s %s' % (self.to_rh_string(),
qlslibs.utils.format_xid(self.xid, self.xid_size, show_xid_flag),
- qlslibs.utils.format_data(self.data, self.data_size, show_data_flag),
+ qlslibs.utils.format_data(self.data, self.data_size, show_data_flag, txtest_flag),
record_tail_str, self._print_flags(), self._get_warnings())
def _print_flags(self):
"""Utility function to decode the flags field in the header and print a string representation"""
@@ -275,9 +266,6 @@ class EnqueueRecord(RecordHeader):
if len(fstr) > 0:
fstr += ']'
return fstr
- def __str__(self):
- """Return a string representation of the this EnqueueRecord instance"""
- return self.to_string(False, False)
class DequeueRecord(RecordHeader):
FORMAT = '<2Q'
@@ -323,7 +311,7 @@ class DequeueRecord(RecordHeader):
else:
return True
return False
- def to_string(self, show_xid_flag):
+ def to_string(self, show_xid_flag, _u1, _u2):
"""Return a string representation of the this DequeueRecord instance"""
if self.truncated_flag:
return '%s xid(%d) drid=0x%x [Truncated, no more files in journal]' % (RecordHeader.__str__(self),
@@ -344,9 +332,6 @@ class DequeueRecord(RecordHeader):
else:
return '[ABORT]'
return ''
- def __str__(self):
- """Return a string representation of the this DequeueRecord instance"""
- return self.to_string(False)
class TransactionRecord(RecordHeader):
FORMAT = '<Q'
@@ -384,7 +369,7 @@ class TransactionRecord(RecordHeader):
else:
return True
return False
- def to_string(self, show_xid_flag):
+ def to_string(self, show_xid_flag, _u1, _u2):
"""Return a string representation of the this TransactionRecord instance"""
if self.truncated_flag:
return '%s xid(%d) [Truncated, no more files in journal]' % (RecordHeader.__str__(self), self.xid_size)
@@ -395,9 +380,6 @@ class TransactionRecord(RecordHeader):
return '%s %s %s %s' % (self.to_rh_string(),
qlslibs.utils.format_xid(self.xid, self.xid_size, show_xid_flag),
record_tail_str, self._get_warnings())
- def __str__(self):
- """Return a string representation of the this TransactionRecord instance"""
- return self.to_string(False)
# =============================================================================
diff --git a/qpid/tools/src/py/qlslibs/utils.py b/qpid/tools/src/py/qlslibs/utils.py
index c32f8c7abb..dfa760a839 100644
--- a/qpid/tools/src/py/qlslibs/utils.py
+++ b/qpid/tools/src/py/qlslibs/utils.py
@@ -73,13 +73,13 @@ def efp_directory_size(directory_name):
pass
return 0
-def format_data(data, data_size=None, show_data_flag=True):
+def format_data(data, data_size=None, show_data_flag=True, txtest_flag=False):
"""Format binary data for printing"""
- return _format_binary(data, data_size, show_data_flag, 'data', qlslibs.err.DataSizeError, False)
+ return _format_binary(data, data_size, show_data_flag, 'data', qlslibs.err.DataSizeError, False, txtest_flag)
def format_xid(xid, xid_size=None, show_xid_flag=True):
"""Format binary XID for printing"""
- return _format_binary(xid, xid_size, show_xid_flag, 'xid', qlslibs.err.XidSizeError, True)
+ return _format_binary(xid, xid_size, show_xid_flag, 'xid', qlslibs.err.XidSizeError, True, False)
def get_avail_disk_space(path):
df_proc = subprocess.Popen(["df", path], stdout=subprocess.PIPE)
@@ -133,7 +133,7 @@ def skip(file_handle, boundary):
#--- protected functions ---
-def _format_binary(bin_str, bin_size, show_bin_flag, prefix, err_class, hex_num_flag):
+def _format_binary(bin_str, bin_size, show_bin_flag, prefix, err_class, hex_num_flag, txtest_flag):
"""Format binary XID for printing"""
if bin_str is None and bin_size is not None:
if bin_size > 0:
@@ -144,14 +144,16 @@ def _format_binary(bin_str, bin_size, show_bin_flag, prefix, err_class, hex_num_
elif bin_size != len(bin_str):
raise err_class(bin_size, len(bin_str), bin_str)
out_str = '%s(%d)' % (prefix, bin_size)
- if show_bin_flag:
+ if txtest_flag:
+ out_str += '=\'%s\'' % _txtest_msg_str(bin_str)
+ elif show_bin_flag:
if _is_printable(bin_str):
binstr = '"%s"' % _split_str(bin_str)
elif hex_num_flag:
binstr = '0x%s' % _str_to_hex_num(bin_str)
else:
- binstr = _hex_split_str(bin_str)
- out_str += '=%s' % binstr
+ binstr = _hex_split_str(bin_str, 50, 10, 10)
+ out_str += '=\'%s\'' % binstr
return out_str
def _hex_str(in_str, begin, end):
@@ -164,12 +166,20 @@ def _hex_str(in_str, begin, end):
hstr += '\\%02x' % ord(in_str[index])
return hstr
-def _hex_split_str(in_str, split_size = 50):
+def _hex_split_str(in_str, split_size, head_size, tail_size):
"""Split a hex string into two parts separated by an ellipsis"""
if len(in_str) <= split_size:
return _hex_str(in_str, 0, len(in_str))
- return _hex_str(in_str, 0, 10) + ' ... ' + _hex_str(in_str, len(in_str)-10, len(in_str))
- #return ''.join(x.encode('hex') for x in reversed(in_str))
+ return _hex_str(in_str, 0, head_size) + ' ... ' + _hex_str(in_str, len(in_str)-tail_size, len(in_str))
+
+def _txtest_msg_str(bin_str):
+ """Extract the message number used in qpid-txtest"""
+ msg_index = bin_str.find('msg')
+ if msg_index >= 0:
+ end_index = bin_str.find('\x00', msg_index)
+ assert end_index >= 0
+ return bin_str[msg_index:end_index]
+ return None
def _is_printable(in_str):
"""Return True if in_str in printable; False otherwise."""
diff --git a/qpid/tools/src/py/qpid-qls-analyze b/qpid/tools/src/py/qpid-qls-analyze
index 35b370f4a3..7fbf6b1bb2 100755
--- a/qpid/tools/src/py/qpid-qls-analyze
+++ b/qpid/tools/src/py/qpid-qls-analyze
@@ -52,11 +52,17 @@ class QlsAnalyzerArgParser(argparse.ArgumentParser):
self.add_argument('--show-all-recs', action='store_true',
help='Show all records (including fillers) found during recovery')
self.add_argument('--show-xids', action='store_true',
- help='Show xid as hex number, otherwise show only xid length')
+ help='Show xid as hex number, otherwise show only xid length. Only has effect when records are shown')
+# TODO: Add ability to show xid as an index rather than a value, helps analysis when xid is a long value with
+# small differences which cannot easily be seen when looking at an output. Also prints a table of indeces vs xid values.
+# self.add_argument('--show-xid-index', action='store_true',
+# help='Show xids by index rather than by their value. Useful for long xids. Prints xid index table')
self.add_argument('--show-data', action='store_true',
- help='Show data, otherwise show only data length')
+ help='Show data, otherwise show only data length. Only has effect when records are shown')
self.add_argument('--stats', action='store_true',
help='Print journal record stats')
+ self.add_argument('--txtest', action='store_true',
+ help='Show qpid-txtest message number as the message content when viewing records. Only has effect when records are shown')
self.add_argument('--txn', action='store_true',
help='Reconcile incomplete transactions')
self.add_argument('--version', action='version',
@@ -91,7 +97,7 @@ class QqpdLinearStoreAnalyzer(object):
""" Create a report on the linear store previously analyzed using analyze() """
if self.args.efp:
self.efp_manager.report()
- self.jrnl_recovery_mgr.report(self.args.stats)
+ self.jrnl_recovery_mgr.report()
def run(self):
""" Run the analyzer, which reads and analyzes the linear store """
if self.args.efp: