开源ETL利器——Kettle的实战教程

作者: 李剑 分类: 其他技术 发布时间: 2018-12-18 09:37

1. Kettle的简单介绍

Kettle(现名Data Integration)是一款使用Java编写的功能强大的ETL(Extract Transform and Load)工具,支持关系型数据库(PostgreSQLMySQLOracle等)、非关系型数据库(MongoDBElasticSearch等)以及文件之间的大规模数据迁移。

2. 常用组件

Kettle提供了极为丰富的组件库,下面列举的是它的一些常用组件,以及对组件的常用参数进行简单介绍,详细的参数说明可参考Kettle的帮助文档。

2.1 Table input

指定数据库表作为输入。

  • Step name: 步骤名称,Kettle的每一个组件即一个步骤,可为该步骤取一个别名
  • Connection: 指定数据库连接
  • SQL: 编写SQL,从该数据库表中筛选出符合条件的数据

2.2 Table output

指定数据库表作为输出

  • Step name: 步骤名称
  • Connection: 指定数据库连接
  • Target schema: 输出的数据库表模式
  • Target table: 指定输出的数据库表
  • Use batch update for inserts: 是否使用批处理进行插入
  • Database fields: 配置字段映射关系
    • Table field: 输出的数据库表字段
    • Stream field: 流字段(流入该组件的数据字段) 

2.3 Sort rows

按照某字段进行排序

  • Step name: 步骤名称
  • Fields:
    • Fieldname: 排序的字段名
    • Ascending: 排序方式 

2.4 Merge join

将不同来源数据进行融合,类似于SQL中的join,注意: 该组件接收的数据必须按照join字段按照相同规则进行排序,否则join后的数据会有丢失。

  • Step name: 步骤名称
  • First Step: 需要融合的一组数据
  • Second Step: 需要融合的另一组数据
  • Join Type: 融合的类型
  • Keys for 1st stepFirst Step中进行融合的字段
  • Keys for 2nd stepSecond Step中进行融合的字段 

2.5 Add sequence

读取指定的序列值

  • Step name: 步骤名称
  • Name of value: 序列值别名
  • Use DB to get sequence: 是否使用数据库序列
  • Connnection: 数据库连接
  • Schema name: 数据库模式名称
  • Sequence name: 序列名 

2.6 Modified Java Script Value

支持编写JavaScript脚本,用于实现必要的业务逻辑

  • Step name: 步骤名称
  • Java script functions: 提供了一些JavaScript函数
  • Java script: 脚本编辑窗口
  • Fields: 可将脚本中的定义的变量映射出去 

3. 在实际场景中的应用

在软件开发中,经常会遇到这样的场景: 新开发的系统即将替换老系统,而老系统庞大的数据需迁移到新系统中,但数据结构与新系统不完全兼容,下面通过一个简单的例子来介绍Kettle是如何处理这些老数据,完成数据迁移任务的。

3.1 老数据结构

  • company公司表: 
company
  • district区域表: 

该表存储了省市区,通过parent_id进行关联

  • company_district公司区域表: 
  • employee员工表: 
  • employee_company员工公司表: 

3.2 新数据结构

  • company公司表: 

对比老数据company表,新的company表中新增了districtcityprovince字段,他们可以从老数据company_district表和district表中取得;contact字段对应tel字段;addr对应address

  • employee员工表: 

对比老数据employee表,新的employee表中新增company_id字段且有外键约束;sex字段由原来的1、2变更为男、女

3.3 数据迁移

由于employee有外键关联company,因此先迁移company表数据,新的company表需新增old_id字段来保存老的company表的id,用于员工关联公司。

3.3.1 company

数据迁移前的分析:

  • 打开Kettle,点击File->new->Transformation,新建一个转换流程
  • 点击左侧Design``Tab页,将Table input组件拖拽至右侧转换流程窗口,在组件上右键点击edit,弹出该组件的编辑窗口,设置步骤名称、数据库连接和SQL语句,如下图所示: 
  • companycompany_district数据进行left joinjoin之前需按照join字段排序,将Sort rows组件拖拽至右侧转换流程窗口,并进行编辑,如下图所示: 
  • Merge Join组件拖拽至右侧,并进行编辑,如下图所示: 
  • companycompany_district``Merge Join的结果和district数据分别进行排序,同上面步骤
  • 将两者进行join,同上面步骤
  • 添加Add sequence组件,并进行编辑,如下图所示: 
  • 添加Table output组件,并进行编辑,如下图所示: 
  • 整体流程如下图所示: 
  • 点击启动按钮执行整个流程,直至所有步骤右上角出现绿色的箭头,company表便完成了迁移。

3.3.2 employee

数据迁移前的分析:

  • company的数据迁移类似,添加三个Table input组件,并进行编辑
  • 分别将employeeemployee_company按照join字段进行统一排序
  • 将排序的结果进行join
  • 分别将新的companyjoin之后的结果按照join字段进行统一排序
  • 将排序的结果进行join
  • 编写脚本,转换sex字段 
  • 读取新的employee序列值
  • 输出到新的employee表中
  • 整体流程如下图所示: 
  • 点击启动按钮执行整个流程,直至所有步骤右上角出现绿色的箭头,employee表便完成了迁移。

3.4 结果

  • company表 

  • employee表 

至此,便完成了老数据的迁移。

4. 遇到的问题

Kettle使用过程中会发现,当需要进行迁移的数据量较为庞大时(千万级),常常会出现内存溢出的问题,解决方法是将Kettle内存调高些: 打开spoon.sh文件,找到PENTAHO_DI_JAVA_OPTIONS="-Xms1024m -Xmx2048m -XX:MaxPermSize=256m",将其修改为PENTAHO_DI_JAVA_OPTIONS="-Xms16384m -Xmx32768m -XX:MaxPermSize=16384m",重启即可。


源码地址: github

发表评论

电子邮件地址不会被公开。 必填项已用*标注