databricks激发了将结构化流写入大量接收器的最佳实践?

ht4b089n  于 2021-05-19  发布在  Spark
关注(0)|答案(1)|浏览(513)

我正在使用databricks spark 3.x,我正在读取大量的流(100+),每个流都有自己的契约,需要写出自己的delta/parquet/sql/whatever表。虽然这是许多流,但每个流的活动量很低—有些流一天可能只能看到数百条记录。我想流,因为我的目标是一个相当低的延迟方法。
下面是我要说的(代码为简单而缩写;我正确地使用了检查点、输出模式等)。假设 schemas 变量包含每个主题的架构。我尝试过这种方法,在这里我创建了大量的单个流,但是它需要大量的计算,而且大部分都被浪费了:

  1. def batchprocessor(topic, schema):
  2. def F(df, batchId):
  3. sql = f'''
  4. MERGE INTO SOME TABLE
  5. USING SOME MERGE TABLE ON SOME CONDITION
  6. WHEN MATCHED
  7. UPDATE SET *
  8. WHEN NOT MATCHED
  9. INSERT *
  10. '''
  11. df.createOrReplaceTempView(f"SOME MERGE TABLE")
  12. df._jdf.sparkSession().sql(sql)
  13. return F
  14. for topic in topics:
  15. query = (spark
  16. .readStream
  17. .format("delta")
  18. .load(f"/my-stream-one-table-per-topic/{topic}")
  19. .withColumn('json', from_json(col('value'),schemas[topic]))
  20. .select(col('json.*'))
  21. .writeStream
  22. .format("delta")
  23. .foreachBatch(batchProcessor(topic, schema))
  24. .start())

我还尝试创建一个流来进行大量过滤,但即使在我将单个消息推送到单个主题的测试环境中,性能也相当糟糕:

  1. def batchprocessor(df, batchId):
  2. df.cache()
  3. for topic in topics:
  4. filteredDf = (df.filter(f"topic == '{topic}'")
  5. .withColumn('json', from_json(col('value'),schemas[topic]))
  6. .select(col('json.*')))
  7. sql = f'''
  8. MERGE INTO SOME TABLE
  9. USING SOME MERGE TABLE ON SOME CONDITION
  10. WHEN MATCHED
  11. UPDATE SET *
  12. WHEN NOT MATCHED
  13. INSERT *
  14. '''
  15. filteredDf.createOrReplaceTempView(f"SOME MERGE TABLE")
  16. filteredDf._jdf.sparkSession().sql(sql)
  17. df.unpersist()
  18. query = (spark
  19. .readStream
  20. .format("delta")
  21. .load(f"/my-stream-all-topics-in-one-but-partitioned")
  22. .writeStream
  23. .format("delta")
  24. .foreachBatch(batchProcessor)
  25. .start())

有没有什么好的方法可以对这样的流进行实质性的解复用?它已经被分区了,所以我假设查询规划器没有做太多的冗余工作,但是看起来仍然有大量的开销。

lh80um4z

lh80um4z1#

我运行了一系列基准测试,选项2更有效。我还不知道为什么。

相关问题