当每一行都是由单独的线程生成时,如何将行追加到空的pyspark Dataframe ?

rkttyhzu  于 2023-10-15  发布在  Spark
关注(0)|答案(8)|浏览(133)

我有一个空的dataframe创建如下。模式:

table_schema = StructType([
    StructField('bookingNumber', StringType(), True),
    StructField('bookingProfile', StringType(), True),
    StructField('json_string', StringType(), True),
    StructField('json_ingestion_time', TimestampType(), True)
])

def prepare_empty_df(schema: StructType):
    empty_rdd = spark.sparkContext.emptyRDD()
    empty_df = spark.createDataFrame(empty_rdd, schema)
    return empty_df

我的数据来自一个API调用。每个GET调用都将返回JSON,我正在将API响应(JSON)转换为文本。我正在解析这个JSON的一些属性,然后插入到表中。因为我有20万个JSON,所以我不想在我的表上运行20万个插入查询,我想把所有API JSON调用的结果附加到一个空的JavaScript框架中,然后简单地摄取JavaScript框架。我所做的API调用不是顺序的,而是并行线程。也就是说,我一次运行4个parallel API调用,并试图将4个输出附加到一个空的嵌套框中。下面是我如何转换API JSON并将其附加到空的JavaScript框架中。
主要方法:

if __name__ == '__main__':
    spark = SparkSession.builder.appName('Raw_Check').getOrCreate()
    batch_size = 4
    booking_ids = []
    initial_df = prepare_empty_df(schema=raw_table_schema)

    initial_load = True

            
    cquery = f'select booking_id from db.table limit 20'
    booking_ids = get_booking_ids(spark=spark, query=cquery) # returns a list of bookings

    for i in range(0, len(booking_ids), batch_size):
        sub_list = booking_ids[i:i + batch_size]
        threads = []
        for index in range(batch_size):
            t = threading.Thread(target=get_json, name=str(index), args=(spark, sub_list[index], initial_df))
            threads.append(t)
            t.start()
        for index, thread in enumerate(threads):
            thread.join()

    print('Final Dataframe count')
    print(initial_df.count())
    print('-------------------------------------------------------------------------------------------------------------------------------------------------')
    print('Final Dataframe contents')
    initial_df.show()
    print('-------------------------------------------------------------------------------------------------------------------------------------------------')

get_json方法:

def get_json(spark: SparkSession, booking_number: str, init_df: DataFrame):
    headers = {"Content-type": "some_content_type"}
    token = doing_something_to_get_token

    token_headers = {'Authorization': f"Bearer {token}"}
    api_response = requests.get(f'https://api_url?booking_number={booking_number}', headers=token_headers)
    json_data = spark.sparkContext.parallelize([api_response.text])
    df = spark.read.json(json_data)

    api_df = (df.select('id').withColumnRenamed('id', 'bookingProfile')
        .withColumn('bookingNumber', lit(booking_number))
        .withColumn('json_string', lit(api_response.text))
        .withColumn('json_ingestion_time', lit(current_timestamp()))
    )
    api_df.show()
    init_df.unionAll(api_df)

我将API输出中的每一行联合到我在main方法中创建的initial_df。当脚本运行时,由于api_df.show(),我还可以看到来自API_df的数据。四个并行线程正在启动,我可以看到一次运行4个API调用。但是在最后,空的dataframe:我创建的initial_df在脚本结束时仍然是空的。计数为零,当我显示它的内容时,它基本上打印NULL。

-------------------------------------------------------------------------------------------------------------------------------------------------
Final Dataframe count
0
-------------------------------------------------------------------------------------------------------------------------------------------------
Final Dataframe contents
+--------------+-------------------+-----------+------------------------+
|bookingNumber |bookingProfile     |json_string|     json_ingestion_time|
+--------------+-------------------+-----------+------------------------+
+--------------+-------------------+-----------+------------------------+

谁能告诉我我在这里犯了什么错误,我该如何纠正?任何帮助都非常感激。

sxissh06

sxissh061#

MapPartitions

正如others已经指出的那样,Spark中惯用的方法是通过为每个booking_id调用REST API来将booking_idMap到预期结果。
但是,当将booking_id sMap到结果时,每个REST调用都有一定的开销,用于启动HTTP连接、执行SSL握手或获取访问API所需的访问令牌。这减慢了该过程。

