如何使用Azure Databricks(Pyspark)使用Azure AD服务主体从Azure事件中心使用事件中心?

c9qzyr3d  于 2023-03-22  发布在  Spark
关注(0)|答案(1)|浏览(177)

我希望获得一些有关代码片段的帮助,这些代码片段用于使用PySpark和Azure AD服务主体从Azure EventHubs消费事件。
(我不想使用scala)

f2uvfpb9

f2uvfpb91#

您仍然需要使用一些基于Java的代码来插入Azure EventHubs连接器,类似于what is shown in the connector's documentation
但是你也可以抓取this project,编译成jar文件,并将其附加到cluster/job中。然后你可以轻松地使用configure它来使用service principal进行身份验证:

topic = "..."
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("secret_scope", "sp-secret")
ehs_ns_name = "eventhubs-namespace-name"
callback_class_name = "net.alexott.demos.eventhubs_aad.ServicePrincipalCredentialsAuth"
# Instead of `servicebus.windows.net` there could be regional endpoints
ehs_endpoint = f"sb://{ehs_ns_name}.servicebus.windows.net"

# EventHubs connection string.
connectionString = f"Endpoint=sb://{ehs_ns_name}.servicebus.windows.net;EntityPath={topic}"

# Parameters that will be passed to the callback function
params = {
  "aad_tenant_id": tenant_id,
  "aad_client_id": client_id,
  "aad_client_secret": client_secret,
}

# Spark EventHubs options
ehConf = {
  'eventhubs.connectionString': sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString),
  'eventhubs.useAadAuth': True,
  'eventhubs.aadAuthCallback': callback_class_name,
  'eventhubs.AadAuthCallbackParams': json.dumps(params),
}

df = spark.readStream.format("eventhubs").options(**ehConf).load()

因为代码是用Java编写的,所以它不应该依赖于Scala版本。它还支持通过Kafka协议访问EventHubs,也通过AAD令牌进行身份验证。

相关问题