Commit ee5e0587 authored by Felix Seibert's avatar Felix Seibert
Browse files

adding a function to create a good data layout out of files already present in xtreemfs

(which do not satisfy any precondition on the data distribution)
parent 7d71236e
......@@ -14,6 +14,7 @@ import dir_status_page_parser
'''
xOSDManager - a python module to manage OSD selection in XtreemFS
currently only depth (level) 2 subdirectories can be managed
only unix-based OSs are supported
'''
......@@ -116,6 +117,119 @@ class OSDManager(object):
f = open(path_to_config, "wb")
pickle.dump(self.distribution, f)
'''
create a good data distribution out of data already present in the file system.
the created data distribution will then be transferred to the physical layer,
i.e., all files will be moved to their corresponding OSD,
using XtreemFS' read-only replication strategy.
'''
def create_distribution_from_existing_files(self, apply_layout=True):
if self.debug:
print("creating distribution from existing files. osd manager: " + str(self))
if not div_util.check_for_executable('du'):
raise ExecutableNotFoundException("No du found. Please make sure it is contained in your PATH.")
existing_folders = self.get_assigned_folders()
managed_folders = []
for one_folder in existing_folders:
du = subprocess.run(["du", "-s", one_folder], stdout=subprocess.PIPE,
universal_newlines=True)
folder_size = int(du.stdout.split()[0])
new_folder = folder.Folder(self.path_on_volume(one_folder),
folder_size,
None)
managed_folders.append(new_folder)
new_assignments = self.distribution.add_folders(managed_folders)
if apply_layout:
self.apply_osd_assignments(new_assignments)
elif self.debug:
print("NOT applying data layout!")
self.__write_configuration()
if self.debug:
print("osd manager after new folders have been added to data distribution:")
print(str(self))
self.fix_physical_layout()
'''
fixes the physical layout, such that it matches the data distribution described in self.distribution
we use the following strategy: first, determine which files needs to be moved to another OSD, and create three lists.
for each file to be moved, one contains the command to set the default replication policy (1), one contains the command
to create a new replica on the target OSD (2) and one contains the command to delete the replica on the original OSD (3).
the command-lists are executed in order: 1, 2, 3, commands of list i only being executed if all commands of list i-1
have returned. the commands of any one list are executed in parallel, with an upper bound of 200 parallel executions
to not violate the maximum number of processes of the OS.
the commands of list 3 might return an error, as deleting the replica on the original OSD is not possible until
the new replica on the target OSD is complete. theses commands are collected and re-executed after a certain delay.
this process is repeated until no error is returned, which should eventually happen (as soon as all new replicas on
the target OSDs are complete).
'''
def fix_physical_layout(self, repeat_delete_interval_secs=15):
change_policy_command_list = []
create_replica_command_list = []
delete_replica_command_list = []
for directory in os.walk(self.managedFolder):
for filename in directory[2]:
absolute_file_path = os.path.join(directory[0], filename)
osd_of_file = div_util.get_osd_uuid(absolute_file_path)
osd_for_file = self.distribution.get_containing_osd(self.path_on_volume(absolute_file_path))
if osd_of_file != osd_for_file:
# create replication commands
# collect replication commands
# run replication commands - handle errors if new replica is not complete yet
# if no replication is used (otherwise than for moving the files from osd to osd),
# we should reset the replication policy to none
policy_command = div_util.create_replication_policy_command(absolute_file_path)
create_command = div_util.create_create_replica_command(absolute_file_path, osd_for_file)
delete_command = div_util.create_delete_replica_command(absolute_file_path, osd_of_file)
change_policy_command_list.append(div_util.command_list_to_single_string(policy_command))
create_replica_command_list.append(div_util.command_list_to_single_string(create_command))
delete_replica_command_list.append(div_util.command_list_to_single_string(delete_command))
processes = div_util.run_commands(change_policy_command_list)
if self.debug:
print("executing " + str(len(processes)) + " change policy commands done. Results:")
div_util.print_process_list(processes)
processes = div_util.run_commands(create_replica_command_list)
if self.debug:
print("executing " + str(len(processes)) + " create replica commands done. Results:")
div_util.print_process_list(processes)
processes = div_util.run_commands(delete_replica_command_list)
successfull_deletions = []
errored_deletions = []
while True:
if self.debug:
print("executing " + str(len(processes)) + " delete replica commands done. Results:")
div_util.print_process_list(processes)
for process in processes:
stdout, stderr = process.communicate()
if stderr == b'':
pass
else:
errored_deletions.append(process.args)
if len(errored_deletions) == 0:
break
time.sleep(repeat_delete_interval_secs)
processes = div_util.run_commands(errored_deletions)
'''
create empty folders and assign OSDs.
'''
......@@ -406,7 +520,7 @@ class OSDManager(object):
command_list = []
host_name = ""
#command = ""
# command = ""
for key in osd_to_folders_map.keys():
if host_name == "":
host_name = osd_to_host_map[key]
......@@ -418,7 +532,7 @@ class OSDManager(object):
command += " ;"
if len(osd_to_folders_map[key]) > 0:
command_list.append(command)
#command_list.append(command)
# command_list.append(command)
return command_list
'''
......
......@@ -11,7 +11,7 @@ import OSDManager
import verify
"""
das - data add script. basically a command line wrapper for xOSDManager.
das - data add script. basically a command line wrapper for OSDManager.
please use absolute paths only!
"""
......@@ -52,6 +52,11 @@ parser.add_argument("--debug", "-d", action='store_const', const=True, default=F
parser.add_argument("--verify", "-v", action='store_const', const=True, default=False)
parser.add_argument("--create-from-existing-files", action='store_const',const=True, default=False,
help='creates a data distribution based on the files already present,'
'and changes the physical location of files to match the locations prescribed by the '
'distribution.')
args = parser.parse_args()
if args.debug:
......@@ -93,3 +98,6 @@ elif args.new_folders:
elif args.update:
x_man.update()
print(x_man)
elif args.create_from_existing_files:
x_man.create_distribution_from_existing_files()
import subprocess
import sys
import socket
import json
import os
import xattr
def get_osd_uuid(path):
# TODO implement version not relying on xattr (this is possible using xtfsutil)
attrs = xattr.listxattr(path)
jsonString = xattr.getxattr(path, "xtreemfs.locations")
parsed = json.loads(jsonString.decode())
return parsed["replicas"][0]['osds'][0]['uuid']
def create_replication_policy_command(absolute_file_path):
return ["xtfsutil", "-r", "RONLY", absolute_file_path]
def create_create_replica_command(absolute_file_path, new_osd):
return ["xtfsutil", "-a" + new_osd, "--full", absolute_file_path]
def create_delete_replica_command(absolute_file_path, osd):
return ["xtfsutil", "-d", osd, absolute_file_path]
def command_list_to_single_string(command_list):
single_string = ""
for command in command_list:
single_string = single_string + command + " "
return single_string
def run_commands(commands, max_processes=200):
all_processes = set()
running_processes = set()
for command in commands:
process = subprocess.Popen(command, stderr=subprocess.PIPE, stdout=subprocess.PIPE, shell=True)
all_processes.add(process)
running_processes.add(process)
if (len(running_processes) >= max_processes):
os.wait()
running_processes.difference_update([p for p in running_processes if p.poll() is not None])
for p in running_processes:
p.wait()
return all_processes
def print_process_list(processes):
for process in processes:
print("command: " + str(process.args))
stdout, stderr = process.communicate()
print("stoud: " + str(stdout))
print("stderr: " + str(stderr))
"""
extract volume information from a string which is output from xtfsutil.
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment