在EventHub上使用Spark-Kafka适配器的High Egress

jaql4c8m  于 2023-08-06  发布在  Apache
关注(0)|答案(1)|浏览(127)

我们有一个EventHub命名空间,包含2个EventHub、32个分区和20个吞吐量单元。其中一个EventHub处于休眠状态,吞吐量非常小,另一个吞吐量高得多,占传出消息的90%以上。每个吞吐量单位(TU)允许较低的(?)的2 MB或每秒4,096条消息,因此我们应该能够每分钟提取2 x 20 x 60 MB(2.4GB)或4,096 x 20 x 60(4,915,200)条消息。
我们正在生成的事件的最大大小为~ 40 kb。我们正在使用Spark结构化流,微批大小为32偏移量(1.28 MB),大约每1.5秒触发一次-因此估计的出口速率为每分钟51.2 MB(约为最大允许出口的2.1%)。
我们观察到每分钟传出消息的字节数比预期的要高得多,因此我们受到了限制。我们已经确认每个微批次的大小远低于所报道的大小。有什么想法是什么可能会导致这种不一致?另一点需要注意的是,我们使用的是Spark-Kafka adapter,而不是Microsoft的Spark Eventhub Connector
Spark UI

Eventhub指标(1分钟时间粒度)

spark
  .readStream 
  .format("kafka") 
  .option("kafka.bootstrap.servers", "xxxxx") 
  .option("kafka.sasl.mechanism", "PLAIN") 
  .option("kafka.security.protocol", "SASL_SSL") 
  .option("kafka.sasl.jaas.config", EH_SASL) 
  .option("subscribe", "xxxxx") 
  .option("sasl.mechanisms", "SCRAM-SHA-256")
  .option("startingOffsets", "earliest") 
  .option("includeHeaders", true)
  .option("truncate", false)
  .option("failOnDataLoss", false)
  .option("maxOffsetsPerTrigger", 32)
  .load
  .writeStream
  .format("noop")
  .option("checkpointLocation", "xxxxx")
  .trigger(Trigger.AvailableNow)
  .start

字符串

lc8prwob

lc8prwob1#

最后,我们用Spark Eventhub Connector替换了Spark-Kafka适配器,并且报告的指标与给定流配置的预期更加一致。我们做了三次测试

  • 每次触发1,000个偏移(1秒触发)
  • 导致每分钟约140 MB的流出。
  • 无节流
  • 每次触发10,000个偏移(1秒触发)
  • 导致每分钟约1.5GB的出口。
  • 无节流
  • 每次触发15,000个偏移(1秒触发)
  • 导致每分钟约2GB的出口。
  • 一些(最小)节流

请注意,图中的峰值可以通过消息大小的不一致来解释--消息大小更一致的用户应该看到一个更平坦的图。
x1c 0d1x的数据

import org.apache.spark.sql.streaming.Trigger

val eventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromStartOfStream)
  .setMaxEventsPerTrigger(15000)
  .setConsumerGroup("testytest")
  .setReceiverTimeout(java.time.Duration.ofSeconds(30))
  .setOperationTimeout(java.time.Duration.ofSeconds(30))

spark
  .readStream
  .format("eventhubs")
  .options(eventHubsConf.toMap)
  .load
  .writeStream
  .format("noop")
  .trigger(Trigger.ProcessingTime("1 seconds"))
  .option("checkpointLocation", "xxxxx")
  .start

字符串

相关问题