summaryrefslogtreecommitdiff
path: root/tests/atf_python/sys/net/tools.py
blob: 44bd74d8578fe5cbb765b415a71b2b18e9a8fba0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
#!/usr/local/bin/python3
import json
import os
import subprocess


class ToolsHelper(object):
    """Classmethod helpers that run system network tools and parse their output.

    Commands are built as strings and executed through the shell. The
    ``NETSTAT_PATH`` and ``IFCONFIG_PATH`` attributes are substituted verbatim
    into those command strings.
    """

    NETSTAT_PATH = "/usr/bin/netstat"
    IFCONFIG_PATH = "/sbin/ifconfig"

    @classmethod
    def get_output(cls, cmd: str, verbose=False) -> str:
        """Run *cmd* through the shell and return everything it wrote to stdout.

        When *verbose* is true, the command line is printed before it runs.
        The command's exit status is not checked.
        """
        if verbose:
            print("run: '{}'".format(cmd))
        # Close the pipe explicitly so the child is reaped here rather than
        # whenever the pipe object happens to be garbage-collected.
        with os.popen(cmd) as pipe:
            return pipe.read()

    @classmethod
    def pf_rules(cls, rules, verbose=True):
        """Feed *rules* to ``/sbin/pfctl -g -f -``, one rule per line.

        When *verbose* is true, the generated ruleset is printed before it is
        submitted and the output of ``/sbin/pfctl -sr`` is printed afterwards.

        Raises:
            Exception: if pfctl exits with a non-zero status.
        """
        pf_conf = "".join(r + "\n" for r in rules)

        if verbose:
            print("Set rules:")
            print(pf_conf)

        ps = subprocess.run(
            "/sbin/pfctl -g -f -", shell=True, input=pf_conf.encode("utf-8")
        )
        ret = ps.returncode
        if ret != 0:
            raise Exception("Failed to set pf rules %d" % ret)

        if verbose:
            cls.print_output("/sbin/pfctl -sr")

    @classmethod
    def print_output(cls, cmd: str, verbose=True):
        """Run *cmd* and print its output.

        When *verbose* is true, the output is preceded by a header line naming
        the command and followed by an empty line.
        """
        if verbose:
            print("======= {} =====".format(cmd))
        print(cls.get_output(cmd))
        if verbose:
            print()

    @classmethod
    def print_net_debug(cls):
        """Print the output of ``ifconfig`` and ``netstat -rnW``."""
        cls.print_output("ifconfig")
        cls.print_output("netstat -rnW")

    @classmethod
    def set_sysctl(cls, oid, val):
        """Run ``sysctl oid=val``; the command's output is discarded."""
        cls.get_output("sysctl {}={}".format(oid, val))

    @classmethod
    def _netstat_json(cls, flags: str, family: str, fibnum: int):
        """Run netstat with *flags* for *family*/*fibnum* and parse its JSON.

        Raises:
            ValueError: if *family* is neither ``"inet"`` nor ``"inet6"``, or
                if the command output is not valid JSON
                (``json.JSONDecodeError`` is a ``ValueError`` subclass).
        """
        family_key = {"inet": "-4", "inet6": "-6"}.get(family)
        if family_key is None:
            # Fail early instead of putting the literal string "None" on the
            # netstat command line.
            raise ValueError("unsupported address family: {!r}".format(family))
        out = cls.get_output(
            "{} {} {} -F {} --libxo json".format(
                cls.NETSTAT_PATH, family_key, flags, fibnum
            )
        )
        return json.loads(out)

    @classmethod
    def get_routes(cls, family: str, fibnum: int = 0):
        """Return the ``rt-entry`` list that ``netstat -rnW`` reports.

        *family* must be ``"inet"`` or ``"inet6"``; *fibnum* is passed to
        netstat's ``-F`` option. Returns an empty list when the ``rt-family``
        list in the output is empty.
        """
        js = cls._netstat_json("-rnW", family, fibnum)
        js = js["statistics"]["route-information"]["route-table"]["rt-family"]
        if js:
            return js[0]["rt-entry"]
        return []

    @classmethod
    def get_nhops(cls, family: str, fibnum: int = 0):
        """Return the ``nh-entry`` list that ``netstat -onW`` reports.

        *family* must be ``"inet"`` or ``"inet6"``; *fibnum* is passed to
        netstat's ``-F`` option. Returns an empty list when the ``rt-family``
        list in the output is empty.
        """
        js = cls._netstat_json("-onW", family, fibnum)
        js = js["statistics"]["route-nhop-information"]["nhop-table"]["rt-family"]
        if js:
            return js[0]["nh-entry"]
        return []

    @classmethod
    def get_linklocals(cls):
        """Map each interface name to its ``fe80`` addresses.

        Parses the output of ``IFCONFIG_PATH``. A line whose first character
        is alphanumeric starts a new interface (its name is the text before
        the first ``:``); other lines are inspected for ``inet6 fe80...``
        addresses.

        Returns:
            dict mapping interface name to a list of ``(address, scopeid)``
            tuples, where the address has its ``%ifname`` suffix removed and
            the scope id is parsed as a hexadecimal integer.

        Raises:
            ValueError: if a matching address line has no ``scopeid`` word.
        """
        ret = {}
        ifname = None
        ips = []
        for line in cls.get_output(cls.IFCONFIG_PATH).splitlines():
            if not line.strip():
                # Blank lines carry no data; indexing into them would fail.
                continue
            if line[0].isalnum():
                if ifname:
                    ret[ifname] = ips
                    ips = []
                ifname = line.split(":")[0]
            else:
                words = line.split()
                if (
                    len(words) > 1
                    and words[0] == "inet6"
                    and words[1].startswith("fe80")
                ):
                    # inet6 fe80::1%lo0 prefixlen 64 scopeid 0x2
                    ip = words[1].split("%")[0]
                    scopeid = int(words[words.index("scopeid") + 1], 16)
                    ips.append((ip, scopeid))
        if ifname:
            ret[ifname] = ips
        return ret