pyflink连接器kafka

jtoj6r0c  于 2021-07-15  发布在  Flink
关注(0)|答案(0)|浏览(330)

我是Flink和Kafka这类框架的初学者。我想搭建一套以Kafka和Flink为核心的生产者—消费者基础设施。消息由Kafka生产者写入一个主题,再由Flink(使用Python,即PyFlink)读取并处理,然后将修改后的消息发送到另一个Kafka主题,该主题由一个Kafka消费者订阅。这套基础设施中我还缺少的是PyFlink与Kafka之间的连接配置。该如何建立这种连接?我在网上搜索过指南,但找到的代码都无法运行。有人能帮我一把吗?
下面是我写的Kafka生产者脚本:

import csv
import sys
from json import dumps
from time import sleep

from kafka import KafkaProducer, errors

# Path of the CSV file whose rows are published. The script previously
# iterated over an undefined name `csvfile`; the path is now read from the
# first command-line argument.
# NOTE(review): 'data.csv' is only a placeholder default - confirm the real
# input file name.
csv_path = sys.argv[1] if len(sys.argv) > 1 else 'data.csv'

# Retry until a broker is reachable. A single try/except would leave
# `producer` unbound after the first failure and crash later with NameError.
producer = None
while producer is None:
    try:
        producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            # Every message value is serialized as UTF-8 encoded JSON.
            value_serializer=lambda x: dumps(x).encode('utf-8'),
        )
    except errors.NoBrokersAvailable:
        print("Waiting for brokers to become available")
        sleep(10)

# newline='' is the csv module's recommended way to open CSV files.
with open(csv_path, newline='') as csvfile:
    reader = csv.DictReader(csvfile)
    # NOTE(review): DictReader yields every column value as a string, so
    # numeric columns are sent as JSON strings; convert them here if the
    # downstream consumer expects JSON numbers.
    for j, row in enumerate(reader):
        print("Iteration", j)
        producer.send('batch_stream', value=row)

# send() only queues the record; flush so that buffered messages are actually
# delivered before the script exits.
producer.flush()

下面我写了我的consumerkafka.py

from kafka import KafkaConsumer
from json import loads
from time import sleep
def _decode_json(raw):
    """Turn a UTF-8 encoded JSON payload into the corresponding Python object."""
    return loads(raw.decode('utf-8'))


# Settings for the consumer reading the 'output_batch' topic.
consumer_options = {
    'bootstrap_servers': ['localhost:9092'],
    'auto_offset_reset': 'earliest',
    'enable_auto_commit': True,
    'group_id': 'my-group-id',
    'value_deserializer': _decode_json,
}
consumer = KafkaConsumer('output_batch', **consumer_options)

# Print each received message value, pausing one second between messages.
for record in consumer:
    payload = record.value
    # Do whatever you want
    print(payload)
    sleep(1)

如何编写PyFlink与Kafka的连接器,以便从 batch_stream 主题读取消息,对消息进行处理后再写入 output_batch 主题?

import os
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, CsvTableSink, DataTypes, EnvironmentSettings
from pyflink.table.descriptors import Schema, Rowtime, Json, Kafka, Elasticsearch
from pyflink.table.window import Tumble
from pyflink.table.udf import udf
import pandas as pd

