我在pyspark中创建了一个UDF来做一个活动,并声明如下。
my_udf = udf(lambda z:udf_method(z), StringType())
def udf_method(udf_param):
try:
print('In UDF Method')
if 'something':
print('SOMETHING')
return 'SOMETHING'
else:
print('NOTHING')
return 'NOTHING'
except Exception as e:
traceback.print_exc()
字符串
用spark注册了它,如:spark.udf.register('udf_method', udf_method)
我有一个嵌套框架df
,我通过调用一个UDF在该嵌套框架中创建一个新列,如下所示。
new_df = df.withColumn('udf_output', udf_method(col('some_column_from_my_dataframe')))
new_df.write.mode('overrite').format('ORC').saveAsTable('dbname.tablename')
型
当我运行代码时,它没有在日志中打印任何东西。所以我添加了如下日志记录,看看是否有任何区别。
class debugger(object):
def __init__(self, name):
self.name = name
def log(self):
logger = logging.getLogger(self.name)
logger.setLevel(logging.DEBUG)
log = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)40s - %(lineno)4d - %(levelname)s - %(message)s')
log.setFormatter(formatter)
logger.addHandler(log)
return logger
debug_obj = debugger(__name__)
logger = debug_obj.log()
型
只是在print()
logger.info('SOMETHING')
logger.info('NOTHING')
的位置添加了这些logger.info
语句
我仍然没有看到任何日志在我的驱动程序从UDF。我知道,UDF运行在一个不同的进程和JVM上。谁能告诉我,我如何发送log.INFO/ERROR
从我的UDF方法到我的驱动程序日志像往常一样。
1条答案
按热度按时间e4yzc0pl1#
日志最终会保存在实际处理任务的执行器的本地JVM中。
检查spark UI,选择executors并检查stdout & stderr以获取信息和错误日志。