Commit 144bb743 authored by Felix Seibert

refactor: bandwidths instead of capacities

parent be184d08
@@ -34,14 +34,12 @@ class TestDataDistribution(unittest.TestCase):
distribution_felix = dataDistribution.DataDistribution()
distribution_felix.add_osd_list(create_test_osd_list(num_osds, osds_capacities))
distribution_felix.add_folders(create_test_folder_list(num_folders, folder_sizes),
osd_information=create_osd_information(num_osds, osds_capacities),
random_osd_assignment=True,
ignore_osd_capacities=True)
distribution_farouk = dataDistribution.DataDistribution()
distribution_farouk.add_osd_list(create_test_osd_list(num_osds, osds_capacities))
distribution_farouk.add_folders(create_test_folder_list(num_folders, folder_sizes),
osd_information=create_osd_information(num_osds, osds_capacities),
random_osd_assignment=True,
ignore_osd_capacities=True)
@@ -64,17 +62,6 @@ class TestDataDistribution(unittest.TestCase):
self.assertTrue(felix_and_farouk_different)
self.assertTrue(max_osd_total_folder_size > osds_capacities[0])
def test_value_error(self):
distribution = dataDistribution.DataDistribution()
distribution.add_osd_list(create_test_osd_list(1, [0]))
try:
distribution.add_folders(create_test_folder_list(1, [1]),
random_osd_assignment=True,
ignore_osd_capacities=False)
except ValueError:
return # expect value error
self.fail("expect value error!")
def test_random_distribution_respecting_capacities(self):
# generate some random distributions and check whether they all respect the OSD capacities
num_osds = 3
@@ -87,11 +74,11 @@ class TestDataDistribution(unittest.TestCase):
for i in range(0, 100):
distribution = dataDistribution.DataDistribution()
distribution.add_osd_list(create_test_osd_list(num_osds, osds_capacities))
distribution.set_osd_capacities(create_osd_information(num_osds, osds_capacities))
distribution.add_folders(create_test_folder_list(num_folders, folder_size),
random_osd_assignment=True,
ignore_osd_capacities=False,
osd_information=create_osd_information(num_osds, osds_capacities),
capacity=osd_capacity_key)
ignore_osd_capacities=False)
osds = distribution.get_osd_list()
total_folder_sizes = list(map(lambda x: distribution.OSDs[x].total_folder_size, osds))
@@ -150,10 +137,13 @@ class TestDataDistribution(unittest.TestCase):
self.assertTrue(min(total_folder_sizes) == max(total_folder_sizes))
# test 1 for OSDs with different bandwidths
osd_capacities = [10, 20]
folder_sizes = [4, 4, 4]
osd_bandwidths_1 = [10, 20]
folder_sizes = [4, 4, 4, 4, 4, 4]
distribution = dataDistribution.DataDistribution()
distribution.add_osd_list(create_test_osd_list(num_osds, osd_capacities))
distribution.add_osd_list(create_test_osd_list(num_osds, osd_bandwidths_1))
distribution.set_osd_bandwidths(create_osd_information(num_osds, osd_bandwidths_1))
distribution.add_folders(create_test_folder_list(num_folders, folder_sizes))
osds = distribution.get_osd_list()
total_folder_sizes = list(map(lambda x: distribution.OSDs[x].total_folder_size, osds))
@@ -161,35 +151,35 @@ class TestDataDistribution(unittest.TestCase):
# test 2 for OSDs with different bandwidths. the expected result is that the 4 OSDs with the larger bandwidth
# receive 2 files each, while the 4 OSDs with the smaller bandwidth receive no files.
osd_capacities = [10, 30]
osd_bandwidths_2 = [10, 30]
folder_sizes = [1]
num_folders = 8
distribution = dataDistribution.DataDistribution()
distribution.add_osd_list(create_test_osd_list(num_osds, osd_capacities))
distribution.add_folders(create_test_folder_list(num_folders, folder_sizes),
osd_information=create_osd_information(num_osds, osd_capacities),
ratio_parameter=osd_capacity_key)
distribution.add_osd_list(create_test_osd_list(num_osds, osd_bandwidths_2))
distribution.set_osd_bandwidths(create_osd_information(num_osds, osd_bandwidths_2))
distribution.add_folders(create_test_folder_list(num_folders, folder_sizes))
osds = distribution.get_osd_list()
total_folder_sizes = list(map(lambda x: distribution.OSDs[x].total_folder_size, osds))
self.assertEqual(0, min(total_folder_sizes))
self.assertEqual(2, max(total_folder_sizes))
def test_average_osd_load(self):
folder_sizes = [49, 123, 1, 7]
def test_average_osd_processing_time(self):
folder_sizes = [48, 123, 1, 7]
num_folders = 2
num_osds = 4
osd_capacities = [100, 150]
osd_bandwidths = [10, 15]
distribution = dataDistribution.DataDistribution()
distribution.add_osd_list(create_test_osd_list(num_osds, osd_capacities))
distribution.add_folders(create_test_folder_list(num_folders, folder_sizes),
create_osd_information(num_osds, osd_capacities),
osd_capacity_key)
distribution.add_osd_list(create_test_osd_list(num_osds, osd_bandwidths))
distribution.set_osd_bandwidths(create_osd_information(num_osds, osd_bandwidths))
average = (sum(folder_sizes) * num_folders) / (sum(osd_capacities * num_osds))
self.assertEqual(average, distribution.get_average_osd_load(create_osd_information(num_osds, osd_capacities),
osd_capacity_key))
distribution.add_folders(create_test_folder_list(num_folders, folder_sizes))
average = 3.05
self.assertEqual(average, distribution.get_average_processing_time())
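A quick sanity check of the expected value 3.05, assuming create_test_osd_list(4, [10, 15]) yields one OSD per (index, bandwidth) pair (eight OSDs) and create_test_folder_list(2, [48, 123, 1, 7]) yields two folders per size (eight folders); this sketch is not part of the test module:

# LPT by processing time places the two 123-folders and the two 48-folders on the
# four bandwidth-15 OSDs, and the 7- and 1-folders on four of the bandwidth-10 OSDs.
processing_times = [123 / 15, 123 / 15, 48 / 15, 48 / 15, 7 / 10, 7 / 10, 1 / 10, 1 / 10]
assert abs(sum(processing_times) / len(processing_times) - 3.05) < 1e-9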
def test_average_total_folder_size(self):
folder_sizes = [49, 123, 1, 7]
@@ -203,7 +193,7 @@ class TestDataDistribution(unittest.TestCase):
osd_capacity_key)
average = (sum(folder_sizes) * num_folders) / (num_osds * len(osd_capacities))
self.assertEqual(average, distribution.get_average_total_folder_size())
self.assertEqual(average, distribution.get_average_load())
def test_rebalance_lpt(self):
folder_sizes = [1]
@@ -213,6 +203,8 @@ class TestDataDistribution(unittest.TestCase):
distribution = dataDistribution.DataDistribution()
distribution.add_osd_list(create_test_osd_list(num_osds, osd_capacities))
distribution.set_osd_capacities(create_osd_information(num_osds, osd_capacities))
distribution.add_folders(create_test_folder_list(num_folders, folder_sizes), random_osd_assignment=True)
distribution.rebalance_lpt()
@@ -224,26 +216,30 @@ class TestDataDistribution(unittest.TestCase):
def test_rebalance_one_folder(self):
folder_sizes = [1]
num_folders = 8
osd_capacities = [10]
osd_bandwidths = [10]
num_osds = 4
distribution = dataDistribution.DataDistribution()
distribution.add_osd_list(create_test_osd_list(num_osds, osd_capacities))
distribution.add_osd_list(create_test_osd_list(num_osds, osd_bandwidths))
distribution.add_folders(create_test_folder_list(num_folders, folder_sizes), random_osd_assignment=True)
distribution.rebalance_one_folder()
osds = distribution.get_osd_list()
total_folder_sizes = list(map(lambda x: distribution.OSDs[x].total_folder_size, osds))
# we should obtain a perfectly balanced distribution
self.assertEqual(min(total_folder_sizes), max(total_folder_sizes))
osd_capacities = [10, 30]
osd_bandwidths = [10, 30]
folder_sizes = [1]
num_folders = 8
distribution = dataDistribution.DataDistribution()
distribution.add_osd_list(create_test_osd_list(num_osds, osd_capacities))
distribution.add_osd_list(create_test_osd_list(num_osds, osd_bandwidths))
distribution.set_osd_bandwidths(create_osd_information(num_osds, osd_bandwidths))
distribution.add_folders(create_test_folder_list(num_folders, folder_sizes), random_osd_assignment=True)
distribution.rebalance_one_folder(osd_information=create_osd_information(num_osds, osd_capacities),
capacity=osd_capacity_key)
distribution.rebalance_one_folder()
osds = distribution.get_osd_list()
total_folder_sizes = list(map(lambda x: distribution.OSDs[x].total_folder_size, osds))
@@ -274,8 +270,7 @@ def create_osd_information(num_osds, osd_capacities):
for i in range(0, num_osds):
for osd_capacity in osd_capacities:
osd_uuid = create_osd_id(osd_capacity, i)
osd_information[osd_uuid] = {}
osd_information[osd_uuid][osd_capacity_key] = osd_capacity
osd_information[osd_uuid] = osd_capacity
return osd_information
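For illustration, the helper now returns a flat map from OSD uuid to a single number (capacity or bandwidth) rather than the previous nested {uuid: {osd_capacity_key: value}} layout; the placeholders below stand in for whatever create_osd_id produces:

# create_osd_information(2, [10, 30]) yields something like
# {<uuid(10, 0)>: 10, <uuid(30, 0)>: 30, <uuid(10, 1)>: 10, <uuid(30, 1)>: 30}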
......
@@ -11,12 +11,8 @@ class DataDistribution(object):
this class also allows calculating several data distributions, e.g., mappings from folders to OSDs (each folder
gets mapped to one OSD).
the load is defined as the quotient of an OSD's total_folder_size and its capacity.
"""
# TODO introduce consistent handling of (missing) OSD capacities / osd_information
def __init__(self):
self.OSDs = {}
@@ -48,6 +44,25 @@ class DataDistribution(object):
new_osd = osd.OSD(osd_uuid)
self.OSDs[osd_uuid] = new_osd
def set_osd_capacities(self, osd_capacities):
"""
set osd capacities
:param osd_capacities: map from osd uuids to osd capacities
:return:
"""
for one_osd in self.OSDs.values():
assert type(osd_capacities[one_osd.uuid]) is int
one_osd.capacity = osd_capacities[one_osd.uuid]
def set_osd_bandwidths(self, osd_bandwidths):
"""
set osd bandwidths
:param osd_bandwidths: map from osd uuids to osd bandwidths
:return:
"""
for one_osd in self.OSDs.values():
one_osd.bandwidth = osd_bandwidths[one_osd.uuid]
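A minimal usage sketch of the two setters, assuming add_osd_list accepts a plain list of uuid strings; the uuids and numbers are illustrative only:

distribution = DataDistribution()
distribution.add_osd_list(["osd-a", "osd-b", "osd-c"])
distribution.set_osd_capacities({"osd-a": 100, "osd-b": 100, "osd-c": 200})  # ints, checked by the assert above
distribution.set_osd_bandwidths({"osd-a": 10, "osd-b": 10, "osd-c": 30})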
def get_osd_list(self):
"""
get a list of all existing OSD uuids.
@@ -90,47 +105,52 @@ class DataDistribution(object):
return 0
return total_size / total_number_of_folders
def get_average_osd_load(self, osd_information, capacity):
def get_average_load(self):
"""
calculate the average OSD load, that is,
the ratio between the sum of all folder sizes and the total OSD capacity.
calculate the average OSD load, that is, the average of their total_folder_size.
"""
total_folder_size = 0
total_osd_capacity = 0
for osd_uuid in self.OSDs.keys():
total_folder_size += self.OSDs[osd_uuid].total_folder_size
total_osd_capacity += osd_information[osd_uuid][capacity]
return total_folder_size / total_osd_capacity
for osd in self.OSDs.values():
total_folder_size += osd.get_load()
return total_folder_size / len(self.OSDs)
def get_maximum_osd_load(self, osd_information, capacity):
def get_maximum_load(self):
"""
calculate the maximum OSD load.
calculate the maximum OSD load, that is, the maximum of their total_folder_size.
"""
assert osd_information is not None
assert capacity != ''
maximum_load = 0
maximum_osd = None
for osd in self.OSDs.values():
load = osd.total_folder_size / osd_information[osd.uuid][capacity]
load = osd.total_folder_size
if maximum_osd is None or load > maximum_load:
maximum_load = load
maximum_osd = osd
return maximum_osd, maximum_load
def get_average_total_folder_size(self):
def get_average_processing_time(self):
"""
calculate the average total_folder_size of the OSDs.
calculate the average OSD processing time, that is, the average of their (total_folder_size / bandwidth).
:return:
"""
total_folder_size = 0
num_osds = 0
total_processing_time = 0
for osd in self.OSDs.values():
total_folder_size += osd.total_folder_size
num_osds += 1
total_processing_time += osd.get_processing_time()
return total_processing_time / len(self.OSDs)
return total_folder_size / num_osds
def get_maximum_processing_time(self):
"""
calculate the maximum OSD processing time, also known as makespan
"""
maximum_processing_time = 0
maximum_osd = None
for osd in self.OSDs.values():
processing_time = osd.get_processing_time()
if maximum_osd is None or processing_time > maximum_processing_time:
maximum_processing_time = processing_time
maximum_osd = osd
return maximum_osd, maximum_processing_time
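Both metrics reduce to the per-OSD processing time total_folder_size / bandwidth; a sketch of the relationship, assuming distribution is an already populated DataDistribution:

times = [one_osd.get_processing_time() for one_osd in distribution.OSDs.values()]
assert distribution.get_average_processing_time() == sum(times) / len(times)
_, makespan = distribution.get_maximum_processing_time()
assert makespan == max(times)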
def add_folders(self, folders,
osd_information=None, ratio_parameter='', capacity='',
ignore_osd_capacities=True,
random_osd_assignment=False,
ignore_folder_sizes=False,
@@ -140,17 +160,8 @@ class DataDistribution(object):
if not specified otherwise, the assignments are calculated using the LPT algorithm.
returns a list of assignments from folders to OSDs, for which (folders) there was previously no assignment.
if osd_information and ratio_parameter are given,
OSDs are assigned data proportionally to their ratio_parameter.
osd_information is a map (that we now call outer map) that contains, for each OSD, an inner_map.
outer_map[osd_uuid][ratio_parameter] is used to calculate the proportion of data assigned to OSD with uuid
osd_uuid.
if ignore_osd_capacities=False,
outer_map[osd_uuid][capacity] is used (only in combination with random_osd_assignment=True)
to generate random assignments that do not surpass the capacities of the OSDs.
(random assignment respecting OSD capacities)
if capacities and bandwidths are set for the OSDs, folders are assigned accordingly
(capacities are respected and OSDs with higher bandwidth obtain more/larger files).
if random_osd_assignment=True and ignore_osd_capacities=True, a totally random OSD assignment is generated.
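A sketch of the call patterns described above, assuming folders is a previously built list of folder objects and the capacities/bandwidths were installed beforehand via the setters:

# LPT assignment weighted by the bandwidths set with set_osd_bandwidths()
distribution.add_folders(folders)
# random assignment that respects the capacities set with set_osd_capacities()
distribution.add_folders(folders, random_osd_assignment=True, ignore_osd_capacities=False)
# totally random assignment
distribution.add_folders(folders, random_osd_assignment=True, ignore_osd_capacities=True)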
@@ -190,15 +201,12 @@ class DataDistribution(object):
# random OSD assignment respecting OSD capacities
elif random_osd_assignment and not ignore_osd_capacities:
if osd_information is None or capacity == '':
raise ValueError("ignore_osd_capacities=False is not possible if osd_information or capacity is"
"not given!")
if debug:
print("using random osd assignment, respecting osd capacities")
for a_folder in new_folders:
suitable_osds = [] # list of OSDs with enough capacity
for one_osd in self.OSDs.values():
if osd_information[one_osd.uuid][capacity] - one_osd.total_folder_size - a_folder.size >= 0:
if one_osd.capacity - one_osd.total_folder_size - a_folder.size >= 0:
suitable_osds.append(one_osd)
suitable_random_osd = random.choice(suitable_osds)
suitable_random_osd.add_folder(a_folder.id, a_folder.size)
@@ -206,7 +214,7 @@ class DataDistribution(object):
suitable_random_osd.uuid))
return osds_for_new_folders
# random OSD assignment ignoring folder sizes
# random OSD assignment ignoring folder sizes (round-robin style distribution with some randomness)
elif random_osd_assignment and ignore_folder_sizes:
if debug:
print("using random osd assignment ignoring folder sizes")
@@ -223,57 +231,56 @@ class DataDistribution(object):
# (following largest processing time first, also called post-greedy approach)
list.sort(new_folders, key=lambda x: x.size, reverse=True)
# if osd_information is None, use the fake osd_information, which assumes that all OSDs have the same capacity
# otherwise use the given osd_information
if osd_information is None:
ratio_parameter = 'dummy_value'
osd_information = self.get_equal_sized_fake_osd_information(ratio_parameter)
# for each folder calculate the best OSD and add it to it
for a_folder in new_folders:
least_used_osd, _ = self.get_lpt_osd(osd_information, ratio_parameter, a_folder.size)
least_used_osd, _ = self.get_lpt_osd(a_folder.size)
least_used_osd.add_folder(a_folder.id, a_folder.size)
osds_for_new_folders.append((a_folder.id,
least_used_osd.uuid))
return osds_for_new_folders
def rebalance_lpt(self, rebalance_factor=1, osd_information=None, capacity=''):
def rebalance_lpt(self, rebalance_factor=1):
"""
rebalance folders to OSDs by assigning folders to new OSDs using the following strategy:
1. 'unroll' the assignment. this means that, for each OSD, folders are removed until the OSD has less
total_folder_size than the average total folder size of this distribution multiplied by rebalance_factor.
processing time than the average processing time of this distribution multiplied by rebalance_factor.
2. reassign the removed folders using the LPT strategy.
"""
movements = {}
folders_to_be_reassigned = []
reassignment_factor = self.get_average_osd_load(osd_information, capacity) * rebalance_factor
# TODO reassignment factor based on load or based on processing time?
# as long as we use OSDs with the same bandwidth, there is no difference.
reassignment_limit = self.get_average_processing_time() * rebalance_factor
# for each OSD, remove its smallest folder until its processing time does not exceed the reassignment_limit
# unrolling
for osd in self.OSDs.values():
while osd.total_folder_size > reassignment_factor * osd_information[osd.uuid][capacity]:
# TODO how to calculate the 'unrolling limit' for each individual OSD?
# again, as long as all OSDs have the same bandwidth, there is no difference.
# BUT it should definitely not depend on the load or pt of the OSD.
# so for now we use a static limit, the same for all OSDs.
while osd.get_processing_time() > reassignment_limit:
folder_id, folder_size = osd.get_smallest_folder()
folders_to_be_reassigned.append(folder.Folder(folder_id, folder_size, None))
movements[folder_id] = osd.uuid
osd.remove_folder(folder_id)
# reassignment
new_assignments = self.add_folders(folders_to_be_reassigned,
osd_information=osd_information, ratio_parameter=capacity)
new_assignments = self.add_folders(folders_to_be_reassigned)
for folder_id, target in new_assignments:
movements[folder_id] = (movements[folder_id], target)
return movements
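A usage sketch: after the refactor rebalance_lpt takes only the factor, reading bandwidths from the OSDs themselves; the returned map records (source_osd, target_osd) per moved folder. The factor value is illustrative:

movements = distribution.rebalance_lpt(rebalance_factor=1.1)
for folder_id, (source_osd, target_osd) in movements.items():
    print(folder_id, "moved from", source_osd, "to", target_osd)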
def rebalance_one_folder(self, osd_information=None, capacity=''):
def rebalance_one_folder(self):
"""
rebalance folders to OSDs by assigning folders to new OSDs using the following strategy:
1. find OSD with the highest load
1. find OSD with the highest processing time
2. get folder with smallest size on this OSD
3. find new OSD for this folder using get_lpt_osd
4. if the load on the new OSD is lower than on the original OSD, move the folder to the new OSD.
otherwise, return.
4. if the processing time on the new OSD is lower than on the original OSD,
move the folder to the new OSD. otherwise, return.
one open question is whether picking the folder with the smallest size in step 2 is a clever choice
(in principle, all folders of the OSD with the highest processing time are eligible).
@@ -283,16 +290,11 @@ class DataDistribution(object):
but it might be possible to show that if there is no improvement step of the type that we check for,
there is no improvement step at all.
"""
if osd_information is None:
capacity_key = 'capacity'
osd_information = self.get_equal_sized_fake_osd_information(capacity_key)
capacity = capacity_key
movements = {}
while True:
# find OSD with the highest load (origin)
origin_osd, maximum_load = self.get_maximum_osd_load(osd_information, capacity)
# find OSD with the highest processing time (origin)
origin_osd, maximum_processing_time = self.get_maximum_processing_time()
# pick a folder of this OSD
# there are several ways to pick a folder (like largest, smallest, constrained by the resulting load of the
@@ -302,9 +304,9 @@ class DataDistribution(object):
# find other OSD best suited for the picked folder (target)
# check whether moving the folder from origin to target decreases the maximum processing time of all OSDs (makespan).
best_osd, best_osd_load = self.get_lpt_osd(osd_information, capacity, smallest_folder_size)
best_osd, best_osd_processing_time = self.get_lpt_osd(smallest_folder_size)
if best_osd_load < maximum_load:
if best_osd_processing_time < maximum_processing_time:
self.assign_new_osd(smallest_folder_id, best_osd.uuid)
movements[smallest_folder_id] = (origin_osd.uuid, best_osd.uuid)
else:
@@ -312,28 +314,19 @@ class DataDistribution(object):
return movements
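The refactored rebalance_one_folder likewise takes no arguments any more; a minimal sketch of the call:

movements = distribution.rebalance_one_folder()
# empty if no single-folder move can lower the maximum processing time (makespan)
print(len(movements), "folders moved")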
def get_lpt_osd(self, osd_information, ratio_parameter, folder_size):
def get_lpt_osd(self, folder_size):
"""
calculate the load of all OSDs, using the sum of their current total_folder_size and folder_size.
calculate the processing time of all OSDs, using the sum of their current total_folder_size and folder_size.
return (OSD with the smallest such value, the smallest value)
"""
least_used_osd = None
best_load_so_far = 1
best_osd = None
best_processing_time = None
for one_osd in self.OSDs.values():
one_osd_load = (one_osd.total_folder_size + folder_size) / osd_information[one_osd.uuid][ratio_parameter]
if (least_used_osd is None) or one_osd_load < best_load_so_far:
least_used_osd = one_osd
best_load_so_far = one_osd_load
return least_used_osd, best_load_so_far
def create_osd_ratios(self, osd_information, ratio_parameter):
osd_ratios = {} # ratios are given - use them to assign proportionally
total_osd_size = 0
for osd_size in osd_information.values():
total_osd_size += osd_size[ratio_parameter]
for osd_uuid, osd_size in osd_information.items():
osd_ratios[osd_uuid] = float(osd_size[ratio_parameter]) / float(total_osd_size)
return osd_ratios
processing_time = (one_osd.total_folder_size + folder_size) / one_osd.bandwidth
if (best_osd is None) or processing_time < best_processing_time:
best_osd = one_osd
best_processing_time = processing_time
return best_osd, best_processing_time
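A small worked example of the selection rule in get_lpt_osd, with hypothetical OSD figures: a faster OSD can win even though it already stores more data.

candidates = {"slow": (60, 10), "fast": (120, 30)}  # uuid -> (total_folder_size, bandwidth)
folder_size = 30
scores = {uuid: (size + folder_size) / bw for uuid, (size, bw) in candidates.items()}
assert min(scores, key=scores.get) == "fast"  # (120 + 30) / 30 = 5.0 beats (60 + 30) / 10 = 9.0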
def update_folder(self, folder, size):
"""
@@ -344,13 +337,6 @@ class DataDistribution(object):
one_osd.update_folder(folder, size)
break
def get_equal_sized_fake_osd_information(self, capacity):
osd_information = {}
for osd_uuid in self.get_osd_list():
osd_information[osd_uuid] = {}
osd_information[osd_uuid][capacity] = 1
return osd_information
def description(self):
"""
generates a string describing this data distribution
......
import sys
class OSD(object):
"""
representation of an Object Storage device. the OSD is identified by its uuid.
it keeps track of the folders saved on the OSD as well as the size of the folders.
"""
def __init__(self, uuid: str):
def __init__(self, uuid: str, bandwidth=1, capacity=sys.maxsize):
if not isinstance(uuid, str):
raise ValueError("OSD uuid must be str!")
self.uuid = uuid
self.bandwidth = bandwidth
self.capacity = capacity
self.total_folder_size = 0
self.folders = {}
def add_folder(self, folder_id, folder_size):
assert self.total_folder_size + folder_size <= self.capacity
if folder_id not in self.folders:
self.folders[folder_id] = folder_size
else:
@@ -40,7 +47,14 @@ class OSD(object):
return smallest_id, smallest_size
def get_load(self):
return self.total_folder_size
def get_processing_time(self):
return self.total_folder_size / self.bandwidth
def __str__(self):
return "osd: '" + self.uuid \
+ "' totalFolderSize: " + str(self.total_folder_size) \
+ " processing time: " + str(self.get_processing_time()) \
+ " number of folders: " + str(len(self.folders))