在clojure中使用泛型类的类型提示

dxpyg8gm  于 2021-06-24  发布在  Flink
关注(0)|答案(2)|浏览(416)

我试图从运行在clojure中的apache flink中得到一个小例子,但是现在我被卡住了,因为clojure中的类型暗示和flink中的一些奇怪的怪癖。
这是我的密码:

(ns pipeline.core
 (:import
 (org.apache.flink.api.java ExecutionEnvironment)
 (org.apache.flink.api.common.functions FlatMapFunction)
 (org.apache.flink.api.java.tuple Tuple2)
 (org.apache.flink.util Collector)
 (java.lang String)))

(def flink-env (ExecutionEnvironment/createLocalEnvironment))

(def dataset (.fromElements flink-env (to-array ["please test me"])))

(defn tokenizer [] (reify FlatMapFunction
                 ( flatMap [this value collector] 
                   (println value))))

(.flatMap dataset (tokenizer))

如果我不提供类型提示,我会从flink api中得到一个错误:

Caused by: java.lang.IllegalArgumentException: The types of the interface org.apache.flink.api.common.functions.FlatMapFunction could not be inferred. Support for synthetic interfaces, lambdas, and generic types is limited at this point.
at org.apache.flink.api.java.typeutils.TypeExtractor.getParameterType(TypeExtractor.java:662)

如果我提供类型提示:

(defn tokenizer [] (reify FlatMapFunction
                 ( ^void flatMap [this ^String value ^Collector collector] 
                   (println value))))

我从clojure编译器中得到一个错误:

Caused by: java.lang.IllegalArgumentException: Can't find matching method: flatMap, leave off hints for auto match.
at clojure.lang.Compiler$NewInstanceMethod.parse(Compiler.java:8065)

有没有办法用泛型类在clojure中添加类型提示?应该是这样的:

(defn tokenizer [] (reify FlatMapFunction
                 ( ^void flatMap [this ^String value ^Collector<Tuple2<String, Integer>> collector] 
                   (println value))))

但这行不通。有什么想法吗?
lein配置如下所示:

(defproject pipeline "0.1.0-SNAPSHOT"
 :description "FIXME: write description"
 :url "http://example.com/FIXME"
 :license {:name "Eclipse Public License"
        :url "http://www.eclipse.org/legal/epl-v10.html"}
 :dependencies [[org.clojure/clojure "1.7.0"]               
             [org.apache.flink/flink-java "0.9.0"]              
             ]
  :aot :all)
lnvxswe2

lnvxswe21#

作为此注解的后续,在clojure中使用泛型类的类型提示
对于最新的flink版本(在1.6.1上测试),您需要定义一个自定义类,否则会出现如下错误:

Exception in thread "main" java.lang.IllegalArgumentException: No matching method found: returns for class org.apache.flink.api.java.operators.FlatMapOperator, compiling:(WordCount.clj:69:13)

自定义类:

package org.apache.flink.java;

import org.apache.flink.api.java.tuple.Tuple2;

public class WordCountTuple extends Tuple2<String, Integer> {

}

clojure代码

(ns org.apache.flink.clojure.WordCount
  (:import
   (org.apache.flink.api.common.functions FlatMapFunction)
   (org.apache.flink.api.java DataSet)
   (org.apache.flink.api.java ExecutionEnvironment)
   (org.apache.flink.api.java.tuple Tuple2)
   (org.apache.flink.java WordCountTuple)
   (org.apache.flink.util Collector)
   (java.lang String))
  (:require [clojure.string :as str])
  (:gen-class))

(def flink-env (ExecutionEnvironment/getExecutionEnvironment))

(def text (.fromElements flink-env (to-array ["please test me and me too"])))

(deftype tokenizer [] FlatMapFunction
         (flatMap [this value collector]
           (doseq [v (str/split value #"\s")]
             (.collect collector (Tuple2. v (int 1))))))

(def tokens (.returns (.flatMap text (tokenizer.)) WordCountTuple))

(def counts (.sum (.groupBy tokens (int-array [0])) 1))

(defn -main []
  (.print counts))

这里是工作示例https://github.com/guillaume/flink-external

r7xajy2e

r7xajy2e2#

clojure无法处理反射,因此需要通过flink方法手动指定返回类型 returns .

(.returns (.flatMap dataset (tokenizer)) String)

此外,您需要使用 deftype 定义 tokenizer 并在使用时示例化一个新对象,因为flink无法处理匿名类:

(deftype tokenizer [] FlatMapFunction
                      (flatMap [this value collector] 
                        (println value)))

(.flatMap dataset (tokenizer.))

下面是一个完整的“字数计算示例”,可以打包到jar中执行。
注意类型提示和类型转换。为了 tokenizer 输出 (int 1) 是必需的,否则 Long 会是第二种 Tuple2 . 此外,我们使用一个字符串来声明 tokenizer (类类型是不够的,因为还必须指定反射类型)。最后,我们需要输入提示 (int-array [0]) 以解决 groupBy (没有它,这个方法对clojure编译器来说是不明确的)。

(ns org.apache.flink.flink-clojure.WordCount
 (:import
 (org.apache.flink.api.common.functions FlatMapFunction)
 (org.apache.flink.api.java DataSet)
 (org.apache.flink.api.java ExecutionEnvironment)
 (org.apache.flink.api.java.tuple Tuple2)
 (org.apache.flink.util Collector)
 (java.lang String))
 (:require [clojure.string :as str])
 (:gen-class))

(def flink-env (ExecutionEnvironment/createLocalEnvironment))

(def text (.fromElements flink-env (to-array ["please test me and me too"])))

(deftype tokenizer [] FlatMapFunction
                      (flatMap [this value collector]
                        (doseq [v (str/split value #"\s")]
                          (.collect collector (Tuple2. v (int 1))))))

(def tokens (.returns (.flatMap text (tokenizer.)) "Tuple2<String,Integer>"))

(def counts (.sum (.groupBy tokens (int-array [0])) 1))

(defn -main []
  (.print counts)
)

相关问题