DAG设定方法


关于任务调度的配置方法:

自定义参数方法

#以下为推荐方法, 直接在DAG中写参数,你还可以用更灵活的参数方法
#当前时间
report_time = datetime.datetime.now()
#当前时间,往后推一天, 你可以写成小时hour=1...
report_time = datetime.datetime.now()- datetime.timedelta(days=1)
#格式化时间为字符串
时间:(%Y-%m-%d %H:%M:%S):  2015-03-08 23:30:42
P_START_ZYM= report_time.strftime('%Y%m')
P_START_ZYMD= report_time.strftime('%Y%m%d')
#你也可以直接传一个参数
P_ZYM='201912'
P_ID=1230
#你也可以采用格式化整一个条件参数,格式f"xxxxxx",参数用{}包裹
PARAM_DMS_BP=f"where create_time >= '{P_START_ZYMD}' "
#你也可以直接给参数赋值
PARAM_DMS_ISB=PARAM_DMS_BP
#当你要刷数据的时候, 你可以不修改之前的参数, 下方重写直接覆盖
PARAM_DMS_BP=''

#如果你需要在测试环境中运行,请输入
dev=xxx  #询问管理员

#如果你要注释掉一些设定, 你可以使用 
'''你的内容'''

设定方法

--JOB设定方法,每一个JOB为一个节点
--填写驱动名称和要执行的文件名即可,注意文件名不需要带后缀如.sql, .py....
--文件名不可以用数字开头如000xxx, 不可以使用非法字符
#impala_sql  sql_filename
--也可以向sql_file传入参数, 如
#impala_sql  sql_filename P_START_ZYM,MSG 
--也可以为每个JOB增加备注
#impala_sql  sql_filename     --我是备注, 请--就可以


--DAG的设定方法
JOB写完以下, 在下方输入////, 4个以上/分隔为DAG设定
后可输入如: sql_filename1 >> sql_filename2 >> sql_filename3  为一个分支
sql_filename1 >> sql_filename4 为另一个分支

目前支持的驱动:

  • hive_sql 使用hive执行sql文件, 注意当此sql文件中存在refresh table语句时会自动调用impala来执行这条命令, 这样你无需再单独做一个refresh的job,直接放在hive的脚本中即可
  • impala_sql 使用impala执行sql文件
  • ktr 执行kettle的transform文件
  • kjb 执行kettle的job文件
  • datax 执行datax的抽取方式,采用封装好的配置
  • dataxx 执行datax的抽取方式,采用原生配置方式
  • hdfs 转移到大数据平台执行ktr, 将自动把生成的文本文件导入到hive的stage表, 表名需要与ktr名称一样
  • py 执行python脚本文件
  • link DAG依赖的实现方法
使用方法 #link DAG的名字  sleeptime[可选]  maxtime[可选]
    #link DAG_A  --如DAG_A Fail或进行中,此节点后的流程不会再执行
    #link DAG_A  60  3600  --如DAG_A还在进行中, 每60秒检查一次
       如有完成则继续任务, 重试达到3600秒后, 还未完成, 则报错
  • branch 实现基于自定义的条件function执行不同的分支, 使用方法: #branch jobname 函数名(具体方法参考下文) 注意: jobname不可和函数名同名
  • trigger 触放目标dag执行, 使用方法: #trigger 任务名称 目标DAG名称
  • validate 使用默认的数据库执行select sql来做数据校验, 如果select无返回值, 则认为校验失败, 并返回查询的结果,使用方法#validate sql_filename
  • dataset 是validate的增强功能, 可以实现对任意数据库的查询与校验. 需要smartchart的支持
使用方法: #dataset jobname id remark[可选] maillist[可选] 
remark参数说明: 
      不填写 - 默认无查询结果抛出错误
      info - 只打印查询结果,
      e1 - 如果有返回结果抛出错误, 
      e2 - 邮件通知查询结果(可以用于需要定时邮件收到数据的需求)
      e3 - 无查询结果时,15分钟会重试,2小时还无结果抛出错误,
           可用于轮询业务系统, 等待业务系统数据ready后再抽取

id参数: 可以是数据集ID也可以填数据集的名称, 
maillist: 格式 xxx@xxxx,xxx@xxx

你也可以采用高级自定义,使用
get_dataset(id):传入数据集ID, 返回值
{"result":"success","data":[[]]}
  • refresh_tableau 刷新tableau数据源,格式:#refresh_tableau jobname sourceID, 其中jobname为自定义一个英文名做为标识, sourceID是tableau对的需要刷新的数据源ID, 需要在头部参数中定义, 如 sourceID = 'xxxx-xxxx-xxx'
  • ktr_r 执行远程kettle的transform文件
  • kjb_r 执行远程kettle的job文件
  • gp_sql 执行greenplum的sql文件
  • sql_file 使用默认的驱动执行sql文件
  • sp 使用默认的数据库执行SP名

  • sqoop 使用sqoop抽取到大数据平台

填写以下内容到你的SQL文件, 比如命名为: mysqoop.sql
/*
conn =  zspl        -- 连接串, 找管理员要, 也可自定义, (必填)
sourceTable = tablename   -- 源系统表名,(必填)
columns =             -- 抽取原表的字段  a,b,c (可省略)
where =                -- 抽取时的条件  a>1 (可省略)
seq =                    -- 分隔符, 默认 \t (可省略)
query =                -- 查询语名, select * from xxx (可省略)
m =                     -- 启动map数,并行执行,需和split一起使用, 默认1(可省略)
split =                 -- 指定按字段拆分,字段为数值或时间格式, 不能有空(可省略)
otherParam =        -- 其它sqoop自定义参数 (可省略)
 */

load data inpath '/user/etl/tablename/*' overwrite into table xx.tablename;
refresh xxx.tablename

在测试过程可能会用到自定义的连接, 你可以将连接写在DAG设定中, 
SQOOP_PARA['zspl'] = '--connect jdbc:mysql://10.10.x.x:3306/xxx --username xxxxxxx --password xxxxxx'
#sqoop mysqoop
#sqoop mysqoop  P1,P2
  • sap 调用sap的rfc功能抽取数据到表, 流程如下:
1. 在上传设定中新建设定, 如名称为: mysapdata, 如果你是上传到hive, 建意起始行设为-1, 其它设为0
2. 编写mysapdata.json文件, 内容如下, 并上传到项目目录
    {
        "function": "abc", 
        "input": {
            "P1": "20201007", 
            "P2": "20201109"
        }, 
      "dataset": {},
      "table": "IT_RETAB",
      "columns": ["LGORT","TOTAL_WDJ","TOTAL_ZY"],
      "csvparam": {}
    }
    参数说明:
    function: 函数名
    input: 输入的固定参数
    dataset: 
    可实现动态参数,例如{"S_MATNR": 504}, 504是指数据集的ID,
    可以实现你从数据库中查询结果进行参数输入,需要与smartchart结合使用, 
    参数值样列
    "S_MATNR"=[{"表头1":xxx, "表头2":xxx ...}, {"表头1":xxx, "表头2":xxx ..}...]
    table: RFC返回的表名
    columns: 指定生成的列, 可留空自动识别
    csvparam: 设定生成的csv格式, 默认分隔符为|, 修改可以设定,{"sep": ","}

3. DAG设定中填写 #sap   mysapdata, 如你有input中的动态参数, 可以如下填写方式, 可替换json文件中的参数
    P1= '20201008'
    P2= '20201020'
    #sap   mysapdata P1,P2

一个完整的sample

//获取变量,以下为python语法(option)
P_DAYS = 12
MSG = '------------------------------------------------------------'
report_time = datetime.datetime.now()- datetime.timedelta(days=int(P_DAYS))
P_START_ZYM= report_time.strftime('%Y%m')

//以下为JOB专署语法
#link  lastdag 30 3600        --每30秒轮询前置任务,超时1小时
#impala_sql  sqlfile1         --执行impala
#hive_sql    sqlfile2         --执行hive
#dataset     checkinfo  321   --数据查询,校验              --
#sp  sp1   report_time,MSG    --执行SP,传入参数
#ktr myktr   P_START_ZYM      --执行kettle ktr, 传入参数
#kjb mykjb   P_START_ZYM
#/kjb abc                     --当此job暂时不要时,可/注释

--以下为python语法(option)
//////////////
lastdag >> sqlfile1 >>  \
validate >> sp1 >> [myktr, mykjb]
lastdag >> sqlfile2

高级用法, 如何实现分支判断及DAG触发

'''
编写自定义函数名为:任务名 ,基于条件自定义返回的分支进行后续执行,未返回的分支将不会执行
'''
def inv_validate():
    dataset = get_dataset(407)
    if len(dataset['data']) > 1:
        return 'quadrant_6month_list'
    return 'triggertaskname'

#impala_sql quadrant_6month_list
#trigger  triggertaskname  targetdagname 
#branch branch_jobname inv_validate 

/////////////////////////

branch_jobname  >> [quadrant_6month_list, triggertaskname]

高级用法, 分配到不同worker执行

指定dag的queue即可, worker的启动方式 airflow celery worker -q bg

#ktr abc
#ktr ddd

////////
abc.queue = 'bg'
abc >> ddd

高级用法-自定义

'''
你的备注可以写在这
'''
#自定义你的执行函数
import requests
import json
def refresh_XX():
    response=requests.get('',verify=False)
    print(response.text)
    response=json.loads(response.text)
    if response['status']!=200:
        raise Exception(str(response))

#diy refresh_job  refresh_XX  --注意: jobname不可与函数名同名

其它不重要说明

全局参数定义的方法,airflow自带,但不常用了
OPG_LOAD_DAYS = Variable.get("OPG_LOAD_DAYS") //从airflow中获取
MSG = Variable.get("MSG")
report_time = datetime.datetime.now()- datetime.timedelta(days=int(OPG_LOAD_DAYS))
P_START_ZYM= report_time .strftime('%Y%m') //由python代码生成的参数