我有一个spark rdd对象(使用pyspark),我正在尝试获得sql的等价物
SELECT MY_FIELD COUNT(*) GROUP BY MY_FIELD
所以我尝试了以下代码:
my_groupby_count = myRDD.map(lambda x: x.type).reduceByKey(lambda x, y: x + y).collect()
# 'type' is the name of the field inside the RDD row
但我得到了一个错误,我不知道如何处理:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-24-23b4c60c6fd6> in <module>()
----> 1 my_groupby_count = myRDD.map(lambda x: x.type).reduceByKey(lambda x, y: x + y).collect()
/root/spark/python/pyspark/rdd.py in collect(self)
with SCCallSiteSync(self.context) as css:
--> port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
return list(_load_from_socket(port, self._jrdd_deserializer))
/root/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
answer = self.gateway_client.send_command(command)
return_value = get_return_value(
-> answer, self.gateway_client, self.target_id, self.name)
现在,由于这个方法以前对我很有效,我怀疑它可能与数据本身有关。例如,我知道x.type中的一些值是none,但我不知道如何除去它们。
你知道如何继续调查吗?p、 s.todf()也会失败,我想可能是因为同样的原因。另外,我还介绍了rdd的解决方案,而不是dataframe。谢谢
1条答案
按热度按时间mwg9r5ms1#
您需要提供tuple reducebykey。你好像忘了“()”
旁注:使用countbykey()的同一代码的较短版本