pandas Discord.py 异步等待不会关闭与气流DAG/任务的连接

k4emjkb1  于 2023-02-14  发布在  其他
关注(0)|答案(1)|浏览(112)
    • bounty已结束**。此问题的答案可获得+50的信誉奖励。奖励宽限期将在1小时后结束。NinjaWarrior正在寻找来自信誉良好来源的答案:答案应该通过修改我提供的代码片段以及解释来提供代码示例。

我正尝试将一个 Dataframe 发送到Discord通道。但是,我在获取www.example.com以关闭连接并继续下一个任务时遇到问题。我已经尝试使用此线程(How to run async function in Airflow?)中建议的事件循环以及www.example.com()函数。我对异步不太熟悉,希望在此处获得一些指针。以下是我的Python代码,我将使用 Discord.py to close the connection and move on the next task. I've tried using the event loop as suggested in this thread ( How to run async function in Airflow? ) as well as asyncio.run() function. Not really familiar with the async and hoping to get some pointers here. Below is my code in Python that I've tried importing in DAG and Task without success. Thanks in advance!
空气流量:2.5.1
Python语言:3.7

import discord
from tabulate import tabulate
import asyncio
import pandas as pd


async def post_to_discord(df, channel_id, bot_token, as_message=True, num_rows=5):
    intents = discord.Intents.default()
    intents.members = True
    client = discord.Client(intents=intents)
    try:
        @client.event
        async def on_ready():
            channel = client.get_channel(channel_id)
            if as_message:
                # Post the dataframe as a message, num_rows rows at a time
                for i in range(0, len(df), num_rows):
                    message = tabulate(df.iloc[i:i+num_rows,:], headers='keys', tablefmt='pipe', showindex=False)
                    await channel.send(message)

            else:
                # Send the dataframe as a CSV file
                df.to_csv("dataframe.csv", index=False)
                with open("dataframe.csv", "rb") as f:
                    await channel.send(file=discord.File(f))
        # client.run(bot_token)
        await client.start(bot_token)
        await client.wait_until_ready()
    finally:
        await client.close()

async def main(df, channel_id, bot_token, as_message=True, num_rows=5):
    # loop = asyncio.get_event_loop()
    # result = loop.run_until_complete(post_to_discord(df, channel_id, bot_token, as_message, num_rows))
    result = asyncio.run(post_to_discord(df, channel_id, bot_token, as_message, num_rows))
    await result
    return result

if __name__ =='__main__':
    main()
fkvaft9z

fkvaft9z1#

看起来你的脚本可以工作,但是服务器阻塞了打开的套接字(值得称赞的是-discord服务器很擅长这个),所以我们将创建一个ping函数(从另一个答案中采用)。

def ping(ip, port):
    try:
        s = socket.socket() # TCP - standard values are `socket.AF_INET, socket.SOCK_STREAM` so you don't have to write them
        s.settimeout(2)
        print('[DEBUG] connect')
        s.connect((ip, int(port)))
        #result = s.connect_ex((ip, int(port)))
        #print('result:', result)
        return True
    except socket.timeout as ex:
        print('[DEBUG] timeout')
        return True        
    except Exception as ex:
        print('[Exception]', ex)
        return False
    finally:
        print('[DEBUG] close')
        s.close()

请随意测试您的ID

id = ...channel number...
print(id, type(id))

你应该看到

<built-in function id> <class 'builtin_function_or_method'>

然后让我们继续改进您的代码:

import discord
import asyncio
import time # you are not using this module 
import socket
import os
from tabulate import tabulate
import pandas as pd # as pd is not required

def ping(ip, port):
        try:
            s = socket.socket() # TCP - standard values are `socket.AF_INET, socket.SOCK_STREAM` so you don't have to write them
            s.settimeout(2)
            print('[DEBUG] connect')
            s.connect((ip, int(port)))
            #result = s.connect_ex((ip, int(port)))
            #print('result:', result)
            return True
        except socket.timeout as ex:
            print('[DEBUG] timeout')
            return True        
        except Exception as ex:
            print('[Exception]', ex)
            return False
        finally:
            print('[DEBUG] close')
            s.close()


TOKEN = os.getenv('DISCORD_TOKEN')
client = discord.Client()

async def post_to_discord(df, channel_id, bot_token, as_message=True, num_rows=5):
    intents = discord.Intents.default()
    intents.members = True
    client = discord.Client(intents=intents)
    try:
        @client.event
        async def on_ready():
            channel = client.get_channel(channel_id)
            if as_message:
                # Post the dataframe as a message, num_rows rows at a time
                for i in range(0, len(df), num_rows):
                    message = tabulate(df.iloc[i:i+num_rows,:], headers='keys', tablefmt='pipe', showindex=False)
                    await channel.send(message)

            else:
                # Send the dataframe as a CSV file
                df.to_csv("dataframe.csv", index=False)
                with open("dataframe.csv", "rb") as f:
                    await channel.send(file=discord.File(f))
        # client.run(bot_token)
        await client.start(bot_token)
        await client.wait_until_ready()
        while True:
            online = ping("26.51.174.109", "25565") #modify it as you see fit
            #online = ping("192.168.1.101", "8081") #same as above
            if online:
                print("server online")
                #await channel.edit(name="Server Status - Online")
            else:
                print("server offline")
                #await channel.edit(name="Server Status - Offline")
            await asyncio.sleep(5)
        # optional - client.run(TOKEN)
    finally:
        await client.close()

async def main(df, channel_id, bot_token, as_message=True, num_rows=5):
    # loop = asyncio.get_event_loop()
    # result = loop.run_until_complete(post_to_discord(df, channel_id, bot_token, as_message, num_rows))
    result = asyncio.run(post_to_discord(df, channel_id, bot_token, as_message, num_rows))
    await result
    return result

if __name__ =='__main__':
    main()

相关问题