我想记录我的spark作业在p6spy帮助下执行的jdbc语句。
使用p6spy通常很简单:插入字符串 :p6spy:
并将p6spy驱动程序类包含到应用程序的类路径中。之后,所有jdbc操作都将被记录到一个文件中。
例如,如果原始(mysql)连接字符串
jdbc:mysql://172.17.0.2:3306/spark_test
启用日志记录的连接字符串将是
jdbc:p6spy:mysql://172.17.0.2:3306/spark_test
我使用这行代码将Dataframe写入mysql表
df.write.mode(SaveMode.Overwrite).jdbc("jdbc:p6spy:mysql://172.17.0.2:3306/spark_test", "test_table", prop)
与 prop
包含数据库用户和密码。
这行代码失败并显示错误消息
Exception in thread "main" java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax;
check the manual that corresponds to your MySQL server version for the right syntax to use near '"value" INTEGER NOT NULL)' at line 1
没有 :p6spy:
连接字符串中的一部分,一切正常。
我目前的发现
错误的原因是spark试图执行该语句
CREATE TABLE test_table ("value" INTEGER NOT NULL)
在列名周围包含**“”。对于mysql,正确的字符应该是**。 spark可以处理不同的sql方言。方言在org.apache.spark.sql.jdbc包中实现。要使用的方言取决于数据库的jdbc url。每个方言对象实现该方法
canHandle(url : String). mysqldialect处理以
jdbc:mysql但不是那些
jdbc:p6spy:mysql` . 不幸的是,对于未知的url类型,spark默认为noopdialect。此方言在列名周围添加了**“**”。
可能的解决方案
可以通过调用jdbcdaluens.registerdialect来注册新的数据库方言。这里可以注册一个新的方言来实现 canHandle
方法as
override def canHandle(url: String): Boolean = url.startsWith("jdbc:p6spy:mysql")
然后将所有其他方法调用委托给原始mysql方言。
不幸的是,mysqldialect对象被声明为
private case object MySQLDialect extends JdbcDialect {
...
}
所以我自己的方言实现不能直接使用mysqldialect。一种选择是将mysqldialect的代码复制到我自己的dialect对象中(代码不长),但我希望避免复制代码。
还有其他选择吗?
我使用的是spark 2.4.5
暂无答案!
目前还没有任何答案,快来回答吧!