Xero:OAuth2:Python3:获取刷新令牌的示例代码

ebdffaop  于 2023-10-15  发布在  Python
关注(0)|答案(2)|浏览(121)

Xero已更改其API,要求OAuth2连接而不是OAuth 1。
我在OAuth 1中有一个工作解决方案,但OAuth2的示例最多也就是稀缺的,但主要是针对网站。
我是另一个开发人员,他设法创建了一个Oauth 1解决方案,成功地作为一个机器对机器解决方案工作,没有涉及Web服务器。
Xero有一些在Postman中运行的示例,它们在我的测试环境中很容易工作。
我尝试在Python 3中重现刷新令牌的 Postman 操作。
下面的代码是我目前的位置:

#client_id = "xxxxx"
#client_secret = "xxxxx"
callback_url = "https://api.xero.com/connections"
re_directURI = "https://developer.xero.com"
scopes = "offline_access accounting.contacts accounting.transactions"

refresh_url = "https://identity.xero.com/connect/token"

access_token = open('AccessToken.txt').read()
old_refresh_token = open('RefreshToken.txt','r').read()

# Refresh Token code...

import requests

#def refresh_xero_token(refresh_token):

headers = {
'grant_type': 'refresh_token',
'Content-Type': 'application/json',
}
data = {
'grant_type': 'refresh_token',
'refresh_token': old_refresh_token,
'client_id': client_id,
'client_secret': client_secret
}
print(data,headers)
response = requests.post(refresh_url, headers=headers, data=data)

#return response.json()
print(response.text)

到目前为止,我还没有找到一个不使用Web服务器的例子,只是使用Python与Xero服务器通信,将本地数据传输到Zero API中。
使用xoauth,.exe(windows)获取access_token,然后在postman中我可以运行示例刷新令牌,连接,发票等演示公司。
我相信只要能够复制这些例子,就能为我提供一个可行的解决方案。
当前使用此Python代码,我只能得到{“error”:“invalid_request”}
所以,很明显我错过了什么。
我将自己归类为Python或Oauth2的新手,但由于我之前成功使用了Oauth 1连接的解决方案,因此选择了这条道路。
我会问Xero开发者社区,但我写这篇文章是为了我们软件的用户将数据推送到他们的Xero帐户,所以为了测试,我只有一个试用帐户,这并不能给予我访问Xero开发者社区的权限。
这本身就很烦人。
Xero支持似乎也没有什么用,我试过了。
如果有任何人在那里能够帮助,这将是神话般的。
提前感谢您提供的任何帮助。

wz1wpwve

wz1wpwve1#

作为附加信息,我还可以使用此连接,例如在Xero中创建联系人:
在本例中,irContact是SQL表中的SQLAlchemy数据行。
def create_contact(connection,irContact,access_token):

#Setup new contact
address1 = {"AddressType": "POBOX"}

if irContact['addressline1'] is not None: address1.update({"AddressLine1": irContact['addressline1']})
if irContact['addressline2'] is not None: address1.update({"AddressLine2": irContact['addressline2']})
if irContact['addressline3'] is not None: address1.update({"AddressLine3": irContact['addressline3']})
if irContact['addressline4'] is not None: address1.update({"AddressLine4": irContact['addressline4']})
if irContact['city'] is not None: address1.update({"City": irContact['city']})
if irContact['region'] is not None: address1.update({"Region": irContact['region']})
if irContact['postalcode'] is not None: address1.update({"PostalCode": irContact['postalcode']})
if irContact['country'] is not None: address1.update({"Country": irContact['country']})
if irContact['attentionto'] is not None: address1.update({"AttentionTo": irContact['attentionto']})

#print (address1.values())

addresses = []
addresses.append(address1)

phones = []
if irContact['phonenumber'] is not None:
    phone1 = {"PhoneType": "DEFAULT"}
    #phone1.update({"PhoneType": "DEFAULT"})
    if irContact['phonenumber'] is not None: phone1.update({"PhoneNumber": irContact['phonenumber']})
    if irContact['phoneareacode'] is not None: phone1.update({"PhoneAreaCode": irContact['phoneareacode']})
    if irContact['phonecountrycode'] is not None: phone1.update({"PhoneCountryCode": irContact['phonecountrycode']})

    phones.append(phone1)
    #print (phone1.values())
    

if irContact['mobilenumber'] is not None:
    phone2 = {"PhoneType": "MOBILE"}
    if irContact['phonenumber'] is not None: phone2.update({"PhoneNumber": irContact['mobilenumber']})
    if irContact['phoneareacode'] is not None: phone2.update({"PhoneAreaCode": irContact['mobileareacode']})
    if irContact['phonecountrycode'] is not None: phone2.update({"PhoneCountryCode": irContact['mobilecountrycode']})

    phones.append(phone2)
    #print (phone2.values())
    

contact = { "Name": irContact['name'],
             "ContactNumber": irContact['contactnumber'],
             "AccountNumber": irContact['accountnumber'],
             #"ContactStatus": "ACTIVE",
             "FirstName": irContact['firstname'],
             "LastName": irContact['lastname'],
             #"EmailAddress": irContact['emailaddress'],
             "Addresses": addresses,
             #"Phones":phones
            }

contacts = [contact]
#print(contacts)

contacts_url = "https://api.xero.com/api.xro/2.0/Contacts"

headers = {
  'Authorization': f"Bearer {access_token}",
  'Accept': 'application/json',
  'Content-Type': 'application/json',
  'xero-tenant-id': tenant_id,
}
data = {
'Contacts': [contact],
}
#print(data)

try:
    response = requests.post(contacts_url, headers=headers, json=data)
except Exception as err:
    print("ERROR! Contact: %s" % (str(err) ))
    print(response.text)
    return 0

#print(response.text)

results = response.json()

if 'Contacts' in results:
    newcontacts = results['Contacts']
    for newcontact in newcontacts: break

    query = "update xero_contact set errortext='', ContactID='%s' where id=%d" % (newcontact["ContactID"], irContact['id'])
    connection.execute(query)

    query = "update xero_invoice_header set ContactID='%s' where OurContactID=%d and (InvoiceID='' or InvoiceID is null ) " % ( newcontact["ContactID"], irContact['id']  )
    connection.execute(query)

我相信,有了这么多的信息,任何人都可以创建自己的Xero机器到机器界面.通过对请求调用的头部和/或数据元素进行最小的调整,可以读取和创建其他记录。
我发现缺乏这些信息非常令人沮丧,如果人们能找到这些信息,它可能会帮助他们在未来。

4jb9z9bj

4jb9z9bj2#

在使用xoauth应用程序并建立连接之后,我发现使用刷新令牌,运行此函数可以保持连接正常运行。

def refresh_xero_token():
    refresh_url =       "https://identity.xero.com/connect/token"

    old_refresh_token = open('RefreshToken.txt','r').read()

    tokenb4 = f"{client_id}:{client_secret}"
    basic_token = base64.urlsafe_b64encode(tokenb4.encode()).decode()

    headers = {
      'Authorization': f"Basic {basic_token}",
      'Content-Type': 'application/x-www-form-urlencoded',
    }
    data = {
    'grant_type': 'refresh_token',
    'refresh_token': old_refresh_token
    }

    try:
        response = requests.post(refresh_url, headers=headers, data=data)

        results = response.json()
        open('RefreshToken.txt','w').write(results["refresh_token"])
        open('AccessToken.txt','w').write(results["access_token"])

    except Exception as err:
        print("ERROR ! Refreshing token error?")
        print(response.text)

相关问题