保姆级教程:用DataX实现跨数据源同步,解决MySQL到Hive数据集成难题

一、引言 (Introduction)

钩子 (The Hook)

你是否曾面临这样的困境:公司业务数据分散在MySQL、Oracle、SQL Server等不同的数据库中,而数据分析团队却需要将这些数据汇总到Hive数据仓库进行统一分析?手动写脚本导数据?效率低下且易出错。使用商业ETL工具?成本高昂,学习曲线陡峭。作为数据工程师,你是否渴望找到一个开源、高效、灵活且易于上手的工具来解决这些跨数据源同步的难题?如果你点头,那么请继续阅读,本文将为你揭开DataX的神秘面纱,带你彻底攻克MySQL到Hive的数据集成难关。

定义问题/阐述背景 (The “Why”)

在当今数据驱动的时代,企业的数据往往存储在多种多样的数据源中。关系型数据库(如MySQL)因其ACID特性,广泛用于在线事务处理(OLTP);而数据仓库(如Hive)则专为大规模数据存储和分析(OLAP)而设计。将业务数据从MySQL等OLTP数据库同步到Hive等OLAP数据仓库,是构建数据中台、支持业务决策的关键一步。

然而,跨数据源同步面临着诸多挑战:

  1. 数据格式不兼容:不同数据库的数据类型、字段约束存在差异。
  2. 同步效率低下:面对海量数据,如何快速、准确地完成同步是一大难题。
  3. 增量同步复杂:如何只同步新增或变化的数据,而非全量数据,以减少资源消耗。
  4. 操作繁琐易错:编写自定义脚本不仅耗时,还容易引入人为错误,且难以维护。
  5. 监控与告警缺失:同步任务是否成功?失败了如何及时知晓?

DataX正是为解决这些问题而生的一款开源数据同步工具。

亮明观点/文章目标 (The “What” & “How”)

本文将以“保姆级”的细致程度,带你从零开始,全面掌握使用DataX进行数据同步的技能。我们将以MySQL到Hive的全量同步和增量同步为核心案例,一步步演示从环境准备、配置编写、任务执行到结果验证、问题排查的完整流程。

读完本文后,你将能够:

  • 深刻理解DataX的核心架构与工作原理。
  • 独立完成DataX的安装与环境配置。
  • 熟练编写MySQL到Hive的数据同步作业JSON配置文件。
  • 掌握DataX任务的执行、监控与日志分析方法。
  • 解决DataX同步过程中可能遇到的常见问题。
  • 了解DataX的高级特性(如数据转换、多表同步)和最佳实践。

无论你是刚入行的数据工程师、需要处理数据同步的开发人员,还是对数据集成感兴趣的技术爱好者,这篇教程都将成为你手中的利器。让我们开始这段DataX探索之旅吧!


二、基础知识/背景铺垫 (Foundational Concepts)

在深入实战之前,让我们先打好基础,了解DataX以及相关技术的核心概念。这将帮助你更好地理解后续的配置和操作。

2.1 什么是DataX?

DataX 是阿里巴巴开源的一款异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。

2.1.1 DataX的设计理念

DataX采用了框架 + 插件的设计模式。将数据源读取和写入抽象为Reader/Writer插件,纳入到整个同步框架中。

  • Reader:数据采集模块,负责从源数据源读取数据。
  • Writer:数据写入模块,负责将数据写入到目标数据源。
  • Transformer:数据转换模块(DataX 3.0+ 支持),负责对采集的数据进行加工处理,如过滤、转换、脱敏等。
  • Framework:DataX框架,负责连接Reader和Writer,处理缓冲、流控、并发、数据转换、错误处理等核心技术问题。

这种设计使得DataX具有极强的扩展性,用户可以根据需要开发新的Reader或Writer插件。

2.1.2 DataX支持的数据源

DataX支持的数据源非常丰富,常见的包括:

  • 关系型数据库:MySQL, Oracle, SQL Server, PostgreSQL, DB2, 达梦, 金仓等。
  • 大数据生态:HDFS, Hive, HBase, Spark, ClickHouse, StarRocks, Doris等。
  • NoSQL:MongoDB, Redis。
  • 文件类型:TextFile, CSV, Excel, FTP, SFTP。
  • 搜索引擎:Elasticsearch。
  • 其他:ODPS, ADS, Kudu等。