HTTP会话

requests库提供了一种解决方案,通过使用会话对象来减少这种开销。医生说:
因此,如果您向同一主机发出多个请求,底层的TCP连接将被重用,这可以显著提高性能
基本上,HTTP(s)连接在每次API调用后保持打开(使用keep-alive头),并在下一次调用中重用。唯一要求是请求对象从一个API调用传递到下一个调用。因此,与其发起三个HTTPS连接

requests.get('https://api_url?booking_number=1')
requests.get('https://api_url?booking_number=2')
requests.get('https://api_url?booking_number=3')

仅创建并重用了一个会话:

s = requests.Session()
s.get('https://api_url?booking_number=1')
s.get('https://api_url?booking_number=2')
s.get('https://api_url?booking_number=3')

如果请求的日志记录是enabled,我们在第一种情况下看到打开了三个HTTP连接,而在第二种情况下只有一个。

MapPartitions

Spark提供了一种重用session对象的机制:mapPartitions。这个想法是Map一个完整的行分区,而不是单个行。执行实际Map的函数获取一个包含分区所有行的迭代器,并返回一个包含结果分区的迭代器。
在开始使用迭代器之前,会话对象被创建,然后在迭代时被重用:

def callRestApi(rowIt):
    import requests, json, datetime
    headers = {"Content-type": "some_content_type"}
    token = "doing_something_to_get_token"
    token_headers = {'Authorization': f"Bearer {token}"}
    s = requests.Session()
    s.headers.update(headers)
    s.headers.update(token_headers)

    for row in rowIt:
        booking_number = row['booking_id']        
        api_response = s.get(f'https://api_url?booking_number={booking_number}').text
        json_data = json.loads(api_response)
        yield [booking_number, json_data['id'], api_response, datetime.datetime.now()]

调用mapPartitions涉及到RDD的(小的)迂回,因为Pyspark API缺少以下功能:

api_df = df.rdd.mapPartitions(callRestApi).toDF(table_schema).cache()

再次启用请求日志记录,我们看到现在每个分区只创建一个连接,并且所有请求都包含头Connection: keep-alive

讨论

  • mapPartitions的实现比直接使用map稍微复杂一些,但表现出更好的性能特征。在我的测试中,根据Spark集群、REST API服务器和身份验证过程的性能,mapPartitions至少比map快20%。
  • mapPartitions可以减少REST API服务器上的压力,因为需要更少的连接和SSL握手。
  • 在生产系统中,callRestApi内部需要某种错误处理。当从REST API服务器获取和解析响应时,我们可能会看到连接问题或格式错误的json返回。应该处理这些问题,因为它们可能会中断对特定分区中所有行的处理。
wko9yo5t

wko9yo5t2#

为什么不尝试将API响应存储到组合的JSON文件中。然后在dataframe中读取json并摄取到您想要的表中。在这一部分中,您-->将API输出中的每一行联合到initial_df。而不是dataframe将其附加到文件中。每次API调用完成后,您将拥有一个包含所有API调用的所有记录的JSON文件。将json读入一个dataframe,做任何你想要的转换并插入到所需的表中。
目前我使用类似的方法,我从API调用中阅读大约650 k的JSON行/记录,每个API调用批量读取10,000条记录。在每个API调用中,我提取所需的JSON并将其附加到JSON文件中。在所有的API调用完成后,我在JSON文件中留下了650 k的记录,接下来我用它来读取到JavaScript框架中,进行所需的扁平化和转换,并存储到一个表中。

6rqinv9w

6rqinv9w3#

将完整的逻辑移动到udf函数& spark将使用多线程从API获取数据。
您可以使用repartition(<batch_size>)函数增加或减少并行线程。
请检查下面的逻辑并根据您的要求修改。

import pyspark
from pyspark.sql.functions import col, from_json, current_timestamp, udf
from pyspark.sql import Row
from pyspark.sql.types import StringType
import requests

# creating session object globally so that it can be re used inside udf function for multiple rows across partitions.

session = requests.Session()
batch_size = 4 # Number of thread or tasks to fetch data from api.

