Commit ca608964 authored by Felix Seibert
Browse files

first version of max_files_in_progress

parent bd66e08f
......@@ -146,7 +146,7 @@ class OSDManager(object):
folder_size = int(du.stdout.split()[0])
if folder_size == 0:
folder_size = 1
new_folder = folder.Folder(self.path_on_volume(one_folder),
new_folder = folder.Folder(self.get_path_on_volume(one_folder),
folder_size,
None)
new_folders.append(new_folder)
......@@ -206,7 +206,7 @@ class OSDManager(object):
osd_for_folder = self.distribution.get_containing_osd(folder_id)
self.move_folder_to_osd(folder_id, osd_for_folder.uuid)
def fix_physical_layout_internally(self, repeat_delete_interval_secs=15, iteration=0):
def fix_physical_layout_internally(self, repeat_delete_interval_secs=15, iteration=0, max_files_in_progress=10):
"""
fixes the physical layout, such that it matches the data distribution described in self.distribution
we use the following strategy: first, determine which files needs to be moved to another OSD, and create three lists.
......@@ -223,22 +223,22 @@ class OSDManager(object):
"""
if self.debug:
print("starting to fix physical layout...this is fix-iteration " + str(iteration))
# list 1
change_policy_command_list = []
# list 2
create_replica_command_list = []
# list 3
delete_replica_command_list = []
# one element of this list should look like this: (policy_command, create_command, delete_command).
# None is an accepted value.
command_tuple_list = []
# create commands
managed_folders = self.get_depth_2_subdirectories()
for managed_folder in managed_folders:
for directory in os.walk(managed_folder):
for filename in directory[2]:
policy_command = None
create_command = None
delete_command = None
absolute_file_path = os.path.join(directory[0], filename)
osds_of_file = div_util.get_osd_uuids(absolute_file_path)
path_on_volume = self.path_on_volume(absolute_file_path)
path_on_volume = self.get_path_on_volume(absolute_file_path)
containing_folder_id = self.get_containing_folder_id(path_on_volume)
osd_for_file = self.distribution.get_containing_osd(containing_folder_id).uuid
......@@ -247,7 +247,6 @@ class OSDManager(object):
if osd_of_file != osd_for_file:
# delete all replicas on wrong OSDs
delete_command = div_util.create_delete_replica_command(absolute_file_path, osd_of_file)
delete_replica_command_list.append(div_util.command_list_to_single_string(delete_command))
else:
file_on_correct_osd = True
......@@ -256,93 +255,121 @@ class OSDManager(object):
# otherwise, there is a unique replica on the correct OSD => no change necessary,
# OR there are multiple replicas => replication policy must be set.
policy_command = div_util.create_replication_policy_command(absolute_file_path)
change_policy_command_list.append(div_util.command_list_to_single_string(policy_command))
if not file_on_correct_osd:
# create a replica on the correct osd
create_command = div_util.create_create_replica_command(absolute_file_path, osd_for_file)
create_replica_command_list.append(div_util.command_list_to_single_string(create_command))
command_tuple_list.append((policy_command, create_command, delete_command))
# end of recursion condition
if len(create_replica_command_list) == 0 and len(delete_replica_command_list) == 0:
if len(command_tuple_list) == 0:
return
# run commands
start_time = time.time()
if self.debug:
print("starting execution of change policy commands...")
print(str(datetime.datetime.now()))
errored_processes = div_util.run_commands(change_policy_command_list, max_processes_change_policy)
end_time = time.time()
if self.debug:
print("executing " + str(len(change_policy_command_list)) + " change policy commands done in " +
str(round(end_time - start_time)) + " sec.")
# div_util.print_process_list(processes)
# run commands the new way
random.shuffle(command_tuple_list)
start_time = time.time()
if self.debug:
print("starting execution of create replica commands...")
print(str(datetime.datetime.now()))
random.shuffle(create_replica_command_list)
errored_processes = div_util.run_commands(create_replica_command_list, max_processes_add_replica)
end_time = time.time()
if self.debug:
print("executing " + str(len(create_replica_command_list)) + " create replica commands done in " +
str(round(end_time - start_time)) + " sec.")
# div_util.print_process_list(processes)
start_time = time.time()
if self.debug:
print("starting execution of delete replica commands...")
print(str(datetime.datetime.now()))
errored_processes = div_util.run_commands(delete_replica_command_list, max_processes_delete_replica,
print_errors=False)
# run and repeat delete commands, until they return no error
# (if an error is returned for another reason than that one would delete the last complete replica,
# this will probably not work.
iterations = 0
max_iterations = 5
while True:
if iterations >= max_iterations:
print("results of last iteration: ")
div_util.print_process_list(errored_processes)
print("Original replicas could not be deleted after " + str(max_iterations) + ". Aborting...")
break
if self.debug:
print("executing " + str(len(delete_replica_command_list))
+ " delete replica commands done. This is delete-iteration "
+ str(iterations))
# div_util.print_process_list(processes)
while len(command_tuple_list) > 0:
# list 1
change_policy_command_list = []
# list 2
create_replica_command_list = []
# list 3
delete_replica_command_list = []
errored_deletions = []
for i in range(0, max_files_in_progress):
policy_command, create_command, delete_command = command_tuple_list.pop()
if policy_command is not None:
change_policy_command_list.append(policy_command)
if create_command is not None:
create_replica_command_list.append(create_command)
if delete_command is not None:
delete_replica_command_list.append(delete_command)
if len(command_tuple_list) == 0:
break
for process in errored_processes:
# check the return code. if it is one, the replica could not be deleted, so we try again later.
if process[2] != 0:
errored_deletions.append(process[0])
print("errored command: ")
print("command: " + str(process[0]))
print("stdoud: " + str(process[1][0]))
print("stderr: " + str(process[1][1]))
print("retcode: " + str(process[2]))
if len(errored_deletions) == 0:
break
time.sleep(repeat_delete_interval_secs)
# run sublist of commands the old way
start_time = time.time()
if self.debug:
print("starting execution of " + str(len(change_policy_command_list)) + " change policy commands...")
print("commands: ")
for command in change_policy_command_list:
print(str(command))
print(str(datetime.datetime.now()))
errored_processes = div_util.run_commands(change_policy_command_list, max_processes_change_policy)
end_time = time.time()
if self.debug:
print("executing " + str(len(change_policy_command_list)) + " change policy commands done in " +
str(round(end_time - start_time)) + " sec.")
# div_util.print_process_list(processes)
start_time = time.time()
if self.debug:
print("starting execution of " + str(len(create_replica_command_list)) + " create replica commands...")
print(str(datetime.datetime.now()))
random.shuffle(create_replica_command_list)
errored_processes = div_util.run_commands(create_replica_command_list, max_processes_add_replica)
end_time = time.time()
if self.debug:
print("rerunning " + str(len(errored_deletions)) + " commands because replica could not be deleted...")
print("executing " + str(len(create_replica_command_list)) + " create replica commands done in " +
str(round(end_time - start_time)) + " sec.")
# div_util.print_process_list(processes)
errored_processes = div_util.run_commands(errored_deletions, max_processes_change_policy,
start_time = time.time()
if self.debug:
print("starting execution of " + str(len(delete_replica_command_list)) + " delete replica commands...")
print(str(datetime.datetime.now()))
errored_processes = div_util.run_commands(delete_replica_command_list, max_processes_delete_replica,
print_errors=False)
iterations += 1
if self.debug:
end_time = time.time()
print("deleting replicas done in in " + str(round(end_time - start_time)) + " sec.")
# run and repeat delete commands, until they return no error
# (if an error is returned for another reason than that one would delete the last complete replica,
# this will probably not work.
iterations = 0
max_iterations = 5
while True:
if iterations >= max_iterations:
print("results of last iteration: ")
div_util.print_process_list(errored_processes)
print("Original replicas could not be deleted after " + str(max_iterations) + ". Aborting...")
break
if self.debug:
print("executing " + str(len(delete_replica_command_list))
+ " delete replica commands done. This is delete-iteration "
+ str(iterations))
# div_util.print_process_list(processes)
errored_deletions = []
for process in errored_processes:
# check the return code. if it is one, the replica could not be deleted, so we try again later.
if process[2] != 0:
errored_deletions.append(process[0])
print("errored command: ")
print("command: " + str(process[0]))
print("stdoud: " + str(process[1][0]))
print("stderr: " + str(process[1][1]))
print("retcode: " + str(process[2]))
if len(errored_deletions) == 0:
break
time.sleep(repeat_delete_interval_secs)
if self.debug:
print("rerunning " + str(
len(errored_deletions)) + " commands because replica could not be deleted...")
errored_processes = div_util.run_commands(errored_deletions, max_processes_change_policy,
print_errors=False)
iterations += 1
if self.debug:
end_time = time.time()
print("deleting replicas done in in " + str(round(end_time - start_time)) + " sec.")
self.fix_physical_layout_internally(iteration=iteration + 1)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment