Commit a2b05e2c authored by Felix Seibert

implement two-step rebalancing algorithm

parent 4ad8c91f
@@ -12,7 +12,7 @@ with open('HISTORY.rst') as history_file:
    history = history_file.read()
requirements = [
    # TODO: put package requirements here
    'ortools'
]
setup_requirements = [
......
import random
import copy
from ortools.graph import pywrapgraph
from xtreemfs_client import osd
from xtreemfs_client import folder
@@ -314,6 +317,103 @@ class DataDistribution(object):
        return movements

    def rebalance_two_steps(self):
        """
        rebalance the distribution in two steps:
        1. calculate a new distribution, independently of the current one.
        2. use a minimum weight perfect matching to transform the current distribution into the new one.
        minimum weight perfect matching on bipartite graphs can be solved with the successive shortest path
        algorithm, here via a min cost flow formulation.
        while any algorithm (exact or approximate) for that kind of problem could be used for the first step,
        only the LPT algorithm is implemented here, as it is a good approximation with a very short running time.
        :return: a dict mapping each moved folder to an (origin OSD uuid, target OSD uuid) tuple
        """
        virtual_distribution = copy.deepcopy(self)
        virtual_distribution.rebalance_lpt(rebalance_factor=0)
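        # step 2: match current OSDs to virtual OSDs with a minimum weight perfect matching,
        # modelled as an assignment problem on a bipartite graph and solved as a min cost flow.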
        # create a min cost flow object
        min_cost_flow = pywrapgraph.SimpleMinCostFlow()

        # define the directed graph for the flow.
        # arcs are added individually, nodes are added implicitly.
        # nodes (OSDs) have to be given by numeric id,
        # so we need some conversion logic between current/virtual OSDs and node ids.
        current_osds_list = list(self.OSDs.values())
        current_osds_list.sort(key=lambda x: x.uuid)
        virtual_osds_list = list(virtual_distribution.OSDs.values())
        virtual_osds_list.sort(key=lambda x: x.uuid)
        # conversion logic:
        # n = len(current_osds_list) = len(virtual_osds_list)
        # 0 = source, 1 = sink
        # 2, ..., n + 1: current OSDs
        # n + 2, ..., 2n + 1: virtual OSDs
        num_osds = len(current_osds_list)
        assert num_osds == len(virtual_osds_list)
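        # for example, with n = 2 OSDs: node 0 is the source, node 1 the sink,
        # nodes 2 and 3 are the current OSDs, and nodes 4 and 5 are the virtual OSDs.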
        # edges between the two partitions
        for i in range(0, num_osds):
            for j in range(0, num_osds):
                current_osd = current_osds_list[i]
                virtual_osd = virtual_osds_list[j]
                # calculate the total size of folders that the current OSD has to fetch
                # if the virtual OSD is assigned to it
                edge_cost = 0
                for folder_id in virtual_osd.folders.keys():
                    if not current_osd.contains_folder(folder_id):
                        edge_cost += virtual_osd.folders[folder_id]
                tail = 2 + i  # current OSD
                head = num_osds + 2 + j  # virtual OSD
                min_cost_flow.AddArcWithCapacityAndUnitCost(tail, head, 1, edge_cost)
        # (artificial) edges between the source node and the current OSDs
        for i in range(0, num_osds):
            edge_cost = 0
            tail = 0
            head = i + 2
            min_cost_flow.AddArcWithCapacityAndUnitCost(tail, head, 1, edge_cost)

        # (artificial) edges between the virtual OSDs and the sink node
        for j in range(0, num_osds):
            edge_cost = 0
            tail = num_osds + 2 + j
            head = 1
            min_cost_flow.AddArcWithCapacityAndUnitCost(tail, head, 1, edge_cost)
        # define the supplies (which equal the number of OSDs)
        min_cost_flow.SetNodeSupply(0, num_osds)
        min_cost_flow.SetNodeSupply(1, -num_osds)
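        # with all arc capacities equal to 1 and a supply of num_osds at the source,
        # every feasible flow routes exactly one unit through each current OSD and each virtual OSD,
        # i.e. it corresponds to a perfect matching between the two partitions.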
        # solve the min cost flow problem
        min_cost_flow.Solve()
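        # note: Solve() returns a status code; a stricter version would compare it against
        # min_cost_flow.OPTIMAL before reading the flow values below.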
        # transform the calculated optimal assignment into a rebalanced distribution,
        # including the necessary movements
        current_to_virtual_osd_matching = []
        for arc in range(min_cost_flow.NumArcs()):
            tail = min_cost_flow.Tail(arc)
            head = min_cost_flow.Head(arc)
            if tail != 0 and head != 1 and min_cost_flow.Flow(arc) == 1:
                current_osd = current_osds_list[tail - 2]
                virtual_osd = virtual_osds_list[head - num_osds - 2]
                current_to_virtual_osd_matching.append((current_osd, virtual_osd))
        movements = {}
        for current_osd, virtual_osd in current_to_virtual_osd_matching:
            # iterate over virtual folders and check whether they are already on the correct OSD.
            # the correct OSD is current_osd, as it is the one matched with virtual_osd.
            # if a folder is not present on current_osd, assign it to current_osd;
            # this also removes it from its origin OSD.
            for virtual_folder in virtual_osd.folders.keys():
                if not current_osd.contains_folder(virtual_folder):
                    origin_osd = self.get_containing_osd(virtual_folder).uuid
                    target_osd = current_osd.uuid
                    movements[virtual_folder] = (origin_osd, target_osd)
                    self.assign_new_osd(virtual_folder, target_osd)
        return movements

    def get_lpt_osd(self, folder_size):
        """
        calculate the processing time of all OSDs, using the sum of their current total_folder_size and folder_size.
......
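For reference, the following standalone sketch (not part of the commit) reproduces the min cost flow construction used in rebalance_two_steps above on a hard-coded 2x2 cost matrix, with the same node numbering (0 = source, 1 = sink, then current OSDs, then virtual OSDs). The cost values and the variable names are made up for illustration.

from ortools.graph import pywrapgraph

# toy instance: 2 current OSDs and 2 virtual OSDs.
# cost[i][j] = data that current OSD i would have to fetch if virtual OSD j were assigned to it.
cost = [[5, 0],
        [0, 7]]
n = len(cost)

flow = pywrapgraph.SimpleMinCostFlow()

# node numbering as in rebalance_two_steps: 0 = source, 1 = sink,
# 2..n+1 = current OSDs, n+2..2n+1 = virtual OSDs.
for i in range(n):
    flow.AddArcWithCapacityAndUnitCost(0, 2 + i, 1, 0)      # source -> current OSD i
    flow.AddArcWithCapacityAndUnitCost(n + 2 + i, 1, 1, 0)  # virtual OSD i -> sink
    for j in range(n):
        flow.AddArcWithCapacityAndUnitCost(2 + i, n + 2 + j, 1, cost[i][j])

flow.SetNodeSupply(0, n)
flow.SetNodeSupply(1, -n)

if flow.Solve() == flow.OPTIMAL:
    for arc in range(flow.NumArcs()):
        tail, head = flow.Tail(arc), flow.Head(arc)
        if tail != 0 and head != 1 and flow.Flow(arc) == 1:
            print('current OSD', tail - 2, '-> virtual OSD', head - n - 2,
                  'cost', flow.UnitCost(arc))
    print('total movement cost:', flow.OptimalCost())

On this toy instance the solver matches current OSD 0 with virtual OSD 1 and current OSD 1 with virtual OSD 0, for a total movement cost of 0, which is exactly the behaviour the two-step rebalancing relies on: the matching step minimizes the amount of data that has to be moved to reach the LPT target.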