I'm new to Spark SQL DataFrames and ML on them (PySpark). How can I create a custom tokenizer, which for example removes stop words and uses some libraries from nltk? Can I extend the default one?
8hhllhi21#
Can I extend the default one?

Not really. The default Tokenizer is a subclass of pyspark.ml.wrapper.JavaTransformer and, like the other transformers and estimators in pyspark.ml.feature, it delegates the actual processing to its Scala counterpart. Since you want to use Python, you should extend pyspark.ml.pipeline.Transformer directly. For example:
import nltk

from pyspark import keyword_only  ## < 2.0 -> pyspark.ml.util.keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters
# Available in PySpark >= 2.3.0
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType


class NLTKWordPunctTokenizer(
        Transformer, HasInputCol, HasOutputCol,
        # Credits https://stackoverflow.com/a/52467470
        # by https://stackoverflow.com/users/234944/benjamin-manns
        DefaultParamsReadable, DefaultParamsWritable):

    # Declared at class level so the Params machinery can discover it.
    stopwords = Param(Params._dummy(), "stopwords", "stopwords",
                      typeConverter=TypeConverters.toListString)

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, stopwords=None):
        super(NLTKWordPunctTokenizer, self).__init__()
        self.stopwords = Param(self, "stopwords", "")
        self._setDefault(stopwords=[])
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None, stopwords=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setStopwords(self, value):
        return self._set(stopwords=list(value))

    def getStopwords(self):
        return self.getOrDefault(self.stopwords)

    # Required in Spark >= 3.0
    def setInputCol(self, value):
        """
        Sets the value of :py:attr:`inputCol`.
        """
        return self._set(inputCol=value)

    # Required in Spark >= 3.0
    def setOutputCol(self, value):
        """
        Sets the value of :py:attr:`outputCol`.
        """
        return self._set(outputCol=value)

    def _transform(self, dataset):
        stopwords = set(self.getStopwords())

        def f(s):
            # Tokenize with NLTK and drop stop words (case-insensitive).
            tokens = nltk.tokenize.wordpunct_tokenize(s)
            return [t for t in tokens if t.lower() not in stopwords]

        t = ArrayType(StringType())
        out_col = self.getOutputCol()
        in_col = dataset[self.getInputCol()]
        return dataset.withColumn(out_col, udf(f, t)(in_col))
Example usage (data taken from ML - Features):
sentenceDataFrame = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (0, "I wish Java could use case classes"),
    (1, "Logistic regression models are neat")
], ["label", "sentence"])

tokenizer = NLTKWordPunctTokenizer(
    inputCol="sentence", outputCol="words",
    stopwords=nltk.corpus.stopwords.words('english'))

tokenizer.transform(sentenceDataFrame).show()
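Because the class mixes in DefaultParamsReadable and DefaultParamsWritable (PySpark >= 2.3.0), it composes with the rest of pyspark.ml like any built-in stage and can be persisted. A minimal sketch, assuming the class and sentenceDataFrame defined above; the HashingTF stage and the /tmp path are only illustrative:

from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF

# Chain the custom tokenizer with a built-in stage in a regular Pipeline.
pipeline = Pipeline(stages=[
    NLTKWordPunctTokenizer(inputCol="sentence", outputCol="words",
                           stopwords=nltk.corpus.stopwords.words('english')),
    HashingTF(inputCol="words", outputCol="features"),
])
model = pipeline.fit(sentenceDataFrame)
model.transform(sentenceDataFrame).select("words", "features").show(truncate=False)

# The DefaultParams mixins also give the transformer save/load support.
model.stages[0].write().overwrite().save("/tmp/nltk_tokenizer")  # illustrative path
reloaded = NLTKWordPunctTokenizer.load("/tmp/nltk_tokenizer")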
For a custom Python Estimator see How to Roll a Custom Estimator in PySpark mllib. This answer depends on internal API and is compatible with Spark 2.0.3, 2.1.1, 2.2.0 or later (SPARK-19348). For code compatible with previous Spark versions please see revision 8.
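A side note, not part of the original answer: the _transform above runs a plain row-at-a-time Python UDF. On Spark >= 3.0 with pyarrow installed, the same logic could instead be expressed as a pandas UDF so tokenization is applied to batches of rows; a rough sketch under those assumptions (the subclass name is mine):

import pandas as pd
from pyspark.sql.functions import pandas_udf


class NLTKWordPunctTokenizerArrow(NLTKWordPunctTokenizer):
    """Same params as above, but tokenizes batches of rows via a pandas UDF."""

    def _transform(self, dataset):
        stopwords = set(self.getStopwords())

        @pandas_udf(ArrayType(StringType()))
        def tokenize(col: pd.Series) -> pd.Series:
            # Apply NLTK tokenization to a whole batch (pandas Series) at once.
            return col.apply(
                lambda s: [t for t in nltk.tokenize.wordpunct_tokenize(s)
                           if t.lower() not in stopwords])

        return dataset.withColumn(self.getOutputCol(),
                                  tokenize(dataset[self.getInputCol()]))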