import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
public class Sample {
private static final Logger LOGGER = LoggerFactory.getLogger(Sample.class);
public static void main(String[] args) {
//Stage 1
new Sample.run("/path");
//Stage 2
new Sample.jdbcTest();
}
public jdbcTest(){
try {
Class.forName("com.mysql.jdbc.Driver");
Connection con=DriverManager.getConnection("jdbc:mysql://localhost:3306/sonoo","root","root");
Statement stmt=con.createStatement();
ResultSet rs=stmt.executeQuery("select * from emp");
ResultSet rs1=stmt.executeQuery("select * from emp_sal");
while(rs.next())
System.out.println(rs.getInt(1)+" "+rs.getString(2)+" "+rs.getString(3));
con.close();
} catch(Exception e){
System.out.println(e);
}
}
public void run(String inputFilePath) {
String master = "local[*]";
SparkConf conf = new SparkConf()
.setAppName(Sample.class.getName())
.setMaster(master);
JavaSparkContext context = new JavaSparkContext(conf);
context.textFile(inputFilePath)
.flatMap(text -> Arrays.asList(text.split(" ")).iterator())
.mapToPair(word -> new Tuple2<>(word, 1))
.reduceByKey((a, b) -> a + b)
.foreach(result -> LOGGER.info(String.format("Word [%s] count [%d].", result._1(), result._2)));
}
}
查询 rs 执行成功,而 rs1 根本没有执行。问题是:当非 Spark 任务(例如上面的 JDBC 调用)作为 spark-submit 命令的一部分提交时,Spark 是否真的会管理它们?Spark 会等到 MySQL 执行完 DML 等操作之后才结束吗?
暂无答案!
目前还没有任何答案,快来回答吧!