I wanted to ask whether anyone has experience producing and consuming files with Kafka.
The language should be .NET C#.
We are exploring a concept that ingests files (up to 100 MB) from multiple sources, with several consumers that react to the different types, e.g. e-mails or physical files, and enrich them with metadata.
I have tried playing around with it a bit.
Ideally it would use the schema registry.
I set up a docker-compose file for testing that looks like this:
version: '2.1'
services:
zoo1:
image: confluentinc/cp-zookeeper:7.3.2
hostname: zoo1
container_name: zoo1
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_SERVERS: zoo1:2888:3888
kafka1:
image: confluentinc/cp-kafka:7.3.2
hostname: kafka1
container_name: kafka1
ports:
- "9092:9092"
- "29092:29092"
- "9999:9999"
environment:
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
KAFKA_BROKER_ID: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_JMX_PORT: 9001
KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
depends_on:
- zoo1
kafka-schema-registry:
image: confluentinc/cp-schema-registry:7.3.2
hostname: kafka-schema-registry
container_name: kafka-schema-registry
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092
SCHEMA_REGISTRY_HOST_NAME: kafka-schema-registry
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
depends_on:
- zoo1
- kafka1
kafka-rest-proxy:
image: confluentinc/cp-kafka-rest:7.3.2
hostname: kafka-rest-proxy
container_name: kafka-rest-proxy
ports:
- "8082:8082"
environment:
# KAFKA_REST_ZOOKEEPER_CONNECT: zoo1:2181
KAFKA_REST_LISTENERS: http://0.0.0.0:8082/
KAFKA_REST_SCHEMA_REGISTRY_URL: http://kafka-schema-registry:8081/
KAFKA_REST_HOST_NAME: kafka-rest-proxy
KAFKA_REST_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092
depends_on:
- zoo1
- kafka1
- kafka-schema-registry
kafka-connect:
image: confluentinc/cp-kafka-connect:7.3.2
hostname: kafka-connect
container_name: kafka-connect
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: "kafka1:19092"
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://kafka-schema-registry:8081'
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://kafka-schema-registry:8081'
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_PLUGIN_PATH: '/usr/share/java,/etc/kafka-connect/jars,/usr/share/confluent-hub-components'
volumes:
- ./connectors:/etc/kafka-connect/jars/
depends_on:
- zoo1
- kafka1
- kafka-schema-registry
- kafka-rest-proxy
command:
- bash
- -c
- |
confluent-hub install --no-prompt debezium/debezium-connector-mysql:latest
confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:0.4.0
/etc/confluent/docker/run
ksqldb-server:
image: confluentinc/cp-ksqldb-server:7.3.2
hostname: ksqldb-server
container_name: ksqldb-server
ports:
- "8088:8088"
environment:
KSQL_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092
KSQL_LISTENERS: http://0.0.0.0:8088/
KSQL_KSQL_SERVICE_ID: ksqldb-server_
depends_on:
- zoo1
- kafka1
postgresql:
extends:
service: postgresql
file: conduktor.yml
conduktor-platform:
extends:
service: conduktor-platform
file: conduktor.yml
volumes:
pg_data: {}
conduktor_data: {}
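One thing I ran into: the broker caps a record at roughly 1 MB by default (message.max.bytes), so for files approaching 100 MB the topic/broker limits have to be raised as well. A minimal sketch of creating the test topic with a higher per-message limit via the AdminClient (topic name and size are just my test values):

using System.Collections.Generic;
using Confluent.Kafka;
using Confluent.Kafka.Admin;

// Create the test topic with a raised per-message limit.
// "gtm-test-file" and 100 MB are assumptions for this experiment.
using var admin = new AdminClientBuilder(
    new AdminClientConfig { BootstrapServers = "localhost:9092" }).Build();

await admin.CreateTopicsAsync(new[]
{
    new TopicSpecification
    {
        Name = "gtm-test-file",
        NumPartitions = 1,
        ReplicationFactor = 1,
        Configs = new Dictionary<string, string>
        {
            // Per-topic override of the broker's message.max.bytes.
            { "max.message.bytes", (100 * 1024 * 1024).ToString() }
        }
    }
});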
Below is a naive .NET Core API for producing some messages: KafkaProducerController.cs
[ApiController]
[Route("api/kafka")]
public class KafkaProducerController : ControllerBase
{
private readonly KafkaDependentProducer<Null, string> messageProducer;
private readonly KafkaDependentProducer<Null, FileData> iDokProducer;
private string topic;
private string iDokTopic;
private readonly ProducerConfig config = new ProducerConfig
{
BootstrapServers = "localhost:9092",
MessageMaxBytes = 10000000
};
public KafkaProducerController(KafkaDependentProducer<Null, string> messageProducer, KafkaDependentProducer<Null, FileData> iDokProducer, IConfiguration config)
{
this.topic = config.GetValue<string>("Kafka:MessageTopic");
this.iDokTopic = config.GetValue<string>("Kafka:iDokTopic");
this.messageProducer = messageProducer;
this.iDokProducer = iDokProducer;
}
[HttpPost]
[Route("")]
public async Task<IActionResult> PostAsync([FromQuery] string message)
{
var kafkaMessage = new Message<Null, string> { Value = message };
await this.messageProducer.ProduceAsync(topic, kafkaMessage);
return Ok();
//return Created(string.Empty, SendToKafka("gtm-test", kafkaMessage));
}
[HttpPost("upload")]
//[Route("file")]
public async Task<IActionResult> PostFileAsync(IFormFile file)
{
if (file == null || file.Length <= 0)
{
return BadRequest("No file uploaded.");
}
var kafkaMessage = new Message<Null, FileData> { Value = new FileData { formFile = file, Info1 = "Test", Info2 = "Test2" } };
await this.iDokProducer.ProduceAsync(iDokTopic, kafkaMessage);
return Ok();
}
}
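For reference, FileData is just a small payload class, roughly like this (a sketch; note that IFormFile itself will not survive JSON serialization, so the file bytes, or better a storage URI, would have to be copied into the payload):

using Microsoft.AspNetCore.Http;

// Sketch of the FileData payload used above (the real class isn't shown).
// Caveat: IFormFile is not JSON-serializable as-is; a real payload should
// carry the raw bytes or a reference to where the file is stored.
public class FileData
{
    public IFormFile formFile { get; set; }
    public string Info1 { get; set; }
    public string Info2 { get; set; }
}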
KafkaDependentProducer.cs:
public class KafkaDependentProducer<K, V>
{
IProducer<K, V> kafkaHandle;
public KafkaDependentProducer(KafkaClientHandle handle, ISerializer<V> serializer = null)
{
if (serializer == null)
{
kafkaHandle = new DependentProducerBuilder<K, V>(handle.Handle).Build();
}
else
{
kafkaHandle = new DependentProducerBuilder<K, V>(handle.Handle)
.SetValueSerializer(serializer)
.Build();
}
}
/// <summary>
/// Asynchronously produce a message and expose delivery information
/// via the returned Task. Use this method of producing if you would
/// like to await the result before flow of execution continues.
/// </summary>
public Task ProduceAsync(string topic, Message<K, V> message)
=> this.kafkaHandle.ProduceAsync(topic, message);
/// <summary>
/// Asynchronously produce a message and expose delivery information
/// via the provided callback function. Use this method of producing
/// if you would like flow of execution to continue immediately, and
/// handle delivery information out-of-band.
/// </summary>
public void Produce(string topic, Message<K, V> message, Action<DeliveryReport<K, V>> deliveryHandler = null)
=> this.kafkaHandle.Produce(topic, message, deliveryHandler);
public void Flush(TimeSpan timeout)
=> this.kafkaHandle.Flush(timeout);
}
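KafkaDependentProducer shares one underlying client via KafkaClientHandle, which follows the confluent-kafka-dotnet web example (a sketch; the config section name is an assumption):

using System;
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;

// Owns the single underlying producer whose Handle the dependent
// producers share (adapted from the confluent-kafka-dotnet web example).
public class KafkaClientHandle : IDisposable
{
    IProducer<byte[], byte[]> kafkaProducer;

    public KafkaClientHandle(IConfiguration config)
    {
        var conf = new ProducerConfig();
        config.GetSection("Kafka:ProducerSettings").Bind(conf); // section name assumed
        this.kafkaProducer = new ProducerBuilder<byte[], byte[]>(conf).Build();
    }

    public Handle Handle { get => this.kafkaProducer.Handle; }

    public void Dispose()
    {
        // Block until all in-flight messages are delivered, then release.
        kafkaProducer.Flush();
        kafkaProducer.Dispose();
    }
}

// DI registration in the API (sketch); the FileData producer additionally
// needs a value serializer (e.g. the schema-registry JsonSerializer):
// services.AddSingleton<KafkaClientHandle>();
// services.AddSingleton<KafkaDependentProducer<Null, string>>();
// services.AddSingleton<KafkaDependentProducer<Null, FileData>>();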
The consumer:
Program.cs
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Demo for using Kafka APIs");
Consume("gtm-test");
}
static void Consume(string topic)
{
var conf = new ConsumerConfig
{
GroupId = "st_consumer_group",
BootstrapServers = "localhost:9092",
AutoOffsetReset = AutoOffsetReset.Earliest
};
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true; // prevent the process from terminating.
cts.Cancel();
};
using (var consumer = new ConsumerBuilder<Ignore, FileData>(conf)
    .SetValueDeserializer(new JsonDeserializer<FileData>().AsSyncOverAsync())
    .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
    .Build())
{
    // Note: both topics are consumed with the FileData deserializer here,
    // so plain string messages on "gtm-test" will fail to deserialize.
    consumer.Subscribe(new List<string>() { "gtm-test", "gtm-test-file" });
    try
    {
        while (true)
        {
            // Use the token wired to Ctrl+C above, so cancellation
            // actually breaks out of the consume loop.
            var result = consumer.Consume(cts.Token);
            Console.WriteLine($"Message: {result.Message.Value} received from {result.TopicPartitionOffset}, Topic: {result.Topic}");
        }
    }
    catch (OperationCanceledException)
    {
        consumer.Close();
    }
}
}
}
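Since a single value deserializer cannot serve both the plain-string topic and the FileData topic, a variant I am considering is one consumer per topic/type, each with its own poll loop (a sketch, reusing the conf and topic names from above):

// Sketch: a dedicated consumer per topic so each value type gets a
// matching deserializer. Each consumer needs its own poll loop (e.g.
// a thread); consider distinct GroupIds per consumer as well.
var stringConsumer = new ConsumerBuilder<Ignore, string>(conf).Build();
stringConsumer.Subscribe("gtm-test");

var fileConsumer = new ConsumerBuilder<Ignore, FileData>(conf)
    .SetValueDeserializer(new JsonDeserializer<FileData>().AsSyncOverAsync())
    .Build();
fileConsumer.Subscribe("gtm-test-file");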
Thanks for any help!
2 Answers
yyhrrdl81#
Kafka is not meant for transferring large files. You would need to tune both the brokers (if that is possible at all, you will run into many limits) and the applications producing/consuming this data (lots of thresholds, buffers, and timeouts).
Instead, you should pick one of these two options:
1. Store the file in external storage (e.g. S3) and send only a reference to it through Kafka (the "claim check" pattern).
2. Split the file into small chunks, produce the chunks as separate records, and reassemble them on the consumer side.
(Disclaimer: my company wrote about this in more depth: https://www.conduktor.io/blog/beyond-limits-produce-large-records-without-undermining-apache-kafka/)
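For option 2, the producer side of chunking could look roughly like this (a sketch; chunk size, header name, and keying scheme are illustrative, not from the original post):

using System;
using System.IO;
using System.Threading.Tasks;
using Confluent.Kafka;

public static class FileChunkProducer
{
    // Splits a stream into fixed-size chunks and produces each as one record.
    // Keying by fileId keeps all chunks of a file in one partition, in order,
    // so a consumer can reassemble them.
    public static async Task ProduceInChunksAsync(
        IProducer<string, byte[]> producer, string topic,
        string fileId, Stream file, int chunkSize = 512 * 1024)
    {
        var buffer = new byte[chunkSize];
        int read, index = 0;
        while ((read = await file.ReadAsync(buffer, 0, buffer.Length)) > 0)
        {
            var payload = new byte[read];
            Array.Copy(buffer, payload, read);
            await producer.ProduceAsync(topic, new Message<string, byte[]>
            {
                Key = fileId,
                Value = payload,
                // The chunk index travels in a header for reassembly.
                Headers = new Headers { { "chunk-index", BitConverter.GetBytes(index++) } }
            });
        }
    }
}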
5m1hhzi42#
Kafka shouldn't be used to transfer large binary blobs. Use a system like S3 (or Minio on-prem) and send a URI value that references the file's location on that shared storage. Your consumers can then use that string to download the file, rather than stuffing the binary file content into a Kafka record.
The "metadata" could be a completely separate file (e.g. the same filename, but with a .meta suffix), since S3 can be treated as a key-value blob store.
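A rough producer-side sketch of that approach, assuming the AWSSDK.S3 package (bucket name, topic, and key layout are illustrative):

using System;
using System.Threading.Tasks;
using Amazon.S3;
using Amazon.S3.Model;
using Confluent.Kafka;
using Microsoft.AspNetCore.Http;

public static class FileReferenceProducer
{
    // Uploads the file to S3 and produces only a URI pointing at it.
    public static async Task ProduceFileReferenceAsync(
        IAmazonS3 s3, IProducer<Null, string> producer, IFormFile file)
    {
        var key = $"incoming/{Guid.NewGuid()}/{file.FileName}";
        using (var stream = file.OpenReadStream())
        {
            await s3.PutObjectAsync(new PutObjectRequest
            {
                BucketName = "file-landing-zone", // hypothetical bucket
                Key = key,
                InputStream = stream
            });
        }
        // Consumers receive the URI and download the file themselves;
        // the (small) metadata can ride along in the record or its headers.
        await producer.ProduceAsync("gtm-test-file",
            new Message<Null, string> { Value = $"s3://file-landing-zone/{key}" });
    }
}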