我想把Flink的数据写给Druid。我知道宁静,使上述方案,但我找不到用java编写的工作项目。如果有人达到上述情况,我真的很感激如果你能指导我如何解决我的问题。谢谢。
hfsqlsce1#
假设apache flink和druid在本地运行。你所需要做的就是按照链接中的定义为Druid启用宁静服务器https://druid.apache.org/docs/latest/tutorials/tutorial-tranquility.html如果你的Druid宁静服务器配置设置正确。下面使用flink的spring引导java代码应该允许您使用httppost方法将数据推入druid。flinkdruidapp.java文件
@SpringBootApplication public class FlinkDruidApp { private static String url = "http://localhost:8200/v1/post/wikipedia"; private static RestTemplate template; private static HttpHeaders headers; FlinkDruidApp() { template = new RestTemplate(); headers = new HttpHeaders(); headers.setAccept(Arrays.asList(MediaType.APPLICATION_JSON)); headers.setContentType(MediaType.APPLICATION_JSON); } public static void main(String[] args) throws Exception { SpringApplication.run(FlinkDruidApp.class, args); // Creating Flink Execution Environment ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //Define data source DataSet<String> data = env.readTextFile("/wikiticker-2015-09-12-sampled.json"); // Trasformation on the data data.map(x -> { return httpsPost(x).toString(); }).print(); } // http post method to post data in Druid private static ResponseEntity<String> httpsPost(String json) { HttpEntity<String> requestEntity = new HttpEntity<>(json, headers); ResponseEntity<String> response = template.exchange("http://localhost:8200/v1/post/wikipedia", HttpMethod.POST, requestEntity, String.class); return response; } @Bean public RestTemplate restTemplate() { return new RestTemplate(); } }
pom.xml文件
<?xml version="1.0" encoding="UTF-8"?> <project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.8.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.flinkdruid</groupId> <artifactId>FlinkDruid</artifactId> <version>0.0.1-SNAPSHOT</version> <name>FlinkDruid</name> <description>Flink Druid Connection</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>1.9.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.12</artifactId> <version>1.9.0</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
控制台上的输出
<200,{"result":{"received":1,"sent":1}},[Date:"Mon, 23 Sep 2019 13:29:39 GMT", Content-Type:"application/json; charset=UTF-8", Content-Length:"34", Server:"Jetty(9.2.5.v20141112)"]>
1条答案
按热度按时间hfsqlsce1#
假设apache flink和druid在本地运行。你所需要做的就是按照链接中的定义为Druid启用宁静服务器https://druid.apache.org/docs/latest/tutorials/tutorial-tranquility.html
如果你的Druid宁静服务器配置设置正确。下面使用flink的spring引导java代码应该允许您使用httppost方法将数据推入druid。
flinkdruidapp.java文件
pom.xml文件
控制台上的输出