每小时覆盖一个txt文件,而不是使用python追加

0kjbasz6  于 2023-11-20  发布在  Python
关注(0)|答案(1)|浏览(113)

我有一个来自shodan https://help.shodan.io/guides/how-to-monitor-network的监控代码
该脚本不断运行,并在CLI中打印的结果.我提取的数据,我需要在这种格式data=(f"{ip}:{port}")后,追加新的发现到一个txt文件名为ips.txt,我想每一个持续时间e.x:2小时,覆盖整个ips在txt中,并再次开始写入到txt文件.
因为我想在这段时间内对结果进行一些bash操作,然后对新添加的数据进行另一个操作

**总结:**我想在2小时后删除txt文件中的所有数据,然后重新开始追加。

我写了这个时间代码,但它没有工作。

import subprocess
import time
import shlex
import json
from shodan import Shodan
from shodan.helpers import get_ip
from shodan.cli.helpers import get_api_key
from bs4 import BeautifulSoup
file_path = 'ips.txt'

duration = 900
start_time = time.time()
# Setup the Shodan API connection
api = Shodan(get_api_key())
for banner in api.stream.alert():
        if 'port' in banner:
                        port=(banner['port'])
        ip = get_ip(banner)
                
        data=(f"{ip}:{port}")
while time.time() - start_time < duration:   
        with open(file_path, 'a') as file:
                file.write(data+'\n')
        time.sleep(5)
open(file_path, 'w').close()
with open(file_path, 'w') as file:
        file.write('')

字符串

r7xajy2e

r7xajy2e1#

你可以使用上下文管理器来使这一点更清晰。这种技术也避免了不断打开和关闭输出文件的需要。

from time import time
from shodan import Shodan
from shodan.helpers import get_ip
from shodan.cli.helpers import get_api_key

class IPSWriter:
    def __init__(self, filename, interval=7200):
        self._filename = filename
        self._interval = interval
        self._last_write = float("-inf")
        self._fd = None

    def __enter__(self):
        return self

    def __exit__(self, *_):
        if self._fd:
            self._fd.close()

    @property
    def fd(self):
        if self._fd is None:
            self._fd = open(self._filename, "w+")
        return self._fd

    def write(self, text):
        if (time() - self._interval) >= self._last_write:
            self.fd.seek(0)
            self.fd.truncate()
        self.fd.write(text)
        self.fd.flush()
        self._last_write = time()

file_path = "ips.txt"

api = Shodan(get_api_key())

with IPSWriter(file_path) as writer:
    try:
        for banner in api.stream.alert():
            port = banner.get("port")
            ip = get_ip(banner)
            data = f"{ip}:{port}\n"
            writer.write(data)
    except KeyboardInterrupt: # break loop with Ctrl-C
        pass

字符串

相关问题