Apache Beam: ReadFromKafka throws grpc._channel._InactiveRpcError

Posted by 92dk7w1h on 2021-07-12 in Spark

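I am trying to receive data from Kafka and process it. I first tried Kafka with Spark Streaming, and Spark Streaming worked fine with the Kafka producer at "localhost:9092". So I am confident that Spark on my local machine and the Kafka producer in Docker work well together.
However, when I try Kafka with Apache Beam, I get an error. I verified that the Spark job server is running successfully in Docker, but I still get the error shown below. I would appreciate any help.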

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from environment_variables import *  # expected to provide TOPICS
from apache_beam.io.kafka import ReadFromKafka

def run_pipeline():
  # Submit to the Spark job server; LOOPBACK runs the Python workers in this process.
  options = PipelineOptions(runner='SparkRunner', job_endpoint='localhost:8099', environment_type='LOOPBACK')
  options.view_as(StandardOptions).streaming = True
  with beam.Pipeline(options=options) as p:
      # ReadFromKafka is a cross-language transform, expanded by the service at localhost:8097.
      messages = p | "kafka" >> beam.io.kafka.ReadFromKafka(consumer_config={"bootstrap.servers": 'localhost:9092'}, topics=[TOPICS], expansion_service='localhost:8097')
      # Chain the print step onto the Kafka output, not onto the pipeline root.
      messages | "print" >> beam.Map(print)

run_pipeline()

Error:

Traceback (most recent call last):
  File "X:/Git_repo/project_red/Beam_streaming/junk6.py", line 18, in <module>
    run_pipeline()
  File "X:/Git_repo/project_red/Beam_streaming/junk6.py", line 16, in run_pipeline
    p | "kafka" >> beam.io.kafka.ReadFromKafka(consumer_config={"bootstrap.servers": 'localhost:9092'},topics=[TOPICS], expansion_service='localhost:8097')
  File "X:\Git_repo\project_red\venv\lib\site-packages\apache_beam\transforms\ptransform.py", line 1058, in __ror__
    return self.transform.__ror__(pvalueish, self.label)
  File "X:\Git_repo\project_red\venv\lib\site-packages\apache_beam\transforms\ptransform.py", line 573, in __ror__
    result = p.apply(self, pvalueish, label)
  File "X:\Git_repo\project_red\venv\lib\site-packages\apache_beam\pipeline.py", line 646, in apply
    return self.apply(transform, pvalueish)
  File "X:\Git_repo\project_red\venv\lib\site-packages\apache_beam\pipeline.py", line 689, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "X:\Git_repo\project_red\venv\lib\site-packages\apache_beam\runners\runner.py", line 188, in apply
    return m(transform, input, options)
  File "X:\Git_repo\project_red\venv\lib\site-packages\apache_beam\runners\runner.py", line 218, in apply_PTransform
    return transform.expand(input)
  File "X:\Git_repo\project_red\venv\lib\site-packages\apache_beam\transforms\external.py", line 316, in expand
    response = service.Expand(request)
  File "X:\Git_repo\project_red\venv\lib\site-packages\grpc\_channel.py", line 923, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "X:\Git_repo\project_red\venv\lib\site-packages\grpc\_channel.py", line 826, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
    status = StatusCode.UNAVAILABLE
    details = "failed to connect to all addresses"
    debug_error_string = "{"created":"@1615358925.981000000","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":5397,"referenced_errors":[{"created":"@1615358925.981000000","description":"failed to connect to all addresses","file":"src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc","file_line":398,"grpc_status":14}]}"
>

The job server is running in Docker.
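
The traceback shows `StatusCode.UNAVAILABLE` with "failed to connect to all addresses" raised from `service.Expand(request)`, which is the call to the expansion service configured as `localhost:8097`. One way to narrow this down is to check from the host whether each address used in the script accepts TCP connections at all. The following is only a minimal sketch: `port_is_open` is a hypothetical helper name, the ports are the ones given in the question, and the question does not say whether the Docker container actually publishes port 8097 to the host.

```python
import socket


def port_is_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False


if __name__ == "__main__":
    # Addresses taken from the question; adjust if your setup differs.
    endpoints = [
        ("job endpoint", "localhost", 8099),
        ("expansion service", "localhost", 8097),
        ("Kafka broker", "localhost", 9092),
    ]
    for name, host, port in endpoints:
        state = "reachable" if port_is_open(host, port) else "NOT reachable"
        print(f"{name} ({host}:{port}): {state}")
```

If the expansion service address is reported as not reachable, the gRPC error is probably a plain connectivity problem (for example, a port that is not published from the container) rather than something specific to Kafka. A successful TCP connection does not prove that the service behind the port is the Beam expansion service, so this check can only rule out the simplest cause.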
