summaryrefslogtreecommitdiff
path: root/rdiff-backup/src/robust.py
blob: 5a7f82eb630c77ef4dc4d8d5b9bc5a2c4c757844 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
import tempfile
execfile("rpath.py")

#######################################################################
#
# robust - code which prevents mirror from being corrupted, error-recovery
#
# Ideally, no matter when an instance of rdiff-backup gets aborted, no
# information should get lost.  The target directory should be left in
# a coherent state, and later instances of rdiff-backup should clean
# things up so there is no sign that anything ever got aborted or
# failed.
#
# Thus, files should be updated in as atomic a way as possible.  Each
# file should be updated (and the corresponding diff files written) or
# not, and it should be clear which happened.  In general, I don't
# think this is possible, since the creation of the diff files and the
# changing of updated files cannot be guaranteed to happen together.
# It is possible, I think, to record various information to files
# which would allow a later process to figure out what the last
# operation was, but this would add several file operations to the
# processing of each file, and I don't think it would be a good
# tradeoff.
#
# The compromise reached here is that diff files should be created
# just before the mirror files are updated, and each file update
# should be done with a rename operation on a file in the same
# directory.  Furthermore, every once in a while, rdiff-backup will
# record which file it just finished processing.  If any fatal errors
# are caught, it will also record the last processed file.  Future
# instances may not know exactly when the previous instance was
# aborted, but they will be able to narrow down the possibilities.

class RobustAction:
	"""Represents a file operation to be accomplished later"""
	def __init__(self, init_thunk, final_thunk, error_thunk):
		"""RobustAction initializer

		All the thunks are functions whose return value will be
		ignored.  init_thunk should not make any irreversible changes
		but prepare for the writing of the important data. final_thunk
		should be as short as possible and do the real work.
		error_thunk is run if there is an error in init_thunk or
		final_thunk.  Errors in init_thunk should be corrected by
		error_thunk as if nothing had been run in the first place.
		The functions take no arguments except for error_thunk, which
		receives the exception as its only argument.

		"""
		self.init_thunk = init_thunk
		self.final_thunk = final_thunk
		self.error_thunk = error_thunk

	def execute(self):
		"""Actually run the operation"""
		try:
			self.init_thunk()
			self.final_thunk()
		except Exception, exp: # Catch all errors
			Log.exception()
			self.error_thunk(exp)
			raise exp


