org.apache.samza.operators.windows.WindowKey类的使用及代码示例

x33g5p2x  于2022-02-03 转载在 其他  
字(13.2k)|赞(0)|评价(0)|浏览(126)

本文整理了Java中org.apache.samza.operators.windows.WindowKey类的一些代码示例,展示了WindowKey类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。WindowKey类的具体详情如下:
包路径:org.apache.samza.operators.windows.WindowKey
类名称:WindowKey

WindowKey介绍

[英]Key for a WindowPane emitted from a Window.
[中]从窗口发出的窗格的键。

代码示例

代码示例来源:origin: apache/samza

/**
 * Checks that a {@link WindowPane} hands back the key and the message it was constructed with.
 */
@Test
 public void testConstructor() {
  WindowPane<String, Integer> wndOutput = new WindowPane<>(new WindowKey<>("testMsg", null), 10, AccumulationMode.DISCARDING, FiringType.EARLY);
  // Expected value goes first so a failure message is not inverted
  // (assumes assertEquals is JUnit's (expected, actual) variant - confirm against the imports).
  assertEquals("testMsg", wndOutput.getKey().getKey());
  assertEquals(Integer.valueOf(10), wndOutput.getMessage());
 }
}

代码示例来源:origin: org.apache.samza/samza-core_2.12

/**
 * Computes the pane output corresponding to a {@link TriggerKey} that fired.
 *
 * <p>The pane's {@link WindowKey} is built from the trigger key's key and its timestamp
 * rendered with {@link Long#toString(long)}.
 *
 * @param triggerKey the key of the trigger that fired
 * @param windowVal the value to emit as the pane's message
 * @return a pane carrying {@code windowVal}, the window's accumulation mode and the trigger key's type
 */
private WindowPane<K, Object> computePaneOutput(TriggerKey<K> triggerKey, Object windowVal) {
 // Diamond operator instead of the raw WindowKey type, so the assignment is type-checked.
 WindowKey<K> windowKey = new WindowKey<>(triggerKey.getKey(), Long.toString(triggerKey.getTimestamp()));
 WindowPane<K, Object> paneOutput =
   new WindowPane<>(windowKey, windowVal, window.getAccumulationMode(), triggerKey.getType());
 LOG.trace("Emitting pane output for trigger key {}", triggerKey);
 return paneOutput;
}

代码示例来源:origin: apache/samza

/**
 * Feeds {@code integers} through a keyed tumbling window (1 second, repeating count-of-2 trigger)
 * in {@link AccumulationMode#DISCARDING} mode, advances the clock by one second, fires the window
 * and checks the key and message count of each of the five emitted panes.
 */
@Test
public void testTumblingWindowsDiscardingMode() throws Exception {
 OperatorSpecGraph sgb = this.getKeyedTumblingWindowStreamGraph(AccumulationMode.DISCARDING,
   Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2))).getOperatorSpecGraph();
 List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
 TestClock testClock = new TestClock();
 StreamOperatorTask task = new StreamOperatorTask(sgb, testClock);
 task.init(this.context);
 // Collects every emitted pane so the assertions below can inspect them in emission order.
 MessageCollector messageCollector =
   envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
 integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator));
 testClock.advanceTime(Duration.ofSeconds(1));
 task.window(messageCollector, taskCoordinator);
 // Expected values come first (assumes JUnit's (expected, actual) order - confirm against the imports);
 // Integer.valueOf replaces the deprecated Integer(int) constructor.
 Assert.assertEquals(5, windowPanes.size());
 Assert.assertEquals(Integer.valueOf(1), windowPanes.get(0).getKey().getKey());
 Assert.assertEquals(2, windowPanes.get(0).getMessage().size());
 Assert.assertEquals(Integer.valueOf(2), windowPanes.get(1).getKey().getKey());
 Assert.assertEquals(2, windowPanes.get(1).getMessage().size());
 Assert.assertEquals(Integer.valueOf(1), windowPanes.get(2).getKey().getKey());
 Assert.assertEquals(2, windowPanes.get(2).getMessage().size());
 Assert.assertEquals(Integer.valueOf(2), windowPanes.get(3).getKey().getKey());
 Assert.assertEquals(2, windowPanes.get(3).getMessage().size());
 Assert.assertEquals(Integer.valueOf(3), windowPanes.get(4).getKey().getKey());
 Assert.assertEquals(1, windowPanes.get(4).getMessage().size());
}

