summaryrefslogtreecommitdiff
path: root/tools/testing/selftests/drivers/net/hw/uso.py
blob: 6d61e56cab3c59cea662eb06d0cc2fc6c238ff77 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0

"""Test USO

Sends large UDP datagrams with UDP_SEGMENT and verifies that the peer
receives the expected total payload and that the NIC transmitted at least
the expected number of segments.
"""
import random
import socket
import string

from lib.py import ksft_run, ksft_exit, KsftSkipEx
from lib.py import ksft_eq, ksft_ge, ksft_variants, KsftNamedVariant
from lib.py import NetDrvEpEnv
from lib.py import bkg, defer, ethtool, ip, rand_port, wait_port_listen

# python doesn't expose this constant, so we need to hardcode it to enable UDP
# segmentation for large payloads
UDP_SEGMENT = 103


def _send_uso(cfg, ipver, mss, total_payload, port):
    if ipver == "4":
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        dst = (cfg.remote_addr_v["4"], port)
    else:
        sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
        dst = (cfg.remote_addr_v["6"], port)

    sock.setsockopt(socket.IPPROTO_UDP, UDP_SEGMENT, mss)
    payload = ''.join(random.choice(string.ascii_lowercase)
                      for _ in range(total_payload))
    sock.sendto(payload.encode(), dst)
    sock.close()


def _get_tx_packets(cfg):
    stats = ip(f"-s link show dev {cfg.ifname}", json=True)[0]
    return stats['stats64']['tx']['packets']


def _test_uso(cfg, ipver, mss, total_payload):
    cfg.require_ipver(ipver)
    cfg.require_cmd("socat", remote=True)

    features = ethtool(f"-k {cfg.ifname}", json=True)
    uso_was_on = features[0]["tx-udp-segmentation"]["active"]

    try:
        ethtool(f"-K {cfg.ifname} tx-udp-segmentation on")
    except Exception as exc:
        raise KsftSkipEx(
            "Device does not support tx-udp-segmentation") from exc
    if not uso_was_on:
        defer(ethtool, f"-K {cfg.ifname} tx-udp-segmentation off")

    expected_segs = (total_payload + mss - 1) // mss

    port = rand_port(stype=socket.SOCK_DGRAM)
    rx_cmd = f"socat -{ipver} -T 2 -u UDP-LISTEN:{port},reuseport STDOUT"

    tx_before = _get_tx_packets(cfg)

    with bkg(rx_cmd, host=cfg.remote, exit_wait=True) as rx:
        wait_port_listen(port, proto="udp", host=cfg.remote)
        _send_uso(cfg, ipver, mss, total_payload, port)

    ksft_eq(len(rx.stdout), total_payload,
            comment=f"Received {len(rx.stdout)}B, expected {total_payload}B")

    cfg.wait_hw_stats_settle()

    tx_after = _get_tx_packets(cfg)
    tx_delta = tx_after - tx_before

    ksft_ge(tx_delta, expected_segs,
            comment=f"Expected >= {expected_segs} tx packets, got {tx_delta}")


def _uso_variants():
    for ipver in ["4", "6"]:
        yield KsftNamedVariant(f"v{ipver}_partial", ipver, 1400, 1400 * 10 + 500)
        yield KsftNamedVariant(f"v{ipver}_exact", ipver, 1400, 1400 * 5)


@ksft_variants(_uso_variants())
def test_uso(cfg, ipver, mss, total_payload):
    """Send a USO datagram and verify the peer receives the expected segments."""
    _test_uso(cfg, ipver, mss, total_payload)


def main() -> None:
    """Run USO tests."""
    with NetDrvEpEnv(__file__) as cfg:
        ksft_run([test_uso],
                 args=(cfg, ))
    ksft_exit()


if __name__ == "__main__":
    main()