#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

import jerr, jrnl
import os.path, sys

#== class EnqMap ==============================================================

class EnqMap(object):
    """Class for maintaining a map of enqueued records, indexing the rid against hdr, fid and transaction lock"""

    def __init__(self):
        """Constructor"""
        self.__map = {}

    def __str__(self):
        """Print the contents of the map"""
        return self.report(True, True)

    def add(self, fid, hdr, lock = False):
        """Add a new record into the map"""
        if hdr.rid in self.__map:
            raise jerr.DuplicateRidError(hdr.rid)
        self.__map[hdr.rid] = [fid, hdr, lock]

    def contains(self, rid):
        """Return True if the map contains the given rid"""
        return rid in self.__map

    def delete(self, rid):
        """Delete the rid and its associated data from the map"""
        if rid in self.__map:
            if self.get_lock(rid):
                raise jerr.DeleteLockedRecordError(rid)
            del self.__map[rid]
        else:
            raise jerr.JWarning("ERROR: Deleting non-existent rid from EnqMap: rid=0x%x" % rid)

    def get(self, rid):
        """Return a list [fid, hdr, lock] for the given rid"""
        if self.contains(rid):
            return self.__map[rid]
        return None

    def get_fid(self, rid):
        """Return the fid for the given rid"""
        if self.contains(rid):
            return self.__map[rid][0]
        return None

    def get_hdr(self, rid):
        """Return the header record for the given rid"""
        if self.contains(rid):
            return self.__map[rid][1]
        return None

    def get_lock(self, rid):
        """Return the transaction lock value for the given rid"""
        if self.contains(rid):
            return self.__map[rid][2]
        return None

    def get_rec_list(self):
        """Return a list of tuples (fid, hdr, lock) for all entries in the map"""
        return self.__map.values()

    def lock(self, rid):
        """Set the transaction lock for a given rid to True"""
        if rid in self.__map:
            if not self.__map[rid][2]: # not already locked
                self.__map[rid][2] = True
            else:
                raise jerr.AlreadyLockedError(rid)
        else:
            raise jerr.JWarning("ERROR: Locking non-existent rid in EnqMap: rid=0x%x" % rid)

    def report(self, show_stats, show_records):
        """Return a string containing a text report for all records in the map"""
        if len(self.__map) == 0:
            return "No enqueued records found."
        rstr = "%d enqueued records found" % len(self.__map)
        if show_records:
            rstr += ":"
            rid_list = self.__map.keys()
            rid_list.sort()
            for rid in rid_list:
                rec = self.__map[rid]
                if rec[2]:
                    lock_str = " [LOCKED]"
                else:
                    lock_str = ""
                rstr += "\n lfid=%d %s %s" % (rec[0], rec[1], lock_str)
        else:
            rstr += "."
        return rstr

    def rids(self):
        """Return a list of rids in the map"""
        return self.__map.keys()

    def size(self):
        """Return the number of entries in the map"""
        return len(self.__map)

    def unlock(self, rid):
        """Set the transaction lock for a given rid to False"""
        if rid in self.__map:
            if self.__map[rid][2]:
                self.__map[rid][2] = False
            else:
                raise jerr.NotLockedError(rid)
        else:
            raise jerr.NonExistentRecordError("unlock", rid)
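
# Illustrative sketch only (not part of the original journal tools): it shows
# how EnqMap indexes a record id (rid) against its file id, header and
# transaction lock. Real headers are jrnl.EnqRec instances loaded from journal
# files; _FakeHdr below is a hypothetical stand-in that provides only the
# 'rid' attribute EnqMap actually reads.

def _example_enqmap_usage():
    """Illustrative sketch: exercise EnqMap with a stand-in header object."""
    class _FakeHdr(object):
        """Hypothetical stand-in for a journal enqueue header."""
        def __init__(self, rid):
            self.rid = rid
        def __str__(self):
            return "rid=0x%x" % self.rid
    emap = EnqMap()
    emap.add(0, _FakeHdr(0x100)) # index rid 0x100 against file id 0, unlocked
    assert emap.contains(0x100)
    assert emap.get_fid(0x100) == 0
    emap.lock(0x100)             # a transactional dequeue is pending on this rid
    emap.unlock(0x100)           # abort path: release the lock again
    emap.delete(0x100)           # commit path: the enqueue is consumed
    assert emap.size() == 0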

#== class TxnMap ==============================================================

class TxnMap(object):
    """Transaction map, which maps xids to a list of outstanding actions"""

    def __init__(self, emap):
        """Constructor, requires an existing EnqMap instance"""
        self.__emap = emap
        self.__map = {}

    def __str__(self):
        """Print the contents of the map"""
        return self.report(True, True)

    def add(self, fid, hdr):
        """Add a new transactional record into the map"""
        if isinstance(hdr, jrnl.DeqRec):
            try:
                self.__emap.lock(hdr.deq_rid)
            except jerr.JWarning:
                # Not in emap, look for rid in tmap
                l = self.find_rid(hdr.deq_rid, hdr.xid)
                if l != None:
                    if l[2]:
                        raise jerr.AlreadyLockedError(hdr.deq_rid)
                    l[2] = True
        if hdr.xid in self.__map:
            self.__map[hdr.xid].append([fid, hdr, False]) # append to existing list
        else:
            self.__map[hdr.xid] = [[fid, hdr, False]] # create new list

    def contains(self, xid):
        """Return True if the xid exists in the map; False otherwise"""
        return xid in self.__map

    def delete(self, hdr):
        """Remove a transaction record from the map using either a commit or abort header"""
        if hdr.magic[-1] == "c":
            return self._commit(hdr.xid)
        if hdr.magic[-1] == "a":
            self._abort(hdr.xid)
        else:
            raise jerr.InvalidRecordTypeError("delete from TxnMap", hdr.magic, hdr.rid)

    def find_rid(self, rid, xid_hint = None):
        """Search for and return map list with supplied rid. If xid_hint is supplied, try that xid first"""
        if xid_hint != None and self.contains(xid_hint):
            for l in self.__map[xid_hint]:
                if l[1].rid == rid:
                    return l
        for xid in self.__map.iterkeys():
            if xid_hint == None or xid != xid_hint:
                for l in self.__map[xid]:
                    if l[1].rid == rid:
                        return l

    def get(self, xid):
        """Return a list of operations for the given xid"""
        if self.contains(xid):
            return self.__map[xid]

    def report(self, show_stats, show_records):
        """Return a string containing a text report for all records in the map"""
        if len(self.__map) == 0:
            return "No outstanding transactions found."
        rstr = "%d outstanding transactions found" % len(self.__map)
        if show_records:
            rstr += ":"
            for xid, tup in self.__map.iteritems():
                rstr += "\n xid=%s:" % jrnl.Utils.format_xid(xid)
                for i in tup:
                    rstr += "\n %s" % str(i[1])
        else:
            rstr += "."
        return rstr

    def size(self):
        """Return the number of xids in the map"""
        return len(self.__map)

    def xids(self):
        """Return a list of xids in the map"""
        return self.__map.keys()

    def _abort(self, xid):
        """Perform an abort operation for the given xid record"""
        for _, hdr, _ in self.__map[xid]:
            if isinstance(hdr, jrnl.DeqRec):
                try:
                    self.__emap.unlock(hdr.deq_rid)
                except jerr.NonExistentRecordError, err:
                    # Not in emap, look in current transaction op list (TPL)
                    found_rid = False
                    for _, hdr1, _ in self.__map[xid]:
                        if isinstance(hdr1, jrnl.EnqRec) and hdr1.rid == hdr.deq_rid:
                            found_rid = True
                            break
                    if not found_rid:
                        # Not found in current transaction op list, re-throw error
                        raise err
        del self.__map[xid]

    def _commit(self, xid):
        """Perform a commit operation for the given xid record"""
        mismatch_list = []
        for fid, hdr, lock in self.__map[xid]:
            if isinstance(hdr, jrnl.EnqRec):
                self.__emap.add(fid, hdr, lock) # Transfer enq to emap
            else:
                if self.__emap.contains(hdr.deq_rid):
                    self.__emap.unlock(hdr.deq_rid)
                    self.__emap.delete(hdr.deq_rid)
                else:
                    mismatch_list.append("0x%x" % hdr.deq_rid)
        del self.__map[xid]
        return mismatch_list
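
# Illustrative sketch only (not part of the original journal tools): a
# transactional enqueue is held in the TxnMap until its commit record arrives,
# at which point it is transferred into the EnqMap. _FakeEnqRec and _FakeTxnRec
# are hypothetical stand-ins; _FakeEnqRec subclasses jrnl.EnqRec purely so that
# TxnMap's isinstance() checks take the enqueue path, and it sets only the
# attributes this module reads (rid, xid). This assumes jrnl.EnqRec tolerates
# being subclassed with a replacement __init__ for illustration purposes.

def _example_txnmap_usage():
    """Illustrative sketch: commit one transactional enqueue through TxnMap."""
    class _FakeEnqRec(jrnl.EnqRec):
        """Hypothetical stand-in for an enqueue record header."""
        def __init__(self, rid, xid):
            self.rid = rid
            self.xid = xid
    class _FakeTxnRec(object):
        """Hypothetical stand-in for a commit ("RHMc") record header."""
        def __init__(self, xid, magic):
            self.xid = xid
            self.magic = magic
            self.rid = 0
    emap = EnqMap()
    tmap = TxnMap(emap)
    tmap.add(0, _FakeEnqRec(0x200, "xid-1"))  # enqueue held under open transaction
    assert tmap.contains("xid-1")
    tmap.delete(_FakeTxnRec("xid-1", "RHMc")) # commit: enqueue transferred to emap
    assert emap.contains(0x200) and tmap.size() == 0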

#== class JrnlAnalyzer ========================================================

class JrnlAnalyzer(object):
    """
    This class analyzes a set of journal files and determines which is the last to be written
    (the newest file), and hence which should be the first to be read for recovery (the oldest file).

    The analysis is performed on construction; the contents of the JrnlInfo object passed provide
    the recovery details.
    """

    def __init__(self, jinf):
        """Constructor"""
        self.__oldest = None
        self.__jinf = jinf
        self.__flist = self._analyze()

    def __str__(self):
        """String representation of this JrnlAnalyzer instance, will print out results of analysis."""
        ostr = "Journal files analyzed in directory %s (* = earliest full):\n" % self.__jinf.get_current_dir()
        if self.is_empty():
            ostr += "  <empty>\n"
        else:
            for tup in self.__flist:
                tmp = " "
                if tup[0] == self.__oldest[0]:
                    tmp = "*"
                ostr += "  %s %s: owi=%-5s rid=0x%x, fro=0x%x ts=%s\n" % (tmp, os.path.basename(tup[1]), tup[2],
                                                                          tup[3], tup[4], tup[5])
            for i in range(self.__flist[-1][0] + 1, self.__jinf.get_num_jrnl_files()):
                ostr += "  %s.%04x.jdat: <empty>\n" % (self.__jinf.get_jrnl_base_name(), i)
        return ostr

    # Analysis

    def get_oldest_file(self):
        """Return a tuple (ordnum, jfn, owi, rid, fro, timestamp) for the oldest data file found in the journal"""
        return self.__oldest

    def get_oldest_file_index(self):
        """Return the ordinal number of the oldest data file found in the journal"""
        if self.is_empty():
            return None
        return self.__oldest[0]

    def is_empty(self):
        """Return true if the analysis found that the journal file has never been written to"""
        return len(self.__flist) == 0

    def _analyze(self):
        """Perform the journal file analysis by reading and comparing the file headers of each journal data file"""
        owi_found = False
        flist = []
        for i in range(0, self.__jinf.get_num_jrnl_files()):
            jfn = os.path.join(self.__jinf.get_current_dir(), "%s.%04x.jdat" % (self.__jinf.get_jrnl_base_name(), i))
            fhandle = open(jfn)
            fhdr = jrnl.Utils.load(fhandle, jrnl.Hdr)
            if fhdr.empty():
                break
            this_tup = (i, jfn, fhdr.owi(), fhdr.rid, fhdr.fro, fhdr.timestamp_str())
            flist.append(this_tup)
            if i == 0:
                init_owi = fhdr.owi()
                self.__oldest = this_tup
            elif fhdr.owi() != init_owi and not owi_found:
                self.__oldest = this_tup
                owi_found = True
        return flist
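
# Illustrative sketch only (not part of the original journal tools): how the
# analyzer might be driven. The 'jinf' argument is assumed to be a
# JrnlInfo-like object as used throughout this module, i.e. one exposing
# get_current_dir(), get_jrnl_base_name() and get_num_jrnl_files(); the journal
# files it describes must exist on disk.

def _example_analyze_journal(jinf):
    """Illustrative sketch: analyze an existing journal and show the oldest file."""
    jra = JrnlAnalyzer(jinf)
    if jra.is_empty():
        print "Journal has never been written to."
    else:
        print "Oldest (first to recover) file index: %d" % jra.get_oldest_file_index()
        print jra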

#== class JrnlReader ====================================================

class JrnlReader(object):
    """
    This class contains an Enqueue Map (emap), a transaction map (tmap) and a transaction object list
    (txn_obj_list) which are populated by reading the journals from the oldest to the newest and analyzing
    each record. The JrnlInfo and JrnlAnalyzer objects supplied on construction provide the information
    used for the recovery.

    The analysis itself is performed by calling run(). (An illustrative usage sketch appears at the end of
    this module.)
    """

    def __init__(self, jinfo, jra, qflag = False, rflag = False, vflag = False):
        """Constructor; the journal files are read and the maps populated when run() is called"""
        self._jinfo = jinfo
        self._jra = jra
        self._qflag = qflag
        self._rflag = rflag
        self._vflag = vflag

        # test callback functions for CSV tests
        self._csv_store_chk = None
        self._csv_start_cb = None
        self._csv_enq_cb = None
        self._csv_deq_cb = None
        self._csv_txn_cb = None
        self._csv_end_cb = None

        self._emap = EnqMap()
        self._tmap = TxnMap(self._emap)
        self._txn_obj_list = {}

        self._file = None
        self._file_hdr = None
        self._file_num = None
        self._first_rec_flag = None
        self._fro = None
        self._last_file_flag = None
        self._start_file_num = None
        self._file_hdr_owi = None
        self._warning = []

        self._abort_cnt = 0
        self._commit_cnt = 0
        self._msg_cnt = 0
        self._rec_cnt = 0
        self._txn_msg_cnt = 0

    def __str__(self):
        """Print out all the undequeued records"""
        return self.report(True, self._rflag)

    def emap(self):
        """Get the enqueue map"""
        return self._emap

    def get_abort_cnt(self):
        """Get the cumulative number of transactional aborts found"""
        return self._abort_cnt

    def get_commit_cnt(self):
        """Get the cumulative number of transactional commits found"""
        return self._commit_cnt

    def get_msg_cnt(self):
        """Get the cumulative number of messages found"""
        return self._msg_cnt

    def get_rec_cnt(self):
        """Get the cumulative number of journal records (including fillers) found"""
        return self._rec_cnt

    def is_last_file(self):
        """Return True if the last file is being read"""
        return self._last_file_flag

    def report(self, show_stats = True, show_records = False):
        """Return a string containing a report on the file analysis"""
        rstr = self._emap.report(show_stats, show_records) + "\n" + self._tmap.report(show_stats, show_records)
        #TODO - print size analysis here - ie how full, sparse, est. space remaining before enq threshold
        return rstr

    def run(self):
        """Perform the read of the journal"""
        if self._csv_start_cb != None and self._csv_start_cb(self._csv_store_chk):
            return
        if self._jra.is_empty():
            return
        stop = self._advance_jrnl_file(*self._jra.get_oldest_file())
        while not stop and not self._get_next_record():
            pass
        if self._csv_end_cb != None and self._csv_end_cb(self._csv_store_chk):
            return
        if not self._qflag:
            print

    def set_callbacks(self, csv_store_chk, csv_start_cb = None, csv_enq_cb = None, csv_deq_cb = None,
                      csv_txn_cb = None, csv_end_cb = None):
        """Set callbacks for checks to be made at various points while reading the journal"""
        self._csv_store_chk = csv_store_chk
        self._csv_start_cb = csv_start_cb
        self._csv_enq_cb = csv_enq_cb
        self._csv_deq_cb = csv_deq_cb
        self._csv_txn_cb = csv_txn_cb
        self._csv_end_cb = csv_end_cb

    def tmap(self):
        """Return the transaction map"""
        return self._tmap

    def get_txn_msg_cnt(self):
        """Get the cumulative transactional message count"""
        return self._txn_msg_cnt

    def txn_obj_list(self):
        """Get a cumulative list of transaction objects (commits and aborts)"""
        return self._txn_obj_list

    def _advance_jrnl_file(self, *oldest_file_info):
        """Rotate to using the next journal file. Return False if the operation was successful, True if there
        are no more files to read."""
        fro_seek_flag = False
        if len(oldest_file_info) > 0:
            self._start_file_num = self._file_num = oldest_file_info[0]
            self._fro = oldest_file_info[4]
            fro_seek_flag = True # jump to fro to start reading
            if not self._qflag and not self._rflag:
                if self._vflag:
                    print "Recovering journals..."
                else:
                    print "Recovering journals",
        if self._file != None and self._is_file_full():
            self._file.close()
            self._file_num = self._incr_file_num()
            if self._file_num == self._start_file_num:
                return True
            if self._start_file_num == 0:
                self._last_file_flag = self._file_num == self._jinfo.get_num_jrnl_files() - 1
            else:
                self._last_file_flag = self._file_num == self._start_file_num - 1
        if self._file_num < 0 or self._file_num >= self._jinfo.get_num_jrnl_files():
            raise jerr.BadFileNumberError(self._file_num)
        jfn = os.path.join(self._jinfo.get_current_dir(), "%s.%04x.jdat" %
                           (self._jinfo.get_jrnl_base_name(), self._file_num))
        self._file = open(jfn)
        self._file_hdr = jrnl.Utils.load(self._file, jrnl.Hdr)
        if fro_seek_flag and self._file.tell() != self._fro:
            self._file.seek(self._fro)
        self._first_rec_flag = True
        if not self._qflag:
            if self._rflag:
                print jfn, ": ", self._file_hdr
            elif self._vflag:
                print "* Reading %s" % jfn
            else:
                print ".",
                sys.stdout.flush()
        return False

    def _check_owi(self, hdr):
        """Return True if the header's owi indicator matches that of the file header record; False otherwise.
        This can indicate whether the last record in a file has been read and now older records which have not
        yet been overwritten are now being read."""
        return self._file_hdr_owi == hdr.owi()

    def _is_file_full(self):
        """Return True if the current file is full (no more write space); false otherwise"""
        return self._file.tell() >= self._jinfo.get_jrnl_file_size_bytes()

    def _get_next_record(self):
        """Get the next record in the file for analysis"""
        if self._is_file_full():
            if self._advance_jrnl_file():
                return True
        try:
            hdr = jrnl.Utils.load(self._file, jrnl.Hdr)
        except:
            return True
        if hdr.empty():
            return True
        if hdr.check():
            return True
        self._rec_cnt += 1
        self._file_hdr_owi = self._file_hdr.owi()
        if self._first_rec_flag:
            if self._file_hdr.fro != hdr.foffs:
                raise jerr.FirstRecordOffsetMismatch(self._file_hdr.fro, hdr.foffs)
            else:
                if self._rflag:
                    print " * fro ok: 0x%x" % self._file_hdr.fro
                self._first_rec_flag = False
        stop = False
        if isinstance(hdr, jrnl.EnqRec):
            stop = self._handle_enq_rec(hdr)
        elif isinstance(hdr, jrnl.DeqRec):
            stop = self._handle_deq_rec(hdr)
        elif isinstance(hdr, jrnl.TxnRec):
            stop = self._handle_txn_rec(hdr)
        wstr = ""
        for warn in self._warning:
            wstr += " (%s)" % warn
        if self._rflag:
            print " > %s %s" % (hdr, wstr)
        self._warning = []
        return stop

    def _handle_deq_rec(self, hdr):
        """Process a dequeue ("RHMd") record"""
        if self._load_rec(hdr):
            return True

        # Check OWI flag
        if not self._check_owi(hdr):
            self._warning.append("WARNING: OWI mismatch - could be overwrite boundary.")
            return True
        # Test hook
        if self._csv_deq_cb != None and self._csv_deq_cb(self._csv_store_chk, hdr):
            return True

        try:
            if hdr.xid == None:
                self._emap.delete(hdr.deq_rid)
            else:
                self._tmap.add(self._file_hdr.fid, hdr)
        except jerr.JWarning, warn:
            self._warning.append(str(warn))
        return False

    def _handle_enq_rec(self, hdr):
        """Process an enqueue ("RHMe") record"""
        if self._load_rec(hdr):
            return True

        # Check extern flag
        if hdr.extern and hdr.data != None:
            raise jerr.ExternFlagDataError(hdr)
        # Check OWI flag
        if not self._check_owi(hdr):
            self._warning.append("WARNING: OWI mismatch - could be overwrite boundary.")
            return True
        # Test hook
        if self._csv_enq_cb != None and self._csv_enq_cb(self._csv_store_chk, hdr):
            return True

        if hdr.xid == None:
            self._emap.add(self._file_hdr.fid, hdr)
        else:
            self._txn_msg_cnt += 1
            self._tmap.add(self._file_hdr.fid, hdr)
        self._msg_cnt += 1
        return False

    def _handle_txn_rec(self, hdr):
        """Process a transaction ("RHMa" or "RHMc") record"""
        if self._load_rec(hdr):
            return True

        # Check OWI flag
        if not self._check_owi(hdr):
            self._warning.append("WARNING: OWI mismatch - could be overwrite boundary.")
            return True
        # Test hook
        if self._csv_txn_cb != None and self._csv_txn_cb(self._csv_store_chk, hdr):
            return True

        if hdr.magic[-1] == "a":
            self._abort_cnt += 1
        else:
            self._commit_cnt += 1

        if self._tmap.contains(hdr.xid):
            mismatched_rids = self._tmap.delete(hdr)
            if mismatched_rids != None and len(mismatched_rids) > 0:
                self._warning.append("WARNING: transactional dequeues not found in enqueue map; rids=%s" %
                                     mismatched_rids)
        else:
            self._warning.append("WARNING: %s not found in transaction map" % jrnl.Utils.format_xid(hdr.xid))
        if hdr.magic[-1] == "c": # commits only
            self._txn_obj_list[hdr.xid] = hdr
        return False

    def _incr_file_num(self):
        """Increment the number of files read with wraparound (ie after file n-1, go to 0)"""
        self._file_num += 1
        if self._file_num >= self._jinfo.get_num_jrnl_files():
            self._file_num = 0
        return self._file_num

    def _load_rec(self, hdr):
        """Load a single record for the given header. There may be arbitrarily large xids and data components."""
        while not hdr.complete():
            if self._advance_jrnl_file():
                return True
            hdr.load(self._file)
        return False

# =============================================================================

if __name__ == "__main__":
    print "This is a library, and cannot be executed."
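
# Illustrative sketch only (not part of the original journal tools): a typical
# end-to-end recovery pass, referenced from the JrnlReader class docstring
# above. As in _example_analyze_journal, 'jinf' is assumed to be a
# JrnlInfo-like object describing an existing journal directory; both the
# analyzer and the reader are driven from it.

def _example_recover_journal(jinf):
    """Illustrative sketch: read a journal and report the undequeued records."""
    jra = JrnlAnalyzer(jinf)
    reader = JrnlReader(jinf, jra, qflag = True) # qflag suppresses progress output
    reader.run()                                 # populate emap, tmap and txn_obj_list
    print reader.report(show_stats = True, show_records = True)
    print "records=%d msgs=%d txn msgs=%d commits=%d aborts=%d" % \
        (reader.get_rec_cnt(), reader.get_msg_cnt(), reader.get_txn_msg_cnt(),
         reader.get_commit_cnt(), reader.get_abort_cnt())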