1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
|
#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0
"""
Tests for XDP metadata kfuncs (e.g. bpf_xdp_metadata_rx_hash).
These tests load device-bound XDP programs from xdp_metadata.bpf.o
that call metadata kfuncs, send traffic, and verify the extracted
metadata via BPF maps.
"""
from lib.py import ksft_run, ksft_eq, ksft_exit, ksft_ge, ksft_ne, ksft_pr
from lib.py import KsftNamedVariant, ksft_variants
from lib.py import CmdExitFailure, KsftSkipEx, NetDrvEpEnv
from lib.py import NetdevFamily
from lib.py import bkg, cmd, rand_port, wait_port_listen
from lib.py import ip, bpftool, defer
from lib.py import bpf_map_set, bpf_map_dump, bpf_prog_map_ids
def _load_xdp_metadata_prog(cfg, prog_name, bpf_file="xdp_metadata.bpf.o"):
    """Load the XDP metadata object and attach one of its programs.

    The object is loaded with ``bpftool prog loadall`` bound to
    ``cfg.ifname`` and pinned under a scratch bpffs directory; the pinned
    program called ``prog_name`` is then attached to the interface with
    ``xdpdrv``. Removing the pin directory and detaching the program are
    both queued through ``defer``.

    Args:
        cfg: Test environment; ``net_lib_dir`` and ``ifname`` are read.
        prog_name: Name of the pinned program to attach.
        bpf_file: Object file name, resolved against ``cfg.net_lib_dir``.

    Returns:
        dict with 'id', 'name', and 'maps' (name -> map_id).

    Raises:
        KsftSkipEx: If bpftool fails to load the object.
    """
    obj_path = cfg.net_lib_dir / bpf_file
    pin_dir = "/sys/fs/bpf/xdp_metadata_test"
    remove_pins = f"rm -rf {pin_dir}"

    # Clear any pre-existing pin directory before creating a fresh one
    cmd(remove_pins, shell=True, fail=False)
    cmd(f"mkdir -p {pin_dir}", shell=True)

    load_args = (f"prog loadall {obj_path} {pin_dir} type xdp "
                 f"xdpmeta_dev {cfg.ifname}")
    try:
        bpftool(load_args)
    except CmdExitFailure as e:
        # Nothing was attached, so clean up right away and skip the test
        cmd(remove_pins, shell=True, fail=False)
        raise KsftSkipEx(
            f"Failed to load device-bound XDP program '{prog_name}'"
        ) from e
    defer(cmd, remove_pins, shell=True, fail=False)

    ip(f"link set dev {cfg.ifname} xdpdrv pinned {pin_dir}/{prog_name}")
    defer(ip, f"link set dev {cfg.ifname} xdpdrv off")

    # Read back what the interface reports as attached
    prog = ip(f"-d link show dev {cfg.ifname}", json=True)[0]["xdp"]["prog"]
    prog_id = prog["id"]
    prog_name_attached = prog["name"]

    return {
        "id": prog_id,
        "name": prog_name_attached,
        "maps": bpf_prog_map_ids(prog_id),
    }
def _send_probe(cfg, port, proto="tcp"):
    """Send a single payload from the remote end using socat.

    A local socat receiver is started in the background; once the port is
    reported as listening, the remote host sends one short payload to it.

    Args:
        cfg: Configuration object containing network settings.
        port: Port number for the exchange.
        proto: Protocol to use, either "tcp" or "udp".

    Raises:
        ValueError: If proto is neither "tcp" nor "udp".
    """
    # Reject unknown protocols up front rather than silently treating
    # them as UDP, which would leave wait_port_listen() polling for a
    # protocol that the receiver was never started with.
    if proto not in ("tcp", "udp"):
        raise ValueError(
            f"unsupported proto {proto!r}, expected 'tcp' or 'udp'")

    cfg.require_cmd("socat", remote=True)

    if proto == "tcp":
        rx_cmd = f"socat -{cfg.addr_ipver} -T 2 TCP-LISTEN:{port},reuseport STDOUT"
        tx_cmd = f"echo -n rss_hash_test | socat -t 2 -u STDIN TCP:{cfg.baddr}:{port}"
    else:
        rx_cmd = f"socat -{cfg.addr_ipver} -T 2 -u UDP-RECV:{port},reuseport STDOUT"
        tx_cmd = f"echo -n rss_hash_test | socat -t 2 -u STDIN UDP:{cfg.baddr}:{port}"

    # The sender must run while the background receiver is still alive
    with bkg(rx_cmd, exit_wait=True):
        wait_port_listen(port, proto=proto)
        cmd(tx_cmd, host=cfg.remote, shell=True)
# Keys into the BPF maps; the values mirror the enums in xdp_metadata.bpf.c

# Key in "map_xdp_setup" under which the test stores the port
_SETUP_KEY_PORT = 1

# Keys in "map_rss" read back after the probe has been sent
_RSS_KEY_HASH = 0
_RSS_KEY_TYPE = 1
_RSS_KEY_PKT_CNT = 2
_RSS_KEY_ERR_CNT = 3

# BIT(3) from enum xdp_rss_hash_type
XDP_RSS_L4 = 1 << 3
@ksft_variants([
    KsftNamedVariant("tcp", "tcp"),
    KsftNamedVariant("udp", "udp"),
])
def test_xdp_rss_hash(cfg, proto):
    """Check the RSS hash reported through bpf_xdp_metadata_rx_hash().

    The test is skipped unless the device lists "hash" among its
    xdp-rx-metadata-features. Otherwise the xdp_rss_hash program from
    xdp_metadata is attached, one probe is sent over the given protocol,
    and the values found in the program's RSS map are checked: at least
    one packet counted, no errors, a non-zero hash, and a hash type that
    has the L4 bit set.
    """
    features = cfg.netnl.dev_get({"ifindex": cfg.ifindex}).get(
        "xdp-rx-metadata-features", [])
    if "hash" not in features:
        raise KsftSkipEx("device does not support XDP rx hash metadata")

    loaded = _load_xdp_metadata_prog(cfg, "xdp_rss_hash")

    # Store the probe port in the program's setup map
    port = rand_port()
    bpf_map_set("map_xdp_setup", _SETUP_KEY_PORT, port)

    rss_map_id = loaded["maps"]["map_rss"]

    _send_probe(cfg, port, proto=proto)

    # Missing keys are treated as zero
    rss = bpf_map_dump(rss_map_id)
    wanted = (_RSS_KEY_PKT_CNT, _RSS_KEY_ERR_CNT, _RSS_KEY_HASH, _RSS_KEY_TYPE)
    pkt_cnt, err_cnt, hash_val, hash_type = [rss.get(key, 0) for key in wanted]

    ksft_ge(pkt_cnt, 1, comment="should have received at least one packet")
    ksft_eq(err_cnt, 0, comment=f"RSS hash error count: {err_cnt}")

    ksft_ne(hash_val, 0,
            f"RSS hash should be non-zero for {proto.upper()} traffic")
    ksft_pr(f" RSS hash: {hash_val:#010x}")

    ksft_pr(f" RSS hash type: {hash_type:#06x}")
    l4_bits = hash_type & XDP_RSS_L4
    ksft_ne(l4_bits, 0,
            f"RSS hash type should include L4 for {proto.upper()} traffic")
def main():
    """Set up the endpoint environment and run the XDP metadata tests."""
    cases = [
        test_xdp_rss_hash,
    ]

    with NetDrvEpEnv(__file__) as cfg:
        # The tests query device features through this netdev family handle
        cfg.netnl = NetdevFamily()
        ksft_run(cases, args=(cfg,))

    # Called once the environment has been torn down
    ksft_exit()
# Run the tests only when executed as a script, not when imported
if __name__ == "__main__":
    main()
|