Usage of the org.apache.flink.types.Row.of() method, with code examples

Reposted by x33g5p2x on 2022-01-29

This article collects Java code examples for the org.apache.flink.types.Row.of method, showing how Row.of is used in practice. The examples were gathered from platforms such as GitHub, Stack Overflow, and Maven, extracted from a selection of projects, and should be of real reference value. Details of the Row.of method:
Package: org.apache.flink.types.Row
Class: Row
Method: of

Introduction to Row.of

Creates a new Row and assigns the given values to the Row's fields. This is more convenient than using the constructor.

For example:

Row.of("hello", true, 1L);

instead of

Row row = new Row(3); 
row.setField(0, "hello"); 
row.setField(1, true); 
row.setField(2, 1L);

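Putting the two snippets above together, a minimal self-contained sketch (the class name RowOfDemo is ours; it assumes flink-core on the classpath) could look like this:

```java
import org.apache.flink.types.Row;

public class RowOfDemo {
    public static void main(String[] args) {
        // Row.of infers the arity from the number of arguments,
        // so no explicit new Row(3) / setField calls are needed
        Row row = Row.of("hello", true, 1L);
        System.out.println(row.getArity());   // number of fields
        System.out.println(row.getField(0));  // first field by position
    }
}
```

Fields are accessed positionally with getField(int), mirroring the setField(int, Object) calls in the constructor-based version.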

Code Examples

Code example source: apache/flink

@Override
public void run(SourceContext<Row> ctx) throws Exception {
  long offsetMS = offsetSeconds * 2000L;
  while (ms < durationMs) {
    synchronized (ctx.getCheckpointLock()) {
      for (int i = 0; i < numKeys; i++) {
        ctx.collect(Row.of(i, ms + offsetMS, "Some payload..."));
      }
      ms += sleepMs;
    }
    Thread.sleep(sleepMs);
  }
}

Code example source: apache/flink

@Test
public void testRowRegisterRowWithNames() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
  StreamITCase.clear();
  List<Row> data = new ArrayList<>();
  data.add(Row.of(1, 1L, "Hi"));
  data.add(Row.of(2, 2L, "Hello"));
  data.add(Row.of(3, 2L, "Hello world"));
  TypeInformation<?>[] types = {
      BasicTypeInfo.INT_TYPE_INFO,
      BasicTypeInfo.LONG_TYPE_INFO,
      BasicTypeInfo.STRING_TYPE_INFO};
  String[] names = {"a", "b", "c"};
  RowTypeInfo typeInfo = new RowTypeInfo(types, names);
  DataStream<Row> ds = env.fromCollection(data).returns(typeInfo);
  Table in = tableEnv.fromDataStream(ds, "a,b,c");
  tableEnv.registerTable("MyTableRow", in);
  String sqlQuery = "SELECT a,c FROM MyTableRow";
  Table result = tableEnv.sqlQuery(sqlQuery);
  DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
  resultSet.addSink(new StreamITCase.StringSink<Row>());
  env.execute();
  List<String> expected = new ArrayList<>();
  expected.add("1,Hi");
  expected.add("2,Hello");
  expected.add("3,Hello world");
  StreamITCase.compareWithList(expected);
}

Code example source: apache/flink

@Test(expected = TableException.class)
public void testGenericRowWithAlias() throws Exception {
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
  // use a null value to enforce GenericTypeInfo
  DataSet<Row> dataSet = env.fromElements(Row.of((Integer) null));
  assertTrue(dataSet.getType() instanceof GenericTypeInfo);
  assertTrue(dataSet.getType().getTypeClass().equals(Row.class));
  // Must fail. Cannot import DataSet<Row> with GenericTypeInfo.
  tableEnv.fromDataSet(dataSet, "nullField");
}

Code example source: apache/flink

@Test(expected = TableException.class)
public void testGenericRow() throws Exception {
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
  // use a null value to enforce GenericTypeInfo
  DataSet<Row> dataSet = env.fromElements(Row.of(1, 2L, "Hello", null));
  assertTrue(dataSet.getType() instanceof GenericTypeInfo);
  assertTrue(dataSet.getType().getTypeClass().equals(Row.class));
  // Must fail. Cannot import DataSet<Row> with GenericTypeInfo.
  tableEnv.fromDataSet(dataSet);
}
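The two tests above are expected to fail because a Row containing a null field makes fromElements fall back to GenericTypeInfo, which the Table API rejects. A sketch of the usual workaround (the class name RowTypeInfoDemo is ours; it assumes flink-java on the classpath) is to pass an explicit RowTypeInfo to fromCollection:

```java
import java.util.Arrays;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

public class RowTypeInfoDemo {
    public static void main(String[] args) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Declaring the field types explicitly keeps the DataSet typed as
        // RowTypeInfo even though one field is null
        RowTypeInfo typeInfo = new RowTypeInfo(
            BasicTypeInfo.INT_TYPE_INFO,
            BasicTypeInfo.LONG_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO);
        DataSet<Row> dataSet = env.fromCollection(
            Arrays.asList(Row.of(1, 2L, "Hello", null)), typeInfo);
        System.out.println(dataSet.getType() instanceof RowTypeInfo);
    }
}
```

With the explicit type information in place, fromDataSet no longer throws, since the type is no longer generic.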

Code example source: apache/flink

@Test
public void testRowOf() {
  Row row1 = Row.of(1, "hello", null, Tuple2.of(2L, "hi"), true);
  Row row2 = new Row(5);
  row2.setField(0, 1);
  row2.setField(1, "hello");
  row2.setField(2, null);
  row2.setField(3, new Tuple2<>(2L, "hi"));
  row2.setField(4, true);
  assertEquals(row1, row2);
}

Code example source: dataArtisans/flink-training-exercises

@Override
  public Row map(TaxiFare fare) throws Exception {
    return Row.of(
        fare.rideId,
        fare.taxiId,
        fare.driverId,
        fare.paymentType,
        fare.tip,
        fare.tolls,
        fare.totalFare);
  }
}

Code example source: dataArtisans/flink-training-exercises

@Override
  public Row map(TaxiRide ride) throws Exception {
    return Row.of(
        ride.rideId,
        ride.taxiId,
        ride.driverId,
        ride.isStart,
        ride.startLon,
        ride.startLat,
        ride.endLon,
        ride.endLat,
        ride.passengerCnt);
  }
}

Code example source: harbby/sylph

@Override
public Row deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
{
  return Row.of(
      topic, //topic
      messageKey == null ? null : new String(messageKey, UTF_8), //key
      new String(message, UTF_8), //message
      partition,
      offset
  );
}


Code example source: haoch/flink-siddhi

private Row buildRow(Event event) {
  return Row.of(event.getData());
}

Code example source: harbby/sylph

@Override
public void run(SourceContext<Row> sourceContext)
    throws Exception
{
  Random random = new Random(1000000);
  int numKeys = 10;
  while (running) {
    java.time.LocalDate date = java.time.LocalDate.now();
    java.sql.Date now = java.sql.Date.valueOf(date);
    String msg = "https://github.com/harbby/sylph/" + random.nextLong();
    Row row = Row.of("github.com" + random.nextLong(), msg, now);
    sourceContext.collect(row);
  }
}

Code example source: harbby/sylph

@Override
public void run(SourceContext<Row> sourceContext)
    throws Exception
{
  Random random = new Random();
  int numKeys = 10;
  long count = 1L;
  while (running) {
    long eventTime = System.currentTimeMillis() - random.nextInt(10 * 1000); // the event has already occurred, but arrives with up to 10 seconds of delay
    String userId = "uid_" + count;
    String msg = MAPPER.writeValueAsString(ImmutableMap.of("user_id", userId, "ip", "127.0.0.1", "store", 12.0));
    Row row = Row.of("key" + random.nextInt(10), msg, eventTime);
    sourceContext.collect(row);
    count = count > numKeys ? 1L : count + 1;
    TimeUnit.MILLISECONDS.sleep(100);
  }
}