具体可查阅DataX官方文档或其GitHub仓库获取最新支持列表。

2.1.3 DataX的优势
  • 开源免费:基于Apache License 2.0协议,无商业许可成本。
  • 功能强大:支持多种异构数据源,满足复杂的数据同步需求。
  • 性能优异:通过合理的并发控制和缓冲机制,可以达到较高的同步性能。
  • 配置简单:基于JSON的配置文件,易于编写和维护。
  • 健壮稳定:经过阿里巴巴内部多年的大规模使用验证,具有良好的容错和恢复机制。
  • 监控完善:提供详细的日志输出和统计信息,便于问题排查和性能调优。
2.1.4 DataX的局限性
  • 离线同步:主要用于离线批量数据同步,不适合实时性要求极高的场景(如毫秒级同步)。
  • 无内置调度:DataX本身只是一个同步工具,不包含任务调度功能,需要配合外部调度系统(如Azkaban, Airflow, XXL-Job等)使用。

2.2 MySQL与Hive简介

在本教程中,我们重点关注MySQL到Hive的同步。让我们简要回顾一下这两个数据源:

2.2.1 MySQL
  • 类型:关系型数据库管理系统 (RDBMS)。
  • 特点:支持ACID事务,基于行存储,适合OLTP(在线事务处理),查询响应快,支持复杂的SQL查询和索引。
  • 数据存储:数据以表的形式组织,存储在本地文件系统或特定的存储引擎(如InnoDB)中。
  • 在数据同步中的角色:通常作为源数据源 (Source),即数据的产生地。
2.2.2 Hive
  • 类型:基于Hadoop的数据仓库工具。
  • 特点:基于HDFS存储数据,使用类SQL的HQL进行查询,适合OLAP(在线分析处理),用于处理大规模数据集,查询延迟相对较高。
  • 数据存储:数据存储在HDFS上,通常以结构化或半结构化的文件形式(如TextFile, SequenceFile, ORC, Parquet)存储。表结构元数据保存在Metastore中。
  • 在数据同步中的角色:通常作为目标数据源 (Destination),即数据的汇总和分析平台。

2.3 MySQL到Hive数据同步的挑战与DataX的应对

将数据从MySQL同步到Hive并非易事,主要面临以下挑战:

  1. 数据类型映射:MySQL的数据类型(如VARCHAR, INT, DATETIME)与Hive的数据类型(如STRING, INT, TIMESTAMP)需要正确映射。
    • DataX应对:MySQLReader和HiveWriter插件内部已处理了大部分常用数据类型的映射关系。
  2. 数据格式转换:Hive通常期望特定格式的输入文件(如逗号分隔、制表符分隔)。
    • DataX应对:HiveWriter可以配置输出文件的格式(如textfile, orc)、字段分隔符、行分隔符等。
  3. 大批量数据处理:当MySQL表数据量很大时,同步效率和资源占用是问题。
    • DataX应对:支持并发读取(通过splitPk配置)和批量写入,Framework层有流控机制。
  4. 增量数据同步:只同步新增或变化的数据,避免全表扫描的开销。
    • DataX应对:可以通过配置Reader的查询SQL,使用WHERE条件过滤出增量数据(如基于时间戳、自增ID)。
  5. 元数据不一致:MySQL表结构变更后,Hive表结构如何同步更新。
    • DataX应对:DataX本身不直接处理元数据同步,需要用户手动维护Hive表结构,或结合其他工具(如Sqoop的--create-hive-table,但DataX无此参数,需手动)。

DataX通过其插件化架构和丰富的配置选项,为这些挑战提供了切实可行的解决方案。


三、核心内容/实战演练 (The Core - “How-To”)

好了,理论知识铺垫得差不多了,现在让我们进入最激动人心的实战环节!本章节将手把手教你完成从环境准备到成功将MySQL数据同步到Hive的全过程。

3.1 环境准备

在开始之前,请确保你的环境满足以下要求。我们假设你使用的是Linux操作系统(如CentOS/Ubuntu)。

