"""
xOSDManager - a python module to manage OSD selection in XtreemFS.

currently only depth (level) 2 subdirectories can be managed.
only unix-based OSs are supported.
"""

import os
import pickle
import subprocess
from urllib import request
import urllib.error
import shutil
import time
import datetime
import random

from xtreemfs_client import dataDistribution
from xtreemfs_client import div_util
from xtreemfs_client import folder
from xtreemfs_client import dirstatuspageparser
from xtreemfs_client import physicalPlacementRealizer

# names suggest upper bounds on the number of concurrently running processes per operation type
# (changing policies / adding replicas / deleting replicas) - TODO confirm against their users
max_processes_change_policy = 200
max_processes_add_replica = 200
max_processes_delete_replica = 200


class OSDManager(object):
    # TODO add support for arbitrary subdirectory level
    # (currently depth=2 is hardcoded, which is fine for GeoMultiSens purposes)

    def __init__(self, path_to_managed_folder, config_file='.das_config', value_map=None, debug=False):

33
34
        self.managed_folder = path_to_managed_folder
        self.config_file = config_file
35
36
37
38
39
40
41
        self.debug = debug

        if value_map is None:

            if not div_util.check_for_executable('xtfsutil'):
                raise ExecutableNotFoundException("No xtfsutil found. Please make sure it is contained in your PATH.")

42
            output_1 = subprocess.run(["xtfsutil", self.managed_folder], stdout=subprocess.PIPE, stderr=subprocess.PIPE,
43
44
45
46
47
48
49
50
51
                                      universal_newlines=True)

            if output_1.stderr.startswith("xtfsutil failed: Path doesn't point to an entity on an XtreemFS volume!"):
                raise NotAXtreemFSVolume("The specified folder '" + path_to_managed_folder +
                                         "' is not part of an XtreemFS volume!")

            if len(output_1.stderr) > 0:
                raise Exception("xtfsutil produced some error: " + output_1.stderr)

52
            self.path_on_volume = div_util.remove_leading_trailing_slashes(
53
                str(output_1.stdout).split("\n")[0].split()[-1])
54
            self.path_to_mount_point = self.managed_folder[0:(len(self.managed_folder) - len(self.path_on_volume) - 1)]
55

56
            output_2 = subprocess.run(["xtfsutil", self.path_to_mount_point],
57
58
                                      stdout=subprocess.PIPE, universal_newlines=True)

Felix Seibert's avatar
Felix Seibert committed
59
            self.volume_information = div_util.extract_volume_information(output_2.stdout)
60
            self.volume_name = self.volume_information[0]
Felix Seibert's avatar
Felix Seibert committed
61
            osd_list = list(map(lambda x: x[0], self.volume_information[1]))
62
63
            self.osd_selection_policy = self.volume_information[2]
            self.volume_address = self.volume_information[3]
64

Felix Seibert's avatar
Felix Seibert committed
65
            self.distribution = None
66
            if not self.__read_configuration():
Felix Seibert's avatar
Felix Seibert committed
67
                self.distribution = dataDistribution.DataDistribution()
68

Felix Seibert's avatar
Felix Seibert committed
69
            self.distribution.add_osd_list(osd_list)
Felix Seibert's avatar
Felix Seibert committed
70

71
            self.osd_information = None
Felix Seibert's avatar
Felix Seibert committed
72
73

            try:
74
                answer = request.urlopen(div_util.get_http_address(self.volume_address))
Felix Seibert's avatar
Felix Seibert committed
75
76
                html_data = answer.read().decode('UTF-8')

Felix Seibert's avatar
Felix Seibert committed
77
                parser = dirstatuspageparser.DIRStatusPageParser()
Felix Seibert's avatar
Felix Seibert committed
78
79
80
81
82
83
                parser.feed(html_data)

                # filter out data sets without last update time or wrong service type
                filtered_data_sets = list(filter(lambda x: int(x['last updated'].split()[0]) != 0, parser.dataSets))
                filtered_data_sets = list(filter(lambda x: x['type'] == 'SERVICE_TYPE_OSD', filtered_data_sets))

84
                self.osd_information = {}
85

Felix Seibert's avatar
Felix Seibert committed
86
87
88
89
90
91
                for data_set in filtered_data_sets:
                    uuid = data_set['uuid']
                    current_osd = {}
                    current_osd['usable_space'] = int(data_set['usable'].split()[0])
                    current_osd['total_space'] = int(data_set['total'].split()[0])

92
                    self.osd_information[uuid] = current_osd
Felix Seibert's avatar
Felix Seibert committed
93
94
95

            except urllib.error.URLError as error:
                print("osd information could not be fetched! Probably the http status page could not be found at:",
96
                      div_util.get_http_address(self.volume_address))
Felix Seibert's avatar
Felix Seibert committed
97
                print(error)
98
        else:
Felix Seibert's avatar
Felix Seibert committed
99
            try:
100
101
102
103
                self.path_on_volume = value_map['path_on_volume']
                self.path_to_mount_point = value_map['path_to_mount']
                self.volume_name = value_map['volume_name']
                self.osd_selection_policy = value_map['osd_selection_policy']
Felix Seibert's avatar
Felix Seibert committed
104
                self.distribution = value_map['data_distribution']
105
106
                self.volume_address = value_map['volume_address']
                self.osd_information = value_map['osd_information']
Felix Seibert's avatar
Felix Seibert committed
107
108
109
            except KeyError as error:
                print('key not found:', error)
                print('leaving in OSDManager field empty!')
110
111

    def __read_configuration(self):
Felix Seibert's avatar
Felix Seibert committed
112
        assert self.distribution is None
113
        path_to_config = os.path.join(self.managed_folder, self.config_file)
114
115
        try:
            f = open(path_to_config, "rb")
Felix Seibert's avatar
Felix Seibert committed
116
            self.distribution = pickle.load(f)
117
118
119
120
121
            return True
        except IOError:
            return False

    def __write_configuration(self):
122
        path_to_config = os.path.join(self.managed_folder, self.config_file)
123
        f = open(path_to_config, "wb")
Felix Seibert's avatar
Felix Seibert committed
124
        pickle.dump(self.distribution, f)
125

126
127
128
    def create_distribution_from_existing_files(self,
                                                fix_layout_internally=True, max_files_in_progress=10000,
                                                apply_layout=True,
                                                environment='LOCAL',
                                                movement_strategy='osd_balanced'):
        """
        create a good data distribution out of data already present in the file system.
        the created data distribution will then be transferred to the physical layer,
        i.e., all files will be moved to their corresponding OSD,
        using XtreemFS' read-only replication strategy.

        :param fix_layout_internally: if True, files are moved by a PhysicalPlacementRealizer; otherwise whole
                                      folders are moved by generated srun commands (environment 'SLURM') or by
                                      fix_physical_layout_externally() (any other environment).
        :param max_files_in_progress: passed on to the PhysicalPlacementRealizer.
        :param apply_layout: if True, the new folder->OSD assignments are applied to the volume and the data
                             distribution is written to the config file.
        :param environment: 'SLURM' or anything else; only relevant if fix_layout_internally is False.
        :param movement_strategy: strategy passed to PhysicalPlacementRealizer.realize_placement().
        :raises ExecutableNotFoundException: if du is not on the PATH.
        """
        start_time = time.time()

        if self.debug:
            print("creating distribution from existing files. osd manager: " + str(self))

        if not div_util.check_for_executable('du'):
            raise ExecutableNotFoundException("No du found. Please make sure it is contained in your PATH.")

        new_folders = []
        for one_folder in self.get_depth_2_subdirectories():
            du = subprocess.run(["du", "-s", one_folder], stdout=subprocess.PIPE,
                                universal_newlines=True)
            # a folder reported with size 0 is entered with size 1
            folder_size = int(du.stdout.split()[0]) or 1
            new_folders.append(folder.Folder(self.get_path_on_volume(one_folder), folder_size, None))

        new_assignments = self.distribution.add_folders(new_folders, debug=self.debug)

        if apply_layout:
            self.apply_osd_assignments(new_assignments)
            self.__write_configuration()
        elif self.debug:
            print("NOT applying data layout!")

        if self.debug:
            print("osd manager after new folders have been added to data distribution:")
            print(str(self))
            total_time = round(time.time() - start_time)
            print("calculated distribution on existing files in secs: " + str(total_time))

        start_time = time.time()

        # NOTE(review): the physical layout is fixed even if apply_layout is False - confirm this is intended
        if fix_layout_internally:
            placement_realizer = \
                physicalPlacementRealizer.PhysicalPlacementRealizer(self, debug=self.debug,
                                                                    max_files_in_progress=max_files_in_progress)
            placement_realizer.realize_placement(strategy=movement_strategy)
        elif environment == 'SLURM':
            osd_list = self.distribution.get_osd_list()
            osd_to_folders_map = {osd: [] for osd in osd_list}
            all_folders = [osds_folder for osd in osd_list for osds_folder in self.distribution.OSDs[osd].folders]
            # group every folder under the OSD that the data distribution assigns it to
            for input_folder in all_folders:
                osd_for_tile = self.distribution.get_containing_osd(input_folder).uuid
                osd_to_folders_map[osd_for_tile].append(input_folder)

            move_commands = self.__generate_move_commands_slurm(osd_to_folders_map)
            self.__execute_commands(move_commands)
        else:
            self.fix_physical_layout_externally()

        if self.debug:
            total_time = round(time.time() - start_time)
            print("fixed physical layout of existing files in secs: " + str(total_time))

    def rebalance_existing_assignment(self,
                                      rebalance_algorithm='lpt',
                                      fix_layout_internally=True, max_files_in_progress=10000,
                                      environment='LOCAL',
                                      movement_strategy='osd_balanced'):
        if self.debug:
            print("rebalancing existing distribution... osd manager: \n" + str(self))

        start_time = time.time()
        self.update()
        update_time = round(time.time() - start_time)
        if self.debug:
            print("updated folder sizes in secs: " + str(update_time))

        start_time = time.time()

        if rebalance_algorithm is 'rebalance_one':
            movements = self.distribution.rebalance_one_folder()
        elif rebalance_algorithm is 'two_step_opt':
            movements = self.distribution.rebalance_two_steps_optimal_matching()
        elif rebalance_algorithm is 'two_step_rnd':
            movements = self.distribution.rebalance_two_steps_random_matching()
        else:
            movements = self.distribution.rebalance_lpt()

        if self.debug:
            rebalance_time = round(time.time() - start_time)
            print("rebalanced assignment in secs: " + str(rebalance_time))

