在使用cqlengine python库在cassandra中更改了表的模式之后,我尝试将行插入到表中。更改前,模型看起来像:
class MetricsByDevice(Model):
device = columns.Text(primary_key=True, partition_key=True)
datetime = columns.DateTime(primary_key=True, clustering_order="DESC")
load_power = columns.Double()
inverter_power = columns.Double()
我将模式更改为这样,添加了四列(dso、node、park和commercializer):
class MetricsByDevice(Model):
device = columns.Text(primary_key=True, partition_key=True)
datetime = columns.DateTime(primary_key=True, clustering_order="DESC")
DSO = columns.Text(index=True, default='DSO_1'),
node = columns.Text(index=True, default='Node_1'),
park = columns.Integer(index=True, default=6),
commercializer = columns.Text(index=True, default='Commercializer_1'),
load_power = columns.Double()
inverter_power = columns.Double()
然后,我将表与包含行的脚本同步
sync_table(MetricsByDate)
我检查了数据库,已经创建了四列。现有行的这些字段的值为null(如预期的那样)。
然后我修改了负责在批处理行中插入的脚本,包括与新字段对应的值。它看起来像:
batch = BatchQuery()
for idx, message in enumerate(consumer):
data = message.value
ts_to_insert = dateutil.parser.parse(data['timestamp'])
filters = get_filters(message.partition_key)
MetricsByDate.batch(batch).create(
device=device,
date=str(ts_to_insert.date()),
time=str(ts_to_insert.time()),
created_at=now,
DSO=str(filters['DSO']),
node=str(filters['node']),
park=int(filters['park']),
commercializer=str(filters['commercializer']),
load_power=data['loadPower'],
inverter_power=data['inverterPower'],
)
if idx % 100 == 0: # Insert every 100 messages
batch.execute()
# Reset batch
batch = BatchQuery()
我已经检查了与新字段对应的值是否为none,并且具有正确的类型。尽管如此,它仍然正确地插入了所有行,但新字段中的值在cassandra中为空。
批插入不会返回任何错误。我不知道我是否遗漏了什么,或者是否需要执行额外的步骤来更新模式。我一直在查文件,但找不到任何有用的。
我做错什么了吗?
编辑
在亚历克斯·奥特的建议之后,我已经一行一行地插入了。将代码更改为:
for idx, message in enumerate(consumer):
data = message.value
ts_to_insert = dateutil.parser.parse(data['timestamp'])
filters = get_filters(message.partition_key)
metrics_by_date = MetricsByDate(
device=device,
date=str(ts_to_insert.date()),
time=str(ts_to_insert.time()),
created_at=now,
DSO=str(filters['DSO']),
node=str(filters['node']),
park=int(filters['park']),
commercializer=str(filters['commercializer']),
load_power=data['loadPower'],
inverter_power=data['inverterPower'],
)
metrics_by_date.save()
如果在执行行之前 metrics_by_date.save()
我添加这些打印语句:
print(metrics_by_date.DSO)
print(metrics_by_date.park)
print(metrics_by_date.load_power)
print(metrics_by_date.device)
print(metrics_by_date.date)
输出为:
(<cassandra.cqlengine.columns.Text object at 0x7ff0b492a670>,)
(<cassandra.cqlengine.columns.Integer object at 0x7ff0b492d190>,)
256.99
SQ3-3.2.3.1-70-17444
2020-04-22
在新字段中,我得到一个cassandra对象,但在其他字段中,我得到它们的值。它可能是一个线索,因为它继续在新列中插入null。
1条答案
按热度按时间pwuypxnk1#
我终于明白了。
这是一件愚蠢的事情,在模型定义中,不是因为knwon的原因,我在单独的字段中添加了逗号,而不是换行符。。。因此,将模型定义更正为:
真管用!!