flink twitter流媒体示例不适用于自定义端点

nmpmafwu  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(299)

我在github上扩展了flink连接器,以获取自定义url的twitter流,虽然我能够获得示例代码中给出的随机tweets,但是当我提供自定义url时,tweets不会被提取(控制台和文件上没有打印任何内容)。我已经写了一个customendpoint,如下所示

public class CustomEndPoint implements EndpointInitializer, Serializable{
@Override
public StreamingEndpoint createEndpoint() {
    return new RawEndpoint("https://api.twitter.com/1.1/search/tweets.json?q=%23apple", "GET");     
}}

下面是我如何将我的自定义端点连接到连接器api中给出的twittersource类

TwitterSource twitterSource = new TwitterSource(params.getProperties());
    twitterSource.setCustomEndpointInitializer(new CustomEndPoint());
        streamSource = env.addSource(twitterSource);

同样,当我直接修改twittersource类的代码时,相同的端点也能工作(即将我的端点硬编码为字段变量),但是我不想这样做,因为这不是使用api的最佳方式,而且我失去了在不更改代码的情况下提供不同端点的能力

niwlg2el

niwlg2el1#

这是因为您必须设置twittersource主机属性。您的主机不是flink twitter源客户端使用的默认主机。

props.setProperty(TwitterSource.CLIENT_HOSTS,"https://api.twitter.com");

使用的默认主机是“https://stream.twitter.com“请参阅twittersource类,run方法:

client = new ClientBuilder()
        .name(properties.getProperty(CLIENT_NAME, "flink-twitter-source"))
        .hosts(properties.getProperty(CLIENT_HOSTS, Constants.STREAM_HOST))
        .endpoint(endpoint)
        .authentication(auth)
        .processor(new HosebirdMessageProcessor() {

...

相关问题