Using Kafka to stream files together with metadata?

mm5n2pyu · posted 12 months ago in Apache

I'd like to ask whether anyone has experience producing and consuming files with Kafka.
The language should be .NET C#.
We are working on a concept that takes in files (max. 100 MB) from multiple sources and writes multiple consumers that react to different types, e.g. e-mail or physical files, enriched with metadata.
I have tried to play around with it a bit.
Best case would be to use the schema registry.
I set up a docker-compose file for testing, which looks like this:

version: '2.1'

services:
  zoo1:
    image: confluentinc/cp-zookeeper:7.3.2
    hostname: zoo1
    container_name: zoo1
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_SERVERS: zoo1:2888:3888

  kafka1:
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka1
    container_name: kafka1
    ports:
      - "9092:9092"
      - "29092:29092"
      - "9999:9999"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_JMX_PORT: 9001
      KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
    depends_on:
      - zoo1

  kafka-schema-registry:
    image: confluentinc/cp-schema-registry:7.3.2
    hostname: kafka-schema-registry
    container_name: kafka-schema-registry
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092
      SCHEMA_REGISTRY_HOST_NAME: kafka-schema-registry
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
    depends_on:
      - zoo1
      - kafka1

  
  kafka-rest-proxy:
    image: confluentinc/cp-kafka-rest:7.3.2
    hostname: kafka-rest-proxy
    container_name: kafka-rest-proxy
    ports:
      - "8082:8082"
    environment:
      # KAFKA_REST_ZOOKEEPER_CONNECT: zoo1:2181
      KAFKA_REST_LISTENERS: http://0.0.0.0:8082/
      KAFKA_REST_SCHEMA_REGISTRY_URL: http://kafka-schema-registry:8081/
      KAFKA_REST_HOST_NAME: kafka-rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092
    depends_on:
      - zoo1
      - kafka1
      - kafka-schema-registry

  
  kafka-connect:
    image: confluentinc/cp-kafka-connect:7.3.2
    hostname: kafka-connect
    container_name: kafka-connect
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka1:19092"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://kafka-schema-registry:8081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://kafka-schema-registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_PLUGIN_PATH: '/usr/share/java,/etc/kafka-connect/jars,/usr/share/confluent-hub-components'
    volumes:
      - ./connectors:/etc/kafka-connect/jars/
    depends_on:
      - zoo1
      - kafka1
      - kafka-schema-registry
      - kafka-rest-proxy
    command:
      - bash
      - -c
      - |
        confluent-hub install --no-prompt debezium/debezium-connector-mysql:latest
        confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:0.4.0
        /etc/confluent/docker/run

  
  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:7.3.2
    hostname: ksqldb-server
    container_name: ksqldb-server
    ports:
      - "8088:8088"
    environment:
      KSQL_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092
      KSQL_LISTENERS: http://0.0.0.0:8088/
      KSQL_KSQL_SERVICE_ID: ksqldb-server_
    depends_on:
      - zoo1
      - kafka1

  postgresql:
    extends:
      service: postgresql
      file: conduktor.yml 
            
  conduktor-platform:
    extends:
      service: conduktor-platform
      file: conduktor.yml

volumes:
  pg_data: {}
  conduktor_data: {}
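
One thing this compose file does not change is the record-size limit: by default a Kafka topic accepts records of roughly 1 MB, so the 10 MB MessageMaxBytes set in the producer below (let alone 100 MB files) would be rejected unless max.message.bytes is raised as well. Here is a minimal sketch of creating the test topics with a larger limit via the AdminClient; the topic names are taken from the code below, and the size value is only illustrative:

using System.Collections.Generic;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;

public static class TopicSetup
{
    public static async Task CreateTopicsAsync()
    {
        using var admin = new AdminClientBuilder(
            new AdminClientConfig { BootstrapServers = "localhost:9092" }).Build();

        await admin.CreateTopicsAsync(new[]
        {
            new TopicSpecification { Name = "gtm-test", NumPartitions = 1, ReplicationFactor = 1 },
            new TopicSpecification
            {
                // Raise the per-topic record size limit so ~10 MB records are accepted.
                Name = "gtm-test-file",
                NumPartitions = 1,
                ReplicationFactor = 1,
                Configs = new Dictionary<string, string> { ["max.message.bytes"] = "10485760" }
            }
        });
    }
}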

Below is a dumb .NET Core API to produce some messages: KafkaProducerController.cs

using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Configuration;

[ApiController]
    [Route("api/kafka")]
    public class KafkaProducerController : ControllerBase
    {
        private readonly KafkaDependentProducer<Null, string> messageProducer;
        private readonly KafkaDependentProducer<Null, FileData> iDokProducer;
        private string topic;
        private string iDokTopic;
        private readonly ProducerConfig config = new ProducerConfig
        {
            BootstrapServers = "localhost:9092",
            MessageMaxBytes = 10000000
        };

        public KafkaProducerController(KafkaDependentProducer<Null, string> messageProducer, KafkaDependentProducer<Null, FileData> iDokProducer, IConfiguration config)
        {
            this.topic = config.GetValue<string>("Kafka:MessageTopic");
            this.iDokTopic = config.GetValue<string>("Kafka:iDokTopic");
            this.messageProducer = messageProducer;
            this.iDokProducer = iDokProducer;
        }

        [HttpPost]
        [Route("")]
        public async Task<IActionResult> PostAsync([FromQuery] string message)
        {
            var kafkaMessage = new Message<Null, string> { Value = message };
            await this.messageProducer.ProduceAsync(topic, kafkaMessage);
            return Ok();
            //return Created(string.Empty, SendToKafka("gtm-test", kafkaMessage));
        }

        [HttpPost("upload")]
        //[Route("file")]
        public async Task<IActionResult> PostFileAsync(IFormFile file)
        {
            if (file == null || file.Length <= 0)
            {
                return BadRequest("No file uploaded.");
            }

            var kafkaMessage = new Message<Null, FileData> { Value = new FileData { formFile = file, Info1 = "Test", Info2 = "Test2" } };

            await this.iDokProducer.ProduceAsync(iDokTopic, kafkaMessage);
            return Ok();
        }
    }
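
The FileData type used by the controller and the consumer is not shown in the question. A minimal sketch of the shape it implies (the Content property is an addition of mine, not taken from the question):

using Microsoft.AspNetCore.Http;

// Guess at the FileData shape implied by the controller/consumer code above.
// Note: IFormFile itself does not round-trip through JSON; in practice you
// would read the upload into Content (or replace it with a reference, as the
// answers below suggest) before producing.
public class FileData
{
    public IFormFile formFile { get; set; }
    public byte[] Content { get; set; }
    public string Info1 { get; set; }
    public string Info2 { get; set; }
}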

KafkaDependentProducer.cs:

using System;
using System.Threading.Tasks;
using Confluent.Kafka;

public class KafkaDependentProducer<K, V>
    {
        IProducer<K, V> kafkaHandle;

        public KafkaDependentProducer(KafkaClientHandle handle, ISerializer<V> serializer = null)
        {
            if (serializer == null)
            {
                kafkaHandle = new DependentProducerBuilder<K, V>(handle.Handle).Build();
            }
            else
            {
                kafkaHandle = new DependentProducerBuilder<K, V>(handle.Handle)
                    .SetValueSerializer(serializer)
                    .Build();
            }
        }

        /// <summary>
        ///     Asynchronously produce a message and expose delivery information
        ///     via the returned Task. Use this method of producing if you would
        ///     like to await the result before flow of execution continues.
        /// </summary>
        public Task ProduceAsync(string topic, Message<K, V> message)
            => this.kafkaHandle.ProduceAsync(topic, message);

        /// <summary>
        ///     Asynchronously produce a message and expose delivery information
        ///     via the provided callback function. Use this method of producing
        ///     if you would like flow of execution to continue immediately, and
        ///     handle delivery information out-of-band.
        /// </summary>
        public void Produce(string topic, Message<K, V> message, Action<DeliveryReport<K, V>> deliveryHandler = null)
            => this.kafkaHandle.Produce(topic, message, deliveryHandler);

        public void Flush(TimeSpan timeout)
            => this.kafkaHandle.Flush(timeout);
    }
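
KafkaDependentProducer builds on a shared client handle, following the pattern from the Confluent .NET examples. The KafkaClientHandle class and the DI registration are not shown in the question; below is a sketch under those assumptions (the AddKafkaProducers name, the Kafka:ProducerSettings section and the schema registry URL are placeholders):

using System;
using Confluent.Kafka;
using Confluent.Kafka.SyncOverAsync;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

// Shared librdkafka handle that all DependentProducerBuilder instances reuse.
public class KafkaClientHandle : IDisposable
{
    private readonly IProducer<byte[], byte[]> kafkaProducer;

    public KafkaClientHandle(IConfiguration config)
    {
        var producerConfig = new ProducerConfig();
        config.GetSection("Kafka:ProducerSettings").Bind(producerConfig);
        this.kafkaProducer = new ProducerBuilder<byte[], byte[]>(producerConfig).Build();
    }

    public Handle Handle => this.kafkaProducer.Handle;

    public void Dispose()
    {
        kafkaProducer.Flush();
        kafkaProducer.Dispose();
    }
}

public static class KafkaRegistration
{
    // Register one handle and the two typed producers the controller injects.
    // The FileData producer gets a schema-registry-backed JSON serializer so the
    // consumer's JsonDeserializer<FileData> can read the records back.
    public static void AddKafkaProducers(this IServiceCollection services)
    {
        services.AddSingleton<KafkaClientHandle>();
        services.AddSingleton(sp =>
            new KafkaDependentProducer<Null, string>(sp.GetRequiredService<KafkaClientHandle>()));
        services.AddSingleton(sp =>
        {
            var schemaRegistry = new CachedSchemaRegistryClient(
                new SchemaRegistryConfig { Url = "http://localhost:8081" });
            return new KafkaDependentProducer<Null, FileData>(
                sp.GetRequiredService<KafkaClientHandle>(),
                new JsonSerializer<FileData>(schemaRegistry).AsSyncOverAsync());
        });
    }
}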

The consumer:
Program.cs

using System;
using System.Collections.Generic;
using System.Threading;
using Confluent.Kafka;
using Confluent.Kafka.SyncOverAsync;
using Confluent.SchemaRegistry.Serdes;

class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Demo for using Kafka APIs");

            Consume("gtm-test");
        }
        static void Consume(string topic)
        {
            var conf = new ConsumerConfig
            {
                GroupId = "st_consumer_group",
                BootstrapServers = "localhost:9092",
                AutoOffsetReset = AutoOffsetReset.Earliest
            };

            CancellationTokenSource cts = new CancellationTokenSource();
            Console.CancelKeyPress += (_, e) =>
            {
                e.Cancel = true; // prevent the process from terminating.
                cts.Cancel();
            };

            using (var builder = new ConsumerBuilder<Ignore,FileData>(conf)
                .SetValueDeserializer(new JsonDeserializer<FileData>().AsSyncOverAsync())
                .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
                .Build())
            {
                // Both topics are consumed with the FileData JSON deserializer here;
                // plain string messages on "gtm-test" will not deserialize as FileData.
                builder.Subscribe(new List<string>() { "gtm-test", "gtm-test-file" });
                try
                {
                    while (true)
                    {
                        // Use the token wired up to Ctrl+C above so the loop can be stopped.
                        var result = builder.Consume(cts.Token);
                        Console.WriteLine($"Message: {result.Message.Value} received from {result.TopicPartitionOffset}, Topic: {result.Topic}");
                    }
                }
                catch (Exception)
                {
                    builder.Close();
                }
            }
        }
    }
}

Thanks for your help!

yyhrrdl8 · Answer 1

Kafka is not meant for streaming large files. You would need to tune both the brokers (if it is possible at all, you will run into many limits) and the applications producing/consuming that data (lots of thresholds, buffers and timeouts).
Instead, you should pick one of these two options:

  • Send your file chunk by chunk into the topic, to comply with Kafka best practices (small records), and rebuild the file at consumption time (reassemble the chunks); see the sketch after this list.
  • Use the *Claim-Check pattern* (as described by OneCricketeer): your producer sends only a small reference to the file, which is stored somewhere else (e.g. S3), into the topic, and the consumer reads that reference and fetches the file from the cold storage such as S3. You will have to share some credentials with your applications so they can access S3.
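
A minimal C# sketch of the first option (chunking); the header names, the chunk size and keying by a file id are choices made here for illustration, not a prescribed format:

using System;
using System.IO;
using System.Threading.Tasks;
using Confluent.Kafka;

public static class ChunkedFileProducer
{
    // Splits a (seekable) stream into fixed-size chunks and produces one record per chunk.
    public static async Task ProduceInChunksAsync(
        IProducer<string, byte[]> producer, string topic, string fileId,
        Stream file, int chunkSize = 512 * 1024)
    {
        int total = (int)Math.Ceiling(file.Length / (double)chunkSize);
        var buffer = new byte[chunkSize];
        int index = 0;
        int read;
        while ((read = await file.ReadAsync(buffer, 0, chunkSize)) > 0)
        {
            await producer.ProduceAsync(topic, new Message<string, byte[]>
            {
                // Keying by file id keeps all chunks of one file on one partition, in order.
                Key = fileId,
                Value = buffer[..read],
                Headers = new Headers
                {
                    { "chunk-index", BitConverter.GetBytes(index) },
                    { "chunk-total", BitConverter.GetBytes(total) }
                }
            });
            index++;
        }
        // The consumer groups records by key and stitches the chunks back together
        // once it has seen chunk-total of them.
    }
}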

(Disclaimer: my company has written about this in more depth: https://www.conduktor.io/blog/beyond-limits-produce-large-records-without-undermining-apache-kafka/)

5m1hhzi4 · Answer 2

Kafka shouldn't be used to transfer large binary blobs. Use a system like S3 (or MinIO on-premises) and send a URI value that references the file's location on the shared storage. Your consumer can then use that string to download the file, rather than stuffing the binary file contents into a Kafka record.
The "metadata" can simply be a separate file (e.g. the same file name, but with a .meta suffix), since S3 can be treated as a key-value blob store.
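
A minimal sketch of that claim-check flow; IFileStore is a hypothetical abstraction over S3/MinIO, and the bucket, topic and FileReference names are made up here:

using System.IO;
using System.Threading.Tasks;
using Confluent.Kafka;

// Hypothetical abstraction over S3/MinIO: uploads content and returns its URI.
public interface IFileStore
{
    Task<string> UploadAsync(string bucket, string key, Stream content);
}

// Small reference record that travels through Kafka instead of the file bytes.
public record FileReference(string Uri, string FileName, string Info1, string Info2);

public class ClaimCheckProducer
{
    private readonly IProducer<Null, string> producer =
        new ProducerBuilder<Null, string>(
            new ProducerConfig { BootstrapServers = "localhost:9092" }).Build();

    public async Task SendAsync(IFileStore store, Stream content, string fileName)
    {
        // 1. Put the payload into object storage and get back its URI.
        var uri = await store.UploadAsync("idok-bucket", fileName, content);

        // 2. Publish only the reference plus metadata to Kafka.
        var reference = new FileReference(uri, fileName, "Test", "Test2");
        await producer.ProduceAsync("gtm-test-file",
            new Message<Null, string> { Value = System.Text.Json.JsonSerializer.Serialize(reference) });
    }
}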
