📝FlinkX源码剖析(2):基于JDBC的Source和Sink算子构造流程。代码注释见。
Source算子的构造
上一篇文章分析到,FlinkX通过SourceFactory.createSource()
来创建source算子并返回DataStream<RowData>。由于SourceFactory是一个抽象类,并且createSource方法是一个抽象方法,因此必定由SourceFactory的具体子类来实现createSource方法(比如MysqlSourceFactory)。实际上,几乎所有关系型数据库都继承自JdbcSourceFactory并复用其方法,接下来具体分析JdbcSourceFactory源码(如Kafka和HDFS等Source其他算子之后再看)。
public DataStream<RowData> createSource() {
JdbcInputFormatBuilder builder = getBuilder();
// fetchSize和queryTimeOut不可以为0,否则用默认设置替换
int fetchSize = jdbcConf.getFetchSize();
jdbcConf.setFetchSize(fetchSize == 0 ? getDefaultFetchSize() : fetchSize);
int queryTimeOut = jdbcConf.getQueryTimeOut();
jdbcConf.setQueryTimeOut(queryTimeOut == 0 ? DEFAULT_QUERY_TIMEOUT : queryTimeOut);
// 给builder设置jdbcConf、jdbcDialect和rowConverter
builder.setJdbcConf(jdbcConf);
builder.setJdbcDialect(jdbcDialect);
AbstractRowConverter rowConverter = null;
if (!useAbstractBaseColumn) {
checkConstant(jdbcConf);
final RowType rowType =
TableUtil.createRowType(jdbcConf.getColumn(), getRawTypeConverter());
rowConverter = jdbcDialect.getRowConverter(rowType);
}
builder.setRowConverter(rowConverter);
return createInput(builder.finish());
}
createSource的逻辑可以分为3步:1.设置JdbcInputFormatBuilder.设置RowConverter;3.创建输出流。
JdbcInputFormatBuilder
从其名称后缀可以看出,JdbcInputFormatBuilder是JdbcInputFormat的构造类,即FlinkX使用Builder设计模式来创建JdbcInputFormat对象。getBuilder方法如下所示:
protected JdbcInputFormatBuilder getBuilder() {
return new JdbcInputFormatBuilder(new JdbcInputFormat());
}
JdbcSourceFactory的子类可以通过重写getBuilder方法来返回自定义的JdbcInputFormat,比如Db2SourceFactory。而JdbcInputFormat继承自BaseRichInputFormat,它是FlinkX所有source算子的基础,并继承自Flink API 提供的RichInputFormat类。现在只需要先记着我们构造的InputFormat是JdbcInputFormat实例。
RowConverter
RowConverter负责将JDBC读取的数据转为Flink API中的RowData形式,更具体点,它将JDBC字段类型映射为Flink中的DataType类型。以MySQL为例分析,TableUtil.createRowType方法源码如下:
public static RowType createRowType(List<FieldConf> fields, RawTypeConverter converter) {
String[] fieldNames = fields.stream().map(FieldConf::getName).toArray(String[]::new);
String[] fieldTypes = fields.stream().map(FieldConf::getType).toArray(String[]::new);
TableSchema.Builder builder = TableSchema.builder();
for (int i = 0; i < fieldTypes.length; i++) {
DataType dataType = converter.apply(fieldTypes[i]);
builder.add(TableColumn.physical(fieldNames[i], dataType));
}
return (RowType) builder.build().toRowDataType().getLogicalType();
}
该方法遍历字段类型列表,由RawTypeConverter.apply方法返回每个类型应该对应的DataType类型。因此,函数式接口RawTypeConverter的实现就尤为重要,并且每种数据库的应该各不相同。如下代码所示,RawTypeConverter由JdbcSourceFactory类的成员变量jdbcDialect给出:
@Override
public RawTypeConverter getRawTypeConverter() {
return jdbcDialect.getRawTypeConverter();
}
而jdbcDialect又是在JdbcSourceFactory唯一的构造函数中被设置,如下所示:
public JdbcSourceFactory(
SyncConf syncConf, StreamExecutionEnvironment env, JdbcDialect jdbcDialect) {
// 又分了一层,父类调用
super(syncConf, env);
// jdbcDialect由每个子类自己实现
this.jdbcDialect = jdbcDialect;
...
}
回顾上一篇文章的内容,通过反射来创建SourceFactory对象的代码为Constructor<?> constructor =clazz.getConstructor(SyncConf.class, StreamExecutionEnvironment.class)
。诶,怎么少了一个JdbcDialect参数?注意实际调用的是子类xxxSourceFactory的构造方法,以MysqlSourceFactory为例,可以看到它是传入了自己定义的MysqlDialect对象,而MysqlDialect的getRawTypeConverter返回的是MysqlRawTypeConverter类的apply方法引用,由它来决定MySQL的JDBC字段类型应该对应哪一种Flink DataTypes。
public MysqlSourceFactory(SyncConf syncConf, StreamExecutionEnvironment env) {
// JdbcSourceFactory完成通用关系型数据库的公共功能
super(syncConf, env, new MysqlDialect());
// 避免result.next阻塞
if (jdbcConf.isPolling()
&& StringUtils.isEmpty(jdbcConf.getStartLocation())
&& jdbcConf.getFetchSize() == 0) {
jdbcConf.setFetchSize(1000);
}
}
小结
只需要实现自定义的xxxSourceFactory、xxxDialect和xxxRawTypeConverter类就可以轻松地添加一种JDBC数据源的source算子,这是策略模式+模板模式的好处。
创建输出流
JdbcSourceFactory通过createInput(builder.finish())
来创建输出流,该方法又调用来自父类SourceFactory的createInput方法:
protected DataStream<RowData> createInput(InputFormat<RowData, InputSplit> inputFormat) {
return createInput(inputFormat, this.getClass().getSimpleName().toLowerCase());
}
SourceFactory类createInput方法代码如下所示,在检查传入参数非空后,通过传入的inputformat和getTypeInformation()方法的返回值构造DtInputFormatSourceFunction,至此终于得到了SourceFunction方法,接下来就是使用env.add方法将其添加到流执行环境,并返回输出流。
protected DataStream<RowData> createInput(
InputFormat<RowData, InputSplit> inputFormat, String sourceName) {
Preconditions.checkNotNull(sourceName);
Preconditions.checkNotNull(inputFormat);
DtInputFormatSourceFunction<RowData> function =
new DtInputFormatSourceFunction<>(inputFormat, getTypeInformation());
// 添加source算子,算子名称是工厂类名的全小写模式,如mysqlsourcefactory
return env.addSource(function, sourceName, getTypeInformation());
}
DtInputFormatSourceFunction
DtInputFormatSourceFunction继承自Flink DataStream API中的InputFormatSourceFunction,顾名思义,它依赖于InputFormat来提供数据。DtInputFormatSourceFunction重写了如下生命周期方法:
// 准备输出环境
public void open(Configuration parameters);
// 执行输出数据流的函数体
public void run(SourceContext<OUT> ctx) throws Exception;
// 取消执行,就是将run方法的运行标志值为false
public void cancel();
// 关闭输出流,调用gracefulClose代劳
public void public void close() throws Exception;
其中,最重要的run方法代码如下所示。首先调用format的openInputFormat方法设置设置jobName、jobId和任务编号,并且还设置了读取的开始时间。接着在while循环中不断产生一条输出数据,关键方法分别是format.open()、format.nextRecord,接下来具体分析。
DtInputFormatSourceFunction.run()
public void run(SourceContext<OUT> ctx) throws Exception {
Exception tryException = null;
try {
Counter completedSplitsCounter =
getRuntimeContext().getMetricGroup().counter("numSplitsProcessed");
if (isRunning && format instanceof RichInputFormat) {
// openInputFormat设置jobName、jobId和任务编号,并且开启自定义reporter
// 并且设置读取的开始时间
((RichInputFormat) format).openInputFormat();
}
OUT nextElement = serializer.createInstance();
while (isRunning) {
// 调用BaseRichInputFormat.open方法
format.open(splitIterator.next());
// for each element we also check if cancel
// was called by checking the isRunning flag
while (isRunning && !format.reachedEnd()) {
synchronized (ctx.getCheckpointLock()) {
nextElement = format.nextRecord(nextElement);
if (nextElement != null) {
ctx.collect(nextElement);
}
}
}
format.close();
completedSplitsCounter.inc();
if (isRunning) {
isRunning = splitIterator.hasNext();
}
}
} catch (Exception exception) {
tryException = exception;
LOG.error("Exception happened, start to close format", exception);
} finally {
isRunning = false;
gracefulClose();
if (null != tryException) {
throw tryException;
}
}
}
BaseRichInputFormat.open方法
该方法核心功能通过调用抽象方法openInternal实现,即由每个子类根据自身情况实现。对于关系型数据库,JdbcInputFormat类在openInternal方法中实现打开数据库连接并查询记录的逻辑,具体的可以分为3步:获取连接、构造查询SQL、执行SQL。
第一步:获取JDBC连接,由getConnection方法实现,注意紧接着将autocommit设置为false。
第二步:构造查询SQL,FlinkX根据自身设计的业务逻辑分层构造sql。