summaryrefslogtreecommitdiff
path: root/qpid/tools/src/py/qpid-store-resize
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/tools/src/py/qpid-store-resize')
-rwxr-xr-xqpid/tools/src/py/qpid-store-resize350
1 files changed, 350 insertions, 0 deletions
diff --git a/qpid/tools/src/py/qpid-store-resize b/qpid/tools/src/py/qpid-store-resize
new file mode 100755
index 0000000000..38d8eaf1ad
--- /dev/null
+++ b/qpid/tools/src/py/qpid-store-resize
@@ -0,0 +1,350 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpidstore import jerr, jrnl, janal
+import glob, optparse, os, sys, time
+
+
+#== class Resize ==============================================================
+
+class Resize(object):
+ """
+ Creates a new store journal and copies records from old journal to new. The new journal may be of
+ different size from the old one. The records are packed into the new journal (ie only remaining
+ enqueued records and associated transactions - if any - are copied over without spaces between them).
+
+ The default action is to push the old journal down into a 'bak' sub-directory and then create a
+ new journal of the same size and pack it with the records from the old. However, it is possible to
+ suppress the pushdown (using --no-pushdown), in which case either a new journal id (using
+ --new-base-filename) or an old journal id (usnig --old-base-filename) must be supplied. In the former
+ case,a new journal will be created using the new base file name alongside the old one. In the latter
+ case, the old journal will be renamed to the supplied name, and the new one will take the default.
+ Note that both can be specified together with the --no-pushdown option.
+
+ To resize the journal, use the optional --num-jfiles and/or --jfile-size parameters. These
+ should be large enough to write all the records or an error will result. If the size is large enough
+ to write all records, but too small to keep below the enqueue threshold, a warning will be printed.
+ Note that as any valid size will be accepted, a journal can also be shrunk, as long as it is sufficiently
+ big to accept the transferred records.
+ """
+
+ BAK_DIR = "bak"
+ JFILE_SIZE_PGS_MIN = 1
+ JFILE_SIZE_PGS_MAX = 32768
+ NUM_JFILES_MIN = 4
+ NUM_JFILES_MAX = 64
+
+ def __init__(self):
+ """Constructor"""
+ self._opts = None
+ self._jdir = None
+ self._fname = None
+ self._fnum = None
+ self._file = None
+ self._file_rec_wr_cnt = None
+ self._filler_wr_cnt = None
+ self._last_rec_fid = None
+ self._last_rec_offs = None
+ self._rec_wr_cnt = None
+
+ self._jrnl_info = None
+ self._jrnl_analysis = None
+ self._jrnl_reader = None
+
+ self._process_args()
+ self._jrnl_info = jrnl.JrnlInfo(self._jdir, self._opts.bfn)
+ # FIXME: This is a hack... find an elegant way of getting the file size to jrec!
+ jrnl.JRNL_FILE_SIZE = self._jrnl_info.get_jrnl_file_size_bytes()
+ self._jrnl_analysis = janal.JrnlAnalyzer(self._jrnl_info)
+ self._jrnl_reader = janal.JrnlReader(self._jrnl_info, self._jrnl_analysis, self._opts.qflag, self._opts.rflag,
+ self._opts.vflag)
+
+ def run(self):
+ """Perform the action of resizing the journal"""
+ if not self._opts.qflag:
+ print self._jrnl_analysis
+ self._jrnl_reader.run()
+ if self._opts.vflag:
+ print self._jrnl_info
+ if not self._opts.qflag:
+ print self._jrnl_reader.report(self._opts.vflag, self._opts.rflag)
+ self._handle_old_files()
+ self._create_new_files()
+ if not self._opts.qflag:
+ print "Transferred %d records to new journal." % self._rec_wr_cnt
+ self._chk_free()
+
+ def _chk_free(self):
+ """Check if sufficient space is available in resized journal to be able to enqueue. Raise a warning if not."""
+ if self._last_rec_fid == None or self._last_rec_offs == None:
+ return
+ wr_capacity_bytes = self._last_rec_fid * self._jrnl_info.get_jrnl_data_size_bytes() + self._last_rec_offs
+ tot_capacity_bytes = self._jrnl_info.get_tot_jrnl_data_size_bytes()
+ percent_full = 100.0 * wr_capacity_bytes / tot_capacity_bytes
+ if percent_full > 80.0:
+ raise jerr.JWarning("WARNING: Journal %s is %2.1f%% full and will likely not allow enqueuing of new records"
+ " until some existing records are dequeued." %
+ (self._jrnl_info.get_jrnl_id(), percent_full))
+
+ def _create_new_files(self):
+ """Create new journal files"""
+ # Assemble records to be transfered
+ master_record_list = {}
+ txn_record_list = self._jrnl_reader.txn_obj_list()
+ if self._opts.vflag and self._jrnl_reader.emap().size() > 0:
+ print "* Assembling %d records from emap" % self._jrnl_reader.emap().size()
+ for tup in self._jrnl_reader.emap().get_rec_list():
+ hdr = tup[1]
+ hdr.flags &= ~jrnl.Hdr.OWI_MASK # Turn off owi
+ master_record_list[long(hdr.rid)] = hdr
+ if hdr.xidsize > 0 and hdr.xid in txn_record_list:
+ txn_hdr = txn_record_list[hdr.xid]
+ del(txn_record_list[hdr.xid])
+ txn_hdr.flags &= ~jrnl.Hdr.OWI_MASK # Turn off owi
+ master_record_list[long(txn_hdr.rid)] = txn_hdr
+ if self._opts.vflag and self._jrnl_reader.tmap().size() > 0:
+ print "* Assembling %d records from tmap" % self._jrnl_reader.tmap().size()
+ for xid in self._jrnl_reader.tmap().xids():
+ for l in self._jrnl_reader.tmap().get(xid):
+ hdr = l[1]
+ hdr.flags &= ~jrnl.Hdr.OWI_MASK # Turn off owi
+ master_record_list[hdr.rid] = hdr
+ rid_list = master_record_list.keys()
+ rid_list.sort()
+
+ # get base filename
+ bfn = self._opts.bfn
+ if self._opts.nbfn != None:
+ bfn = self._opts.nbfn
+
+ # write jinf file
+ self._jrnl_info.resize(self._opts.njf, self._opts.jfs)
+ self._jrnl_info.write(self._jdir, bfn)
+
+ # write records
+ if self._opts.vflag:
+ print "* Transferring records to new journal files"
+ fro = self._jrnl_info.get_jrnl_sblk_size_bytes()
+ while len(rid_list) > 0:
+ hdr = master_record_list[rid_list.pop(0)]
+ rec = hdr.encode()
+ pos = 0
+ while pos < len(rec):
+ if self._file == None or self._file.tell() >= self._jrnl_info.get_jrnl_file_size_bytes():
+ if self._file == None:
+ rid = hdr.rid
+ elif len(rid_list) == 0:
+ rid = 0
+ else:
+ rid = rid_list[0]
+ if not self._rotate_file(rid, fro):
+ raise jerr.JournalSpaceExceededError()
+ if len(rec) - pos <= self._jrnl_info.get_jrnl_file_size_bytes() - self._file.tell():
+ self._file.write(rec[pos:])
+ self._fill_file(jrnl.Utils.size_in_bytes_to_blk(self._file.tell(),
+ self._jrnl_info.get_jrnl_dblk_size_bytes()))
+ pos = len(rec)
+ fro = self._jrnl_info.get_jrnl_sblk_size_bytes()
+ else:
+ flen = self._jrnl_info.get_jrnl_file_size_bytes() - self._file.tell()
+ self._file.write(rec[pos:pos + flen])
+ pos += flen
+ rem = len(rec) - pos
+ if rem <= self._jrnl_info.get_jrnl_data_size_bytes():
+ fro = (jrnl.Utils.size_in_bytes_to_blk(self._jrnl_info.get_jrnl_sblk_size_bytes() + rem,
+ self._jrnl_info.get_jrnl_dblk_size_bytes()))
+ else:
+ fro = 0
+ self._rec_wr_cnt += 1
+ self._file_rec_wr_cnt += 1
+ self._fill_file(add_filler_recs = True)
+ while self._rotate_file():
+ pass
+
+ def _fill_file(self, to_posn = None, add_filler_recs = False):
+ """Fill a file to a known offset"""
+ if self._file == None:
+ return
+ if add_filler_recs:
+ nfr = int(jrnl.Utils.rem_bytes_in_blk(self._file, self._jrnl_info.get_jrnl_sblk_size_bytes()) /
+ self._jrnl_info.get_jrnl_dblk_size_bytes())
+ if nfr > 0:
+ self._filler_wr_cnt = nfr
+ for i in range(0, nfr):
+ self._file.write("RHMx")
+ self._fill_file(jrnl.Utils.size_in_bytes_to_blk(self._file.tell(),
+ self._jrnl_info.get_jrnl_dblk_size_bytes()))
+ self._last_rec_fid = self._fnum
+ self._last_rec_offs = self._file.tell()
+ if to_posn == None:
+ to_posn = self._jrnl_info.get_jrnl_file_size_bytes()
+ elif to_posn > self._jrnl_info.get_jrnl_file_size_bytes():
+ raise jerr.FillExceedsFileSizeError(to_posn, self._jrnl_info.get_jrnl_file_size_bytes())
+ diff = to_posn - self._file.tell()
+ self._file.write(str("\0" * diff))
+ #DEBUG
+ if self._file.tell() != to_posn:
+ raise jerr.FillSizeError(self._file.tell(), to_posn)
+
+ def _rotate_file(self, rid = None, fro = None):
+ """Switch to the next logical file"""
+ if self._file != None:
+ self._file.close()
+ if self._opts.vflag:
+ if self._file_rec_wr_cnt == 0:
+ print " (empty)"
+ elif self._filler_wr_cnt == None:
+ print " (%d records)" % self._file_rec_wr_cnt
+ else:
+ print " (%d records + %d filler(s))" % (self._file_rec_wr_cnt, self._filler_wr_cnt)
+ if self._fnum == None:
+ self._fnum = 0
+ self._rec_wr_cnt = 0
+ elif self._fnum == self._jrnl_info.get_num_jrnl_files() - 1:
+ return False
+ else:
+ self._fnum += 1
+ self._file_rec_wr_cnt = 0
+ self._fname = os.path.join(self._jrnl_info.get_jrnl_dir(), "%s.%04x.jdat" %
+ (self._jrnl_info.get_jrnl_base_name(), self._fnum))
+ if self._opts.vflag:
+ print "* Opening file %s" % self._fname,
+ self._file = open(self._fname, "w")
+ if rid == None or fro == None:
+ self._fill_file()
+ else:
+ now = time.time()
+ fhdr = jrnl.FileHdr(0, "RHMf", jrnl.Hdr.HDR_VER, int(jrnl.Hdr.BIG_ENDIAN), 0, rid)
+ fhdr.init(self._file, 0, self._fnum, self._fnum, fro, int(now), 1000000000*(now - int(now)))
+ self._file.write(fhdr.encode())
+ self._fill_file(self._jrnl_info.get_jrnl_sblk_size_bytes())
+ return True
+
+ def _handle_old_files(self):
+ """Push old journal down into a backup directory"""
+ target_dir = self._jdir
+ if not self._opts.npd:
+ target_dir = os.path.join(self._jdir, self.BAK_DIR)
+ if os.path.exists(target_dir):
+ if self._opts.vflag:
+ print "* Pushdown directory %s exists, deleting content" % target_dir
+ for fname in glob.glob(os.path.join(target_dir, "*")):
+ os.unlink(fname)
+ else:
+ if self._opts.vflag:
+ print "* Creating new pushdown directory %s" % target_dir
+ os.mkdir(target_dir)
+
+ if not self._opts.npd or self._opts.obfn != None:
+ if self._opts.obfn != None and self._opts.vflag:
+ print "* Renaming old journal files using base name %s" % self._opts.obfn
+ # .jdat files
+ for fname in glob.glob(os.path.join(self._jdir, "%s.*.jdat" % self._opts.bfn)):
+ tbfn = os.path.basename(fname)
+ if self._opts.obfn != None:
+ per1 = tbfn.rfind(".")
+ if per1 >= 0:
+ per2 = tbfn.rfind(".", 0, per1)
+ if per2 >= 0:
+ tbfn = "%s%s" % (self._opts.obfn, tbfn[per2:])
+ os.rename(fname, os.path.join(target_dir, tbfn))
+ # .jinf file
+ self._jrnl_info.write(target_dir, self._opts.obfn)
+ os.unlink(os.path.join(self._jdir, "%s.jinf" % self._opts.bfn))
+
+ def _print_options(self):
+ """Print program options"""
+ if self._opts.vflag:
+ print "Journal dir: %s" % self._jdir
+ print "Options: Base filename: %s" % self._opts.bfn
+ print " New base filename: %s" % self._opts.nbfn
+ print " Old base filename: %s" % self._opts.obfn
+ print " Pushdown: %s" % self._opts.npd
+ print " No. journal files: %d" % self._opts.njf
+ print " Journal file size: %d 64kiB blocks" % self._opts.jfs
+ print " Show records flag: %s" % self._opts.rflag
+ print " Verbose flag: %s" % True
+ print
+
+ def _process_args(self):
+ """Process the command-line arguments"""
+ opt = optparse.OptionParser(usage="%prog [options] DIR", version="%prog 1.0")
+ opt.add_option("-b", "--base-filename",
+ action="store", dest="bfn", default="JournalData",
+ help="Base filename for old journal files")
+ opt.add_option("-B", "--new-base-filename",
+ action="store", dest="nbfn",
+ help="Base filename for new journal files")
+ opt.add_option("-n", "--no-pushdown",
+ action="store_true", dest="npd",
+ help="Suppress pushdown of old files into \"bak\" dir; old files will remain in existing dir")
+ opt.add_option("-N", "--num-jfiles",
+ action="store", type="int", dest="njf", default=8,
+ help="Number of files for new journal (%d-%d)" % (self.NUM_JFILES_MIN, self.NUM_JFILES_MAX))
+ opt.add_option("-o", "--old-base-filename",
+ action="store", dest="obfn",
+ help="Base filename for old journal files")
+ opt.add_option("-q", "--quiet",
+ action="store_true", dest="qflag",
+ help="Quiet (suppress all non-error output)")
+ opt.add_option("-r", "--records",
+ action="store_true", dest="rflag",
+ help="Print remaining records and transactions")
+ opt.add_option("-s", "--jfile-size-pgs",
+ action="store", type="int", dest="jfs", default=24,
+ help="Size of each new journal file in 64kiB blocks (%d-%d)" %
+ (self.JFILE_SIZE_PGS_MIN, self.JFILE_SIZE_PGS_MAX))
+ opt.add_option("-v", "--verbose",
+ action="store_true", dest="vflag",
+ help="Verbose output")
+ (self._opts, args) = opt.parse_args()
+ if len(args) == 0:
+ opt.error("No journal directory argument")
+ elif len(args) > 1:
+ opt.error("Too many positional arguments: %s" % args)
+ if self._opts.qflag and self._opts.rflag:
+ opt.error("Quiet (-q/--quiet) and record (-r/--records) options are mutually exclusive")
+ if self._opts.qflag and self._opts.vflag:
+ opt.error("Quiet (-q/--quiet) and verbose (-v/--verbose) options are mutually exclusive")
+ if self._opts.njf != None and (self._opts.njf < self.NUM_JFILES_MIN or self._opts.njf > self.NUM_JFILES_MAX):
+ opt.error("Number of files (%d) is out of range (%d-%d)" %
+ (self._opts.njf, self.NUM_JFILES_MIN, self.NUM_JFILES_MAX))
+ if self._opts.jfs != None and (self._opts.jfs < self.JFILE_SIZE_PGS_MIN or
+ self._opts.jfs > self.JFILE_SIZE_PGS_MAX):
+ opt.error("File size (%d) is out of range (%d-%d)" %
+ (self._opts.jfs, self.JFILE_SIZE_PGS_MIN, self.JFILE_SIZE_PGS_MAX))
+ if self._opts.npd != None and (self._opts.nbfn == None and self._opts.obfn == None):
+ opt.error("If (-n/--no-pushdown) is used, then at least one of (-B/--new-base-filename) and"
+ " (-o/--old-base-filename) must be used.")
+ self._jdir = args[0]
+ if not os.path.exists(self._jdir):
+ opt.error("Journal path \"%s\" does not exist" % self._jdir)
+ self._print_options()
+
+#==============================================================================
+# main program
+#==============================================================================
+
+if __name__ == "__main__":
+ R = Resize()
+ try:
+ R.run()
+ except Exception, e:
+ sys.exit(e)