背景

最近做实验需要用到一套能零配置发现本地局域网内 Peer 的机制。在 IPv4 这边很好实现,直接在本地链路上广播即可。但 IPv6 废弃掉了广播,说明这并不是最优解。因此,我花了一点时间实现了一个基于组播的 Peer 发现的例子,稍加修改就能嵌入正在做的实验中。

其实挺早就知道 BitTorrent 协议有一个名为 Local Peer Discovery 的拓展,但并没有怎么使用过,也没有详细了解过。这和我的需求几乎完全一致,因此简单读了下该拓展的文档 BEP 14,发现这套机制是通过组播实现的。就它了!

说起组播,或者也有的地方译作多播,在这之前我对其了解并不多。只是粗略地知道,有一个叫做 IGMP 的协议,IPv4 大量地址段被留给了组播,同时这个家伙似乎可以实现一对多的通信。刚好,趁这个机会学习一下组播的运用。

组播

本段中,我将简要地按我自己的理解描述一下什么是组播。本段仅为个人理解,仅供参考。

所谓组播,即是将信息送给一组接收者。这与广播不同,链路上的所有主机都会接收广播,但只有这“一组”内的接收者会接收组播。从这个角度来看,组播实现了比广播更好的区分机制,也许这就是 IPv6 用组播替代广播的原因吧。但无论如何,组播数据包也会出现在同一个二层上,因此并不代表该组以外的主机不会收到,只是被丢弃掉了。

那么什么是“组”?在我的理解中就理解成群聊。主机可以自由选择加入或不加入群聊,加入群聊就会收到信息。这与传统的 IP 包基于路由表的模型不大一样。主机可以处在多个子网中,只要这些子网以及将它们连接起来的更大的网络支持组播即可。不过,本文中考虑的问题是同一个局域网内的 Peer 发现问题,因此下面不考虑这种情况,而是假定只有一个局域网。

那么在组播网络里,如何决定数据包的接收者?如何加入一个组?这个部分 CloudFlare 有一篇不错的文章,什么是 IGMP?| 互联网组管理协议

简单来说,每个组播组用一个特殊的 IP 地址来表示。在 IPv4 中是 224.0.0.0/4,而在 IPv6 中是 ff00::/8。不管是 IPv4 还是 IPv6,组播地址段都非常庞大。这些庞大的地址段内又划分出了供各种用途的地址段,具体可以参考维基百科上的 Multicast address。因为我需要的是局域网内的组播,因此使用 224.0.0.0/24ffx2::/16,其中 x 是任意数。具体来说,下面的例子中我将使用 224.0.0.114ff12:114:514:1919::810

那么该如何使用这些地址呢?这和普通的 Socket 编程不太一样。首先,需要创建一个正常的 UDP Socket(很显然,这种多对多的通信方式不可能用 TCP),然后让这个 Socket 加入对应的多播组。具体方式因操作系统而异。下面我的例子中使用 Rust 实现,而 Rust 封装了加入多播组的过程,因此只要调用对应的函数即可。服务端的数据包处理流程与普通 UDP 无异。在客户端方面,只要将多播地址作为发送数据的目标地址即可。

实现

完整的实现见 这个仓库。本段中,我将把代码拆开来解释多播的使用方式。

定义通信格式

这里非常简单,用 UDP 传递序列化的结构体,结构体里仅有一个节点名字。当然,可以根据实际需要加入更多字段,比如节点的 IP 地址,只要报文不超过各层的大小限制就好。

1
2
3
4
#[derive(Serialize, Deserialize)]
struct Node {
pub name: String,
}

服务端:创建 UDP Socket 并加入多播组

1
2
3
4
5
6
7
8
9
10
const IPV4_MULTICAST_ADDR: &'static str = "224.0.0.114";
const IPV6_MULTICAST_ADDR: &'static str = "ff12:114:514:1919::810";
const PORT: u16 = 5679;

let socket = SocketAddr::new(ip, PORT); // 这里变量 ip 是 IPv6 地址 [::]
let socket = UdpSocket::bind(socket).await?;
let multicast_ipv4_addr: Ipv4Addr = IPV4_MULTICAST_ADDR.parse()?;
let multicast_ipv6_addr: Ipv6Addr = IPV6_MULTICAST_ADDR.parse()?;
socket.join_multicast_v4(multicast_ipv4_addr, Ipv4Addr::UNSPECIFIED)?;
socket.join_multicast_v6(&multicast_ipv6_addr, 0)?;

第 5 行、第 6 行中,我创建了一个 UDP Socket 并绑定到了 [::]:5679。在大多数现代 Linux 上,默认情况下绑定到 [::] 会同时监听 IPv4。因此,我不再额外打开一个监听 0.0.0.0:5679 的 Socket。一个 Socket 足以处理双栈请求。

第 9 行、第 10 行即是调用 Rust 封装好的加入多播组的函数。这里其实是 Tokio 的实现,但是与 std 的语法一样。不得不说这里语法显得怪怪的:一个使用传所有权一个使用借用;一个用 IP 地址表示接口而一个使用接口编号。但总之,这样一来,起码在 Linux 上,我们实现了在所有接口上监听来自对应组播组的数据。

服务端:处理数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
let mut buf = [0u8; 1024];

loop {
let (len, src) = socket.recv_from(&mut buf).await?;

let socket = socket.clone();
tokio::spawn(async move {
let node: Node = match serde_json::from_slice(&buf[..len]) {
Ok(node) => node,
Err(e) => {
eprintln!("Failed to parse message from {src}: {e}. ");
return Ok(());
},
};

println!("Got peer {} from {src}. ", node.name);

let response = serde_json::to_vec(THIS_NODE.wait())?;
socket.send_to(&response, src).await?;

Ok::<(), Error>(())
});
}

这里和正常的 UDP 编程一样,没有太多值得分析的地方。解码传来的 Peer 信息,再将自己的 Peer 信息返回回去。

客户端:收发数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
for interface in datalink::interfaces() {
if interface.name != args[2] {
continue;
}

for ip in interface.ips {
let ip = ip.ip();

match ip {
IpAddr::V4(ipv4) => futures.push(tokio::spawn(async move {
let socket: SocketAddr = format!("{ipv4}:0").parse()?;
let socket = UdpSocket::bind(socket).await?;

let remote: SocketAddr = format!("{IPV4_MULTICAST_ADDR}:{PORT}").parse()?;
socket.set_multicast_loop_v4(false)?;

let message = serde_json::to_vec(THIS_NODE.wait())?;
socket.send_to(&message, remote).await?;

let mut buf = [0u8; 1024];

loop {
let (len, remote) = socket.recv_from(&mut buf).await?;
let peer: Node = serde_json::from_slice(&buf[..len])?;
println!("Got peer {} from {remote}. ", peer.name);
}
})),
IpAddr::V6(ipv6) => futures.push(tokio::spawn(async move {
let link_local = Ipv6Cidr::from_str("fe80::/10")?;
if link_local.contains(ipv6) {
return Ok(());
}

let socket: SocketAddr = format!("[{ipv6}]:0").parse()?;
let socket = UdpSocket::bind(socket).await?;

let remote: SocketAddr = format!("[{IPV6_MULTICAST_ADDR}]:{PORT}").parse()?;
socket.set_multicast_loop_v6(false)?;

let message = serde_json::to_vec(THIS_NODE.wait())?;
socket.send_to(&message, remote).await?;

let mut buf = [0u8; 1024];

loop {
let (len, remote) = socket.recv_from(&mut buf).await?;
let peer: Node = serde_json::from_slice(&buf[..len])?;
println!("Got peer {} from {remote}. ", peer.name);
}
})),
};
}
}

这里可能显得有些奇怪:为什么要遍历系统中的网卡?这是因为我们用到的组播地址是限制在特定链路上的,而为了保证链路指定正确,需要正确设置源 IP 地址。在我的系统上,经过测试发现,IPv4 无需特殊设定,而 IPv6 必须使用对应网卡上的 IP 地址才能正确发出数据包。因此,程序有一个参数是希望发送 Peer 发现请求的网卡名,存放在 args[2] 里。接下来,遍历所有网卡,当遍历到这张网卡时,再遍历 IP 地址,并分别发送 Peer 发现请求。

值得注意的是,link-local 地址在这里没有意义,而 Linux 会默认为每个接口加上 link-local 的 IPv6。所以,对于 IPv6 地址,我特判了是否属于 fe80::/10,并跳过这样的地址。

测试

只要保证节点们在一个局域网内即可完成测试。为了方便,我选择用 Docker 测试。以下是大致流程和结果:

1
2
$ cargo build --release --target x86_64-unknown-linux-musl
$ docker build . -t multicast_example

对于这类小工具,我比较喜欢静态链接来避免麻烦。之后打 Docker 镜像。

需要注意,Docker 默认不开启 IPv6。参考 Enable IPv6 support 来打开。示例里的 IPv6 地址段 2001:db8:1::/64 是文档用地址段,可以改成一段 ULA。但这里我就偷懒不改了。

最后测试效果。打开三个终端,分别启动容器,以下是效果展示:

1
2
3
4
5
$ docker run --rm --env NODE_NAME="alice" --env INTERFACE_NAME="eth0" multicast_example
Got peer bob from [::ffff:172.17.0.5]:55237.
Got peer bob from [2001:db8:1::242:ac11:5]:51804.
Got peer charlie from [::ffff:172.17.0.6]:34716.
Got peer charlie from [2001:db8:1::242:ac11:6]:44606.
1
2
3
4
5
$ docker run --rm --env NODE_NAME="bob" --env INTERFACE_NAME="eth0" multicast_example
Got peer alice from 172.17.0.4:5679.
Got peer alice from [2001:db8:1::242:ac11:4]:5679.
Got peer charlie from [::ffff:172.17.0.6]:34716.
Got peer charlie from [2001:db8:1::242:ac11:6]:44606.
1
2
3
4
5
$ docker run --rm --env NODE_NAME="charlie" --env INTERFACE_NAME="eth0" multicast_example
Got peer alice from 172.17.0.4:5679.
Got peer bob from 172.17.0.5:5679.
Got peer alice from [2001:db8:1::242:ac11:4]:5679.
Got peer bob from [2001:db8:1::242:ac11:5]:5679.

完美。这里服务端看到的 IPv4 地址是映射到 IPv6 的,如有需要简单转换一下即可。