披露:不是一个经验丰富的Spark开发。所以,一般代码反馈是受欢迎的。
我们正在尝试从日志中提取数据,并将提取的信息存储到mysql数据库中。如果id不存在,我们需要插入,如果已经存在,我们需要更新。我所做的是在本地工作的,但是当部署在emr上时,它不会写入db表。
这是密码,
public static Properties properties = loadProperties();
...
private static void save(SQLContext sqlContext, JavaRDD<Row> rows) {
String queryString = "INSERT INTO "+ properties.getProperty("db.name")+".metrics " +
"(id, start, end, success, sid) VALUES (?,?,?,?,?) " +
"on DUPLICATE KEY " +
"UPDATE end=?,success=?";
Dataset<Row> rowDataset = sqlContext.createDataFrame(rows, getDBDataType());
rowDataset.createOrReplaceTempView("temptable");
int batchsize = (rowDataset.count()/10)>0 ? (int)(rowDataset.count()/10):1;
rowDataset.coalesce(batchsize).foreachPartition(partition -> {
try {
Connection connection = DriverManager.getConnection(properties.getProperty("db.url")+"/"+properties.getProperty("db.dbName")+properties.getProperty("db.ConnectionParam")
, properties.getProperty("db.username"), properties.getProperty("db.password"));
PreparedStatement finalPreparedStatement = connection.prepareStatement(queryString);
partition.forEachRemaining(row -> {
try {
if(finalPreparedStatement!=null){
finalPreparedStatement.setDouble(1, row.getDouble(0));
finalPreparedStatement.setTimestamp(2, row.getTimestamp(1));
finalPreparedStatement.setTimestamp(3, row.getTimestamp(2));
finalPreparedStatement.setBoolean(6, row.getBoolean(5));
finalPreparedStatement.setString(7, row.getString(6));
finalPreparedStatement.addBatch();
}
} catch (SQLException throwables) {
throwables.printStackTrace();
}
});
finalPreparedStatement.executeBatch();
finalPreparedStatement.close();
connection.close();
} catch (SQLException throwables) {
throwables.printStackTrace();
}
});
}
我做错了什么?我怎样才能解决这个问题?甚至调试这个?
同样,当我在本地计算机上运行此作业时,这也起作用。但部署到emr时不向db写入数据。db可以从spark集群访问,因为db和spark集群位于同一个虚拟私有云上。
暂无答案!
目前还没有任何答案,快来回答吧!