I'm trying to create an Auto Loader stream in Azure Databricks. When I try to start the writeStream, I get the following exception: com.databricks.sql.cloudfiles.errors.CloudFilesException: Failed to create an Event Grid subscription. Please make sure that your service principal has 'write' permissions (e.g., assign it a Contributor role) on the storage account rahulstorageek in order to create Event Grid Subscriptions
Here is the code I'm using:
# Spark and cloudFiles configurations
spark.conf.set(
    "fs.azure.account.key.<My ADLS Gen2 Storage account name>.blob.core.windows.net",
    "<Access Key 2 of my Storage Account>",
)
queuesas = "<SAS Connection String for Queue Storage>"
cloudfilesConf = {
    "cloudFiles.subscriptionId": "<Azure Free Trial Subscription Id>",
    "cloudFiles.connectionString": queuesas,
    "cloudFiles.format": "csv",
    "cloudFiles.tenantId": "<Service Principals tenant Id>",
    "cloudFiles.clientId": "<Service Principals client Id>",
    "cloudFiles.clientSecret": "<Service Principals generated client secret Value>",
    "cloudFiles.resourceGroup": "AzureDataBricks_Exploration_RG",
    "cloudFiles.useNotifications": "true",
    "cloudFiles.includeExistingFiles": "true",
    "cloudFiles.validateOptions": "true",
}
# Manually defined schema of the incoming data
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType
dataset_schema = StructType([
    StructField("YearsExperience", DoubleType(), True),
    StructField("Salary", IntegerType(), True),
])
# Auto Loader readStream
autoloader_df = (
    spark.readStream.format("cloudFiles")
    .options(**cloudfilesConf)
    .option("recursiveFileLookup", "true")
    .schema(dataset_schema)
    .load("/mnt/autoloadersource/csv_files/")
)
# Auto Loader writeStream
(
    autoloader_df.writeStream
    .format("delta")
    .option("mergeSchema", "true")
    .option("checkpointLocation", "/mnt/autoloadersink/autostream_chckpnt")
    .start("/mnt/autoloadersink/autoloader_dt01")
)
# The exception is raised after executing the command above.
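I have assigned the following roles to the service principal I'm using.
In addition, the SAS token I generated for the Queue has the following parameters:
I have tried giving the service principal all the additional roles you can see in the screenshot above, but I still get the same error.
Any solutions or suggestions would be highly appreciated.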
1 Answer
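Your permissions are insufficient, because an Event Grid subscription needs to be created. The documentation clearly specifies the required roles:
Contributor: used to set up resources in your storage account, such as queues and event subscriptions.
Storage Queue Data Contributor: used to perform queue operations, such as retrieving and deleting messages from the queues. (May not be required if you use DBR 8.1+ and provide a connection string.)
EventGrid EventSubscription Contributor: used to perform Event Grid subscription operations.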
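As a rough illustration of the role list above, the sketch below compares the roles a service principal already has against the three required ones and builds the matching `az role assignment create` commands for whatever is missing. The helper names (`build_scopes`, `missing_roles`, `role_assignment_commands`) are hypothetical and not part of any Databricks or Azure library. The scope chosen for each role (storage account for the first two, resource group for the EventGrid role) is an assumption that you should check against the current Auto Loader documentation. The code only produces command strings; it does not call Azure.

```python
import shlex

# Roles named in the answer. The scope per role is an ASSUMPTION
# (storage account vs. resource group); verify it against the docs.
REQUIRED_ROLES = {
    "Contributor": "storage_account",
    "Storage Queue Data Contributor": "storage_account",
    "EventGrid EventSubscription Contributor": "resource_group",
}


def build_scopes(subscription_id, resource_group, storage_account):
    """Return the Azure resource IDs used as role-assignment scopes."""
    rg_scope = f"/subscriptions/{subscription_id}/resourceGroups/{resource_group}"
    sa_scope = (
        f"{rg_scope}/providers/Microsoft.Storage/storageAccounts/{storage_account}"
    )
    return {"resource_group": rg_scope, "storage_account": sa_scope}


def missing_roles(assigned_roles):
    """List required roles that are absent from assigned_roles (case-insensitive)."""
    assigned = {role.strip().lower() for role in assigned_roles}
    return [role for role in REQUIRED_ROLES if role.lower() not in assigned]


def role_assignment_commands(client_id, scopes, roles):
    """Build one 'az role assignment create' command string per role."""
    return [
        " ".join([
            "az role assignment create",
            "--assignee", shlex.quote(client_id),
            "--role", shlex.quote(role),
            "--scope", shlex.quote(scopes[REQUIRED_ROLES[role]]),
        ])
        for role in roles
    ]


# Example usage with placeholder IDs; the assigned-role list is made up.
scopes = build_scopes(
    "00000000-0000-0000-0000-000000000000",
    "AzureDataBricks_Exploration_RG",
    "rahulstorageek",
)
todo = missing_roles(["Contributor", "Storage Blob Data Contributor"])
for command in role_assignment_commands("<client-id>", scopes, todo):
    print(command)
```

Printing the commands instead of executing them keeps the sketch free of side effects, so the role names and scopes can be reviewed before anything is run in the Azure CLI. Role assignments can also take a few minutes to propagate before the stream is restarted.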