We are working on a PoC in which we produce messages to a Kafka topic (around 2 million right now, eventually about 130 million) and would like to query that topic through Elasticsearch. So a small PoC was built that feeds the data into ES via the Confluent Elasticsearch sink connector (latest version, 6.0.0). However, we run into a lot of timeouts, and eventually the task fails with a message saying it must be restarted manually:
ERROR WorkerSinkTask{id=transactions-elasticsearch-connector-3} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: java.net.SocketTimeoutException: Read timed out (org.apache.kafka.connect.runtime.WorkerSinkTask)
My sink connector configuration looks like this:
{
  "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "connection.url": "http://elasticsearch:9200",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081",
  "topics": "transactions,trades",
  "type.name": "transactions",
  "tasks.max": "4",
  "batch.size": "50",
  "max.buffered.events": "500",
  "max.buffered.records": "500",
  "flush.timeout.ms": "100000",
  "linger.ms": "50",
  "max.retries": "10",
  "connection.timeout.ms": "2000",
  "name": "transactions-elasticsearch-connector",
  "key.ignore": "true",
  "schema.ignore": "false",
  "transforms": "ExtractTimestamp",
  "transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.ExtractTimestamp.timestamp.field": "MSG_TS"
}
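Besides the settings above, I have also been experimenting with the connector's dedicated read-timeout and retry-backoff settings (the read timeout defaults to a few seconds, which may be too short for bulk requests against a single-node ES). A sketch of the timeout-related fragment I tried; the values are guesses from my experiments, not recommendations:

```json
{
  "connection.timeout.ms": "5000",
  "read.timeout.ms": "30000",
  "retry.backoff.ms": "1000",
  "batch.size": "50",
  "max.retries": "10",
  "flush.timeout.ms": "180000"
}
```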
Unfortunately, the tasks also die and need a restart even when no messages are being produced and the Elasticsearch sink connector is started manually. I have fiddled with various batch sizes, timeout windows, retries, etc., but to no avail. Note that we run just a single Kafka broker, a single Kafka Connect worker with the Elasticsearch connector, and a single Elasticsearch instance, all in Docker containers.
We also see a lot of timeout messages like these:
[2020-12-08 13:23:34,107] WARN Failed to execute batch 100534 of 50 records with attempt 1/11, will attempt retry after 43 ms. Failure reason: Read timed out (io.confluent.connect.elasticsearch.bulk.BulkProcessor)
[2020-12-08 13:23:34,116] WARN Failed to execute batch 100536 of 50 records with attempt 1/11, will attempt retry after 18 ms. Failure reason: Read timed out (io.confluent.connect.elasticsearch.bulk.BulkProcessor)
[2020-12-08 13:23:34,132] WARN Failed to execute batch 100537 of 50 records with attempt 1/11, will attempt retry after 24 ms. Failure reason: Read timed out (io.confluent.connect.elasticsearch.bulk.BulkProcessor)
[2020-12-08 13:23:36,746] WARN Failed to execute batch 100539 of 50 records with attempt 1/11, will attempt retry after 0 ms. Failure reason: Read timed out (io.confluent.connect.elasticsearch.bulk.BulkProcessor)
[2020-12-08 13:23:37,139] WARN Failed to execute batch 100536 of 50 records with attempt 2/11, will attempt retry after 184 ms. Failure reason: Read timed out (io.confluent.connect.elasticsearch.bulk.BulkProcessor)
[2020-12-08 13:23:37,155] WARN Failed to execute batch 100534 of 50 records with attempt 2/11, will attempt retry after 70 ms. Failure reason: Read timed out (io.confluent.connect.elasticsearch.bulk.BulkProcessor)
[2020-12-08 13:23:37,160] WARN Failed to execute batch 100537 of 50 records with attempt 2/11, will attempt retry after 157 ms. Failure reason: Read timed out (io.confluent.connect.elasticsearch.bulk.BulkProcessor)
[2020-12-08 13:23:39,681] WARN Failed to execute batch 100540 of 50 records with attempt 1/11, will attempt retry after 12 ms. Failure reason: Read timed out (io.confluent.connect.elasticsearch.bulk.BulkProcessor)
[2020-12-08 13:23:39,750] WARN Failed to execute batch 100539 of 50 records with attempt 2/11, will attempt retry after 90 ms. Failure reason: Read timed out (io.confluent.connect.elasticsearch.bulk.BulkProcessor)
[2020-12-08 13:23:40,231] WARN Failed to execute batch 100534 of 50 records with attempt 3/11, will attempt retry after 204 ms. Failure reason: Read timed out (io.confluent.connect.elasticsearch.bulk.BulkProcessor)
[2020-12-08 13:23:40,322] WARN Failed to execute batch 100537 of 50 records with attempt 3/11, will attempt retry after 58 ms. Failure reason: Read timed out (io.confluent.connect.elasticsearch.bulk.BulkProcessor)
Do you have any idea what we could improve to make the whole chain reliable? For our purposes it does not need to be particularly fast, as long as all messages reliably end up in Elasticsearch without the connector's tasks needing a restart every time.