本文整理了 Java 中 org.apache.samza.operators.windows.WindowKey 类的一些代码示例，展示了 WindowKey 类的具体用法。这些代码示例主要来源于 GitHub、Stack Overflow、Maven 等平台，是从一些精选项目中提取出来的代码，具有较强的参考意义，能在一定程度上帮助到你。WindowKey 类的具体详情如下：
包路径:org.apache.samza.operators.windows.WindowKey
类名称:WindowKey
[英]Key for a WindowPane emitted from a Window.
[中]从窗口发出的窗格的键。
代码示例来源:origin: apache/samza
/**
 * Verifies that a {@link WindowPane} exposes the key and the message it was constructed with.
 */
@Test
public void testConstructor() {
  WindowPane<String, Integer> wndOutput =
      new WindowPane<>(new WindowKey<>("testMsg", null), 10, AccumulationMode.DISCARDING, FiringType.EARLY);
  // assertEquals takes (expected, actual); this order keeps failure messages accurate.
  assertEquals("testMsg", wndOutput.getKey().getKey());
  assertEquals(Integer.valueOf(10), wndOutput.getMessage());
}
}
代码示例来源:origin: org.apache.samza/samza-core_2.12
/**
 * Computes the pane output corresponding to a {@link TriggerKey} that fired.
 *
 * <p>The resulting {@link WindowKey} is built from the trigger's key and the trigger's timestamp
 * rendered as a decimal string.
 *
 * @param triggerKey the key of the trigger that fired
 * @param windowVal the window value to carry in the emitted pane
 * @return a pane holding {@code windowVal}, this window's accumulation mode and the trigger's type
 */
private WindowPane<K, Object> computePaneOutput(TriggerKey<K> triggerKey, Object windowVal) {
  // Diamond operator instead of the raw WindowKey type, so the assignment is type-checked.
  WindowKey<K> windowKey = new WindowKey<>(triggerKey.getKey(), Long.toString(triggerKey.getTimestamp()));
  WindowPane<K, Object> paneOutput =
      new WindowPane<>(windowKey, windowVal, window.getAccumulationMode(), triggerKey.getType());
  LOG.trace("Emitting pane output for trigger key {}", triggerKey);
  return paneOutput;
}
代码示例来源:origin: apache/samza
/**
 * Feeds every value of {@code integers} through a keyed tumbling window (1 second,
 * {@link AccumulationMode#DISCARDING}, repeating count-of-2 trigger), advances the test clock by
 * one second, and checks the key and size of each of the five emitted panes.
 */
@Test
public void testTumblingWindowsDiscardingMode() throws Exception {
  OperatorSpecGraph sgb = this.getKeyedTumblingWindowStreamGraph(AccumulationMode.DISCARDING,
      Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2))).getOperatorSpecGraph();
  List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
  TestClock testClock = new TestClock();
  StreamOperatorTask task = new StreamOperatorTask(sgb, testClock);
  task.init(this.context);

  // Every pane the task emits is captured in windowPanes for inspection below.
  MessageCollector messageCollector =
      envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
  integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator));
  testClock.advanceTime(Duration.ofSeconds(1));
  task.window(messageCollector, taskCoordinator);

  // Assertions use (expected, actual) order; Integer.valueOf replaces the deprecated Integer constructor.
  Assert.assertEquals(5, windowPanes.size());
  Assert.assertEquals(Integer.valueOf(1), windowPanes.get(0).getKey().getKey());
  Assert.assertEquals(2, windowPanes.get(0).getMessage().size());
  Assert.assertEquals(Integer.valueOf(2), windowPanes.get(1).getKey().getKey());
  Assert.assertEquals(2, windowPanes.get(1).getMessage().size());
  Assert.assertEquals(Integer.valueOf(1), windowPanes.get(2).getKey().getKey());
  Assert.assertEquals(2, windowPanes.get(2).getMessage().size());
  Assert.assertEquals(Integer.valueOf(2), windowPanes.get(3).getKey().getKey());
  Assert.assertEquals(2, windowPanes.get(3).getMessage().size());
  Assert.assertEquals(Integer.valueOf(3), windowPanes.get(4).getKey().getKey());
  Assert.assertEquals(1, windowPanes.get(4).getMessage().size());
}
代码示例来源:origin: apache/samza
task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
Assert.assertEquals(windowPanes.size(), 1);
Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "0");
Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
Assert.assertEquals(windowPanes.get(0).getFiringType(), FiringType.EARLY);
Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "0");
Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "0");
Assert.assertEquals(windowPanes.get(1).getFiringType(), FiringType.DEFAULT);
Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(3));
Assert.assertEquals(windowPanes.get(2).getKey().getPaneId(), "1000");
Assert.assertEquals(windowPanes.get(2).getFiringType(), FiringType.DEFAULT);
Assert.assertEquals((windowPanes.get(2).getMessage()).size(), 1);
代码示例来源:origin: org.apache.samza/samza-core_2.11
/**
 * Computes the pane output corresponding to a {@link TriggerKey} that fired.
 *
 * <p>The resulting {@link WindowKey} is built from the trigger's key and the trigger's timestamp
 * rendered as a decimal string.
 *
 * @param triggerKey the key of the trigger that fired
 * @param windowVal the window value to carry in the emitted pane
 * @return a pane holding {@code windowVal}, this window's accumulation mode and the trigger's type
 */
private WindowPane<K, Object> computePaneOutput(TriggerKey<K> triggerKey, Object windowVal) {
  // Diamond operator instead of the raw WindowKey type, so the assignment is type-checked.
  WindowKey<K> windowKey = new WindowKey<>(triggerKey.getKey(), Long.toString(triggerKey.getTimestamp()));
  WindowPane<K, Object> paneOutput =
      new WindowPane<>(windowKey, windowVal, window.getAccumulationMode(), triggerKey.getType());
  LOG.trace("Emitting pane output for trigger key {}", triggerKey);
  return paneOutput;
}
代码示例来源:origin: apache/samza
/**
 * Feeds every value of {@code integers} through a keyed tumbling window (1 second,
 * {@link AccumulationMode#ACCUMULATING}, repeating count-of-2 trigger), advances the test clock by
 * one second, and checks that seven panes were emitted, verifying key and size of the first four.
 */
@Test
public void testTumblingWindowsAccumulatingMode() throws Exception {
  OperatorSpecGraph sgb = this.getKeyedTumblingWindowStreamGraph(AccumulationMode.ACCUMULATING,
      Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2))).getOperatorSpecGraph();
  List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
  TestClock testClock = new TestClock();
  StreamOperatorTask task = new StreamOperatorTask(sgb, testClock);
  task.init(this.context);

  // Every pane the task emits is captured in windowPanes for inspection below.
  MessageCollector messageCollector =
      envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
  integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator));
  testClock.advanceTime(Duration.ofSeconds(1));
  task.window(messageCollector, taskCoordinator);

  // Assertions use (expected, actual) order; Integer.valueOf replaces the deprecated Integer constructor.
  Assert.assertEquals(7, windowPanes.size());
  Assert.assertEquals(Integer.valueOf(1), windowPanes.get(0).getKey().getKey());
  Assert.assertEquals(2, windowPanes.get(0).getMessage().size());
  Assert.assertEquals(Integer.valueOf(2), windowPanes.get(1).getKey().getKey());
  Assert.assertEquals(2, windowPanes.get(1).getMessage().size());
  // The later panes for keys 1 and 2 are expected to hold four messages each.
  Assert.assertEquals(Integer.valueOf(1), windowPanes.get(2).getKey().getKey());
  Assert.assertEquals(4, windowPanes.get(2).getMessage().size());
  Assert.assertEquals(Integer.valueOf(2), windowPanes.get(3).getKey().getKey());
  Assert.assertEquals(4, windowPanes.get(3).getMessage().size());
}
代码示例来源:origin: apache/samza
Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "1");
Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "1");
Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "1001");
Assert.assertEquals(windowPanes.get(2).getKey().getPaneId(), "1001");
Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2);
Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 2);
task.window(messageCollector, taskCoordinator);
Assert.assertEquals(windowPanes.size(), 4);
Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(2));
Assert.assertEquals(windowPanes.get(3).getKey().getPaneId(), "2001");
Assert.assertEquals((windowPanes.get(3).getMessage()).size(), 2);
代码示例来源:origin: apache/samza
/**
 * Computes the pane output corresponding to a {@link TriggerKey} that fired.
 *
 * <p>The resulting {@link WindowKey} is built from the trigger's key and the trigger's timestamp
 * rendered as a decimal string.
 *
 * @param triggerKey the key of the trigger that fired
 * @param windowVal the window value to carry in the emitted pane
 * @return a pane holding {@code windowVal}, this window's accumulation mode and the trigger's type
 */
