OSDManager.py 38.5 KB
Newer Older
1
2
3
import os
import pickle
import subprocess
Felix Seibert's avatar
Felix Seibert committed
4
5
from urllib import request
import urllib.error
6
import shutil
7
import time
Felix Seibert's avatar
Felix Seibert committed
8
import datetime
9
import random
10

Felix Seibert's avatar
Felix Seibert committed
11
12
13
14
from xtreemfs_client import dataDistribution
from xtreemfs_client import div_util
from xtreemfs_client import folder
from xtreemfs_client import dirstatuspageparser
15
16
17
18

'''
xOSDManager - a python module to manage OSD selection in XtreemFS
currently only depth (level) 2 subdirectories can be managed
only unix-based OSs are supported
'''

# upper bounds on the number of commands that OSDManager.fix_physical_layout_internally executes in parallel
# (via div_util.run_commands) when changing replication policies, adding replicas and deleting replicas,
# respectively. they keep the number of simultaneously spawned processes below the maximum number of
# processes of the OS.
max_processes_change_policy = 200
max_processes_add_replica = 200
max_processes_delete_replica = 200


class OSDManager(object):
Felix Seibert's avatar
Felix Seibert committed
28
29
    # TODO add support for arbitrary subdirectory level
    # (currently depth=2 is hardcoded, which is fine for GeoMultiSens purposes)
30
31
    def __init__(self, path_to_managed_folder, config_file='.das_config', value_map=None, debug=False):

32
33
        self.managed_folder = path_to_managed_folder
        self.config_file = config_file
34
35
36
37
38
39
40
        self.debug = debug

        if value_map is None:

            if not div_util.check_for_executable('xtfsutil'):
                raise ExecutableNotFoundException("No xtfsutil found. Please make sure it is contained in your PATH.")

41
            output_1 = subprocess.run(["xtfsutil", self.managed_folder], stdout=subprocess.PIPE, stderr=subprocess.PIPE,
42
43
44
45
46
47
48
49
50
                                      universal_newlines=True)

            if output_1.stderr.startswith("xtfsutil failed: Path doesn't point to an entity on an XtreemFS volume!"):
                raise NotAXtreemFSVolume("The specified folder '" + path_to_managed_folder +
                                         "' is not part of an XtreemFS volume!")

            if len(output_1.stderr) > 0:
                raise Exception("xtfsutil produced some error: " + output_1.stderr)

51
            self.path_on_volume = div_util.remove_leading_trailing_slashes(
52
                str(output_1.stdout).split("\n")[0].split()[-1])
53
            self.path_to_mount_point = self.managed_folder[0:(len(self.managed_folder) - len(self.path_on_volume) - 1)]
54

55
            output_2 = subprocess.run(["xtfsutil", self.path_to_mount_point],
56
57
                                      stdout=subprocess.PIPE, universal_newlines=True)

Felix Seibert's avatar
Felix Seibert committed
58
            self.volume_information = div_util.extract_volume_information(output_2.stdout)
59
            self.volume_name = self.volume_information[0]
Felix Seibert's avatar
Felix Seibert committed
60
            osd_list = list(map(lambda x: x[0], self.volume_information[1]))
61
62
            self.osd_selection_policy = self.volume_information[2]
            self.volume_address = self.volume_information[3]
63

Felix Seibert's avatar
Felix Seibert committed
64
            self.distribution = None
65
            if not self.__read_configuration():
Felix Seibert's avatar
Felix Seibert committed
66
                self.distribution = dataDistribution.DataDistribution()
67

Felix Seibert's avatar
Felix Seibert committed
68
            self.distribution.add_osd_list(osd_list)
Felix Seibert's avatar
Felix Seibert committed
69

70
            self.osd_information = None
Felix Seibert's avatar
Felix Seibert committed
71
72

            try:
73
                answer = request.urlopen(div_util.get_http_address(self.volume_address))
Felix Seibert's avatar
Felix Seibert committed
74
75
                html_data = answer.read().decode('UTF-8')

Felix Seibert's avatar
Felix Seibert committed
76
                parser = dirstatuspageparser.DIRStatusPageParser()
Felix Seibert's avatar
Felix Seibert committed
77
78
79
80
81
82
                parser.feed(html_data)

                # filter out data sets without last update time or wrong service type
                filtered_data_sets = list(filter(lambda x: int(x['last updated'].split()[0]) != 0, parser.dataSets))
                filtered_data_sets = list(filter(lambda x: x['type'] == 'SERVICE_TYPE_OSD', filtered_data_sets))

83
                self.osd_information = {}
84

Felix Seibert's avatar
Felix Seibert committed
85
86
87
88
89
90
                for data_set in filtered_data_sets:
                    uuid = data_set['uuid']
                    current_osd = {}
                    current_osd['usable_space'] = int(data_set['usable'].split()[0])
                    current_osd['total_space'] = int(data_set['total'].split()[0])

91
                    self.osd_information[uuid] = current_osd
Felix Seibert's avatar
Felix Seibert committed
92
93
94

            except urllib.error.URLError as error:
                print("osd information could not be fetched! Probably the http status page could not be found at:",
95
                      div_util.get_http_address(self.volume_address))
Felix Seibert's avatar
Felix Seibert committed
96
                print(error)
97
        else:
Felix Seibert's avatar
Felix Seibert committed
98
            try:
99
100
101
102
                self.path_on_volume = value_map['path_on_volume']
                self.path_to_mount_point = value_map['path_to_mount']
                self.volume_name = value_map['volume_name']
                self.osd_selection_policy = value_map['osd_selection_policy']
Felix Seibert's avatar
Felix Seibert committed
103
                self.distribution = value_map['data_distribution']
104
105
                self.volume_address = value_map['volume_address']
                self.osd_information = value_map['osd_information']
Felix Seibert's avatar
Felix Seibert committed
106
107
108
            except KeyError as error:
                print('key not found:', error)
                print('leaving in OSDManager field empty!')
109
110

    def __read_configuration(self):
Felix Seibert's avatar
Felix Seibert committed
111
        assert self.distribution is None
112
        path_to_config = os.path.join(self.managed_folder, self.config_file)
113
114
        try:
            f = open(path_to_config, "rb")
Felix Seibert's avatar
Felix Seibert committed
115
            self.distribution = pickle.load(f)
116
117
118
119
120
            return True
        except IOError:
            return False

    def __write_configuration(self):
121
        path_to_config = os.path.join(self.managed_folder, self.config_file)
122
        f = open(path_to_config, "wb")
Felix Seibert's avatar
Felix Seibert committed
123
        pickle.dump(self.distribution, f)
124

125
    def create_distribution_from_existing_files(self, fix_layout_internally=True, apply_layout=True,
                                                environment='LOCAL'):
        """
        create a good data distribution out of data already present in the file system.
        the created data distribution will then be transferred to the physical layer,
        i.e., all files will be moved to their corresponding OSD,
        using XtreemFS' read-only replication strategy.

        :param fix_layout_internally: if True, files are moved by fix_physical_layout_internally.
            otherwise they are moved externally: by SLURM-distributed move commands if environment == 'SLURM',
            by fix_physical_layout_externally in any other environment.
        :param apply_layout: if True, the new OSD assignments are applied (apply_osd_assignments) and the data
            distribution is written to the configuration file.
        :param environment: execution environment for external moves ('LOCAL' or 'SLURM').
        :raises ExecutableNotFoundException: if du cannot be found in PATH.
        """
        start_time = time.time()

        if self.debug:
            print("creating distribution from existing files. osd manager: " + str(self))

        if not div_util.check_for_executable('du'):
            raise ExecutableNotFoundException("No du found. Please make sure it is contained in your PATH.")

        new_folders = []
        for one_folder in self.get_depth_2_subdirectories():
            du = subprocess.run(["du", "-s", one_folder], stdout=subprocess.PIPE,
                                universal_newlines=True)
            # the first token of du's output is used as folder size; a size of 0 is bumped to 1
            folder_size = int(du.stdout.split()[0])
            if folder_size == 0:
                folder_size = 1
            # self.path_on_volume is the str set in __init__ and cannot be called; the folder id is computed by
            # get_path_on_volume. NOTE(review): assumed to be defined elsewhere in this class - confirm.
            new_folders.append(folder.Folder(self.get_path_on_volume(one_folder), folder_size, None))

        new_assignments = self.distribution.add_folders(new_folders)

        if apply_layout:
            self.apply_osd_assignments(new_assignments)
            self.__write_configuration()
        elif self.debug:
            print("NOT applying data layout!")

        if self.debug:
            print("osd manager after new folders have been added to data distribution:")
            print(str(self))
            total_time = round(time.time() - start_time)
            print("calculated distribution on existing files in secs: " + str(total_time))

        start_time = time.time()

        if fix_layout_internally:
            self.fix_physical_layout_internally()
        elif environment == 'SLURM':
            osd_to_folders_map = {}
            all_folders = []
            for osd in self.distribution.get_osd_list():
                osd_to_folders_map[osd] = []
                all_folders.extend(self.distribution.OSDs[osd].folders)
            # group all folders by the OSD the distribution places them on
            for input_folder in all_folders:
                osd_for_tile = self.distribution.get_containing_osd(input_folder).uuid
                osd_to_folders_map[osd_for_tile].append(input_folder)

            move_commands = self.__generate_move_commands_slurm(osd_to_folders_map)
            self.__execute_commands(move_commands)
        else:
            self.fix_physical_layout_externally()

        if self.debug:
            total_time = round(time.time() - start_time)
            print("fixed physical layout of existing files in secs: " + str(total_time))

    def fix_physical_layout_externally(self):
Felix Seibert's avatar
Felix Seibert committed
198
199
200
201
        """
        fixes the physical layout, such that it matches the data distribution described in self.distribution.
        this is realized by calling move_folder_to_osd on all folders managed by this distribution.
        """
202
203
        if self.debug:
            print("fixing physical layout externally...")
204
205
        managed_folders = self.get_assigned_folder_ids()
        for folder_id in managed_folders:
206
207
208
            osd_for_folder = self.distribution.get_containing_osd(folder_id)
            self.move_folder_to_osd(folder_id, osd_for_folder.uuid)

209
    def fix_physical_layout_internally(self, repeat_delete_interval_secs=15, iteration=0):
