我已经尝试调整格式的工作,但我一直运行到相同的错误,无论如何改变。我是GraphQL和Shopify的新手,所以我有点困惑。
我试图实现的目标是自动上传大约2000张照片到shopify商店,并根据SKU/ProductID组合来执行此操作,以防止照片进入错误的项目。我已经有了一个相当实用的版本,使用REST API,但我想通过使用GraphQL使事情更高效,更快。
import os
import re
import time
import base64
import logging
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import tkinter as tk
from tkinter import filedialog
from shopify_config import SHOPIFY_STORE_URL, API_KEY, PASSWORD, API_VERSION
class ShopifyConnection:
def __init__(self):
self.base_url = f"{SHOPIFY_STORE_URL}/admin/api/{API_VERSION}/graphql.json"
self.headers = {
'Content-Type': 'application/json',
'X-Shopify-Access-Token': PASSWORD
}
self.session = self._setup_session()
def _setup_session(self):
"""Sets up a session with retry capabilities."""
session = requests.Session()
retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504])
session.mount('https://', HTTPAdapter(max_retries=retries))
return session
def make_request(self, query):
"""Makes a GraphQL request to the Shopify API."""
return self.session.post(self.base_url, headers=self.headers, json={'query': query})
##Used to paginate through shopify inventory and handle rate limits
class ShopifyPaginator:
def __init__(self, connection):
self.connection = connection
def _handle_rate_limits(self, response):
"""Handle rate limits based on the response headers."""
# Extract the rate limit headers
query_cost = int(response.headers.get('X-Shopify-Storefront-Graphql-Call-Cost', 0))
remaining_cost = int(response.headers.get('X-Shopify-Storefront-Graphql-Remaining-Cost', 1000))
# Calculate the time to sleep based on the query cost and remaining bucket size
if remaining_cost < query_cost:
sleep_time = (query_cost - remaining_cost) / 50 # 50 points refill per second
print("---------------------------------------------------------")
print(f"Rate limit usage high, sleeping for {sleep_time} seconds")
print("---------------------------------------------------------")
time.sleep(sleep_time)
def get_all_items(self, query, item_key):
"""Fetch all items from a paginated Shopify GraphQL endpoint."""
items = []
cursor = None
while True:
# Format the query with the cursor if it exists
formatted_query = query.format(cursor=f'"{cursor}"' if cursor else 'null')
response = self.connection.make_request(formatted_query)
self._handle_rate_limits(response) # Handle rate limits
data = response.json()
items.extend(data.get('data', {}).get(item_key, {}).get('edges', []))
# Check for pagination
if items and 'cursor' in items[-1]:
cursor = items[-1]['cursor']
else:
break
return items
##Filters for shopify
class ShopifyProductFilter:
def __init__(self, paginator, namespace, key):
self.paginator = paginator
self.namespace = namespace
self.key = key
def get_ProductIDandSKUsWithPhotoFalse(self):
"""Fetch a list of product IDs and their respective SKUs where the metafield value is FALSE."""
query = """
{{
products(first: 250, after: {cursor}) {{
edges {{
cursor
node {{
id
variants(first: 5) {{
edges {{
node {{
sku
}}
}}
}}
metafields(first: 5, namespace: "{namespace}", key: "{key}") {{
edges {{
node {{
value
}}
}}
}}
}}
}}
}}
}}
"""
formatted_query = query.format(namespace=self.namespace, key=self.key, cursor="{cursor}")
products = self.paginator.get_all_items(formatted_query, 'products')
SKUandProductID = []
for product in products:
if any(metafield['node']['value'] == 'FALSE' for metafield in product['node']['metafields']['edges']):
product_id = product['node']['id']
for variant in product['node']['variants']['edges']:
sku = variant['node']['sku']
SKUandProductID.append({'product_id': product_id, 'sku': sku})
return SKUandProductID
class ShopifyPhotoUploader:
def __init__(self, connection):
self.connection = connection
self.missing_photos = []
self.successfully_uploaded = []
def select_folder(self):
root = tk.Tk()
root.withdraw()
folder_selected = filedialog.askdirectory()
return folder_selected
def upload_images_from_folder(self, folder_path, SKUandProductID):
# Iterate through files in the folder and subfolders
for root, _, files in os.walk(folder_path):
for file in files:
# Check if the file matches the naming scheme "GCE#20001_1.jpg"
match = re.match(r'GCE#(\d+)_(\d+)\.jpg', file)
if match:
sku, position = match.groups()
product_id = next((item['product_id'] for item in SKUandProductID if item['sku'] == sku), None)
if product_id:
full_image_path = os.path.join(root, file)
if self.upload_image(product_id, full_image_path, int(position), sku):
self.successfully_uploaded.append({'product_id': product_id, 'sku': sku})
else:
self.missing_photos.append(sku)
# Create a log file for SKUs without photos
if self.missing_photos:
with open('missing_photos.log', 'w') as log_file:
log_file.write("\n".join(self.missing_photos))
# Update metafields for successfully uploaded images
for item in self.successfully_uploaded:
self.update_metafield(item['product_id'], "custom", "photo_shoot_", "True")
def upload_image(self, product_id, image_path, position, sku):
"""Upload an image to Shopify using GraphQL."""
# Encode the image to base64
with open(image_path, "rb") as image_file:
encoded_image = base64.b64encode(image_file.read()).decode('utf-8')
# GraphQL mutation for uploading the image
mutation = """
mutation productImageCreate($productId: ID!, $image: ImageInput!) {
productImageCreate(productId: $productId, image: $image) {
image {
id
src
}
userErrors {
field
message
}
}
}
"""
# Variables for the mutation
variables = {
"productId": product_id,
"image": {
"attachment": encoded_image,
"filename": f"{sku}_{position}.jpg",
"position": position
}
}
# Send the mutation to Shopify
response = self.connection.make_request(mutation, variables)
data = response.json()
# Check for errors
user_errors = data.get('data', {}).get('productImageCreate', {}).get('userErrors', [])
if user_errors:
for error in user_errors:
field = error.get('field', 'Unknown field')
message = error.get('message', 'Unknown error')
print(f"Error uploading image for SKU {sku} at position {position}: {field} - {message}")
return False
else:
image_id = data.get('data', {}).get('productImageCreate', {}).get('image', {}).get('id')
image_src = data.get('data', {}).get('productImageCreate', {}).get('image', {}).get('src')
print(f"Successfully uploaded image for SKU {sku} at position {position}. Image ID: {image_id}, Image URL: {image_src}")
return True
def update_metafield(self, product_id, namespace, key, value):
"""Update a product's metafield using GraphQL."""
mutation = """
mutation productUpdate($input: ProductInput!) {
productUpdate(input: $input) {
product {
id
}
userErrors {
field
message
}
}
}
"""
# Variables for the mutation
variables = {
"input": {
"id": product_id,
"metafields": [{
"namespace": namespace,
"key": key,
"value": value,
"valueType": "STRING"
}]
}
}
# Send the mutation to Shopify
response = self.connection.make_request(mutation, variables)
data = response.json()
# Check for errors
user_errors = data.get('data', {}).get('productUpdate', {}).get('userErrors', [])
if user_errors:
for error in user_errors:
field = error.get('field', 'Unknown field')
message = error.get('message', 'Unknown error')
print(f"Error updating metafield for product {product_id}: {field} - {message}")
else:
print(f"Successfully updated metafield for product {product_id}.")
if __name__ == '__main__':
NAMESPACE = 'custom'
KEY = 'photo_shoot_'
connection = ShopifyConnection()
paginator = ShopifyPaginator(connection)
filter = ShopifyProductFilter(paginator, NAMESPACE, KEY)
SKUandProductID = filter.get_ProductIDandSKUsWithPhotoFalse()
uploader = ShopifyPhotoUploader(connection)
folder_path = uploader.select_folder()
if not folder_path:
print("No folder selected. Exiting")
exit()
uploader.upload_images_from_folder(folder_path, SKUandProductID)
print("Image upload process complete!")
这里是错误
File "c:\Users\**\Desktop\ShopifyAutomation\RefactoredCode\uploadTool.py", line 272, in <module>
SKUandProductID = filter.get_ProductIDandSKUsWithPhotoFalse()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "c:\Users\**\Desktop\ShopifyAutomation\RefactoredCode\uploadTool.py", line 117, in get_ProductIDandSKUsWithPhotoFalse
products = self.paginator.get_all_items(formatted_query, 'products')
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "c:\Users\**\Desktop\ShopifyAutomation\RefactoredCode\uploadTool.py", line 60, in get_all_items
formatted_query = query.format(cursor=f'"{cursor}"' if cursor else 'null')
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
KeyError: '\n products(first'
1条答案
按热度按时间xytpbqjk1#
发现了这个问题,只是以防任何人遇到问题的标题一样,我只是双重检查您的“/”和“的。我的基本URL字符串中有一个额外的“/”。