各位老铁们,大家好,今天由我来为大家分享大数据调度平台Airflow(六):Airflow算子及案例,以及的相关问题知识,希望对大家有所帮助。如果可以帮助到大家,还望关注收藏下本站,您的支持是我们最大的动力,谢谢大家了哈,下面我们开始吧!
Airflow 中最重要的是各种Operator,它们允许生成特定类型的任务。该任务在实例化时称为DAG 中的任务节点。所有Operators都派生自BaseOparator并继承了许多属性和方法。关于BaseOperator的参数可以参考:
http://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/baseoperator/index.html#module-airflow.models.baseoperator
BaseOperator中常用的参数如下:
task_id(str): 唯一的task_id标签
owner(str): 任务的所有者,建议使用linux用户名
email(str或liststr): 出现问题时,可以填写多个地址发送报警邮件,以逗号分隔。
email_on_retry(bool): 重试任务时是否发送电子邮件
email_on_failure(bool): 任务执行失败时是否发送邮件
retries(int):任务失败之前应重试的次数
retry_delay(datetime.timedelta): 重试间隔,必须是timedelta对象
start_date(datetime.datetime):DAG开始执行时间,该参数必须是datetime对象,不能使用字符串。
end_date(datetime.datetime):DAG运行结束时间。任务启动后一般会继续执行。该参数一般不设置。
depends_on_past (bool, 默认False) : 是否依赖于过去,如果为True,则之前的DAG调度必须成功,才能执行当前的DAG调度。
dag(airflow.models.DAG): 指定的dag。
execution_timeout(datetime.timedelta):允许执行此任务实例的最长时间。如果超过最大时间,任务将会失败。
trigger_rule(str): 定义依赖的触发规则,包括以下选项:全部失败|全部完成|成功|一个_失败|无失败| none_failed_or_skipped | 无失败或跳过无跳过| dummy(无条件执行)}默认为all_success。
一、BashOperator及调度Shell命令及脚本
BashOperator主要执行bash脚本或命令。 BashOperator参数如下:
bash_command(str): 要执行的命令或脚本(脚本必须以.sh结尾)
BashOperator调度Shell命令案例from datetime import datetime, timedeltafrom airflow import DAGfrom airflow.operators.bash import BashOperatordefault_args={ ‘owner’:’zhangsan’, ‘start_date’:datetime(2021, 9, 23), ’email’:’kettle_test1@ 163.com’, #pwd:kettle123456 ‘retries’: 1, #重试失败次数’retry_delay’: timedelta(分钟=5) #失败重试间隔}dag=DAG( dag_id=’execute_shell_cmd’, default_args=default_args, Schedule_interval=timedelta (分钟=1))t1=BashOperator(task_id=’print_date’, bash_command=’日期’, dag=dag)t2=BashOperator(task_id=’print_helloworld’, bash_command=’echo ‘hello world!”, dag=dag )t3=BashOperator( task_id=’templated’, bash_command=”’ {% for i in range(5) %} echo ‘{{ ds }}’ echo ‘{{ params.name}}’ echo ‘{{ params .age}}’ {% endfor %} ”’, params={‘name’:’wangwu’,’age’:10}, dag=dag)t1 t2 t3
请注意,t3 中使用了Jinja 模板。 “{% %}”里面是for标签,用于循环操作,但必须以{% endfor %}结尾。 “{{}}”里面是变量,其中ds是执行日期,是airflow的宏变量,params.name和params.age是自定义变量。
default_args中的email是指当DAG执行失败时发送邮件到指定邮箱。如果要使用airflow发送邮件,需要在$AIRFLOW_HOME/airflow.cfg中配置以下内容:
[smtp]#如果你想让airflow在重试、失败时发送邮件,并且你想使用#airflow.utils.email.send_email_smtp功能,你必须配置一个#smtp服务器这里mtp_host=smtp.163.comsmtp_starttls=Truesmtp_ssl=False# 示例: smtp_user=airflowsmtp_user=kettle_test2# 示例: smtp_password=airflowsmtp_password=VIOFSYMFDIKKIUEAsmtp_port=25smtp_mail_from=kettle_test2@163.comsmtp_timeout=30smtp_retry_limit=5
另外,电子邮件地址信息如下:
邮箱1:kettle_test1@163.com 密码:kettle123456
邮箱2:kettle_test2@163.com 密码:kettle123456
163 邮件SMTP 服务器地址: smtp.163.com 端口:25
配置163邮箱时,需要启用“POP3/SMTP/IMAP服务”服务。设置如下:
Kettle_test1@163.com FECJJVEPGPTZJYMQ
Kettle_test2@163.com VIOFSYMFDIKKIUEA
BashOperator调度Shell脚本案例准备以下两个Shell脚本。将以下两个脚本放置在$AIRFLOW_HOME/dags 目录中。
BashOperator默认执行脚本时,会从/tmp/airflow**临时目录中搜索对应的脚本。由于临时目录的名称不确定,建议执行脚本时在“bash_command”中写入绝对路径。如果要写相对路径,可以将脚本放在/tmp目录下,通过写“sh ./xxx.sh”来执行“bash_command”中的命令。
第一个shell.sh
#!/bin/bashdt=$1echo ‘====执行第一个shell====’echo ‘—- 第一个:时间是${dt}’
第二个shell.sh
#!/bin/bashdt=$1echo ‘====执行第二个shell====’echo ‘—-第二个:时间是${dt}’
编写airflow python配置:
from datetime import datetime, timedeltafrom airflow import DAGfrom airflow.operators.bash import BashOperatordefault_args={ ‘owner’:’zhangsan’, ‘start_date’:datetime(2021, 9, 23), ‘retries’: 1, #重试失败次数’ retry_delay’: timedelta(分钟=5) # 失败重试间隔}dag=DAG( dag_id=’execute_shell_sh’, default_args=default_args, Schedule_interval=timedelta(分钟=1))first=BashOperator( task_id=’first’, #编写脚本建议写绝对路径bash_command=’sh /root/airflow/dags/first_shell.sh %s’%datetime.now().strftime(‘%Y-%m-%d’), dag=dag)second=BashOperator(task_id=’second’, #脚本路径建议写绝对路径bash_command=’sh /root/airflow/dags/second_shell.sh %s’%datetime.now().strftime(‘% Y-%m-%d’), dag=dag)第一秒
执行结果:
特别注意:在“bash_command”中编写执行脚本时,无论是否有参数,一定要在脚本后面加一个空格,否则会找不到对应的脚本。如下:
二、SSHOperator及调度远程Shell脚本
在实际调度任务中,大多数任务脚本分布在不同的机器上。我们可以使用SSHOperator 来调用远程机器上的脚本任务。 SSHOperator 使用ssh 协议与远程主机进行通信。需要注意的是,SSHOperator在调用脚本时并不会读取用户的配置文件。最好在脚本中添加如下代码,这样调用脚本时会自动读取当前用户的配置信息:
#Ubunto系统。 ~/.profile#CentoOS 或RedHat 系统。 /.bashrc
SSHOperator参数的详细解释请参考:
airflow.providers.ssh.operators.ssh — apache-airflow-providers-ssh 文档
SSHOperator常用参数如下:
ssh_conn_id(str): ssh连接id,名称自行选择,需要在airflow webserver界面中配置。具体配置请参考案例。 remote_host (str) : 远程连接节点主机,如果配置,可以替换ssh_conn_id中配置的远程主机,可选。 command(str): 在远程主机上执行的命令或脚本。
SSHOperator调度远程节点脚本案例。请按照以下步骤使用SSHOperator 调度远程节点脚本:
1.安装“apache-airflow-providers-ssh”提供程序包
首先,停止airflow webserver和scheduler,切换到node4节点上的python37环境,安装ssh连接包。另外,Providers包的安装方法可以参考以下官网地址:
https://airflow.apache.org/docs/apache-airflow-providers/packages-ref.html#apache-airflow-providers-ssh
#切换Python37环境[root@node4 ~]# conda activate python37#安装ssh提供程序包(python37) [root@node4 ~]# pip install apache-airflow-providers-ssh==2.1.1#启动airflow(python37) [ root@node4 ~]# 气流网络服务器–port 8080(python37) [root@node4 ~]# 气流调度程序
2.配置SSH Connection连接
登录airflow webui,选择“管理”-“连接”:
单击“+”添加连接。这里主机连接到node5节点:
3. 准备远程执行脚本
在node5节点/根路径下创建first_shell.sh,内容如下:
#!/bin/bashecho ‘====执行第一个shell====’
在node3节点/根路径下创建second_shell.sh,内容如下:
#!/bin/bashecho ‘====执行第二个shell====’
4.编写DAG python配置文件
注意,使用本地开发工具编写python配置时,需要使用SSHOperator,并且需要在对应的本地python环境中安装对应的provider包。
C:\Users\wubaid:D:\cd d:\ProgramData\Anaconda3\envs\python37\Scriptsd:\ProgramData\Anaconda3\envs\python37\Scriptspip 安装apache-airflow-providers-ssh==2.1.1
python配置文件:
从日期时间导入日期时间,时间增量从气流导入DAG从airflow.operators.bash导入BashOperator从airflow.providers.ssh.operators.ssh导入SSHOperatordefault_args={‘所有者’:’lisi’,’start_date’:datetime(2021,9,23),’ retries’: 1, # 失败重试次数’retry_delay’: timedelta(分钟=5) # 失败重试间隔}dag=DAG( dag_id=’execute_remote_shell’, default_args=default_args, Schedule_interval=timedelta(分钟=1))first=SSHOperator( task_id=’first’, ssh_conn_id=’ssh-node5′,# 配置Airflow webui Connection中配置的SSH Conn id command=’sh /root/first_shell.sh ‘, dag=dag)second=SSHOperator( task_id=’ secondary’, ssh_conn_id=’ssh-node5′,#配置Airflow webui Connection中配置的SSH Conn id command=’sh /root/second_shell.sh ‘,remote_host=’192.168.179.6’,#如果配置remote_host,将会替换host dag=dag)Connection 中SSH 配置的第一秒
5.调度python配置脚本
将上面配置的python文件上传到node4节点$AIRFLOW_HOME/dags,重启Airflow websever和调度器,登录webui,开始调度:
调度结果如下:
三、HiveOperator及调度HQL
可以通过HiveOperator直接操作Hive SQL。 HiveOperator的参数如下:
hql(str): 需要执行Hive SQL。 hive_cli_conn_id(str): 连接到Hive 的conn_id,在气流webui 连接中配置。
要使用HiveOperator在airflow中调用Hive任务,首先需要安装以下依赖项并配置Hive Metastore:
#切换Python37环境[root@node4 ~]# conda activate python37#安装hive提供程序包(python37) [root@node4 ~]# pip install apache-airflow-providers-apache-hive==2.0.2#启动airflow(python37) ) [root@node4 ~]# 气流网络服务器–port 8080(python37) [root@node4 ~]# 气流调度程序
登录Airflow webui 并设置Hive Metastore。登录后,找到“Admin”-“Connections”,点击“+”添加新配置:
HiveOperator调度HQL案例1.启动Hive并准备表
启动HDFS和Hive Metastore,并在Hive中创建以下三个表:
创建表person_info(id int,name string,age int) 行格式分隔字段以’\t’ 结尾;创建表Score_info(id int,name string,score int) 行格式分隔字段以’\t’ 结尾;
将以下数据加载到表person_info 中:
1 zs 18
2 号19
3 WW 20
将以下数据加载到表score_info中:
1 号100
2LS 200
3 WW 300
2、在node4节点上配置Hive客户端
由于Airflow在使用HiveOperator时需要在Airflow安装节点上安装Hive客户端,因此需要在node4节点上配置Hive客户端。
将Hive安装包上传到node4,解压到“/software”下,配置Hive环境变量
#在/etc/profile文件末尾配置Hive环境变量export HIVE_HOME=/software/hive-1.2.1export PATH=$PATH:$HIVE_HOME/bin#使环境变量生效source /etc/profile
修改HIVE_HOME/conf/hive-site.xml,写入以下内容:
配置属性namehive.metastore.warehouse.dir/name value/user/hive/warehouse/value /property 属性namehive.metastore.local/name valuefalse/value /property 属性namehive.metastore.uris/name valuethrift://node1:9083/value /属性/配置
3.编写DAG python配置文件
注意,使用本地开发工具编写python配置时,需要使用HiveOperator,并且需要在对应的本地python环境中安装对应的provider包。
C:\Users\wubaid:D:\cd d:\ProgramData\Anaconda3\envs\python37\Scriptsd:\ProgramData\Anaconda3\envs\python37\Scriptspip install apache-airflow-providers-apache-hive==2.0.2 注意:也可以本地安装这里如果缺少对应的C++环境,我们可以不安装或者直接跳过。
Python配置文件:
from datetime import datetime, timedeltafrom airflow import DAGfrom airflow.providers.apache.hive.operators.hive import HiveOperatordefault_args={ ‘owner’:’wangwu’, ‘start_date’:datetime(2021, 9, 23), ‘retries’: 1, # 失败重试次数’retry_delay’ : timedelta(分钟=5) # 失败重试间隔}dag=DAG( dag_id=’execute_hive_sql’, default_args=default_args, Schedule_interval=timedelta(分钟=1))first=HiveOperator( task_id=’ person_info’, hive_cli_conn_id=’node1-hive-metastore’, hql=’从person_info 中选择id,name,age’, dag=dag)second=HiveOperator( task_id=’score_info’, hive_cli_conn_id=’node1-hive-metastore’ , hql=’从Score_info 中选择id、名称、分数’, dag=dag)third=HiveOperator( task_id=’join_info’, hive_cli_conn_id=’node1-hive-metastore’, hql=’选择a.id,a.name, a .age,b.score from person_info a join Score_info b on a.id=b.id’, dag=dag)第一第二第三
4.调度python配置脚本
将上面配置的python文件上传到node4节点$AIRFLOW_HOME/dags,重启Airflow websever和调度器,登录webui,开始调度:
调度结果如下:
四、PythonOperator
PythonOperator 可以调用Python 函数。由于Python基本上可以调用任何类型的任务,如果实在找不到合适的Operator,请将任务转换为Python函数并使用PythonOperator。
PythonOperator的常用参数如下。更多参数可以查看官网:airflow.operators.python — Airflow Documentation
原创文章,作者:小su,如若转载,请注明出处:https://www.sudun.com/ask/200935.html
用户评论
古巷青灯
终于看到有人写了关于 Airflow Operator 的文章!我之前一直在摸索怎么用它来整合我的工作流,这篇文章刚好给了我一些思路。
有10位网友表示赞同!
屌国女农
这篇博文写的非常好!解释的很清晰,把各种 Operators 的用法都详细介绍了一遍,还配上了案例分析,非常实用。
有13位网友表示赞同!
巷雨优美回忆
对于 Airflow 新手来说,这个帖子太棒了!之前一直对 Operators 感到困惑,现在看起来终于明白了,感谢作者!
有11位网友表示赞同!
海盟山誓总是赊
我比较关注的是一些实际应用的场景,比如在实时大数据处理中使用 Operators 如何进行调度和控制?文章里能提供更多案例吗?
有14位网友表示赞同!
日久见人心
虽然文章内容很不错,但我觉得代码示例可以丰富一些,最好能加入不同类型的 Operator 的示例代码,这样更直观易懂.
有16位网友表示赞同!
放肆丶小侽人
其实对于复杂的流程,仅仅依靠 Operators 感觉还是显得过于局限,有没有其他更高层次的框架或者工具可以辅助进行调度和管理?
有18位网友表示赞同!
黑夜漫长
Airflow 的学习曲线确实挺高的,尤其是那些比较专业的 Operators 。希望以后作者能分享更多实战经验,帮助大家更快上手。
有11位网友表示赞同!
葵雨
我觉得这篇文章最棒的地方就是案例分析部分,很符合我的需求。因为理论知识了解很容易忘记,实践案例更容易记忆和理解。
有11位网友表示赞同!
惯例
我一直在寻找高效的调度平台,希望 Airflow 能真正解决我们的问题。但这个帖子让我有些担心,因为它描述的 Operators 和案例似乎都比较局限
有18位网友表示赞同!
自繩自縛
这篇博文写的太浅了,对复杂的操作流程以及部署和管理方面都没有深入探讨,对于有经验的技术人员来说并不值一提.
有17位网友表示赞同!
为爱放弃
感觉 Airflow 越来越强大!之前还想着要自己开发调度系统,现在有了 Airflow 的 Operators 直接使用,简直省心多了。
有19位网友表示赞同!
孤独症
我比较好奇 Airflow 和其他调度平台的差异性是什么?例如Kubernetes和 Apache NiFi。这篇博文能不能多谈一下不同平台的优缺点比较?
有9位网友表示赞同!
淡抹丶悲伤
学习了几个月 Airflow,终于可以理解这些 Operators 的作用了!之前总是感觉它们就像一盘散沙,现在有了连接点,整体架构就清晰多了.
有11位网友表示赞同!
冷青裳
我还是想要看到更多关于数据流处理、任务依赖等方面的应用场景案例,而不是停留在简单的代码示例上。
有5位网友表示赞同!
最怕挣扎
Airflow 的 Operators 太强大啦!能解决很多复杂的调度问题。我正在尝试使用它来优化我们的数据管道流程,期待能够获得更好的效果.
有9位网友表示赞同!
弃我者亡
对于大型复杂的数据流系统,是否需要考虑将 Airflow 和其他工具进行集成呢?例如使用 Kafka 来管理消息传递?
有7位网友表示赞同!