I am loading data from S3 into Oracle by looping over the file in chunks of 10000 rows, but after looping through some of the rows the task runs into memory problems and is killed:
Error: Task exited with return code Negsignal.SIGKILL
def gen_chunks(self, reader, chunksize=10000):
    # Accumulate rows from the csv reader and yield them
    # as lists of tuples, `chunksize` rows at a time.
    chunk = []
    for i, line in enumerate(reader):
        if i % chunksize == 0 and i > 0:
            yield list(map(tuple, chunk))
            chunk = []
        chunk.append(line)
    # Yield the final, possibly short, chunk
    yield list(map(tuple, chunk))
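
For reference, the chunking logic can be exercised on its own with a small chunk size. This is a minimal standalone sketch, with the self parameter dropped and made-up sample data:

import csv
import io

def gen_chunks(reader, chunksize=10000):
    # Identical logic to the method above, minus `self`
    chunk = []
    for i, line in enumerate(reader):
        if i % chunksize == 0 and i > 0:
            yield list(map(tuple, chunk))
            chunk = []
        chunk.append(line)
    yield list(map(tuple, chunk))

sample = io.StringIO("a|1\nb|2\nc|3\nd|4\ne|5\n")  # made-up test data
for batch in gen_chunks(csv.reader(sample, delimiter='|'), chunksize=2):
    print(batch)
# [('a', '1'), ('b', '2')]
# [('c', '3'), ('d', '4')]
# [('e', '5')]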
def execute(self, context):
    self.s3_key = self.get_s3_key(self.s3_key)
    s3_hook = S3Hook()
    s3_client = s3_hook.get_conn()
    snflk_hook = SnowflakeHook(snowflake_conn_id=self.snowflake_conn_id)
    orcl = OracleHook(oracle_conn_id=self.oracle_conn_id)
    try:
        with tempfile.NamedTemporaryFile() as f_source:
            # Download the source file from S3 into a local temp file
            s3_client.download_file(self.s3_bucket, self.s3_key, f_source.name)
            logger.info('Source file downloaded successfully')
            with open(f_source.name, 'r') as f:
                csv_reader = csv.reader(f, delimiter='|')
                # Insert the file into Oracle one chunk at a time
                for chunk in self.gen_chunks(csv_reader):
                    orcl.bulk_insert_rows(table=self.oracle_table, rows=chunk,
                                          target_fields=self.target_fields,
                                          commit_every=10000)
    except Exception as ex:
        raise AirflowException(f'Error: {ex}')
1 Answer
Fixed this by reducing the commit_every parameter to 3000.
This answer was posted as an edit to the question Airflow: OracleHook: bulk_insert_rows: Task exited with return code Negsignal.SIGKILL by the OP Kar under CC BY-SA 4.0.
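
In terms of the code in the question, the fix is only the commit_every argument of the insert call; a sketch, with every other argument left as in the original. OracleHook.bulk_insert_rows commits in batches of commit_every rows, so a smaller value should bound how many rows are buffered between commits:

# Smaller commit batches keep less data in memory at once
orcl.bulk_insert_rows(table=self.oracle_table, rows=chunk,
                      target_fields=self.target_fields,
                      commit_every=3000)  # reduced from 10000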