Felix Seibert's avatar
Felix Seibert committed
210
211
212
213
214
215
216
217
218
219
220
221
222
223
        """
        fixes the physical layout, such that it matches the data distribution described in self.distribution
        we use the following strategy: first, determine which files needs to be moved to another OSD, and create three lists.
        for each file to be moved, one contains the command to set the default replication policy (1), one contains the command
        to create a new replica on the target OSD (2) and one contains the command to delete the replica on the original OSD (3).
        the command-lists are executed in order: 1, 2, 3, commands of list i only being executed if all commands of list i-1
        have returned. the commands of any one list are executed in parallel, with an upper bound of 200 parallel executions
        to not violate the maximum number of processes of the OS.

        the commands of list 3 might return an error, as deleting the replica on the original OSD is not possible until
        the new replica on the target OSD is complete. theses commands are collected and re-executed after a certain delay.
        this process is repeated until no error is returned, which should eventually happen (as soon as all new replicas on
        the target OSDs are complete).
        """
224
225
        if self.debug:
            print("starting to fix physical layout...this is fix-iteration " + str(iteration))
226
        # list 1
227
        change_policy_command_list = []
228
        # list 2
229
        create_replica_command_list = []
230
        # list 3
231
232
        delete_replica_command_list = []

233
        # create commands
234
        managed_folders = self.get_depth_2_subdirectories()
235
236
237
238
239
        for managed_folder in managed_folders:
            for directory in os.walk(managed_folder):
                for filename in directory[2]:

                    absolute_file_path = os.path.join(directory[0], filename)
240
                    osds_of_file = div_util.get_osd_uuids(absolute_file_path)
Felix Seibert's avatar
Felix Seibert committed
241
242
                    path_on_volume = self.path_on_volume(absolute_file_path)
                    containing_folder_id = self.get_containing_folder_id(path_on_volume)
Felix Seibert's avatar
Felix Seibert committed
243
                    osd_for_file = self.distribution.get_containing_osd(containing_folder_id).uuid
244

245
246
247
248
249
250
251
252
253
254
255
256
257
                    file_on_correct_osd = False
                    for osd_of_file in osds_of_file:
                        if osd_of_file != osd_for_file:
                            # delete all replicas on wrong OSDs
                            delete_command = div_util.create_delete_replica_command(absolute_file_path, osd_of_file)
                            delete_replica_command_list.append(div_util.command_list_to_single_string(delete_command))
                        else:
                            file_on_correct_osd = True

                    if not file_on_correct_osd and len(osds_of_file) < 2:
                        # only one replica on a wrong OSD => need to set replication policy.
                        # otherwise, there is a unique replica on the correct OSD => no change necessary,
                        # OR there are multiple replicas => replication policy must be set.
258
259
                        policy_command = div_util.create_replication_policy_command(absolute_file_path)
                        change_policy_command_list.append(div_util.command_list_to_single_string(policy_command))
260
261
262
263

                    if not file_on_correct_osd:
                        # create a replica on the correct osd
                        create_command = div_util.create_create_replica_command(absolute_file_path, osd_for_file)
264
                        create_replica_command_list.append(div_util.command_list_to_single_string(create_command))
265

266
267
268
269
270
        # end of recursion condition
        if len(create_replica_command_list) == 0 and len(delete_replica_command_list) == 0:
            return

            # run commands
Felix Seibert's avatar
Felix Seibert committed
271
        start_time = time.time()
Felix Seibert's avatar
Felix Seibert committed
272
273
274
        if self.debug:
            print("starting execution of change policy commands...")
            print(str(datetime.datetime.now()))
275
        errored_processes = div_util.run_commands(change_policy_command_list, max_processes_change_policy)
Felix Seibert's avatar
Felix Seibert committed
276
        end_time = time.time()
277
        if self.debug:
278
            print("executing " + str(len(change_policy_command_list)) + " change policy commands done in " +
Felix Seibert's avatar
Felix Seibert committed
279
                  str(round(end_time - start_time)) + " sec.")
280
            # div_util.print_process_list(processes)
281

282
        start_time = time.time()
Felix Seibert's avatar
Felix Seibert committed
283
284
285
        if self.debug:
            print("starting execution of create replica commands...")
            print(str(datetime.datetime.now()))
286
        random.shuffle(create_replica_command_list)
287
        errored_processes = div_util.run_commands(create_replica_command_list, max_processes_add_replica)
Felix Seibert's avatar
Felix Seibert committed
288
        end_time = time.time()
289
        if self.debug:
290
            print("executing " + str(len(create_replica_command_list)) + " create replica commands done in " +
Felix Seibert's avatar
Felix Seibert committed
291
                  str(round(end_time - start_time)) + " sec.")
292
            # div_util.print_process_list(processes)
293

Felix Seibert's avatar
Felix Seibert committed
294
        start_time = time.time()
Felix Seibert's avatar
Felix Seibert committed
295
296
297
        if self.debug:
            print("starting execution of delete replica commands...")
            print(str(datetime.datetime.now()))
298
299
        errored_processes = div_util.run_commands(delete_replica_command_list, max_processes_delete_replica,
                                                  print_errors=False)
300

301
302
303
304
        # run and repeat delete commands, until they return no error
        # (if an error is returned for another reason than that one would delete the last complete replica,
        # this will probably not work.
        iterations = 0
305
        max_iterations = 5
306
        while True:
307
308
            if iterations >= max_iterations:
                print("results of last iteration: ")
309
                div_util.print_process_list(errored_processes)
