Window 还可以将两个流中的元素进行关联合并。官方解释:窗口连接将共享一个公共Key且位于同一窗口中的两个流的元素连接在一起。这些窗口可以通过使用窗口分配器来定义,并对来自两个流的元素进行处理。
Window Join按照窗口进行两个流的合并,关联方式类似两个数据库表的Inner Join,如果在一个窗口内,只有一个流的数据,那么将没有关联结果。在窗口内两个流数据的关联,则是基于相同key,进行笛卡尔积方式的关联。
主要有三种合并方式:Tumbling Window Join、Sliding Window Join、Session Window Join。
API:
stream.join(otherStream)
.where()
.equalTo()
.window()
.apply()
.where() 是指定第一个流的关联的key
.equalTo() 是指定第二个流的关联的key
.window() 分配关联的窗口
.apply() 自定义关联后的数据如何处理,可以定义JoinFunction和FlatJoinFunction两种函数
我们以Tumbling Window Join来演示代码:
package com.itzhimei.window;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;
/**
* Window Join
* 两个流基于窗口进行关联
*
*/
public class Window_12_Join {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Transaction> dataStreamA = env
.addSource(new TransactionSource())
.name("transactionsA");
DataStream<Transaction> dataStreamB = env
.addSource(new TransactionSource())
.name("transactionsB");
DataStream<Tuple2<Long, Double>> apply = dataStreamA.join(dataStreamB)
//指定第一个流的关联的key
.where(Transaction::getAccountId)
//指定第二个流的关联的key
.equalTo(Transaction::getAccountId)
//分配关联的窗口
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
//自定义关联后的数据如何处理,可以定义JoinFunction和FlatJoinFunction两种函数
.apply(new JoinFunction<Transaction, Transaction, Tuple2<Long, Double>>() {
@Override
public Tuple2<Long, Double> join(Transaction first, Transaction second) throws Exception {
return new Tuple2<>(first.getAccountId(), first.getAmount() + second.getAmount());
}
});
apply.print();
env.execute();
}
}
/*
15> (3,224.3)
1> (4,957.5)
15> (3,432.9)
1> (4,738.1700000000001)
11> (1,376.46)
11> (1,567.87)
16> (2,749.58)
11> (1,455.48)
16> (2,726.23)
16> (2,771.94)
1> (4,710.69)
15> (3,112.369)
1> (4,500.85)
1> (4,829.64)
16> (2,787.7)
11> (1,607.85)
15> (3,112.92)
1> (4,542.94)
16> (2,604.97)
1> (4,738.1700000000001)
11> (1,563.67)
1> (4,518.84)
11> (1,672.14)
1> (4,491.36)
11> (1,244.35)
*/