将Kafka控制台生产者(console-producer)写入的数据读入Flink程序

xhv8bpkk  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(382)

我在Kafka中创建了一个名为 test 的主题(topic),并使用控制台生产者(console-producer)在控制台中写入了一些字符串。

  1. ./bin/kafka-console-producer.sh --topic test --broker-list localhost:9092

幸运的是,我能够通过 console-consumer 读取到这些消息。现在,我想在Flink程序中使用下面的代码来消费这些消息:

  1. public class ReadFromKafka { // streaming job: reads strings from Kafka topic "test", labels each one, prints it
  2. public static void main(String[] args) throws Exception {
  3. // create execution environment
  4. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  5. Properties properties = new Properties(); // settings passed to the Kafka consumer constructed below
  6. properties.setProperty("bootstrap.servers", "localhost:9092"); // Kafka broker address
  7. properties.setProperty("zookeeper.connect", "localhost:2181"); // ZooKeeper address
  8. properties.setProperty("group.id", "test"); // consumer group id
  9. DataStream<String> message = env.addSource(new FlinkKafkaConsumer08<String>("test", new SimpleStringSchema(),properties)); // source: topic "test", records exposed as String via SimpleStringSchema
  10. message.map(new MapFunction<String, String>() { // per-record transformation
  11. private static final long serialVersionUID = -6867736771747690202L;
  12. @Override
  13. public String map(String value) throws Exception {
  14. return " Value: " + value; // prepend a fixed label to the record text
  15. }
  16. }).print(); // print every mapped record
  17. env.execute(); // run the pipeline assembled above
  18. } // end of main
  19. } // end of ReadFromKafka

pom.xml的内容如下

  1. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  2. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  3. <modelVersion>4.0.0</modelVersion>
  4. <groupId>org.stsffap</groupId>
  5. <artifactId>cep-monitoring</artifactId>
  6. <name>cep-monitoring</name>
  7. <version>1.0</version>
  8. <packaging>jar</packaging>
  9. <properties>
  10. <flink.version>1.0.1</flink.version>
  11. <slf4j.version>1.7.7</slf4j.version>
  12. <log4j.version>1.2.17</log4j.version>
  13. </properties>
  14. <dependencies>
  15. <dependency>
  16. <groupId>org.slf4j</groupId>
  17. <artifactId>slf4j-log4j12</artifactId>
  18. <version>${slf4j.version}</version>
  19. </dependency>
  20. <dependency>
  21. <groupId>log4j</groupId>
  22. <artifactId>log4j</artifactId>
  23. <version>${log4j.version}</version>
  24. </dependency>
  25. <dependency>
  26. <groupId>org.apache.flink</groupId>
  27. <artifactId>flink-streaming-java_2.10</artifactId>
  28. <version>${flink.version}</version>
  29. </dependency>
  30. <dependency>
  31. <groupId>org.apache.flink</groupId>
  32. <artifactId>flink-java</artifactId>
  33. <version>1.3.2</version>
  34. </dependency>
  35. <dependency>
  36. <groupId>org.apache.flink</groupId>
  37. <artifactId>flink-clients_2.10</artifactId>
  38. <version>1.3.2</version>
  39. </dependency>
  40. <dependency>
  41. <groupId>org.apache.flink</groupId>
  42. <artifactId>flink-cep_2.10</artifactId>
  43. <version>${flink.version}</version>
  44. </dependency>
  45. <dependency>
  46. <groupId>org.apache.flink</groupId>
  47. <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
  48. <version>1.3.2</version>
  49. </dependency>
  50. </dependencies>
  51. <build>
  52. <plugins>
  53. <plugin>
  54. <groupId>org.apache.maven.plugins</groupId>
  55. <artifactId>maven-compiler-plugin</artifactId>
  56. <version>3.1</version>
  57. <configuration>
  58. <source>1.8</source>
  59. <target>1.8</target>
  60. <compilerId>jdt</compilerId>
  61. </configuration>
  62. <dependencies>
  63. <dependency>
  64. <groupId>org.eclipse.tycho</groupId>
  65. <artifactId>tycho-compiler-jdt</artifactId>
  66. <version>0.21.0</version>
  67. </dependency>
  68. </dependencies>
  69. </plugin>
  70. </plugins>
  71. </build>
  72. </project>

每当我执行这个代码时,我都会遇到以下错误

  1. objc[892]: Class JavaLaunchHelper is implemented in both /Library/Java/JavaVirtualMachines/jdk1.8.0_144.jdk/Contents/Home/bin/java (0x109f654c0) and /Library/Java/JavaVirtualMachines/jdk1.8.0_144.jdk/Contents/Home/jre/lib/libinstrument.dylib (0x10afd44e0). One of the two will be used. Which one is undefined.
  2. Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/streaming/api/checkpoint/CheckpointedFunction
  3. at java.lang.ClassLoader.defineClass1(Native Method)
  4. at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
  5. at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
  6. at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
  7. at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
  8. at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
  9. at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
  10. at java.security.AccessController.doPrivileged(Native Method)
  11. at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
  12. at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  13. at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
  14. at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  15. at java.lang.ClassLoader.defineClass1(Native Method)
  16. at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
  17. at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
  18. at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
  19. at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
  20. at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
  21. at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
  22. at java.security.AccessController.doPrivileged(Native Method)
  23. at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
  24. at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  25. at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
  26. at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  27. at org.stsffap.cep.monitoring.ReadFromKafka.main(ReadFromKafka.java:24)
  28. Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
  29. at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
  30. at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  31. at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
  32. at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  33. ... 25 more

另外,我所使用的Kafka版本是通过下面的 find 命令查到的:

  1. find ./libs/ -name \*kafka_\* | head -1 | grep -o '\kafka[^\n]*'

  1. kafka_2.11-0.9.0.0-javadoc.jar

我是否需要使用 0.8.x 版本的Kafka来运行我的示例?

非常欢迎任何意见和建议。提前谢谢,祝好!

gopyfrb3

gopyfrb31#

做了以下修改之后,我的程序可以正常运行了。首先,我把Kafka连接器依赖更新到了 0.9.x 版本:

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
  4. <version>${flink.version}</version>
  5. </dependency>

我将flink版本从1.0.1升级到1.1.2,如下所示

  1. <properties>
  2. <!-- <flink.version>1.0.1</flink.version>-->
  3. <flink.version>1.1.2</flink.version>
  4. <slf4j.version>1.7.7</slf4j.version>
  5. <log4j.version>1.2.17</log4j.version>
  6. </properties>
  7. <dependencies>
  8. <dependency>
  9. <groupId>org.slf4j</groupId>
  10. <artifactId>slf4j-log4j12</artifactId>
  11. <version>${slf4j.version}</version>
  12. </dependency>
  13. <dependency>
  14. <groupId>log4j</groupId>
  15. <artifactId>log4j</artifactId>
  16. <version>${log4j.version}</version>
  17. </dependency>
  18. <dependency>
  19. <groupId>org.apache.flink</groupId>
  20. <artifactId>flink-streaming-java_2.10</artifactId>
  21. <version>${flink.version}</version>
  22. </dependency>
  23. <dependency>
  24. <groupId>org.apache.flink</groupId>
  25. <artifactId>flink-java</artifactId>
  26. <version>${flink.version}</version>
  27. </dependency>
  28. <dependency>
  29. <groupId>org.apache.flink</groupId>
  30. <artifactId>flink-clients_2.10</artifactId>
  31. <version>${flink.version}</version>
  32. </dependency>
展开查看全部

相关问题