def register_information_source(st_env):
    """Register the temporary table "source" on ``st_env``.

    The table is declared as a Kafka connector on the "batch_stream" topic
    with a JSON format, an Id/X/Y/ANGLE table schema and append mode.

    :param st_env: table environment on which ``connect()`` is called.
    """
    # Connector: where the records come from.
    kafka_connector = Kafka()
    kafka_connector = kafka_connector.version("universal")
    kafka_connector = kafka_connector.topic("batch_stream")
    kafka_connector = kafka_connector.start_from_earliest()
    kafka_connector = kafka_connector.property("zookeeper.connect", "zookeeper:2181")
    kafka_connector = kafka_connector.property("bootstrap.servers", "kafka:9092")
    descriptor = st_env.connect(kafka_connector)

    # Format: JSON rows; the declared row type also carries a "Timestamp"
    # string that is not part of the table schema below.
    json_format = Json().fail_on_missing_field(True)
    row_fields = [
        DataTypes.FIELD("Id", DataTypes.BIGINT()),
        DataTypes.FIELD("X", DataTypes.FLOAT()),
        DataTypes.FIELD("Y", DataTypes.FLOAT()),
        DataTypes.FIELD("ANGLE", DataTypes.FLOAT()),
        DataTypes.FIELD("Timestamp", DataTypes.STRING()),
    ]
    json_format = json_format.schema(DataTypes.ROW(row_fields))
    descriptor = descriptor.with_format(json_format)

    # Table schema: the columns exposed to queries.
    table_schema = Schema()
    table_schema = table_schema.field("Id", DataTypes.BIGINT())
    table_schema = table_schema.field("X", DataTypes.FLOAT())
    table_schema = table_schema.field("Y", DataTypes.FLOAT())
    table_schema = table_schema.field("ANGLE", DataTypes.FLOAT())
    descriptor = descriptor.with_schema(table_schema)

    descriptor.in_append_mode().create_temporary_table("source")


def register_information_sink(st_env):
    """Register the temporary table "sink" on ``st_env``.

    The table is declared as a Kafka connector on the "output_batch" topic
    with a JSON format, an Id/X/Y/ANGLE table schema and append mode.

    :param st_env: table environment on which ``connect()`` is called.
    """
    # declare the external system to connect to
    kafka_connector = Kafka()
    kafka_connector = kafka_connector.version("universal")
    kafka_connector = kafka_connector.topic("output_batch")
    kafka_connector = kafka_connector.property("zookeeper.connect", "zookeeper:2181")
    kafka_connector = kafka_connector.property("bootstrap.servers", "kafka:9092")
    descriptor = st_env.connect(kafka_connector)

    # declare a format for this system; the declared row type also carries a
    # "Timestamp" string that is not part of the table schema below.
    json_format = Json().fail_on_missing_field(True)
    row_fields = [
        DataTypes.FIELD("Id", DataTypes.BIGINT()),
        DataTypes.FIELD("X", DataTypes.FLOAT()),
        DataTypes.FIELD("Y", DataTypes.FLOAT()),
        DataTypes.FIELD("ANGLE", DataTypes.FLOAT()),
        DataTypes.FIELD("Timestamp", DataTypes.STRING()),
    ]
    json_format = json_format.schema(DataTypes.ROW(row_fields))
    descriptor = descriptor.with_format(json_format)

    # Table schema: the columns written by queries.
    table_schema = Schema()
    table_schema = table_schema.field("Id", DataTypes.BIGINT())
    table_schema = table_schema.field("X", DataTypes.FLOAT())
    table_schema = table_schema.field("Y", DataTypes.FLOAT())
    table_schema = table_schema.field("ANGLE", DataTypes.FLOAT())
    descriptor = descriptor.with_schema(table_schema)

    descriptor.in_append_mode().create_temporary_table("sink")
def job():
    """Build and run the streaming job that copies table "source" into "sink".

    Creates the stream execution environment (event time, parallelism 1),
    wraps it in a blink-planner streaming table environment, registers the
    source and sink tables, wires the insert query and executes the job.

    All steps live inside this function: previously everything after the
    environment setup sat at module level, where ``s_env`` is undefined, so
    the script failed with NameError before ``job()`` was ever called.
    """
    s_env = StreamExecutionEnvironment.get_execution_environment()
    s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    s_env.set_parallelism(1)

    # use blink table planner
    settings = (
        EnvironmentSettings
        .new_instance()
        .in_streaming_mode()
        .use_blink_planner()
        .build()
    )
    st_env = StreamTableEnvironment.create(s_env, environment_settings=settings)

    # register source and sink
    register_information_source(st_env)
    register_information_sink(st_env)

    # query
    st_env.from_path("source").insert_into("sink")

    # execute
    st_env.execute("FLinkMangment")


if __name__ == '__main__':
    job()

我尝试按上面的方式实现,但遇到了一些问题。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题