PySpark error: self._sock.recv_into(b) socket.timeout: timed out

62lalag4 · posted 2021-07-13 in Spark

The goal is to classify rows using a UDF. I am running PySpark on Windows.
Simple functions and operations such as filter seem to work fine.
Any guidance on how to resolve the timeout/socket failure would be appreciated (see the error below).
There are no null values in the data.

    from pyspark.sql import functions as F
    from pyspark.sql.functions import udf
    from pyspark.sql.types import IntegerType

    def BreakDown(arr_value):
        # unpack the struct: (start_year, start_month, end_year, end_month, curr_year, curr_month)
        start_year = arr_value[0]
        start_month = arr_value[1]
        end_year = arr_value[2]
        end_month = arr_value[3]
        curr_year = arr_value[4]
        curr_month = arr_value[5]
        if (curr_year == start_year) & (curr_month >= start_month): return 1
        elif (curr_year == end_year) & (curr_month <= end_month): return 1
        elif (curr_year > start_year) & (curr_year < end_year): return 1
        else: return 0

    udfBreakDown = udf(BreakDown, IntegerType())

    temp = temp.withColumn('include', udfBreakDown(
        F.struct('start_year', 'start_month', 'end_year', 'end_month', 'curr_year', 'curr_month')))
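As a side note (my own addition, not in the original question), the UDF logic can be exercised with plain Python values to confirm the function itself is sound; the sample values below are made up:

    # hypothetical sample: range 2020-03 through 2021-06
    print(BreakDown([2020, 3, 2021, 6, 2020, 5]))   # current month inside the range -> 1
    print(BreakDown([2020, 3, 2021, 6, 2019, 12]))  # current month before the range -> 0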

PythonException: An exception was thrown from the Python worker. Please see the stack trace below.

    Traceback (most recent call last):
      File "e:\spark\spark-3.0.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 585, in main
      File "e:\spark\spark-3.0.1-bin-hadoop2.7\python\lib\pyspark\serializers.py", line 593, in read_int
        length = stream.read(4)
      File "c:\programdata\anaconda3\lib\socket.py", line 669, in readinto
        return self._sock.recv_into(b)
    socket.timeout: timed out

l5tcr1uw

Always avoid UDFs when you can use Spark's built-in functions instead. You can use the when function as follows:

    from pyspark.sql import functions as F

    def get_include_col():
        c = F.when((F.col("curr_year") == F.col("start_year")) & (F.col("curr_month") >= F.col("start_month")), F.lit(1)) \
            .when((F.col("curr_year") == F.col("end_year")) & (F.col("curr_month") <= F.col("end_month")), F.lit(1)) \
            .when((F.col("curr_year") > F.col("start_year")) & (F.col("curr_year") < F.col("end_year")), F.lit(1)) \
            .otherwise(F.lit(0))
        return c

    temp = temp.withColumn('include', get_include_col())
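As a quick check (the SparkSession setup and sample rows below are my own assumptions, not part of the original answer), the rewritten column can be tried on a tiny DataFrame:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[*]").getOrCreate()

    # hypothetical sample rows covering the three branches and the fallback
    sample = spark.createDataFrame(
        [
            (2020, 3, 2021, 6, 2020, 5),   # same year as start, month after start -> 1
            (2020, 3, 2021, 6, 2021, 4),   # same year as end, month before end    -> 1
            (2019, 3, 2021, 6, 2020, 1),   # strictly between start and end years  -> 1
            (2020, 3, 2021, 6, 2019, 12),  # outside the range                     -> 0
        ],
        ["start_year", "start_month", "end_year", "end_month", "curr_year", "curr_month"],
    )

    sample.withColumn("include", get_include_col()).show()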

You can also use functools.reduce to generate the when expressions dynamically instead of typing them all out. For example:

    import functools
    from pyspark.sql import functions as F

    cases = [
        ("curr_year = start_year and curr_month >= start_month", 1),
        ("curr_year = end_year and curr_month <= end_month", 1),
        ("curr_year > start_year and curr_year < end_year", 1)
    ]

    include_col = functools.reduce(
        lambda acc, x: acc.when(F.expr(x[0]), F.lit(x[1])),
        cases,
        F
    ).otherwise(F.lit(0))

    temp = temp.withColumn('include', include_col)
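A note on the reduce call (my reading of the snippet, not spelled out in the original answer): F, the pyspark.sql.functions module, serves as the initial accumulator, so the first iteration invokes the module-level F.when(...), each later iteration chains .when(...) onto the Column built so far, and the final .otherwise(F.lit(0)) closes the chain exactly like the hand-written version. Applied to the hypothetical sample DataFrame above, it produces the same include column:

    # same result as get_include_col() on the hypothetical sample rows
    sample.withColumn("include", include_col).show()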