class Robust:
	"""Contains various file operations made safer using tempfiles

	The functions below take no self argument and are called as
	Robust.name(...); the MakeStatic(Robust) call following the class
	presumably turns them into static methods.

	"""
	# Action that does nothing at any stage
	null_action = RobustAction(lambda: None, lambda: None, lambda e: None)
	def chain(robust_action_list):
		"""Return chain tying together a number of robust actions

		The whole chain will be aborted if some error occurs in
		initialization stage of any of the component actions.  Final
		thunks run in list order.

		"""
		# Each action is recorded *before* its init runs, so the action
		# whose init fails also gets its error_thunk called.
		ras_with_completed_inits = []
		def init():
			for ra in robust_action_list:
				ras_with_completed_inits.append(ra)
				ra.init_thunk()
		def final():
			for ra in robust_action_list: ra.final_thunk()
		def error(exp):
			for ra in ras_with_completed_inits: ra.error_thunk(exp)
		return RobustAction(init, final, error)

	def chain_nested(robust_action_list):
		"""Like chain but final actions performed in reverse order"""
		ras_with_completed_inits = []
		def init():
			for ra in robust_action_list:
				ras_with_completed_inits.append(ra)
				ra.init_thunk()
		def final():
			# Copy before reversing so the caller's list is not mutated
			ralist_copy = robust_action_list[:]
			ralist_copy.reverse()
			for ra in ralist_copy: ra.final_thunk()
		def error(exp):
			for ra in ras_with_completed_inits: ra.error_thunk(exp)
		return RobustAction(init, final, error)

	def make_tf_robustaction(init_thunk, tempfiles, final_renames = None):
		"""Shortcut RobustAction creator when only tempfiles involved

		Often the robust action will just consist of some initial
		stage, renaming tempfiles in the final stage, and deleting
		them if there is an error.  This function makes it easier to
		create RobustActions of that type.

		tempfiles must be a tuple.  If final_renames is given it must
		have the same length; tempfiles[i] is renamed onto
		final_renames[i] unless that entry is false, and a directory
		at the destination is deleted first.  On error every tempfile
		is deleted.

		"""
		assert type(tempfiles) is types.TupleType, tempfiles
		if final_renames is None: final = lambda: None
		else:
			assert len(tempfiles) == len(final_renames)
			def final(): # rename tempfiles to final positions
				for tf, final_name in zip(tempfiles, final_renames):
					if final_name:
						if final_name.isdir(): # Cannot rename over directory
							final_name.delete()
						tf.rename(final_name)
		def error(exp):
			for tf in tempfiles: tf.delete()
		return RobustAction(init_thunk, final, error)

	def copy_action(rorpin, rpout):
		"""Return robust action copying rorpin to rpout

		The source can be a rorp or an rpath.  Does not recurse.  If
		directories copied, then just exit (output directory not
		overwritten).

		"""
		tfl = [None] # Need mutable object that init and final can access
		def init():
			if not (rorpin.isdir() and rpout.isdir()): # already a dir
				tfl[0] = TempFileManager.new(rpout)
				if rorpin.isreg(): tfl[0].write_from_fileobj(rorpin.open("rb"))
				# Non-regular files are copied into the same tempfile
				else: RPath.copy(rorpin, tfl[0])
		def final():
			# lstat() is checked because some types may not get created
			if tfl[0] and tfl[0].lstat():
				if rpout.isdir(): rpout.delete()
				tfl[0].rename(rpout)
		return RobustAction(init, final, lambda e: tfl[0] and tfl[0].delete())

	def copy_with_attribs_action(rorpin, rpout):
		"""Like copy_action but also copy attributes

		When both sides are already directories, only the attributes
		are copied onto rpout in the final stage.

		"""
		tfl = [None] # Need mutable object that init and final can access
		def init():
			if not (rorpin.isdir() and rpout.isdir()): # already a dir
				tfl[0] = TempFileManager.new(rpout)
				if rorpin.isreg(): tfl[0].write_from_fileobj(rorpin.open("rb"))
				else: RPath.copy(rorpin, tfl[0])
				if tfl[0].lstat(): # Some files, like sockets, won't be created
					RPathStatic.copy_attribs(rorpin, tfl[0])
		def final():
			if rorpin.isdir() and rpout.isdir():
				RPath.copy_attribs(rorpin, rpout)
			elif tfl[0] and tfl[0].lstat():
				if rpout.isdir(): rpout.delete()
				tfl[0].rename(rpout)
		return RobustAction(init, final, lambda e: tfl[0] and tfl[0].delete())

	def copy_attribs_action(rorpin, rpout):
		"""Return action which just copies attributes

		Copying attributes is already pretty atomic, so just run
		normal sequence.

		"""
		def final(): RPath.copy_attribs(rorpin, rpout)
		return RobustAction(lambda: None, final, lambda e: None)

	def symlink_action(rpath, linktext):
		"""Return symlink action by moving one file over another

		The tempfile is allocated when the action is created; the
		symlink itself is made in init and renamed onto rpath in the
		final stage.

		"""
		tf = TempFileManager.new(rpath)
		def init(): tf.symlink(linktext)
		return Robust.make_tf_robustaction(init, (tf,), (rpath,))

	def destructive_write_action(rp, s):
		"""Return action writing string s to rpath rp in robust way

		This will overwrite any data currently in rp.  s is written to
		a tempfile in rp's directory during init and renamed over rp
		in the final stage.

		"""
		tf = TempFileManager.new(rp)
		def init():
			fp = tf.open("wb")
			fp.write(s)
			# Close outside the assert: asserts are stripped under
			# "python -O", which would leave the file unclosed/unflushed.
			close_result = fp.close()
			assert not close_result, close_result
			tf.setdata()
		return Robust.make_tf_robustaction(init, (tf,), (rp,))

