Parallel API requests using Spark and PySpark

Posted by yiytaume on 2021-05-27 in Spark

I am running the following API requests on an EMR cluster:

def get_normal_objects(self,object_name, get_id, chunk_size=35,**params):
    contacts_pages = []
    batch = 0
    while True:
        urls = ["{}/{}".format(self.base_url, "{}?page={}&api_key={}".format(object_name, page_number, self.api_keys))
                for page_number in range(batch * chunk_size + 1, chunk_size * (1 + batch) + 1)]

        responses_raw = self.get_responses(urls, self.office_token, chunk_size)
        LOGGER.info("Collecting data for {} for batch  {}".format(object_name, batch))

        try:
            responses_json = [json.loads(response_raw['output']) for response_raw in responses_raw]

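The code works well when I extract simple objects that do not need an id, but when it extracts complex relational objects that do need an id, the calls to the API take a very long time: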

"https://integrations.mydesktop.com.au/api/v1.2/properties/22028014/sales?api_key"

def get_complex_objects(self,object_name_1, object_name_2, ids,spark,  chunk_size=30,**params):
    results = []
    batch = 0

    while True:
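        # List indices start at 0 (unlike the 1-based page numbers used in
        # get_normal_objects), so slice from batch * chunk_size to avoid skipping ids[0].
        ids_start = batch * chunk_size
        ids_end = ids_start + chunk_size
        chunk_ids = ids[ids_start:ids_end]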

        urls = [
            "{}/{}".format(self.base_url, "{}/{}/{}?api_key={}".format(object_name_1, contactId, object_name_2, self.api_keys))
            for contactId in chunk_ids]

        LOGGER.info("Collecting data for {}:{} for batch  {}".format(object_name_1, object_name_2, batch))
        responses_raw = self.get_responses(urls, self.office_token, chunk_size)

I use the get_responses function below to fetch the responses:

def get_responses(self, urls, office_token, chunk_size=30, **params):
    """Calls all the urls in parallel in batches of {chunk_size}

    Arguments:
        urls {List} -- list of all urls to call
        office_token {String} -- Office token

    Keyword Arguments:
        chunk_size {int} -- number of parallel api calls (default: {30})

    Returns:
        List -- one {"url": ..., "output": ...} dict per url
    """
    responses = []
    loop = asyncio.get_event_loop()
    for chunk in mydesktop.chunks(urls, chunk_size):
        # Accumulate every batch; plain assignment would keep only the last one.
        responses.extend(loop.run_until_complete(self.__run(params, office_token, chunk)))

    return responses

async def __fetch(self, url, params, session):
    try:
        async with session.get(url, params=params) as response:
            #print('X-RateLimit-Remaining:{0}'.format(response.headers['X-RateLimit-Remaining']))
            output = await response.read()
            return output
    except asyncio.TimeoutError as e:
        print(str(e))
        return None

async def __bound_fetch(self, sem, url, params, session):
    # Getter function with semaphore.
    async with sem:
        output = await self.__fetch(url, params, session)
        return {"url": url, "output":output}

async def __run(self, params, auth_user, urls):
    tasks = []
    sem = asyncio.Semaphore(400)
    async with ClientSession(auth=BasicAuth(auth_user, password='', ),
                             connector=TCPConnector(ssl=False)) as session:
        for url in urls:
            task = asyncio.ensure_future(self.__bound_fetch(sem, url, params, session))
            tasks.append(task)
        responses = await asyncio.gather(*tasks)
    return responses
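
The `mydesktop.chunks` helper called in `get_responses` is not shown in the post. As a minimal sketch, assuming it simply splits a list into consecutive slices of at most `chunk_size` items, it might look like this (the implementation here is a guess, not the original code):

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def chunks(items: Sequence[T], chunk_size: int) -> Iterator[List[T]]:
    """Yield consecutive slices of `items`, each with at most `chunk_size` elements.

    Assumed behaviour only: the real mydesktop.chunks is not shown in the post.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    for start in range(0, len(items), chunk_size):
        yield list(items[start:start + chunk_size])
```

Note that both callers already build exactly `chunk_size` URLs per batch and pass the same `chunk_size` on, so under this assumption each call to `get_responses` would process a single chunk.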

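My question is: how can I use Spark's parallelism to distribute the URLs across the executors and so reduce the extraction time?
I am thinking of sending the URLs to the executors with urls = spark.sparkContext.parallelize(urls) and then performing the GET requests with a map lambda.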
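
The idea of distributing the URLs with `parallelize` and fetching them on the executors can be sketched roughly as follows. This is only a sketch under stated assumptions: `fetch_url`, `fetch_partition`, `fetch_with_spark` and `num_partitions` are hypothetical names, the blocking `urllib` call stands in for the aiohttp session used above, and the basic-auth header built from `office_token` is left out. It uses `mapPartitions` rather than a per-element `map` so that any per-partition setup happens once per partition.

```python
import urllib.request
from typing import Callable, Dict, Iterable, Iterator, List, Optional


def fetch_url(url: str, timeout: float = 30.0) -> Optional[bytes]:
    """Blocking GET; returns the raw body, or None on a network or HTTP error."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return response.read()
    except OSError:  # URLError, HTTPError and socket timeouts all derive from OSError
        return None


def fetch_partition(
    urls: Iterable[str],
    fetch: Callable[[str], Optional[bytes]] = fetch_url,
) -> Iterator[Dict[str, object]]:
    """Fetch every URL of one partition, keeping the same
    {"url": ..., "output": ...} shape that __bound_fetch returns."""
    for url in urls:
        yield {"url": url, "output": fetch(url)}


def fetch_with_spark(spark, urls: List[str], num_partitions: int = 8) -> List[Dict[str, object]]:
    """Assumes `spark` is an existing SparkSession.

    Each partition is fetched by one task; the results come back to the driver.
    """
    rdd = spark.sparkContext.parallelize(urls, num_partitions)
    return rdd.mapPartitions(fetch_partition).collect()
```

Inside a partition the URLs are fetched one after another, so the overall parallelism here is bounded by the number of partitions and available executor cores. The calls are I/O-bound, so it may still be worth running the existing asyncio code inside each partition. The commented-out `X-RateLimit-Remaining` header also suggests the API enforces a rate limit, which would cap any gain from adding executors.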
