from __future__ import generators
execfile("filelist.py")

#######################################################################
#
# highlevel - High level functions for mirroring, mirror & inc, etc.
#
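# A typical invocation, sketched for orientation (src_rpath, dest_rpath,
# and inc_rpath stand for rpath objects whose connections expose the
# HLSourceStruct/HLDestinationStruct classes defined below):
#
#     HighLevel.Mirror(src_rpath, dest_rpath)
#     HighLevel.Mirror_and_increment(src_rpath, dest_rpath, inc_rpath)
#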

class SkipFileException(Exception):
	"""Signal that the current file should be skipped but then continue

	This exception will often be raised when there is problem reading
	an individual file, but it makes sense for the rest of the backup
	to keep going.

	"""
	pass


class HighLevel:
	"""High level static functions

	The design of some of these functions is represented on the
	accompanying diagram.

	"""
	def Mirror(src_rpath, dest_rpath, checkpoint = 1, session_info = None):
		"""Turn dest_rpath into a copy of src_rpath

		Checkpoint true means to checkpoint periodically, otherwise
		not.  If session_info is given, try to resume Mirroring from
		that point.

		"""
		SourceS = src_rpath.conn.HLSourceStruct
		DestS = dest_rpath.conn.HLDestinationStruct

		SourceS.set_session_info(session_info)
		DestS.set_session_info(session_info)
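		# Pipeline: list the source tree, get back signatures of the
		# dissimilar files from the destination, turn those signatures
		# into diffs on the source side, then patch the destination.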
		src_init_dsiter = SourceS.split_initial_dsiter(src_rpath)
		dest_sigiter = DestS.get_sigs(dest_rpath, src_init_dsiter)
		diffiter = SourceS.get_diffs_and_finalize(dest_sigiter)
		DestS.patch_and_finalize(dest_rpath, diffiter, checkpoint)

		dest_rpath.setdata()

	def Mirror_and_increment(src_rpath, dest_rpath, inc_rpath,
							 session_info = None):
		"""Mirror + put increments in tree based at inc_rpath"""
		SourceS = src_rpath.conn.HLSourceStruct
		DestS = dest_rpath.conn.HLDestinationStruct

		SourceS.set_session_info(session_info)
		DestS.set_session_info(session_info)
		if not session_info: dest_rpath.conn.SaveState.touch_last_file()
		src_init_dsiter = SourceS.split_initial_dsiter(src_rpath)
		dest_sigiter = DestS.get_sigs(dest_rpath, src_init_dsiter)
		diffiter = SourceS.get_diffs_and_finalize(dest_sigiter)
		DestS.patch_increment_and_finalize(dest_rpath, diffiter, inc_rpath)

		dest_rpath.setdata()
		inc_rpath.setdata()

	def Restore(rest_time, mirror_base, baseinc_tup, target_base):
		"""Like Restore.RestoreRecursive but check arguments"""
		if not isinstance(target_base, DSRPath):
			target_base = DSRPath(target_base.conn, target_base.base,
								  target_base.index, target_base.data)
		Restore.RestoreRecursive(rest_time, mirror_base,
								 baseinc_tup, target_base)

MakeStatic(HighLevel)


class HLSourceStruct:
	"""Hold info used by HL on the source side"""
	_session_info = None # set to si if resuming
	def set_session_info(cls, session_info):
		cls._session_info = session_info

	def iterate_from(cls, rpath):
		"""Supply more aruments to DestructiveStepping.Iterate_from"""
		if cls._session_info:
			return DestructiveStepping.Iterate_from(rpath, 1,
									   cls._session_info.last_index)
		else: return DestructiveStepping.Iterate_from(rpath, 1)

	def split_initial_dsiter(cls, rpath):
		"""Set iterators of all dsrps from rpath, returning one"""
		dsiter = cls.iterate_from(rpath)
		initial_dsiter1, cls.initial_dsiter2 = Iter.multiplex(dsiter, 2)
		return initial_dsiter1

	def get_diffs_and_finalize(cls, sigiter):
		"""Return diffs and finalize any dsrp changes remaining

		Return a rorpiterator of diffs, computed against the signatures
		of the dissimilar files.  This is the last operation run on the
		local filestream, so finalize dsrp writes.

		"""
		collated = RORPIter.CollateIterators(cls.initial_dsiter2, sigiter)
		finalizer = DestructiveStepping.Finalizer()
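		# collated yields (dsrp, dest_sig) pairs: placeholders are passed
		# through as-is, real signatures become diffs via diffonce, and a
		# dsrp with no signature only needs to go through the finalizer.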
		def diffs():
			for dsrp, dest_sig in collated:
				try:
					if dest_sig:
						if dest_sig.isplaceholder(): yield dest_sig
						else: yield RORPIter.diffonce(dest_sig, dsrp)
					if dsrp: finalizer(dsrp)
				except (IOError, OSError, RdiffException):
					Log.exception()
					# dest_sig may be None here, so fall back to dsrp's index
					if dest_sig: index = dest_sig.index
					else: index = dsrp.index
					Log("Error processing %s, skipping" % str(index), 2)
			finalizer.getresult()
		return diffs()

MakeClass(HLSourceStruct)


class HLDestinationStruct:
	"""Hold info used by HL on the destination side"""
	_session_info = None # set to si if resuming
	def set_session_info(cls, session_info):
		cls._session_info = session_info

	def iterate_from(cls, rpath):
		"""Supply more arguments to DestructiveStepping.Iterate_from"""
		if cls._session_info:
			return DestructiveStepping.Iterate_from(rpath, None,
							       cls._session_info.last_index)
		else: return DestructiveStepping.Iterate_from(rpath, None)

	def split_initial_dsiter(cls, rpath):
		"""Set initial_dsiters (iteration of all dsrps from rpath)"""
		dsiter = cls.iterate_from(rpath)
		result, cls.initial_dsiter2 = Iter.multiplex(dsiter, 2)
		return result

	def get_dissimilar(cls, baserp, src_init_iter, dest_init_iter):
		"""Get dissimilars

		Returns an iterator which enumerates the dsrps which are
		different on the source and destination ends.  The dsrps do
		not necessarily exist on the destination end.

		Also, to prevent the system from getting backed up on the
		remote end, if we don't get enough dissimilars, stick in a
		placeholder every so often, like fiber.  The more
		placeholders, the more bandwidth used, but if there aren't
		enough, lots of memory will be used because files will be
		accumulating on the source side.  How much accumulates
		depends on the Globals.conn_bufsize value.

		"""
		collated = RORPIter.CollateIterators(src_init_iter, dest_init_iter)
		def generate_dissimilar():
			counter = 0
			for src_rorp, dest_dsrp in collated:
				if not dest_dsrp:
					dsrp = DSRPath(baserp.conn, baserp.base, src_rorp.index)
					if dsrp.lstat():
						Log("Warning: Found unexpected destination file %s."
							% dsrp.path, 2)
						if DestructiveStepping.isexcluded(dsrp, None): continue
					counter = 0
					yield dsrp
				elif not src_rorp or not src_rorp == dest_dsrp:
					counter = 0
					yield dest_dsrp
				else: # source and destination both exist and are the same
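					# every so often during a run of identical files, yield a
					# placeholder so the source side keeps draining its stream
					# (see the docstring above)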
					if counter == 20:
						placeholder = RORPath(src_rorp.index)
						placeholder.make_placeholder()
						counter = 0
						yield placeholder
					else: counter += 1
		return generate_dissimilar()

	def get_sigs(cls, baserp, src_init_iter):
		"""Return signatures of all dissimilar files"""
		dest_iters1 = cls.split_initial_dsiter(baserp)
		dissimilars = cls.get_dissimilar(baserp, src_init_iter, dest_iters1)
		return RORPIter.Signatures(dissimilars)

	def get_dsrp(cls, dest_rpath, index):
		"""Return initialized dsrp based on dest_rpath with given index"""
		dsrp = DSRPath(dest_rpath.conn, dest_rpath.base, index)
		DestructiveStepping.initialize(dsrp, None)
		return dsrp

	def get_finalizer(cls):
		"""Return finalizer, starting from session info if necessary"""
		init_state = cls._session_info and cls._session_info.finalizer_state
		return DestructiveStepping.Finalizer(init_state)

	def get_ITR(cls, inc_rpath):
		"""Return ITR, starting from state if necessary"""
		init_state = cls._session_info and cls._session_info.ITR_state
		return Inc.make_patch_increment_ITR(inc_rpath, init_state)

	def patch_and_finalize(cls, dest_rpath, diffs, checkpoint = 1):
		"""Apply diffs and finalize"""
		collated = RORPIter.CollateIterators(diffs, cls.initial_dsiter2)
		finalizer = cls.get_finalizer()
		dsrp = None
		
		def error_checked():
			"""Inner writing loop, check this for errors"""
			indexed_tuple = collated.next()
			Log("Processing %s" % str(indexed_tuple), 7)
			diff_rorp, dsrp = indexed_tuple
			if not dsrp:
				dsrp = cls.get_dsrp(dest_rpath, diff_rorp.index)
				DestructiveStepping.initialize(dsrp, None)
			if diff_rorp and not diff_rorp.isplaceholder():
				RORPIter.patchonce_action(None, dsrp, diff_rorp).execute()
			finalizer(dsrp)
			return dsrp

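		# Loop until collated is exhausted, skipping files on recoverable
		# errors and checkpointing as we go (when requested); a fatal
		# error checkpoints one last time and re-raises.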
		try:
			while 1:
				try: dsrp = cls.check_skip_error(error_checked)
				except StopIteration: break
				if checkpoint: SaveState.checkpoint_mirror(finalizer, dsrp)
		except: cls.handle_last_error(dsrp, finalizer)
		finalizer.getresult()
		if checkpoint: SaveState.checkpoint_remove()

	def patch_increment_and_finalize(cls, dest_rpath, diffs, inc_rpath):
		"""Apply diffs, write increment if necessary, and finalize"""
		collated = RORPIter.CollateIterators(diffs, cls.initial_dsiter2)
		finalizer, ITR = cls.get_finalizer(), cls.get_ITR(inc_rpath)
		dsrp = None

		def error_checked():
			"""Inner writing loop, catch variety of errors from this"""
			indexed_tuple = collated.next()
			Log("Processing %s" % str(indexed_tuple), 7)
			diff_rorp, dsrp = indexed_tuple
			if not dsrp:
				dsrp = cls.get_dsrp(dest_rpath, indexed_tuple.index)
				DestructiveStepping.initialize(dsrp, None)
				indexed_tuple = IndexedTuple(indexed_tuple.index,
											 (diff_rorp, dsrp))
			if diff_rorp and diff_rorp.isplaceholder():
				indexed_tuple = IndexedTuple(indexed_tuple.index,
											 (None, dsrp))
			ITR(indexed_tuple)
			finalizer(dsrp)
			return dsrp

		try:
			while 1:
				try: dsrp = cls.check_skip_error(error_checked)
				except StopIteration: break
				SaveState.checkpoint_inc_backup(ITR, finalizer, dsrp)
		except: cls.handle_last_error(dsrp, finalizer, ITR)
		ITR.getresult()
		finalizer.getresult()
		SaveState.checkpoint_remove()

	def check_skip_error(cls, thunk):
		"""Run thunk, catch certain errors skip files"""
		try: return thunk()
		except (IOError, OSError, SkipFileException), exp:
			Log.exception()
			if (not isinstance(exp, IOError) or
				(isinstance(exp, IOError) and
				 (exp[0] in [2,  # Means that a file is missing
							 5,  # Reported by docv (see list)
							 13, # Permission denied IOError
							 20, # Means a directory changed to non-dir
							 26, # Requested by Campbell (see list) -
                                 # happens on some NT systems
							 36] # filename too long
				 ))):
				Log("Skipping file", 2)
				return None
			else: raise

	def handle_last_error(cls, dsrp, finalizer, ITR = None):
		"""If catch fatal error, try to checkpoint before exiting"""
		Log.exception(1)
		if ITR: SaveState.checkpoint_inc_backup(ITR, finalizer, dsrp, 1)
		else: SaveState.checkpoint_mirror(finalizer, dsrp, 1)
		SaveState.touch_last_file_definitive()
		raise

MakeClass(HLDestinationStruct)