代码示例来源:origin: apache/samza

// NOTE(review): excerpt from a larger test; statements between these assertions appear to be
// elided, so their order and repetition are kept exactly as found.
// Expected values come first (assumes JUnit's (expected, actual) order - confirm against the imports);
// Integer.valueOf replaces the deprecated Integer(int) constructor.
task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
Assert.assertEquals(1, windowPanes.size());
Assert.assertEquals("0", windowPanes.get(0).getKey().getPaneId());
Assert.assertEquals(Integer.valueOf(1), windowPanes.get(0).getKey().getKey());
Assert.assertEquals(FiringType.EARLY, windowPanes.get(0).getFiringType());
Assert.assertEquals("0", windowPanes.get(0).getKey().getPaneId());
Assert.assertEquals("0", windowPanes.get(1).getKey().getPaneId());
Assert.assertEquals(FiringType.DEFAULT, windowPanes.get(1).getFiringType());
Assert.assertEquals(Integer.valueOf(3), windowPanes.get(2).getKey().getKey());
Assert.assertEquals("1000", windowPanes.get(2).getKey().getPaneId());
Assert.assertEquals(FiringType.DEFAULT, windowPanes.get(2).getFiringType());
Assert.assertEquals(1, windowPanes.get(2).getMessage().size());

代码示例来源:origin: org.apache.samza/samza-core_2.11

/**
 * Computes the pane output corresponding to a {@link TriggerKey} that fired.
 *
 * <p>The pane's {@link WindowKey} is built from the trigger key's key and its timestamp
 * rendered with {@link Long#toString(long)}.
 *
 * @param triggerKey the key of the trigger that fired
 * @param windowVal the value to emit as the pane's message
 * @return a pane carrying {@code windowVal}, the window's accumulation mode and the trigger key's type
 */
private WindowPane<K, Object> computePaneOutput(TriggerKey<K> triggerKey, Object windowVal) {
 // Diamond operator instead of the raw WindowKey type, so the assignment is type-checked.
 WindowKey<K> windowKey = new WindowKey<>(triggerKey.getKey(), Long.toString(triggerKey.getTimestamp()));
 WindowPane<K, Object> paneOutput =
   new WindowPane<>(windowKey, windowVal, window.getAccumulationMode(), triggerKey.getType());
 LOG.trace("Emitting pane output for trigger key {}", triggerKey);
 return paneOutput;
}

代码示例来源:origin: apache/samza

/**
 * Feeds {@code integers} through a keyed tumbling window (1 second, repeating count-of-2 trigger)
 * in {@link AccumulationMode#ACCUMULATING} mode, advances the clock by one second, fires the
 * window and checks the total pane count plus the key and message count of the first four panes.
 */
@Test
public void testTumblingWindowsAccumulatingMode() throws Exception {
 OperatorSpecGraph sgb = this.getKeyedTumblingWindowStreamGraph(AccumulationMode.ACCUMULATING,
   Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2))).getOperatorSpecGraph();
 List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
 TestClock testClock = new TestClock();
 StreamOperatorTask task = new StreamOperatorTask(sgb, testClock);
 task.init(this.context);
 // Collects every emitted pane so the assertions below can inspect them in emission order.
 MessageCollector messageCollector =
   envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
 integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator));
 testClock.advanceTime(Duration.ofSeconds(1));
 task.window(messageCollector, taskCoordinator);
 // Expected values come first (assumes JUnit's (expected, actual) order - confirm against the imports);
 // Integer.valueOf replaces the deprecated Integer(int) constructor.
 Assert.assertEquals(7, windowPanes.size());
 Assert.assertEquals(Integer.valueOf(1), windowPanes.get(0).getKey().getKey());
 Assert.assertEquals(2, windowPanes.get(0).getMessage().size());
 Assert.assertEquals(Integer.valueOf(2), windowPanes.get(1).getKey().getKey());
 Assert.assertEquals(2, windowPanes.get(1).getMessage().size());
 Assert.assertEquals(Integer.valueOf(1), windowPanes.get(2).getKey().getKey());
 Assert.assertEquals(4, windowPanes.get(2).getMessage().size());
 Assert.assertEquals(Integer.valueOf(2), windowPanes.get(3).getKey().getKey());
 Assert.assertEquals(4, windowPanes.get(3).getMessage().size());
}

