无法创建cassandra pojo接收器

np8igboo  于 2021-06-25  发布在  Flink
关注(0)|答案(0)|浏览(313)

我正在使用flink 1.1.3和cassandra 3.8,我想为精简作业创建一个cassandrasink,所以我必须使用pojo,下面是我得到的:

public class StreamingJob {

    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties properties = new Properties();
    //properties to connect with kafka

    DataStream<String> stream = env.addSource(/*kafka connection*/);

    CassandraSink.addSink(stream)
            .setClusterBuilder(new ClusterBuilder() {
                @Override
                public Cluster buildCluster(Cluster.Builder builder) {
                    return builder.addContactPoint("127.0.0.1").build();
                }
            })
            .build();
    //print messages
    stream.rebalance()
            .flatMap(new DeserializeJson())
            .filter(new EventFilter())
            .<Tuple2<String, String>>project(2, 5)
            .keyBy(0)
            .print();
    // execute program

    env.execute("Streaming Job");
}

table呢

@Table(keyspace = "flinktest", name = "eventos")
public class Eventos implements Serializable {
    @Column(name = "ad_id")
    private byte[] adId;
    @Column(name = "event_time")
    private String eventTime;

    public Eventos(byte[] adId, String eventTime){
        this.adId = adId;
        this.eventTime = eventTime;
    }

    public byte[] getAdId() {
        return adId;
    }

    public void setId(byte[] adId) {
        this.adId = adId;
    }

    public String getEventTime() {
        return eventTime;
    }

    public void setEventTime(String eventTime) {
        this.eventTime = eventTime;
    }
}

当我在没有Cassandra辛的情况下运行它时,我没有任何问题,而且工作通常都是由Kafka提供的。添加Flume时,会出现以下错误:

java.lang.RuntimeException: Cannot create CassandraPojoSink with input: String
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:53)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
at java.lang.Thread.run(Thread.java:745)

Caused by: java.lang.IllegalArgumentException: @Table annotation was not found on type java.lang.String
at com.datastax.driver.mapping.AnnotationChecks.getTypeAnnotation(AnnotationChecks.java:39)
at com.datastax.driver.mapping.AnnotationParser.parseEntity(AnnotationParser.java:50)
at com.datastax.driver.mapping.MappingManager.getMapper(MappingManager.java:154)
at com.datastax.driver.mapping.MappingManager.mapper(MappingManager.java:110)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:51)

我不知道为什么“@table”会导致这个问题,因为它是从另一个库(datastax)导入的。你知道怎么解决这个问题吗?这是越来越烦人,因为我不能让它与CassandraFlume和例子,我发现没有帮助太多。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题