我们正在尝试将sql server jdbc连接器与kafkaavroserializer结合使用,并提供定制的producerinterceptor,以便在将数据发送到kafka之前对其进行加密。
在使用者端,我们希望使用flink连接器来解密,然后使用适当的反序列化程序。
为了达到这个目的,我们有几个问题:
1) 如果我们提供定制的consumerinterceptor来解密数据,那么当我们在flink中创建数据流时,应该通过属性文件传递吗?
Properties properties = new Properties();
...
`properties.setProperty("consumer.interceptor.classes": "OurCusromDecryptConsumerInterceptor")`;
...
DataStream<String> stream = env.addSource(new FlinkKafkaConsumer011<>("sqlserver-foobar", ???, properties));
上面的配置是否正确,或者我是否需要设置任何其他属性,以便将consumerinterceptor传入flink?
2) 另一个问题是关于flink中的反序列化程序。例如,我在网上查了一下,发现很少有这样的代码片段:
public class ConfluentAvroDeserializationSchema implements DeserializationSchema<Type ??> {
private final String schemaRegistryUrl;
private final int identityMapCapacity;
private KafkaAvroDecoder kafkaAvroDecoder;
public ConfluentAvroDeserializationSchema(String schemaRegistyUrl) {
this(schemaRegistyUrl, 1000);
}
因此,如果我们使用jdbc连接器向kafka传递数据而不做任何修改(除了加密数据),那么在反序列化过程中我们应该提供什么样的数据类型?我们将在反序列化之前解密数据。
public class ConfluentAvroDeserializationSchema implements DeserializationSchema<Type ??> {
提前谢谢,
1条答案
按热度按时间wfveoks01#
只需添加最终结果,就可以帮助任何正在寻找相同结果的人: