div_util.py 5.75 KB
Newer Older
1
2
import subprocess
import sys
3
import socket
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import json
import os
import xattr


def get_osd_uuid(path):
    """Return the UUID of the first OSD of the first replica of *path*.

    The replica layout is read from the ``xtreemfs.locations`` extended
    attribute of the file, decoded and parsed as JSON.

    Args:
        path: path of the file whose extended attribute is read.

    Returns:
        The value stored under ``["replicas"][0]["osds"][0]["uuid"]``.

    Raises:
        KeyError, IndexError: if the parsed JSON lacks the expected entries.
        Errors raised by ``xattr.getxattr`` propagate unchanged.
    """
    # TODO implement version not relying on xattr (this is possible using xtfsutil)
    locations_json = xattr.getxattr(path, "xtreemfs.locations")
    locations = json.loads(locations_json.decode())
    return locations["replicas"][0]['osds'][0]['uuid']


def create_replication_policy_command(absolute_file_path):
    """Build the argument list ``xtfsutil -r RONLY <absolute_file_path>``.

    Args:
        absolute_file_path: path appended as the last argument.

    Returns:
        A new list of command-line arguments.
    """
    command = ["xtfsutil", "-r", "RONLY"]
    command.append(absolute_file_path)
    return command


def create_create_replica_command(absolute_file_path, new_osd):
    """Build the argument list ``xtfsutil -a<new_osd> --full <absolute_file_path>``.

    Args:
        absolute_file_path: path appended as the last argument.
        new_osd: string glued directly onto the ``-a`` option (no separator).

    Returns:
        A new list of command-line arguments.
    """
    # The OSD is attached to the option itself rather than passed separately.
    add_replica_option = "-a" + new_osd
    command = ["xtfsutil", add_replica_option, "--full"]
    command.append(absolute_file_path)
    return command


def create_delete_replica_command(absolute_file_path, osd):
    """Build the argument list ``xtfsutil -d <osd> <absolute_file_path>``.

    Args:
        absolute_file_path: path appended as the last argument.
        osd: value passed as the separate argument following ``-d``.

    Returns:
        A new list of command-line arguments.
    """
    command = ["xtfsutil", "-d"]
    command.extend((osd, absolute_file_path))
    return command


def command_list_to_single_string(command_list):
    """Concatenate all entries of *command_list*, each followed by one space.

    Note that the result therefore ends with a trailing space whenever
    *command_list* is non-empty; an empty list yields an empty string.

    Args:
        command_list: iterable of strings.

    Returns:
        The concatenated string.
    """
    return "".join(part + " " for part in command_list)


def run_commands(commands, max_processes=200):
    """Run shell commands concurrently and collect their results.

    Each command is started with ``shell=True`` and both of its output
    streams are captured.

    Args:
        commands: iterable of shell command strings.
        max_processes: upper bound on the number of commands running at the
            same time. Values below 1 are treated as 1.

    Returns:
        A list with one tuple ``(args, (stdout, stderr), returncode)`` per
        command, in the order of *commands*. ``stdout`` and ``stderr`` are
        bytes.
    """
    from concurrent.futures import ThreadPoolExecutor

    def run_single(command):
        process = subprocess.Popen(command, stderr=subprocess.PIPE,
                                   stdout=subprocess.PIPE, shell=True)
        # communicate() drains both pipes while waiting, so a command with a
        # lot of output cannot block on a full pipe buffer. It also reaps the
        # child through the Popen object, which keeps the real exit status
        # (a bare os.wait() would steal it and poll() would then report 0).
        output = process.communicate()
        return process.args, output, process.returncode

    commands = list(commands)
    if not commands:
        return []

    worker_count = max(1, min(max_processes, len(commands)))
    with ThreadPoolExecutor(max_workers=worker_count) as pool:
        return list(pool.map(run_single, commands))
53
54
55
56


def print_process_list(processes):
    """Print command, stdout, stderr and return code of each entry.

    Args:
        processes: iterable of indexable entries laid out as
            ``(command, (stdout, stderr), returncode)``, i.e. the shape
            returned by ``run_commands``.
    """
    for process in processes:
        captured_output = process[1]
        print("command: " + str(process[0]))
        # Label fixed: the original printed "stout" instead of "stdout".
        print("stdout: " + str(captured_output[0]))
        print("stderr: " + str(captured_output[1]))
        print("returncode: " + str(process[2]))
61

62
63
64
65
66
67

"""
Extract volume information from a string that is the output of xtfsutil.
"""


68
69
70
71
72
73
74
75
def extract_volume_information(string):
    """Parse volume details out of a multi-line text block.

    The text is scanned line by line:

    * a line starting with ``XtreemFS URL`` is split on ``/``; the last piece
      becomes the volume name and the one before it the volume address,
    * a line starting with ``OSD Selection p`` contributes its last
      whitespace-separated token as the OSD selection policy,
    * a line starting with ``Selectable OSDs`` opens the OSD section, which
      continues over following lines that start with three spaces. For every
      line of that section the second-to-last space-separated token is taken
      as UUID and the text between the last ``(`` and the last ``:`` as IP
      address.

    Args:
        string: the text to parse, with lines separated by ``\\n``.

    Returns:
        Tuple ``(volume_name, osd_list, osd_selection_policy,
        volume_address)`` where ``osd_list`` holds ``(uuid, ip_address)``
        tuples. Values that were not found stay empty.
    """
    volume_name = ""
    volume_address = ""
    osd_selection_policy = ""
    osd_list = []
    in_osd_section = False

    for line in string.split('\n'):
        if line.startswith("XtreemFS URL"):
            url_parts = line.split("/")
            volume_name = url_parts[-1]
            volume_address = url_parts[-2]

        if line.startswith("OSD Selection p"):
            osd_selection_policy = line.split()[-1]

        # The section header itself already carries the first OSD entry;
        # continuation lines are recognised by their leading indentation.
        if line.startswith("Selectable OSDs"):
            in_osd_section = True
        elif not line.startswith("   "):
            in_osd_section = False

        if not in_osd_section:
            continue

        uuid_end = line.rfind(" ")
        uuid_start = line.rfind(" ", 0, uuid_end) + 1
        ip_start = line.rfind("(") + 1
        ip_end = line.rfind(":")
        osd_list.append((line[uuid_start:uuid_end], line[ip_start:ip_end]))

    return volume_name, osd_list, osd_selection_policy, volume_address


