import random

from xtreemfs_client import folder
from xtreemfs_client import osd


class DataDistribution(object):
    """
    class to keep track of the osd (object storage device) locations of different folders, i.e.,
    their physical location.

    this class also allows to calculate several data distributions, e.g., mappings from folders to OSDs (each folder
    gets mapped to one OSD).

    the OSDs are kept in self.OSDs, a dict mapping osd uuids to OSD objects.
    """

    def __init__(self):
        # map from osd uuid to OSD object
        self.OSDs = {}

    def add_new_osd(self, osd_uuid):
        """
        create a new empty osd with the given uuid and add it to the existing OSDs.
        if an OSD with this uuid is already present, a message is printed and nothing changes.
        """
        if osd_uuid in self.OSDs:
            print("key: " + osd_uuid + " is already present!")
            return
        self.OSDs[osd_uuid] = osd.OSD(osd_uuid)

    def add_osd(self, new_osd):
        """
        add the given OSD (object) to the existing OSDs, keyed by its uuid.
        if an OSD with the same uuid is already present, a message is printed and nothing changes.
        """
        if new_osd.uuid in self.OSDs:
            print("key: " + new_osd.uuid + " is already present!")
            return
        self.OSDs[new_osd.uuid] = new_osd

    def add_osd_list(self, osd_list):
        """
        create a new empty OSD for every uuid in osd_list and add it to the existing OSDs.
        note that osd_list contains osd uuids, not OSD objects. uuids that are already present are skipped silently.
        """
        for osd_uuid in osd_list:
            if osd_uuid not in self.OSDs:
                self.OSDs[osd_uuid] = osd.OSD(osd_uuid)

    def set_osd_capacities(self, osd_capacities):
        """
        set osd capacities
        :param osd_capacities: map from osd uuids to osd capacities (must be plain ints).
                               every known OSD must have an entry, otherwise a KeyError is raised.
        :return:
        """
        for one_osd in self.OSDs.values():
            capacity = osd_capacities[one_osd.uuid]
            # exact type check: bool and other int subclasses are rejected as well
            assert type(capacity) is int
            one_osd.capacity = capacity

    def set_osd_bandwidths(self, osd_bandwidths):
        """
        set osd bandwidths
        :param osd_bandwidths: map from osd uuids to osd bandwidths.
                               every known OSD must have an entry, otherwise a KeyError is raised.
        :return:
        """
        for one_osd in self.OSDs.values():
            one_osd.bandwidth = osd_bandwidths[one_osd.uuid]

    def get_osd_list(self):
        """
        get a list of all existing OSD uuids.
        """
        return list(self.OSDs)

    def get_containing_osd(self, folder_id):
        """
        get the OSD containing the given folder_id, or None if the folder is not assigned to any OSD.
        """
        for checked_osd in self.OSDs.values():
            if checked_osd.contains_folder(folder_id):
                return checked_osd
        return None

    def assign_new_osd(self, folder_id, new_osd):
        """
        assign folder_id to the OSD with uuid new_osd. if folder_id already is assigned to another OSD,
        the folder keeps its recorded size and the old assignment is deleted. a folder that was not
        assigned before is added with the current average folder size.
        if the folder already lives on new_osd, nothing changes.

        :param folder_id: id of the folder to (re)assign
        :param new_osd: uuid (not object) of the target OSD
        """
        target_osd = self.OSDs[new_osd]
        old_osd = self.get_containing_osd(folder_id)
        if old_osd is None:
            target_osd.add_folder(folder_id, self.get_average_folder_size())
        elif old_osd.uuid != new_osd:
            # add first, while the folder's size can still be read from the old OSD
            target_osd.add_folder(folder_id, old_osd.folders[folder_id])
            old_osd.remove_folder(folder_id)
        # else: the folder is already on new_osd. adding and then removing it on the same OSD
        # (as done for the cross-OSD case) would drop the folder, so this is a no-op.

    def get_average_folder_size(self):
        """
        get the average folder size of all folders of all OSDs (0 if there are no folders).
        """
        total_size = 0
        total_number_of_folders = 0
        for one_osd in self.OSDs.values():
            total_size += one_osd.total_folder_size
            total_number_of_folders += len(one_osd.folders)

        if total_number_of_folders == 0:
            return 0
        return total_size / total_number_of_folders

    def get_average_load(self):
        """
        calculate the average OSD load, that is, the average of the values returned by each OSD's get_load().
        returns 0 if there are no OSDs.
        """
        if not self.OSDs:
            return 0
        total_load = 0
        for one_osd in self.OSDs.values():
            total_load += one_osd.get_load()
        return total_load / len(self.OSDs)

    def get_maximum_load(self):
        """
        calculate the maximum OSD load, that is, the maximum of their total_folder_size.

        :return: (OSD with the maximum load, that load). on ties the first OSD (in dict order) wins.
                 returns (None, 0) if there are no OSDs.
        """
        maximum_load = 0
        maximum_osd = None
        for one_osd in self.OSDs.values():
            load = one_osd.total_folder_size
            if maximum_osd is None or load > maximum_load:
                maximum_load = load
                maximum_osd = one_osd
        return maximum_osd, maximum_load

    def get_average_processing_time(self):
        """
        calculate the average OSD processing time, that is, the average of the values returned by each
        OSD's get_processing_time(). returns 0 if there are no OSDs.
        """
        if not self.OSDs:
            return 0
        total_processing_time = 0
        for one_osd in self.OSDs.values():
            total_processing_time += one_osd.get_processing_time()
        return total_processing_time / len(self.OSDs)

    def get_maximum_processing_time(self):
        """
        calculate the maximum OSD processing time, also known as makespan.

        :return: (OSD with the maximum processing time, that processing time). on ties the first OSD
                 (in dict order) wins. returns (None, 0) if there are no OSDs.
        """
        maximum_processing_time = 0
        maximum_osd = None
        for one_osd in self.OSDs.values():
            processing_time = one_osd.get_processing_time()
            if maximum_osd is None or processing_time > maximum_processing_time:
                maximum_processing_time = processing_time
                maximum_osd = one_osd
        return maximum_osd, maximum_processing_time

    def add_folders(self, folders,
                    ignore_osd_capacities=True,
                    random_osd_assignment=False,
                    ignore_folder_sizes=False,
                    debug=False):
        """
        adds a list of folders to the data distribution.

        returns a list of (folder id, osd uuid) assignments for those folders that previously had no assignment.
        folders that are already assigned stay on their OSD; only their recorded size is updated
        (the assignment is stable).

        the strategy for the new folders depends on the flags:
          - random_osd_assignment=False (default): LPT (largest processing time first). folders are sorted by
            size, largest first, and each one is put on the OSD with the smallest resulting processing time
            ((total_folder_size + size) / bandwidth), so OSDs with higher bandwidth obtain more/larger folders.
            OSD capacities are not checked in this mode.
          - random_osd_assignment=True and ignore_osd_capacities=True (and ignore_folder_sizes=False):
            a totally random OSD assignment, even ignoring capacities.
          - random_osd_assignment=True and ignore_osd_capacities=False: each folder goes to a random OSD among
            those with enough free capacity (raises IndexError if no OSD can hold the folder).
          - random_osd_assignment=True and ignore_folder_sizes=True (and ignore_osd_capacities=True):
            the new folders are given the current average folder size (1 if that is 0), shuffled and assigned
            with LPT, which aims at every OSD getting about the same number of folders.
            note that these folders are stored with the average size, not their own size.
        """
        # find out which folders are not assigned yet; assigned ones just get their size updated
        new_folders = []
        for a_folder in folders:
            containing_osd = self.get_containing_osd(a_folder.id)
            if containing_osd is not None:
                containing_osd.add_folder(a_folder.id, a_folder.size)
            else:
                new_folders.append(a_folder)

        if debug:
            print("dataDistribution: random_osd_assignment: " + str(random_osd_assignment))

        # keep track of which unassigned folder gets assigned to which OSD.
        # this information must be returned
        osds_for_new_folders = []

        # totally random OSD assignment, even ignoring OSD capacities
        # (might lead to I/O errors when too many groups are assigned to an OSD)
        if random_osd_assignment and ignore_osd_capacities and not ignore_folder_sizes:
            if debug:
                print("using totally random osd assignment")
            all_osds = list(self.OSDs.values())
            for a_folder in new_folders:
                random_osd = random.choice(all_osds)
                random_osd.add_folder(a_folder.id, a_folder.size)
                osds_for_new_folders.append((a_folder.id, random_osd.uuid))
            return osds_for_new_folders

        # random OSD assignment respecting OSD capacities
        if random_osd_assignment and not ignore_osd_capacities:
            if debug:
                print("using random osd assignment, respecting osd capacities")
            for a_folder in new_folders:
                # OSDs with enough capacity left; random.choice raises IndexError if there is none
                suitable_osds = [one_osd for one_osd in self.OSDs.values()
                                 if one_osd.capacity - one_osd.total_folder_size - a_folder.size >= 0]
                suitable_random_osd = random.choice(suitable_osds)
                suitable_random_osd.add_folder(a_folder.id, a_folder.size)
                osds_for_new_folders.append((a_folder.id, suitable_random_osd.uuid))
            return osds_for_new_folders

        # random OSD assignment ignoring folder sizes // round-robin style distribution with some randomness
        if random_osd_assignment and ignore_folder_sizes:
            if debug:
                print("using random osd assignment ignoring folder sizes")

            average_folder_size = self.get_average_folder_size()
            if average_folder_size == 0:
                average_folder_size = 1

            # only the previously unassigned folders are resized. passing all of `folders` here would make
            # the recursive call overwrite the real sizes of already-assigned folders with the average size.
            modified_folders = [folder.Folder(f.id, average_folder_size, f.origin) for f in new_folders]
            random.shuffle(modified_folders)
            # all sizes are equal now, so LPT (stable sort) distributes them in the shuffled order
            return self.add_folders(modified_folders)

        # balanced deterministic OSD assignment (LPT)
        # (following largest processing time first, also called post-greedy approach)
        new_folders.sort(key=lambda x: x.size, reverse=True)

        # for each folder calculate the best OSD and add it to it
        for a_folder in new_folders:
            least_used_osd, _ = self.get_lpt_osd(a_folder.size)
            least_used_osd.add_folder(a_folder.id, a_folder.size)
            osds_for_new_folders.append((a_folder.id, least_used_osd.uuid))
        return osds_for_new_folders

    def rebalance_lpt(self, rebalance_factor=1):
        """
        rebalance folders to OSDs by assigning folders to new OSDs using the following strategy:
                1. 'unroll' the assignment. this means that, for each OSD, its smallest folders are removed until
                the OSD's processing time does not exceed the average processing time of this distribution
                multiplied by rebalance_factor.
                2. reassign the removed folders using the LPT strategy (see add_folders).

        :return: (movements, reassignment_limit). movements maps every removed folder id to
                 (old osd uuid, new osd uuid); both may be equal. reassignment_limit is the processing-time
                 limit used for unrolling.
        """
        movements = {}
        folders_to_be_reassigned = []

        # TODO reassignment factor based on load or based on processing time?
        # as long as we use OSDs with the same bandwidth, there is no difference.
        reassignment_limit = self.get_average_processing_time() * rebalance_factor

        # unrolling: for each OSD, remove the smallest folder until its processing time
        # does not exceed the reassignment_limit
        for one_osd in self.OSDs.values():
            # TODO how to calculate the 'unrolling limit' for each individual OSD?
            # again, as long as all OSDs have the same bandwidth, there is no difference.
            # BUT it should definitely not depend on the load or pt of the OSD.
            # so for now we use a static limit, the same for all OSDs.
            while one_osd.get_processing_time() > reassignment_limit:
                folder_id, folder_size = one_osd.get_smallest_folder()
                folders_to_be_reassigned.append(folder.Folder(folder_id, folder_size, None))
                movements[folder_id] = one_osd.uuid
                one_osd.remove_folder(folder_id)

        # reassignment: the removed folders are unassigned now, so add_folders returns a target for each
        for folder_id, target in self.add_folders(folders_to_be_reassigned):
            movements[folder_id] = (movements[folder_id], target)

        return movements, reassignment_limit

    def rebalance_one_folder(self):
        """
        rebalance folders to OSDs by assigning folders to new OSDs using the following strategy:
                1. find OSD with the highest processing time
                2. get folder with smallest size on this OSD
                3. find new OSD for this folder using get_lpt_osd
                4. if the processing time on the new OSD is lower than on the original OSD,
                move the folder to the new OSD. otherwise, return.

        one open question is whether getting the folder with smallest size in step 2 is a clever choice
        (in principle, all folders of the OSD with the highest load are eligible).

        this optimization scheme classifies as local search. two distributions are neighbors if one can be transformed
        into the other by moving one folder from one OSD to another. note, however, that we do not search the whole
        neighborhood of a distribution.
        but it might be possible to show that if there is no improvement step of the type that we check for,
        there is no improvement step at all.

        :return: dict mapping each moved folder id to (osd uuid it was on before this call, osd uuid it is on now).
                 a folder moved several times keeps its initial origin.
        """
        movements = {}

        while True:
            # find OSD with the highest processing time (origin)
            origin_osd, maximum_processing_time = self.get_maximum_processing_time()
            if origin_osd is None or not origin_osd.folders:
                # no OSDs at all, or the busiest OSD has no folder that could be moved
                break

            # pick a folder of this OSD
            # there are several ways to pick a folder (like largest, smallest, constrained by the resulting load of the
            # origin OSD, random...), it is not clear which way is a good way
            # for now pick the smallest folder on origin OSD
            smallest_folder_id, smallest_folder_size = origin_osd.get_smallest_folder()

            # find other OSD best suited for the picked folder (target)
            # check whether moving folder from origin to target decreases the maximum load of all OSDs (makespan).
            best_osd, best_osd_processing_time = self.get_lpt_osd(smallest_folder_size)

            if not best_osd_processing_time < maximum_processing_time:
                break

            self.assign_new_osd(smallest_folder_id, best_osd.uuid)
            # if the folder was already moved in an earlier iteration, keep its initial location as origin,
            # so that each entry describes where the folder's data actually has to be moved from
            if smallest_folder_id in movements:
                initial_osd_uuid = movements[smallest_folder_id][0]
            else:
                initial_osd_uuid = origin_osd.uuid
            movements[smallest_folder_id] = (initial_osd_uuid, best_osd.uuid)

        return movements

    def get_lpt_osd(self, folder_size):
        """
        calculate, for every OSD, the processing time it would have after adding a folder of size folder_size,
        i.e., (total_folder_size + folder_size) / bandwidth.

        :return: (OSD with the smallest such value, the smallest value). on ties the first OSD (in dict order)
                 wins. if there are no OSDs, (None, -1) is returned.
        """
        best_osd = None
        best_processing_time = -1
        for one_osd in self.OSDs.values():
            processing_time = (one_osd.total_folder_size + folder_size) / one_osd.bandwidth
            if best_osd is None or processing_time < best_processing_time:
                best_osd = one_osd
                best_processing_time = processing_time
        return best_osd, best_processing_time

    def update_folder(self, folder, size):
        """
        updates the size of a given folder (id) on the OSD that contains it.
        does nothing if the folder is not assigned to any OSD.
        """
        for one_osd in self.OSDs.values():
            if folder in one_osd.folders:
                one_osd.update_folder(folder, size)
                break

    def description(self):
        """
        generates a string describing this data distribution: for every OSD its string representation and its
        folders, followed by the average folder size.
        """
        osd_descriptions = "".join(
            str(one_osd) + "\n" + "folders : " + str(one_osd.folders) + "\n"
            for one_osd in self.OSDs.values()
        )
        return osd_descriptions + "average folder size: " + str(self.get_average_folder_size())

    def __str__(self):
        header = "DataDistribution has " + str(len(self.OSDs)) + " osds: \n"
        return header + "".join(str(value) + " \n" for value in self.OSDs.values())