java—使用kafka connect api将对象从rest api插入kafka

rsaldnfx  于 2021-06-08  发布在  Kafka
关注(0)|答案(0)|浏览(252)

我在使用kafkaconnectapi开发kafka源连接器时遇到了一些问题。
我使用改造和gson从restapi获取数据,然后尝试将其插入kafka。
这是我的源任务类:

public class BitfinexSourceTask extends SourceTask implements BitfinexTickerGetter.OnTickerReadyListener {
private static final String DATETIME_FIELD = "datetime";

private BitfinexService service;
private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private BlockingQueue<SourceRecord> queue = null;
private BitfinexTickerGetter tickerGetter;

private final Runnable runnable = new Runnable() {
    @Override
    public void run() {
        try {
            tickerGetter.get();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
};
private ScheduledFuture<?> scheduledFuture;

@Override
public String version() {
    return VersionUtil.getVersion();
}

@Override
public void start(Map<String, String> map) {
    service = BitfinexServiceFactory.create();
    queue = new LinkedBlockingQueue<>();

    tickerGetter = new BitfinexTickerGetter(service, this);
    scheduledFuture = scheduler.scheduleAtFixedRate(runnable, 0, 5, TimeUnit.MINUTES);
}

@Override
public List<SourceRecord> poll() throws InterruptedException {
    List<SourceRecord> result = new LinkedList<>();
    if (queue.isEmpty()) result.add(queue.take());
    queue.drainTo(result);
    return result;
}

@Override
public void stop() {
    scheduledFuture.cancel(true);
    scheduler.shutdown();
}

@Override
public void onTickerReady(Ticker ticker) {
    Map<String, ?> srcOffset = Collections.singletonMap(DATETIME_FIELD, ticker.getDatetime());
    Map<String, ?> srcPartition = Collections.singletonMap("from", "bitfinex");

    SourceRecord record = new SourceRecord(srcPartition, srcOffset, ticker.getSymbol(), Schema.STRING_SCHEMA, ticker.getDatetime(), Ticker.SCHEMA, ticker);

    queue.offer(record);
}

}
实际上,我能够构建并添加连接器。它运行时没有任何错误,但是没有创建主题。我决定手动创建主题,然后重新运行连接器,但是主题仍然是空的。 Ticker 是包含字符串和双字段的pojo对象。
有人能帮我吗?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题