Commit fef86bab authored by Felix Seibert's avatar Felix Seibert
Browse files

properly formatting docstrings

parent 6d7ea2ac
......@@ -24,10 +24,9 @@ max_processes_add_replica = 200
max_processes_delete_replica = 200
# TODO add support for arbitrary subdirectory level
# (currently depth=2 is hardcoded, which is fine for GeoMultiSens purposes)
class OSDManager(object):
# TODO add support for arbitrary subdirectory level
# (currently depth=2 is hardcoded, which is fine for GeoMultiSens purposes)
def __init__(self, path_to_managed_folder, config_file='.das_config', value_map=None, debug=False):
self.managed_folder = path_to_managed_folder
......@@ -123,15 +122,14 @@ class OSDManager(object):
f = open(path_to_config, "wb")
pickle.dump(self.distribution, f)
'''
create a good data distribution out of data already present in the file system.
the created data distribution will then be transferred to the physical layer,
i.e., all files will be moved to their corresponding OSD,
using XtreemFS' read-only replication strategy.
'''
def create_distribution_from_existing_files(self, fix_layout_internally=True, apply_layout=True,
environment='LOCAL'):
"""
create a good data distribution out of data already present in the file system.
the created data distribution will then be transferred to the physical layer,
i.e., all files will be moved to their corresponding OSD,
using XtreemFS' read-only replication strategy.
"""
start_time = time.time()
if self.debug:
......@@ -196,12 +194,11 @@ class OSDManager(object):
total_time = round(time.time() - start_time)
print("fixed physical layout of existing files in secs: " + str(total_time))
'''
fixes the physical layout, such that it matches the data distribution described in self.distribution.
this is realized by calling move_folder_to_osd on all folders managed by this distribution.
'''
def fix_physical_layout_externally(self):
"""
fixes the physical layout, such that it matches the data distribution described in self.distribution.
this is realized by calling move_folder_to_osd on all folders managed by this distribution.
"""
if self.debug:
print("fixing physical layout externally...")
managed_folders = self.get_assigned_folders()
......@@ -210,23 +207,21 @@ class OSDManager(object):
osd_for_folder = self.distribution.get_containing_osd(folder_id)
self.move_folder_to_osd(folder_id, osd_for_folder.uuid)
'''
fixes the physical layout, such that it matches the data distribution described in self.distribution
we use the following strategy: first, determine which files need to be moved to another OSD, and create three lists.
for each file to be moved, one contains the command to set the default replication policy (1), one contains the command
to create a new replica on the target OSD (2) and one contains the command to delete the replica on the original OSD (3).
the command-lists are executed in order: 1, 2, 3, commands of list i only being executed if all commands of list i-1
have returned. the commands of any one list are executed in parallel, with an upper bound of 200 parallel executions
to not violate the maximum number of processes of the OS.
the commands of list 3 might return an error, as deleting the replica on the original OSD is not possible until
the new replica on the target OSD is complete. these commands are collected and re-executed after a certain delay.
this process is repeated until no error is returned, which should eventually happen (as soon as all new replicas on
the target OSDs are complete).
'''
def fix_physical_layout_internally(self, repeat_delete_interval_secs=15, iteration=0):
"""
fixes the physical layout, such that it matches the data distribution described in self.distribution
we use the following strategy: first, determine which files need to be moved to another OSD, and create three lists.
for each file to be moved, one contains the command to set the default replication policy (1), one contains the command
to create a new replica on the target OSD (2) and one contains the command to delete the replica on the original OSD (3).
the command-lists are executed in order: 1, 2, 3, commands of list i only being executed if all commands of list i-1
have returned. the commands of any one list are executed in parallel, with an upper bound of 200 parallel executions
to not violate the maximum number of processes of the OS.
the commands of list 3 might return an error, as deleting the replica on the original OSD is not possible until
the new replica on the target OSD is complete. these commands are collected and re-executed after a certain delay.
this process is repeated until no error is returned, which should eventually happen (as soon as all new replicas on
the target OSDs are complete).
"""
if self.debug:
print("starting to fix physical layout...this is fix-iteration " + str(iteration))
# list 1
......@@ -352,11 +347,10 @@ class OSDManager(object):
self.fix_physical_layout_internally(iteration=iteration + 1)
'''
create empty folders and assign OSDs.
'''
def create_empty_folders(self, folders):
"""
create empty folders and assign OSDs.
"""
average_size = int(self.distribution.get_average_folder_size())
if average_size <= 0:
average_size = 1
......@@ -376,13 +370,12 @@ class OSDManager(object):
self.__write_configuration()
'''
copy a list of given folders into the managed folder, assigning OSDs to new folders and updating
self.dataDistribution
'''
def copy_folders(self, folders, environment='LOCAL', remote_source=None, sshfs_mount_dir='/tmp/sshfs_tmp_mnt',
apply_layout=True, execute_copy=True, random_osd_assignment=False):
"""
copy a list of given folders into the managed folder, assigning OSDs to new folders and updating
self.dataDistribution
"""
if self.debug:
print("calling copy_folders with:")
print("folders: " + str(folders))
......@@ -500,11 +493,10 @@ class OSDManager(object):
# command_list.append(command)
return command_list
'''
moves a folder from one OSD to another OSD. you may specify a temporary folder.
'''
def move_folder_to_osd(self, folder_id: str, new_osd_id: str, tmp_dir=None):
"""
moves a folder from one OSD to another OSD. you may specify a temporary folder.
"""
folder_path = os.path.join(self.get_target_dir(folder_id),
os.path.split(folder_id)[1])
......@@ -553,11 +545,10 @@ class OSDManager(object):
print("externally moved folder " + folder_id +
" to osd: " + new_osd_id + " in secs: " + str(round(total_time)))
'''
removes a folder from the distribution. this does NOT delete the folder from the file system.
'''
def remove_folder(self, folder_id):
"""
removes a folder from the distribution. this does NOT delete the folder from the file system.
"""
containing_osd = self.distribution.get_containing_osd(folder_id)
if containing_osd is not None:
if not div_util.check_for_executable('xtfsutil'):
......@@ -574,14 +565,12 @@ class OSDManager(object):
"remove " + folder_id + "", self.path_to_mount_point],
stdout=subprocess.PIPE, universal_newlines=True)
'''
update the given (by absolute path) folders, such that the values held by self.dataDistribution
matches their size on disk.
if no argument is given, all folders are updated.
'''
def update(self, arg_folders=None):
"""
update the given (by absolute path) folders, such that the values held by self.dataDistribution
matches their size on disk.
if no argument is given, all folders are updated.
"""
if arg_folders is not None:
for folder_for_update in arg_folders:
if not folder_for_update.startswith(self.managed_folder):
......@@ -612,13 +601,12 @@ class OSDManager(object):
if self.debug:
print(str(self))
'''
apply the given assignments to the XtreemFS volume, using xtfsutil.
the assignments are given as a list containing tuples (tile_id, osd),
where tile_id is given by applying path_on_volume() onto the absolute path of the folder.
'''
def apply_osd_assignments(self, assignments):
"""
apply the given assignments to the XtreemFS volume, using xtfsutil.
the assignments are given as a list containing tuples (tile_id, osd),
where tile_id is given by applying path_on_volume() onto the absolute path of the folder.
"""
if not div_util.check_for_executable('xtfsutil'):
raise ExecutableNotFoundException("No xtfsutil found. Please make sure it is contained in your PATH.")
......@@ -642,11 +630,10 @@ class OSDManager(object):
self.path_to_mount_point],
stdout=subprocess.PIPE, universal_newlines=True)
'''
copy data onto XtreemFS volume
'''
def __copy_data(self, input_folders, environment, remote_source):
"""
copy data onto XtreemFS volume
"""
if self.debug:
print('calling copy_data with: ', input_folders, environment, remote_source)
osd_list = self.distribution.get_osd_list()
......@@ -680,15 +667,14 @@ class OSDManager(object):
self.__execute_commands(copy_commands)
'''
generates a list of copy commands, one command for each OSD that receives new data.
the copy commands are constructed such that they can be executed in a slurm environment (that is, within a slurm job
allocation) at ZIB.
each command is a copy command including all new folders for the corresponding OSD, preceded by a srun command
to execute the copy command locally on the slurm node on which the target OSD resides.
'''
def __generate_copy_commands_slurm(self, osd_to_folders_map, remote_source):
"""
generates a list of copy commands, one command for each OSD that receives new data.
the copy commands are constructed such that they can be executed in a slurm environment (that is, within a slurm job
allocation) at ZIB.
each command is a copy command including all new folders for the corresponding OSD, preceded by a srun command
to execute the copy command locally on the slurm node on which the target OSD resides.
"""
if self.debug:
print("Using SLURM mode for copying...")
......@@ -720,14 +706,13 @@ class OSDManager(object):
# command_list.append(command)
return command_list
'''
generates a list of copy commands, one command for each OSD that receives new data.
the copy commands are constructed such that they can be executed on the GeoMultiSens cluster at HU Berlin.
each command is a copy command including all new folders for the corresponding OSD, preceded by a ssh command
to execute the copy command locally on the node of the target OSD.
'''
def __generate_copy_commands_hu_cluster(self, osd_to_folders_map):
"""
generates a list of copy commands, one command for each OSD that receives new data.
the copy commands are constructed such that they can be executed on the GeoMultiSens cluster at HU Berlin.
each command is a copy command including all new folders for the corresponding OSD, preceded by a ssh command
to execute the copy command locally on the node of the target OSD.
"""
if self.debug:
print("Using HU_CLUSTER mode for copying...")
......@@ -757,12 +742,11 @@ class OSDManager(object):
return command_list
'''
generates a list of copy commands, one command for each OSD that receives new data.
plain old cp is used for the actual copying.
'''
def __generate_copy_commands_local(self, osd_to_folders_map):
"""
generates a list of copy commands, one command for each OSD that receives new data.
plain old cp is used for the actual copying.
"""
if self.debug:
print("Using local cp for copying...")
......@@ -781,12 +765,11 @@ class OSDManager(object):
return command_list
'''
execute, in parallel, a given set of commands. note that the degree of parallelism will match the length of
command_list.
'''
def __execute_commands(self, command_list):
"""
execute, in parallel, a given set of commands. note that the degree of parallelism will match the length of
command_list.
"""
if self.debug:
print("Executing commands: ")
for command in command_list:
......@@ -806,13 +789,12 @@ class OSDManager(object):
print(str(terminated_process.communicate()))
print("Executing commands done.")
'''
creates a list of all folders, managed by this XtreemFS OSD manager, that have an assigned OSD
TODO this actually just returns a list of all level 2 subdirectories, independently of whether they are actually
assigned to some OSD or not. i think that this is not the expected behaviour.
'''
def get_assigned_folders(self):
"""
creates a list of all folders, managed by this XtreemFS OSD manager, that have an assigned OSD
TODO this actually just returns a list of all level 2 subdirectories, independently of whether they are actually
assigned to some OSD or not. i think that this is not the expected behaviour.
"""
assigned_folders = []
for depth_1_folder in os.listdir(self.managed_folder):
depth_1_path = os.path.join(self.managed_folder, depth_1_folder)
......@@ -824,31 +806,28 @@ class OSDManager(object):
return assigned_folders
'''
gets the path to the target dir, given the folder_id
'''
def get_target_dir(self, folder_id):
    """
    returns the directory below the mount point that contains the folder identified by folder_id.

    the first len(self.volume_name) + 1 characters of folder_id are dropped (presumably the volume
    name and the separator following it - verify against callers), the remainder is joined onto
    self.path_to_mount_point, and the parent directory of the resulting path is returned.
    """
    prefix_length = len(self.volume_name) + 1
    relative_path = folder_id[prefix_length:]
    absolute_path = os.path.join(self.path_to_mount_point, relative_path)
    parent_dir, _ = os.path.split(absolute_path)
    return parent_dir
