I want to build a Python Apache Beam Kafka client that reads streaming data from Kafka (Kafka returns a lot of data, on the order of millions or billions of records), filters it by key and value, and returns the data in batches as lists of dictionaries. I am stuck on the Kafka connection: when I connect with ReadFromKafka, the method seems to only return None, but it shouldn't (I verified the connection with the script at the bottom). I don't know what I'm doing wrong and would really appreciate any help. I am using DirectRunner as the pipeline runner. (self.kafka_config.topics is a list of topics.)
My current Apache Beam Kafka pipeline script looks like this:
import logging
from typing import Any, Dict, Iterator, List, Optional

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

from internal_kafka_client import KafkaConfig


class KafkaProcessData(beam.DoFn):
    # pylint: disable = abstract-method
    def __init__(self, filter_key: Any = None, filter_value: Any = None):
        super().__init__()
        self.filter_key = filter_key
        self.filter_value = filter_value

    def process(self, element: Any, *args, **kwargs) -> Iterator[Any]:
        if self.filter_key and self.filter_value:
            if (
                self.filter_key in element
                and element[self.filter_key] == self.filter_value
            ):
                yield element
        else:
            yield element


class BatchElements(beam.DoFn):
    # pylint: disable = abstract-method
    def __init__(self, batch_size: int):
        super().__init__()
        self.batch_size = batch_size
        self.buffer = []

    def process(self, element: Any, *args, **kwargs) -> Iterator[List[Any]]:
        self.buffer.append(element)
        if len(self.buffer) >= self.batch_size:
            yield self.buffer
            self.buffer = []


class KafkaClient:
    # pylint: disable = too-few-public-methods
    """
    pipeline_runner choices:
      - DataflowRunner (streaming mode)
      - FlinkRunner (streaming mode, locally, not on a cluster; haven't tested with a cluster)
      - DirectRunner (streaming mode)
    """

    _PIPELINE_TYPES = ["DataflowRunner", "FlinkRunner", "DirectRunner"]

    def __init__(
        self,
        kafka_config: KafkaConfig,
        logger: logging.Logger,
        pipeline_runner: str,
        apache_beam_pipeline_options: Optional[Dict] = None,
    ) -> None:
        if not apache_beam_pipeline_options:
            apache_beam_pipeline_options = {}
        if pipeline_runner not in self._PIPELINE_TYPES:
            raise ValueError(
                f"Given pipeline type {pipeline_runner} is not in "
                f"available pipeline list {self._PIPELINE_TYPES}"
            )
        self.pipeline_runner = pipeline_runner
        self.kafka_config = kafka_config
        self.logger = logger
        self.beam_options = PipelineOptions(
            **apache_beam_pipeline_options, save_main_session=True
        )

    def process(self, filter_key: Any = None, filter_value: Any = None) -> List[Dict]:
        kafka_consumer_config = {
            "bootstrap.servers": self.kafka_config.server_address,
            "auto.offset.reset": self.kafka_config.offset_reset,
        }
        if self.kafka_config.group_id:
            kafka_consumer_config["group.id"] = self.kafka_config.group_id

        with beam.Pipeline(self.pipeline_runner, options=self.beam_options) as pipeline:
            # pylint: disable = unsupported-binary-operation
            kafka_data = pipeline | "Read from Kafka" >> ReadFromKafka(
                topics=self.kafka_config.topics,
                consumer_config=kafka_consumer_config,
            )
            processed_data = kafka_data | "Process Data" >> beam.ParDo(
                KafkaProcessData(filter_key=filter_key, filter_value=filter_value)
            )
            processed_data_batches = processed_data | "Batch Data" >> beam.ParDo(
                BatchElements(batch_size=self.kafka_config.batch_size)
            )
            results = pipeline.run()
            results.wait_until_finish()
        return processed_data_batches[0] if processed_data_batches else []
The script I use to test the Kafka connection:
from confluent_kafka import Consumer, KafkaError

bootstrap_servers = '<<kafka_address>>:<<kafka_port>>'
topic = '<<topic_name>>'

consumer_config = {
    'bootstrap.servers': bootstrap_servers,
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest'
}


def test_kafka_connection():
    consumer = Consumer(consumer_config)
    consumer.subscribe([topic])
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                else:
                    print('Kafka error: {}'.format(msg.error()))
                    break
            print('Received message: {}'.format(msg.value().decode('utf-8')))
    except KeyboardInterrupt:
        consumer.close()
        print('Consumer closed')


if __name__ == '__main__':
    test_kafka_connection()
I have searched Google for an answer, but most or all of the solutions set up the Kafka connection the same way mine does, so I can't tell what I'm doing wrong, which is why I'm asking for help here :)
1 Answer
It turned out that DirectRunner loads all of the data into memory, and I was getting an out-of-memory error. To solve this, I had to add the max_num_records parameter to ReadFromKafka and set enable.auto.commit to False in the kafka_consumer_config variable.
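Based on that description, a minimal sketch of what the adjusted read might look like (the server address, topic name, and the 100_000 record limit below are placeholders, not values from the question):

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka

# Placeholder consumer config; enable.auto.commit is turned off as described above.
kafka_consumer_config = {
    "bootstrap.servers": "<<kafka_address>>:<<kafka_port>>",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": "false",
}

with beam.Pipeline("DirectRunner") as pipeline:
    kafka_data = pipeline | "Read from Kafka" >> ReadFromKafka(
        consumer_config=kafka_consumer_config,
        topics=["<<topic_name>>"],
        # Bound the read so DirectRunner does not try to hold an unbounded stream in memory.
        max_num_records=100_000,
    )

With max_num_records set, ReadFromKafka behaves as a bounded read, so the DirectRunner pipeline can finish after that many records instead of buffering the whole stream.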