Pyspark如果经过特定的持续时间,则定期从数据库中获取数据

hrysbysz  于 2022-12-17  发布在  Spark
关注(0)|答案(1)|浏览(122)

我尝试在Pyspark中根据条件current_time - lastReadTime〉refresh_interval定期从数据库读取数据。我提供的refresh_interval是5分钟。
这是Kafka的结构化流媒体,我稍后加入来自postgres的数据。
然而,每当我在5分钟内更改数据库中的数据时,即使5分钟还没有过去,我也会从数据库中获取新数据。
下面是我正在使用的代码。

def __init__(self, config,spark):
    self.refresh_frequency_sec = config.getint('postgres-config', 'refresh-frequency-sec')
    self.spark = spark
    self.lastMetaReadTime = time()
    self.rules = self.fetchRules()
    
def fetchRules(self):
    jdbcDF = self.spark.read \
        .format("jdbc") \
        .option("driver", "org.postgresql.Driver")\
        .option("url", self.connection_url) \
        .option("dbtable", self.dbtable) \
        .option("user", self.user) \
        .option("password", self.password) \
        .load()
    return jdbcDF
    
def getRules(self):
    
    if time() - self.lastMetaReadTime > self.refresh_frequency_sec:
        self.rules = self.fetchRules()
        self.lastMetaReadTime = time()
        
    return self.rules

我哪里做错了?

tzdcorbm

tzdcorbm1#

您在innit()方法中为'self.rules'调用了错误的方法。
您的初始方法应该是:

def __init__(self, config,spark):
    self.refresh_frequency_sec = config.getint('postgres-config', 'refresh-frequency-sec')
    self.spark = spark
    self.lastMetaReadTime = time()
    self.rules = self.getRules()

相关问题