3.1.1 前置软件要求
  1. JDK

    • 版本:DataX基于Java开发,需要JDK 1.8或以上版本。
    • 验证java -versionjavac -version
    • 安装
      # CentOS示例
      sudo yum install java-1.8.0-openjdk-devel
      # Ubuntu示例
      sudo apt-get install openjdk-8-jdk
      # 配置环境变量 JAVA_HOME (根据实际安装路径调整)
      export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk
      export PATH=$JAVA_HOME/bin:$PATH
      
  2. MySQL

    • 版本:5.x 或 8.x 均可。确保服务已启动并可访问。
    • 权限:需要一个具有SELECT权限的用户,用于DataX读取数据。
    • 验证mysql -u username -p
  3. Hadoop & Hive

    • Hadoop:Hive依赖Hadoop,需确保HDFS和YARN服务正常运行。Hadoop 2.x或3.x版本。
    • Hive:确保Hive服务(特别是Metastore和HiveServer2,如果需要)已启动。Hive 1.x, 2.x, 3.x版本均可,但推荐使用较新版本以获得更好的兼容性和功能。
    • 权限:DataX运行的用户需要有向HDFS写入数据(Hive表对应路径)以及向Hive Metastore写入元数据的权限。
    • 验证HDFShdfs dfs -ls /
    • 验证Hivehive -e "show databases;" 或通过Beeline连接。

    注意:Hadoop和Hive的安装配置本身比较复杂,超出了本文范围。如果你是在本地测试,可以考虑使用伪分布式环境Docker容器(如big-data-europe/hive)来快速搭建。假设你已经拥有一个可用的Hive环境。

3.1.2 下载与安装DataX
  1. 下载DataX

    • 官方GitHub Release页面:https://github.com/alibaba/DataX/releases
    • 选择一个稳定版本,例如 datax.tar.gz (通常是最新的稳定版)。
    • 也可以通过wget直接下载(请替换为最新的版本链接):
      wget https://github.com/alibaba/DataX/releases/download/{version}/datax.tar.gz
      # 例如:wget https://github.com/alibaba/DataX/releases/download/v3.0/datax.tar.gz
      
    • 如果GitHub下载慢,可以尝试国内镜像或码云仓库。
  2. 解压DataX

    mkdir -p /opt/module
    tar -zxvf datax.tar.gz -C /opt/module/
    
  3. 验证安装

    cd /opt/module/datax/
    python bin/datax.py --version
    # 或
    python3 bin/datax.py --version  (如果系统默认是Python3)
    

    如果看到类似如下输出,说明安装成功:

    DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
    Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
    

    注意:DataX的启动脚本是用Python编写的,因此需要Python环境(Python 2.6.x 或 2.7.x,注意:Python 3.x可能存在兼容性问题! 如果系统只有Python3,可以尝试修改脚本或使用虚拟环境安装Python2.7)。

    • Python版本问题解决
      • 方法一:安装Python 2.7,并确保python命令指向Python 2.7。
      • 方法二(推荐,若DataX版本支持):查看DataX的bin/datax.py脚本头部的shebang是否为#!/usr/bin/env python,尝试修改为#!/usr/bin/env python3,并测试是否能正常运行。部分新版本的DataX已对Python3做了兼容。如果遇到语法错误,则可能需要使用Python2。
3.1.3 目录结构说明

解压后的DataX目录结构如下,了解这些有助于后续操作:

datax/
├── bin/               # 可执行脚本目录 (datax.py)
├── conf/              # 全局配置目录 (core.json, logback.xml等)
├── job/               # 存放作业配置文件的示例目录 (可自定义)
│   └── job.json       # 作业配置示例
├── lib/               # DataX依赖的jar包
├── log/               # 日志输出目录
├── plugin/            # 核心插件目录 (Reader, Writer, Transformer)
│   ├── reader/        # Reader插件 (如mysqlreader, oraclereader)
│   ├── writer/        # Writer插件 (如hivewriter, hdfswriter)
│   └── transformer/   # Transformer插件 (数据转换)
├── script/            # 一些辅助脚本
└── tmp/               # 临时文件目录

3.2 准备测试数据

在MySQL中创建测试数据库、表,并插入一些示例数据。

3.2.1 登录MySQL
mysql -u root -p
# 输入密码
3.2.2 创建测试数据库和表
-- 创建数据库
CREATE DATABASE IF NOT EXISTS datax_test;

-- 使用数据库
USE datax_test;

