Commit 2f28c327 authored by Felix Seibert's avatar Felix Seibert
Browse files

add method to fix physical layout in an OSD-balanced fashion

parent d095ba98
......@@ -26,16 +26,17 @@ max_processes_delete_replica = 200
class PhysicalPlacementRealizer(object):
def __init__(self, osd_manager: OSDManager, debug=False, repeat_delete_interval_secs=15,
             max_files_in_progress=10000, max_files_in_progress_per_osd=200, max_execute_repetitions=5):
    """
    set up the realizer; no files are scheduled for movement yet.
    :param osd_manager: stored as self.osd_manager.
    :param debug: if True, methods of this class print progress information.
    :param repeat_delete_interval_secs: stored as self.repeat_delete_interval_secs
        (presumably the pause between repeated delete-replica attempts; confirm in execute_command_list).
    :param max_files_in_progress: upper bound on the number of files handed to one execution round.
    :param max_files_in_progress_per_osd: upper bound on the number of files taken per movement key in one round
        of the osd_balanced strategy.
    :param max_execute_repetitions: stored as self.max_execute_repetitions.
    """
    self.osd_manager = osd_manager
    # maps a movement key to the list of files that still have to be moved for that key
    self.files_to_be_moved = {}
    self.iterations = 0
    self.max_files_in_progress_total = max_files_in_progress
    # legacy alias: the previous attribute name, kept for code that still reads it
    self.max_total_files_in_progress = max_files_in_progress
    self.max_files_in_progress_per_osd = max_files_in_progress_per_osd
    self.debug = debug
    self.repeat_delete_interval_secs = repeat_delete_interval_secs
    self.max_execute_repetitions = max_execute_repetitions
