我对spark很陌生,对python也不是很熟悉,我用的是spark 2.2。
我有一个像下面这样的Hive表:
+----------+-------+
| category | point |
+----------+-------+
字符串
我需要聚类每个类别的点,使用k-means算法,与pyspark。
我写的代码类似于:
sparkSession = SparkSession.builder.enableHiveSupport().getOrCreate()
def run():
rawData = sparkSession.sql('select category, point from my_table')
groupedData = rawData.groupBy("category").agg(collect_list("point").alias("point_list"))
groupedData.rdd.map(lambda row: kmeans(row.point_list, row.category))
def kmeans(points, category):
kmeans = KMeans(k=10, initMode='k-means||')
df = sparkSession.createDataFrame([(Vectors.dense(x),) for x in points], ["features"])
model = kmeans.fit(df)
predictions = model.transform(df)
return (['kmeans: ' + str(row.prediction) for row in predictions.collect()], category)
型
它给了我一些错误,比如:
pickle.PicklingError: Could not serialize object: Py4JError: An error occurred while calling o133.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
型
我猜可能是因为运行我的kmeans
方法的worker executors没有驱动程序创建的sparkSession
,但是怎么解决呢?
感谢您的帮助!
1条答案
按热度按时间2cmtqfgy1#
你是对的。你得到这个错误是因为在executors中运行的方法不能调用kmeans。对于spark 2.2,你可以使用ml或mllib库来使用kmeans。你应该使用vector.dense作为你的功能集。下面是两个库使用kmeans的代码示例:
Spark MLlib聚类
Spark ml clustering