diff options
Diffstat (limited to 'qpid/tools/src/py/qlslibs')
-rw-r--r-- | qpid/tools/src/py/qlslibs/__init__.py | 19 | ||||
-rw-r--r-- | qpid/tools/src/py/qlslibs/analyze.py | 599 | ||||
-rw-r--r-- | qpid/tools/src/py/qlslibs/efp.py | 327 | ||||
-rw-r--r-- | qpid/tools/src/py/qlslibs/err.py | 261 | ||||
-rw-r--r-- | qpid/tools/src/py/qlslibs/jrnl.py | 412 | ||||
-rw-r--r-- | qpid/tools/src/py/qlslibs/utils.py | 206 |
6 files changed, 1824 insertions, 0 deletions
diff --git a/qpid/tools/src/py/qlslibs/__init__.py b/qpid/tools/src/py/qlslibs/__init__.py new file mode 100644 index 0000000000..d8a500d9d8 --- /dev/null +++ b/qpid/tools/src/py/qlslibs/__init__.py @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + diff --git a/qpid/tools/src/py/qlslibs/analyze.py b/qpid/tools/src/py/qlslibs/analyze.py new file mode 100644 index 0000000000..a67e17e426 --- /dev/null +++ b/qpid/tools/src/py/qlslibs/analyze.py @@ -0,0 +1,599 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +Module: qlslibs.analyze + +Classes for recovery and analysis of a Qpid Linear Store (QLS). +""" + +import os.path +import qlslibs.err +import qlslibs.jrnl +import qlslibs.utils + +class HighCounter(object): + def __init__(self): + self.num = 0 + def check(self, num): + if self.num < num: + self.num = num + def get(self): + return self.num + def get_next(self): + self.num += 1 + return self.num + +class JournalRecoveryManager(object): + TPL_DIR_NAME = 'tpl2' + JRNL_DIR_NAME = 'jrnl2' + def __init__(self, directory, args): + if not os.path.exists(directory): + raise qlslibs.err.InvalidQlsDirectoryNameError(directory) + self.directory = directory + self.args = args + self.tpl = None + self.journals = {} + self.high_rid_counter = HighCounter() + self.prepared_list = None + def report(self, print_stats_flag): + self._reconcile_transactions(self.prepared_list, self.args.txn) + if self.tpl is not None: + self.tpl.report(print_stats_flag, self.args.show_recovered_recs) + for queue_name in sorted(self.journals.keys()): + self.journals[queue_name].report(print_stats_flag, self.args.show_recovered_recs) + def run(self): + tpl_dir = os.path.join(self.directory, JournalRecoveryManager.TPL_DIR_NAME) + if os.path.exists(tpl_dir): + self.tpl = Journal(tpl_dir, None, self.args) + self.tpl.recover(self.high_rid_counter) + if self.args.show_recovery_recs or self.args.show_all_recs: + print + jrnl_dir = os.path.join(self.directory, JournalRecoveryManager.JRNL_DIR_NAME) + self.prepared_list = self.tpl.txn_map.get_prepared_list() if self.tpl is not None else {} + if os.path.exists(jrnl_dir): + for dir_entry in sorted(os.listdir(jrnl_dir)): + jrnl = Journal(os.path.join(jrnl_dir, dir_entry), self.prepared_list, self.args) + jrnl.recover(self.high_rid_counter) + self.journals[jrnl.get_queue_name()] = jrnl + if self.args.show_recovery_recs or self.args.show_all_recs: + print + print + def _reconcile_transactions(self, prepared_list, txn_flag): + print 'Transaction reconciliation report:' + print '==================================' + print len(prepared_list), 'open transaction(s) found in prepared transaction list:' + for xid in prepared_list.keys(): + commit_flag = prepared_list[xid] + if commit_flag is None: + status = '[Prepared, neither committed nor aborted - assuming commit]' + elif commit_flag: + status = '[Prepared, but interrupted during commit phase]' + else: + status = '[Prepared, but interrupted during abort phase]' + print ' ', qlslibs.utils.format_xid(xid), status + if prepared_list[xid] is None: # Prepared, but not committed or aborted + enqueue_record = self.tpl.get_txn_map_record(xid)[0][1] + dequeue_record = qlslibs.utils.create_record(qlslibs.jrnl.DequeueRecord.MAGIC, \ + qlslibs.jrnl.DequeueRecord.TXN_COMPLETE_COMMIT_FLAG, \ + self.tpl.current_journal_file, \ + self.high_rid_counter.get_next(), \ + enqueue_record.record_id, xid, None) + if txn_flag: + self.tpl.add_record(dequeue_record) + for queue_name in sorted(self.journals.keys()): + self.journals[queue_name].reconcile_transactions(prepared_list, txn_flag) + if len(prepared_list) > 0: + print 'Completing prepared transactions in prepared transaction list:' + for xid in prepared_list.keys(): + print ' ', qlslibs.utils.format_xid(xid) + transaction_record = qlslibs.utils.create_record(qlslibs.jrnl.TransactionRecord.MAGIC_COMMIT, 0, \ + self.tpl.current_journal_file, \ + self.high_rid_counter.get_next(), None, xid, None) + if txn_flag: + self.tpl.add_record(transaction_record) + print + +class EnqueueMap(object): + """ + Map of enqueued records in a QLS journal + """ + def __init__(self, journal): + self.journal = journal + self.enq_map = {} + def add(self, journal_file, enq_record, locked_flag): + if enq_record.record_id in self.enq_map: + raise qlslibs.err.DuplicateRecordIdError(self.journal.current_file_header, enq_record) + self.enq_map[enq_record.record_id] = [journal_file, enq_record, locked_flag] + def contains(self, rid): + """Return True if the map contains the given rid""" + return rid in self.enq_map + def delete(self, journal_file, deq_record): + if deq_record.dequeue_record_id in self.enq_map: + enq_list = self.enq_map[deq_record.dequeue_record_id] + del self.enq_map[deq_record.dequeue_record_id] + return enq_list + else: + raise qlslibs.err.RecordIdNotFoundError(journal_file.file_header, deq_record) + def get(self, record_id): + if record_id in self.enq_map: + return self.enq_map[record_id] + return None + def lock(self, journal_file, dequeue_record): + if dequeue_record.dequeue_record_id not in self.enq_map: + raise qlslibs.err.RecordIdNotFoundError(journal_file.file_header, dequeue_record) + self.enq_map[dequeue_record.dequeue_record_id][2] = True + def report_str(self, _, show_records): + """Return a string containing a text report for all records in the map""" + if len(self.enq_map) == 0: + return 'No enqueued records found.' + rstr = '%d enqueued records found' % len(self.enq_map) + if show_records: + rstr += ":" + rid_list = self.enq_map.keys() + rid_list.sort() + for rid in rid_list: + journal_file, record, locked_flag = self.enq_map[rid] + if locked_flag: + lock_str = '[LOCKED]' + else: + lock_str = '' + rstr += '\n 0x%x:%s %s' % (journal_file.file_header.file_num, record, lock_str) + else: + rstr += '.' + return rstr + def unlock(self, journal_file, dequeue_record): + """Set the transaction lock for a given record_id to False""" + if dequeue_record.dequeue_record_id in self.enq_map: + if self.enq_map[dequeue_record.dequeue_record_id][2]: + self.enq_map[dequeue_record.dequeue_record_id][2] = False + else: + raise qlslibs.err.RecordNotLockedError(journal_file.file_header, dequeue_record) + else: + raise qlslibs.err.RecordIdNotFoundError(journal_file.file_header, dequeue_record) + +class TransactionMap(object): + """ + Map of open transactions used while recovering a QLS journal + """ + def __init__(self, enq_map): + self.txn_map = {} + self.enq_map = enq_map + def abort(self, xid): + """Perform an abort operation for the given xid record""" + for journal_file, record, _ in self.txn_map[xid]: + if isinstance(record, qlslibs.jrnl.DequeueRecord): + if self.enq_map.contains(record.dequeue_record_id): + self.enq_map.unlock(journal_file, record) + else: + journal_file.decr_enq_cnt(record) + del self.txn_map[xid] + def add(self, journal_file, record): + if record.xid is None: + raise qlslibs.err.NonTransactionalRecordError(journal_file.file_header, record, 'TransactionMap.add()') + if isinstance(record, qlslibs.jrnl.DequeueRecord): + try: + self.enq_map.lock(journal_file, record) + except qlslibs.err.RecordIdNotFoundError: + # Not in emap, look for rid in tmap - should not happen in practice + txn_op = self._find_record_id(record.xid, record.dequeue_record_id) + if txn_op != None: + if txn_op[2]: + raise qlslibs.err.AlreadyLockedError(journal_file.file_header, record) + txn_op[2] = True + if record.xid in self.txn_map: + self.txn_map[record.xid].append([journal_file, record, False]) # append to existing list + else: + self.txn_map[record.xid] = [[journal_file, record, False]] # create new list + def commit(self, xid): + """Perform a commit operation for the given xid record""" + mismatch_list = [] + for journal_file, record, lock in self.txn_map[xid]: + if isinstance(record, qlslibs.jrnl.EnqueueRecord): + self.enq_map.add(journal_file, record, lock) # Transfer enq to emap + else: + if self.enq_map.contains(record.dequeue_record_id): + self.enq_map.unlock(journal_file, record) + self.enq_map.delete(journal_file, record)[0].decr_enq_cnt(record) + else: + mismatch_list.append('0x%x' % record.dequeue_record_id) + del self.txn_map[xid] + return mismatch_list + def contains(self, xid): + """Return True if the xid exists in the map; False otherwise""" + return xid in self.txn_map + def delete(self, journal_file, transaction_record): + """Remove a transaction record from the map using either a commit or abort header""" + if transaction_record.magic[-1] == 'c': + return self.commit(transaction_record.xid) + if transaction_record.magic[-1] == 'a': + self.abort(transaction_record.xid) + else: + raise qlslibs.err.InvalidRecordTypeError(journal_file.file_header, transaction_record, + 'delete from Transaction Map') + def get(self, xid): + if xid in self.txn_map: + return self.txn_map[xid] + return None + def get_prepared_list(self): + """ + Prepared list is a map of xid(key) to one of None, True or False. These represent respectively: + None: prepared, but neither committed or aborted (interrupted before commit or abort) + False: prepared and aborted (interrupted before abort complete) + True: prepared and committed (interrupted before commit complete) + """ + prepared_list = {} + for xid in self.get_xid_list(): + for _, record, _ in self.txn_map[xid]: + if isinstance(record, qlslibs.jrnl.EnqueueRecord): + prepared_list[xid] = None + else: + prepared_list[xid] = record.is_transaction_complete_commit() + return prepared_list + def get_xid_list(self): + return self.txn_map.keys() + def report_str(self, _, show_records): + """Return a string containing a text report for all records in the map""" + if len(self.txn_map) == 0: + return 'No outstanding transactions found.' + rstr = '%d outstanding transaction(s)' % len(self.txn_map) + if show_records: + rstr += ':' + for xid, op_list in self.txn_map.iteritems(): + rstr += '\n %s containing %d operations:' % (qlslibs.utils.format_xid(xid), len(op_list)) + for journal_file, record, _ in op_list: + rstr += '\n 0x%x:%s' % (journal_file.file_header.file_num, record) + else: + rstr += '.' + return rstr + def _find_record_id(self, xid, record_id): + """ Search for and return map list with supplied rid.""" + if xid in self.txn_map: + for txn_op in self.txn_map[xid]: + if txn_op[1].record_id == record_id: + return txn_op + for this_xid in self.txn_map.iterkeys(): + for txn_op in self.txn_map[this_xid]: + if txn_op[1].record_id == record_id: + return txn_op + return None + +class JournalStatistics(object): + """Journal statistics""" + def __init__(self): + self.total_record_count = 0 + self.transient_record_count = 0 + self.filler_record_count = 0 + self.enqueue_count = 0 + self.dequeue_count = 0 + self.transaction_record_count = 0 + self.transaction_enqueue_count = 0 + self.transaction_dequeue_count = 0 + self.transaction_commit_count = 0 + self.transaction_abort_count = 0 + self.transaction_operation_count = 0 + def __str__(self): + fstr = 'Total record count: %d\n' + \ + 'Transient record count: %d\n' + \ + 'Filler_record_count: %d\n' + \ + 'Enqueue_count: %d\n' + \ + 'Dequeue_count: %d\n' + \ + 'Transaction_record_count: %d\n' + \ + 'Transaction_enqueue_count: %d\n' + \ + 'Transaction_dequeue_count: %d\n' + \ + 'Transaction_commit_count: %d\n' + \ + 'Transaction_abort_count: %d\n' + \ + 'Transaction_operation_count: %d\n' + return fstr % (self.total_record_count, + self.transient_record_count, + self.filler_record_count, + self.enqueue_count, + self.dequeue_count, + self.transaction_record_count, + self.transaction_enqueue_count, + self.transaction_dequeue_count, + self.transaction_commit_count, + self.transaction_abort_count, + self.transaction_operation_count) + +class Journal(object): + """ + Instance of a Qpid Linear Store (QLS) journal. + """ + JRNL_SUFFIX = 'jrnl' + def __init__(self, directory, xid_prepared_list, args): + self.directory = directory + self.queue_name = os.path.basename(directory) + self.files = {} + self.file_num_list = None + self.file_num_itr = None + self.enq_map = EnqueueMap(self) + self.txn_map = TransactionMap(self.enq_map) + self.current_journal_file = None + self.first_rec_flag = None + self.statistics = JournalStatistics() + self.xid_prepared_list = xid_prepared_list # This is None for the TPL instance only + self.args = args + self.last_record_offset = None # TODO: Move into JournalFile + self.num_filler_records_required = None # TODO: Move into JournalFile + self.fill_to_offset = None + def add_record(self, record): + if isinstance(record, qlslibs.jrnl.EnqueueRecord) or isinstance(record, qlslibs.jrnl.DequeueRecord): + if record.xid_size > 0: + self.txn_map.add(self.current_journal_file, record) + else: + self.enq_map.add(self.current_journal_file, record, False) + elif isinstance(record, qlslibs.jrnl.TransactionRecord): + self.txn_map.delete(self.current_journal_file, record) + else: + raise qlslibs.err.InvalidRecordTypeError(self.current_journal_file, record, 'add to Journal') + def get_enq_map_record(self, rid): + return self.enq_map.get(rid) + def get_txn_map_record(self, xid): + return self.txn_map.get(xid) + def get_outstanding_txn_list(self): + return self.txn_map.get_xid_list() + def get_queue_name(self): + return self.queue_name + def recover(self, high_rid_counter): + print 'Recovering %s...' % self.queue_name, + self._analyze_files() + try: + while self._get_next_record(high_rid_counter): + pass + self._check_alignment() + except qlslibs.err.NoMoreFilesInJournalError: + print 'No more files in journal' + except qlslibs.err.FirstRecordOffsetMismatchError as err: + print '0x%08x: **** FRO ERROR: queue=\"%s\" fid=0x%x fro actual=0x%08x expected=0x%08x' % \ + (err.get_expected_fro(), err.get_queue_name(), err.get_file_number(), err.get_record_offset(), + err.get_expected_fro()) + print 'done' + def reconcile_transactions(self, prepared_list, txn_flag): + xid_list = self.txn_map.get_xid_list() + if len(xid_list) > 0: + print self.queue_name, 'contains', len(xid_list), 'open transaction(s):' + for xid in xid_list: + if xid in prepared_list.keys(): + commit_flag = prepared_list[xid] + if commit_flag is None: + print ' ', qlslibs.utils.format_xid(xid), '- Assuming commit after prepare' + if txn_flag: + self.txn_map.commit(xid) + elif commit_flag: + print ' ', qlslibs.utils.format_xid(xid), '- Completing interrupted commit operation' + if txn_flag: + self.txn_map.commit(xid) + else: + print ' ', qlslibs.utils.format_xid(xid), '- Completing interrupted abort operation' + if txn_flag: + self.txn_map.abort(xid) + else: + print ' ', qlslibs.utils.format_xid(xid), '- Ignoring, not in prepared transaction list' + if txn_flag: + self.txn_map.abort(xid) + def report(self, print_stats_flag, show_recovered_records): + print 'Journal "%s":' % self.queue_name + print '=' * (11 + len(self.queue_name)) + if print_stats_flag: + print str(self.statistics) + print self.enq_map.report_str(True, show_recovered_records) + print self.txn_map.report_str(True, show_recovered_records) + JournalFile.report_header() + for file_num in sorted(self.files.keys()): + self.files[file_num].report() + #TODO: move this to JournalFile, append to file info + if self.num_filler_records_required is not None and self.fill_to_offset is not None: + print '0x%x:0x%08x: %d filler records required for DBLK alignment to 0x%08x' % \ + (self.current_journal_file.file_header.file_num, self.last_record_offset, + self.num_filler_records_required, self.fill_to_offset) + print + #--- protected functions --- + def _analyze_files(self): + for dir_entry in os.listdir(self.directory): + dir_entry_bits = dir_entry.split('.') + if len(dir_entry_bits) == 2 and dir_entry_bits[1] == Journal.JRNL_SUFFIX: + fq_file_name = os.path.join(self.directory, dir_entry) + file_handle = open(fq_file_name) + args = qlslibs.utils.load_args(file_handle, qlslibs.jrnl.RecordHeader) + file_hdr = qlslibs.jrnl.FileHeader(*args) + file_hdr.init(file_handle, *qlslibs.utils.load_args(file_handle, qlslibs.jrnl.FileHeader)) + if file_hdr.is_header_valid(file_hdr): + file_hdr.load(file_handle) + if file_hdr.is_valid(False): + qlslibs.utils.skip(file_handle, + file_hdr.file_header_size_sblks * qlslibs.utils.DEFAULT_SBLK_SIZE) + self.files[file_hdr.file_num] = JournalFile(file_hdr) + self.file_num_list = sorted(self.files.keys()) + self.file_num_itr = iter(self.file_num_list) + def _check_alignment(self): # TODO: Move into JournalFile + if self.last_record_offset is None: # Empty file, _check_file() never run + return + remaining_sblks = self.last_record_offset % qlslibs.utils.DEFAULT_SBLK_SIZE + if remaining_sblks == 0: + self.num_filler_records_required = 0 + else: + self.num_filler_records_required = (qlslibs.utils.DEFAULT_SBLK_SIZE - remaining_sblks) / \ + qlslibs.utils.DEFAULT_DBLK_SIZE + self.fill_to_offset = self.last_record_offset + \ + (self.num_filler_records_required * qlslibs.utils.DEFAULT_DBLK_SIZE) + if self.args.show_recovery_recs or self.args.show_all_recs: + print '0x%x:0x%08x: %d filler records required for DBLK alignment to 0x%08x' % \ + (self.current_journal_file.file_header.file_num, self.last_record_offset, + self.num_filler_records_required, self.fill_to_offset) + def _check_file(self): + if self.current_journal_file is not None: + if not self.current_journal_file.file_header.is_end_of_file(): + return True + if self.current_journal_file.file_header.is_end_of_file(): + self.last_record_offset = self.current_journal_file.file_header.file_handle.tell() + if not self._get_next_file(): + return False + fhdr = self.current_journal_file.file_header + fhdr.file_handle.seek(fhdr.first_record_offset) + return True + def _get_next_file(self): + if self.current_journal_file is not None: + file_handle = self.current_journal_file.file_header.file_handle + if not file_handle.closed: # sanity check, should not be necessary + file_handle.close() + file_num = 0 + try: + while file_num == 0: + file_num = self.file_num_itr.next() + except StopIteration: + pass + if file_num == 0: + return False + self.current_journal_file = self.files[file_num] + self.first_rec_flag = True + if self.args.show_recovery_recs or self.args.show_all_recs: + file_header = self.current_journal_file.file_header + print '0x%x:%s' % (file_header.file_num, file_header.to_string()) + return True + def _get_next_record(self, high_rid_counter): + if not self._check_file(): + return False + self.last_record_offset = self.current_journal_file.file_header.file_handle.tell() + this_record = qlslibs.utils.load(self.current_journal_file.file_header.file_handle, qlslibs.jrnl.RecordHeader) + if not this_record.is_header_valid(self.current_journal_file.file_header): + return False + if self.first_rec_flag: + if this_record.file_offset != self.current_journal_file.file_header.first_record_offset: + raise qlslibs.err.FirstRecordOffsetMismatchError(self.current_journal_file.file_header, this_record) + self.first_rec_flag = False + self.statistics.total_record_count += 1 + start_journal_file = self.current_journal_file + if isinstance(this_record, qlslibs.jrnl.EnqueueRecord): + ok_flag = self._handle_enqueue_record(this_record, start_journal_file) + high_rid_counter.check(this_record.record_id) + if self.args.show_recovery_recs or self.args.show_all_recs: + print '0x%x:%s' % (start_journal_file.file_header.file_num, \ + this_record.to_string(self.args.show_xids, self.args.show_data)) + elif isinstance(this_record, qlslibs.jrnl.DequeueRecord): + ok_flag = self._handle_dequeue_record(this_record, start_journal_file) + high_rid_counter.check(this_record.record_id) + if self.args.show_recovery_recs or self.args.show_all_recs: + print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record.to_string(self.args.show_xids)) + elif isinstance(this_record, qlslibs.jrnl.TransactionRecord): + ok_flag = self._handle_transaction_record(this_record, start_journal_file) + high_rid_counter.check(this_record.record_id) + if self.args.show_recovery_recs or self.args.show_all_recs: + print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record.to_string(self.args.show_xids)) + else: + self.statistics.filler_record_count += 1 + ok_flag = True + if self.args.show_all_recs: + print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record) + qlslibs.utils.skip(self.current_journal_file.file_header.file_handle, qlslibs.utils.DEFAULT_DBLK_SIZE) + return ok_flag + def _handle_enqueue_record(self, enqueue_record, start_journal_file): + while enqueue_record.load(self.current_journal_file.file_header.file_handle): + if not self._get_next_file(): + enqueue_record.truncated_flag = True + return False + if not enqueue_record.is_valid(start_journal_file): + return False + if enqueue_record.is_external() and enqueue_record.data != None: + raise qlslibs.err.ExternalDataError(self.current_journal_file.file_header, enqueue_record) + if enqueue_record.is_transient(): + self.statistics.transient_record_count += 1 + return True + if enqueue_record.xid_size > 0: + self.txn_map.add(start_journal_file, enqueue_record) + self.statistics.transaction_operation_count += 1 + self.statistics.transaction_record_count += 1 + self.statistics.transaction_enqueue_count += 1 + else: + self.enq_map.add(start_journal_file, enqueue_record, False) + start_journal_file.incr_enq_cnt() + self.statistics.enqueue_count += 1 + return True + def _handle_dequeue_record(self, dequeue_record, start_journal_file): + while dequeue_record.load(self.current_journal_file.file_header.file_handle): + if not self._get_next_file(): + dequeue_record.truncated_flag = True + return False + if not dequeue_record.is_valid(start_journal_file): + return False + if dequeue_record.xid_size > 0: + if self.xid_prepared_list is None: # ie this is the TPL + dequeue_record.transaction_prepared_list_flag = True + elif not self.enq_map.contains(dequeue_record.dequeue_record_id): + dequeue_record.warnings.append('NOT IN EMAP') # Only for non-TPL records + self.txn_map.add(start_journal_file, dequeue_record) + self.statistics.transaction_operation_count += 1 + self.statistics.transaction_record_count += 1 + self.statistics.transaction_dequeue_count += 1 + else: + try: + self.enq_map.delete(start_journal_file, dequeue_record)[0].decr_enq_cnt(dequeue_record) + except qlslibs.err.RecordIdNotFoundError: + dequeue_record.warnings.append('NOT IN EMAP') + self.statistics.dequeue_count += 1 + return True + def _handle_transaction_record(self, transaction_record, start_journal_file): + while transaction_record.load(self.current_journal_file.file_header.file_handle): + if not self._get_next_file(): + transaction_record.truncated_flag = True + return False + if not transaction_record.is_valid(start_journal_file): + return False + if transaction_record.magic[-1] == 'a': + self.statistics.transaction_abort_count += 1 + else: + self.statistics.transaction_commit_count += 1 + if self.txn_map.contains(transaction_record.xid): + self.txn_map.delete(self.current_journal_file, transaction_record) + else: + transaction_record.warnings.append('NOT IN TMAP') +# if transaction_record.magic[-1] == 'c': # commits only +# self._txn_obj_list[hdr.xid] = hdr + self.statistics.transaction_record_count += 1 + return True + def _load_data(self, record): + while not record.is_complete: + record.load(self.current_journal_file.file_handle) + +class JournalFile(object): + def __init__(self, file_header): + self.file_header = file_header + self.enq_cnt = 0 + self.deq_cnt = 0 + self.num_filler_records_required = None + def incr_enq_cnt(self): + self.enq_cnt += 1 + def decr_enq_cnt(self, record): + if self.enq_cnt <= self.deq_cnt: + raise qlslibs.err.EnqueueCountUnderflowError(self.file_header, record) + self.deq_cnt += 1 + def get_enq_cnt(self): + return self.enq_cnt - self.deq_cnt + def is_outstanding_enq(self): + return self.enq_cnt > self.deq_cnt + @staticmethod + def report_header(): + print 'file_num enq_cnt p_no efp journal_file' + print '-------- ------- ---- ----- ------------' + def report(self): + comment = '<uninitialized>' if self.file_header.file_num == 0 else '' + file_num_str = '0x%x' % self.file_header.file_num + print '%8s %7d %4d %4dk %s %s' % (file_num_str, self.get_enq_cnt(), self.file_header.partition_num, + self.file_header.efp_data_size_kb, + os.path.basename(self.file_header.file_handle.name), comment) diff --git a/qpid/tools/src/py/qlslibs/efp.py b/qpid/tools/src/py/qlslibs/efp.py new file mode 100644 index 0000000000..1c751c3d06 --- /dev/null +++ b/qpid/tools/src/py/qlslibs/efp.py @@ -0,0 +1,327 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +Module: qlslibs.efp + +Contains empty file pool (EFP) classes. +""" + +import os +import os.path +import qlslibs.err +import shutil +import uuid + +class EfpManager(object): + """ + Top level class to analyze the Qpid Linear Store (QLS) directory for the partitions that make up the + Empty File Pool (EFP). + """ + def __init__(self, directory, disk_space_required_kb): + if not os.path.exists(directory): + raise qlslibs.err.InvalidQlsDirectoryNameError(directory) + self.directory = directory + self.disk_space_required_kb = disk_space_required_kb + self.efp_partitions = [] + self.efp_pools = {} + self.total_num_files = 0 + self.total_cum_file_size_kb = 0 + self.current_efp_partition = None + def add_file_pool(self, file_size_kb, num_files): + """ Add an EFP in the specified partition of the specified size containing the specified number of files """ + dir_name = EmptyFilePool.get_directory_name(file_size_kb) + print 'Adding pool \'%s\' to partition %s' % (dir_name, self.current_efp_partition.partition_number) + self.total_cum_file_size_kb += self.current_efp_partition.create_new_efp(file_size_kb, num_files) + self.total_num_files += num_files + def freshen_file_pool(self, file_size_kb, num_files): + """ Freshen an EFP in the specified partition and of the specified size to the specified number of files """ + if self.current_efp_partition is None: + partition_list = self.efp_partitions + partition_str = 'all partitions' + else: + partition_list = [self.current_efp_partition] + partition_str = 'partition %d' % self.current_efp_partition.partition_number + if file_size_kb is None: + pool_str = 'all pools' + else: + pool_str = 'pool \'%s\'' % EmptyFilePool.get_directory_name(int(file_size_kb)) + print 'Freshening %s in %s to %d files' % (pool_str, partition_str, num_files) + for self.current_efp_partition in partition_list: # Partition objects + if file_size_kb is None: + file_size_list = self.current_efp_partition.efp_pools.keys() + else: + file_size_list = ['%sk' % file_size_kb] + for file_size in file_size_list: + efp = self.current_efp_partition.efp_pools[file_size] + num_files_needed = num_files - efp.get_tot_file_count() + if num_files_needed > 0: + self.current_efp_partition.create_new_efp_files(qlslibs.utils.efp_directory_size(file_size), + num_files_needed) + else: + print ' WARNING: Pool %s in partition %s already contains %d files: no action taken' % \ + (self.current_efp_partition.efp_pools[file_size].size_str, + self.current_efp_partition.partition_number, efp.get_num_files()) + def remove_file_pool(self, file_size_kb): + """ Remove an existing EFP from the specified partition and of the specified size """ + dir_name = EmptyFilePool.get_directory_name(file_size_kb) + print 'Removing pool \'%s\' from partition %s' % (dir_name, self.current_efp_partition.partition_number) + self.efp_partitions.remove(self.current_efp_partition) + shutil.rmtree(os.path.join(self.current_efp_partition.efp_directory, dir_name)) + def report(self): + print 'Empty File Pool (EFP) report' + print '============================' + print 'Found', len(self.efp_partitions), 'partition(s)' + if (len(self.efp_partitions)) > 0: + sorted_efp_partitions = sorted(self.efp_partitions, key=lambda x: x.partition_number) + EfpPartition.print_report_table_header() + for ptn in sorted_efp_partitions: + ptn.print_report_table_line() + print + for ptn in sorted_efp_partitions: + ptn.report() + def run(self, arg_tup): + self._analyze_efp() + if arg_tup is not None: + _, arg_file_size, arg_num_files, arg_add, arg_remove, arg_freshen, arg_list = arg_tup + self._check_args(arg_tup) + if arg_add: + self.add_file_pool(int(arg_file_size), int(arg_num_files)) + if arg_remove: + self.remove_file_pool(int(arg_file_size)) + if arg_freshen: + self.freshen_file_pool(arg_file_size, int(arg_num_files)) + if arg_list: + self.report() + def _analyze_efp(self): + for dir_entry in os.listdir(self.directory): + try: + efp_partition = EfpPartition(os.path.join(self.directory, dir_entry), self.disk_space_required_kb) + efp_partition.scan() + self.efp_partitions.append(efp_partition) + for efpl in efp_partition.efp_pools.iterkeys(): + if efpl not in self.efp_pools: + self.efp_pools[efpl] = [] + self.efp_pools[efpl].append(efp_partition.efp_pools[efpl]) + self.total_num_files += efp_partition.tot_file_count + self.total_cum_file_size_kb += efp_partition.tot_file_size_kb + except qlslibs.err.InvalidPartitionDirectoryNameError: + pass + def _check_args(self, arg_tup): + """ Value check of args. The names of partitions and pools are validated against the discovered instances """ + arg_partition, arg_file_size, _, arg_add, arg_remove, arg_freshen, _ = arg_tup + if arg_partition is not None: + try: + if arg_partition[0] == 'p': # string partition name, eg 'p001' + partition_num = int(arg_partition[1:]) + else: # numeric partition, eg '1' + partition_num = int(arg_partition) + found = False + for partition in self.efp_partitions: + if partition.partition_number == partition_num: + self.current_efp_partition = partition + found = True + break + if not found: + raise qlslibs.err.PartitionDoesNotExistError(arg_partition) + except ValueError: + raise qlslibs.err.InvalidPartitionDirectoryNameError(arg_partition) + if self.current_efp_partition is not None: + pool_list = self.current_efp_partition.efp_pools.keys() + efp_directory_name = EmptyFilePool.get_directory_name(int(arg_file_size)) + if arg_add and efp_directory_name in pool_list: + raise qlslibs.err.PoolDirectoryAlreadyExistsError(efp_directory_name) + if (arg_remove or arg_freshen) and efp_directory_name not in pool_list: + raise qlslibs.err.PoolDirectoryDoesNotExistError(efp_directory_name) + +class EfpPartition(object): + """ + Class that represents a EFP partition. Each partition contains one or more Empty File Pools (EFPs). + """ + PTN_DIR_PREFIX = 'p' + EFP_DIR_NAME = 'efp' + def __init__(self, directory, disk_space_required_kb): + self.directory = directory + self.partition_number = None + self.efp_pools = {} + self.tot_file_count = 0 + self.tot_file_size_kb = 0 + self._validate_partition_directory(disk_space_required_kb) + def create_new_efp_files(self, file_size_kb, num_files): + """ Create new EFP files in this partition """ + dir_name = EmptyFilePool.get_directory_name(file_size_kb) + if dir_name in self.efp_pools.keys(): + efp = self.efp_pools[dir_name] + else: + efp = EmptyFilePool(os.path.join(self.directory, EfpPartition.EFP_DIR_NAME), dir_name) + this_tot_file_size_kb = efp.create_new_efp_files(num_files) + self.tot_file_size_kb += this_tot_file_size_kb + self.tot_file_count += num_files + return this_tot_file_size_kb + @staticmethod + def print_report_table_header(): + print 'p_no no_efp tot_files tot_size_kb directory' + print '---- ------ --------- ----------- ---------' + def print_report_table_line(self): + print '%4d %6d %9d %11d %s' % (self.partition_number, len(self.efp_pools), self.tot_file_count, + self.tot_file_size_kb, self.directory) + def report(self): + print 'Partition %s:' % os.path.basename(self.directory) + if len(self.efp_pools) > 0: + EmptyFilePool.print_report_table_header() + for dir_name in self.efp_pools.keys(): + self.efp_pools[dir_name].print_report_table_line() + else: + print '<empty - no EFPs found in this partition>' + print + def scan(self): + if os.path.exists(self.directory): + efp_dir = os.path.join(self.directory, EfpPartition.EFP_DIR_NAME) + for dir_entry in os.listdir(efp_dir): + efp = EmptyFilePool(os.path.join(efp_dir, dir_entry), self.partition_number) + efp.scan() + self.tot_file_count += efp.get_tot_file_count() + self.tot_file_size_kb += efp.get_tot_file_size_kb() + self.efp_pools[dir_entry] = efp + def _validate_partition_directory(self, disk_space_required_kb): + if os.path.basename(self.directory)[0] is not EfpPartition.PTN_DIR_PREFIX: + raise qlslibs.err.InvalidPartitionDirectoryNameError(self.directory) + try: + self.partition_number = int(os.path.basename(self.directory)[1:]) + except ValueError: + raise qlslibs.err.InvalidPartitionDirectoryNameError(self.directory) + if not qlslibs.utils.has_write_permission(self.directory): + raise qlslibs.err.WritePermissionError(self.directory) + if disk_space_required_kb is not None: + space_avail = qlslibs.utils.get_avail_disk_space(self.directory) + if space_avail < (disk_space_required_kb * 1024): + raise qlslibs.err.InsufficientSpaceOnDiskError(self.directory, space_avail, + disk_space_required_kb * 1024) + +class EmptyFilePool(object): + """ + Class that represents a single Empty File Pool within a partition. Each EFP contains pre-formatted linear store + journal files (but it may also be empty). + """ + EFP_DIR_SUFFIX = 'k' + EFP_JRNL_EXTENTION = '.jrnl' + EFP_INUSE_DIRNAME = 'in_use' + EFP_RETURNED_DIRNAME = 'returned' + def __init__(self, directory, partition_number): + self.base_dir_name = os.path.basename(directory) + self.directory = directory + self.partition_number = partition_number + self.data_size_kb = None + self.efp_files = [] + self.in_use_files = [] + self.returned_files = [] + self._validate_efp_directory() + def create_new_efp_files(self, num_files): + """ Create one or more new empty journal files of the prescribed size for this EFP """ + this_total_file_size = 0 + for _ in range(num_files): + this_total_file_size += self._create_new_efp_file() + return this_total_file_size + def get_directory(self): + return self.directory + @staticmethod + def get_directory_name(file_size_kb): + """ Static function to create an EFP directory name from the size of the files it contains """ + return '%dk' % file_size_kb + def get_tot_file_count(self): + return len(self.efp_files) + def get_tot_file_size_kb(self): + return self.data_size_kb * len(self.efp_files) + @staticmethod + def print_report_table_header(): + print ' ---------- efp ------------ --------- in_use ---------- -------- returned ---------' + print 'data_size_kb file_count tot_file_size_kb file_count tot_file_size_kb file_count tot_file_size_kb efp_directory' + print '------------ ---------- ---------------- ---------- ---------------- ---------- ---------------- -------------' + def print_report_table_line(self): + print '%12d %10d %16d %10d %16d %10d %16d %s' % (self.data_size_kb, len(self.efp_files), + self.data_size_kb * len(self.efp_files), + len(self.in_use_files), + self.data_size_kb * len(self.in_use_files), + len(self.returned_files), + self.data_size_kb * len(self.returned_files), + self.get_directory()) + def scan(self): + for efp_file in os.listdir(self.directory): + if efp_file == self.EFP_INUSE_DIRNAME: + for in_use_file in os.listdir(os.path.join(self.directory, self.EFP_INUSE_DIRNAME)): + self.in_use_files.append(in_use_file) + continue + if efp_file == self.EFP_RETURNED_DIRNAME: + for returned_file in os.listdir(os.path.join(self.directory, self.EFP_RETURNED_DIRNAME)): + self.returned_files.append(returned_file) + continue + if self._validate_efp_file(os.path.join(self.directory, efp_file)): + self.efp_files.append(efp_file) + def _add_efp_file(self, efp_file_name): + """ Add a single journal file of the appropriate size to this EFP. No file size check is made here. """ + self.efp_files.append(efp_file_name) + def _create_new_efp_file(self): + """ Create a single new empty journal file of the prescribed size for this EFP """ + file_name = str(uuid.uuid4()) + EmptyFilePool.EFP_JRNL_EXTENTION + file_header = qlslibs.jrnl.FileHeader(0, qlslibs.jrnl.FileHeader.MAGIC, qlslibs.utils.DEFAULT_RECORD_VERSION, + 0, 0, 0) + file_header.init(None, None, qlslibs.utils.DEFAULT_HEADER_SIZE_SBLKS, self.partition_number, self.data_size_kb, + 0, 0, 0, 0, 0) + efh = file_header.encode() + efh_bytes = len(efh) + file_handle = open(os.path.join(self.directory, file_name), 'wb') + file_handle.write(efh) + file_handle.write('\xff' * (qlslibs.utils.DEFAULT_SBLK_SIZE - efh_bytes)) + file_handle.write('\x00' * (int(self.data_size_kb) * 1024)) + file_handle.close() + fqfn = os.path.join(self.directory, file_name) + self._add_efp_file(fqfn) + return os.path.getsize(fqfn) + def _validate_efp_directory(self): + if self.base_dir_name[-1] is not EmptyFilePool.EFP_DIR_SUFFIX: + raise qlslibs.err.InvalidEfpDirectoryNameError(self.directory) + try: + self.data_size_kb = int(os.path.basename(self.base_dir_name)[:-1]) + except ValueError: + raise qlslibs.err.InvalidEfpDirectoryNameError(self.directory) + def _validate_efp_file(self, efp_file): + file_size = os.path.getsize(efp_file) + expected_file_size = (self.data_size_kb * 1024) + qlslibs.utils.DEFAULT_SBLK_SIZE + if file_size != expected_file_size: + print 'WARNING: File %s not of correct size (size=%d, expected=%d): Ignoring' % (efp_file, file_size, + expected_file_size) + return False + file_handle = open(efp_file) + args = qlslibs.utils.load_args(file_handle, qlslibs.jrnl.RecordHeader) + file_hdr = qlslibs.jrnl.FileHeader(*args) + file_hdr.init(file_handle, *qlslibs.utils.load_args(file_handle, qlslibs.jrnl.FileHeader)) + if not file_hdr.is_header_valid(file_hdr): + file_handle.close() + return False + file_hdr.load(file_handle) + file_handle.close() + if not file_hdr.is_valid(True): + return False + return True + + +# ============================================================================= + +if __name__ == "__main__": + print "This is a library, and cannot be executed." diff --git a/qpid/tools/src/py/qlslibs/err.py b/qpid/tools/src/py/qlslibs/err.py new file mode 100644 index 0000000000..f47632ce6a --- /dev/null +++ b/qpid/tools/src/py/qlslibs/err.py @@ -0,0 +1,261 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +Module: qlslibs.err + +Contains error classes. +""" + +# --- Parent classes + +class QlsError(Exception): + """Base error class for QLS errors and exceptions""" + def __init__(self): + Exception.__init__(self) + def __str__(self): + return '' + +class QlsRecordError(QlsError): + """Base error class for individual records""" + def __init__(self, file_header, record): + QlsError.__init__(self) + self.file_header = file_header + self.record = record + def get_expected_fro(self): + return self.file_header.first_record_offset + def get_file_number(self): + return self.file_header.file_num + def get_queue_name(self): + return self.file_header.queue_name + def get_record_id(self): + return self.record.record_id + def get_record_offset(self): + return self.record.file_offset + def __str__(self): + return 'queue="%s" file_id=0x%x record_offset=0x%x record_id=0x%x' % \ + (self.file_header.queue_name, self.file_header.file_num, self.record.file_offset, self.record.record_id) + +# --- Error classes + +class AlreadyLockedError(QlsRecordError): + """Transactional record to be locked is already locked""" + def __init__(self, file_header, record): + QlsRecordError.__init__(self, file_header, record) + def __str__(self): + return 'Transactional operation already locked in TransactionMap: ' + QlsRecordError.__str__(self) + +class DataSizeError(QlsError): + """Error class for Data size mismatch""" + def __init__(self, expected_size, actual_size, data_str): + QlsError.__init__(self) + self.expected_size = expected_size + self.actual_size = actual_size + self.xid_str = data_str + def __str__(self): + return 'Inconsistent data size: expected:%d; actual:%d; data="%s"' % \ + (self.expected_size, self.actual_size, self.data_str) + +class DuplicateRecordIdError(QlsRecordError): + """Duplicate Record Id in Enqueue Map""" + def __init__(self, file_header, record): + QlsRecordError.__init__(self, file_header, record) + def __str__(self): + return 'Duplicate Record Id in enqueue map: ' + QlsRecordError.__str__(self) + +class EnqueueCountUnderflowError(QlsRecordError): + """Attempted to decrement enqueue count past 0""" + def __init__(self, file_header, record): + QlsRecordError.__init__(self, file_header, record) + def __str__(self): + return 'Enqueue record count underflow: ' + QlsRecordError.__str__(self) + +class ExternalDataError(QlsRecordError): + """Data present in Enqueue record when external data flag is set""" + def __init__(self, file_header, record): + QlsRecordError.__init__(self, file_header, record) + def __str__(self): + return 'Data present in external data record: ' + QlsRecordError.__str__(self) + +class FirstRecordOffsetMismatchError(QlsRecordError): + """First Record Offset (FRO) does not match file header""" + def __init__(self, file_header, record): + QlsRecordError.__init__(self, file_header, record) + def __str__(self): + return 'First record offset mismatch: ' + QlsRecordError.__str__(self) + ' expected_offset=0x%x' % \ + self.file_header.first_record_offset + +class InsufficientSpaceOnDiskError(QlsError): + """Insufficient space on disk""" + def __init__(self, directory, space_avail, space_requried): + QlsError.__init__(self) + self.directory = directory + self.space_avail = space_avail + self.space_required = space_requried + def __str__(self): + return 'Insufficient space on disk: directory=%s; avail_space=%d required_space=%d' % \ + (self.directory, self.space_avail, self.space_required) + +class InvalidClassError(QlsError): + """Invalid class name or type""" + def __init__(self, class_name): + QlsError.__init__(self) + self.class_name = class_name + def __str__(self): + return 'Invalid class name "%s"' % self.class_name + +class InvalidEfpDirectoryNameError(QlsError): + """Invalid EFP directory name - should be NNNNk, where NNNN is a number (of any length)""" + def __init__(self, directory_name): + QlsError.__init__(self) + self.directory_name = directory_name + def __str__(self): + return 'Invalid EFP directory name "%s"' % self.directory_name + +#class InvalidFileSizeString(QlsError): +# """Invalid file size string""" +# def __init__(self, file_size_string): +# QlsError.__init__(self) +# self.file_size_string = file_size_string +# def __str__(self): +# return 'Invalid file size string "%s"' % self.file_size_string + +class InvalidPartitionDirectoryNameError(QlsError): + """Invalid EFP partition name - should be pNNN, where NNN is a 3-digit partition number""" + def __init__(self, directory_name): + QlsError.__init__(self) + self.directory_name = directory_name + def __str__(self): + return 'Invalid partition directory name "%s"' % self.directory_name + +class InvalidQlsDirectoryNameError(QlsError): + """Invalid QLS directory name""" + def __init__(self, directory_name): + QlsError.__init__(self) + self.directory_name = directory_name + def __str__(self): + return 'Invalid QLS directory name "%s"' % self.directory_name + +class InvalidRecordTypeError(QlsRecordError): + """Error class for any operation using an invalid record type""" + def __init__(self, file_header, record, error_msg): + QlsRecordError.__init__(self, file_header, record) + self.error_msg = error_msg + def __str__(self): + return 'Invalid record type: ' + QlsRecordError.__str__(self) + ':' + self.error_msg + +class InvalidRecordVersionError(QlsRecordError): + """Invalid record version""" + def __init__(self, file_header, record, expected_version): + QlsRecordError.__init__(self, file_header, record) + self.expected_version = expected_version + def __str__(self): + return 'Invalid record version: queue="%s" ' + QlsRecordError.__str__(self) + \ + ' ver_found=0x%x ver_expected=0x%x' % (self.record_header.version, self.expected_version) + +class NoMoreFilesInJournalError(QlsError): + """Raised when trying to obtain the next file in the journal and there are no more files""" + def __init__(self, queue_name): + QlsError.__init__(self) + self.queue_name = queue_name + def __str__(self): + return 'No more journal files in queue "%s"' % self.queue_name + +class NonTransactionalRecordError(QlsRecordError): + """Transactional operation on non-transactional record""" + def __init__(self, file_header, record, operation): + QlsRecordError.__init__(self, file_header, record) + self.operation = operation + def __str__(self): + return 'Transactional operation on non-transactional record: ' + QlsRecordError.__str__() + \ + ' operation=%s' % self.operation + +class PartitionDoesNotExistError(QlsError): + """Partition name does not exist on disk""" + def __init__(self, partition_directory): + QlsError.__init__(self) + self.partition_directory = partition_directory + def __str__(self): + return 'Partition %s does not exist' % self.partition_directory + +class PoolDirectoryAlreadyExistsError(QlsError): + """Pool directory already exists""" + def __init__(self, pool_directory): + QlsError.__init__(self) + self.pool_directory = pool_directory + def __str__(self): + return 'Pool directory %s already exists' % self.pool_directory + +class PoolDirectoryDoesNotExistError(QlsError): + """Pool directory does not exist""" + def __init__(self, pool_directory): + QlsError.__init__(self) + self.pool_directory = pool_directory + def __str__(self): + return 'Pool directory %s does not exist' % self.pool_directory + +class RecordIdNotFoundError(QlsRecordError): + """Record Id not found in enqueue map""" + def __init__(self, file_header, record): + QlsRecordError.__init__(self, file_header, record) + def __str__(self): + return 'Record Id not found in enqueue map: ' + QlsRecordError.__str__() + +class RecordNotLockedError(QlsRecordError): + """Record in enqueue map is not locked""" + def __init__(self, file_header, record): + QlsRecordError.__init__(self, file_header, record) + def __str__(self): + return 'Record in enqueue map is not locked: ' + QlsRecordError.__str__() + +class UnexpectedEndOfFileError(QlsError): + """The bytes read from a file is less than that expected""" + def __init__(self, size_read, size_expected, file_offset, file_name): + QlsError.__init__(self) + self.size_read = size_read + self.size_expected = size_expected + self.file_offset = file_offset + self.file_name = file_name + def __str__(self): + return 'Tried to read %d at offset %d in file "%s"; only read %d' % \ + (self.size_read, self.file_offset, self.file_name, self.size_expected) + +class WritePermissionError(QlsError): + """No write permission""" + def __init__(self, directory): + QlsError.__init__(self) + self.directory = directory + def __str__(self): + return 'No write permission in directory %s' % self.directory + +class XidSizeError(QlsError): + """Error class for Xid size mismatch""" + def __init__(self, expected_size, actual_size, xid_str): + QlsError.__init__(self) + self.expected_size = expected_size + self.actual_size = actual_size + self.xid_str = xid_str + def __str__(self): + return 'Inconsistent xid size: expected:%d; actual:%d; xid="%s"' % \ + (self.expected_size, self.actual_size, self.xid_str) + +# ============================================================================= + +if __name__ == "__main__": + print "This is a library, and cannot be executed." diff --git a/qpid/tools/src/py/qlslibs/jrnl.py b/qpid/tools/src/py/qlslibs/jrnl.py new file mode 100644 index 0000000000..ee25015220 --- /dev/null +++ b/qpid/tools/src/py/qlslibs/jrnl.py @@ -0,0 +1,412 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +Module: qlslibs.jrnl + +Contains journal record classes. +""" + +import qlslibs.err +import qlslibs.utils +import string +import struct +import time + +class RecordHeader(object): + FORMAT = '<4s2H2Q' + def __init__(self, file_offset, magic, version, user_flags, serial, record_id): + self.file_offset = file_offset + self.magic = magic + self.version = version + self.user_flags = user_flags + self.serial = serial + self.record_id = record_id + self.warnings = [] + self.truncated_flag = False + def encode(self): + return struct.pack(RecordHeader.FORMAT, self.magic, self.version, self.user_flags, self.serial, self.record_id) + def load(self, file_handle): + pass + @staticmethod + def discriminate(args): + """Use the last char in the header magic to determine the header type""" + return CLASSES.get(args[1][-1], RecordHeader) + def is_empty(self): + """Return True if this record is empty (ie has a magic of 0x0000""" + return self.magic == '\x00'*4 + def is_header_valid(self, file_header): + """Check that this record is valid""" + if self.is_empty(): + return False + if self.magic[:3] != 'QLS' or self.magic[3] not in ['a', 'c', 'd', 'e', 'f', 'x']: + return False + if self.magic[-1] != 'x': + if self.version != qlslibs.utils.DEFAULT_RECORD_VERSION: + raise qlslibs.err.InvalidRecordVersionError(file_header, self, qlslibs.utils.DEFAULT_RECORD_VERSION) + if self.serial != file_header.serial: + return False + return True + def to_rh_string(self): + """Return string representation of this header""" + if self.is_empty(): + return '0x%08x: <empty>' % (self.file_offset) + if self.magic[-1] == 'x': + return '0x%08x: [X]' % (self.file_offset) + if self.magic[-1] in ['a', 'c', 'd', 'e', 'f', 'x']: + return '0x%08x: [%c v=%d f=0x%04x rid=0x%x]' % \ + (self.file_offset, self.magic[-1].upper(), self.version, self.user_flags, self.record_id) + return '0x%08x: <error, unknown magic "%s" (possible overwrite boundary?)>' % (self.file_offset, self.magic) + def _get_warnings(self): + warn_str = '' + for warn in self.warnings: + warn_str += '<%s>' % warn + return warn_str + def __str__(self): + """Return string representation of this header""" + return self.to_rh_string() + +class RecordTail(object): + FORMAT = '<4sL2Q' + def __init__(self, file_handle): # TODO - clumsy, only allows reading from disk. Move all disk stuff to laod() + self.file_offset = file_handle.tell() if file_handle is not None else 0 + self.complete = False + self.read_size = struct.calcsize(RecordTail.FORMAT) + self.fbin = file_handle.read(self.read_size) if file_handle is not None else None + self.valid_flag = None + if self.fbin is not None and len(self.fbin) >= self.read_size: + self.complete = True + self.xmagic, self.checksum, self.serial, self.record_id = struct.unpack(RecordTail.FORMAT, self.fbin) + def load(self, file_handle): + """Used to continue load of RecordTail object if it is split between files""" + if not self.is_complete: + self.fbin += file_handle.read(self.read_size - len(self.fbin)) + if (len(self.fbin)) >= self.read_size: + self.complete = True + self.xmagic, self.checksum, self.serial, self.record_id = struct.unpack(RecordTail.FORMAT, self.fbin) + def is_complete(self): + return self.complete + def is_valid(self, record): + if self.valid_flag is None: + if not self.complete: + return False + self.valid_flag = qlslibs.utils.inv_str(self.xmagic) == record.magic and \ + self.serial == record.serial and \ + self.record_id == record.record_id and \ + qlslibs.utils.adler32(record.checksum_encode()) == self.checksum + return self.valid_flag + def to_string(self): + """Return a string representation of the this RecordTail instance""" + if self.valid_flag is not None: + if not self.valid_flag: + return '[INVALID RECORD TAIL]' + magic = qlslibs.utils.inv_str(self.xmagic) + magic_char = magic[-1].upper() if magic[-1] in string.printable else '?' + return '[%c cs=0x%08x rid=0x%x]' % (magic_char, self.checksum, self.record_id) + def __str__(self): + """Return a string representation of the this RecordTail instance""" + return self.to_string() + +class FileHeader(RecordHeader): + FORMAT = '<2H4x5QH' + MAGIC = 'QLSf' + def init(self, file_handle, _, file_header_size_sblks, partition_num, efp_data_size_kb, first_record_offset, + timestamp_sec, timestamp_ns, file_num, queue_name_len): + self.file_handle = file_handle + self.file_header_size_sblks = file_header_size_sblks + self.partition_num = partition_num + self.efp_data_size_kb = efp_data_size_kb + self.first_record_offset = first_record_offset + self.timestamp_sec = timestamp_sec + self.timestamp_ns = timestamp_ns + self.file_num = file_num + self.queue_name_len = queue_name_len + self.queue_name = None + def encode(self): + if self.queue_name is None: + return RecordHeader.encode(self) + struct.pack(self.FORMAT, self.file_header_size_sblks, \ + self.partition_num, self.efp_data_size_kb, \ + self.first_record_offset, self.timestamp_sec, \ + self.timestamp_ns, self.file_num, 0) + return RecordHeader.encode(self) + struct.pack(self.FORMAT, self.file_header_size_sblks, self.partition_num, \ + self.efp_data_size_kb, self.first_record_offset, \ + self.timestamp_sec, self.timestamp_ns, self.file_num, \ + self.queue_name_len) + self.queue_name + def get_file_size(self): + """Sum of file header size and data size""" + return (self.file_header_size_sblks * qlslibs.utils.DEFAULT_SBLK_SIZE) + (self.efp_data_size_kb * 1024) + def load(self, file_handle): + self.queue_name = file_handle.read(self.queue_name_len) + def is_end_of_file(self): + return self.file_handle.tell() >= self.get_file_size() + def is_valid(self, is_empty): + if not RecordHeader.is_header_valid(self, self): + return False + if self.file_handle is None or self.file_header_size_sblks == 0 or self.partition_num == 0 or \ + self.efp_data_size_kb == 0: + return False + if is_empty: + if self.first_record_offset != 0 or self.timestamp_sec != 0 or self.timestamp_ns != 0 or \ + self.file_num != 0 or self.queue_name_len != 0: + return False + else: + if self.first_record_offset == 0 or self.timestamp_sec == 0 or self.timestamp_ns == 0 or \ + self.file_num == 0 or self.queue_name_len == 0: + return False + if self.queue_name is None: + return False + if len(self.queue_name) != self.queue_name_len: + return False + return True + def timestamp_str(self): + """Get the timestamp of this record in string format""" + now = time.gmtime(self.timestamp_sec) + fstr = '%%a %%b %%d %%H:%%M:%%S.%09d %%Y' % (self.timestamp_ns) + return time.strftime(fstr, now) + def to_string(self): + """Return a string representation of the this FileHeader instance""" + return '%s fnum=0x%x fro=0x%08x p=%d s=%dk t=%s %s' % (self.to_rh_string(), self.file_num, + self.first_record_offset, self.partition_num, + self.efp_data_size_kb, self.timestamp_str(), + self._get_warnings()) + def __str__(self): + """Return a string representation of the this FileHeader instance""" + return self.to_string() + +class EnqueueRecord(RecordHeader): + FORMAT = '<2Q' + MAGIC = 'QLSe' + EXTERNAL_FLAG_MASK = 0x20 + TRANSIENT_FLAG_MASK = 0x10 + def init(self, _, xid_size, data_size): + self.xid_size = xid_size + self.data_size = data_size + self.xid = None + self.xid_complete = False + self.data = None + self.data_complete = False + self.record_tail = None + def checksum_encode(self): # encode excluding record tail + cs_bytes = RecordHeader.encode(self) + struct.pack(self.FORMAT, self.xid_size, self.data_size) + if self.xid is not None: + cs_bytes += self.xid + if self.data is not None: + cs_bytes += self.data + return cs_bytes + def is_external(self): + return self.user_flags & EnqueueRecord.EXTERNAL_FLAG_MASK > 0 + def is_transient(self): + return self.user_flags & EnqueueRecord.TRANSIENT_FLAG_MASK > 0 + def is_valid(self, journal_file): + if not RecordHeader.is_header_valid(self, journal_file.file_header): + return False + if not (self.xid_complete and self.data_complete): + return False + if self.xid_size > 0 and len(self.xid) != self.xid_size: + return False + if self.data_size > 0 and len(self.data) != self.data_size: + return False + if self.xid_size > 0 or self.data_size > 0: + if self.record_tail is None: + return False + if not self.record_tail.is_valid(self): + return False + return True + def load(self, file_handle): + """Return True when load is incomplete and must be called again with new file handle""" + self.xid, self.xid_complete = qlslibs.utils.load_data(file_handle, self.xid, self.xid_size) + if not self.xid_complete: + return True + if self.is_external(): + self.data_complete = True + else: + self.data, self.data_complete = qlslibs.utils.load_data(file_handle, self.data, self.data_size) + if not self.data_complete: + return True + if self.xid_size > 0 or self.data_size > 0: + if self.record_tail is None: + self.record_tail = RecordTail(file_handle) + elif not self.record_tail.is_complete(): + self.record_tail.load(file_handle) # Continue loading partially loaded tail + if self.record_tail.is_complete(): + self.record_tail.is_valid(self) + else: + return True + return False + def to_string(self, show_xid_flag, show_data_flag): + """Return a string representation of the this EnqueueRecord instance""" + if self.truncated_flag: + return '%s xid(%d) data(%d) [Truncated, no more files in journal]' % (RecordHeader.__str__(self), + self.xid_size, self.data_size) + if self.record_tail is None: + record_tail_str = '' + else: + record_tail_str = self.record_tail.to_string() + return '%s %s %s %s %s %s' % (self.to_rh_string(), + qlslibs.utils.format_xid(self.xid, self.xid_size, show_xid_flag), + qlslibs.utils.format_data(self.data, self.data_size, show_data_flag), + record_tail_str, self._print_flags(), self._get_warnings()) + def _print_flags(self): + """Utility function to decode the flags field in the header and print a string representation""" + fstr = '' + if self.is_transient(): + fstr = '[TRANSIENT' + if self.is_external(): + if len(fstr) > 0: + fstr += ',EXTERNAL' + else: + fstr = '*EXTERNAL' + if len(fstr) > 0: + fstr += ']' + return fstr + def __str__(self): + """Return a string representation of the this EnqueueRecord instance""" + return self.to_string(False, False) + +class DequeueRecord(RecordHeader): + FORMAT = '<2Q' + MAGIC = 'QLSd' + TXN_COMPLETE_COMMIT_FLAG = 0x10 + def init(self, _, dequeue_record_id, xid_size): + self.dequeue_record_id = dequeue_record_id + self.xid_size = xid_size + self.transaction_prepared_list_flag = False + self.xid = None + self.xid_complete = False + self.record_tail = None + def checksum_encode(self): # encode excluding record tail + return RecordHeader.encode(self) + struct.pack(self.FORMAT, self.dequeue_record_id, self.xid_size) + \ + self.xid + def is_transaction_complete_commit(self): + return self.user_flags & DequeueRecord.TXN_COMPLETE_COMMIT_FLAG > 0 + def is_valid(self, journal_file): + if not RecordHeader.is_header_valid(self, journal_file.file_header): + return False + if self.xid_size > 0: + if not self.xid_complete: + return False + if self.xid_size > 0 and len(self.xid) != self.xid_size: + return False + if self.record_tail is None: + return False + if not self.record_tail.is_valid(self): + return False + return True + def load(self, file_handle): + """Return True when load is incomplete and must be called again with new file handle""" + self.xid, self.xid_complete = qlslibs.utils.load_data(file_handle, self.xid, self.xid_size) + if not self.xid_complete: + return True + if self.xid_size > 0: + if self.record_tail is None: + self.record_tail = RecordTail(file_handle) + elif not self.record_tail.is_complete(): + self.record_tail.load(file_handle) + if self.record_tail.is_complete(): + self.record_tail.is_valid(self) + else: + return True + return False + def to_string(self, show_xid_flag): + """Return a string representation of the this DequeueRecord instance""" + if self.truncated_flag: + return '%s xid(%d) drid=0x%x [Truncated, no more files in journal]' % (RecordHeader.__str__(self), + self.xid_size, + self.dequeue_record_id) + if self.record_tail is None: + record_tail_str = '' + else: + record_tail_str = self.record_tail.to_string() + return '%s drid=0x%x %s %s %s %s' % (self.to_rh_string(), self.dequeue_record_id, + qlslibs.utils.format_xid(self.xid, self.xid_size, show_xid_flag), + record_tail_str, self._print_flags(), self._get_warnings()) + def _print_flags(self): + """Utility function to decode the flags field in the header and print a string representation""" + if self.transaction_prepared_list_flag: + if self.is_transaction_complete_commit(): + return '[COMMIT]' + else: + return '[ABORT]' + return '' + def __str__(self): + """Return a string representation of the this DequeueRecord instance""" + return self.to_string(False) + +class TransactionRecord(RecordHeader): + FORMAT = '<Q' + MAGIC_ABORT = 'QLSa' + MAGIC_COMMIT = 'QLSc' + def init(self, _, xid_size): + self.xid_size = xid_size + self.xid = None + self.xid_complete = False + self.record_tail = None + def checksum_encode(self): # encode excluding record tail + return RecordHeader.encode(self) + struct.pack(self.FORMAT, self.xid_size) + self.xid + def is_valid(self, journal_file): + if not RecordHeader.is_header_valid(self, journal_file.file_header): + return False + if not self.xid_complete or len(self.xid) != self.xid_size: + return False + if self.record_tail is None: + return False + if not self.record_tail.is_valid(self): + return False + return True + def load(self, file_handle): + """Return True when load is incomplete and must be called again with new file handle""" + self.xid, self.xid_complete = qlslibs.utils.load_data(file_handle, self.xid, self.xid_size) + if not self.xid_complete: + return True + if self.xid_size > 0: + if self.record_tail is None: + self.record_tail = RecordTail(file_handle) + elif not self.record_tail.is_complete(): + self.record_tail.load(file_handle) + if self.record_tail.is_complete(): + self.record_tail.is_valid(self) + else: + return True + return False + def to_string(self, show_xid_flag): + """Return a string representation of the this TransactionRecord instance""" + if self.truncated_flag: + return '%s xid(%d) [Truncated, no more files in journal]' % (RecordHeader.__str__(self), self.xid_size) + if self.record_tail is None: + record_tail_str = '' + else: + record_tail_str = self.record_tail.to_string() + return '%s %s %s %s' % (self.to_rh_string(), + qlslibs.utils.format_xid(self.xid, self.xid_size, show_xid_flag), + record_tail_str, self._get_warnings()) + def __str__(self): + """Return a string representation of the this TransactionRecord instance""" + return self.to_string(False) + +# ============================================================================= + +CLASSES = { + 'a': TransactionRecord, + 'c': TransactionRecord, + 'd': DequeueRecord, + 'e': EnqueueRecord, +} + +if __name__ == '__main__': + print 'This is a library, and cannot be executed.' diff --git a/qpid/tools/src/py/qlslibs/utils.py b/qpid/tools/src/py/qlslibs/utils.py new file mode 100644 index 0000000000..c32f8c7abb --- /dev/null +++ b/qpid/tools/src/py/qlslibs/utils.py @@ -0,0 +1,206 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +Module: qlslibs.utils + +Contains helper functions for qpid_qls_analyze. +""" + +import os +import qlslibs.jrnl +import stat +import string +import struct +import subprocess +import zlib + +DEFAULT_DBLK_SIZE = 128 +DEFAULT_SBLK_SIZE = 4096 # 32 dblks +DEFAULT_SBLK_SIZE_KB = DEFAULT_SBLK_SIZE / 1024 +DEFAULT_RECORD_VERSION = 2 +DEFAULT_HEADER_SIZE_SBLKS = 1 + +def adler32(data): + """return the adler32 checksum of data""" + return zlib.adler32(data) & 0xffffffff + +def create_record(magic, uflags, journal_file, record_id, dequeue_record_id, xid, data): + """Helper function to construct a record with xid, data (where applicable) and consistent tail with checksum""" + record_class = qlslibs.jrnl.CLASSES.get(magic[-1]) + record = record_class(0, magic, DEFAULT_RECORD_VERSION, uflags, journal_file.file_header.serial, record_id) + xid_length = len(xid) if xid is not None else 0 + if isinstance(record, qlslibs.jrnl.EnqueueRecord): + data_length = len(data) if data is not None else 0 + record.init(None, xid_length, data_length) + elif isinstance(record, qlslibs.jrnl.DequeueRecord): + record.init(None, dequeue_record_id, xid_length) + elif isinstance(record, qlslibs.jrnl.TransactionRecord): + record.init(None, xid_length) + else: + raise qlslibs.err.InvalidClassError(record.__class__.__name__) + if xid is not None: + record.xid = xid + record.xid_complete = True + if data is not None: + record.data = data + record.data_complete = True + record.record_tail = _mk_record_tail(record) + return record + +def efp_directory_size(directory_name): + """"Decode the directory name in the format NNNk to a numeric size, where NNN is a number string""" + try: + if directory_name[-1] == 'k': + return int(directory_name[:-1]) + except ValueError: + pass + return 0 + +def format_data(data, data_size=None, show_data_flag=True): + """Format binary data for printing""" + return _format_binary(data, data_size, show_data_flag, 'data', qlslibs.err.DataSizeError, False) + +def format_xid(xid, xid_size=None, show_xid_flag=True): + """Format binary XID for printing""" + return _format_binary(xid, xid_size, show_xid_flag, 'xid', qlslibs.err.XidSizeError, True) + +def get_avail_disk_space(path): + df_proc = subprocess.Popen(["df", path], stdout=subprocess.PIPE) + output = df_proc.communicate()[0] + return int(output.split('\n')[1].split()[3]) + +def has_write_permission(path): + stat_info = os.stat(path) + return bool(stat_info.st_mode & stat.S_IRGRP) + +def inv_str(in_string): + """Perform a binary 1's compliment (invert all bits) on a binary string""" + istr = '' + for index in range(0, len(in_string)): + istr += chr(~ord(in_string[index]) & 0xff) + return istr + +def load(file_handle, klass): + """Load a record of class klass from a file""" + args = load_args(file_handle, klass) + subclass = klass.discriminate(args) + result = subclass(*args) # create instance of record + if subclass != klass: + result.init(*load_args(file_handle, subclass)) + return result + +def load_args(file_handle, klass): + """Load the arguments from class klass""" + size = struct.calcsize(klass.FORMAT) + foffs = file_handle.tell(), + fbin = file_handle.read(size) + if len(fbin) != size: + raise qlslibs.err.UnexpectedEndOfFileError(len(fbin), size, foffs, file_handle.name) + return foffs + struct.unpack(klass.FORMAT, fbin) + +def load_data(file_handle, element, element_size): + """Read element_size bytes of binary data from file_handle into element""" + if element_size == 0: + return element, True + if element is None: + element = file_handle.read(element_size) + else: + read_size = element_size - len(element) + element += file_handle.read(read_size) + return element, len(element) == element_size + +def skip(file_handle, boundary): + """Read and discard disk bytes until the next multiple of boundary""" + if not file_handle.closed: + file_handle.read(_rem_bytes_in_block(file_handle, boundary)) + +#--- protected functions --- + +def _format_binary(bin_str, bin_size, show_bin_flag, prefix, err_class, hex_num_flag): + """Format binary XID for printing""" + if bin_str is None and bin_size is not None: + if bin_size > 0: + raise err_class(bin_size, len(bin_str), bin_str) + return '' + if bin_size is None: + bin_size = len(bin_str) + elif bin_size != len(bin_str): + raise err_class(bin_size, len(bin_str), bin_str) + out_str = '%s(%d)' % (prefix, bin_size) + if show_bin_flag: + if _is_printable(bin_str): + binstr = '"%s"' % _split_str(bin_str) + elif hex_num_flag: + binstr = '0x%s' % _str_to_hex_num(bin_str) + else: + binstr = _hex_split_str(bin_str) + out_str += '=%s' % binstr + return out_str + +def _hex_str(in_str, begin, end): + """Return a binary string as a hex string""" + hstr = '' + for index in range(begin, end): + if _is_printable(in_str[index]): + hstr += in_str[index] + else: + hstr += '\\%02x' % ord(in_str[index]) + return hstr + +def _hex_split_str(in_str, split_size = 50): + """Split a hex string into two parts separated by an ellipsis""" + if len(in_str) <= split_size: + return _hex_str(in_str, 0, len(in_str)) + return _hex_str(in_str, 0, 10) + ' ... ' + _hex_str(in_str, len(in_str)-10, len(in_str)) + #return ''.join(x.encode('hex') for x in reversed(in_str)) + +def _is_printable(in_str): + """Return True if in_str in printable; False otherwise.""" + for this_char in in_str: + if this_char not in string.letters and this_char not in string.digits and this_char not in string.punctuation: + return False + return True + +def _mk_record_tail(record): + record_tail = qlslibs.jrnl.RecordTail(None) + record_tail.xmagic = inv_str(record.magic) + record_tail.checksum = adler32(record.checksum_encode()) + record_tail.serial = record.serial + record_tail.record_id = record.record_id + return record_tail + +def _rem_bytes_in_block(file_handle, block_size): + """Return the remaining bytes in a block""" + foffs = file_handle.tell() + return (_size_in_blocks(foffs, block_size) * block_size) - foffs + +def _size_in_blocks(size, block_size): + """Return the size in terms of data blocks""" + return int((size + block_size - 1) / block_size) + +def _split_str(in_str, split_size = 50): + """Split a string into two parts separated by an ellipsis if it is longer than split_size""" + if len(in_str) < split_size: + return in_str + return in_str[:25] + ' ... ' + in_str[-25:] + +def _str_to_hex_num(in_str): + """Turn a string into a hex number representation, little endian assumed (ie LSB is first, MSB is last)""" + return ''.join(x.encode('hex') for x in reversed(in_str)) |