我尝试在Pyspark中根据条件current_time - lastReadTime〉refresh_interval定期从数据库读取数据。我提供的refresh_interval是5分钟。
这是Kafka的结构化流媒体,我稍后加入来自postgres的数据。
然而,每当我在5分钟内更改数据库中的数据时,即使5分钟还没有过去,我也会从数据库中获取新数据。
下面是我正在使用的代码。
def __init__(self, config,spark):
self.refresh_frequency_sec = config.getint('postgres-config', 'refresh-frequency-sec')
self.spark = spark
self.lastMetaReadTime = time()
self.rules = self.fetchRules()
def fetchRules(self):
jdbcDF = self.spark.read \
.format("jdbc") \
.option("driver", "org.postgresql.Driver")\
.option("url", self.connection_url) \
.option("dbtable", self.dbtable) \
.option("user", self.user) \
.option("password", self.password) \
.load()
return jdbcDF
def getRules(self):
if time() - self.lastMetaReadTime > self.refresh_frequency_sec:
self.rules = self.fetchRules()
self.lastMetaReadTime = time()
return self.rules
我哪里做错了?
1条答案
按热度按时间tzdcorbm1#
您在innit()方法中为'self.rules'调用了错误的方法。
您的初始方法应该是: