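I am trying to connect from Google Dataflow to an AWS RDS MySQL instance. I wrote a Java program that creates the pipeline. The job is created successfully, but the MySQL connection always fails with the following error: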
java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
at com.google.cloud.dataflow.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:338)
at com.google.cloud.dataflow.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:308)
at com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:63)
at com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:50)
at com.google.cloud.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:87)
at com.google.cloud.dataflow.worker.MapTaskExecutorFactory.create(MapTaskExecutorFactory.java:154)
at com.google.cloud.dataflow.worker.DataflowWorker.doWork(DataflowWorker.java:308)
at com.google.cloud.dataflow.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:264)
at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:133)
at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:113)
at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:100)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.beam.sdk.util.UserCodeException: com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
at org.apache.beam.sdk.io.jdbc.JdbcIO$ReadFn$DoFnInvoker.invokeSetup(Unknown Source)
at com.google.cloud.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.deserializeCopy(DoFnInstanceManagers.java:63)
at com.google.cloud.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.peek(DoFnInstanceManagers.java:45)
at com.google.cloud.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:94)
at com.google.cloud.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:74)
at com.google.cloud.dataflow.worker.MapTaskExecutorFactory.createParDoOperation(MapTaskExecutorFactory.java:415)
at com.google.cloud.dataflow.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:326)
... 14 more
Caused by: com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure
Caused by: java.net.SocketTimeoutException: connect timed out
The Java source code is as follows:
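import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.ParDo;

// SourceRow, TransformToTableRow, getSchema() and the variable `table`
// are defined elsewhere and are not shown in the question.
public class MySQLToBQ {
    public static void main(String[] args) throws Exception {
        DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        options.setProject("project_name");
        options.setStagingLocation("gs://staging");
        options.setTempLocation("gs://temp");
        options.setRunner(DataflowRunner.class);
        options.setJobName("MySQL-To-BQ-" + new SimpleDateFormat("yyyyMMdd-HHmmss").format(new Date()));
        System.out.println("Job Name " + options.getJobName());

        Pipeline p = Pipeline.create(options);

        // JDBC connection settings for the MySQL source.
        JdbcIO.DataSourceConfiguration mysqlConfig = JdbcIO.DataSourceConfiguration.create(
                "com.mysql.cj.jdbc.Driver", "jdbc:mysql://mysql_host:3306/mysql_database")
            .withUsername("user")
            .withPassword("password");

        p.apply("mysql_source", JdbcIO.<SourceRow>read()
                .withDataSourceConfiguration(mysqlConfig)
                .withQuery("query")
                .withCoder(SerializableCoder.of(SourceRow.class))
                .withRowMapper(new JdbcIO.RowMapper<SourceRow>() {
                    @Override
                    public SourceRow mapRow(ResultSet resultSet) throws Exception {
                        // Copy every column of the current row as a name/string pair.
                        SourceRow datarow = new SourceRow();
                        ResultSetMetaData rsmd = resultSet.getMetaData();
                        for (int i = 1; i <= rsmd.getColumnCount(); i++) {
                            datarow.add(rsmd.getColumnName(i), resultSet.getString(i));
                        }
                        return datarow;
                    }
                }))
            .apply(table + "_transform", ParDo.of(new TransformToTableRow()))
            .apply(table + "_destination", BigQueryIO.writeTableRows()
                .to("table_name")
                .withSchema(getSchema())
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

        p.run();
    }
}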
I was able to create a Compute Engine VM instance and connect to the MySQL database from there successfully.
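The innermost cause in the trace is `java.net.SocketTimeoutException: connect timed out`, which suggests the TCP connection never opened, rather than a problem with the credentials or the query. A minimal sketch of a reachability probe follows; it uses only the JDK, and the class name `TcpProbe`, the 5-second timeout, and the default host `mysql_host` (the placeholder from the question) are assumptions for illustration. Running it from different hosts can help show whether the database endpoint accepts connections from that network.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class TcpProbe {
    /** Returns true if a TCP connection to host:port opens within timeoutMillis. */
    public static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers connect timeouts, refused connections and unresolved hosts.
            return false;
        }
    }

    public static void main(String[] args) {
        // Hypothetical defaults: "mysql_host" is the placeholder used in the question.
        String host = args.length > 0 ? args[0] : "mysql_host";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 3306;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 5000));
    }
}
```

A result of `false` only says that this particular host could not open the port; it does not by itself identify which firewall or security rule is responsible.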
2 Answers
isr3a4wc1#
On Dataflow, you cannot whitelist an IP address to let Dataflow reach the SQL instance. I am not sure about AWS RDS, but for Cloud SQL you should use the JDBC socket factory instead: https://cloud.google.com/sql/docs/mysql/connect-external-app#java
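As a rough sketch of what the socket factory approach looks like, the helper below builds a JDBC URL in the form documented by the Cloud SQL JDBC socket factory project. This applies to Cloud SQL instances only, not to AWS RDS. The class name `CloudSqlJdbcUrl`, the method `build`, and the instance connection name `my-project:us-central1:my-instance` are hypothetical placeholders, and the socket factory library matching your Connector/J version is assumed to be on the worker classpath.

```java
public class CloudSqlJdbcUrl {
    /**
     * Builds a MySQL JDBC URL that routes the connection through the
     * Cloud SQL socket factory instead of a host and port.
     */
    public static String build(String database, String instanceConnectionName) {
        return "jdbc:mysql:///" + database
            + "?cloudSqlInstance=" + instanceConnectionName
            + "&socketFactory=com.google.cloud.sql.mysql.SocketFactory";
    }

    public static void main(String[] args) {
        // Hypothetical instance connection name in project:region:instance form.
        String url = build("mysql_database", "my-project:us-central1:my-instance");
        System.out.println(url);
        // In the pipeline from the question, this URL would replace the host-based one:
        // JdbcIO.DataSourceConfiguration.create("com.mysql.cj.jdbc.Driver", url)
        //     .withUsername("user")
        //     .withPassword("password");
    }
}
```

Because the URL carries no host, there is no IP address to allow on the database side; the socket factory handles the connection to the named instance.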
nuypyhwy2#
For Java, you can use public access together with this library: https://github.com/googlecloudplatform/cloud-sql-jdbc-socket-factory.