dataDistribution.py 23.7 KB
Newer Older
1
import random
2
3
4
import copy

from ortools.graph import pywrapgraph
5

Felix Seibert's avatar
Felix Seibert committed
6
from xtreemfs_client import osd
7
from xtreemfs_client import folder
8

9

Felix Seibert's avatar
Felix Seibert committed
10
11
12
13
class DataDistribution(object):
    """
    class to keep track of the osd (object storage device) locations of different folders, i.e.,
    their physical location.

    this class also allows calculating several data distributions, i.e., mappings from folders to OSDs (each folder
    gets mapped to one OSD).
    """
18
19

    def __init__(self):
20
        self.OSDs = {}
21

22
    def add_new_osd(self, osd_uuid):
Felix Seibert's avatar
Felix Seibert committed
23
24
25
        """
        create a new empty osd and add it to the existing OSDs.
        """
26
27
        if osd_uuid in self.OSDs:
            print("key: " + osd_uuid + " is already present!")
28
            return
29
30
        new_osd = osd.OSD(osd_uuid)
        self.OSDs[osd_uuid] = new_osd
31

32
    def add_osd(self, new_osd):
Felix Seibert's avatar
Felix Seibert committed
33
34
35
        """
        add the given OSD (object) to the existing OSDs.
        """
36
37
        if new_osd.uuid in self.OSDs:
            print("key: " + new_osd.uuid + " is already present!")
38
            return
39
        self.OSDs[new_osd.uuid] = new_osd
40

41
    def add_osd_list(self, osd_list):
Felix Seibert's avatar
Felix Seibert committed
42
43
44
        """
        add the given list of OSDs (objects) to the existing OSDs.
        """
45
46
47
48
        for osd_uuid in osd_list:
            if osd_uuid not in self.OSDs:
                new_osd = osd.OSD(osd_uuid)
                self.OSDs[osd_uuid] = new_osd
49

50
51
52
53
54
55
56
57
58
    def replace_osd(self, new_osd):
        """
        replaces the osd with uuid new_osd.uuid by new_osd
        :param new_osd:
        :return:
        """
        assert new_osd.uuid in self.OSDs.keys()
        self.OSDs[new_osd.uuid] = new_osd

59
60
61
62
63
64
    def set_osd_capacities(self, osd_capacities):
        """
        set osd capacities
        :param osd_capacities: map from osd uuids to osd capacities
        :return:
        """
65
66
67
68
        # make sure that the keyset of self.OSDs matches the keyset of osd_capacities
        for osd_uuid in osd_capacities:
            assert osd_uuid in self.OSDs.keys()
        assert len(self.OSDs) == len(osd_capacities)
69
70
71
72
73
74
75
76
77
78
79
80
81
        for one_osd in self.OSDs.values():
            assert type(osd_capacities[one_osd.uuid]) is int
            one_osd.capacity = osd_capacities[one_osd.uuid]

    def set_osd_bandwidths(self, osd_bandwidths):
        """
        set osd bandwidths
        :param osd_bandwidths:
        :return:
        """
        for one_osd in self.OSDs.values():
            one_osd.bandwidth = osd_bandwidths[one_osd.uuid]

82
    def get_osd_list(self):
Felix Seibert's avatar
Felix Seibert committed
83
        """
84
        get a list of all existing OSD uuids.
Felix Seibert's avatar
Felix Seibert committed
85
        """
86
        osd_list = []
87
        for osd_name in self.OSDs.keys():
88
89
90
            osd_list.append(osd_name)
        return osd_list

91
    def get_containing_osd(self, folder_id):
Felix Seibert's avatar
Felix Seibert committed
92
93
94
        """
        get the OSD containing the given folder_id, or None if the folder is not assigned to any OSD.
        """
95
96
97
        for checked_osd in self.OSDs.values():
            if checked_osd.contains_folder(folder_id):
                return checked_osd
98
99
        return None

Felix Seibert's avatar
Felix Seibert committed
100
101
102
103
104
105
106
107
108
109
110
    def assign_new_osd(self, folder_id, new_osd):
        """
        assign folder_id to new_osd. if folder_id already is assigned to an OSD, this old assignment is deleted.
        """
        old_osd = self.get_containing_osd(folder_id)
        if old_osd is None:
            self.OSDs[new_osd].add_folder(folder_id, self.get_average_folder_size())
        else:
            self.OSDs[new_osd].add_folder(folder_id, self.OSDs[old_osd.uuid].folders[folder_id])
            self.OSDs[old_osd.uuid].remove_folder(folder_id)

111
112
113
114
115
116
117
118
119
120
121
122
    def get_total_folder_size(self):
        total_size = 0
        for one_osd in self.OSDs.values():
            total_size += one_osd.total_folder_size
        return total_size

    def get_total_bandwidth(self):
        total_bandwidth = 0
        for one_osd in self.OSDs.values():
            total_bandwidth += one_osd.bandwidth
        return total_bandwidth

123
124
125
126
127
128
    def get_total_capacity(self):
        total_capacity = 0
        for one_osd in self.OSDs.values():
            total_capacity += one_osd.capacity
        return total_capacity

129
    def get_average_folder_size(self):
Felix Seibert's avatar
Felix Seibert committed
130
131
132
        """
        get the average folder size of all folders of all OSDs.
        """
133
        total_size = 0
134
        total_number_of_folders = 0
135
        for one_osd in self.OSDs.values():
Felix Seibert's avatar
Felix Seibert committed
136
            total_size += one_osd.total_folder_size
137
            total_number_of_folders += len(one_osd.folders)
138
139
        if total_number_of_folders == 0:
            return 0
140
141
        return total_size / total_number_of_folders

142
    def get_average_load(self):
143
        """
144
        calculate the average OSD load, that is, the average of their total_folder_size.
145
146
        """
        total_folder_size = 0
147
148
149
        for osd in self.OSDs.values():
            total_folder_size += osd.get_load()
        return total_folder_size / len(self.OSDs)
150

151
    def get_maximum_load(self):
152
        """
153
        calculate the maximum OSD load, that is, the maximum of their total_folder_size.
154
155
156
157
        """
        maximum_load = 0
        maximum_osd = None
        for osd in self.OSDs.values():
158
            load = osd.total_folder_size
159
160
161
162
163
            if maximum_osd is None or load > maximum_load:
                maximum_load = load
                maximum_osd = osd
        return maximum_osd, maximum_load

164
    def get_average_processing_time(self):
165
        """
166
167
        calculate the average OSD processing time, that is, the average of their (total_folder_size / bandwidth).
        :return:
168
        """
169
        total_processing_time = 0
170
        for osd in self.OSDs.values():
171
172
            total_processing_time += osd.get_processing_time()
        return total_processing_time / len(self.OSDs)
173

174
175
176
177
178
179
180
181
182
183
184
185
    def get_maximum_processing_time(self):
        """
        calculate the maximum OSD processing time, also known as makespan
        """
        maximum_processing_time = 0
        maximum_osd = None
        for osd in self.OSDs.values():
            processing_time = osd.get_processing_time()
            if maximum_osd is None or processing_time > maximum_processing_time:
                maximum_processing_time = processing_time
                maximum_osd = osd
        return maximum_osd, maximum_processing_time
186

187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
    def get_lower_bound_on_makespan(self):
        total_remaining_file_size = self.get_total_folder_size()
        artificial_data_distribution = copy.deepcopy(self)
        for tmp_osd in artificial_data_distribution.OSDs.values():
            for a_folder in list(tmp_osd.folders.keys()):
                tmp_osd.remove_folder(a_folder)

        while total_remaining_file_size > 0:
            free_OSDs = list(filter(lambda x: x.get_free_capacity() > 0, artificial_data_distribution.OSDs.values()))
            total_bandwidth = sum(list(map(lambda x: x.bandwidth, free_OSDs)))
            assigned_file_size = 0
            for free_OSD in free_OSDs:
                optimal_share = (free_OSD.bandwidth / total_bandwidth) * total_remaining_file_size
                assignable_share = min(optimal_share, free_OSD.get_free_capacity())
                free_OSD.add_folder("dummy_id", assignable_share)
                assigned_file_size += assignable_share
            total_remaining_file_size -= assigned_file_size

        return artificial_data_distribution.get_maximum_processing_time()[1]

207
208
209
    def add_folders(self, folders,
                    ignore_osd_capacities=True,
                    random_osd_assignment=False,
210
                    ignore_folder_sizes=False,
211
212
                    debug=False,
                    random_seed=None):
Felix Seibert's avatar
Felix Seibert committed
213
214
        """
        adds a list of folders to the data distribution.
Felix Seibert's avatar
Felix Seibert committed
215
        if not specified otherwise, the assignments are calculated using the LPT algorithm.
Felix Seibert's avatar
Felix Seibert committed
216
217
        returns a list of assignments from folders to OSDs, for which (folders) there was previously no assignment.

218
219
        if capacities and bandwidths are set for the OSDs, folders are assigned accordingly
        (capacities are respected and OSDs with higher bandwidth obtain more/larger files).
220
221
222
223
224

        if random_osd_assignment=True and ignore_osd_capacities=True, a totally random OSD assignment generated.

        if random_osd_assignment=True and ignore_folder_sizes=True,
        folders are randomly assigned to OSDs such that all OSDs have the same number of folders (if possible).
225
226

        the assignment is stable (i.e., folders already assigned to an OSD are not reassigned to another OSD).
Felix Seibert's avatar
Felix Seibert committed
227
        """
228
229

        # find out which folders are not assigned yet
230
        new_folders = []
231
        for a_folder in folders:
232
            # TODO adding folders to OSDs might violate their capacity
233
            containing_osd = self.get_containing_osd(a_folder.id)
234
            if containing_osd is not None:
235
                containing_osd.add_folder(a_folder.id, a_folder.size)
236
            else:
237
                new_folders.append(a_folder)
238

239
240
        if debug:
            print("dataDistribution: random_osd_assignment: " + str(random_osd_assignment))
Felix Seibert's avatar
Felix Seibert committed
241

242
243
        # keep track of which unassigned folder gets assigned to which OSD.
        # this information must be returned
244
245
        osds_for_new_folders = []

246
247
248
        if random_osd_assignment:
            random.seed(random_seed)

249
        # totally random OSD assignment, ignoring OSD capacities
250
        # (might lead to I/O errors when too many groups are assigned to an OSD)
251
        if random_osd_assignment and ignore_osd_capacities and not ignore_folder_sizes:
252
253
            if debug:
                print("using totally random osd assignment")
254
            for a_folder in new_folders:
Felix Seibert's avatar
Felix Seibert committed
255
                random_osd = random.choice(list(self.OSDs.values()))
256
257
                random_osd.add_folder(a_folder.id, a_folder.size)
                osds_for_new_folders.append((a_folder.id,
Felix Seibert's avatar
Felix Seibert committed
258
                                             random_osd.uuid))
Felix Seibert's avatar
Felix Seibert committed
259
            return osds_for_new_folders
260

261
262
        # random OSD assignment respecting OSD capacities
        elif random_osd_assignment and not ignore_osd_capacities:
263
264
            if debug:
                print("using random osd assignment, respecting osd capacities")
265
            for a_folder in new_folders:
266
                suitable_osds = self.get_suitable_osds(a_folder.size)  # list of OSDs with enough capacity
267
                suitable_random_osd = random.choice(suitable_osds)
268
269
                suitable_random_osd.add_folder(a_folder.id, a_folder.size)
                osds_for_new_folders.append((a_folder.id,
270
271
272
                                             suitable_random_osd.uuid))
            return osds_for_new_folders

273
        # random OSD assignment ignoring folder sizes // round-robin style distribution with some randomness
274
        elif random_osd_assignment and ignore_folder_sizes:
275
276
277
278
279
280
281
282
283
284
            if debug:
                print("using random osd assignment ignoring folder sizes")

            average_folder_size = self.get_average_folder_size()
            if average_folder_size == 0:
                average_folder_size = 1

            modified_folders = list(map(lambda f: folder.Folder(f.id, average_folder_size, f.origin), folders))
            random.shuffle(modified_folders)
            return self.add_folders(modified_folders)
