Spring JpaCursorItemReader: does it fetch only chunk-size items per chunk, or the full data set?

hjzp0vay posted on 2024-01-05 in Spring

Item reader code

@Bean
    public JpaCursorItemReader<ExtAssessments> extAssessmentsDiabeticReader() {
        String jpqlQuery = "from ExtAssessments WHERE processStatus='new' AND type='diabetic'";
        JpaCursorItemReader<ExtAssessments> itemReader = new JpaCursorItemReader<>();
        itemReader.setQueryString(jpqlQuery);
        itemReader.setEntityManagerFactory(entityManagerFactory);
        try {
            itemReader.afterPropertiesSet();
        } catch (Exception e) {
            log.info("Exception in item reader : " + e.getMessage());
        }
        itemReader.setSaveState(true);
        return itemReader;
    }


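ItemProcessor code -> uses a model mapper to map each item to DiabeticAssessment.
extIdClassMap: used for per-item error handling.
updateTargetTableStatusToProcessing: used to update the status to processing and then completed.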

@Bean
    public ItemProcessor<ExtAssessments, DiabeticAssessment> diabeticAssessmentProcessor() {
        return item -> {
            DiabeticAssessment diabeticAssessment = mapper.map(item, DiabeticAssessment.class);
            log.info("set created by as data migration and created date as current data.");
            diabeticAssessment.setCreatedBy(CREATED_BY_DATA_MIGRATION);
            diabeticAssessment.setCreatedDatetime(Date.from(Instant.now()));
            log.info("update ext Assessment process status as processing");
            updateTargetTableStatusToProcessing(item);
            log.info("Item has been processed for ext assessment id: " + item.getId());
            int hashCode = diabeticAssessment.hashCode();
            extIdClassMap.put(String.valueOf(hashCode), item.getId());
            return diabeticAssessment;
        };
    }

ItemWriter code

@Transactional
    @Bean
    public ItemWriter<DiabeticAssessment> diabeticAssessmentWriter() {
        return items -> {
            Boolean isErrorFound;
            String errorMessage;
            for (DiabeticAssessment diabeticAssessment : items) {
                isErrorFound = false;
                errorMessage = "";
                log.info("diabetic assessment data processing: " + diabeticAssessment);
                try {
                    Map<String, Object> validate = isValidate(diabeticAssessment);
                    if ((Boolean) validate.get(IS_VALIDATE)) {
                        diabeticAssessmentRepository.save(diabeticAssessment);

                    } else {
                        isErrorFound = true;
                        errorMessage = validate.get(VALIDATION_ERROR_MESSAGE).toString();
                        //ERROR LOG SAVE METHOD for individual failed
                        log.info("ERROR JOB ID: --->>>  " + jobId + " STEP ID: " + stepId + " Validation error message: "
                                + validate.get(VALIDATION_ERROR_MESSAGE));
                    }

                } catch (Exception e) {
                    isErrorFound = true;
                    errorMessage = "Exception during insertion operation";
                    log.info("diabetic assessment data not processed for JOB ID:" + jobId + " And STEP ID: " + stepId);
                }
                if (isErrorFound) {
                    MigrationLogRequest migrationLog = MigrationLogRequest.builder()
                            .extId(extIdClassMap.get(String.valueOf(diabeticAssessment.hashCode())))
                            .jobId(jobId)
                            .stepId(stepId)
                            .fromTable(TABLE_EXT_ASSESSMENTS)
                            .toTable(TABLE_DIABETIC_ASSESSMENT)
                            .message(errorMessage)
                            .severity(DATA_MIGRATION_ERROR)
                            .createdAt(Instant.now())
                            .createdBy(CREATED_BY_DATA_MIGRATION)
                            .build();
                    migrationLogService.CreateMigraitonLog(migrationLog);
                }
                log.info("assessment processing done for uuid: " + diabeticAssessment.getProfileId());
            }
            updateTargetTableStatusToCompleted();
            boolean isSuccessful = validateRecords();
           // if (!isSuccessful) {
                // TO DO
                //ERROR Message for chunk of data process failed
                //   loggerMethod(); //error item
           // } else {
                // loggerMethod(); //success item
          //  }
        };
    }

Step code

@Bean
    public Step stepDiabeticAssessmentInfo() {
        return stepBuilder.get("stepDiabeticAssessmentInfo")
                .<ExtAssessments, DiabeticAssessment>chunk(300)
                .reader(extAssessmentsDiabeticReader())
                .processor(diabeticAssessmentProcessor())
                .faultTolerant()
                .skip(Exception.class)
                .skipLimit(300)
                .writer(diabeticAssessmentWriter())
                .build();
    }


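As we can see, the chunk size here is 300.
Does the JPQL item reader fetch only 300 items per chunk, or does it fetch all the data every time, which would hurt performance?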

String jpqlQuery = "from ExtAssessments WHERE processStatus='new' AND type='diabetic'";


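If it fetches all the data every time, is there a JPQL or item reader approach that fetches only the first 300 rows where process_status = new, then processes and saves them without fetching everything?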

r6hnlfcb #1

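Perhaps you can think of it as a paged query, where the chunk size is the number of items read per page: the step reads, processes, and writes one chunk, then queries the next page of chunk-size items.
I hope I have understood you correctly and that this helps.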
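
To make the two reading styles concrete, here is a minimal plain-Java sketch. It is not Spring Batch code: FakeTable, runWithCursor and runWithPaging are hypothetical names invented for illustration, and the "table" is just an in-memory list that counts queries. As far as I know, JpaCursorItemReader runs the JPQL query once when the step opens the reader and then hands rows over one at a time, so chunk(300) only controls how many items are collected before the writer is called. The page-by-page description fits JpaPagingItemReader better, which issues one limited query per page. Whether a cursor really streams rows from the database instead of buffering them depends on the JPA provider and JDBC driver, so treat this as a sketch under those assumptions.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ChunkReadSketch {

    /** Hypothetical stand-in for the database table; counts how many queries are issued. */
    public static final class FakeTable {
        private final List<Integer> rows = new ArrayList<>();
        int queryCount = 0;

        FakeTable(int size) {
            for (int i = 0; i < size; i++) {
                rows.add(i);
            }
        }

        /** One query whose result is consumed row by row (cursor style). */
        Iterator<Integer> openCursor() {
            queryCount++;
            return rows.iterator();
        }

        /** One query per page, bounded by offset and page size (paging style). */
        List<Integer> fetchPage(int offset, int pageSize) {
            queryCount++;
            int from = Math.min(offset, rows.size());
            int to = Math.min(offset + pageSize, rows.size());
            return new ArrayList<>(rows.subList(from, to));
        }
    }

    /** Returns {queries issued, chunks written, items written} for a cursor-style read. */
    public static int[] runWithCursor(int totalRows, int chunkSize) {
        FakeTable table = new FakeTable(totalRows);
        Iterator<Integer> cursor = table.openCursor(); // the query runs once, up front
        int chunks = 0;
        int items = 0;
        while (cursor.hasNext()) {
            List<Integer> chunk = new ArrayList<>();
            // Pull items one at a time until the chunk is full or the cursor is exhausted.
            while (chunk.size() < chunkSize && cursor.hasNext()) {
                chunk.add(cursor.next());
            }
            items += chunk.size(); // stands in for "process and write the chunk"
            chunks++;
        }
        return new int[] {table.queryCount, chunks, items};
    }

    /** Returns {queries issued, chunks written, items written} for a paging-style read. */
    public static int[] runWithPaging(int totalRows, int pageSize) {
        FakeTable table = new FakeTable(totalRows);
        int chunks = 0;
        int items = 0;
        int offset = 0;
        while (true) {
            List<Integer> page = table.fetchPage(offset, pageSize); // a new query for every page
            if (page.isEmpty()) {
                break;
            }
            items += page.size();
            chunks++;
            offset += page.size();
            if (page.size() < pageSize) {
                break; // a short page means there are no more rows
            }
        }
        return new int[] {table.queryCount, chunks, items};
    }

    public static void main(String[] args) {
        int[] cursor = runWithCursor(1000, 300);
        int[] paging = runWithPaging(1000, 300);
        System.out.println("cursor: queries=" + cursor[0] + " chunks=" + cursor[1] + " items=" + cursor[2]);
        System.out.println("paging: queries=" + paging[0] + " chunks=" + paging[1] + " items=" + paging[2]);
    }
}
```

With 1000 rows and a chunk size of 300, the cursor variant issues 1 query and the paging variant issues 4, and both write 4 chunks covering all 1000 items. If the goal is to fetch only 300 rows per query, JpaPagingItemReader with setPageSize(300) is the reader to look at. One caveat worth checking in that case: the query filters on processStatus='new' while the processor changes that status, so the rows matching the filter shift between pages and a paging reader may skip some of them.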
