spring JpaCursorItemReader在每个块操作中只调用块大小还是完整数据?

hjzp0vay  于 2024-01-05  发布在  Spring
关注(0)|答案(1)|浏览(124)

项目读取器编码

  1. @Bean
  2. public JpaCursorItemReader<ExtAssessments> extAssessmentsDiabeticReader() {
  3. String jpqlQuery = "from ExtAssessments WHERE processStatus='new' AND type='diabetic'";
  4. JpaCursorItemReader<ExtAssessments> itemReader = new JpaCursorItemReader<>();
  5. itemReader.setQueryString(jpqlQuery);
  6. itemReader.setEntityManagerFactory(entityManagerFactory);
  7. try {
  8. itemReader.afterPropertiesSet();
  9. } catch (Exception e) {
  10. log.info("Exception in item reader : " + e.getMessage());
  11. }
  12. itemReader.setSaveState(true);
  13. return itemReader;
  14. }

字符串

ItemProcessor代码->使用模型Map器将项目Map到DiabeticAssessment:extIdClassMap:用于单个错误处理updateTargetTableStatusToProcessing:用于将状态更新为正在处理并已完成

  1. @Bean
  2. public ItemProcessor<ExtAssessments, DiabeticAssessment> diabeticAssessmentProcessor() {
  3. return item -> {
  4. DiabeticAssessment diabeticAssessment = mapper.map(item, DiabeticAssessment.class);
  5. log.info("set created by as data migration and created date as current data.");
  6. diabeticAssessment.setCreatedBy(CREATED_BY_DATA_MIGRATION);
  7. diabeticAssessment.setCreatedDatetime(Date.from(Instant.now()));
  8. log.info("update ext Assessment process status as processing");
  9. updateTargetTableStatusToProcessing(item);
  10. log.info("Item has been processed for ext assessment id: " + item.getId());
  11. int hashCode = diabeticAssessment.hashCode();
  12. extIdClassMap.put(String.valueOf(hashCode), item.getId());
  13. return diabeticAssessment;
  14. };
  15. }

ItemWriter代码

  1. @Transactional
  2. @Bean
  3. public ItemWriter<DiabeticAssessment> diabeticAssessmentWriter() {
  4. return items -> {
  5. Boolean isErrorFound;
  6. String errorMessage;
  7. for (DiabeticAssessment diabeticAssessment : items) {
  8. isErrorFound = false;
  9. errorMessage = "";
  10. log.info("diabetic assessment data processing: " + diabeticAssessment);
  11. try {
  12. Map<String, Object> validate = isValidate(diabeticAssessment);
  13. if ((Boolean) validate.get(IS_VALIDATE)) {
  14. diabeticAssessmentRepository.save(diabeticAssessment);
  15. } else {
  16. isErrorFound = true;
  17. errorMessage = validate.get(VALIDATION_ERROR_MESSAGE).toString();
  18. //ERROR LOG SAVE METHOD for individual failed
  19. log.info("ERROR JOB ID: --->>> " + jobId + " STEP ID: " + stepId + " Validation error message: "
  20. + validate.get(VALIDATION_ERROR_MESSAGE));
  21. }
  22. } catch (Exception e) {
  23. isErrorFound = true;
  24. errorMessage = "Exception during insertion operation";
  25. log.info("diabetic assessment data not processed for JOB ID:" + jobId + " And STEP ID: " + stepId);
  26. }
  27. if (isErrorFound) {
  28. MigrationLogRequest migrationLog = MigrationLogRequest.builder()
  29. .extId(extIdClassMap.get(String.valueOf(diabeticAssessment.hashCode())))
  30. .jobId(jobId)
  31. .stepId(stepId)
  32. .fromTable(TABLE_EXT_ASSESSMENTS)
  33. .toTable(TABLE_DIABETIC_ASSESSMENT)
  34. .message(errorMessage)
  35. .severity(DATA_MIGRATION_ERROR)
  36. .createdAt(Instant.now())
  37. .createdBy(CREATED_BY_DATA_MIGRATION)
  38. .build();
  39. migrationLogService.CreateMigraitonLog(migrationLog);
  40. }
  41. log.info("assessment processing done for uuid: " + diabeticAssessment.getProfileId());
  42. }
  43. updateTargetTableStatusToCompleted();
  44. boolean isSuccessful = validateRecords();
  45. // if (!isSuccessful) {
  46. // TO DO
  47. //ERROR Message for chunk of data process failed
  48. // loggerMethod(); //error item
  49. // } else {
  50. // loggerMethod(); //success item
  51. // }
  52. };
  53. }

步骤代码

  1. @Bean
  2. public Step stepDiabeticAssessmentInfo() {
  3. return stepBuilder.get("stepDiabeticAssessmentInfo")
  4. .<ExtAssessments, DiabeticAssessment>chunk(300)
  5. .reader(extAssessmentsDiabeticReader())
  6. .processor(diabeticAssessmentProcessor())
  7. .faultTolerant()
  8. .skip(Exception.class)
  9. .skipLimit(300)
  10. .writer(diabeticAssessmentWriter())
  11. .build();
  12. }


我们可以看到这里chunk是300。
JPQL项读取器是只调用300个数据项,还是调用所有数据,每次都降低性能?

  1. String jpqlQuery = "from ExtAssessments WHERE processStatus='new' AND type='diabetic'";


如果它每次都调用所有数据,是否有任何JPQL或项目读取器方法来获取前300个数据,其中process_status = new,然后处理和保存而不获取所有数据?

r6hnlfcb

r6hnlfcb1#

也许你可以把它看作是一个分页查询,其中块号代表一个页面上有多少项被读取,然后是阅读、处理和写入的流程,然后查询下一个页面上定义的块的数量。
我希望我能正确理解你的意思,并能帮助你

相关问题