使用java非阻塞套接字进行多文件传输时丢失的消息

gkl3eglg  于 2021-07-03  发布在  Java
关注(0)|答案(0)|浏览(224)

我正在通过一个数据通道传输多个图像,使用从服务器到客户端的非阻塞连接。第一个客户机完全接收所有图像,但是第二个客户机丢失了第二个图像的消息/包,其余的都正常。
我有一个计数器来计算发送和接收的消息的数量,我还有下一个输出
服务器输出客户端输出
所以我收到了25405条信息,而不是28217条。
我还必须发送128字节的小程序包,如果不是的话,图像就会损坏。我猜某些消息丢失是因为程序正忙于处理前一条消息,但我不知道如何确保它到达。
代码如下:
服务器

public class Server extends Thread{    
    String initTag = "<in>";
    String imgPacketTag = "<img>";
    String titleTag = "<title>";
    String endFileTag = "<end>";

    int pto=9000;    
    String hhost="localhost";    
    InetSocketAddress clientSocket;
    DatagramChannel s;

    SelectionKey selKey;
    Selector sel;
    int bufferSize = 1024*1024;
    ByteBuffer b = ByteBuffer.allocate(bufferSize); 

    File [] imagenes = new File[21];
    final File folder = new File("imagenes");

    public void run(){

        listFilesForFolder(folder);

        try{                             

//           NetworkInterface ni = NetworkInterface.getByName("eth3");
           InetSocketAddress dir = new InetSocketAddress(hhost, pto);
           s =  DatagramChannel.open(StandardProtocolFamily.INET);
           s.setOption(StandardSocketOptions.SO_REUSEADDR, true);
//           s.setOption(StandardSocketOptions.IP_MULTICAST_IF, ni);
//           InetAddress group = InetAddress.getByName("230.0.0.1");
//           s.join(group, ni);
           s.configureBlocking(false);
           s.socket().bind(dir);
           sel = Selector.open();

           s.register(sel, SelectionKey.OP_READ);           

           System.out.println("Servidor listo.. Esperando datagramas...");
           while(true){
               sel.select();
               Iterator<SelectionKey>it = sel.selectedKeys().iterator();
               while(it.hasNext()){
                   SelectionKey k = (SelectionKey)it.next();
                   it.remove();                          
                   if(k.isReadable()){
                      handleReading(k);
                      continue;
                   }
               }//while
           }//while
        }catch(Exception e){
            e.printStackTrace();
        }//catch
    }//main

    public void processMsg(String msgReceived){
        System.out.println(msgReceived);
        if (msgReceived.contains(initTag)) {                                                     
            sendImages();
        }
    }    

    public void sendImages(){

        try{                                                   

            for (int i = 0; i < imagenes.length; i++) {                                
                File fileToSend = imagenes[i];  //Manejador  
                long tam = fileToSend.length();
                sendMessage(titleTag+fileToSend.getName()+","+tam);

                DataInputStream inputStream = new DataInputStream(new FileInputStream(fileToSend));                
                byte[] bArray = new byte[128];

                int cont=0;
                int bytesRead;

                int sendPkg = 0;

                while ( sendPkg < tam){                  
                    sendPkg+=128;
                    bytesRead = inputStream.read(bArray);
                    String newMsg;
                    if (sendPkg>=tam) {
                     newMsg = endFileTag + new String(bArray, 0, bytesRead, "ISO-8859-1");   
                    }else{
                     newMsg = imgPacketTag + new String(bArray, 0, bytesRead, "ISO-8859-1");   
                    }                     
                    sendMessage( newMsg );                    
                    cont++;                                       
                }                                         
                //sendMessage(endFileTag);
                System.out.println("Archivo "+fileToSend.getName()+" enviado con "+cont);                           
                inputStream.close();       
            }
            s.register(sel, SelectionKey.OP_READ);
            System.out.println("Archivos enviados");                  
        }catch(Exception e){
            e.printStackTrace();
        }
    }

    public void sendMessage(String txt){
        try {
            byte[] msgBytes;        
            msgBytes = txt.getBytes("ISO-8859-1");
            sendMessage(msgBytes);        
        } catch (UnsupportedEncodingException ex) {
            Logger.getLogger(Server.class.getName()).log(Level.SEVERE, null, ex);
        }

    }

