我编写的 Spark UDF(用户自定义函数)报出了 pickling(序列化)错误。该 UDF 对 DataFrame 的每一行应用 Spark NLP 管道并返回分类结果(一个布尔值,True 或 False)。
下面的管道在处理放入列表中的文本数据时可以正常工作,其输出附在代码之后。
# Build the ADE (adverse drug event) classification pipeline stage by stage.
# Each stage reads the column(s) named in setInputCol(s) and writes the
# column named in setOutputCol.

# "text" -> "sentence"
documentAssembler = (
    DocumentAssembler()
    .setInputCol("text")
    .setOutputCol("sentence")
)

# "sentence" -> "token"
tokenizer = (
    Tokenizer()
    .setInputCols(["sentence"])
    .setOutputCol("token")
)

# Pretrained BioBERT embeddings; max sentence length is set to 512.
bert_embeddings = (
    BertEmbeddings.pretrained("biobert_pubmed_base_cased")
    .setInputCols(["sentence", "token"])
    .setOutputCol("embeddings")
    .setMaxSentenceLength(512)
)

# Pool the embeddings into one sentence embedding using the AVERAGE
# strategy; the storage ref is set to the same name as the BioBERT model.
embeddingsSentence = (
    SentenceEmbeddings()
    .setInputCols(["sentence", "embeddings"])
    .setOutputCol("sentence_embeddings")
    .setPoolingStrategy("AVERAGE")
    .setStorageRef('biobert_pubmed_base_cased')
)

# Pretrained ADE classifier; its prediction is written to the "class" column.
classsifierdl = (
    ClassifierDLModel.pretrained("classifierdl_ade_biobert", "en", "clinical/models")
    .setInputCols(["sentence", "sentence_embeddings"])
    .setOutputCol("class")
)

ade_clf_pipeline = Pipeline(
    stages=[
        documentAssembler,
        tokenizer,
        bert_embeddings,
        embeddingsSentence,
        classsifierdl,
    ]
)

# Fit on a single-row DataFrame whose "text" is the empty string.
# NOTE(review): presumably fit() only needs the schema here because the
# learned stages are pretrained - confirm.
empty_data = spark.createDataFrame([[""]]).toDF("text")
ade_clf_model = ade_clf_pipeline.fit(empty_data)

# Wrap the fitted model in a LightPipeline for annotating plain strings.
ade_lp_pipeline = LightPipeline(ade_clf_model)
# Sample sentences used to try the LightPipeline on plain Python strings.
texts = [
    "I feel a bit drowsy & have a little blurred vision, after taking a pill.",
    "I've been on Arthrotec 50 for over 10 years on and off, only taking it when I needed it.",
    "Due to my arthritis getting progressively worse, to the point where I am in tears with the agony, gp's started me on 75 twice a day and I have to take it every day for the next month to see how I get on, here goes.",
    "So far its been very good, pains almost gone, but I feel a bit weird, didn't have that when on 50.",
]

# Annotate each sentence and print the first entry stored under 'class'.
for text in texts:
    result = ade_lp_pipeline.annotate(text)
    print(result['class'][0])
这是结果
True
False
True
False
加载数据
import pyspark.sql.functions as F
! wget -q https://raw.githubusercontent.com/JohnSnowLabs/spark-nlp-workshop/master/tutorials/Certification_Trainings/Healthcare/data/sample_ADE_dataset.csv
ade_DF = spark.read\
.option("header", "true")\
.csv("./sample_ADE_dataset.csv")\
.filter(F.col("label").isin(['True','False']))
ade_DF.show(truncate=100)
定义 Spark UDF(用户自定义函数)
import pyspark.sql.functions as f
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
def get_ade_lp_class(text):
    """Annotate *text* with the module-level LightPipeline and return its 'class' entry.

    The value returned is whatever ``ade_lp_pipeline.annotate(text)`` stores
    under the ``'class'`` key, unmodified. The earlier demo loop indexes that
    value with ``[0]``, so it is presumably a list - confirm.
    """
    return ade_lp_pipeline.annotate(text)['class']
