diff options
Diffstat (limited to 'qpid/tools/src/py/qls/jrnl.py')
-rw-r--r-- | qpid/tools/src/py/qls/jrnl.py | 873 |
1 files changed, 107 insertions, 766 deletions
diff --git a/qpid/tools/src/py/qls/jrnl.py b/qpid/tools/src/py/qls/jrnl.py index 5bce78bfad..f4fb16ef9f 100644 --- a/qpid/tools/src/py/qls/jrnl.py +++ b/qpid/tools/src/py/qls/jrnl.py @@ -17,567 +17,17 @@ # under the License. # -import os -import os.path +""" +Module: qls.jrnl + +Contains journal record classes. +""" + import qls.err +import qls.utils import string import struct -from time import gmtime, strftime -import zlib - -class HighCounter(object): - def __init__(self): - self.num = 0 - def check(self, num): - if self.num < num: - self.num = num - def get(self): - return self.num - def get_next(self): - self.num += 1 - return self.num - -class JournalRecoveryManager(object): - TPL_DIR_NAME = 'tpl' - JRNL_DIR_NAME = 'jrnl' - def __init__(self, directory, args): - if not os.path.exists(directory): - raise qls.err.InvalidQlsDirectoryNameError(directory) - self.directory = directory - self.args = args - self.tpl = None - self.journals = {} - self.high_rid_counter = HighCounter() - def report(self, print_stats_flag): - if self.tpl is not None: - self.tpl.report(print_stats_flag) - for queue_name in sorted(self.journals.keys()): - self.journals[queue_name].report(print_stats_flag) - def run(self, args): - tpl_dir = os.path.join(self.directory, JournalRecoveryManager.TPL_DIR_NAME) - if os.path.exists(tpl_dir): - self.tpl = Journal(tpl_dir, None, self.args) - self.tpl.recover(self.high_rid_counter) - print - jrnl_dir = os.path.join(self.directory, JournalRecoveryManager.JRNL_DIR_NAME) - prepared_list = self.tpl.txn_map.get_prepared_list() if self.tpl is not None else {} - if os.path.exists(jrnl_dir): - for dir_entry in sorted(os.listdir(jrnl_dir)): - jrnl = Journal(os.path.join(jrnl_dir, dir_entry), prepared_list, self.args) - jrnl.recover(self.high_rid_counter) - self.journals[jrnl.get_queue_name()] = jrnl - print - self._reconcile_transactions(prepared_list, args.txn) - def _reconcile_transactions(self, prepared_list, txn_flag): - print 'Transaction reconciliation report:' - print '==================================' - print len(prepared_list), 'open transaction(s) found in prepared transaction list:' - for xid in prepared_list.keys(): - commit_flag = prepared_list[xid] - if commit_flag is None: - status = '[Prepared, neither committed nor aborted - assuming commit]' - elif commit_flag: - status = '[Prepared, but interrupted during commit phase]' - else: - status = '[Prepared, but interrupted during abort phase]' - print ' ', Utils.format_xid(xid), status - if prepared_list[xid] is None: # Prepared, but not committed or aborted - enqueue_record = self.tpl.get_txn_map_record(xid)[0][1] - dequeue_record = Utils.create_record('QLSd', DequeueRecord.TXN_COMPLETE_COMMIT_FLAG, \ - self.tpl.current_journal_file, self.high_rid_counter.get_next(), \ - enqueue_record.record_id, xid, None) - if txn_flag: - self.tpl.add_record(dequeue_record) - for queue_name in sorted(self.journals.keys()): - self.journals[queue_name].reconcile_transactions(prepared_list, txn_flag) - if len(prepared_list) > 0: - print 'Completing prepared transactions in prepared transaction list:' - for xid in prepared_list.keys(): - print ' ', Utils.format_xid(xid) - transaction_record = Utils.create_record('QLSc', 0, self.tpl.current_journal_file, \ - self.high_rid_counter.get_next(), None, xid, None) - if txn_flag: - self.tpl.add_record(transaction_record) - print - -class EnqueueMap(object): - """ - Map of enqueued records in a QLS journal - """ - def __init__(self, journal): - self.journal = journal - self.enq_map = {} - def add(self, journal_file, enq_record, locked_flag): - if enq_record.record_id in self.enq_map: - raise qls.err.DuplicateRecordIdError(self.journal.current_file_header, enq_record) - self.enq_map[enq_record.record_id] = [journal_file, enq_record, locked_flag] - def contains(self, rid): - """Return True if the map contains the given rid""" - return rid in self.enq_map - def delete(self, journal_file, deq_record): - if deq_record.dequeue_record_id in self.enq_map: - enq_list = self.enq_map[deq_record.dequeue_record_id] - del self.enq_map[deq_record.dequeue_record_id] - return enq_list - else: - raise qls.err.RecordIdNotFoundError(journal_file.file_header, deq_record) - def get(self, record_id): - if record_id in self.enq_map: - return self.enq_map[record_id] - return None - def lock(self, journal_file, dequeue_record): - if dequeue_record.dequeue_record_id not in self.enq_map: - raise qls.err.RecordIdNotFoundError(journal_file.file_header, dequeue_record) - self.enq_map[dequeue_record.dequeue_record_id][2] = True - def report_str(self, _, show_records): - """Return a string containing a text report for all records in the map""" - if len(self.enq_map) == 0: - return 'No enqueued records found.' - rstr = '%d enqueued records found' % len(self.enq_map) - if show_records: - rstr += ":" - rid_list = self.enq_map.keys() - rid_list.sort() - for rid in rid_list: - journal_file, record, locked_flag = self.enq_map[rid] - if locked_flag: - lock_str = '[LOCKED]' - else: - lock_str = '' - rstr += '\n 0x%x:%s %s' % (journal_file.file_header.file_num, record, lock_str) - else: - rstr += '.' - return rstr - def unlock(self, journal_file, dequeue_record): - """Set the transaction lock for a given record_id to False""" - if dequeue_record.dequeue_record_id in self.enq_map: - if self.enq_map[dequeue_record.dequeue_record_id][2]: - self.enq_map[dequeue_record.dequeue_record_id][2] = False - else: - raise qls.err.RecordNotLockedError(journal_file.file_header, dequeue_record) - else: - raise qls.err.RecordIdNotFoundError(journal_file.file_header, dequeue_record) - -class TransactionMap(object): - """ - Map of open transactions used while recovering a QLS journal - """ - def __init__(self, enq_map): - self.txn_map = {} - self.enq_map = enq_map - def abort(self, xid): - """Perform an abort operation for the given xid record""" - for journal_file, record, _ in self.txn_map[xid]: - if isinstance(record, DequeueRecord): - if self.enq_map.contains(record.dequeue_record_id): - self.enq_map.unlock(journal_file, record) - else: - journal_file.decr_enq_cnt(record) - del self.txn_map[xid] - def add(self, journal_file, record): - if record.xid is None: - raise qls.err.NonTransactionalRecordError(journal_file.file_header, record, 'TransactionMap.add()') - if isinstance(record, DequeueRecord): - try: - self.enq_map.lock(journal_file, record) - except qls.err.RecordIdNotFoundError: - # Not in emap, look for rid in tmap - should not happen in practice - txn_op = self._find_record_id(record.xid, record.dequeue_record_id) - if txn_op != None: - if txn_op[2]: - raise qls.err.AlreadyLockedError(journal_file.file_header, record) - txn_op[2] = True - if record.xid in self.txn_map: - self.txn_map[record.xid].append([journal_file, record, False]) # append to existing list - else: - self.txn_map[record.xid] = [[journal_file, record, False]] # create new list - def commit(self, xid): - """Perform a commit operation for the given xid record""" - mismatch_list = [] - for journal_file, record, lock in self.txn_map[xid]: - if isinstance(record, EnqueueRecord): - self.enq_map.add(journal_file, record, lock) # Transfer enq to emap - else: - if self.enq_map.contains(record.dequeue_record_id): - self.enq_map.unlock(journal_file, record) - self.enq_map.delete(journal_file, record)[0].decr_enq_cnt(record) - else: - mismatch_list.append('0x%x' % record.dequeue_record_id) - del self.txn_map[xid] - return mismatch_list - def contains(self, xid): - """Return True if the xid exists in the map; False otherwise""" - return xid in self.txn_map - def delete(self, journal_file, transaction_record): - """Remove a transaction record from the map using either a commit or abort header""" - if transaction_record.magic[-1] == 'c': - return self.commit(transaction_record.xid) - if transaction_record.magic[-1] == 'a': - self.abort(transaction_record.xid) - else: - raise qls.err.InvalidRecordTypeError(journal_file.file_header, transaction_record, - 'delete from Transaction Map') - def get(self, xid): - if xid in self.txn_map: - return self.txn_map[xid] - return None - def get_prepared_list(self): - """ - Prepared list is a map of xid(key) to one of None, True or False. These represent respectively: - None: prepared, but neither committed or aborted (interrupted before commit or abort) - False: prepared and aborted (interrupted before abort complete) - True: prepared and committed (interrupted before commit complete) - """ - prepared_list = {} - for xid in self.get_xid_list(): - for _, record, _ in self.txn_map[xid]: - if isinstance(record, EnqueueRecord): - prepared_list[xid] = None - else: - prepared_list[xid] = record.is_transaction_complete_commit() - return prepared_list - def get_xid_list(self): - return self.txn_map.keys() - def report_str(self, _, show_records): - """Return a string containing a text report for all records in the map""" - if len(self.txn_map) == 0: - return 'No outstanding transactions found.' - rstr = '%d outstanding transaction(s)' % len(self.txn_map) - if show_records: - rstr += ':' - for xid, op_list in self.txn_map.iteritems(): - rstr += '\n %s containing %d operations:' % (Utils.format_xid(xid), len(op_list)) - for journal_file, record, _ in op_list: - rstr += '\n 0x%x:%s' % (journal_file.file_header.file_num, record) - else: - rstr += '.' - return rstr - def _find_record_id(self, xid, record_id): - """ Search for and return map list with supplied rid.""" - if xid in self.txn_map: - for txn_op in self.txn_map[xid]: - if txn_op[1].record_id == record_id: - return txn_op - for this_xid in self.txn_map.iterkeys(): - for txn_op in self.txn_map[this_xid]: - if txn_op[1].record_id == record_id: - return txn_op - return None - -class JournalStatistics(object): - """Journal statistics""" - def __init__(self): - self.total_record_count = 0 - self.transient_record_count = 0 - self.filler_record_count = 0 - self.enqueue_count = 0 - self.dequeue_count = 0 - self.transaction_record_count = 0 - self.transaction_enqueue_count = 0 - self.transaction_dequeue_count = 0 - self.transaction_commit_count = 0 - self.transaction_abort_count = 0 - self.transaction_operation_count = 0 - def __str__(self): - fstr = 'Total record count: %d\n' + \ - 'Transient record count: %d\n' + \ - 'Filler_record_count: %d\n' + \ - 'Enqueue_count: %d\n' + \ - 'Dequeue_count: %d\n' + \ - 'Transaction_record_count: %d\n' + \ - 'Transaction_enqueue_count: %d\n' + \ - 'Transaction_dequeue_count: %d\n' + \ - 'Transaction_commit_count: %d\n' + \ - 'Transaction_abort_count: %d\n' + \ - 'Transaction_operation_count: %d\n' - return fstr % (self.total_record_count, - self.transient_record_count, - self.filler_record_count, - self.enqueue_count, - self.dequeue_count, - self.transaction_record_count, - self.transaction_enqueue_count, - self.transaction_dequeue_count, - self.transaction_commit_count, - self.transaction_abort_count, - self.transaction_operation_count) - -class Journal(object): - """ - Instance of a Qpid Linear Store (QLS) journal. - """ - def __init__(self, directory, xid_prepared_list, args): - self.directory = directory - self.queue_name = os.path.basename(directory) - self.files = {} - self.file_num_list = None - self.file_num_itr = None - self.enq_map = EnqueueMap(self) - self.txn_map = TransactionMap(self.enq_map) - self.current_journal_file = None - self.first_rec_flag = None - self.statistics = JournalStatistics() - self.xid_prepared_list = xid_prepared_list # This is None for the TPL instance only - self.args = args - self.last_record_offset = None # TODO: Move into JournalFile - self.num_filler_records_required = None # TODO: Move into JournalFile - self.fill_to_offset = None - def add_record(self, record): - if isinstance(record, EnqueueRecord) or isinstance(record, DequeueRecord): - if record.xid_size > 0: - self.txn_map.add(self.current_journal_file, record) - else: - self.enq_map.add(self.current_journal_file, record, False) - elif isinstance(record, TransactionRecord): - self.txn_map.delete(self.current_journal_file, record) - else: - raise qls.err.InvalidRecordTypeError(self.current_journal_file, record, 'add to Journal') - def get_enq_map_record(self, rid): - return self.enq_map.get(rid) - def get_txn_map_record(self, xid): - return self.txn_map.get(xid) - def get_outstanding_txn_list(self): - return self.txn_map.get_xid_list() - def get_queue_name(self): - return self.queue_name - def recover(self, high_rid_counter): - print 'Recovering %s' % self.queue_name - self._analyze_files() - try: - while self._get_next_record(high_rid_counter): - pass - self._check_alignment() - except qls.err.NoMoreFilesInJournalError: - print 'No more files in journal' - except qls.err.FirstRecordOffsetMismatchError as err: - print '0x%08x: **** FRO ERROR: queue=\"%s\" fid=0x%x fro actual=0x%08x expected=0x%08x' % \ - (err.get_expected_fro(), err.get_queue_name(), err.get_file_number(), err.get_record_offset(), - err.get_expected_fro()) - def reconcile_transactions(self, prepared_list, txn_flag): - xid_list = self.txn_map.get_xid_list() - if len(xid_list) > 0: - print self.queue_name, 'contains', len(xid_list), 'open transaction(s):' - for xid in xid_list: - if xid in prepared_list.keys(): - commit_flag = prepared_list[xid] - if commit_flag is None: - print ' ', Utils.format_xid(xid), '- Assuming commit after prepare' - if txn_flag: - self.txn_map.commit(xid) - elif commit_flag: - print ' ', Utils.format_xid(xid), '- Completing interrupted commit operation' - if txn_flag: - self.txn_map.commit(xid) - else: - print ' ', Utils.format_xid(xid), '- Completing interrupted abort operation' - if txn_flag: - self.txn_map.abort(xid) - else: - print ' ', Utils.format_xid(xid), '- Ignoring, not in prepared transaction list' - if txn_flag: - self.txn_map.abort(xid) - def report(self, print_stats_flag): - print 'Journal "%s":' % self.queue_name - print '=' * (11 + len(self.queue_name)) - if print_stats_flag: - print str(self.statistics) - print self.enq_map.report_str(True, True) - print self.txn_map.report_str(True, True) - JournalFile.report_header() - for file_num in sorted(self.files.keys()): - self.files[file_num].report() - #TODO: move this to JournalFile, append to file info - if self.num_filler_records_required is not None and self.fill_to_offset is not None: - print '0x%x:0x%08x: %d filler records required for DBLK alignment to 0x%08x' % \ - (self.current_journal_file.file_header.file_num, self.last_record_offset, - self.num_filler_records_required, self.fill_to_offset) - print - #--- protected functions --- - def _analyze_files(self): - for dir_entry in os.listdir(self.directory): - dir_entry_bits = dir_entry.split('.') - if len(dir_entry_bits) == 2 and dir_entry_bits[1] == JournalRecoveryManager.JRNL_DIR_NAME: - fq_file_name = os.path.join(self.directory, dir_entry) - file_handle = open(fq_file_name) - args = Utils.load_args(file_handle, RecordHeader) - file_hdr = FileHeader(*args) - file_hdr.init(file_handle, *Utils.load_args(file_handle, FileHeader)) - if file_hdr.is_header_valid(file_hdr): - file_hdr.load(file_handle) - if file_hdr.is_valid(): - Utils.skip(file_handle, file_hdr.file_header_size_sblks * Utils.SBLK_SIZE) - self.files[file_hdr.file_num] = JournalFile(file_hdr) - self.file_num_list = sorted(self.files.keys()) - self.file_num_itr = iter(self.file_num_list) - def _check_alignment(self): # TODO: Move into JournalFile - remaining_sblks = self.last_record_offset % Utils.SBLK_SIZE - if remaining_sblks == 0: - self.num_filler_records_required = 0 - else: - self.num_filler_records_required = (Utils.SBLK_SIZE - remaining_sblks) / Utils.DBLK_SIZE - self.fill_to_offset = self.last_record_offset + (self.num_filler_records_required * Utils.DBLK_SIZE) - if self.args.show_recs or self.args.show_all_recs: - print '0x%x:0x%08x: %d filler records required for DBLK alignment to 0x%08x' % \ - (self.current_journal_file.file_header.file_num, self.last_record_offset, - self.num_filler_records_required, self.fill_to_offset) - def _check_file(self): - if self.current_journal_file is not None: - if not self.current_journal_file.file_header.is_end_of_file(): - return True - if self.current_journal_file.file_header.is_end_of_file(): - self.last_record_offset = self.current_journal_file.file_header.file_handle.tell() - if not self._get_next_file(): - return False - fhdr = self.current_journal_file.file_header - fhdr.file_handle.seek(fhdr.first_record_offset) - return True - def _get_next_file(self): - if self.current_journal_file is not None: - file_handle = self.current_journal_file.file_header.file_handle - if not file_handle.closed: # sanity check, should not be necessary - file_handle.close() - file_num = 0 - try: - while file_num == 0: - file_num = self.file_num_itr.next() - except StopIteration: - pass - if file_num == 0: - return False - self.current_journal_file = self.files[file_num] - self.first_rec_flag = True - if self.args.show_recs or self.args.show_all_recs: - print '0x%x:%s' % (self.current_journal_file.file_header.file_num, self.current_journal_file.file_header) - return True - def _get_next_record(self, high_rid_counter): - if not self._check_file(): - return False - self.last_record_offset = self.current_journal_file.file_header.file_handle.tell() - this_record = Utils.load(self.current_journal_file.file_header.file_handle, RecordHeader) - if not this_record.is_header_valid(self.current_journal_file.file_header): - return False - if self.first_rec_flag: - if this_record.file_offset != self.current_journal_file.file_header.first_record_offset: - raise qls.err.FirstRecordOffsetMismatchError(self.current_journal_file.file_header, this_record) - self.first_rec_flag = False - self.statistics.total_record_count += 1 - start_journal_file = self.current_journal_file - if isinstance(this_record, EnqueueRecord): - ok_flag = self._handle_enqueue_record(this_record, start_journal_file) - high_rid_counter.check(this_record.record_id) - if self.args.show_recs or self.args.show_all_recs: - print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record) - elif isinstance(this_record, DequeueRecord): - ok_flag = self._handle_dequeue_record(this_record, start_journal_file) - high_rid_counter.check(this_record.record_id) - if self.args.show_recs or self.args.show_all_recs: - print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record) - elif isinstance(this_record, TransactionRecord): - ok_flag = self._handle_transaction_record(this_record, start_journal_file) - high_rid_counter.check(this_record.record_id) - if self.args.show_recs or self.args.show_all_recs: - print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record) - else: - self.statistics.filler_record_count += 1 - ok_flag = True - if self.args.show_all_recs: - print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record) - Utils.skip(self.current_journal_file.file_header.file_handle, Utils.DBLK_SIZE) - return ok_flag - def _handle_enqueue_record(self, enqueue_record, start_journal_file): - while enqueue_record.load(self.current_journal_file.file_header.file_handle): - if not self._get_next_file(): - enqueue_record.truncated_flag = True - return False - if not enqueue_record.is_valid(start_journal_file): - return False - if enqueue_record.is_external() and enqueue_record.data != None: - raise qls.err.ExternalDataError(self.current_journal_file.file_header, enqueue_record) - if enqueue_record.is_transient(): - self.statistics.transient_record_count += 1 - return True - if enqueue_record.xid_size > 0: - self.txn_map.add(start_journal_file, enqueue_record) - self.statistics.transaction_operation_count += 1 - self.statistics.transaction_record_count += 1 - self.statistics.transaction_enqueue_count += 1 - else: - self.enq_map.add(start_journal_file, enqueue_record, False) - start_journal_file.incr_enq_cnt() - self.statistics.enqueue_count += 1 - return True - def _handle_dequeue_record(self, dequeue_record, start_journal_file): - while dequeue_record.load(self.current_journal_file.file_header.file_handle): - if not self._get_next_file(): - dequeue_record.truncated_flag = True - return False - if not dequeue_record.is_valid(start_journal_file): - return False - if dequeue_record.xid_size > 0: - if self.xid_prepared_list is None: # ie this is the TPL - dequeue_record.transaction_prepared_list_flag = True - elif not self.enq_map.contains(dequeue_record.dequeue_record_id): - dequeue_record.warnings.append('NOT IN EMAP') # Only for non-TPL records - self.txn_map.add(start_journal_file, dequeue_record) - self.statistics.transaction_operation_count += 1 - self.statistics.transaction_record_count += 1 - self.statistics.transaction_dequeue_count += 1 - else: - try: - self.enq_map.delete(start_journal_file, dequeue_record)[0].decr_enq_cnt(dequeue_record) - except qls.err.RecordIdNotFoundError: - dequeue_record.warnings.append('NOT IN EMAP') - self.statistics.dequeue_count += 1 - return True - def _handle_transaction_record(self, transaction_record, start_journal_file): - while transaction_record.load(self.current_journal_file.file_header.file_handle): - if not self._get_next_file(): - transaction_record.truncated_flag = True - return False - if not transaction_record.is_valid(start_journal_file): - return False - if transaction_record.magic[-1] == 'a': - self.statistics.transaction_abort_count += 1 - else: - self.statistics.transaction_commit_count += 1 - if self.txn_map.contains(transaction_record.xid): - self.txn_map.delete(self.current_journal_file, transaction_record) - else: - transaction_record.warnings.append('NOT IN TMAP') -# if transaction_record.magic[-1] == 'c': # commits only -# self._txn_obj_list[hdr.xid] = hdr - self.statistics.transaction_record_count += 1 - return True - def _load_data(self, record): - while not record.is_complete: - record.load(self.current_journal_file.file_handle) - -class JournalFile(object): - def __init__(self, file_header): - self.file_header = file_header - self.enq_cnt = 0 - self.deq_cnt = 0 - self.num_filler_records_required = None - def incr_enq_cnt(self): - self.enq_cnt += 1 - def decr_enq_cnt(self, record): - if self.enq_cnt <= self.deq_cnt: - raise qls.err.EnqueueCountUnderflowError(self.file_header, record) - self.deq_cnt += 1 - def get_enq_cnt(self): - return self.enq_cnt - self.deq_cnt - def is_outstanding_enq(self): - return self.enq_cnt > self.deq_cnt - @staticmethod - def report_header(): - print 'file_num enq_cnt p_no efp journal_file' - print '-------- ------- ---- ----- ------------' - def report(self): - comment = '<uninitialized>' if self.file_header.file_num == 0 else '' - file_num_str = '0x%x' % self.file_header.file_num - print '%8s %7d %4d %4dk %s %s' % (file_num_str, self.get_enq_cnt(), self.file_header.partition_num, - self.file_header.efp_data_size_kb, - os.path.basename(self.file_header.file_handle.name), comment) +import time class RecordHeader(object): FORMAT = '<4s2H2Q' @@ -590,14 +40,14 @@ class RecordHeader(object): self.record_id = record_id self.warnings = [] self.truncated_flag = False - def checksum_encode(self): + def encode(self): return struct.pack(RecordHeader.FORMAT, self.magic, self.version, self.user_flags, self.serial, self.record_id) def load(self, file_handle): pass @staticmethod def discriminate(args): """Use the last char in the header magic to determine the header type""" - return _CLASSES.get(args[1][-1], RecordHeader) + return CLASSES.get(args[1][-1], RecordHeader) def is_empty(self): """Return True if this record is empty (ie has a magic of 0x0000""" return self.magic == '\x00'*4 @@ -608,17 +58,12 @@ class RecordHeader(object): if self.magic[:3] != 'QLS' or self.magic[3] not in ['a', 'c', 'd', 'e', 'f', 'x']: return False if self.magic[-1] != 'x': - if self.version != Utils.RECORD_VERSION: - raise qls.err.InvalidRecordVersionError(file_header, self, Utils.RECORD_VERSION) + if self.version != qls.utils.DEFAULT_RECORD_VERSION: + raise qls.err.InvalidRecordVersionError(file_header, self, qls.utils.DEFAULT_RECORD_VERSION) if self.serial != file_header.serial: return False return True - def _get_warnings(self): - warn_str = '' - for warn in self.warnings: - warn_str += '<%s>' % warn - return warn_str - def __str__(self): + def to_string(self): """Return string representation of this header""" if self.is_empty(): return '0x%08x: <empty>' % (self.file_offset) @@ -628,6 +73,14 @@ class RecordHeader(object): return '0x%08x: [%c v=%d f=0x%04x rid=0x%x]' % \ (self.file_offset, self.magic[-1].upper(), self.version, self.user_flags, self.record_id) return '0x%08x: <error, unknown magic "%s" (possible overwrite boundary?)>' % (self.file_offset, self.magic) + def _get_warnings(self): + warn_str = '' + for warn in self.warnings: + warn_str += '<%s>' % warn + return warn_str + def __str__(self): + """Return string representation of this header""" + return RecordHeader.to_string(self) class RecordTail(object): FORMAT = '<4sL2Q' @@ -653,24 +106,28 @@ class RecordTail(object): if self.valid_flag is None: if not self.complete: return False - self.valid_flag = Utils.inv_str(self.xmagic) == record.magic and \ + self.valid_flag = qls.utils.inv_str(self.xmagic) == record.magic and \ self.serial == record.serial and \ self.record_id == record.record_id and \ - Utils.adler32(record.checksum_encode()) == self.checksum + qls.utils.adler32(record.checksum_encode()) == self.checksum return self.valid_flag - def __str__(self): + def to_string(self): """Return a string representation of the this RecordTail instance""" if self.valid_flag is not None: if not self.valid_flag: return '[INVALID RECORD TAIL]' - magic = Utils.inv_str(self.xmagic) + magic = qls.utils.inv_str(self.xmagic) magic_char = magic[-1].upper() if magic[-1] in string.printable else '?' return '[%c cs=0x%08x rid=0x%x]' % (magic_char, self.checksum, self.record_id) + def __str__(self): + """Return a string representation of the this RecordTail instance""" + return RecordTail.to_string(self) class FileHeader(RecordHeader): FORMAT = '<2H4x5QH' - def init(self, file_handle, _, file_header_size_sblks, partition_num, efp_data_size_kb, - first_record_offset, timestamp_sec, timestamp_ns, file_num, queue_name_len): + MAGIC = 'QLSf' + def init(self, file_handle, _, file_header_size_sblks, partition_num, efp_data_size_kb, first_record_offset, + timestamp_sec, timestamp_ns, file_num, queue_name_len): self.file_handle = file_handle self.file_header_size_sblks = file_header_size_sblks self.partition_num = partition_num @@ -681,11 +138,21 @@ class FileHeader(RecordHeader): self.file_num = file_num self.queue_name_len = queue_name_len self.queue_name = None - def load(self, file_handle): - self.queue_name = file_handle.read(self.queue_name_len) + def encode(self): + if self.queue_name is None: + return RecordHeader.encode(self) + struct.pack(self.FORMAT, self.file_header_size_sblks, \ + self.partition_num, self.efp_data_size_kb, \ + self.first_record_offset, self.timestamp_sec, \ + self.timestamp_ns, self.file_num, 0) + return RecordHeader.encode(self) + struct.pack(self.FORMAT, self.file_header_size_sblks, self.partition_num, \ + self.efp_data_size_kb, self.first_record_offset, \ + self.timestamp_sec, self.timestamp_ns, self.file_num, \ + self.queue_name_len) + self.queue_name def get_file_size(self): """Sum of file header size and data size""" - return (self.file_header_size_sblks * Utils.SBLK_SIZE) + (self.efp_data_size_kb * 1024) + return (self.file_header_size_sblks * qls.utils.DEFAULT_SBLK_SIZE) + (self.efp_data_size_kb * 1024) + def load(self, file_handle): + self.queue_name = file_handle.read(self.queue_name_len) def is_end_of_file(self): return self.file_handle.tell() >= self.get_file_size() def is_valid(self): @@ -704,18 +171,22 @@ class FileHeader(RecordHeader): return True def timestamp_str(self): """Get the timestamp of this record in string format""" - time = gmtime(self.timestamp_sec) + now = time.gmtime(self.timestamp_sec) fstr = '%%a %%b %%d %%H:%%M:%%S.%09d %%Y' % (self.timestamp_ns) - return strftime(fstr, time) - def __str__(self): + return time.strftime(fstr, now) + def to_string(self): """Return a string representation of the this FileHeader instance""" - return '%s fnum=0x%x fro=0x%08x p=%d s=%dk t=%s %s' % (RecordHeader.__str__(self), self.file_num, + return '%s fnum=0x%x fro=0x%08x p=%d s=%dk t=%s %s' % (RecordHeader.to_string(self), self.file_num, self.first_record_offset, self.partition_num, self.efp_data_size_kb, self.timestamp_str(), self._get_warnings()) + def __str__(self): + """Return a string representation of the this FileHeader instance""" + return FileHeader.to_string(self) class EnqueueRecord(RecordHeader): FORMAT = '<2Q' + MAGIC = 'QLSe' EXTERNAL_FLAG_MASK = 0x20 TRANSIENT_FLAG_MASK = 0x10 def init(self, _, xid_size, data_size): @@ -726,9 +197,13 @@ class EnqueueRecord(RecordHeader): self.data = None self.data_complete = False self.record_tail = None - def checksum_encode(self): - return RecordHeader.checksum_encode(self) + struct.pack(self.FORMAT, self.xid_size, self.data_size) + \ - self.xid + self.data + def checksum_encode(self): # encode excluding record tail + bytes = RecordHeader.encode(self) + struct.pack(self.FORMAT, self.xid_size, self.data_size) + if self.xid is not None: + bytes += self.xid + if self.data is not None: + bytes += self.data + return bytes def is_external(self): return self.user_flags & EnqueueRecord.EXTERNAL_FLAG_MASK > 0 def is_transient(self): @@ -750,13 +225,13 @@ class EnqueueRecord(RecordHeader): return True def load(self, file_handle): """Return True when load is incomplete and must be called again with new file handle""" - self.xid, self.xid_complete = Utils.load_data(file_handle, self.xid, self.xid_size) + self.xid, self.xid_complete = qls.utils.load_data(file_handle, self.xid, self.xid_size) if not self.xid_complete: return True if self.is_external(): self.data_complete = True else: - self.data, self.data_complete = Utils.load_data(file_handle, self.data, self.data_size) + self.data, self.data_complete = qls.utils.load_data(file_handle, self.data, self.data_size) if not self.data_complete: return True if self.xid_size > 0 or self.data_size > 0: @@ -769,6 +244,19 @@ class EnqueueRecord(RecordHeader): else: return True return False + def to_string(self, show_xid_flag, show_data_flag): + """Return a string representation of the this EnqueueRecord instance""" + if self.truncated_flag: + return '%s xid(%d) data(%d) [Truncated, no more files in journal]' % (RecordHeader.__str__(self), + self.xid_size, self.data_size) + if self.record_tail is None: + record_tail_str = '' + else: + record_tail_str = self.record_tail.to_string() + return '%s %s %s %s %s %s' % (RecordHeader.to_string(self), + qls.utils.format_xid(self.xid, self.xid_size, show_xid_flag), + qls.utils.format_data(self.data, self.data_size, show_data_flag), + record_tail_str, self._print_flags(), self._get_warnings()) def _print_flags(self): """Utility function to decode the flags field in the header and print a string representation""" fstr = '' @@ -784,19 +272,11 @@ class EnqueueRecord(RecordHeader): return fstr def __str__(self): """Return a string representation of the this EnqueueRecord instance""" - if self.truncated_flag: - return '%s xid(%d) data(%d) [Truncated, no more files in journal]' % (RecordHeader.__str__(self), - self.xid_size, self.data_size) - if self.record_tail is None: - record_tail_str = '' - else: - record_tail_str = str(self.record_tail) - return '%s %s %s %s %s %s' % (RecordHeader.__str__(self), Utils.format_xid(self.xid, self.xid_size), - Utils.format_data(self.data_size, self.data), record_tail_str, - self._print_flags(), self._get_warnings()) + return EnqueueRecord.to_string(self, False, False) class DequeueRecord(RecordHeader): FORMAT = '<2Q' + MAGIC = 'QLSd' TXN_COMPLETE_COMMIT_FLAG = 0x10 def init(self, _, dequeue_record_id, xid_size): self.dequeue_record_id = dequeue_record_id @@ -805,8 +285,8 @@ class DequeueRecord(RecordHeader): self.xid = None self.xid_complete = False self.record_tail = None - def checksum_encode(self): - return RecordHeader.checksum_encode(self) + struct.pack(self.FORMAT, self.dequeue_record_id, self.xid_size) + \ + def checksum_encode(self): # encode excluding record tail + return RecordHeader.encode(self) + struct.pack(self.FORMAT, self.dequeue_record_id, self.xid_size) + \ self.xid def is_transaction_complete_commit(self): return self.user_flags & DequeueRecord.TXN_COMPLETE_COMMIT_FLAG > 0 @@ -825,7 +305,7 @@ class DequeueRecord(RecordHeader): return True def load(self, file_handle): """Return True when load is incomplete and must be called again with new file handle""" - self.xid, self.xid_complete = Utils.load_data(file_handle, self.xid, self.xid_size) + self.xid, self.xid_complete = qls.utils.load_data(file_handle, self.xid, self.xid_size) if not self.xid_complete: return True if self.xid_size > 0: @@ -838,6 +318,19 @@ class DequeueRecord(RecordHeader): else: return True return False + def to_string(self, show_xid_flag): + """Return a string representation of the this DequeueRecord instance""" + if self.truncated_flag: + return '%s xid(%d) drid=0x%x [Truncated, no more files in journal]' % (RecordHeader.__str__(self), + self.xid_size, + self.dequeue_record_id) + if self.record_tail is None: + record_tail_str = '' + else: + record_tail_str = self.record_tail.to_string() + return '%s drid=0x%x %s %s %s %s' % (RecordHeader.to_string(self), self.dequeue_record_id, + qls.utils.format_xid(self.xid, self.xid_size, show_xid_flag), + record_tail_str, self._print_flags(), self._get_warnings()) def _print_flags(self): """Utility function to decode the flags field in the header and print a string representation""" if self.transaction_prepared_list_flag: @@ -848,27 +341,19 @@ class DequeueRecord(RecordHeader): return '' def __str__(self): """Return a string representation of the this DequeueRecord instance""" - if self.truncated_flag: - return '%s xid(%d) drid=0x%x [Truncated, no more files in journal]' % (RecordHeader.__str__(self), - self.xid_size, - self.dequeue_record_id) - if self.record_tail is None: - record_tail_str = '' - else: - record_tail_str = str(self.record_tail) - return '%s %s drid=0x%x %s %s %s' % (RecordHeader.__str__(self), Utils.format_xid(self.xid, self.xid_size), - self.dequeue_record_id, record_tail_str, self._print_flags(), - self._get_warnings()) + return DequeueRecord.to_string(self, False) class TransactionRecord(RecordHeader): FORMAT = '<Q' + MAGIC_ABORT = 'QLSa' + MAGIC_COMMIT = 'QLSc' def init(self, _, xid_size): self.xid_size = xid_size self.xid = None self.xid_complete = False self.record_tail = None - def checksum_encode(self): - return RecordHeader.checksum_encode(self) + struct.pack(self.FORMAT, self.xid_size) + self.xid + def checksum_encode(self): # encode excluding record tail + return RecordHeader.encode(self) + struct.pack(self.FORMAT, self.xid_size) + self.xid def is_valid(self, journal_file): if not RecordHeader.is_header_valid(self, journal_file.file_header): return False @@ -881,7 +366,7 @@ class TransactionRecord(RecordHeader): return True def load(self, file_handle): """Return True when load is incomplete and must be called again with new file handle""" - self.xid, self.xid_complete = Utils.load_data(file_handle, self.xid, self.xid_size) + self.xid, self.xid_complete = qls.utils.load_data(file_handle, self.xid, self.xid_size) if not self.xid_complete: return True if self.xid_size > 0: @@ -894,168 +379,24 @@ class TransactionRecord(RecordHeader): else: return True return False - def __str__(self): + def to_string(self, show_xid_flag): """Return a string representation of the this TransactionRecord instance""" if self.truncated_flag: return '%s xid(%d) [Truncated, no more files in journal]' % (RecordHeader.__str__(self), self.xid_size) if self.record_tail is None: record_tail_str = '' else: - record_tail_str = str(self.record_tail) - return '%s %s %s %s' % (RecordHeader.__str__(self), Utils.format_xid(self.xid, self.xid_size), record_tail_str, - self._get_warnings()) - -class Utils(object): - """Class containing utility functions for dealing with the journal""" - DBLK_SIZE = 128 - RECORD_VERSION = 2 - SBLK_SIZE = 4096 - @staticmethod - def adler32(data): - return zlib.adler32(data) & 0xffffffff - @staticmethod - def create_record(magic, uflags, journal_file, record_id, dequeue_record_id, xid, data): - record_class = _CLASSES.get(magic[-1]) - record = record_class(0, magic, Utils.RECORD_VERSION, uflags, journal_file.file_header.serial, record_id) - xid_length = len(xid) if xid is not None else 0 - if isinstance(record, EnqueueRecord): - data_length = len(data) if data is not None else 0 - record.init(None, xid_length, data_length) - elif isinstance(record, DequeueRecord): - record.init(None, dequeue_record_id, xid_length) - elif isinstance(record, TransactionRecord): - record.init(None, xid_length) - else: - raise qls.err.InvalidClassError(record.__class__.__name__) - if xid is not None: - record.xid = xid - record.xid_complete = True - if data is not None: - record.data = data - record.data_complete = True - record.record_tail = RecordTail(None) - record.record_tail.xmagic = Utils.inv_str(magic) - record.record_tail.checksum = Utils.adler32(record.checksum_encode()) - record.record_tail.serial = record.serial - record.record_tail.record_id = record.record_id - return record - @staticmethod - def format_data(dsize, data): - """Format binary data for printing""" - if data == None: - return '' - # << DEBUG >> - begin = data.find('msg') - end = data.find('\0', begin) - return 'data(%d)="%s"' % (dsize, data[begin:end]) - # << END DEBUG - if Utils._is_printable(data): - datastr = Utils._split_str(data) - else: - datastr = Utils._hex_split_str(data) - if dsize != len(data): - raise qls.err.DataSizeError(dsize, len(data), datastr) - return 'data(%d)="%s"' % (dsize, datastr) - @staticmethod - def format_xid(xid, xidsize=None): - """Format binary XID for printing""" - if xid == None and xidsize != None: - if xidsize > 0: - raise qls.err.XidSizeError(xidsize, 0, None) - return '' - if Utils._is_printable(xid): - xidstr = '"%s"' % Utils._split_str(xid) - else: - xidstr = '0x%s' % Utils._hex_split_str(xid) - if xidsize == None: - xidsize = len(xid) - elif xidsize != len(xid): - raise qls.err.XidSizeError(xidsize, len(xid), xidstr) - return 'xid(%d)=%s' % (xidsize, xidstr) - @staticmethod - def inv_str(in_string): - """Perform a binary 1's compliment (invert all bits) on a binary string""" - istr = '' - for index in range(0, len(in_string)): - istr += chr(~ord(in_string[index]) & 0xff) - return istr - @staticmethod - def load(file_handle, klass): - """Load a record of class klass from a file""" - args = Utils.load_args(file_handle, klass) - subclass = klass.discriminate(args) - result = subclass(*args) # create instance of record - if subclass != klass: - result.init(*Utils.load_args(file_handle, subclass)) - return result - @staticmethod - def load_args(file_handle, klass): - """Load the arguments from class klass""" - size = struct.calcsize(klass.FORMAT) - foffs = file_handle.tell(), - fbin = file_handle.read(size) - if len(fbin) != size: - raise qls.err.UnexpectedEndOfFileError(len(fbin), size, foffs, file_handle.name) - return foffs + struct.unpack(klass.FORMAT, fbin) - @staticmethod - def load_data(file_handle, element, element_size): - if element_size == 0: - return element, True - if element is None: - element = file_handle.read(element_size) - else: - read_size = element_size - len(element) - element += file_handle.read(read_size) - return element, len(element) == element_size - @staticmethod - def skip(file_handle, boundary): - """Read and discard disk bytes until the next multiple of boundary""" - if not file_handle.closed: - file_handle.read(Utils._rem_bytes_in_block(file_handle, boundary)) - #--- protected functions --- - @staticmethod - def _hex_str(in_str, begin, end): - """Return a binary string as a hex string""" - hstr = '' - for index in range(begin, end): - if Utils._is_printable(in_str[index]): - hstr += in_str[index] - else: - hstr += '\\%02x' % ord(in_str[index]) - return hstr - @staticmethod - def _hex_split_str(in_str):#, split_size = 50): - """Split a hex string into two parts separated by an ellipsis""" -# if len(in_str) <= split_size: -# return Utils._hex_str(in_str, 0, len(in_str)) -# return Utils._hex_str(in_str, 0, 10) + ' ... ' + Utils._hex_str(in_str, len(in_str)-10, len(in_str)) - return ''.join(x.encode('hex') for x in reversed(in_str)) - @staticmethod - def _is_printable(in_str): - """Return True if in_str in printable; False otherwise.""" - for this_char in in_str: - if this_char not in string.printable: - return False - return True - @staticmethod - def _rem_bytes_in_block(file_handle, block_size): - """Return the remaining bytes in a block""" - foffs = file_handle.tell() - return (Utils._size_in_blocks(foffs, block_size) * block_size) - foffs - @staticmethod - def _size_in_blocks(size, block_size): - """Return the size in terms of data blocks""" - return int((size + block_size - 1) / block_size) - @staticmethod - def _split_str(in_str, split_size = 50): - """Split a string into two parts separated by an ellipsis if it is longer than split_size""" - if len(in_str) < split_size: - return in_str - return in_str[:25] + ' ... ' + in_str[-25:] + record_tail_str = self.record_tail.to_string() + return '%s %s %s %s' % (RecordHeader.to_string(self), + qls.utils.format_xid(self.xid, self.xid_size, show_xid_flag), + record_tail_str, self._get_warnings()) + def __str__(self): + """Return a string representation of the this TransactionRecord instance""" + return TransactionRecord.to_string(self, False) # ============================================================================= -_CLASSES = { +CLASSES = { 'a': TransactionRecord, 'c': TransactionRecord, 'd': DequeueRecord, |