HDFS: trying to use a John Snow Labs pretrained pipeline on a Spark DataFrame, but unable to read Delta files in the same session

Asked by sz81bmfz on 2022-12-09, tagged HDFS

I use the following code to read a Spark DataFrame from HDFS:

from delta import *
from pyspark.sql import SparkSession


builder = SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

#change file path here

delta_df = spark.read.format("delta").load('hdfs://localhost:9000/final_project/data/2022-03-30/')

delta_df.show(10, truncate=False)

And the following code uses the pretrained pipeline:

from sparknlp.pretrained import PipelineModel
from pyspark.sql import SparkSession
import sparknlp

# spark session one way
spark = SparkSession.builder \
    .appName("Spark NLP")\
    .master("local[4]")\
    .config("spark.driver.memory","16G")\
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.kryoserializer.buffer.max", "2000M")\
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp-spark32_2.12:3.4.2").getOrCreate()

# alternate way (uncomment the line below to use it instead)
#spark=sparknlp.start(spark32=True)

# unzip the file and change path here
pipeline = PipelineModel.load("/home/sidd0613/final_project/classifierdl_bertwiki_finance_sentiment_pipeline_en_3.3.0_2.4_1636617651675")

print("-------")

# creating a spark data frame from the sentence
df = spark.createDataFrame([["As interest rates have increased, housing rents have also increased."]]).toDF('text')

# passing dataframe to the pipeline to derive sentiment
result = pipeline.transform(df)

#printing the result
print(result)

print("DONE!!!")

I want to combine these two pieces of code, but the two Spark sessions will not merge and cannot serve both tasks at once. Please help!
I tried merging the .config() options of the two Spark sessions, but it did not work. I also tried creating two separate Spark sessions, but that did not work either.
A plain Spark session is enough to read files in other formats, but to read Delta files I have to build the session with configure_spark_with_delta_pip(builder).
Is there a way to work around this, or to get the code running?

pexxcrt2:

configure_spark_with_delta_pip is just a shortcut for setting the right options on the SparkSession builder. If you look at its source code, you will see that all it really does is set spark.jars.packages. But because you set spark.jars.packages separately for Spark NLP, you overwrite Delta's value.
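Roughly, the helper does something like the following (a simplified paraphrase for illustration, not the exact delta-spark source; version detection and error handling are omitted):

# Simplified sketch of configure_spark_with_delta_pip (an approximation, not the real source):
# it builds the delta-core Maven coordinate and puts it into spark.jars.packages.
def configure_spark_with_delta_pip(spark_session_builder):
    scala_version = "2.12"
    delta_version = "1.1.0"  # the real helper derives this from the installed delta-spark package
    maven_artifact = f"io.delta:delta-core_{scala_version}:{delta_version}"
    return spark_session_builder.config("spark.jars.packages", maven_artifact)

So any later .config("spark.jars.packages", ...) call, such as the one that adds the Spark NLP package, simply replaces that single value.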
Update, April 14, 2022: at the time this answer was written it had not yet been released, but it is available in Delta 1.2.0.
To handle this situation, configure_spark_with_delta_pip has an additional parameter, extra_packages, for specifying the extra packages that should be configured.

builder = SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.driver.memory", "16G") \
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.kryoserializer.buffer.max", "2000M")

my_packages = ["com.johnsnowlabs.nlp:spark-nlp-spark32_2.12:3.4.2"]

spark = configure_spark_with_delta_pip(builder, extra_packages=my_packages) \
    .getOrCreate()

Until a release with that extra parameter is available, you need to avoid the helper function and simply configure all of the parameters yourself, like this:

scala_version = "2.12"
delta_version = "1.1.0"
all_packages = ["com.johnsnowlabs.nlp:spark-nlp-spark32_2.12:3.4.2", 
   f"io.delta:delta-core_{scala_version}:{delta_version}"]

spark = SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.driver.memory", "16G") \
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.kryoserializer.buffer.max", "2000M") \
    .config("spark.jars.packages", ",".join(all_packages)) \
    .getOrCreate()
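
Either way you end up with a single session that has both the Delta and Spark NLP jars on the classpath, so the two pieces of code can run together. A minimal sketch, reusing the HDFS path and pipeline directory from the question (it assumes the Delta table has a string column named text, which is what the pretrained pipeline reads; rename the column if your schema differs):

from sparknlp.pretrained import PipelineModel

# read the Delta table from HDFS (same path as in the question)
delta_df = spark.read.format("delta") \
    .load("hdfs://localhost:9000/final_project/data/2022-03-30/")

# load the locally unzipped pretrained pipeline (same path as in the question)
pipeline = PipelineModel.load("/home/sidd0613/final_project/classifierdl_bertwiki_finance_sentiment_pipeline_en_3.3.0_2.4_1636617651675")

# the pipeline expects a 'text' input column; adjust if your table uses a different name
result = pipeline.transform(delta_df)
result.show(10, truncate=False)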
