如何在 Apache Flink 上编写自定义数据源(custom source)

mitkmikd  于 2021-06-25  发布在  Flink
关注(0)|答案(0)|浏览(284)

我想编写一个数据源,用来读取来自 Tarantool Java 客户端(https://github.com/tarantool/tarantool-java)的数据流。
有谁能给我一份关于如何用自定义方法编写数据源的指南吗?
这是我的代码:

  1. package tarantooljava.streaming.flink_connecter;
  2. import org.apache.flink.configuration.Configuration;
  3. import org.tarantool.TarantoolConnection16;
  4. import org.tarantool.TarantoolConnection16Impl;
  5. import splunk.test.TestSchema;
  6. import java.io.IOException;
  7. import java.util.Arrays;
  8. import java.util.List;
  9. import static java.util.Objects.requireNonNull;
  10. /**
  11. * Created by jaryzhen on 16/4/19.
  12. */
  13. public class FlinkTarantoolJavaSpace<T> extends FlinkTarantoolJavaSpaceBase<T>{
  14. private ConsumerThread<T> consumerThread;
  15. public FlinkTarantoolJavaSpace(String ip, int port, String user, String pwd) throws IOException {
  16. FlinkTarantoolJavaSpace(ip,port,user,pwd,11);
  17. }
  18. public List<T> FlinkTarantoolJavaSpace(String ip, int port, String user, String pwd, int a) throws IOException {
  19. requireNonNull(ip, "topics");
  20. TarantoolConnection16 con = new TarantoolConnection16Impl(ip, port);
  21. con.auth(user, pwd);
  22. final TestSchema schema = con.schema(new TestSchema());
  23. List select0 = null;
  24. for (int i=0 ; i <100 ; i=i+2) {
  25. select0 = con.select(schema.tester.id, schema.tester.primary, Arrays.asList(i), 0, 30, 0);
  26. //System.out.println("select0:" +i+ select0);
  27. }
  28. // System.out.println(a.size());
  29. // System.out.println(a.get(0));
  30. con.close();
  31. return select0;
  32. }
  33. @Override
  34. public void open(Configuration parameters) throws Exception {
  35. super.open(parameters);
  36. }
  37. @Override
  38. public void run(SourceContext<T> sourceContext) throws Exception {
  39. consumerThread = new ConsumerThread<>(this, sourceContext);
  40. }
  41. @Override
  42. public void cancel() {
  43. // set ourselves as not running
  44. boolean running = false;
  45. if(true) {
  46. } else {
  47. // the consumer thread is not running, so we have to interrupt our own thread
  48. }
  49. }
  50. @Override
  51. public void close() throws Exception {
  52. cancel();
  53. super.close();
  54. }
  55. // ------------------------------------------------------------------------
  56. // Checkpoint and restore
  57. // ----------------------------------------------------------------
  58. private static class ConsumerThread<T> extends Thread {
  59. private FlinkTarantoolJavaSpace<T> flinConsumer;
  60. private SourceContext<T> sourceContext;
  61. private boolean running = true;
  62. public ConsumerThread(FlinkTarantoolJavaSpace<T> flinkConsumer, SourceContext<T> sourceContext) {
  63. this.sourceContext = sourceContext;
  64. this.flinkConsumer=flinkConsumer;
  65. }
  66. @Override
  67. public void run() {
  68. }
  69. /**
  70. * Try to shutdown the thread
  71. */
  72. public void shutdown() {
  73. this.running = false;
  74. }
  75. }

}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题