285

286
287
        # balanced deterministic OSD assignment (LPT)
        # (following largest processing time first, also called post-greedy approach)
288
289
        list.sort(new_folders, key=lambda x: x.size, reverse=True)

290
        # for each folder calculate the best OSD and add it to it
291
        for a_folder in new_folders:
292
            least_used_osd, _ = self.get_lpt_osd(a_folder.size)
293
294
            least_used_osd.add_folder(a_folder.id, a_folder.size)
            osds_for_new_folders.append((a_folder.id,
295
                                         least_used_osd.uuid))
296
297
        return osds_for_new_folders

298
    def rebalance_lpt(self, rebalance_factor=1):
299
300
301
        """
        rebalance folders to OSDs by assigning folders to new OSDs using the following strategy:
                1. 'unroll' the assignment. this means that, for each OSD, folders are removed until the OSD has less
302
                processing time than the average processing time of this distribution multiplied by rebalance_factor.
303
304
                2. reassign the removed folders using the LPT strategy.
        """
305
        total_folder_size = self.get_total_folder_size()
306
307
308
309
        movements = {}
        folders_to_be_reassigned = []

        # for each OSD, remove the smallest folder until its total_folder_size does not exceed the reassignment_limit
310
        # unrolling
311
        for osd in self.OSDs.values():
312
313
            # self.get_total_folder_size / self.get_total_bandwidth() is the optimal processing time for each OSD:
            # this value is a lower bound for the makespan
314
            reassignment_limit = self.get_rebalance_limit(rebalance_factor, total_folder_size)
315
            while osd.get_processing_time() > reassignment_limit:
316
317
318
319
320
                folder_id, folder_size = osd.get_smallest_folder()
                folders_to_be_reassigned.append(folder.Folder(folder_id, folder_size, None))
                movements[folder_id] = osd.uuid
                osd.remove_folder(folder_id)

321
        # reassignment
322
        new_assignments = self.add_folders(folders_to_be_reassigned)
323
324

        for folder_id, target in new_assignments:
325
326
327
328
            if movements[folder_id] == target:
                del movements[folder_id]
            else:
                movements[folder_id] = (movements[folder_id], target)
329

330
        return movements
331

332
333
334
    def get_rebalance_limit(self, factor, total_folder_size):
        return factor * (total_folder_size / self.get_total_bandwidth())

335
    def rebalance_one_folder(self):
336
337
        """
        rebalance folders to OSDs by assigning folders to new OSDs using the following strategy:
338
                1. find OSD with the highest processing time
339
340
                2. get folder with smallest size on this OSD
                3. find new OSD for this folder using get_lpt_osd
341
342
                4. if the processing time on the new OSD is lower than on the original OSD,
                move the folder to the new OSD. otherwise, return.
343
344
345
346
347
348
349
350
351
352
353
354
        one open question is whether getting the folder with smallest size in step 2 is a clever choice
        (in principle, all folders of the OSD with the highest load are eligible).

        this optimization scheme classifies as local search. two distributions are neighbors if one can be transformed
        into the other by moving one folder from one OSD to another. note, however, that we do not search the whole
        neighborhood of a distribution.
        but it might be possible to show that if there is no improvement step of the type that we check for,
        there is no improvement step at all.
        """
        movements = {}

        while True:
355
356
            # find OSD with the highest processing time (origin)
            origin_osd, maximum_processing_time = self.get_maximum_processing_time()
357
358
359
360
361
362
363
364
365

            # pick a folder of this OSD
            # there are several ways to pick a folder (like largest, smallest, constrained by the resulting load of the
            # origin OSD, random...), it is not clear which way is a good way
            # for now pick the smallest folder on origin OSD
            smallest_folder_id, smallest_folder_size = self.OSDs[origin_osd.uuid].get_smallest_folder()

            # find other OSD best suited for the picked folder (target)
            # check whether moving folder from origin to target decreases the maximum load of all OSDs (makespan).