233
234
235
            print("movements:")
            print(str(movements))

236
237
238
239
240
241
            total_movement_size = 0
            for folder_id in movements:
                total_movement_size += self.distribution.get_folder_size(folder_id)

            print("total movement size: " + str(total_movement_size))

242
243
            print("rebalanced osd manager: \n" + str(self))

244
245
246
        new_assignments = list(map(lambda key, value : (key, value[1]), list(movements.items())))
        self.apply_osd_assignments(new_assignments)

247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
        start_time = time.time()

        if fix_layout_internally:
            # TODO use movements to make realize_placement more efficient
            # TODO (for the first calculation of files that need to be moved)
            placement_realizer = \
                physicalPlacementRealizer.PhysicalPlacementRealizer(self, debug=self.debug,
                                                                    max_files_in_progress=max_files_in_progress)
            placement_realizer.realize_placement(strategy=movement_strategy)

        elif environment == 'SLURM':
            target_balanced = 1  # 0 is origin balanced, 1 is target balanced
            # we use self.__generate_move_commands_slurm() to do so.
            # this either ignores the origin or the target information.
            # TODO can we do this more intelligently?
            osd_to_folders_map = {}
            for folder_to_move in movements.keys():
                # we have a choice: origin or target balanced.
                osd_id = movements[folder_to_move][target_balanced]
                if osd_id in osd_to_folders_map:
                    osd_to_folders_map[osd_id].append(folder_to_move)
                else:
                    osd_to_folders_map[osd_id] = [folder_to_move]

            move_commands = self.__generate_move_commands_slurm(osd_to_folders_map)
            self.__execute_commands(move_commands)

        else:
            self.fix_physical_layout_externally()

277
278
        self.__write_configuration()

279
280
281
282
        if self.debug:
            total_time = round(time.time() - start_time)
            print("fixed physical layout of existing files in secs: " + str(total_time))

283
    def fix_physical_layout_externally(self):
Felix Seibert's avatar
Felix Seibert committed
284
285
286
287
        """
        fixes the physical layout, such that it matches the data distribution described in self.distribution.
        this is realized by calling move_folder_to_osd on all folders managed by this distribution.
        """
288
289
        if self.debug:
            print("fixing physical layout externally...")
290
291
        managed_folders = self.get_assigned_folder_ids()
        for folder_id in managed_folders:
292
293
294
            osd_for_folder = self.distribution.get_containing_osd(folder_id)
            self.move_folder_to_osd(folder_id, osd_for_folder.uuid)

