如何在Pyspark中从UDF打印或记录?

fcipmucu  于 11个月前  发布在  Spark
关注(0)|答案(1)|浏览(116)

我在pyspark中创建了一个UDF来做一个活动,并声明如下。

my_udf = udf(lambda z:udf_method(z), StringType())

def udf_method(udf_param):
  try:
    print('In UDF Method')
    if 'something':
      print('SOMETHING')
      return 'SOMETHING'
    else:
      print('NOTHING')
      return 'NOTHING'
  except Exception as e:
    traceback.print_exc()

字符串
用spark注册了它,如:spark.udf.register('udf_method', udf_method)
我有一个嵌套框架df,我通过调用一个UDF在该嵌套框架中创建一个新列,如下所示。

new_df = df.withColumn('udf_output', udf_method(col('some_column_from_my_dataframe')))
new_df.write.mode('overrite').format('ORC').saveAsTable('dbname.tablename')


当我运行代码时,它没有在日志中打印任何东西。所以我添加了如下日志记录,看看是否有任何区别。

class debugger(object):
    def __init__(self, name):
        self.name = name

    def log(self):
        logger = logging.getLogger(self.name)
        logger.setLevel(logging.DEBUG)
        log = logging.StreamHandler()
        formatter = logging.Formatter('%(asctime)s - %(name)40s - %(lineno)4d - %(levelname)s - %(message)s')
        log.setFormatter(formatter)
        logger.addHandler(log)
        return logger

debug_obj = debugger(__name__)
logger = debug_obj.log()


只是在print()logger.info('SOMETHING')logger.info('NOTHING')的位置添加了这些logger.info语句
我仍然没有看到任何日志在我的驱动程序从UDF。我知道,UDF运行在一个不同的进程和JVM上。谁能告诉我,我如何发送log.INFO/ERROR从我的UDF方法到我的驱动程序日志像往常一样。

e4yzc0pl

e4yzc0pl1#

日志最终会保存在实际处理任务的执行器的本地JVM中。
检查spark UI,选择executors并检查stdout & stderr以获取信息和错误日志。

相关问题