1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
|
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
"""
Module: qlslibs.efp
Contains empty file pool (EFP) classes.
"""
import os
import os.path
import qlslibs.err
import shutil
import uuid
class EfpManager(object):
    """
    Top level class to analyze the Qpid Linear Store (QLS) directory for the partitions that make up the
    Empty File Pool (EFP).

    Attributes:
      directory              -- QLS directory that is searched for partition directories
      disk_space_required_kb -- passed unchanged to every EfpPartition that is constructed
      efp_partitions         -- list of EfpPartition objects found by _analyze_efp()
      efp_pools              -- dict: pool directory name -> list of the pools of that name over all partitions
      total_num_files        -- running count of EFP files over all partitions
      total_cum_file_size_kb -- running total of the EFP file sizes over all partitions
      current_efp_partition  -- partition selected by _check_args(), or None when no partition was selected
    """

    def __init__(self, directory, disk_space_required_kb):
        """ Raises qlslibs.err.InvalidQlsDirectoryNameError if directory does not exist. """
        if not os.path.exists(directory):
            raise qlslibs.err.InvalidQlsDirectoryNameError(directory)
        self.directory = directory
        self.disk_space_required_kb = disk_space_required_kb
        self.efp_partitions = []
        self.efp_pools = {}
        self.total_num_files = 0
        self.total_cum_file_size_kb = 0
        self.current_efp_partition = None

    def add_file_pool(self, file_size_kb, num_files):
        """ Add an EFP in the specified partition of the specified size containing the specified number of files """
        partition = self.current_efp_partition
        dir_name = EmptyFilePool.get_directory_name(file_size_kb)
        print('Adding pool \'%s\' to partition %s' % (dir_name, partition.partition_number))
        # EfpPartition provides create_new_efp_files(); it has no create_new_efp() method.
        self.total_cum_file_size_kb += partition.create_new_efp_files(file_size_kb, num_files)
        self.total_num_files += num_files
        # Keep the by-name index in step with the partition, if the partition registered the new pool.
        new_pool = partition.efp_pools.get(dir_name)
        if new_pool is not None:
            known_pools = self.efp_pools.setdefault(dir_name, [])
            if new_pool not in known_pools:
                known_pools.append(new_pool)

    def freshen_file_pool(self, file_size_kb, num_files):
        """
        Freshen an EFP in the specified partition and of the specified size to the specified number of files.

        If no partition is currently selected, all partitions are freshened. If file_size_kb is None, all pools
        in each of those partitions are freshened. A partition that does not contain the named pool is skipped.
        Pools that already hold num_files or more files are left alone and a warning is printed.
        """
        if self.current_efp_partition is None:
            partition_list = self.efp_partitions
            partition_str = 'all partitions'
        else:
            partition_list = [self.current_efp_partition]
            partition_str = 'partition %d' % self.current_efp_partition.partition_number
        if file_size_kb is None:
            requested_pool = None
            pool_str = 'all pools'
        else:
            requested_pool = EmptyFilePool.get_directory_name(int(file_size_kb))
            pool_str = 'pool \'%s\'' % requested_pool
        print('Freshening %s in %s to %d files' % (pool_str, partition_str, num_files))
        # A local loop variable is used so that the selected partition is not overwritten as a side effect.
        for partition in partition_list:
            if requested_pool is None:
                pool_names = list(partition.efp_pools.keys())
            else:
                pool_names = [requested_pool]
            for pool_name in pool_names:
                efp = partition.efp_pools.get(pool_name)
                if efp is None:
                    continue  # this partition has no pool of the requested size
                num_files_needed = num_files - efp.get_tot_file_count()
                if num_files_needed > 0:
                    # The pool's own data size is used; it was parsed from the pool directory name.
                    self.total_cum_file_size_kb += partition.create_new_efp_files(efp.data_size_kb,
                                                                                  num_files_needed)
                    self.total_num_files += num_files_needed
                else:
                    print(' WARNING: Pool %s in partition %s already contains %d files: no action taken' %
                          (pool_name, partition.partition_number, efp.get_tot_file_count()))

    def remove_file_pool(self, file_size_kb):
        """
        Remove an existing EFP from the specified partition and of the specified size.

        The pool directory is deleted from disk first; only then are the partition's and this manager's
        bookkeeping updated. The partition itself stays in efp_partitions.
        """
        partition = self.current_efp_partition
        dir_name = EmptyFilePool.get_directory_name(file_size_kb)
        print('Removing pool \'%s\' from partition %s' % (dir_name, partition.partition_number))
        shutil.rmtree(os.path.join(partition.directory, EfpPartition.EFP_DIR_NAME, dir_name))
        removed_pool = partition.efp_pools.pop(dir_name, None)
        if removed_pool is None:
            return
        removed_count = removed_pool.get_tot_file_count()
        removed_size_kb = removed_pool.get_tot_file_size_kb()
        partition.tot_file_count -= removed_count
        partition.tot_file_size_kb -= removed_size_kb
        self.total_num_files -= removed_count
        self.total_cum_file_size_kb -= removed_size_kb
        known_pools = self.efp_pools.get(dir_name, [])
        if removed_pool in known_pools:
            known_pools.remove(removed_pool)

    def report(self):
        """ Print a summary table of all partitions followed by a per-partition pool report """
        print('Empty File Pool (EFP) report')
        print('============================')
        print('Found %d partition(s)' % len(self.efp_partitions))
        if self.efp_partitions:
            sorted_efp_partitions = sorted(self.efp_partitions, key=lambda ptn: ptn.partition_number)
            EfpPartition.print_report_table_header()
            for ptn in sorted_efp_partitions:
                ptn.print_report_table_line()
            print('')
            for ptn in sorted_efp_partitions:
                ptn.report()

    def run(self, arg_tup):
        """
        Scan the QLS directory, then perform the actions requested in arg_tup (if it is not None).

        arg_tup is the 7-tuple (partition, file_size, num_files, add, remove, freshen, list). The arguments are
        checked by _check_args() before any action is taken.
        """
        self._analyze_efp()
        if arg_tup is not None:
            _, arg_file_size, arg_num_files, arg_add, arg_remove, arg_freshen, arg_list = arg_tup
            self._check_args(arg_tup)
            if arg_add:
                self.add_file_pool(int(arg_file_size), int(arg_num_files))
            if arg_remove:
                self.remove_file_pool(int(arg_file_size))
            if arg_freshen:
                # The file size is passed through unconverted: None means "all pools".
                self.freshen_file_pool(arg_file_size, int(arg_num_files))
            if arg_list:
                self.report()

    def _analyze_efp(self):
        """ Build the partition list, pool index and totals from the entries of the QLS directory """
        for dir_entry in os.listdir(self.directory):
            try:
                efp_partition = EfpPartition(os.path.join(self.directory, dir_entry), self.disk_space_required_kb)
            except qlslibs.err.InvalidPartitionDirectoryNameError:
                continue  # entry is not named like a partition directory
            efp_partition.scan()
            self.efp_partitions.append(efp_partition)
            for efpl in efp_partition.efp_pools:
                self.efp_pools.setdefault(efpl, []).append(efp_partition.efp_pools[efpl])
            self.total_num_files += efp_partition.tot_file_count
            self.total_cum_file_size_kb += efp_partition.tot_file_size_kb

    def _check_args(self, arg_tup):
        """
        Value check of args. The names of partitions and pools are validated against the discovered instances.

        Sets current_efp_partition when a partition argument is given. Raises
        qlslibs.err.InvalidPartitionDirectoryNameError if the partition argument is not of the form 'pNNN' or
        'NNN', qlslibs.err.PartitionDoesNotExistError if no such partition was discovered,
        qlslibs.err.PoolDirectoryAlreadyExistsError when adding a pool that exists and
        qlslibs.err.PoolDirectoryDoesNotExistError when removing or freshening a pool that does not exist.
        """
        arg_partition, arg_file_size, _, arg_add, arg_remove, arg_freshen, _ = arg_tup
        if arg_partition is not None:
            partition_name = str(arg_partition)
            try:
                if partition_name.startswith(EfpPartition.PTN_DIR_PREFIX):  # string partition name, eg 'p001'
                    partition_num = int(partition_name[len(EfpPartition.PTN_DIR_PREFIX):])
                else:  # numeric partition, eg '1'
                    partition_num = int(partition_name)
            except ValueError:
                raise qlslibs.err.InvalidPartitionDirectoryNameError(arg_partition)
            for partition in self.efp_partitions:
                if partition.partition_number == partition_num:
                    self.current_efp_partition = partition
                    break
            else:
                raise qlslibs.err.PartitionDoesNotExistError(arg_partition)
        # With no file size (eg freshen all pools) there is no pool name to validate.
        if self.current_efp_partition is not None and arg_file_size is not None:
            pool_list = self.current_efp_partition.efp_pools
            efp_directory_name = EmptyFilePool.get_directory_name(int(arg_file_size))
            if arg_add and efp_directory_name in pool_list:
                raise qlslibs.err.PoolDirectoryAlreadyExistsError(efp_directory_name)
            if (arg_remove or arg_freshen) and efp_directory_name not in pool_list:
                raise qlslibs.err.PoolDirectoryDoesNotExistError(efp_directory_name)
class EfpPartition(object):
    """
    Class that represents a EFP partition. Each partition contains one or more Empty File Pools (EFPs).

    Attributes:
      directory        -- the partition directory (its base name is the prefix 'p' followed by a number)
      partition_number -- number parsed from the directory name
      efp_pools        -- dict: pool directory name (eg '2048k') -> EmptyFilePool
      tot_file_count   -- number of EFP files over all pools in this partition
      tot_file_size_kb -- total data size in kB of the EFP files over all pools in this partition
    """
    PTN_DIR_PREFIX = 'p'
    EFP_DIR_NAME = 'efp'

    def __init__(self, directory, disk_space_required_kb):
        """ Validates the directory on construction; see _validate_partition_directory() for what is raised """
        self.directory = directory
        self.partition_number = None
        self.efp_pools = {}
        self.tot_file_count = 0
        self.tot_file_size_kb = 0
        self._validate_partition_directory(disk_space_required_kb)

    def create_new_efp_files(self, file_size_kb, num_files):
        """
        Create new EFP files in this partition.

        If the partition has no pool for file_size_kb yet, the pool directory is created under the partition's
        'efp' directory and the pool is registered in efp_pools. Returns the data size in kB that was added
        (file_size_kb * num_files), which is the same measure that scan() accumulates in tot_file_size_kb.
        """
        dir_name = EmptyFilePool.get_directory_name(file_size_kb)
        efp = self.efp_pools.get(dir_name)
        if efp is None:
            # EmptyFilePool takes the pool directory itself plus the partition number.
            pool_dir = os.path.join(self.directory, EfpPartition.EFP_DIR_NAME, dir_name)
            if not os.path.isdir(pool_dir):
                os.makedirs(pool_dir)
            efp = EmptyFilePool(pool_dir, self.partition_number)
            self.efp_pools[dir_name] = efp
        # The pool returns the on-disk size in bytes; the totals kept here are in kB of data.
        efp.create_new_efp_files(num_files)
        added_size_kb = efp.data_size_kb * num_files
        self.tot_file_size_kb += added_size_kb
        self.tot_file_count += num_files
        return added_size_kb

    @staticmethod
    def print_report_table_header():
        """ Print the column headings that go with print_report_table_line() """
        print('p_no no_efp tot_files tot_size_kb directory')
        print('---- ------ --------- ----------- ---------')

    def print_report_table_line(self):
        """ Print one summary line for this partition """
        print('%4d %6d %9d %11d %s' % (self.partition_number, len(self.efp_pools), self.tot_file_count,
                                       self.tot_file_size_kb, self.directory))

    def report(self):
        """ Print a table of the pools in this partition, smallest data size first """
        print('Partition %s:' % os.path.basename(self.directory))
        if self.efp_pools:
            EmptyFilePool.print_report_table_header()
            for efp in sorted(self.efp_pools.values(), key=lambda pool: pool.data_size_kb):
                efp.print_report_table_line()
        else:
            print('<empty - no EFPs found in this partition>')
        print('')

    def scan(self):
        """
        Scan the partition's 'efp' directory and register an EmptyFilePool for every entry found in it.

        Nothing is registered if the partition directory or its 'efp' subdirectory does not exist.
        """
        if not os.path.exists(self.directory):
            return
        efp_dir = os.path.join(self.directory, EfpPartition.EFP_DIR_NAME)
        if not os.path.isdir(efp_dir):
            return
        for dir_entry in os.listdir(efp_dir):
            efp = EmptyFilePool(os.path.join(efp_dir, dir_entry), self.partition_number)
            efp.scan()
            self.tot_file_count += efp.get_tot_file_count()
            self.tot_file_size_kb += efp.get_tot_file_size_kb()
            self.efp_pools[dir_entry] = efp

    def _validate_partition_directory(self, disk_space_required_kb):
        """
        Check the partition directory name, write permission and (optionally) free disk space.

        Sets partition_number. Raises qlslibs.err.InvalidPartitionDirectoryNameError if the base name is not
        the prefix followed by an integer, qlslibs.err.WritePermissionError if
        qlslibs.utils.has_write_permission() rejects the directory, and
        qlslibs.err.InsufficientSpaceOnDiskError if disk_space_required_kb is given and
        qlslibs.utils.get_avail_disk_space() reports less than disk_space_required_kb * 1024.
        """
        base_name = os.path.basename(self.directory)
        # Compare by value: identity ('is') on strings is an interpreter detail. An empty name is rejected too.
        if not base_name.startswith(EfpPartition.PTN_DIR_PREFIX):
            raise qlslibs.err.InvalidPartitionDirectoryNameError(self.directory)
        try:
            self.partition_number = int(base_name[len(EfpPartition.PTN_DIR_PREFIX):])
        except ValueError:
            raise qlslibs.err.InvalidPartitionDirectoryNameError(self.directory)
        if not qlslibs.utils.has_write_permission(self.directory):
            raise qlslibs.err.WritePermissionError(self.directory)
        if disk_space_required_kb is not None:
            space_required = disk_space_required_kb * 1024
            space_avail = qlslibs.utils.get_avail_disk_space(self.directory)
            if space_avail < space_required:
                raise qlslibs.err.InsufficientSpaceOnDiskError(self.directory, space_avail, space_required)
class EmptyFilePool(object):
    """
    Class that represents a single Empty File Pool within a partition. Each EFP contains pre-formatted linear store
    journal files (but it may also be empty).

    Attributes:
      base_dir_name    -- base name of the pool directory, eg '2048k'
      directory        -- the pool directory
      partition_number -- number of the partition this pool belongs to
      data_size_kb     -- data size in kB of each file, parsed from the directory name
      efp_files        -- files in the pool directory that passed validation, plus files created here
      in_use_files     -- names of the entries of the 'in_use' subdirectory
      returned_files   -- names of the entries of the 'returned' subdirectory
    """
    EFP_DIR_SUFFIX = 'k'
    EFP_JRNL_EXTENTION = '.jrnl'
    EFP_INUSE_DIRNAME = 'in_use'
    EFP_RETURNED_DIRNAME = 'returned'

    def __init__(self, directory, partition_number):
        """ Raises qlslibs.err.InvalidEfpDirectoryNameError if the directory is not named '<integer>k' """
        self.base_dir_name = os.path.basename(directory)
        self.directory = directory
        self.partition_number = partition_number
        self.data_size_kb = None
        self.efp_files = []
        self.in_use_files = []
        self.returned_files = []
        self._validate_efp_directory()

    def create_new_efp_files(self, num_files):
        """
        Create one or more new empty journal files of the prescribed size for this EFP.

        Returns the sum of the on-disk sizes (in bytes, as reported by os.path.getsize) of the files created.
        """
        this_total_file_size = 0
        for _ in range(num_files):
            this_total_file_size += self._create_new_efp_file()
        return this_total_file_size

    def get_directory(self):
        """ Return the pool directory """
        return self.directory

    @staticmethod
    def get_directory_name(file_size_kb):
        """ Static function to create an EFP directory name from the size of the files it contains """
        return '%dk' % file_size_kb

    def get_tot_file_count(self):
        """ Return the number of files in the pool (in-use and returned files are not counted) """
        return len(self.efp_files)

    def get_tot_file_size_kb(self):
        """ Return the total data size in kB of the files in the pool """
        return self.data_size_kb * len(self.efp_files)

    @staticmethod
    def print_report_table_header():
        """ Print the column headings that go with print_report_table_line() """
        # The group row is indented past the 12-wide data_size_kb column so each group sits over its columns.
        print('             ---------- efp ------------ --------- in_use ---------- -------- returned ---------')
        print('data_size_kb file_count tot_file_size_kb file_count tot_file_size_kb file_count tot_file_size_kb '
              'efp_directory')
        print('------------ ---------- ---------------- ---------- ---------------- ---------- ---------------- '
              '-------------')

    def print_report_table_line(self):
        """ Print one line with the file count and total size of the pool, in-use and returned files """
        num_efp = len(self.efp_files)
        num_in_use = len(self.in_use_files)
        num_returned = len(self.returned_files)
        print('%12d %10d %16d %10d %16d %10d %16d %s' % (self.data_size_kb,
                                                         num_efp, self.data_size_kb * num_efp,
                                                         num_in_use, self.data_size_kb * num_in_use,
                                                         num_returned, self.data_size_kb * num_returned,
                                                         self.get_directory()))

    def scan(self):
        """
        Populate efp_files, in_use_files and returned_files from the contents of the pool directory.

        Entries of the 'in_use' and 'returned' subdirectories are recorded without validation; every other
        entry is recorded in efp_files only if _validate_efp_file() accepts it.
        """
        for efp_file in os.listdir(self.directory):
            if efp_file == self.EFP_INUSE_DIRNAME:
                self.in_use_files.extend(os.listdir(os.path.join(self.directory, self.EFP_INUSE_DIRNAME)))
            elif efp_file == self.EFP_RETURNED_DIRNAME:
                self.returned_files.extend(os.listdir(os.path.join(self.directory, self.EFP_RETURNED_DIRNAME)))
            elif self._validate_efp_file(os.path.join(self.directory, efp_file)):
                self.efp_files.append(efp_file)

    def _add_efp_file(self, efp_file_name):
        """ Add a single journal file of the appropriate size to this EFP. No file size check is made here. """
        self.efp_files.append(efp_file_name)

    def _create_new_efp_file(self):
        """
        Create a single new empty journal file of the prescribed size for this EFP.

        The file is named with a fresh UUID and holds the encoded file header, 0xff padding up to
        qlslibs.utils.DEFAULT_SBLK_SIZE, then data_size_kb kB of zero bytes. Returns its on-disk size in bytes.
        """
        file_name = str(uuid.uuid4()) + EmptyFilePool.EFP_JRNL_EXTENTION
        fqfn = os.path.join(self.directory, file_name)
        file_header = qlslibs.jrnl.FileHeader(0, qlslibs.jrnl.FileHeader.MAGIC, qlslibs.utils.DEFAULT_RECORD_VERSION,
                                              0, 0, 0)
        file_header.init(None, None, qlslibs.utils.DEFAULT_HEADER_SIZE_SBLKS, self.partition_number,
                         self.data_size_kb, 0, 0, 0, 0, 0)
        efh = file_header.encode()
        # The context manager closes the file even if a write fails part-way through.
        with open(fqfn, 'wb') as file_handle:
            file_handle.write(efh)
            file_handle.write(b'\xff' * (qlslibs.utils.DEFAULT_SBLK_SIZE - len(efh)))
            file_handle.write(b'\x00' * (int(self.data_size_kb) * 1024))
        self._add_efp_file(fqfn)
        return os.path.getsize(fqfn)

    def _validate_efp_directory(self):
        """ Parse data_size_kb from the directory name; raise InvalidEfpDirectoryNameError if it is malformed """
        # Compare by value: identity ('is') on strings is an interpreter detail. An empty name is rejected too.
        if not self.base_dir_name.endswith(EmptyFilePool.EFP_DIR_SUFFIX):
            raise qlslibs.err.InvalidEfpDirectoryNameError(self.directory)
        try:
            self.data_size_kb = int(self.base_dir_name[:-len(EmptyFilePool.EFP_DIR_SUFFIX)])
        except ValueError:
            raise qlslibs.err.InvalidEfpDirectoryNameError(self.directory)

    def _validate_efp_file(self, efp_file):
        """
        Return True if efp_file has the size expected for this pool and carries a valid file header.

        The expected size is data_size_kb kB plus qlslibs.utils.DEFAULT_SBLK_SIZE; a warning is printed for a
        file of any other size. The header checks are delegated to qlslibs.jrnl.FileHeader.
        """
        file_size = os.path.getsize(efp_file)
        expected_file_size = (self.data_size_kb * 1024) + qlslibs.utils.DEFAULT_SBLK_SIZE
        if file_size != expected_file_size:
            print('WARNING: File %s not of correct size (size=%d, expected=%d): Ignoring' %
                  (efp_file, file_size, expected_file_size))
            return False
        # Journal files are binary; the context manager closes the handle on every exit path.
        with open(efp_file, 'rb') as file_handle:
            args = qlslibs.utils.load_args(file_handle, qlslibs.jrnl.RecordHeader)
            file_hdr = qlslibs.jrnl.FileHeader(*args)
            file_hdr.init(file_handle, *qlslibs.utils.load_args(file_handle, qlslibs.jrnl.FileHeader))
            if not file_hdr.is_header_valid(file_hdr):
                return False
            file_hdr.load(file_handle)
        if not file_hdr.is_valid(True):
            return False
        return True
# =============================================================================
if __name__ == "__main__":
    # This module only supplies classes; running it directly just reports that.
    print("This is a library, and cannot be executed.")
|