Java Socket 编程:BufferedReader 的 readLine() 在收到客户端的响应后被阻塞

dzhpxtsq  于 2021-07-12  发布在  Java
关注(0)|答案(0)|浏览(264)

我正在学习使用 Java 的 Socket。我已经成功地把数据发送到运行在我自己机器上的 ServerSocket,现在需要在我们的项目中实现 PaymentGateway 应用程序。
在 Server.java 中,程序挂在 readLine() 处。如何解决这个问题?感谢您在这方面的帮助。程序在执行到 System.out.println("here it is"); 这一行之前都工作正常,之后就被阻塞了。

public class Server implements Runnable {

        /**
         * Creates a handler bound to one client connection.
         *
         * @param sock the client socket stored in this handler's {@code socket} field
         * @throws IOException declared by the signature; nothing in the body throws it
         */
        Server(Socket sock) throws IOException {
            socket = sock;
        }

        static{
            serverconfig = initServerConfig();
            threadPool = Executors.newScheduledThreadPool(Integer.parseInt((String)serverconfig.get("SERVER_PROCESSOR_THREAD")));
        }

        /**
         * Loads the server settings from the resource bundle named by {@code FILENAME}
         * into a map keyed by upper-case setting names.
         *
         * <p>Every value is stored as a {@code String}, except {@code "ALLOWED_CLIENT_IP"},
         * which is stored as a list holding the comma-separated entries of
         * {@code allowed.client.ip} with surrounding whitespace removed and empty
         * entries dropped. As a side effect the system property
         * {@code oracle.jdbc.V8Compatible} is set to {@code "true"}.
         *
         * <p>If the bundle cannot be found, the failure is logged and the JVM is
         * terminated with a non-zero exit status.
         *
         * @return the populated configuration map
         * @throws MissingResourceException if the bundle is found but lacks one of the keys read here
         */
        private static HashMap initServerConfig(){
            HashMap<String, Object> table = new HashMap<String, Object>();

            // System.getProperties() hands out the live property table, so setting the
            // key directly is enough; copying the table back is unnecessary.
            System.setProperty("oracle.jdbc.V8Compatible", "true");

            ResourceBundle myResources = null;
            try{
                myResources = ResourceBundle.getBundle(FILENAME, Locale.getDefault());
            }catch(MissingResourceException e){
                log.error("server config not found", e);
            }

            if(myResources!=null){
                log.info("Loading server configuraton ...");
                table.put("SERVER_IP",myResources.getString("server.ip"));
                table.put("SERVER_PORT",myResources.getString("server.port"));
                table.put("SERVER_PROCESSOR_THREAD",myResources.getString("processor.threads"));
                // Disabled: table.put("MSG_HEADER_LENGTH",myResources.getString("msgheader.length"));
                table.put("ISO_HEADER_LENGTH",myResources.getString("isoheader.length"));
                table.put("SOCKET_BUFFER_LENGTH",myResources.getString("socket.buffer.length"));
                table.put("SOCKET_TIME_OUT_SEC",myResources.getString("socket.timeout.sec"));

                // Trim each entry so a value written as "10.0.0.1, 10.0.0.2" still
                // matches the exact-string lookup done against this list.
                ArrayList<String> allowed = new ArrayList<String>();
                for (String entry : myResources.getString("allowed.client.ip").split(",")) {
                    String ip = entry.trim();
                    if (ip.length() > 0) {
                        allowed.add(ip);
                    }
                }
                table.put("ALLOWED_CLIENT_IP", allowed);
            }
            else
            {
                log.error("ISO8583 server module is not deployed properly");
                // A missing configuration is a fatal start-up failure, so report it
                // through a non-zero exit status instead of signalling success.
                System.exit(1);
            }

            return table;
        }