295
    def create_empty_folders(self, folders):
        """
        create empty folders and assign OSDs.

        every folder (given by absolute path) is added to the data distribution with the current average folder
        size (at least 1) as its size. the resulting assignments are applied via apply_osd_assignments(), the
        folders are created on disk and the data distribution is written to the config file.
        """
        placeholder_size = max(int(self.distribution.get_average_folder_size()), 1)

        tiles = [folder.Folder(self.get_path_on_volume(absolute_path), placeholder_size, None)
                 for absolute_path in folders]
        assignments = self.distribution.add_folders(tiles)
        self.apply_osd_assignments(assignments)

        for absolute_path in folders:
            os.makedirs(absolute_path, exist_ok=True)

        self.__write_configuration()

    def copy_folders(self, folders, environment='LOCAL', remote_source=None, sshfs_mount_dir='/tmp/sshfs_tmp_mnt',
                     apply_layout=True, execute_copy=True, random_osd_assignment=False, random_seed=None):
        """
        copy a list of given folders into the managed folder, assigning OSDs to new folders and updating
        self.distribution.

        :param folders: absolute paths of the source folders (on remote_source if given, locally otherwise).
        :param environment: passed on to the copy step ("SLURM", "HU_CLUSTER" or anything else for local copying).
        :param remote_source: optional remote host; folder sizes are then measured through temporary sshfs mounts
                              below sshfs_mount_dir.
        :param sshfs_mount_dir: directory for the temporary sshfs mounts; removed afterwards.
        :param apply_layout: if True, the new assignments are applied to the volume and the distribution is persisted.
        :param execute_copy: if True, the data is actually copied.
        :param random_osd_assignment: passed on to DataDistribution.add_folders().
        :param random_seed: passed on to DataDistribution.add_folders().
        :raises ExecutableNotFoundException: if du (or, with remote_source, sshfs/scp/fusermount) is not on the PATH.
        """
        if self.debug:
            print("calling copy_folders with:")
            print("folders: " + str(folders))
            print("environment: " + str(environment))
            print("remote_source: " + str(remote_source))
            print("sshfs_mount_dir: " + str(sshfs_mount_dir))
            print("apply_layout: " + str(apply_layout))
            print("execute_copy: " + str(execute_copy))
            print("random_osd_assignment: " + str(random_osd_assignment))

        required_executables = ['du']
        if remote_source is not None:
            required_executables += ['sshfs', 'scp', 'fusermount']
        for executable in required_executables:
            if not div_util.check_for_executable(executable):
                raise ExecutableNotFoundException("No " + executable +
                                                  " found. Please make sure it is contained in your PATH.")

        if remote_source is not None:
            os.makedirs(sshfs_mount_dir, exist_ok=True)

        new_folders = []

        for input_folder in folders:
            # "<parent>/<name>" of the source folder; the copy is placed under the same two path elements
            last_2_path_elements = os.path.join(os.path.split(os.path.split(input_folder)[0])[1],
                                                os.path.split(input_folder)[1])
            if remote_source is not None:
                mount_point = os.path.join(sshfs_mount_dir, last_2_path_elements)
                os.makedirs(mount_point, exist_ok=True)
                subprocess.run(["sshfs", remote_source + ":" + input_folder, mount_point])
                try:
                    du = subprocess.run(["du", "-s", mount_point], stdout=subprocess.PIPE,
                                        universal_newlines=True)
                finally:
                    subprocess.run(["fusermount", "-uz", mount_point])
                # rmdir (not rmtree): if unmounting failed, this raises instead of deleting the remote data
                # through the still-mounted sshfs
                os.rmdir(mount_point)
            else:
                du = subprocess.run(["du", "-s", input_folder], stdout=subprocess.PIPE,
                                    universal_newlines=True)
            folder_size = int(du.stdout.split()[0])

            # as the folder_id is generated from the copy source, we cannot call get_path_on_volume to get the
            # folder_id
            new_folder = folder.Folder(os.path.join(self.volume_name, self.path_on_volume, last_2_path_elements),
                                       folder_size,
                                       input_folder)
            if self.debug:
                print("new folder: " + str(new_folder))

            new_folders.append(new_folder)

        if remote_source is not None:
            shutil.rmtree(sshfs_mount_dir)

        if self.debug:
            print("OSDManager: random_osd_assignment: " + str(random_osd_assignment))

        new_assignments = self.distribution.add_folders(new_folders, random_osd_assignment=random_osd_assignment,
                                                        random_seed=random_seed)

        if apply_layout:
            self.apply_osd_assignments(new_assignments)
            self.__write_configuration()
        elif self.debug:
            print("NOT applying data layout!")

        if self.debug:
            print("osd manager after new folders have been added to data distribution:")
            print(str(self))

        if execute_copy:
            self.__copy_data(new_folders, environment, remote_source)

    def __generate_move_commands_slurm(self, osd_to_folders_map, tmp_dir=None):
        """
        generate one shell command string per OSD that has folders to move.

        every folder is moved to tmp_dir, copied back to its original location and then deleted from tmp_dir.
        each step is prefixed with "srun -N1-1 --nodelist=<host>", where <host> is the slurm node that
        div_util.get_osd_to_hostname_map() reports for that OSD.

        :param osd_to_folders_map: dict osd uuid -> list of folder ids to move.
        :param tmp_dir: temporary folder; defaults to <mount point>/.tmp_move_folder (created if missing).
        :return: list of command strings; OSDs without folders produce no command.
        """
        import shlex

        if self.debug:
            print("Using SLURM mode for moving folders...")

        if tmp_dir is None:
            tmp_dir = os.path.join(self.path_to_mount_point, '.tmp_move_folder')

        os.makedirs(tmp_dir, exist_ok=True)

        slurm_hosts = div_util.get_slurm_hosts()

        if self.debug:
            print('slurm_hosts: ', slurm_hosts)

        osd_to_host_map = div_util.get_osd_to_hostname_map(self.volume_information[1], slurm_hosts)

        if self.debug:
            print('osd_to_host_map: ', osd_to_host_map)

        command_list = []

        for osd, folders_to_move in osd_to_folders_map.items():
            if not folders_to_move:
                continue
            # run on the node of *this* OSD; the host must be looked up per OSD, not once for all OSDs
            srun_prefix = "srun -N1-1 --nodelist=" + osd_to_host_map[osd]
            command = ""
            for move_folder in folders_to_move:
                folder_path = self.get_absolute_file_path(move_folder)
                folder_tmp_path = os.path.join(tmp_dir, os.path.split(move_folder)[1])
                # the steps are joined with ';', i.e. meant for a shell: quote the paths
                # (shlex.quote leaves paths without special characters unchanged)
                quoted_folder_path = shlex.quote(folder_path)
                quoted_tmp_dir = shlex.quote(tmp_dir)
                quoted_tmp_path = shlex.quote(folder_tmp_path)
                quoted_parent = shlex.quote(os.path.split(folder_path)[0])

                # move folder to temporary location
                command += srun_prefix + " mv " + quoted_folder_path + " " + quoted_tmp_dir + " ; "
                # copy folder back from temporary location to initial location
                command += srun_prefix + " cp -r " + quoted_tmp_path + " " + quoted_parent + " ; "
                # delete folder from temporary location
                command += srun_prefix + " rm -r " + quoted_tmp_path + " ; "

            command_list.append(command)

        return command_list

    def move_folder_to_osd(self, folder_id: str, new_osd_id: str, tmp_dir=None):
        """
        moves a folder from one OSD to another OSD. you may specify a temporary folder.

        the folder is assigned to new_osd_id in the data distribution and (via xtfsutil, filename prefix policy)
        on the volume. then every file is moved to a staging folder and copied back, so the file is re-created
        under the new assignment.

        :param folder_id: id of the folder to move.
        :param new_osd_id: uuid of the target OSD.
        :param tmp_dir: folder in which files are staged; defaults to <mount point>/.tmp_move_folder.
                        files are staged in a fresh private subfolder of tmp_dir, which is removed afterwards;
                        tmp_dir itself is only removed if it is empty then, so a caller-supplied folder that
                        still contains other data is left untouched.
        :raises ExecutableNotFoundException: if xtfsutil is not on the PATH.
        """
        import tempfile

        folder_path = os.path.join(self.get_target_dir(folder_id),
                                   os.path.split(folder_id)[1])

        if tmp_dir is None:
            tmp_dir = os.path.join(self.path_to_mount_point, '.tmp_move_folder')

        start_time = time.time() if self.debug else 0

        if self.debug:
            print("externally moving folder " + folder_id + " to osd: " + new_osd_id)

        os.makedirs(tmp_dir, exist_ok=True)

        if not div_util.check_for_executable('xtfsutil'):
            raise ExecutableNotFoundException("No xtfsutil found. Please make sure it is contained in your PATH.")
        # step 1: add folder to new OSD, update data distribution and xtreemfs configuration
        self.distribution.assign_new_osd(folder_id, new_osd_id)
        # xtfsutil's output is only shown in debug mode
        output_options = {} if self.debug else {'stdout': subprocess.PIPE, 'universal_newlines': True}
        subprocess.run(["xtfsutil",
                        "--set-pattr", "1004.filenamePrefix", "--value",
                        "add " + folder_id + " " + new_osd_id + "", self.path_to_mount_point],
                       **output_options)

        # step 2: one by one, move files to a private staging folder and then back to the folder, which means that
        # they should now be located onto the new OSD.
        # deliberately no try/finally around this loop: if a step fails, the staged file must not be deleted.
        staging_dir = tempfile.mkdtemp(dir=tmp_dir)
        for root, _dirs, files in os.walk(folder_path):
            for file in files:
                current_file_path = os.path.join(root, file)
                copied_file_path = os.path.join(staging_dir, file)
                shutil.move(current_file_path, copied_file_path)
                shutil.copy(copied_file_path, root)
                os.remove(copied_file_path)

        # only remove what this call created (previously the whole tmp_dir was wiped, even if caller-supplied)
        shutil.rmtree(staging_dir, ignore_errors=True)
        try:
            os.rmdir(tmp_dir)
        except OSError:
            pass  # tmp_dir still holds other data: leave it in place

        if self.debug:
            total_time = time.time() - start_time
            print("externally moved folder " + folder_id +
                  " to osd: " + new_osd_id + " in secs: " + str(round(total_time)))

    def remove_folder(self, folder_id):