    public void sendMessage(byte[] msgBytes){
        try {
            selKey = s.register(sel, SelectionKey.OP_WRITE);
            DatagramChannel ch =  (DatagramChannel)selKey.channel();

            b.clear();        
            //byte[] msgBytes = txt.getBytes();        
            b.putInt(msgBytes.length);                
            b.put(msgBytes);
            b.flip();        

            ch.send(b, clientSocket);

            s.register(sel, SelectionKey.OP_READ);                        
        } catch (ClosedChannelException ex) {
            Logger.getLogger(Server.class.getName()).log(Level.SEVERE, null, ex);
        } catch (IOException ex) {
            Logger.getLogger(Server.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    public void handleReading(SelectionKey k) throws IOException{

        DatagramChannel ch = (DatagramChannel)k.channel();
        b.clear();
        SocketAddress emisor = ch.receive(b);
        b.flip();
        clientSocket = (InetSocketAddress)emisor;
        System.out.println("Datagrama recibido desde "+ clientSocket.getAddress()+":"+clientSocket.getPort());                        
        int len = b.getInt();

        byte[] bytes = new byte[len]; 
        b.get(bytes);
        String msgReceived = new String(bytes);
        processMsg(msgReceived);            

    }

    public void listFilesForFolder(final File folder) {                
        File[] filesInFolder = folder.listFiles(); // This returns all the folders and files in your path
        int i = 0;
        for (File file : filesInFolder) { //For each of the entries do:
            if (!file.isDirectory()) { //check that it's not a dir
                imagenes[i] = file; //push the filename as a string
                System.out.println(file.getName());
            }
            i++;
        }                    
    }

}

客户

public class Client extends Thread{
    String nombre;
    int tipo;
    String msg;

    int pto=9000;    
    String hhost="localhost";
    SocketAddress remote=null;
    DatagramChannel cl;

    SelectionKey selKey;
    Selector sel;
    int bufferSize = 1024*1024;
    ByteBuffer b = ByteBuffer.allocate(bufferSize);    

    String initTag = "<in>";
    String imgPacketTag = "<img>";
    String titleTag = "<title>";
    String endFileTag = "<end>";

    String currentImgTitle ="a.png";
    FileOutputStream imgStream;
    int contador = 0;

    public Client(String nombre, int tipo){
        this.nombre = nombre;
        this.tipo = tipo;
        msg = initTag+nombre;
        File f = new File("imagenes"); 
        if (f.mkdir()) {
            System.out.println("directorio imagenes creado"); 
        }        
    }

    public void run(){     
        try{

            remote = new InetSocketAddress(hhost, pto);            

            cl = DatagramChannel.open(StandardProtocolFamily.INET);
            //cl.setOption(StandardSocketOptions.SO_REUSEADDR, true);
//            cl.setOption(StandardSocketOptions.IP_MULTICAST_IF, ni);
            cl.configureBlocking(false);
            sel = Selector.open();
            selKey = cl.register(sel, SelectionKey.OP_WRITE);

//            NetworkInterface ni = NetworkInterface.getByName("eth3");
//            InetAddress group = InetAddress.getByName("230.0.0.1");
//            cl.join(group, ni);            

            while(true){
                sel.select();
                Iterator<SelectionKey>it = sel.selectedKeys().iterator();
                while(it.hasNext()){
                    SelectionKey k = (SelectionKey)it.next();
                    it.remove();
                    if(k.isWritable()){
                        sendMessage();
                        continue;
                    }
                    if (k.isReadable()) {
                        handleReading(k);
                        continue;
                    }
                }//while                
            }//while            
        }catch(Exception e){
            e.printStackTrace();
        }//catch
    }

    public void processMsg(String txt){        

        if (txt.contains(titleTag)) {
            contador = 0;            
            String [] separadas  = (txt.replace(titleTag, "")).split(",");
            currentImgTitle = separadas[0];
            System.out.println("title: "+currentImgTitle);                
            //System.out.println("tam: "+separadas[1]);                            
            try {                            
                imgStream =  new FileOutputStream( "imagenes/"+currentImgTitle ) ;
            } catch (FileNotFoundException ex) {
                Logger.getLogger(Client.class.getName()).log(Level.SEVERE, null, ex);
            }            
            return;
        }           
        if (txt.contains(endFileTag)) {            

            try {
                contador++;               
                txt = txt.replace(endFileTag, "");                                                    
                byte[] buffer = txt.getBytes("ISO-8859-1");                                
                imgStream.write( buffer);                
                imgStream.flush();            
                System.out.println(" end of file "+contador);  
                imgStream.close();
            } catch (IOException ex) {
                Logger.getLogger(Client.class.getName()).log(Level.SEVERE, null, ex);
            }
            return;
        }
        if (txt.contains(imgPacketTag)) {
            contador++;            
            txt = txt.replace(imgPacketTag, "");                        
            try {                
                byte[] buffer = txt.getBytes("ISO-8859-1");                                
                imgStream.write( buffer);                
                imgStream.flush();
            } catch (IOException ex) {
                Logger.getLogger(Client.class.getName()).log(Level.SEVERE, null, ex);
            }
            return;
        }

    }

    public void sendMessage(){
        DatagramChannel ch =  (DatagramChannel)selKey.channel();
        b.clear();

        byte[] msgBytes = msg.getBytes();        
        b.putInt(msgBytes.length);                
        b.put(msgBytes);

        b.flip();

        try {
            ch.send(b, remote);
            cl.register(sel, SelectionKey.OP_READ);                        
        } catch (ClosedChannelException ex) {
            Logger.getLogger(Client.class.getName()).log(Level.SEVERE, null, ex);
        } catch (IOException ex) {
            Logger.getLogger(Client.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    public void handleReading(SelectionKey k) throws IOException{

        DatagramChannel ch = (DatagramChannel)k.channel();
        b.clear();
        SocketAddress emisor = ch.receive(b);
        b.flip();
        InetSocketAddress d = (InetSocketAddress)emisor;
        //System.out.println("Datagrama recibido desde "+ d.getAddress()+":"+d.getPort());                        
        int len = b.getInt();

        byte[] bytes = new byte[len]; 
        b.get(bytes);
        String msgReceived = new String(bytes, "ISO-8859-1");
        processMsg(msgReceived);            

    }
}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题