pyspark 将自定义日志记录保存在数据块中,但不带tye,块除外

vd8tlhqk  于 2023-08-03  发布在  Spark
关注(0)|答案(1)|浏览(66)

我正在尝试用数据块写一个日志系统,用于我们需要运行的几个作业。目前,我正在设置一个日志记录器,并将文件记录到内存中-> log_stream = io.StringIO()
所有函数都包含在try中,除了在记录器中捕获信息或异常并记录它们的块。但它也被用来确定notebook的最后一个块将运行。这是必需的,因为它包含将内存中的文件上载到blob存储的代码。
然而,我觉得这个方法非常“丑陋”,因为除了块之外,每个代码都需要在try中覆盖。
他们的任何方法都总是运行笔记本的最后一个块,即使代码的一部分完全失败/错误。或者是否有其他方法来确保日志文件在发生任何错误时直接上传?
当前代码:
--登录--

log_stream = io.StringIO()

logger = logging.getLogger(database_name_bron)
logger.setLevel(logging.DEBUG)

formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

handler = logging.StreamHandler(log_stream)
handler.setLevel(logging.DEBUG)
handler.setFormatter(formatter)
if logger.hasHandlers():
    logger.handlers.clear()
logger.addHandler(handler)

字符串
--代码块示例--

try:
    table = output_dict['*'].select( \
        col('1*').alias('1*'), \
        col('2*').alias('2*'), \
        col('3*').alias('3*'), \
        col('4*').alias('4*'), \
        col('5*').alias('5*'), \
            )

    #join tabellen
    table2= table2.join(table1, table2.5* == table1.4*, 'left')
    logger.info('left join van table 1en table2')
except Exception as e:
    logger.exception(f"Een error heeft plaatsgevonden tijdens het joinen van table1 en table2: {e}")


-- upload block --

#extraheer log data
log_content = log_stream.getvalue()

#upload data naar de blob storage
dbutils.fs.put(f"abfss://{container_name}@{storage_account}.dfs.core.windows.net/{p_container_name}", log_content, overwrite=True)

#netjes afsluiten van de handler
logger.removeHandler(handler)
handler.close()

eblbsuwk

eblbsuwk1#

通过创建自定义日志上下文,可以在自定义日志上下文中运行代码。
下面是代码。

import logging
import io

class LoggingContext:
    def  __init__(self, logger, storage_account, container_name, p_container_name):
        self.logger = logger
        self.storage_account = storage_account
        self.container_name = container_name
        self.p_container_name = p_container_name
        self.log_stream = io.StringIO()
        
    def __enter__(self):
        handler = logging.StreamHandler(self.log_stream)
        handler.setLevel(logging.DEBUG)
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)
        return  self.logger
        
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.logger.removeHandler(self.logger.handlers[0])
        handler = self.logger.handlers[0]
        handler.close()
        if exc_type is  not  None:
            self.logger.exception(f"An error occurred: {exc_val}")
        log_content = self.log_stream.getvalue()
        dbutils.fs.put(f"abfss://{self.container_name}@{self.storage_account}.dfs.core.windows.net/{self.p_container_name}",log_content, overwrite=True)

字符串
在这里,__init__初始化上下文中所有必需的变量。__enter__配置日志设置,如formatlevello_streamhandler。和__exit__处理关闭处理程序,执行日志内容上传操作到存储。

logger = logging.getLogger("database_name_bron")
logger.setLevel(logging.DEBUG)
storage_account = 'jgsadls'
container_name = 'data'
p_container_name = 'databricks_log'


在此处添加您所需的信息。
上下文代码

with LoggingContext(logger, storage_account, container_name, p_container_name):
    logger.info('Log started')
    logger.info("Table joined.Check logs for more info....")
    y = 1/0
    logger.info("Log ended")


在这里,将您的代码块添加到LoggingContext中,无论您在哪里运行上面的代码。
x1c 0d1x的数据
输出:



每当你运行代码块时,只要在Logcontext中运行它,而不是try/except。

相关问题