310
311
312
313
                print("Original replicas could not be deleted after " + str(max_iterations) + ". Aborting...")
                break

            if self.debug:
314
                print("executing " + str(len(delete_replica_command_list))
315
                      + " delete replica commands done. This is delete-iteration "
316
317
                      + str(iterations))
                # div_util.print_process_list(processes)
318

319
320
            errored_deletions = []

321
            for process in errored_processes:
322
                # check the return code. if it is one, the replica could not be deleted, so we try again later.
323
                if process[2] != 0:
324
                    errored_deletions.append(process[0])
325
                    print("errored command: ")
326
327
328
                    print("command: " + str(process[0]))
                    print("stdoud: " + str(process[1][0]))
                    print("stderr: " + str(process[1][1]))
329
                    print("retcode: " + str(process[2]))
330
331
332
333
334
335

            if len(errored_deletions) == 0:
                break

            time.sleep(repeat_delete_interval_secs)

336
337
338
            if self.debug:
                print("rerunning " + str(len(errored_deletions)) + " commands because replica could not be deleted...")

339
340
            errored_processes = div_util.run_commands(errored_deletions, max_processes_change_policy,
                                                      print_errors=False)
341
            iterations += 1
342

Felix Seibert's avatar
Felix Seibert committed
343
344
345
346
        if self.debug:
            end_time = time.time()
            print("deleting replicas done in in " + str(round(end_time - start_time)) + " sec.")

347
        self.fix_physical_layout_internally(iteration=iteration + 1)
348

