我一直在尝试将Kafka消费者连接到BigQuery。以下是我的配置-
gcloud dataflow flex-template run first-kafka --template-file-gcs-location gs://dataflow-templates-us-east4/latest/flex/Kafka_to_BigQuery --region us-east4 --worker-region us-east4 --subnetwork <subnetwork-url> --parameters inputTopics=topic1,bootstrapServers=<bootstrap server>,outputTableSpec=<Bigquery table>,stagingLocation=gs://<xxx>/staging-dataflow,dataflowKmsKey=<CMS Key>,[email protected]
protocol=SASL_SSL,它还使用用户名和密码,如xyz.conf
KafkaClient { org.apache.kafka.common.security.scram.ScramLoginModule required username="abcd" password="1234";};
我也有JKS信托商店文件。
我的问题是我怎样才能提供这些配置到Anglow模板?
- security.protocol = SASL_SSL
- java.security.auth.login.config = xyz.conf
1.如果需要客户端信任库JKS,那么我如何也提供它?(目前我在我的云存储桶中有这些)注意:我的客户端机器在AWS MSK上运行
错误我得到没有设置这些配置是-
java.lang.RuntimeException:org.apache.kafka.common.errors.TimeoutException:获取主题元数据时超时已过期
1条答案
按热度按时间yjghlzjz1#
要在连接到Kafka时向Dataflow Flex Template提供自定义Kafka客户端配置,包括
security.protocol
、java.security.auth.login.config
和信任存储区设置,您可以使用--parameters
标志来传入其他配置值。以下是如何在gcloud
命令中包含这些配置:在这个命令中,我添加了三个新参数:
securityProtocol
:此参数允许您为Kafka指定security.protocol
配置,例如SASL_SSL
。loginConfigFile
:此参数用于java.security.auth.login.config
配置,应该指向您的xyz.conf
文件。truststoreLocation
:此参数应指向信任库JKS文件在Google Cloud Storage中的位置。请确保将<gs://path/to/truststore.jks>
替换为到您的信任库的实际GCS路径。现在,您需要修改Dataflow管道代码以读取这些参数并在Kafka消费者配置中设置它们。
在您的Dataflow管道代码中(可能在
@Setup
方法中),您可以使用RuntimeValueProvider
接口访问这些参数。下面是一个如何检索这些值并在Kafka消费者配置中设置它们的示例:这段代码读取传递给Dataflow作业的参数,在Kafka消费者的属性中设置它们,并配置信任库(如果提供)。确保Dataflow管道代码使用
options
对象访问传递给作业的参数。有了这些配置,您的Dataflow作业应该能够使用所需的安全设置和自定义配置连接到Kafka。