mysql—java应用程序中使用jdbc并行读取海量数据的标准算法或模式

eqqqjvef  于 2021-06-25  发布在  Mysql
关注(0)|答案(1)|浏览(391)

下面是一个从mysql读取数据并存储在csv文件中的简单程序。如果查询返回的记录超过1000万条,那么它的速度会很慢。
我完全理解,要并行进行,我们需要像
从查询中获取记录计数(从用户中选择*
然后用适当的方法将查询分解为并行块(从state='ca'的用户中选择*
然后,数据可以在50个线程中并行读取,也可以跨进程分布。
apachespark使用具有上下限和分区数的partition\u列,如下所示。
我很想知道是否有一种方法/模式/算法可以在非spark应用程序中并行地获得大量数据。不过,我将查看下面实现的spark代码。
https://medium.com/@radek.strnad/tips-for-use-jdbc-in-apache-spark-sql-396ea7b2e3d3

spark.read("jdbc")
  .option("url", url)
  .option("dbtable", "pets")
  .option("user", user)
  .option("password", password)
  .option("numPartitions", 10)
  .option("partitionColumn", "owner_id")
  .option("lowerBound", 1)
  .option("upperBound", 10000)
  .load()

SELECT * FROM pets WHERE owner_id >= 1 and owner_id < 1000
SELECT * FROM pets WHERE owner_id >= 1000 and owner_id < 2000
SELECT * FROM pets WHERE owner_id >= 2000 and owner_id < 3000

简单的mysql代码,用于读取和存储csv文件中的数据

public static void main(String[] args)
{
    try
    {
        String myDriver = "org.gjt.mm.mysql.Driver";
        String myUrl = "jdbc:mysql://localhost/test";
        Class.forName(myDriver);
        Connection conn = DriverManager.getConnection(myUrl, "root", "");
        String query = "SELECT * FROM users";
        Statement st = conn.createStatement();
        ResultSet rs = st.executeQuery(query);
        StringBuilder sb = new StringBuilder();
        while (rs.next())
        {
            int id = rs.getInt("id");
            String firstName = rs.getString("first_name");
            String lastName = rs.getString("last_name");
            Date dateCreated = rs.getDate("date_created");
            boolean isAdmin = rs.getBoolean("is_admin");
            int numPoints = rs.getInt("num_points");
            sb.append(String.format("%s, %s, %s, %s, %s, %s\n", id, firstName, lastName, dateCreated, isAdmin, numPoints));
        }

        try (FileOutputStream oS = new FileOutputStream(new File("aFile.csv"))) {
            oS.write(sb.toString().getBytes());
        } catch (IOException e) {
            e.printStackTrace();
        }
        st.close();
    }
    catch (Exception e)
    {
        System.err.println("Got an exception! ");
        System.err.println(e.getMessage());
    }
}
y53ybaqx

y53ybaqx1#

这并不能准确回答你的问题,但是 SELECT DATA INTO OUTFILE 可以帮助您快速导出数据。
这是一个生成csv文件的命令示例,

SELECT * 
  INTO OUTFILE '/some/path/to/users.csv'
FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '"'
 LINES TERMINATED BY '\n'
  FROM users;

这使用一个快速路径将数据写入文件系统,可能比线程方法更快。编程当然更容易。
在如此大容量的查询之前使用 SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED; 以避免阻塞对表的插入和更新。
如果您将使用多个java线程来检索数据,我建议您使用以下策略:
在生成线程之前,请确定最大 id 执行此查询得到的值: SELECT MAX(id) FROM users; 决定你将产生多少线程。太多的线程会适得其反,因为它们会使mysql服务器过载。50个线程与mysql服务器的连接太多了。用四个或八个。
给每个线程分配自己的一段 id 要检索的值。例如,如果有一千万行和四个线程,那么段将是[1-2500000]、[2500001-5000000]、[5000001-7500000]和[7500001-10000000]。
在每个线程中,打开一个到mysql的jdbc连接,然后 WHERE id BETWEEN segmentstart AND segmentfinish 选择正确的行(mysql连接不是线程安全对象)。
SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED; 在选择查询之前。 id (大概)是 users table,所以 WHERE 使用它进行过滤将非常有效。

相关问题