private WindowPane<K, Object> computePaneOutput(TriggerKey<K> triggerKey, Object windowVal) {
  // Diamond operator instead of the raw WindowKey type, so the assignment is type-checked.
  WindowKey<K> windowKey = new WindowKey<>(triggerKey.getKey(), Long.toString(triggerKey.getTimestamp()));
  WindowPane<K, Object> paneOutput =
      new WindowPane<>(windowKey, windowVal, window.getAccumulationMode(), triggerKey.getType());
  LOG.trace("Emitting pane output for trigger key {}", triggerKey);
  return paneOutput;
}
代码示例来源:origin: org.apache.samza/samza-sql
/**
 * Translates a {@link LogicalAggregate} into a keyed tumbling-window count over its input stream
 * and registers the resulting stream with the context under the aggregate's id.
 *
 * @param aggregate the aggregate relational node to translate
 * @param context the translator context supplying the input stream and receiving the output stream
 */
void translate(final LogicalAggregate aggregate, final TranslatorContext context) {
  validateAggregateFunctions(aggregate);

  MessageStream<SamzaSqlRelMessage> inputStream = context.getMessageStream(aggregate.getInput().getId());

  // Only the count function is assumed to be supported: start from zero and add one per message.
  SupplierFunction<Long> zeroCount = () -> 0L;
  FoldLeftFunction<SamzaSqlRelMessage, Long> incrementCount = (message, count) -> count + 1;

  final ArrayList<String> aggFieldNames = getAggFieldNames(aggregate);
  Duration windowDuration =
      Duration.ofMillis(context.getExecutionContext().getSamzaSqlApplicationConfig().getWindowDurationMs());
  String windowOpId = "tumblingWindow_" + windowId;

  // The whole message is the window key, so messages are counted per distinct message.
  MessageStream<SamzaSqlRelMessage> outputStream = inputStream
      .window(
          Windows.keyedTumblingWindow(
              m -> m,
              windowDuration,
              zeroCount,
              incrementCount,
              new SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde(),
              new LongSerde())
              .setAccumulationMode(AccumulationMode.DISCARDING),
          windowOpId)
      .map(pane -> {
        // The lists returned by the key's record are appended to directly with the aggregate column.
        List<String> names = pane.getKey().getKey().getSamzaSqlRelRecord().getFieldNames();
        List<Object> values = pane.getKey().getKey().getSamzaSqlRelRecord().getFieldValues();
        names.add(aggFieldNames.get(0));
        values.add(pane.getMessage());
        return new SamzaSqlRelMessage(names, values);
      });

  context.registerMessageStream(aggregate.getId(), outputStream);
}
代码示例来源:origin: apache/samza
Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(1));
Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "0");
Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 5);
Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(1));
Assert.assertEquals(windowPanes.get(2).getKey().getPaneId(), "1000");
Assert.assertEquals(windowPanes.size(), 4);
Assert.assertEquals(windowPanes.get(3).getFiringType(), FiringType.DEFAULT);
Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(1));
Assert.assertEquals(windowPanes.get(3).getKey().getPaneId(), "1000");
代码示例来源:origin: org.apache.samza/samza-core_2.10
/**
 * Computes the pane output corresponding to a {@link TriggerKey} that fired.
 *
 * <p>The resulting {@link WindowKey} is built from the trigger's key and the trigger's timestamp
 * rendered as a decimal string.
 *
 * @param triggerKey the key of the trigger that fired
 * @param windowVal the window value to carry in the emitted pane
 * @return a pane holding {@code windowVal}, this window's accumulation mode and the trigger's type
 */
private WindowPane<K, Object> computePaneOutput(TriggerKey<K> triggerKey, Object windowVal) {
  // Diamond operator instead of the raw WindowKey type, so the assignment is type-checked.
  WindowKey<K> windowKey = new WindowKey<>(triggerKey.getKey(), Long.toString(triggerKey.getTimestamp()));
  WindowPane<K, Object> paneOutput =
      new WindowPane<>(windowKey, windowVal, window.getAccumulationMode(), triggerKey.getType());
  LOG.trace("Emitting pane output for trigger key {}", triggerKey);
  return paneOutput;
}
代码示例来源:origin: apache/samza
/**
 * Translates a {@link LogicalAggregate} into a keyed tumbling-window count over its input stream,
 * registers the resulting stream with the context under the aggregate's id, and attaches the
 * input/output metrics map functions for this logical operator.
 *
 * @param aggregate the aggregate relational node to translate
 * @param context the translator context supplying the input stream and receiving the output stream
 */
