我在使用kafkaconnectapi开发kafka源连接器时遇到了一些问题。
我使用改造和gson从restapi获取数据,然后尝试将其插入kafka。
这是我的源任务类:
public class BitfinexSourceTask extends SourceTask implements BitfinexTickerGetter.OnTickerReadyListener {
private static final String DATETIME_FIELD = "datetime";
private BitfinexService service;
private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private BlockingQueue<SourceRecord> queue = null;
private BitfinexTickerGetter tickerGetter;
private final Runnable runnable = new Runnable() {
@Override
public void run() {
try {
tickerGetter.get();
} catch (IOException e) {
e.printStackTrace();
}
}
};
private ScheduledFuture<?> scheduledFuture;
@Override
public String version() {
return VersionUtil.getVersion();
}
@Override
public void start(Map<String, String> map) {
service = BitfinexServiceFactory.create();
queue = new LinkedBlockingQueue<>();
tickerGetter = new BitfinexTickerGetter(service, this);
scheduledFuture = scheduler.scheduleAtFixedRate(runnable, 0, 5, TimeUnit.MINUTES);
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
List<SourceRecord> result = new LinkedList<>();
if (queue.isEmpty()) result.add(queue.take());
queue.drainTo(result);
return result;
}
@Override
public void stop() {
scheduledFuture.cancel(true);
scheduler.shutdown();
}
@Override
public void onTickerReady(Ticker ticker) {
Map<String, ?> srcOffset = Collections.singletonMap(DATETIME_FIELD, ticker.getDatetime());
Map<String, ?> srcPartition = Collections.singletonMap("from", "bitfinex");
SourceRecord record = new SourceRecord(srcPartition, srcOffset, ticker.getSymbol(), Schema.STRING_SCHEMA, ticker.getDatetime(), Ticker.SCHEMA, ticker);
queue.offer(record);
}
}
实际上,我能够构建并添加连接器。它运行时没有任何错误,但是没有创建主题。我决定手动创建主题,然后重新运行连接器,但是主题仍然是空的。 Ticker
是包含字符串和双字段的pojo对象。
有人能帮我吗?
暂无答案!
目前还没有任何答案,快来回答吧!