如何使用pyspark执行多个k-means?

yqkkidmi  于 2024-01-06  发布在  Spark
关注(0)|答案(1)|浏览(176)

我对spark很陌生,对python也不是很熟悉,我用的是spark 2.2。
我有一个像下面这样的Hive表:

  1. +----------+-------+
  2. | category | point |
  3. +----------+-------+

字符串
我需要聚类每个类别的点,使用k-means算法,与pyspark。
我写的代码类似于:

  1. sparkSession = SparkSession.builder.enableHiveSupport().getOrCreate()
  2. def run():
  3. rawData = sparkSession.sql('select category, point from my_table')
  4. groupedData = rawData.groupBy("category").agg(collect_list("point").alias("point_list"))
  5. groupedData.rdd.map(lambda row: kmeans(row.point_list, row.category))
  6. def kmeans(points, category):
  7. kmeans = KMeans(k=10, initMode='k-means||')
  8. df = sparkSession.createDataFrame([(Vectors.dense(x),) for x in points], ["features"])
  9. model = kmeans.fit(df)
  10. predictions = model.transform(df)
  11. return (['kmeans: ' + str(row.prediction) for row in predictions.collect()], category)


它给了我一些错误,比如:

  1. pickle.PicklingError: Could not serialize object: Py4JError: An error occurred while calling o133.__getnewargs__. Trace:
  2. py4j.Py4JException: Method __getnewargs__([]) does not exist


我猜可能是因为运行我的kmeans方法的worker executors没有驱动程序创建的sparkSession,但是怎么解决呢?
感谢您的帮助!

2cmtqfgy

2cmtqfgy1#

你是对的。你得到这个错误是因为在executors中运行的方法不能调用kmeans。对于spark 2.2,你可以使用ml或mllib库来使用kmeans。你应该使用vector.dense作为你的功能集。下面是两个库使用kmeans的代码示例:
Spark MLlib聚类
Spark ml clustering

相关问题