summaryrefslogtreecommitdiff
path: root/qpid/tools/src/py/qpid-store-resize
blob: 38d8eaf1adda64982b1090731c627bc90d7d6918 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

from qpidstore import jerr, jrnl, janal
import glob, optparse, os, sys, time


#== class Resize ==============================================================

class Resize(object):
    """
    Creates a new store journal and copies records from old journal to new. The new journal may be of
    different size from the old one. The records are packed into the new journal (ie only remaining
    enqueued records and associated transactions - if any - are copied over without spaces between them).
    
    The default action is to push the old journal down into a 'bak' sub-directory and then create a
    new journal of the same size and pack it with the records from the old. However, it is possible to
    suppress the pushdown (using --no-pushdown), in which case either a new journal id (using
    --new-base-filename) or an old journal id (usnig --old-base-filename) must be supplied. In the former
    case,a new journal will be created using the new base file name alongside the old one. In the latter
    case, the old journal will be renamed to the supplied name, and the new one will take the default.
    Note that both can be specified together with the --no-pushdown option.
    
    To resize the journal, use the optional --num-jfiles and/or --jfile-size parameters. These
    should be large enough to write all the records or an error will result. If the size is large enough
    to write all records, but too small to keep below the enqueue threshold, a warning will be printed.
    Note that as any valid size will be accepted, a journal can also be shrunk, as long as it is sufficiently
    big to accept the transferred records.
    """
    
    BAK_DIR = "bak"
    JFILE_SIZE_PGS_MIN = 1
    JFILE_SIZE_PGS_MAX = 32768
    NUM_JFILES_MIN = 4
    NUM_JFILES_MAX = 64
    
    def __init__(self):
        """Constructor"""
        self._opts = None
        self._jdir = None
        self._fname = None
        self._fnum = None
        self._file = None
        self._file_rec_wr_cnt = None
        self._filler_wr_cnt = None
        self._last_rec_fid = None
        self._last_rec_offs = None
        self._rec_wr_cnt = None
        
        self._jrnl_info = None
        self._jrnl_analysis = None
        self._jrnl_reader = None
        
        self._process_args()
        self._jrnl_info = jrnl.JrnlInfo(self._jdir, self._opts.bfn)
        # FIXME: This is a hack... find an elegant way of getting the file size to jrec!
        jrnl.JRNL_FILE_SIZE = self._jrnl_info.get_jrnl_file_size_bytes()
        self._jrnl_analysis = janal.JrnlAnalyzer(self._jrnl_info)
        self._jrnl_reader = janal.JrnlReader(self._jrnl_info, self._jrnl_analysis, self._opts.qflag, self._opts.rflag,
                                            self._opts.vflag)
    
    def run(self):
        """Perform the action of resizing the journal"""
        if not self._opts.qflag:
            print self._jrnl_analysis
        self._jrnl_reader.run()
        if self._opts.vflag:
            print self._jrnl_info
        if not self._opts.qflag:
            print self._jrnl_reader.report(self._opts.vflag, self._opts.rflag)
        self._handle_old_files()
        self._create_new_files()
        if not self._opts.qflag:
            print "Transferred %d records to new journal." % self._rec_wr_cnt
        self._chk_free()
    
    def _chk_free(self):
        """Check if sufficient space is available in resized journal to be able to enqueue. Raise a warning if not."""
        if self._last_rec_fid == None or self._last_rec_offs == None:
            return
        wr_capacity_bytes = self._last_rec_fid * self._jrnl_info.get_jrnl_data_size_bytes() + self._last_rec_offs
        tot_capacity_bytes = self._jrnl_info.get_tot_jrnl_data_size_bytes()
        percent_full = 100.0 * wr_capacity_bytes / tot_capacity_bytes
        if percent_full > 80.0:
            raise jerr.JWarning("WARNING: Journal %s is %2.1f%% full and will likely not allow enqueuing of new records"
                                " until some existing records are dequeued." %
                                (self._jrnl_info.get_jrnl_id(), percent_full))
    
    def _create_new_files(self):
        """Create new journal files"""
        # Assemble records to be transfered
        master_record_list = {}
        txn_record_list = self._jrnl_reader.txn_obj_list()
        if self._opts.vflag and self._jrnl_reader.emap().size() > 0:
            print "* Assembling %d records from emap" % self._jrnl_reader.emap().size()
        for tup in self._jrnl_reader.emap().get_rec_list():
            hdr = tup[1]
            hdr.flags &= ~jrnl.Hdr.OWI_MASK # Turn off owi
            master_record_list[long(hdr.rid)] = hdr
            if hdr.xidsize > 0 and hdr.xid in txn_record_list:
                txn_hdr = txn_record_list[hdr.xid]
                del(txn_record_list[hdr.xid])
                txn_hdr.flags &= ~jrnl.Hdr.OWI_MASK # Turn off owi
                master_record_list[long(txn_hdr.rid)] = txn_hdr
        if self._opts.vflag and self._jrnl_reader.tmap().size() > 0:
            print "* Assembling %d records from tmap" % self._jrnl_reader.tmap().size()
        for xid in self._jrnl_reader.tmap().xids():
            for l in self._jrnl_reader.tmap().get(xid):
                hdr = l[1]
                hdr.flags &= ~jrnl.Hdr.OWI_MASK # Turn off owi
                master_record_list[hdr.rid] = hdr
        rid_list = master_record_list.keys()
        rid_list.sort()
            
        # get base filename
        bfn = self._opts.bfn
        if self._opts.nbfn != None:
            bfn = self._opts.nbfn
        
        # write jinf file
        self._jrnl_info.resize(self._opts.njf, self._opts.jfs)
        self._jrnl_info.write(self._jdir, bfn)
        
        # write records
        if self._opts.vflag:
            print "* Transferring records to new journal files"
        fro = self._jrnl_info.get_jrnl_sblk_size_bytes()
        while len(rid_list) > 0:
            hdr = master_record_list[rid_list.pop(0)]
            rec = hdr.encode()
            pos = 0
            while pos < len(rec):
                if self._file == None or self._file.tell() >= self._jrnl_info.get_jrnl_file_size_bytes():
                    if self._file == None:
                        rid = hdr.rid
                    elif len(rid_list) == 0:
                        rid = 0
                    else:
                        rid = rid_list[0]
                    if not self._rotate_file(rid, fro):
                        raise jerr.JournalSpaceExceededError()
                if len(rec) - pos <= self._jrnl_info.get_jrnl_file_size_bytes() - self._file.tell():
                    self._file.write(rec[pos:])
                    self._fill_file(jrnl.Utils.size_in_bytes_to_blk(self._file.tell(),
                                                                    self._jrnl_info.get_jrnl_dblk_size_bytes()))
                    pos = len(rec)
                    fro = self._jrnl_info.get_jrnl_sblk_size_bytes()
                else:
                    flen = self._jrnl_info.get_jrnl_file_size_bytes() - self._file.tell()
                    self._file.write(rec[pos:pos + flen])
                    pos += flen
                    rem = len(rec) - pos
                    if rem <= self._jrnl_info.get_jrnl_data_size_bytes():
                        fro = (jrnl.Utils.size_in_bytes_to_blk(self._jrnl_info.get_jrnl_sblk_size_bytes() + rem,
                                                               self._jrnl_info.get_jrnl_dblk_size_bytes()))
                    else:
                        fro = 0
            self._rec_wr_cnt += 1
            self._file_rec_wr_cnt += 1
        self._fill_file(add_filler_recs = True)
        while self._rotate_file():
            pass
        
    def _fill_file(self, to_posn = None, add_filler_recs = False):
        """Fill a file to a known offset"""
        if self._file == None:
            return
        if add_filler_recs:
            nfr = int(jrnl.Utils.rem_bytes_in_blk(self._file, self._jrnl_info.get_jrnl_sblk_size_bytes()) /
                      self._jrnl_info.get_jrnl_dblk_size_bytes())
            if nfr > 0:
                self._filler_wr_cnt = nfr
                for i in range(0, nfr):
                    self._file.write("RHMx")
                    self._fill_file(jrnl.Utils.size_in_bytes_to_blk(self._file.tell(),
                                                                    self._jrnl_info.get_jrnl_dblk_size_bytes()))
            self._last_rec_fid = self._fnum
            self._last_rec_offs = self._file.tell()
        if to_posn == None:
            to_posn = self._jrnl_info.get_jrnl_file_size_bytes()
        elif to_posn > self._jrnl_info.get_jrnl_file_size_bytes():
            raise jerr.FillExceedsFileSizeError(to_posn, self._jrnl_info.get_jrnl_file_size_bytes())
        diff = to_posn - self._file.tell()
        self._file.write(str("\0" * diff))
        #DEBUG
        if self._file.tell() != to_posn:
            raise jerr.FillSizeError(self._file.tell(), to_posn)
        
    def _rotate_file(self, rid = None, fro = None):
        """Switch to the next logical file"""
        if self._file != None:
            self._file.close()
            if self._opts.vflag:
                if self._file_rec_wr_cnt == 0:
                    print "  (empty)"
                elif self._filler_wr_cnt == None:
                    print "  (%d records)" % self._file_rec_wr_cnt
                else:
                    print "  (%d records + %d filler(s))" % (self._file_rec_wr_cnt, self._filler_wr_cnt)
        if self._fnum == None:
            self._fnum = 0
            self._rec_wr_cnt = 0
        elif self._fnum == self._jrnl_info.get_num_jrnl_files() - 1:
            return False
        else:
            self._fnum += 1
        self._file_rec_wr_cnt = 0
        self._fname = os.path.join(self._jrnl_info.get_jrnl_dir(), "%s.%04x.jdat" %
                                   (self._jrnl_info.get_jrnl_base_name(), self._fnum))
        if self._opts.vflag:
            print "* Opening file %s" % self._fname,
        self._file = open(self._fname, "w")
        if rid == None or fro == None:
            self._fill_file()
        else:
            now = time.time()
            fhdr = jrnl.FileHdr(0, "RHMf", jrnl.Hdr.HDR_VER, int(jrnl.Hdr.BIG_ENDIAN), 0, rid)
            fhdr.init(self._file, 0, self._fnum, self._fnum, fro, int(now), 1000000000*(now - int(now)))
            self._file.write(fhdr.encode())
            self._fill_file(self._jrnl_info.get_jrnl_sblk_size_bytes())
        return True
    
    def _handle_old_files(self):
        """Push old journal down into a backup directory"""
        target_dir = self._jdir
        if not self._opts.npd:
            target_dir = os.path.join(self._jdir, self.BAK_DIR)
            if os.path.exists(target_dir):
                if self._opts.vflag:
                    print "* Pushdown directory %s exists, deleting content" % target_dir
                for fname in glob.glob(os.path.join(target_dir, "*")):
                    os.unlink(fname)
            else:
                if self._opts.vflag:
                    print "* Creating new pushdown directory %s" % target_dir
                os.mkdir(target_dir)
        
        if not self._opts.npd or self._opts.obfn != None:
            if self._opts.obfn != None and self._opts.vflag:
                print "* Renaming old journal files using base name %s" % self._opts.obfn
            # .jdat files
            for fname in glob.glob(os.path.join(self._jdir, "%s.*.jdat" % self._opts.bfn)):
                tbfn = os.path.basename(fname)
                if self._opts.obfn != None:
                    per1 = tbfn.rfind(".")
                    if per1 >= 0:
                        per2 = tbfn.rfind(".", 0, per1)
                        if per2 >= 0:
                            tbfn = "%s%s" % (self._opts.obfn, tbfn[per2:])
                os.rename(fname, os.path.join(target_dir, tbfn))
            # .jinf file
            self._jrnl_info.write(target_dir, self._opts.obfn)
            os.unlink(os.path.join(self._jdir, "%s.jinf" % self._opts.bfn))

    def _print_options(self):
        """Print program options"""
        if self._opts.vflag:
            print "Journal dir: %s" % self._jdir
            print "Options: Base filename: %s" % self._opts.bfn
            print "         New base filename: %s" % self._opts.nbfn
            print "         Old base filename: %s" % self._opts.obfn
            print "         Pushdown: %s" % self._opts.npd
            print "         No. journal files: %d" % self._opts.njf
            print "         Journal file size: %d 64kiB blocks" % self._opts.jfs
            print "         Show records flag: %s" % self._opts.rflag
            print "         Verbose flag: %s" % True
            print
    
    def _process_args(self):
        """Process the command-line arguments"""
        opt = optparse.OptionParser(usage="%prog [options] DIR", version="%prog 1.0")
        opt.add_option("-b", "--base-filename",
                      action="store", dest="bfn", default="JournalData",
                      help="Base filename for old journal files")
        opt.add_option("-B", "--new-base-filename",
                      action="store", dest="nbfn",
                      help="Base filename for new journal files")
        opt.add_option("-n", "--no-pushdown",
                      action="store_true", dest="npd",
                      help="Suppress pushdown of old files into \"bak\" dir; old files will remain in existing dir")
        opt.add_option("-N", "--num-jfiles",
                      action="store", type="int", dest="njf", default=8,
                      help="Number of files for new journal (%d-%d)" % (self.NUM_JFILES_MIN, self.NUM_JFILES_MAX))
        opt.add_option("-o", "--old-base-filename",
                      action="store", dest="obfn",
                      help="Base filename for old journal files")
        opt.add_option("-q", "--quiet",
                      action="store_true", dest="qflag",
                      help="Quiet (suppress all non-error output)")
        opt.add_option("-r", "--records",
                      action="store_true", dest="rflag",
                      help="Print remaining records and transactions")
        opt.add_option("-s", "--jfile-size-pgs",
                      action="store", type="int", dest="jfs", default=24,
                      help="Size of each new journal file in 64kiB blocks (%d-%d)" %
                           (self.JFILE_SIZE_PGS_MIN, self.JFILE_SIZE_PGS_MAX))
        opt.add_option("-v", "--verbose",
                      action="store_true", dest="vflag",
                      help="Verbose output")
        (self._opts, args) = opt.parse_args()
        if len(args) == 0:
            opt.error("No journal directory argument")
        elif len(args) > 1:
            opt.error("Too many positional arguments: %s" % args)
        if self._opts.qflag and self._opts.rflag:
            opt.error("Quiet (-q/--quiet) and record (-r/--records) options are mutually exclusive")
        if self._opts.qflag and self._opts.vflag:
            opt.error("Quiet (-q/--quiet) and verbose (-v/--verbose) options are mutually exclusive")
        if self._opts.njf != None and (self._opts.njf < self.NUM_JFILES_MIN or self._opts.njf > self.NUM_JFILES_MAX):
            opt.error("Number of files (%d) is out of range (%d-%d)" %
                     (self._opts.njf, self.NUM_JFILES_MIN, self.NUM_JFILES_MAX))
        if self._opts.jfs != None and (self._opts.jfs < self.JFILE_SIZE_PGS_MIN or
                                       self._opts.jfs > self.JFILE_SIZE_PGS_MAX):
            opt.error("File size (%d) is out of range (%d-%d)" %
                     (self._opts.jfs, self.JFILE_SIZE_PGS_MIN, self.JFILE_SIZE_PGS_MAX))
        if self._opts.npd != None and (self._opts.nbfn == None and self._opts.obfn == None):
            opt.error("If (-n/--no-pushdown) is used, then at least one of (-B/--new-base-filename) and"
                     " (-o/--old-base-filename) must be used.")
        self._jdir = args[0]
        if not os.path.exists(self._jdir):
            opt.error("Journal path \"%s\" does not exist" % self._jdir)
        self._print_options()

#==============================================================================
# main program
#==============================================================================

if __name__ == "__main__":
    R = Resize()
    try:
        R.run()
    except Exception, e:
        sys.exit(e)