Kafka MSK Java生产者/消费者,在AVRO中具有键和值,并使用粘合模式注册表

umuewwlo  于 2022-12-03  发布在  Apache
关注(0)|答案(1)|浏览(259)

我正在尝试使用MSK连接Mysql CDC连接器,同时使用键和值作为AVRO模式,使用粘合模式注册表(GSR)。
当我使用confluent模式注册表执行此操作时,键和值的模式名称将类似于serverName.schemaName.tableName_key和serverName.schemaName.tableName_value。
但是,当我使用GSR时,键模式和值模式都是以serveName.schemaName.tableName的形式出现的,因此,试图覆盖每个模式都失败了。
所以我想出的变通办法是对键和值使用两个不同的注册表,它很有效。我还能够使用JDBC接收器连接器阅读主题。
然而,我无法弄清楚如何编写一个简单的java生产者/消费者来编写/读取“键和值都是AVRO”的主题,并为键和值模式使用两个不同的注册表。
我看了GSR github中的代码,但是在AWSSchemaRegistryConstants类中只有一个注册表名和一个模式名,所以不确定如何传递两个不同的注册表和模式。

/**
 * Registry Name.
 */
 public static final String REGISTRY_NAME = "registry.name";
 /**
 * Schema name.
 */
 public static final String SCHEMA_NAME = "schemaName";
ffscu2ro

ffscu2ro1#

@OneCricketeer,这很好用..再次感谢你在这里帮助我。
对于reference https://github.com/awslabs/aws-glue-schema-registry/issues/234,这是我所做的。
在通用项目中创建自定义类

import com.amazonaws.services.schemaregistry.common.AWSSchemaNamingStrategy
    
class MySchemaNamingStrategy extends AWSSchemaNamingStrategy {
    @Override 
    public String getSchemaName(String transportName, String, data, boolean isKey) {
       return transportName + (isKey ? "-key" : "-value");
    }
    @Override 
    public String getSchemaName(String transportName) {
        return transportName + "-value";
    }
}

在MSK Connect上,我设置了以下参数

key.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
key.converter.schemaNameGenerationClass=com.amazonaws.services.schemaregistry.common.MySchemaNamingStrategy
value.converter.schemaNameGenerationClass=com.amazonaws.services.schemaregistry.common.MySchemaNamingStrategy
key.converter.registryName=MyRegistry
value.converter.registryName=MyRegistry
key.converter.region=us-east-1
value.converter.region=us-east-1
key.converter.schemaAutoRegistrationEnabled=true
value.converter.schemaAutoRegistrationEnabled=true
key.converter.avroRecordType=GENERIC_RECORD
value.converter.avroRecordType=GENERIC_RECORD

这将为每个表创建具有适当模式的主题
则Java消费者将类似于

Properties props = new Properties();
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupName);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());

props.put(AWSSchemaRegistryConstants.AWS_REGION, regionName);
props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, registryName);
props.put(AWSSchemaRegistryConstants.SCHEMA_NAMING_GENERATION_CLASS , "com.amazonaws.services.schemaregistry.common.ConfluentSchemaNamingStrategy");

try {
    final KafkaConsumer<GenericRecord, GenericRecord> consumer = new KafkaConsumer<GenericRecord, GenericRecord>(props);
    consumer.subscribe(Collections.singletonList(topicName));

    while (true) {
        final ConsumerRecords<GenericRecord, GenericRecord> records = consumer.poll(Duration.ofMillis(1000) );
        System.out.println("Received messages : count = " + records.count());
        for (final ConsumerRecord<GenericRecord, GenericRecord> record : records) {
            final String key = (record.key() == null ? "NULL_KEY" : record.key().toString());
            final String value = (record.value() == null ? "NULL_VALUE" : record.value().toString());
            System.out.println("Received message: key = " + key );
            System.out.println("Received message: value = " + value);
        }
    }
} catch (Exception e) {
    e.printStackTrace();
}

相关问题