尝试从python写入cassandra时,cql查询中出现语法错误

3wabscal  于 2021-06-15  发布在  Cassandra
关注(0)|答案(1)|浏览(351)

所以,我正在用python构建一个应用程序,它从twitter获取数据,然后保存到cassandra。我目前的问题在于一个脚本,它从Kafka读取数据并试图将其写入Cassandra,如下所示:

import threading, logging, time
import multiprocessing
from cassandra.cluster import Cluster

from kafka import KafkaConsumer, KafkaProducer

class Consumer(multiprocessing.Process):
   def __init__(self):
        multiprocessing.Process.__init__(self)
        self.stop_event = multiprocessing.Event()

    def stop(self):
         self.stop_event.set()

    def run(self):
       consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                                 auto_offset_reset='earliest',
                                 consumer_timeout_ms=1000)
        consumer.subscribe(['twitter'])

    while not self.stop_event.is_set():
        for message in consumer:
            # session.execute(
            #     """
            #     INSERT INTO mensaje_73 (tweet)
            #     VALUES (message)
            #     """
            # )
            print(message)
            cluster = Cluster()
            session = cluster.connect('twitter')
            session.execute(
                    """
                    INSERT INTO mensaje_73 (tweet)
                    VALUES (message)
                    """
                )

            # if self.stop_event.is_set():
            #     break

    consumer.close()

   def main():

    tasks = [
        Consumer()
    ]

    for t in tasks:
        t.start()

    time.sleep(10)

    for task in tasks:
        task.stop()

if __name__ == "__main__":
     logging.basicConfig(
        format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:% 
   (levelname)s:%(process)d:%(message)s',
        level=logging.INFO
    )
    main()

我已经尝试将测试消息插入到表twitter.mensaje泷73中,效果非常好,如下所示:

import threading, logging, time
import multiprocessing
from cassandra.cluster import Cluster

from kafka import KafkaConsumer, KafkaProducer

cluster = Cluster()
session = cluster.connect('twitter')
session.execute(
    """
    INSERT INTO mensaje_73 (tweet)
    VALUES ('helooo')
    """
)

任何帮助都将不胜感激:)

eufgjt7s

eufgjt7s1#

所以这里的问题是,你的 message 变量在cql中被视为文本,没有单引号就无法工作。因此,错误。
为了解决这个问题,我将使用一个准备好的语句,然后绑定 message 致:

session = cluster.connect('twitter')
preparedTweetInsert = session.prepare(
        """
        INSERT INTO mensaje_73 (tweet)
        VALUES (?)
        """
    )
session.execute(preparedTweetInsert,[message])

试一下,看看有没有用。
而且,这似乎是一个简单的数据模型。但有一件事要问你自己,你将如何查询这些数据?除非 tweet 是你唯一的主键。这也意味着,你可以查询个人推文的唯一方法,就是通过消息的确切文本。需要考虑的是,按天对其进行分区可能是一个更好的选择,因为它可以很好地分布并提供更好的查询模型。

相关问题