pyspark Azure事件中心的Spark Streaming AvailableNow触发器?

ruarlubt  于 2022-11-01  发布在  Spark
关注(0)|答案(1)|浏览(98)

我正在尝试使用带有availableNow触发器的Spark流将数据从Azure Event Hub摄取到Databricks中的Delta Lake表中。
我的代码:

conn_str = "my conn string"
ehConf = {
  "eventhubs.connectionString": 
    spark.sparkContext._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(conn_str),
  "eventhubs.consumerGroup":
    "my-consumer-grp",
}

read_stream = spark.readStream \
  .format("eventhubs") \
  .options(**ehConf) \
  .load()

stream = read_stream.writeStream \
  .format("delta") \
  .option("checkpointLocation", checkpoint_location) \
  .trigger(availableNow=True) \
  .toTable(full_table_name, mode="append")

根据文档https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers,availableNow触发器应该以微批处理方式处理当前可用的所有数据。
然而,这并没有发生,相反,它只处理了1000行。流的输出说明了这一点:

{
"sources" : [ {
    "description" : "org.apache.spark.sql.eventhubs.EventHubsSource@2c5bba32",
    "startOffset" : {
      "my-hub-name" : {
        "0" : 114198857
      }
    },
    "endOffset" : {
      "my-hub-name" : {
        "0" : 119649573
      }
    },
    "latestOffset" : {
      "my-hub-name" : {
        "0" : 119650573
      }
    },
    "numInputRows" : 1000,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 36.1755236407047
  } ]
}

我们可以清楚地看到偏移量的变化超过了1000次处理的方式。
我已验证目标表的内容,它包含最后1000个偏移。
根据Pyspark的事件中心配置https://github.com/Azure/azure-event-hubs-spark/blob/master/docs/PySpark/structured-streaming-pyspark.md#event-hubs-configuration
默认情况下,maxEventsPerTrigger设置为1000*partitionCount,但这只会影响每个批处理处理的事件数,而不会影响availableNow触发器处理的记录总数。
运行触发器为once=True的相同查询将改为摄取所有事件(假设批处理大小设置得足够大)。
availableNow触发器似乎未按Azure事件中心的预期工作。

jv4diomz

jv4diomz1#

“avaiableNow”触发器似乎尚未在“azure-event-hub-spark”包中实现。
但是,使用Kafka连接器到Azure Event Hub -https://github.com/Azure/azure-event-hubs-for-kafka/tree/master/tutorials/spark,有一个变通办法
所以前面的代码基本上变成了

bootstrap_servers = "my-evh-namespace.servicebus.windows.net:9093"
eventhub_endpoint = "my-evh-namespace-endpoint"

# The 'kafkashaded' part here is because it's running in Databricks.

# Otherwise drop that part.

EH_SASL = f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{eventhub_endpoint}\";"

topic = "my-eventhub-name"

read_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", bootstrap_servers) \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.jaas.config", EH_SASL) \
    .option("subscribe", topic) \
    .option("maxOffsetsPerTrigger", 1000) \
    .option("startingOffsets", "earliest") \
    .option("includeHeaders", "true") \
    .load()

# Notice that the output writeStream remains the same.

stream = read_stream.writeStream \
  .format("delta") \
  .option("checkpointLocation", checkpoint_location) \
  .trigger(availableNow=True) \
  .toTable(full_table_name, mode="append")

这将导致流按预期执行-以maxOffsetsPerTrigger大小的批接收直到开始时间的所有事件

相关问题