大数据调度平台Airflow(六):Airflow算子及案例

#头条创作挑战赛#Airflow Operators及案例Airflow中最重要的还是各种Operator,其允许生成特定类型的任务,这个任务在实例化时称为DA

各位老铁们,大家好,今天由我来为大家分享大数据调度平台Airflow(六):Airflow算子及案例,以及的相关问题知识,希望对大家有所帮助。如果可以帮助到大家,还望关注收藏下本站,您的支持是我们最大的动力,谢谢大家了哈,下面我们开始吧!

Airflow 中最重要的是各种Operator,它们允许生成特定类型的任务。该任务在实例化时称为DAG 中的任务节点。所有Operators都派生自BaseOparator并继承了许多属性和方法。关于BaseOperator的参数可以参考:

http://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/baseoperator/index.html#module-airflow.models.baseoperator

BaseOperator中常用的参数如下:

task_id(str): 唯一的task_id标签

owner(str): 任务的所有者,建议使用linux用户名

email(str或liststr): 出现问题时,可以填写多个地址发送报警邮件,以逗号分隔。

email_on_retry(bool): 重试任务时是否发送电子邮件

email_on_failure(bool): 任务执行失败时是否发送邮件

retries(int):任务失败之前应重试的次数

retry_delay(datetime.timedelta): 重试间隔,必须是timedelta对象

start_date(datetime.datetime):DAG开始执行时间,该参数必须是datetime对象,不能使用字符串。

end_date(datetime.datetime):DAG运行结束时间。任务启动后一般会继续执行。该参数一般不设置。

depends_on_past (bool, 默认False) : 是否依赖于过去,如果为True,则之前的DAG调度必须成功,才能执行当前的DAG调度。

dag(airflow.models.DAG): 指定的dag。

execution_timeout(datetime.timedelta):允许执行此任务实例的最长时间。如果超过最大时间,任务将会失败。

trigger_rule(str): 定义依赖的触发规则,包括以下选项:全部失败|全部完成|成功|一个_失败|无失败| none_failed_or_skipped | 无失败或跳过无跳过| dummy(无条件执行)}默认为all_success。

一、BashOperator及调度Shell命令及脚本

BashOperator主要执行bash脚本或命令。 BashOperator参数如下:

bash_command(str): 要执行的命令或脚本(脚本必须以.sh结尾)

