我让flink以rabbitmq作为源运行,我的配置中的某些东西可能会导致rabbitmq中出现未确认的消息,然后最终一切都会崩溃。这是我的密码:
检查点配置:
Configuration conf = new Configuration();
conf.setInteger("rest.port", PropertyFileReader.getRestPort());
conf.setString("web.log.path", PropertyFileReader.getFlinkDashboardLogPath());
conf.setString("taskmanager.memory.flink.size", PropertyFileReader.getFlinkMemory());
conf.setString("taskmanager.memory.process.size", PropertyFileReader.getFlinkMemory());
conf.setInteger("taskmanager.data.port", PropertyFileReader.getFlinkTaskmanagerDataPort());
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(8, conf);
//Environment configurations
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(500);
env.setMaxParallelism(8);
env.setBufferTimeout(1000);
env.getConfig().setUseSnapshotCompression(true);
env.getConfig().disableSysoutLogging();
env.getConfig().enableObjectReuse();
if (PropertyFileReader.isCheckpointing_enabled()) {
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setStateBackend(new FsStateBackend(new Path(PropertyFileReader.getCheckpointing_path()).toUri(), true));
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.enableCheckpointing(3000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(6000);
env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
}
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
10, // number of restart attempts
Time.of(5, TimeUnit.SECONDS) // delay
));
消费者代码:
connectionConfig = new RMQConnectionConfig.Builder()
.setHost(PropertyFileReader.getRbt_host())
.setPort(PropertyFileReader.getRbt_port())
.setUserName(PropertyFileReader.getRbt_username())
.setPassword(PropertyFileReader.getRbt_password())
.setVirtualHost(PropertyFileReader.getRbt_virtualhost())
.setTopologyRecoveryEnabled(true)
.setAutomaticRecovery(true)
.setRequestedChannelMax(PropertyFileReader.getMax_channel())
.setRequestedChannelMax(PropertyFileReader.getRequested_max_channel())
.setNetworkRecoveryInterval(1)
.build();
public static DataStream<String> eventStreamObject(StreamExecutionEnvironment env) {
return env.addSource(new RMQSource<>(
connectionConfig,
PropertyFileReader.getRbt_queue(),
true,
new SimpleStringSchema()))
.setParallelism(1);
}
每次我开始flink的时候都会有这样的信息:
INFO org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase - No state to restore for the RMQSource.
我该怎么解决这个问题?
暂无答案!
目前还没有任何答案,快来回答吧!