366
            best_osd, best_osd_processing_time = self.get_lpt_osd(smallest_folder_size)
367

368
            if best_osd_processing_time < maximum_processing_time:
369
370
371
372
373
374
375
                self.assign_new_osd(smallest_folder_id, best_osd.uuid)
                movements[smallest_folder_id] = (origin_osd.uuid, best_osd.uuid)
            else:
                break

        return movements

376
    def rebalance_two_steps_optimal_matching(self):
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
        """
        rebalance the distribution in two steps:
            1. calculate new distribution, independently of the current one
            2. use a minimum weight matching to transform the current distribution into the new distribution.
            minimum weight perfect matching on bipartite graphs can be solved using the successive shortest path
            algorithm.
        while any algorithm (solving/approximating that kind of problem) could be used for the first step,
        we here only implement the LPT algorithm, as it is a pretty good approximation with extremely good running time.
        :return:
        """
        virtual_distribution = copy.deepcopy(self)
        virtual_distribution.rebalance_lpt(rebalance_factor=0)

        # create a mincostflow object
        min_cost_flow = pywrapgraph.SimpleMinCostFlow()

        # define the directed graph for the flow
        # arcs are added individually, and are added implicitly
        # nodes (OSDs) have to be given by numeric id
        # so we need some conversion logic between current/virtual osds and node ids

        current_osds_list = list(self.OSDs.values())
        current_osds_list.sort(key=lambda x: x.uuid)
        virtual_osds_list = list(virtual_distribution.OSDs.values())
        virtual_osds_list.sort(key=lambda x: x.uuid)

        # conversion logic:
        # n = len(current_osd_list) = len(virtual_osd_list)
        # 0 = source, 1 = sink
        # 2, ..., n + 1: current OSDs
        # n + 2, ..., 2n + 1: virtual OSDs
        num_osds = len(current_osds_list)
        assert num_osds == len(virtual_osds_list)

        # edges between the two partitions
        for i in range(0, num_osds):
            for j in range(0, num_osds):
                current_osd = current_osds_list[i]
                virtual_osd = virtual_osds_list[j]
                # calculate the total size of folders that the current OSD has to fetch if the virtual OSD is assigned
                # to it
                edge_cost = 0
                for folder_id in virtual_osd.folders.keys():
                    if not current_osd.contains_folder(folder_id):
                        edge_cost += virtual_osd.folders[folder_id]
                tail = 2 + i  # current OSD
                head = num_osds + 2 + j  # virtual OSD
                min_cost_flow.AddArcWithCapacityAndUnitCost(tail, head, 1, edge_cost)

        # (artificial) edges between the source node and the current OSDs
        for i in range(0, num_osds):
            edge_cost = 0
            tail = 0
            head = i + 2
            min_cost_flow.AddArcWithCapacityAndUnitCost(tail, head, 1, edge_cost)

        # (artificial) edges between the virtual OSDs and the sink node
        for j in range(0, num_osds):
            edge_cost = 0
            tail = num_osds + 2 + j
            head = 1
            min_cost_flow.AddArcWithCapacityAndUnitCost(tail, head, 1, edge_cost)

        # define the supplies (which equals the number of OSDs)
        min_cost_flow.SetNodeSupply(0, num_osds)
        min_cost_flow.SetNodeSupply(1, -num_osds)

        # solve the min cost flow problem
        min_cost_flow.Solve()

        # we need to transform the calculated optimal assignment into a rebalanced distribution, including the necessary
        # movements
        current_to_virtual_osd_matching = []
        for arc in range(min_cost_flow.NumArcs()):
            tail = min_cost_flow.Tail(arc)
            head = min_cost_flow.Head(arc)
            if tail != 0 and head != 1 and min_cost_flow.Flow(arc) == 1:
                current_osd = current_osds_list[tail - 2]
                virtual_osd = virtual_osds_list[head - num_osds - 2]
                current_to_virtual_osd_matching.append((current_osd, virtual_osd))

        movements = {}
        for current_osd, virtual_osd in current_to_virtual_osd_matching:
            # iterate over virtual folders and check whether they are on the correct OSD.
            # the correct OSD is current_osd, as it is the one that is matched with virtual_osd.
            # if it is not present on current_osd, assign it to it.
            # this also removes it from the origin osd.
            for virtual_folder in virtual_osd.folders.keys():
                if not current_osd.contains_folder(virtual_folder):
                    origin_osd = self.get_containing_osd(virtual_folder).uuid
                    target_osd = current_osd.uuid
                    movements[virtual_folder] = (origin_osd, target_osd)