def get_http_address(volume_address):
    """Build an HTTP URL from a ``host:port`` volume address.

    The host part is resolved with ``socket.gethostbyname``. The HTTP port is
    derived from the given port string: everything before its last four
    characters, then ``'0'``, then its last three characters (for example
    ``32638`` becomes ``30638``).

    Args:
        volume_address: string of the form ``host:port``.

    Returns:
        ``'http://<ip>:<http_port>/'``.

    Raises:
        IndexError: if *volume_address* contains no ``:``.
    """
    address_parts = volume_address.split(':')
    hostname = address_parts[0]
    port = address_parts[1]
    resolved_ip = socket.gethostbyname(hostname)
    http_port = ''.join((port[:-4], '0', port[-3:]))
    return ''.join(('http://', resolved_ip, ':', http_port, '/'))
108
109
110
111
112
113
114
115
116


"""
Find out whether we are running inside a Slurm environment.
If so, return the list of hostnames.
Otherwise, return an empty list.
"""


117
def get_slurm_hosts():
    """Return the hostnames reported by ``scontrol show hostnames``.

    Returns:
        A list of the non-empty lines printed by ``scontrol show hostnames``,
        or an empty list when no ``scontrol`` executable is found in PATH.
    """
    import shutil

    # `which` prints nothing on stdout for a missing binary, so inspecting
    # its output cannot detect absence; shutil.which returns None instead.
    if shutil.which("scontrol") is None:
        return []

    slurm_hosts = subprocess.run(["scontrol", "show", "hostnames"],
                                 stdout=subprocess.PIPE, universal_newlines=True)
    # Drop the empty strings produced by the trailing newline.
    return [host for host in slurm_hosts.stdout.split('\n') if host]


"""
maps osd uuids to hostnames,
given a list containing tuples (osdUUID, IPAddr).
"""


136
def get_osd_to_hostname_map(osds, hosts):
    """Map OSD UUIDs to hostnames by comparing IP addresses.

    For every entry of *hosts* the ``host`` command is run and the last
    whitespace-separated token of its output is taken as that host's IP
    address. Every OSD whose IP equals that token is mapped to the host.

    Args:
        osds: indexable entries laid out as ``(osd_uuid, ip_address)``.
        hosts: iterable of hostnames to look up.

    Returns:
        Dict from OSD UUID to hostname. OSDs without a matching host are
        absent.
    """
    uuid_to_host = {}
    for hostname in hosts:
        lookup = subprocess.run(["host", hostname],
                                stdout=subprocess.PIPE, universal_newlines=True)
        resolved_ip = lookup.stdout.split()[-1]
        uuid_to_host.update(
            {osd[0]: hostname for osd in osds if osd[1] == resolved_ip}
        )
    return uuid_to_host
147
148
149
150
151
152
153


'''
check whether xtfsutil exists
'''


154
def check_for_xtfsutil():
    """Verify that ``xtfsutil`` can be run; exit the program otherwise.

    ``xtfsutil`` is started without arguments and its stdout and stderr are
    checked for text starting with ``Usage:``. If the executable cannot be
    found, or neither stream starts with ``Usage:``, a hint is printed and
    the program exits with status 1. Otherwise a confirmation is printed.

    Raises:
        SystemExit: with code 1 when xtfsutil is not available.
    """
    try:
        xtfsutil = subprocess.run(["xtfsutil"], stdout=subprocess.PIPE,
                                  stderr=subprocess.PIPE, universal_newlines=True)
    except FileNotFoundError:
        # subprocess.run raises (rather than returning) for a missing binary;
        # treat that the same way as missing usage output.
        usage_found = False
    else:
        usage_found = (xtfsutil.stdout.startswith("Usage:")
                       or xtfsutil.stderr.startswith("Usage:"))

    if not usage_found:
        print("xtfsutil (the xtreemfs client utility) was not found. " +
              "Please include xtfsutil in your PATH and restart.")
        sys.exit(1)

    print("Found xtfsutil.")


'''
check whether the given program exists in $PATH
'''


def check_for_executable(executable):
    """Report whether *executable* can be started.

    The program is actually run (without arguments, output captured and
    discarded), so this has whatever side effects that invocation has.

    Args:
        executable: name or path of the program to start.

    Returns:
        False if starting it raises FileNotFoundError, True otherwise.
    """
    try:
        subprocess.run([executable], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except FileNotFoundError:
        found = False
    else:
        found = True
    return found


'''
remove all leading and trailing slashes (/)
'''


185
def remove_leading_trailing_slashes(string):
    """Return *string* without any leading or trailing ``/`` characters.

    Slashes inside the string are kept. A string consisting only of slashes,
    or an empty string, yields an empty string.

    Args:
        string: the text to strip.

    Returns:
        The stripped string.
    """
    # str.strip does in one pass what the character-by-character slicing
    # loops did with a fresh copy per removed slash.
    return string.strip('/')