本文整理了Java中org.apache.flink.types.Row.of
方法的一些代码示例,展示了Row.of
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Row.of
方法的具体详情如下:
包路径:org.apache.flink.types.Row
类名称:Row
方法名:of
[英]Creates a new Row and assigns the given values to the Row's fields. This is more convenient than using the constructor.
For example:
Row.of("hello", true, 1L);}
instead of
Row row = new Row(3);
row.setField(0, "hello");
row.setField(1, true);
row.setField(2, 1L);
[中]创建新行,并将给定值指定给该行的字段。这比使用构造函数更方便。
例如:
<<<0$>>而不是<<1$>>
代码示例来源:origin: apache/flink
@Override
public void run(SourceContext<Row> ctx) throws Exception {
long offsetMS = offsetSeconds * 2000L;
while (ms < durationMs) {
synchronized (ctx.getCheckpointLock()) {
for (int i = 0; i < numKeys; i++) {
ctx.collect(Row.of(i, ms + offsetMS, "Some payload..."));
}
ms += sleepMs;
}
Thread.sleep(sleepMs);
}
}
代码示例来源:origin: apache/flink
@Test
public void testRowRegisterRowWithNames() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
StreamITCase.clear();
List<Row> data = new ArrayList<>();
data.add(Row.of(1, 1L, "Hi"));
data.add(Row.of(2, 2L, "Hello"));
data.add(Row.of(3, 2L, "Hello world"));
TypeInformation<?>[] types = {
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.LONG_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO};
String[] names = {"a", "b", "c"};
RowTypeInfo typeInfo = new RowTypeInfo(types, names);
DataStream<Row> ds = env.fromCollection(data).returns(typeInfo);
Table in = tableEnv.fromDataStream(ds, "a,b,c");
tableEnv.registerTable("MyTableRow", in);
String sqlQuery = "SELECT a,c FROM MyTableRow";
Table result = tableEnv.sqlQuery(sqlQuery);
DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
resultSet.addSink(new StreamITCase.StringSink<Row>());
env.execute();
List<String> expected = new ArrayList<>();
expected.add("1,Hi");
expected.add("2,Hello");
expected.add("3,Hello world");
StreamITCase.compareWithList(expected);
}
代码示例来源:origin: apache/flink
@Test(expected = TableException.class)
public void testGenericRowWithAlias() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
// use null value the enforce GenericType
DataSet<Row> dataSet = env.fromElements(Row.of((Integer) null));
assertTrue(dataSet.getType() instanceof GenericTypeInfo);
assertTrue(dataSet.getType().getTypeClass().equals(Row.class));
// Must fail. Cannot import DataSet<Row> with GenericTypeInfo.
tableEnv.fromDataSet(dataSet, "nullField");
}
代码示例来源:origin: apache/flink
@Test(expected = TableException.class)
public void testGenericRow() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
// use null value the enforce GenericType
DataSet<Row> dataSet = env.fromElements(Row.of(1, 2L, "Hello", null));
assertTrue(dataSet.getType() instanceof GenericTypeInfo);
assertTrue(dataSet.getType().getTypeClass().equals(Row.class));
// Must fail. Cannot import DataSet<Row> with GenericTypeInfo.
tableEnv.fromDataSet(dataSet);
}
代码示例来源:origin: apache/flink
@Test
public void testRowOf() {
Row row1 = Row.of(1, "hello", null, Tuple2.of(2L, "hi"), true);
Row row2 = new Row(5);
row2.setField(0, 1);
row2.setField(1, "hello");
row2.setField(2, null);
row2.setField(3, new Tuple2<>(2L, "hi"));
row2.setField(4, true);
assertEquals(row1, row2);
}
代码示例来源:origin: dataArtisans/flink-training-exercises
@Override
public Row map(TaxiFare fare) throws Exception {
return Row.of(
fare.rideId,
fare.taxiId,
fare.driverId,
fare.paymentType,
fare.tip,
fare.tolls,
fare.totalFare);
}
}
代码示例来源:origin: dataArtisans/flink-training-exercises
@Override
public Row map(TaxiRide ride) throws Exception {
return Row.of(
ride.rideId,
ride.taxiId,
ride.driverId,
ride.isStart,
ride.startLon,
ride.startLat,
ride.endLon,
ride.endLat,
ride.passengerCnt);
}
}
代码示例来源:origin: harbby/sylph
@Override
public Row deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
{
return Row.of(
topic, //topic
messageKey == null ? null : new String(messageKey, UTF_8), //key
new String(message, UTF_8), //message
partition,
offset
);
}
代码示例来源:origin: harbby/sylph
@Override
public Row deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
{
return Row.of(
topic, //topic
messageKey == null ? null : new String(messageKey, UTF_8), //key
new String(message, UTF_8), //message
partition,
offset
);
}
代码示例来源:origin: haoch/flink-siddhi
private Row buildRow(Event event) {
return Row.of(event.getData());
}
代码示例来源:origin: harbby/sylph
@Override
public void run(SourceContext<Row> sourceContext)
throws Exception
{
Random random = new Random(1000000);
int numKeys = 10;
while (running) {
java.time.LocalDate date = java.time.LocalDate.now();
java.sql.Date now = java.sql.Date.valueOf(date);
String msg = "https://github.com/harbby/sylph/" + random.nextLong();
Row row = Row.of("github.com" + random.nextLong(), msg, now);
sourceContext.collect(row);
}
}
代码示例来源:origin: harbby/sylph
@Override
public void run(SourceContext<Row> sourceContext)
throws Exception
{
Random random = new Random();
int numKeys = 10;
long count = 1L;
while (running) {
long eventTime = System.currentTimeMillis() - random.nextInt(10 * 1000); //表示数据已经产生了 但是会有10秒以内的延迟
String userId = "uid_" + count;
String msg = MAPPER.writeValueAsString(ImmutableMap.of("user_id", userId, "ip", "127.0.0.1", "store", 12.0));
Row row = Row.of("key" + random.nextInt(10), msg, eventTime);
sourceContext.collect(row);
count = count > numKeys ? 1L : count + 1;
TimeUnit.MILLISECONDS.sleep(100);
}
}
内容来源于网络,如有侵权,请联系作者删除!