void translate(final LogicalAggregate aggregate, final TranslatorContext context) {
  validateAggregateFunctions(aggregate);

  MessageStream<SamzaSqlRelMessage> inputStream = context.getMessageStream(aggregate.getInput().getId());

  // Only the count function is assumed to be supported: start from zero and add one per message.
  SupplierFunction<Long> zeroCount = () -> 0L;
  FoldLeftFunction<SamzaSqlRelMessage, Long> incrementCount = (message, count) -> count + 1;

  final ArrayList<String> aggFieldNames = getAggFieldNames(aggregate);
  String windowOpId = changeLogStorePrefix + "_tumblingWindow_" + logicalOpId;

  // The whole message is the window key, so messages are counted per distinct message.
  MessageStream<SamzaSqlRelMessage> outputStream = inputStream
      .map(new TranslatorInputMetricsMapFunction(logicalOpId))
      .window(
          Windows.keyedTumblingWindow(
              m -> m,
              Duration.ofMillis(context.getExecutionContext().getSamzaSqlApplicationConfig().getWindowDurationMs()),
              zeroCount,
              incrementCount,
              new SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde(),
              new LongSerde())
              .setAccumulationMode(AccumulationMode.DISCARDING),
          windowOpId)
      .map(pane -> {
        // The lists returned by the key's record are appended to directly with the aggregate column.
        List<String> names = pane.getKey().getKey().getSamzaSqlRelRecord().getFieldNames();
        List<Object> values = pane.getKey().getKey().getSamzaSqlRelRecord().getFieldValues();
        names.add(aggFieldNames.get(0));
        values.add(pane.getMessage());
        return new SamzaSqlRelMessage(names, values, new SamzaSqlRelMsgMetadata("", "", ""));
      });

  context.registerMessageStream(aggregate.getId(), outputStream);
  // The stream returned by this map call is not used; the call is kept exactly as in the original.
  outputStream.map(new TranslatorOutputMetricsMapFunction(logicalOpId));
}
代码示例来源:origin: org.apache.samza/samza-core
/**
 * Computes the pane output corresponding to a {@link TriggerKey} that fired.
 *
 * <p>The resulting {@link WindowKey} is built from the trigger's key and the trigger's timestamp
 * rendered as a decimal string.
 *
 * @param triggerKey the key of the trigger that fired
 * @param windowVal the window value to carry in the emitted pane
 * @return a pane holding {@code windowVal}, this window's accumulation mode and the trigger's type
 */
private WindowPane<K, Object> computePaneOutput(TriggerKey<K> triggerKey, Object windowVal) {
  // Diamond operator instead of the raw WindowKey type, so the assignment is type-checked.
  WindowKey<K> windowKey = new WindowKey<>(triggerKey.getKey(), Long.toString(triggerKey.getTimestamp()));
  WindowPane<K, Object> paneOutput =
      new WindowPane<>(windowKey, windowVal, window.getAccumulationMode(), triggerKey.getType());
  LOG.trace("Emitting pane output for trigger key {}", triggerKey);
  return paneOutput;
}
代码示例来源:origin: apache/samza
/**
 * Sends two messages with key 1, advances the clock by one second, sends four messages with key 2,
 * advances the clock by another second, and checks that a keyed session window with a 500 ms gap
 * emits one pane per key holding all of that key's messages.
 */
@Test
public void testSessionWindowsAccumulatingMode() throws Exception {
  // NOTE(review): the test name says "AccumulatingMode" but the graph is built with
  // AccumulationMode.DISCARDING; confirm which mode is intended before changing it.
  OperatorSpecGraph sgb = this.getKeyedSessionWindowStreamGraph(AccumulationMode.DISCARDING,
      Duration.ofMillis(500)).getOperatorSpecGraph();
  TestClock testClock = new TestClock();
  StreamOperatorTask task = new StreamOperatorTask(sgb, testClock);
  List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();

  // Every pane the task emits is captured in windowPanes for inspection below.
  MessageCollector messageCollector =
      envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
  task.init(this.context);

  task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
  task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
  testClock.advanceTime(Duration.ofSeconds(1));

  task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
  task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
  task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
  task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
  testClock.advanceTime(Duration.ofSeconds(1));
  task.window(messageCollector, taskCoordinator);

  // Assertions use (expected, actual) order; Integer.valueOf replaces the deprecated Integer constructor.
  // The original asserted the size of pane 0 twice; the duplicate has been dropped.
  Assert.assertEquals(2, windowPanes.size());
  Assert.assertEquals(Integer.valueOf(1), windowPanes.get(0).getKey().getKey());
  Assert.assertEquals(Integer.valueOf(2), windowPanes.get(1).getKey().getKey());
  Assert.assertEquals(2, windowPanes.get(0).getMessage().size());
  Assert.assertEquals(4, windowPanes.get(1).getMessage().size());
}
代码示例来源:origin: apache/samza
new StringSerde(),
new LongSerde()), "count-by-country")
.map(pane -> new KV<>(pane.getKey().getKey(), pane.getMessage()))
.sendTo(outputStream);
}, config);
内容来源于网络,如有侵权,请联系作者删除!