我在Kafka的宇宙里是新来的,我在这里真的很累。所以,任何帮助都将不胜感激。
我使用下面的ksql语句从kafka流创建了一个表:
CREATE TABLE calc AS
SELECT id, datetime, count(*)
FROM streamA
GROUP BY id, datetime
HAVING count(*) = total;
其中“streama”是由“topica”创建的流
我目前正在使用:
java 8,
Spring Bootv2.2.9
我的pom.xml看起来像:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Packaging -->
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
<properties>
<spring-cloud.version>Hoxton.SR8</spring-cloud.version>
</properties>
<!-- Versioning -->
<groupId>some.name</groupId>
<artifactId>kafka.project</artifactId>
<version>2020.2.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.9.RELEASE</version>
<relativePath />
</parent>
<!-- Meta-data -->
<name>[${project.artifactId}]</name>
<description>Kafka Project</description>
<!-- Dependencies -->
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<!-- Build settings -->
<build>
<!-- Plugins -->
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
所以,我有两个问题:
有没有办法通过kafka streams api访问该表?
我可以通过我的应用程序而不是ksql来做类似的事情(例如创建那个表)吗?
先谢谢你
更新谢谢你的建议shrey jakhmola(从spring boot应用程序运行ksql的方法是什么),但是我有一个大的数据集需要定期访问。我认为这个解决办法不太理想。
@joshua oliphant,是的,这个表是由一个主题创建的流生成的。
1条答案
按热度按时间xjreopfe1#
有没有办法通过kafka streams api访问该表?
表
calc
将由名为CALC
. 如果需要,您可以在应用程序中自由使用此主题。使用标准消费者流或Kafka流。但是,如果您只想查询表的当前状态,那么可以使用ksqldb的pull查询。这些允许您从ksqldb正在构建的表中提取行。该功能是基本的,因为它不是ksqldb提供的核心流式sql的一部分,但满足一些用例。
如果您需要的东西超出了这一点,那么您还有其他选择:
您可以将结果注入您选择的更传统的sql系统,例如postgres,并对其进行查询(您可以使用ksql的
CREATE SINK CONNECTOR
将数据导出到postgres)。您可以使用标准的kafka客户端在自己的应用程序中使用数据(不过,只有当应用程序的每个示例都可以保存表中的所有数据时,这种方法才能很好地工作)。
您可以在应用程序中使用kafka流来使用表。这样做的好处是,应用程序的多个示例可以聚集在一起,这样每个示例只消耗表中的一部分数据。然后,您可能希望使用kafka streams交互式查询来访问表的当前状态
我可以通过我的应用程序而不是ksql来做类似的事情(例如创建那个表)吗?
如果你想把ksqldb从公式中去掉,那么是的,ksqldb在内部使用kafka流,所以你可以用ksqldb做任何事情,你也可以直接用kafka流。
sql类:
将Map到以下内容(粗略代码):