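What is a side output? The name sounds a little odd, but the idea is simple: while the main stream is being processed, you define one or more additional output streams that carry extra information.
In day-to-day development side outputs have two main uses. First, they split the input into different subsets according to a rule you supply. Second, they let you route late data to a separate stream.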
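To make the splitting idea concrete before any Flink API appears, here is a minimal plain-Java model (not Flink code). The class name SideOutputModel, the tag name "small", and the threshold of 10 are assumptions chosen for illustration only.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of one main output plus tagged side outputs; not Flink code. */
class SideOutputModel {
    final List<Double> mainOutput = new ArrayList<>();
    final Map<String, List<Double>> sideOutputs = new LinkedHashMap<>();

    /** Routes one record: to the main output by default, to a tagged side output otherwise. */
    void process(double amount) {
        if (amount >= 10) {
            mainOutput.add(amount);
        } else {
            // Hypothetical tag name; each tag collects its own subset of the input.
            sideOutputs.computeIfAbsent("small", tag -> new ArrayList<>()).add(amount);
        }
    }

    public static void main(String[] args) {
        SideOutputModel model = new SideOutputModel();
        for (double amount : new double[] {188.23, 0.77, 22.1}) {
            model.process(amount);
        }
        System.out.println("main = " + model.mainOutput);  // main = [188.23, 22.1]
        System.out.println("side = " + model.sideOutputs); // side = {small=[0.77]}
    }
}
```

The point of the model is that the input is read once and each record lands in exactly one place, which is the behaviour the Flink demo further down produces with a real stream.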
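Using side outputs
1. First, define an OutputTag that identifies the side output:
OutputTag<String> outputTag = new OutputTag<String>("side-output-id") {};
The tag has to be an anonymous inner class (note the trailing {}) so that Flink can work out the element type.
2. Then use the Context parameter to emit data to the side output identified by the OutputTag.
3. Finally, get the side output as a DataStream from the result of the main stream.
Put together, the steps look like this (the process function that calls ctx.output is elided as ...):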
final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
SingleOutputStreamOperator<Integer> mainDataStream = ...;
DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);
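Why the tag in step 1 has to be an anonymous inner class is easier to see in isolation. The sketch below uses a hypothetical TypeTag class as a simplified stand-in. It is not Flink's OutputTag implementation, only an illustration of the general Java technique of recovering a generic type argument from an anonymous subclass.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

/** Hypothetical stand-in for a typed tag; NOT Flink's OutputTag implementation. */
class TypeTag<T> {
    final String id;

    TypeTag(String id) {
        this.id = id;
    }

    /** Returns the type argument if a subclass recorded it, otherwise null. */
    Type capturedType() {
        Type superType = getClass().getGenericSuperclass();
        if (superType instanceof ParameterizedType) {
            return ((ParameterizedType) superType).getActualTypeArguments()[0];
        }
        return null; // plain instance: T was erased at compile time
    }
}

class TypeCaptureDemo {
    public static void main(String[] args) {
        TypeTag<String> plain = new TypeTag<String>("side-output");
        TypeTag<String> anonymous = new TypeTag<String>("side-output") {};

        System.out.println(plain.capturedType());     // null
        System.out.println(anonymous.capturedType()); // class java.lang.String
    }
}
```

The trailing {} creates a subclass whose class file records TypeTag<String> as its generic superclass, so the element type survives erasure and can be read back at runtime.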
Data can be emitted to a side output from the following functions:
ProcessFunction
KeyedProcessFunction
CoProcessFunction
KeyedCoProcessFunction
ProcessWindowFunction
ProcessAllWindowFunction
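The functions listed above all emit to a side output in the same way, through the context object. As a rough sketch, assuming the Flink 1.x Java DataStream API, here is a KeyedProcessFunction that writes to two side outputs of different types. The tag names, the Router class, and the sample tuples are invented for illustration.

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

class KeyedSideOutputSketch {
    // Hypothetical tags; a side output may carry a different type than the main output.
    static final OutputTag<String> WARNINGS = new OutputTag<String>("warnings") {};
    static final OutputTag<Tuple2<String, Integer>> NEGATIVE =
            new OutputTag<Tuple2<String, Integer>>("negative") {};

    /** Keeps non-negative values on the main output and diverts the rest. */
    static class Router
            extends KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>> {
        @Override
        public void processElement(
                Tuple2<String, Integer> value,
                Context ctx,
                Collector<Tuple2<String, Integer>> out) {
            if (value.f1 < 0) {
                ctx.output(NEGATIVE, value);
                ctx.output(WARNINGS, "negative value for key " + ctx.getCurrentKey());
            } else {
                out.collect(value);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        SingleOutputStreamOperator<Tuple2<String, Integer>> mainStream = env
                .fromElements(Tuple2.of("a", 1), Tuple2.of("b", -2), Tuple2.of("a", 3))
                .keyBy(t -> t.f0)
                .process(new Router());

        mainStream.print("main");
        mainStream.getSideOutput(NEGATIVE).print("negative");
        mainStream.getSideOutput(WARNINGS).print("warnings");
        env.execute("keyed side output sketch");
    }
}
```

One record may be sent to several side outputs, as the negative branch does here. Each tag is read back with its own getSideOutput call.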
Now for a demo: transactions with an amount of less than 10 yuan are sent to a side output.
Code:
package com.itzhimei.process;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;

/**
 * Sends transactions with an amount of less than 10 yuan to a side output.
 */
public class ProcessFunction_4_SideOutput {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Transaction> transactions = env
                .addSource(new TransactionSource())
                .name("transactions");

        // The trailing {} makes this an anonymous subclass so the element type can be inferred.
        final OutputTag<Transaction> outputTag = new OutputTag<Transaction>("TXOutputTag") {};

        SingleOutputStreamOperator<Transaction> process = transactions.process(new ProcessFunction<Transaction, Transaction>() {
            @Override
            public void processElement(Transaction value, Context ctx, Collector<Transaction> out) throws Exception {
                if (value.getAmount() >= 10) {
                    // Amounts of 10 or more stay on the main stream
                    out.collect(value);
                } else {
                    // Emit the record to the side output
                    ctx.output(outputTag, value);
                }
            }
        });

        DataStream<Transaction> sideOutput = process.getSideOutput(outputTag);

        process.print("main");
        sideOutput.print("sideOut");

        env.execute();
    }
}
/* Output
main:3> Transaction{accountId=1, timestamp=1546272000000, amount=188.23}
main:4> Transaction{accountId=2, timestamp=1546272360000, amount=374.79}
main:5> Transaction{accountId=3, timestamp=1546272720000, amount=112.15}
main:6> Transaction{accountId=4, timestamp=1546273080000, amount=478.75}
main:7> Transaction{accountId=5, timestamp=1546273440000, amount=208.85}
main:8> Transaction{accountId=1, timestamp=1546273800000, amount=379.64}
main:1> Transaction{accountId=2, timestamp=1546274160000, amount=351.44}
main:2> Transaction{accountId=3, timestamp=1546274520000, amount=320.75}
main:3> Transaction{accountId=4, timestamp=1546274880000, amount=259.42}
main:4> Transaction{accountId=5, timestamp=1546275240000, amount=273.44}
main:5> Transaction{accountId=1, timestamp=1546275600000, amount=267.25}
main:6> Transaction{accountId=2, timestamp=1546275960000, amount=397.15}
sideOut:7> Transaction{accountId=3, timestamp=1546276320000, amount=0.219}
main:8> Transaction{accountId=4, timestamp=1546276680000, amount=231.94}
main:1> Transaction{accountId=5, timestamp=1546277040000, amount=384.73}
main:2> Transaction{accountId=1, timestamp=1546277400000, amount=419.62}
main:3> Transaction{accountId=2, timestamp=1546277760000, amount=412.91}
sideOut:4> Transaction{accountId=3, timestamp=1546278120000, amount=0.77}
main:5> Transaction{accountId=4, timestamp=1546278480000, amount=22.1}
main:6> Transaction{accountId=5, timestamp=1546278840000, amount=377.54}
main:7> Transaction{accountId=1, timestamp=1546279200000, amount=375.44}
main:8> Transaction{accountId=2, timestamp=1546279560000, amount=230.18}
sideOut:1> Transaction{accountId=3, timestamp=1546279920000, amount=0.8}
main:2> Transaction{accountId=4, timestamp=1546280280000, amount=350.89}
main:3> Transaction{accountId=5, timestamp=1546280640000, amount=127.55}
*/
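The demo covers the first use of side outputs, splitting a stream. The second use mentioned at the start, routing late data, goes through the window API instead of ctx.output. The following is a minimal sketch assuming the Flink 1.x Java DataStream API (the Time class used here is deprecated in later releases). The tag name, the tuple layout of (key, event timestamp in milliseconds), and the sample values are assumptions for illustration.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

class LateDataSideOutputSketch {
    // Hypothetical tag; its type must match the window's input type.
    static final OutputTag<Tuple2<String, Long>> LATE =
            new OutputTag<Tuple2<String, Long>>("late-data") {};

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        SingleOutputStreamOperator<Tuple2<String, Long>> windowed = env
                // (key, event timestamp in ms); the third element is out of order
                .fromElements(Tuple2.of("a", 1_000L), Tuple2.of("a", 12_000L), Tuple2.of("a", 2_000L))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner((event, ts) -> event.f1))
                .keyBy(t -> t.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                // Records arriving after their window has been closed go to LATE
                .sideOutputLateData(LATE)
                .max(1);

        windowed.print("windowed");
        windowed.getSideOutput(LATE).print("late");
        env.execute("late data side output sketch");
    }
}
```

Whether the out-of-order element is actually reported as late depends on when watermarks are generated. With a small bounded source like this one, all records may be processed before the first periodic watermark, in which case nothing reaches the late stream.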