我正在使用flink(v1.10.0)任务作业,这是我的gradle(v6.0.1)构建脚本:
project(":flink-jobs:rdw-flinkjob-consume") {
archivesBaseName = "rdw-flinkjob-consume"
version = "1.0.0-SNAPSHOT"
bootJar {
manifest {
attributes 'Start-Class': 'com.sportswin.rdw.StreamingJob',
'Entry-Class': 'com.sportswin.rdw.StreamingJob'
}
}
dependencies {
implementation "org.apache.flink:flink-java:${flinkVersion}"
implementation "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
implementation "log4j:log4j:${log4jVersion}"
implementation "org.slf4j:slf4j-log4j12:${slf4jVersion}"
implementation "com.rabbitmq:amqp-client:4.2.0"
implementation "com.alibaba:fastjson:${fastjsonVersion}"
implementation "org.apache.flink:flink-connector-rabbitmq_${scalaBinaryVersion}:1.10.0"
}
}
但现在入门课还是 org.springframework.boot.loader.JarLauncher
,任务管理器显示错误:
2020-04-05 05:27:44,157 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Unhandled exception.
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: null
这是我的任务定义代码:
int windowSize = 10;
long delay = 5100L;
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStream<Tuple3<String, String, Long>> leftSource = env.addSource(new StreamDataSource()).name("Demo Source");
DataStream<Tuple3<String, String, Long>> rightSource = env.addSource(new StreamDataSource1()).name("Demo Source");
DataStream<Tuple3<String, String, Long>> leftStream = leftSource. assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, String, Long>>(Time.milliseconds(delay)) {
@Override
public long extractTimestamp(Tuple3<String, String, Long> element) {
return element.f2;
}
}
);
DataStream<Tuple3<String, String, Long>> rigjhtStream = rightSource.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, String, Long>>(Time.milliseconds(delay)) {
@Override
public long extractTimestamp(Tuple3<String, String, Long> element) {
return element.f2;
}
}
);
leftStream.join(rigjhtStream)
.where(new LeftSelectKey())
.equalTo(new RightSelectKey())
.window(TumblingEventTimeWindows.of(Time.seconds(windowSize)))
.apply(new JoinFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>, Tuple5<String, String, String, Long, Long>>() {
@Override
public Tuple5<String, String, String, Long, Long> join(Tuple3<String, ng, Long> first, Tuple3<String, String, Long> second) {
return new Tuple5<>(first.f0, first.f1, second.f1, first.f2, second.f2);
}
}).print();
env.execute("TimeWindowDemo");
任务ui如下所示:
那么,我应该怎么做才能将entry类指定为 com.sportswin.rdw.StreamingJob
要解决问题并使简单的演示任务正常工作?
暂无答案!
目前还没有任何答案,快来回答吧!