# Register get_ade_lp_class as a Spark UDF declared to return StringType.
# NOTE(review): the traceback that follows this snippet shows cloudpickle
# failing on `o66.__getstate__` while serializing this UDF. Presumably the
# function's reference to the module-level ade_lp_pipeline pulls a Py4J
# (JVM-backed) object into the pickle, which cannot be shipped to the
# executors - confirm. A likely alternative to verify is calling
# ade_clf_model.transform(ade_DF) instead of wrapping the LightPipeline in a UDF.
# NOTE(review): get_ade_lp_class returns the raw 'class' entry rather than
# a single string; check that StringType is the intended return type.
udf_get_ade_lp_class = udf(lambda x: get_ade_lp_class(x), StringType())
# Apply the UDF to the 'text' column and display it next to the input text.
ade_DF.select('text',
udf_get_ade_lp_class('text')).show()
这是错误
Traceback (most recent call last):
File "/usr/local/lib/python3.6/dist-packages/pyspark/serializers.py", line 590, in dumps
return cloudpickle.dumps(obj, 2)
File "/usr/local/lib/python3.6/dist-packages/pyspark/cloudpickle.py", line 863, in dumps
cp.dump(obj)
File "/usr/local/lib/python3.6/dist-packages/pyspark/cloudpickle.py", line 260, in dump
return Pickler.dump(self, obj)
File "/usr/lib/python3.6/pickle.py", line 409, in dump
self.save(obj)
File "/usr/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.6/pickle.py", line 736, in save_tuple
save(element)
File "/usr/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/local/lib/python3.6/dist-packages/pyspark/cloudpickle.py", line 400, in save_function
self.save_function_tuple(obj)
File "/usr/local/lib/python3.6/dist-packages/pyspark/cloudpickle.py", line 549, in save_function_tuple
save(state)
File "/usr/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.6/pickle.py", line 847, in _batch_setitems
save(v)
File "/usr/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.6/pickle.py", line 852, in _batch_setitems
save(v)
File "/usr/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/local/lib/python3.6/dist-packages/pyspark/cloudpickle.py", line 400, in save_function
self.save_function_tuple(obj)
File "/usr/local/lib/python3.6/dist-packages/pyspark/cloudpickle.py", line 549, in save_function_tuple
save(state)
File "/usr/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.6/pickle.py", line 847, in _batch_setitems
save(v)
File "/usr/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.6/pickle.py", line 852, in _batch_setitems
save(v)
File "/usr/lib/python3.6/pickle.py", line 521, in save
self.save_reduce(obj=obj, *rv)
File "/usr/lib/python3.6/pickle.py", line 634, in save_reduce
save(state)
File "/usr/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.6/pickle.py", line 847, in _batch_setitems
save(v)
File "/usr/lib/python3.6/pickle.py", line 521, in save
self.save_reduce(obj=obj, *rv)
File "/usr/lib/python3.6/pickle.py", line 634, in save_reduce
save(state)
File "/usr/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.6/pickle.py", line 847, in _batch_setitems
save(v)
File "/usr/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.6/pickle.py", line 781, in save_list
self._batch_appends(obj)
File "/usr/lib/python3.6/pickle.py", line 805, in _batch_appends
save(x)
File "/usr/lib/python3.6/pickle.py", line 521, in save
self.save_reduce(obj=obj, *rv)
File "/usr/lib/python3.6/pickle.py", line 634, in save_reduce
save(state)
File "/usr/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.6/pickle.py", line 847, in _batch_setitems
save(v)
File "/usr/lib/python3.6/pickle.py", line 496, in save
rv = reduce(self.proto)
File "/usr/local/lib/python3.6/dist-packages/py4j/java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/usr/local/lib/python3.6/dist-packages/pyspark/sql/utils.py", line 63, in deco
return f(*a,**kw)
File "/usr/local/lib/python3.6/dist-packages/py4j/protocol.py", line 332, in get_return_value
format(target_id, ".", name, value))
py4j.protocol.Py4JError: An error occurred while calling o66.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:274)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
---------------------------------------------------------------------------
Py4JError Traceback (most recent call last)
/usr/local/lib/python3.6/dist-packages/pyspark/serializers.py in dumps(self, obj)
589 try:
--> 590 return cloudpickle.dumps(obj, 2)
591 except pickle.PickleError:
52 frames
Py4JError: An error occurred while calling o66.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:274)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
During handling of the above exception, another exception occurred:
PicklingError Traceback (most recent call last)
/usr/local/lib/python3.6/dist-packages/pyspark/serializers.py in dumps(self, obj)
598 msg = "Could not serialize object: %s: %s" % (e.__class__.__name__, emsg)
599 cloudpickle.print_exec(sys.stderr)
--> 600 raise pickle.PicklingError(msg)
601
602
PicklingError: Could not serialize object: Py4JError: An error occurred while calling o66.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:274)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
1条答案
按热度按时间ibrsph3r1#
我对spark nlp没有经验,但请尝试以下方法: