使用 Netty 远程传输文件

x33g5p2x  于2022-05-30 转载在 其他  
字(7.3k)|赞(0)|评价(0)|浏览(267)

一 点睛

Netty 作为 NIO 框架,自然支持文件传输功能。本篇演示如何使用 Netty 进行远程发送文件。

二 待发送的文件

package netty.file;

import java.io.File;
import java.io.Serializable;

/**
 * @className: MySendFile
 * @description: 待发送的文件
 * @date: 2022/5/28
 * @author: cakin
 */
public class MySendFile implements Serializable {
    private static final long serialVersionUID = 1L;
    // 文件
    private File file;
    // 文件名
    private String fileName;
    // 开始位置
    private int start;
    // 结束位置
    private int end;
    // 数据
    private byte[] bytes;

    public File getFile() {
        return file;
    }

    public void setFile(File file) {
        this.file = file;
    }

    public String getFileName() {
        return fileName;
    }

    public void setFileName(String fileName) {
        this.fileName = fileName;
    }

    public int getStart() {
        return start;
    }

    public void setStart(int start) {
        this.start = start;
    }

    public int getEnd() {
        return end;
    }

    public void setEnd(int end) {
        this.end = end;
    }

    public byte[] getBytes() {
        return bytes;
    }

    public void setBytes(byte[] bytes) {
        this.bytes = bytes;
    }

    public static long getSerialversionuid() {
        return serialVersionUID;
    }

}

三 服务端

1 主程序类

package netty.file;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class MyNettyServerTest {
    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            ChannelFuture channelFuture = serverBootstrap
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new MyNettyServerInitializer())
                    .bind(8888).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

2 自定义初始化器

package netty.file;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

public class MyNettyServerInitializer extends ChannelInitializer<SocketChannel> {
    protected void initChannel(SocketChannel sc) throws Exception {
        ChannelPipeline pipeline = sc.pipeline();
        pipeline.addLast(new ObjectEncoder());
        pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(null))) ;
        // 自定义处理器
        pipeline.addLast( new MyNettyServerHandler());
    }
}

3 自定义处理器

package netty.file;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.io.File;
import java.io.RandomAccessFile;

public class MyNettyServerHandler extends SimpleChannelInboundHandler {
    private int readLenth;
    private int start = 0;
    private String file_dir = "g:/upload";

    /**
     * 功能描述:接受文件,每次接受文件的一部分
     *
     * @author cakin
     * @date 2022/5/28
     */
    @Override
    public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof MySendFile) {
            MySendFile sendFile = (MySendFile) msg;
            byte[] bytes = sendFile.getBytes();
            readLenth = sendFile.getEnd();
            String fileName = sendFile.getFileName();
            String path = file_dir + File.separator + fileName;
            File file = new File(path);
            RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
            randomAccessFile.seek(start);
            randomAccessFile.write(bytes);
            start = start + readLenth;
            if (readLenth > 0) {
                ctx.writeAndFlush(start);
                randomAccessFile.close();
            } else {
                ctx.flush();
                ctx.close();
            }
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        ctx.flush();
        ctx.close();
    }
}

四 客户端

1 主程序类

package netty.file;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.io.File;

public class MyNettyClientTest {
    // 连接服务器
    public static void connect(int port, String host,
                               final MySendFile fileUploadFile) throws Exception {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new MyNettyClientInitializer(fileUploadFile));
            ChannelFuture f = bootstrap.connect(host, port).sync();
            f.channel().closeFuture().sync();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        int port = 8888;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                e.printStackTrace();
            }
        }
        try {
            MySendFile sendFile = new MySendFile();
            File file = new File("g:/navicat.chm");
            String fileName = file.getName();
            sendFile.setFile(file);
            sendFile.setFileName(fileName);
            sendFile.setStart(0);
            connect(port, "127.0.0.1", sendFile);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

2 自定义初始化器

package netty.file;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

public class MyNettyClientInitializer extends ChannelInitializer<SocketChannel> {
    MySendFile sendFile;

    public MyNettyClientInitializer(MySendFile fileUploadFile) {
        this.sendFile = fileUploadFile;
    }

    protected void initChannel(SocketChannel sc) throws Exception {
        ChannelPipeline pipeline = sc.pipeline();
        pipeline.addLast(new ObjectEncoder());
        pipeline.addLast(new ObjectDecoder(
                ClassResolvers.weakCachingConcurrentResolver(null)));
        // 自定义处理器
        pipeline.addLast(new MyNettyClientHandler(sendFile));
    }
}

3 自定义处理器

package netty.file;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.io.RandomAccessFile;

public class MyNettyClientHandler extends SimpleChannelInboundHandler {
    private int readLength;
    private int start = 0;
    private int lastLength = 0;
    public RandomAccessFile randomAccessFile;
    private MySendFile sendFile;

    public MyNettyClientHandler(MySendFile ef) {
        this.sendFile = ef;
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        System.out.println("【客户端】文件发送完毕");
    }

    /**
     * 功能描述:发送文件第一部分
     *
     * @author cakin
     * @date 2022/5/28
     */
    public void channelActive(ChannelHandlerContext ctx) {
        try {
            randomAccessFile = new RandomAccessFile(sendFile.getFile(),
                    "r");
            randomAccessFile.seek(sendFile.getStart());
            lastLength = 1024 * 1024;
            byte[] bytes = new byte[lastLength];
            if ((readLength = randomAccessFile.read(bytes)) != -1) {
                sendFile.setEnd(readLength);
                sendFile.setBytes(bytes);
                ctx.writeAndFlush(sendFile);
            } else {
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 功能描述:发送文件其他部分
     *
     * @author cakin
     * @date 2022/5/28
     */
    @Override
    public void channelRead0(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        if (msg instanceof Integer) {
            start = (Integer) msg;
            if (start != -1) {
                randomAccessFile = new RandomAccessFile(
                        sendFile.getFile(), "r");
                randomAccessFile.seek(start);
                int length = (int) (randomAccessFile.length() - start);
                if (length < lastLength) {
                    lastLength = length;
                }
                byte[] bytes = new byte[lastLength];
                if ((readLength = randomAccessFile.read(bytes)) != -1
                        && (randomAccessFile.length() - start) > 0) {
                    sendFile.setEnd(readLength);
                    sendFile.setBytes(bytes);
                    try {
                        ctx.writeAndFlush(sendFile);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                } else {
                    randomAccessFile.close();
                    ctx.close();
                    System.out.println("本地文件准备完毕");
                }
            }
        }
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

}

五 测试

先启动服务端,再启动客户端。就可将文件从 G:\navicat.chm 上传到 G:\upload\navicat.chm 下。

相关文章