349
    def create_empty_folders(self, folders):
        """
        create empty folders and assign OSDs.

        every folder is added to the data distribution with the current average folder size (at least 1),
        the resulting OSD assignments are applied, the folders are created on disk (os.makedirs, existing folders
        are left untouched) and the data distribution is written to the configuration file.

        :param folders: paths of the folders to create.
        """
        average_size = int(self.distribution.get_average_folder_size())
        if average_size <= 0:
            average_size = 1

        # self.path_on_volume is the str set in __init__ and cannot be called; folder ids are computed by
        # get_path_on_volume. NOTE(review): assumed to be defined elsewhere in this class - confirm.
        tiles = [folder.Folder(self.get_path_on_volume(input_folder), average_size, None)
                 for input_folder in folders]

        new_tiles = self.distribution.add_folders(tiles)

        self.apply_osd_assignments(new_tiles)

        for input_folder in folders:
            os.makedirs(input_folder, exist_ok=True)

        self.__write_configuration()

    def copy_folders(self, folders, environment='LOCAL', remote_source=None, sshfs_mount_dir='/tmp/sshfs_tmp_mnt',
                     apply_layout=True, execute_copy=True, random_osd_assignment=False):
        """
        copy a list of given folders into the managed folder, assigning OSDs to new folders and updating
        self.distribution

        :param folders: paths of the folders to copy. the folder id of each copy is built from the volume name,
            self.path_on_volume and the last two path elements of the source folder.
        :param environment: execution environment, passed on to __copy_data.
        :param remote_source: if not None, the folders are read from this remote host; their sizes are measured
            through temporary sshfs mounts below sshfs_mount_dir.
        :param sshfs_mount_dir: directory for the temporary sshfs mounts. NOTE: it is deleted afterwards.
        :param apply_layout: if True, the new OSD assignments are applied and the data distribution is written
            to the configuration file.
        :param execute_copy: if True, the data is actually copied (__copy_data).
        :param random_osd_assignment: passed on to self.distribution.add_folders.
        :raises ExecutableNotFoundException: if du (or, with remote_source, sshfs, scp or fusermount) cannot be
            found in PATH.
        """
        if self.debug:
            print("calling copy_folders with:")
            print("folders: " + str(folders))
            print("environment: " + str(environment))
            print("remote_source: " + str(remote_source))
            print("sshfs_mount_dir: " + str(sshfs_mount_dir))
            print("apply_layout: " + str(apply_layout))
            print("execute_copy: " + str(execute_copy))
            print("random_osd_assignemnt: " + str(random_osd_assignment))

        if not div_util.check_for_executable('du'):
            raise ExecutableNotFoundException("No du found. Please make sure it is contained in your PATH.")

        if remote_source is not None:
            if not div_util.check_for_executable('sshfs'):
                raise ExecutableNotFoundException("No sshfs found. Please make sure it is contained in your PATH.")
            if not div_util.check_for_executable('scp'):
                raise ExecutableNotFoundException("No scp found. Please make sure it is contained in your PATH.")
            if not div_util.check_for_executable('fusermount'):
                raise ExecutableNotFoundException("No fusermount found. Please make sure it is contained in your PATH.")
            os.makedirs(sshfs_mount_dir, exist_ok=True)

        new_folders = []

        for input_folder in folders:
            parent_dir, folder_name = os.path.split(input_folder)
            last_2_path_elements = os.path.join(os.path.split(parent_dir)[1], folder_name)
            if remote_source is not None:
                mount_point = os.path.join(sshfs_mount_dir, last_2_path_elements)
                os.makedirs(mount_point, exist_ok=True)
                try:
                    subprocess.run(["sshfs", remote_source + ":" + input_folder, mount_point])
                    du = subprocess.run(["du", "-s", mount_point], stdout=subprocess.PIPE,
                                        universal_newlines=True)
                finally:
                    # always release the mount (and its mount point), even if measuring the size failed
                    subprocess.run(["fusermount", "-uz", mount_point])
                    shutil.rmtree(mount_point)
            else:
                du = subprocess.run(["du", "-s", input_folder], stdout=subprocess.PIPE,
                                    universal_newlines=True)
            folder_size = int(du.stdout.split()[0])

            # as the folder_id is generated from the copy source, we cannot call get_path_on_volume to get the folder_id
            new_folder = folder.Folder(os.path.join(self.volume_name, self.path_on_volume, last_2_path_elements),
                                       folder_size,
                                       input_folder)
            if self.debug:
                print("new folder: " + str(new_folder))

            new_folders.append(new_folder)

        if remote_source is not None:
            shutil.rmtree(sshfs_mount_dir)

        if self.debug:
            print("OSDManager: random_osd_assignment: " + str(random_osd_assignment))

        new_assignments = self.distribution.add_folders(new_folders, random_osd_assignment=random_osd_assignment)

        if apply_layout:
            self.apply_osd_assignments(new_assignments)
            self.__write_configuration()
        elif self.debug:
            print("NOT applying data layout!")

        if self.debug:
            print("osd manager after new folders have been added to data distribution:")
            print(str(self))

        if execute_copy:
            self.__copy_data(new_folders, environment, remote_source)

    def __generate_move_commands_slurm(self, osd_to_folders_map, tmp_dir=None):
        """
        build one shell command string per OSD with assigned folders, which moves each of these folders through
        tmp_dir: copy the folder to tmp_dir, delete it, copy it back into its parent directory and delete the
        temporary copy. all steps except the copy back are prefixed with srun for a SLURM host.

        :param osd_to_folders_map: maps OSD ids to lists of folder ids to move.
        :param tmp_dir: temporary directory, by default <mount point>/.tmp_move_folder (created if missing).
        :return: list of command strings (steps separated by ' ; ', presumably executed through a shell).
        """
        import shlex

        if self.debug:
            print("Using SLURM mode for moving folders...")

        if tmp_dir is None:
            tmp_dir = os.path.join(self.path_to_mount_point, '.tmp_move_folder')
        os.makedirs(tmp_dir, exist_ok=True)

        slurm_hosts = div_util.get_slurm_hosts()
        if self.debug:
            print('slurm_hosts: ', slurm_hosts)

        osd_to_host_map = div_util.get_osd_to_hostname_map(self.volume_information[1], slurm_hosts)
        if self.debug:
            print('osd_to_host_map: ', osd_to_host_map)

        # paths are shell-quoted, as they are embedded into shell command strings
        # (shlex.quote leaves paths consisting only of safe characters unchanged)
        quoted_tmp_dir = shlex.quote(tmp_dir)

        command_list = []
        host_name = ""
        for osd_uuid, folders_to_move in osd_to_folders_map.items():
            # NOTE(review): the host is only resolved for the first OSD, so all commands run on that host.
            # if each OSD's folders should be moved on that OSD's host, use osd_to_host_map[osd_uuid] - confirm.
            if host_name == "":
                host_name = osd_to_host_map[osd_uuid]
            srun_prefix = "srun -N1-1 --nodelist=" + shlex.quote(host_name)

            command = ""
            for move_folder in folders_to_move:
                folder_path = self.get_absolute_file_path(move_folder)
                folder_tmp_path = os.path.join(tmp_dir, os.path.split(move_folder)[1])
                quoted_folder_path = shlex.quote(folder_path)
                quoted_folder_tmp_path = shlex.quote(folder_tmp_path)
                # copy folder to temporary location
                command += srun_prefix + " cp -r " + quoted_folder_path + " " + quoted_tmp_dir + " ; "
                # delete folder within the managed xtreemfs folder
                command += srun_prefix + " rm -r " + quoted_folder_path + " ; "
                # copy folder back from temporary location
                # NOTE(review): unlike the other steps, this one is not wrapped in srun - confirm this is intended.
                command += (" cp -r " + quoted_folder_tmp_path + " "
                            + shlex.quote(os.path.split(folder_path)[0]) + " ; ")
                # delete folder from temporary location
                command += srun_prefix + " rm -r " + quoted_folder_tmp_path + " ; "

            if len(folders_to_move) > 0:
                command_list.append(command)

        return command_list

    def move_folder_to_osd(self, folder_id: str, new_osd_id: str, tmp_dir=None):
        """
        move a folder from its current OSD to new_osd_id. you may specify a temporary folder.

        the folder is first re-assigned in the data distribution and, via
        xtfsutil --set-pattr 1004.filenamePrefix "add <folder_id> <new_osd_id>", on the volume.
        then every file of the folder is copied to tmp_dir, removed, and copied back into its directory.
        tmp_dir (by default <mount point>/.tmp_move_folder) is removed afterwards.

        :raises ExecutableNotFoundException: if xtfsutil cannot be found in PATH.
        """
        folder_path = os.path.join(self.get_target_dir(folder_id), os.path.split(folder_id)[1])

        if tmp_dir is None:
            tmp_dir = os.path.join(self.path_to_mount_point, '.tmp_move_folder')

        start_time = time.time() if self.debug else 0
        if self.debug:
            print("externally moving folder " + folder_id + " to osd: " + new_osd_id)

        os.makedirs(tmp_dir, exist_ok=True)

        if not div_util.check_for_executable('xtfsutil'):
            raise ExecutableNotFoundException("No xtfsutil found. Please make sure it is contained in your PATH.")

        # step 1: add folder to new OSD, update data distribution and xtreemfs configuration
        self.distribution.assign_new_osd(folder_id, new_osd_id)
        # in debug mode xtfsutil's output goes to the terminal; otherwise it is captured and discarded
        subprocess.run(["xtfsutil",
                        "--set-pattr", "1004.filenamePrefix", "--value",
                        "add " + folder_id + " " + new_osd_id, self.path_to_mount_point],
                       stdout=None if self.debug else subprocess.PIPE,
                       universal_newlines=not self.debug)

        # step 2: one by one, move files to the temporary location and back into their directory
        for dir_path, _, file_names in os.walk(folder_path):
            for name in file_names:
                original_path = os.path.join(dir_path, name)
                staged_path = os.path.join(tmp_dir, name)
                shutil.copy(original_path, staged_path)
                os.remove(original_path)
                shutil.copy(staged_path, os.path.dirname(original_path))
                os.remove(staged_path)

        shutil.rmtree(tmp_dir, ignore_errors=True)

        if self.debug:
            elapsed = time.time() - start_time
            print("externally moved folder " + folder_id +
                  " to osd: " + new_osd_id + " in secs: " + str(round(elapsed)))

    def remove_folder(self, folder_id):
