Unable to read an Azure Event Hub topic from Spark

ipakzgxi posted on 2021-07-14 in Spark

Environment details
Spark version: 3.x
Python version: 3.8, Java version: 8
azure-eventhubs-spark_2.12-2.3.17.jar
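A script launched directly from PyCharm does not pick up the jar just because it is on disk. The connector has to be on the JVM classpath when the SparkSession starts. Below is a minimal sketch of two common ways to arrange that. It assumes the Maven coordinate `com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.17` corresponds to the jar listed above, and the `_2.12` suffix has to match the Scala version of your Spark build. The helper names `spark_submit_command` and `build_session` are hypothetical.

```python
import shlex

# Assumed Maven coordinate, derived from the jar name above:
# azure-eventhubs-spark_<scala version>-<connector version>.jar
EVENTHUBS_PACKAGE = "com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.17"


def spark_submit_command(script, package=EVENTHUBS_PACKAGE):
    """Build a spark-submit invocation that resolves the connector from Maven."""
    return ["spark-submit", "--packages", package, script]


def build_session(app_name, package=EVENTHUBS_PACKAGE):
    """Create a SparkSession with the connector on the classpath.

    spark.jars.packages only takes effect if no SparkContext exists yet in
    this Python process, because packages are resolved when the JVM starts.
    """
    from pyspark.sql import SparkSession  # imported lazily; requires pyspark

    return (
        SparkSession.builder.appName(app_name)
        .config("spark.jars.packages", package)
        .getOrCreate()
    )


if __name__ == "__main__":
    # Print the equivalent command for running the script outside the IDE.
    print(shlex.join(spark_submit_command("test.py")))
```

With `--packages` or `spark.jars.packages`, Spark also downloads the connector's transitive dependencies. If you pass only the single jar with `--jars`, those dependencies may still be missing.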

import json
from pyspark.sql import SparkSession

# getOrCreate() reuses an existing SparkSession shared across jobs instead of creating one SparkSession per job.

spark = SparkSession.builder.appName('ntorq_eventhub_load').getOrCreate()

# Event Hub connection string (placeholder).

ntorq_connection_string = "connection-string"

ehConf = {}
ehConf['eventhubs.connectionString'] = spark.sparkContext._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(ntorq_connection_string)

# ehConf['eventhubs.connectionString'] = ntorq_connection_string

ehConf['eventhubs.consumerGroup'] = "$default"

OFFSET_START = "-1"   # the beginning
OFFSET_END = "@latest"

# Create the positions

startingEventPosition = {
  "offset": OFFSET_START ,
  "seqNo": -1,            #not in use
  "enqueuedTime": None,   #not in use
  "isInclusive": True
}

endingEventPosition = {
  "offset": OFFSET_END,     # in use: "@latest" = end of the stream
  "seqNo": -1,              #not in use
  "enqueuedTime": None,     #not in use
  "isInclusive": True
}

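# Put the positions into the Event Hub config dictionary.
# Note: per the connector docs, endingPosition only applies to batch reads (spark.read); readStream ignores it.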

ehConf["eventhubs.startingPosition"] = json.dumps(startingEventPosition)
ehConf["eventhubs.endingPosition"] = json.dumps(endingEventPosition)

df = spark \
  .readStream \
  .format("eventhubs") \
  .options(**ehConf) \
  .load() \
  .selectExpr("cast(body as string) as body_str")

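query = df.writeStream \
    .format("console") \
    .start()

# Block so a standalone script does not exit before the stream produces output.
query.awaitTermination()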
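The options in the script above can be assembled by a small pure-Python helper, which makes the position JSON easy to inspect on its own. This is a sketch, not the connector's API. `event_position` and `build_eh_conf` are hypothetical names, and `encrypt` is passed in so the real `EventHubsUtils.encrypt` can be used on a cluster while a stub is used in tests. It deliberately leaves out `eventhubs.endingPosition`, because a streaming read does not use it.

```python
import json


def event_position(offset=None, seq_no=-1, enqueued_time=None, inclusive=True):
    """Serialize an EventPosition in the JSON shape used by the script above.

    Only one of offset / seq_no / enqueued_time is meant to be set; the
    others keep their "not in use" values (None or -1).
    """
    return json.dumps({
        "offset": offset,
        "seqNo": seq_no,
        "enqueuedTime": enqueued_time,
        "isInclusive": inclusive,
    })


def build_eh_conf(connection_string, encrypt, consumer_group="$default"):
    """Build the options dict for spark.readStream.format("eventhubs").

    encrypt: callable applied to the connection string, e.g.
    spark.sparkContext._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt
    """
    return {
        "eventhubs.connectionString": encrypt(connection_string),
        "eventhubs.consumerGroup": consumer_group,
        # offset "-1" means "start of the stream"
        "eventhubs.startingPosition": event_position(offset="-1"),
    }
```

On a real session this would be called as `build_eh_conf(ntorq_connection_string, spark.sparkContext._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt)`. Setting `.option("checkpointLocation", ...)` explicitly on `writeStream` also makes it obvious which folder has to be cleared between runs. The path is your choice.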

Error

21/04/25 20:17:53 WARN Utils: Your hostname,resolves to a loopback address: 127.0.0.1; using 192.168.1.202 instead (on interface en0)
21/04/25 20:17:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/04/25 20:17:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Traceback (most recent call last):
  File "/Users/PycharmProjects/pythonProject/test.py", line 12, in <module>
    ehConf['eventhubs.connectionString'] = spark.sparkContext._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(ntorq_connection_string)
TypeError: 'JavaPackage' object is not callable
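This traceback matches what py4j does when a class is not on the driver classpath. `spark.sparkContext._jvm.org.apache.spark.eventhubs.EventHubsUtils` resolves to a `JavaPackage` placeholder instead of a `JavaClass`, so `.encrypt(...)` ends up calling a package object. The sketch below shows a small check you can run right after creating the session, so the problem shows up before the `encrypt` call. `connector_on_classpath` is a hypothetical helper. It compares the type name instead of importing py4j's `JavaPackage`, which lets it be tested without a JVM.

```python
def connector_on_classpath(jvm):
    """Return True if py4j resolved EventHubsUtils to a real JVM class.

    When the class is missing, py4j returns a JavaPackage placeholder, and
    calling anything on it raises "'JavaPackage' object is not callable".
    """
    utils = jvm.org.apache.spark.eventhubs.EventHubsUtils
    return type(utils).__name__ != "JavaPackage"
```

Usage: `connector_on_classpath(spark.sparkContext._jvm)` should return `True` once the connector jar and its dependencies are on the classpath.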

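The code runs fine in the Databricks environment, but it does not consume all of the messages from the Event Hub. I tried clearing the default checkpoint folder before every run and still hit the issue, so I wanted to try it on my local system. Locally, I run into the JavaPackage error above. Thanks for your help.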