        /**
         * Starts the server: loads the message factory configuration, binds the
         * configured port and accepts client connections forever.
         *
         * <p>A connection is served only if the client's IP address is listed under
         * {@code "ALLOWED_CLIENT_IP"}; otherwise it is closed immediately. A new
         * connection from an address that already has a registered socket closes and
         * replaces the old socket. Each accepted connection gets its own handler thread.
         *
         * @param args ignored
         * @throws Exception if the configuration cannot be loaded, the port cannot be
         *                   bound, or accepting a connection fails
         */
        public static void main(String[] args) throws Exception {

            mfact=ConfigParser.createFromUrl(new URL("file:///D:/SmartFren/ISO8583/7.0.0/modules/ISO8583_Payment_Gateway/src/defaultconfig/server-config.xml"));

            mfact.setAssignDate(true);

            mfact.setTraceNumberGenerator(new SimpleTraceGenerator((int) (System.currentTimeMillis() % 10000)));
            log.info("Setting up server socket...");
            int port = Integer.parseInt((String)serverconfig.get("SERVER_PORT"));
            if(!available(port)) {
                log.debug("Server is already running. Could not start another server instance.");
                return;
            }

            // Parsed once up front: the values do not change per connection, and a
            // malformed setting now fails at start-up rather than on the first client.
            int timeoutMillis = Integer.parseInt((String)serverconfig.get("SOCKET_TIME_OUT_SEC"))*1000;//setSoTimeout takes milliseconds
            int receiveBufferSize = Integer.parseInt((String)serverconfig.get("SOCKET_BUFFER_LENGTH"));

            ServerSocket serverSocket = new ServerSocket(port);
            try {
                log.info("Waiting for connections...");
                while (true) {
                    Socket socket = serverSocket.accept();
                    // toString() renders "hostname/address"; it is kept for logging and as
                    // the registry key, but the allow-list is matched against the bare address.
                    String remoteAddrss = socket.getInetAddress().toString();
                    String remoteIp = socket.getInetAddress().getHostAddress();
                    int remotePort = socket.getPort();

                    if(!((List)serverconfig.get("ALLOWED_CLIENT_IP")).contains(remoteIp)) {
                        log.debug(String.format("Connected remote client %s is not allowed to accesss the services.",remoteAddrss));
                        try {
                            socket.close();
                        }catch(Exception ex) {
                            log.error("Exception Caught while closing the socket.",ex);
                        }
                        continue;
                    }

                    // A client that drops while being configured must not take the
                    // whole accept loop down with it.
                    try {
                        socket.setSoTimeout(timeoutMillis);
                        socket.setReceiveBufferSize(receiveBufferSize);
                    }catch (SocketException ex) {
                        log.error(String.format("Could not configure connection from %s:%d", remoteAddrss, remotePort), ex);
                        try {
                            socket.close();
                        }catch (IOException closeEx) {
                            log.error("Exception Caught while closing the socket.", closeEx);
                        }
                        continue;
                    }

                    if(clientSockets.containsKey(remoteAddrss)) {
                        log.info(String.format("New connection from %s:%d so closing old connection socket", remoteAddrss, remotePort));
                        try {
                            ((Socket)clientSockets.get(remoteAddrss)).close();
                        }catch (SocketException ex) {
                            log.error("SocketException occurred in closing old socket...", ex);
                        }catch (IOException ex) {
                            log.error("IOException occurred in closing old socket...", ex);
                        }
                        clientSockets.remove(remoteAddrss);
                    }else {
                        log.info(String.format("New connection from %s:%d", remoteAddrss, remotePort));
                    }
                    clientSockets.put(remoteAddrss, socket);
                    new Thread(new Server(socket), "Sitra-j8583-client-handler").start();
                }
            } finally {
                // Only reached when accept() or a handler start-up throws; release the port.
                try {
                    serverSocket.close();
                } catch (IOException ex) {
                    log.error("Error closing server", ex);
                }
            }
        }
        /**
         * Serves one client connection: reads requests line by line and schedules a
         * {@code Processor} for each one on the shared pool with a 400 ms delay.
         *
         * <p>{@code readLine()} returns only after the peer has sent a line terminator
         * ({@code \n}, {@code \r} or {@code \r\n}), closed its side of the connection,
         * or the socket's read timeout has expired. A client that writes a message
         * without a terminator therefore leaves this method waiting inside
         * {@code readLine()} even though the bytes have arrived.
         *
         * <p>The loop ends when the peer closes the connection, a read fails or times
         * out, or the socket or thread state checks fail; the socket is then closed.
         */
        @Override
        public void run() {
            int count = 0;
            System.out.println("in run");

            synchronized(this){
                this.runningThread = Thread.currentThread();
            }

            ServerShutdownHook shutdownHook = new ServerShutdownHook(socket);
            Runtime.getRuntime().addShutdownHook(shutdownHook);
            try {
                // One reader for the whole connection. A BufferedReader reads ahead, so
                // creating a fresh one per request would throw away bytes the previous
                // reader had already pulled off the socket.
                BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));

                while (socket.isConnected() && !socket.isClosed() && !socket.isInputShutdown()
                        && this.runningThread.isAlive() && !this.runningThread.isInterrupted()) {
                    System.out.println("connection ");
                    System.out.println("here it is");

                    String fromClient = in.readLine();
                    if (fromClient == null) {
                        // End of stream: the peer closed the connection.
                        log.info("Client closed the connection");
                        break;
                    }

                    System.out.println("fromClient"+fromClient);
                    log.info("value reciedvec" + fromClient);

                    // We're not expecting ETX in this case
                    byte[] buf = fromClient.getBytes();
                    count++;

                    threadPool.schedule(new Processor(buf, socket), 400,TimeUnit.MILLISECONDS);
                }
            }catch (SocketTimeoutException ex) {
                log.error("SocketTimeoutException occurred...", ex);
            }catch (SocketException ex) {
                log.error("SocketException occurred...", ex);
            }catch (IOException ex) {
                log.error("IOException occurred...", ex);
            }finally {
                log.debug(String.format("Exiting after reading %d requests", count));
                // The hook holds a reference to this connection's socket; without removing
                // it, every connection ever served stays registered until the JVM exits.
                try {
                    Runtime.getRuntime().removeShutdownHook(shutdownHook);
                } catch (IllegalStateException ignored) {
                    // The JVM is already shutting down; the hook is running or has run.
                }
                try {
                    socket.close();//additional try to close socket if input stream is only closed.
                } catch (IOException ex) {
                    log.error("Exception Caught while closing the socket.",ex);
                }
            }
        }

        /**
         * Builds a message of type {@code 0x800} whose network management
         * information code field is set to {@code iType} (numeric, length 3).
         *
         * @param mfact factory used to create the message
         * @param iType value stored in the network management information code field
         * @return the newly created message
         */
        private IsoMessage generateNetworkMessage(MessageFactory mfact,int iType){
            final IsoMessage networkMessage = mfact.newMessage(0x800);
            networkMessage.setValue(
                    ISO8583Fields.NETWORK_MANAGEMENT_INFORMATION_CODE,
                    iType,
                    IsoType.NUMERIC,
                    3);
            return networkMessage;
        }

        /**
         * Probes whether {@code port} can currently be bound for both TCP and UDP.
         * Both probe sockets are closed again before returning.
         *
         * @param port the port number to probe
         * @return {@code true} if both binds succeeded, {@code false} if either
         *         raised an {@code IOException} (which is logged)
         */
        private static boolean available(int port) {
            boolean portIsFree = false;
            ServerSocket tcpProbe = null;
            DatagramSocket udpProbe = null;
            try {
                tcpProbe = new ServerSocket(port);
                tcpProbe.setReuseAddress(true);
                udpProbe = new DatagramSocket(port);
                udpProbe.setReuseAddress(true);
                portIsFree = true;
            } catch (IOException bindFailure) {
                log.error("IOException Caught.",bindFailure);
            } finally {
                if (udpProbe != null) {
                    udpProbe.close();
                }
                if (tcpProbe != null) {
                    try {
                        tcpProbe.close();
                    } catch (IOException closeFailure) {
                        // should not be thrown
                        log.error("IOException Caught.",closeFailure);
                    }
                }
            }
            return portIsFree;
        }

        /**
         * Marks this server as stopped and closes its {@code serverSocket} field,
         * logging (not propagating) any {@code IOException} raised by the close.
         */
        public synchronized void stop(){
            isStopped = true;
            try {
                serverSocket.close();
            } catch (IOException closeFailure) {
                log.error("Error closing server", closeFailure);
            }
        }

        private class ServerShutdownHook extends Thread {
            private Socket sock = null;

            ServerShutdownHook(Socket inSoc){
                this.sock = inSoc;
            }
            @Override
            public void run() { System.out.println(" ServerShutdownHook connection ");
                try{
                    if(sock != null && sock.isConnected() ){
                            IsoMessage req = generateNetworkMessage(mfact, 002);
                            log.info("Sending logoff Trace " + req.getField(ISO8583Fields.SYSTEMS_TRACE_AUDIT_NUMBER)+" at "+System.currentTimeMillis()+" "+new String(req.writeData()));
                            req.write(sock.getOutputStream(), 2);
                    }
                    Thread.sleep(100);
                }catch (IOException ex) {
                    log.error("Couldn't close socket",ex);
                }catch(InterruptedException ie){
                    log.error("Exception occurred...", ie);
                    Thread.currentThread().interrupt();
                }finally{
                    sock = null;
                }
            }
        }
        /**
         * Handles one request: records it, parses it, runs the handler and writes the
         * response back on the client socket, terminated by CR LF.
         */
        private class Processor implements Runnable {

            private byte[] msg;
            private Socket sock;
            private IBaseHandler handler = null;
            private boolean isException = false;

            /**
             * @param buf raw bytes of the request
             * @param s   socket the response is written to
             */
            Processor(byte[] buf, Socket s) {
                msg = buf;
                sock = s;
            }

            /**
             * Saves a transaction record, parses the request, delegates to the handler
             * and sends the response. On any failure the error is logged and the
             * socket is closed.
             */
            @Override
            public void run() {
                try {
                    Transaction8583DAO transactionDAO = new Transaction8583DAO();
                    TRANSACTION8583 transaction8583 = new TRANSACTION8583();

                    transaction8583.setReqString(new String(msg).trim());
                    transaction8583.setEventStartTime(new Date());
                    transaction8583.setEventStatus("Generated");
                    transactionDAO.save(transaction8583);

                    // NOTE(review): parsed with an ISO header length of 0 although the
                    // configuration defines ISO_HEADER_LENGTH; confirm which is intended.
                    IsoMessage incoming = mfact.parseMessage(msg, 0);

                    log.debug("info :::::"+incoming.getField(ISO8583Fields.TRANSMISSION_DATE_TIME));

                    log.info("Starting handler for Trace " + incoming.getField(ISO8583Fields.SYSTEMS_TRACE_AUDIT_NUMBER)+" at "+System.currentTimeMillis()+" "+new String(msg));
                    log.info("Incoming" +incoming.toString());

                    handler = new SampoernaHandler(mfact);
                    IsoMessage response = handler.handleRequest(incoming,transaction8583);

                    log.debug("AAAAAAAAAAAAAA"+sock.getOutputStream().toString());
                    log.debug("BBBBBBBBBBBBB"+sock.getOutputStream());

                    byte[] s = response.writeData();

                    // Text form is for the console trace only.
                    String str = null;
                    try {
                        str = new String(s, "UTF-8");
                    } catch (UnsupportedEncodingException e) {
                        log.error("error while getting response string", e);
                    }

                    // Send the encoded bytes as they are, followed by CR LF. Decoding them to
                    // a String and re-encoding through a Writer with the platform charset
                    // alters any byte sequence that is not valid in both encodings.
                    byte[] framed = Arrays.copyOf(s, s.length + 2);
                    framed[s.length] = '\r';
                    framed[s.length + 1] = '\n';

                    OutputStream os = sock.getOutputStream();
                    os.write(framed);
                    System.out.println("Message sent to the client is "+str);
                    os.flush();

                } catch (IOException ex) {
                    log.error("IOException Sending response", ex);
                    isException = true;
                } catch(Exception e){
                    isException = true;
                    log.error("Exception occur.",e);
                } finally {
                    try {
                        if(this.isException) {
                            this.sock.close();
                            log.debug("Closing Socket.. because of some error");
                        }
                    } catch (IOException ex) {
                        log.error("IOException Sending response", ex);
                    }
                }
            }

            /**
             * Logs the message type in hex and every field from 2 to 127 that is present.
             *
             * @param m the message to dump at debug level
             */
            public void print(IsoMessage m) {
                log.debug("TYPE: " + Integer.toHexString(m.getType()));
                for (int i = 2; i < 128; i++) {
                    if (m.hasField(i)) {
                        log.debug("printing Field");
                        log.debug("F " + i + "(" + m.getField(i).getType() + "): " + m.getObjectValue(i) + " -> '"+ m.getField(i).toString() + "'");
                    }
                }
            }
        }
    }

