Flink - org.apache.Kafka.common.serialization. ByteArrayDataizer不是org.apache.Kafka.common.serialization. Dataizer的示例

siv3szwd  于 2024-01-04  发布在  Apache
关注(0)|答案(1)|浏览(171)

1.创建一个docker-compose.yml文件(使用flink、Kafka、zookeeper)
1.创建Kafka Producer(使用faker python lib创建伪数据)
1.创建Flink Kafka Consumer,从上面的Kafka主题消费

KafkaProducer:

from kafka.admin import KafkaAdminClient, NewTopic
from kafka import KafkaProducer

from faker import Faker
import json
import time

# Define Kafka broker address
bootstrap_servers = ['localhost:9092']

# Define Kafka topic name and configuration
topic_name = 'topic-4'
num_partitions = 3
replication_factor = 1

# Create KafkaAdminClient instance
admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)

#admin_client.delete_topics(topics="topic-4")

# Create a NewTopic object with the topic properties
new_topic = NewTopic(name=topic_name, num_partitions=num_partitions, replication_factor=replication_factor)

fake = Faker()

# Create a Kafka producer instance
producer = KafkaProducer(bootstrap_servers="localhost:9092")

start_time = time.time()
message_count = 10

#send data for 10 min every 30 sec
while (time.time() - start_time) < 3600:
    for i in range(2):
        data = {
            "name": fake.name(),
            "address": fake.address(),
            "phone_number": fake.phone_number()
        }
        producer.send("topic-4", json.dumps(data).encode("utf-8"))
        print(f"Sent message {i+1}: {data}")
    time.sleep(10)

字符串
Flink Kafka消费者:

import json

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer
from pyflink.common.serialization import SimpleStringSchema

env = StreamExecutionEnvironment.get_execution_environment()
#env.set_runtime_mode(RuntimeExecutionMode.BATCH)
env.set_parallelism(1)

env.add_jars("file:///opt/homebrew/Cellar/apache-flink/1.16.1/libexec/lib/flink-connector-kafka-1.16.1.jar","file:///opt/homebrew/Cellar/apache-flink/1.16.1/libexec/lib/kafka-clients-3.2.3.jar")

kafka_consumer = FlinkKafkaConsumer(
    'topic-4',
     SimpleStringSchema(),
    properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'my-group'}
)

kafka_data_stream = env.add_source(kafka_consumer)

# do some processing on the data
result_stream = kafka_data_stream \
    .map(lambda x: json.loads(x)) \
    .filter(lambda x: 'name' in x) \
    .filter(lambda x: 'address' in x) \
    .filter(lambda x: 'phone_number' in x) \
    .print()

env.execute("Kafka-Flink Python Example")

错误:

File "/Users/Desktop/docker_project/myproject/app/flink_consumer.py", line 39, in <module>
    env.execute("Kafka-Flink Python Example")
  File "/Users/opt/anaconda3/lib/python3.9/site-packages/pyflink/datastream/stream_execution_environment.py", line 764, in execute
    return JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph))
  File "/Users/opt/anaconda3/lib/python3.9/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/Users/opt/anaconda3/lib/python3.9/site-packages/pyflink/util/exceptions.py", line 146, in deco
    return f(*a, **kw)
  File "/Users/opt/anaconda3/lib/python3.9/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o10.execute.
: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
        at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
        at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:268)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
        at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1277)
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
        at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
        at akka.dispatch.OnComplete.internal(Future.scala:300)
        at akka.dispatch.OnComplete.internal(Future.scala:297)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
        at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
        at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
        at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
        at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
        at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
        at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
        at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
        at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
        at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
        at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
        at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
        at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
        at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
        at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
        at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:256)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:247)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:240)
        at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:738)
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:715)
        at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78)
        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:477)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309)
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at akka.actor.Actor.aroundReceive(Actor.scala:537)
        at akka.actor.Actor.aroundReceive$(Actor.scala:535)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
        at akka.actor.ActorCell.invoke(ActorCell.scala:548)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
        at akka.dispatch.Mailbox.run(Mailbox.scala:231)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
        ... 4 more
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:825)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:666)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:647)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:627)
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:55)
        at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:574)
        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100)
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArrayDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer
        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:403)
        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:434)
        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:419)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:710)
        ... 18 more

docker-compose.yml

version: "2.2"
services:
  jobmanager:
    image: flink:latest
    ports:
      - "8081:8081"
    command: jobmanager
    # volumes:
    #     - /host/path/to/job/artifacts:/opt/flink/usrlib
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        parallelism.default: 2
    networks:
      - flink_kafka_network

  taskmanager:
    image: flink:latest
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    # volumes:
    #   - /host/path/to/job/artifacts:/opt/flink/usrlib
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
        parallelism.default: 2

  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    ports:
     - "2181:2181"
    networks:
     - flink_kafka_network

  kafka:
    image: wurstmeister/kafka
    ports:
     - "9092:9092"
    expose:
     - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      #KAFKA_CREATE_TOPICS: "topic_test:1:1"
    volumes:
     - /var/run/docker.sock:/var/run/docker.sock
    networks:
     - flink_kafka_network

networks:
  flink_kafka_network:
    driver: bridge

z9ju0rcb

z9ju0rcb1#

我也遇到了同样的问题,不过我通过升级jdk解决了,升级到jdk11后问题就解决了,Flink官方文档也给出了jdk版本要求。DataStream API说明#

相关问题