Felix Seibert's avatar
Felix Seibert committed
549
550
551
        """
        removes a folder from the distribution. this does NOT delete the folder from the file system.
        """
Felix Seibert's avatar
Felix Seibert committed
552
        containing_osd = self.distribution.get_containing_osd(folder_id)
Felix Seibert's avatar
Felix Seibert committed
553
554
555
556
557
558
559
560
        if containing_osd is not None:
            if not div_util.check_for_executable('xtfsutil'):
                raise ExecutableNotFoundException("No xtfsutil found. Please make sure it is contained in your PATH.")

            containing_osd.remove_folder(folder_id)
            if self.debug:
                subprocess.run(["xtfsutil",
                                "--set-pattr", "1004.filenamePrefix", "--value",
561
                                "remove " + folder_id + "", self.path_to_mount_point])
Felix Seibert's avatar
Felix Seibert committed
562
563
564
            else:
                subprocess.run(["xtfsutil",
                                "--set-pattr", "1004.filenamePrefix", "--value",
565
                                "remove " + folder_id + "", self.path_to_mount_point],
Felix Seibert's avatar
Felix Seibert committed
566
567
                               stdout=subprocess.PIPE, universal_newlines=True)

568
    def update(self, arg_folders=None):
Felix Seibert's avatar
Felix Seibert committed
569
570
571
572
573
        """
        update the given (by absolute path) folders, such that the values held by self.dataDistribution
        matches their size on disk.
        if no argument is given, all folders are updated.
        """
574
575
        if arg_folders is not None:
            for folder_for_update in arg_folders:
576
                if not folder_for_update.startswith(self.managed_folder):
577
578
579
580
581
582
583
584
                    raise PathNotManagedException(
                        "The path :" + folder_for_update + "is not managed by this instance of the XtreemFS OSD"
                                                           "manager!")

        folder_size_updates = {}

        folders = arg_folders
        if arg_folders is None:
585
            folders = self.get_depth_2_subdirectories()
586
587
588
589
590
591
592
593
594
595
596

        for folder_for_update in folders:
            folder_id = self.path_on_volume(folder_for_update)
            command = ["du", "-s", folder_for_update]
            if self.debug:
                print("executing: " + str(command))
            du = subprocess.run(command, stdout=subprocess.PIPE, universal_newlines=True)
            folder_disk_size = int(du.stdout.split()[0])
            folder_size_updates[folder_id] = folder_disk_size

        for folder_for_update, size in folder_size_updates.items():
Felix Seibert's avatar
Felix Seibert committed
597
            self.distribution.update_folder(folder_for_update, size)
598
599
600
601
602
603
604

        self.__write_configuration()

        if self.debug:
            print(str(self))

    def apply_osd_assignments(self, assignments):
        """
        apply the given assignments to the XtreemFS volume, using xtfsutil.
        the assignments are given as a list containing tuples (tile_id, osd),
        where tile_id is given by applying get_path_on_volume() onto the absolute path of the folder.

        if self.osd_selection_policy is not "1000,1004", the volume's OSD selection policy is set to
        "prefix" first. each assignment is then sent as "add <tile_id> <osd>" to the 1004.filenamePrefix
        policy attribute.

        raises ExecutableNotFoundException if xtfsutil is not on the PATH.
        """
        if not div_util.check_for_executable('xtfsutil'):
            raise ExecutableNotFoundException("No xtfsutil found. Please make sure it is contained in your PATH.")

        # debug mode lets xtfsutil write to the terminal; otherwise its stdout is captured and discarded
        run_kwargs = {} if self.debug else {"stdout": subprocess.PIPE, "universal_newlines": True}

        # value comparison: the former `is not` tested object identity, which is true for (almost) any
        # string built at runtime, so the policy was re-set on every call
        if self.osd_selection_policy != "1000,1004":
            subprocess.run(["xtfsutil", "--set-osp", "prefix", self.path_to_mount_point], **run_kwargs)

        for new_tile in assignments:
            tile_id, osd = new_tile[0], new_tile[1]
            subprocess.run(["xtfsutil",
                            "--set-pattr", "1004.filenamePrefix", "--value",
                            "add " + tile_id + " " + osd,
                            self.path_to_mount_point], **run_kwargs)

    def __copy_data(self, input_folders, environment, remote_source):
Felix Seibert's avatar
Felix Seibert committed
634
635
636
        """
        copy data onto XtreemFS volume
        """
Felix Seibert's avatar
Felix Seibert committed
637
638
        if self.debug:
            print('calling copy_data with: ', input_folders, environment, remote_source)
Felix Seibert's avatar
Felix Seibert committed
639
        osd_list = self.distribution.get_osd_list()
640
641
642
643
        osd_to_folders_map = {}
        for osd in osd_list:
            osd_to_folders_map[osd] = []
        for input_folder in input_folders:
Felix Seibert's avatar
Felix Seibert committed
644
            osd_for_tile = self.distribution.get_containing_osd(input_folder.id).uuid
645
646
647
648
649
650
651
652
653
654
655
            osd_to_folders_map[osd_for_tile].append(input_folder)

        if self.debug:
            print("osd to folders map:")
            for key, value in osd_to_folders_map.items():
                print("osd: " + key)
                print("assigned folders:")
                for input_folder in value:
                    print(str(input_folder))

        # trigger the copying!
Felix Seibert's avatar
Felix Seibert committed
656
        if environment == "SLURM":
657
658
            assert remote_source is not None
            copy_commands = self.__generate_copy_commands_slurm(osd_to_folders_map, remote_source)
Felix Seibert's avatar
Felix Seibert committed
659
        elif environment == "HU_CLUSTER":
660
661
662
663
664
665
666
667
            copy_commands = self.__generate_copy_commands_hu_cluster(osd_to_folders_map)
        else:
            copy_commands = self.__generate_copy_commands_local(osd_to_folders_map)

        for input_folder in input_folders:
            path_to_target_dir = self.get_target_dir(input_folder.id)
            os.makedirs(path_to_target_dir, exist_ok=True)

668
        self.__execute_commands(copy_commands)
669
670

    def __generate_copy_commands_slurm(self, osd_to_folders_map, remote_source):
        """
        generates a list of copy commands, one command for each OSD that receives new data.
        the copy commands are constructed such that they can be executed in a slurm environment (that is, within a slurm job
        allocation) at ZIB.
        each command copies all new folders for the corresponding OSD via scp from remote_source, every scp being
        preceded by a srun command that executes it on the slurm node on which the target OSD resides.
        OSDs without new folders get no command.
        """
        if self.debug:
            print("Using SLURM mode for copying...")

        slurm_hosts = div_util.get_slurm_hosts()

        if self.debug:
            print('slurm_hosts: ', slurm_hosts)

        osd_to_host_map = div_util.get_osd_to_hostname_map(self.volume_information[1], slurm_hosts)

        if self.debug:
            print('osd_to_host_map: ', osd_to_host_map)

        command_list = []
        for osd, copy_folders in osd_to_folders_map.items():
            if len(copy_folders) == 0:
                continue
            # look the host up for THIS osd: previously the first OSD's host was reused for all OSDs,
            # so every copy ran on a single node instead of on the node of its target OSD
            host_name = osd_to_host_map[osd]
            command = ""
            for copy_folder in copy_folders:
                command += "srun -N1-1 --nodelist=" + host_name
                command += " scp -rq " + remote_source + ":" + copy_folder.origin
                command += " " + self.get_target_dir(copy_folder.id)
                command += " ;"
            command_list.append(command)

        return command_list

    def __generate_copy_commands_hu_cluster(self, osd_to_folders_map):
        """
        build one shell command per OSD that receives new data, for the GeoMultiSens cluster at HU Berlin.

        the OSD -> IP address mapping is read from the current `xtfsutil` output for the mount point.
        every command has the form "ssh <ip> ' cp -r <origin> <target> ; ... ' ", so that all copies for one
        OSD run on that OSD's node. OSDs without new folders get no command.

        raises ExecutableNotFoundException if xtfsutil is not on the PATH.
        """
        if self.debug:
            print("Using HU_CLUSTER mode for copying...")

        if not div_util.check_for_executable('xtfsutil'):
            raise ExecutableNotFoundException("No xtfsutil found. Please make sure it is contained in your PATH.")

        xtfsutil_result = subprocess.run(["xtfsutil", self.path_to_mount_point],
                                         stdout=subprocess.PIPE, universal_newlines=True)
        # the second element of the extracted volume information holds (osd, ip) pairs
        osd_ip_pairs = div_util.extract_volume_information(xtfsutil_result.stdout)[1]
        ip_of_osd = dict(osd_ip_pairs)

        commands = []
        for osd in osd_to_folders_map.keys():
            target_ip = ip_of_osd[osd]
            assigned = osd_to_folders_map[osd]
            copy_steps = "".join(" cp -r " + folder_to_copy.origin + " " + self.get_target_dir(folder_to_copy.id) + " ;"
                                 for folder_to_copy in assigned)
            if assigned:
                commands.append("ssh " + target_ip + " '" + copy_steps + " ' ")
        return commands

    def __generate_copy_commands_local(self, osd_to_folders_map):