'''
remove the leading part of the path, such that only the part onto the xtreemfs volume remains, including
the volume itself.
throws an exception when the path is not managed by this XtreemFS OSD manager.
'''
def path_on_volume(self, path):
    """
    remove the leading part of the path, such that only the part on the xtreemfs volume remains,
    including the volume itself.

    :param path: absolute path of a file or folder; must start with self.managed_folder.
    :return: self.volume_name joined with the part of path that follows self.path_to_mount_point
             and one separator character.
    :raises PathNotManagedException: if path does not start with self.managed_folder.
    """
    if not path.startswith(self.managed_folder):
        # the two literals are concatenated implicitly; the trailing space keeps the message
        # from reading "...XtreemFS OSDmanager!"
        raise PathNotManagedException("Path " + path + " is not managed by this instance of the XtreemFS OSD "
                                      "manager!")
    # NOTE(review): the check uses managed_folder while the slice uses path_to_mount_point;
    # presumably managed_folder lies below the mount point - confirm.
    return os.path.join(self.volume_name, path[len(self.path_to_mount_point) + 1:])
'''
search for the assigned folder that is a prefix of the given path on volume
'''
def get_containing_folder_id(self, path_on_volume):
"""
search for the assigned folder that is a prefix of the given path on volume
"""
for osd in self.distribution.OSDs.values():
for a_folder in osd.folders:
if path_on_volume.startswith(a_folder):
......
......@@ -2,16 +2,16 @@ import random
import osd
"""
class to keep track of the osd (object storage device) locations of different folders, i.e.,
their physical location.
this class also allows to calculate a 'good' osd for new data, based on the
distribution known beforehand.
"""
class DataDistribution(object):
"""
class to keep track of the osd (object storage device) locations of different folders, i.e.,
their physical location.
this class also allows calculating a 'good' osd for new data, based on the
distribution known beforehand.
"""
class DataDistribution(object):
def __init__(self):
self.OSDs = {}
......@@ -64,14 +64,13 @@ class DataDistribution(object):
self.OSDs[new_osd].add_folder(folder_id, self.OSDs[old_osd.uuid].folders[folder_id])
self.OSDs[old_osd.uuid].remove_folder(folder_id)
'''
adds a list of folders to the data distribution.
returns a list of assignments from folders to OSDs, for which (folders) there was previously no assignment.
if the optional arguments are given, OSDs are assigned data proportionally to their ratio_parameter.
'''
def add_folders(self, folders, osd_information=None, ratio_parameter='', random_osd_assignment=False):
"""
adds a list of folders to the data distribution.
returns a list of assignments from folders to OSDs, for which (folders) there was previously no assignment.
if the optional arguments are given, OSDs are assigned data proportionally to their ratio_parameter.
"""
new_folders = []
for folder in folders:
containing_osd = self.get_containing_osd(folder.id)
......@@ -80,7 +79,7 @@ class DataDistribution(object):
else:
new_folders.append(folder)
print("dataDistribution: random_osd_assignment: " + str(random_osd_assignment))
print("dataDistribution: random_osd_assignment: " + str(random_osd_assignment))
osds_for_new_folders = []
......@@ -124,20 +123,18 @@ class DataDistribution(object):
return osds_for_new_folders
'''
updates the size of a given folder
'''
def update_folder(self, folder, size):
    """
    updates the size of a given folder.

    :param folder: folder id, looked up as a key in the folders mapping of every OSD in self.OSDs.
    :param size: new size; forwarded to update_folder of each OSD whose folders contain the id.
    """
    for one_osd in self.OSDs.values():
        # test membership on the mapping itself; going through .keys() is redundant
        if folder in one_osd.folders:
            one_osd.update_folder(folder, size)
