kafka的java客户端-生产者

x33g5p2x  于2022-06-30 转载在 Kafka  
字(8.6k)|赞(0)|评价(0)|浏览(687)

kafka的java客户端-生产者

生产者消息发送流程

发送原理

在消息发送的过程中,涉及俩个线程,main线程和sender线程,在main线程中创建一个双端队列RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafkabroker

生产者重要参数列表

参数名称描述
bootstrap.servers生产者连接集群所需的 broker 地址清单。例如<br>hadoop102:9092,hadoop103:9092,hadoop104:9092,可以设置 1 个或者多个,中间用逗号隔开。注意这里并非需要所有的 broker 地址,因为生产者从给定的 broker里查找到其他 broker信息
key.serializer和 value.serializer指定发送消息的 key 和 value 的序列化类型。一定要写全类名
buffer.memoryRecordAccumulator 缓冲区总大小,默认 32m。
batch.size缓冲区一批数据最大值,默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加
linger.ms如果数据迟迟未达到 batch.size,sender 等待 linger.time之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms之间
acks0:生产者发送过来的数据,不需要等数据落盘应答。<br>1:生产者发送过来的数据,Leader收到数据后应答。<br>-1(all):生产者发送过来的数据,Leader+和 isr 队列里面的所有节点收齐数据后应答。默认值是-1,-1 和all是等价的
max.in.flight.requests.per.connection允许最多没有返回 ack 的次数,默认为 5,开启幂等性要保证该值是 1-5 的数字
retries当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是 int最大值,2147483647。如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了
retry.backoff.ms两次重试之间的时间间隔,默认是 100ms
enable.idempotence是否开启幂等性,默认 true,开启幂等性
compression.type生产者发送的所有数据的压缩方式。默认是 none,也就是不压缩。支持压缩类型:none、gzip、snappy、lz4 和 zstd

生产者发送消息

依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.1</version>
</dependency>

生产者同步发送消息

public class MyProducer {

    private final static String TOPIC_NAME = "my-topic-test-1";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //1.创建kafka生产者配置对象
        Properties props = new Properties();
        //2.给 kafka 配置对象添加配置信息:bootstrap.servers
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "1.14.252.45:19092,1.14.252.45:19093,1.14.252.45:19094");
        //key,value 序列化(必须)
        //把发送的key从字符串序列化为字节数组
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //把发送消息value从字符串序列化为字节数组
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 3. 创建 kafka 生产者对象
        Producer<String, String> producer = new KafkaProducer<>(props);

        Order order = new Order((long) 1, 100);
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, order.getId().toString(), JSON.toJSONString(order));
        //4. 调用 send 方法,发送消息
        RecordMetadata metadata = producer.send(producerRecord).get();
        producer.close();
        //=====阻塞=======
        System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset());
    }
}

@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class Order {
    private Long id;
    private Integer num;
}

因为这里我们选择的是同步发送消息,在收到kafka的ack告知发送成功之前一直处于阻塞状态

这里我们进入docker部署的kafka02容器中开启消费者,然后重新在运行一遍消费者

发送消息在指定分区上

ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME,0, order.getId().toString(), JSON.toJSONString(order));

不指定分区,按照分区策略

未指定发送分区,具体发送的分区计算公式:hash(key)%partitionNum

ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME,order.getId().toString(), JSON.toJSONString(order));

带回调函数的异步发送

默认就是异步发送

回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。

生产者发消息,发送完后不用等待broker给回复,直接执行下面的业务逻辑。可以提供callback,让broker异步的调用callback,告知生产者,消息发送的结果

public class MyProducer2 {
    public static void main(String[] args) throws InterruptedException {

        //1.创建kafka生产者配置对象
        Properties props = new Properties();
        //2.给 kafka 配置对象添加配置信息:bootstrap.servers
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "1.14.252.45:19092,1.14.252.45:19093,1.14.252.45:19094");
        //key,value 序列化(必须)
        //把发送的key从字符串序列化为字节数组
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //把发送消息value从字符串序列化为字节数组
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 3. 创建 kafka 生产者对象
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 5; i++) {
            Order order = new Order((long) i, 100);
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("first", order.getId().toString(), JSON.toJSONString(order));
            //4. 调用 send 方法,发送消息
            producer.send(producerRecord, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        System.err.println("发送消息失败:" +
                                exception.getStackTrace());
                    }
                    if (metadata != null) {
                        System.out.println("异步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset());
                    }
                }
            });
        }
        producer.close();
    }
}
  1. 开启 Kafka 消费者
  2. 运行idea中的代码

生产者ack参数配置

在同步发消息的场景下:生产者发动broker上后,ack会有 3 种不同的选择

  1. acks=0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消息。
  2. acks=1: 至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入。就可以继续发送下一条消息。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失
  3. acks=-1或all: 需要等待 min.insync.replicas(默认为 1 ,推荐配置大于等于2) 这个参数配置的副本个数都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的数据保证。一般除非是金融级别,或跟钱打交道的场景才会使用这种配置
props.put(ProducerConfig.ACKS_CONFIG, "1");

数据完全可靠条件 = ACK 级别设置为-1 + 分区副本大于等于2 + ISR 里应答的最小 副本 数量大于等于2

public class CustomProducerAcks {

    public static void main(String[] args) {

        // 0 配置
        Properties properties = new Properties();

        // 连接集群 bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");

        // 指定对应的key和value的序列化类型 key.serializer
//        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // acks
        properties.put(ProducerConfig.ACKS_CONFIG,"1");

        // 重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG,3);

        // 1 创建kafka生产者对象
        // "" hello
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2 发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first","atguigu"+i));
        }

        // 3 关闭资源
        kafkaProducer.close();
    }
}

生产者如何提高吞吐量

public class CustomProducerParameters {

    public static void main(String[] args) {

        // 配置
        Properties properties = new Properties();

        // 连接kafka集群
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");

        // 序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 缓冲区大小
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);

        // 批次大小
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);

        // linger.ms
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);

        // 压缩
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");

        // 1 创建生产者
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2 发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first","atguigu"+i));
        }

        // 3 关闭资源
        kafkaProducer.close();
    }
}

生产者数据去重

幂等性

幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。
精确一次( (Exactly Once) ) = 幂等性 + 至少一次( ( ack=-1 + 分区副本数>=2 + ISR 最小副本数量>=2) )
如何使用幂等性

开启参数 enable.idempotence 默认为 true,false关闭

生产者事务

事务相关API

// 1 初始化事务
void initTransactions();
// 2 开启事务
void beginTransaction() throws ProducerFencedException;
// 3 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws
ProducerFencedException;
// 4 提交事务
void commitTransaction() throws ProducerFencedException;
// 5 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

单个 Producer,使用事务保证消息的仅一次发送

public class CustomProducerTranactions {

    public static void main(String[] args) {

        // 0 配置
        Properties properties = new Properties();

        // 连接集群 bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");

        // 指定对应的key和value的序列化类型 key.serializer
//        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 指定事务id
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tranactional_id_01");

        // 1 创建kafka生产者对象
        // "" hello
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        kafkaProducer.initTransactions();

        kafkaProducer.beginTransaction();

        try {
            // 2 发送数据
            for (int i = 0; i < 5; i++) {
                kafkaProducer.send(new ProducerRecord<>("first", "atguigu" + i));
            }

            int i = 1 / 0;

            kafkaProducer.commitTransaction();
        } catch (Exception e) {
            kafkaProducer.abortTransaction();
        } finally {
            // 3 关闭资源
            kafkaProducer.close();
        }
    }
}

相关文章