diff options
author | Kim van der Riet <kpvdr@apache.org> | 2014-05-21 17:33:51 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2014-05-21 17:33:51 +0000 |
commit | e5d39d0c6b513674984a1e0a8e57b3768eb70cbe (patch) | |
tree | f1cecab27d33d62fc0032a54cf758011aa921610 /tools | |
parent | b3071a27c902b298605fc57ec97155532d861a4e (diff) | |
download | qpid-python-e5d39d0c6b513674984a1e0a8e57b3768eb70cbe.tar.gz |
NO_JIRA: [linearstore] Update to ISSUES; whitespace fix from last checkin
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1596633 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'tools')
-rwxr-xr-x | tools/setup.py | 7 | ||||
-rw-r--r-- | tools/src/py/qlslibs/__init__.py (renamed from tools/src/py/qls/__init__.py) | 0 | ||||
-rw-r--r-- | tools/src/py/qlslibs/anal.py (renamed from tools/src/py/qls/anal.py) | 113 | ||||
-rw-r--r-- | tools/src/py/qlslibs/efp.py (renamed from tools/src/py/qls/efp.py) | 50 | ||||
-rw-r--r-- | tools/src/py/qlslibs/err.py (renamed from tools/src/py/qls/err.py) | 2 | ||||
-rw-r--r-- | tools/src/py/qlslibs/jrnl.py (renamed from tools/src/py/qls/jrnl.py) | 34 | ||||
-rw-r--r-- | tools/src/py/qlslibs/utils.py (renamed from tools/src/py/qls/utils.py) | 22 | ||||
-rwxr-xr-x | tools/src/py/qpid-qls-analyze (renamed from tools/src/py/qpid_qls_analyze.py) | 24 |
8 files changed, 137 insertions, 115 deletions
diff --git a/tools/setup.py b/tools/setup.py index cb40192d0b..808a800222 100755 --- a/tools/setup.py +++ b/tools/setup.py @@ -47,6 +47,13 @@ setup(name="qpid-tools", "src/py/qpid-stat", "src/py/qpid-tool", "src/py/qmf-tool"], + data_files=[("/usr/libexec", ["src/py/qpid-qls-analyze"]), + ("/usr/shared/qpid-tools/python/qlslibs", + ["src/py/qlslibs/anal.py", + "src/py/qlslibs/efp.py", + "src/py/qlslibs/err.py", + "src/py/qlslibs/jrnl.py", + "src/py/qlslibs/utils.py"])], url="http://qpid.apache.org/", license="Apache Software License", description="Diagnostic and management tools for Apache Qpid brokers.", diff --git a/tools/src/py/qls/__init__.py b/tools/src/py/qlslibs/__init__.py index d8a500d9d8..d8a500d9d8 100644 --- a/tools/src/py/qls/__init__.py +++ b/tools/src/py/qlslibs/__init__.py diff --git a/tools/src/py/qls/anal.py b/tools/src/py/qlslibs/anal.py index 865dfa16c7..502d860bfa 100644 --- a/tools/src/py/qls/anal.py +++ b/tools/src/py/qlslibs/anal.py @@ -18,15 +18,15 @@ # """ -Module: qls.anal +Module: qlslibs.anal Classes for recovery and analysis of a Qpid Linear Store (QLS). """ import os.path -import qls.err -import qls.jrnl -import qls.utils +import qlslibs.err +import qlslibs.jrnl +import qlslibs.utils class HighCounter(object): def __init__(self): @@ -45,7 +45,7 @@ class JournalRecoveryManager(object): JRNL_DIR_NAME = 'jrnl' def __init__(self, directory, args): if not os.path.exists(directory): - raise qls.err.InvalidQlsDirectoryNameError(directory) + raise qlslibs.err.InvalidQlsDirectoryNameError(directory) self.directory = directory self.args = args self.tpl = None @@ -86,14 +86,14 @@ class JournalRecoveryManager(object): status = '[Prepared, but interrupted during commit phase]' else: status = '[Prepared, but interrupted during abort phase]' - print ' ', qls.utils.format_xid(xid), status + print ' ', qlslibs.utils.format_xid(xid), status if prepared_list[xid] is None: # Prepared, but not committed or aborted enqueue_record = self.tpl.get_txn_map_record(xid)[0][1] - dequeue_record = qls.utils.create_record(qls.jrnl.DequeueRecord.MAGIC, \ - qls.jrnl.DequeueRecord.TXN_COMPLETE_COMMIT_FLAG, \ - self.tpl.current_journal_file, \ - self.high_rid_counter.get_next(), \ - enqueue_record.record_id, xid, None) + dequeue_record = qlslibs.utils.create_record(qlslibs.jrnl.DequeueRecord.MAGIC, \ + qlslibs.jrnl.DequeueRecord.TXN_COMPLETE_COMMIT_FLAG, \ + self.tpl.current_journal_file, \ + self.high_rid_counter.get_next(), \ + enqueue_record.record_id, xid, None) if txn_flag: self.tpl.add_record(dequeue_record) for queue_name in sorted(self.journals.keys()): @@ -101,10 +101,10 @@ class JournalRecoveryManager(object): if len(prepared_list) > 0: print 'Completing prepared transactions in prepared transaction list:' for xid in prepared_list.keys(): - print ' ', qls.utils.format_xid(xid) - transaction_record = qls.utils.create_record(qls.jrnl.TransactionRecord.MAGIC_COMMIT, 0, \ - self.tpl.current_journal_file, \ - self.high_rid_counter.get_next(), None, xid, None) + print ' ', qlslibs.utils.format_xid(xid) + transaction_record = qlslibs.utils.create_record(qlslibs.jrnl.TransactionRecord.MAGIC_COMMIT, 0, \ + self.tpl.current_journal_file, \ + self.high_rid_counter.get_next(), None, xid, None) if txn_flag: self.tpl.add_record(transaction_record) print @@ -118,7 +118,7 @@ class EnqueueMap(object): self.enq_map = {} def add(self, journal_file, enq_record, locked_flag): if enq_record.record_id in self.enq_map: - raise qls.err.DuplicateRecordIdError(self.journal.current_file_header, enq_record) + raise qlslibs.err.DuplicateRecordIdError(self.journal.current_file_header, enq_record) self.enq_map[enq_record.record_id] = [journal_file, enq_record, locked_flag] def contains(self, rid): """Return True if the map contains the given rid""" @@ -129,14 +129,14 @@ class EnqueueMap(object): del self.enq_map[deq_record.dequeue_record_id] return enq_list else: - raise qls.err.RecordIdNotFoundError(journal_file.file_header, deq_record) + raise qlslibs.err.RecordIdNotFoundError(journal_file.file_header, deq_record) def get(self, record_id): if record_id in self.enq_map: return self.enq_map[record_id] return None def lock(self, journal_file, dequeue_record): if dequeue_record.dequeue_record_id not in self.enq_map: - raise qls.err.RecordIdNotFoundError(journal_file.file_header, dequeue_record) + raise qlslibs.err.RecordIdNotFoundError(journal_file.file_header, dequeue_record) self.enq_map[dequeue_record.dequeue_record_id][2] = True def report_str(self, _, show_records): """Return a string containing a text report for all records in the map""" @@ -163,9 +163,9 @@ class EnqueueMap(object): if self.enq_map[dequeue_record.dequeue_record_id][2]: self.enq_map[dequeue_record.dequeue_record_id][2] = False else: - raise qls.err.RecordNotLockedError(journal_file.file_header, dequeue_record) + raise qlslibs.err.RecordNotLockedError(journal_file.file_header, dequeue_record) else: - raise qls.err.RecordIdNotFoundError(journal_file.file_header, dequeue_record) + raise qlslibs.err.RecordIdNotFoundError(journal_file.file_header, dequeue_record) class TransactionMap(object): """ @@ -177,7 +177,7 @@ class TransactionMap(object): def abort(self, xid): """Perform an abort operation for the given xid record""" for journal_file, record, _ in self.txn_map[xid]: - if isinstance(record, qls.jrnl.DequeueRecord): + if isinstance(record, qlslibs.jrnl.DequeueRecord): if self.enq_map.contains(record.dequeue_record_id): self.enq_map.unlock(journal_file, record) else: @@ -185,16 +185,16 @@ class TransactionMap(object): del self.txn_map[xid] def add(self, journal_file, record): if record.xid is None: - raise qls.err.NonTransactionalRecordError(journal_file.file_header, record, 'TransactionMap.add()') - if isinstance(record, qls.jrnl.DequeueRecord): + raise qlslibs.err.NonTransactionalRecordError(journal_file.file_header, record, 'TransactionMap.add()') + if isinstance(record, qlslibs.jrnl.DequeueRecord): try: self.enq_map.lock(journal_file, record) - except qls.err.RecordIdNotFoundError: + except qlslibs.err.RecordIdNotFoundError: # Not in emap, look for rid in tmap - should not happen in practice txn_op = self._find_record_id(record.xid, record.dequeue_record_id) if txn_op != None: if txn_op[2]: - raise qls.err.AlreadyLockedError(journal_file.file_header, record) + raise qlslibs.err.AlreadyLockedError(journal_file.file_header, record) txn_op[2] = True if record.xid in self.txn_map: self.txn_map[record.xid].append([journal_file, record, False]) # append to existing list @@ -204,7 +204,7 @@ class TransactionMap(object): """Perform a commit operation for the given xid record""" mismatch_list = [] for journal_file, record, lock in self.txn_map[xid]: - if isinstance(record, qls.jrnl.EnqueueRecord): + if isinstance(record, qlslibs.jrnl.EnqueueRecord): self.enq_map.add(journal_file, record, lock) # Transfer enq to emap else: if self.enq_map.contains(record.dequeue_record_id): @@ -224,8 +224,8 @@ class TransactionMap(object): if transaction_record.magic[-1] == 'a': self.abort(transaction_record.xid) else: - raise qls.err.InvalidRecordTypeError(journal_file.file_header, transaction_record, - 'delete from Transaction Map') + raise qlslibs.err.InvalidRecordTypeError(journal_file.file_header, transaction_record, + 'delete from Transaction Map') def get(self, xid): if xid in self.txn_map: return self.txn_map[xid] @@ -240,7 +240,7 @@ class TransactionMap(object): prepared_list = {} for xid in self.get_xid_list(): for _, record, _ in self.txn_map[xid]: - if isinstance(record, qls.jrnl.EnqueueRecord): + if isinstance(record, qlslibs.jrnl.EnqueueRecord): prepared_list[xid] = None else: prepared_list[xid] = record.is_transaction_complete_commit() @@ -255,7 +255,7 @@ class TransactionMap(object): if show_records: rstr += ':' for xid, op_list in self.txn_map.iteritems(): - rstr += '\n %s containing %d operations:' % (qls.utils.format_xid(xid), len(op_list)) + rstr += '\n %s containing %d operations:' % (qlslibs.utils.format_xid(xid), len(op_list)) for journal_file, record, _ in op_list: rstr += '\n 0x%x:%s' % (journal_file.file_header.file_num, record) else: @@ -332,15 +332,15 @@ class Journal(object): self.num_filler_records_required = None # TODO: Move into JournalFile self.fill_to_offset = None def add_record(self, record): - if isinstance(record, qls.jrnl.EnqueueRecord) or isinstance(record, qls.jrnl.DequeueRecord): + if isinstance(record, qlslibs.jrnl.EnqueueRecord) or isinstance(record, qlslibs.jrnl.DequeueRecord): if record.xid_size > 0: self.txn_map.add(self.current_journal_file, record) else: self.enq_map.add(self.current_journal_file, record, False) - elif isinstance(record, qls.jrnl.TransactionRecord): + elif isinstance(record, qlslibs.jrnl.TransactionRecord): self.txn_map.delete(self.current_journal_file, record) else: - raise qls.err.InvalidRecordTypeError(self.current_journal_file, record, 'add to Journal') + raise qlslibs.err.InvalidRecordTypeError(self.current_journal_file, record, 'add to Journal') def get_enq_map_record(self, rid): return self.enq_map.get(rid) def get_txn_map_record(self, xid): @@ -356,9 +356,9 @@ class Journal(object): while self._get_next_record(high_rid_counter): pass self._check_alignment() - except qls.err.NoMoreFilesInJournalError: + except qlslibs.err.NoMoreFilesInJournalError: print 'No more files in journal' - except qls.err.FirstRecordOffsetMismatchError as err: + except qlslibs.err.FirstRecordOffsetMismatchError as err: print '0x%08x: **** FRO ERROR: queue=\"%s\" fid=0x%x fro actual=0x%08x expected=0x%08x' % \ (err.get_expected_fro(), err.get_queue_name(), err.get_file_number(), err.get_record_offset(), err.get_expected_fro()) @@ -370,19 +370,19 @@ class Journal(object): if xid in prepared_list.keys(): commit_flag = prepared_list[xid] if commit_flag is None: - print ' ', qls.utils.format_xid(xid), '- Assuming commit after prepare' + print ' ', qlslibs.utils.format_xid(xid), '- Assuming commit after prepare' if txn_flag: self.txn_map.commit(xid) elif commit_flag: - print ' ', qls.utils.format_xid(xid), '- Completing interrupted commit operation' + print ' ', qlslibs.utils.format_xid(xid), '- Completing interrupted commit operation' if txn_flag: self.txn_map.commit(xid) else: - print ' ', qls.utils.format_xid(xid), '- Completing interrupted abort operation' + print ' ', qlslibs.utils.format_xid(xid), '- Completing interrupted abort operation' if txn_flag: self.txn_map.abort(xid) else: - print ' ', qls.utils.format_xid(xid), '- Ignoring, not in prepared transaction list' + print ' ', qlslibs.utils.format_xid(xid), '- Ignoring, not in prepared transaction list' if txn_flag: self.txn_map.abort(xid) def report(self, print_stats_flag): @@ -408,25 +408,26 @@ class Journal(object): if len(dir_entry_bits) == 2 and dir_entry_bits[1] == JournalRecoveryManager.JRNL_DIR_NAME: fq_file_name = os.path.join(self.directory, dir_entry) file_handle = open(fq_file_name) - args = qls.utils.load_args(file_handle, qls.jrnl.RecordHeader) - file_hdr = qls.jrnl.FileHeader(*args) - file_hdr.init(file_handle, *qls.utils.load_args(file_handle, qls.jrnl.FileHeader)) + args = qlslibs.utils.load_args(file_handle, qlslibs.jrnl.RecordHeader) + file_hdr = qlslibs.jrnl.FileHeader(*args) + file_hdr.init(file_handle, *qlslibs.utils.load_args(file_handle, qlslibs.jrnl.FileHeader)) if file_hdr.is_header_valid(file_hdr): file_hdr.load(file_handle) if file_hdr.is_valid(): - qls.utils.skip(file_handle, file_hdr.file_header_size_sblks * qls.utils.DEFAULT_SBLK_SIZE) + qlslibs.utils.skip(file_handle, + file_hdr.file_header_size_sblks * qlslibs.utils.DEFAULT_SBLK_SIZE) self.files[file_hdr.file_num] = JournalFile(file_hdr) self.file_num_list = sorted(self.files.keys()) self.file_num_itr = iter(self.file_num_list) def _check_alignment(self): # TODO: Move into JournalFile - remaining_sblks = self.last_record_offset % qls.utils.DEFAULT_SBLK_SIZE + remaining_sblks = self.last_record_offset % qlslibs.utils.DEFAULT_SBLK_SIZE if remaining_sblks == 0: self.num_filler_records_required = 0 else: - self.num_filler_records_required = (qls.utils.DEFAULT_SBLK_SIZE - remaining_sblks) / \ - qls.utils.DEFAULT_DBLK_SIZE + self.num_filler_records_required = (qlslibs.utils.DEFAULT_SBLK_SIZE - remaining_sblks) / \ + qlslibs.utils.DEFAULT_DBLK_SIZE self.fill_to_offset = self.last_record_offset + \ - (self.num_filler_records_required * qls.utils.DEFAULT_DBLK_SIZE) + (self.num_filler_records_required * qlslibs.utils.DEFAULT_DBLK_SIZE) if self.args.show_recs or self.args.show_all_recs: print '0x%x:0x%08x: %d filler records required for DBLK alignment to 0x%08x' % \ (self.current_journal_file.file_header.file_num, self.last_record_offset, @@ -465,27 +466,27 @@ class Journal(object): if not self._check_file(): return False self.last_record_offset = self.current_journal_file.file_header.file_handle.tell() - this_record = qls.utils.load(self.current_journal_file.file_header.file_handle, qls.jrnl.RecordHeader) + this_record = qlslibs.utils.load(self.current_journal_file.file_header.file_handle, qlslibs.jrnl.RecordHeader) if not this_record.is_header_valid(self.current_journal_file.file_header): return False if self.first_rec_flag: if this_record.file_offset != self.current_journal_file.file_header.first_record_offset: - raise qls.err.FirstRecordOffsetMismatchError(self.current_journal_file.file_header, this_record) + raise qlslibs.err.FirstRecordOffsetMismatchError(self.current_journal_file.file_header, this_record) self.first_rec_flag = False self.statistics.total_record_count += 1 start_journal_file = self.current_journal_file - if isinstance(this_record, qls.jrnl.EnqueueRecord): + if isinstance(this_record, qlslibs.jrnl.EnqueueRecord): ok_flag = self._handle_enqueue_record(this_record, start_journal_file) high_rid_counter.check(this_record.record_id) if self.args.show_recs or self.args.show_all_recs: print '0x%x:%s' % (start_journal_file.file_header.file_num, \ this_record.to_string(self.args.show_xids, self.args.show_data)) - elif isinstance(this_record, qls.jrnl.DequeueRecord): + elif isinstance(this_record, qlslibs.jrnl.DequeueRecord): ok_flag = self._handle_dequeue_record(this_record, start_journal_file) high_rid_counter.check(this_record.record_id) if self.args.show_recs or self.args.show_all_recs: print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record.to_string(self.args.show_xids)) - elif isinstance(this_record, qls.jrnl.TransactionRecord): + elif isinstance(this_record, qlslibs.jrnl.TransactionRecord): ok_flag = self._handle_transaction_record(this_record, start_journal_file) high_rid_counter.check(this_record.record_id) if self.args.show_recs or self.args.show_all_recs: @@ -495,7 +496,7 @@ class Journal(object): ok_flag = True if self.args.show_all_recs: print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record) - qls.utils.skip(self.current_journal_file.file_header.file_handle, qls.utils.DEFAULT_DBLK_SIZE) + qlslibs.utils.skip(self.current_journal_file.file_header.file_handle, qlslibs.utils.DEFAULT_DBLK_SIZE) return ok_flag def _handle_enqueue_record(self, enqueue_record, start_journal_file): while enqueue_record.load(self.current_journal_file.file_header.file_handle): @@ -505,7 +506,7 @@ class Journal(object): if not enqueue_record.is_valid(start_journal_file): return False if enqueue_record.is_external() and enqueue_record.data != None: - raise qls.err.ExternalDataError(self.current_journal_file.file_header, enqueue_record) + raise qlslibs.err.ExternalDataError(self.current_journal_file.file_header, enqueue_record) if enqueue_record.is_transient(): self.statistics.transient_record_count += 1 return True @@ -538,7 +539,7 @@ class Journal(object): else: try: self.enq_map.delete(start_journal_file, dequeue_record)[0].decr_enq_cnt(dequeue_record) - except qls.err.RecordIdNotFoundError: + except qlslibs.err.RecordIdNotFoundError: dequeue_record.warnings.append('NOT IN EMAP') self.statistics.dequeue_count += 1 return True @@ -575,7 +576,7 @@ class JournalFile(object): self.enq_cnt += 1 def decr_enq_cnt(self, record): if self.enq_cnt <= self.deq_cnt: - raise qls.err.EnqueueCountUnderflowError(self.file_header, record) + raise qlslibs.err.EnqueueCountUnderflowError(self.file_header, record) self.deq_cnt += 1 def get_enq_cnt(self): return self.enq_cnt - self.deq_cnt diff --git a/tools/src/py/qls/efp.py b/tools/src/py/qlslibs/efp.py index 93b77eea93..0fa27b3519 100644 --- a/tools/src/py/qls/efp.py +++ b/tools/src/py/qlslibs/efp.py @@ -18,14 +18,14 @@ # """ -Module: qls.efp +Module: qlslibs.efp Contains empty file pool (EFP) classes. """ import os import os.path -import qls.err +import qlslibs.err import shutil import uuid @@ -36,7 +36,7 @@ class EfpManager(object): """ def __init__(self, directory, disk_space_required_kb): if not os.path.exists(directory): - raise qls.err.InvalidQlsDirectoryNameError(directory) + raise qlslibs.err.InvalidQlsDirectoryNameError(directory) self.directory = directory self.disk_space_required_kb = disk_space_required_kb self.efp_partitions = [] @@ -72,7 +72,7 @@ class EfpManager(object): efp = self.current_efp_partition.efp_pools[file_size] num_files_needed = num_files - efp.get_tot_file_count() if num_files_needed > 0: - self.current_efp_partition.create_new_efp_files(qls.utils.efp_directory_size(file_size), + self.current_efp_partition.create_new_efp_files(qlslibs.utils.efp_directory_size(file_size), num_files_needed) else: print ' WARNING: Pool %s in partition %s already contains %d files: no action taken' % \ @@ -120,7 +120,7 @@ class EfpManager(object): self.efp_pools[efpl].append(efp_partition.efp_pools[efpl]) self.total_num_files += efp_partition.tot_file_count self.total_cum_file_size_kb += efp_partition.tot_file_size_kb - except qls.err.InvalidPartitionDirectoryNameError: + except qlslibs.err.InvalidPartitionDirectoryNameError: pass def _check_args(self, arg_tup): """ Value check of args. The names of partitions and pools are validated against the discovered instances """ @@ -138,16 +138,16 @@ class EfpManager(object): found = True break if not found: - raise qls.err.PartitionDoesNotExistError(arg_partition) + raise qlslibs.err.PartitionDoesNotExistError(arg_partition) except ValueError: - raise qls.err.InvalidPartitionDirectoryNameError(arg_partition) + raise qlslibs.err.InvalidPartitionDirectoryNameError(arg_partition) if self.current_efp_partition is not None: pool_list = self.current_efp_partition.efp_pools.keys() efp_directory_name = EmptyFilePool.get_directory_name(int(arg_file_size)) if arg_add and efp_directory_name in pool_list: - raise qls.err.PoolDirectoryAlreadyExistsError(efp_directory_name) + raise qlslibs.err.PoolDirectoryAlreadyExistsError(efp_directory_name) if (arg_remove or arg_freshen) and efp_directory_name not in pool_list: - raise qls.err.PoolDirectoryDoesNotExistError(efp_directory_name) + raise qlslibs.err.PoolDirectoryDoesNotExistError(efp_directory_name) class EfpPartition(object): """ @@ -197,17 +197,18 @@ class EfpPartition(object): self.efp_pools[dir_entry] = efp def _validate_partition_directory(self, disk_space_required_kb): if os.path.basename(self.directory)[0] is not EfpPartition.PTN_DIR_PREFIX: - raise qls.err.InvalidPartitionDirectoryNameError(self.directory) + raise qlslibs.err.InvalidPartitionDirectoryNameError(self.directory) try: self.partition_number = int(os.path.basename(self.directory)[1:]) except ValueError: - raise qls.err.InvalidPartitionDirectoryNameError(self.directory) - if not qls.utils.has_write_permission(self.directory): - raise qls.err.WritePermissionError(self.directory) + raise qlslibs.err.InvalidPartitionDirectoryNameError(self.directory) + if not qlslibs.utils.has_write_permission(self.directory): + raise qlslibs.err.WritePermissionError(self.directory) if disk_space_required_kb is not None: - space_avail = qls.utils.get_avail_disk_space(self.directory) + space_avail = qlslibs.utils.get_avail_disk_space(self.directory) if space_avail < (disk_space_required_kb * 1024): - raise qls.err.InsufficientSpaceOnDiskError(self.directory, space_avail, disk_space_required_kb * 1024) + raise qlslibs.err.InsufficientSpaceOnDiskError(self.directory, space_avail, + disk_space_required_kb * 1024) class EmptyFilePool(object): """ @@ -258,14 +259,15 @@ class EmptyFilePool(object): def _create_new_efp_file(self): """ Create a single new empty journal file of the prescribed size for this EFP """ file_name = str(uuid.uuid4()) + EmptyFilePool.EFP_JRNL_EXTENTION - file_header = qls.jrnl.FileHeader(0, qls.jrnl.FileHeader.MAGIC, qls.utils.DEFAULT_RECORD_VERSION, 0, 0, 0) - file_header.init(None, None, qls.utils.DEFAULT_HEADER_SIZE_SBLKS, self.partition_number, self.data_size_kb, + file_header = qlslibs.jrnl.FileHeader(0, qlslibs.jrnl.FileHeader.MAGIC, qlslibs.utils.DEFAULT_RECORD_VERSION, + 0, 0, 0) + file_header.init(None, None, qlslibs.utils.DEFAULT_HEADER_SIZE_SBLKS, self.partition_number, self.data_size_kb, 0, 0, 0, 0, 0) efh = file_header.encode() efh_bytes = len(efh) file_handle = open(os.path.join(self.directory, file_name), 'wb') file_handle.write(efh) - file_handle.write('\xff' * (qls.utils.DEFAULT_SBLK_SIZE - efh_bytes)) + file_handle.write('\xff' * (qlslibs.utils.DEFAULT_SBLK_SIZE - efh_bytes)) file_handle.write('\x00' * (int(self.data_size_kb) * 1024)) file_handle.close() fqfn = os.path.join(self.directory, file_name) @@ -273,22 +275,22 @@ class EmptyFilePool(object): return os.path.getsize(fqfn) def _validate_efp_directory(self): if self.base_dir_name[-1] is not EmptyFilePool.EFP_DIR_SUFFIX: - raise qls.err.InvalidEfpDirectoryNameError(self.directory) + raise qlslibs.err.InvalidEfpDirectoryNameError(self.directory) try: self.data_size_kb = int(os.path.basename(self.base_dir_name)[:-1]) except ValueError: - raise qls.err.InvalidEfpDirectoryNameError(self.directory) + raise qlslibs.err.InvalidEfpDirectoryNameError(self.directory) def _validate_efp_file(self, efp_file): file_size = os.path.getsize(efp_file) - expected_file_size = (self.data_size_kb * 1024) + qls.utils.DEFAULT_SBLK_SIZE + expected_file_size = (self.data_size_kb * 1024) + qlslibs.utils.DEFAULT_SBLK_SIZE if file_size != expected_file_size: print 'WARNING: File %s not of correct size (size=%d, expected=%d): Ignoring' % (efp_file, file_size, expected_file_size) return False file_handle = open(efp_file) - args = qls.utils.load_args(file_handle, qls.jrnl.RecordHeader) - file_hdr = qls.jrnl.FileHeader(*args) - file_hdr.init(file_handle, *qls.utils.load_args(file_handle, qls.jrnl.FileHeader)) + args = qlslibs.utils.load_args(file_handle, qlslibs.jrnl.RecordHeader) + file_hdr = qlslibs.jrnl.FileHeader(*args) + file_hdr.init(file_handle, *qlslibs.utils.load_args(file_handle, qlslibs.jrnl.FileHeader)) if not file_hdr.is_header_valid(file_hdr): file_handle.close() return False diff --git a/tools/src/py/qls/err.py b/tools/src/py/qlslibs/err.py index bceaf041c9..f47632ce6a 100644 --- a/tools/src/py/qls/err.py +++ b/tools/src/py/qlslibs/err.py @@ -18,7 +18,7 @@ # """ -Module: qls.err +Module: qlslibs.err Contains error classes. """ diff --git a/tools/src/py/qls/jrnl.py b/tools/src/py/qlslibs/jrnl.py index e10e5ced8b..555cf6a8ae 100644 --- a/tools/src/py/qls/jrnl.py +++ b/tools/src/py/qlslibs/jrnl.py @@ -18,13 +18,13 @@ # """ -Module: qls.jrnl +Module: qlslibs.jrnl Contains journal record classes. """ -import qls.err -import qls.utils +import qlslibs.err +import qlslibs.utils import string import struct import time @@ -58,8 +58,8 @@ class RecordHeader(object): if self.magic[:3] != 'QLS' or self.magic[3] not in ['a', 'c', 'd', 'e', 'f', 'x']: return False if self.magic[-1] != 'x': - if self.version != qls.utils.DEFAULT_RECORD_VERSION: - raise qls.err.InvalidRecordVersionError(file_header, self, qls.utils.DEFAULT_RECORD_VERSION) + if self.version != qlslibs.utils.DEFAULT_RECORD_VERSION: + raise qlslibs.err.InvalidRecordVersionError(file_header, self, qlslibs.utils.DEFAULT_RECORD_VERSION) if self.serial != file_header.serial: return False return True @@ -106,17 +106,17 @@ class RecordTail(object): if self.valid_flag is None: if not self.complete: return False - self.valid_flag = qls.utils.inv_str(self.xmagic) == record.magic and \ + self.valid_flag = qlslibs.utils.inv_str(self.xmagic) == record.magic and \ self.serial == record.serial and \ self.record_id == record.record_id and \ - qls.utils.adler32(record.checksum_encode()) == self.checksum + qlslibs.utils.adler32(record.checksum_encode()) == self.checksum return self.valid_flag def to_string(self): """Return a string representation of the this RecordTail instance""" if self.valid_flag is not None: if not self.valid_flag: return '[INVALID RECORD TAIL]' - magic = qls.utils.inv_str(self.xmagic) + magic = qlslibs.utils.inv_str(self.xmagic) magic_char = magic[-1].upper() if magic[-1] in string.printable else '?' return '[%c cs=0x%08x rid=0x%x]' % (magic_char, self.checksum, self.record_id) def __str__(self): @@ -150,7 +150,7 @@ class FileHeader(RecordHeader): self.queue_name_len) + self.queue_name def get_file_size(self): """Sum of file header size and data size""" - return (self.file_header_size_sblks * qls.utils.DEFAULT_SBLK_SIZE) + (self.efp_data_size_kb * 1024) + return (self.file_header_size_sblks * qlslibs.utils.DEFAULT_SBLK_SIZE) + (self.efp_data_size_kb * 1024) def load(self, file_handle): self.queue_name = file_handle.read(self.queue_name_len) def is_end_of_file(self): @@ -225,13 +225,13 @@ class EnqueueRecord(RecordHeader): return True def load(self, file_handle): """Return True when load is incomplete and must be called again with new file handle""" - self.xid, self.xid_complete = qls.utils.load_data(file_handle, self.xid, self.xid_size) + self.xid, self.xid_complete = qlslibs.utils.load_data(file_handle, self.xid, self.xid_size) if not self.xid_complete: return True if self.is_external(): self.data_complete = True else: - self.data, self.data_complete = qls.utils.load_data(file_handle, self.data, self.data_size) + self.data, self.data_complete = qlslibs.utils.load_data(file_handle, self.data, self.data_size) if not self.data_complete: return True if self.xid_size > 0 or self.data_size > 0: @@ -254,8 +254,8 @@ class EnqueueRecord(RecordHeader): else: record_tail_str = self.record_tail.to_string() return '%s %s %s %s %s %s' % (self.to_rh_string(), - qls.utils.format_xid(self.xid, self.xid_size, show_xid_flag), - qls.utils.format_data(self.data, self.data_size, show_data_flag), + qlslibs.utils.format_xid(self.xid, self.xid_size, show_xid_flag), + qlslibs.utils.format_data(self.data, self.data_size, show_data_flag), record_tail_str, self._print_flags(), self._get_warnings()) def _print_flags(self): """Utility function to decode the flags field in the header and print a string representation""" @@ -305,7 +305,7 @@ class DequeueRecord(RecordHeader): return True def load(self, file_handle): """Return True when load is incomplete and must be called again with new file handle""" - self.xid, self.xid_complete = qls.utils.load_data(file_handle, self.xid, self.xid_size) + self.xid, self.xid_complete = qlslibs.utils.load_data(file_handle, self.xid, self.xid_size) if not self.xid_complete: return True if self.xid_size > 0: @@ -329,7 +329,7 @@ class DequeueRecord(RecordHeader): else: record_tail_str = self.record_tail.to_string() return '%s drid=0x%x %s %s %s %s' % (self.to_rh_string(), self.dequeue_record_id, - qls.utils.format_xid(self.xid, self.xid_size, show_xid_flag), + qlslibs.utils.format_xid(self.xid, self.xid_size, show_xid_flag), record_tail_str, self._print_flags(), self._get_warnings()) def _print_flags(self): """Utility function to decode the flags field in the header and print a string representation""" @@ -366,7 +366,7 @@ class TransactionRecord(RecordHeader): return True def load(self, file_handle): """Return True when load is incomplete and must be called again with new file handle""" - self.xid, self.xid_complete = qls.utils.load_data(file_handle, self.xid, self.xid_size) + self.xid, self.xid_complete = qlslibs.utils.load_data(file_handle, self.xid, self.xid_size) if not self.xid_complete: return True if self.xid_size > 0: @@ -388,7 +388,7 @@ class TransactionRecord(RecordHeader): else: record_tail_str = self.record_tail.to_string() return '%s %s %s %s' % (self.to_rh_string(), - qls.utils.format_xid(self.xid, self.xid_size, show_xid_flag), + qlslibs.utils.format_xid(self.xid, self.xid_size, show_xid_flag), record_tail_str, self._get_warnings()) def __str__(self): """Return a string representation of the this TransactionRecord instance""" diff --git a/tools/src/py/qls/utils.py b/tools/src/py/qlslibs/utils.py index 758dc446c0..c32f8c7abb 100644 --- a/tools/src/py/qls/utils.py +++ b/tools/src/py/qlslibs/utils.py @@ -18,13 +18,13 @@ # """ -Module: qls.utils +Module: qlslibs.utils Contains helper functions for qpid_qls_analyze. """ import os -import qls.jrnl +import qlslibs.jrnl import stat import string import struct @@ -43,18 +43,18 @@ def adler32(data): def create_record(magic, uflags, journal_file, record_id, dequeue_record_id, xid, data): """Helper function to construct a record with xid, data (where applicable) and consistent tail with checksum""" - record_class = qls.jrnl.CLASSES.get(magic[-1]) + record_class = qlslibs.jrnl.CLASSES.get(magic[-1]) record = record_class(0, magic, DEFAULT_RECORD_VERSION, uflags, journal_file.file_header.serial, record_id) xid_length = len(xid) if xid is not None else 0 - if isinstance(record, qls.jrnl.EnqueueRecord): + if isinstance(record, qlslibs.jrnl.EnqueueRecord): data_length = len(data) if data is not None else 0 record.init(None, xid_length, data_length) - elif isinstance(record, qls.jrnl.DequeueRecord): + elif isinstance(record, qlslibs.jrnl.DequeueRecord): record.init(None, dequeue_record_id, xid_length) - elif isinstance(record, qls.jrnl.TransactionRecord): + elif isinstance(record, qlslibs.jrnl.TransactionRecord): record.init(None, xid_length) else: - raise qls.err.InvalidClassError(record.__class__.__name__) + raise qlslibs.err.InvalidClassError(record.__class__.__name__) if xid is not None: record.xid = xid record.xid_complete = True @@ -75,11 +75,11 @@ def efp_directory_size(directory_name): def format_data(data, data_size=None, show_data_flag=True): """Format binary data for printing""" - return _format_binary(data, data_size, show_data_flag, 'data', qls.err.DataSizeError, False) + return _format_binary(data, data_size, show_data_flag, 'data', qlslibs.err.DataSizeError, False) def format_xid(xid, xid_size=None, show_xid_flag=True): """Format binary XID for printing""" - return _format_binary(xid, xid_size, show_xid_flag, 'xid', qls.err.XidSizeError, True) + return _format_binary(xid, xid_size, show_xid_flag, 'xid', qlslibs.err.XidSizeError, True) def get_avail_disk_space(path): df_proc = subprocess.Popen(["df", path], stdout=subprocess.PIPE) @@ -112,7 +112,7 @@ def load_args(file_handle, klass): foffs = file_handle.tell(), fbin = file_handle.read(size) if len(fbin) != size: - raise qls.err.UnexpectedEndOfFileError(len(fbin), size, foffs, file_handle.name) + raise qlslibs.err.UnexpectedEndOfFileError(len(fbin), size, foffs, file_handle.name) return foffs + struct.unpack(klass.FORMAT, fbin) def load_data(file_handle, element, element_size): @@ -179,7 +179,7 @@ def _is_printable(in_str): return True def _mk_record_tail(record): - record_tail = qls.jrnl.RecordTail(None) + record_tail = qlslibs.jrnl.RecordTail(None) record_tail.xmagic = inv_str(record.magic) record_tail.checksum = adler32(record.checksum_encode()) record_tail.serial = record.serial diff --git a/tools/src/py/qpid_qls_analyze.py b/tools/src/py/qpid-qls-analyze index 1b2655896c..5c7d04b4c0 100755 --- a/tools/src/py/qpid_qls_analyze.py +++ b/tools/src/py/qpid-qls-analyze @@ -23,15 +23,24 @@ qpid-qls-analyze Reads and analyzes a Qpid Linear Store (QLS) store directory. """ +import os.path +import sys + +default = os.path.normpath('/usr/share/qpid-tools') +home = os.environ.get('QPID_TOOLS_HOME', default) +sys.path.append(os.path.join(home,'python')) + import argparse import os -import os.path -import qls.anal -import qls.efp +import qlslibs.anal +import qlslibs.efp class QlsAnalyzerArgParser(argparse.ArgumentParser): + """ + Class to handle command-line arguments. + """ def __init__(self): - argparse.ArgumentParser.__init__(self, description = 'Qpid Linear Store Analyzer', prog = 'qpid-qls-analyze') + argparse.ArgumentParser.__init__(self, description='Qpid Linear Store Analyzer', prog='qpid-qls-analyze') self.add_argument('qls_dir', metavar='DIR', help='Qpid Linear Store (QLS) directory to be analyzed') self.add_argument('--efp', action='store_true', @@ -68,18 +77,21 @@ class QqpdLinearStoreAnalyzer(object): self.args = None self._process_args() self.qls_dir = os.path.abspath(self.args.qls_dir) - self.efp_manager = qls.efp.EfpManager(self.qls_dir, None) - self.jrnl_recovery_mgr = qls.anal.JournalRecoveryManager(self.qls_dir, self.args) + self.efp_manager = qlslibs.efp.EfpManager(self.qls_dir, None) + self.jrnl_recovery_mgr = qlslibs.anal.JournalRecoveryManager(self.qls_dir, self.args) def _process_args(self): + """ Create arg parser and process args """ parser = QlsAnalyzerArgParser() self.args = parser.parse_args() if not os.path.exists(self.args.qls_dir): parser.error('Journal path "%s" does not exist' % self.args.qls_dir) def report(self): + """ Create a report on the linear store previously analyzed using analyze() """ if self.args.efp: self.efp_manager.report() self.jrnl_recovery_mgr.report(self.args.stats) def run(self): + """ Run the analyzer, which reads and analyzes the linear store """ if self.args.efp: self.efp_manager.run(None) self.jrnl_recovery_mgr.run() |