基于netty的文件传输

项目中需要迁移一部分文件,原计划通过ftp上传;但是服务器环境中ftp传输总是有问题,因此自己开发一个文件传输服务

设计

本次任务中需要实现一个文件传输程序,不需要额外的功能。


程序分为 server 端和 client

server 采用netty实现;client采用原生socket即可


传输中涉及协议定义;协议主要是为了方便扩展,如果只是一个简单的文件传输,那么只需要用socket即可

协议定义如下

  • 开头四个byte 用于存储本次帧长度
  • 后续字节采用json格式,如下
    1
    2
    3
    4
    5
    {
    "type":"本次协议的操作",
    "data":"协议的数据,一般是文件的base64"
    }

实现

server

只展示核心的 pipline 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65

protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(
new StringEncoder(),
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4),
new StringDecoder(CharsetUtil.UTF_8),
new SimpleChannelInboundHandler<String>() {

protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {

logger.debug("收到消息:{}", msg);
JSONObject json = JSONObject.parseObject(msg);
String body = null;
if (json.getString("type").equals("check")) {//校验文件是否存在

String path = json.getString("msg");

body = new File(dir, path).exists() + "";
} else if (json.getString("type").equals("upload")) {//文件上传
String path = json.getString("msg");
String data = json.getString("data");//文件base64
File file = new File(dir, path);
if (!file.getParentFile().exists()) {
file.getParentFile().mkdirs();
}
// base64解码
byte[] bytes = Base64.decodeBase64(data);
try (FileOutputStream out = new FileOutputStream(file)) {
IOUtils.write(bytes, out);
}
body = "ok";
logger.debug("upload write");
} else if (json.getString("type").equals("download")) {
String path = json.getString("msg");
File file = new File(dir, path);
JSONObject j = new JSONObject();
if (file.exists()) {
j.put("r", "true");
j.put("data", Base64.encodeBase64String(IOUtils.toByteArray(new FileInputStream(file))));
} else {
j.put("r", "false");
}
body = j.toJSONString();
logger.debug("download {}", j.toString());
}

int length = body.getBytes().length;
// 首先写一个长度
byte[] bytes = new byte[4];
//通过移位运算,截取低8位的方式,将int保存到byte数组
bytes[0] = (byte) (length >>> 24);
bytes[1] = (byte) (length >>> 16);
bytes[2] = (byte) (length >>> 8);
bytes[3] = (byte) length;

logger.debug("写入长度={}", length);
ByteBuf buf = Unpooled.buffer(4 + length).writeBytes(bytes).writeBytes(body.getBytes());
ctx.writeAndFlush(buf);
// ctx.writeAndFlush(bytes);
// ctx.writeAndFlush(Unpooled.copiedBuffer((body).getBytes()));

}
}
);
}

其中较为重要的是对数据的拆包,用到了 new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)

依次解释参数

  • Integer.MAX_VALUE 数据包的最大长度
  • 0 字节偏移量,代表帧与帧之间是否有间隔,此次为0即可
  • 4 代表长度字段的字节数,本次使用int存储长度,因此是4个长度
  • 0
  • 4 交给下一个handler的数据跳过的字节数。例如一个帧 携带的数据长度 20 字节,加上长度,一共是 24 字节;此处为4,代表拆包器交给下一个handler的数据会跳过4个字节,从下标4处开始读取长度字段大小的字节数据,也就是20个字节

new StringDecoder(CharsetUtil.UTF_8) 此处是对数据进行解码,后续handler可以直接使用字符串,其参数来自于LengthFieldBasedFrameDecoder读取的数据


最后一个就是处理器,其获取的数据就是json格式,直接处理即可;

需要注意的是数据返回。返回给client的数据也要遵循协议,即开头4个字节代表数据大小

此处采用了对int进行位运算的方案,而不是调用更为方便的 writeInt(int) 方法;

因为该方法并不会一定写入四个字节,而是写入当前int实际占据的字节数。例如本次数据长度为 2,那么实际只会写入一个字节;因为int占据4个字节,但是 2 只会用到32位低位的两位,netty只会写入一个字节

client

client端采取原生的socket实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

private String send(JSONObject json) throws IOException {
String body = json.toJSONString();
int length = body.length();
Socket socket = new Socket(properties.getHost(), properties.getPort());
socket.getOutputStream().write(int2Bytes(length));
socket.getOutputStream().write(body.getBytes());

// 读取长度

byte[] le = new byte[4];
InputStream input = socket.getInputStream();

length = input.read(le, 0, 4);
logger.debug("[netty] - 读取长度的字节长度={}", length);
if (length != 4) {
throw new AppMsgException("错误的包数据");
}
length = bytes2Int(le);

logger.debug("[netty] - 包的长度={}", length);
byte[] bytes = new byte[length];

byte[] cache = new byte[65535];//缓冲区大小
int readLength = 0;
do {
int t = input.read(cache, 0, cache.length);
for (int i = 0; i < t; i++, readLength++) {
bytes[readLength] = cache[i];
}
} while (readLength != length);
socket.close();
return new String(bytes);
}


基本逻辑是——建立链接->发送数据->读取返回

socket.getOutputStream().write(int2Bytes(length)); 这一行的原理同上述返回,都是确保一定写入了4个字节

读取内容也较为简单,也是先读取四个字节,然后再读取指定长度的数据

注意 原生socket 缓冲区大小有限制,即使指定读取 50 个字节,也可能无法读取到足够的数据,因此需要循环读取,保证能够读取到指定长度的数据


最后再提一下,socket连接尽量不要使用多线程。我就因为把Socket作为成员变量,然后多线程操作,结果导致数据读取老是出错


基于netty的文件传输
http://blog.inkroom.cn/2020/05/15/1BZHG4N.html
作者
inkbox
发布于
2020年5月15日
许可协议