MakeStatic(Robust)


class TempFileManager:
	"""Track the tempfiles created on this connection

	Every TempFile handed out by new() stays listed here until it is
	renamed or deleted, so delete_all() can clean up leftovers.

	"""

	# Per-connection list of TempFiles that still need cleaning up
	# before rdiff-backup exits.
	_tempfiles = []

	# Counter embedded in generated names to make collisions less
	# likely; bumped every time a candidate name is tried.
	_tfindex = 0

	def new(cls, rp_base, same_dir = 1):
		"""Return a new TempFile whose path is not currently in use

		With same_dir true, the name is placed in rp_base's directory
		(so a later rename stays inside one directory); otherwise the
		name comes from the tempfile module.  When rp_base lives on a
		different connection, the request is forwarded there.

		"""
		conn = rp_base.conn
		if conn is not Globals.local_connection:
			return conn.TempFileManager.new(rp_base, same_dir)

		if not same_dir:
			# NOTE(review): tempfile.mktemp only picks a name (nothing is
			# created here) and is documented as race-prone.
			new_tf = TempFile(conn, tempfile.mktemp())
		else:
			parent_dir = rp_base.dirsplit()[0]
			while 1:
				if cls._tfindex > 100000000:
					Log("Resetting index", 2)
					cls._tfindex = 0
				candidate_name = "rdiff-backup.tmp.%d" % cls._tfindex
				candidate = TempFile(conn, os.path.join(parent_dir,
														candidate_name))
				cls._tfindex = cls._tfindex + 1
				if not candidate.lstat():
					new_tf = candidate
					break
		cls._tempfiles.append(new_tf)
		return new_tf

	def remove_listing(cls, tempfile):
		"""Stop tracking tempfile, on whichever connection owns it"""
		if tempfile.conn is Globals.local_connection:
			if tempfile in cls._tempfiles:
				cls._tempfiles.remove(tempfile)
		else:
			tempfile.conn.TempFileManager.remove_listing(tempfile)

	def delete_all(cls):
		"""Delete every tempfile still tracked on this connection"""
		# Walk a snapshot: each delete() also drops its own listing.
		snapshot = list(cls._tempfiles)
		for leftover in snapshot:
			leftover.delete()

MakeClass(TempFileManager)


class TempFile(RPath):
	"""Like an RPath, but keep track of which ones are still here

	TempFiles created through TempFileManager.new are listed there;
	rename and delete both drop that listing, so
	TempFileManager.delete_all only touches tempfiles that were never
	moved into place or removed.

	"""
	def rename(self, rp_dest):
		"""Rename temp file to permanent location, possibly overwriting

		If self is a directory and rp_dest is not, rp_dest is deleted
		first.  After the rename the tempfile is removed from
		TempFileManager's listing.

		"""
		if self.isdir() and not rp_dest.isdir():
			# Cannot move a directory directly over another file
			rp_dest.delete()
			# NOTE(review): this chmods rp_dest right after it was
			# deleted, and only in the dir-over-non-dir case; confirm
			# whether this block was meant to sit outside the enclosing
			# if statement.
			if (isinstance(rp_dest, DSRPath) and rp_dest.perms_delayed
				and not self.hasfullperms()):
				# If we are moving to a delayed perm directory, delay
				# permission change on destination.
				rp_dest.chmod(self.getperms())
				self.chmod(0700)
		RPathStatic.rename(self, rp_dest)
		TempFileManager.remove_listing(self)

	def delete(self):
		"""Delete the file and drop it from TempFileManager's listing"""
		RPath.delete(self)
		TempFileManager.remove_listing(self)


class SaveState:
	"""Save state in the middle of backups for resuming later

	All marker files live in Globals.rbdir and carry the current
	session's timestamp (Time.curtimestr) in their names.

	"""
	_last_file_sym = None # RPath of sym pointing to last file processed
	_last_file_definitive_rp = None # Touch this if last file is really last
	_last_checkpoint_time = 0 # time in seconds of last checkpoint
	_checkpoint_rp = None # RPath of checkpoint data pickle

	def init_filenames(cls, incrementing):
		"""Compute the marker rpaths for this session (rbdir must be set)

		The last-file marker is named for an increment when
		incrementing is true and for a mirror otherwise.  When this
		connection is not the backup writer, the call is forwarded to
		the writer.

		"""
		if not Globals.isbackup_writer:
			return Globals.backup_writer.SaveState.init_filenames(incrementing)

		rbdir = Globals.rbdir
		assert Globals.local_connection is rbdir.conn, rbdir.conn
		stamp = Time.curtimestr
		if incrementing: sym_template = "last-file-incremented.%s.snapshot"
		else: sym_template = "last-file-mirrored.%s.snapshot"
		cls._last_file_sym = rbdir.append(sym_template % stamp)
		cls._checkpoint_rp = rbdir.append(
			"checkpoint-data.%s.snapshot" % stamp)
		cls._last_file_definitive_rp = rbdir.append(
			"last-file-definitive.%s.snapshot" % stamp)

	def touch_last_file(cls):
		"""Make sure the last-file marker exists, showing a backup began"""
		marker = cls._last_file_sym
		if marker.lstat(): return
		marker.touch()

	def touch_last_file_definitive(cls):
		"""Create the last-file-definitive marker

		Touch this only when the recorded last file really is the
		last one processed, e.g. when an abort left time to record it.
		After a hard abort, files past the recorded one may already
		have been processed, and then this must not be touched.

		"""
		definitive_marker = cls._last_file_definitive_rp
		definitive_marker.touch()

	def record_last_file_action(cls, last_file_rorp):
		"""Return action recording the last processed file as a symlink

		A false last_file_rorp means no file is known to have been
		processed yet; then the action just ensures the marker exists.

		"""
		if not last_file_rorp:
			return RobustAction(lambda: None, cls.touch_last_file,
								lambda exp: None)
		link_parts = ('increments',) + last_file_rorp.index
		return Robust.symlink_action(cls._last_file_sym,
									 os.path.join(*link_parts))

	def checkpoint_inc_backup(cls, ITR, finalizer, last_file_rorp,
							  override = None):
		"""Checkpoint tree reducer and finalizer state for an inc backup

		Does nothing unless a checkpoint is due or override is true.

		"""
		if not (override or cls.checkpoint_needed()): return
		assert cls._checkpoint_rp, "_checkpoint_rp not set yet"

		cls._last_checkpoint_time = time.time()
		Log("Writing checkpoint time %s" % cls._last_checkpoint_time, 7)
		pickled_state = cPickle.dumps((ITR.getstate(), finalizer.getstate()))
		write_action = Robust.destructive_write_action(cls._checkpoint_rp,
													   pickled_state)
		marker_action = cls.record_last_file_action(last_file_rorp)
		Robust.chain([write_action, marker_action]).execute()

	def checkpoint_mirror(cls, finalizer, last_file_rorp, override = None):
		"""Checkpoint a mirror: only finalizer state and last file"""
		if not (override or cls.checkpoint_needed()): return
		if not cls._checkpoint_rp:
			Log("Warning, _checkpoint_rp not set yet", 2)
			return

		cls._last_checkpoint_time = time.time()
		Log("Writing checkpoint time %s" % cls._last_checkpoint_time, 7)
		pickled_state = cPickle.dumps(finalizer.getstate())
		write_action = Robust.destructive_write_action(cls._checkpoint_rp,
													   pickled_state)
		marker_action = cls.record_last_file_action(last_file_rorp)
		Robust.chain([write_action, marker_action]).execute()

	def checkpoint_needed(cls):
		"""True once checkpoint_interval has passed since the last one"""
		due_at = cls._last_checkpoint_time + Globals.checkpoint_interval
		return time.time() > due_at

	def checkpoint_remove(cls):
		"""Delete every checkpoint/marker file after a successful run"""
		stale_rps = Resume.get_relevant_rps()
		for stale_rp in stale_rps: stale_rp.delete()

