1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
|
import tempfile
execfile("hardlink.py")
#######################################################################
#
# robust - code which prevents mirror from being corrupted, error-recovery
#
# Ideally, no matter when an instance of rdiff-backup gets aborted, no
# information should get lost. The target directory should be left in
# a coherent state, and later instances of rdiff-backup should clean
# things up so there is no sign that anything ever got aborted or
# failed.
#
# Thus, files should be updated in as atomic a way as possible.  Each
# file should be updated (and the corresponding diff files written) or
# not, and it should be clear which happened. In general, I don't
# think this is possible, since the creation of the diff files and the
# changing of updated files cannot be guaranteed to happen together.
# It is possible, I think, to record various information to files
# which would allow a later process to figure out what the last
# operation was, but this would add several file operations to the
# processing of each file, and I don't think, would be a good
# tradeoff.
#
# The compromise reached here is that diff files should be created
# just before the mirror files are updated, and each file update
# should be done with a rename operation on a file in the same
# directory. Furthermore, every once in a while, rdiff-backup will
# record which file it just finished processing. If any fatal errors
# are caught, it will also record the last processed file. Future
# instances may not know exactly when the previous instance was
# aborted, but they will be able to narrow down the possibilities.
class RobustAction:
"""Represents a file operation to be accomplished later"""
def __init__(self, init_thunk, final_thunk, error_thunk):
"""RobustAction initializer
All the thunks are functions whose return value will be
ignored. init_thunk should not make any irreversible changes
but prepare for the writing of the important data. final_thunk
should be as short as possible and do the real work.
error_thunk is run if there is an error in init_thunk or
final_thunk. Errors in init_thunk should be corrected by
error_thunk as if nothing had been run in the first place.
The functions take no arguments except for error_thunk, which
receives the exception as its only argument.
"""
self.init_thunk = init_thunk
self.final_thunk = final_thunk
self.error_thunk = error_thunk
def execute(self):
"""Actually run the operation"""
try:
self.init_thunk()
self.final_thunk()
except Exception, exp: # Catch all errors
Log.exception()
self.error_thunk(exp)
raise exp
class Robust:
    """Contains various file operations made safer using tempfiles"""
    # Action that does nothing at every stage; useful as a placeholder.
    null_action = RobustAction(lambda: None, lambda: None, lambda e: None)
    def chain(robust_action_list):
        """Return chain tying together a number of robust actions

        The whole chain will be aborted if some error occurs in
        initialization stage of any of the component actions.
        """
        # Only actions whose init stage was entered get their
        # error_thunk run on failure.
        ras_with_completed_inits = []
        def init():
            for ra in robust_action_list:
                ras_with_completed_inits.append(ra)
                ra.init_thunk()
        def final():
            for ra in robust_action_list: ra.final_thunk()
        def error(exp):
            for ra in ras_with_completed_inits: ra.error_thunk(exp)
        return RobustAction(init, final, error)
    def chain_nested(robust_action_list):
        """Like chain but final actions performed in reverse order"""
        ras_with_completed_inits = []
        def init():
            for ra in robust_action_list:
                ras_with_completed_inits.append(ra)
                ra.init_thunk()
        def final():
            # Run final stages innermost-first.
            ralist_copy = robust_action_list[:]
            ralist_copy.reverse()
            for ra in ralist_copy: ra.final_thunk()
        def error(exp):
            for ra in ras_with_completed_inits: ra.error_thunk(exp)
        return RobustAction(init, final, error)
    def make_tf_robustaction(init_thunk, tempfiles, final_renames = None):
        """Shortcut RobustAction creator when only tempfiles involved

        Often the robust action will just consist of some initial
        stage, renaming tempfiles in the final stage, and deleting
        them if there is an error.  This function makes it easier to
        create RobustActions of that type.
        """
        assert type(tempfiles) is types.TupleType, tempfiles
        if final_renames is None: final = lambda: None
        else:
            assert len(tempfiles) == len(final_renames)
            def final(): # rename tempfiles to final positions
                for i in range(len(tempfiles)):
                    final_name = final_renames[i]
                    if final_name:
                        if final_name.isdir(): # Cannot rename over directory
                            final_name.delete()
                        tempfiles[i].rename(final_name)
        def error(exp):
            for tf in tempfiles: tf.delete()
        return RobustAction(init_thunk, final, error)
    def copy_action(rorpin, rpout):
        """Return robust action copying rorpin to rpout

        The source can be a rorp or an rpath.  Does not recurse.  If
        directories copied, then just exit (output directory not
        overwritten).
        """
        tfl = [None] # Need mutable object that init and final can access
        def init():
            if not (rorpin.isdir() and rpout.isdir()): # already a dir
                tfl[0] = TempFileManager.new(rpout)
                if rorpin.isreg(): tfl[0].write_from_fileobj(rorpin.open("rb"))
                # Bug fix: previously referenced undefined name "tf",
                # raising NameError for special files (symlinks, etc.).
                else: RPath.copy(rorpin, tfl[0])
        def final():
            if tfl[0] and tfl[0].lstat():
                if rpout.isdir(): rpout.delete()
                tfl[0].rename(rpout)
        return RobustAction(init, final, lambda e: tfl[0] and tfl[0].delete())
    def copy_with_attribs_action(rorpin, rpout, compress = None):
        """Like copy_action but also copy attributes"""
        tfl = [None] # Need mutable object that init and final can access
        def init():
            if not (rorpin.isdir() and rpout.isdir()): # already a dir
                tfl[0] = TempFileManager.new(rpout)
                if rorpin.isreg():
                    tfl[0].write_from_fileobj(rorpin.open("rb"), compress)
                else: RPath.copy(rorpin, tfl[0])
                if tfl[0].lstat(): # Some files, like sockets, won't be created
                    RPathStatic.copy_attribs(rorpin, tfl[0])
        def final():
            if rorpin.isdir() and rpout.isdir():
                RPath.copy_attribs(rorpin, rpout)
            elif tfl[0] and tfl[0].lstat():
                if rpout.isdir(): rpout.delete()
                tfl[0].rename(rpout)
        return RobustAction(init, final, lambda e: tfl[0] and tfl[0].delete())
    def copy_attribs_action(rorpin, rpout):
        """Return action which just copies attributes

        Copying attributes is already pretty atomic, so just run
        normal sequence.
        """
        def final(): RPath.copy_attribs(rorpin, rpout)
        return RobustAction(lambda: None, final, lambda e: None)
    def symlink_action(rpath, linktext):
        """Return symlink action by moving one file over another"""
        tf = TempFileManager.new(rpath)
        def init(): tf.symlink(linktext)
        return Robust.make_tf_robustaction(init, (tf,), (rpath,))
    def destructive_write_action(rp, s):
        """Return action writing string s to rpath rp in robust way

        This will overwrite any data currently in rp.
        """
        tf = TempFileManager.new(rp)
        def init():
            fp = tf.open("wb")
            fp.write(s)
            assert not fp.close()
            tf.setdata()
        return Robust.make_tf_robustaction(init, (tf,), (rp,))

