我使用springintegration来管理一些计划任务,其中一个我选择使用 HttpRequestHandlingMessagingGateway
最后使用聚合器收集任务结果,如下所示。最后一个频道“responsehttpchannel”收到的信息有什么遗漏吗?这不是我第一次使用spring集成,也不是http网关。但在这种情况下,我永远不能让聚合器工作。在这个请求-响应消息中执行一些其他任务。如果至少暂停对我有效的话,那已经是胜利了。
@Configuration
@EnableIntegration
@EnableConfigurationProperties
@EnableAsync
@EnableScheduling
@EnableIntegrationGraphController(path = "/integration", allowedOrigins="*")
public class SysmatIntegrationConfiguration {
private static final String MESSAGE_ORIGIN = "MESSAGE_ORIGIN";
@Autowired
IndexSchedulerService<IndexScheduler<?>> indexSchedulerService;
@Autowired
ApplicationContext applicationContext;
@Autowired
ListaTarefaItemHistorySplitter tarefaItemHistorySplitter;
@Autowired
ListTarefaItemSpliter tarefaItemSplitter;
@Autowired
SelecionadorItem selecionadorItem;
@Autowired
TarefaItemHistoryLog tarefaItemHistoryLog;
@Autowired
TarefaIndexarItemMaster tarefaIndexarItemMaster;
public SysmatIntegrationConfiguration() {
}
@Bean
public HeaderAttributeCorrelationStrategy headerAttributeCorrelationStrategy() {
return new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID);
}
@Bean
@Qualifier(value="expressionParser")
ExpressionParser expressionParser() {
return new SpelExpressionParser();
}
@Bean
@Qualifier(value="indexTarefaTemplate")
public MessagingTemplate indexTarefaTemplate(@Autowired @Qualifier("tarefaControllerChannel") MessageChannel indexChannel) {
return new MessagingTemplate(indexChannel);
}
@Bean
@Primary
@Qualifier(value="jobMessageTemplate")
public MessagingTemplate messagingTemplate(@Autowired @Qualifier("indexChannel") MessageChannel indexChannel) {
MessagingTemplate messagingTemplate = new MessagingTemplate(indexChannel);
return messagingTemplate;
}
//schedulerMessageTemplate
@Bean
@Primary
@Qualifier(value="schedulerMessageTemplate")
public MessagingTemplate schedulerMessageTemplate(@Autowired @Qualifier("tarefaIndexSchedulerChannel") MessageChannel tarefaIndexSchedulerChannel) {
MessagingTemplate messagingTemplate = new MessagingTemplate(tarefaIndexSchedulerChannel);
return messagingTemplate;
}
@Bean(name="threadFactory")
@Qualifier("threadFactory")
public ThreadFactory highPriorityThreadFactory() {
return constructThread(Thread.NORM_PRIORITY , false);
}
@Bean(name="normalPriorityThreadFactory")
@Qualifier("normalPriorityThreadFactory")
public ThreadFactory normalPriorityThreadFactory() {
return constructThread(Thread.NORM_PRIORITY , false);
}
private ThreadFactory constructThread(final int priority , final Boolean daemon) {
return new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(Optional.ofNullable(daemon).orElse(false));
t.setPriority(priority);
return t;
}
};
}
@Bean(name="threadPoolExecutor")
@Qualifier(value="threadPoolExecutor")
public ThreadPoolTaskExecutor threadPoolExecutor(@Autowired @Qualifier("threadFactory") ThreadFactory threadFactory) {
ThreadPoolTaskExecutor tpe = new ThreadPoolTaskExecutor();
tpe.setCorePoolSize(100);
tpe.setMaxPoolSize(Integer.MAX_VALUE);
tpe.setThreadFactory(threadFactory);
return tpe;
}
@Bean(name="taskScheduler")
@Qualifier(value="taskScheduler")
public ThreadPoolTaskScheduler taskScheduler(@Autowired @Qualifier("normalPriorityThreadFactory") ThreadFactory threadFactory) {
ThreadPoolTaskScheduler tpe = new ThreadPoolTaskScheduler();
tpe.setThreadNamePrefix("IntegrationPoolTaskScheduler");
tpe.setThreadFactory(threadFactory);
tpe.setPoolSize(20);
return tpe;
}
@Bean(name="threadPoolTaskScheduler")
@Qualifier(value="threadPoolTaskScheduler")
public ThreadPoolTaskScheduler threadPoolTaskScheduler(@Autowired @Qualifier("normalPriorityThreadFactory") ThreadFactory threadFactory) {
ThreadPoolTaskScheduler tpe = new ThreadPoolTaskScheduler();
tpe.setThreadNamePrefix("ThreadPoolTaskScheduler");
tpe.setThreadFactory(threadFactory);
tpe.setPoolSize(20);
return tpe;
}
@Bean
@Qualifier(value="myTaskRegistar")
public ScheduledTaskRegistrar myTaskRegistar(@Autowired @Qualifier("threadPoolTaskScheduler")TaskScheduler taskScheduler) {
ScheduledTaskRegistrar scheduledTaskRegistrar = new ScheduledTaskRegistrar();
scheduledTaskRegistrar.setTaskScheduler(taskScheduler);
return scheduledTaskRegistrar;
}
@Primary
@Bean("httpMessageConverters")
public List<HttpMessageConverter<?>> httpMessageConverters(){
List<HttpMessageConverter<?>> converters = new ArrayList<>();
converters.add(new MappingJackson2HttpMessageConverter(new ObjectMapper()));
converters.add(new FormHttpMessageConverter());
converters.add(new StringHttpMessageConverter());
return converters;
}
@Bean(name="defaultHttpHeaderMapper")
@Qualifier(value="defaultHttpHeaderMapper")
public DefaultHttpHeaderMapper defaultHeaderMapper() {
DefaultHttpHeaderMapper mapper = new DefaultHttpHeaderMapper();
mapper.setInboundHeaderNames(new String[] {"*"});
mapper.setOutboundHeaderNames(new String[] {"*"});
return mapper;
}
@Bean
@Qualifier("indexChannel")
public MessageChannel indexChannel(@Autowired @Qualifier("threadPoolExecutor")ThreadPoolTaskExecutor threadPoolExecutor) {
PublishSubscribeChannel psc = MessageChannels
.publishSubscribe("indexChannel" , threadPoolExecutor)
.get();
return psc;
}
@Bean(name="tarefaIndexChannel")
@Qualifier(value="tarefaIndexChannel")
public MessageChannel tarefaIndexChannel(@Autowired @Qualifier("threadPoolExecutor")ThreadPoolTaskExecutor threadPoolExecutor) {
PublishSubscribeChannel psc = MessageChannels
.publishSubscribe("tarefaIndexChannel" , threadPoolExecutor)
.get();
return psc;
}
@Bean(name="tarefaIndexSchedulerChannel")
@Qualifier(value="tarefaIndexSchedulerChannel")
public MessageChannel tarefaIndexSchedulerChannel(@Autowired @Qualifier("threadPoolExecutor")ThreadPoolTaskExecutor threadPoolExecutor) {
PublishSubscribeChannel psc = MessageChannels
.publishSubscribe("tarefaIndexSchedulerChannel" , threadPoolExecutor)
.get();
return psc;
}
@Bean(name="tarefaItemMasterIndexChannel")
@Qualifier(value="tarefaItemMasterIndexChannel")
public MessageChannel tarefaItemMasterIndexChannel(@Autowired @Qualifier("threadPoolExecutor")ThreadPoolTaskExecutor threadPoolExecutor) {
PublishSubscribeChannel psc = MessageChannels
.publishSubscribe("tarefaIndexChannel" , threadPoolExecutor)
.get();
return psc;
}
@Bean
@Qualifier("logginChannel")
public DirectChannel loggingChannel() {
return new DirectChannel();
}
@Bean(name="tarefaPartitionChannel")
@Qualifier(value="tarefaPartitionChannel")
public MessageChannel tarefaPartitionChannel(@Autowired @Qualifier("threadPoolExecutor")ThreadPoolTaskExecutor threadPoolExecutor) {
PublishSubscribeChannel psc = MessageChannels
.publishSubscribe("tarefaPartitionChannel" , threadPoolExecutor)
.get();
return psc;
}
@Bean(name="tarefaControllerChannel")
@Qualifier(value="tarefaControllerChannel")
public MessageChannel tarefaControllerChannel() {
return new DirectChannel();
}
@Bean(name="tarefaHttpIntegrationChannel")
@Qualifier(value="tarefaHttpIntegrationChannel")
public MessageChannel tarefaHttpIntegrationChannel(@Autowired @Qualifier("threadPoolExecutor")ThreadPoolTaskExecutor threadPoolExecutor) {
PublishSubscribeChannel psc = MessageChannels
.publishSubscribe("tarefaHttpIntegrationChannel" , threadPoolExecutor)
.get();
return psc;
}
@Bean(name="tarefaHttpIndexIntegrationChannel")
@Qualifier(value="tarefaHttpIndexIntegrationChannel")
public MessageChannel tarefaHttpIndexIntegrationChannel(@Autowired @Qualifier("threadPoolExecutor")ThreadPoolTaskExecutor threadPoolExecutor) {
PublishSubscribeChannel psc = MessageChannels
.publishSubscribe("tarefaHttpIndexIntegrationChannel" , threadPoolExecutor)
.get();
return psc;
}
@Bean(name="sumarizaTarefaChannel")
@Qualifier(value="sumarizaTarefaChannel")
public MessageChannel sumarizaTarefaChannel(@Autowired @Qualifier("threadPoolExecutor")ThreadPoolTaskExecutor threadPoolExecutor) {
PublishSubscribeChannel psc = MessageChannels
.publishSubscribe("sumarizaTarefaChannel" , threadPoolExecutor)
.get();
return psc;
}
@Bean
public IntegrationFlow loggingFlow(@Autowired
@Qualifier("loggingChannel") MessageChannel logChannel) {
return IntegrationFlows
.from(logChannel)
.handle(m -> logMessage(m))
.get();
}
@Bean
public IntegrationFlow indexPrepareFlow(@Autowired @Qualifier("indexChannel") MessageChannel indexChannel ,
@Autowired @Qualifier("tarefaIndexChannel") MessageChannel tarefaIndexChannel) {
return IntegrationFlows
.from(indexChannel)
.transform(selecionadorItem)
.channel("tarefaPartitionChannel").get();
}
@Bean
public IntegrationFlow itemRouteToPartition(@Autowired HttpRequestHandler httpRequestHandler) {
return IntegrationFlows
.from("tarefaHttpIntegrationChannel")
.enrichHeaders(h -> h.header(MESSAGE_ORIGIN, "HTTP"))
.handle(httpRequestHandler)
.get();
}
@Bean
public IntegrationFlow indexRouteToPartition() {
return IntegrationFlows
.from("tarefaControllerChannel")
.channel("tarefaPartitionChannel").get();
}
@Bean
public IntegrationFlow indexSchedulerRouteToPartition(@Autowired TarefaIndexScheduler tarefaIndexScheduler) {
return IntegrationFlows
.from("tarefaIndexSchedulerChannel")
.transform(tarefaIndexScheduler)
.channel("tarefaPartitionChannel").get();
}
@Bean
public IntegrationFlow indexSplitHistory(@Autowired IndexTaskMonitor indexTaskMonitor) {
return IntegrationFlows
.from("tarefaPartitionChannel")
.wireTap(flow -> flow.handle(m->indexTaskMonitor.processIndexRequest(m.getPayload())))
.split(tarefaItemHistorySplitter)
.channel("tarefaIndexChannel").get();
}
@Bean
public IntegrationFlow indexFlow(@Autowired @Qualifier("tarefaIndexChannel") MessageChannel tarefaIndexChannel
,@Autowired @Qualifier("tarefaItemMasterIndexChannel") MessageChannel indexMasterChannel) {
return IntegrationFlows
.from(tarefaIndexChannel)
.routeToRecipients(route ->
route.recipientFlow(subFlow ->
subFlow.transform(tarefaItemHistoryLog)
.split(tarefaItemSplitter)
.channel(indexMasterChannel)))
.get();
}
@Bean
public IntegrationFlow indexItemMasterFlow(@Autowired @Qualifier("tarefaItemMasterIndexChannel") MessageChannel indexMasterChannel,
@Autowired @Qualifier("loggingChannel") MessageChannel logChannel ,
@Autowired JmsMessageSender jmsSender) {
return IntegrationFlows
.from(indexMasterChannel)
.transform(tarefaIndexarItemMaster)
.channel(logChannel)
.handle(m -> jmsSender.send(m.getPayload()))
.get();
}
@Bean
public IntegrationFlow sumarizaIndexItemFlow() {
return IntegrationFlows
.from("sumarizaTarefaChannel")
.wireTap(subflow ->
subflow
.filter(new GenericSelector<Message<?>>() {
@Override
public boolean accept(Message<?> source) {
return Optional
.ofNullable(source.getHeaders().get(MESSAGE_ORIGIN))
.map(h -> h.equals("HTTP")).orElse(false);
}
})
.channel(c -> c.direct("prehttpChannel")))
.wireTap(subflow ->
subflow
.filter(new GenericSelector<Message<?>>() {
@Override
public boolean accept(Message<?> source) {
return Optional
.ofNullable(source.getHeaders().get(MESSAGE_ORIGIN))
.map(h -> h.equals("SCHEDULER")).orElse(false);
}
})
.channel(c -> c.direct("schedulerReplyChannel")))
.get();
}
@Bean
public IntegrationFlow httpSchedulerRequestResponse() {
return IntegrationFlows
.from("schedulerReplyChannel")
.channel("loggingChannel")
.get();
}
@Bean
public IntegrationFlow httpResponse() {
return IntegrationFlows
.from("prehttpChannel")
.channel("responseHttpChannel")
.channel("loggingChannel")
.get();
}
@Bean
public CrossOrigin crossOrigin() {
CrossOrigin co = new CrossOrigin();
co.setOrigin("*");
co.setMethod(RequestMethod.GET , RequestMethod.POST);
return co;
}
@Bean(name="itemRequestMapping")
@Qualifier(value="itemRequestMapping")
public RequestMapping itemRequestMapping() {
RequestMapping mapping = new RequestMapping();
mapping.setMethods(HttpMethod.GET , HttpMethod.POST);
mapping.setConsumes(MediaType.APPLICATION_JSON_VALUE);
mapping.setProduces(MediaType.APPLICATION_JSON_VALUE);
mapping.setPathPatterns("/int_sysmat/{id}");
return mapping;
}
@Bean
public HttpRequestHandlingMessagingGateway httpRequestGateway
(@Autowired @Qualifier("tarefaHttpIntegrationChannel")
MessageChannel tarefaHttpIntegrationChannel ,
@Autowired @Qualifier("responseHttpChannel") MessageChannel responseHttpChannel,
@Autowired @Qualifier("expressionParser") ExpressionParser parser,
@Autowired @Qualifier("itemRequestMapping") RequestMapping requestMapping,
@Autowired CrossOrigin crossOrigin,
@Autowired List<HttpMessageConverter<?>> httpMessageConverters) {
HttpRequestHandlingMessagingGateway g = new HttpRequestHandlingMessagingGateway();
g.setRequestChannel(tarefaHttpIntegrationChannel);
g.setReplyChannel(responseHttpChannel);
g.setPayloadExpression(parser.parseExpression("#pathVariables.id"));
g.setRequestMapping(requestMapping);
g.setCrossOrigin(crossOrigin);
g.setMessageConverters(httpMessageConverters);
g.setReplyTimeout(180000);
return g;
}
@Bean(name="indexIntegrationRepresentationRequestMapping")
@Qualifier(value="indexIntegrationRepresentationRequestMapping")
public RequestMapping indexRepresentationRequestMapping() {
RequestMapping mapping = new RequestMapping();
mapping.setMethods(HttpMethod.GET , HttpMethod.POST);
mapping.setConsumes(MediaType.APPLICATION_JSON_VALUE);
mapping.setProduces(MediaType.APPLICATION_JSON_VALUE);
mapping.setPathPatterns("/sysmat_index_integration");
return mapping;
}
@Bean("httpSysmatIntegrationRequestGateway")
@Qualifier(value="httpSysmatIntegrationRequestGateway")
public HttpRequestHandlingMessagingGateway httpSysmatIntegrationRequestGateway
(@Autowired @Qualifier("tarefaHttpIndexIntegrationChannel") MessageChannel tarefaHttpIndexIntegrationChannel ,
@Autowired @Qualifier("expressionParser") ExpressionParser parser,
@Autowired @Qualifier("indexIntegrationRepresentationRequestMapping") RequestMapping requestMapping,
@Autowired CrossOrigin crossOrigin,
@Autowired List<HttpMessageConverter<?>> httpMessageConverters ,
@Autowired @Qualifier("responseHttpChannel") MessageChannel httpResponseChannel) {
HttpRequestHandlingMessagingGateway g = new HttpRequestHandlingMessagingGateway();
g.setRequestChannel(tarefaHttpIndexIntegrationChannel);
g.setReplyChannel(httpResponseChannel);
g.setExtractReplyPayload(false);
g.setRequestPayloadTypeClass(IndexIntegrationRepresentation.class);
g.setRequestMapping(requestMapping);
g.setCrossOrigin(crossOrigin);
g.setMessageConverters(httpMessageConverters);
g.setReplyTimeout(180000);
return g;
}
@Bean
@Qualifier(value="responseHttpChannel")
public MessageChannel responseHttpChannel() {
return new DirectChannel();
}
@Bean
public IntegrationFlow tarefaHttpIndexIntegrationPartitionFlow(@Autowired TarefaIndexIntegrationHistory transformer) {
return IntegrationFlows
.from("tarefaHttpIndexIntegrationChannel")
.handle(transformer)
.get();
}
private Message<?> logMessage(Message<?> m) {
System.err.println(m.getPayload());
return m;
}
}
任何线索我都很高兴。
当做。
暂无答案!
目前还没有任何答案,快来回答吧!