在pyspark中,从数据库读取Dataframe时,是否可以将一定数量的数据加载到Dataframe中?我的意思是,如果能给 sqlContext
从数据库中读取时,这样就不必读取整个表(因为遍历750k行非常昂贵)。
下面是我目前用来过滤所需数据的代码。除了pyspark之外,我还使用了python3.7和cassandra db:
def connect_cassandra():
spark = SparkSession.builder \
.appName('SparkCassandraApp') \
.config('spark.cassandra.connection.host', 'localhost') \
.config("spark.driver.memory","15g") \
.config("spark.executor.memory","15g") \
.config("spark.driver.cores","4") \
.config("spark.num.executors","6") \
.config("spark.executor.cores","4") \
.config('spark.cassandra.connection.port', '9042') \
.config('spark.cassandra.output.consistency.level','ONE') \
.master('local[*]') \
.getOrCreate()
sqlContext = SQLContext(spark)
return sqlContext
def total_bandwidth(start_date, end_date):
sqlContext = connect_cassandra()
try:
df = sqlContext \
.read \
.format("org.apache.spark.sql.cassandra") \
.options(table="user_info", keyspace="acrs") \
.load()
except Exception as e:
print(e)
rows = df.where(df["created"] > str(start_date)) \
.where(df["created"] < str(end_date)) \
.groupBy(['src_ip', 'dst_ip']) \
.agg(_sum('data').alias('total')) \
.collect()
data_dict = []
for row in rows:
src_ip = row['src_ip']
dst_ip = row['dst_ip']
data = row['total']
data = {'src_ip' : src_ip, 'dst_ip' : dst_ip, 'data' : data}
data_dict.append(data)
print(data_dict)
如你们所见,我正试图用 start_date
以及 end_date
. 但这需要太多的时间,导致行动缓慢。我想知道在将表加载到dataframe时是否有可用的dataframereader选项,以便减少所花费的时间(指数优先:p)。
我阅读了Dataframe阅读器文档,发现 option(String key, String value)
但是这些选项没有文档记录,所以不可能找出cassandra数据库有哪些选项以及如何使用它们。
1条答案
按热度按时间6yoyoihd1#
您的主要问题是使用append方法。因为Dataframe中有大量的行,所以效率很低。我宁愿使用专门的pyspark方法来达到预期的效果。
我在本地机器上创建了一些带有一百万行的临时Dataframe(我假设您已经创建了sparksession)
我们只从表中选择所需的列。
最后,让我们创建所需的数据字典列表。收集所有数据的最简单方法是使用列表理解。一旦我们选择了要合并到字典中的列,我们就可以使用
toDict()
方法。吹毛求疵:
如果要收集所有值,请使用
collect()
方法。如果不知道Dataframe的确切大小,可以使用
take(n)
返回的方法n
Dataframe中的元素。