sparksqljava:向rdbms中插入数据

du7egjpx  于 2021-07-09  发布在  Spark
关注(0)|答案(0)|浏览(213)

披露:不是一个经验丰富的Spark开发。所以,一般代码反馈是受欢迎的。
我们正在尝试从日志中提取数据,并将提取的信息存储到mysql数据库中。如果id不存在,我们需要插入,如果已经存在,我们需要更新。我所做的是在本地工作的,但是当部署在emr上时,它不会写入db表。
这是密码,

public static Properties properties = loadProperties();

...

private static void save(SQLContext sqlContext, JavaRDD<Row> rows) {
    String queryString = "INSERT INTO "+ properties.getProperty("db.name")+".metrics " +
            "(id, start, end, success, sid) VALUES (?,?,?,?,?) " +
            "on DUPLICATE KEY " +
            "UPDATE end=?,success=?";

    Dataset<Row> rowDataset = sqlContext.createDataFrame(rows, getDBDataType());
    rowDataset.createOrReplaceTempView("temptable");
    int batchsize = (rowDataset.count()/10)>0 ? (int)(rowDataset.count()/10):1;
    rowDataset.coalesce(batchsize).foreachPartition(partition -> {
        try {
            Connection connection = DriverManager.getConnection(properties.getProperty("db.url")+"/"+properties.getProperty("db.dbName")+properties.getProperty("db.ConnectionParam")
                    , properties.getProperty("db.username"), properties.getProperty("db.password"));

            PreparedStatement finalPreparedStatement = connection.prepareStatement(queryString);
            partition.forEachRemaining(row -> {
                try {
                    if(finalPreparedStatement!=null){
                        finalPreparedStatement.setDouble(1, row.getDouble(0));
                        finalPreparedStatement.setTimestamp(2, row.getTimestamp(1));
                        finalPreparedStatement.setTimestamp(3, row.getTimestamp(2));
                        finalPreparedStatement.setBoolean(6, row.getBoolean(5));
                        finalPreparedStatement.setString(7, row.getString(6));

                        finalPreparedStatement.addBatch();
                    }
                } catch (SQLException throwables) {
                    throwables.printStackTrace();
                }
            });
            finalPreparedStatement.executeBatch();
            finalPreparedStatement.close();
            connection.close();
        } catch (SQLException throwables) {
            throwables.printStackTrace();
        }
    });
}

我做错了什么?我怎样才能解决这个问题?甚至调试这个?
同样,当我在本地计算机上运行此作业时,这也起作用。但部署到emr时不向db写入数据。db可以从spark集群访问,因为db和spark集群位于同一个虚拟私有云上。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题