当我们发送消息之前,先问几个问题:每条消息都是很关键且不能容忍丢失么?偶尔重复消息可以么?我们关注的是消息延迟还是写入消息的吞吐量?
举个例子,有一个信用卡交易处理系统,当交易发生时会发送一条消息到Kafka,另一个服务来读取消息并根据规则引擎来检查交易是否通过,将结果通过Kafka返回,
对于这样的业务,消息既不能丢失也不能重复,由于交易量大因此吞吐量需要尽可能大,延迟可以稍微高一点。
再举个例子,假如我们需要收集用户在网页上的点击数据,对于这样的场景,少量消息丢失或者重复是可以容忍的,延迟多大都不重要只要不影响用户体验,吞吐则根据实时用户数来决定。
不同的业务需要使用不同的写入方式和配置。后面我们将会讨论这些API,现在先看下生产者写消息的基本流程:
流程如下:
创建Kafka生产者有三个基本属性:
下面是一个样例代码:
private Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<String, String>(kafkaProps);
创建完生产者后,我们可以发送消息。Kafka中有三种发送消息的方式:
本章的例子都是单线程发送的,但生产者对象是线程安全的,它支持多线程发送消息来提高吞吐。需要的话,我们可以使用多个生产者对象来进一步提高吞吐。
kafka 的ProducerRecord就是我们要发送的消息,里面包含了很多的信息,主要有以下几个构造函数,可以更具自己的需求选择合适的构造函数。
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {}
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {}
public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {}
public ProducerRecord(String topic, Integer partition, K key, V value) {}
puic ProducerRecord(String topic, K key, V value) {}
public ProducerRecord(String topic, V value) {}
最简单的发送消息方式如下:
@Test
public void baseSend() {
ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "Precision Products", "France");
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
}
这里做了如下几件事:
我们创建了一个ProducerRecord,并且指定了主题以及消息的key/value。主题总是字符串类型的,但key/value则可以是任意类型,在本例中也是字符串。需要注意的是,这里的key/value的类型需要与serializer和生产者的类型匹配。
使用send()方法来发送消息,该方法会返回一个RecordMetadata的Future对象,但由于我们没有跟踪Future对象,因此并不知道发送结果。如前所述,这种方式可能会丢失消息。
虽然我们忽略了发送消息到broker的异常,但是我们调用send()方法时仍然可能会遇到一些异常,例如序列化异常、发送缓冲区溢出异常等等。
同步发送方式可以简单修改如下:
@Test
public void syncSend() {
ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "Precision Products", "France");
try {
Future<RecordMetadata> future = producer.send(record);
RecordMetadata recordMetadata = future.get();
System.out.println(recordMetadata.toString());
} catch (Exception e) {
e.printStackTrace();
}
}
注意到,这里使用了Future.get()来获取发送结果,如果发送消息失败则会抛出异常,否则返回一个RecordMetadata对象。发送失败异常包含:
可恢复异常指的是,如果生产者进行重试可能会成功,例如连接异常;不可恢复异常则是进行重试也不会成功的异常,例如消息内容过大。
首先了解下什么场景下需要异步发送消息。假如生产者与broker之间的网络延时为10ms,我们发送100条消息,发送每条消息都等待结果,那么需要1秒的时间。而如果我们采用异步的方式,几乎没有任何耗时,而且我们还可以通过回调知道消息的发送结果。
异步发送消息的样例如下:
@Test
public void asyncSend() {
ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "Precision Products", "France");
producer.send(record, new DemoProducerCallback());
}
private class DemoProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace();
} else {
System.out.println(recordMetadata.toString());
}
}
}
异步回调的类需要实现org.apache.kafka.clients.producer.Callback接口,这个接口只有一个onCompletion方法。当Kafka返回异常时,异常值不为null,代码中只是简单的打印,但我们可以采取其他处理方式。
上面我们只配置了bootstrap.servers和序列化类,其实生产者还有很多配置,上面只是使用了默认值。下面来看下这些配置参数。
acks
acks控制多少个副本必须写入消息后生产者才能认为写入成功,这个参数对消息丢失可能性有很大影响。这个参数有三种取值:
buffer.memory
这个参数设置生产者缓冲发送的消息的内存大小,如果应用调用send方法的速度大于生产者发送的速度,那么调用会阻塞或者抛出异常,具体行为取决于block.on.buffer.full(这个参数在0.9.0.0版本被max.block.ms代替,允许抛出异常前等待一定时间)参数。
compresstion.type
默认情况下消息是不压缩的,这个参数可以指定使用消息压缩,参数可以取值为snappy、gzip或者lz4。snappy压缩算法由Google研发,这种算法在性能和压缩比取得比较好的平衡;相比之下,gzip消耗更多的CPU资源,但是压缩效果也是最好的。通过使用压缩,我们可以节省网络带宽和Kafka存储成本。
retries
当生产者发送消息收到一个可恢复异常时,会进行重试,这个参数指定了重试的次数。在实际情况中,这个参数需要结合retry.backoff.ms(重试等待间隔)来使用,建议总的重试时间比集群重新选举群首的时间长,这样可以避免生产者过早结束重试导致失败。
batch.size
当多条消息发送到一个分区时,生产者会进行批量发送,这个参数指定了批量消息的大小上限(以字节为单位)。当批量消息达到这个大小时,生产者会一起发送到broker;但即使没有达到这个大小,生产者也会有定时机制来发送消息,避免消息延迟过大。
linger.ms
这个参数指定生产者在发送批量消息前等待的时间,当设置此参数后,即便没有达到批量消息的指定大小,到达时间后生产者也会发送批量消息到broker。默认情况下,生产者的发送消息线程只要空闲了就会发送消息,即便只有一条消息。设置这个参数后,发送线程会等待一定的时间,这样可以批量发送消息增加吞吐量,但同时也会增加延迟。
client.id
这个参数可以是任意字符串,它是broker用来识别消息是来自哪个客户端的。在broker进行打印日志、衡量指标或者配额限制时会用到。
max.in.flight.requests.per.connection
这个参数指定生产者可以发送多少消息到broker并且等待响应,设置此参数较高的值可以提高吞吐量,但同时也会增加内存消耗。另外,如果设置过高反而会降低吞吐量,因为批量消息效率降低。设置为1,可以保证发送到broker的顺序和调用send方法顺序一致,即便出现失败重试的情况也是如此。
timeout.ms, request.timeout.ms, metadata.fetch.timeout.ms
这些参数控制生产者等待broker的响应时间。request.timeout.ms指定发送数据的等待响应时间,metadata.fetch.timeout.ms指定获取元数据(例如获取分区的群首信息)的等待响应时间。timeout.ms则指定broker在返回结果前等待其他副本(与acks参数相关)响应的时间,如果时间到了但其他副本没有响应结果,则返回消息写入失败。
max.block.ms
这个参数指定应用调用send方法或者获取元数据方法(例如partitionFor)时的阻塞时间,超过此时间则抛出timeout异常。
max.request.size
这个参数限制生产者发送数据包的大小,数据包的大小与消息的大小、消息数相关。如果我们指定了最大数据包大小为1M,那么最大的消息大小为1M,或者能够最多批量发送1000条消息大小为1K的消息。另外,broker也有message.max.bytes参数来控制接收的数据包大小。在实际中,建议这些参数值是匹配的,避免生产者发送了超过broker限定的数据大小。
receive.buffer.bytes, send.buffer.bytes
这两个参数设置用来发送/接收数据的TCP连接的缓冲区,如果设置为-1则使用操作系统自身的默认值。如果生产者与broker在不同的数据中心,建议提高这个值,因为不同数据中心往往延迟比较大。
最后讨论下顺序保证。Kafka保证分区的顺序,也就是说,如果生产者以一定的顺序发送消息到Kafka的某个分区,那么Kafka在分区内部保持此顺序,而且消费者也按照同样的顺序消费。但是,应用调用send方法的顺序和实际发送消息的顺序不一定是一致的。举个例子,如果retries参数不为0,而max.in.flights.requests.per.session参数大于1,那么有可能第一个批量消息写入失败,但是第二个批量消息写入成功,然后第一个批量消息重试写入成功,那么这个顺序乱序的。因此,如果需要保证消息顺序,建议设置max.in.flights.requests.per.session为1,这样可以在第一个批量消息发送失败重试时,第二个批量消息需要等待。
上面提到了Kafka自带的序列化类,现在来看下如何使用其他的序列化策略。
#### 自定义序列化
如果我们发送的消息不是整数或者字符串时,我们需要自定义序列化策略或者使用通用的Avro、Thrift或者Protobuf这些序列化方案。下面来看下如何使用自定义的序列化方案,以及存在的问题。
假如我们要发送的消息对象是这么一个Customer:
public class Customer {
private int customerID;
private String customerName;
public Customer(int ID, String name) {
this.customerID = ID;
this.customerName = name;
}
public int getID() {
return customerID;
}
public String getName() {
return customerName;
}
}
那么,自定义的序列化类实现样例如下:
public class CustomerSerializer implements Serializer<Customer> {
@Override
public void configure(Map configs, boolean isKey) {
// nothing to configure
}
@Override
/** * ID+name的长度+name */
public byte[] serialize(String topic, Customer data) {
try {
byte[] serializedName;
int stringSize;
if (data == null)
return null;
else {
if (data.getName() != null) {
serializedName = data.getName().getBytes("UTF-8");
stringSize = serializedName.length;
} else {
serializedName = new byte[0];
stringSize = 0;
}
}
ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize);
buffer.putInt(data.getID());
buffer.putInt(stringSize);
buffer.put(serializedName);
return buffer.array();
} catch (Exception e) {
throw new SerializationException("Error when serializing Customer to byte[] " + e);
}
}
@Override
public void close() {
// nothing to close
}
}
使用CustomerSerializer进行序列化
public class CustomerSend {
KafkaProducer<String, Customer> producer = null;
@Before
public void setup() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", CustomerSerializer.class.getName());
producer = new KafkaProducer<String, Customer>(props);
}
@Test
public void baseSend() {
ProducerRecord<String, Customer> record = new ProducerRecord<String, Customer>("test", "Precision Products", new Customer(1, "kingcall"));
try {
Future<RecordMetadata> recordMetadataFuture = producer.send(record);
System.out.println(recordMetadataFuture.get().toString());
} catch (Exception e) {
e.printStackTrace();
}
}
}
使用客户端命令进行消费kafka-console-consumer --bootstrap-server localhost:9092 --property print.key=true --topic test
我们将Customer的ID和名字进行了序列化,通过这个序列化对象,我们可以发送Customer的对象消息。但这样的序列化存在很多问题,比如想要将ID升级为Long型或者增加新的Customer域时,我们需要兼容新老消息。尤其是公司内多个团队同时消费Customer数据时,他们需要同时修改代码进行兼容。
因此,建议使用JSON、Apache Avro、Thrift或者Protobuf这些成熟的序列化/反序列化方案。下面来看下如何使用Avro来进行序列化。
#### 使用Avro序列化
Apache Avro是一个语言无关的序列化方案,使用Avro的数据使用语言无关的结构来描述,例如JSON。Avro一般序列化成字节文件,当然也可以序列化成JSON形式。Kafka使用Avro的好处是,当写入消息需要升级协议时,消费者读取消息可以不受影响。
例如,原始的协议可能是这样的:
{
"namespace": "customerManagement.avro",
"type": "record",
"name": "Customer",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string""},
{"name": "faxNumber", "type": ["null", "string"], "default": "null"}
]
}
在这个例子中,id和name是必须的,而faxNumber是可选的,默认为null。
在使用这个格式一段时间后,我们需要升级协议,去掉faxNumber属性并增加email属性:
{
"namespace": "customerManagement.avro",
"type": "record",
"name": "Customer",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string""},
{"name": "email", "type": ["null", "string"], "default": "null"}
]
}
消费者在处理消息时,会通过getName()、getId()和getFaxNumber()来获取消息属性,对于新的消息,消费者获取的faxNumber会为null。如果消费者升级应用代码,调用getEmail而不是getFaxNumber,对于老的消息,getEmail会返回null。
这个例子体现了Avro的优势:即使修改消息的结构而不升级消费者代码,消费者仍然可以读取数据而不会抛出异常错误。不过需要注意下面两点:
当使用Avro序列化成文件时,我们可以将数据的结构添加到文件中;但对于Kafka,如果对于每条Avro消息我们都附上消息结构,那么将会增加差不多一倍的开销。因此,我们可以使用模式注册中心(Schema Registry)的架构模式,将消息的结构存储在注册中心,这样消费者可以从注册中心获取数据的结构。模式注册中心不是Kafka项目的一部分,但有很多开源的方案可以考虑。在下面的例子中,我们将会使用Confluent Schema Registry来作为注册中心,其开源代码见GitHub,或者我们可以通过Confluent Platform来安装。如果你计划使用Confluent Schema Registry,那么可以参考这个文档。
使用模式注册中心的话,我们需要存储消息的所有格式到注册中心,然后在消息记录中添加格式的ID,这样消费者可以通过这个ID从注册中心获取数据的模式以进行反序列化。看上去很麻烦,我们需要存储数据的结构以及在消费端拉取数据结构,但是不需要担心,这些工作已经由serializer/deserializer来完成了,应用只需要使用Avro提供的serializer即可。
整体处理流程如下:
下面是一个发送生成的Avro对象到Kafka的代码样例(代码生成信息参考Avro文档):
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", schemaUrl);
String topic = "customerContacts";
int wait = 500;
Producer<String, Customer> producer = new KafkaProducer<String, Customer>(props);
// We keep producing new events until someone ctrl-c
while (true) {
Customer customer = CustomerGenerator.getNext();
System.out.println("Generated customer " + customer.toString());
ProducerRecord<String, Customer> record = new ProducerRecord<>(topic, customer.getId(), customer);
producer.send(record);
}
其中,
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", schemaUrl);
String schemaString = "{\"namespace\": \"customerManagement.avro\",
\"type\": \"record\", " +
"\"name\": \"Customer\"," +
"\"fields\": [" +
"{\"name\": \"id\", \"type\": \"int\"}," +
"{\"name\": \"name\", \"type\": \"string\"}," +
"{\"name\": \"email\", \"type\": [\"null\",\"string\"], \"default\":\"null\" }" +
"]}";
Producer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(props);
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(schemaString);
for (int nCustomers = 0; nCustomers < customers; nCustomers++) {
String name = "exampleCustomer" + nCustomers;
String email = "example " + nCustomers + "@example.com";
GenericRecord customer = new GenericData.Record(schema);
customer.put("id", nCustomer);
customer.put("name", name);
customer.put("email", email);
ProducerRecord<String, GenericRecord> data = new ProducerRecord<String,GenericRecord>("customerContacts",name, customer);
producer.send(data);
}
这里我们仍然使用KafkaAvroSerializer,也提供模式注册中心的地址;但我们现在需要自己提供Avro的模式,而这个之前是由Avro生成的对象来提供的,我们发送的对象是GenericRecord,在创建的时候我们提供了模式以及写入的数据。最后,serializer会知道如何从GenericRecord中取出模式并且存储到注册中心,然后序列化对象数据。
我们创建消息的时候,必须要提供主题和消息的内容,而消息的key是可选的,当不指定key时默认为null。消息的key有两个重要的作用:
如果key为null,那么生产者会使用默认的分配器,该分配器使用轮询(round-robin)算法来将消息均衡到所有分区。
如果key不为null而且使用的是默认的分配器,那么生产者会对key进行哈希并根据结果将消息分配到特定的分区。注意的是,在计算消息与分区的映射关系时,使用的是全部的分区数而不仅仅是可用的分区数。这也意味着,如果某个分区不可用(虽然使用复制方案的话这极少发生),而消息刚好被分配到该分区,那么将会写入失败。
另外,如果需要增加额外的分区,那么消息与分区的映射关系将会发生改变,因此尽量避免这种情况,所以如果业务对数据和分区之间的关系有依赖,需要注意。
这里我们简单追中一下源码,来看一下kafka 自带的分配器,producer 中的send 方法实际上调用的实这里的doSend 方法,我们看到这个方法里面调用了partition 方法来计算分区
我们看一下partition 方法
/** * computes partition for given record. * if the record has partition returns the value otherwise * calls configured partitioner class to compute the partition. */
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
// record.partition() 的返回值是null
Integer partition = record.partition();
return partition != null ?
partition :
partitioner.partition(
record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
所以这里会进入计算的环节,也就是调用partitioner.partition
,这里的partitioner
就是Partitioner的实现类
而在kafka 中的实现就只有这么一个DefaultPartitioner
,代码也很加简单
public class DefaultPartitioner implements Partitioner {
private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
public void configure(Map<String, ?> configs) {}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
private int nextValue(String topic) {
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}
public void close() {}
}
现在来看下如何自定义一个分配器,下面将key为Banana的消息单独放在一个分区,与其他的消息进行分区隔离:
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.utils.Utils;
public class BananaPartitioner implements Partitioner {
public void configure(Map<String, ?> configs) {}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if ((keyBytes == null) || (!(key instanceOf String)))
throw new InvalidRecordException("We expect all messages to have customer name as key")
if (((String) key).equals("Banana"))
// Banana 永远放在最后一个分区
return numPartitions; // Banana will always go to last partition
//
return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1))
}
public void close() {}
}
先随机生成一个整数(之后每次在这个整数上递增1),将这个整数和topic的分区数量进行取余得到分区号(Round-Robin算法),是kafka默认的分区分配策略,能保证负载均衡不会出现倾斜的情况。
try-catch
这是为了提高代码的健壮性,因为发送消息毕竟属于网络相关的操作,不稳定性是比较多的版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/king14bhhb/article/details/114592335
内容来源于网络,如有侵权,请联系作者删除!