cassandra的execute-concurrent没有正常工作

tgabmvqs  于 2021-06-14  发布在  Cassandra
关注(0)|答案(1)|浏览(345)

我编写了一个python脚本,它读取csv并使用异步和并发将它们插入cassandra表中,但是并发比异步慢。我使用concurrent的目的是实现并行写入,从而加快在cassandra中索引csv文件的任务。
使用异步的代码:

for df in chunks:
            futures = []
            df = df.to_dict(orient='records')
            chunk_counter += 1
            for row in df:
                key = str(row["0"])
                row = json.dumps(row, default=str)
                futures.append(
                    self.session.execute_async(
                        insert_sql, [key, "version_1", row]
                    )
                )
                # batch.add(insert_sql, (key, "version_1", row))
            # self.session.execute(batch)
            for future in futures:
                self.log.debug(future)
                continue

使用concurrents的代码:

for df in chunks:
            futures = []
            df = df.to_dict(orient='records')
            chunk_counter += 1
            for row in df:
                key = str(row["0"])
                row = json.dumps(row, default=str)
                params = (key, row, )
                futures.append(
                    (
                        insert_sql,
                        params
                    )
                )
            results = execute_concurrent(
                self.session, futures, raise_on_first_error=False)
            for (success, result) in results:
                if not success:
                    self.handle_error(result)  # result will be an Exception
atmip9wb

atmip9wb1#

你没有设定 concurrency 的参数 execute_concurrent ,默认情况下使用100。
根据文件:
concurrency参数控制将并发执行的语句数。什么时候 Cluster.protocol_version 如果设置为1或2,建议将其保持在每个主机的核心连接数乘以已连接主机数的100倍以下(请参阅 Cluster.set_core_connections_per_host() ). 如果超过该数量,事件循环线程可能会尝试阻止创建新连接,从而严重影响吞吐量。如果protocol\u version为3或更高版本,则可以安全地尝试更高级别的并发性。

相关问题