如何使用pyspark执行多个k-means?

yqkkidmi  于 12个月前  发布在  Spark
关注(0)|答案(1)|浏览(132)

我对spark很陌生,对python也不是很熟悉,我用的是spark 2.2。
我有一个像下面这样的Hive表:

+----------+-------+
| category | point |
+----------+-------+

字符串
我需要聚类每个类别的点,使用k-means算法,与pyspark。
我写的代码类似于:

sparkSession = SparkSession.builder.enableHiveSupport().getOrCreate()

def run():
  rawData = sparkSession.sql('select category, point from my_table')
  groupedData = rawData.groupBy("category").agg(collect_list("point").alias("point_list"))
  groupedData.rdd.map(lambda row: kmeans(row.point_list, row.category))

def kmeans(points, category):
  kmeans = KMeans(k=10, initMode='k-means||')
  df = sparkSession.createDataFrame([(Vectors.dense(x),) for x in points], ["features"])
  model = kmeans.fit(df)
  predictions = model.transform(df)
  return (['kmeans: ' + str(row.prediction) for row in predictions.collect()], category)


它给了我一些错误,比如:

pickle.PicklingError: Could not serialize object: Py4JError: An error occurred while calling o133.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist


我猜可能是因为运行我的kmeans方法的worker executors没有驱动程序创建的sparkSession,但是怎么解决呢?
感谢您的帮助!

2cmtqfgy

2cmtqfgy1#

你是对的。你得到这个错误是因为在executors中运行的方法不能调用kmeans。对于spark 2.2,你可以使用ml或mllib库来使用kmeans。你应该使用vector.dense作为你的功能集。下面是两个库使用kmeans的代码示例:
Spark MLlib聚类
Spark ml clustering

相关问题