使用hivestoragehandler的kafka生产者

5lhxktic  于 2021-06-01  发布在  Hadoop
关注(0)|答案(2)|浏览(635)

我对hive/hadoop比较陌生
我在看这个Hive的存储处理器。
现在,我尝试编写一个hivestoragehandler的定制实现,用于使用配置单元表查询消息并将消息推送到kafka。
我看到了hivestoragehandler的其他实现,它允许我们使用配置单元表对nosql数据库进行查询和写入。
我正试图为Kafka复制这一点。我在上面找到了一个项目
hiveka-使用hive查询kafka
在这里,他们试图使用配置单元表上的查询从kafka读取数据。我想用insert on the table来写Kafka主题。
有人能指导我吗?

mspsb9vt

mspsb9vt1#

如果我理解正确的话,你想从Hive中读取事件,然后推到Kafka。我没有使用存储处理程序的经验,但我建议编写适当的代码来生成kafka,然后将这些事件提供给hadoop/hive。
在kafka中有一个名为kafka connect的框架,它可以写入外部系统。confluent为hdfs编写了这样一个连接器,它通过在文件写入hdfs时更新配置单元元存储来提供配置单元支持。
不需要编写存储处理程序,就可以尝试使用jdbc源连接器,或者使用spark/flink从配置单元读取数据并推入kafka。
不过,一般来说,hadoop是cdc事件的目标,而不是它的生成源。主要是因为查询速度太慢。。。如果要在insert上创建事件,通常需要一些表扫描,因此从cassandra/hbase生成事件可能是更好的选择

zmeyuzjn

zmeyuzjn2#

我想用insert on the table来写Kafka主题。
这可以使用kafka hivestoragehandler。下面是这个特性可能的一般用例
Kafka主题查询
从kafka主题中查询数据并插入到配置单元托管/外部表中
从Kafka主题中查询数据并推送到其他Kafka主题中
从配置单元外部/托管表查询数据并推入kafka主题
您正在尝试执行第三个用例。
首先为源和目标kafka主题创建两个外部表。

create external table if not exists source_topic_table
(
<fields>
)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES (
'kafka.topic'='source_topic_name',
'kafka.bootstrap.servers'=''
);

create external table if not exists target_topic_table
(
<fields>
)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES (
'kafka.topic'='target_topic_name',
'kafka.bootstrap.servers'=''
);

然后使用合并查询将数据插入到目标Kafka主题中

merge into target_topic_table
using (
select
<columns>,
cast(null as binary) as `__key`,
cast(null as int) as `__partition`,
cast(-1 as bigint) as `__offset`,
cast(to_epoch_milli(current_timestamp) as bigint) as `__timestamp`
from source_topic_table
) sub
on
sub.column_name = target_topic_table.coulmn_name <Some condition>
when not matched then insert values
(
<sub.columns>,
sub.`__key`,sub.`__partition`,sub.`__offset`,sub.`__timestamp`
);

注:
使用配置单元外部非本机表
除了用户定义的有效负载模式之外,kafka存储处理程序还附加了4个列(\u key、\u partition、\u offset、\u timestmap),用户可以使用这些列来查询kafka元数据字段
如果数据不是csv格式,用户必须设置“kafka.serde.class”表属性
用户还可以设置“kafka.write.semantic”表属性,该属性允许无值、至少\u一次值或正好\u一次值。

相关问题