Flink从入门到实战七[ProcessFunction]-5-侧流输出

侧流(Side Output,下文也称“旁路输出”)是什么?这个名字听起来有点奇怪。侧流的作用就是在主流处理过程中,再定义一个或多个输出流,用来输出一些额外的信息。
在实际开发中侧流主要有两个作用,一是能够按照给定的数据切分逻辑,将输入数据切分成不同的子集,二是支持将迟到数据输出到侧流。

侧流的使用
1、首先需要定义用于标识旁路输出流的 OutputTag:
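OutputTag<String> outputTag = new OutputTag<String>("side-output-id") {};
注意末尾的 {}:OutputTag 必须以匿名内部类的形式创建。Java 泛型在运行时会被擦除,只有通过匿名子类,Flink 才能用反射拿到泛型参数,推断出侧流元素的类型;漏掉 {} 时,构造 OutputTag 会直接抛出 InvalidTypesException。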

2、然后在处理函数中使用 Context 参数,调用 ctx.output(outputTag, value),将数据发送到由 OutputTag 标识的旁路输出

3、最后在主流(SingleOutputStreamOperator)上调用 getSideOutput(outputTag),获取侧流的 DataStream

以上三步总结代码为:

final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};

SingleOutputStreamOperator<Integer> mainDataStream = ...;

DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);

以下函数都提供了 Context 参数,可以通过它将数据发送到旁路输出:
ProcessFunction
KeyedProcessFunction
CoProcessFunction
KeyedCoProcessFunction
ProcessWindowFunction
ProcessAllWindowFunction

我们来看demo,我们将交易金额小于10元的交易,放到侧流输出
代码:

package com.itzhimei.process;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;

/**
 * Demo: routes every transaction whose amount is below 10 to a side output,
 * while transactions with an amount of 10 or more stay on the main stream.
 *
 * <p>The main stream is printed with the prefix {@code "main"} and the side output
 * with the prefix {@code "sideOut"}.
 */
public class ProcessFunction_4_SideOutput {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Transaction> transactions = env
                .addSource(new TransactionSource())
                .name("transactions");

        // The trailing "{}" creates an anonymous subclass of OutputTag. Flink reads the
        // generic type argument (Transaction) from that subclass via reflection; without
        // it the type is erased and the OutputTag constructor throws InvalidTypesException.
        final OutputTag<Transaction> outputTag = new OutputTag<Transaction>("TXOutputTag") {};

        SingleOutputStreamOperator<Transaction> process = transactions.process(new ProcessFunction<Transaction, Transaction>() {
            @Override
            public void processElement(Transaction value, Context ctx, Collector<Transaction> out) throws Exception {
                if (value.getAmount() >= 10) {
                    // Amount >= 10: keep the record on the main stream.
                    out.collect(value);
                } else {
                    // Amount < 10: emit the record to the side output identified by outputTag.
                    ctx.output(outputTag, value);
                }
            }
        });

        // The side output is retrieved from the operator that emitted into it, using the same tag.
        DataStream<Transaction> sideOutput = process.getSideOutput(outputTag);

        process.print("main");
        sideOutput.print("sideOut");
        env.execute();
    }
}

/* 输出结果
main:3> Transaction{accountId=1, timestamp=1546272000000, amount=188.23}
main:4> Transaction{accountId=2, timestamp=1546272360000, amount=374.79}
main:5> Transaction{accountId=3, timestamp=1546272720000, amount=112.15}
main:6> Transaction{accountId=4, timestamp=1546273080000, amount=478.75}
main:7> Transaction{accountId=5, timestamp=1546273440000, amount=208.85}
main:8> Transaction{accountId=1, timestamp=1546273800000, amount=379.64}
main:1> Transaction{accountId=2, timestamp=1546274160000, amount=351.44}
main:2> Transaction{accountId=3, timestamp=1546274520000, amount=320.75}
main:3> Transaction{accountId=4, timestamp=1546274880000, amount=259.42}
main:4> Transaction{accountId=5, timestamp=1546275240000, amount=273.44}
main:5> Transaction{accountId=1, timestamp=1546275600000, amount=267.25}
main:6> Transaction{accountId=2, timestamp=1546275960000, amount=397.15}
sideOut:7> Transaction{accountId=3, timestamp=1546276320000, amount=0.219}
main:8> Transaction{accountId=4, timestamp=1546276680000, amount=231.94}
main:1> Transaction{accountId=5, timestamp=1546277040000, amount=384.73}
main:2> Transaction{accountId=1, timestamp=1546277400000, amount=419.62}
main:3> Transaction{accountId=2, timestamp=1546277760000, amount=412.91}
sideOut:4> Transaction{accountId=3, timestamp=1546278120000, amount=0.77}
main:5> Transaction{accountId=4, timestamp=1546278480000, amount=22.1}
main:6> Transaction{accountId=5, timestamp=1546278840000, amount=377.54}
main:7> Transaction{accountId=1, timestamp=1546279200000, amount=375.44}
main:8> Transaction{accountId=2, timestamp=1546279560000, amount=230.18}
sideOut:1> Transaction{accountId=3, timestamp=1546279920000, amount=0.8}
main:2> Transaction{accountId=4, timestamp=1546280280000, amount=350.89}
main:3> Transaction{accountId=5, timestamp=1546280640000, amount=127.55}
 */