Commit c83f0ede authored by Felix Seibert's avatar Felix Seibert
Browse files

changed module and import structure; hopefully more stability for different network environments.

first commit of working_branch

2nd commit on working branch, adding some missing files.

changed module and file names such that there are matching.

import statements are now more explicit

changed name of field

get_http_address returns an url containing an ip address, to avoid potential dns resolving issues.
parent 7d28c0c6
This diff is collapsed.
Path (on volume) /
XtreemFS file Id 1aaa1b2b-e889-48d8-9d45-d6cdfed6c10f:1
XtreemFS URL pbrpc://localhost:32638/xfs_volume
Owner felix
Group users
Type volume
Available Space 18 GB
Quota / Used Space unlimited / 9 GB
Voucher Size 10 MB
Default User Quota unlimited
Default GroupQuota unlimited
Num. Files/Dirs 1252 / 317
Access Control p. POSIX (permissions & ACLs)
OSD Selection p. 1000,1004
Replica Selection p. default
Default Striping p. STRIPING_POLICY_RAID0 / 1 / 128kB
Default Repl. p. not set
Snapshots enabled no
Tracing enabled no
Selectable OSDs flx-local-osd-1 (130.73.78.157:32640)
flx-local-osd-2 (130.73.78.157:32641)
flx-local-osd-3 (130.73.78.157:32642)
\ No newline at end of file
import subprocess
import unittest
import os
import shutil
import OSDManager
import div_util
import verify
import dir_status_page_parser
import dataDistribution
import folder
class TestDataDistribution(unittest.TestCase):
def setUp(self):
self.osdList = ['osd1', 'osd2', 'osd3']
self.sixty_folders = []
for i in range(0, 60):
self.sixty_folders.append(folder.Folder('id' + str(i), 1, 'None'))
self.six_folders = []
for i in range(0, 6):
self.six_folders.append(folder.Folder('id' + str(i), 1, 'None'))
self.six_hundred_folders = []
for i in range(0, 600):
self.six_hundred_folders.append(folder.Folder('id' + str(i), 1, 'None'))
def test_add_folders_no_osd_information(self):
data_distribution = dataDistribution.DataDistribution()
data_distribution.add_osd_list(self.osdList)
data_distribution.add_folders(self.sixty_folders)
total_osd_folder_sizes = []
for osd in data_distribution.OSDs.values():
total_osd_folder_sizes.append(osd.totalFolderSize)
expected_osd_folder_sizes = [20, 20, 20]
self.assertListEqual(expected_osd_folder_sizes, total_osd_folder_sizes)
def test_add_folders(self):
data_distribution = dataDistribution.DataDistribution()
data_distribution.add_osd_list(self.osdList)
osd_information = {}
osd_1 = {'usable_space': 10, 'total_space': 10}
osd_2 = {'usable_space': 20, 'total_space': 20}
osd_3 = {'usable_space': 30, 'total_space': 30}
osd_information['osd1'] = osd_1
osd_information['osd2'] = osd_2
osd_information['osd3'] = osd_3
data_distribution.add_folders(self.sixty_folders, osd_information, 'total_space')
total_osd_folder_sizes = {}
for osd in data_distribution.OSDs.values():
total_osd_folder_sizes[osd.uuid] = osd.totalFolderSize
expected_osd_folder_sizes = {'osd1': 10, 'osd2': 20, 'osd3': 30}
self.assertEqual(expected_osd_folder_sizes, total_osd_folder_sizes)
class TestDirStatusPageParser(unittest.TestCase):
def test_feed(self):
parser = dir_status_page_parser.dir_status_page_parser()
html_data = open('../test_data/dir_status_page.html', 'r').read()
parser.feed(html_data)
filtered_data_sets = list(filter(lambda x: int(x['last updated'].split()[0]) != 0, parser.dataSets))
filtered_data_sets = list(filter(lambda x: x['type'] == 'SERVICE_TYPE_OSD', filtered_data_sets))
self.assertEqual(len(filtered_data_sets), 24)
total_usable_space = 0
total_space = 0
for dataSet in filtered_data_sets:
total_usable_space += int(dataSet['usable'])
total_space += int(dataSet['total'].split()[0])
self.assertGreaterEqual(total_space, total_usable_space)
class TestDivUtil(unittest.TestCase):
def test_remove_leading_trailing_slashes(self):
s = '/hello/world////'
self.assertEqual(div_util.remove_leading_trailing_slashes(s), 'hello/world')
s = '/just_one_tile'
self.assertEqual(div_util.remove_leading_trailing_slashes(s), 'just_one_tile')
def test_extract_volume_information(self):
test_string = open('../test_data/example_volume_information', 'r').read()
volume_information = div_util.extract_volume_information(test_string)
self.assertEqual('xfs_volume', volume_information[0])
expected_osd_list = []
expected_osd_list.append(('flx-local-osd-1', '130.73.78.157'))
expected_osd_list.append(('flx-local-osd-2', '130.73.78.157'))
expected_osd_list.append(('flx-local-osd-3', '130.73.78.157'))
self.assertEqual(expected_osd_list, volume_information[1])
self.assertEqual('1000,1004', volume_information[2])
self.assertEqual('localhost:32638', volume_information[3])
def test_get_http_address(self):
test_address = 'localhost:32638'
self.assertEqual('http://localhost:30638/', div_util.get_http_address(test_address))
test_address = '192.168.227.11:32638'
self.assertEqual('http://192.168.227.11:30638/', div_util.get_http_address(test_address))
class TestXOSDManager(unittest.TestCase):
def setUp(self):
# fields for unit testing without an xtreemfs instance
self.volume_name = 'volume'
self.path_to_mount = '/mnt/x_mnt'
self.path_on_volume = 'one/two/three'
self.some_folder = 'some/folder'
self.some_sub_folder = 'subfolder'
self.absolute_path_to_managed_folder = os.path.join(self.path_to_mount, self.path_on_volume)
self.absolute_path_random = '/la/la/la/la/la/la/la'
self.volume_address = '1.1.1.1:30000'
self.value_map = {'path_on_volume': self.path_on_volume, 'path_to_mount': self.path_to_mount,
'volume_name': self.volume_name, 'osd_selection_policy': None, 'data_distribution': None,
'volume_address': self.volume_address}
def test_init_no_xtreemfs_volume(self):
dummy_path = '/'
with self.assertRaises(OSDManager.NotAXtreemFSVolume):
OSDManager.OSDManager(dummy_path)
def test_path_on_volume(self):
osd_man = OSDManager.OSDManager(self.absolute_path_to_managed_folder, value_map=self.value_map)
self.assertEqual(os.path.join(self.volume_name, self.path_on_volume, self.some_folder),
osd_man.path_on_volume(
os.path.join(self.path_to_mount, self.path_on_volume, self.some_folder)))
with self.assertRaises(OSDManager.PathNotManagedException):
osd_man.path_on_volume(self.absolute_path_random)
def test_get_target_dir(self):
osd_man = OSDManager.OSDManager(self.absolute_path_to_managed_folder, value_map=self.value_map)
folder_id = os.path.join(self.volume_name, self.path_on_volume, self.some_folder, self.some_sub_folder)
target_dir = osd_man.get_target_dir(folder_id)
expected_target_dir = os.path.join(self.path_to_mount, self.path_on_volume, self.some_folder)
self.assertEqual(expected_target_dir, target_dir)
class TestXOSDManagerWithXtreemFS(unittest.TestCase):
def setUp(self):
# fields for unit testing with a running xtreemfs instance and real files
# two mount points of the same xtreemfs volume
self.mount_point_1 = '/dev/shm/xfs_mnt_1'
self.mount_point_2 = '/dev/shm/xfs_mnt_2'
self.path_on_mount_point = 'x/y/folder'
self.tmp_folder = '/tmp/python-1298324809321934'
self.test_files_folder = 'test_folder'
self.depth_1_name = 'dir_'
self.depth_2_name = 'FOLDER_'
self.depth_3_name = 'img_'
self.file_name = 'file_'
self.num_depth_1_dirs = 2
self.num_depth_2_dirs = 3
self.num_depth_3_dirs = 2
self.num_files = 5
self.depth_1_dirs = []
self.depth_2_dirs = []
self.depth_3_dirs = []
self.file_sizes = {1: 1, 2: 2, 3: 4, 4: 8, 5: 16}
self.file_size_multiplier = 1024 * 8
self.create_test_files()
def tearDown(self):
clean_up_volume(self.mount_point_1)
clean_up_folder(os.path.join(self.tmp_folder, self.test_files_folder))
def create_test_files(self):
clean_up_volume(self.mount_point_1)
clean_up_folder(os.path.join(self.tmp_folder, self.test_files_folder))
for i in range(0, self.num_depth_1_dirs):
depth_1_dir = os.path.join(self.tmp_folder, self.test_files_folder, self.depth_1_name + str(i))
self.depth_1_dirs.append(depth_1_dir)
for j in range(0, self.num_depth_2_dirs):
depth_2_dir = os.path.join(depth_1_dir, self.depth_2_name + str(j))
self.depth_2_dirs.append(depth_2_dir)
for k in range(0, self.num_depth_3_dirs):
depth_3_dir = os.path.join(self.tmp_folder, self.test_files_folder,
self.depth_1_name + str(i),
self.depth_2_name + str(j),
self.depth_3_name + str(k))
self.depth_3_dirs.append(depth_3_dir)
os.makedirs(depth_3_dir)
for l in range(0, self.num_files):
file = open(os.path.join(depth_3_dir, self.file_name + str(l)), 'w')
for m in range(0, self.file_sizes[j + 1] * self.file_size_multiplier):
file.write("1")
def test_copy_folders(self):
managed_folder = os.path.join(self.mount_point_1, self.path_on_mount_point)
os.makedirs(managed_folder)
x_man = OSDManager.OSDManager(managed_folder)
x_man.copy_folders(self.depth_2_dirs)
self.assertTrue(verify.verify_gms_folder(managed_folder))
self.assertEqual(count_folder_and_files(managed_folder),
count_folder_and_files(os.path.join(self.tmp_folder, self.test_files_folder)))
def test_create_empty_folders(self):
managed_folder = os.path.join(self.mount_point_1, self.path_on_mount_point)
os.makedirs(managed_folder)
new_dirs = []
copy_tuples = []
for depth_2_dir in self.depth_2_dirs:
new_dir = os.path.join(os.path.split(os.path.split(depth_2_dir)[0])[1],
os.path.split(depth_2_dir)[1])
new_dir = os.path.join(managed_folder, new_dir)
new_dirs.append(new_dir)
copy_tuples.append((depth_2_dir, new_dir))
x_man = OSDManager.OSDManager(managed_folder)
x_man.create_empty_folders(new_dirs)
# now copy files manually and check whether the data layout is good
for src, dst in copy_tuples:
shutil.rmtree(dst, ignore_errors=True)
# shutil.copytree requires that the target directory does not exist
shutil.copytree(src, dst)
self.assertEqual(count_folder_and_files(managed_folder),
count_folder_and_files(os.path.join(self.tmp_folder, self.test_files_folder)))
self.assertTrue(verify.verify_gms_folder(managed_folder))
def test_update(self):
managed_folder = os.path.join(self.mount_point_1, self.path_on_mount_point)
os.makedirs(managed_folder)
new_dirs = []
copy_tuples = []
for depth_2_dir in self.depth_2_dirs:
new_dir = os.path.join(os.path.split(os.path.split(depth_2_dir)[0])[1],
os.path.split(depth_2_dir)[1])
new_dir = os.path.join(managed_folder, new_dir)
new_dirs.append(new_dir)
copy_tuples.append((depth_2_dir, new_dir))
x_man = OSDManager.OSDManager(managed_folder)
x_man.create_empty_folders(new_dirs)
for src, dst in copy_tuples:
shutil.rmtree(dst, ignore_errors=True)
# shutil.copytree requires that the target directory does not exist
shutil.copytree(src, dst)
du_source = subprocess.run(["du", "-s", os.path.join(self.tmp_folder, self.test_files_folder)],
stdout=subprocess.PIPE, universal_newlines=True)
size_source = int(str(du_source.stdout).split()[0])
total_size_in_distribution = 0
for osd in x_man.distribution.OSDs.values():
total_size_in_distribution += osd.totalFolderSize
self.assertNotEqual(size_source, total_size_in_distribution)
x_man.update()
du_source = subprocess.run(["du", "-s", os.path.join(self.tmp_folder, self.test_files_folder)],
stdout=subprocess.PIPE, universal_newlines=True)
size_source = int(str(du_source.stdout).split()[0])
total_size_in_distribution = 0
for osd in x_man.distribution.OSDs.values():
total_size_in_distribution += osd.totalFolderSize
self.assertEqual(size_source, total_size_in_distribution)
def test_read_write(self):
managed_folder_1 = os.path.join(self.mount_point_1, self.path_on_mount_point)
os.makedirs(managed_folder_1)
x_man_1 = OSDManager.OSDManager(managed_folder_1)
x_man_1.copy_folders(self.depth_2_dirs)
total_size_in_distribution_1 = 0
for osd in x_man_1.distribution.OSDs.values():
total_size_in_distribution_1 += osd.totalFolderSize
managed_folder_2 = os.path.join(self.mount_point_2, self.path_on_mount_point)
x_man_2 = OSDManager.OSDManager(managed_folder_2)
total_size_in_distribution_2 = 0
for osd in x_man_2.distribution.OSDs.values():
total_size_in_distribution_2 += osd.totalFolderSize
self.assertEqual(total_size_in_distribution_1, total_size_in_distribution_2)
def test_remove_folder(self):
managed_folder_1 = os.path.join(self.mount_point_1, self.path_on_mount_point)
os.makedirs(managed_folder_1)
x_man_1 = OSDManager.OSDManager(managed_folder_1)
x_man_1.copy_folders(self.depth_2_dirs)
removed_folder_id = 'xfs_volume/x/y/folder/dir_0/FOLDER_1'
x_man_1.remove_folder(removed_folder_id)
new_files_target_dir = os.path.join(x_man_1.get_target_dir(removed_folder_id),
os.path.split(removed_folder_id)[1])
self.assertIsNotNone(verify.verify_tile_folder(new_files_target_dir, False))
for i in range(1, 100):
file_path = os.path.join(new_files_target_dir, 'new_file_' + str(i))
open(file_path, 'a').close()
self.assertIsNone(verify.verify_tile_folder(new_files_target_dir, False))
def test_move_folder_to_OSD(self):
managed_folder_1 = os.path.join(self.mount_point_1, self.path_on_mount_point)
os.makedirs(managed_folder_1)
x_man_1 = OSDManager.OSDManager(managed_folder_1)
x_man_1.copy_folders(self.depth_2_dirs)
moved_folder_id = 'xfs_volume/x/y/folder/dir_0/FOLDER_1'
new_osd_id = 'flx-local-osd-2'
moved_folder_path = os.path.join(x_man_1.get_target_dir(moved_folder_id),
os.path.split(moved_folder_id)[1])
old_num_dirs_and_files = count_folder_and_files(moved_folder_path)
x_man_1.move_folder_to_osd(moved_folder_id, new_osd_id)
self.assertEqual('flx-local-osd-2',
verify.verify_tile_folder(moved_folder_path, False))
self.assertEqual(old_num_dirs_and_files, count_folder_and_files(moved_folder_path))
def count_folder_and_files(top_dir):
# calculate and return some number, based on a directory tree rooted at top_dir
num = 0
for item in os.walk(top_dir):
for directory in item[1]:
if not directory.startswith('.'):
num += 1
for file in item[2]:
if not file.startswith('.'):
num += 1
return num
def clean_up_volume(mount_point):
clean_up_folder(mount_point)
subprocess.run(["xtfsutil",
"--set-pattr", "1004.filenamePrefix", "--value", "clear", mount_point],
stdout=subprocess.PIPE, universal_newlines=True)
def clean_up_folder(folder):
try:
for item in os.listdir(folder):
if os.path.isdir(os.path.join(folder, item)):
shutil.rmtree(os.path.join(folder, item), ignore_errors=True)
else:
os.remove(os.path.join(folder, item))
assert len(os.listdir(folder)) == 0
except FileNotFoundError:
pass
......@@ -3,7 +3,7 @@ import unittest
import os
import shutil
import xOSDManager
import OSDManager
import div_util
import verify
......@@ -36,21 +36,21 @@ class TestXOSDManager(unittest.TestCase):
def test_init_no_xtreemfs_volume(self):
dummy_path = '/'
with self.assertRaises(xOSDManager.NotAXtreemFSVolume):
xOSDManager.OSDManager(dummy_path)
with self.assertRaises(OSDManager.NotAXtreemFSVolume):
OSDManager.OSDManager(dummy_path)
def test_path_on_volume(self):
osd_man = xOSDManager.OSDManager(self.absolute_path_to_managed_folder, value_map=self.value_map)
osd_man = OSDManager.OSDManager(self.absolute_path_to_managed_folder, value_map=self.value_map)
self.assertEqual(os.path.join(self.volume_name, self.path_on_volume, self.some_folder),
osd_man.path_on_volume(
os.path.join(self.path_to_mount, self.path_on_volume, self.some_folder)))
with self.assertRaises(xOSDManager.PathNotManagedException):
with self.assertRaises(OSDManager.PathNotManagedException):
osd_man.path_on_volume(self.absolute_path_random)
def test_get_target_dir(self):
osd_man = xOSDManager.OSDManager(self.absolute_path_to_managed_folder, value_map=self.value_map)
osd_man = OSDManager.OSDManager(self.absolute_path_to_managed_folder, value_map=self.value_map)
folder_id = os.path.join(self.volume_name, self.path_on_volume, self.some_folder, self.some_sub_folder)
target_dir = osd_man.get_target_dir(folder_id)
......@@ -120,7 +120,7 @@ class TestXOSDManagerWithXtreemFS(unittest.TestCase):
managed_folder = os.path.join(self.mount_point_1, self.path_on_mount_point)
os.makedirs(managed_folder)
x_man = xOSDManager.OSDManager(managed_folder)
x_man = OSDManager.OSDManager(managed_folder)
x_man.copy_folders(self.depth_2_dirs)
self.assertTrue(verify.verifyGMSFolder(managed_folder))
......@@ -141,7 +141,7 @@ class TestXOSDManagerWithXtreemFS(unittest.TestCase):
new_dirs.append(new_dir)
copy_tuples.append((depth_2_dir, new_dir))
x_man = xOSDManager.OSDManager(managed_folder)
x_man = OSDManager.OSDManager(managed_folder)
x_man.create_empty_folders(new_dirs)
# now copy files manually and check whether the data layout is good
......@@ -168,7 +168,7 @@ class TestXOSDManagerWithXtreemFS(unittest.TestCase):
new_dirs.append(new_dir)
copy_tuples.append((depth_2_dir, new_dir))
x_man = xOSDManager.OSDManager(managed_folder)
x_man = OSDManager.OSDManager(managed_folder)
x_man.create_empty_folders(new_dirs)
for src, dst in copy_tuples:
......@@ -181,7 +181,7 @@ class TestXOSDManagerWithXtreemFS(unittest.TestCase):
size_source = int(str(du_source.stdout).split()[0])
total_size_in_distribution = 0
for osd in x_man.dataDistribution.osds.values():
for osd in x_man.distribution.osds.values():
total_size_in_distribution += osd.totalFolderSize
self.assertNotEqual(size_source, total_size_in_distribution)
......@@ -193,7 +193,7 @@ class TestXOSDManagerWithXtreemFS(unittest.TestCase):
size_source = int(str(du_source.stdout).split()[0])
total_size_in_distribution = 0
for osd in x_man.dataDistribution.osds.values():
for osd in x_man.distribution.osds.values():
total_size_in_distribution += osd.totalFolderSize
self.assertEqual(size_source, total_size_in_distribution)
......@@ -202,18 +202,18 @@ class TestXOSDManagerWithXtreemFS(unittest.TestCase):
managed_folder_1 = os.path.join(self.mount_point_1, self.path_on_mount_point)
os.makedirs(managed_folder_1)
x_man_1 = xOSDManager.OSDManager(managed_folder_1)
x_man_1 = OSDManager.OSDManager(managed_folder_1)
x_man_1.copy_folders(self.depth_2_dirs)
total_size_in_distribution_1 = 0
for osd in x_man_1.dataDistribution.osds.values():
for osd in x_man_1.distribution.osds.values():
total_size_in_distribution_1 += osd.totalFolderSize
managed_folder_2 = os.path.join(self.mount_point_2, self.path_on_mount_point)
x_man_2 = xOSDManager.OSDManager(managed_folder_2)
x_man_2 = OSDManager.OSDManager(managed_folder_2)
total_size_in_distribution_2 = 0
for osd in x_man_2.dataDistribution.osds.values():
for osd in x_man_2.distribution.osds.values():
total_size_in_distribution_2 += osd.totalFolderSize
self.assertEqual(total_size_in_distribution_1, total_size_in_distribution_2)
......
import os
import pickle
import subprocess
from urllib import request
import urllib.error
import shutil
import dataDistribution
import div_util
import folder
import xtreemfs_client.dataDistribution as dataDistribution
import xtreemfs_client.div_util as div_util
import xtreemfs_client.folder as folder
import xtreemfs_client.dir_status_page_parser as dir_status_page_parser
'''
xOSDManager - a python module to manage OSD selection in XtreemFS
......@@ -14,8 +16,8 @@ currently only depth (level) 2 subdirectories can be managed
'''
# TODO add support for arbitrary subdirectory level (currently depth=2 is hardcoded, which is fine for GeoMultiSens
# TODO purposes)
# TODO add support for arbitrary subdirectory level
# (currently depth=2 is hardcoded, which is fine for GeoMultiSens purposes)
class OSDManager(object):
def __init__(self, path_to_managed_folder, config_file='.das_config', value_map=None, debug=False):
......@@ -39,37 +41,71 @@ class OSDManager(object):
if len(output_1.stderr) > 0:
raise Exception("xtfsutil produced some error: " + output_1.stderr)
self.pathOnVolume = div_util.removeLeadingTrailingSlashes(
self.pathOnVolume = div_util.remove_leading_trailing_slashes(
str(output_1.stdout).split("\n")[0].split()[-1])
self.pathToMountPoint = self.managedFolder[0:(len(self.managedFolder) - len(self.pathOnVolume) - 1)]
output_2 = subprocess.run(["xtfsutil", self.pathToMountPoint],
stdout=subprocess.PIPE, universal_newlines=True)
volume_information = div_util.extractVolumeInformation(output_2.stdout)
self.volumeName = volume_information[0]
osd_list = list(map(lambda x: x[0], volume_information[1]))
self.osdSelectionPolicy = volume_information[2]
self.volume_information = div_util.extract_volume_information(output_2.stdout)
self.volumeName = self.volume_information[0]
osd_list = list(map(lambda x: x[0], self.volume_information[1]))
self.osdSelectionPolicy = self.volume_information[2]
self.volumeAddress = self.volume_information[3]
self.dataDistribution = None
self.distribution = None
if not self.__read_configuration():
self.dataDistribution = dataDistribution.DataDistribution()
self.distribution = dataDistribution.DataDistribution()
self.distribution.add_osd_list(osd_list)
self.osdInformation = None
try:
answer = request.urlopen(div_util.get_http_address(self.volumeAddress))
html_data = answer.read().decode('UTF-8')
parser = dir_status_page_parser.dir_status_page_parser()
parser.feed(html_data)
# filter out data sets without last update time or wrong service type
filtered_data_sets = list(filter(lambda x: int(x['last updated'].split()[0]) != 0, parser.dataSets))
filtered_data_sets = list(filter(lambda x: x['type'] == 'SERVICE_TYPE_OSD', filtered_data_sets))
self.dataDistribution.addOSDList(osd_list)
self.osdInformation = {}
for data_set in filtered_data_sets:
uuid = data_set['uuid']