Felix Seibert's avatar
Felix Seibert committed
495
496
497
        """
        removes a folder from the distribution. this does NOT delete the folder from the file system.
        """
Felix Seibert's avatar
Felix Seibert committed
498
        containing_osd = self.distribution.get_containing_osd(folder_id)
Felix Seibert's avatar
Felix Seibert committed
499
500
501
502
503
504
505
506
        if containing_osd is not None:
            if not div_util.check_for_executable('xtfsutil'):
                raise ExecutableNotFoundException("No xtfsutil found. Please make sure it is contained in your PATH.")

            containing_osd.remove_folder(folder_id)
            if self.debug:
                subprocess.run(["xtfsutil",
                                "--set-pattr", "1004.filenamePrefix", "--value",
507
                                "remove " + folder_id + "", self.path_to_mount_point])
Felix Seibert's avatar
Felix Seibert committed
508
509
510
            else:
                subprocess.run(["xtfsutil",
                                "--set-pattr", "1004.filenamePrefix", "--value",
511
                                "remove " + folder_id + "", self.path_to_mount_point],
Felix Seibert's avatar
Felix Seibert committed
512
513
                               stdout=subprocess.PIPE, universal_newlines=True)

514
    def update(self, arg_folders=None):
Felix Seibert's avatar
Felix Seibert committed
515
516
517
518
519
        """
        update the given (by absolute path) folders, such that the values held by self.dataDistribution
        matches their size on disk.
        if no argument is given, all folders are updated.
        """
520
521
        if arg_folders is not None:
            for folder_for_update in arg_folders:
522
                if not folder_for_update.startswith(self.managed_folder):
523
524
525
526
527
528
529
530
                    raise PathNotManagedException(
                        "The path :" + folder_for_update + "is not managed by this instance of the XtreemFS OSD"
                                                           "manager!")

        folder_size_updates = {}

        folders = arg_folders
        if arg_folders is None:
531
            folders = self.get_depth_2_subdirectories()
532
533

        for folder_for_update in folders:
Felix Seibert's avatar
Felix Seibert committed
534
            folder_id = self.get_path_on_volume(folder_for_update)
535
536
537
538
539
540
541
542
            command = ["du", "-s", folder_for_update]
            if self.debug:
                print("executing: " + str(command))
            du = subprocess.run(command, stdout=subprocess.PIPE, universal_newlines=True)
            folder_disk_size = int(du.stdout.split()[0])
            folder_size_updates[folder_id] = folder_disk_size

        for folder_for_update, size in folder_size_updates.items():
Felix Seibert's avatar
Felix Seibert committed
543
            self.distribution.update_folder(folder_for_update, size)
544
545
546
547
548
549
550

        self.__write_configuration()

        if self.debug:
            print(str(self))

    def apply_osd_assignments(self, assignments):
        """
        apply the given assignments to the XtreemFS volume, using xtfsutil.
        the assignments are given as a list containing tuples (tile_id, osd),
        where tile_id is given by applying path_on_volume() onto the absolute path of the folder.
        if the volume's OSD selection policy is not "1000,1004", it is first set to "prefix".

        :raises ExecutableNotFoundException: if xtfsutil is not on the PATH.
        """
        if not div_util.check_for_executable('xtfsutil'):
            raise ExecutableNotFoundException("No xtfsutil found. Please make sure it is contained in your PATH.")

        # xtfsutil's output is only shown in debug mode
        output_options = {} if self.debug else {'stdout': subprocess.PIPE, 'universal_newlines': True}

        # compare by value: `is not` against a string literal is an identity test that is practically always true,
        # so the policy used to be reset on every call
        if self.osd_selection_policy != "1000,1004":
            subprocess.run(["xtfsutil", "--set-osp", "prefix", self.path_to_mount_point], **output_options)

        for new_tile in assignments:
            subprocess.run(["xtfsutil",
                            "--set-pattr", "1004.filenamePrefix", "--value",
                            "add " + new_tile[0] + " " + new_tile[1] + "",
                            self.path_to_mount_point],
                           **output_options)

    def __copy_data(self, input_folders, environment, remote_source):
Felix Seibert's avatar
Felix Seibert committed
580
581
582
        """
        copy data onto XtreemFS volume
        """
Felix Seibert's avatar
Felix Seibert committed
583
584
        if self.debug:
            print('calling copy_data with: ', input_folders, environment, remote_source)
Felix Seibert's avatar
Felix Seibert committed
585
        osd_list = self.distribution.get_osd_list()
586
587
588
589
        osd_to_folders_map = {}
        for osd in osd_list:
            osd_to_folders_map[osd] = []
        for input_folder in input_folders:
Felix Seibert's avatar
Felix Seibert committed
590
            osd_for_tile = self.distribution.get_containing_osd(input_folder.id).uuid
591
592
593
594
595
596
597
598
599
600
601
            osd_to_folders_map[osd_for_tile].append(input_folder)

        if self.debug:
            print("osd to folders map:")
            for key, value in osd_to_folders_map.items():
                print("osd: " + key)
                print("assigned folders:")
                for input_folder in value:
                    print(str(input_folder))

        # trigger the copying!