469
470
471
472

        for current_osd, virtual_osd in current_to_virtual_osd_matching:
            virtual_osd.uuid = current_osd.uuid
            self.replace_osd(virtual_osd)
473
474
475

        return movements

476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
    def rebalance_two_steps_random_matching(self):
        """
        rebalance the distribution in two steps:
            1. calculate new distribution, independently of the current one
            2. the OSDs of the new (virtual) matching are randomly assigned to the actual (current OSDs), i.e.,
            no matter which OSD has which folders.
        while any algorithm (solving/approximating that kind of problem) could be used for the first step,
        we here only implement the LPT algorithm, as it is a pretty good approximation with extremely good running time.
        :return:
        """
        virtual_distribution = copy.deepcopy(self)
        virtual_distribution.rebalance_lpt(rebalance_factor=0)

        movements = {}

        for virtual_osd in virtual_distribution.OSDs.values():
            for virtual_folder in virtual_osd.folders.keys():
                if not self.OSDs[virtual_osd.uuid].contains_folder(virtual_folder):
                    movements[virtual_folder] = (self.get_containing_osd(virtual_folder).uuid, virtual_osd.uuid)

        for virtual_osd in virtual_distribution.OSDs.values():
            self.replace_osd(virtual_osd)

        return movements

501
502
503
504
505
506
507
508
509
    def get_suitable_osds(self, folder_size):
        """
        create a list of OSDs with at least folder_size free capacity.
        :return:
        """
        suitable_osds = []
        for one_osd in self.OSDs.values():
            if one_osd.capacity - one_osd.total_folder_size - folder_size >= 0:
                suitable_osds.append(one_osd)
510
511
512
513
        if len(suitable_osds) == 0:
            print("no suitable OSD found!")
            print("total OSD capacity: " + str(self.get_total_capacity()))
            print("current total folder size: " + str(self.get_total_folder_size()))
514
515
        return suitable_osds

516
    def get_lpt_osd(self, folder_size):
517
        """
518
        calculate the processing time of all OSDs, using the sum of their current total_folder_size and folder_size.
519
520
        return (OSD with the smallest such value, the smallest value)
        """
521
522
        best_processing_time = None
        best_processing_time_osd = -1
523
        for one_osd in self.get_suitable_osds(folder_size):
524
525
526
527
528
            processing_time = (one_osd.total_folder_size + folder_size) / one_osd.bandwidth
            if (best_processing_time is None) or processing_time < best_processing_time_osd:
                best_processing_time = one_osd
                best_processing_time_osd = processing_time
        return best_processing_time, best_processing_time_osd
529

530
    def update_folder(self, folder, size):
Felix Seibert's avatar
Felix Seibert committed
531
532
533
        """
        updates the size of a given folder
        """
534
535
536
        for one_osd in self.OSDs.values():
            if folder in one_osd.folders.keys():
                one_osd.update_folder(folder, size)
537
538
                break

539
    def description(self):
Felix Seibert's avatar
Felix Seibert committed
540
541
542
        """
        generates a string describing this data distribution
        """
543
        string = ""
544
545
        for one_osd in self.OSDs.values():
            string += str(one_osd)
546
            string += "\n"
547
            string += "folders : " + str(one_osd.folders)
548
            string += "\n"
549
        string += "average folder size: " + str(self.get_average_folder_size())
550
551
552
        return string

    def __str__(self):
553
554
555
556
557
        string_representation = "DataDistribution has " + str(len(self.OSDs)) \
                                + " osds: \n"
        for key, value in self.OSDs.items():
            string_representation += str(value) + " \n"
        return string_representation