下面是一个从mysql读取数据并存储在csv文件中的简单程序。如果查询返回的记录超过1000万条,那么它的速度会很慢。
我完全理解,要并行进行,我们需要像
从查询中获取记录计数(从用户中选择*
然后用适当的方法将查询分解为并行块(从state='ca'的用户中选择*
然后,数据可以在50个线程中并行读取,也可以跨进程分布。
apachespark使用具有上下限和分区数的partition\u列,如下所示。
我很想知道是否有一种方法/模式/算法可以在非spark应用程序中并行地获得大量数据。不过,我将查看下面实现的spark代码。
https://medium.com/@radek.strnad/tips-for-use-jdbc-in-apache-spark-sql-396ea7b2e3d3
spark.read("jdbc")
.option("url", url)
.option("dbtable", "pets")
.option("user", user)
.option("password", password)
.option("numPartitions", 10)
.option("partitionColumn", "owner_id")
.option("lowerBound", 1)
.option("upperBound", 10000)
.load()
SELECT * FROM pets WHERE owner_id >= 1 and owner_id < 1000
SELECT * FROM pets WHERE owner_id >= 1000 and owner_id < 2000
SELECT * FROM pets WHERE owner_id >= 2000 and owner_id < 3000
简单的mysql代码,用于读取和存储csv文件中的数据
public static void main(String[] args)
{
try
{
String myDriver = "org.gjt.mm.mysql.Driver";
String myUrl = "jdbc:mysql://localhost/test";
Class.forName(myDriver);
Connection conn = DriverManager.getConnection(myUrl, "root", "");
String query = "SELECT * FROM users";
Statement st = conn.createStatement();
ResultSet rs = st.executeQuery(query);
StringBuilder sb = new StringBuilder();
while (rs.next())
{
int id = rs.getInt("id");
String firstName = rs.getString("first_name");
String lastName = rs.getString("last_name");
Date dateCreated = rs.getDate("date_created");
boolean isAdmin = rs.getBoolean("is_admin");
int numPoints = rs.getInt("num_points");
sb.append(String.format("%s, %s, %s, %s, %s, %s\n", id, firstName, lastName, dateCreated, isAdmin, numPoints));
}
try (FileOutputStream oS = new FileOutputStream(new File("aFile.csv"))) {
oS.write(sb.toString().getBytes());
} catch (IOException e) {
e.printStackTrace();
}
st.close();
}
catch (Exception e)
{
System.err.println("Got an exception! ");
System.err.println(e.getMessage());
}
}
1条答案
按热度按时间y53ybaqx1#
这并不能准确回答你的问题,但是
SELECT DATA INTO OUTFILE
可以帮助您快速导出数据。这是一个生成csv文件的命令示例,
这使用一个快速路径将数据写入文件系统,可能比线程方法更快。编程当然更容易。
在如此大容量的查询之前使用
SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;
以避免阻塞对表的插入和更新。如果您将使用多个java线程来检索数据,我建议您使用以下策略:
在生成线程之前,请确定最大
id
执行此查询得到的值:SELECT MAX(id) FROM users;
决定你将产生多少线程。太多的线程会适得其反,因为它们会使mysql服务器过载。50个线程与mysql服务器的连接太多了。用四个或八个。给每个线程分配自己的一段
id
要检索的值。例如,如果有一千万行和四个线程,那么段将是[1-2500000]、[2500001-5000000]、[5000001-7500000]和[7500001-10000000]。在每个线程中,打开一个到mysql的jdbc连接,然后
WHERE id BETWEEN segmentstart AND segmentfinish
选择正确的行(mysql连接不是线程安全对象)。放
SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;
在选择查询之前。id
(大概)是users
table,所以WHERE
使用它进行过滤将非常有效。