def realize_placement(self, strategy='random'):
def realize_placement(self, strategy='osd_balanced'):
"""
fixes the physical layout, such that it matches the data distribution described in self.distribution
we use the following strategy: first, determine which files need to be moved to another OSD, and create three lists.
......@@ -58,8 +59,7 @@ class PhysicalPlacementRealizer(object):
print("starting to fix physical layout...this is fix-iteration " + str(iteration))
if strategy == 'osd_balanced':
# TODO implement
pass
self.move_files_osd_balanced()
elif strategy == 'random':
self.move_files_randomly()
self.update_files_to_be_moved()
......@@ -139,6 +139,42 @@ class PhysicalPlacementRealizer(object):
self.files_to_be_moved[movement_key] = []
self.files_to_be_moved[movement_key].append(file_to_move)
def get_next_files(self, movement_key):
    """
    get the next batch of files to be moved from the list stored at self.files_to_be_moved[movement_key].

    at most self.max_files_in_progress_per_osd files are popped from the end of that list. the limit is treated as
    at least 1, so that callers which loop until self.files_to_be_moved is empty always make progress.
    if the list becomes empty, its key is removed from self.files_to_be_moved.
    :param movement_key: key into self.files_to_be_moved that selects the list to take files from.
    :return: list of the files taken, in the order they were popped (last list element first).
        empty if movement_key is not (or no longer) present.
    """
    pending = self.files_to_be_moved.get(movement_key)
    if pending is None:
        # unknown or already exhausted key: nothing to hand out
        return []
    batch_size = min(len(pending), max(1, self.max_files_in_progress_per_osd))
    next_files_to_move = [pending.pop() for _ in range(batch_size)]
    if not pending:
        del self.files_to_be_moved[movement_key]
    return next_files_to_move
def move_files_osd_balanced(self):
    """
    executes the necessary commands in order to move all files in self.files_to_be_moved to their target OSD.
    files are processed in rounds: each round takes up to self.max_files_in_progress_per_osd files per movement key,
    such that OSDs are more or less equally loaded at any point of time.

    if taking the per-OSD limit from every movement key would exceed self.max_files_in_progress_total, the per-OSD
    limit is lowered (and stays lowered on self), but never below 1.
    :return:
    """
    num_movement_keys = len(self.files_to_be_moved)
    if num_movement_keys > 0:
        per_osd_limit = min(self.max_files_in_progress_per_osd,
                            self.max_files_in_progress_total // num_movement_keys)
        # a limit of 0 (more movement keys than the total limit) would hand out no files at all,
        # so the loop below would never terminate
        per_osd_limit = max(1, per_osd_limit)
        if per_osd_limit != self.max_files_in_progress_per_osd:
            self.max_files_in_progress_per_osd = per_osd_limit
            if self.debug:
                print("setting max_files_in_progress_per_osd on: " + str(self.max_files_in_progress_per_osd))

    while self.files_to_be_moved:
        files_to_move_now = []
        # iterate over a snapshot of the keys: get_next_files deletes keys whose list has been exhausted
        for movement_key in list(self.files_to_be_moved):
            files_to_move_now.extend(self.get_next_files(movement_key))
        random.shuffle(files_to_move_now)
        self.execute_command_list(self.transform_files_to_move_into_three_command_lists(files_to_move_now))
def move_files_randomly(self):
"""
executes the necessary commands in order to move all files in self.files_to_be_moved to their target OSD.
......@@ -147,49 +183,49 @@ class PhysicalPlacementRealizer(object):
:return:
"""
files_to_be_moved = self.get_list_of_all_files_to_be_moved()
if self.debug:
print("number of files that need to be moved: " + str(len(files_to_be_moved)))
random.shuffle(files_to_be_moved)
while len(files_to_be_moved) > 0:
# list 1
change_policy_command_list = []
# list 2
create_replica_command_list = []
# list 3
delete_replica_command_list = []
for i in range(0, self.max_total_files_in_progress):
file_to_be_moved = files_to_be_moved.pop()
if file_to_be_moved.policy_command is not None:
change_policy_command_list.append(file_to_be_moved.policy_command)
if file_to_be_moved.create_replica_command is not None:
create_replica_command_list.append(file_to_be_moved.create_replica_command)
if file_to_be_moved.delete_replica_command is not None:
delete_replica_command_list.append(file_to_be_moved.delete_replica_command)
files_to_be_moved_now = []
for i in range(0, self.max_files_in_progress_total):
files_to_be_moved_now.append(files_to_be_moved.pop())
if len(files_to_be_moved) == 0:
break
self.execute_command_list(self.transform_files_to_move_into_three_command_lists(files_to_be_moved_now))
def transform_files_to_move_into_three_command_lists(self, files_to_move):
    """
    collect the commands of the given files into three lists, one per command type.
    commands that are None are skipped; the order of files_to_move is kept within each list.
    :param files_to_move: iterable of objects with the attributes policy_command, create_replica_command and
        delete_replica_command.
    :return: a triple (change_policy_command_list, create_replica_command_list, delete_replica_command_list),
        in the form expected by execute_command_list.
    """
    # materialize once, as the input is traversed once per command type
    files = list(files_to_move)
    change_policy_command_list = [f.policy_command for f in files
                                  if f.policy_command is not None]
    create_replica_command_list = [f.create_replica_command for f in files
                                   if f.create_replica_command is not None]
    delete_replica_command_list = [f.delete_replica_command for f in files
                                   if f.delete_replica_command is not None]
    return change_policy_command_list, create_replica_command_list, delete_replica_command_list
def execute_command_list(self, command_list_triple):
"""
executes three list of commands, that are typically needed for moving files in XtreemFS from their current OSD
executes three lists of commands, given as a triple, that are typically needed for moving files in XtreemFS from their current OSD
to the designated target OSD. the lists are treated as their order in the argument list. commands from the next
list are only executed when all commands of the previous list have returned (or been killed).
deletion commands are repeated a number of times, as the target OSD requires some time to create the new replica
(for fetching all objects, i.e., file content).
:param change_policy_command_list: list of change replication policy commands to be executed.
:param create_replica_command_list: list of create replica commands to be executed.
:param delete_replica_command_list: list of delete replica commands to be executed.
:param command_list_triple: a triple (change_policy_command_list, create_replica_command_list, delete_replica_command_list)
with: change_policy_command_list:list of change replication policy commands to be executed.
create_replica_command_list: list of create replica commands to be executed.
delete_replica_command_list: list of delete replica commands to be executed.
:return:
"""
change_policy_command_list = command_list_triple[0]
create_replica_command_list = command_list_triple[1]
delete_replica_command_list = command_list_triple[2]
start_time = time.time()
if self.debug:
print("starting execution of " + str(len(change_policy_command_list)) + " change policy commands...")
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment