Commit 8bf9c502 authored by Felix Seibert's avatar Felix Seibert
Browse files

eliminating need for modules xattr and json

parent 1fba7a40
......@@ -142,6 +142,8 @@ class OSDManager(object):
du = subprocess.run(["du", "-s", one_folder], stdout=subprocess.PIPE,
universal_newlines=True)
folder_size = int(du.stdout.split()[0])
if folder_size == 0:
folder_size = 1
new_folder = folder.Folder(self.path_on_volume(one_folder),
folder_size,
None)
......@@ -193,29 +195,41 @@ class OSDManager(object):
for filename in directory[2]:
absolute_file_path = os.path.join(directory[0], filename)
osd_of_file = div_util.get_osd_uuid(absolute_file_path)
osds_of_file = div_util.get_osd_uuids(absolute_file_path)
path_on_volume = self.path_on_volume(absolute_file_path)
containing_folder_id = self.get_containing_folder_id(path_on_volume)
osd_for_file = self.distribution.get_containing_osd(containing_folder_id).uuid
if osd_of_file != osd_for_file:
file_on_correct_osd = False
for osd_of_file in osds_of_file:
if osd_of_file != osd_for_file:
# delete all replicas on wrong OSDs
delete_command = div_util.create_delete_replica_command(absolute_file_path, osd_of_file)
delete_replica_command_list.append(div_util.command_list_to_single_string(delete_command))
else:
file_on_correct_osd = True
if not file_on_correct_osd and len(osds_of_file) < 2:
# only one replica on a wrong OSD => need to set replication policy.
# otherwise, there is a unique replica on the correct OSD => no change necessary,
# OR there are multiple replicas => replication policy must be set.
policy_command = div_util.create_replication_policy_command(absolute_file_path)
create_command = div_util.create_create_replica_command(absolute_file_path, osd_for_file)
delete_command = div_util.create_delete_replica_command(absolute_file_path, osd_of_file)
change_policy_command_list.append(div_util.command_list_to_single_string(policy_command))
if not file_on_correct_osd:
# create a replica on the correct osd
create_command = div_util.create_create_replica_command(absolute_file_path, osd_for_file)
create_replica_command_list.append(div_util.command_list_to_single_string(create_command))
delete_replica_command_list.append(div_util.command_list_to_single_string(delete_command))
# run commands
start_time = time.time()
if self.debug:
print("starting execution of change policy commands...")
print(str(datetime.datetime.now()))
processes = div_util.run_commands(change_policy_command_list, max_processes_change_policy)
errored_processes = div_util.run_commands(change_policy_command_list, max_processes_change_policy)
end_time = time.time()
if self.debug:
print("executing " + str(len(processes)) + " change policy commands done in " +
print("executing " + str(len(change_policy_command_list)) + " change policy commands done in " +
str(round(end_time - start_time)) + " sec.")
# div_util.print_process_list(processes)
......@@ -224,10 +238,10 @@ class OSDManager(object):
print("starting execution of create replica commands...")
print(str(datetime.datetime.now()))
random.shuffle(create_replica_command_list)
processes = div_util.run_commands(create_replica_command_list, max_processes_add_replica)
errored_processes = div_util.run_commands(create_replica_command_list, max_processes_add_replica)
end_time = time.time()
if self.debug:
print("executing " + str(len(processes)) + " create replica commands done in " +
print("executing " + str(len(create_replica_command_list)) + " create replica commands done in " +
str(round(end_time - start_time)) + " sec.")
# div_util.print_process_list(processes)
......@@ -235,7 +249,7 @@ class OSDManager(object):
if self.debug:
print("starting execution of delete replica commands...")
print(str(datetime.datetime.now()))
processes = div_util.run_commands(delete_replica_command_list, max_processes_delete_replica, print_errors=False)
errored_processes = div_util.run_commands(delete_replica_command_list, max_processes_delete_replica, print_errors=False)
# run and repeat delete commands, until they return no error
# (if an error is returned for another reason than that one would delete the last complete replica,
......@@ -245,19 +259,19 @@ class OSDManager(object):
while True:
if iterations >= max_iterations:
print("results of last iteration: ")
div_util.print_process_list(processes)
div_util.print_process_list(errored_processes)
print("Original replicas could not be deleted after " + str(max_iterations) + ". Aborting...")
break
if self.debug:
print("executing " + str(len(processes))
print("executing " + str(len(delete_replica_command_list))
+ " delete replica commands done. This is iteration "
+ str(iterations))
# div_util.print_process_list(processes)
errored_deletions = []
for process in processes:
for process in errored_processes:
# check the return code. if it is one, the replica could not be deleted, so we try again later.
if process[2] != 0:
errored_deletions.append(process[0])
......@@ -275,7 +289,7 @@ class OSDManager(object):
if self.debug:
print("rerunning " + str(len(errored_deletions)) + " commands because replica could not be deleted...")
processes = div_util.run_commands(errored_deletions, max_processes_change_policy, print_errors=False)
errored_processes = div_util.run_commands(errored_deletions, max_processes_change_policy, print_errors=False)
iterations += 1
if self.debug:
......
import subprocess
import sys
import socket
import json
import os
import xattr
def get_osd_uuid(path):
# TODO implement version not relying on xattr (this is possible using xtfsutil)
attrs = xattr.listxattr(path)
jsonString = xattr.getxattr(path, "xtreemfs.locations")
parsed = json.loads(jsonString.decode())
return parsed["replicas"][0]['osds'][0]['uuid']
def get_osd_uuids(path):
xtfsutil = subprocess.run(["xtfsutil", path],
stdout=subprocess.PIPE, universal_newlines=True)
string_elements = xtfsutil.stdout.split('\n')
osd_list = []
for splitString in string_elements:
if splitString.lstrip().startswith("OSD "):
end_index = splitString.rfind(" ")
begin_index = splitString.rfind(" ", 0, end_index) + 1
uuid_substring = splitString[begin_index:end_index]
osd_list.append(uuid_substring)
return osd_list
def create_replication_policy_command(absolute_file_path):
......
import os
import subprocess
import div_util
'''
verify a tile folder: check whether all files in its subdirectories
(representing scenes) are located on the same OSD.
......@@ -13,9 +15,11 @@ def verify_tile_folder(tile_folder, verbose):
for folders in os.walk(tile_folder):
for filename in folders[2]:
file_path = os.path.join(folders[0], filename)
xtfsutil = subprocess.run(["xtfsutil", file_path],
stdout=subprocess.PIPE, universal_newlines=True)
osd_for_file = extract_osd(xtfsutil.stdout)
osds_for_file = div_util.get_osd_uuids(file_path)
if len(osds_for_file) > 1:
print("files in " + tile_folder + " are located on multiple OSDs!")
return None
osd_for_file = osds_for_file[0]
if verbose:
print("file: " + file_path)
print("osd of file: " + osd_for_file)
......@@ -62,24 +66,10 @@ def print_tree(path):
number_of_files += 1
file_name = os.path.join(directory[0], file)
print(file_name)
xtfsutil = subprocess.run(["xtfsutil", file_name],
stdout=subprocess.PIPE, universal_newlines=True)
osds_of_file = extract_osd(xtfsutil.stdout)
osds_of_file = div_util.get_osd_uuids(file_name)
for osd_of_file in osds_of_file:
osd_set.add(osd_of_file)
print(osds_of_file)
print("number of files: " + str(number_of_files))
print("OSDs: " + str(osd_set))
def extract_osd(xtfsutil_string):
string_elements = xtfsutil_string.split('\n')
osd_list = []
for splitString in string_elements:
if splitString.lstrip().startswith("OSD "):
end_index = splitString.rfind(" ")
begin_index = splitString.rfind(" ", 0, end_index) + 1
uuid_substring = splitString[begin_index:end_index]
osd_list.append(uuid_substring)
return osd_list
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment