项目中需要迁移一部分文件,原计划通过ftp上传;但是服务器环境中ftp传输总是有问题,因此自己开发一个文件传输服务
设计
本次任务中需要实现一个文件传输程序,不需要额外的功能。
程序分为 server 端和 client 端
server 采用netty实现;client采用原生socket即可
传输中涉及协议定义;协议主要是为了方便扩展,如果只是一个简单的文件传输,那么只需要用socket即可
协议定义如下
- 开头四个byte 用于存储本次帧长度
- 后续字节采用json格式,如下
1 2 3 4 5
| { "type":"本次协议的操作", "data":"协议的数据,一般是文件的base64" }
|
实现
server
只展示核心的 pipline 实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast( new StringEncoder(), new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4), new StringDecoder(CharsetUtil.UTF_8), new SimpleChannelInboundHandler<String>() {
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
logger.debug("收到消息:{}", msg); JSONObject json = JSONObject.parseObject(msg); String body = null; if (json.getString("type").equals("check")) {
String path = json.getString("msg");
body = new File(dir, path).exists() + ""; } else if (json.getString("type").equals("upload")) { String path = json.getString("msg"); String data = json.getString("data"); File file = new File(dir, path); if (!file.getParentFile().exists()) { file.getParentFile().mkdirs(); } byte[] bytes = Base64.decodeBase64(data); try (FileOutputStream out = new FileOutputStream(file)) { IOUtils.write(bytes, out); } body = "ok"; logger.debug("upload write"); } else if (json.getString("type").equals("download")) { String path = json.getString("msg"); File file = new File(dir, path); JSONObject j = new JSONObject(); if (file.exists()) { j.put("r", "true"); j.put("data", Base64.encodeBase64String(IOUtils.toByteArray(new FileInputStream(file)))); } else { j.put("r", "false"); } body = j.toJSONString(); logger.debug("download {}", j.toString()); }
int length = body.getBytes().length; byte[] bytes = new byte[4]; bytes[0] = (byte) (length >>> 24); bytes[1] = (byte) (length >>> 16); bytes[2] = (byte) (length >>> 8); bytes[3] = (byte) length;
logger.debug("写入长度={}", length); ByteBuf buf = Unpooled.buffer(4 + length).writeBytes(bytes).writeBytes(body.getBytes()); ctx.writeAndFlush(buf);
} } ); }
|
其中较为重要的是对数据的拆包,用到了 new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)
依次解释参数
- Integer.MAX_VALUE 数据包的最大长度
- 0 字节偏移量,代表帧与帧之间是否有间隔,此次为0即可
- 4 代表长度字段的字节数,本次使用int存储长度,因此是4个长度
- 0
- 4 交给下一个handler的数据跳过的字节数。例如一个帧 携带的数据长度 20 字节,加上长度,一共是 24 字节;此处为4,代表拆包器交给下一个handler的数据会跳过4个字节,从下标4处开始读取长度字段大小的字节数据,也就是20个字节
new StringDecoder(CharsetUtil.UTF_8) 此处是对数据进行解码,后续handler可以直接使用字符串,其参数来自于LengthFieldBasedFrameDecoder读取的数据
最后一个就是处理器,其获取的数据就是json格式,直接处理即可;
需要注意的是数据返回。返回给client的数据也要遵循协议,即开头4个字节代表数据大小
此处采用了对int进行位运算的方案,而不是调用更为方便的 writeInt(int) 方法;
因为该方法并不会一定写入四个字节,而是写入当前int实际占据的字节数。例如本次数据长度为 2,那么实际只会写入一个字节;因为int占据4个字节,但是 2 只会用到32位低位的两位,netty只会写入一个字节
client
client端采取原生的socket实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| private String send(JSONObject json) throws IOException { String body = json.toJSONString(); int length = body.length(); Socket socket = new Socket(properties.getHost(), properties.getPort()); socket.getOutputStream().write(int2Bytes(length)); socket.getOutputStream().write(body.getBytes());
byte[] le = new byte[4]; InputStream input = socket.getInputStream();
length = input.read(le, 0, 4); logger.debug("[netty] - 读取长度的字节长度={}", length); if (length != 4) { throw new AppMsgException("错误的包数据"); } length = bytes2Int(le);
logger.debug("[netty] - 包的长度={}", length); byte[] bytes = new byte[length];
byte[] cache = new byte[65535]; int readLength = 0; do { int t = input.read(cache, 0, cache.length); for (int i = 0; i < t; i++, readLength++) { bytes[readLength] = cache[i]; } } while (readLength != length); socket.close(); return new String(bytes); }
|
基本逻辑是——建立链接->发送数据->读取返回
socket.getOutputStream().write(int2Bytes(length)); 这一行的原理同上述返回,都是确保一定写入了4个字节
读取内容也较为简单,也是先读取四个字节,然后再读取指定长度的数据
注意 原生socket 缓冲区大小有限制,即使指定读取 50 个字节,也可能无法读取到足够的数据,因此需要循环读取,保证能够读取到指定长度的数据
最后再提一下,socket连接尽量不要使用多线程。我就因为把Socket作为成员变量,然后多线程操作,结果导致数据读取老是出错