Commit bc4ce08e authored by Felix Seibert's avatar Felix Seibert
Browse files

refactor: move most of fix_layout code to new class

parent 2d5b6d9d
......@@ -12,6 +12,7 @@ from xtreemfs_client import dataDistribution
from xtreemfs_client import div_util
from xtreemfs_client import folder
from xtreemfs_client import dirstatuspageparser
from xtreemfs_client import physicalPlacementRealizer
'''
xOSDManager - a python module to manage OSD selection in XtreemFS
......@@ -226,153 +227,14 @@ class OSDManager(object):
if self.debug:
print("starting to fix physical layout...this is fix-iteration " + str(iteration))
# one element of this list should look like this: (policy_command, create_command, delete_command).
# None is an accepted value.
command_tuple_list = []
# create commands
managed_folders = self.get_depth_2_subdirectories()
for managed_folder in managed_folders:
for directory in os.walk(managed_folder):
for filename in directory[2]:
policy_command = None
create_command = None
delete_command = None
absolute_file_path = os.path.join(directory[0], filename)
osds_of_file = div_util.get_osd_uuids(absolute_file_path)
path_on_volume = self.get_path_on_volume(absolute_file_path)
containing_folder_id = self.get_containing_folder_id(path_on_volume)
osd_for_file = self.distribution.get_containing_osd(containing_folder_id).uuid
file_on_correct_osd = False
for osd_of_file in osds_of_file:
if osd_of_file != osd_for_file:
# delete all replicas on wrong OSDs
delete_command = div_util.create_delete_replica_command(absolute_file_path, osd_of_file)
else:
file_on_correct_osd = True
if not file_on_correct_osd and len(osds_of_file) < 2:
# only one replica on a wrong OSD => need to set replication policy.
# otherwise, there is a unique replica on the correct OSD => no change necessary,
# OR there are multiple replicas => replication policy must be set.
policy_command = div_util.create_replication_policy_command(absolute_file_path)
if not file_on_correct_osd:
# create a replica on the correct osd
create_command = div_util.create_create_replica_command(absolute_file_path, osd_for_file)
# in python, strings are also booleans!!! :)
if policy_command or create_command or delete_command:
command_tuple_list.append((policy_command, create_command, delete_command))
# end of recursion condition
if len(command_tuple_list) == 0:
return
if self.debug:
print("number of files that need to be moved: " + str(len(command_tuple_list)))
# run commands the new way
random.shuffle(command_tuple_list)
while len(command_tuple_list) > 0:
# list 1
change_policy_command_list = []
# list 2
create_replica_command_list = []
# list 3
delete_replica_command_list = []
for i in range(0, max_files_in_progress):
policy_command, create_command, delete_command = command_tuple_list.pop()
if policy_command is not None:
change_policy_command_list.append(policy_command)
if create_command is not None:
create_replica_command_list.append(create_command)
if delete_command is not None:
delete_replica_command_list.append(delete_command)
if len(command_tuple_list) == 0:
break
# run sublist of commands the old way
start_time = time.time()
if self.debug:
print("starting execution of " + str(len(change_policy_command_list)) + " change policy commands...")
print(str(datetime.datetime.now()))
errored_processes = div_util.run_commands(change_policy_command_list, max_processes_change_policy)
end_time = time.time()
if self.debug:
print("executing " + str(len(change_policy_command_list)) + " change policy commands done in " +
str(round(end_time - start_time)) + " sec.")
# div_util.print_process_list(processes)
placement_realizer = physicalPlacementRealizer.PhysicalPlacementRealizer(self, debug=self.debug,
max_files_in_progress=max_files_in_progress)
placement_realizer.calculate_files_to_be_moved()
placement_realizer.move_files_randomly()
start_time = time.time()
if self.debug:
print("starting execution of " + str(len(create_replica_command_list)) + " create replica commands...")
print(str(datetime.datetime.now()))
random.shuffle(create_replica_command_list)
errored_processes = div_util.run_commands(create_replica_command_list, max_processes_add_replica)
end_time = time.time()
if self.debug:
print("executing " + str(len(create_replica_command_list)) + " create replica commands done in " +
str(round(end_time - start_time)) + " sec.")
# div_util.print_process_list(processes)
start_time = time.time()
if self.debug:
print("starting execution of " + str(len(delete_replica_command_list)) + " delete replica commands...")
print(str(datetime.datetime.now()))
errored_processes = div_util.run_commands(delete_replica_command_list, max_processes_delete_replica,
print_errors=False)
# run and repeat delete commands, until they return no error
# (if an error is returned for another reason than that one would delete the last complete replica,
# this will probably not work.
iterations = 0
max_iterations = 5
while True:
if iterations >= max_iterations:
print("results of last iteration: ")
div_util.print_process_list(errored_processes)
print("Original replicas could not be deleted after " + str(max_iterations) + ". Aborting...")
break
if self.debug:
print("executing " + str(len(delete_replica_command_list))
+ " delete replica commands done. This is delete-iteration "
+ str(iterations))
# div_util.print_process_list(processes)
errored_deletions = []
for process in errored_processes:
# check the return code. if it is one, the replica could not be deleted, so we try again later.
if process[2] != 0:
errored_deletions.append(process[0])
if self.debug:
print("errored command: ")
print("command: " + str(process[0]))
print("stdoud: " + str(process[1][0]))
print("stderr: " + str(process[1][1]))
print("retcode: " + str(process[2]))
if len(errored_deletions) == 0:
break
time.sleep(repeat_delete_interval_secs)
if self.debug:
print("rerunning " + str(
len(errored_deletions)) + " commands because replica could not be deleted...")
errored_processes = div_util.run_commands(errored_deletions, max_processes_change_policy,
print_errors=False)
iterations += 1
if self.debug:
end_time = time.time()
print("deleting replicas done in in " + str(round(end_time - start_time)) + " sec.")
# end of recursion condition - no files left to be moved
if len((placement_realizer.files_to_be_moved.keys())) == 0:
return
self.fix_physical_layout_internally(iteration=iteration + 1)
......
import datetime
import os
import random
import time
from xtreemfs_client import OSDManager
from xtreemfs_client import div_util
class FileToMove(object):
def __init__(self, absolute_file_path, origin_osd, target_osd,
policy_command, create_replica_command, delete_replica_command):
self.absolute_file_path = absolute_file_path
self.origin_osd = origin_osd
self.target_osd = target_osd
self.policy_command = policy_command
self.create_replica_command = create_replica_command
self.delete_replica_command = delete_replica_command
max_processes_change_policy = 200
max_processes_add_replica = 200
max_processes_delete_replica = 200
class PhysicalPlacementRealizer(object):
def __init__(self, osd_manager: OSDManager, debug=False, repeat_delete_interval_secs=15,
max_files_in_progress=10000):
self.osd_manager = osd_manager
self.files_to_be_moved = {}
self.iterations = 0
self.max_total_files_in_progress = max_files_in_progress
self.debug = debug
self.repeat_delete_interval_secs = repeat_delete_interval_secs
def realize_placement(self):
pass
def get_list_of_all_files_to_be_moved(self):
files_to_be_moved_list = []
for list_per_key in self.files_to_be_moved.values():
for file_to_be_moved in list_per_key:
files_to_be_moved_list.append(file_to_be_moved)
return files_to_be_moved_list
def calculate_files_to_be_moved(self):
managed_folders = self.osd_manager.get_depth_2_subdirectories()
for managed_folder in managed_folders:
for directory in os.walk(managed_folder):
for filename in directory[2]:
policy_command = None
create_command = None
delete_command = None
absolute_file_path = os.path.join(directory[0], filename)
osds_of_file = div_util.get_osd_uuids(absolute_file_path)
path_on_volume = self.osd_manager.get_path_on_volume(absolute_file_path)
containing_folder_id = self.osd_manager.get_containing_folder_id(path_on_volume)
osd_for_file = self.osd_manager.distribution.get_containing_osd(containing_folder_id).uuid
file_on_correct_osd = False
osd_of_file = None # this assignment will always be overwritten,
# as there cannot be files in XtreemFS that do not have an OSD
for osd_of_file in osds_of_file:
if osd_of_file != osd_for_file:
# delete all replicas on wrong OSDs
delete_command = div_util.create_delete_replica_command(absolute_file_path, osd_of_file)
else:
file_on_correct_osd = True
if not file_on_correct_osd and len(osds_of_file) < 2:
# only one replica on a wrong OSD => need to set replication policy.
# otherwise, there is a unique replica on the correct OSD => no change necessary,
# OR there are multiple replicas => replication policy must be set.
policy_command = div_util.create_replication_policy_command(absolute_file_path)
if not file_on_correct_osd:
# create a replica on the correct osd
create_command = div_util.create_create_replica_command(absolute_file_path, osd_for_file)
# in python, strings are also booleans!!! :)
if policy_command or create_command or delete_command:
# create FileToMove object and add it to the corresponding list in the map
file_to_move = FileToMove(absolute_file_path,
osd_of_file,
osd_for_file,
policy_command,
create_command,
delete_command)
movement_key = (osd_of_file, osd_for_file)
if not movement_key in self.files_to_be_moved.keys():
self.files_to_be_moved[movement_key] = []
self.files_to_be_moved[movement_key].append(file_to_move)
def move_files_randomly(self):
files_to_be_moved = self.get_list_of_all_files_to_be_moved()
if self.debug:
print("number of files that need to be moved: " + str(len(files_to_be_moved)))
random.shuffle(files_to_be_moved)
while len(files_to_be_moved) > 0:
# list 1
change_policy_command_list = []
# list 2
create_replica_command_list = []
# list 3
delete_replica_command_list = []
for i in range(0, self.max_total_files_in_progress):
file_to_be_moved = files_to_be_moved.pop()
if file_to_be_moved.policy_command is not None:
change_policy_command_list.append(file_to_be_moved.policy_command)
if file_to_be_moved.create_replica_command is not None:
create_replica_command_list.append(file_to_be_moved.create_replica_command)
if file_to_be_moved.delete_replica_command is not None:
delete_replica_command_list.append(file_to_be_moved.delete_replica_command)
if len(files_to_be_moved) == 0:
break
# run sublist of commands the old way
start_time = time.time()
if self.debug:
print("starting execution of " + str(len(change_policy_command_list)) + " change policy commands...")
print(str(datetime.datetime.now()))
errored_processes = div_util.run_commands(change_policy_command_list, max_processes_change_policy)
end_time = time.time()
if self.debug:
print("executing " + str(len(change_policy_command_list)) + " change policy commands done in " +
str(round(end_time - start_time)) + " sec.")
# div_util.print_process_list(processes)
start_time = time.time()
if self.debug:
print("starting execution of " + str(len(create_replica_command_list)) + " create replica commands...")
print(str(datetime.datetime.now()))
random.shuffle(create_replica_command_list)
errored_processes = div_util.run_commands(create_replica_command_list, max_processes_add_replica)
end_time = time.time()
if self.debug:
print("executing " + str(len(create_replica_command_list)) + " create replica commands done in " +
str(round(end_time - start_time)) + " sec.")
# div_util.print_process_list(processes)
start_time = time.time()
if self.debug:
print("starting execution of " + str(len(delete_replica_command_list)) + " delete replica commands...")
print(str(datetime.datetime.now()))
errored_processes = div_util.run_commands(delete_replica_command_list, max_processes_delete_replica,
print_errors=False)
# run and repeat delete commands, until they return no error
# (if an error is returned for another reason than that one would delete the last complete replica,
# this will probably not work.
iterations = 0
max_iterations = 5
while True:
if iterations >= max_iterations:
print("results of last iteration: ")
div_util.print_process_list(errored_processes)
print("Original replicas could not be deleted after " + str(max_iterations) + ". Aborting...")
break
if self.debug:
print("executing " + str(len(delete_replica_command_list))
+ " delete replica commands done. This is delete-iteration "
+ str(iterations))
# div_util.print_process_list(processes)
errored_deletions = []
for process in errored_processes:
# check the return code. if it is one, the replica could not be deleted, so we try again later.
if process[2] != 0:
errored_deletions.append(process[0])
if self.debug:
print("errored command: ")
print("command: " + str(process[0]))
print("stdoud: " + str(process[1][0]))
print("stderr: " + str(process[1][1]))
print("retcode: " + str(process[2]))
if len(errored_deletions) == 0:
break
time.sleep(self.repeat_delete_interval_secs)
if self.debug:
print("rerunning " + str(
len(errored_deletions)) + " commands because replica could not be deleted...")
errored_processes = div_util.run_commands(errored_deletions, max_processes_change_policy,
print_errors=False)
iterations += 1
if self.debug:
end_time = time.time()
print("deleting replicas done in in " + str(round(end_time - start_time)) + " sec.")
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment