如何在Kafka-Bigquery Dataflow flex模板中设置参数

ercv8c1e  于 2023-10-15  发布在  Apache
关注(0)|答案(1)|浏览(103)

我一直在尝试将Kafka消费者连接到BigQuery。以下是我的配置-

gcloud dataflow flex-template run first-kafka --template-file-gcs-location gs://dataflow-templates-us-east4/latest/flex/Kafka_to_BigQuery --region us-east4 --worker-region us-east4 --subnetwork <subnetwork-url> --parameters inputTopics=topic1,bootstrapServers=<bootstrap server>,outputTableSpec=<Bigquery table>,stagingLocation=gs://<xxx>/staging-dataflow,dataflowKmsKey=<CMS Key>,[email protected]

protocol=SASL_SSL,它还使用用户名和密码,如xyz.conf

KafkaClient { org.apache.kafka.common.security.scram.ScramLoginModule required username="abcd" password="1234";};

我也有JKS信托商店文件。
我的问题是我怎样才能提供这些配置到Anglow模板?

  1. security.protocol = SASL_SSL
  2. java.security.auth.login.config = xyz.conf
    1.如果需要客户端信任库JKS,那么我如何也提供它?(目前我在我的云存储桶中有这些)注意:我的客户端机器在AWS MSK上运行
    错误我得到没有设置这些配置是-
    java.lang.RuntimeException:org.apache.kafka.common.errors.TimeoutException:获取主题元数据时超时已过期
yjghlzjz

yjghlzjz1#

要在连接到Kafka时向Dataflow Flex Template提供自定义Kafka客户端配置,包括security.protocoljava.security.auth.login.config和信任存储区设置,您可以使用--parameters标志来传入其他配置值。以下是如何在gcloud命令中包含这些配置:

gcloud dataflow flex-template run first-kafka \
    --template-file-gcs-location gs://dataflow-templates-us-east4/latest/flex/Kafka_to_BigQuery \
    --region us-east4 \
    --worker-region us-east4 \
    --subnetwork <subnetwork-url> \
    --parameters inputTopics=topic1,bootstrapServers=<bootstrap server>,outputTableSpec=<Bigquery table>,stagingLocation=gs://<xxx>/staging-dataflow,dataflowKmsKey=<CMS Key>,[email protected],\
    securityProtocol=SASL_SSL,loginConfigFile=xyz.conf,truststoreLocation=<gs://path/to/truststore.jks>

在这个命令中,我添加了三个新参数:

  1. securityProtocol:此参数允许您为Kafka指定security.protocol配置,例如SASL_SSL
  2. loginConfigFile:此参数用于java.security.auth.login.config配置,应该指向您的xyz.conf文件。
  3. truststoreLocation:此参数应指向信任库JKS文件在Google Cloud Storage中的位置。请确保将<gs://path/to/truststore.jks>替换为到您的信任库的实际GCS路径。
    现在,您需要修改Dataflow管道代码以读取这些参数并在Kafka消费者配置中设置它们。
    在您的Dataflow管道代码中(可能在@Setup方法中),您可以使用RuntimeValueProvider接口访问这些参数。下面是一个如何检索这些值并在Kafka消费者配置中设置它们的示例:
@Setup
public void setup() {
    String securityProtocol = options.getSecurityProtocol();
    String loginConfigFile = options.getLoginConfigFile();
    String truststoreLocation = options.getTruststoreLocation();

    Properties kafkaProps = new Properties();
    kafkaProps.put("security.protocol", securityProtocol);
    kafkaProps.put("java.security.auth.login.config", loginConfigFile);

    // Configure truststore (if required)
    if (truststoreLocation != null && !truststoreLocation.isEmpty()) {
        kafkaProps.put("ssl.truststore.location", truststoreLocation);
        // You may need to provide other truststore-related settings here
    }

    // Create Kafka consumer with the configured properties
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(kafkaProps);
    
    // Perform other Kafka consumer setup and processing
    // ...
}

这段代码读取传递给Dataflow作业的参数,在Kafka消费者的属性中设置它们,并配置信任库(如果提供)。确保Dataflow管道代码使用options对象访问传递给作业的参数。
有了这些配置,您的Dataflow作业应该能够使用所需的安全设置和自定义配置连接到Kafka。

相关问题