需要一个解决方案：使用 AmqpItemWriter 向 RabbitMQ 写入数据，再使用 AmqpItemReader 从 RabbitMQ 读取数据。
我们并不打算使用 Apache Kafka，只想把数据（比如程序明细）发送到队列，然后再消费它。
写入端（Writer）代码
JobConfig.java
/**
 * Batch configuration for the producing job: reads {@link Customer} rows from
 * {@code /data/customer.csv} on the classpath and publishes each one to RabbitMQ
 * through an {@link AmqpItemWriter}.
 */
@Configuration
public class JobConfig {

    /** Name of the target queue; also used as the routing key on the default exchange. */
    private static final String QUEUE_NAME = "myqueue";

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    /** Connection factory pointing at the broker on {@code localhost}. */
    @Bean
    public ConnectionFactory connectionFactory() {
        return new CachingConnectionFactory("localhost");
    }

    /** Admin bean built on the shared connection factory. */
    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    /**
     * Template used by the item writer.
     *
     * <p>The writer sends items without naming an exchange or routing key, so the
     * template's defaults decide where messages go. Without a routing key the
     * messages are published to the nameless exchange with an empty key and never
     * reach {@value #QUEUE_NAME}; setting the routing key to the queue name makes
     * the default exchange deliver them to that queue.
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setRoutingKey(QUEUE_NAME);
        // NOTE(review): the consuming JobConfiguration installs a Jackson2JsonMessageConverter
        // on its template, while this template keeps the default converter — confirm that
        // both sides agree on the message format.
        return template;
    }

    /** Declares the queue the job publishes to. */
    @Bean
    public Queue myQueue() {
        return new Queue(QUEUE_NAME);
    }

    /**
     * Reader for the customer CSV file. The first line is skipped and the remaining
     * lines are tokenized into {@code id, firstName, lastName, birthdate}.
     */
    @Bean
    public FlatFileItemReader<Customer> customerItemReader() {
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[] { "id", "firstName", "lastName", "birthdate" });

        DefaultLineMapper<Customer> customerLineMapper = new DefaultLineMapper<>();
        customerLineMapper.setLineTokenizer(tokenizer);
        customerLineMapper.setFieldSetMapper(new CustomerFieldSetMapper());
        customerLineMapper.afterPropertiesSet();

        FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
        reader.setLinesToSkip(1);
        reader.setResource(new ClassPathResource("/data/customer.csv"));
        reader.setLineMapper(customerLineMapper);
        return reader;
    }

    /** Writer that publishes every item through {@link #rabbitTemplate()}. */
    @Bean
    public AmqpItemWriter<Customer> amqpWriter() {
        return new AmqpItemWriter<>(rabbitTemplate());
    }

    /** Chunk-oriented step (chunk size 10): CSV reader to AMQP writer. */
    @Bean
    public Step step1() throws Exception {
        return stepBuilderFactory.get("step1")
                .<Customer, Customer>chunk(10)
                .reader(customerItemReader())
                .writer(amqpWriter())
                .build();
    }

    /** Single-step job; the run-id incrementer is attached so each launch can get new parameters. */
    @Bean
    public Job job() throws Exception {
        return jobBuilderFactory.get("job")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .build();
    }
}
CustomerFieldSetMapper.java
/**
 * Converts one tokenized line of the customer file into a {@link Customer}.
 */
public class CustomerFieldSetMapper implements FieldSetMapper<Customer> {

    /**
     * Builds a customer from the {@code id}, {@code firstName}, {@code lastName}
     * and {@code birthdate} fields of the given field set.
     *
     * @param fieldSet the tokenized line
     * @return the populated customer
     * @throws BindException declared by the mapper contract
     */
    @Override
    public Customer mapFieldSet(FieldSet fieldSet) throws BindException {
        // The id is parsed as a long; the text columns are taken through readRawString.
        long customerId = fieldSet.readLong("id");
        String givenName = fieldSet.readRawString("firstName");
        String familyName = fieldSet.readRawString("lastName");
        String dateOfBirth = fieldSet.readRawString("birthdate");

        return Customer.builder()
                .id(customerId)
                .firstName(givenName)
                .lastName(familyName)
                .birthdate(dateOfBirth)
                .build();
    }
}
Customer.java
/**
 * Customer item carried through the batch jobs.
 *
 * <p>Constructors, builder, accessors, equals/hashCode and toString are generated
 * by the Lombok annotations below. The class is {@link Serializable}, and unknown
 * JSON properties are ignored when it is bound with Jackson.
 */
@AllArgsConstructor
@NoArgsConstructor
@Builder
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class Customer implements Serializable {
private static final long serialVersionUID = 1L;
// Customer identifier.
private Long id;
// Customer first name.
private String firstName;
// Customer last name.
private String lastName;
// Birth date, kept as plain text (no date parsing is applied in this class).
private String birthdate;
}
SpringBatchAmqpApplication.java
/**
 * Boot entry point for the batch application.
 *
 * <p>NOTE(review): {@code @EnableBinding(Source.class)} looks unrelated to the
 * RabbitTemplate-based item writer used by the job configuration — confirm it is
 * still needed.
 */
@SpringBootApplication
@EnableBatchProcessing
@EnableBinding(Source.class)
public class SpringBatchAmqpApplication {

    /**
     * Starts the Spring application with the given command-line arguments.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(final String[] args) {
        SpringApplication.run(SpringBatchAmqpApplication.class, args);
    }
}
读取端（Reader）代码
JobConfiguration.java
/**
 * Batch configuration for the consuming job: reads {@link Customer} items through
 * an {@link AmqpItemReader} and prints each one to standard output.
 */
@Configuration
public class JobConfiguration {

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    /** Connection factory pointing at the broker on {@code localhost}. */
    @Bean
    public ConnectionFactory connectionFactory() {
        return new CachingConnectionFactory("localhost");
    }

    /** Admin bean built on the shared connection factory. */
    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    /**
     * Listener container factory using the JSON converter.
     *
     * <p>NOTE(review): no other bean in this class references this factory.
     */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
        containerFactory.setConnectionFactory(connectionFactory());
        containerFactory.setMessageConverter(jsonMessageConverter());
        return containerFactory;
    }

    /** JSON message converter shared by the template and the listener container factory. */
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /** Template that receives from {@code myqueue} by default and converts with the JSON converter. */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setMessageConverter(jsonMessageConverter());
        template.setDefaultReceiveQueue("myqueue");
        return template;
    }

    /** Declares the queue the job reads from. */
    @Bean
    public Queue myQueue() {
        return new Queue("myqueue");
    }

    /** Item reader backed by {@link #rabbitTemplate()}. */
    @Bean
    public ItemReader<Customer> customerReader() {
        AmqpItemReader<Customer> amqpReader = new AmqpItemReader<>(rabbitTemplate());
        return amqpReader;
    }

    /** Item writer that prints every customer of the chunk. */
    @Bean
    public ItemWriter<Customer> customerItemWriter() {
        return items -> items.forEach(customer -> System.out.println(customer.toString()));
    }

    /** Chunk-oriented step (chunk size 10) with the step listener attached. */
    @Bean
    public Step step1() {
        return stepBuilderFactory
                .get("step1")
                .<Customer, Customer>chunk(10)
                .reader(customerReader())
                .writer(customerItemWriter())
                .listener(customerStepListener())
                .build();
    }

    /** Single-step job. */
    @Bean
    public Job job() {
        return jobBuilderFactory
                .get("job")
                .start(step1())
                .build();
    }

    /** Listener registered on {@link #step1()}. */
    @Bean
    public CustomerStepListener customerStepListener() {
        return new CustomerStepListener();
    }
}
CustomerStepListener.java
/**
 * Step listener that prints a marker before the step and the number of items
 * read once the step has finished.
 */
public class CustomerStepListener implements StepExecutionListener {

    /**
     * Prints a separator line before the step starts.
     *
     * @param stepExecution the execution about to start (not used)
     */
    @Override
    public void beforeStep(StepExecution stepExecution) {
        System.out.println("==");
    }

    /**
     * Prints the read count of the finished step.
     *
     * @param stepExecution the finished step execution
     * @return {@link ExitStatus#COMPLETED}
     */
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        // Print only the read count; concatenating the StepExecution itself dumped
        // its entire toString() after the "READ COUNT" label.
        System.out.println("READ COUNT = " + stepExecution.getReadCount());
        return ExitStatus.COMPLETED;
    }
}
日志
2021-01-18 18:41:05.023 info 25532---[main]o.s.batch.core.job.simplestephandler:正在执行步骤:[step1]==2021-01-18 18:41:05.031 info 25532---[main]o.s.a.r.c.cachingconnectionfactory:正在尝试连接到:localhost:5672 2021-01-18 18:41:05.072信息25532---[main]o.s.a.r.c.cachingconnectionfactory:已创建新连接:连接工厂#20a14b55:0/simpleconnection@4650a407 [代表=amqp://guest@127.0.0.1:5672/,localport=55797]read count=step执行:id=1,version=2,name=step1,status=completed,exitstatus=completed,readcount=0,filtercount=0,writecount=0 readskipcount=0,writeskipcount=0,processskipcount=0,commitcount=1,rollbackcount=0,exitdescription=2021-01-18 18:41:05.097 info 25532---[main]o.s.batch.core.step.abstractstep:step:[step1]在73ms 2021-01-18 18 18:41:05.099 info 25532---[main]o.s.b.c.l.support.simplejobuncher:job:[simplejob:[name=job]]完成,参数如下:[{-spring.output.ansi.enabled=always}]状态:[完成]87毫秒
1条答案
按热度按时间fruv7luv1#
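在“写入端代码”中，您使用的是配置了 RabbitTemplate 的 AmqpItemWriter。默认情况下，消息会被发送到无名交换器（nameless exchange），以下摘自 Javadoc：
“Messages will be sent to the nameless exchange if not specified on the provided AmqpTemplate.”
在写入端的配置中，RabbitTemplate 与队列之间没有任何“关联”。因此，您需要配置 RabbitTemplate，使其把消息发送到您的队列：
rabbitTemplate.setRoutingKey("myqueue");
这与您在读取端所做的类似：
rabbitTemplate.setDefaultReceiveQueue("myqueue");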