BashOperator调度Shell命令案例from datetime import datetime, timedeltafrom airflow import DAGfrom airflow.operators.bash import BashOperatordefault_args={ ‘owner’:’zhangsan’, ‘start_date’:datetime(2021, 9, 23), ’email’:’kettle_test1@ 163.com’, #pwd:kettle123456 ‘retries’: 1, #重试失败次数’retry_delay’: timedelta(分钟=5) #失败重试间隔}dag=DAG( dag_id=’execute_shell_cmd’, default_args=default_args, Schedule_interval=timedelta (分钟=1))t1=BashOperator(task_id=’print_date’, bash_command=’日期’, dag=dag)t2=BashOperator(task_id=’print_helloworld’, bash_command=’echo ‘hello world!”, dag=dag )t3=BashOperator( task_id=’templated’, bash_command=”’ {% for i in range(5) %} echo ‘{{ ds }}’ echo ‘{{ params.name}}’ echo ‘{{ params .age}}’ {% endfor %} ”’, params={‘name’:’wangwu’,’age’:10}, dag=dag)t1 t2 t3

请注意,t3 中使用了Jinja 模板。 “{% %}”里面是for标签,用于循环操作,但必须以{% endfor %}结尾。 “{{}}”里面是变量,其中ds是执行日期,是airflow的宏变量,params.name和params.age是自定义变量。

default_args中的email是指当DAG执行失败时发送邮件到指定邮箱。如果要使用airflow发送邮件,需要在$AIRFLOW_HOME/airflow.cfg中配置以下内容:

[smtp]#如果你想让airflow在重试、失败时发送邮件,并且你想使用#airflow.utils.email.send_email_smtp功能,你必须配置一个#smtp服务器这里mtp_host=smtp.163.comsmtp_starttls=Truesmtp_ssl=False# 示例: smtp_user=airflowsmtp_user=kettle_test2# 示例: smtp_password=airflowsmtp_password=VIOFSYMFDIKKIUEAsmtp_port=25smtp_mail_from=kettle_test2@163.comsmtp_timeout=30smtp_retry_limit=5

另外,电子邮件地址信息如下:

邮箱1:kettle_test1@163.com 密码:kettle123456

邮箱2:kettle_test2@163.com 密码:kettle123456

163 邮件SMTP 服务器地址: smtp.163.com 端口:25

配置163邮箱时,需要启用“POP3/SMTP/IMAP服务”服务。设置如下:

Kettle_test1@163.com FECJJVEPGPTZJYMQ

Kettle_test2@163.com VIOFSYMFDIKKIUEA

BashOperator调度Shell脚本案例准备以下两个Shell脚本。将以下两个脚本放置在$AIRFLOW_HOME/dags 目录中。

BashOperator默认执行脚本时,会从/tmp/airflow**临时目录中搜索对应的脚本。由于临时目录的名称不确定,建议执行脚本时在“bash_command”中写入绝对路径。如果要写相对路径,可以将脚本放在/tmp目录下,通过写“sh ./xxx.sh”来执行“bash_command”中的命令。

第一个shell.sh

#!/bin/bashdt=$1echo ‘====执行第一个shell====’echo ‘—- 第一个:时间是${dt}’

大数据调度平台Airflow(六):Airflow算子及案例

第二个shell.sh

#!/bin/bashdt=$1echo ‘====执行第二个shell====’echo ‘—-第二个:时间是${dt}’

编写airflow python配置:

from datetime import datetime, timedeltafrom airflow import DAGfrom airflow.operators.bash import BashOperatordefault_args={ ‘owner’:’zhangsan’, ‘start_date’:datetime(2021, 9, 23), ‘retries’: 1, #重试失败次数’ retry_delay’: timedelta(分钟=5) # 失败重试间隔}dag=DAG( dag_id=’execute_shell_sh’, default_args=default_args, Schedule_interval=timedelta(分钟=1))first=BashOperator( task_id=’first’, #编写脚本建议写绝对路径bash_command=’sh /root/airflow/dags/first_shell.sh %s’%datetime.now().strftime(‘%Y-%m-%d’), dag=dag)second=BashOperator(task_id=’second’, #脚本路径建议写绝对路径bash_command=’sh /root/airflow/dags/second_shell.sh %s’%datetime.now().strftime(‘% Y-%m-%d’), dag=dag)第一秒

执行结果:

特别注意:在“bash_command”中编写执行脚本时,无论是否有参数,一定要在脚本后面加一个空格,否则会找不到对应的脚本。如下:

二、SSHOperator及调度远程Shell脚本

在实际调度任务中,大多数任务脚本分布在不同的机器上。我们可以使用SSHOperator 来调用远程机器上的脚本任务。 SSHOperator 使用ssh 协议与远程主机进行通信。需要注意的是,SSHOperator在调用脚本时并不会读取用户的配置文件。最好在脚本中添加如下代码,这样调用脚本时会自动读取当前用户的配置信息:

#Ubunto系统。 ~/.profile#CentoOS 或RedHat 系统。 /.bashrc

SSHOperator参数的详细解释请参考:

airflow.providers.ssh.operators.ssh — apache-airflow-providers-ssh 文档

SSHOperator常用参数如下:

ssh_conn_id(str): ssh连接id,名称自行选择,需要在airflow webserver界面中配置。具体配置请参考案例。 remote_host (str) : 远程连接节点主机,如果配置,可以替换ssh_conn_id中配置的远程主机,可选。 command(str): 在远程主机上执行的命令或脚本。

SSHOperator调度远程节点脚本案例。请按照以下步骤使用SSHOperator 调度远程节点脚本:

1.安装“apache-airflow-providers-ssh”提供程序包

首先,停止airflow webserver和scheduler,切换到node4节点上的python37环境,安装ssh连接包。另外,Providers包的安装方法可以参考以下官网地址:

https://airflow.apache.org/docs/apache-airflow-providers/packages-ref.html#apache-airflow-providers-ssh

#切换Python37环境[root@node4 ~]# conda activate python37#安装ssh提供程序包(python37) [root@node4 ~]# pip install apache-airflow-providers-ssh==2.1.1#启动airflow(python37) [ root@node4 ~]# 气流网络服务器–port 8080(python37) [root@node4 ~]# 气流调度程序

2.配置SSH Connection连接

登录airflow webui,选择“管理”-“连接”:

单击“+”添加连接。这里主机连接到node5节点:

3. 准备远程执行脚本

在node5节点/根路径下创建first_shell.sh,内容如下:

#!/bin/bashecho ‘====执行第一个shell====’

在node3节点/根路径下创建second_shell.sh,内容如下:

#!/bin/bashecho ‘====执行第二个shell====’

4.编写DAG python配置文件

注意,使用本地开发工具编写python配置时,需要使用SSHOperator,并且需要在对应的本地python环境中安装对应的provider包。

C:\Users\wubaid:D:\cd d:\ProgramData\Anaconda3\envs\python37\Scriptsd:\ProgramData\Anaconda3\envs\python37\Scriptspip 安装apache-airflow-providers-ssh==2.1.1

python配置文件:

从日期时间导入日期时间,时间增量从气流导入DAG从airflow.operators.bash导入BashOperator从airflow.providers.ssh.operators.ssh导入SSHOperatordefault_args={‘所有者’:’lisi’,’start_date’:datetime(2021,9,23),’ retries’: 1, # 失败重试次数’retry_delay’: timedelta(分钟=5) # 失败重试间隔}dag=DAG( dag_id=’execute_remote_shell’, default_args=default_args, Schedule_interval=timedelta(分钟=1))first=SSHOperator( task_id=’first’, ssh_conn_id=’ssh-node5′,# 配置Airflow webui Connection中配置的SSH Conn id command=’sh /root/first_shell.sh ‘, dag=dag)second=SSHOperator( task_id=’ secondary’, ssh_conn_id=’ssh-node5′,#配置Airflow webui Connection中配置的SSH Conn id command=’sh /root/second_shell.sh ‘,remote_host=’192.168.179.6’,#如果配置remote_host,将会替换host dag=dag)Connection 中SSH 配置的第一秒

5.调度python配置脚本

将上面配置的python文件上传到node4节点$AIRFLOW_HOME/dags,重启Airflow websever和调度器,登录webui,开始调度:

调度结果如下:

大数据调度平台Airflow(六):Airflow算子及案例

三、HiveOperator及调度HQL

可以通过HiveOperator直接操作Hive SQL。 HiveOperator的参数如下:

hql(str): 需要执行Hive SQL。 hive_cli_conn_id(str): 连接到Hive 的conn_id,在气流webui 连接中配置。

要使用HiveOperator在airflow中调用Hive任务,首先需要安装以下依赖项并配置Hive Metastore:

#切换Python37环境[root@node4 ~]# conda activate python37#安装hive提供程序包(python37) [root@node4 ~]# pip install apache-airflow-providers-apache-hive==2.0.2#启动airflow(python37) ) [root@node4 ~]# 气流网络服务器–port 8080(python37) [root@node4 ~]# 气流调度程序

登录Airflow webui 并设置Hive Metastore。登录后,找到“Admin”-“Connections”,点击“+”添加新配置:

HiveOperator调度HQL案例1.启动Hive并准备表

启动HDFS和Hive Metastore,并在Hive中创建以下三个表:

创建表person_info(id int,name string,age int) 行格式分隔字段以’\t’ 结尾;创建表Score_info(id int,name string,score int) 行格式分隔字段以’\t’ 结尾;

将以下数据加载到表person_info 中:

1 zs 18

2 号19

3 WW 20

将以下数据加载到表score_info中:

1 号100

2LS 200

3 WW 300

2、在node4节点上配置Hive客户端

由于Airflow在使用HiveOperator时需要在Airflow安装节点上安装Hive客户端,因此需要在node4节点上配置Hive客户端。

将Hive安装包上传到node4,解压到“/software”下,配置Hive环境变量

#在/etc/profile文件末尾配置Hive环境变量export HIVE_HOME=/software/hive-1.2.1export PATH=$PATH:$HIVE_HOME/bin#使环境变量生效source /etc/profile

修改HIVE_HOME/conf/hive-site.xml,写入以下内容:

配置属性namehive.metastore.warehouse.dir/name value/user/hive/warehouse/value /property 属性namehive.metastore.local/name valuefalse/value /property 属性namehive.metastore.uris/name valuethrift://node1:9083/value /属性/配置

3.编写DAG python配置文件

注意,使用本地开发工具编写python配置时,需要使用HiveOperator,并且需要在对应的本地python环境中安装对应的provider包。

C:\Users\wubaid:D:\cd d:\ProgramData\Anaconda3\envs\python37\Scriptsd:\ProgramData\Anaconda3\envs\python37\Scriptspip install apache-airflow-providers-apache-hive==2.0.2 注意:也可以本地安装这里如果缺少对应的C++环境,我们可以不安装或者直接跳过。

Python配置文件:

from datetime import datetime, timedeltafrom airflow import DAGfrom airflow.providers.apache.hive.operators.hive import HiveOperatordefault_args={ ‘owner’:’wangwu’, ‘start_date’:datetime(2021, 9, 23), ‘retries’: 1, # 失败重试次数’retry_delay’ : timedelta(分钟=5) # 失败重试间隔}dag=DAG( dag_id=’execute_hive_sql’, default_args=default_args, Schedule_interval=timedelta(分钟=1))first=HiveOperator( task_id=’ person_info’, hive_cli_conn_id=’node1-hive-metastore’, hql=’从person_info 中选择id,name,age’, dag=dag)second=HiveOperator( task_id=’score_info’, hive_cli_conn_id=’node1-hive-metastore’ , hql=’从Score_info 中选择id、名称、分数’, dag=dag)third=HiveOperator( task_id=’join_info’, hive_cli_conn_id=’node1-hive-metastore’, hql=’选择a.id,a.name, a .age,b.score from person_info a join Score_info b on a.id=b.id’, dag=dag)第一第二第三

4.调度python配置脚本

将上面配置的python文件上传到node4节点$AIRFLOW_HOME/dags,重启Airflow websever和调度器,登录webui,开始调度:

调度结果如下:

四、PythonOperator

PythonOperator 可以调用Python 函数。由于Python基本上可以调用任何类型的任务,如果实在找不到合适的Operator,请将任务转换为Python函数并使用PythonOperator。

PythonOperator的常用参数如下。更多参数可以查看官网:airflow.operators.python — Airflow Documentation

用户评论

大数据调度平台Airflow(六):Airflow算子及案例
古巷青灯

终于看到有人写了关于 Airflow Operator 的文章!我之前一直在摸索怎么用它来整合我的工作流,这篇文章刚好给了我一些思路。

    有10位网友表示赞同!

大数据调度平台Airflow(六):Airflow算子及案例
屌国女农

这篇博文写的非常好!解释的很清晰,把各种 Operators 的用法都详细介绍了一遍,还配上了案例分析,非常实用。

    有13位网友表示赞同!

大数据调度平台Airflow(六):Airflow算子及案例
巷雨优美回忆

对于 Airflow 新手来说,这个帖子太棒了!之前一直对 Operators 感到困惑,现在看起来终于明白了,感谢作者!

    有11位网友表示赞同!

大数据调度平台Airflow(六):Airflow算子及案例
海盟山誓总是赊

我比较关注的是一些实际应用的场景,比如在实时大数据处理中使用 Operators 如何进行调度和控制?文章里能提供更多案例吗?

    有14位网友表示赞同!

大数据调度平台Airflow(六):Airflow算子及案例
日久见人心

虽然文章内容很不错,但我觉得代码示例可以丰富一些,最好能加入不同类型的 Operator 的示例代码,这样更直观易懂.

    有16位网友表示赞同!

大数据调度平台Airflow(六):Airflow算子及案例
放肆丶小侽人

其实对于复杂的流程,仅仅依靠 Operators 感觉还是显得过于局限,有没有其他更高层次的框架或者工具可以辅助进行调度和管理?

    有18位网友表示赞同!

大数据调度平台Airflow(六):Airflow算子及案例
黑夜漫长

Airflow 的学习曲线确实挺高的,尤其是那些比较专业的 Operators 。希望以后作者能分享更多实战经验,帮助大家更快上手。

    有11位网友表示赞同!

大数据调度平台Airflow(六):Airflow算子及案例
葵雨

我觉得这篇文章最棒的地方就是案例分析部分,很符合我的需求。因为理论知识了解很容易忘记,实践案例更容易记忆和理解。

    有11位网友表示赞同!

大数据调度平台Airflow(六):Airflow算子及案例
惯例

我一直在寻找高效的调度平台,希望 Airflow 能真正解决我们的问题。但这个帖子让我有些担心,因为它描述的 Operators 和案例似乎都比较局限

    有18位网友表示赞同!

大数据调度平台Airflow(六):Airflow算子及案例
自繩自縛

这篇博文写的太浅了,对复杂的操作流程以及部署和管理方面都没有深入探讨,对于有经验的技术人员来说并不值一提.

    有17位网友表示赞同!

大数据调度平台Airflow(六):Airflow算子及案例
为爱放弃

感觉 Airflow 越来越强大!之前还想着要自己开发调度系统,现在有了 Airflow 的 Operators 直接使用,简直省心多了。

    有19位网友表示赞同!

大数据调度平台Airflow(六):Airflow算子及案例
孤独症

我比较好奇 Airflow 和其他调度平台的差异性是什么?例如Kubernetes和 Apache NiFi。这篇博文能不能多谈一下不同平台的优缺点比较?

    有9位网友表示赞同!

大数据调度平台Airflow(六):Airflow算子及案例
淡抹丶悲伤

学习了几个月 Airflow,终于可以理解这些 Operators 的作用了!之前总是感觉它们就像一盘散沙,现在有了连接点,整体架构就清晰多了.

    有11位网友表示赞同!

大数据调度平台Airflow(六):Airflow算子及案例
冷青裳

我还是想要看到更多关于数据流处理、任务依赖等方面的应用场景案例,而不是停留在简单的代码示例上。

    有5位网友表示赞同!

大数据调度平台Airflow(六):Airflow算子及案例
最怕挣扎

Airflow 的 Operators 太强大啦!能解决很多复杂的调度问题。我正在尝试使用它来优化我们的数据管道流程,期待能够获得更好的效果.

    有9位网友表示赞同!

大数据调度平台Airflow(六):Airflow算子及案例
弃我者亡

对于大型复杂的数据流系统,是否需要考虑将 Airflow 和其他工具进行集成呢?例如使用 Kafka 来管理消息传递?

    有7位网友表示赞同!

原创文章,作者:小su,如若转载,请注明出处:https://www.sudun.com/ask/200935.html

Like (0)
小su的头像小su
Previous 2024年9月27日 下午11:40
Next 2024年9月27日 下午11:48

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注