I'm using Flink 1.1.3 with Cassandra 3.8 and I want to create a CassandraSink for a streaming job, so I have to use a POJO. Here is what I have so far:
public class StreamingJob {
public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
//properties to connect with kafka
DataStream<String> stream = env.addSource(/*kafka connection*/);
CassandraSink.addSink(stream)
.setClusterBuilder(new ClusterBuilder() {
@Override
public Cluster buildCluster(Cluster.Builder builder) {
return builder.addContactPoint("127.0.0.1").build();
}
})
.build();
//print messages
stream.rebalance()
.flatMap(new DeserializeJson())
.filter(new EventFilter())
.<Tuple2<String, String>>project(2, 5)
.keyBy(0)
.print();
// execute program
env.execute("Streaming Job");
}
}
And the POJO for the table:
@Table(keyspace = "flinktest", name = "eventos")
public class Eventos implements Serializable {
@Column(name = "ad_id")
private byte[] adId;
@Column(name = "event_time")
private String eventTime;
public Eventos(byte[] adId, String eventTime){
this.adId = adId;
this.eventTime = eventTime;
}
public byte[] getAdId() {
return adId;
}
public void setAdId(byte[] adId) {
this.adId = adId;
}
public String getEventTime() {
return eventTime;
}
public void setEventTime(String eventTime) {
this.eventTime = eventTime;
}
}
When I run the job without the Cassandra sink, I have no problems and it consumes from Kafka as expected. As soon as I add the sink, I get the following error:
java.lang.RuntimeException: Cannot create CassandraPojoSink with input: String
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:53)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: @Table annotation was not found on type java.lang.String
at com.datastax.driver.mapping.AnnotationChecks.getTypeAnnotation(AnnotationChecks.java:39)
at com.datastax.driver.mapping.AnnotationParser.parseEntity(AnnotationParser.java:50)
at com.datastax.driver.mapping.MappingManager.getMapper(MappingManager.java:154)
at com.datastax.driver.mapping.MappingManager.mapper(MappingManager.java:110)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:51)
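I don't understand why "@Table" causes this problem, since it is imported from another library (DataStax). Do you know how to fix this? It is getting frustrating, because I can't get the job to work with the Cassandra sink and the examples I found didn't help much.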
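A possible reading of the trace, offered as a sketch rather than a confirmed diagnosis: the annotation itself is probably not at fault. The sink is attached to stream, which is a DataStream<String>, so the mapper looks for @Table on java.lang.String and never sees Eventos. The plain-Java example below mimics that lookup and shows one way to build an Eventos from the two projected String fields. The Table annotation declared here is a hypothetical stand-in for the DataStax one (so the sketch compiles without the driver), SinkTypeSketch, requireTable and toEventos are made-up names, and encoding the ad id as UTF-8 bytes is an assumption.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.nio.charset.StandardCharsets;

public class SinkTypeSketch {

    // Hypothetical stand-in for the DataStax @Table annotation, declared here
    // only so the sketch compiles without the driver on the classpath.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    public @interface Table {
        String keyspace();
        String name();
    }

    // Simplified copy of the Eventos POJO from the question.
    @Table(keyspace = "flinktest", name = "eventos")
    public static class Eventos {
        private final byte[] adId;
        private final String eventTime;

        public Eventos(byte[] adId, String eventTime) {
            this.adId = adId;
            this.eventTime = eventTime;
        }

        public byte[] getAdId() { return adId; }
        public String getEventTime() { return eventTime; }
    }

    // Mimics the kind of check that fails in the stack trace: the annotation
    // is looked up on the class of the stream's elements.
    public static void requireTable(Class<?> elementType) {
        if (!elementType.isAnnotationPresent(Table.class)) {
            throw new IllegalArgumentException(
                "@Table annotation was not found on type " + elementType.getName());
        }
    }

    // Assumed conversion from the two projected String fields to the POJO;
    // encoding the ad id as UTF-8 bytes is a guess, not taken from the question.
    public static Eventos toEventos(String adId, String eventTime) {
        return new Eventos(adId.getBytes(StandardCharsets.UTF_8), eventTime);
    }

    public static void main(String[] args) {
        Eventos e = toEventos("ad-1", "2016-11-01T10:00:00");
        requireTable(e.getClass()); // passes: Eventos carries the annotation
        try {
            requireTable(String.class); // element type of DataStream<String>
        } catch (IllegalArgumentException ex) {
            System.out.println(ex.getMessage());
        }
    }
}
```

In the job itself, the same idea would mean adding a map step that turns each record into an Eventos (for example after the project(2, 5) call) and passing the resulting DataStream<Eventos> to CassandraSink.addSink instead of stream. That wiring is not shown here and would need checking against Flink 1.1.3.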