diff options
author | Kim van der Riet <kpvdr@apache.org> | 2015-01-16 18:48:13 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2015-01-16 18:48:13 +0000 |
commit | b47672cca797af3bf7075d98dad71321ad29fe41 (patch) | |
tree | 6a61f93cb2daae01ac885edcdf8af6f58363ca4b /tools | |
parent | 1142cd6bb04d14541240c9a69352937aac6b09ee (diff) | |
download | qpid-python-b47672cca797af3bf7075d98dad71321ad29fe41.tar.gz |
QPID-5362: Linearstore: No store tools exist for examining the journals - Bugfix and update for new partition and directory structure change from QPID-5671 and QPID-6303
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1652490 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'tools')
-rw-r--r-- | tools/src/py/qlslibs/analyze.py (renamed from tools/src/py/qlslibs/anal.py) | 39 | ||||
-rw-r--r-- | tools/src/py/qlslibs/efp.py | 60 | ||||
-rw-r--r-- | tools/src/py/qlslibs/jrnl.py | 23 | ||||
-rwxr-xr-x | tools/src/py/qpid-qls-analyze | 16 |
4 files changed, 82 insertions, 56 deletions
diff --git a/tools/src/py/qlslibs/anal.py b/tools/src/py/qlslibs/analyze.py index df51c1b2b4..a67e17e426 100644 --- a/tools/src/py/qlslibs/anal.py +++ b/tools/src/py/qlslibs/analyze.py @@ -18,7 +18,7 @@ # """ -Module: qlslibs.anal +Module: qlslibs.analyze Classes for recovery and analysis of a Qpid Linear Store (QLS). """ @@ -41,8 +41,8 @@ class HighCounter(object): return self.num class JournalRecoveryManager(object): - TPL_DIR_NAME = 'tpl' - JRNL_DIR_NAME = 'jrnl' + TPL_DIR_NAME = 'tpl2' + JRNL_DIR_NAME = 'jrnl2' def __init__(self, directory, args): if not os.path.exists(directory): raise qlslibs.err.InvalidQlsDirectoryNameError(directory) @@ -55,15 +55,15 @@ class JournalRecoveryManager(object): def report(self, print_stats_flag): self._reconcile_transactions(self.prepared_list, self.args.txn) if self.tpl is not None: - self.tpl.report(print_stats_flag) + self.tpl.report(print_stats_flag, self.args.show_recovered_recs) for queue_name in sorted(self.journals.keys()): - self.journals[queue_name].report(print_stats_flag) + self.journals[queue_name].report(print_stats_flag, self.args.show_recovered_recs) def run(self): tpl_dir = os.path.join(self.directory, JournalRecoveryManager.TPL_DIR_NAME) if os.path.exists(tpl_dir): self.tpl = Journal(tpl_dir, None, self.args) self.tpl.recover(self.high_rid_counter) - if self.args.show_recs or self.args.show_all_recs: + if self.args.show_recovery_recs or self.args.show_all_recs: print jrnl_dir = os.path.join(self.directory, JournalRecoveryManager.JRNL_DIR_NAME) self.prepared_list = self.tpl.txn_map.get_prepared_list() if self.tpl is not None else {} @@ -72,8 +72,9 @@ class JournalRecoveryManager(object): jrnl = Journal(os.path.join(jrnl_dir, dir_entry), self.prepared_list, self.args) jrnl.recover(self.high_rid_counter) self.journals[jrnl.get_queue_name()] = jrnl - if self.args.show_recs or self.args.show_all_recs: + if self.args.show_recovery_recs or self.args.show_all_recs: print + print def _reconcile_transactions(self, prepared_list, txn_flag): print 'Transaction reconciliation report:' print '==================================' @@ -315,6 +316,7 @@ class Journal(object): """ Instance of a Qpid Linear Store (QLS) journal. """ + JRNL_SUFFIX = 'jrnl' def __init__(self, directory, xid_prepared_list, args): self.directory = directory self.queue_name = os.path.basename(directory) @@ -350,7 +352,7 @@ class Journal(object): def get_queue_name(self): return self.queue_name def recover(self, high_rid_counter): - print 'Recovering', self.queue_name + print 'Recovering %s...' % self.queue_name, self._analyze_files() try: while self._get_next_record(high_rid_counter): @@ -362,6 +364,7 @@ class Journal(object): print '0x%08x: **** FRO ERROR: queue=\"%s\" fid=0x%x fro actual=0x%08x expected=0x%08x' % \ (err.get_expected_fro(), err.get_queue_name(), err.get_file_number(), err.get_record_offset(), err.get_expected_fro()) + print 'done' def reconcile_transactions(self, prepared_list, txn_flag): xid_list = self.txn_map.get_xid_list() if len(xid_list) > 0: @@ -385,13 +388,13 @@ class Journal(object): print ' ', qlslibs.utils.format_xid(xid), '- Ignoring, not in prepared transaction list' if txn_flag: self.txn_map.abort(xid) - def report(self, print_stats_flag): + def report(self, print_stats_flag, show_recovered_records): print 'Journal "%s":' % self.queue_name print '=' * (11 + len(self.queue_name)) if print_stats_flag: print str(self.statistics) - print self.enq_map.report_str(True, True) - print self.txn_map.report_str(True, True) + print self.enq_map.report_str(True, show_recovered_records) + print self.txn_map.report_str(True, show_recovered_records) JournalFile.report_header() for file_num in sorted(self.files.keys()): self.files[file_num].report() @@ -405,7 +408,7 @@ class Journal(object): def _analyze_files(self): for dir_entry in os.listdir(self.directory): dir_entry_bits = dir_entry.split('.') - if len(dir_entry_bits) == 2 and dir_entry_bits[1] == JournalRecoveryManager.JRNL_DIR_NAME: + if len(dir_entry_bits) == 2 and dir_entry_bits[1] == Journal.JRNL_SUFFIX: fq_file_name = os.path.join(self.directory, dir_entry) file_handle = open(fq_file_name) args = qlslibs.utils.load_args(file_handle, qlslibs.jrnl.RecordHeader) @@ -413,7 +416,7 @@ class Journal(object): file_hdr.init(file_handle, *qlslibs.utils.load_args(file_handle, qlslibs.jrnl.FileHeader)) if file_hdr.is_header_valid(file_hdr): file_hdr.load(file_handle) - if file_hdr.is_valid(): + if file_hdr.is_valid(False): qlslibs.utils.skip(file_handle, file_hdr.file_header_size_sblks * qlslibs.utils.DEFAULT_SBLK_SIZE) self.files[file_hdr.file_num] = JournalFile(file_hdr) @@ -430,7 +433,7 @@ class Journal(object): qlslibs.utils.DEFAULT_DBLK_SIZE self.fill_to_offset = self.last_record_offset + \ (self.num_filler_records_required * qlslibs.utils.DEFAULT_DBLK_SIZE) - if self.args.show_recs or self.args.show_all_recs: + if self.args.show_recovery_recs or self.args.show_all_recs: print '0x%x:0x%08x: %d filler records required for DBLK alignment to 0x%08x' % \ (self.current_journal_file.file_header.file_num, self.last_record_offset, self.num_filler_records_required, self.fill_to_offset) @@ -460,7 +463,7 @@ class Journal(object): return False self.current_journal_file = self.files[file_num] self.first_rec_flag = True - if self.args.show_recs or self.args.show_all_recs: + if self.args.show_recovery_recs or self.args.show_all_recs: file_header = self.current_journal_file.file_header print '0x%x:%s' % (file_header.file_num, file_header.to_string()) return True @@ -480,18 +483,18 @@ class Journal(object): if isinstance(this_record, qlslibs.jrnl.EnqueueRecord): ok_flag = self._handle_enqueue_record(this_record, start_journal_file) high_rid_counter.check(this_record.record_id) - if self.args.show_recs or self.args.show_all_recs: + if self.args.show_recovery_recs or self.args.show_all_recs: print '0x%x:%s' % (start_journal_file.file_header.file_num, \ this_record.to_string(self.args.show_xids, self.args.show_data)) elif isinstance(this_record, qlslibs.jrnl.DequeueRecord): ok_flag = self._handle_dequeue_record(this_record, start_journal_file) high_rid_counter.check(this_record.record_id) - if self.args.show_recs or self.args.show_all_recs: + if self.args.show_recovery_recs or self.args.show_all_recs: print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record.to_string(self.args.show_xids)) elif isinstance(this_record, qlslibs.jrnl.TransactionRecord): ok_flag = self._handle_transaction_record(this_record, start_journal_file) high_rid_counter.check(this_record.record_id) - if self.args.show_recs or self.args.show_all_recs: + if self.args.show_recovery_recs or self.args.show_all_recs: print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record.to_string(self.args.show_xids)) else: self.statistics.filler_record_count += 1 diff --git a/tools/src/py/qlslibs/efp.py b/tools/src/py/qlslibs/efp.py index 0fa27b3519..1c751c3d06 100644 --- a/tools/src/py/qlslibs/efp.py +++ b/tools/src/py/qlslibs/efp.py @@ -85,15 +85,16 @@ class EfpManager(object): self.efp_partitions.remove(self.current_efp_partition) shutil.rmtree(os.path.join(self.current_efp_partition.efp_directory, dir_name)) def report(self): - print 'Empty File Pool (EFP) report:' - print '=============================' - print 'Found', len(self.efp_partitions), 'partition(s).' + print 'Empty File Pool (EFP) report' + print '============================' + print 'Found', len(self.efp_partitions), 'partition(s)' if (len(self.efp_partitions)) > 0: + sorted_efp_partitions = sorted(self.efp_partitions, key=lambda x: x.partition_number) EfpPartition.print_report_table_header() - for ptn in self.efp_partitions: + for ptn in sorted_efp_partitions: ptn.print_report_table_line() print - for ptn in self.efp_partitions: + for ptn in sorted_efp_partitions: ptn.report() def run(self, arg_tup): self._analyze_efp() @@ -182,9 +183,12 @@ class EfpPartition(object): self.tot_file_size_kb, self.directory) def report(self): print 'Partition %s:' % os.path.basename(self.directory) - EmptyFilePool.print_report_table_header() - for dir_name in self.efp_pools.keys(): - self.efp_pools[dir_name].print_report_table_line() + if len(self.efp_pools) > 0: + EmptyFilePool.print_report_table_header() + for dir_name in self.efp_pools.keys(): + self.efp_pools[dir_name].print_report_table_line() + else: + print '<empty - no EFPs found in this partition>' print def scan(self): if os.path.exists(self.directory): @@ -217,13 +221,16 @@ class EmptyFilePool(object): """ EFP_DIR_SUFFIX = 'k' EFP_JRNL_EXTENTION = '.jrnl' + EFP_INUSE_DIRNAME = 'in_use' + EFP_RETURNED_DIRNAME = 'returned' def __init__(self, directory, partition_number): self.base_dir_name = os.path.basename(directory) self.directory = directory self.partition_number = partition_number self.data_size_kb = None - self.files = [] - self.tot_file_size_kb = 0 + self.efp_files = [] + self.in_use_files = [] + self.returned_files = [] self._validate_efp_directory() def create_new_efp_files(self, num_files): """ Create one or more new empty journal files of the prescribed size for this EFP """ @@ -238,24 +245,37 @@ class EmptyFilePool(object): """ Static function to create an EFP directory name from the size of the files it contains """ return '%dk' % file_size_kb def get_tot_file_count(self): - return len(self.files) + return len(self.efp_files) def get_tot_file_size_kb(self): - return self.data_size_kb * len(self.files) + return self.data_size_kb * len(self.efp_files) @staticmethod def print_report_table_header(): - print 'data_size_kb file_count tot_file_size_kb efp_directory' - print '------------ ---------- ---------------- -------------' + print ' ---------- efp ------------ --------- in_use ---------- -------- returned ---------' + print 'data_size_kb file_count tot_file_size_kb file_count tot_file_size_kb file_count tot_file_size_kb efp_directory' + print '------------ ---------- ---------------- ---------- ---------------- ---------- ---------------- -------------' def print_report_table_line(self): - print '%12d %10d %16d %s' % (self.data_size_kb, self.get_tot_file_count(), - self.get_tot_file_size_kb(), self.get_directory()) + print '%12d %10d %16d %10d %16d %10d %16d %s' % (self.data_size_kb, len(self.efp_files), + self.data_size_kb * len(self.efp_files), + len(self.in_use_files), + self.data_size_kb * len(self.in_use_files), + len(self.returned_files), + self.data_size_kb * len(self.returned_files), + self.get_directory()) def scan(self): for efp_file in os.listdir(self.directory): + if efp_file == self.EFP_INUSE_DIRNAME: + for in_use_file in os.listdir(os.path.join(self.directory, self.EFP_INUSE_DIRNAME)): + self.in_use_files.append(in_use_file) + continue + if efp_file == self.EFP_RETURNED_DIRNAME: + for returned_file in os.listdir(os.path.join(self.directory, self.EFP_RETURNED_DIRNAME)): + self.returned_files.append(returned_file) + continue if self._validate_efp_file(os.path.join(self.directory, efp_file)): - self.files.append(efp_file) + self.efp_files.append(efp_file) def _add_efp_file(self, efp_file_name): """ Add a single journal file of the appropriate size to this EFP. No file size check is made here. """ - self.files.append(efp_file_name) - self.tot_file_size_kb += os.path.getsize(efp_file_name) + self.efp_files.append(efp_file_name) def _create_new_efp_file(self): """ Create a single new empty journal file of the prescribed size for this EFP """ file_name = str(uuid.uuid4()) + EmptyFilePool.EFP_JRNL_EXTENTION @@ -296,7 +316,7 @@ class EmptyFilePool(object): return False file_hdr.load(file_handle) file_handle.close() - if not file_hdr.is_valid(): + if not file_hdr.is_valid(True): return False return True diff --git a/tools/src/py/qlslibs/jrnl.py b/tools/src/py/qlslibs/jrnl.py index 555cf6a8ae..ee25015220 100644 --- a/tools/src/py/qlslibs/jrnl.py +++ b/tools/src/py/qlslibs/jrnl.py @@ -155,19 +155,24 @@ class FileHeader(RecordHeader): self.queue_name = file_handle.read(self.queue_name_len) def is_end_of_file(self): return self.file_handle.tell() >= self.get_file_size() - def is_valid(self): + def is_valid(self, is_empty): if not RecordHeader.is_header_valid(self, self): return False if self.file_handle is None or self.file_header_size_sblks == 0 or self.partition_num == 0 or \ - self.efp_data_size_kb == 0 or self.first_record_offset == 0 or self.timestamp_sec == 0 or \ - self.timestamp_ns == 0 or self.file_num == 0: - return False - if self.queue_name_len == 0: - return False - if self.queue_name is None: - return False - if len(self.queue_name) != self.queue_name_len: + self.efp_data_size_kb == 0: return False + if is_empty: + if self.first_record_offset != 0 or self.timestamp_sec != 0 or self.timestamp_ns != 0 or \ + self.file_num != 0 or self.queue_name_len != 0: + return False + else: + if self.first_record_offset == 0 or self.timestamp_sec == 0 or self.timestamp_ns == 0 or \ + self.file_num == 0 or self.queue_name_len == 0: + return False + if self.queue_name is None: + return False + if len(self.queue_name) != self.queue_name_len: + return False return True def timestamp_str(self): """Get the timestamp of this record in string format""" diff --git a/tools/src/py/qpid-qls-analyze b/tools/src/py/qpid-qls-analyze index 5c7d04b4c0..35b370f4a3 100755 --- a/tools/src/py/qpid-qls-analyze +++ b/tools/src/py/qpid-qls-analyze @@ -28,11 +28,11 @@ import sys default = os.path.normpath('/usr/share/qpid-tools') home = os.environ.get('QPID_TOOLS_HOME', default) -sys.path.append(os.path.join(home,'python')) +sys.path.append(os.path.join(home, 'python')) import argparse import os -import qlslibs.anal +import qlslibs.analyze import qlslibs.efp class QlsAnalyzerArgParser(argparse.ArgumentParser): @@ -45,7 +45,9 @@ class QlsAnalyzerArgParser(argparse.ArgumentParser): help='Qpid Linear Store (QLS) directory to be analyzed') self.add_argument('--efp', action='store_true', help='Analyze the Emtpy File Pool (EFP) and show stats') - self.add_argument('--show-recs', action='store_true', + self.add_argument('--show-recovered-recs', action='store_true', + help='Show only recovered records') + self.add_argument('--show-recovery-recs', action='store_true', help='Show material records found during recovery') self.add_argument('--show-all-recs', action='store_true', help='Show all records (including fillers) found during recovery') @@ -72,13 +74,13 @@ class QqpdLinearStoreAnalyzer(object): * The Linear Store * The Transaction Prepared List (TPL) """ - QLS_ANALYZE_VERSION = '0.1' + QLS_ANALYZE_VERSION = '1.0' def __init__(self): self.args = None self._process_args() self.qls_dir = os.path.abspath(self.args.qls_dir) self.efp_manager = qlslibs.efp.EfpManager(self.qls_dir, None) - self.jrnl_recovery_mgr = qlslibs.anal.JournalRecoveryManager(self.qls_dir, self.args) + self.jrnl_recovery_mgr = qlslibs.analyze.JournalRecoveryManager(self.qls_dir, self.args) def _process_args(self): """ Create arg parser and process args """ parser = QlsAnalyzerArgParser() @@ -101,10 +103,6 @@ class QqpdLinearStoreAnalyzer(object): #============================================================================== if __name__ == "__main__": - # TODO: Remove this in due course - print 'WARNING: This program is still a work in progress and is largely untested.' - print '* USE AT YOUR OWN RISK *' - print M = QqpdLinearStoreAnalyzer() M.run() M.report() |