当需要同时处理两条输入流的数据时,DataStream API提供了CoProcessFunction。CoProcessFunction为每一条输入流分别提供一个处理方法:processElement1()处理第一条流中的元素,processElement2()处理第二条流中的元素。
下面演示一个简单的示例:程序先模拟两条输入流,一条是交易金额大于等于10元的流,另一条是交易金额小于10元的流;然后在两条流各自对应的processElement1()和processElement2()方法中,都过滤掉用户1和用户5的交易。
代码:
package com.itzhimei.process;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;
/**
 * Demonstrates {@link CoProcessFunction} on two connected transaction streams.
 *
 * <p>Two inputs are simulated from {@link TransactionSource}:
 * <ul>
 *   <li>input 1 keeps transactions whose amount is {@code >= AMOUNT_THRESHOLD};</li>
 *   <li>input 2 keeps transactions whose amount is {@code < AMOUNT_THRESHOLD}.</li>
 * </ul>
 * The streams are connected, and both {@code processElement1} (input 1) and
 * {@code processElement2} (input 2) drop transactions of accounts 1 and 5 and forward
 * everything else. The merged result is printed.
 */
public class ProcessFunction_3_CoProcessFunction {

    /** Amount that splits the simulated transactions into the two input streams. */
    private static final double AMOUNT_THRESHOLD = 10;

    /**
     * Returns whether transactions of the given account must be dropped.
     *
     * <p>Both sides of the {@link CoProcessFunction} call this, so the two inputs always apply
     * exactly the same exclusion rule.
     *
     * @param accountId account id of a transaction
     * @return {@code true} for accounts 1 and 5, {@code false} otherwise
     */
    private static boolean isExcludedAccount(long accountId) {
        return accountId == 1 || accountId == 5;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Input 1: transactions with amount >= AMOUNT_THRESHOLD.
        // Source and filter get distinct names so the operators can be told apart in the UI/logs.
        DataStream<Transaction> tx1 = env
                .addSource(new TransactionSource())
                .name("transactions-1")
                .filter(new FilterFunction<Transaction>() {
                    @Override
                    public boolean filter(Transaction value) throws Exception {
                        return value.getAmount() >= AMOUNT_THRESHOLD;
                    }
                })
                .name("amount >= " + AMOUNT_THRESHOLD);

        // Input 2: transactions with amount < AMOUNT_THRESHOLD.
        DataStream<Transaction> tx2 = env
                .addSource(new TransactionSource())
                .name("transactions-2")
                .filter(new FilterFunction<Transaction>() {
                    @Override
                    public boolean filter(Transaction value) throws Exception {
                        return value.getAmount() < AMOUNT_THRESHOLD;
                    }
                })
                .name("amount < " + AMOUNT_THRESHOLD);

        // Each processElementN handles the elements of input N; both share the same exclusion rule.
        SingleOutputStreamOperator<Transaction> process = tx1
                .connect(tx2)
                .process(new CoProcessFunction<Transaction, Transaction, Transaction>() {
                    @Override
                    public void processElement1(Transaction value, Context ctx, Collector<Transaction> out) throws Exception {
                        if (!isExcludedAccount(value.getAccountId())) {
                            out.collect(value);
                        }
                    }

                    @Override
                    public void processElement2(Transaction value, Context ctx, Collector<Transaction> out) throws Exception {
                        if (!isExcludedAccount(value.getAccountId())) {
                            out.collect(value);
                        }
                    }
                })
                .name("exclude accounts 1 and 5");

        process.print();
        env.execute("ProcessFunction_3_CoProcessFunction");
    }
}
/* 输出结果
1> Transaction{accountId=2, timestamp=1546272360000, amount=374.79}
2> Transaction{accountId=3, timestamp=1546272720000, amount=112.15}
3> Transaction{accountId=4, timestamp=1546273080000, amount=478.75}
6> Transaction{accountId=2, timestamp=1546274160000, amount=351.44}
7> Transaction{accountId=3, timestamp=1546274520000, amount=320.75}
8> Transaction{accountId=4, timestamp=1546274880000, amount=259.42}
3> Transaction{accountId=2, timestamp=1546275960000, amount=397.15}
2> Transaction{accountId=3, timestamp=1546276320000, amount=0.219}
5> Transaction{accountId=4, timestamp=1546276680000, amount=231.94}
8> Transaction{accountId=2, timestamp=1546277760000, amount=412.91}
7> Transaction{accountId=3, timestamp=1546278120000, amount=0.77}
2> Transaction{accountId=4, timestamp=1546278480000, amount=22.1}
5> Transaction{accountId=2, timestamp=1546279560000, amount=230.18}
4> Transaction{accountId=3, timestamp=1546279920000, amount=0.8}
7> Transaction{accountId=4, timestamp=1546280280000, amount=350.89}
2> Transaction{accountId=2, timestamp=1546281360000, amount=228.22}
*/