How to send asynchronous email using the Azure Communication Email Python SDK

Posted by rdrgkggo on 2023-08-07 in Python

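I am using Azure Communication Services Email to send email. It worked fine until I needed to send asynchronously: the method is called many times and the response is very slow.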

class Email(Configuration):
    def __init__(self):
        super().__init__()
        self.logger_file = 'email_exception.log'

    async def send_mail(self, email_content: dict, email_recipient: str = None):
        print(f" sending email called")
        error = False
        message = "Success"
        recipient_address = email_recipient
        if email_recipient is None or email_recipient == '':
            recipient_address = os.getenv("RECIPIENT_EMAIL_ADDRESS")

        try:
            content = {
                "senderAddress": sender_address,
                "recipients":  {
                    "to": [{"address": recipient_address}, {"address": recipient_address}],
                    "cc": [{"address": recipient_address}],
                    "bcc": [{"address": recipient_address}]
                },
                "content": email_content
            }
            client = EmailClient.from_connection_string(connection_string)

            poller = client.begin_send(content)

            print(f" polllerrrr {poller}")

            time_elapsed = 0
            while not poller.done():
                await poller.wait(POLLER_WAIT_TIME)
                time_elapsed += POLLER_WAIT_TIME

                if time_elapsed > 18 * POLLER_WAIT_TIME:
                    raise RuntimeError("Polling timed out.")

            if poller.result()["status"] == "Succeeded":
                print(
                    f"Successfully sent the emails (operation id: {poller.result()['id']})")
                # logging.info(f"Successfully sent the email ")
                return error, message
            else:

                raise RuntimeError(str(poller.result()["error"]))

        except Exception as ex:
            self.logger.error(
                f"exception was thrown when sending email {str(ex)}")
            error = True
            message = str(ex)
            return error, message

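However, I get the exception "object NoneType can't be used in 'await' expression". The email is still sent successfully, but I don't think it is being sent asynchronously, since the exception is thrown. Any ideas?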
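The exception described above can be reproduced without Azure. A likely cause, assuming the poller returned by the synchronous client's begin_send is a regular blocking poller: its wait() call blocks and returns None, so `await poller.wait(...)` first performs the blocking wait (which is why the email still goes out) and then tries to await None. The sketch below uses a hypothetical `FakeSyncPoller` stand-in, not the SDK class, to show the same failure.

```python
import asyncio


class FakeSyncPoller:
    """Hypothetical stand-in for a synchronous poller; not the SDK class."""

    def wait(self, timeout=None):
        # A synchronous wait blocks the calling thread and returns None.
        return None


async def broken_wait(poller):
    # poller.wait(...) runs first and returns None; awaiting None then fails.
    await poller.wait(1)


def reproduce():
    """Run the broken coroutine and return the TypeError text, if any."""
    try:
        asyncio.run(broken_wait(FakeSyncPoller()))
    except TypeError as ex:
        return str(ex)
    return None


error_text = reproduce()
print(error_text)
```

The exact wording of the TypeError may differ between Python versions, but in each case it reports that a NoneType object was awaited.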

cnjp1d6j  #1

How to send asynchronous email using the Azure Communication Email Python SDK
You can use the sample code below to send email from an async function with the Azure Communication Email Python SDK.

Code:

import asyncio

from azure.communication.email import EmailClient


async def send_email():
    connection_string = "your-connection string"
    # Note: this is the synchronous EmailClient, so the calls below block.
    client = EmailClient.from_connection_string(connection_string)

    sender_address = "<sender address>"
    recipient_address = "<recipient address>"

    email_message = {
        "subject": "This is the subject",
        "plainText": "This is the body",
        "html": "<html><h1>This is the body</h1></html>"
    }

    email_content = {
        "senderAddress": sender_address,
        "recipients": {
            "to": [{"address": recipient_address}],
        },
        "content": email_message
    }

    try:
        # begin_send starts the long-running send operation and returns a poller.
        poller = client.begin_send(email_content)
        # result() blocks until the operation has finished.
        result = poller.result()
        if result:
            print(f"Email sent with ID: {result['id']}")
        else:
            print("Error sending email: result is None")
    except Exception as ex:
        print(f"Error sending email: {str(ex)}")


async def main():
    await send_email()


if __name__ == "__main__":
    asyncio.run(main())
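Note that the sample above still calls the synchronous client from inside a coroutine, so the event loop is blocked while the email is being sent. A minimal sketch of a non-blocking variant follows. It assumes the async client in `azure.communication.email.aio` (which needs an async transport such as aiohttp installed) and that the client can be used as an async context manager. The `send_email_async` helper is a name made up for this example.

```python
import asyncio


async def send_email_async(client, message):
    """Send one message with an async client and return the final result."""
    # On the async client, begin_send is awaited and returns an async poller.
    poller = await client.begin_send(message)
    # The async poller's result() is awaited too, so the event loop stays free.
    return await poller.result()


async def main():
    # Imported here so the helper above can be exercised without the SDK installed.
    # Assumes the azure-communication-email package plus aiohttp.
    from azure.communication.email.aio import EmailClient

    message = {
        "senderAddress": "<sender address>",
        "recipients": {"to": [{"address": "<recipient address>"}]},
        "content": {"subject": "This is the subject", "plainText": "This is the body"},
    }
    # "your-connection string" is a placeholder, as in the sample above.
    async with EmailClient.from_connection_string("your-connection string") as client:
        result = await send_email_async(client, message)
        print(f"Email sent with ID: {result['id']}")


# To run against a real resource: asyncio.run(main())
```

Because the helper only awaits, several calls can be combined with asyncio.gather so that multiple emails are in flight at the same time.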


Output:

Email sent with ID: 50cfeb24-6660-4cf5-a07e-xxxxxxx
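Since the question mentions that the method is called many times, here is one possible way to overlap several sends while keeping the synchronous client: run each blocking call in a worker thread with asyncio.to_thread (Python 3.9+) and await them together. `blocking_send` is a hypothetical stand-in that only sleeps. In real code it would wrap something like begin_send(...).result(), and whether one client instance can safely be shared across threads should be checked against the SDK documentation.

```python
import asyncio
import time


def blocking_send(recipient):
    """Hypothetical stand-in for a blocking send call; it only sleeps."""
    time.sleep(0.2)  # simulates network latency
    return {"id": f"op-{recipient}", "status": "Succeeded"}


async def send_many(recipients):
    # Each blocking call runs in a worker thread; gather awaits them together
    # and returns the results in the same order as the input list.
    tasks = [asyncio.to_thread(blocking_send, r) for r in recipients]
    return await asyncio.gather(*tasks)


start = time.perf_counter()
results = asyncio.run(send_many(["a@example.com", "b@example.com", "c@example.com"]))
elapsed = time.perf_counter() - start
print([r["id"] for r in results], f"{elapsed:.2f}s")
```

With three simulated sends of 0.2 s each, the elapsed time should be noticeably less than the 0.6 s a sequential loop would take, because the waits overlap.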



Reference: Quickstart - How to send an email using Azure Communication Service - An Azure Communication Services Quickstart | Microsoft Learn
