Commit 1803beac authored by Felix Seibert's avatar Felix Seibert
Browse files

first commit of working_branch

parent 7d28c0c6
import osd
"""
class to keep track of the osd (object storage device) locations of different
satellite images, i.e., their physical location.
the images are abstracted to the coordinates of their center.
this class also allows to calculate a 'good' osd for new data, based on the
Class to keep track of the OSD (object storage device) locations of different folders, i.e.,
their physical location.
This class can also calculate a 'good' OSD for new data, based on the
distribution known beforehand.
"""
class DataDistribution(object):
    """Keep track of which OSD (object storage device) holds which folder.

    ``self.OSDs`` maps an OSD uuid to the OSD object registered under it.
    The OSD objects are expected to offer ``uuid``, ``folders`` (a mapping
    folder id -> size), ``totalFolderSize``, ``add_folder``,
    ``remove_folder``, ``update_folder`` and ``contains_folder``, because
    those are the members used below.
    """

    def __init__(self):
        self.OSDs = {}

    def add_new_osd(self, osd_uuid):
        """Create and register an OSD for osd_uuid unless it is already known."""
        if osd_uuid in self.OSDs:
            print("key: " + osd_uuid + " is already present!")
            return
        self.OSDs[osd_uuid] = osd.OSD(osd_uuid)

    def add_osd(self, new_osd):
        """Register an existing OSD object under its uuid unless already known."""
        if new_osd.uuid in self.OSDs:
            print("key: " + new_osd.uuid + " is already present!")
            return
        self.OSDs[new_osd.uuid] = new_osd

    def add_osd_list(self, osd_list):
        """Create and register an OSD for every uuid in osd_list that is new."""
        for osd_uuid in osd_list:
            if osd_uuid not in self.OSDs:
                self.OSDs[osd_uuid] = osd.OSD(osd_uuid)

    def get_osd_list(self):
        """Return the uuids of all registered OSDs as a new list."""
        return list(self.OSDs)

    def get_containing_osd(self, folder_id):
        """Return the first OSD that contains folder_id, or None if there is none."""
        for checked_osd in self.OSDs.values():
            if checked_osd.contains_folder(folder_id):
                return checked_osd
        return None

    def get_average_folder_size(self):
        """Return the mean folder size over all OSDs, or 0 without any folder."""
        total_size = 0
        total_number_of_folders = 0
        for one_osd in self.OSDs.values():
            total_size += one_osd.totalFolderSize
            total_number_of_folders += len(one_osd.folders)
        if total_number_of_folders == 0:
            return 0
        return total_size / total_number_of_folders

    def assign_new_osd(self, folder_id, new_osd):
        """Place folder_id on the OSD whose uuid is new_osd.

        A folder that is not tracked yet is added with the current average
        folder size. A tracked folder is moved together with its recorded
        size. Raises KeyError if new_osd is not a registered uuid.
        """
        old_osd = self.get_containing_osd(folder_id)
        if old_osd is not None and old_osd.uuid == new_osd:
            # Already on the requested OSD. Adding and then removing on the
            # same object would drop the folder instead of keeping it.
            return
        target_osd = self.OSDs[new_osd]
        if old_osd is None:
            target_osd.add_folder(folder_id, self.get_average_folder_size())
        else:
            target_osd.add_folder(folder_id, old_osd.folders[folder_id])
            old_osd.remove_folder(folder_id)

    def add_folders(self, folders, osd_information=None, ratio_parameter=''):
        """Add a list of folders to the data distribution.

        Folders that already live on an OSD have their size added there.
        The remaining folders are placed largest first, each on the OSD with
        the smallest weighted load (ties go to the OSD iterated last).

        If osd_information (uuid -> mapping) and ratio_parameter (a key of
        those mappings) are both given, OSDs receive data proportionally to
        their share of that parameter. OSDs without information or with a
        non-positive share are only used when no OSD has a positive share,
        in which case all OSDs are weighted equally.

        Returns a list of (folder id, OSD uuid) tuples for the folders that
        had no assignment before.
        """
        new_folders = []
        for folder in folders:
            containing_osd = self.get_containing_osd(folder.id)
            if containing_osd is not None:
                containing_osd.add_folder(folder.id, folder.size)
            else:
                new_folders.append(folder)

        new_folders.sort(key=lambda x: x.size, reverse=True)

        osd_ratios = {}
        if osd_information is not None and ratio_parameter != '':
            total_osd_size = sum(info[ratio_parameter] for info in osd_information.values())
            if total_osd_size > 0:
                for osd_uuid, info in osd_information.items():
                    osd_ratios[osd_uuid] = float(info[ratio_parameter]) / float(total_osd_size)
        if not any(osd_ratios.get(osd_uuid, 0) > 0 for osd_uuid in self.OSDs):
            # No usable ratio at all: weight every OSD equally.
            osd_ratios = {osd_uuid: float(1) for osd_uuid in self.OSDs}

        def weighted_load(one_osd):
            # An OSD without a positive ratio counts as infinitely loaded,
            # which also avoids dividing by zero.
            ratio = osd_ratios.get(one_osd.uuid, 0)
            if ratio > 0:
                return one_osd.totalFolderSize / ratio
            return float('inf')

        osds_for_new_folders = []
        for folder in new_folders:
            least_used_osd = None
            for one_osd in self.OSDs.values():
                if least_used_osd is None or weighted_load(one_osd) <= weighted_load(least_used_osd):
                    least_used_osd = one_osd
            least_used_osd.add_folder(folder.id, folder.size)
            osds_for_new_folders.append((folder.id, least_used_osd.uuid))

        return osds_for_new_folders

    def update_folder(self, folder, size):
        """Update the size of the given folder on every OSD that holds it."""
        for one_osd in self.OSDs.values():
            if folder in one_osd.folders:
                one_osd.update_folder(folder, size)

    def description(self):
        """Generate a string describing this data distribution."""
        parts = []
        for one_osd in self.OSDs.values():
            parts.append(str(one_osd) + "\n")
            parts.append("folders : " + str(one_osd.folders) + "\n")
        parts.append("average folder size: " + str(self.get_average_folder_size()))
        return "".join(parts)

    def __str__(self):
        header = "DataDistribution has " + str(len(self.OSDs)) + " osds: \n"
        return header + "".join(str(one_osd) + " \n" for one_osd in self.OSDs.values())
