Spark Cassandra: written rows not showing up in cqlsh

plicqrtu · posted 2021-06-10 in Cassandra

I am using the Spark Cassandra connector to write streaming messages to a Cassandra column family. Even though the logs show the data being written to the table, nothing shows up when the data is queried.
Here is the software stack in use:
Java 1.8, Cassandra 3.11.4, Spark 2.4.3
I write the data from a Dataset with the following code:

// Append the Dataset to the target Cassandra table through the connector
dataset.write().format("org.apache.spark.sql.cassandra")
    .option("table", tableName)
    .option("keyspace", keySpace)
    .mode(SaveMode.Append)
    .save();

The log messages indicate that the records are being written successfully, as shown below:

[Executor task launch worker for task 58915] INFO  2020-01-27 08:49:25,608 com.datastax.spark.connector.writer.TableWriter  - Wrote 7 rows to keyspace.ssl_connections_1_2020 in 0.011 s.
[Executor task launch worker for task 58916] INFO  2020-01-27 08:49:25,612 com.datastax.spark.connector.writer.TableWriter  - Wrote 9 rows to keyspace.ssl_connections_1_2020 in 0.011 s.
[Executor task launch worker for task 58919] DEBUG 2020-01-27 08:49:25,619 com.datastax.spark.connector.writer.TableWriter  - Writing data partition to keyspace.ssl_connections_1_2020 in batches of BytesInBatch(1024)
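
The keyspace and table named in these log lines can be checked directly from cqlsh. A minimal sanity check, assuming the keyspace.ssl_connections_1_2020 name taken from the logs (COUNT(*) scans the whole table, so it is only practical on small test data):

-- Confirm that the table the connector reports writing to holds any rows;
-- the table name is copied from the log lines above.
SELECT COUNT(*) FROM keyspace.ssl_connections_1_2020;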

Any help in debugging this issue would be greatly appreciated.
EDIT: table structure

CREATE TABLE connections_timeseries_1_2020 (
    srcip text,
    day int,
    hour int,
    id text,
    connectionstate text,
    connectiontimestamp timestamp,
    dayofweek int,
    destip text,
    destipbytes double,
    destmac text,
    destpackets double,
    destport int,
    duration double,
    history text,
    localdest boolean,
    localsrc boolean,
    minute int,
    missedbytes double,
    month int,
    protocol text,
    receivedbytes double,
    reportedsensor text,
    services list<text>,
    srcipbytes double,
    srcmac text,
    srcpackets double,
    srcport int,
    transmittedbytes double,
    vlan int,
    weekofyear int,
    year int,
    PRIMARY KEY ((srcip, day), hour, id)
) WITH CLUSTERING ORDER BY (hour ASC, id ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': '500'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';
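
Note that the partition key of this table is (srcip, day), so a plain SELECT in cqlsh has to restrict both columns, and the day value has to match whatever the Spark job computed for the rows being looked for; once the date rolls over, new rows land in new partitions. A minimal query sketch, with hypothetical key values:

-- Both partition key columns must be restricted; the srcip and day
-- values below are hypothetical placeholders.
SELECT hour, id, connectionstate, connectiontimestamp
FROM connections_timeseries_1_2020
WHERE srcip = '192.168.1.10' AND day = 27;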

The rows were being saved initially but stopped showing up once the date changed. The messages are JSON; they are converted into a Dataset in Spark before being saved to Cassandra.
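
Given that the rows stopped appearing at a date boundary, two things worth ruling out on the write path are an unexpected TTL and a skewed write timestamp from the Spark executors (Cassandra resolves conflicting writes by last-write-wins, so clock skew can make newer data lose silently). Both can be inspected per cell in cqlsh, again with hypothetical key values:

-- writetime() returns the cell timestamp in microseconds since the epoch;
-- TTL() returns the remaining seconds to live, or null if no TTL is set.
-- An expiring TTL would explain rows that are written but later vanish.
SELECT writetime(connectionstate), TTL(connectionstate)
FROM connections_timeseries_1_2020
WHERE srcip = '192.168.1.10' AND day = 27;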