MakeStatic(Robust)
class TempFileManager:
    """Manage temp files"""
    # This is a connection-specific list of temp files, to be cleaned
    # up before rdiff-backup exits.
    _tempfiles = []
    # To make collisions less likely, this gets put in the file name
    # and incremented whenever a new file is requested.
    _tfindex = 0
    def new(cls, rp_base, same_dir = 1):
        """Return new tempfile that isn't in use.

        If same_dir, tempfile will be in same directory as rp_base.
        Otherwise, use tempfile module to get filename.
        """
        conn = rp_base.conn
        # Forward the request to the connection that owns rp_base, so
        # the new tempfile gets tracked on the machine where it lives.
        if conn is not Globals.local_connection:
            return conn.TempFileManager.new(rp_base, same_dir)
        def find_unused(conn, dir):
            """Find an unused tempfile with connection conn in directory dir"""
            while 1:
                # Wrap the index to keep filenames bounded in length.
                if cls._tfindex > 100000000:
                    Log("Resetting index", 2)
                    cls._tfindex = 0
                tf = TempFile(conn, os.path.join(dir,
                                       "rdiff-backup.tmp.%d" % cls._tfindex))
                cls._tfindex = cls._tfindex+1
                if not tf.lstat(): return tf
        if same_dir: tf = find_unused(conn, rp_base.dirsplit()[0])
        # NOTE(review): tempfile.mktemp() is race-prone (another process
        # may claim the name before it is used) - confirm acceptable here.
        else: tf = TempFile(conn, tempfile.mktemp())
        cls._tempfiles.append(tf)
        return tf
    def remove_listing(cls, tempfile):
        """Remove listing of tempfile"""
        # Note: the parameter "tempfile" shadows the tempfile module,
        # which is not used inside this method.
        if Globals.local_connection is not tempfile.conn:
            tempfile.conn.TempFileManager.remove_listing(tempfile)
        elif tempfile in cls._tempfiles: cls._tempfiles.remove(tempfile)
    def delete_all(cls):
        """Delete all remaining tempfiles"""
        # Iterate over a copy: tf.delete() mutates cls._tempfiles via
        # remove_listing().
        for tf in cls._tempfiles[:]: tf.delete()

