我尝试用sparksql和sql语句创建数据库( "CREATE DATABASE IF NOT EXISTS test"
)在本地mysql中
SparkSession spark = SparkSession.builder().master("local[*]").appName("Kafka2MySQL_RDD").getOrCreate();
String createdbSql = "CREATE DATABASE IF NOT EXISTS test";
spark.sql(createdbSql);
spark.sql("show databases").show();
Properties jdbcProps = new Properties();
jdbcProps.put("user", "root"));
jdbcProps.put("password", "password");
JavaRDD<Row> row = inputRDD.map(e -> RowFactory.create(e.getAllValues()));
Dataset<Row> dataset = spark.createDataFrame(row, EntityMySQL.getStructType());
dataset.write().mode(SaveMode.Append).jdbc("mysql.output.uri", "test", jdbcProps);
``` `spark.sql("show databases").show();` 行打印名称空间,如下所示,
+---------+
|namespace|
+---------+
| default|
| test|
+---------+
柱头必须 `databases` 但印刷价值是 `namespace` . 所以呢 `dataset.write().mode(SaveMode.Append).jdbc` line抛出异常。
Exception in thread "main" java.sql.SQLSyntaxErrorException: Unknown database 'test'
我认为这个例外的一个原因是 `SparkSession` 对象在生成时没有与mysql连接的值。但是在这种情况下没有生成数据库,所以我不知道如何将mysql连接信息配置为 `SparkSession` 对象。如有任何答复,我们将不胜感激。致以最诚挚的问候
暂无答案!
目前还没有任何答案,快来回答吧!