如何将流数据集写入hive?

rbpvctlc  于 2021-06-26  发布在  Hive
关注(0)|答案(1)|浏览(453)

这个问题在这里已经有答案了

如何将spark结构化流Dataframe插入到配置单元外部表/位置(3个答案)
两年前关门了。
使用apachespark2.2:结构化流,我正在创建一个程序,从kafka读取数据并将其写入hive。我正在寻找写大量数据传入Kafka主题@100记录/秒。
已创建配置单元表:

CREATE TABLE demo_user( timeaa BIGINT, numberbb INT, decimalcc DOUBLE, stringdd STRING, booleanee BOOLEAN ) STORED AS ORC ;

通过手动配置单元查询插入:

INSERT INTO TABLE demo_user (1514133139123, 14, 26.4, 'pravin', true);

通过spark结构化流代码插入:

SparkConf conf = new SparkConf();
conf.setAppName("testing");
conf.setMaster("local[2]");
conf.set("hive.metastore.uris", "thrift://localhost:9083");
SparkSession session = 
SparkSession.builder().config(conf).enableHiveSupport().getOrCreate();

// workaround START: code to insert static data into hive
String insertQuery = "INSERT INTO TABLE demo_user (1514133139123, 14, 26.4, 'pravin', true)";
session.sql(insertQuery);
// workaround END:

// Solution START
Dataset<Row> dataset = readFromKafka(sparkSession); // private method reading data from Kafka's 'xyz' topic

//**My question here:**
// some code which writes dataset into hive table demo_user
// Solution END
hts6caw3

hts6caw31#

在使用以下命令时,不需要创建配置单元表,这是自动创建的
dataset.write.jdbc(字符串url,字符串表,java.util.properties connectionproperties)
或使用
dataset.write.saveastable(字符串tablename)

相关问题