是什么导致节点kafka流出现这种间歇性问题?

hl0ma9xz  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(401)

我有一个Kafka制作人和消费者。
制作人这样做:

const returnMessage = {
        prop1: 'some string',
        prop2: 'another string',
        prop3: nestedObject
    };
    console.log(JSON.stringify(returnMessage))
    await stream.writeToStream(JSON.stringify(returnMessage));

消费者这样做:

incomingStream.forEach(
                message => {
                        console.log(message.value)
                        let messageObject = message.value;
                        ...other stuff...
                }
            );

现在,在生产者方面,返回消息总是作为一个适当的字符串记录,一切都很好。但在使用者方面,首先,message.value是一个正确的字符串,可以从中解析json,但在后续请求中,它会出现“[object]”。如果
我觉得我错过了一些重要的东西…如果你有任何见解,请帮忙。

oipij1gg

oipij1gg1#

好的,我知道了。这个 incomingStream.forEach 发生在路由内,而流正在控制器级别示例化。我通过移动 forEach 到控制器级别,并让它在每条消息上发出已解析的消息,然后订阅路由内的事件。

相关问题