代码示例来源:origin: apache/samza

// NOTE(review): excerpt from a larger test; statements between these assertions appear to be
// elided, so their order and repetition are kept exactly as found.
// Expected values come first (assumes JUnit's (expected, actual) order - confirm against the imports);
// Integer.valueOf replaces the deprecated Integer(int) constructor.
Assert.assertEquals("1", windowPanes.get(0).getKey().getPaneId());
Assert.assertEquals(Integer.valueOf(1), windowPanes.get(0).getKey().getKey());
Assert.assertEquals("1", windowPanes.get(0).getKey().getPaneId());
Assert.assertEquals("1001", windowPanes.get(1).getKey().getPaneId());
Assert.assertEquals("1001", windowPanes.get(2).getKey().getPaneId());
Assert.assertEquals(2, windowPanes.get(0).getMessage().size());
Assert.assertEquals(2, windowPanes.get(1).getMessage().size());
task.window(messageCollector, taskCoordinator);
Assert.assertEquals(4, windowPanes.size());
Assert.assertEquals(Integer.valueOf(2), windowPanes.get(3).getKey().getKey());
Assert.assertEquals("2001", windowPanes.get(3).getKey().getPaneId());
Assert.assertEquals(2, windowPanes.get(3).getMessage().size());

代码示例来源:origin: apache/samza

/**
 * Computes the pane output corresponding to a {@link TriggerKey} that fired.
 *
 * <p>The pane's {@link WindowKey} is built from the trigger key's key and its timestamp
 * rendered with {@link Long#toString(long)}.
 *
 * @param triggerKey the key of the trigger that fired
 * @param windowVal the value to emit as the pane's message
 * @return a pane carrying {@code windowVal}, the window's accumulation mode and the trigger key's type
 */
private WindowPane<K, Object> computePaneOutput(TriggerKey<K> triggerKey, Object windowVal) {
 // Diamond operator instead of the raw WindowKey type, so the assignment is type-checked.
 WindowKey<K> windowKey = new WindowKey<>(triggerKey.getKey(), Long.toString(triggerKey.getTimestamp()));
 WindowPane<K, Object> paneOutput =
   new WindowPane<>(windowKey, windowVal, window.getAccumulationMode(), triggerKey.getType());
 LOG.trace("Emitting pane output for trigger key {}", triggerKey);
 return paneOutput;
}

代码示例来源:origin: org.apache.samza/samza-sql

/**
 * Translates a {@link LogicalAggregate} into a keyed tumbling window over its input stream and
 * registers the resulting stream in the translator context under the aggregate's id.
 *
 * <p>The window is keyed by the whole message, counts messages per key, and runs in
 * {@link AccumulationMode#DISCARDING} mode. Each emitted pane becomes a new
 * {@link SamzaSqlRelMessage} made of the key record's fields plus the first aggregate field
 * name paired with the pane's count.
 *
 * @param aggregate the aggregate node to translate
 * @param context the translator context supplying the input stream and receiving the output stream
 */
void translate(final LogicalAggregate aggregate, final TranslatorContext context) {
 validateAggregateFunctions(aggregate);
 MessageStream<SamzaSqlRelMessage> inputStream = context.getMessageStream(aggregate.getInput().getId());
 // At this point, the assumption is that only count function is supported.
 SupplierFunction<Long> initialValue = () -> 0L;
 FoldLeftFunction<SamzaSqlRelMessage, Long> foldCountFn = (m, c) -> c + 1;
 final ArrayList<String> aggFieldNames = getAggFieldNames(aggregate);
 MessageStream<SamzaSqlRelMessage> outputStream =
   inputStream
     .window(Windows.keyedTumblingWindow(m -> m,
       Duration.ofMillis(context.getExecutionContext().getSamzaSqlApplicationConfig().getWindowDurationMs()),
       initialValue,
       foldCountFn,
       new SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde(),
       new LongSerde())
       .setAccumulationMode(AccumulationMode.DISCARDING), "tumblingWindow_" + windowId)
     .map(windowPane -> {
       SamzaSqlRelMessage keyMessage = windowPane.getKey().getKey();
       // Copy before appending: adding to the lists returned by the key's record would
       // modify the window key itself (presumably live lists - verify against SamzaSqlRelRecord).
       List<String> fieldNames = new ArrayList<>(keyMessage.getSamzaSqlRelRecord().getFieldNames());
       List<Object> fieldValues = new ArrayList<>(keyMessage.getSamzaSqlRelRecord().getFieldValues());
       fieldNames.add(aggFieldNames.get(0));
       fieldValues.add(windowPane.getMessage());
       return new SamzaSqlRelMessage(fieldNames, fieldValues);
      });
 context.registerMessageStream(aggregate.getId(), outputStream);
}