MakeClass(TempFileManager)
class TempFile(RPath):
    """Like an RPath, but keep track of which ones are still here"""
    def rename(self, rp_dest):
        """Rename temp file to permanent location, possibly overwriting"""
        if self.isdir() and not rp_dest.isdir():
            # Cannot move a directory directly over another file
            rp_dest.delete()
        if (isinstance(rp_dest, DSRPath) and rp_dest.perms_delayed
            and not self.hasfullperms()):
            # If we are moving to a delayed perm directory, delay
            # permission change on destination.
            rp_dest.chmod(self.getperms())
            self.chmod(0700)
        RPathStatic.rename(self, rp_dest)

        # Sometimes this just seems to fail silently, as in one
        # hardlinked twin is moved over the other.  So check to make
        # sure below.
        self.setdata()
        if self.lstat():
            # Source still exists, so the rename silently failed; clear
            # the destination and retry once before giving up.
            rp_dest.delete()
            RPathStatic.rename(self, rp_dest)
            self.setdata()
            if self.lstat(): raise OSError("Cannot rename tmp file correctly")
        TempFileManager.remove_listing(self)
    def delete(self):
        """Remove the tempfile and drop it from the manager's listing"""
        RPath.delete(self)
        TempFileManager.remove_listing(self)
class SaveState:
    """Save state in the middle of backups for resuming later"""
    _last_file_sym = None # RPath of sym pointing to last file processed
    _last_file_definitive_rp = None # Touch this if last file is really last
    _last_checkpoint_time = 0 # time in seconds of last checkpoint
    _checkpoint_rp = None # RPath of checkpoint data pickle
    def init_filenames(cls, incrementing):
        """Set rpaths of markers.  Assume rbdir already set.

        If incrementing, then indicate increment operation, otherwise
        indicate mirror.
        """
        # Marker files live next to the backup data, so delegate to the
        # backup writer connection if we aren't it.
        if not Globals.isbackup_writer:
            return Globals.backup_writer.SaveState.init_filenames(incrementing)
        assert Globals.local_connection is Globals.rbdir.conn, \
               (Globals.rbdir.conn, Globals.backup_writer)
        if incrementing: cls._last_file_sym = Globals.rbdir.append(
            "last-file-incremented.%s.snapshot" % Time.curtimestr)
        else: cls._last_file_sym = Globals.rbdir.append(
            "last-file-mirrored.%s.snapshot" % Time.curtimestr)
        cls._checkpoint_rp = Globals.rbdir.append(
            "checkpoint-data.%s.snapshot" % Time.curtimestr)
        cls._last_file_definitive_rp = Globals.rbdir.append(
            "last-file-definitive.%s.snapshot" % Time.curtimestr)
    def touch_last_file(cls):
        """Touch last file marker, indicating backup has begun"""
        cls._last_file_sym.touch()
    def touch_last_file_definitive(cls):
        """Create last-file-definitive marker

        When a backup gets aborted, there may be time to indicate the
        last file successfully processed, and this should be touched.
        Sometimes when the abort is hard, there may be a last file
        indicated, but further files since then have been processed,
        in which case this shouldn't be touched.
        """
        cls._last_file_definitive_rp.touch()
    def record_last_file_action(cls, last_file_rorp):
        """Action recording last file to be processed as symlink in rbdir

        last_file_rorp is None means that no file is known to have
        been processed.
        """
        if last_file_rorp:
            # Argument unpacking replaces the deprecated apply() builtin;
            # behavior is identical.
            symtext = os.path.join(*(('increments',) +
                                     last_file_rorp.index))
            return Robust.symlink_action(cls._last_file_sym, symtext)
        else: return RobustAction(lambda: None, cls.touch_last_file,
                                  lambda exp: None)
    def checkpoint_inc_backup(cls, ITR, finalizer, last_file_rorp,
                              override = None):
        """Save states of tree reducer and finalizer during inc backup

        If override is true, checkpoint even if one isn't due.
        """
        if not override and not cls.checkpoint_needed(): return
        assert cls._checkpoint_rp, "_checkpoint_rp not set yet"

        cls._last_checkpoint_time = time.time()
        Log("Writing checkpoint time %s" % cls._last_checkpoint_time, 7)
        state_string = cPickle.dumps((ITR.getstate(), finalizer.getstate()))
        # Write pickle and last-file symlink as one robust chain so a
        # crash can't leave them inconsistent.
        Robust.chain([Robust.destructive_write_action(cls._checkpoint_rp,
                                                      state_string),
                      cls.record_last_file_action(last_file_rorp)]).execute()
    def checkpoint_mirror(cls, finalizer, last_file_rorp, override = None):
        """For a mirror, only finalizer and last_file should be saved"""
        if not override and not cls.checkpoint_needed(): return
        if not cls._checkpoint_rp:
            Log("Warning, _checkpoint_rp not set yet", 2)
            return

        cls._last_checkpoint_time = time.time()
        Log("Writing checkpoint time %s" % cls._last_checkpoint_time, 7)
        state_string = cPickle.dumps(finalizer.getstate())
        Robust.chain([Robust.destructive_write_action(cls._checkpoint_rp,
                                                      state_string),
                      cls.record_last_file_action(last_file_rorp)]).execute()
    def checkpoint_needed(cls):
        """Returns true if another checkpoint is called for"""
        return (time.time() > cls._last_checkpoint_time +
                Globals.checkpoint_interval)
    def checkpoint_remove(cls):
        """Remove all checkpointing data after successful operation"""
        for rp in Resume.get_relevant_rps(): rp.delete()
        if Globals.preserve_hardlinks: Hardlink.remove_all_checkpoints()