-- 创建测试表 (用户表)
CREATE TABLE IF NOT EXISTS `user_info` (
  `id` INT(11) NOT NULL AUTO_INCREMENT COMMENT '用户ID',
  `username` VARCHAR(50) NOT NULL COMMENT '用户名',
  `age` TINYINT(3) UNSIGNED NULL DEFAULT NULL COMMENT '年龄',
  `gender` ENUM('M', 'F', 'U') NOT NULL DEFAULT 'U' COMMENT '性别: M-男, F-女, U-未知',
  `email` VARCHAR(100) NULL DEFAULT NULL COMMENT '邮箱',
  `create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`),
  KEY `idx_create_time` (`create_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户信息表';
3.2.3 插入测试数据
INSERT INTO `user_info` (`username`, `age`, `gender`, `email`, `create_time`, `update_time`) VALUES
('zhangsan', 25, 'M', 'zhangsan@example.com', '2023-01-01 10:00:00', '2023-01-01 10:00:00'),
('lisi', 30, 'M', 'lisi@example.com', '2023-01-02 11:00:00', '2023-01-02 11:00:00'),
('wangwu', 28, 'F', 'wangwu@example.com', '2023-01-03 14:30:00', '2023-01-03 14:30:00'),
('zhaoliu', NULL, 'U', NULL, '2023-01-04 09:15:00', '2023-01-04 09:15:00');
3.2.4 在Hive中创建目标表

登录Hive,创建与MySQL表结构对应的目标表。注意数据类型的映射和文件格式的选择。

hive
-- 创建数据库 (如果需要)
CREATE DATABASE IF NOT EXISTS datax_test;

-- 使用数据库
USE datax_test;

-- 创建目标表 (Hive)
-- 注意数据类型映射:
-- MySQL VARCHAR -> Hive STRING
-- MySQL INT -> Hive INT
-- MySQL TINYINT -> Hive TINYINT 或 INT
-- MySQL ENUM -> Hive STRING (Hive没有ENUM类型)
-- MySQL DATETIME -> Hive TIMESTAMP
CREATE EXTERNAL TABLE IF NOT EXISTS `user_info` (
  `id` INT COMMENT '用户ID',
  `username` STRING COMMENT '用户名',
  `age` TINYINT COMMENT '年龄',
  `gender` STRING COMMENT '性别: M-男, F-女, U-未知',
  `email` STRING COMMENT '邮箱',
  `create_time` TIMESTAMP COMMENT '创建时间',
  `update_time` TIMESTAMP COMMENT '更新时间'
)
-- 选择文件格式,这里先用简单的TEXTFILE
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'  -- 字段分隔符,与DataX配置一致
LINES TERMINATED BY '\n'   -- 行分隔符
STORED AS TEXTFILE        -- 文件存储格式
LOCATION '/user/hive/warehouse/datax_test.db/user_info'  -- HDFS存储路径 (可选,Hive会有默认路径)
TBLPROPERTIES (
  'comment' = 'DataX同步测试用户表'
);

数据类型映射参考

MySQL 数据类型 Hive 推荐数据类型 说明
INT, INTEGER INT
TINYINT TINYINT / INT Hive的TINYINT是1字节有符号整数
SMALLINT SMALLINT
BIGINT BIGINT
FLOAT FLOAT
DOUBLE DOUBLE
DECIMAL(p,s) DECIMAL(p,s) Hive 0.11.0+ 支持
VARCHAR(n), CHAR(n) STRING
DATETIME, TIMESTAMP TIMESTAMP Hive的TIMESTAMP精度到纳秒,MySQL到秒或毫秒
DATE DATE Hive 1.2.0+ 支持DATE类型
BOOLEAN BOOLEAN
ENUM, SET STRING Hive无对应类型,存储为字符串
BLOB, TEXT STRING / 特殊处理 通常存储为STRING,或使用二进制格式

3.3 编写DataX同步作业配置文件 (JSON)

DataX作业是通过一个JSON格式的配置文件来定义的。这个配置文件描述了从哪个数据源读取数据(Reader),如何处理数据(Transformer,可选),以及向哪个数据源写入数据(Writer)。

3.3.1 配置文件结构概览

一个典型的DataX作业配置文件结构如下:

{
  "job": {
    "content": [
      {
        "reader": {     // 源数据源读取配置 (MySQLReader)
          "name": "mysqlreader",
          "parameter": {
            // MySQLReader具体参数
          }
        },
        "writer": {     // 目标数据源写入配置 (HiveWriter)
          "name": "hivewriter",
          "parameter": {
            // HiveWriter具体参数
          }
        }
        // ,"transformer": []  // 可选,数据转换配置
      }
    ],
    "setting": {       // 作业运行时的全局设置
      "speed": {       // 速度控制
        // 速度控制参数
      },
      "errorLimit": {  // 错误限制
        // 错误限制参数
      }
    }
  }
}
3.3.2 配置MySQLReader

MySQLReader用于从MySQL数据库读取数据。其主要参数如下:

"reader": {
  "name": "mysqlreader",
  "parameter": {
    "username": "你的MySQL用户名",
    "password": "你的MySQL密码",
    "column": [      // 要读取的字段列表,*表示所有字段,但不推荐,建议显式列出
      "id",
      "username",
      "age",
      "gender",
      "email",
      "create_time",
      "update_time"
    ],
    "connection": [
      {
        "table": [    // 要同步的表名列表
          "user_info"
        ],
        "jdbcUrl": [  // MySQL JDBC连接串列表,可以多个
          "jdbc:mysql://你的MySQL主机IP:3306/datax_test?useUnicode=true&characterEncoding=utf8&useSSL=false"
        ]
      }
    ],
    "where": "",     // 筛选条件,可选,用于增量同步
    "querySql": "",  // 自定义查询SQL,可选,如果配置了这个,column和table参数会被忽略
    "splitPk": ""    // 用于分片的主键,可选,用于并发读取提高性能
  }
}

关键参数解释

  • username/password: MySQL登录凭证。
  • column: 指定要读取的列。推荐显式列出所有列,而不是用*,这样更清晰且避免不必要的列被同步。
  • connection: 包含tablejdbcUrl。可以配置多个jdbcUrl实现多库表读取,但通常一个就够了。
  • jdbcUrl: MySQL的JDBC连接字符串。需要确保驱动正确(DataX已内置MySQL驱动)。常见参数:
    • useUnicode=true&characterEncoding=utf8: 指定字符集,避免中文乱码。
    • useSSL=false: 如果MySQL服务未配置SSL,设为false。
    • serverTimezone=Asia/Shanghai: 指定时区,避免时间转换问题。
  • where: 附加的查询条件,例如 where create_time > '2023-01-01'不要加WHERE关键字,DataX会自动加上。
  • querySql: 自定义SQL。当你需要复杂查询(如JOIN、聚合)时使用。例如: "querySql": "select id, name from user where age > 18;"。使用querySql时,tablecolumn参数会被忽略。
  • splitPk: 用于数据分片的字段,通常是主键ID。DataX会根据这个字段将数据分成多个分片,由多个线程并发读取,提高效率。例如 "splitPk": "id"。对于非整形主键或不适合分片的场景,可以不配置,此时为单线程读取。
3.3.3 配置HiveWriter

HiveWriter用于将数据写入Hive表。其主要参数如下:

"writer": {
  "name": "hivewriter",
  "parameter": {
    "defaultFS": "hdfs://你的HDFS namenode地址:9000",  // HDFS地址
    "fileType": "textfile",  // 文件类型:textfile, orc, parquet
    "path": "/user/hive/warehouse/datax_test.db/user_info",  // HDFS目标路径 (通常是Hive表的LOCATION)
    "fileName": "user_info",  // 输出文件名前缀
    "column": [  // 目标表的字段列表,顺序和类型需与Reader输出一致
      {
        "name": "id",
        "type": "int"
      },
      {
        "name": "username",
        "type": "string"
      },
      {
        "name": "age",
        "type": "tinyint"
      },
      {
        "name": "gender",
        "type": "string"
      },
      {
        "name": "email",
        "type": "string"
      },
      {
        "name": "create_time",
        "type": "timestamp"
      },
      {
        "name": "update_time",
        "type": "timestamp"
      }
    ],
    "writeMode": "append",  // 写入模式:append, overwrite, nonConflict
    "fieldDelimiter": "\t",  // 字段分隔符,需与Hive表定义一致
    "compress": "none",  // 压缩方式:none, gzip, lzo, snappy (依文件类型支持而定)
    "hiveMetaStore": "thrift://你的Hive Metastore地址:9083",  // Hive Metastore服务地址
    "hiveDatabase": "datax_test",  // Hive数据库名
    "hiveTable": "user_info",      // Hive表名
    "haveKerberos": false,  // 是否启用Kerberos认证
    // Kerberos相关配置 (如果haveKerberos为true)
    // "kerberosKeytabFilePath": "",
    // "kerberosPrincipal": ""
  }
}

关键参数解释

  • defaultFS: HDFS的NameNode地址,格式为 hdfs://host:port。例如伪分布式可能是 hdfs://localhost:9000,集群环境则是你的NameNode服务地址。
  • fileType: 输出到HDFS的文件类型。可选:textfile, orc, parquet
    • textfile: 普通文本文件,易于查看,压缩率低,性能差。适合测试或简单场景。
    • orc: 高效的列式存储格式,压缩率高,性能好。推荐生产环境使用。
    • parquet: 另一种高效的列式存储格式,与ORC类似,各有优劣。
  • path: 数据文件写入到HDFS的具体路径。通常就是Hive表的LOCATION属性值。可以通过show create table user_info;在Hive中查看。
  • fileName: 输出文件的前缀。DataX会在后面加上随机后缀和分区信息(如果分区表)。
  • column: 目标Hive表的字段定义。name是字段名,type是Hive数据类型。字段的顺序和数量必须与Reader输出的字段一一对应
  • writeMode: 写入模式:
    • append: 追加模式,将数据追加到目标表。
    • overwrite: 覆盖模式,先删除目标表中已有的数据,再写入新数据。谨慎使用
    • nonConflict: 非冲突模式,如果目标路径已存在,则报错。
  • fieldDelimiter: 字段分隔符。对于textfile类型,需要与Hive表定义中的FIELDS TERMINATED BY一致。例如\t (制表符), , (逗号)。
  • compress: 文件压缩方式。例如gzip。ORC和Parquet文件本身内置压缩,此参数可能不生效或需配合特定参数。
  • hiveMetaStore: Hive Metastore的Thrift服务地址。通常是 thrift://metastore-host:9083。HiveWriter需要连接Metastore来获取表的元数据信息(即使指定了path,也推荐配置此项以确保元数据正确)。可以通过查看Hive配置文件hive-site.xml中的hive.metastore.uris获取。
  • hiveDatabase / hiveTable: 目标Hive表的数据库名和表名。
3.3.4 配置Job Setting (全局设置)

setting部分用于配置作业的全局参数,如速度控制、错误限制等。

"setting": {
  "speed": {
    "channel": 1,  // 并发通道数 (并发数),默认为1。可以根据服务器性能和数据源性能调整。
    "byte": 1048576  // 全局每秒字节数限制,默认为0,表示不限制。1048576 B = 1MB
  },
  "errorLimit": {
    "record": 10,  // 允许的脏数据记录数上限,默认为0。超过则任务失败。
    "percentage": 0.02  // 允许的脏数据百分比上限,默认为0.0。
  }
}

参数解释

  • speed.channel: 并发通道数,即启动多少个线程来执行同步任务。适当增加可以提高同步速度,但受限于源端数据库性能、网络带宽和目标端写入性能。
  • speed.byte: 限流,控制每秒从Reader读取并写到Writer的字节数总和。防止同步任务对源数据库或网络造成过大压力。
  • errorLimit.record: 允许的最大脏数据记录数。例如,源数据中某个字段格式错误导致无法正确解析或写入,这类记录会被标记为脏数据。超过此数量,任务失败。
  • errorLimit.percentage: 允许的最大脏数据百分比。相对于总记录数。
3.3.5 完整的配置文件示例 (全量同步)

综合以上配置,我们为MySQL到Hive的全量同步创建一个完整的JSON配置文件 mysql2hive_user_info.json

在DataX的安装目录下创建一个job文件夹(如果没有),并在其中创建该配置文件:

cd /opt/module/datax/
mkdir -p job/mysql2hive
vi job/mysql2hive/mysql2hive_user_info.json

粘贴以下内容,并根据你的实际环境修改 所有占位符信息 (如<your-mysql-host>, <your-hdfs-namenode>等):

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "root",                     // MySQL用户名
            "password": "your_mysql_password",      // MySQL密码
            "column": [
              "id",
              "username",
              "age",
              "gender",
              "email",
              "create_time",
              "update_time"
            ],
            "connection": [
              {
                "table": [
                  "user_info"
                ],
                "jdbcUrl": [
                  "jdbc:mysql://<your-mysql-host>:3306/datax_test?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai"  // 替换为你的MySQL地址和端口
                ]
              }
            ],
            "splitPk": "id"  // 可选,用于并发读取,这里以id为分片键
          }
        },
        "writer": {
          "name": "hivewriter",
          "parameter": {
            "defaultFS": "hdfs://<your-hdfs-namenode>:9000",  // 替换为你的HDFS namenode地址和端口
            "fileType": "textfile",
            "path": "/user/hive/warehouse/datax_test.db/user_info",  // 替换为你的Hive表LOCATION路径
            "fileName": "user_info",
            "column": [
              {
                "name": "id",
                "type": "int"
              },
              {
                "name": "username",
                "type": "string"
              },
              {
                "name": "age",
                "type": "tinyint"
              },
              {
                "name": "gender",
                "type": "string"
              },
              {
                "name": "email",
                "type": "string"
              },
              {
                "name": "create_time",
                "type": "timestamp"
              },
              {
                "name": "update_time",
                "type": "timestamp"
              }
            ],
            "writeMode": "append",
            "fieldDelimiter": "\t",
            "compress": "none",
            "hiveMetaStore": "thrift://<your-hive-metastore-host>:9083",  // 替换为你的Hive Metastore地址和端口
            "hiveDatabase": "datax_test",
            "hiveTable": "user_info",
            "haveKerberos": false
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 2,  // 开启2个并发通道
        "byte": 0
      },
      "errorLimit": {
        "record": 10,
        "percentage": 0.02
      }
    }
  }
}

重要检查项

  1. MySQL连接信息:用户名、密码、主机、端口、数据库名是否正确。
  2. HDFS defaultFS:地址和端口是否正确。可以通过hdfs getconf -confKey fs.defaultFS命令获取。
  3. Hive Metastore地址:是否正确。可以在Hive的hive-site.xml中查找hive.metastore.uris
  4. Hive表路径 (path):是否与Hive表的LOCATION一致。
  5. column顺序和类型:Reader的column输出顺序是否与Writer的column定义顺序完全一致?数据类型是否兼容?
  6. fieldDelimiter:是否与Hive表的FIELDS TERMINATED BY一致?

3.4 执行DataX同步任务

配置文件准备就绪后,就可以执行DataX同步任务了。

3.4.1 执行命令

在DataX安装目录下,执行以下命令:

cd /opt/module/datax/
python bin/datax.py job/mysql2hive/mysql2hive_user_info.json

如果你的系统默认是Python3,且DataX脚本兼容Python3:

python3 bin/datax.py job/mysql2hive/mysql2hive_user_info.json
3.4.2 监控任务执行过程

执行命令后,DataX会输出详细的日志信息。重点关注以下几个阶段:

  1. 任务初始化:加载配置文件,初始化Reader和Writer插件。
  2. 任务启动:显示作业配置摘要,如通道数、Reader/Writer类型。
  3. 数据同步:实时显示同步进度,如“已读记录数”、“已写记录数”、“速度(条/秒)”、“剩余时间”等。
    2023-10-27 15:30:00.123 [job-0] INFO  JobContainer - 任务启动时刻                    : 2023-10-27 15:29:50
    2023-10-27 15:30:00.124 [job-0] INFO  JobContainer - 任务结束时刻                    : 2023-10-27 15:30:00
    2023-10-27 15:30:00.124 [job-0] INFO  JobContainer - 任务总计耗时                    : 10s
    2023-10-27 15:30:00.124 [job-0] INFO  JobContainer - 任务平均流量                    : 1024 KB/s
    2023-10-27 15:30:00.125 [job-0] INFO  JobContainer - 记录写入速度                    : 1000 条/秒
    2023-10-27 15:30:00.125 [job-0] INFO  JobContainer - 读出记录总数                    : 10000
    2023-10-27 15:30:00.125 [job-0] INFO  JobContainer - 读写失败总数                    : 0
    
  4. 任务完成:如果看到 任务退出, 退出码:0,表示任务成功完成。
3.4.3 常见启动错误及解决
  • Python版本错误

    SyntaxError: invalid syntax
    

    解决:确保使用Python 2.7.x 或尝试兼容Python3的DataX版本/脚本。

  • MySQL连接失败

    com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: Could not create connection to database server.
    

    解决:检查MySQL主机、端口是否可达,用户名密码是否正确,JDBC URL参数是否有误,MySQL服务是否正常。

  • HDFS连接失败

    java.net.ConnectException: Call From host/ip to namenode-host:9000 failed on connection exception: java.net.ConnectException: Connection refused;
    

    解决:检查HDFS NameNode地址和端口是否正确,HDFS服务是否正常,网络是否通畅。

  • Hive Metastore连接失败

    org.apache.thrift.transport.TTransportException: Could not connect to metastore-host:9083
    

    解决:检查Hive Metastore服务是否启动,地址端口是否正确。

  • 配置文件JSON格式错误

    com.alibaba.fastjson.JSONException: syntax error, expect {, actual EOF, pos 0
    

    解决:仔细检查JSON配置文件的语法,确保括号匹配、逗号正确、没有多余的逗号等。可以使用在线JSON校验工具(如https://jsonlint.com/)检查。

  • Reader和Writer column数量不匹配

    java.lang.IllegalArgumentException: The column number of reader is not equal to writer. Reader column count: 6, Writer column count:7
    

    解决:确保Reader的column列表和Writer的column列表的字段数量完全一致,顺序也一致。

3.5 验证同步结果

任务成功执行后,我们需要验证数据是否正确同步到Hive表中。

3.5.1 查看HDFS上的文件

DataX会将数据文件写入到配置的HDFS path下。

hdfs dfs -ls /user/hive/warehouse/datax_test.db/user_info/

你应该能看到类似 user_info_********.txt 的文件。查看文件内容:

hdfs dfs -cat /user/hive/warehouse/datax_test.db/user_info/user_info_********.txt | head -n 10

检查数据格式是否正确,分隔符是否符合预期。

3.5.2 查询Hive表数据

登录Hive CLI或Beeline,查询目标表:

hive
use datax_test;
select * from user_info;

应该能看到从MySQL同步过来的4条测试数据。

OK
1       zhangsan        25      M       zhangsan@example.com     2023-01-01 10:00:00     2023-01-01 10:00:00
2       lisi    30      M       lisi@example.com 2023-01-02 11:00:00     2023-01-02 11:00:00
3       wangwu  28      F       wangwu@example.com       2023-01-03 14:30:00     2023-01-03 14:30:00
4       zhaoliu NULL    U       NULL    2023-01-04 09:15:00     2023-01-04 09:15:00
Time taken: 0.52 seconds, Fetched: 4 row(s)
3.5.3 数据对比 (条数、关键值)
  • 对比记录数
    MySQL: select count(*) from datax_test.user_info;
    Hive: select count(*) from datax_test.user_info;
    确保两边数量一致。

  • 对比关键字段值:随机抽取几条记录,对比关键字段的值是否一致。

3.6 增量数据同步实战

全量同步适用于一次性初始化数据,但在实际生产中,更常见的是增量同步,即只同步自上次同步以来新增或变化的数据。DataX本身不提供内置的增量机制,但可以通过灵活配置来实现。

最常用的增量同步策略有两种:基于时间戳基于自增ID

3.6.1 基于时间戳的增量同步

假设MySQL表中有一个 update_time 字段(如我们的user_info表),记录了数据最后更新的时间。我们可以通过此时间戳来筛选增量数据。

实现思路

  1. 记录上次同步的时间戳 last_sync_time
  2. 本次同步时,只读取 update_time > last_sync_time 的数据。
  3. 同步完成后,更新 last_sync_time 为本次同步的结束时间或最大 update_time

修改DataX配置文件

在MySQLReader的parameter中添加where条件:

"reader": {
  "name": "mysqlreader",
  "parameter": {
    // ... 其他配置不变 ...
    "where": "update_time > '2023-01-04 09:15:00'",  // 上次同步的最大时间戳
    // ...
  }
}

测试步骤

  1. 在MySQL中插入新数据
    INSERT INTO `user_info` (`username`, `age`, `gender`, `email`, `create_time`, `update_time`) VALUES
    ('sunqi', 22, 'M', 'sunqi@example.com', '2023-10-27 16:00:00', '2023-10-27 16:
    
Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