在Hadoop中处理大量小文件的问题的主要方式是:
1、 合并小文件:
- 使用文件合并工具如CombineFileInputFormat统一合并小文件。
- 自定义InputFormat实现小文件合并。
2、 序列化和缩小小文件:
- 使用SequenceFileOutputFormat将小文件序列化成SequenceFile。
- 自定义OutputFormat实现小文件压缩和编码。
3、 在MapReduce中处理小文件:
- 自定义InputFormat将多个小文件作为一个split输入。
- 在Mapper中对多个小文件进行聚合。
4、 使用CombineFileInputFormat示例:
- 设置minimumSplitSize和maximumSplitSize:
<property>
<name>mapreduce.input.fileinputformat.split.maxsize</name>
<value>536870912</value>
</property>
<property>
<name>mapreduce.input.fileinputformat.split.minsize</name>
<value>536870912</value>
</property>
- 在作业中设置InputFormatClass:
job.setInputFormatClass(CombineFileInputFormat.class);
5、 自定义InputFormat示例:
- 继承FileInputFormat,重写getSplits()方法:
public class CustomInputFormat extends FileInputFormat<LongWritable, Text> {
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException {
List<FileSplit> splits = new ArrayList<>();
long maxSize = 536870912; // 512MB
long totalSize = 0;
List<FileStatus> files = listStatus(context);
for(FileStatus file : files) {
if((totalSize + file.getLen()) <= maxSize) {
totalSize += file.getLen();
} else {
splits.add(new FileSplit(files, 0, totalSize, new String[0]));
totalSize = file.getLen();
}
}
splits.add(new FileSplit(files, 0, totalSize, new String[0]));
return splits;
}
}
- 在作业中设置InputFormatClass:
job.setInputFormatClass(CustomInputFormat.class);
大文件处理的主要作用是:
- 避免产生过多的Map任务导致资源浪费。
- 合理设置块大小提高 NameNode 和 DataNode 的性能。
- 统一处理小文件更易于业务实现。
来看一些简单示例:
- CombineFileInputFormat – 配置:
<property>
<name>mapreduce.input.fileinputformat.split.maxsize</name>
<value>536870912</value>
</property>
<property>
<name>mapreduce.input.fileinputformat.split.minsize</name>
<value>536870912</value>
</property>
- 设置InputFormatClass:
job.setInputFormatClass(CombineFileInputFormat.class);
- 自定义InputFormat:
public class CustomInputFormat extends FileInputFormat<LongWritable, Text> {
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException {
List<FileSplit> splits = new ArrayList<>();
long maxSize = 536870912; // 512MB
long totalSize = 0;
List<FileStatus> files = listStatus(context);
for(FileStatus file : files) {
if((totalSize + file.getLen()) <= maxSize) {
totalSize += file.getLen();
} else {
splits.add(new FileSplit(files, 0, totalSize, new String[0]));
totalSize = file.getLen();
}
}
splits.add(new FileSplit(files, 0, totalSize, new String[0]));
return splits;
}
@Override
public RecordReader<LongWritable, Text> createRecordReader(
InputSplit split, TaskAttemptContext context) {
return new CombineFileRecordReader((FileSplit)split, context);
}
}
- 重写createRecordReader()方法返回CombineFileRecordReader。
- 在作业中设置InputFormatClass:
job.setInputFormatClass(CustomInputFormat.class);
- SequenceFileOutputFormat – 设置OutputFormatClass:
job.setOutputFormatClass(SequenceFileOutputFormat.class);
- Mapper输出的key和value类型必须是WritableComparable。
所以通过InputFormat设置最小最大分片大小、自定义InputFormat实现文件合并,或使用SequenceFileOutputFormat序列化文件,我们可以高效解决Hadoop中的小文件问题。
自定义InputFormat是解决小文件问题的最佳方式,因为它可以最大限度满足我们的业务需求。SequenceFileOutputFormat也是一种简单高效的小文件压缩方法,尤其适合 Reduce端的输出文件。