代码示例来源:origin: apache/samza

// NOTE(review): excerpt from a larger test; statements between these assertions appear to be
// elided, so their order is kept exactly as found.
// Expected values come first (assumes JUnit's (expected, actual) order - confirm against the imports);
// Integer.valueOf replaces the deprecated Integer(int) constructor.
Assert.assertEquals(Integer.valueOf(1), windowPanes.get(1).getKey().getKey());
Assert.assertEquals("0", windowPanes.get(1).getKey().getPaneId());
Assert.assertEquals(5, windowPanes.get(1).getMessage().size());
Assert.assertEquals(Integer.valueOf(1), windowPanes.get(2).getKey().getKey());
Assert.assertEquals("1000", windowPanes.get(2).getKey().getPaneId());
Assert.assertEquals(4, windowPanes.size());
Assert.assertEquals(FiringType.DEFAULT, windowPanes.get(3).getFiringType());
Assert.assertEquals(Integer.valueOf(1), windowPanes.get(3).getKey().getKey());
Assert.assertEquals("1000", windowPanes.get(3).getKey().getPaneId());

代码示例来源:origin: org.apache.samza/samza-core_2.10

/**
 * Computes the pane output corresponding to a {@link TriggerKey} that fired.
 *
 * <p>The pane's {@link WindowKey} is built from the trigger key's key and its timestamp
 * rendered with {@link Long#toString(long)}.
 *
 * @param triggerKey the key of the trigger that fired
 * @param windowVal the value to emit as the pane's message
 * @return a pane carrying {@code windowVal}, the window's accumulation mode and the trigger key's type
 */
private WindowPane<K, Object> computePaneOutput(TriggerKey<K> triggerKey, Object windowVal) {
 // Diamond operator instead of the raw WindowKey type, so the assignment is type-checked.
 WindowKey<K> windowKey = new WindowKey<>(triggerKey.getKey(), Long.toString(triggerKey.getTimestamp()));
 WindowPane<K, Object> paneOutput =
   new WindowPane<>(windowKey, windowVal, window.getAccumulationMode(), triggerKey.getType());
 LOG.trace("Emitting pane output for trigger key {}", triggerKey);
 return paneOutput;
}

代码示例来源:origin: apache/samza

/**
 * Translates a {@link LogicalAggregate} into a keyed tumbling window over its input stream and
 * registers the resulting stream in the translator context under the aggregate's id.
 *
 * <p>The input first passes through a {@code TranslatorInputMetricsMapFunction}. The window is
 * keyed by the whole message, counts messages per key, and runs in
 * {@link AccumulationMode#DISCARDING} mode. Each emitted pane becomes a new
 * {@link SamzaSqlRelMessage} made of the key record's fields plus the first aggregate field
 * name paired with the pane's count.
 *
 * @param aggregate the aggregate node to translate
 * @param context the translator context supplying the input stream and receiving the output stream
 */
