sqoop导入数据、全库导入和创建job以及实现定时增量导入的示例分析

这篇文章将为大家详细讲解有关sqoop导入数据、全库导入和创建job以及实现定时增量导入的示例分析,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

网站建设哪家好,找创新互联公司!专注于网页设计、网站建设、微信开发、微信小程序开发、集团企业网站建设等服务项目。为回馈新老客户创新互联还提供了行唐免费建站欢迎大家使用!

先从功能简单的开始介绍,

  1. sqoop导入单个表到hive:

sqoop import \
--connect jdbc:MySQL://192.168.49.214:3306/mysqlcdc 
--username root \
--password 123456 \
--table data \
--hive-import \
--fields-terminated-by '\t' \
-m 1

这是最简单的将mysql表导入hive中,没有指定hive表名,默认在default库,表名和mysql表同名。sqoop也可以通过sql语句来从多表中选择自己想要的数据,比如:

sqoop import \
--connect 'jdbc:sqlserver://192.168.49.180:1433;database=rcscounty_qn' \
--username sa \
--password 123456! \
--fields-terminated-by '\t' \
--hive-import \
--hive-table rcs.user_orgname \
--m 1 \
--query 'SELECT u.USER_ID as id, u.USER_NAME as name, u.ORG_ID as orgId, o.ORG_NAME as orgName FROM USER u ,  ORG o where o.ORG_ID = u.ORG_ID and $CONDITIONS'

通过sqoop导入sqlserver数据库的数据,通过query查询出自己想要的数据,将这些数据导入hive中。 $CONDITIONS 是不能缺少的,有查询条件的时候查询条件和and连接,没有查询条件的时候放在where中就可以了。

通过sqoop导入数据到hive中,有以下一些特点:

1)指定的hive表可以存在也可以不存在,不存在则会自动创建,表存在假如没有数据则会将数据导入,数据格式不对会报错,加入--hive-overwrite会将hive表进行重写。

2)通过sqoop创建的hive表只能是内部表,即使通过--target-dir指定了数据在hdfs中存储的路径,实际上在hdfs中只会创建文件夹,数据默认是放在/user/hive/warehouse/里面。

3)同一张hive表通过--fields-terminated-by指定的分隔符要统一,否则后导入的数据会挤到一列,无法分开。

2.sqoop全库导入

sqoop import-all-tables "-Dorg.apache.sqoop.splitter.allow_text_splitter=true" \
--connect 'jdbc:sqlserver://192.168.49.180:1433;database=rcscounty_qn' \
--username sa \
--password 123456 \
--fields-terminated-by '\t' \
--hive-database ods_quannan \
-m 10 \
--create-hive-table \
--hive-import \
--hive-overwrite \
--autoreset-to-one-mapper

将一个数据库内的表都导入一个hive库中,假如这个库中所有的表都有主键,则不需要最后一行的--autoreset-to-one-mapper。

假如需要将每个hive表进行一定规律的改名,比如以前的表名是table,希望导入的表名叫ods_table,是无法通过sqoop来实现的,需要自己写脚本来导入。

我在执行全库导入时,全库为133张表,实际导入为80张表。可能是我参数哪里有问题,多次执行都是这样。所以建议导入之后检查一下数量是否正确。

3.sqoop增量导入

由于hive表没有主键,所以hive表无法实现update,只能将新插入的数据添加进来,也就是增量导入。

增量导入有两种方式,一种是append,一种是incremental lastmodified。

增量导入只能导入到hdfs中,不能导入到hive中,所以语句中不要有--hive import。

append方式:

sqoop import \
--connect 'jdbc:mysql://192.168.49.201:3307/blade?serverTimezone=UTC&zeroDateTimeBehavior=CONVERT_TO_NULL' \
--username root \
--password 123456 \
--table blade_blog \
--target-dir '/user/hive/warehouse/ods.db/blade_blog' \
--incremental append \
--check-column id \
--last-value 3 \
-m 1

由于修改的是hdfs数据,所以需要用target-dir指定hdfs路径。没有加时区可能会报错Establishing SSL connection without server's identity verification is not recommended.但是加了时区,在传递时间类型的数据时,假如设置不正确,可能会将hive中得到的数据比mysql中的数据快/慢。在设置的时候要先查询自己数据库的时区设置,一般mysql默认时区是UTC。