MakeClass(SaveState)
class Resume:
    """Check for old aborted backups and resume if necessary"""
    _session_info_list = None # List of ResumeSessionInfo's, sorted by time
    def FindTime(cls, index, later_than = 0):
        """For a given index, find the appropriate time to use for inc

        If it is clear which time to use (because it is determined by
        definitive records, or there are no aborted backup, etc.) then
        just return the appropriate time.  Otherwise, if an aborted
        backup was last checkpointed before the index, assume that it
        didn't get there, and go for the older time.  If an inc file
        is already present, the function will be rerun with later time
        specified.
        """
        if Time.prevtime > later_than: return Time.prevtime # usual case

        for si in cls.get_sis_covering_index(index):
            if si.time > later_than: return si.time
        raise SkipFileException("Index %s already covered, skipping" %
                                str(index))
    def get_sis_covering_index(cls, index):
        """Return sorted list of SessionInfos which may cover index

        Aborted backup may be relevant unless index is lower and we
        are sure that it didn't go further.
        """
        return filter(lambda session_info:
                      not ((session_info.last_index is None or
                            session_info.last_index < index) and
                           session_info.last_definitive),
                      cls._session_info_list)
    def SetSessionInfo(cls):
        """Read data directory and initialize _session_info"""
        silist = []
        rp_quad_dict = cls.group_rps_by_time(cls.get_relevant_rps())
        times = rp_quad_dict.keys()
        times.sort()
        # Local renamed from "time" so it no longer shadows the time
        # module used elsewhere in this class.
        for sess_time in times:
            silist.append(cls.quad_to_si(sess_time, rp_quad_dict[sess_time]))
        cls._session_info_list = silist
    def get_relevant_rps(cls):
        """Return list of relevant rpaths in rbdata directory"""
        relevant_bases = ['last-file-incremented', 'last-file-mirrored',
                          'checkpoint-data', 'last-file-definitive']
        rps = map(Globals.rbdir.append, Globals.rbdir.listdir())
        return filter(lambda rp: rp.isincfile()
                      and rp.getincbase_str() in relevant_bases, rps)
    def group_rps_by_time(cls, rplist):
        """Take list of rps return time dict {time: quadlist}

        Times in seconds are the keys, values are quadruples of
        [last-file-incremented, last-file-mirrored, checkpoint-data,
        last-is-definitive].
        """
        result = {}
        for rp in rplist:
            # "inc_time" rather than "time" avoids shadowing the time
            # module.
            inc_time = Time.stringtotime(rp.getinctime())
            if result.has_key(inc_time): quadlist = result[inc_time]
            else: quadlist = [None, None, None, None]
            base_string = rp.getincbase_str()
            if base_string == 'last-file-incremented': quadlist[0] = rp
            elif base_string == 'last-file-mirrored': quadlist[1] = rp
            elif base_string == 'last-file-definitive': quadlist[3] = 1
            else:
                assert base_string == 'checkpoint-data'
                quadlist[2] = rp
            result[inc_time] = quadlist
        return result
    def quad_to_si(cls, time, quad):
        """Take time, quadlist, return associated ResumeSessionInfo"""
        increment_sym, mirror_sym, checkpoint_rp, last_definitive = quad
        assert not (increment_sym and mirror_sym) # both shouldn't exist
        ITR, finalizer = None, None
        # NOTE(review): if neither sym exists (e.g. only checkpoint-data
        # survived), "mirror" and "last_index" are unbound and the
        # return below raises NameError - confirm this cannot happen.
        if increment_sym:
            mirror = None
            last_index = cls.sym_to_index(increment_sym)
            if checkpoint_rp:
                ITR, finalizer = cls.unpickle_checkpoint(checkpoint_rp)
        elif mirror_sym:
            mirror = 1
            last_index = cls.sym_to_index(mirror_sym)
            if checkpoint_rp:
                finalizer = cls.unpickle_checkpoint(checkpoint_rp)
        return ResumeSessionInfo(mirror, time, last_index, last_definitive,
                                 finalizer, ITR)
    def sym_to_index(cls, sym_rp):
        """Read last file sym rp, return last file index

        If sym_rp is not a sym at all, return None, indicating that no
        file index was ever conclusively processed.
        """
        if not sym_rp.issym(): return None
        link_components = sym_rp.readlink().split("/")
        assert link_components[0] == 'increments'
        return tuple(link_components[1:])
    def unpickle_checkpoint(cls, checkpoint_rp):
        """Read data from checkpoint_rp and return unpickled data

        Return value is finalizer state for a mirror checkpoint,
        and (patch increment ITR, finalizer state) for increment
        checkpoint.
        """
        fp = checkpoint_rp.open("rb")
        data = fp.read()
        fp.close()
        # Security note: unpickling runs arbitrary code if the
        # checkpoint file is attacker-controlled; it is assumed to be
        # our own data directory.
        return cPickle.loads(data)
    def ResumeCheck(cls):
        """Return relevant ResumeSessionInfo if there's one we should resume

        Also if find RSI to resume, reset current time to old resume
        time.
        """
        cls.SetSessionInfo()
        if not cls._session_info_list:
            if Globals.resume == 1:
                Log.FatalError("User specified resume, but no data on "
                               "previous backup found.")
            else: return None
        else:
            si = cls._session_info_list[-1]
            # Resume if the user asked for it, or if the last abort is
            # recent enough and resuming wasn't explicitly disabled.
            if (Globals.resume == 1 or
                (time.time() <= (si.time + Globals.resume_window) and
                 not Globals.resume == 0)):
                Log("Resuming aborted backup dated %s" %
                    Time.timetopretty(si.time), 2)
                Time.setcurtime(si.time)
                if Globals.preserve_hardlinks:
                    if (not si.last_definitive or not
                        Hardlink.retrieve_checkpoint(Globals.rbdir, si.time)):
                        Log("Hardlink information not successfully "
                            "recovered.", 2)
                return si
            else:
                Log("Last backup dated %s was aborted, but we aren't "
                    "resuming it." % Time.timetopretty(si.time), 2)
                return None
        assert 0

MakeClass(Resume)
class ResumeSessionInfo:
    """Hold information about a previously aborted session"""
    def __init__(self, mirror, time, last_index,
                 last_definitive, finalizer_state = None, ITR_state = None):
        """Class initializer

        mirror - true if backup was a mirror, false if increment
        time - starting time in seconds of backup
        last_index - Last confirmed index processed by backup, or None
        last_definitive - True is we know last_index is really last
        finalizer_state - finalizer reducer state if available
        ITR_state - For increment, ITM reducer state (assume mirror if NA)
        """
        self.mirror = mirror
        self.time = time
        self.last_index = last_index
        self.last_definitive = last_definitive
        self.finalizer_state = finalizer_state
        self.ITR_state = ITR_state
|