void translate(final LogicalAggregate aggregate, final TranslatorContext context) {
 validateAggregateFunctions(aggregate);
 MessageStream<SamzaSqlRelMessage> inputStream = context.getMessageStream(aggregate.getInput().getId());
 // At this point, the assumption is that only count function is supported.
 SupplierFunction<Long> initialValue = () -> 0L;
 FoldLeftFunction<SamzaSqlRelMessage, Long> foldCountFn = (m, c) -> c + 1;
 final ArrayList<String> aggFieldNames = getAggFieldNames(aggregate);
 MessageStream<SamzaSqlRelMessage> outputStream =
   inputStream
     .map(new TranslatorInputMetricsMapFunction(logicalOpId))
     .window(Windows.keyedTumblingWindow(m -> m,
       Duration.ofMillis(context.getExecutionContext().getSamzaSqlApplicationConfig().getWindowDurationMs()),
       initialValue,
       foldCountFn,
       new SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde(),
       new LongSerde())
       .setAccumulationMode(
         AccumulationMode.DISCARDING), changeLogStorePrefix + "_tumblingWindow_" + logicalOpId)
     .map(windowPane -> {
       SamzaSqlRelMessage keyMessage = windowPane.getKey().getKey();
       // Copy before appending: adding to the lists returned by the key's record would
       // modify the window key itself (presumably live lists - verify against SamzaSqlRelRecord).
       List<String> fieldNames = new ArrayList<>(keyMessage.getSamzaSqlRelRecord().getFieldNames());
       List<Object> fieldValues = new ArrayList<>(keyMessage.getSamzaSqlRelRecord().getFieldValues());
       fieldNames.add(aggFieldNames.get(0));
       fieldValues.add(windowPane.getMessage());
       return new SamzaSqlRelMessage(fieldNames, fieldValues, new SamzaSqlRelMsgMetadata("", "", ""));
      });
 context.registerMessageStream(aggregate.getId(), outputStream);
 // NOTE(review): the stream returned by this map is discarded and the registered stream is the
 // one without the output-metrics function; presumably the call attaches a side branch - confirm.
 outputStream.map(new TranslatorOutputMetricsMapFunction(logicalOpId));
}

代码示例来源:origin: org.apache.samza/samza-core

/**
 * Computes the pane output corresponding to a {@link TriggerKey} that fired.
 *
 * <p>The pane's {@link WindowKey} is built from the trigger key's key and its timestamp
 * rendered with {@link Long#toString(long)}.
 *
 * @param triggerKey the key of the trigger that fired
 * @param windowVal the value to emit as the pane's message
 * @return a pane carrying {@code windowVal}, the window's accumulation mode and the trigger key's type
 */
private WindowPane<K, Object> computePaneOutput(TriggerKey<K> triggerKey, Object windowVal) {
 // Diamond operator instead of the raw WindowKey type, so the assignment is type-checked.
 WindowKey<K> windowKey = new WindowKey<>(triggerKey.getKey(), Long.toString(triggerKey.getTimestamp()));
 WindowPane<K, Object> paneOutput =
   new WindowPane<>(windowKey, windowVal, window.getAccumulationMode(), triggerKey.getType());
 LOG.trace("Emitting pane output for trigger key {}", triggerKey);
 return paneOutput;
}

代码示例来源:origin: apache/samza

/**
 * Sends two messages with key 1, advances the clock by one second, sends four messages with
 * key 2, advances the clock by another second, fires the window and checks that two panes were
 * emitted: key 1 with two messages and key 2 with four. The session window graph is built with
 * a 500 ms duration argument.
 */
@Test
public void testSessionWindowsAccumulatingMode() throws Exception {
 // NOTE(review): the method name says "Accumulating" but the graph is built with
 // AccumulationMode.DISCARDING - confirm which mode this test is meant to cover.
 OperatorSpecGraph sgb = this.getKeyedSessionWindowStreamGraph(AccumulationMode.DISCARDING,
   Duration.ofMillis(500)).getOperatorSpecGraph();
 TestClock testClock = new TestClock();
 StreamOperatorTask task = new StreamOperatorTask(sgb, testClock);
 List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
 // Collects every emitted pane so the assertions below can inspect them in emission order.
 MessageCollector messageCollector =
   envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
 task.init(this.context);
 task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
 task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
 testClock.advanceTime(Duration.ofSeconds(1));
 task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
 task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
 task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
 task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
 testClock.advanceTime(Duration.ofSeconds(1));
 task.window(messageCollector, taskCoordinator);
 // Expected values come first (assumes JUnit's (expected, actual) order - confirm against the imports);
 // Integer.valueOf replaces the deprecated Integer(int) constructor. The duplicated
 // size check on pane 0 has been collapsed into one.
 Assert.assertEquals(2, windowPanes.size());
 Assert.assertEquals(Integer.valueOf(1), windowPanes.get(0).getKey().getKey());
 Assert.assertEquals(2, windowPanes.get(0).getMessage().size());
 Assert.assertEquals(Integer.valueOf(2), windowPanes.get(1).getKey().getKey());
 Assert.assertEquals(4, windowPanes.get(1).getMessage().size());
}

代码示例来源:origin: apache/samza

new StringSerde(),
     new LongSerde()), "count-by-country")
   .map(pane -> new KV<>(pane.getKey().getKey(), pane.getMessage()))
   .sendTo(outputStream);
}, config);

相关文章