如何在gradle中指定flink task job的入口类

mxg2im7a  于 2021-06-24  发布在  Flink
关注(0)|答案(0)|浏览(193)

我正在使用flink(v1.10.0)任务作业,这是我的gradle(v6.0.1)构建脚本:

project(":flink-jobs:rdw-flinkjob-consume") {
    archivesBaseName = "rdw-flinkjob-consume"
    version = "1.0.0-SNAPSHOT"

    bootJar {
        manifest {
            attributes 'Start-Class': 'com.sportswin.rdw.StreamingJob',
                    'Entry-Class': 'com.sportswin.rdw.StreamingJob'
        }
    }

    dependencies {
        implementation "org.apache.flink:flink-java:${flinkVersion}"
        implementation "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
        implementation "log4j:log4j:${log4jVersion}"
        implementation "org.slf4j:slf4j-log4j12:${slf4jVersion}"
        implementation "com.rabbitmq:amqp-client:4.2.0"
        implementation "com.alibaba:fastjson:${fastjsonVersion}"
        implementation "org.apache.flink:flink-connector-rabbitmq_${scalaBinaryVersion}:1.10.0"
    }
}

但现在入门课还是 org.springframework.boot.loader.JarLauncher ,任务管理器显示错误:

2020-04-05 05:27:44,157 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler    - Unhandled exception.
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: null

这是我的任务定义代码:

int windowSize = 10;
long delay = 5100L;

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);

DataStream<Tuple3<String, String, Long>> leftSource = env.addSource(new StreamDataSource()).name("Demo Source");
DataStream<Tuple3<String, String, Long>> rightSource = env.addSource(new StreamDataSource1()).name("Demo Source");

DataStream<Tuple3<String, String, Long>> leftStream = leftSource. assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, String, Long>>(Time.milliseconds(delay)) {
        @Override
        public long extractTimestamp(Tuple3<String, String, Long> element) {
            return element.f2;
        }
    }
);

DataStream<Tuple3<String, String, Long>> rigjhtStream = rightSource.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, String, Long>>(Time.milliseconds(delay)) {
        @Override
        public long extractTimestamp(Tuple3<String, String, Long> element) {
            return element.f2;
        }
    }
);

leftStream.join(rigjhtStream)
    .where(new LeftSelectKey())
    .equalTo(new RightSelectKey())
    .window(TumblingEventTimeWindows.of(Time.seconds(windowSize)))
    .apply(new JoinFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>, Tuple5<String, String, String, Long, Long>>() {
        @Override
        public Tuple5<String, String, String, Long, Long> join(Tuple3<String, ng, Long> first, Tuple3<String, String, Long> second) {
            return new Tuple5<>(first.f0, first.f1, second.f1, first.f2, second.f2);
        }
    }).print();

env.execute("TimeWindowDemo");

任务ui如下所示:

那么,我应该怎么做才能将entry类指定为 com.sportswin.rdw.StreamingJob 要解决问题并使简单的演示任务正常工作?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题