我想在运行在kubernetes上的elasticsearch集群中播种数据。我的数据在bigquery上,我想使用dataflow(python)加载数据。python-apache-beam版本似乎没有ElasticSearch接收器。我在dataflow中编写了我自己的elasticsearch writer,但是我需要从kubernetes集群转发我的elasticsearch端口。所以我需要安装googlecloudsdk和kubectl,这样我就可以转发端口写我的数据,然后关闭它回来。当我在本地运行作业时,我的代码似乎工作得很好,但我似乎无法在workers上安装googlecloudsdk和kubectl。
当我在本地运行作业时,我的代码似乎工作得很好,但我似乎无法在workers上安装googlecloudsdk和kubectl。
这些是在subacces.popen的setup.py中调用的命令
['export', 'CLOUD_SDK_REPO="cloud-sdk-$(lsb_release', '-c', '-s)"'],
['echo', '"deb', 'https://packages.cloud.google.com/apt', '$CLOUD_SDK_REPO', 'main"', '|', 'sudo', 'tee', '-a', '/etc/apt/sources.list.d/google-cloud-sdk.list'],
['sudo', 'rm', '/etc/apt/sources.list.d/partner.list'],
['sudo', 'apt-get', 'install', 'google-cloud-sdk', 'kubectl']
这是我在start\u包中端口转发elasticsearch服务的方法
def _open_connection(self):
tries = 0
connected = False
while tries <= 3 and not connected:
tries += 1
try:
res = requests.get('http://{0}:{1}'.format(self.host, self.port))
connected = (res.status_code == 200)
except Exception as e:
logging.warning(e)
subprocess.check_call('gcloud container clusters get-credentials {0}'.format(ES_CLUSTER_NAME), shell=True)
try:
subprocess.check_call('kubectl version', shell=True)
except exception as ee:
logging.warning(ee)
subprocess.check_call('gcloud components install kubectl', shell=True)
subprocess.call('kubectl port-forward elasticsearch-0 {0}:{0} & disown'.format(self.port), shell=True)
time.sleep(3)
return connected
我期望这些命令(我尝试了一些变体)在每个worker上安装所需的包,但是安装一直失败。
1条答案
按热度按时间t5fffqht1#
我修复了这个问题,跳过了端口转发,而是在我的elasticsearch端口上实现了一个内部负载均衡器。这样我的数据流工作者就可以直接连接到内部ip来写入数据。