在“dataflowrunner”worker上安装kubectl

ggazkfy8  于 2021-06-15  发布在  ElasticSearch
关注(0)|答案(1)|浏览(424)

我想在运行在kubernetes上的elasticsearch集群中播种数据。我的数据在bigquery上,我想使用dataflow(python)加载数据。python-apache-beam版本似乎没有ElasticSearch接收器。我在dataflow中编写了我自己的elasticsearch writer,但是我需要从kubernetes集群转发我的elasticsearch端口。所以我需要安装googlecloudsdk和kubectl,这样我就可以转发端口写我的数据,然后关闭它回来。当我在本地运行作业时,我的代码似乎工作得很好,但我似乎无法在workers上安装googlecloudsdk和kubectl。
当我在本地运行作业时,我的代码似乎工作得很好,但我似乎无法在workers上安装googlecloudsdk和kubectl。
这些是在subacces.popen的setup.py中调用的命令

  1. ['export', 'CLOUD_SDK_REPO="cloud-sdk-$(lsb_release', '-c', '-s)"'],
  2. ['echo', '"deb', 'https://packages.cloud.google.com/apt', '$CLOUD_SDK_REPO', 'main"', '|', 'sudo', 'tee', '-a', '/etc/apt/sources.list.d/google-cloud-sdk.list'],
  3. ['sudo', 'rm', '/etc/apt/sources.list.d/partner.list'],
  4. ['sudo', 'apt-get', 'install', 'google-cloud-sdk', 'kubectl']

这是我在start\u包中端口转发elasticsearch服务的方法

  1. def _open_connection(self):
  2. tries = 0
  3. connected = False
  4. while tries <= 3 and not connected:
  5. tries += 1
  6. try:
  7. res = requests.get('http://{0}:{1}'.format(self.host, self.port))
  8. connected = (res.status_code == 200)
  9. except Exception as e:
  10. logging.warning(e)
  11. subprocess.check_call('gcloud container clusters get-credentials {0}'.format(ES_CLUSTER_NAME), shell=True)
  12. try:
  13. subprocess.check_call('kubectl version', shell=True)
  14. except exception as ee:
  15. logging.warning(ee)
  16. subprocess.check_call('gcloud components install kubectl', shell=True)
  17. subprocess.call('kubectl port-forward elasticsearch-0 {0}:{0} & disown'.format(self.port), shell=True)
  18. time.sleep(3)
  19. return connected

我期望这些命令(我尝试了一些变体)在每个worker上安装所需的包,但是安装一直失败。

t5fffqht

t5fffqht1#

我修复了这个问题,跳过了端口转发,而是在我的elasticsearch端口上实现了一个内部负载均衡器。这样我的数据流工作者就可以直接连接到内部ip来写入数据。

相关问题