python 使用Shopify GraphQL API自动上传照片,但我一直遇到字符串格式错误

vsnjm48y  于 12个月前  发布在  Python
关注(0)|答案(1)|浏览(82)

我已经尝试调整格式的工作,但我一直运行到相同的错误,无论如何改变。我是GraphQL和Shopify的新手,所以我有点困惑。
我试图实现的目标是自动上传大约2000张照片到shopify商店,并根据SKU/ProductID组合来执行此操作,以防止照片进入错误的项目。我已经有了一个相当实用的版本,使用REST API,但我想通过使用GraphQL使事情更高效,更快。

import os
import re
import time
import base64
import logging
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import tkinter as tk
from tkinter import filedialog

from shopify_config import SHOPIFY_STORE_URL, API_KEY, PASSWORD, API_VERSION

class ShopifyConnection:
    def __init__(self):
        self.base_url = f"{SHOPIFY_STORE_URL}/admin/api/{API_VERSION}/graphql.json"
        self.headers = {
            'Content-Type': 'application/json',
            'X-Shopify-Access-Token': PASSWORD
        }
        self.session = self._setup_session()

    def _setup_session(self):
        """Sets up a session with retry capabilities."""
        session = requests.Session()
        retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504])
        session.mount('https://', HTTPAdapter(max_retries=retries))
        return session

    def make_request(self, query):
        """Makes a GraphQL request to the Shopify API."""
        return self.session.post(self.base_url, headers=self.headers, json={'query': query})

##Used to paginate through shopify inventory and handle rate limits
class ShopifyPaginator:
    def __init__(self, connection):
        self.connection = connection

    def _handle_rate_limits(self, response):
        """Handle rate limits based on the response headers."""
        # Extract the rate limit headers
        query_cost = int(response.headers.get('X-Shopify-Storefront-Graphql-Call-Cost', 0))
        remaining_cost = int(response.headers.get('X-Shopify-Storefront-Graphql-Remaining-Cost', 1000))

        # Calculate the time to sleep based on the query cost and remaining bucket size
        if remaining_cost < query_cost:
            sleep_time = (query_cost - remaining_cost) / 50  # 50 points refill per second
            print("---------------------------------------------------------")
            print(f"Rate limit usage high, sleeping for {sleep_time} seconds")
            print("---------------------------------------------------------")
            time.sleep(sleep_time)

    def get_all_items(self, query, item_key):
        """Fetch all items from a paginated Shopify GraphQL endpoint."""
        items = []
        cursor = None

        while True:
            # Format the query with the cursor if it exists
            formatted_query = query.format(cursor=f'"{cursor}"' if cursor else 'null')
            response = self.connection.make_request(formatted_query)
            self._handle_rate_limits(response)  # Handle rate limits

            data = response.json()
            items.extend(data.get('data', {}).get(item_key, {}).get('edges', []))

            # Check for pagination
            if items and 'cursor' in items[-1]:
                cursor = items[-1]['cursor']
            else:
                break

        return items
    
    
##Filters for shopify
class ShopifyProductFilter:
    def __init__(self, paginator, namespace, key):
        self.paginator = paginator
        self.namespace = namespace
        self.key = key 

    def get_ProductIDandSKUsWithPhotoFalse(self):
        """Fetch a list of product IDs and their respective SKUs where the metafield value is FALSE."""
        
        query = """
        {{
            products(first: 250, after: {cursor}) {{
                edges {{
                    cursor
                    node {{
                        id
                        variants(first: 5) {{
                            edges {{
                                node {{
                                    sku
                                }}
                            }}
                        }}
                        metafields(first: 5, namespace: "{namespace}", key: "{key}") {{
                            edges {{
                                node {{
                                    value
                                }}
                            }}
                        }}
                    }}
                }}
            }}
        }}
        """

        
        formatted_query = query.format(namespace=self.namespace, key=self.key, cursor="{cursor}")
        
        products = self.paginator.get_all_items(formatted_query, 'products')
        
        SKUandProductID = []
        
        for product in products:
            if any(metafield['node']['value'] == 'FALSE' for metafield in product['node']['metafields']['edges']):
                product_id = product['node']['id']
                for variant in product['node']['variants']['edges']:
                    sku = variant['node']['sku']
                    SKUandProductID.append({'product_id': product_id, 'sku': sku})
        return SKUandProductID


