spark nlp函数在使用map时给出了酸洗错误

sxissh06  于 2021-07-09  发布在  Spark
关注(0)|答案(1)|浏览(330)

我的rdd结构如下:

my_rdd = [Row(text='Hello World. This is bad.'), Row(text='This is good.'), ...]

我可以用python函数执行并行处理:

rdd2=my_rdd.map(lambda f: f.text.split()) 
for x in rdd2.collect():
  print(x)

它给了我预期的结果。
但是,当我尝试使用spark nlp语句生成器或情感分析器时,出现了一个错误:picklingerror:could not serialize object:typeerror:can't pickle\u thread.rlock objects
在这行中:对于rdd2.collect()中的x:
代码如下:

documenter = DocumentAssembler()\
    .setInputCol("text")\
    .setOutputCol("document")

sentencerDL = SentenceDetectorDLModel\
  .pretrained("sentence_detector_dl", "en") \
  .setInputCols(["document"]) \
  .setOutputCol("sentences")

sd_pipeline = PipelineModel(stages=[documenter, sentencerDL]) 
sd_model = LightPipeline(sd_pipeline)
pipeline = PretrainedPipeline('analyze_sentiment', 'en')

如果我尝试:

rdd2=my_rdd.map(lambda f: pipeline.annotate(f.text))

rdd2=my_rdd.map(lambda f: sd_model.fullAnnotate(f.text)[0]["sentences"].split()[0])

出现错误。当我在没有“Map”的情况下运行它们时,它们按预期运行。
有没有人知道如何并行执行spark nlp破句器或情绪分析器?我做错了什么?
谢谢大家!

blpfk2vs

blpfk2vs1#

当您在Dataframe上应用sparkml管道,该Dataframe的数据分布在不同的分区上时,默认情况下您将获得并行执行。spark nlp管道也是如此(这也是spark ml管道)。所以你可以这样做,
pipeline.transform(Dataframe)
并创建“Dataframe”,使数据分布在不同的节点上。这里有一个很好的教程,
https://sparkbyexamples.com/pyspark/pyspark-create-dataframe-from-list/
此外,对于在使用spark nlp转换后MapDataframe的内容,您可以使用sparknlp.functions下的函数,例如map\u annotations\u col,它将允许您Map包含spark nlp注解的Dataframe中特定列的内容。顺便说一句,这个,
rdd2=my_rdd.map(lambda f:pipeline.annotate(f.text))
如果您不应该这样做,则会出现异常,因为spark正在尝试序列化整个管道并将其发送到集群节点。这不是它应该采用的工作方式,您将数据传递给管道,并让管道选择要分发给集群的内容。

相关问题