The Apache Flink test code below doesn't compile in Scala when I use `KeyedOneInputStreamOperatorTestHarness[K, IN, OUT]` with `Int` or `Long` as the key (`K`) parameter. The same code compiles with generic parameters of type `String`, `Integer` or `java.lang.Long`:
```scala
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.operators.KeyedProcessOperator
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
import org.apache.flink.streaming.api.scala._
import org.junit.Test

case class TaxiRide(rideId: Int)

class RideKeySelector extends KeySelector[TaxiRide, Int] {
  override def getKey(in: TaxiRide): Int = in.rideId
}

class RideTests {
  private def setupHarness(
      function: KeyedProcessFunction[Int, TaxiRide, Int]
  ): KeyedOneInputStreamOperatorTestHarness[Int, TaxiRide, Int] = {
    val operator: KeyedProcessOperator[Int, TaxiRide, Int] = new KeyedProcessOperator(function)
    val testHarness: KeyedOneInputStreamOperatorTestHarness[Int, TaxiRide, Int] =
      new KeyedOneInputStreamOperatorTestHarness(operator, new RideKeySelector(), Types.INT)
    testHarness.setup()
    testHarness.open()
    testHarness
  }
}
```
Build error:
```text
type mismatch;
 found   : org.example.RideKeySelector
 required: org.apache.flink.api.java.functions.KeySelector[org.example.TaxiRide,Int]
      new KeyedOneInputStreamOperatorTestHarness(operator, new RideKeySelector(), Types.INT)
```
I created the project from the following Maven archetype:
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-quickstart-scala</artifactId>
    <version>1.14.2</version>
</dependency>
```
I added the following test dependencies to pom.xml in IntelliJ IDEA:
```xml
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-test-utils_2.11</artifactId>
        <version>1.14.2</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime</artifactId>
        <version>1.14.2</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.14.2</version>
        <scope>test</scope>
        <classifier>tests</classifier>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.14.2</version>
        <scope>test</scope>
        <classifier>tests</classifier>
    </dependency>
</dependencies>
```
I tried to replace the key selector with a lambda:
```scala
val testHarness = new KeyedOneInputStreamOperatorTestHarness(operator, (ride: TaxiRide) => ride.rideId, Types.INT)
```
The build error changes to:
```text
overloaded method constructor KeyedOneInputStreamOperatorTestHarness with alternatives:
  (x$1: org.apache.flink.streaming.api.operators.OneInputStreamOperator[IN,OUT],x$2: org.apache.flink.api.java.functions.KeySelector[IN,K],x$3: org.apache.flink.api.common.typeinfo.TypeInformation[K],x$4: org.apache.flink.runtime.operators.testutils.MockEnvironment)org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness[K,IN,OUT] <and>
  (x$1: org.apache.flink.streaming.api.operators.OneInputStreamOperator[IN,OUT],x$2: org.apache.flink.api.java.functions.KeySelector[IN,K],x$3: org.apache.flink.api.common.typeinfo.TypeInformation[K])org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness[K,IN,OUT] <and>
  (x$1: org.apache.flink.streaming.api.operators.StreamOperatorFactory[OUT],x$2: org.apache.flink.api.java.functions.KeySelector[IN,K],x$3: org.apache.flink.api.common.typeinfo.TypeInformation[K])org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness[K,IN,OUT] <and>
  (x$1: org.apache.flink.streaming.api.operators.StreamOperatorFactory[OUT],x$2: org.apache.flink.api.java.functions.KeySelector[IN,K],x$3: org.apache.flink.api.common.typeinfo.TypeInformation[K],x$4: Int,x$5: Int,x$6: Int)org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness[K,IN,OUT] <and>
  (x$1: org.apache.flink.streaming.api.operators.OneInputStreamOperator[IN,OUT],x$2: org.apache.flink.api.java.functions.KeySelector[IN,K],x$3: org.apache.flink.api.common.typeinfo.TypeInformation[K],x$4: Int,x$5: Int,x$6: Int)org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness[K,IN,OUT]
 cannot be applied to (org.apache.flink.streaming.api.operators.KeyedProcessOperator[Int,org.example.TaxiRide,Int], org.example.TaxiRide => Integer, org.apache.flink.api.common.typeinfo.TypeInformation[Integer])
    testHarness = new KeyedOneInputStreamOperatorTestHarness(operator, (ride: TaxiRide) => ride.rideId, Types.INT)
```
It looks like it cannot accept Scala types. Why is that?
2 Answers

Answer 1
Parameterised types are connected to a concept called type variance. When you have a `Wrapper` which takes a type parameter `A`, your `Wrapper` can be either invariant, covariant or contravariant with respect to type `A`:

- invariant: the types `Wrapper[A]` and `Wrapper[B]` simply do not care about the type relationship between `A` and `B`. There will simply be no relation between `Wrapper[A]` and `Wrapper[B]`.
- covariant: given a type `B` which is a subtype of another type `A` (read as `B extends A`), `Wrapper[B]` will also be a subtype of `Wrapper[A]` (read as `Wrapper[B] extends Wrapper[A]`).
- contravariant: given a type `B` which is a subtype of another type `A` (read as `B extends A`), `Wrapper[B]` will be a supertype of `Wrapper[A]` (read as `Wrapper[A] extends Wrapper[B]`).

Java only allows invariant type parameters at the declaration site (it expresses variance only at use sites, through wildcards such as `? extends T`), while Scala lets you choose the variance as required, using `Wrapper[A]` for invariant, `Wrapper[+A]` for covariant and `Wrapper[-A]` for contravariant relations.

So, for any parameterised type `Wrapper[T]` implemented in Java, `Wrapper[A]` and `Wrapper[B]` will have no relation, irrespective of any existing relationship between `A` and `B`.
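To make the three kinds of variance concrete, here is a small, self-contained Scala sketch. The names `Animal`, `Dog`, `Invariant`, `Covariant` and `Contravariant` are hypothetical and chosen only for illustration; the commented-out line is the one the compiler rejects.

```scala
// Hypothetical types used only to illustrate variance.
class Animal
class Dog extends Animal

class Invariant[A](val value: A)   // Wrapper[A]: invariant
class Covariant[+A](val value: A)  // Wrapper[+A]: covariant
abstract class Contravariant[-A] { // Wrapper[-A]: contravariant
  def describe(a: A): String
}

object VarianceDemo {
  val dog = new Dog

  // Covariant: Dog <: Animal, so Covariant[Dog] <: Covariant[Animal].
  val covariant: Covariant[Animal] = new Covariant[Dog](dog)

  // Contravariant: Dog <: Animal, so Contravariant[Animal] <: Contravariant[Dog].
  val contravariant: Contravariant[Dog] = new Contravariant[Animal] {
    def describe(a: Animal): String = "an animal"
  }

  // Invariant: no relation in either direction, so this line would not compile:
  // val broken: Invariant[Animal] = new Invariant[Dog](dog)
  val invariant: Invariant[Dog] = new Invariant[Dog](dog)

  def main(args: Array[String]): Unit =
    println(contravariant.describe(dog))
}
```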
As `KeyedOneInputStreamOperatorTestHarness` is implemented in Java, it is invariant with respect to the relationship between `RideKeySelector` and `KeySelector[TaxiRide, Int]`.

In many cases you don't need to worry about variance: when an invariant type parameter is only used in covariant positions, things just happen to work. They almost never work, however, when an invariant type parameter is used in a contravariant position. And even when it does work, remember that it is only a coincidence that the type was not used in any contravariant position. You should not rely on it, because the developers never intended it, and a single extra contravariant usage added to the class in a future release could break it.

This is exactly the case here: the second type parameter `IN` of `KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>` is used in naturally contravariant positions in the class `KeyedOneInputStreamOperatorTestHarness`.
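To see why an input type parameter such as `IN` is naturally contravariant, consider a Scala mirror of a key selector. This is a hypothetical sketch, not Flink code: `InvariantSelector` imitates how a Java interface such as `KeySelector<IN, KEY>` looks from Scala (invariant in both parameters), while `VariantSelector` has the same shape with declaration-site variance added. `Vehicle` and `Taxi` are illustrative names.

```scala
class Vehicle
class Taxi extends Vehicle

// How a Java-style generic interface appears to Scala: invariant in every parameter.
trait InvariantSelector[IN, KEY] { def getKey(in: IN): KEY }

// Same shape with Scala variance: IN only appears as a method argument
// (a contravariant position), KEY only as a result (a covariant position).
trait VariantSelector[-IN, +KEY] { def getKey(in: IN): KEY }

object SelectorDemo {
  // A selector that can produce a key for any Vehicle.
  val vehicleSelector: VariantSelector[Vehicle, Int] = new VariantSelector[Vehicle, Int] {
    def getKey(in: Vehicle): Int = 1
  }

  // Whatever can key any Vehicle can key a Taxi, so -IN makes this compile.
  val taxiSelector: VariantSelector[Taxi, Int] = vehicleSelector

  val invariantVehicleSelector: InvariantSelector[Vehicle, Int] =
    new InvariantSelector[Vehicle, Int] {
      def getKey(in: Vehicle): Int = 1
    }
  // The invariant version gets no such help; this line would not compile:
  // val invariantTaxiSelector: InvariantSelector[Taxi, Int] = invariantVehicleSelector
}
```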
As for the overloaded method error, that is because the constructor of `KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>` wants a `KeySelector<IN, K>`. Thus `KeyedOneInputStreamOperatorTestHarness<Int, TaxiRide, Int>` wants a `KeySelector<TaxiRide, Int>`, not the `scala.Function1[TaxiRide, Int]` that you are providing with `(ride: TaxiRide) => ride.rideId`. Now, you just need to provide the proper instances.
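As a sketch of providing proper instances while keeping Scala's `Int`, the key selector can be passed as an explicit `KeySelector` instance rather than a Scala function. This assumes the test dependencies from the question's `pom.xml` and has not been verified against Flink 1.14.2. It also assumes the remaining mismatch comes from `Types.INT`, which the second build error shows is a `TypeInformation[Integer]` (`java.lang.Integer`), so it asks the Scala API for a `TypeInformation[Int]` through `createTypeInformation[Int]` instead. `EmitRideId` and `ScalaIntHarness` are hypothetical names.

```scala
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.operators.KeyedProcessOperator
import org.apache.flink.streaming.api.scala._ // provides createTypeInformation
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
import org.apache.flink.util.Collector

case class TaxiRide(rideId: Int)

// Hypothetical function under test: emits the id of every ride it receives.
class EmitRideId extends KeyedProcessFunction[Int, TaxiRide, Int] {
  override def processElement(
      ride: TaxiRide,
      ctx: KeyedProcessFunction[Int, TaxiRide, Int]#Context,
      out: Collector[Int]): Unit =
    out.collect(ride.rideId)
}

object ScalaIntHarness {
  def setupHarness(
      function: KeyedProcessFunction[Int, TaxiRide, Int]
  ): KeyedOneInputStreamOperatorTestHarness[Int, TaxiRide, Int] = {
    val operator = new KeyedProcessOperator[Int, TaxiRide, Int](function)

    // An explicit KeySelector instance instead of a scala.Function1.
    val keySelector = new KeySelector[TaxiRide, Int] {
      override def getKey(ride: TaxiRide): Int = ride.rideId
    }

    // Key type information for Scala's Int, so K is Int in all three arguments.
    val harness = new KeyedOneInputStreamOperatorTestHarness[Int, TaxiRide, Int](
      operator, keySelector, createTypeInformation[Int])
    harness.setup()
    harness.open()
    harness
  }
}
```

Giving the type arguments explicitly keeps the compiler from inferring `K` from the key type information alone.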
I would additionally suggest that you simply drop the usage of `Int` (and use `java.lang.Integer` instead) when dealing with type-intensive Java libraries. The interactions between Scala's `Int` and Java's `int` and `Integer` have enough intricacies to fill a whole book.
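Following that suggestion, a sketch of the question's harness with `java.lang.Integer` (aliased here as `JInt`) used for every type parameter that was `Int` could look like this. It assumes the same test dependencies as the question's `pom.xml` and uses the Java `org.apache.flink.api.common.typeinfo.Types.INT`, which is a `TypeInformation<Integer>`; `EmitRideId` and `RideTestHarness` are hypothetical names, and the code has not been verified against Flink 1.14.2.

```scala
import java.lang.{Integer => JInt}

import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.operators.KeyedProcessOperator
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
import org.apache.flink.util.Collector

case class TaxiRide(rideId: Int)

// The key is a boxed java.lang.Integer, matching Types.INT.
class RideKeySelector extends KeySelector[TaxiRide, JInt] {
  override def getKey(ride: TaxiRide): JInt = JInt.valueOf(ride.rideId)
}

// Hypothetical function under test: emits the id of every ride it receives.
class EmitRideId extends KeyedProcessFunction[JInt, TaxiRide, JInt] {
  override def processElement(
      ride: TaxiRide,
      ctx: KeyedProcessFunction[JInt, TaxiRide, JInt]#Context,
      out: Collector[JInt]): Unit =
    out.collect(JInt.valueOf(ride.rideId))
}

object RideTestHarness {
  def setupHarness(
      function: KeyedProcessFunction[JInt, TaxiRide, JInt]
  ): KeyedOneInputStreamOperatorTestHarness[JInt, TaxiRide, JInt] = {
    val operator = new KeyedProcessOperator[JInt, TaxiRide, JInt](function)
    val harness = new KeyedOneInputStreamOperatorTestHarness[JInt, TaxiRide, JInt](
      operator, new RideKeySelector, Types.INT)
    harness.setup()
    harness.open()
    harness
  }
}
```

The case class can keep its Scala `Int` field; only the generic type parameters that cross into Flink's Java API are switched to `java.lang.Integer`.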
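Answer 2

I completely agree with @sarveshseri, but I would put it in different words.

The custom object you are passing breaks the type check. As @sarveshseri explained, types can vary, so Java doesn't like what is happening.

You have to create Tuples in your pipeline that are made up of basic types such as Integer, String and Long. I don't know the complete list; you'll need to check the documentation for it. In this case you can create complex Tuples that carry the data you need, parsing them before the function you are testing. You need to do this if you don't want to parse the object over and over again in the function and in the key selector.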
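For illustration only, a key selector over a Flink Java tuple of basic types might look like the sketch below. The `(rideId, driverName)` layout and the class name `RideIdTupleKeySelector` are hypothetical, and the record is assumed to have been parsed into the tuple once, upstream of the function under test; the key uses `java.lang.Integer` for the same reason as in the first answer.

```scala
import java.lang.{Integer => JInt}

import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.java.tuple.Tuple2

// Hypothetical layout: f0 = rideId, f1 = driverName, both basic types.
class RideIdTupleKeySelector extends KeySelector[Tuple2[JInt, String], JInt] {
  // Key by the ride id stored in the first tuple field.
  override def getKey(ride: Tuple2[JInt, String]): JInt = ride.f0
}
```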
Here is an example of a key selector, which you will need to adapt to your case:
Then your stream operator can be:
Hope this helps. Cheers.