diff options
Diffstat (limited to 'qpid/tools/src/py/qpid-store-resize')
-rwxr-xr-x | qpid/tools/src/py/qpid-store-resize | 350 |
1 files changed, 350 insertions, 0 deletions
diff --git a/qpid/tools/src/py/qpid-store-resize b/qpid/tools/src/py/qpid-store-resize new file mode 100755 index 0000000000..38d8eaf1ad --- /dev/null +++ b/qpid/tools/src/py/qpid-store-resize @@ -0,0 +1,350 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from qpidstore import jerr, jrnl, janal +import glob, optparse, os, sys, time + + +#== class Resize ============================================================== + +class Resize(object): + """ + Creates a new store journal and copies records from old journal to new. The new journal may be of + different size from the old one. The records are packed into the new journal (ie only remaining + enqueued records and associated transactions - if any - are copied over without spaces between them). + + The default action is to push the old journal down into a 'bak' sub-directory and then create a + new journal of the same size and pack it with the records from the old. However, it is possible to + suppress the pushdown (using --no-pushdown), in which case either a new journal id (using + --new-base-filename) or an old journal id (usnig --old-base-filename) must be supplied. In the former + case,a new journal will be created using the new base file name alongside the old one. In the latter + case, the old journal will be renamed to the supplied name, and the new one will take the default. + Note that both can be specified together with the --no-pushdown option. + + To resize the journal, use the optional --num-jfiles and/or --jfile-size parameters. These + should be large enough to write all the records or an error will result. If the size is large enough + to write all records, but too small to keep below the enqueue threshold, a warning will be printed. + Note that as any valid size will be accepted, a journal can also be shrunk, as long as it is sufficiently + big to accept the transferred records. + """ + + BAK_DIR = "bak" + JFILE_SIZE_PGS_MIN = 1 + JFILE_SIZE_PGS_MAX = 32768 + NUM_JFILES_MIN = 4 + NUM_JFILES_MAX = 64 + + def __init__(self): + """Constructor""" + self._opts = None + self._jdir = None + self._fname = None + self._fnum = None + self._file = None + self._file_rec_wr_cnt = None + self._filler_wr_cnt = None + self._last_rec_fid = None + self._last_rec_offs = None + self._rec_wr_cnt = None + + self._jrnl_info = None + self._jrnl_analysis = None + self._jrnl_reader = None + + self._process_args() + self._jrnl_info = jrnl.JrnlInfo(self._jdir, self._opts.bfn) + # FIXME: This is a hack... find an elegant way of getting the file size to jrec! + jrnl.JRNL_FILE_SIZE = self._jrnl_info.get_jrnl_file_size_bytes() + self._jrnl_analysis = janal.JrnlAnalyzer(self._jrnl_info) + self._jrnl_reader = janal.JrnlReader(self._jrnl_info, self._jrnl_analysis, self._opts.qflag, self._opts.rflag, + self._opts.vflag) + + def run(self): + """Perform the action of resizing the journal""" + if not self._opts.qflag: + print self._jrnl_analysis + self._jrnl_reader.run() + if self._opts.vflag: + print self._jrnl_info + if not self._opts.qflag: + print self._jrnl_reader.report(self._opts.vflag, self._opts.rflag) + self._handle_old_files() + self._create_new_files() + if not self._opts.qflag: + print "Transferred %d records to new journal." % self._rec_wr_cnt + self._chk_free() + + def _chk_free(self): + """Check if sufficient space is available in resized journal to be able to enqueue. Raise a warning if not.""" + if self._last_rec_fid == None or self._last_rec_offs == None: + return + wr_capacity_bytes = self._last_rec_fid * self._jrnl_info.get_jrnl_data_size_bytes() + self._last_rec_offs + tot_capacity_bytes = self._jrnl_info.get_tot_jrnl_data_size_bytes() + percent_full = 100.0 * wr_capacity_bytes / tot_capacity_bytes + if percent_full > 80.0: + raise jerr.JWarning("WARNING: Journal %s is %2.1f%% full and will likely not allow enqueuing of new records" + " until some existing records are dequeued." % + (self._jrnl_info.get_jrnl_id(), percent_full)) + + def _create_new_files(self): + """Create new journal files""" + # Assemble records to be transfered + master_record_list = {} + txn_record_list = self._jrnl_reader.txn_obj_list() + if self._opts.vflag and self._jrnl_reader.emap().size() > 0: + print "* Assembling %d records from emap" % self._jrnl_reader.emap().size() + for tup in self._jrnl_reader.emap().get_rec_list(): + hdr = tup[1] + hdr.flags &= ~jrnl.Hdr.OWI_MASK # Turn off owi + master_record_list[long(hdr.rid)] = hdr + if hdr.xidsize > 0 and hdr.xid in txn_record_list: + txn_hdr = txn_record_list[hdr.xid] + del(txn_record_list[hdr.xid]) + txn_hdr.flags &= ~jrnl.Hdr.OWI_MASK # Turn off owi + master_record_list[long(txn_hdr.rid)] = txn_hdr + if self._opts.vflag and self._jrnl_reader.tmap().size() > 0: + print "* Assembling %d records from tmap" % self._jrnl_reader.tmap().size() + for xid in self._jrnl_reader.tmap().xids(): + for l in self._jrnl_reader.tmap().get(xid): + hdr = l[1] + hdr.flags &= ~jrnl.Hdr.OWI_MASK # Turn off owi + master_record_list[hdr.rid] = hdr + rid_list = master_record_list.keys() + rid_list.sort() + + # get base filename + bfn = self._opts.bfn + if self._opts.nbfn != None: + bfn = self._opts.nbfn + + # write jinf file + self._jrnl_info.resize(self._opts.njf, self._opts.jfs) + self._jrnl_info.write(self._jdir, bfn) + + # write records + if self._opts.vflag: + print "* Transferring records to new journal files" + fro = self._jrnl_info.get_jrnl_sblk_size_bytes() + while len(rid_list) > 0: + hdr = master_record_list[rid_list.pop(0)] + rec = hdr.encode() + pos = 0 + while pos < len(rec): + if self._file == None or self._file.tell() >= self._jrnl_info.get_jrnl_file_size_bytes(): + if self._file == None: + rid = hdr.rid + elif len(rid_list) == 0: + rid = 0 + else: + rid = rid_list[0] + if not self._rotate_file(rid, fro): + raise jerr.JournalSpaceExceededError() + if len(rec) - pos <= self._jrnl_info.get_jrnl_file_size_bytes() - self._file.tell(): + self._file.write(rec[pos:]) + self._fill_file(jrnl.Utils.size_in_bytes_to_blk(self._file.tell(), + self._jrnl_info.get_jrnl_dblk_size_bytes())) + pos = len(rec) + fro = self._jrnl_info.get_jrnl_sblk_size_bytes() + else: + flen = self._jrnl_info.get_jrnl_file_size_bytes() - self._file.tell() + self._file.write(rec[pos:pos + flen]) + pos += flen + rem = len(rec) - pos + if rem <= self._jrnl_info.get_jrnl_data_size_bytes(): + fro = (jrnl.Utils.size_in_bytes_to_blk(self._jrnl_info.get_jrnl_sblk_size_bytes() + rem, + self._jrnl_info.get_jrnl_dblk_size_bytes())) + else: + fro = 0 + self._rec_wr_cnt += 1 + self._file_rec_wr_cnt += 1 + self._fill_file(add_filler_recs = True) + while self._rotate_file(): + pass + + def _fill_file(self, to_posn = None, add_filler_recs = False): + """Fill a file to a known offset""" + if self._file == None: + return + if add_filler_recs: + nfr = int(jrnl.Utils.rem_bytes_in_blk(self._file, self._jrnl_info.get_jrnl_sblk_size_bytes()) / + self._jrnl_info.get_jrnl_dblk_size_bytes()) + if nfr > 0: + self._filler_wr_cnt = nfr + for i in range(0, nfr): + self._file.write("RHMx") + self._fill_file(jrnl.Utils.size_in_bytes_to_blk(self._file.tell(), + self._jrnl_info.get_jrnl_dblk_size_bytes())) + self._last_rec_fid = self._fnum + self._last_rec_offs = self._file.tell() + if to_posn == None: + to_posn = self._jrnl_info.get_jrnl_file_size_bytes() + elif to_posn > self._jrnl_info.get_jrnl_file_size_bytes(): + raise jerr.FillExceedsFileSizeError(to_posn, self._jrnl_info.get_jrnl_file_size_bytes()) + diff = to_posn - self._file.tell() + self._file.write(str("\0" * diff)) + #DEBUG + if self._file.tell() != to_posn: + raise jerr.FillSizeError(self._file.tell(), to_posn) + + def _rotate_file(self, rid = None, fro = None): + """Switch to the next logical file""" + if self._file != None: + self._file.close() + if self._opts.vflag: + if self._file_rec_wr_cnt == 0: + print " (empty)" + elif self._filler_wr_cnt == None: + print " (%d records)" % self._file_rec_wr_cnt + else: + print " (%d records + %d filler(s))" % (self._file_rec_wr_cnt, self._filler_wr_cnt) + if self._fnum == None: + self._fnum = 0 + self._rec_wr_cnt = 0 + elif self._fnum == self._jrnl_info.get_num_jrnl_files() - 1: + return False + else: + self._fnum += 1 + self._file_rec_wr_cnt = 0 + self._fname = os.path.join(self._jrnl_info.get_jrnl_dir(), "%s.%04x.jdat" % + (self._jrnl_info.get_jrnl_base_name(), self._fnum)) + if self._opts.vflag: + print "* Opening file %s" % self._fname, + self._file = open(self._fname, "w") + if rid == None or fro == None: + self._fill_file() + else: + now = time.time() + fhdr = jrnl.FileHdr(0, "RHMf", jrnl.Hdr.HDR_VER, int(jrnl.Hdr.BIG_ENDIAN), 0, rid) + fhdr.init(self._file, 0, self._fnum, self._fnum, fro, int(now), 1000000000*(now - int(now))) + self._file.write(fhdr.encode()) + self._fill_file(self._jrnl_info.get_jrnl_sblk_size_bytes()) + return True + + def _handle_old_files(self): + """Push old journal down into a backup directory""" + target_dir = self._jdir + if not self._opts.npd: + target_dir = os.path.join(self._jdir, self.BAK_DIR) + if os.path.exists(target_dir): + if self._opts.vflag: + print "* Pushdown directory %s exists, deleting content" % target_dir + for fname in glob.glob(os.path.join(target_dir, "*")): + os.unlink(fname) + else: + if self._opts.vflag: + print "* Creating new pushdown directory %s" % target_dir + os.mkdir(target_dir) + + if not self._opts.npd or self._opts.obfn != None: + if self._opts.obfn != None and self._opts.vflag: + print "* Renaming old journal files using base name %s" % self._opts.obfn + # .jdat files + for fname in glob.glob(os.path.join(self._jdir, "%s.*.jdat" % self._opts.bfn)): + tbfn = os.path.basename(fname) + if self._opts.obfn != None: + per1 = tbfn.rfind(".") + if per1 >= 0: + per2 = tbfn.rfind(".", 0, per1) + if per2 >= 0: + tbfn = "%s%s" % (self._opts.obfn, tbfn[per2:]) + os.rename(fname, os.path.join(target_dir, tbfn)) + # .jinf file + self._jrnl_info.write(target_dir, self._opts.obfn) + os.unlink(os.path.join(self._jdir, "%s.jinf" % self._opts.bfn)) + + def _print_options(self): + """Print program options""" + if self._opts.vflag: + print "Journal dir: %s" % self._jdir + print "Options: Base filename: %s" % self._opts.bfn + print " New base filename: %s" % self._opts.nbfn + print " Old base filename: %s" % self._opts.obfn + print " Pushdown: %s" % self._opts.npd + print " No. journal files: %d" % self._opts.njf + print " Journal file size: %d 64kiB blocks" % self._opts.jfs + print " Show records flag: %s" % self._opts.rflag + print " Verbose flag: %s" % True + print + + def _process_args(self): + """Process the command-line arguments""" + opt = optparse.OptionParser(usage="%prog [options] DIR", version="%prog 1.0") + opt.add_option("-b", "--base-filename", + action="store", dest="bfn", default="JournalData", + help="Base filename for old journal files") + opt.add_option("-B", "--new-base-filename", + action="store", dest="nbfn", + help="Base filename for new journal files") + opt.add_option("-n", "--no-pushdown", + action="store_true", dest="npd", + help="Suppress pushdown of old files into \"bak\" dir; old files will remain in existing dir") + opt.add_option("-N", "--num-jfiles", + action="store", type="int", dest="njf", default=8, + help="Number of files for new journal (%d-%d)" % (self.NUM_JFILES_MIN, self.NUM_JFILES_MAX)) + opt.add_option("-o", "--old-base-filename", + action="store", dest="obfn", + help="Base filename for old journal files") + opt.add_option("-q", "--quiet", + action="store_true", dest="qflag", + help="Quiet (suppress all non-error output)") + opt.add_option("-r", "--records", + action="store_true", dest="rflag", + help="Print remaining records and transactions") + opt.add_option("-s", "--jfile-size-pgs", + action="store", type="int", dest="jfs", default=24, + help="Size of each new journal file in 64kiB blocks (%d-%d)" % + (self.JFILE_SIZE_PGS_MIN, self.JFILE_SIZE_PGS_MAX)) + opt.add_option("-v", "--verbose", + action="store_true", dest="vflag", + help="Verbose output") + (self._opts, args) = opt.parse_args() + if len(args) == 0: + opt.error("No journal directory argument") + elif len(args) > 1: + opt.error("Too many positional arguments: %s" % args) + if self._opts.qflag and self._opts.rflag: + opt.error("Quiet (-q/--quiet) and record (-r/--records) options are mutually exclusive") + if self._opts.qflag and self._opts.vflag: + opt.error("Quiet (-q/--quiet) and verbose (-v/--verbose) options are mutually exclusive") + if self._opts.njf != None and (self._opts.njf < self.NUM_JFILES_MIN or self._opts.njf > self.NUM_JFILES_MAX): + opt.error("Number of files (%d) is out of range (%d-%d)" % + (self._opts.njf, self.NUM_JFILES_MIN, self.NUM_JFILES_MAX)) + if self._opts.jfs != None and (self._opts.jfs < self.JFILE_SIZE_PGS_MIN or + self._opts.jfs > self.JFILE_SIZE_PGS_MAX): + opt.error("File size (%d) is out of range (%d-%d)" % + (self._opts.jfs, self.JFILE_SIZE_PGS_MIN, self.JFILE_SIZE_PGS_MAX)) + if self._opts.npd != None and (self._opts.nbfn == None and self._opts.obfn == None): + opt.error("If (-n/--no-pushdown) is used, then at least one of (-B/--new-base-filename) and" + " (-o/--old-base-filename) must be used.") + self._jdir = args[0] + if not os.path.exists(self._jdir): + opt.error("Journal path \"%s\" does not exist" % self._jdir) + self._print_options() + +#============================================================================== +# main program +#============================================================================== + +if __name__ == "__main__": + R = Resize() + try: + R.run() + except Exception, e: + sys.exit(e) |