我使用下面的代码从hdfs读取spark Dataframe :
from delta import *
from pyspark.sql import SparkSession
builder= SparkSession.builder.appName("MyApp") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
spark=configure_spark_with_delta_pip(builder).getOrCreate()
#change file path here
delta_df = spark.read.format("delta").load('hdfs://localhost:9000/final_project/data/2022-03-30/')
delta_df.show(10, truncate=False)
和下面的代码使用预先训练的管道:
from sparknlp.pretrained import PipelineModel
from pyspark.sql import SparkSession
import sparknlp
# spark session one way
spark = SparkSession.builder \
.appName("Spark NLP")\
.master("local[4]")\
.config("spark.driver.memory","16G")\
.config("spark.driver.maxResultSize", "0") \
.config("spark.kryoserializer.buffer.max", "2000M")\
.config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp-spark32_2.12:3.4.2").getOrCreate()
# alternate way #uncomment below to use
#spark=sparknlp.start(spark32=True)
# unzip the file and change path here
pipeline = PipelineModel.load("/home/sidd0613/final_project/classifierdl_bertwiki_finance_sentiment_pipeline_en_3.3.0_2.4_1636617651675")
print("-------")
# creating a spark data frame from the sentence
df=spark.createDataFrame([["As interest rates have increased, housing rents have also increased."]]).toDF('text')
# passing dataframe to the pipeline to derive sentiment
result = pipeline.transform(df)
#printing the result
print(result)
print("DONE!!!")
我想合并这两个代码,但这两个spark会话没有合并或不能同时为这两个任务工作。请帮助!
我尝试合并两个spark会话的.config()选项,但它不起作用。我也尝试创建两个spark会话,但它不起作用
一个普通的Spark会话足以读取其他格式的文件,但要读取delta文件,我必须严格使用此选项:使用增量点配置Spark(生成器)
有没有办法绕过这个问题?或者让代码运行?
1条答案
按热度按时间pexxcrt21#
configure_spark_with_delta_pip
只是设置SparkSession的正确参数的快捷方式...如果您查看其源代码,您将看到以下代码,您将看到它所做的一切都是配置spark.jars.packages
。但由于您将其单独用于SparkNLP,因此您将覆盖Delta的值。2022年4月14日更新:在回答时,它还没有发布,但在版本1.2.0中可用
为了处理这种情况,
configure_spark_with_delta_pip
有一个额外的参数extra_packages
来指定要配置的额外程序包。在发布带有额外参数的实现之前,您需要避免使用该函数,而只需自己配置所有参数,如下所示: