SmartPip DAG设定方法


关于任务调度的配置方法:

自定义参数方法

#当前时间
report_time = datetime.datetime.now()
#当前时间,往后推一天, 也可以小时(hours), 分钟(minutes), 秒(seconds)...
report_time = datetime.datetime.now()- datetime.timedelta(days=1)
#获取上个月最后一天
report_time = datetime.datetime.now().replace(day=1) - datetime.timedelta(days=1)
#格式化时间为字符串
时间:(%Y-%m-%d %H:%M:%S):  2015-03-08 23:30:42
P_START_ZYMD= report_time.strftime('%Y%m%d')
#可以直接传一个参数
P_ZYM='201912'
#也可以采用格式化整一个条件参数,格式f"xxxxxx",参数用{}包裹
PARAM_DMS_BP=f"where create_time >= '{P_START_ZYMD}' "

设定方法

--JOB设定方法,每一个JOB为一个节点,填写驱动名称和要执行的任务名
--任务名不可以用数字开头如00xxx, 不可以使用非法字符, 不可使用符号如(.)
#starrocks_sql  sql_filename
--可以传入参数, 如
#impala_sql  sql_filename P_START_ZYM,MSG
--也可以为每个JOB增加备注
#impala_sql  sql_filename     --我是备注

--DAG的设定方法
JOB写完以下, 在下方输入////, 4个以上/分隔为DAG设定
后可输入如: sql_filename1 >> sql_filename2 >> sql_filename3  为一个分支

完整样列可参考 完整样列

目前支持的驱动:

驱动名 说明 使用说明
datax 执行datax的抽取 模板会自动带出
kafkastarrocks 使用Streamload方法消费kafka到starrocks 使用方法参考下文
apistarrocks api接入到starrocks 使用方法参考下文
routinestarrocks 监控routineload方法消费kafka到starrocks #routinestarrocks 任务名 标签名 不预警[可选], 标签名带库:db.label
starrocks_sql 使用starrocks执行sql
hive_sql 使用hive执行sql refresh table语句时会自动调用impala来执行
impala_sql 使用impala执行sql
oracle_sql 执行oracle sql
gp_sql 执行greenplum或postgreSql sql
mssql_sql 执行sqlserver sql
myql_sql 执行mysql sql
sp 使用默认的数据库执行存储过程
py 执行python脚本
diy 自定义任务 使用方法参考下文
dataset 实现对任意数据库的查询与校验 需要smartchart的支持,使用方法参考下文
refresh_quality 刷新数据质量 #refresh_quality jobname 项目名
refresh_smc 刷新smartchart仪表盘 #refresh_smc jobname 报表ID或报表名
link DAG依赖 #link DAG的名字 sleeptime[可选] maxtime[可选],使用方法参考下文
branch 实现基于自定义的条件function执行不同的分支 #branch jobname 函数名(具体方法参考下文) 注意: jobname不可和函数名同名
trigger 触放目标dag执行 #trigger 任务名称 目标DAG名称
validate 使用默认的数据库执行select sql来做数据校验 如果select无返回值, 则认为校验失败, 并返回查询的结果,使用方法#validate sql_filename
refresh_tableau 刷新tableau数据源 #refresh_tableau jobname sourceID, sourceID是tableau对的需要刷新的数据源ID
ktr 执行kettle的transform
kjb 执行kettle的job
ktr_r 执行远程kettle的transform
kjb_r 执行远程kettle的job
dataxx 执行datax的抽取 采用原生配置方式
sqoop 使用sqoop抽取到大数据平台 使用方法参考不常用方法
hdfsstarrocks 使用brokerload方法到starrocks 使用方法参考不常用方法
sap 调用sap的rfc功能抽取数据到表 使用方法参考不常用方法
hdfs 转移到大数据平台执行ktr 将自动把生成的文本文件导入到hive的stage表, 表名需要与ktr名称一样
dataset实现对任意数据库的查询与校验
使用方法: #dataset jobname id remark[可选] maillist[可选] 
remark参数说明: 
      不填写 - 默认无查询结果抛出错误
      info - 只打印查询结果,
      e1 - 如果有返回结果抛出错误, 
      e2 - 邮件通知查询结果(可以用于需要定时邮件收到数据的需求)
      e3 - 无查询结果时,15分钟会重试,2小时还无结果抛出错误,
           可用于轮询业务系统, 等待业务系统数据ready后再抽取

id参数: 可以是数据集ID也可以填数据集的名称, 
maillist: 格式 xxx@xxxx,xxx@xxx

你也可以采用高级自定义,使用
get_dataset(id):传入数据集ID, 返回值
{"result":"success","data":[[]]}
link DAG依赖
使用方法 #link DAG的名字  sleeptime[可选]  maxtime[可选]
    #link DAG_A  --如DAG_A Fail或进行中,此节点后的流程不会再执行
    #link DAG_A  60  3600  --如DAG_A还在进行中, 每60秒检查一次
       如有完成则继续任务, 重试达到3600秒后, 还未完成, 则报错
kafkastarrocks 使用Stream load方法消费kafka到starrocks
##kafkaConn=xxx  --连接器名
##topic=Test1       --kafka topic名称
##table=test.loadcsv    --目标表名

--------- 通用参数[可选] ----------
##skipError=1   -- 是否跳过错误
##max_filter_ratio=0.2 --容错率
##columns=msg_seq,kafka_time,create_time=unix_timestamp()  --指定字段

--------- 如果kafka是Json格式[可选] ----------
##format=json
##jsonpaths= a,b
##json_root: list   --json 根节点名

-------  如果是kafka是类csv[可选]  ------
##column_separator=,   -- 指定分隔为逗号, starrocks默认使用\t

------- 只在运维时使用 ------
##offsets=-996 -- 指定从最后一个offset开始消费
也可指定初始offset, 也可使用{"0":"123", "1":"99"}来指定不同partition的offset
apistarrocks api接入到starrocks
##apiConn=xxx  -- api连接器名(在租户管理中配置)
##param={"p1":"xxx"}       -- 传入连接器的参数[可选]

其它starrocks端配置同kafkastarrocks
branch,trigger 实现分支判断及DAG触发
'''
编写自定义函数名为:任务名 ,基于条件自定义返回的分支进行后续执行,未返回的分支将不会执行
'''
def inv_validate():
    dataset = get_dataset(407)
    if len(dataset['data']) > 1:
        return 'quadrant_6month_list'
    return 'triggertaskname'

#impala_sql quadrant_6month_list
#trigger  triggertaskname  targetdagname 
#branch branch_jobname inv_validate 

/////////////////////////

branch_jobname  >> [quadrant_6month_list, triggertaskname]
diy 自定义任务
'''
你的备注可以写在这
'''
#自定义你的执行函数
import requests
def refresh_XX():
    response=requests.get('',verify=False).json()
    if response['status']!=200:
        raise Exception(str(response))

#diy refresh_job  refresh_XX  --注意: jobname不可与函数名同名

其它高级方法

任务触发方式
  • all_success: 所有上游成功
  • all_failed: 所有上游任务失败
  • all_done: 所有上游任务完成
  • one_failed: 只要一个任务失败
  • one_success:只有一个任成功
  • none_failed: 上游任务没有失败
  • dummy: 无依赖

样列

#datax task1
#datax task2
/////
dag.trigger_rule='all_success'   #可全局指定
task2.trigger_rule = 'all_done'  #指定单个任务
高级用法, 分配到不同worker执行

指定dag的queue即可, worker的启动方式

airflow celery worker -q bg
#ktr abc
////////
abc.queue = 'bg'
更多高级自定义
'depends_on_past': False,
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'queue': 'bash_queue',
'pool': 'backfill',
'priority_weight': 10,
'end_date': datetime(2016, 1, 1),
'wait_for_downstream': False,
'sla': timedelta(hours=2),
'execution_timeout': timedelta(seconds=300),
'on_failure_callback': some_function,
'on_success_callback': some_other_function,
'on_retry_callback': another_function,
'sla_miss_callback': yet_another_function,
'trigger_rule': 'all_success'