Apache Flink - Cannot compile KeyedOneInputStreamOperatorTestHarness with Int or Long generic parameters

ffvjumwh posted on 2022-12-09 in Apache

The Apache Flink test code below does not compile in Scala when I use KeyedOneInputStreamOperatorTestHarness[K, IN, OUT] with Int or Long as the key (K) parameter. The same code compiles with key type parameters of type String, java.lang.Integer or java.lang.Long:

import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.operators.KeyedProcessOperator
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
import org.apache.flink.streaming.api.scala._
import org.junit.Test

case class TaxiRide(rideId: Int)

class RideKeySelector extends KeySelector[TaxiRide, Int] {
  override def getKey(in: TaxiRide): Int = in.rideId
}

class RideTests {
  private def setupHarness(function: KeyedProcessFunction[Int, TaxiRide, Int]): KeyedOneInputStreamOperatorTestHarness[Int, TaxiRide, Int] = {
    val operator: KeyedProcessOperator[Int, TaxiRide, Int] = new KeyedProcessOperator(function)

    val testHarness: KeyedOneInputStreamOperatorTestHarness[Int, TaxiRide, Int] =
      new KeyedOneInputStreamOperatorTestHarness(operator, new RideKeySelector(), Types.INT)

    testHarness.setup()
    testHarness.open()

    testHarness
  }
}

Build error:

type mismatch;
 found   : org.example.RideKeySelector
 required: org.apache.flink.api.java.functions.KeySelector[org.example.TaxiRide,Int]
      new KeyedOneInputStreamOperatorTestHarness(operator, new RideKeySelector(), Types.INT)

I created the project from the following Maven archetype:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-quickstart-scala</artifactId>
    <version>1.14.2</version>
</dependency>

I added the following test dependencies to pom.xml in IntelliJ IDEA:

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-test-utils_2.11</artifactId>
        <version>1.14.2</version>    
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime</artifactId>
        <version>1.14.2</version>    
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.14.2</version>    
        <scope>test</scope>    
        <classifier>tests</classifier>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.14.2</version>
        <scope>test</scope>
        <classifier>tests</classifier>
    </dependency>
</dependencies>

I tried to replace the key selector with a lambda:

val testHarness = new KeyedOneInputStreamOperatorTestHarness(operator, (ride: TaxiRide) => ride.rideId, Types.INT)

The build error changes to:

overloaded method constructor KeyedOneInputStreamOperatorTestHarness with alternatives:
  (x$1: org.apache.flink.streaming.api.operators.OneInputStreamOperator[IN,OUT],x$2: org.apache.flink.api.java.functions.KeySelector[IN,K],x$3: org.apache.flink.api.common.typeinfo.TypeInformation[K],x$4: org.apache.flink.runtime.operators.testutils.MockEnvironment)org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness[K,IN,OUT] <and>
  (x$1: org.apache.flink.streaming.api.operators.OneInputStreamOperator[IN,OUT],x$2: org.apache.flink.api.java.functions.KeySelector[IN,K],x$3: org.apache.flink.api.common.typeinfo.TypeInformation[K])org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness[K,IN,OUT] <and>
  (x$1: org.apache.flink.streaming.api.operators.StreamOperatorFactory[OUT],x$2: org.apache.flink.api.java.functions.KeySelector[IN,K],x$3: org.apache.flink.api.common.typeinfo.TypeInformation[K])org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness[K,IN,OUT] <and>
  (x$1: org.apache.flink.streaming.api.operators.StreamOperatorFactory[OUT],x$2: org.apache.flink.api.java.functions.KeySelector[IN,K],x$3: org.apache.flink.api.common.typeinfo.TypeInformation[K],x$4: Int,x$5: Int,x$6: Int)org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness[K,IN,OUT] <and>
  (x$1: org.apache.flink.streaming.api.operators.OneInputStreamOperator[IN,OUT],x$2: org.apache.flink.api.java.functions.KeySelector[IN,K],x$3: org.apache.flink.api.common.typeinfo.TypeInformation[K],x$4: Int,x$5: Int,x$6: Int)org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness[K,IN,OUT]
 cannot be applied to (org.apache.flink.streaming.api.operators.KeyedProcessOperator[Int,org.example.TaxiRide,Int], org.example.TaxiRide => Integer, org.apache.flink.api.common.typeinfo.TypeInformation[Integer])
     testHarness = new KeyedOneInputStreamOperatorTestHarness(operator, (ride: TaxiRide) => ride.rideId, Types.INT)

It looks like it cannot accept Scala types. Why is that?

Answer #1 by ozxc1zmp

Parameterised types are connected to a concept called type variance.
When you have a Wrapper that takes a type parameter A, your Wrapper can be invariant, covariant or contravariant with respect to type A.
invariant :: types Wrapper[A] and Wrapper[B] simply do not care about the type relationship between A and B. There is no relation between Wrapper[A] and Wrapper[B].
covariant :: given a type B which is a subtype of another type A (read as B extends A), Wrapper[B] is also a subtype of Wrapper[A] (read as Wrapper[B] extends Wrapper[A]).
contravariant :: given a type B which is a subtype of another type A (read as B extends A), Wrapper[B] is a supertype of Wrapper[A] (read as Wrapper[A] extends Wrapper[B]).
Java generic classes and interfaces are invariant in their type parameters (Java expresses variance only at the use site, through wildcards such as ? extends T and ? super T), while Scala lets you declare the variance you need: Wrapper[A] for invariant, Wrapper[+A] for covariant and Wrapper[-A] for contravariant relations.
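A minimal plain-Scala sketch of the three cases (no Flink involved), written as a script for the Scala REPL or scala-cli. Animal, Dog, Inv, Cov, Contra and isSubtype are hypothetical names chosen for illustration. isSubtype asks the compiler for A <:< B evidence at the call site and falls back to a null default when no such evidence exists, so it reports what the type checker would accept.

```scala
// Hypothetical types, for illustration only.
class Animal
class Dog extends Animal

class Inv[A]     // invariant: like every generic class or interface declared in Java
class Cov[+A]    // covariant
class Contra[-A] // contravariant

// True if the compiler can prove A <: B at the call site.
// When no implicit evidence is found, the default value (null) is used instead.
def isSubtype[A, B](implicit ev: A <:< B = null): Boolean = ev != null

println(isSubtype[Inv[Dog], Inv[Animal]])       // false: no relation at all
println(isSubtype[Cov[Dog], Cov[Animal]])       // true: Cov[Dog] extends Cov[Animal]
println(isSubtype[Contra[Animal], Contra[Dog]]) // true: Contra[Animal] extends Contra[Dog]
```

The default-argument trick is used only so the "does not compile" case can be observed at runtime instead of as a build error.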
So, for any parameterised type Wrapper[T] implemented in Java, Wrapper[A] and Wrapper[B] have no relation, irrespective of any existing relationship between A and B.
As KeySelector and KeyedOneInputStreamOperatorTestHarness are implemented in Java, they are invariant in all of their type parameters, so the compiler only accepts a key selector whose type arguments match the harness's IN and K exactly.
In many cases you don't need to worry about variance: when an invariant type parameter is only used in covariant positions, things often just work by coincidence. They almost never work, however, when an invariant type parameter is used in a contravariant position.
When it does work, remember that it is only a coincidence that the type was not used in any contravariant position. You should not rely on it, as the developers never intended it; a single additional contravariant use of the type parameter in a future release could break it.
This is exactly the case here: the second type parameter, IN, of KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> is used in naturally contravariant positions in the class, for example as the input type of the KeySelector<IN, K> that the constructor takes.
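The same kind of check can illustrate why the key type argument matters. In the sketch below, KeySelectorLike is a hypothetical stand-in for Flink's Java KeySelector (declared without +/- so it is invariant, as any Java generic interface is when seen from Scala), and TaxiRide and RideKeySelector mirror the question's classes. The second error message in the question reports Types.INT as TypeInformation[Integer], i.e. java.lang.Integer; assuming K is taken from that argument, a selector declared with Scala's Int no longer fits. This models only the type relationship, not Flink's actual overload resolution.

```scala
// Hypothetical stand-in for Flink's Java interface KeySelector<IN, KEY>.
// No variance annotations, so it is invariant like any Java generic type.
trait KeySelectorLike[IN, KEY] { def getKey(in: IN): KEY }

case class TaxiRide(rideId: Int)

class RideKeySelector extends KeySelectorLike[TaxiRide, Int] {
  override def getKey(in: TaxiRide): Int = in.rideId
}

// True if the compiler can prove A <: B; falls back to null evidence otherwise.
def isSubtype[A, B](implicit ev: A <:< B = null): Boolean = ev != null

// Exact match of the key type argument: accepted.
println(isSubtype[RideKeySelector, KeySelectorLike[TaxiRide, Int]])               // true
// Int is not a subtype of java.lang.Integer, and invariance adds no relation.
println(isSubtype[RideKeySelector, KeySelectorLike[TaxiRide, java.lang.Integer]]) // false
```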
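As for the overloaded method error, that is because the constructor of KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> wants a KeySelector<IN, K>. Thus KeyedOneInputStreamOperatorTestHarness<Int, TaxiRide, Int> wants a KeySelector<TaxiRide, Int>, not a scala.Function1[TaxiRide, Int]. If the project is on Scala 2.11, as the _2.11 artifacts in your pom.xml suggest, the compiler will not convert a lambda into a Java single-abstract-method interface such as KeySelector (that SAM conversion only became standard in Scala 2.12). Yet a Function1 is what you are providing in the following line: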

val testHarness =
  new KeyedOneInputStreamOperatorTestHarness(
    operator,
    (ride: TaxiRide) => ride.rideId,
    Types.INT
  )

Now, you just need to provide the proper instances:

val rideKeySelector = new KeySelector[TaxiRide, Int] {
  @throws(classOf[Exception])
  override def getKey(value: TaxiRide): Int = value.rideId
}

val testHarness =
  new KeyedOneInputStreamOperatorTestHarness(
    operator,
    rideKeySelector,
    Types.INT
  )
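Continuing the plain-Scala analogy, with KeySelectorLike again a hypothetical stand-in for Flink's KeySelector: a lambda's static type is Function1, which has no subtype relation to the interface, whereas an anonymous instance like rideKeySelector above has exactly the interface type the constructor asks for. This sketches the type relationship only, not Flink's API.

```scala
// Hypothetical stand-in for Flink's Java interface KeySelector<IN, KEY>.
trait KeySelectorLike[IN, KEY] { def getKey(in: IN): KEY }

case class TaxiRide(rideId: Int)

// True if the compiler can prove A <: B; falls back to null evidence otherwise.
def isSubtype[A, B](implicit ev: A <:< B = null): Boolean = ev != null

// A lambda: its type is TaxiRide => Int, i.e. scala.Function1[TaxiRide, Int].
val lambda = (ride: TaxiRide) => ride.rideId

// An explicit anonymous instance of the interface, as in the answer's fix.
val selector = new KeySelectorLike[TaxiRide, Int] {
  override def getKey(in: TaxiRide): Int = in.rideId
}

println(isSubtype[TaxiRide => Int, KeySelectorLike[TaxiRide, Int]]) // false
println(selector.getKey(TaxiRide(7)))                              // 7
println(lambda(TaxiRide(7)))                                       // 7
```

Both compute the same key; only the anonymous instance has the type a KeySelector<IN, K> parameter can accept.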

I would additionally suggest dropping Int and simply using java.lang.Integer when dealing with type-intensive Java libraries. The interactions between Scala's Int and Java's int and Integer have enough intricacies to fill a whole book.
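A short plain-Scala sketch of the distinction behind this suggestion: Scala converts individual values from Int to java.lang.Integer implicitly (via Predef.int2Integer), but that conversion never applies to type arguments, and at runtime the two are different classes. KeySelectorLike and IntegerRideKeySelector are hypothetical stand-ins; in the real test the equivalent change would be declaring the key type as java.lang.Integer consistently in the key selector, the process function and the harness.

```scala
// Hypothetical stand-in for Flink's Java interface KeySelector<IN, KEY>.
trait KeySelectorLike[IN, KEY] { def getKey(in: IN): KEY }

case class TaxiRide(rideId: Int)

// Value level: the Int literal is boxed into java.lang.Integer implicitly.
val boxed: java.lang.Integer = 5

// Class level: two distinct runtime classes.
println(classOf[Int].getName)               // int
println(classOf[java.lang.Integer].getName) // java.lang.Integer

// Declaring the key as java.lang.Integer keeps every type argument identical;
// the Int -> Integer conversion only happens on the returned value.
class IntegerRideKeySelector extends KeySelectorLike[TaxiRide, java.lang.Integer] {
  override def getKey(in: TaxiRide): java.lang.Integer = in.rideId
}

println(new IntegerRideKeySelector().getKey(TaxiRide(3))) // 3
```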

Answer #2 by hyrbngr7

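I completely agree with @sarveshseri, but I would put it in different words.
The custom object you are passing breaks the type checking; as @sarveshseri explained, types can vary, so Java does not like what is happening.
You have to create tuples in your pipeline made up of basic types such as Integer, String and Long. I don't know the full list; you will need to check the documentation. In this case you can create complex tuples to carry the data you need, and do the parsing before the function you are testing. You need to do this if you don't want to parse the object again and again in the function and in the key selector.
Below is an example of a key selector; you will need to work out the equivalent for your case: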

val keySelector = new KeySelector[((String, Long), String), String] {
  @throws(classOf[Exception])
  override def getKey(value: ((String, Long), String)): String = value._1._1
}

Then your stream operator can be:

harness = new KeyedOneInputStreamOperatorTestHarness[String, ((String, Long), String), String](new KeyedProcessOperator(processFunction), keySelector, Types.STRING)

Hope this helps. Cheers.
