PySpark error: AttributeError: 'NoneType' object has no attribute '_jvm'

hyrbngr7  posted on 2021-05-27  in  Spark
Follow (0) | Answers (5) | Views (1708)

I have a dataset of timestamps in the format shown below. I wrote a UDF in PySpark to process this dataset and return the result as a map of key-value pairs, but I get the error message below.

Dataset: df_ts_list

+--------------------+
|             ts_list|
+--------------------+
|[1477411200, 1477...|
|[1477238400, 1477...|
|[1477022400, 1477...|
|[1477224000, 1477...|
|[1477256400, 1477...|
|[1477346400, 1476...|
|[1476986400, 1477...|
|[1477321200, 1477...|
|[1477306800, 1477...|
|[1477062000, 1477...|
|[1477249200, 1477...|
|[1477040400, 1477...|
|[1477090800, 1477...|
+--------------------+

PySpark UDF:

>>> def on_time(ts_list):
...     import sys
...     import os
...     sys.path.append('/usr/lib/python2.7/dist-packages')
...     os.system("sudo apt-get install python-numpy -y")
...     import numpy as np
...     import datetime
...     import time
...     from datetime import timedelta
...     ts = np.array(ts_list)
...     if ts.size == 0:
...             count = 0
...             duration = 0
...             st = time.mktime(datetime.now())
...             ymd = str(datetime.fromtimestamp(st).date())
...     else:
...             ts.sort()
...             one_tag = []
...             start = float(ts[0])
...             for i in range(len(ts)):
...                     if i == (len(ts)) - 1:
...                             end = float(ts[i])
...                             a_round = [start, end]
...                             one_tag.append(a_round)
...                     else:
...                             diff = (datetime.datetime.fromtimestamp(float(ts[i+1])) - datetime.datetime.fromtimestamp(float(ts[i])))
...                             if abs(diff.total_seconds()) > 3600:
...                                     end = float(ts[i])
...                                     a_round = [start, end]
...                                     one_tag.append(a_round)
...                                     start = float(ts[i+1])
...             one_tag = [u for u in one_tag if u[1] - u[0] > 300]
...             count = int(len(one_tag))
...             duration = int(np.diff(one_tag).sum())
...             ymd = str(datetime.datetime.fromtimestamp(time.time()).date())
...     return {'count':count,'duration':duration, 'ymd':ymd}

PySpark code:

>>> on_time=udf(on_time, MapType(StringType(),StringType()))
>>> df_ts_list.withColumn("one_tag", on_time("ts_list")).select("one_tag").show()

Error:

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/worker.py", line 172, in main
    process()
  File "/usr/lib/spark/python/pyspark/worker.py", line 167, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/pyspark/worker.py", line 106, in <lambda>
    func = lambda _, it: map(mapper, it)
  File "/usr/lib/spark/python/pyspark/worker.py", line 92, in <lambda>
    mapper = lambda a: udf(*a)
  File "/usr/lib/spark/python/pyspark/worker.py", line 70, in <lambda>
    return lambda *a: f(*a)
  File "<stdin>", line 27, in on_time
  File "/usr/lib/spark/python/pyspark/sql/functions.py", line 39, in _
    jc = getattr(sc._jvm.functions, name)(col._jc if isinstance(col, Column) else col)
AttributeError: 'NoneType' object has no attribute '_jvm'

Any help would be much appreciated!


wydwbb8l1#

Mariusz's answer didn't really help me. So if, like me, you found this because it's the only result on Google and you're new to PySpark (and Spark in general), here's what worked for me.
In my case, I was getting this error because I tried to execute PySpark code before the PySpark environment had been set up.
Making sure PySpark was available and set up before making any calls that depend on pyspark.sql.functions solved the problem for me.
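The ordering issue can be sketched without a Spark installation. The names in pyspark.sql.functions are thin wrappers that look up the active SparkContext and call through its `_jvm` gateway; before a session exists, that context is `None`, which is exactly where the AttributeError comes from. Everything below (`_active_spark_context`, `upper`, the tiny stand-in classes) is a hypothetical mock for illustration, not the real PySpark internals:

```python
# Stand-in for pyspark's module-level active context: None until "startup".
_active_spark_context = None

def upper(name):
    # Same shape as the pyspark wrapper: it dereferences sc._jvm,
    # so calling it before a context exists raises AttributeError.
    sc = _active_spark_context
    return getattr(sc._jvm, "upper")(name)

class _Jvm:                      # trivial stand-in for the JVM gateway
    @staticmethod
    def upper(name):
        return name.upper()

class _Context:
    _jvm = _Jvm

try:
    upper("spark")               # called before "setup"
except AttributeError as e:
    print(e)                     # 'NoneType' object has no attribute '_jvm'

_active_spark_context = _Context()   # "setup" done
print(upper("spark"))                # now the wrapper works -> SPARK
```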


30byixjq2#

The error message says that on line 27 of the UDF you are calling some PySpark SQL function. The line in question contains abs(), so I suppose that somewhere above it you ran from pyspark.sql.functions import *, which overrides Python's built-in abs() function.
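The shadowing can be reproduced without Spark. In the sketch below, the local `abs` is a hypothetical stand-in shaped like the pyspark.sql.functions wrapper (which dereferences the active SparkContext's `_jvm`); once it shadows the builtin, the arithmetic call inside the UDF fails exactly as in the traceback:

```python
import builtins

def abs(col):
    # Stand-in shaped like the pyspark wrapper: on an executor there is
    # no active SparkContext, so sc is None and sc._jvm blows up.
    sc = None
    return getattr(sc._jvm.functions, "abs")(col)

try:
    abs(-42.0)                 # what abs(diff.total_seconds()) now hits
except AttributeError as e:
    print(e)                   # 'NoneType' object has no attribute '_jvm'

print(builtins.abs(-42.0))     # the real builtin is still reachable
```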


ahy6op9u3#

A udf cannot handle None values. For example, the following code produces the same exception:

get_datetime = udf(lambda ts: to_timestamp(ts), DateType())
df = df.withColumn("datetime", get_datetime("ts"))

However, this one does not:

get_datetime = udf(lambda ts: to_timestamp(ts) if ts is not None else None, DateType())
df = df.withColumn("datetime", get_datetime("ts"))
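The same guard works outside Spark too. Here is a plain-Python sketch of the pattern, with `to_ts` as a hypothetical stand-in for the per-row conversion:

```python
from datetime import datetime

def to_ts(ts):
    # Stand-in for the per-row conversion the udf performs.
    return datetime.fromtimestamp(float(ts))

rows = ["1477411200", None, "1477238400"]

# Unguarded, to_ts(None) raises TypeError, which on a cluster surfaces
# as a PythonException from the executor. With the guard, the None is
# passed through instead of crashing the task:
converted = [to_ts(ts) if ts is not None else None for ts in rows]
print(converted)
```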

2vuwiymt4#

Make sure you are initializing the Spark context. For example:

spark = SparkSession \
    .builder \
    .appName("myApp") \
    .config("...") \
    .getOrCreate()
sqlContext = SQLContext(spark)
productData = sqlContext.read.format("com.mongodb.spark.sql").load()

Or like this:

spark = SparkSession.builder.appName('company').getOrCreate()
sqlContext = SQLContext(spark)
productData = sqlContext.read.format("csv").option("delimiter", ",") \
    .option("quote", "\"").option("escape", "\"") \
    .option("header", "true").option("inferSchema", "true") \
    .load("/path/thecsv.csv")

t3irkdon5#

Just to clarify: the problem many people run into here stems from one piece of bad programming style, namely from blah import *. When you do

from pyspark.sql.functions import *

you override many of Python's built-in functions. I strongly recommend importing the functions under a namespace instead:

import pyspark.sql.functions as f

# or

import pyspark.sql.functions as pyf
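The same hazard exists in the standard library, so it can be demonstrated without Spark: `from math import *` replaces the builtin pow() with math.pow(), which does not accept the three-argument modular form:

```python
from math import *    # star-import: math.pow now shadows the builtin pow
import builtins

try:
    pow(2, 10, 1000)              # math.pow takes exactly two arguments
except TypeError as e:
    print("shadowed:", e)

print(builtins.pow(2, 10, 1000))  # builtin is still reachable -> 24
```

A namespaced import (`import math` or `import pyspark.sql.functions as f`) avoids the collision entirely.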