&zeroDateTimeBehavior=CONVERT_TO_NULL参数和时区原因一致,不加的话无法连接到mysql数据库。假如不加就能连到mysql数据库的话,不加也可以。

指定增量方式为append,检查列为id,设定值为3,所以id比3大(不含等于)的数据都会被导入。不会合并重复数据,所以如果你连续执行两遍,会看到两个id为4的数据。

检查列不能是字符,必须是数字或者是时间。append方式官方推荐用数字,时间建议用lastmodified方式导入。

lastmodified方式又分两种增量导入方式,一种是不合并重复数据(append),一种会合并重复数据(merge-key) ,例子如下

append方式

sqoop import --connect 'jdbc:mysql://192.168.49.214:3306/mysqlcdc?serverTimezone=UTC&zeroDateTimeBehavior=CONVERT_TO_NULL' \
--username root \
--password 123456 \
--table data \
--target-dir '/user/hive/warehouse/data' \
--check-column last_mod \
--incremental lastmodified \
--last-value '2019-08-30 16:49:12' \
--m 1 \
--append

last_mod列所有时间大于等于2019-08-30 16:49:12的数据都会被导入。

merge-key方式:

sqoop import --connect 'jdbc:mysql://192.168.49.214:3306/mysqlcdc?serverTimezone=CST&zeroDateTimeBehavior=CONVERT_TO_NULL' \
--username root \
--password 123456 \
--table data \
--target-dir '/user/hive/warehouse/data' \
--check-column last_mod \
--incremental lastmodified  \
--fields-terminated-by ',' \
--last-value '2019-08-28 17:31:58' \
--m 1 \
--merge-key id

指定merge-key为id,hive表中所有id重复的数据都会合并,无论是否是本次增量导入添加的。

要注意一点,导入的数据实际时间范围是你指定的last-value到执行这个sqoop语句,比如你指定了last-value为2019-08-28 17:31:58,执行这个sqoop语句的时间是2021-1-8 15:00:00,但是数据库里有个数据时间是2022-2-4 12:31:21,这个数据是不会被导入进来的。在打印的日志里面能够看到:

sqoop导入数据、全库导入和创建job以及实现定时增量导入的示例分析

假如执行没有报错,重复数据也合并了,但是数据没有更新也没有新导入,建议检查一下hdfs文件路径是否正确。

4.将增量导入创建为job,并建立定时任务

sqoop可以将一些sqoop操作保存下来作为job,方便以后执行。之后创建定时任务,来达到定时增量导入的目的。

创建sqoop job:

sqoop job  \
--create one.more.time \
-- import \
--connect 'jdbc:mysql://192.168.49.101:3307/maybe?serverTimezone=UTC&zeroDateTimeBehavior=CONVERT_TO_NULL' \
--username root \
--password 123456 \
--table blade_blog \
--target-dir '/user/hive/warehouse/ods.db/b_blog' \
--fields-terminated-by '\t' \
--m 1 \
--check-column update_time \
--incremental lastmodified  \
--last-value '2020-01-16 15:34:01' \
--merge-key id

这样就创建了名为one.more.time的job了。

查看job:

sqoop job --list

通过job来执行定时增量导入,第一次执行的last-value值为你指定的值,之后运行会记录你执行这个job的时间,来作为下次last-value的参数,实现动态配置last-value,避免重复导入。

执行job:

sqoop job --exec one.more.time

创建定时任务:

先检查是否安装了crontab

rpm -qa | grep crontab

没有的话安装crontab,centos为yum install crontabs。

编写一个shell脚本,来执行job。

为当前用户创建定时任务:

crontab -e

进入编辑

40 08 * * 1-5 ls /home/software/sqoop-script/maybe.sh

表示周一至周五,每天8:40执行maybe.sh。更多的crontab时间编写规范请看  Linux基础之定时任务。

这样就实现定时增量同步了。

关于sqoop导入数据、全库导入和创建job以及实现定时增量导入的示例分析就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。


分享题目:sqoop导入数据、全库导入和创建job以及实现定时增量导入的示例分析
网页URL:http://ybzwz.com/article/jdgdjh.html