python—将表加载到具有限制的pysparkDataframe中

h5qlskok  于 2021-06-14  发布在  Cassandra
关注(0)|答案(1)|浏览(404)

在pyspark中,从数据库读取Dataframe时,是否可以将一定数量的数据加载到Dataframe中?我的意思是,如果能给 sqlContext 从数据库中读取时,这样就不必读取整个表(因为遍历750k行非常昂贵)。
下面是我目前用来过滤所需数据的代码。除了pyspark之外,我还使用了python3.7和cassandra db:

def connect_cassandra():
    spark = SparkSession.builder \
      .appName('SparkCassandraApp') \
      .config('spark.cassandra.connection.host', 'localhost') \
      .config("spark.driver.memory","15g") \
      .config("spark.executor.memory","15g") \
      .config("spark.driver.cores","4") \
      .config("spark.num.executors","6") \
      .config("spark.executor.cores","4") \
      .config('spark.cassandra.connection.port', '9042') \
      .config('spark.cassandra.output.consistency.level','ONE') \
      .master('local[*]') \
      .getOrCreate()

    sqlContext = SQLContext(spark)
    return sqlContext

def total_bandwidth(start_date, end_date):
    sqlContext = connect_cassandra()

    try:
        df = sqlContext \
          .read \
          .format("org.apache.spark.sql.cassandra") \
          .options(table="user_info", keyspace="acrs") \
          .load()
    except Exception as e:
        print(e)

    rows = df.where(df["created"] > str(start_date)) \
            .where(df["created"] < str(end_date)) \
            .groupBy(['src_ip', 'dst_ip']) \
            .agg(_sum('data').alias('total')) \
            .collect()

    data_dict = []
    for row in rows:
        src_ip = row['src_ip']
        dst_ip = row['dst_ip']
        data = row['total']
        data = {'src_ip' : src_ip, 'dst_ip' : dst_ip, 'data' : data}
        data_dict.append(data)

    print(data_dict)

如你们所见,我正试图用 start_date 以及 end_date . 但这需要太多的时间,导致行动缓慢。我想知道在将表加载到dataframe时是否有可用的dataframereader选项,以便减少所花费的时间(指数优先:p)。
我阅读了Dataframe阅读器文档,发现 option(String key, String value) 但是这些选项没有文档记录,所以不可能找出cassandra数据库有哪些选项以及如何使用它们。

6yoyoihd

6yoyoihd1#

您的主要问题是使用append方法。因为Dataframe中有大量的行,所以效率很低。我宁愿使用专门的pyspark方法来达到预期的效果。
我在本地机器上创建了一些带有一百万行的临时Dataframe(我假设您已经创建了sparksession)

>>> import pandas as pd

>>> n = 1000000
>>> df = spark.createDataFrame(
        pd.DataFrame({
            'src_ip': n * ['192.160.1.0'],
            'dst_ip': n * ['192.168.1.1'],
            'total': n * [1]
        })
    )
>>> df.count()
1000000

我们只从表中选择所需的列。

>>> import pyspark.sql.functions as F
>>> df.select('src_ip', 'dst_ip', F.col('total').alias('data')).show(5)
+-----------+-----------+----+
|     src_ip|     dst_ip|data|
+-----------+-----------+----+
|192.160.1.0|192.168.1.1|   1|
|192.160.1.0|192.168.1.1|   1|
|192.160.1.0|192.168.1.1|   1|
|192.160.1.0|192.168.1.1|   1|
|192.160.1.0|192.168.1.1|   1|
+-----------+-----------+----+
only showing top 5 rows

最后,让我们创建所需的数据字典列表。收集所有数据的最简单方法是使用列表理解。一旦我们选择了要合并到字典中的列,我们就可以使用 toDict() 方法。
吹毛求疵:
如果要收集所有值,请使用 collect() 方法。
如果不知道Dataframe的确切大小,可以使用 take(n) 返回的方法 n Dataframe中的元素。

>>> dict_list = [i.asDict() for i in df.select('src_ip', 'dst_ip', F.col('total').alias('data')).take(5)]
>>> dict_list
[{'data': 1, 'dst_ip': '192.168.1.1', 'src_ip': '192.160.1.0'},
 {'data': 1, 'dst_ip': '192.168.1.1', 'src_ip': '192.160.1.0'},
 {'data': 1, 'dst_ip': '192.168.1.1', 'src_ip': '192.160.1.0'},
 {'data': 1, 'dst_ip': '192.168.1.1', 'src_ip': '192.160.1.0'},
 {'data': 1, 'dst_ip': '192.168.1.1', 'src_ip': '192.160.1.0'}]

相关问题