kafka源代码-了解selector.poll()的语义

txu3uszq  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(374)

我在研究kafka的网络层代码时,有一些关于selector类的问题,特别是poll()方法是如何实现的。poll()方法如下所示:

void poll(int timeout){
....
    /* check ready keys */
    long startSelect = time.nanoseconds();
    int readyKeys = select(timeout);
    long endSelect = time.nanoseconds();
    this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

    if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
        pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
        pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
    }

...
}

有什么特别的要求吗 pollSelectionKeys() 方法首先对 select() 方法,然后对立即连接的键?只是为了清楚起见,我们单独执行这些操作,还是涉及一些特定的要求?
第二,在 pollSelectionKeys() 方法,我们有:

void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                                       boolean isImmediatelyConnected,
                                       long currentTimeNanos){
...
    /* if channel is ready write to any sockets that have space in their buffer and for which
    we have data */
    if (channel.ready() && key.isWritable()) {
        Send send = channel.write();
        if (send != null) {
            this.completedSends.add(send);
            this.sensors.recordBytesSent(channel.id(), send.size());
        }
    }
...
}

据我所知,我们只给 KafkaChannel 当它属于之前调用 select() 方法,或者 KafkaChannel 与一个 immediatelyConnectedKeys . 我的问题是,我们为什么要去做写信给 KafkaChannels 这种方式?更具体地说,我们不需要遍历所有 KafkaChannels 已经连接的,如果他们有 Send 与之关联的对象?就这样,我们写信给 KafkaChannel 尽快,不用等它属于我们 immediatelyConnectedKeys 或者 readyKeys .

vnjpjtjt

vnjpjtjt1#

在i/o连接完成之前,tcp连接不可用

chhkpiq4

chhkpiq42#

答案在于selector类的connect方法(下面的相关部分)

connected = socketChannel.connect(address);
..............................
................................

 SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);

根据nio socketchannel connect的文档说明
如果此通道处于非阻塞模式,则调用此方法将启动非阻塞连接操作。如果立即建立连接,就像本地连接一样,则此方法返回true。否则,此方法返回false,连接操作必须稍后通过调用finishconnect方法完成。
因此,一个典型的交互工作流程如下(这里解释得很好)
如果以非阻塞模式连接,则应:
为op\ U connect注册通道
当它启动时,调用finishconnect()
如果返回true,则取消注册opu connect并注册opu read或opu write,具体取决于接下来要执行的操作
如果返回false,则不执行任何操作,继续选择
如果connect()或finishconnect()引发异常,请关闭通道并重试,或者忽略它,或者告诉用户或
任何合适的。
如果在通道连接之前不想做任何事情,请在阻塞模式下进行连接,并在连接成功时进入非阻塞模式。
这个connect方法可能会像本地连接一样立即连接,并且可能不会触发为这个连接socketchannel注册的op\u connect事件(connect调用后的几行),因此当使用典型的java nio register代码时,我们可能会错过它。我们最终需要在这样的通道上调用finishconnect(参见工作流中的第二个要点)。所以我们将这样的通道密钥添加到另一个集合中 immediatelyConnectedKeys 这样他们就可以处理太晚,否则我们会错过他们完全。

if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
            pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
            pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
        }

后来呢 pollSelectionKeys 方法(注意 finishConnect 调用finishconnect到底层socketchannel

/* complete any connections that have finished their handshake (either normally or immediately) */
                if (isImmediatelyConnected || key.isConnectable()) {
                    if (channel.finishConnect()) {
.........................
.........................

总而言之,Kafka的代码看起来像是标准的nio代码,除非Kafka团队能够解释更多的内容。与此相关的一个有趣的误解(bug归档和最终被jdk团队拒绝)可以在这里找到
对于问题的第二部分,您可能正在询问以下代码。为什么两个电话都要钥匙

if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
            pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
            pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
        }

我们现在维护了两组密钥 selector.keys() 但是密钥集是不可直接修改的,所以它是一种只读视图。此密钥集中的密钥只有在被取消并且其通道被取消注册后才会被删除。所以通常 selector.selectedKeys() 用于访问就绪通道。也 selector.selectedKeys() 显然不会把钥匙从 immediatelyConnectedKeys . 对从selector.selectedkeys()获得的这些键进行处理的通常模式是对集合进行迭代,测试由key表示的通道准备好用于什么事件(可接受的、可连接的、可读/可写的),执行操作,然后将其从集合中删除。此删除部分是非常必要的。选择器不会从所选键集本身删除selectionkey示例。处理完频道后,必须这样做。下一次通道变为“就绪”时,选择器将再次将其添加到选定的键集。所以这就是为什么两者都被处理和方法 pollSelectionKeys 是为了兼顾两者。

相关问题