Diffstat (limited to 'qpid/tools/src/py/qls/anal.py')
-rw-r--r--  qpid/tools/src/py/qls/anal.py  593
1 file changed, 593 insertions, 0 deletions
diff --git a/qpid/tools/src/py/qls/anal.py b/qpid/tools/src/py/qls/anal.py
new file mode 100644
index 0000000000..865dfa16c7
--- /dev/null
+++ b/qpid/tools/src/py/qls/anal.py
@@ -0,0 +1,593 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+Module: qls.anal
+
+Classes for recovery and analysis of a Qpid Linear Store (QLS).
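+
+A minimal usage sketch, assuming an argparse-style 'args' object supplying the
+attributes this module reads (txn, show_recs, show_all_recs, show_xids,
+show_data) and a store directory containing 'tpl' and 'jrnl' subdirectories:
+
+    import qls.anal
+
+    jrm = qls.anal.JournalRecoveryManager('/path/to/qls', args)
+    jrm.run()         # recover the TPL and each per-queue journal
+    jrm.report(True)  # print a report, including per-journal statistics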
+"""
+
+import os.path
+import qls.err
+import qls.jrnl
+import qls.utils
+
+class HighCounter(object):
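+    """Track the highest record id seen; get_next() issues the next id above it"""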
+ def __init__(self):
+ self.num = 0
+ def check(self, num):
+ if self.num < num:
+ self.num = num
+ def get(self):
+ return self.num
+ def get_next(self):
+ self.num += 1
+ return self.num
+
+class JournalRecoveryManager(object):
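+    """
+    Manages recovery of a complete QLS store: the prepared transaction list
+    (TPL) and the per-queue journals found under the store directory.
+    """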
+ TPL_DIR_NAME = 'tpl'
+ JRNL_DIR_NAME = 'jrnl'
+ def __init__(self, directory, args):
+ if not os.path.exists(directory):
+ raise qls.err.InvalidQlsDirectoryNameError(directory)
+ self.directory = directory
+ self.args = args
+ self.tpl = None
+ self.journals = {}
+ self.high_rid_counter = HighCounter()
+ self.prepared_list = None
+ def report(self, print_stats_flag):
+ self._reconcile_transactions(self.prepared_list, self.args.txn)
+ if self.tpl is not None:
+ self.tpl.report(print_stats_flag)
+ for queue_name in sorted(self.journals.keys()):
+ self.journals[queue_name].report(print_stats_flag)
+ def run(self):
+ tpl_dir = os.path.join(self.directory, JournalRecoveryManager.TPL_DIR_NAME)
+ if os.path.exists(tpl_dir):
+ self.tpl = Journal(tpl_dir, None, self.args)
+ self.tpl.recover(self.high_rid_counter)
+ if self.args.show_recs or self.args.show_all_recs:
+ print
+ jrnl_dir = os.path.join(self.directory, JournalRecoveryManager.JRNL_DIR_NAME)
+ self.prepared_list = self.tpl.txn_map.get_prepared_list() if self.tpl is not None else {}
+ if os.path.exists(jrnl_dir):
+ for dir_entry in sorted(os.listdir(jrnl_dir)):
+ jrnl = Journal(os.path.join(jrnl_dir, dir_entry), self.prepared_list, self.args)
+ jrnl.recover(self.high_rid_counter)
+ self.journals[jrnl.get_queue_name()] = jrnl
+ if self.args.show_recs or self.args.show_all_recs:
+ print
+ def _reconcile_transactions(self, prepared_list, txn_flag):
+ print 'Transaction reconciliation report:'
+ print '=================================='
+ print len(prepared_list), 'open transaction(s) found in prepared transaction list:'
+ for xid in prepared_list.keys():
+ commit_flag = prepared_list[xid]
+ if commit_flag is None:
+ status = '[Prepared, neither committed nor aborted - assuming commit]'
+ elif commit_flag:
+ status = '[Prepared, but interrupted during commit phase]'
+ else:
+ status = '[Prepared, but interrupted during abort phase]'
+ print ' ', qls.utils.format_xid(xid), status
+ if prepared_list[xid] is None: # Prepared, but not committed or aborted
+ enqueue_record = self.tpl.get_txn_map_record(xid)[0][1]
+ dequeue_record = qls.utils.create_record(qls.jrnl.DequeueRecord.MAGIC, \
+ qls.jrnl.DequeueRecord.TXN_COMPLETE_COMMIT_FLAG, \
+ self.tpl.current_journal_file, \
+ self.high_rid_counter.get_next(), \
+ enqueue_record.record_id, xid, None)
+ if txn_flag:
+ self.tpl.add_record(dequeue_record)
+ for queue_name in sorted(self.journals.keys()):
+ self.journals[queue_name].reconcile_transactions(prepared_list, txn_flag)
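+        # Once the individual queue journals have been reconciled, close out each
+        # transaction still in the prepared list by writing a commit record to the
+        # TPL (records are only written when txn_flag is set).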
+ if len(prepared_list) > 0:
+ print 'Completing prepared transactions in prepared transaction list:'
+ for xid in prepared_list.keys():
+ print ' ', qls.utils.format_xid(xid)
+ transaction_record = qls.utils.create_record(qls.jrnl.TransactionRecord.MAGIC_COMMIT, 0, \
+ self.tpl.current_journal_file, \
+ self.high_rid_counter.get_next(), None, xid, None)
+ if txn_flag:
+ self.tpl.add_record(transaction_record)
+ print
+
+class EnqueueMap(object):
+ """
+ Map of enqueued records in a QLS journal
+ """
+ def __init__(self, journal):
+ self.journal = journal
+ self.enq_map = {}
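+        # Each entry maps record_id -> [journal_file, enqueue_record, locked_flag]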
+ def add(self, journal_file, enq_record, locked_flag):
+ if enq_record.record_id in self.enq_map:
+ raise qls.err.DuplicateRecordIdError(self.journal.current_file_header, enq_record)
+ self.enq_map[enq_record.record_id] = [journal_file, enq_record, locked_flag]
+ def contains(self, rid):
+ """Return True if the map contains the given rid"""
+ return rid in self.enq_map
+ def delete(self, journal_file, deq_record):
+ if deq_record.dequeue_record_id in self.enq_map:
+ enq_list = self.enq_map[deq_record.dequeue_record_id]
+ del self.enq_map[deq_record.dequeue_record_id]
+ return enq_list
+ else:
+ raise qls.err.RecordIdNotFoundError(journal_file.file_header, deq_record)
+ def get(self, record_id):
+ if record_id in self.enq_map:
+ return self.enq_map[record_id]
+ return None
+ def lock(self, journal_file, dequeue_record):
+ if dequeue_record.dequeue_record_id not in self.enq_map:
+ raise qls.err.RecordIdNotFoundError(journal_file.file_header, dequeue_record)
+ self.enq_map[dequeue_record.dequeue_record_id][2] = True
+ def report_str(self, _, show_records):
+ """Return a string containing a text report for all records in the map"""
+ if len(self.enq_map) == 0:
+ return 'No enqueued records found.'
+ rstr = '%d enqueued records found' % len(self.enq_map)
+ if show_records:
+ rstr += ":"
+            for rid in sorted(self.enq_map.keys()):
+ journal_file, record, locked_flag = self.enq_map[rid]
+ if locked_flag:
+ lock_str = '[LOCKED]'
+ else:
+ lock_str = ''
+ rstr += '\n 0x%x:%s %s' % (journal_file.file_header.file_num, record, lock_str)
+ else:
+ rstr += '.'
+ return rstr
+ def unlock(self, journal_file, dequeue_record):
+ """Set the transaction lock for a given record_id to False"""
+ if dequeue_record.dequeue_record_id in self.enq_map:
+ if self.enq_map[dequeue_record.dequeue_record_id][2]:
+ self.enq_map[dequeue_record.dequeue_record_id][2] = False
+ else:
+ raise qls.err.RecordNotLockedError(journal_file.file_header, dequeue_record)
+ else:
+ raise qls.err.RecordIdNotFoundError(journal_file.file_header, dequeue_record)
+
+class TransactionMap(object):
+ """
+ Map of open transactions used while recovering a QLS journal
+ """
+ def __init__(self, enq_map):
+ self.txn_map = {}
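+        # Each entry maps xid -> list of [journal_file, record, locked_flag] operations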
+ self.enq_map = enq_map
+ def abort(self, xid):
+ """Perform an abort operation for the given xid record"""
+ for journal_file, record, _ in self.txn_map[xid]:
+ if isinstance(record, qls.jrnl.DequeueRecord):
+ if self.enq_map.contains(record.dequeue_record_id):
+ self.enq_map.unlock(journal_file, record)
+ else:
+ journal_file.decr_enq_cnt(record)
+ del self.txn_map[xid]
+ def add(self, journal_file, record):
+ if record.xid is None:
+ raise qls.err.NonTransactionalRecordError(journal_file.file_header, record, 'TransactionMap.add()')
+ if isinstance(record, qls.jrnl.DequeueRecord):
+ try:
+ self.enq_map.lock(journal_file, record)
+ except qls.err.RecordIdNotFoundError:
+ # Not in emap, look for rid in tmap - should not happen in practice
+ txn_op = self._find_record_id(record.xid, record.dequeue_record_id)
+                if txn_op is not None:
+ if txn_op[2]:
+ raise qls.err.AlreadyLockedError(journal_file.file_header, record)
+ txn_op[2] = True
+ if record.xid in self.txn_map:
+ self.txn_map[record.xid].append([journal_file, record, False]) # append to existing list
+ else:
+ self.txn_map[record.xid] = [[journal_file, record, False]] # create new list
+ def commit(self, xid):
+ """Perform a commit operation for the given xid record"""
+ mismatch_list = []
+ for journal_file, record, lock in self.txn_map[xid]:
+ if isinstance(record, qls.jrnl.EnqueueRecord):
+ self.enq_map.add(journal_file, record, lock) # Transfer enq to emap
+ else:
+ if self.enq_map.contains(record.dequeue_record_id):
+ self.enq_map.unlock(journal_file, record)
+ self.enq_map.delete(journal_file, record)[0].decr_enq_cnt(record)
+ else:
+ mismatch_list.append('0x%x' % record.dequeue_record_id)
+ del self.txn_map[xid]
+ return mismatch_list
+ def contains(self, xid):
+ """Return True if the xid exists in the map; False otherwise"""
+ return xid in self.txn_map
+ def delete(self, journal_file, transaction_record):
+ """Remove a transaction record from the map using either a commit or abort header"""
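+        # The last character of the record magic identifies the type: 'c' = commit, 'a' = abort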
+ if transaction_record.magic[-1] == 'c':
+ return self.commit(transaction_record.xid)
+ if transaction_record.magic[-1] == 'a':
+ self.abort(transaction_record.xid)
+ else:
+ raise qls.err.InvalidRecordTypeError(journal_file.file_header, transaction_record,
+ 'delete from Transaction Map')
+ def get(self, xid):
+ if xid in self.txn_map:
+ return self.txn_map[xid]
+ return None
+ def get_prepared_list(self):
+ """
+        Prepared list is a map of xid (key) to one of None, True or False, representing respectively:
+        None: prepared, but neither committed nor aborted (interrupted before commit or abort)
+        False: prepared and aborted (interrupted before abort completed)
+        True: prepared and committed (interrupted before commit completed)
+ """
+ prepared_list = {}
+ for xid in self.get_xid_list():
+ for _, record, _ in self.txn_map[xid]:
+ if isinstance(record, qls.jrnl.EnqueueRecord):
+ prepared_list[xid] = None
+ else:
+ prepared_list[xid] = record.is_transaction_complete_commit()
+ return prepared_list
+ def get_xid_list(self):
+ return self.txn_map.keys()
+ def report_str(self, _, show_records):
+ """Return a string containing a text report for all records in the map"""
+ if len(self.txn_map) == 0:
+ return 'No outstanding transactions found.'
+ rstr = '%d outstanding transaction(s)' % len(self.txn_map)
+ if show_records:
+ rstr += ':'
+ for xid, op_list in self.txn_map.iteritems():
+ rstr += '\n %s containing %d operations:' % (qls.utils.format_xid(xid), len(op_list))
+ for journal_file, record, _ in op_list:
+ rstr += '\n 0x%x:%s' % (journal_file.file_header.file_num, record)
+ else:
+ rstr += '.'
+ return rstr
+ def _find_record_id(self, xid, record_id):
+        """Search for and return the [journal_file, record, locked_flag] entry containing the supplied record id"""
+ if xid in self.txn_map:
+ for txn_op in self.txn_map[xid]:
+ if txn_op[1].record_id == record_id:
+ return txn_op
+ for this_xid in self.txn_map.iterkeys():
+ for txn_op in self.txn_map[this_xid]:
+ if txn_op[1].record_id == record_id:
+ return txn_op
+ return None
+
+class JournalStatistics(object):
+ """Journal statistics"""
+ def __init__(self):
+ self.total_record_count = 0
+ self.transient_record_count = 0
+ self.filler_record_count = 0
+ self.enqueue_count = 0
+ self.dequeue_count = 0
+ self.transaction_record_count = 0
+ self.transaction_enqueue_count = 0
+ self.transaction_dequeue_count = 0
+ self.transaction_commit_count = 0
+ self.transaction_abort_count = 0
+ self.transaction_operation_count = 0
+ def __str__(self):
+        fstr = 'Total record count: %d\n' + \
+               'Transient record count: %d\n' + \
+               'Filler record count: %d\n' + \
+               'Enqueue count: %d\n' + \
+               'Dequeue count: %d\n' + \
+               'Transaction record count: %d\n' + \
+               'Transaction enqueue count: %d\n' + \
+               'Transaction dequeue count: %d\n' + \
+               'Transaction commit count: %d\n' + \
+               'Transaction abort count: %d\n' + \
+               'Transaction operation count: %d\n'
+ return fstr % (self.total_record_count,
+ self.transient_record_count,
+ self.filler_record_count,
+ self.enqueue_count,
+ self.dequeue_count,
+ self.transaction_record_count,
+ self.transaction_enqueue_count,
+ self.transaction_dequeue_count,
+ self.transaction_commit_count,
+ self.transaction_abort_count,
+ self.transaction_operation_count)
+
+class Journal(object):
+ """
+ Instance of a Qpid Linear Store (QLS) journal.
+ """
+ def __init__(self, directory, xid_prepared_list, args):
+ self.directory = directory
+ self.queue_name = os.path.basename(directory)
+ self.files = {}
+ self.file_num_list = None
+ self.file_num_itr = None
+ self.enq_map = EnqueueMap(self)
+ self.txn_map = TransactionMap(self.enq_map)
+ self.current_journal_file = None
+ self.first_rec_flag = None
+ self.statistics = JournalStatistics()
+ self.xid_prepared_list = xid_prepared_list # This is None for the TPL instance only
+ self.args = args
+ self.last_record_offset = None # TODO: Move into JournalFile
+ self.num_filler_records_required = None # TODO: Move into JournalFile
+ self.fill_to_offset = None
+ def add_record(self, record):
+        if isinstance(record, (qls.jrnl.EnqueueRecord, qls.jrnl.DequeueRecord)):
+ if record.xid_size > 0:
+ self.txn_map.add(self.current_journal_file, record)
+ else:
+ self.enq_map.add(self.current_journal_file, record, False)
+ elif isinstance(record, qls.jrnl.TransactionRecord):
+ self.txn_map.delete(self.current_journal_file, record)
+ else:
+ raise qls.err.InvalidRecordTypeError(self.current_journal_file, record, 'add to Journal')
+ def get_enq_map_record(self, rid):
+ return self.enq_map.get(rid)
+ def get_txn_map_record(self, xid):
+ return self.txn_map.get(xid)
+ def get_outstanding_txn_list(self):
+ return self.txn_map.get_xid_list()
+ def get_queue_name(self):
+ return self.queue_name
+ def recover(self, high_rid_counter):
+ print 'Recovering', self.queue_name
+ self._analyze_files()
+ try:
+ while self._get_next_record(high_rid_counter):
+ pass
+ self._check_alignment()
+ except qls.err.NoMoreFilesInJournalError:
+ print 'No more files in journal'
+ except qls.err.FirstRecordOffsetMismatchError as err:
+ print '0x%08x: **** FRO ERROR: queue=\"%s\" fid=0x%x fro actual=0x%08x expected=0x%08x' % \
+ (err.get_expected_fro(), err.get_queue_name(), err.get_file_number(), err.get_record_offset(),
+ err.get_expected_fro())
+ def reconcile_transactions(self, prepared_list, txn_flag):
+ xid_list = self.txn_map.get_xid_list()
+ if len(xid_list) > 0:
+ print self.queue_name, 'contains', len(xid_list), 'open transaction(s):'
+ for xid in xid_list:
+ if xid in prepared_list.keys():
+ commit_flag = prepared_list[xid]
+ if commit_flag is None:
+ print ' ', qls.utils.format_xid(xid), '- Assuming commit after prepare'
+ if txn_flag:
+ self.txn_map.commit(xid)
+ elif commit_flag:
+ print ' ', qls.utils.format_xid(xid), '- Completing interrupted commit operation'
+ if txn_flag:
+ self.txn_map.commit(xid)
+ else:
+ print ' ', qls.utils.format_xid(xid), '- Completing interrupted abort operation'
+ if txn_flag:
+ self.txn_map.abort(xid)
+ else:
+ print ' ', qls.utils.format_xid(xid), '- Ignoring, not in prepared transaction list'
+ if txn_flag:
+ self.txn_map.abort(xid)
+ def report(self, print_stats_flag):
+ print 'Journal "%s":' % self.queue_name
+ print '=' * (11 + len(self.queue_name))
+ if print_stats_flag:
+ print str(self.statistics)
+ print self.enq_map.report_str(True, True)
+ print self.txn_map.report_str(True, True)
+ JournalFile.report_header()
+ for file_num in sorted(self.files.keys()):
+ self.files[file_num].report()
+ #TODO: move this to JournalFile, append to file info
+ if self.num_filler_records_required is not None and self.fill_to_offset is not None:
+ print '0x%x:0x%08x: %d filler records required for DBLK alignment to 0x%08x' % \
+ (self.current_journal_file.file_header.file_num, self.last_record_offset,
+ self.num_filler_records_required, self.fill_to_offset)
+ print
+ #--- protected functions ---
+ def _analyze_files(self):
+ for dir_entry in os.listdir(self.directory):
+ dir_entry_bits = dir_entry.split('.')
+ if len(dir_entry_bits) == 2 and dir_entry_bits[1] == JournalRecoveryManager.JRNL_DIR_NAME:
+ fq_file_name = os.path.join(self.directory, dir_entry)
+ file_handle = open(fq_file_name)
+ args = qls.utils.load_args(file_handle, qls.jrnl.RecordHeader)
+ file_hdr = qls.jrnl.FileHeader(*args)
+ file_hdr.init(file_handle, *qls.utils.load_args(file_handle, qls.jrnl.FileHeader))
+ if file_hdr.is_header_valid(file_hdr):
+ file_hdr.load(file_handle)
+ if file_hdr.is_valid():
+ qls.utils.skip(file_handle, file_hdr.file_header_size_sblks * qls.utils.DEFAULT_SBLK_SIZE)
+ self.files[file_hdr.file_num] = JournalFile(file_hdr)
+ self.file_num_list = sorted(self.files.keys())
+ self.file_num_itr = iter(self.file_num_list)
+ def _check_alignment(self): # TODO: Move into JournalFile
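+        # last_record_offset % SBLK_SIZE gives the byte remainder within the
+        # current sblk; a non-zero remainder must be padded to the next sblk
+        # boundary with dblk-sized filler records.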
+ remaining_sblks = self.last_record_offset % qls.utils.DEFAULT_SBLK_SIZE
+ if remaining_sblks == 0:
+ self.num_filler_records_required = 0
+ else:
+ self.num_filler_records_required = (qls.utils.DEFAULT_SBLK_SIZE - remaining_sblks) / \
+ qls.utils.DEFAULT_DBLK_SIZE
+ self.fill_to_offset = self.last_record_offset + \
+ (self.num_filler_records_required * qls.utils.DEFAULT_DBLK_SIZE)
+ if self.args.show_recs or self.args.show_all_recs:
+ print '0x%x:0x%08x: %d filler records required for DBLK alignment to 0x%08x' % \
+ (self.current_journal_file.file_header.file_num, self.last_record_offset,
+ self.num_filler_records_required, self.fill_to_offset)
+ def _check_file(self):
+ if self.current_journal_file is not None:
+ if not self.current_journal_file.file_header.is_end_of_file():
+ return True
+            self.last_record_offset = self.current_journal_file.file_header.file_handle.tell()
+ if not self._get_next_file():
+ return False
+ fhdr = self.current_journal_file.file_header
+ fhdr.file_handle.seek(fhdr.first_record_offset)
+ return True
+ def _get_next_file(self):
+ if self.current_journal_file is not None:
+ file_handle = self.current_journal_file.file_header.file_handle
+ if not file_handle.closed: # sanity check, should not be necessary
+ file_handle.close()
+ file_num = 0
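+        # Skip file number 0 (an uninitialized file, see JournalFile.report()) and
+        # stop when the file number iterator is exhausted.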
+ try:
+ while file_num == 0:
+ file_num = self.file_num_itr.next()
+ except StopIteration:
+ pass
+ if file_num == 0:
+ return False
+ self.current_journal_file = self.files[file_num]
+ self.first_rec_flag = True
+ if self.args.show_recs or self.args.show_all_recs:
+ file_header = self.current_journal_file.file_header
+ print '0x%x:%s' % (file_header.file_num, file_header.to_string())
+ return True
+ def _get_next_record(self, high_rid_counter):
+ if not self._check_file():
+ return False
+ self.last_record_offset = self.current_journal_file.file_header.file_handle.tell()
+ this_record = qls.utils.load(self.current_journal_file.file_header.file_handle, qls.jrnl.RecordHeader)
+ if not this_record.is_header_valid(self.current_journal_file.file_header):
+ return False
+ if self.first_rec_flag:
+ if this_record.file_offset != self.current_journal_file.file_header.first_record_offset:
+ raise qls.err.FirstRecordOffsetMismatchError(self.current_journal_file.file_header, this_record)
+ self.first_rec_flag = False
+ self.statistics.total_record_count += 1
+ start_journal_file = self.current_journal_file
+ if isinstance(this_record, qls.jrnl.EnqueueRecord):
+ ok_flag = self._handle_enqueue_record(this_record, start_journal_file)
+ high_rid_counter.check(this_record.record_id)
+ if self.args.show_recs or self.args.show_all_recs:
+ print '0x%x:%s' % (start_journal_file.file_header.file_num, \
+ this_record.to_string(self.args.show_xids, self.args.show_data))
+ elif isinstance(this_record, qls.jrnl.DequeueRecord):
+ ok_flag = self._handle_dequeue_record(this_record, start_journal_file)
+ high_rid_counter.check(this_record.record_id)
+ if self.args.show_recs or self.args.show_all_recs:
+ print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record.to_string(self.args.show_xids))
+ elif isinstance(this_record, qls.jrnl.TransactionRecord):
+ ok_flag = self._handle_transaction_record(this_record, start_journal_file)
+ high_rid_counter.check(this_record.record_id)
+ if self.args.show_recs or self.args.show_all_recs:
+ print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record.to_string(self.args.show_xids))
+ else:
+ self.statistics.filler_record_count += 1
+ ok_flag = True
+ if self.args.show_all_recs:
+ print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record)
+ qls.utils.skip(self.current_journal_file.file_header.file_handle, qls.utils.DEFAULT_DBLK_SIZE)
+ return ok_flag
+ def _handle_enqueue_record(self, enqueue_record, start_journal_file):
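+        # A record may span journal files: load() returns True while more data
+        # remains, in which case reading continues in the next file.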
+ while enqueue_record.load(self.current_journal_file.file_header.file_handle):
+ if not self._get_next_file():
+ enqueue_record.truncated_flag = True
+ return False
+ if not enqueue_record.is_valid(start_journal_file):
+ return False
+        if enqueue_record.is_external() and enqueue_record.data is not None:
+ raise qls.err.ExternalDataError(self.current_journal_file.file_header, enqueue_record)
+ if enqueue_record.is_transient():
+ self.statistics.transient_record_count += 1
+ return True
+ if enqueue_record.xid_size > 0:
+ self.txn_map.add(start_journal_file, enqueue_record)
+ self.statistics.transaction_operation_count += 1
+ self.statistics.transaction_record_count += 1
+ self.statistics.transaction_enqueue_count += 1
+ else:
+ self.enq_map.add(start_journal_file, enqueue_record, False)
+ start_journal_file.incr_enq_cnt()
+ self.statistics.enqueue_count += 1
+ return True
+ def _handle_dequeue_record(self, dequeue_record, start_journal_file):
+ while dequeue_record.load(self.current_journal_file.file_header.file_handle):
+ if not self._get_next_file():
+ dequeue_record.truncated_flag = True
+ return False
+ if not dequeue_record.is_valid(start_journal_file):
+ return False
+ if dequeue_record.xid_size > 0:
+            if self.xid_prepared_list is None: # i.e. this is the TPL
+ dequeue_record.transaction_prepared_list_flag = True
+ elif not self.enq_map.contains(dequeue_record.dequeue_record_id):
+ dequeue_record.warnings.append('NOT IN EMAP') # Only for non-TPL records
+ self.txn_map.add(start_journal_file, dequeue_record)
+ self.statistics.transaction_operation_count += 1
+ self.statistics.transaction_record_count += 1
+ self.statistics.transaction_dequeue_count += 1
+ else:
+ try:
+ self.enq_map.delete(start_journal_file, dequeue_record)[0].decr_enq_cnt(dequeue_record)
+ except qls.err.RecordIdNotFoundError:
+ dequeue_record.warnings.append('NOT IN EMAP')
+ self.statistics.dequeue_count += 1
+ return True
+ def _handle_transaction_record(self, transaction_record, start_journal_file):
+ while transaction_record.load(self.current_journal_file.file_header.file_handle):
+ if not self._get_next_file():
+ transaction_record.truncated_flag = True
+ return False
+ if not transaction_record.is_valid(start_journal_file):
+ return False
+ if transaction_record.magic[-1] == 'a':
+ self.statistics.transaction_abort_count += 1
+ else:
+ self.statistics.transaction_commit_count += 1
+ if self.txn_map.contains(transaction_record.xid):
+ self.txn_map.delete(self.current_journal_file, transaction_record)
+ else:
+ transaction_record.warnings.append('NOT IN TMAP')
+# if transaction_record.magic[-1] == 'c': # commits only
+# self._txn_obj_list[hdr.xid] = hdr
+ self.statistics.transaction_record_count += 1
+ return True
+    def _load_data(self, record): # Note: not currently called from within this module
+        while not record.is_complete:
+            record.load(self.current_journal_file.file_header.file_handle)
+
+class JournalFile(object):
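+    """Enqueue/dequeue counters and report formatting for a single journal file"""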
+ def __init__(self, file_header):
+ self.file_header = file_header
+ self.enq_cnt = 0
+ self.deq_cnt = 0
+ self.num_filler_records_required = None
+ def incr_enq_cnt(self):
+ self.enq_cnt += 1
+ def decr_enq_cnt(self, record):
+ if self.enq_cnt <= self.deq_cnt:
+ raise qls.err.EnqueueCountUnderflowError(self.file_header, record)
+ self.deq_cnt += 1
+ def get_enq_cnt(self):
+ return self.enq_cnt - self.deq_cnt
+ def is_outstanding_enq(self):
+ return self.enq_cnt > self.deq_cnt
+ @staticmethod
+ def report_header():
+ print 'file_num enq_cnt p_no efp journal_file'
+ print '-------- ------- ---- ----- ------------'
+ def report(self):
+ comment = '<uninitialized>' if self.file_header.file_num == 0 else ''
+ file_num_str = '0x%x' % self.file_header.file_num
+ print '%8s %7d %4d %4dk %s %s' % (file_num_str, self.get_enq_cnt(), self.file_header.partition_num,
+ self.file_header.efp_data_size_kb,
+ os.path.basename(self.file_header.file_handle.name), comment)