Felix Seibert's avatar
Felix Seibert committed
602
        if environment == "SLURM":
603
604
            assert remote_source is not None
            copy_commands = self.__generate_copy_commands_slurm(osd_to_folders_map, remote_source)
Felix Seibert's avatar
Felix Seibert committed
605
        elif environment == "HU_CLUSTER":
606
607
608
609
610
611
612
613
            copy_commands = self.__generate_copy_commands_hu_cluster(osd_to_folders_map)
        else:
            copy_commands = self.__generate_copy_commands_local(osd_to_folders_map)

        for input_folder in input_folders:
            path_to_target_dir = self.get_target_dir(input_folder.id)
            os.makedirs(path_to_target_dir, exist_ok=True)

614
        self.__execute_commands(copy_commands)
615
616

    def __generate_copy_commands_slurm(self, osd_to_folders_map, remote_source):
        """
        generates a list of copy commands, one command for each OSD that receives new data.
        the copy commands are constructed such that they can be executed in a slurm environment (that is, within a slurm job
        allocation) at ZIB.
        each command is a copy command including all new folders for the corresponding OSD, preceded by a srun command
        to execute the copy command locally on the slurm node on which the target OSD resides.

        :param osd_to_folders_map: maps OSD uuid -> list of folders (with `origin` and `id` attributes) to copy there.
        :param remote_source: host from which the folders are fetched via scp.
        :return: list of shell command strings, one per OSD with a non-empty folder list, in map order.
        """
        if self.debug:
            print("Using SLURM mode for copying...")

        slurm_hosts = div_util.get_slurm_hosts()

        if self.debug:
            print('slurm_hosts: ', slurm_hosts)

        osd_to_host_map = div_util.get_osd_to_hostname_map(self.volume_information[1], slurm_hosts)

        if self.debug:
            print('osd_to_host_map: ', osd_to_host_map)

        command_list = []
        for osd, copy_folders in osd_to_folders_map.items():
            if len(copy_folders) == 0:
                # nothing to copy for this OSD, so no host entry is needed for it either
                continue
            # the host must be looked up per OSD: each OSD's copies have to run on the node hosting that OSD
            host_name = osd_to_host_map[osd]
            command = ""
            for copy_folder in copy_folders:
                command += "srun -N1-1 --nodelist=" + host_name
                command += " scp -rq " + remote_source + ":" + copy_folder.origin
                command += " " + self.get_target_dir(copy_folder.id)
                command += " ;"
            command_list.append(command)

        return command_list

    def __generate_copy_commands_hu_cluster(self, osd_to_folders_map):
        """
        build one shell command per OSD that has folders to receive, for the GeoMultiSens cluster at HU Berlin.

        every command opens an ssh session to the IP address of the target OSD (taken from the xtfsutil output for
        the mount point) and runs one `cp -r` per folder inside it, so the copy is executed on the OSD's node.

        :param osd_to_folders_map: maps OSD uuid -> list of folders (with `origin` and `id` attributes).
        :return: list of shell command strings; OSDs with an empty folder list produce no command.
        :raises ExecutableNotFoundException: if xtfsutil is not found on the PATH.
        """
        if self.debug:
            print("Using HU_CLUSTER mode for copying...")

        if not div_util.check_for_executable('xtfsutil'):
            raise ExecutableNotFoundException("No xtfsutil found. Please make sure it is contained in your PATH.")

        xtfsutil_result = subprocess.run(["xtfsutil", self.path_to_mount_point],
                                         stdout=subprocess.PIPE, universal_newlines=True)
        volume_information = div_util.extract_volume_information(xtfsutil_result.stdout)
        ip_of_osd = {osd: ip for (osd, ip) in volume_information[1]}

        commands = []
        for osd, folders in osd_to_folders_map.items():
            # note: the IP is looked up for every OSD in the map, including OSDs that receive no folders
            prefix = "ssh " + ip_of_osd[osd] + " '"
            copy_steps = "".join(" cp -r " + folder.origin + " " + self.get_target_dir(folder.id) + " ;"
                                 for folder in folders)
            command = prefix + copy_steps + " ' "
            if len(folders) > 0:
                commands.append(command)

        return commands

    def __generate_copy_commands_local(self, osd_to_folders_map):
Felix Seibert's avatar
Felix Seibert committed
692
693
694
695
        """
        generates a list of copy commands, one command for each OSD that receives new data.
        plain old cp is used for the actual copying.
        """
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
        if self.debug:
            print("Using local cp for copying...")

        command_list = []

        for key in osd_to_folders_map.keys():
            command = ""
            for copy_folder in osd_to_folders_map[key]:
                command += "cp -r "
                command += copy_folder.origin
                command += " " + self.get_target_dir(copy_folder.id)
                command += " ; "

            if len(osd_to_folders_map[key]) > 0:
                command_list.append(command)

        return command_list

