自定义Source,官网有一个比较好的例子,用于生成信用卡交易数据,我们简单来看一下源码。
源码主要的功能类有两个:TransactionSource和TransactionIterator,Transaction是交易Model类。
TransactionSource是主类,继承自FromIteratorFunction,FromIteratorFunction实现了SourceFunction接口,所以TransactionSource本质就是一个SourceFunction类型。
TransactionSource内部定义了一个内部类RateLimitedIterator,RateLimitedIterator是一个迭代器类,用于获取交易数据。
真正实现交易数据生成的是TransactionIterator类,TransactionIterator支持生成有界数据和无界数据,TransactionSource调用的是生成无界数据的生成器。
生成方法:TransactionIterator类中有一个集合data,这里已经预置了一组数据,每次生成都是从集合里取一条数据进行加工并返回,如果是有界生成器,集合数据用完则不再生成数据,如果是无界生成器,集合数据取完之后,则重置下标,从头开始取数,继续生成数据。
代码:
import org.apache.flink.annotation.Public;
import org.apache.flink.streaming.api.functions.source.FromIteratorFunction;
import org.apache.flink.walkthrough.common.entity.Transaction;
import java.io.Serializable;
import java.util.Iterator;
/** A stream of transactions. */
@Public
public class TransactionSource extends FromIteratorFunction<Transaction> {
private static final long serialVersionUID = 1L;
public TransactionSource() {
super(new RateLimitedIterator<>(TransactionIterator.unbounded()));
}
private static class RateLimitedIterator<T> implements Iterator<T>, Serializable {
private static final long serialVersionUID = 1L;
private final Iterator<T> inner;
private RateLimitedIterator(Iterator<T> inner) {
this.inner = inner;
}
@Override
public boolean hasNext() {
return inner.hasNext();
}
@Override
public T next() {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return inner.next();
}
}
}
import org.apache.flink.walkthrough.common.entity.Transaction;
import java.io.Serializable;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
/** An iterator of transaction events. */
final class TransactionIterator implements Iterator<Transaction>, Serializable {
private static final long serialVersionUID = 1L;
private static final Timestamp INITIAL_TIMESTAMP = Timestamp.valueOf("2019-01-01 00:00:00");
private static final long SIX_MINUTES = 6 * 60 * 1000;
private final boolean bounded;
private int index = 0;
private long timestamp;
static TransactionIterator bounded() {
return new TransactionIterator(true);
}
static TransactionIterator unbounded() {
return new TransactionIterator(false);
}
private TransactionIterator(boolean bounded) {
this.bounded = bounded;
this.timestamp = INITIAL_TIMESTAMP.getTime();
}
@Override
public boolean hasNext() {
if (index < data.size()) {
return true;
} else if (!bounded) {
index = 0;
return true;
} else {
return false;
}
}
@Override
public Transaction next() {
Transaction transaction = data.get(index++);
transaction.setTimestamp(timestamp);
timestamp += SIX_MINUTES;
return transaction;
}
private static List<Transaction> data =
Arrays.asList(
new Transaction(1, 0L, 188.23),
new Transaction(2, 0L, 374.79),
new Transaction(3, 0L, 112.15),
new Transaction(4, 0L, 478.75),
new Transaction(5, 0L, 208.85),
new Transaction(1, 0L, 379.64),
new Transaction(2, 0L, 351.44),
new Transaction(3, 0L, 320.75),
new Transaction(4, 0L, 259.42),
new Transaction(5, 0L, 273.44),
new Transaction(1, 0L, 267.25),
new Transaction(2, 0L, 397.15),
new Transaction(3, 0L, 0.219),
new Transaction(4, 0L, 231.94),
new Transaction(5, 0L, 384.73),
new Transaction(1, 0L, 419.62),
new Transaction(2, 0L, 412.91),
new Transaction(3, 0L, 0.77),
new Transaction(4, 0L, 22.10),
new Transaction(5, 0L, 377.54),
new Transaction(1, 0L, 375.44),
new Transaction(2, 0L, 230.18),
new Transaction(3, 0L, 0.80),
new Transaction(4, 0L, 350.89),
new Transaction(5, 0L, 127.55),
new Transaction(1, 0L, 483.91),
new Transaction(2, 0L, 228.22),
new Transaction(3, 0L, 871.15),
new Transaction(4, 0L, 64.19),
new Transaction(5, 0L, 79.43),
new Transaction(1, 0L, 56.12),
new Transaction(2, 0L, 256.48),
new Transaction(3, 0L, 148.16),
new Transaction(4, 0L, 199.95),
new Transaction(5, 0L, 252.37),
new Transaction(1, 0L, 274.73),
new Transaction(2, 0L, 473.54),
new Transaction(3, 0L, 119.92),
new Transaction(4, 0L, 323.59),
new Transaction(5, 0L, 353.16),
new Transaction(1, 0L, 211.90),
new Transaction(2, 0L, 280.93),
new Transaction(3, 0L, 347.89),
new Transaction(4, 0L, 459.86),
new Transaction(5, 0L, 82.31),
new Transaction(1, 0L, 373.26),
new Transaction(2, 0L, 479.83),
new Transaction(3, 0L, 454.25),
new Transaction(4, 0L, 83.64),
new Transaction(5, 0L, 292.44));
}