Netty 是如何解决半包和粘包问题?

你好,我是猿java。

Netty 是一个高性能、异步事件驱动的网络应用框架,广泛应用于各种网络通信场景。这篇文章,我们将详细分析 Netty 是如何解决半包和粘包问题。

什么是半包和粘包?

半包问题

半包问题是指一个完整的应用层消息被分成多个 TCP 数据包发送,接收端在一次读取操作中只接收到消息的一部分。

例如,发送端发送了一条 100 字节的消息,但由于网络原因,这条消息被拆分成了两个 TCP 数据包,一个 60 字节,另一个 40 字节。接收端可能在第一次读取时只接收到前 60 字节的数据,剩下的 40 字节需要在后续的读取操作中才能接收到。

粘包问题

粘包问题是指多个应用层消息在传输过程中被粘在一起,接收端在一次读取操作中接收到大于 1个消息的情况。

例如,发送端发送了两条消息,每条 50 字节,但接收端在一次读取操作中收到了 80 字节的数据,超过了 1条消息的内容。

产生原因

产生半包和粘包问题主要是以下 3个原因:

  • TCP 的流式特性:TCP 是面向字节流的协议,没有消息边界的概念,它保证数据的顺序和可靠性,但不保证每次发送的数据对应每次接收的数据。
  • 网络状况:网络的拥塞、延迟、抖动等因素可能导致数据包的拆分和重组。
  • 操作系统和缓冲区:操作系统 TCP/IP 协议栈和应用程序的缓冲区大小也会影响数据的读取方式。

示例

假设发送端发送了两条消息:

  • 消息1:Hello
  • 消息2:World

在半包情况下,接收端可能会这样接收:

  • 第一次读取:Hel
  • 第二次读取:loWo
  • 第三次读取:rld

在粘包情况下,接收端可能会这样接收:

  • 第一次读取:HelloWor
  • 第二次读取:ld

解决方案

基于固定长度的解码器

基于固定长度的解码器是指发消息时,每条消息的长度固定,读消息时也通过固定长度来读取消息,从而解决半包和粘包问题。

实现方式

Netty 提供了 FixedLengthFrameDecoder 类来实现这一功能,核心源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
private final int frameLength;

public FixedLengthFrameDecoder(int frameLength) {
this.frameLength = frameLength;
}

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
while (in.readableBytes() >= frameLength) {
ByteBuf buf = in.readBytes(frameLength);
out.add(buf);
}
}
}

注意点

使用定长帧需要注意以下几点:

  • 固定长度:消息长度必须是固定的,发送端需要确保消息长度一致。如果长度超出固定长度,解包时消息就会错位,如果消息不足固定长度,需要使用填充字符补齐。
  • 填充字符:选择合适的填充字符(如空格)来补齐消息长度,接收端在处理时需要去除这些填充字符。

优点

  • 简单易实现:实现起来非常简单,不需要额外的头部信息或分隔符。
  • 解析效率高:由于每个消息长度固定,接收端解析时只需按照固定长度读取。

缺点

  • 不灵活:消息长度固定,可能会造成空间浪费(如果消息长度较短)或不足(如果消息长度较长)。
  • 适用场景有限:适用于固定格式和长度的协议,不适用于可变长度消息的场景。

示例

下面我们通过一个示例来展示使用定长帧是如何解决半包粘包问题的。

发送端,确保每个消息的长度固定。如果实际消息长度不足,可以使用填充字符(如空格)来补齐。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class FixedLengthFrameSender {

private static final int FRAME_LENGTH = 10; // 固定消息长度

public static void send(Channel channel, String message) {
// 确保消息长度不超过固定长度
if (message.length() > FRAME_LENGTH) {
throw new IllegalArgumentException("Message too long");
}
// 使用空格填充消息到固定长度
String paddedMessage = String.format("%-" + FRAME_LENGTH + "s", message);

// 将消息转换为字节数组并发送
ByteBuf buffer = Unpooled.copiedBuffer(paddedMessage.getBytes());
channel.writeAndFlush(buffer);
}
}

接收端,使用 Netty 提供的 FixedLengthFrameDecoder 解码器来处理固定长度的消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class FixedLengthFrameReceiver {
private static final int FRAME_LENGTH = 10; // 固定消息长度

public static void main(String[] args) throws Exception {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
// 添加定长帧解码器
p.addLast(new FixedLengthFrameDecoder(FRAME_LENGTH));
// 添加自定义处理器
p.addLast(new FixedLengthFrameHandler());
}
});
// 启动服务器
b.bind(8888).sync().channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

public static class FixedLengthFrameHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
byte[] receivedBytes = new byte[in.readableBytes()];
in.readBytes(receivedBytes);
String receivedMsg = new String(receivedBytes).trim(); // 去除填充字符
System.out.println("Received: " + receivedMsg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
}

基于换行符解码器

自定义分隔符解码器

基于换行符解码器和自定义分隔符解码器(比如 特殊字符)来划分消息边界,从而解决半包和粘包问题,使用者可以根据自己的需求灵活确定分隔符。

实现方式

Netty 提供了 DelimiterBasedFrameDecoder 类来实现这一功能,核心源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public DelimiterBasedFrameDecoder(
int maxFrameLength, boolean stripDelimiter, boolean failFast, ByteBuf... delimiters) {
validateMaxFrameLength(maxFrameLength);
ObjectUtil.checkNonEmpty(delimiters, "delimiters");

if (isLineBased(delimiters) && !isSubclass()) {
lineBasedDecoder = new LineBasedFrameDecoder(maxFrameLength, stripDelimiter, failFast);
this.delimiters = null;
} else {
this.delimiters = new ByteBuf[delimiters.length];
for (int i = 0; i < delimiters.length; i ++) {
ByteBuf d = delimiters[i];
validateDelimiter(d);
this.delimiters[i] = d.slice(d.readerIndex(), d.readableBytes());
}
lineBasedDecoder = null;
}
this.maxFrameLength = maxFrameLength;
this.stripDelimiter = stripDelimiter;
this.failFast = failFast;
}

注意点

  • 分隔符选择:选择一个不会出现在消息内容中的分隔符(如换行符 \n 或特定字符 |)。
  • 消息格式:发送端在每个消息的末尾添加分隔符,确保接收端能够正确解析消息边界。

优点

  • 灵活性高:可以处理可变长度的消息。
  • 实现相对简单:只需在消息末尾添加特定的分隔符,接收端根据分隔符拆分消息。

缺点

  • 分隔符冲突:如果消息内容中包含分隔符,可能导致解析错误,需要对消息内容进行转义处理。
  • 解析效率低:需要扫描整个数据流寻找分隔符,效率较低。

示例

下面我们通过一个示例来展示使用分隔符是如何解决半包粘包问题的。

发送端,确保每个消息以特定的分隔符结尾。常用的分隔符包括换行符(\n)、特定字符(如 |)等。

1
2
3
4
5
6
7
8
9
10
11
12
13
public class DelimiterBasedFrameSender {

private static final String DELIMITER = "\n"; // 分隔符

public static void send(Channel channel, String message) {
// 在消息末尾添加分隔符
String delimitedMessage = message + DELIMITER;

// 将消息转换为字节数组并发送
ByteBuf buffer = Unpooled.copiedBuffer(delimitedMessage.getBytes());
channel.writeAndFlush(buffer);
}
}

接收端,使用 Netty 提供的 DelimiterBasedFrameDecoder 解码器来处理以分隔符结尾的消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

public class DelimiterBasedFrameReceiver {

private static final String DELIMITER = "\n"; // 分隔符
private static final int MAX_FRAME_LENGTH = 1024; // 最大帧长度

public static void main(String[] args) throws Exception {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
// 添加分隔符解码器
ByteBuf delimiter = Unpooled.copiedBuffer(DELIMITER.getBytes());
p.addLast(new DelimiterBasedFrameDecoder(MAX_FRAME_LENGTH, delimiter));
// 添加字符串解码器
p.addLast(new StringDecoder());
// 添加自定义处理器
p.addLast(new DelimiterBasedFrameHandler());
}
});

// 启动服务器
b.bind(8888).sync().channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

public static class DelimiterBasedFrameHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
String receivedMsg = (String) msg;
System.out.println("Received: " + receivedMsg);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
}

基于长度字段的解码器

基于长度字段的解码器是指在消息头部添加长度字段,指示消息的总长度。

实现方式

Netty 提供了 LengthFieldBasedFrameDecoder 类来实现这一功能,核心源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder {
private final int maxFrameLength;
private final int lengthFieldOffset;
private final int lengthFieldLength;

public LengthFieldBasedFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) {
this.maxFrameLength = maxFrameLength;
this.lengthFieldOffset = lengthFieldOffset;
this.lengthFieldLength = lengthFieldLength;
}

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.readableBytes() < lengthFieldOffset + lengthFieldLength) {
return;
}

in.markReaderIndex();
int length = in.getInt(in.readerIndex() + lengthFieldOffset);
if (in.readableBytes() < lengthFieldOffset + lengthFieldLength + length) {
in.resetReaderIndex();
return;
}

in.skipBytes(lengthFieldOffset + lengthFieldLength);
ByteBuf frame = in.readBytes(length);
out.add(frame);
}
}

关键点

  • 长度字段位置:长度字段通常位于消息的头部,用于指示消息的总长度。
  • 解码器参数:
    • maxFrameLength:消息的最大长度,防止内存溢出。
    • lengthFieldOffset:长度字段在消息中的偏移量。
    • lengthFieldLength:长度字段的字节数(通常为 4 字节)。
    • lengthAdjustment:长度调整值,如果长度字段不包含消息头的长度,需要进行调整。
    • initialBytesToStrip:解码后跳过的字节数,通常为长度字段的长度。

优点

  • 灵活性高:支持可变长度的消息。
  • 解析效率高:通过长度字段可以直接读取完整消息,无需扫描整个数据流。

缺点

  • 实现复杂:需要在消息头部添加长度字段,接收端需要解析头部信息。
  • 额外开销:消息头部的长度字段会增加一些额外的字节数。

示例

下面我们通过一个示例来展示使用长度字段是如何解决半包粘包问题的。

发送端,确保每个消息在发送前都包含长度字段。长度字段通常放在消息的头部,用于指示消息的总长度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;

public class LengthFieldBasedFrameSender {

public static void send(Channel channel, String message) {
// 将消息转换为字节数组
byte[] messageBytes = message.getBytes();
int messageLength = messageBytes.length;

// 创建一个 ByteBuf 来存储长度字段和消息内容
ByteBuf buffer = Unpooled.buffer(4 + messageLength);

// 写入长度字段(4 字节,表示消息长度)
buffer.writeInt(messageLength);

// 写入消息内容
buffer.writeBytes(messageBytes);

// 发送消息
channel.writeAndFlush(buffer);
}
}

接收端,使用 Netty 提供的 LengthFieldBasedFrameDecoder 解码器来处理包含长度字段的消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

public class LengthFieldBasedFrameReceiver {

private static final int MAX_FRAME_LENGTH = 1024; // 最大帧长度

public static void main(String[] args) throws Exception {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
// 添加长度字段解码器
p.addLast(new LengthFieldBasedFrameDecoder(
MAX_FRAME_LENGTH, 0, 4, 0, 4));
// 添加字符串解码器
p.addLast(new StringDecoder());
// 添加自定义处理器
p.addLast(new LengthFieldBasedFrameHandler());
}
});

// 启动服务器
b.bind(8888).sync().channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

public static class LengthFieldBasedFrameHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
String receivedMsg = (String) msg;
System.out.println("Received: " + receivedMsg);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
}

自定义解码器

如果上述 Netty提供的方案无法满足业务需求的话,Netty还提供了一个扩展点,使用者可以通过自定义解码器来处理消息,

实现方式

例如,自定义头部信息来表示消息长度或结束标志,示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class CustomProtocolDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 根据自定义协议解析消息
if (in.readableBytes() < 4) {
return;
}

in.markReaderIndex();
int length = in.readInt();
if (in.readableBytes() < length) {
in.resetReaderIndex();
return;
}

ByteBuf frame = in.readBytes(length);
out.add(frame);
}
}

优点

  • 高度灵活:可以根据具体需求设计协议,适应各种复杂场景。
  • 功能丰富:可以在自定义协议中添加其他信息(如校验和、序列号等),增强协议的功能和可靠性。

缺点

  • 实现复杂:设计和实现自定义协议需要更多的工作量。
  • 维护成本高:自定义协议可能需要更多的维护和更新工作。

总结

本文我们分析了产生半包和粘包的原因以及在Netty中的 5种解决方案:

  • 基于固定长度解码器
  • 基于换行符解码器
  • 自定义分隔符解码器
  • 基于长度字段解码器
  • 自定义解码器

通过学习这些内容,我们不仅掌握了半包和粘包问题的理论知识,同时也学会了在日常开发的技术设计时,我们建议给出多种解决方法供大家讨论和选择。

学习交流

如果你觉得文章有帮助,请帮忙转发给更多的好友,或关注公众号:猿java,持续输出硬核文章。

drawing