'''
generates a string describing this data distribution
'''
def description(self):
"""
generates a string describing this data distribution
"""
string = ""
for one_osd in self.OSDs.values():
string += str(one_osd)
......
......@@ -48,12 +48,10 @@ def print_error(finished_process):
print(str(finished_process[1][1]))
'''
execute list of commands in parallel, return list of executions returned with an error
'''
def run_commands(commands, max_processes=200, print_errors=True):
"""
execute list of commands in parallel, return list of executions returned with an error
"""
# running_processes = set()
running_processes_map = {}
errored_processes = []
......@@ -80,13 +78,12 @@ def run_commands(commands, max_processes=200, print_errors=True):
errored_processes.append(finished_process)
if print_errors:
print_error(finished_process)
# print("progress: " + str(num_finished) + "/" + str(num_total))
# print("progress: " + str(num_finished) + "/" + str(num_total))
# running_processes = running_processes.difference(difference)
for finished_process in difference:
del running_processes_map[finished_process]
timeout_processes = 0
# for finished_process in running_processes:
for still_running_process in running_processes_map:
......@@ -103,8 +100,8 @@ def run_commands(commands, max_processes=200, print_errors=True):
continue
still_running_process = (still_running_process.args,
(stdout, stderr),
still_running_process.returncode)
(stdout, stderr),
still_running_process.returncode)
if still_running_process[2] != 0:
errored_processes.append(still_running_process)
if print_errors:
......@@ -128,12 +125,10 @@ def print_process_list(processes):
print("returncode: " + str(process[2]))
"""
extract volume information from a string which is output from xtfsutil.
"""
def extract_volume_information(string):
"""
extract volume information from a string which is output from xtfsutil.
"""
string_elements = string.split('\n')
volume_name = ""
volume_address = ""
......@@ -175,14 +170,12 @@ def get_http_address(volume_address):
return 'http://' + ip_addr + ':' + http_port + '/'
"""
find out whether we are in a slurm or not.
if yes, return list of hostnames.
otherwise return empty list.
"""
def get_slurm_hosts():
"""
find out whether we are in a slurm or not.
if yes, return list of hostnames.
otherwise return empty list.
"""
scontrol = subprocess.run(["which", "scontrol"], stdout=subprocess.PIPE,
universal_newlines=True)
if not scontrol.stdout.endswith("not found"):
......@@ -195,13 +188,11 @@ def get_slurm_hosts():
return []
"""
maps osd uuids to hostnames,
given a list containing tuples (osdUUID, IPAddr).
"""
def get_osd_to_hostname_map(osds, hosts):
"""
maps osd uuids to hostnames,
given a list containing tuples (osdUUID, IPAddr).
"""
# for each host look up the ip address
osd_map = {}
for host in hosts:
......@@ -214,12 +205,10 @@ def get_osd_to_hostname_map(osds, hosts):
return osd_map
'''
check whether xtfsutil exists
'''
def check_for_xtfsutil():
"""
check whether xtfsutil exists
"""
xtfsutil = subprocess.run(["xtfsutil"], stdout=subprocess.PIPE,
stderr=subprocess.PIPE, universal_newlines=True)
......@@ -232,12 +221,10 @@ def check_for_xtfsutil():
print("Found xtfsutil.")
'''
check whether the given program exists in $PATH
'''
def check_for_executable(executable):
"""
check whether the given program exists in $PATH
"""
try:
subprocess.run([executable], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except FileNotFoundError:
......@@ -245,12 +232,10 @@ def check_for_executable(executable):
return True
'''
remove all leading and trailing slashes (/)
'''
def remove_leading_trailing_slashes(string):
"""
remove all leading and trailing slashes (/)
"""
if len(string) == 0:
return string
while string[0] == '/':
......
"""
representation of an Object Storage device. the OSD is identified by its uuid.
it keeps track of the folders saved on the OSD as well as the size of the folders.
"""
class OSD(object):
"""
representation of an Object Storage device. the OSD is identified by its uuid.
it keeps track of the folders saved on the OSD as well as the size of the folders.
"""
def __init__(self, uuid):
self.uuid = uuid
self.total_folder_size = 0
......
......@@ -2,14 +2,13 @@ import os
import div_util
'''
verify a tile folder: check whether all files in its subdirectories
(representing scenes) are located on the same OSD.
it relies on xtfsutil, so make sure xtfsutil is included in your PATH.
'''
def verify_tile_folder(tile_folder, verbose):
"""
verify a tile folder: check whether all files in its subdirectories
(representing scenes) are located on the same OSD.
it relies on xtfsutil, so make sure xtfsutil is included in your PATH.
"""
osd = None
for folders in os.walk(tile_folder):
for filename in folders[2]:
......@@ -31,13 +30,11 @@ def verify_tile_folder(tile_folder, verbose):
return osd
'''
verify a whole gms folder: gmsFolder should be structured like
gmsFolder/utmStripes/utmTiles/scenes/files
'''
def verify_gms_folder(gms_folder, verbose=False):
"""
verify a whole gms folder: gmsFolder should be structured like
gmsFolder/utmStripes/utmTiles/scenes/files
"""
layout_is_correct = True
for utmStripe in os.listdir(gms_folder):
if not os.path.isdir(gms_folder + "/" + utmStripe):
......@@ -51,12 +48,10 @@ def verify_gms_folder(gms_folder, verbose=False):
return layout_is_correct
'''
print OSD for each file in a given folder
'''
def print_tree(path):
"""
print OSD for each file in a given folder
"""
print("printing OSDs for all files in the following tree: " + path)
number_of_files = 0
osd_set = set()
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment