我目前正在评估一个概念的证明,其中使用谷歌桶,一个java微服务和数据流。
沟通流程如下:
1.用户向第三方服务发送CSV文件
1.服务将CSV文件上传到Google bucket,其中包含ID和文件名
1.将触发一个create事件,并将其作为HTTP请求发送到Java微服务
- Java服务触发Google数据流作业
我开始认为Java服务是不必要的,我可以直接调用数据流后,CSV上传到桶?
正如您所看到的,这就是服务,它只是一个基本的控制器,用于验证来自“Create”触发器的请求参数,然后委托给Dataflow服务
@PostMapping(value = "/dataflow", produces = {MediaType.APPLICATION_JSON_VALUE})
public ResponseEntity<Object> triggerDataFlowJob(@RequestBody Map<String, Object> body) {
Map<String, String> requestParams = getRequestParams(body);
log.atInfo().log("Body %s", requestParams);
String bucket = requestParams.get("bucket");
String fileName = requestParams.get("name");
if (Objects.isNull(bucket) || Objects.isNull(fileName)) {
AuditLogger.log(AuditCode.INVALID_CLOUD_STORAGE_REQUEST.getCode(), AuditCode.INVALID_CLOUD_STORAGE_REQUEST.getAuditText());
return ResponseEntity.accepted().build();
}
log.atInfo().log("Triggering a Dataflow job, using Cloud Storage bucket: %s --> and file %s", bucket, fileName);
try {
return DataflowTransport
.newDataflowClient(options)
.build()
.projects()
.locations()
.flexTemplates()
.launch(gcpProjectIdProvider.getProjectId(),
dataflowProperties.getRegion(),
launchFlexTemplateRequest)
.execute();
} catch (Exception ex) {
if (ex instanceof GoogleJsonResponseException && ((GoogleJsonResponseException) ex).getStatusCode() == 409) {
log.atInfo().log("Dataflow job already triggered using Cloud Storage bucket: %s --> and file %s", bucket, fileName);
} else {
log.atSevere().withCause(ex).log("Error while launching dataflow jobs");
AuditLogger.log(AuditCode.LAUNCH_DATAFLOW_JOB.getCode(), AuditCode.LAUNCH_DATAFLOW_JOB.getAuditText());
}
}
return ResponseEntity.accepted().build();
}
有没有一种方法可以直接集成谷歌桶触发器与数据流?
1条答案
按热度按时间m0rkklqb1#
当文件上传到
Cloud Storage
时,可以触发带有事件弧的云函数V2。然后在这个云函数中,您可以触发一个
Dataflow
作业。Dataflow
作业,如下所示:这个云函数显示了一个
Python
的例子,但是如果你愿意,你可以保留你的Java
逻辑。