summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2014-02-05 15:34:42 +0000
committerKim van der Riet <kpvdr@apache.org>2014-02-05 15:34:42 +0000
commitcfae4006fc5f91a693c5edef075ab3c9d848a111 (patch)
tree90b2240036a494d1dc2dfe6d3d139563a2d7fc7c
parentd7aae95e552fa9e06d8332a4ec8732a2cc667a0b (diff)
downloadqpid-python-cfae4006fc5f91a693c5edef075ab3c9d848a111.tar.gz
QPID-5362: Bugfixes and enhancements for qpid_qls_analyze
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1564808 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/tools/src/py/qls/efp.py3
-rw-r--r--qpid/tools/src/py/qls/err.py10
-rw-r--r--qpid/tools/src/py/qls/jrnl.py194
-rwxr-xr-xqpid/tools/src/py/qpid_qls_analyze.py8
4 files changed, 150 insertions, 65 deletions
diff --git a/qpid/tools/src/py/qls/efp.py b/qpid/tools/src/py/qls/efp.py
index 3ad8104faa..abf289dc12 100644
--- a/qpid/tools/src/py/qls/efp.py
+++ b/qpid/tools/src/py/qls/efp.py
@@ -26,10 +26,11 @@ class EfpManager(object):
Top level class to analyze the Qpid Linear Store (QLS) directory for the partitions that make up the
Empty File Pool (EFP).
"""
- def __init__(self, directory):
+ def __init__(self, directory, args):
if not os.path.exists(directory):
raise qls.err.InvalidQlsDirectoryNameError(directory)
self.directory = directory
+ self.args = args
self.partitions = []
def report(self):
print 'Found', len(self.partitions), 'partition(s).'
diff --git a/qpid/tools/src/py/qls/err.py b/qpid/tools/src/py/qls/err.py
index d56d739915..702fbb9520 100644
--- a/qpid/tools/src/py/qls/err.py
+++ b/qpid/tools/src/py/qls/err.py
@@ -30,6 +30,16 @@ class QlsRecordError(QlsError):
QlsError.__init__(self)
self.file_header = file_header
self.record = record
+ def get_expected_fro(self):
+ return self.file_header.first_record_offset
+ def get_file_number(self):
+ return self.file_header.file_num
+ def get_queue_name(self):
+ return self.file_header.queue_name
+ def get_record_id(self):
+ return self.record.record_id
+ def get_record_offset(self):
+ return self.record.file_offset
def __str__(self):
return 'queue="%s" file_id=0x%x record_offset=0x%x record_id=0x%x' % \
(self.file_header.queue_name, self.file_header.file_num, self.record.file_offset, self.record.record_id)
diff --git a/qpid/tools/src/py/qls/jrnl.py b/qpid/tools/src/py/qls/jrnl.py
index ffd200ddba..5bce78bfad 100644
--- a/qpid/tools/src/py/qls/jrnl.py
+++ b/qpid/tools/src/py/qls/jrnl.py
@@ -23,6 +23,7 @@ import qls.err
import string
import struct
from time import gmtime, strftime
+import zlib
class HighCounter(object):
def __init__(self):
@@ -39,10 +40,11 @@ class HighCounter(object):
class JournalRecoveryManager(object):
TPL_DIR_NAME = 'tpl'
JRNL_DIR_NAME = 'jrnl'
- def __init__(self, directory):
+ def __init__(self, directory, args):
if not os.path.exists(directory):
raise qls.err.InvalidQlsDirectoryNameError(directory)
self.directory = directory
+ self.args = args
self.tpl = None
self.journals = {}
self.high_rid_counter = HighCounter()
@@ -54,20 +56,21 @@ class JournalRecoveryManager(object):
def run(self, args):
tpl_dir = os.path.join(self.directory, JournalRecoveryManager.TPL_DIR_NAME)
if os.path.exists(tpl_dir):
- self.tpl = Journal(tpl_dir, None)
+ self.tpl = Journal(tpl_dir, None, self.args)
self.tpl.recover(self.high_rid_counter)
print
jrnl_dir = os.path.join(self.directory, JournalRecoveryManager.JRNL_DIR_NAME)
- prepared_list = self.tpl.txn_map.get_prepared_list()
+ prepared_list = self.tpl.txn_map.get_prepared_list() if self.tpl is not None else {}
if os.path.exists(jrnl_dir):
for dir_entry in sorted(os.listdir(jrnl_dir)):
- jrnl = Journal(os.path.join(jrnl_dir, dir_entry), prepared_list)
+ jrnl = Journal(os.path.join(jrnl_dir, dir_entry), prepared_list, self.args)
jrnl.recover(self.high_rid_counter)
self.journals[jrnl.get_queue_name()] = jrnl
print
self._reconcile_transactions(prepared_list, args.txn)
def _reconcile_transactions(self, prepared_list, txn_flag):
print 'Transaction reconciliation report:'
+ print '=================================='
print len(prepared_list), 'open transaction(s) found in prepared transaction list:'
for xid in prepared_list.keys():
commit_flag = prepared_list[xid]
@@ -79,7 +82,7 @@ class JournalRecoveryManager(object):
status = '[Prepared, but interrupted during abort phase]'
print ' ', Utils.format_xid(xid), status
if prepared_list[xid] is None: # Prepared, but not committed or aborted
- enqueue_record = self.tpl.get_txn_map_record(xid)
+ enqueue_record = self.tpl.get_txn_map_record(xid)[0][1]
dequeue_record = Utils.create_record('QLSd', DequeueRecord.TXN_COMPLETE_COMMIT_FLAG, \
self.tpl.current_journal_file, self.high_rid_counter.get_next(), \
enqueue_record.record_id, xid, None)
@@ -141,7 +144,7 @@ class EnqueueMap(object):
lock_str = '[LOCKED]'
else:
lock_str = ''
- rstr += '\n %d:%s %s' % (journal_file.file_header.file_num, record, lock_str)
+ rstr += '\n 0x%x:%s %s' % (journal_file.file_header.file_num, record, lock_str)
else:
rstr += '.'
return rstr
@@ -245,7 +248,7 @@ class TransactionMap(object):
for xid, op_list in self.txn_map.iteritems():
rstr += '\n %s containing %d operations:' % (Utils.format_xid(xid), len(op_list))
for journal_file, record, _ in op_list:
- rstr += '\n %d:%s' % (journal_file.file_header.file_num, record)
+ rstr += '\n 0x%x:%s' % (journal_file.file_header.file_num, record)
else:
rstr += '.'
return rstr
@@ -303,7 +306,7 @@ class Journal(object):
"""
Instance of a Qpid Linear Store (QLS) journal.
"""
- def __init__(self, directory, xid_prepared_list):
+ def __init__(self, directory, xid_prepared_list, args):
self.directory = directory
self.queue_name = os.path.basename(directory)
self.files = {}
@@ -315,6 +318,10 @@ class Journal(object):
self.first_rec_flag = None
self.statistics = JournalStatistics()
self.xid_prepared_list = xid_prepared_list # This is None for the TPL instance only
+ self.args = args
+ self.last_record_offset = None # TODO: Move into JournalFile
+ self.num_filler_records_required = None # TODO: Move into JournalFile
+ self.fill_to_offset = None
def add_record(self, record):
if isinstance(record, EnqueueRecord) or isinstance(record, DequeueRecord):
if record.xid_size > 0:
@@ -334,15 +341,18 @@ class Journal(object):
def get_queue_name(self):
return self.queue_name
def recover(self, high_rid_counter):
- print 'Recovering', self.queue_name #DEBUG
+ print 'Recovering %s' % self.queue_name
self._analyze_files()
try:
while self._get_next_record(high_rid_counter):
pass
+ self._check_alignment()
except qls.err.NoMoreFilesInJournalError:
- #print '[No more files in journal]' # DEBUG
- #print #DEBUG
- pass
+ print 'No more files in journal'
+ except qls.err.FirstRecordOffsetMismatchError as err:
+ print '0x%08x: **** FRO ERROR: queue=\"%s\" fid=0x%x fro actual=0x%08x expected=0x%08x' % \
+ (err.get_expected_fro(), err.get_queue_name(), err.get_file_number(), err.get_record_offset(),
+ err.get_expected_fro())
def reconcile_transactions(self, prepared_list, txn_flag):
xid_list = self.txn_map.get_xid_list()
if len(xid_list) > 0:
@@ -363,11 +373,12 @@ class Journal(object):
if txn_flag:
self.txn_map.abort(xid)
else:
- print ' ', Utils.format_xid(xid), '- Aborting, not in prepared transaction list'
+ print ' ', Utils.format_xid(xid), '- Ignoring, not in prepared transaction list'
if txn_flag:
self.txn_map.abort(xid)
def report(self, print_stats_flag):
print 'Journal "%s":' % self.queue_name
+ print '=' * (11 + len(self.queue_name))
if print_stats_flag:
print str(self.statistics)
print self.enq_map.report_str(True, True)
@@ -375,6 +386,11 @@ class Journal(object):
JournalFile.report_header()
for file_num in sorted(self.files.keys()):
self.files[file_num].report()
+ #TODO: move this to JournalFile, append to file info
+ if self.num_filler_records_required is not None and self.fill_to_offset is not None:
+ print '0x%x:0x%08x: %d filler records required for DBLK alignment to 0x%08x' % \
+ (self.current_journal_file.file_header.file_num, self.last_record_offset,
+ self.num_filler_records_required, self.fill_to_offset)
print
#--- protected functions ---
def _analyze_files(self):
@@ -386,21 +402,35 @@ class Journal(object):
args = Utils.load_args(file_handle, RecordHeader)
file_hdr = FileHeader(*args)
file_hdr.init(file_handle, *Utils.load_args(file_handle, FileHeader))
- if not file_hdr.is_header_valid(file_hdr):
- break
- file_hdr.load(file_handle)
- if not file_hdr.is_valid():
- break
- Utils.skip(file_handle, file_hdr.file_header_size_sblks * Utils.SBLK_SIZE)
- self.files[file_hdr.file_num] = JournalFile(file_hdr)
+ if file_hdr.is_header_valid(file_hdr):
+ file_hdr.load(file_handle)
+ if file_hdr.is_valid():
+ Utils.skip(file_handle, file_hdr.file_header_size_sblks * Utils.SBLK_SIZE)
+ self.files[file_hdr.file_num] = JournalFile(file_hdr)
self.file_num_list = sorted(self.files.keys())
self.file_num_itr = iter(self.file_num_list)
+ def _check_alignment(self): # TODO: Move into JournalFile
+ remaining_sblks = self.last_record_offset % Utils.SBLK_SIZE
+ if remaining_sblks == 0:
+ self.num_filler_records_required = 0
+ else:
+ self.num_filler_records_required = (Utils.SBLK_SIZE - remaining_sblks) / Utils.DBLK_SIZE
+ self.fill_to_offset = self.last_record_offset + (self.num_filler_records_required * Utils.DBLK_SIZE)
+ if self.args.show_recs or self.args.show_all_recs:
+ print '0x%x:0x%08x: %d filler records required for DBLK alignment to 0x%08x' % \
+ (self.current_journal_file.file_header.file_num, self.last_record_offset,
+ self.num_filler_records_required, self.fill_to_offset)
def _check_file(self):
- if self.current_journal_file is not None and not self.current_journal_file.file_header.is_end_of_file():
- return
- self._get_next_file()
+ if self.current_journal_file is not None:
+ if not self.current_journal_file.file_header.is_end_of_file():
+ return True
+ if self.current_journal_file.file_header.is_end_of_file():
+ self.last_record_offset = self.current_journal_file.file_header.file_handle.tell()
+ if not self._get_next_file():
+ return False
fhdr = self.current_journal_file.file_header
fhdr.file_handle.seek(fhdr.first_record_offset)
+ return True
def _get_next_file(self):
if self.current_journal_file is not None:
file_handle = self.current_journal_file.file_header.file_handle
@@ -413,13 +443,16 @@ class Journal(object):
except StopIteration:
pass
if file_num == 0:
- raise qls.err.NoMoreFilesInJournalError(self.queue_name)
+ return False
self.current_journal_file = self.files[file_num]
self.first_rec_flag = True
- print self.current_journal_file.file_header
- #print '[file_num=0x%x]' % self.current_journal_file.file_num #DEBUG
+ if self.args.show_recs or self.args.show_all_recs:
+ print '0x%x:%s' % (self.current_journal_file.file_header.file_num, self.current_journal_file.file_header)
+ return True
def _get_next_record(self, high_rid_counter):
- self._check_file()
+ if not self._check_file():
+ return False
+ self.last_record_offset = self.current_journal_file.file_header.file_handle.tell()
this_record = Utils.load(self.current_journal_file.file_header.file_handle, RecordHeader)
if not this_record.is_header_valid(self.current_journal_file.file_header):
return False
@@ -427,32 +460,42 @@ class Journal(object):
if this_record.file_offset != self.current_journal_file.file_header.first_record_offset:
raise qls.err.FirstRecordOffsetMismatchError(self.current_journal_file.file_header, this_record)
self.first_rec_flag = False
- high_rid_counter.check(this_record.record_id)
self.statistics.total_record_count += 1
+ start_journal_file = self.current_journal_file
if isinstance(this_record, EnqueueRecord):
- self._handle_enqueue_record(this_record)
- print this_record
+ ok_flag = self._handle_enqueue_record(this_record, start_journal_file)
+ high_rid_counter.check(this_record.record_id)
+ if self.args.show_recs or self.args.show_all_recs:
+ print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record)
elif isinstance(this_record, DequeueRecord):
- self._handle_dequeue_record(this_record)
- print this_record
+ ok_flag = self._handle_dequeue_record(this_record, start_journal_file)
+ high_rid_counter.check(this_record.record_id)
+ if self.args.show_recs or self.args.show_all_recs:
+ print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record)
elif isinstance(this_record, TransactionRecord):
- self._handle_transaction_record(this_record)
- print this_record
+ ok_flag = self._handle_transaction_record(this_record, start_journal_file)
+ high_rid_counter.check(this_record.record_id)
+ if self.args.show_recs or self.args.show_all_recs:
+ print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record)
else:
self.statistics.filler_record_count += 1
+ ok_flag = True
+ if self.args.show_all_recs:
+ print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record)
Utils.skip(self.current_journal_file.file_header.file_handle, Utils.DBLK_SIZE)
- return True
- def _handle_enqueue_record(self, enqueue_record):
- start_journal_file = self.current_journal_file
+ return ok_flag
+ def _handle_enqueue_record(self, enqueue_record, start_journal_file):
while enqueue_record.load(self.current_journal_file.file_header.file_handle):
- self._get_next_file()
- if not enqueue_record.is_valid(self.current_journal_file):
- return
+ if not self._get_next_file():
+ enqueue_record.truncated_flag = True
+ return False
+ if not enqueue_record.is_valid(start_journal_file):
+ return False
if enqueue_record.is_external() and enqueue_record.data != None:
raise qls.err.ExternalDataError(self.current_journal_file.file_header, enqueue_record)
if enqueue_record.is_transient():
self.statistics.transient_record_count += 1
- return
+ return True
if enqueue_record.xid_size > 0:
self.txn_map.add(start_journal_file, enqueue_record)
self.statistics.transaction_operation_count += 1
@@ -462,13 +505,14 @@ class Journal(object):
self.enq_map.add(start_journal_file, enqueue_record, False)
start_journal_file.incr_enq_cnt()
self.statistics.enqueue_count += 1
- #print enqueue_record, # DEBUG
- def _handle_dequeue_record(self, dequeue_record):
- start_journal_file = self.current_journal_file
+ return True
+ def _handle_dequeue_record(self, dequeue_record, start_journal_file):
while dequeue_record.load(self.current_journal_file.file_header.file_handle):
- self._get_next_file()
- if not dequeue_record.is_valid(self.current_journal_file):
- return
+ if not self._get_next_file():
+ dequeue_record.truncated_flag = True
+ return False
+ if not dequeue_record.is_valid(start_journal_file):
+ return False
if dequeue_record.xid_size > 0:
if self.xid_prepared_list is None: # ie this is the TPL
dequeue_record.transaction_prepared_list_flag = True
@@ -484,12 +528,14 @@ class Journal(object):
except qls.err.RecordIdNotFoundError:
dequeue_record.warnings.append('NOT IN EMAP')
self.statistics.dequeue_count += 1
- #print dequeue_record, # DEBUG
- def _handle_transaction_record(self, transaction_record):
+ return True
+ def _handle_transaction_record(self, transaction_record, start_journal_file):
while transaction_record.load(self.current_journal_file.file_header.file_handle):
- self._get_next_file()
- if not transaction_record.is_valid(self.current_journal_file):
- return
+ if not self._get_next_file():
+ transaction_record.truncated_flag = True
+ return False
+ if not transaction_record.is_valid(start_journal_file):
+ return False
if transaction_record.magic[-1] == 'a':
self.statistics.transaction_abort_count += 1
else:
@@ -501,7 +547,7 @@ class Journal(object):
# if transaction_record.magic[-1] == 'c': # commits only
# self._txn_obj_list[hdr.xid] = hdr
self.statistics.transaction_record_count += 1
- #print transaction_record, # DEBUG
+ return True
def _load_data(self, record):
while not record.is_complete:
record.load(self.current_journal_file.file_handle)
@@ -511,6 +557,7 @@ class JournalFile(object):
self.file_header = file_header
self.enq_cnt = 0
self.deq_cnt = 0
+ self.num_filler_records_required = None
def incr_enq_cnt(self):
self.enq_cnt += 1
def decr_enq_cnt(self, record):
@@ -527,7 +574,8 @@ class JournalFile(object):
print '-------- ------- ---- ----- ------------'
def report(self):
comment = '<uninitialized>' if self.file_header.file_num == 0 else ''
- print '%8d %7d %4d %4dk %s %s' % (self.file_header.file_num, self.get_enq_cnt(), self.file_header.partition_num,
+ file_num_str = '0x%x' % self.file_header.file_num
+ print '%8s %7d %4d %4dk %s %s' % (file_num_str, self.get_enq_cnt(), self.file_header.partition_num,
self.file_header.efp_data_size_kb,
os.path.basename(self.file_header.file_handle.name), comment)
@@ -541,6 +589,9 @@ class RecordHeader(object):
self.serial = serial
self.record_id = record_id
self.warnings = []
+ self.truncated_flag = False
+ def checksum_encode(self):
+ return struct.pack(RecordHeader.FORMAT, self.magic, self.version, self.user_flags, self.serial, self.record_id)
def load(self, file_handle):
pass
@staticmethod
@@ -560,8 +611,6 @@ class RecordHeader(object):
if self.version != Utils.RECORD_VERSION:
raise qls.err.InvalidRecordVersionError(file_header, self, Utils.RECORD_VERSION)
if self.serial != file_header.serial:
- #print '[serial mismatch at 0x%x]' % self.file_offset #DEBUG
- #print #DEBUG
return False
return True
def _get_warnings(self):
@@ -606,8 +655,8 @@ class RecordTail(object):
return False
self.valid_flag = Utils.inv_str(self.xmagic) == record.magic and \
self.serial == record.serial and \
- self.record_id == record.record_id
- # TODO: When we can verify the checksum, add this here
+ self.record_id == record.record_id and \
+ Utils.adler32(record.checksum_encode()) == self.checksum
return self.valid_flag
def __str__(self):
"""Return a string representation of the this RecordTail instance"""
@@ -660,7 +709,7 @@ class FileHeader(RecordHeader):
return strftime(fstr, time)
def __str__(self):
"""Return a string representation of the this FileHeader instance"""
- return '%s fnum=%d fro=0x%08x p=%d s=%dk t=%s %s' % (RecordHeader.__str__(self), self.file_num,
+ return '%s fnum=0x%x fro=0x%08x p=%d s=%dk t=%s %s' % (RecordHeader.__str__(self), self.file_num,
self.first_record_offset, self.partition_num,
self.efp_data_size_kb, self.timestamp_str(),
self._get_warnings())
@@ -677,6 +726,9 @@ class EnqueueRecord(RecordHeader):
self.data = None
self.data_complete = False
self.record_tail = None
+ def checksum_encode(self):
+ return RecordHeader.checksum_encode(self) + struct.pack(self.FORMAT, self.xid_size, self.data_size) + \
+ self.xid + self.data
def is_external(self):
return self.user_flags & EnqueueRecord.EXTERNAL_FLAG_MASK > 0
def is_transient(self):
@@ -732,6 +784,9 @@ class EnqueueRecord(RecordHeader):
return fstr
def __str__(self):
"""Return a string representation of the this EnqueueRecord instance"""
+ if self.truncated_flag:
+ return '%s xid(%d) data(%d) [Truncated, no more files in journal]' % (RecordHeader.__str__(self),
+ self.xid_size, self.data_size)
if self.record_tail is None:
record_tail_str = ''
else:
@@ -750,6 +805,9 @@ class DequeueRecord(RecordHeader):
self.xid = None
self.xid_complete = False
self.record_tail = None
+ def checksum_encode(self):
+ return RecordHeader.checksum_encode(self) + struct.pack(self.FORMAT, self.dequeue_record_id, self.xid_size) + \
+ self.xid
def is_transaction_complete_commit(self):
return self.user_flags & DequeueRecord.TXN_COMPLETE_COMMIT_FLAG > 0
def is_valid(self, journal_file):
@@ -790,6 +848,10 @@ class DequeueRecord(RecordHeader):
return ''
def __str__(self):
"""Return a string representation of the this DequeueRecord instance"""
+ if self.truncated_flag:
+ return '%s xid(%d) drid=0x%x [Truncated, no more files in journal]' % (RecordHeader.__str__(self),
+ self.xid_size,
+ self.dequeue_record_id)
if self.record_tail is None:
record_tail_str = ''
else:
@@ -805,6 +867,8 @@ class TransactionRecord(RecordHeader):
self.xid = None
self.xid_complete = False
self.record_tail = None
+ def checksum_encode(self):
+ return RecordHeader.checksum_encode(self) + struct.pack(self.FORMAT, self.xid_size) + self.xid
def is_valid(self, journal_file):
if not RecordHeader.is_header_valid(self, journal_file.file_header):
return False
@@ -832,6 +896,8 @@ class TransactionRecord(RecordHeader):
return False
def __str__(self):
"""Return a string representation of the this TransactionRecord instance"""
+ if self.truncated_flag:
+ return '%s xid(%d) [Truncated, no more files in journal]' % (RecordHeader.__str__(self), self.xid_size)
if self.record_tail is None:
record_tail_str = ''
else:
@@ -845,6 +911,9 @@ class Utils(object):
RECORD_VERSION = 2
SBLK_SIZE = 4096
@staticmethod
+ def adler32(data):
+ return zlib.adler32(data) & 0xffffffff
+ @staticmethod
def create_record(magic, uflags, journal_file, record_id, dequeue_record_id, xid, data):
record_class = _CLASSES.get(magic[-1])
record = record_class(0, magic, Utils.RECORD_VERSION, uflags, journal_file.file_header.serial, record_id)
@@ -866,7 +935,7 @@ class Utils(object):
record.data_complete = True
record.record_tail = RecordTail(None)
record.record_tail.xmagic = Utils.inv_str(magic)
- record.record_tail.checksum = 0 # TODO: when we can calculate checksums, add this here
+ record.record_tail.checksum = Utils.adler32(record.checksum_encode())
record.record_tail.serial = record.serial
record.record_tail.record_id = record.record_id
return record
@@ -878,7 +947,7 @@ class Utils(object):
# << DEBUG >>
begin = data.find('msg')
end = data.find('\0', begin)
- return 'data="%s"' % data[begin:end]
+ return 'data(%d)="%s"' % (dsize, data[begin:end])
# << END DEBUG
if Utils._is_printable(data):
datastr = Utils._split_str(data)
@@ -941,7 +1010,8 @@ class Utils(object):
@staticmethod
def skip(file_handle, boundary):
"""Read and discard disk bytes until the next multiple of boundary"""
- file_handle.read(Utils._rem_bytes_in_block(file_handle, boundary))
+ if not file_handle.closed:
+ file_handle.read(Utils._rem_bytes_in_block(file_handle, boundary))
#--- protected functions ---
@staticmethod
def _hex_str(in_str, begin, end):
diff --git a/qpid/tools/src/py/qpid_qls_analyze.py b/qpid/tools/src/py/qpid_qls_analyze.py
index 165d41fe95..a540587547 100755
--- a/qpid/tools/src/py/qpid_qls_analyze.py
+++ b/qpid/tools/src/py/qpid_qls_analyze.py
@@ -37,8 +37,8 @@ class QqpdLinearStoreAnalyzer(object):
self.args = None
self._process_args()
self.qls_dir = os.path.abspath(self.args.qls_dir)
- self.efp_manager = efp.EfpManager(self.qls_dir)
- self.jrnl_recovery_mgr = jrnl.JournalRecoveryManager(self.qls_dir)
+ self.efp_manager = efp.EfpManager(self.qls_dir, self.args)
+ self.jrnl_recovery_mgr = jrnl.JournalRecoveryManager(self.qls_dir, self.args)
def _analyze_efp(self):
self.efp_manager.run(self.args)
def _analyze_journals(self):
@@ -49,6 +49,10 @@ class QqpdLinearStoreAnalyzer(object):
help='Qpid Linear Store (QLS) directory to be analyzed')
parser.add_argument('--efp', action='store_true',
help='Analyze the Emtpy File Pool (EFP) and show stats')
+ parser.add_argument('--show-recs', action='store_true',
+ help='Show material records found during recovery')
+ parser.add_argument('--show-all-recs', action='store_true',
+ help='Show all records (including fillers) found during recovery')
parser.add_argument('--stats', action='store_true',
help='Print journal record stats')
parser.add_argument('--txn', action='store_true',