# Dummy Schema, You can change as per your requirement.
table_schema = 'completed STRING,id STRING,title STRING,userId STRING'
# Dummy Function to get list of booking_id, You can change as per your requirement.
def get_booking_ids(spark):
    return spark.createDataFrame(map(lambda n: Row(n), range(1, 100)), ['booking_id'])
# Dummy Function to invoke rest api & return response as a text
# You can add header, authentications, check if session exists or not if not create new session .... etc.
def get_api_response(booking_number, session):
    return session.get(f'https://jsonplaceholder.typicode.com/todos/{booking_number}').text
# Registering get_api_response function in udf.
get_response_udf = udf(lambda booking_id: get_api_response(booking_id, session), StringType()) # passing session object so that it can be reused for every request.
df = get_booking_ids(spark)\
.repartition(batch_size)\
# Invoking api & getting response as json string format.
.withColumn('api_response', get_response_udf(col('booking_id')))\ 
# Cache is important, else It will send request to api for every transformations.
.cache()\ 
# applying schema to json message
.withColumn('api_response', from_json(col('api_response'), table_schema))\ 
# adding current_timestamp(), You can also put this in get_api_response function
.withColumn('json_ingestion_time', current_timestamp())\
.selectExpr('booking_id', 'api_response.*', 'json_ingestion_time') # extracting all properties from json response.
df.printSchema()

root
 |-- booking_id: long (nullable = true)
 |-- completed: string (nullable = true)
 |-- id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- json_ingestion_time: timestamp (nullable = false)
df.show(100 , False)

+----------+---------+---+----------------------------------+------+--------------------------+
|booking_id|completed|id |title                             |userId|json_ingestion_time       |
+----------+---------+---+----------------------------------+------+--------------------------+
|3         |false    |3  |fugiat veniam minus               |1     |2023-08-13 20:23:11.425511|
|1         |false    |1  |delectus aut autem                |1     |2023-08-13 20:23:11.425511|
|4         |true     |4  |et porro tempora                  |1     |2023-08-13 20:23:11.425511|
|2         |false    |2  |quis ut nam facilis et officia qui|1     |2023-08-13 20:23:11.425511|
+----------+---------+---+----------------------------------+------+--------------------------+

群集

下面是我在集群中执行的屏幕截图

z5btuh9x

z5btuh9x4#

直接通过Spark执行API请求是有意义的。您可以使用parallelizemap的组合来实现这一点:
首先,将get_json函数缩减为只获取JSON:

def get_json(booking_number):
    import requests
    import datetime

    response = requests.get(f"https://jsonplaceholder.typicode.com/posts/{booking_number}")
    
    json_data = response.json()
    json_data['json_ingestion_time'] = datetime.datetime.now().isoformat()
    json_data['bookingNumber'] = booking_number
    
    return json_data

然后创建以下函数,它并行地从API获取数据:

max_parallel_requests = 4

# get booking ids as a list
booking_ids = get_booking_ids(spark=spark, query=cquery)

# fetch data from API and store them directly in the data_rdd
data_rdd = spark.sparkContext.parallelize(booking_ids, numSlices=max_parallel_requests).map(get_json)

# create a df from the rdd
df = data_rdd.toDF()

df.show()

并行度的大小还取决于您提供给Spark的资源。

aydmsdu9

aydmsdu95#

实际上我正在为这些类型的问题编写一些代码,你可以制作一个JSON模板,并在JSON中添加一个大的JSON字符串来解析,也许问题会出现在解析过程中,但pandas可以用JSON处理解析部分。
这个包叫做JsonDF,你可以从GitHub或者pip install JsonDf下载,如果有一些功能不存在的话,你可以编辑代码,这会很有帮助。
我建议你使用的代码是:

from JsonDF.utils.Json.Json import Json

template = Json('template', {}).objectify() # empty structure
template.insert('table_name': [1, 2, 3, 4]) # insert 'key: value' in the template

print(template)

以上将输出:

{
    'template': {
        'table_name': [1, 2, 3, 4],
    }
}

你可以用tempalte.template访问模板值,这是JsonDF处理Json对象的方式。
你可以用它来创建一个json模板并添加到其中,或者甚至解析一个已经存在的json,只需将json传递到Json()对象的第二个参数中,它将解析它并从中创建一个对象,你将能够从json中插入,更新,搜索和删除。
这将帮助你只处理JSON上的所有处理,然后传递最终的JSON以转换为dataframe。
希望对你有帮助。

