flink在rabbitmq中未确认的消息

x8goxv8g  于 2021-06-26  发布在  Flink
关注(0)|答案(0)|浏览(403)

我让flink以rabbitmq作为源运行,我的配置中的某些东西可能会导致rabbitmq中出现未确认的消息,然后最终一切都会崩溃。这是我的密码:
检查点配置:

Configuration conf = new Configuration();
    conf.setInteger("rest.port", PropertyFileReader.getRestPort());
    conf.setString("web.log.path", PropertyFileReader.getFlinkDashboardLogPath());
    conf.setString("taskmanager.memory.flink.size", PropertyFileReader.getFlinkMemory());
    conf.setString("taskmanager.memory.process.size", PropertyFileReader.getFlinkMemory());
    conf.setInteger("taskmanager.data.port", PropertyFileReader.getFlinkTaskmanagerDataPort());

    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(8, conf);
    //Environment configurations
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.getConfig().setAutoWatermarkInterval(500);
    env.setMaxParallelism(8);
    env.setBufferTimeout(1000);
    env.getConfig().setUseSnapshotCompression(true);
    env.getConfig().disableSysoutLogging();
    env.getConfig().enableObjectReuse();
    if (PropertyFileReader.isCheckpointing_enabled()) {
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setStateBackend(new FsStateBackend(new Path(PropertyFileReader.getCheckpointing_path()).toUri(), true));
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.enableCheckpointing(3000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(6000);
        env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
    }
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
            10, // number of restart attempts
            Time.of(5, TimeUnit.SECONDS) // delay
    ));

消费者代码:

connectionConfig = new RMQConnectionConfig.Builder()
            .setHost(PropertyFileReader.getRbt_host())
            .setPort(PropertyFileReader.getRbt_port())
            .setUserName(PropertyFileReader.getRbt_username())
            .setPassword(PropertyFileReader.getRbt_password())
            .setVirtualHost(PropertyFileReader.getRbt_virtualhost())
            .setTopologyRecoveryEnabled(true)
            .setAutomaticRecovery(true)
            .setRequestedChannelMax(PropertyFileReader.getMax_channel())
            .setRequestedChannelMax(PropertyFileReader.getRequested_max_channel())
            .setNetworkRecoveryInterval(1)
            .build();

 public static DataStream<String> eventStreamObject(StreamExecutionEnvironment env) {
    return env.addSource(new RMQSource<>(
            connectionConfig,
            PropertyFileReader.getRbt_queue(),
            true,
            new SimpleStringSchema()))
            .setParallelism(1);
}

每次我开始flink的时候都会有这样的信息:

INFO  org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase  - No state to restore for the RMQSource.

我该怎么解决这个问题?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题