实现通用池的nodejs tcp客户端发送到java tcp服务器时丢失数据

dnph8jn4  于 2021-07-08  发布在  Java
关注(0)|答案(0)|浏览(473)

我使用通用池将数据从nodejs tcp客户机发送到javatcp服务器。
这是我的nodejs客户端入口点中的相关代码 blockEventListener.js -此处显示完整代码:

function createPool() {
    const factory = {
        create: function() {
            return new Promise((resolve, reject) => {
                const socket = new net.Socket();
                socket.connect({
                    host: sdkAddress,
                    port: sdkPort,
                });
                socket.setKeepAlive(true);
                socket.on('connect', () => {
                    resolve(socket);
                });
                socket.on('error', error => {
                    if (error.code === "ECONNREFUSED") {
                        //console.log(`Retry after ${poolRetry}ms`);
                        setTimeout(() => {
                            socket.connect({
                                host: sdkAddress,
                                port: sdkPort,
                            });
                        }, poolRetry);
                    } else {
                        reject(error);
                    }
                });
                socket.on('close', hadError => {
                    console.log(`socket closed: ${hadError}`);
                });
            });
        },
        destroy: function(socket) {
            return new Promise((resolve) => {
                socket.destroy();
                resolve();
            });
        },
        validate: function (socket) {
            return new Promise((resolve) => {
                if (socket.destroyed || !socket.readable || !socket.writable) {
                    return resolve(false);
                } else {
                    return resolve(true);
                }
            });
        }
    };
    return genericPool.createPool(factory, {
        max: poolMax,
        min: poolMin,
        maxWaitingClients: poolQueue,
        testOnBorrow: true,
        acquireTimeoutMillis: queueWait
    });
}

const pool = createPool();
const poolStatusReport = `pool.spareResourceCapacity = ${pool.spareResourceCapacity}, pool.available = ${pool.available}, pool.borrowed = ${pool.borrowed}, pool.pending = ${pool.pending}`;

async function processPendingBlocks(ProcessingMap, channelid, configPath) {
    setTimeout(async () => {
        let nextBlockNumber = fs.readFileSync(configPath, "utf8");
        let processBlock;

        do {
            processBlock = ProcessingMap.get(channelid, nextBlockNumber);
            if (processBlock == undefined) {
                break;
            }

            if (pool.spareResourceCapacity == 0 && pool.available == 0) {
                break;
            }

            try {
                const sock = await pool.acquire();
                await blockProcessing.processBlockEvent(channelid, processBlock, sock, configPath);
                await pool.release(sock);
            } catch (error) {
                console.error(`Failed to process block: ${error}`);
            }

            ProcessingMap.remove(channelid, nextBlockNumber);
            fs.writeFileSync(configPath, parseInt(nextBlockNumber, 10) + 1);
            nextBlockNumber = fs.readFileSync(configPath, "utf8");
        } while (true);

        processPendingBlocks(ProcessingMap, channelid, configPath);
    }, blockProcessInterval);
}

这是中的相关代码 blockProcessing.js -此处显示完整代码:

exports.processBlockEvent = async function (channelname, block, socket, configPath) {
    return new Promise(async (resolve, reject) => {
        // some code

        for (var dataItem in dataArray) {

            // more code

            for (var actionItem in actions) {

                // yet more code

                for (var record in rwSet) {
                    // ignore lscc events
                    if (rwSet[record].namespace != "lscc") {
                        // create object to store properties
                        const writeObject = new Object();
                        writeObject.blocknumber = blockNumber;
                        writeObject.chaincodeid = chaincodeID;
                        writeObject.channelid = channelid;
                        writeObject.timestamp = timestamp;
                        writeObject.txnid = txnid;
                        writeObject.values = rwSet[record].rwset.writes;

                        writeToSocket(socket, writeObject, channelname, chaincodeID);
                    }
                }
            }
            console.log("---------");
        }

        // update the nextblock.nextBlock file to retrieve the next block
        fs.writeFileSync(configPath, parseInt(blockNumber, 10) + 1);

        socket.write('<END>\n');
        resolve(true);
    })
}

function writeToSocket(socket, writeObject, channelname, chaincodeID) {
    return new Promise(async (resolve, reject) => {
        console.log(`ChannelID: ${writeObject.channelid}`);
        console.log(`Transaction Timestamp: ${writeObject.timestamp}`);
        console.log(`ChaincodeID: ${writeObject.chaincodeid}`);
        console.log(`TxnID: ${writeObject.txnid}`);
        console.log(writeObject.values);

        let objstr = JSON.stringify(writeObject);
        socket.on('error', function(ex) {
            console.log('!!!!!!!!!!!!! ERROR !!!!!!!!!!!!!');
            console.log(ex);
            console.log(writeObject);
            console.log('=================================');
        });
        socket.write(objstr);
        socket.write('\n');

        var outputLog = path.resolve(__dirname, folderLog, `${channelname}_${chaincodeID}.log`);
        fs.appendFileSync(outputLog, objstr + "\n");
    });
}

这是我的简单java tcp服务器中的代码:

package demo;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class SimpleListener {
    public static void main(String[] args) throws IOException {
        ServerSocket server = new ServerSocket(Integer.parseInt(args[0]));
        int n = 0;
        while (n < Integer.parseInt(args[1])) {
            new ThreadSocket(server.accept());
            n++;
        }
        server.close();
    }
}

class ThreadSocket extends Thread{
    private Socket insocket;    
    ThreadSocket(Socket insocket){
        this.insocket = insocket;
        this.start();
    }       
    @Override
    public void run() {     
        try {
            InputStream is = insocket.getInputStream();
            InputStreamReader reader = new InputStreamReader(is);
            BufferedReader in = new BufferedReader(reader);

            PrintWriter out = new PrintWriter(insocket.getOutputStream(), true);

            StringBuilder sb = new StringBuilder();
            String line = in.readLine();
            while (line != null && !"<END>".equalsIgnoreCase(line)) {
                sb.append(line + "\n");
                line = in.readLine();
            }
            String output = sb.toString();
            System.out.println("INCOMING: " + output);

            out.println(200);
            out.close();
            in.close();
            reader.close();
            is.close();
            insocket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }       
    }
}

当我运行这些程序时,我希望看到客户端读取的所有数据都被完整地发送到服务器。相反,我观察到的类似于这个示例输出。最初,信息是完整的。但是,在前五行之后(我认为这是因为我将池的最小大小设置为5),我开始看到信息中的空白。关键是,我明白了 EPIPE 以及 ERR_STREAM_DESTROYED 在我的客户端输出中:

!!!!!!!!!!!!! ERROR !!!!!!!!!!!!!
Error [ERR_STREAM_DESTROYED]: Cannot call write after a stream was destroyed
    at doWrite (_stream_writable.js:399:19)
    at writeOrBuffer (_stream_writable.js:387:5)
    at Socket.Writable.write (_stream_writable.js:318:11)
    at /root/nodejslistener/accessreal/product/sdk/nodejs/blockProcessing.js:103:16
    at new Promise (<anonymous>)
    at Object.exports.processBlockEvent (/root/nodejslistener/accessreal/product/sdk/nodejs/blockProcessing.js:16:12)
    at Timeout._onTimeout (/root/nodejslistener/accessreal/product/sdk/nodejs/blockEventListener.js:185:39) {
  code: 'ERR_STREAM_DESTROYED'
}
{
  blocknumber: '427',
  ...more details...
}

epipe错误输出示例:

!!!!!!!!!!!!! ERROR !!!!!!!!!!!!!
Error: write EPIPE
    at afterWriteDispatched (internal/stream_base_commons.js:156:25)
    at writeGeneric (internal/stream_base_commons.js:147:3)
    at Socket._writeGeneric (net.js:788:11)
    at Socket._write (net.js:800:8)
    at doWrite (_stream_writable.js:403:12)
    at writeOrBuffer (_stream_writable.js:387:5)
    at Socket.Writable.write (_stream_writable.js:318:11)
    at /root/nodejslistener/accessreal/product/sdk/nodejs/blockProcessing.js:124:16
    at new Promise (<anonymous>)
    at writeToSocket (/root/nodejslistener/accessreal/product/sdk/nodejs/blockProcessing.js:109:12) {
  errno: 'EPIPE',
  code: 'EPIPE',
  syscall: 'write'
}
{
  blocknumber: '426',
  ...more details...
}

关于我目前的处境,我有几个问题:
当我忘记了 .close() 在我的java服务器中,我没有得到任何错误,但我只看到前五个数据块。后续的数据似乎丢失了,但我在我的客户机中没有看到任何错误。为什么?
我应该在nodejs客户机和/或java服务器中更改什么,以便成功地发送所有数据?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题