我希望获得一些有关代码片段的帮助,这些代码片段用于使用PySpark和Azure AD服务主体从Azure EventHubs消费事件。(我不想使用scala)
f2uvfpb91#
您仍然需要使用一些基于Java的代码来插入Azure EventHubs连接器,类似于what is shown in the connector's documentation。但是你也可以抓取this project,编译成jar文件,并将其附加到cluster/job中。然后你可以轻松地使用configure它来使用service principal进行身份验证:
jar
topic = "..." tenant_id = "..." client_id = "..." client_secret = dbutils.secrets.get("secret_scope", "sp-secret") ehs_ns_name = "eventhubs-namespace-name" callback_class_name = "net.alexott.demos.eventhubs_aad.ServicePrincipalCredentialsAuth" # Instead of `servicebus.windows.net` there could be regional endpoints ehs_endpoint = f"sb://{ehs_ns_name}.servicebus.windows.net" # EventHubs connection string. connectionString = f"Endpoint=sb://{ehs_ns_name}.servicebus.windows.net;EntityPath={topic}" # Parameters that will be passed to the callback function params = { "aad_tenant_id": tenant_id, "aad_client_id": client_id, "aad_client_secret": client_secret, } # Spark EventHubs options ehConf = { 'eventhubs.connectionString': sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString), 'eventhubs.useAadAuth': True, 'eventhubs.aadAuthCallback': callback_class_name, 'eventhubs.AadAuthCallbackParams': json.dumps(params), } df = spark.readStream.format("eventhubs").options(**ehConf).load()
因为代码是用Java编写的,所以它不应该依赖于Scala版本。它还支持通过Kafka协议访问EventHubs,也通过AAD令牌进行身份验证。
1条答案
按热度按时间f2uvfpb91#
您仍然需要使用一些基于Java的代码来插入Azure EventHubs连接器,类似于what is shown in the connector's documentation。
但是你也可以抓取this project,编译成
jar
文件,并将其附加到cluster/job中。然后你可以轻松地使用configure它来使用service principal进行身份验证:因为代码是用Java编写的,所以它不应该依赖于Scala版本。它还支持通过Kafka协议访问EventHubs,也通过AAD令牌进行身份验证。