python-3.x Airflow: OracleHook: bulk_insert_rows: Task exited with return code Negsignal.SIGKILL

798qvoo8 posted on 2022-12-20 in Python

I am loading data from S3 into Oracle by looping over chunks of 10000 rows, but after processing a number of chunks the task runs out of memory and fails with:
Error: Task exited with return code Negsignal.SIGKILL

    # Methods of the custom Airflow operator (excerpt); requires csv, tempfile,
    # and the S3Hook, OracleHook, SnowflakeHook and AirflowException imports.
    def gen_chunks(self, reader, chunksize=10000):
        # Yield the reader's rows as lists of tuples, chunksize rows at a time.
        chunk = []
        for i, line in enumerate(reader):
            if i % chunksize == 0 and i > 0:
                yield list(map(tuple, chunk))
                chunk = []
            chunk.append(line)
        yield list(map(tuple, chunk))

    def execute(self, context):
        self.s3_key = self.get_s3_key(self.s3_key)
        s3_hook = S3Hook()
        s3_client = s3_hook.get_conn()
        snflk_hook = SnowflakeHook(snowflake_conn_id=self.snowflake_conn_id)
        orcl = OracleHook(oracle_conn_id=self.oracle_conn_id)
        try:
            # Download the S3 object to a temporary file, then stream it through
            # the CSV reader and bulk-insert it into Oracle chunk by chunk.
            with tempfile.NamedTemporaryFile() as f_source:
                s3_client.download_file(self.s3_bucket, self.s3_key, f_source.name)
                logger.info('Source file downloaded successfully')
                with open(f_source.name, 'r') as f:
                    csv_reader = csv.reader(f, delimiter='|')
                    for chunk in self.gen_chunks(csv_reader):
                        orcl.bulk_insert_rows(table=self.oracle_table, rows=chunk,
                                              target_fields=self.target_fields,
                                              commit_every=10000)
        except Exception as ex:
            raise AirflowException(f'Error: {ex}')
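
For reference, the chunking helper can be exercised on its own outside Airflow. A minimal sketch, with a small in-memory list standing in for the csv.reader (the rows below are made up for illustration):

def gen_chunks(reader, chunksize=3):
    # Same logic as the operator method above, without self.
    chunk = []
    for i, line in enumerate(reader):
        if i % chunksize == 0 and i > 0:
            yield list(map(tuple, chunk))
            chunk = []
        chunk.append(line)
    yield list(map(tuple, chunk))

rows = [['a', 1], ['b', 2], ['c', 3], ['d', 4], ['e', 5]]
for chunk in gen_chunks(rows, chunksize=3):
    print(chunk)
# [('a', 1), ('b', 2), ('c', 3)]
# [('d', 4), ('e', 5)]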

yzuktlbb1#

Fixed this by reducing the commit_every parameter to 3000.

for chunk in self.gen_chunks(csv_reader):
    orcl.bulk_insert_rows(table=self.oracle_table, rows=chunk, target_fields=self.target_fields, commit_every=3000)
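
For context (my understanding, not part of the original answer): bulk_insert_rows accumulates rows and issues an executemany plus commit once commit_every rows have been collected, so a smaller value keeps smaller batches in memory between commits. A rough, simplified sketch of that batching pattern (not the actual Airflow source):

def batched(rows, commit_every=3000):
    # Yield rows in groups of at most commit_every, mirroring how the hook
    # groups rows before each executemany()/commit() round trip.
    batch = []
    for row in rows:
        batch.append(row)
        if len(batch) >= commit_every:
            yield batch
            batch = []
    if batch:
        yield batch

# Each batch would be passed to cursor.executemany(sql, batch) and committed,
# so lowering commit_every trades more round trips for a smaller memory footprint.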

This answer was posted under CC BY-SA 4.0 by the OP, Kar, as an edit to the question Airflow: OracleHook: bulk_insert_rows: Task exited with return code Negsignal.SIGKILL.