714
    def __execute_commands(self, command_list):
Felix Seibert's avatar
Felix Seibert committed
715
716
717
718
        """
        execute, in parallel, a given set of commands. note that the degree of parallelism will match the length of
        command_list.
        """
719
        if self.debug:
720
            print("Executing commands: ")
721
722
723
724
725
            for command in command_list:
                print(str(command))
            print("in total " + str(len(command_list)) + " commands.")
        processes = []
        for command in command_list:
Felix Seibert's avatar
Felix Seibert committed
726
            process = subprocess.Popen(command, stderr=subprocess.STDOUT, stdout=subprocess.PIPE, shell=True)
727
            time.sleep(5)
728
729
730
731
732
733
            processes.append(process)

        for p in processes:
            p.wait()

        if self.debug:
Felix Seibert's avatar
Felix Seibert committed
734
735
            for terminated_process in processes:
                print(str(terminated_process.communicate()))
736
            print("Executing commands done.")
737

738
    def get_depth_2_subdirectories(self):
Felix Seibert's avatar
Felix Seibert committed
739
        """
740
        creates a list of all depth 2 subdirectories of self.managed_folder
Felix Seibert's avatar
Felix Seibert committed
741
        """
742
        subdirectories = []
743
744
        for depth_1_folder in os.listdir(self.managed_folder):
            depth_1_path = os.path.join(self.managed_folder, depth_1_folder)
745
746
            if os.path.isdir(depth_1_path):
                for depth_2_folder in os.listdir(depth_1_path):
747
                    depth_2_path = os.path.join(self.managed_folder, depth_1_folder, depth_2_folder)
748
                    if os.path.isdir(depth_2_path):
749
750
751
                        subdirectories.append(depth_2_path)

        return subdirectories
752

753
754
755
756
757
758
759
760
761
    def get_assigned_folder_ids(self):
        """
        creates a list of ids of all assigned folders (folders assigned to OSDs)
        """
        osd_list = self.distribution.get_osd_list()
        assigned_folders = []
        for osd in osd_list:
            for one_folder in self.distribution.OSDs[osd].folders:
                assigned_folders.append(one_folder)
762
763
764
        return assigned_folders

    def get_target_dir(self, folder_id):
Felix Seibert's avatar
Felix Seibert committed
765
        """
766
        gets the path to the target dir (where to copy the folder), given the folder_id
Felix Seibert's avatar
Felix Seibert committed
767
        """
768
        return os.path.split(self.get_absolute_file_path(folder_id))[0]
769

770
    def get_path_on_volume(self, path):
Felix Seibert's avatar
Felix Seibert committed
771
772
773
774
        """
        remove the leading part of the path, such that only the part onto the xtreemfs volume remains, including
        the volume itself.
        throws an exception when the path is not managed by this XtreemFS OSD manager.
775
        use this method to calculate the folder_id.
Felix Seibert's avatar
Felix Seibert committed
776
        """
777
        if not path.startswith(self.managed_folder):
778
779
            raise PathNotManagedException("Path " + path + " is not managed by this instance of the XtreemFS OSD"
                                                           "manager!")
780
        return os.path.join(self.volume_name, path[len(self.path_to_mount_point) + 1:])
781

782
783
784
    def get_absolute_file_path(self, folder_id):
        return os.path.join(self.path_to_mount_point, folder_id[len(self.volume_name) + 1:])

785
    def get_containing_folder_id(self, path_on_volume):
Felix Seibert's avatar
Felix Seibert committed
786
787
788
        """
        search for the assigned folder that is a prefix of the given path on volume
        """
Felix Seibert's avatar
Felix Seibert committed
789
        for osd in self.distribution.OSDs.values():
790
            for a_folder in osd.folders:
791
792
                if path_on_volume.startswith(a_folder):
                    return a_folder
793
794
        return None

795
    def __str__(self):
796
797
798
        representation = "pathToMountPoint: " + self.path_to_mount_point + " volumeName: " + self.volume_name + " pathOnVolume: " \
                         + self.path_on_volume
        representation += ("\nconfigFile: " + self.config_file + "\n")
Felix Seibert's avatar
Felix Seibert committed
799
        representation += self.distribution.description()
800
801
802
803
804
805
806
807
808
809
810
811
812
        return representation


class ExecutableNotFoundException(Exception):
    """Raised when a required external executable (such as xtfsutil) cannot be found on the PATH."""


class NotAXtreemFSVolume(Exception):
    """Raised when a given path does not point to a folder located on an XtreemFS volume."""


class PathNotManagedException(Exception):
    """Raised when a path is processed that lies outside the folder managed by this OSD manager."""