Python Apache Beam read from Kafka - I can't connect to Kafka

ugmeyewa posted on 2023-08-08 in Python

I want to build a Python Apache Beam Kafka client that reads streaming data from Kafka (Kafka returns a lot of data, on the order of millions or billions of records), filters it by key and value, and returns the data in batches as a list of dictionaries. I'm stuck on the Kafka connection: when I connect with ReadFromKafka, the read seems to return nothing (None), even though it shouldn't (I verified the connection with the script at the bottom). I don't know what I'm doing wrong and would really appreciate any help. I'm using DirectRunner as the pipeline runner. (self.kafka_config.topics is a list of topics.)
My current Apache Beam Kafka pipeline script looks like this:

import logging
from typing import Any, Dict, Iterator, List, Optional

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

from internal_kafka_client import KafkaConfig

class KafkaProcessData(beam.DoFn):
    # pylint: disable = abstract-method
    def __init__(self, filter_key: Any = None, filter_value: Any = None):
        super().__init__()
        self.filter_key = filter_key
        self.filter_value = filter_value

    def process(self, element: Any, *args, **kwargs) -> Iterator[Any]:
        # When a filter is configured, emit only elements whose filter_key
        # maps to filter_value; otherwise pass every element through.
        if self.filter_key and self.filter_value:
            if (
                self.filter_key in element
                and element[self.filter_key] == self.filter_value
            ):
                yield element
            return
        yield element

class BatchElements(beam.DoFn):
    # pylint: disable = abstract-method
    def __init__(self, batch_size: int):
        super().__init__()
        self.batch_size = batch_size
        self.buffer = []

    def process(self, element: Any, *args, **kwargs) -> Iterator[List[Any]]:
        # Accumulate elements and emit a full batch once batch_size is reached.
        # Note: elements still in the buffer when the bundle ends are never
        # emitted, since there is no finish_bundle override to flush them.
        self.buffer.append(element)
        if len(self.buffer) >= self.batch_size:
            yield self.buffer
            self.buffer = []

class KafkaClient:
    # pylint: disable = too-few-public-methods
    """
    pipeline_runner choices:
        - DataflowRunner (streaming mode)
        - FlinkRunner (
                streaming mode, locally, not on cluster, haven't tested with cluster
            )
        - DirectRunner (streaming mode)
    """
    _PIPELINE_TYPES = ["DataflowRunner", "FlinkRunner", "DirectRunner"]

    def __init__(
        self,
        kafka_config: KafkaConfig,
        logger: logging.Logger,
        pipeline_runner: str,
        apache_beam_pipeline_options: Optional[Dict] = None,
    ) -> None:
        if not apache_beam_pipeline_options:
            apache_beam_pipeline_options = {}
        if pipeline_runner not in self._PIPELINE_TYPES:
            raise ValueError(
                f"Given pipeline type {pipeline_runner} is not in "
                f"available pipeline list {self._PIPELINE_TYPES}"
            )
        self.pipeline_runner = pipeline_runner
        self.kafka_config = kafka_config
        self.logger = logger
        self.beam_options = PipelineOptions(
            **apache_beam_pipeline_options, save_main_session=True
        )

    def process(self, filter_key: Any = None, filter_value: Any = None) -> List[Dict]:
        kafka_consumer_config = {
            "bootstrap.servers": self.kafka_config.server_address,
            "auto.offset.reset": self.kafka_config.offset_reset,
        }
        if self.kafka_config.group_id:
            kafka_consumer_config["group.id"] = self.kafka_config.group_id

        with beam.Pipeline(self.pipeline_runner, options=self.beam_options) as pipeline:
            # pylint: disable = unsupported-binary-operation
            kafka_data = pipeline | "Read from Kafka" >> ReadFromKafka(
                topics=self.kafka_config.topics,
                consumer_config=kafka_consumer_config,
            )
            processed_data = kafka_data | "Process Data" >> beam.ParDo(
                KafkaProcessData(filter_key=filter_key, filter_value=filter_value)
            )
            processed_data_batches = processed_data | "Batch Data" >> beam.ParDo(
                BatchElements(batch_size=self.kafka_config.batch_size)
            )
            # Note: the `with` block itself runs the pipeline and waits for it
            # on exit, so the explicit run()/wait_until_finish() below starts a
            # second run; processed_data_batches is also a deferred PCollection,
            # not an in-memory list, so indexing it does not return the batches.
            results = pipeline.run()
            results.wait_until_finish()
            return processed_data_batches[0] if processed_data_batches else []

The script I used to test the Kafka connection:

from confluent_kafka import Consumer, KafkaError

bootstrap_servers = '<<kafka_address>>:<<kafka_port>>'
topic = '<<topic_name>>'

consumer_config = {
    'bootstrap.servers': bootstrap_servers,
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest'
}

def test_kafka_connection():
    consumer = Consumer(consumer_config)
    consumer.subscribe([topic])
    try:
        while True:
            msg = consumer.poll(1.0)

            if msg is None:
                continue

            if msg.error():
                # Reaching the end of a partition is not a real error; keep polling.
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                print('Kafka error: {}'.format(msg.error()))
                break
            print('Received message: {}'.format(msg.value().decode('utf-8')))

    except KeyboardInterrupt:
        pass
    finally:
        consumer.close()
        print('Consumer closed')

if __name__ == '__main__':
    test_kafka_connection()


I searched Google for an answer, but most or all of the solutions connect to Kafka the same way I do, so I don't know what I'm doing wrong and am asking for help here :)

ego6inou1#

It turned out that DirectRunner loads all of the data into memory, and I was getting an out-of-memory error. To work around this, I had to add the max_num_records parameter to ReadFromKafka and set enable.auto.commit to False in the kafka_consumer_config variable.
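For reference, a minimal sketch of what that change could look like inside the process method above. The record limit of 10000 is an arbitrary illustrative value, not something from the original pipeline, and the consumer config values are written as strings since they are passed through to the Java Kafka consumer:

kafka_consumer_config = {
    "bootstrap.servers": self.kafka_config.server_address,
    "auto.offset.reset": self.kafka_config.offset_reset,
    # Disable offset auto-commit, per the fix described above.
    "enable.auto.commit": "false",
}

kafka_data = pipeline | "Read from Kafka" >> ReadFromKafka(
    topics=self.kafka_config.topics,
    consumer_config=kafka_consumer_config,
    # Bound the read so DirectRunner doesn't try to buffer an unbounded
    # stream in memory; 10000 is just an example limit.
    max_num_records=10000,
)

With max_num_records set, ReadFromKafka acts as a bounded source, which keeps a DirectRunner test from growing without limit; for a real streaming deployment you would drop the limit and run on a runner such as DataflowRunner or FlinkRunner.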
