spring云流调度程序没有订阅服务器

ktecyv1j  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(430)

spring云流调度程序没有订阅服务器错误。
在成功启动spring引导容器之后,我们需要在kafka主题上放置一些通知消息,并且我们的几个微服务执行相同的功能,因此我们编写了一个包含输出通道定义和分派util的公共jar。只要我们在springapplication.run调用之后调用util,该功能就可以正常工作。
下面是我们的一个microservices应用程序类示例。

@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        ConfigurableApplicationContext context =SpringApplication.run(Application.class, args);
        context.getBean(SchedulerConsumerUtils.class).registerOrRestartConsumerJobs();
    }
}

上面的设置工作正常,但是这给开发人员在每个微服务上编写boiler模板代码带来了不必要的负担。因此,为了避免这种情况,我们编写了一个方面实现来执行相同的功能,但是在使用方面方法时,我们遇到了以下错误。
org.springframework.context.applicationcontextexception:未能启动bean'outputbindinglifecycle';嵌套的异常为org.springframework.messaging.messagedeliveryexception:dispatcher没有频道日程的订户rtestsvcs:dev:1180.ScheduledJobeExecutionResponseOutput'。;嵌套异常为org.springframework.integration.messagedispatchingexception:dispatcher没有订阅服务器
我们尝试了几种方法,比如springsmartlifecycle,来获取所有kafka输出/输入通道启动完成的句柄,但它们都遇到了相同的错误。
下面是我们在org.springframework.boot.springapplication.run(..)上的方面实现

@Aspect
@Component
public class SchedulerConsumerAspect {

    @Autowired
    protected ApplicationContext applicationContext;
    @AfterReturning(value = "execution(* org.springframework.boot.SpringApplication.run(..))",returning = "result")
    public void afterConsumerApplicationStartup(JoinPoint pjp, Object result) throws Throwable {
        if(result!=null){
            ConfigurableApplicationContext context=(ConfigurableApplicationContext) result;
            if(context.containsBean("schedulerConsumerUtils")){
                //For what ever reason the following call resulting in Dispatcher has no subscribers for channel error.
                //TODO fix the above issue and enable the following call.
                context.getBean(SchedulerConsumerUtils.class).registerOrRestartConsumerJobs();
            }
        }
    }

}

在我们的调试会话中,我们发现org.springframework.boot.springapplication.run(..)方面在引导过程中被多次调用。首先,当调用方面时,我们得到的结果值为null,一段时间后,spring boot调用同一方面,这次的结果不为null。即使在获得result not null之后,也没有被授权者,组件被完全初始化,这就是为什么您会看到对context.containsbean(“schedulerconsumerutils”)的检查。但是在bean初始化之后,我们看到输出通道没有完全绑定。
处理springcloudstreamkafka输出/输入通道绑定完成的最佳方法是什么?
为什么组件调用在springboot应用程序中运行良好,而不是通过aspect?这几天我挣扎着找不到合适的解决办法。非常感谢您的帮助。

g52tjvyc

g52tjvyc1#

我遵循了这个post-spring cloud stream-send message after application initialization的建议,并使用了第3个选项applicationrunner。前两个选项对我不起作用。

@Component
public class AppStartup implements ApplicationRunner {
    @Autowired
    protected ApplicationContext applicationContext;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        if(applicationContext!=null){
            applicationContext.getBean(SchedulerConsumerUtils.class).registerOrRestartConsumerJobs();
        }
    }
}

相关问题