amqpwriter和amqpreader示例

tsm1rwdh  于 2021-07-26  发布在  Java
关注(0)|答案(1)|浏览(352)

需要一个解决方案在rabbitmq上使用 AmqpWriter 并使用rabbitmq读取数据 AmqpReader . 我们不是在寻找apachekafka,我们只想发送say程序细节并使用它。
编写器代码
作业配置.java

@Configuration
public class JobConfig {

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Bean
    public ConnectionFactory connectionFactory() {
        return new CachingConnectionFactory("localhost");
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(connectionFactory());
    }

    @Bean
    public Queue myQueue() {
       return new Queue("myqueue");
    }

    @Bean
    public FlatFileItemReader<Customer> customerItemReader() {
        FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
        reader.setLinesToSkip(1);
        reader.setResource(new ClassPathResource("/data/customer.csv"));

        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[] { "id", "firstName", "lastName", "birthdate" });

        DefaultLineMapper<Customer> customerLineMapper = new DefaultLineMapper<>();
        customerLineMapper.setLineTokenizer(tokenizer);
        customerLineMapper.setFieldSetMapper(new CustomerFieldSetMapper());
        customerLineMapper.afterPropertiesSet();

        reader.setLineMapper(customerLineMapper);

        return reader;
    }

    @Bean
    public AmqpItemWriter<Customer> amqpWriter(){
        AmqpItemWriter<Customer> amqpItemWriter = new AmqpItemWriter<>(this.rabbitTemplate());
        return amqpItemWriter;
    }

    @Bean
    public Step step1() throws Exception {
        return stepBuilderFactory.get("step1")
                .<Customer, Customer>chunk(10)
                .reader(customerItemReader())
                .writer(amqpWriter())
                .build();
    }

    @Bean
    public Job job() throws Exception {
        return jobBuilderFactory.get("job")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .build();
    }
}

customerfieldsetmapper.java文件

public class CustomerFieldSetMapper implements FieldSetMapper<Customer> {

    @Override
    public Customer mapFieldSet(FieldSet fieldSet) throws BindException {
        return Customer.builder()
                .id(fieldSet.readLong("id"))
                .firstName(fieldSet.readRawString("firstName"))
                .lastName(fieldSet.readRawString("lastName"))
                .birthdate(fieldSet.readRawString("birthdate"))
                .build();
    }
}

客户.java

@AllArgsConstructor
@NoArgsConstructor
@Builder
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class Customer implements Serializable {
    private static final long serialVersionUID = 1L;
    private Long id;
    private String firstName;
    private String lastName;
    private String birthdate;
}

springbatchamqpplication.java应用程序

@EnableBatchProcessing
@SpringBootApplication
@EnableBinding(Source.class)
public class SpringBatchAmqpApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBatchAmqpApplication.class, args);
    }
}

读卡器代码
作业配置.java

@Configuration
public class JobConfiguration {
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Bean
    public ConnectionFactory connectionFactory() {
        return new CachingConnectionFactory("localhost");
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setMessageConverter(jsonMessageConverter());
        return factory; 
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setDefaultReceiveQueue("myqueue");
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }

    @Bean
    public Queue myQueue() {
        return new Queue("myqueue");
    }

    @Bean
    public ItemReader<Customer> customerReader(){
        return new AmqpItemReader<>(this.rabbitTemplate());
    }

    @Bean
    public ItemWriter<Customer> customerItemWriter(){
        return items -> {
            for(Customer c : items) {
                System.out.println(c.toString());
            }
        };
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<Customer, Customer> chunk(10)
                .reader(customerReader())
                .writer(customerItemWriter())
                .listener(customerStepListener())
                .build();
    }

    @Bean
    public Job job() {
        return jobBuilderFactory.get("job")
                .start(step1())
                .build();
    }

    @Bean
    public CustomerStepListener customerStepListener() {
        return new CustomerStepListener();
    }
}

customersteplistener.java文件

public class CustomerStepListener implements StepExecutionListener {

    @Override
    public void beforeStep(StepExecution stepExecution) {
        System.out.println("==");
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        System.out.println("READ COUNT = "+stepExecution);
        return ExitStatus.COMPLETED;
    }
}

日志
2021-01-18 18:41:05.023 info 25532---[main]o.s.batch.core.job.simplestephandler:正在执行步骤:[step1]==2021-01-18 18:41:05.031 info 25532---[main]o.s.a.r.c.cachingconnectionfactory:正在尝试连接到:localhost:5672 2021-01-18 18:41:05.072信息25532---[main]o.s.a.r.c.cachingconnectionfactory:已创建新连接:连接工厂#20a14b55:0/simpleconnection@4650a407 [代表=amqp://guest@127.0.0.1:5672/,localport=55797]read count=step执行:id=1,version=2,name=step1,status=completed,exitstatus=completed,readcount=0,filtercount=0,writecount=0 readskipcount=0,writeskipcount=0,processskipcount=0,commitcount=1,rollbackcount=0,exitdescription=2021-01-18 18:41:05.097 info 25532---[main]o.s.batch.core.step.abstractstep:step:[step1]在73ms 2021-01-18 18 18:41:05.099 info 25532---[main]o.s.b.c.l.support.simplejobuncher:job:[simplejob:[name=job]]完成,参数如下:[{-spring.output.ansi.enabled=always}]状态:[完成]87毫秒

fruv7luv

fruv7luv1#

在“编写器代码”方面,您使用的是 AmqpItemWriter 配置有 RabbitTemplate . 默认情况下,消息将发送到无名交易所,这里是javadoc的摘录:

Messages will be sent to the nameless exchange if not specified on the provided AmqpTemplate.

在writer配置中,rabbit模板和队列之间没有“连接”。因此,您需要配置rabbit模板以将消息发送到您的队列:

@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
    rabbitTemplate.setRoutingKey(myQueue().getName());
    return rabbitTemplate;
}

这与你在读者方面所做的相似 rabbitTemplate.setDefaultReceiveQueue("myqueue"); .

相关问题