在 SampleClient.java 中,当我们向服务器发送请求时,它会被阻塞。

public class SampleClient{  

    // Logger shared by the client and its nested Receiver.
    private static final Log log = LogFactory.getLog(SampleClient.class);  

    // Message factory; assigned in main() before any message is built or parsed.
    private static MessageFactory mfact;  

    // Requests awaiting a response, keyed by the string form of field 11.
    // main() adds entries; the Receiver removes the entry whose key matches a parsed response.
    private static Hashtable pending = new Hashtable();  

    /** Creates a client instance; main() uses it only as the enclosing instance for a Receiver. */
    public SampleClient() {  

    }  

    /**
     * Connects to the server, starts the {@code Receiver} thread for responses and
     * sends a single network-management LOGIN request.
     *
     * @param args ignored
     * @throws Exception if the configuration cannot be read, the connection fails,
     *                   or the request cannot be written
     */
    public static void main(String[] args) throws Exception {
        log.debug("Reading config");
        mfact = ConfigParser.createFromClasspathConfig("server-config.xml");
        mfact.setAssignDate(true);
        mfact.setTraceNumberGenerator(new SimpleTraceGenerator((int)(System.currentTimeMillis() % 10000)));
        log.debug("Connecting to server");
        System.setProperty("javax.net.debug","all");
        Socket sock = new Socket("192.168.1.5", 9999);
        if(sock.isConnected()) {
            // Start the reader before sending so a response always has a consumer.
            new Thread(new SampleClient().new Receiver(sock),"j8583-SampleClient-Receiver").start();

            // Login message
            IsoMessage req =  generateNetworkMessage(mfact,NetworkManagementCode.LOGIN);
            pending.put(req.getField(11).toString(), req);
            System.out.println("req field 11"+req.getField(11).toString());
            log.info(String.format("Sending request %s", req.getField(11)+" : "+req.getObjectValue(11))+" "+new String(req.writeData()));

            // NOTE(review): only the message itself is written here, and no line
            // terminator follows it. A peer that reads requests with
            // BufferedReader.readLine() will wait until a terminator arrives or the
            // connection closes. Confirm which framing the server expects.
            req.write(sock.getOutputStream(), 0);
            Thread.sleep(500);
        }
    }
    /**
     * Builds a message of type {@code 0x800} with the given network management
     * information code, and sets its ISO header to the message's encoded length as
     * four zero-padded decimal digits.
     *
     * @param mfact factory used to create the message
     * @param iType value stored in the network management information code field
     * @return the message, header included
     */
    private static IsoMessage generateNetworkMessage(MessageFactory mfact,int iType){
        IsoMessage req = mfact.newMessage(0x800);
        req.setValue(ISO8583Fields.NETWORK_MANAGEMENT_INFORMATION_CODE, iType, IsoType.NUMERIC, 3);

        // Measure the message with no header set, so the header states the length of
        // what follows it. The receiver reads that many bytes, so count bytes rather
        // than the characters of a decoded String, which can differ for non-ASCII data.
        req.setIsoHeader(null);
        int bodyLength = req.writeData().length;
        req.setIsoHeader(String.format("%1$04d", bodyLength));

        System.out.println("req header "+req.toString());
        return req;
    }
  final class Receiver implements Runnable {        
        private Socket sock;
        private IBaseHandler handler = null;
        Receiver(Socket s) {
            sock = s;
            log.debug("Receiver: sock:"+sock.isConnected()+" Input: "+sock.isInputShutdown()+" Output: "+sock.isOutputShutdown());
        }

