Hadoop中如何进行数据的转换和格式化?代码举例讲解

在Hadoop中进行数据转换和格式化的主要方式是:

1、 使用Hive进行数据格式转换:

  • 支持 converts 数据格式,如TEXTFILE -> ORCFILE。
  • 支持使用CAST函数转换列数据类型,如STRING -> BIGINT。
  • 需要设定目标表的存储格式与数据结构。

2、 使用Sqoop进行数据导入与导出:

  • 支持将MySQL/Postgres的数据导入到HDFS/Hive/HBase。
  • 支持从HDFS/Hive/HBase导出数据到MySQL/Postgres。
  • 支持全量/增量导入,以及不同数据格式如CSV、JSON、Avro等。

3、 使用Flume进行日志聚合与数据抽取:

  • 以agent的形式部署在数据源,将各种格式的日志数据采集到HDFS/Hive/HBase。
  • 支持各种数据源如文件、websocket、thrift、exec等。
  • 支持多种数据格式并可自定义数据转换逻辑。

4、 使用MapReduce进行大规模的数据转换与格式化:

  • 自定义MapReduce作业对输入数据进行转换、过滤、聚合或格式化。
  • 支持任意数据格式和转换逻辑,性能远超 Hive/Sqoop。
  • 需要编写Mapper、Reducer、Partitioner等组件,然后打包运行。

5、 Hive数据格式转换示例:

  • 创建TEXTFILE格式的输入表:
CREATE TABLE students (  
    name STRING,  
    age INT  
) 
ROW FORMAT DELIMITED  
FIELDS TERMINATED BY '\t'  
STORED AS TEXTFILE;
  • 创建ORC格式的输出表:
CREATE TABLE students_orc (  
    name STRING,  
    age INT 
) 
STORED AS ORC;
  • 执行INSERT OVERWRITE将数据转换写入:
INSERT OVERWRITE TABLE students_orc
SELECT * FROM students;

数据转换和格式化的主要作用是:

  1. 支持不同数据源之间的两级互操作与迁移。
  2. 统一数据格式,满足计算引擎的输入要求。
  3. 提高查询效率,降低数据冗余。
  4. 使生态中各存储、计算组件能有效协同工作。

来看一些简单示例:
1、 使用Hive创建不同格式的表:

CREATE TABLE students (  
    name STRING,  
    age INT  
) 
ROW FORMAT DELIMITED  
FIELDS TERMINATED BY '\t'  
STORED AS TEXTFILE;

CREATE TABLE students_orc (  
    name STRING,  
    age INT 
)  
STORED AS ORC;
  • 测Hive INSERT OVERWRITE 执行转换:
INSERT OVERWRITE TABLE students_orc  
SELECT * FROM students;

2、 使用Sqoop将MySQL数据导入Hive:

sqoop import \
--connect jdbc:mysql://mysql.example.com/students \  
--username student --password study \
--table students  \
--hive-import
  • 这会在Hive中创建students表并导入MySQL数据。
    3、 使用Flume采集日志数据到HDFS:
properties
# Flume Agent Config
agent.sources = tailsource
agent.channels = memorychannel 
agent.sinks = hdfs-sink  

# Source Config
agent.sources.tailsource.type = TAILSOURCE
agent.sources.tailsource.positionFile = /var/log/flume-tail-positions.json
agent.sources.tailsource.filegroups = f1
agent.sources.tailsource.filegroups.f1 = /var/log/*.log

# Sink Config  
agent.sinks.hdfs-sink.type = HDFS 
agent.sinks.hdfs-sink.hdfs.path = hdfs://namenode/flume/logs  
agent.sinks.hdfs-sink.hdfs.filePrefix = logs-
agent.sinks.hdfs-sink.hdfs.round = true
agent.sinks.hdfs-sink.hdfs.roundValue = 10 
agent.sinks.hdfs-sink.hdfs.roundUnit = minute

# Channel Config
agent.channels.memorychannel.type = MEMORY 

# Bindings  
agent.sources.tailsource.channels = memorychannel
agent.sinks.hdfs-sink.channel = memorychannel 
  • Flume会tail /var/log/下所有.log文件
  • 每10分钟将采集的日志数据写入HDFS的/flume/logs目录
  • 支持文本、json、avro等多种日志数据格式

4、 自定义MapReduce作业转换数据:

  • 编写Mapper将每行以‘|’分隔的输入字符串反转:
java
public class ReverseMapper extends Mapper<LongWritable, Text, Text, Text> {
    private Text outKey = new Text(); 
    private Text outValue = new Text();  

    @Override
    public void map(LongWritable key, Text value, Context context)  
        throws IOException, InterruptedException {  
        String line = value.toString();
        String[] split = line.split("\\|");
        outKey.set(split[0]);
        outValue.set(new StringBuilder(split[1]).reverse().toString());
        context.write(outKey, outValue);
    } 
}
  • 编写Reducer输出最终结果:
java
public class ReverseReducer extends Reducer<Text, Text, Text, Text> {
    private Text result = new Text();

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)  
        throws IOException, InterruptedException {      
        StringBuilder sb = new StringBuilder();
        for (Text val : values) {
            sb.append(val.toString()).append("|");
        }
        result.set(sb.substring(0, sb.length() - 1));
        context.write(key, result);
    }
}
  • 打包运行MapReduce作业对输入数据进行反转转换。