在Hadoop中进行数据转换和格式化的主要方式是:
1、 使用Hive进行数据格式转换:
- 支持 converts 数据格式,如TEXTFILE -> ORCFILE。
- 支持使用CAST函数转换列数据类型,如STRING -> BIGINT。
- 需要设定目标表的存储格式与数据结构。
2、 使用Sqoop进行数据导入与导出:
- 支持将MySQL/Postgres的数据导入到HDFS/Hive/HBase。
- 支持从HDFS/Hive/HBase导出数据到MySQL/Postgres。
- 支持全量/增量导入,以及不同数据格式如CSV、JSON、Avro等。
3、 使用Flume进行日志聚合与数据抽取:
- 以agent的形式部署在数据源,将各种格式的日志数据采集到HDFS/Hive/HBase。
- 支持各种数据源如文件、websocket、thrift、exec等。
- 支持多种数据格式并可自定义数据转换逻辑。
4、 使用MapReduce进行大规模的数据转换与格式化:
- 自定义MapReduce作业对输入数据进行转换、过滤、聚合或格式化。
- 支持任意数据格式和转换逻辑,性能远超 Hive/Sqoop。
- 需要编写Mapper、Reducer、Partitioner等组件,然后打包运行。
5、 Hive数据格式转换示例:
- 创建TEXTFILE格式的输入表:
CREATE TABLE students (
name STRING,
age INT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;
- 创建ORC格式的输出表:
CREATE TABLE students_orc (
name STRING,
age INT
)
STORED AS ORC;
- 执行INSERT OVERWRITE将数据转换写入:
INSERT OVERWRITE TABLE students_orc
SELECT * FROM students;
数据转换和格式化的主要作用是:
- 支持不同数据源之间的两级互操作与迁移。
- 统一数据格式,满足计算引擎的输入要求。
- 提高查询效率,降低数据冗余。
- 使生态中各存储、计算组件能有效协同工作。
来看一些简单示例:
1、 使用Hive创建不同格式的表:
CREATE TABLE students (
name STRING,
age INT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;
CREATE TABLE students_orc (
name STRING,
age INT
)
STORED AS ORC;
- 测Hive INSERT OVERWRITE 执行转换:
INSERT OVERWRITE TABLE students_orc
SELECT * FROM students;
2、 使用Sqoop将MySQL数据导入Hive:
sqoop import \
--connect jdbc:mysql://mysql.example.com/students \
--username student --password study \
--table students \
--hive-import
- 这会在Hive中创建students表并导入MySQL数据。
3、 使用Flume采集日志数据到HDFS:
properties
# Flume Agent Config
agent.sources = tailsource
agent.channels = memorychannel
agent.sinks = hdfs-sink
# Source Config
agent.sources.tailsource.type = TAILSOURCE
agent.sources.tailsource.positionFile = /var/log/flume-tail-positions.json
agent.sources.tailsource.filegroups = f1
agent.sources.tailsource.filegroups.f1 = /var/log/*.log
# Sink Config
agent.sinks.hdfs-sink.type = HDFS
agent.sinks.hdfs-sink.hdfs.path = hdfs://namenode/flume/logs
agent.sinks.hdfs-sink.hdfs.filePrefix = logs-
agent.sinks.hdfs-sink.hdfs.round = true
agent.sinks.hdfs-sink.hdfs.roundValue = 10
agent.sinks.hdfs-sink.hdfs.roundUnit = minute
# Channel Config
agent.channels.memorychannel.type = MEMORY
# Bindings
agent.sources.tailsource.channels = memorychannel
agent.sinks.hdfs-sink.channel = memorychannel
- Flume会tail /var/log/下所有.log文件
- 每10分钟将采集的日志数据写入HDFS的/flume/logs目录
- 支持文本、json、avro等多种日志数据格式
4、 自定义MapReduce作业转换数据:
- 编写Mapper将每行以‘|’分隔的输入字符串反转:
java
public class ReverseMapper extends Mapper<LongWritable, Text, Text, Text> {
private Text outKey = new Text();
private Text outValue = new Text();
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] split = line.split("\\|");
outKey.set(split[0]);
outValue.set(new StringBuilder(split[1]).reverse().toString());
context.write(outKey, outValue);
}
}
- 编写Reducer输出最终结果:
java
public class ReverseReducer extends Reducer<Text, Text, Text, Text> {
private Text result = new Text();
@Override
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
StringBuilder sb = new StringBuilder();
for (Text val : values) {
sb.append(val.toString()).append("|");
}
result.set(sb.substring(0, sb.length() - 1));
context.write(key, result);
}
}
- 打包运行MapReduce作业对输入数据进行反转转换。