......@@ -6,33 +6,39 @@ extract volume information from a string which is output from xtfsutil.
"""
def extract_volume_information(string):
    """Extract volume information from the text output of xtfsutil.

    Returns a tuple (volume_name, osd_list, osd_selection_policy,
    volume_address):

    * volume_name / volume_address: the last and second to last
      '/'-separated parts of the "XtreemFS URL" line,
    * osd_list: one (uuid, ip address) tuple per entry of the
      "Selectable OSDs" section,
    * osd_selection_policy: the last whitespace-separated token of the
      "OSD Selection p" line.

    Values whose line is missing stay at their empty default.
    """
    volume_name = ""
    volume_address = ""
    osd_selection_policy = ""
    osd_section = False
    osd_list = []

    for line in string.split('\n'):
        if line.startswith("XtreemFS URL"):
            url_splits = line.split("/")
            volume_name = url_splits[-1]
            volume_address = url_splits[-2]
        if line.startswith("OSD Selection p"):
            osd_selection_policy = line.split()[-1]

        # The OSD section starts at its label and continues over the
        # following lines as long as they begin with a space.
        if line.startswith("Selectable OSDs"):
            osd_section = True
        elif not line.startswith(" "):
            osd_section = False

        # Only lines that carry an "(address:port)" part describe an OSD;
        # blank or malformed continuation lines would yield bogus tuples.
        if osd_section and "(" in line and ":" in line:
            # The uuid is the token in front of the last space.
            end_index = line.rfind(" ")
            begin_index = line.rfind(" ", 0, end_index) + 1
            uuid_substring = line[begin_index:end_index]

            # The address sits between the last '(' and the last ':'.
            ip_addr_substring = line[line.rfind("(") + 1:line.rfind(":")]

            osd_list.append((uuid_substring, ip_addr_substring))

    return volume_name, osd_list, osd_selection_policy, volume_address
def get_http_address(address):
    """Return an http URL for address with its fourth-to-last character set to '0'.

    The result is 'http://' + address + '/', where the character at
    position -4 of address is replaced by '0'.
    NOTE(review): presumably address is 'host:port' with a five digit port
    and this maps the service port to its HTTP status port - confirm.
    """
    return "".join(['http://', address[:-4], '0', address[-3:], '/'])
"""
......@@ -42,13 +48,13 @@ otherwise return empty list.
"""
def getSlurmHosts():
def get_slurm_hosts():
scontrol = subprocess.run(["which", "scontrol"], stdout=subprocess.PIPE,
universal_newlines=True)
if not scontrol.stdout.endswith("not found"):
slurmHosts = subprocess.run(["scontrol", "show", "hostnames"],
stdout=subprocess.PIPE, universal_newlines=True)
hosts = slurmHosts.stdout.split('\n')
slurm_hosts = subprocess.run(["scontrol", "show", "hostnames"],
stdout=subprocess.PIPE, universal_newlines=True)
hosts = slurm_hosts.stdout.split('\n')
hosts = list(filter(None, hosts))
return hosts
else:
......@@ -61,17 +67,17 @@ given a list containing tuples (osdUUID, IPAddr).
"""
def get_osd_to_hostname_map(osds, hosts):
    """Map OSD uuids to host names by comparing IP addresses.

    osds is a list of (osd uuid, ip address) tuples, hosts a list of host
    names. Each host is resolved with the external ``host`` command; the
    last whitespace-separated token of its output is taken as the address.
    Returns a dict uuid -> host name for every OSD whose address matched.
    """
    osd_map = {}
    for host in hosts:
        # for each host look up the ip address
        host_output = subprocess.run(["host", host],
                                     stdout=subprocess.PIPE, universal_newlines=True)
        tokens = host_output.stdout.split()
        if not tokens:
            # Nothing on stdout, so there is no address to compare.
            continue
        ip_address = tokens[-1]
        for osd_entry in osds:
            if osd_entry[1] == ip_address:
                osd_map[osd_entry[0]] = host
    return osd_map