        public void run() {  
            byte[] lenbuf = new byte[4];  

            try {
                log.debug("Thread started with pending size:"+pending.size() );                     
                // For high volume apps you will be better off only reading the stream in this thread
                // and then using another thread to parse the buffers and process the requests
                // Otherwise the network buffer might fill up and you can miss a request.
                while (sock != null && sock.isConnected()) {// && Thread.currentThread().isAlive() && !Thread.currentThread().isInterrupted()) {
                    if (sock.getInputStream().read(lenbuf) == lenbuf.length) {
                        StringBuffer sb = new StringBuffer();
                        for(int i=0;i<lenbuf.length;i++){   
                              System.out.println("lenbuf> "+lenbuf[i]);
                            System.err.print("lenbuf["+i+"]:"+Integer.parseInt(lenbuf[i]+""));
                            sb.append((char)Integer.parseInt(lenbuf[i]+""));
                        } 
                        //int size = ((lenbuf[0] & 0xff) << 8) | (lenbuf[1] & 0xff);
                        int size = Integer.parseInt(sb.toString());
                        byte[] buf = new byte[size];  
                        //We're not expecting ETX in this case   
                        if (sock.getInputStream().read(buf) == size) {  
                            try {  
                                //We'll use this header length as a reference.   
                                //In practice, ISO headers for any message type are the same length.
                                //log.debug("new String(buf):"+new String(buf));
                                IsoMessage resp = mfact.parseMessage(buf,0);
                                //IsoMessage incoming = mfact.parseMessage(buf, Integer.parseInt((String)serverconfig.get("ISO_HEADER_LENGTH")));
                                //print(resp);
                                log.debug("Read response " + resp.getField(11) + " conf " + resp.getField(38) + ": " + new String(buf));  
                                pending.remove(resp.getField(11).toString());  
                            } catch (ParseException ex) {  
                                log.error("Parsing response", ex);  
                            }  
                        } else {
                            log.debug("Clearing Pending:"+pending.size());
                            pending.clear();  
                            return;  
                        }
                    }
                }
                log.info("Socket is disconnected. While pending has "+pending.size());
            } catch (IOException ex) {  
                log.error("Reading responses", ex);  
            } finally {  
       }  
        }

        public void print(IsoMessage m) {  
            log.debug("TYPE: " + Integer.toHexString(m.getType()));  
            for (int i = 2; i < 128; i++) {  
                if (m.hasField(i)) {  
                    log.debug("F " + i + "(" + m.getField(i).getType() + "): " + m.getObjectValue(i) + " -> '"+ m.getField(i).toString() + "'");  
                }  
            }  
        }
    }

}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题