项目读取器编码:
@Bean
public JpaCursorItemReader<ExtAssessments> extAssessmentsDiabeticReader() {
String jpqlQuery = "from ExtAssessments WHERE processStatus='new' AND type='diabetic'";
JpaCursorItemReader<ExtAssessments> itemReader = new JpaCursorItemReader<>();
itemReader.setQueryString(jpqlQuery);
itemReader.setEntityManagerFactory(entityManagerFactory);
try {
itemReader.afterPropertiesSet();
} catch (Exception e) {
log.info("Exception in item reader : " + e.getMessage());
}
itemReader.setSaveState(true);
return itemReader;
}
字符串
ItemProcessor代码->使用模型Map器将项目Map到DiabeticAssessment:extIdClassMap:用于单个错误处理updateTargetTableStatusToProcessing:用于将状态更新为正在处理并已完成
@Bean
public ItemProcessor<ExtAssessments, DiabeticAssessment> diabeticAssessmentProcessor() {
return item -> {
DiabeticAssessment diabeticAssessment = mapper.map(item, DiabeticAssessment.class);
log.info("set created by as data migration and created date as current data.");
diabeticAssessment.setCreatedBy(CREATED_BY_DATA_MIGRATION);
diabeticAssessment.setCreatedDatetime(Date.from(Instant.now()));
log.info("update ext Assessment process status as processing");
updateTargetTableStatusToProcessing(item);
log.info("Item has been processed for ext assessment id: " + item.getId());
int hashCode = diabeticAssessment.hashCode();
extIdClassMap.put(String.valueOf(hashCode), item.getId());
return diabeticAssessment;
};
}
型
ItemWriter代码:
@Transactional
@Bean
public ItemWriter<DiabeticAssessment> diabeticAssessmentWriter() {
return items -> {
Boolean isErrorFound;
String errorMessage;
for (DiabeticAssessment diabeticAssessment : items) {
isErrorFound = false;
errorMessage = "";
log.info("diabetic assessment data processing: " + diabeticAssessment);
try {
Map<String, Object> validate = isValidate(diabeticAssessment);
if ((Boolean) validate.get(IS_VALIDATE)) {
diabeticAssessmentRepository.save(diabeticAssessment);
} else {
isErrorFound = true;
errorMessage = validate.get(VALIDATION_ERROR_MESSAGE).toString();
//ERROR LOG SAVE METHOD for individual failed
log.info("ERROR JOB ID: --->>> " + jobId + " STEP ID: " + stepId + " Validation error message: "
+ validate.get(VALIDATION_ERROR_MESSAGE));
}
} catch (Exception e) {
isErrorFound = true;
errorMessage = "Exception during insertion operation";
log.info("diabetic assessment data not processed for JOB ID:" + jobId + " And STEP ID: " + stepId);
}
if (isErrorFound) {
MigrationLogRequest migrationLog = MigrationLogRequest.builder()
.extId(extIdClassMap.get(String.valueOf(diabeticAssessment.hashCode())))
.jobId(jobId)
.stepId(stepId)
.fromTable(TABLE_EXT_ASSESSMENTS)
.toTable(TABLE_DIABETIC_ASSESSMENT)
.message(errorMessage)
.severity(DATA_MIGRATION_ERROR)
.createdAt(Instant.now())
.createdBy(CREATED_BY_DATA_MIGRATION)
.build();
migrationLogService.CreateMigraitonLog(migrationLog);
}
log.info("assessment processing done for uuid: " + diabeticAssessment.getProfileId());
}
updateTargetTableStatusToCompleted();
boolean isSuccessful = validateRecords();
// if (!isSuccessful) {
// TO DO
//ERROR Message for chunk of data process failed
// loggerMethod(); //error item
// } else {
// loggerMethod(); //success item
// }
};
}
型
步骤代码:
@Bean
public Step stepDiabeticAssessmentInfo() {
return stepBuilder.get("stepDiabeticAssessmentInfo")
.<ExtAssessments, DiabeticAssessment>chunk(300)
.reader(extAssessmentsDiabeticReader())
.processor(diabeticAssessmentProcessor())
.faultTolerant()
.skip(Exception.class)
.skipLimit(300)
.writer(diabeticAssessmentWriter())
.build();
}
型
我们可以看到这里chunk是300。
JPQL项读取器是只调用300个数据项,还是调用所有数据,每次都降低性能?
String jpqlQuery = "from ExtAssessments WHERE processStatus='new' AND type='diabetic'";
型
如果它每次都调用所有数据,是否有任何JPQL或项目读取器方法来获取前300个数据,其中process_status = new
,然后处理和保存而不获取所有数据?
1条答案
按热度按时间r6hnlfcb1#
也许你可以把它看作是一个分页查询,其中块号代表一个页面上有多少项被读取,然后是阅读、处理和写入的流程,然后查询下一个页面上定义的块的数量。
我希望我能正确理解你的意思,并能帮助你