Felix Seibert's avatar
Felix Seibert committed
746
747
748
749
        """
        generates a list of copy commands, one command for each OSD that receives new data.
        plain old cp is used for the actual copying.
        """
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
        if self.debug:
            print("Using local cp for copying...")

        command_list = []

        for key in osd_to_folders_map.keys():
            command = ""
            for copy_folder in osd_to_folders_map[key]:
                command += "cp -r "
                command += copy_folder.origin
                command += " " + self.get_target_dir(copy_folder.id)
                command += " ; "

            if len(osd_to_folders_map[key]) > 0:
                command_list.append(command)

        return command_list

768
    def __execute_commands(self, command_list):
Felix Seibert's avatar
Felix Seibert committed
769
770
771
772
        """
        execute, in parallel, a given set of commands. note that the degree of parallelism will match the length of
        command_list.
        """
773
        if self.debug:
774
            print("Executing commands: ")
775
776
777
778
779
            for command in command_list:
                print(str(command))
            print("in total " + str(len(command_list)) + " commands.")
        processes = []
        for command in command_list:
Felix Seibert's avatar
Felix Seibert committed
780
            process = subprocess.Popen(command, stderr=subprocess.STDOUT, stdout=subprocess.PIPE, shell=True)
781
            time.sleep(5)
782
783
784
785
786
787
            processes.append(process)

        for p in processes:
            p.wait()

        if self.debug:
Felix Seibert's avatar
Felix Seibert committed
788
789
            for terminated_process in processes:
                print(str(terminated_process.communicate()))
790
            print("Executing commands done.")
791

792
    def get_depth_2_subdirectories(self):
Felix Seibert's avatar
Felix Seibert committed
793
        """
794
        creates a list of all depth 2 subdirectories of self.managed_folder
Felix Seibert's avatar
Felix Seibert committed
795
        """
796
        subdirectories = []
797
798
        for depth_1_folder in os.listdir(self.managed_folder):
            depth_1_path = os.path.join(self.managed_folder, depth_1_folder)
799
800
            if os.path.isdir(depth_1_path):
                for depth_2_folder in os.listdir(depth_1_path):
801
                    depth_2_path = os.path.join(self.managed_folder, depth_1_folder, depth_2_folder)
802
                    if os.path.isdir(depth_2_path):
803
804
805
                        subdirectories.append(depth_2_path)

        return subdirectories
806

807
808
809
810
811
812
813
814
815
    def get_assigned_folder_ids(self):
        """
        creates a list of ids of all assigned folders (folders assigned to OSDs)
        """
        osd_list = self.distribution.get_osd_list()
        assigned_folders = []
        for osd in osd_list:
            for one_folder in self.distribution.OSDs[osd].folders:
                assigned_folders.append(one_folder)
816
817
818
        return assigned_folders

    def get_target_dir(self, folder_id):
Felix Seibert's avatar
Felix Seibert committed
819
        """
820
        gets the path to the target dir (where to copy the folder), given the folder_id
Felix Seibert's avatar
Felix Seibert committed
821
        """
822
        return os.path.split(self.get_absolute_file_path(folder_id))[0]
823

824
    def get_path_on_volume(self, path):
Felix Seibert's avatar
Felix Seibert committed
825
826
827
828
        """
        remove the leading part of the path, such that only the part onto the xtreemfs volume remains, including
        the volume itself.
        throws an exception when the path is not managed by this XtreemFS OSD manager.
829
        use this method to calculate the folder_id.
Felix Seibert's avatar
Felix Seibert committed
830
        """
831
        if not path.startswith(self.managed_folder):
832
833
            raise PathNotManagedException("Path " + path + " is not managed by this instance of the XtreemFS OSD"
                                                           "manager!")
834
        return os.path.join(self.volume_name, path[len(self.path_to_mount_point) + 1:])
835

836
837
838
    def get_absolute_file_path(self, folder_id):
        return os.path.join(self.path_to_mount_point, folder_id[len(self.volume_name) + 1:])

839
    def get_containing_folder_id(self, path_on_volume):
Felix Seibert's avatar
Felix Seibert committed
840
841
842
        """
        search for the assigned folder that is a prefix of the given path on volume
        """
Felix Seibert's avatar
Felix Seibert committed
843
        for osd in self.distribution.OSDs.values():
844
            for a_folder in osd.folders:
845
846
                if path_on_volume.startswith(a_folder):
                    return a_folder
847
848
        return None

849
    def __str__(self):
850
851
852
        representation = "pathToMountPoint: " + self.path_to_mount_point + " volumeName: " + self.volume_name + " pathOnVolume: " \
                         + self.path_on_volume
        representation += ("\nconfigFile: " + self.config_file + "\n")
Felix Seibert's avatar
Felix Seibert committed
853
        representation += self.distribution.description()
854
855
856
857
858
859
860
861
862
863
864
865
866
        return representation


class ExecutableNotFoundException(Exception):
    """Raised when a required external executable (such as xtfsutil) cannot be found on the PATH."""


class NotAXtreemFSVolume(Exception):
    """Raised when a given path does not point to a folder located on an XtreemFS volume."""


class PathNotManagedException(Exception):
    """Raised when a path is passed in that lies outside the folder managed by this xOSDManager instance."""