使用p6spy记录spark jdbc数据源

cgh8pdjw  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(426)

我想记录我的spark作业在p6spy帮助下执行的jdbc语句。
使用p6spy通常很简单:插入字符串 :p6spy: 并将p6spy驱动程序类包含到应用程序的类路径中。之后,所有jdbc操作都将被记录到一个文件中。
例如,如果原始(mysql)连接字符串

jdbc:mysql://172.17.0.2:3306/spark_test

启用日志记录的连接字符串将是

jdbc:p6spy:mysql://172.17.0.2:3306/spark_test

我使用这行代码将Dataframe写入mysql表

df.write.mode(SaveMode.Overwrite).jdbc("jdbc:p6spy:mysql://172.17.0.2:3306/spark_test", "test_table", prop)

prop 包含数据库用户和密码。
这行代码失败并显示错误消息

Exception in thread "main" java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax;
check the manual that corresponds to your MySQL server version for the right syntax to use near '"value" INTEGER NOT NULL)' at line 1

没有 :p6spy: 连接字符串中的一部分,一切正常。

我目前的发现

错误的原因是spark试图执行该语句

CREATE TABLE test_table ("value" INTEGER NOT NULL)

在列名周围包含**“”。对于mysql,正确的字符应该是**。 spark可以处理不同的sql方言。方言在org.apache.spark.sql.jdbc包中实现。要使用的方言取决于数据库的jdbc url。每个方言对象实现该方法canHandle(url : String). mysqldialect处理以jdbc:mysql但不是那些jdbc:p6spy:mysql` . 不幸的是,对于未知的url类型,spark默认为noopdialect。此方言在列名周围添加了**“**”。

可能的解决方案

可以通过调用jdbcdaluens.registerdialect来注册新的数据库方言。这里可以注册一个新的方言来实现 canHandle 方法as

override def canHandle(url: String): Boolean = url.startsWith("jdbc:p6spy:mysql")

然后将所有其他方法调用委托给原始mysql方言。
不幸的是,mysqldialect对象被声明为

private case object MySQLDialect extends JdbcDialect {
  ...
}

所以我自己的方言实现不能直接使用mysqldialect。一种选择是将mysqldialect的代码复制到我自己的dialect对象中(代码不长),但我希望避免复制代码。
还有其他选择吗?
我使用的是spark 2.4.5

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题