如何使用pyspark并行插入hive

qq24tv8q  于 2021-06-24  发布在  Hive
关注(0)|答案(1)|浏览(555)

我有一个工作,这是在工人之间分割,每个工人输出一个Dataframe,需要写入到配置单元,我无法想出如何从工人访问配置单元没有初始化另一个sparkcontext,所以我试图收集他们的输出,并插入它在一个时间如下

result = df.rdd.map(lambda rdd: predict_item_by_model(rdd, columns)).collect()
df_list = sc.parallelize(result).map(lambda df: hiveContext.createDataFrame(df)).collect() #throws error
mergedDF = reduce(DataFrame.union, df_list) 
mergedDF.write.mode('overwrite').partitionBy("item_id").saveAsTable("items")

但现在它抛出了这个错误 _pickle.PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063. 是否有直接从工人那里进入Hive的通道?如果没有,如何收集数据并插入一次?

bzzcjhmw

bzzcjhmw1#

.map(lambda df: hiveContext.createDataFrame(df))

这种方法根本不可能。完全不是它的工作原理。
任何spark驱动程序应用程序的第一步是创建一个sparkcontext,包括配置单元上下文(如果需要)。仅驱动程序方面。正如消息所说。
看看这里https://www.waitingforcode.com/apache-spark/serialization-issues-part-1/read 让你自己继续这个序列化问题。

相关问题