class ShopifyPhotoUploader:
    def __init__(self, connection):
        self.connection = connection
        self.missing_photos = []
        self.successfully_uploaded = []

    def select_folder(self):
        root = tk.Tk()
        root.withdraw()
        folder_selected = filedialog.askdirectory()
        return folder_selected

    def upload_images_from_folder(self, folder_path, SKUandProductID):
        # Iterate through files in the folder and subfolders
        for root, _, files in os.walk(folder_path):
            for file in files:
                # Check if the file matches the naming scheme "GCE#20001_1.jpg"
                match = re.match(r'GCE#(\d+)_(\d+)\.jpg', file)
                if match:
                    sku, position = match.groups()
                    product_id = next((item['product_id'] for item in SKUandProductID if item['sku'] == sku), None)
                    if product_id:
                        full_image_path = os.path.join(root, file)
                        if self.upload_image(product_id, full_image_path, int(position), sku):
                            self.successfully_uploaded.append({'product_id': product_id, 'sku': sku})
                    else:
                        self.missing_photos.append(sku)

        # Create a log file for SKUs without photos
        if self.missing_photos:
            with open('missing_photos.log', 'w') as log_file:
                log_file.write("\n".join(self.missing_photos))

        # Update metafields for successfully uploaded images
        for item in self.successfully_uploaded:
            self.update_metafield(item['product_id'], "custom", "photo_shoot_", "True")

    def upload_image(self, product_id, image_path, position, sku):
        """Upload an image to Shopify using GraphQL."""
        # Encode the image to base64
        with open(image_path, "rb") as image_file:
            encoded_image = base64.b64encode(image_file.read()).decode('utf-8')
        
        # GraphQL mutation for uploading the image
        mutation = """
        mutation productImageCreate($productId: ID!, $image: ImageInput!) {
            productImageCreate(productId: $productId, image: $image) {
                image {
                    id
                    src
                }
                userErrors {
                    field
                    message
                }
            }
        }
        """

        # Variables for the mutation
        variables = {
            "productId": product_id,
            "image": {
                "attachment": encoded_image,
                "filename": f"{sku}_{position}.jpg",
                "position": position
            }
        }

        # Send the mutation to Shopify
        response = self.connection.make_request(mutation, variables)
        data = response.json()

        # Check for errors
        user_errors = data.get('data', {}).get('productImageCreate', {}).get('userErrors', [])
        if user_errors:
            for error in user_errors:
                field = error.get('field', 'Unknown field')
                message = error.get('message', 'Unknown error')
                print(f"Error uploading image for SKU {sku} at position {position}: {field} - {message}")
            return False
        else:
            image_id = data.get('data', {}).get('productImageCreate', {}).get('image', {}).get('id')
            image_src = data.get('data', {}).get('productImageCreate', {}).get('image', {}).get('src')
            print(f"Successfully uploaded image for SKU {sku} at position {position}. Image ID: {image_id}, Image URL: {image_src}")
            return True

    def update_metafield(self, product_id, namespace, key, value):
        """Update a product's metafield using GraphQL."""
        mutation = """
        mutation productUpdate($input: ProductInput!) {
            productUpdate(input: $input) {
                product {
                    id
                }
                userErrors {
                    field
                    message
                }
            }
        }
        """

        # Variables for the mutation
        variables = {
            "input": {
                "id": product_id,
                "metafields": [{
                    "namespace": namespace,
                    "key": key,
                    "value": value,
                    "valueType": "STRING"
                }]
            }
        }

        # Send the mutation to Shopify
        response = self.connection.make_request(mutation, variables)
        data = response.json()

        # Check for errors
        user_errors = data.get('data', {}).get('productUpdate', {}).get('userErrors', [])
        if user_errors:
            for error in user_errors:
                field = error.get('field', 'Unknown field')
                message = error.get('message', 'Unknown error')
                print(f"Error updating metafield for product {product_id}: {field} - {message}")
        else:
            print(f"Successfully updated metafield for product {product_id}.")

if __name__ == '__main__':
    
    NAMESPACE = 'custom'
    KEY = 'photo_shoot_'
    
    connection = ShopifyConnection()
    paginator = ShopifyPaginator(connection)
    
    filter = ShopifyProductFilter(paginator, NAMESPACE, KEY)
    SKUandProductID = filter.get_ProductIDandSKUsWithPhotoFalse()
    
    uploader = ShopifyPhotoUploader(connection)
    
    folder_path = uploader.select_folder()
    if not folder_path:
        print("No folder selected. Exiting")
        exit()
        
    uploader.upload_images_from_folder(folder_path, SKUandProductID)
    
    print("Image upload process complete!")

这里是错误

File "c:\Users\**\Desktop\ShopifyAutomation\RefactoredCode\uploadTool.py", line 272, in <module>
    SKUandProductID = filter.get_ProductIDandSKUsWithPhotoFalse()
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\**\Desktop\ShopifyAutomation\RefactoredCode\uploadTool.py", line 117, in get_ProductIDandSKUsWithPhotoFalse
    products = self.paginator.get_all_items(formatted_query, 'products')
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\**\Desktop\ShopifyAutomation\RefactoredCode\uploadTool.py", line 60, in get_all_items
    formatted_query = query.format(cursor=f'"{cursor}"' if cursor else 'null')
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
KeyError: '\n            products(first'
xytpbqjk

xytpbqjk1#

发现了这个问题,只是以防任何人遇到问题的标题一样,我只是双重检查您的“/”和“的。我的基本URL字符串中有一个额外的“/”。

相关问题