在Hadoop中进行数据采集和清洗的主要方式是:
- Flume进行流式数据采集:
- Agent发送数据到Flume Channel。
- Channel缓存数据并发送到Sink,如HDFS。
- 可以自定义 Interceptor 实现数据清洗。
- Sqoop进行批量数据采集:
- 使用Sqoop从RDBMS中采集数据到HDFS。
- 可以指定WHERE语句进行数据过滤。
- 在Sqoop作业中设置validator实现数据清洗。
- Pig/Hive进行数据清洗:
- 过滤空值、重复数据和无效数据。
- 标准化字段名称和值。
- 处理异常数据或缺失的数据。
- Flume自定义Interceptor示例:
public class UppercaseInterceptor extends BaseInterceptor {
@Override
public Status intercept(Event event) {
byte[] body = event.getBody();
String bodyStr = new String(body);
bodyStr = bodyStr.toUpperCase();
event.setBody(bodyStr.getBytes());
return Status.ACCEPT;
}
}
- 添加到Flume Channel:
<sinks>
<sink class="org.apache.flume.sink.hdfs.HDFSEventSink" name="sink1">
<interceptors>
<interceptor>
<class>com.example.UppercaseInterceptor</class>
</interceptor>
</interceptors>
<hdfs>
<filePrefix>flume-data</filePrefix>
<inUseSuffix>.tmp</inUseSuffix>
<completedSuffix></completedSuffix>
</hdfs>
</sink>
</sinks>
- Sqoop validator示例:
- 实现
org.apache.sqoop.validation.Validator
接口。 - 在Sqoop作业中设置:
$ sqoop import ... --validator com.example.CustomValidator
数据采集和清洗的主要作用是:
- 从各种数据源采集数据到集群。
- 过滤和转换异常、无效与脏数据。
- 标准化数据格式,方便后续处理。
来看一些简单示例:
- Flume Interceptor – 转大写:
public class UppercaseInterceptor extends BaseInterceptor {
@Override
public Status intercept(Event event) {
// 转大写 ...
return Status.ACCEPT;
}
}
- 添加到Channel:
<interceptors>
<interceptor>
<class>com.example.UppercaseInterceptor</class>
</interceptor>
</interceptors>
- Sqoop validator:
- 实现Validator接口
- 在Sqoop作业中设置:
$ sqoop import ... --validator com.example.CustomValidator
所以通过Flume拦截器、Sqoop校验器以及Hive/Pig的数据清洗语句,我们可以对采集到的数据进行过滤、转换和校验,实现高效准确的大数据清洗工作。