'''
......@@ -79,7 +85,7 @@ check whether xtfsutil exists
'''
def checkForXtfsutil():
def check_for_xtfsutil():
xtfsutil = subprocess.run(["xtfsutil"], stdout=subprocess.PIPE,
stderr=subprocess.PIPE, universal_newlines=True)
......@@ -110,7 +116,7 @@ remove all leading and trailing slashes (/)
'''
def removeLeadingTrailingSlashes(string):
def remove_leading_trailing_slashes(string):
if len(string) == 0:
return string
while string[0] == '/':
......
......@@ -2,8 +2,9 @@
representation of an Object Storage device. the OSD is identified by its uuid.
it keeps track of the folders saved on the OSD as well as the size of the folders.
"""
class OSD(object):
class OSD(object):
def __init__(self, uuid):
self.uuid = uuid
self.totalFolderSize = 0
......@@ -12,7 +13,7 @@ class OSD(object):
def add_folder(self, folder_id, folder_size):
    """Add folder_size to the folder's recorded size and to the OSD total.

    A folder that is not recorded yet starts at folder_size.
    """
    self.folders[folder_id] = self.folders.get(folder_id, 0) + folder_size
    self.totalFolderSize += folder_size
......@@ -31,5 +32,5 @@ class OSD(object):
def __str__(self):
return "osd: '" + self.uuid \
+ "' totalFolderSize: " + str(self.totalFolderSize) \
+ " number of folders: " + str(len(self.folders))
+ "' totalFolderSize: " + str(self.totalFolderSize) \
+ " number of folders: " + str(len(self.folders))
import argparse
import os
import subprocess
import div_util
'''
verify a tile folder: check whether all files in its subdirectories
(representing scenes) are located on the same OSD.
......@@ -11,14 +8,14 @@ it relies on xtfsutil, so make sure xtfsutil is included in your PATH.
'''
def verifyTileFolder(tileFolder, verbose):
def verify_tile_folder(tile_folder, verbose):
osd = None
for folders in os.walk(tileFolder):
for folders in os.walk(tile_folder):
for filename in folders[2]:
file_path = os.path.join(folders[0], filename)
xtfsutil = subprocess.run(["xtfsutil", file_path],
stdout=subprocess.PIPE, universal_newlines=True)
osd_for_file = extractOSD(xtfsutil.stdout)
osd_for_file = extract_osd(xtfsutil.stdout)
if verbose:
print("file: " + file_path)
print("osd of file: " + osd_for_file)
......@@ -26,28 +23,29 @@ def verifyTileFolder(tileFolder, verbose):
osd = osd_for_file
else:
if not osd_for_file == osd:
print("files in " + tileFolder + " are located on different OSDs!")
print("files in " + tile_folder + " are located on different OSDs!")
return None
return osd
'''
Verify a whole GMS folder: gms_folder should be structured like
gms_folder/utmStripes/utmTiles/scenes/files.
'''
def verify_gms_folder(gms_folder, verbose=False):
    """Check every tile folder two directory levels below gms_folder.

    Entries that are not directories are ignored on both levels. Returns
    False as soon as verify_tile_folder returns None for a tile folder,
    True if that never happens. verbose is passed on to verify_tile_folder.
    """
    for utm_stripe in os.listdir(gms_folder):
        stripe_path = os.path.join(gms_folder, utm_stripe)
        if not os.path.isdir(stripe_path):
            continue
        for tile in os.listdir(stripe_path):
            tile_path = os.path.join(stripe_path, tile)
            if not os.path.isdir(tile_path):
                continue
            if verify_tile_folder(tile_path, verbose) is None:
                return False
    return True
'''
......@@ -55,47 +53,30 @@ print OSD for each file in a given folder
'''
def print_tree(path):
    """Print the OSD of each file below path, then a summary.

    For every file found by os.walk the file name and the OSD extracted
    from the output of ``xtfsutil <file>`` are printed. Finally the number
    of files and the set of all extracted OSDs are printed.
    """
    print("printing OSDs for all files in the following tree: " + path)
    number_of_files = 0
    osd_set = set()
    for dir_path, _dir_names, file_names in os.walk(path):
        for entry in file_names:
            number_of_files += 1
            file_name = os.path.join(dir_path, entry)
            print(file_name)
            xtfsutil = subprocess.run(["xtfsutil", file_name],
                                      stdout=subprocess.PIPE, universal_newlines=True)
            osd_of_file = extract_osd(xtfsutil.stdout)
            osd_set.add(osd_of_file)
            print(osd_of_file)

    print("number of files: " + str(number_of_files))
    print("OSDs: " + str(osd_set))
def extract_osd(xtfsutil_string):
    """Return the OSD uuid from the first "OSD 1" line of xtfsutil output.

    The first line that starts with "OSD 1" after stripping leading
    whitespace is used; the uuid is the text between its last two spaces.
    Returns None if no such line exists.
    """
    for line in xtfsutil_string.split('\n'):
        if line.lstrip().startswith("OSD 1"):
            # A matching line always contains a space, so the second to
            # last piece is the token in front of the last space.
            return line.rsplit(" ", 2)[-2]
    return None
import os
import pickle
import subprocess
from urllib import request
import urllib.error
import shutil
import dataDistribution
import div_util
import folder
import dir_status_page_parser
'''
xOSDManager - a python module to manage OSD selection in XtreemFS
......@@ -14,8 +16,8 @@ currently only depth (level) 2 subdirectories can be managed
'''
# TODO add support for arbitrary subdirectory level (currently depth=2 is hardcoded, which is fine for GeoMultiSens
# TODO purposes)
# TODO add support for arbitrary subdirectory level
# (currently depth=2 is hardcoded, which is fine for GeoMultiSens purposes)
class OSDManager(object):
def __init__(self, path_to_managed_folder, config_file='.das_config', value_map=None, debug=False):
......@@ -39,30 +41,64 @@ class OSDManager(object):
if len(output_1.stderr) > 0:
raise Exception("xtfsutil produced some error: " + output_1.stderr)
self.pathOnVolume = div_util.removeLeadingTrailingSlashes(
self.pathOnVolume = div_util.remove_leading_trailing_slashes(
str(output_1.stdout).split("\n")[0].split()[-1])
self.pathToMountPoint = self.managedFolder[0:(len(self.managedFolder) - len(self.pathOnVolume) - 1)]
output_2 = subprocess.run(["xtfsutil", self.pathToMountPoint],
stdout=subprocess.PIPE, universal_newlines=True)
volume_information = div_util.extractVolumeInformation(output_2.stdout)
self.volumeName = volume_information[0]
osd_list = list(map(lambda x: x[0], volume_information[1]))
self.osdSelectionPolicy = volume_information[2]
self.volume_information = div_util.extract_volume_information(output_2.stdout)
self.volumeName = self.volume_information[0]
osd_list = list(map(lambda x: x[0], self.volume_information[1]))
self.osdSelectionPolicy = self.volume_information[2]
self.volumeAddress = self.volume_information[3]
self.dataDistribution = None
if not self.__read_configuration():
self.dataDistribution = dataDistribution.DataDistribution()
self.dataDistribution.addOSDList(osd_list)
self.dataDistribution.add_osd_list(osd_list)
self.osdInformation = None
try:
answer = request.urlopen(div_util.get_http_address(self.volumeAddress))
html_data = answer.read().decode('UTF-8')
parser = dir_status_page_parser.DIRStatusPageParser()
parser.feed(html_data)
# filter out data sets without last update time or wrong service type
filtered_data_sets = list(filter(lambda x: int(x['last updated'].split()[0]) != 0, parser.dataSets))
filtered_data_sets = list(filter(lambda x: x['type'] == 'SERVICE_TYPE_OSD', filtered_data_sets))
self.osdInformation = {}
for data_set in filtered_data_sets:
uuid = data_set['uuid']
current_osd = {}
current_osd['usable_space'] = int(data_set['usable'].split()[0])
current_osd['total_space'] = int(data_set['total'].split()[0])
self.osdInformation[uuid] = current_osd
except urllib.error.URLError as error:
print("osd information could not be fetched! Probably the http status page could not be found at:",
div_util.get_http_address(self.volumeAddress))
print(error)
else:
self.pathOnVolume = value_map['path_on_volume']
self.pathToMountPoint = value_map['path_to_mount']
self.volumeName = value_map['volume_name']
self.osdSelectionPolicy = value_map['osd_selection_policy']
self.dataDistribution = value_map['data_distribution']
try:
self.pathOnVolume = value_map['path_on_volume']
self.pathToMountPoint = value_map['path_to_mount']
self.volumeName = value_map['volume_name']
self.osdSelectionPolicy = value_map['osd_selection_policy']
self.dataDistribution = value_map['data_distribution']
self.volumeAddress = value_map['volume_address']
self.osdInformation = value_map['osd_information']
except KeyError as error:
print('key not found:', error)
print('leaving in OSDManager field empty!')
def __read_configuration(self):
assert self.dataDistribution is None
......@@ -84,7 +120,7 @@ class OSDManager(object):
'''
def create_empty_folders(self, folders):
average_size = int(self.dataDistribution.getAverageFolderSize())
average_size = int(self.dataDistribution.get_average_folder_size())
if average_size <= 0:
average_size = 1
......@@ -94,7 +130,7 @@ class OSDManager(object):
new_tile = folder.Folder(self.path_on_volume(input_folder), average_size, None)
tiles.append(new_tile)
new_tiles = self.dataDistribution.addFolders(tiles)
new_tiles = self.dataDistribution.add_folders(tiles)
self.apply_osd_assignments(new_tiles)
......@@ -108,7 +144,8 @@ class OSDManager(object):
self.dataDistribution
'''
def copy_folders(self, folders, environment='LOCAL', remote_source=None, sshfs_mount_dir='/tmp/sshfs_tmp_mnt'):
def copy_folders(self, folders, environment='LOCAL', remote_source=None, sshfs_mount_dir='/tmp/sshfs_tmp_mnt',