4zcjmb1e

4zcjmb1e6#

unionAll操作不会修改调用该方法的DataFrame,而是返回一个新的DataFrame。由于您没有将unionAll操作的结果赋值回initial_df,因此更改不是持久性的。我不知道它是否完全正确,但请尝试下面的例子。另外,如果我没有弄错的话,我认为unionAll是不推荐使用的,但你必须仔细检查。
祝你好运!
获取json

def get_json(spark: SparkSession, booking_number: str):
    headers = {"Content-type": "some_content_type"}
    token = doing_something_to_get_token

    token_headers = {'Authorization': f"Bearer {token}"}
    api_response = requests.get(f'https://api_url?booking_number={booking_number}', headers=token_headers)
    json_data = spark.sparkContext.parallelize([api_response.text])
    df = spark.read.json(json_data)

    api_df = (df.select('id').withColumnRenamed('id', 'bookingProfile')
        .withColumn('bookingNumber', lit(booking_number))
        .withColumn('json_string', lit(api_response.text))
        .withColumn('json_ingestion_time', lit(current_timestamp()))
    )
    return api_df

main.py

if __name__ == '__main__':
    spark = SparkSession.builder.appName('Raw_Check').getOrCreate()
    batch_size = 4
    booking_ids = []
    initial_df = prepare_empty_df(schema=raw_table_schema)

    initial_load = True
            
    cquery = f'select booking_id from db.table limit 20'
    booking_ids = get_booking_ids(spark=spark, query=cquery) # returns a list of bookings

    for i in range(0, len(booking_ids), batch_size):
        sub_list = booking_ids[i:i + batch_size]
        threads = []
        results = []

        for index in range(batch_size):
            t = threading.Thread(target=get_json, name=str(index), args=(spark, sub_list[index]))
            threads.append(t)
            t.start()

        for index, thread in enumerate(threads):
            thread.join()
            results.append(thread.result())  # Collect the result DataFrame

        for result_df in results:
            initial_df = initial_df.unionAll(result_df)  # Union collected DataFrames

    print('Final Dataframe count')
    print(initial_df.count())
    print('-------------------------------------------------------------------------------------------------------------------------------------------------')
    print('Final Dataframe contents')
    initial_df.show()
    print('-------------------------------------------------------------------------------------------------------------------------------------------------')
41zrol4v

41zrol4v7#

感谢大家的帮助,我采取了另一种方法,创建了一个类,并在构造函数中初始化了我的框架,如下所示:

class PrepareApiData:
    def __init__(self, df=None):
        self.finalDf = df

    def get_json(self, spark, api_param):
        try:
            # Generate a token => token_headers
            api_response = requests.get(f'API_URL={api_param}', headers=token_headers)
            json_data = spark.sparkContext.parallelize([api_response.text])
            df = spark.read.json(json_data)
            self.finalDf = self.finalDf.unionAll(df)
        except Exception as error:
            traceback.print_exc()

    def api_calls(self, spark, batch_size, booking_ids):
        try:
            for i in range(0, len(booking_ids), batch_size):
                sub_list = booking_ids[i:i + batch_size]
                threads = []
                for index in range(len(sub_list)):
                    t = threading.Thread(target=self.get_json, name=str(index), args=(spark, sub_list[index]))
                    threads.append(t)
                    t.start()
                for index, thread in enumerate(threads):
                    thread.join()
            return self.finalDf
        except Exception as e:
            traceback.print_exc()

下面是我如何从上面的类调用对象:

init_df = prepare_empty_df(schema=some_schema, spark=spark) => This method return me an empty DF with a schema
pad = PrepareApiData(init_df)
booking_ids = [LIST_OF_MY_BOOKING_IDs]
pad.api_calls(spark, batch_size, booking_ids)
gk7wooem

gk7wooem8#

我的想法是,不要把数组传给线程的参数,而是使用python * 多处理 * 库的 starmap。获取输出作为嵌套框架的列表
然后迭代数据框以合并空数据框中的结果
我希望这能解决你的问题

相关问题