java - Trigger a Google Dataflow job from a Google bucket upload?

xdnvmnnf · Posted 2023-02-14 in Java
Follow (0) | Answers (1) | Views (142)

I'm currently evaluating a proof of concept that uses a Google bucket, a Java microservice, and Dataflow.
The communication flow is as follows:
1. A user sends a CSV file to a third-party service
2. The service uploads the CSV file, with an ID and file name, to a Google bucket
3. A create event is triggered and sent as an HTTP request to the Java microservice
4. The Java service triggers a Google Dataflow job

I'm starting to think the Java service is unnecessary, and that I could call Dataflow directly after the CSV is uploaded to the bucket.
As you can see, the service is just a basic controller that validates the request parameters coming from the "create" trigger and then delegates to the Dataflow service:
@PostMapping(value = "/dataflow", produces = {MediaType.APPLICATION_JSON_VALUE})
public ResponseEntity<Object> triggerDataFlowJob(@RequestBody Map<String, Object> body) {
    Map<String, String> requestParams = getRequestParams(body);
    log.atInfo().log("Body %s", requestParams);

    String bucket = requestParams.get("bucket");
    String fileName = requestParams.get("name");

    if (Objects.isNull(bucket) || Objects.isNull(fileName)) {
        AuditLogger.log(AuditCode.INVALID_CLOUD_STORAGE_REQUEST.getCode(), AuditCode.INVALID_CLOUD_STORAGE_REQUEST.getAuditText());
        return ResponseEntity.accepted().build();
    }

    log.atInfo().log("Triggering a Dataflow job, using Cloud Storage bucket: %s --> and file %s", bucket, fileName);
    try {
        // "options", "launchFlexTemplateRequest", "gcpProjectIdProvider" and
        // "dataflowProperties" are fields initialized elsewhere in the service.
        LaunchFlexTemplateResponse response = DataflowTransport
                .newDataflowClient(options)
                .build()
                .projects()
                .locations()
                .flexTemplates()
                .launch(gcpProjectIdProvider.getProjectId(),
                        dataflowProperties.getRegion(),
                        launchFlexTemplateRequest)
                .execute();
        // execute() returns a LaunchFlexTemplateResponse, not a ResponseEntity,
        // so wrap it before returning it from the controller.
        return ResponseEntity.ok(response);
    } catch (Exception ex) {
        if (ex instanceof GoogleJsonResponseException && ((GoogleJsonResponseException) ex).getStatusCode() == 409) {
            // 409 means a job with this name has already been launched.
            log.atInfo().log("Dataflow job already triggered using Cloud Storage bucket: %s --> and file %s", bucket, fileName);
        } else {
            log.atSevere().withCause(ex).log("Error while launching dataflow jobs");
            AuditLogger.log(AuditCode.LAUNCH_DATAFLOW_JOB.getCode(), AuditCode.LAUNCH_DATAFLOW_JOB.getAuditText());
        }
    }

    return ResponseEntity.accepted().build();
}
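
For reference, the snippet above uses fields that aren't shown (options, launchFlexTemplateRequest, gcpProjectIdProvider, dataflowProperties). A minimal sketch of how the launch request might be built with the google-api-services-dataflow model classes; the job name, template path and the "inputFile" parameter below are placeholders, not values from my setup:

import com.google.api.services.dataflow.model.LaunchFlexTemplateParameter;
import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest;
import java.util.Map;

// Hypothetical helper building the request that launch(...) receives above.
private LaunchFlexTemplateRequest buildLaunchRequest(String bucket, String fileName) {
    LaunchFlexTemplateParameter parameter = new LaunchFlexTemplateParameter()
            // Job names must be unique while the job is active (assumed naming scheme).
            .setJobName("csv-import-" + System.currentTimeMillis())
            // GCS path to the Flex Template spec file (assumed location).
            .setContainerSpecGcsPath("gs://my-templates-bucket/csv-import-template.json")
            // Pipeline parameters the template expects; "inputFile" is an assumption.
            .setParameters(Map.of("inputFile", "gs://" + bucket + "/" + fileName));
    return new LaunchFlexTemplateRequest().setLaunchParameter(parameter);
}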

Is there a way to integrate a Google bucket trigger directly with Dataflow?

m0rkklqb1#

You can trigger a gen2 Cloud Function with Eventarc when a file is uploaded to Cloud Storage.
In that Cloud Function, you can then trigger a Dataflow job.

gcloud functions deploy your_function_name \
  --gen2 \
  --trigger-event-filters="type=google.cloud.storage.object.v1.finalized" \
  --trigger-event-filters="bucket=YOUR_STORAGE_BUCKET"
  • In the Cloud Function, you can then trigger the Dataflow job with a code sample like the one below:
def startDataflowProcess(data, context):
    from googleapiclient.discovery import build
    # replace with your project ID
    project = "grounded-pivot-266616"
    # Dataflow job names must match [a-z]([-a-z0-9]{0,38}[a-z0-9])?,
    # so sanitize the timestamp rather than joining with a space
    job = project + "-" + str(data['timeCreated']).lower().replace(':', '-').replace('.', '-')
    # path of the Dataflow template on a Google Storage bucket
    template = "gs://sample-bucket/sample-template"
    inputFile = "gs://" + str(data['bucket']) + "/" + str(data['name'])
    # user-defined parameters to pass to the Dataflow pipeline job
    parameters = {
        'inputFile': inputFile,
    }
    # tempLocation is the GCS path where temp files generated during the job are stored
    environment = {'tempLocation': 'gs://sample-bucket/temp-location'}

    service = build('dataflow', 'v1b3', cache_discovery=False)
    # the locations() variant of the API is used to pass the location of the Dataflow job
    request = service.projects().locations().templates().launch(
        projectId=project,
        gcsPath=template,
        location='europe-west1',
        body={
            'jobName': job,
            'parameters': parameters,
            'environment': environment
        },
    )
    response = request.execute()
    print(str(response))

This Cloud Function shows a Python example, but you can keep your Java logic if you prefer.
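
If you'd rather keep the function itself in Java, a rough sketch with the Java Functions Framework could look like this (the payload parsing and the launchDataflowJob helper are assumptions; the launch logic would be the same Flex Template call as in your controller):

import com.google.cloud.functions.CloudEventsFunction;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import io.cloudevents.CloudEvent;
import java.nio.charset.StandardCharsets;

// Gen2 Cloud Function triggered by google.cloud.storage.object.v1.finalized events.
public class TriggerDataflowFunction implements CloudEventsFunction {

    @Override
    public void accept(CloudEvent event) {
        // The event data is a JSON document describing the finalized object.
        String json = new String(event.getData().toBytes(), StandardCharsets.UTF_8);
        JsonObject object = new Gson().fromJson(json, JsonObject.class);

        String bucket = object.get("bucket").getAsString();
        String fileName = object.get("name").getAsString();

        // From here, reuse the Flex Template launch code from the question,
        // e.g. DataflowTransport.newDataflowClient(...)...flexTemplates().launch(...).
        launchDataflowJob(bucket, fileName); // hypothetical helper
    }

    private void launchDataflowJob(String bucket, String fileName) {
        // Same launch logic as the controller in the question (omitted here).
    }
}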