MakeClass(SaveState)


class Resume:
	"""Check for old aborted backups and resume if necessary"""
	_session_info_list = None # List of ResumeSessionInfo's, sorted by time
	def FindTime(cls, index, later_than = 0):
		"""For a given index, find the appropriate time to use for inc

		If it is clear which time to use (because it is determined by
		definitive records, or there are no aborted backups, etc.) then
		just return the appropriate time.  Otherwise, if an aborted
		backup was last checkpointed before the index, assume that it
		didn't get there, and go for the older time.  If an inc file
		is already present, the function will be rerun with later time
		specified.

		Raises SkipFileException when no candidate time is later than
		later_than.

		"""
		if Time.prevtime > later_than: return Time.prevtime # usual case

		for si in cls.get_sis_covering_index(index):
			if si.time > later_than: return si.time
		raise SkipFileException("Index %s already covered, skipping" %
								str(index))

	def get_sis_covering_index(cls, index):
		"""Return sorted list of SessionInfos which may cover index

		Aborted backup may be relevant unless index is lower and we
		are sure that it didn't go further.  A session is excluded only
		when it is definitive and its last_index is None or below index.

		"""
		return filter(lambda session_info:
					  not ((session_info.last_index is None or
							session_info.last_index < index) and
						   session_info.last_definitive),
				cls._session_info_list)

	def SetSessionInfo(cls):
		"""Read data directory and initialize _session_info_list

		The list holds one ResumeSessionInfo per session time, in
		increasing time order.

		"""
		assert Globals.isbackup_writer
		silist = []
		rp_quad_dict = cls.group_rps_by_time(cls.get_relevant_rps())
		times = rp_quad_dict.keys()
		times.sort()
		for session_time in times:
			silist.append(cls.quad_to_si(session_time,
										 rp_quad_dict[session_time]))
		cls._session_info_list = silist

	def get_relevant_rps(cls):
		"""Return list of relevant rpaths in rbdata directory

		These are the inc-style marker files written by SaveState:
		last-file-incremented, last-file-mirrored, checkpoint-data and
		last-file-definitive.

		"""
		relevant_bases = ['last-file-incremented', 'last-file-mirrored',
						  'checkpoint-data', 'last-file-definitive']
		rps = map(Globals.rbdir.append, Globals.rbdir.listdir())
		return filter(lambda rp: rp.isincfile()
					  and rp.getincbase_str() in relevant_bases, rps)

	def group_rps_by_time(cls, rplist):
		"""Take list of rps, return time dict {time: quadlist}

		Times in seconds are the keys.  Each value is a 4-element list
		[last-file-incremented rp, last-file-mirrored rp,
		 checkpoint-data rp, last-file-definitive flag]; entries that
		were not found stay None, and the flag is 1 when present.

		"""
		result = {}
		for rp in rplist:
			session_time = Time.stringtotime(rp.getinctime())
			if result.has_key(session_time): quadlist = result[session_time]
			else: quadlist = [None, None, None, None]
			base_string = rp.getincbase_str()
			if base_string == 'last-file-incremented': quadlist[0] = rp
			elif base_string == 'last-file-mirrored': quadlist[1] = rp
			elif base_string == 'last-file-definitive': quadlist[3] = 1
			else:
				assert base_string == 'checkpoint-data'
				quadlist[2] = rp
			result[session_time] = quadlist
		return result

	def quad_to_si(cls, time, quad):
		"""Take time, quadlist, return associated ResumeSessionInfo

		quad is one value produced by group_rps_by_time.  Exactly one
		of the two last-file markers must be present.  A group with
		neither (for example checkpoint data or a definitive flag left
		behind on its own) is reported through Log.FatalError.

		"""
		increment_sym, mirror_sym, checkpoint_rp, last_definitive = quad
		assert not (increment_sym and mirror_sym) # both shouldn't exist
		ITR, finalizer = None, None
		if increment_sym:
			mirror = None
			last_index = cls.sym_to_index(increment_sym)
			if checkpoint_rp:
				ITR, finalizer = cls.unpickle_checkpoint(checkpoint_rp)
		elif mirror_sym:
			mirror = 1
			last_index = cls.sym_to_index(mirror_sym)
			if checkpoint_rp:
				finalizer = cls.unpickle_checkpoint(checkpoint_rp)
		else:
			# Without a last-file marker we cannot tell whether this was
			# a mirror or an increment, and mirror/last_index would be
			# unbound below.
			Log.FatalError("Found resume data dated %s with neither a "
						   "last-file-incremented nor a last-file-mirrored "
						   "marker in the rdiff-backup data directory." %
						   Time.timetopretty(time))
		return ResumeSessionInfo(mirror, time, last_index, last_definitive,
								 finalizer, ITR)

	def sym_to_index(cls, sym_rp):
		"""Read last file sym rp, return last file index

		If sym_rp is not a sym at all, return None, indicating that no
		file index was ever conclusively processed.  Otherwise the link
		text is expected to be "increments/<index components>".

		"""
		if not sym_rp.issym(): return None
		link_components = sym_rp.readlink().split("/")
		assert link_components[0] == 'increments'
		return tuple(link_components[1:])

	def unpickle_checkpoint(cls, checkpoint_rp):
		"""Read data from checkpoint_rp and return unpickled data

		Return value is the finalizer state for a mirror checkpoint,
		and the pair (ITR state, finalizer state) for an increment
		checkpoint.

		"""
		fp = checkpoint_rp.open("rb")
		# Close the file even if the read fails
		try: data = fp.read()
		finally: fp.close()
		# NOTE(review): this unpickles a file from the rdiff-backup data
		# directory; that is only safe if that directory is trusted.
		return cPickle.loads(data)

	def ResumeCheck(cls):
		"""Return relevant ResumeSessionInfo if there's one we should resume

		Also if find RSI to resume, reset current time to old resume
		time.  Globals.resume == 1 forces a resume, 0 forbids one, and
		any other value resumes only if the newest aborted session
		started within Globals.resume_window seconds.

		"""
		cls.SetSessionInfo()
		if not cls._session_info_list:
			if Globals.resume == 1:
				Log.FatalError("User specified resume, but no data on "
							   "previous backup found.")
			else: return None
		else:
			si = cls._session_info_list[-1] # newest session
			if (Globals.resume == 1 or
				(time.time() <= (si.time + Globals.resume_window) and
				 not Globals.resume == 0)):
				Log("Resuming aborted backup dated %s" %
					Time.timetopretty(si.time), 2)
				Time.setcurtime(si.time)
				return si
			else:
				Log("Last backup dated %s was aborted, but we aren't "
					"resuming it." % Time.timetopretty(si.time), 2)
				return None
		# Only reachable if Log.FatalError returns
		assert 0

MakeClass(Resume)


class ResumeSessionInfo:
	"""Record describing one previously aborted backup session"""
	def __init__(self, mirror, time, last_index,
				 last_definitive, finalizer_state = None, ITR_state = None):
		"""Store the description of an aborted session

		mirror - true if the session was a mirror, false if an increment
		time - starting time of the session, in seconds
		last_index - last index confirmed as processed, or None
		last_definitive - true if last_index is known to really be last
		finalizer_state - saved finalizer reducer state, if available
		ITR_state - saved ITR reducer state for an increment, if
		            available (None for a mirror)

		"""
		self.mirror = mirror
		self.time = time
		self.last_index = last_index
		self.last_definitive = last_definitive
		self.finalizer_state = finalizer_state
		self.ITR_state = ITR_state