Python Pytrends: The request failed: Google returned a response with code 429

Posted by 1tu0hz3e on 2023-11-15 in Python

I am using Pytrends to extract Google Trends data, for example:

from pytrends.request import TrendReq

# from_date and today_date are 'YYYY-MM-DD' strings defined elsewhere
pytrend = TrendReq()
pytrend.build_payload(kw_list=['bitcoin'], cat=0, timeframe=from_date+' '+today_date)

It returns an error:

ResponseError: The request failed: Google returned a response with code 429.


It worked yesterday, but for some reason it no longer works. The example from the GitHub source code fails as well:

pytrends = TrendReq(hl='en-US', tz=360, proxies = {'https': 'https://34.203.233.13:80'})


How can I solve this? Thanks a lot!

Answer #1 (x8goxv8g)

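TL;DR: I solved the problem with a custom patch.

Explanation

The problem comes from Google's bot detection system. Like other similar systems, it stops serving requests that arrive too frequently from suspicious clients. One of the signals it uses to recognize trusted clients is the presence of specific headers generated by JavaScript code on the web page. Unfortunately, the Python requests library does not offer that level of camouflage against such bot detection systems, because the JavaScript code is never even executed. So the idea behind my patch is to reuse the headers generated by my browser while it interacts with Google Trends. The browser generated those headers while I was logged in with my Google account. In other words, they are tied to my Google account, so to Google I look trustworthy.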
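
Because the 429 is a reaction to requests that arrive too often, a generic mitigation is to space out retries. This is not part of the patch described in this answer. It is a minimal sketch of exponential backoff, and the helper name `call_with_backoff` and its parameters are hypothetical. It only assumes that the rate-limit error can be recognized from the exception, for example because its message contains "429" as in the error quoted in the question.

```python
import random
import time


def call_with_backoff(func, is_rate_limited, max_attempts=5, base_delay=1.0, sleep=time.sleep):
    """Call func(); after a rate-limit error wait base_delay * 2**attempt plus jitter, then retry."""
    for attempt in range(max_attempts):
        try:
            return func()
        except Exception as exc:
            # Re-raise errors that are not rate limits, and the last failed attempt.
            if not is_rate_limited(exc) or attempt == max_attempts - 1:
                raise
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.5)
            sleep(delay)
```

With pytrends this could be used as `call_with_backoff(lambda: pytrend.interest_over_time(), lambda e: "429" in str(e))`. Backoff alone may not be enough if the client is already flagged as a bot, which is why the answer goes on to copy browser headers.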

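Solution

I solved it as follows:
1. First, use Google Trends from your web browser while logged in with your Google account;
1. To capture the actual HTTP GET request (I use Chromium), go to "More tools" -> "Developer tools" -> "Network" tab;
1. Visit the Google Trends page and search for a trend; this triggers many HTTP requests, which appear in the left sidebar of the "Network" tab;
1. Identify the GET request (in my case it was /trends/explore?q=topic&geo=US), right-click it, and select Copy -> Copy as cURL;
1. Then go to a cURL-to-Python converter page (a later answer uses curlconverter.com for the same step), paste the cURL command on the left side, and copy the "headers" dictionary from the Python script generated on the right side of the page;
1. Then go to your code and subclass the TrendReq class so that you can pass the custom headers you just copied: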

from pytrends.request import TrendReq as UTrendReq

GET_METHOD = 'get'

# Paste here the "headers" dictionary copied from the generated Python script
headers = {
...
}

class TrendReq(UTrendReq):
    # Override the request helper so that every call also sends the browser headers
    def _get_data(self, url, method=GET_METHOD, trim_chars=0, **kwargs):
        return super()._get_data(url, method=GET_METHOD, trim_chars=trim_chars, headers=headers, **kwargs)

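1. Remove any "import TrendReq" from your code, since it will now use the class you just created;
1. Try again;
1. If the error message comes back in the future, repeat the procedure. You need to update the headers dictionary with fresh values, and doing so may trigger the captcha mechanism.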
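
The conversion step above can also be approximated locally. The following is a minimal sketch, not the converter page's implementation, and the helper name `headers_from_curl` is hypothetical. It assumes the command was copied in bash format with plain single-quoted arguments and collects every `-H`/`--header` value into a dictionary.

```python
import shlex


def headers_from_curl(curl_command):
    """Collect the -H/--header values of a cURL command into a dict."""
    # Join backslash-continued lines, then split the way a POSIX shell would.
    tokens = shlex.split(curl_command.replace("\\\n", " "))
    headers = {}
    for flag, value in zip(tokens, tokens[1:]):
        if flag in ("-H", "--header") and ":" in value:
            name, _, content = value.partition(":")
            headers[name.strip()] = content.strip()
    return headers


# Hypothetical, shortened command; a real one carries many more headers.
example = (
    "curl 'https://trends.google.com/trends/explore?q=topic&geo=US' \\\n"
    "  -H 'accept: text/html' \\\n"
    "  -H 'user-agent: Mozilla/5.0 (X11; Linux x86_64)' \\\n"
    "  --compressed"
)
print(headers_from_curl(example))
```

The resulting dictionary can be assigned to `headers` in the subclass above. Commands that use other quoting styles may need the converter page instead.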

Answer #2 (kmbjn2e3)

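This one took a while, but it turned out the library just needed to be updated. You can see some of the approaches I tried here, both of which ended in status 429 responses:
https://github.com/GeneralMills/pytrends/issues/243
Ultimately, I got it working again by running the following command from my bash prompt:
pip install --upgrade --user git+https://github.com/GeneralMills/pytrends
This installs the latest version.
Hope this works for you too.
EDIT:
If you can't upgrade from source, you may have some luck with:
pip install pytrends --upgrade
Also, if you are on Windows, make sure you run git as administrator.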
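
To confirm that the upgrade actually took effect in the interpreter you are using, you can query the installed version. This is a small sketch using the standard library's `importlib.metadata`; the helper name `installed_version` is hypothetical.

```python
from importlib import metadata


def installed_version(package):
    """Return the installed version string of a package, or None if it is not installed."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


print(installed_version("pytrends"))
```

If this prints `None` or an old version after upgrading, pip probably installed into a different Python environment than the one running your script.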

Answer #3 (xa9qqrwz)

I had the same problem even after updating the module with pip install --upgrade --user git+https://github.com/GeneralMills/pytrends and restarting Python.
However, the problem was solved as follows:
Instead of

pytrends = TrendReq(hl='en-US', tz=360, timeout=(10,25), proxies=['https://34.203.233.13:80',], retries=2, backoff_factor=0.1, requests_args={'verify':False})

just run:

pytrend = TrendReq()


Hope this helps!

Answer #4 (xxb16uws)

After running the upgrade command with pip install, you should restart the Python kernel and re-import the pytrends library.
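
When restarting the kernel is inconvenient, reloading the already imported modules is a possible fallback. The sketch below uses `importlib.reload`; the helper name `reload_package` is hypothetical. Objects created before the reload, and names bound with `from ... import ...`, keep pointing at the old code, so a kernel restart remains the more reliable option.

```python
import importlib
import sys


def reload_package(name):
    """Reload a package and its loaded submodules, deepest first; return the top-level module."""
    if name not in sys.modules:
        return importlib.import_module(name)
    loaded = [m for m in list(sys.modules) if m == name or m.startswith(name + ".")]
    for mod_name in sorted(loaded, key=lambda m: m.count("."), reverse=True):
        module = sys.modules.get(mod_name)
        if module is not None:
            importlib.reload(module)
    return sys.modules[name]
```

For example, `reload_package("pytrends")` followed by a fresh `from pytrends.request import TrendReq` picks up the upgraded code in the same session.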

Answer #5 (slwdgvem)

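I ran into the same problem and did something very similar to Antonio Ercole De Luca. For me, however, the problem was the cookies, not the headers.
Like Antonio, I created a subclass, but this time I overrode the cookie method: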

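from pytrends.request import TrendReq

# Cookies copied from the browser (values redacted)
cookies = {
    "SEARCH_SAMESITE": "####",
    "SID": "####",
    # ... remaining cookies elided in the original
}

class CookieTrendReq(TrendReq):
    # Return only the NID cookie from the copied set instead of requesting a new one
    def GetGoogleCookie(self):
        return dict(filter(lambda i: i[0] == 'NID', cookies.items()))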

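I obtained the cookies the same way he obtained the headers:
1. Visit trends.google.com
1. Open the developer tools and go to the Network tab
1. Run a search, then right-click the GET request at the top (it should look like explore?q=...)
1. Copy the request as cURL
1. Paste it into curlconverter.com and take the cookies!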
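
If you would rather extract the cookies yourself, the copied cURL command carries them as a single `name=value; name2=value2` string. The following sketch turns such a string into the dictionary used above. The helper name `cookies_from_header` and the sample values are hypothetical.

```python
def cookies_from_header(cookie_header):
    """Turn a 'name=value; name2=value2' Cookie header string into a plain dict."""
    cookies = {}
    for part in cookie_header.split(";"):
        # Split on the first '=' only, since cookie values may contain '=' themselves.
        name, sep, value = part.strip().partition("=")
        if sep and name:
            cookies[name] = value
    return cookies


# Hypothetical, redacted cookie string
print(cookies_from_header("NID=511=abc; SID=xyz; SEARCH_SAMESITE=CgQI"))
```

The subclass above reads only the `NID` entry, so the copied string must contain that cookie for the override to return anything.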

Answer #6 (tcbh2hod)

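We are now facing the same problem again; the following code helps work around the 429 error.
It retries several times, switching the impersonated browser version between attempts: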

import json
import urllib.parse
from datetime import datetime, timedelta
from curl_cffi import requests
import time

def build_payload(keywords, timeframe='now 7-d', geo='US'):
    token_payload = {
        'hl': 'en-US',
        'tz': '0',
        'req': {
            'comparisonItem': [{'keyword': keyword, 'time': timeframe, 'geo': geo} for keyword in keywords],
            'category': 0,
            'property': ''
        }
    }
    token_payload['req'] = json.dumps(token_payload['req'])
    return token_payload

def convert_to_desired_format(raw_data):
    trend_data = {}
    for entry in raw_data['default']['timelineData']:
        timestamp = int(entry['time'])
        date_time_str = datetime.utcfromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
        value = entry['value'][0]
        trend_data[date_time_str] = value
    return trend_data

# Cookies
def get_google_cookies(impersonate_version='chrome110'):
    with requests.Session() as session:
        session.get("https://www.google.com", impersonate=impersonate_version)
        return session.cookies

def fetch_trends_data(keywords, days_ago=7, geo='US', hl='en-US', max_retries=5, browser_version='chrome110', browser_switch_retries=2):
    browser_versions = ['chrome110', 'edge101', 'chrome107', 'chrome104', 'chrome100', 'chrome101', 'chrome99']
    current_browser_version_index = browser_versions.index(browser_version)
    cookies = get_google_cookies(impersonate_version=browser_versions[current_browser_version_index])

    for browser_retry in range(browser_switch_retries + 1):
        data_fetched = False  # Reset data_fetched to False at the beginning of each browser_retry
        with requests.Session() as s:
            # phase 1: token
            for retry in range(max_retries):
                time.sleep(2)
                token_payload = build_payload(keywords, geo=geo)  # honor the geo argument of fetch_trends_data
                url = 'https://trends.google.com/trends/api/explore'
                params = urllib.parse.urlencode(token_payload)
                full_url = f"{url}?{params}"
                response = s.get(full_url, impersonate=browser_versions[current_browser_version_index], cookies=cookies)
                if response.status_code == 200:
                    content = response.text[4:]
                    try:
                        data = json.loads(content)
                        widgets = data['widgets']
                        tokens = {}
                        request = {}
                        for widget in widgets:
                            if widget['id'] == 'TIMESERIES':
                                tokens['timeseries'] = widget['token']
                                request['timeseries'] = widget['request']
                        break  # Break out of the retry loop as we got the token
                    except json.JSONDecodeError:
                        print(f"Failed to decode JSON while fetching token, retrying {retry + 1}/{max_retries}")
                else:
                    print(f"Error {response.status_code} while fetching token, retrying {retry + 1}/{max_retries}")
            else:
                print(f"Exceeded maximum retry attempts ({max_retries}) while fetching token. Exiting...")
                return None

            # phase 2: trends data
            for retry in range(max_retries):
                time.sleep(5)
                req_string = json.dumps(request['timeseries'], separators=(',', ':'))
                encoded_req = urllib.parse.quote(req_string, safe=':,+')
                url = f"https://trends.google.com/trends/api/widgetdata/multiline?hl={hl}&tz=0&req={encoded_req}&token={tokens['timeseries']}"
                response = s.get(url, impersonate=browser_versions[current_browser_version_index], cookies=cookies)
                if response.status_code == 200:
                    content = response.text[5:]
                    try:
                        raw_data = json.loads(content)
                        # Convert raw data
                        trend_data = convert_to_desired_format(raw_data)
                        data_fetched = True  # Set data_fetched to True as we have successfully fetched the trend data
                        return trend_data
                    except json.JSONDecodeError:
                        print(f"Failed to decode JSON while fetching trends data, retrying {retry + 1}/{max_retries}")
                else:
                    print(f"Error {response.status_code} while fetching trends data, retrying {retry + 1}/{max_retries}")
            else:
                print(f"Exceeded maximum retry attempts ({max_retries}) while fetching trends data.")

        # change browser
        if not data_fetched and browser_retry < browser_switch_retries:
            time.sleep(5)
            current_browser_version_index = (current_browser_version_index + 1) % len(browser_versions)
            print(f"Switching browser version to {browser_versions[current_browser_version_index]} and retrying...")

    print(f"Exceeded maximum browser switch attempts ({browser_switch_retries}). Exiting...")
    return None

# Example
keywords = ["test"]
trends_data = fetch_trends_data(keywords)
print(trends_data)
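

The script above removes a fixed number of characters (`response.text[4:]` and `response.text[5:]`) before decoding, because the responses begin with a short non-JSON prefix such as `)]}'`. A slightly more tolerant variant, sketched here under the assumption that the payload is always a JSON object or array, locates the start of the JSON instead. The helper name `parse_trends_json` is hypothetical.

```python
import json


def parse_trends_json(text):
    """Skip any leading non-JSON prefix such as ")]}'" and decode the JSON that follows."""
    starts = [i for i in (text.find("{"), text.find("[")) if i != -1]
    if not starts:
        raise ValueError("no JSON object or array found in response text")
    return json.loads(text[min(starts):])


print(parse_trends_json(")]}'\n{\"widgets\": []}"))
```

This way the same helper works for both the token request and the widget data request, whose prefixes differ by one character.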


The following code fetches the top and rising related queries:

import json
import urllib.parse
from datetime import datetime, timedelta
from curl_cffi import requests
import time
import os

def build_payload(keywords, timeframe='now 1-H', geo=''):
    token_payload = {
        'hl': 'en-US',
        'tz': '0',
        'req': {
            'comparisonItem': [{'keyword': keyword, 'time': timeframe, 'geo': geo} for keyword in keywords],
            'category': 0,
            'property': ''
        }
    }
    token_payload['req'] = json.dumps(token_payload['req'])
    return token_payload

def convert_to_desired_format(raw_data):
    trend_data = {'TOP': {}, 'RISING': {}}

    if 'rankedList' in raw_data.get('default', {}):
        for item in raw_data['default']['rankedList']:
            for entry in item.get('rankedKeyword', []):
                query = entry.get('query')
                value = entry.get('value')
                if query and value:
                    link = entry.get('link', '')
                    trend_type = link.split('=')[-1].split('&')[0].upper() if link else None

                    if trend_type in ['TOP', 'RISING']:
                        trend_data[trend_type][query] = value
    return trend_data

def get_google_cookies(impersonate_version='chrome110'):
    with requests.Session() as session:
        session.get("https://www.google.com", impersonate=impersonate_version)
        return session.cookies

def fetch_trends_data(keywords, days_ago=7, geo='US', hl='en-US', max_retries=5, browser_version='chrome110', browser_switch_retries=2):
    browser_versions = ['chrome110', 'edge101', 'chrome107', 'chrome104', 'chrome100', 'chrome101', 'chrome99']
    current_browser_version_index = browser_versions.index(browser_version)
    cookies = get_google_cookies(impersonate_version=browser_versions[current_browser_version_index])

    for browser_retry in range(browser_switch_retries + 1):
        data_fetched = False
        with requests.Session() as s:
            # phase 1: token
            for retry in range(max_retries):
                time.sleep(2)
                token_payload = build_payload(keywords)
                url = 'https://trends.google.com/trends/api/explore'
                params = urllib.parse.urlencode(token_payload)
                full_url = f"{url}?{params}"
                response = s.get(full_url, impersonate=browser_versions[current_browser_version_index], cookies=cookies)
                if response.status_code == 200:
                    content = response.text[4:]
                    try:
                        data = json.loads(content)
                        widgets = data['widgets']
                        tokens = {}
                        request = {}
                        for widget in widgets:
                            if widget['id'] == 'RELATED_QUERIES':
                                tokens['related_queries'] = widget['token']
                                request['related_queries'] = widget['request']
                        break
                    except json.JSONDecodeError:
                        print(f"Failed to decode JSON while fetching token, retrying {retry + 1}/{max_retries}")
                else:
                    print(f"Error {response.status_code} while fetching token, retrying {retry + 1}/{max_retries}")
            else:
                print(f"Exceeded maximum retry attempts ({max_retries}) while fetching token. Exiting...")
                return None

            # phase 2: trends data
            for retry in range(max_retries):
                time.sleep(5)
                req_string = json.dumps(request['related_queries'], separators=(',', ':'))
                encoded_req = urllib.parse.quote(req_string, safe=':,+')
                url = f"https://trends.google.com/trends/api/widgetdata/relatedsearches?hl={hl}&tz=0&req={encoded_req}&token={tokens['related_queries']}"
                response = s.get(url, impersonate=browser_versions[current_browser_version_index], cookies=cookies)
                print(f"URL: {url}")
                if response.status_code == 200:
                    content = response.text[5:]
                    try:
                        file_name = f"trends_data_{os.getpid()}.json"
                        with open(file_name, 'w') as json_file:
                            json_file.write(content)
                        
                        # Remove first line from the file
                        with open(file_name, 'r') as f:
                            lines = f.readlines()[1:]
                        with open(file_name, 'w') as f:
                            f.writelines(lines)
                        
                        # Load JSON content from the file
                        with open(file_name, 'r') as json_file:
                            data = json.load(json_file)
                        
                        # Extract and print queries and values from both rankedLists separately
                        for item in data['default']['rankedList'][0]['rankedKeyword']:
                            print(f"Top: {item['query']}, Value: {item['value']}")
                        
                        for item in data['default']['rankedList'][1]['rankedKeyword']:
                            print(f"Rising: {item['query']}, Value: {item['value']}")
                        
                        return content
                    except json.JSONDecodeError:
                        print(f"Failed to decode JSON while fetching trends data, retrying {retry + 1}/{max_retries}")
                else:
                    print(f"Error {response.status_code} while fetching trends data, retrying {retry + 1}/{max_retries}")
            else:
                print(f"Exceeded maximum retry attempts ({max_retries}) while fetching trends data.")

        if not data_fetched and browser_retry < browser_switch_retries:
            time.sleep(5)
            current_browser_version_index = (current_browser_version_index + 1) % len(browser_versions)
            print(f"Switching browser version to {browser_versions[current_browser_version_index]} and retrying...")

    print(f"Exceeded maximum browser switch attempts ({browser_switch_retries}). Exiting...")
    return None

# Example
keywords = ["test"]
trends_data = fetch_trends_data(keywords)
print(trends_data)
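

The script above prints the two ranked lists and returns the raw text. If you prefer a structured result, the decoded payload can be split as follows. This is a sketch that assumes, as the printing code above does, that the first `rankedList` entry holds the top queries and the second the rising ones. The helper name `split_related_queries` and the sample payload are hypothetical.

```python
def split_related_queries(data):
    """Return {'TOP': {...}, 'RISING': {...}} from a decoded relatedsearches payload."""
    ranked = data.get("default", {}).get("rankedList", [])
    result = {"TOP": {}, "RISING": {}}
    # Pair the labels with the lists by position: index 0 is top, index 1 is rising.
    for label, block in zip(("TOP", "RISING"), ranked):
        for entry in block.get("rankedKeyword", []):
            result[label][entry["query"]] = entry["value"]
    return result


# Hypothetical sample payload with the same shape the script reads
sample = {
    "default": {
        "rankedList": [
            {"rankedKeyword": [{"query": "speed test", "value": 100}]},
            {"rankedKeyword": [{"query": "test match", "value": 250}]},
        ]
    }
}
print(split_related_queries(sample))
```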
