Commit 6d04cb2c authored by Felix Seibert's avatar Felix Seibert
Browse files

refactor: fix_internally now completely happens within a therefore created class.

parent bc4ce08e
......@@ -174,7 +174,10 @@ class OSDManager(object):
start_time = time.time()
if fix_layout_internally:
self.fix_physical_layout_internally(max_files_in_progress=max_files_in_progress)
placement_realizer = \
physicalPlacementRealizer.PhysicalPlacementRealizer(self, debug=self.debug,
max_files_in_progress=max_files_in_progress)
placement_realizer.realize_placement()
else:
if environment == 'SLURM':
osd_list = self.distribution.get_osd_list()
......@@ -209,35 +212,6 @@ class OSDManager(object):
osd_for_folder = self.distribution.get_containing_osd(folder_id)
self.move_folder_to_osd(folder_id, osd_for_folder.uuid)
def fix_physical_layout_internally(self, repeat_delete_interval_secs=15, iteration=0, max_files_in_progress=10000):
"""
fixes the physical layout, such that it matches the data distribution described in self.distribution
we use the following strategy: first, determine which files needs to be moved to another OSD, and create three lists.
for each file to be moved, one contains the command to set the default replication policy (1), one contains the command
to create a new replica on the target OSD (2) and one contains the command to delete the replica on the original OSD (3).
the command-lists are executed in order: 1, 2, 3, commands of list i only being executed if all commands of list i-1
have returned. the commands of any one list are executed in parallel, with an upper bound of 200 parallel executions
to not violate the maximum number of processes of the OS.
the commands of list 3 might return an error, as deleting the replica on the original OSD is not possible until
the new replica on the target OSD is complete. theses commands are collected and re-executed after a certain delay.
this process is repeated until no error is returned, which should eventually happen (as soon as all new replicas on
the target OSDs are complete).
"""
if self.debug:
print("starting to fix physical layout...this is fix-iteration " + str(iteration))
placement_realizer = physicalPlacementRealizer.PhysicalPlacementRealizer(self, debug=self.debug,
max_files_in_progress=max_files_in_progress)
placement_realizer.calculate_files_to_be_moved()
placement_realizer.move_files_randomly()
# end of recursion condition - no files left to be moved
if len((placement_realizer.files_to_be_moved.keys())) == 0:
return
self.fix_physical_layout_internally(iteration=iteration + 1)
def create_empty_folders(self, folders):
"""
create empty folders and assign OSDs.
......
......@@ -18,22 +18,52 @@ class FileToMove(object):
self.create_replica_command = create_replica_command
self.delete_replica_command = delete_replica_command
max_processes_change_policy = 200
max_processes_add_replica = 200
max_processes_delete_replica = 200
class PhysicalPlacementRealizer(object):
def __init__(self, osd_manager: OSDManager, debug=False, repeat_delete_interval_secs=15,
max_files_in_progress=10000):
max_files_in_progress=10000, max_execute_repetitions=5):
self.osd_manager = osd_manager
self.files_to_be_moved = {}
self.iterations = 0
self.max_total_files_in_progress = max_files_in_progress
self.debug = debug
self.repeat_delete_interval_secs = repeat_delete_interval_secs
self.max_execute_repetitions = max_execute_repetitions
def realize_placement(self, strategy='random'):
"""
fixes the physical layout, such that it matches the data distribution described in self.distribution
we use the following strategy: first, determine which files needs to be moved to another OSD, and create three lists.
for each file to be moved, one contains the command to set the default replication policy (1), one contains the command
to create a new replica on the target OSD (2) and one contains the command to delete the replica on the original OSD (3).
the command-lists are executed in order: 1, 2, 3, commands of list i only being executed if all commands of list i-1
have returned. the commands of any one list are executed in parallel, with an upper bound of 200 parallel executions
to not violate the maximum number of processes of the OS.
the commands of list 3 might return an error, as deleting the replica on the original OSD is not possible until
the new replica on the target OSD is complete. theses commands are collected and re-executed after a certain delay.
this process is repeated until no error is returned, which should eventually happen (as soon as all new replicas on
the target OSDs are complete).
"""
iteration = 0
self.calculate_files_to_be_moved()
while len(list(self.files_to_be_moved.keys())) > 0:
if self.debug:
print("starting to fix physical layout...this is fix-iteration " + str(iteration))
def realize_placement(self):
pass
if strategy == 'osd_balanced':
# TODO implement
pass
elif strategy == 'random':
self.move_files_randomly()
self.update_files_to_be_moved()
iteration += 1
def get_list_of_all_files_to_be_moved(self):
files_to_be_moved_list = []
......@@ -43,7 +73,11 @@ class PhysicalPlacementRealizer(object):
return files_to_be_moved_list
def update_files_to_be_moved(self):
self.calculate_files_to_be_moved()
def calculate_files_to_be_moved(self):
self.files_to_be_moved = {}
managed_folders = self.osd_manager.get_depth_2_subdirectories()
for managed_folder in managed_folders:
for directory in os.walk(managed_folder):
......@@ -59,7 +93,7 @@ class PhysicalPlacementRealizer(object):
file_on_correct_osd = False
osd_of_file = None # this assignment will always be overwritten,
# as there cannot be files in XtreemFS that do not have an OSD
# as there cannot be files in XtreemFS that do not have an OSD
for osd_of_file in osds_of_file:
if osd_of_file != osd_for_file:
# delete all replicas on wrong OSDs
......@@ -120,81 +154,87 @@ class PhysicalPlacementRealizer(object):
if len(files_to_be_moved) == 0:
break
# run sublist of commands the old way
start_time = time.time()
if self.debug:
print("starting execution of " + str(len(change_policy_command_list)) + " change policy commands...")
print(str(datetime.datetime.now()))
errored_processes = div_util.run_commands(change_policy_command_list, max_processes_change_policy)
end_time = time.time()
if self.debug:
print("executing " + str(len(change_policy_command_list)) + " change policy commands done in " +
str(round(end_time - start_time)) + " sec.")
# div_util.print_process_list(processes)
self.execute_command_list(change_policy_command_list,
create_replica_command_list,
delete_replica_command_list)
def execute_command_list(self, change_policy_command_list, create_replica_command_list,
delete_replica_command_list):
start_time = time.time()
if self.debug:
print("starting execution of " + str(len(change_policy_command_list)) + " change policy commands...")
print(str(datetime.datetime.now()))
errored_processes = div_util.run_commands(change_policy_command_list, max_processes_change_policy)
end_time = time.time()
if self.debug:
print("executing " + str(len(change_policy_command_list)) + " change policy commands done in " +
str(round(end_time - start_time)) + " sec.")
# div_util.print_process_list(processes)
start_time = time.time()
if self.debug:
print("starting execution of " + str(len(create_replica_command_list)) + " create replica commands...")
print(str(datetime.datetime.now()))
random.shuffle(create_replica_command_list)
errored_processes = div_util.run_commands(create_replica_command_list, max_processes_add_replica)
end_time = time.time()
if self.debug:
print("executing " + str(len(create_replica_command_list)) + " create replica commands done in " +
str(round(end_time - start_time)) + " sec.")
# div_util.print_process_list(processes)
start_time = time.time()
if self.debug:
print("starting execution of " + str(len(delete_replica_command_list)) + " delete replica commands...")
print(str(datetime.datetime.now()))
errored_processes = div_util.run_commands(delete_replica_command_list, max_processes_delete_replica,
print_errors=False)
# run and repeat delete commands, until they return no error
# (if an error is returned for another reason than that one would delete the last complete replica,
# this will probably not work.
iterations = 0
while True:
if iterations >= self.max_execute_repetitions:
print("results of last iteration: ")
div_util.print_process_list(errored_processes)
print("Original replicas could not be deleted after " + str(
self.max_execute_repetitions) + ". Aborting...")
break
start_time = time.time()
if self.debug:
print("starting execution of " + str(len(create_replica_command_list)) + " create replica commands...")
print(str(datetime.datetime.now()))
random.shuffle(create_replica_command_list)
errored_processes = div_util.run_commands(create_replica_command_list, max_processes_add_replica)
end_time = time.time()
if self.debug:
print("executing " + str(len(create_replica_command_list)) + " create replica commands done in " +
str(round(end_time - start_time)) + " sec.")
print("executing " + str(len(delete_replica_command_list))
+ " delete replica commands done. This is delete-iteration "
+ str(iterations))
# div_util.print_process_list(processes)
start_time = time.time()
if self.debug:
print("starting execution of " + str(len(delete_replica_command_list)) + " delete replica commands...")
print(str(datetime.datetime.now()))
errored_processes = div_util.run_commands(delete_replica_command_list, max_processes_delete_replica,
print_errors=False)
errored_deletions = []
# run and repeat delete commands, until they return no error
# (if an error is returned for another reason than that one would delete the last complete replica,
# this will probably not work.
iterations = 0
max_iterations = 5
while True:
if iterations >= max_iterations:
print("results of last iteration: ")
div_util.print_process_list(errored_processes)
print("Original replicas could not be deleted after " + str(max_iterations) + ". Aborting...")
break
for process in errored_processes:
# check the return code. if it is one, the replica could not be deleted, so we try again later.
if process[2] != 0:
errored_deletions.append(process[0])
if self.debug:
print("errored command: ")
print("command: " + str(process[0]))
print("stdoud: " + str(process[1][0]))
print("stderr: " + str(process[1][1]))
print("retcode: " + str(process[2]))
if self.debug:
print("executing " + str(len(delete_replica_command_list))
+ " delete replica commands done. This is delete-iteration "
+ str(iterations))
# div_util.print_process_list(processes)
errored_deletions = []
for process in errored_processes:
# check the return code. if it is one, the replica could not be deleted, so we try again later.
if process[2] != 0:
errored_deletions.append(process[0])
if self.debug:
print("errored command: ")
print("command: " + str(process[0]))
print("stdoud: " + str(process[1][0]))
print("stderr: " + str(process[1][1]))
print("retcode: " + str(process[2]))
if len(errored_deletions) == 0:
break
if len(errored_deletions) == 0:
break
time.sleep(self.repeat_delete_interval_secs)
time.sleep(self.repeat_delete_interval_secs)
if self.debug:
print("rerunning " + str(
len(errored_deletions)) + " commands because replica could not be deleted...")
if self.debug:
print("rerunning " + str(
len(errored_deletions)) + " commands because replica could not be deleted...")
errored_processes = div_util.run_commands(errored_deletions, max_processes_change_policy,
print_errors=False)
iterations += 1
errored_processes = div_util.run_commands(errored_deletions, max_processes_change_policy,
print_errors=False)
iterations += 1
if self.debug:
end_time = time.time()
print("deleting replicas done in in " + str(round(end_time - start_time)) + " sec.")
\ No newline at end of file
if self.debug:
end_time = time.time()
print("deleting replicas done in in " + str(round(end_time - start_time)) + " sec.")
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment