首充送50%
续费低至5折
阿里云CDN 1折购
免费代充值
免费选购指南
免费协助迁移

使用 DynamoDB 简化 MWAA(Airflow)任务调度开发

2023-07-25

1. 背景介绍

我们看到越来越多的商业应用对于数据处理任务的调度提出需求,无论是处理原始数据清洗,数据仓库的分层任务,亦或是复杂大规模计算的场景。任务管理编排服务被更多业务场景所提及,Amazon Managed Workflows for Apache Airflow 是一项适用于 Apache Air flow 的托管式编排服务,让您能够在云中大规模设置和操作数据管道。

Apache Airflow 是一种开源工具,用于以编程方式编写、安排和监控称为工作流的流程和任务序列。借助 Amazon MWAA,您可以使用 Apache Airflow 和 Python 来创建工作流程,而无需管理底层基础设施以实现可扩展性、可用性和安全性。Amazon MWAA 会自动扩展其工作流程执行能力以满足您的需求,Amazon MWAA 集成了 AWS 安全服务,可帮助您快速安全地访问数据。

MWAA 服务有着诸多优点,如灵活创建不同版本的 Airflow 集群,可以自动扩展组件,通过设置在您的环境中运行的最小和最大工作线程数,自动扩展 Worker。集成启用基于角色的身份验证和授权等等。但对于 Airflow 本身,在进行任务编排及开发需要编写大量的 Python 脚本,用户有着一定的学习和使用成本。我们也看到,其中大部分任务的代码几乎都有通用性。本文提供一种使用 serverless 数据库 DynamoDB 来管理 Airflow 任务,并利用 Airflow scheduler 自动生成 DAG 的方式,使用户仅需编写通用任务插件,通过 DynamoDB 表格管理任务及作业参数,包含依赖关系,任务具体执行自定义参数等,自动生成 Airflow DAG 并自定义时间运行调度任务。

2. 设计实现思路

2.1 任务组织

在实践中,常见的,一个具体的数据处理任务称之为 Task,例如一次 Redshift SQL 执行,一次 Spark 批任务处理或一次 Python 脚本的运行。本方案实现的功能,会提供一个基础的 Task 模版类,并提供 Redshift SQL 任务示例代码。帮助用户编写通用 Task 方法。在本方案中,一个 Job DAG 由多个 Task 构成,并且可以组织其 Task 依赖关系。为了兼容更加复杂的场景,添加了自定义 Scheduler DAG,它由多个 Job 组成,在 Scheduler DAG 统一设置定时触发策略,作为作业执行的主 DAG,按照 Job 之间依赖依次触发具体 job 执行。在这样的设计里,业务可以实现诸如先并发处理多个分散业务模块的数据处理,再进行汇总任务的处理。设计图示如下。

2.2 架构实现

结合 DynamoDB Serverless 数据库支持任意规模数据的快速读取特性,将 Task,Job,Scheduler 在 DynamoDB Table 中记录管理。编写统一的 DAG 文件部署到 MWAA S3 桶中,Airflow 定时扫描数据库里新的任务记录,自动生成 DAG。通过自定义定时触发运行 Task,如 Redshift、Athena、EMR 等任务,并在 Task 里可以配置监控告警,发送到日志及邮件等告警。

3. 方案部署指南

源码链接:https://github.com/SEZ9/Airflow-dynamic-dags

3.1 创建 DynamoDB 表

创建三张 DynamoDB 表格分别管理 Scheduler、Job,Task,命名如下:mwaa_scheduler,mwaa_job,mwaa_task。其中表字段及说明如下。

表名:mwaa_scheduler

分区键:schedule_id

属性字段:

job_list 字段类型:字符串;解释:关联的 job id,示例:1,2

schedule_description 字段类型:字符串;解释:调度器的描述信息

schedule_job_configs 字段类型:字符串;解释:记录调度器的额外信息

schedule_job_dependencies 字段类型:字符串;解释:job 间的依赖关系,示例:[{“job_id”:”1″,”pid”:”2″}]

schedule_name 字段类型:字符串;解释:调度器的名称,必须为英文

schedule_params 字段类型:字符串;解释:调度器配置参数,示例:{“schedule_interval”:”0 0 12 * * “}

status 字段类型 数字;解释:调度器状态 1 为启用,0 为禁用

表名:mwaa_job

分区键:job_type

排序字段:job_id

属性字段:

job_name 字段类型:字符串;解释:作业名称

job_params 字段类型:字符串;解释:作业的额外参数

job_task_configs 字段类型:字符串;解释:关联 task 的额外参数

job_task_dependencies 字段类型:字符串;解释:Task 间的依赖关系,示例:[{“task_id”:”1″,”pid”:”2″}]

status 字段类型:数字;解释:作业状态  1 为启用,0 为禁用

表名:mwaa_task

分区键:task_id

属性字段:

task_name 字段类型:字符串;解释:任务名称

task_params 字段类型:字符串;解释:任务参数,示例:{“sql”:””}

task_type 字段类型:字符串;解释:任务类型

3.2 修改代码配置文件

在代码路径中找到 config/conf.py 文件修改公共参数,如数据表名及区域。每个环境的有关任务的执行参数,如Redshift 的地址及端口用户名密码登。以 Redshift Task 示例。


class ProductionConfig(BaseConfig):
    REDSHIFT_REGION = 'us-east-1'
    Scheduler_DYNAMODB_TABLE = 'mwaa_scheduler'
    JOB_DYNAMODB_TABLE = 'mwaa_job'
    TASK_DYNAMODB_TABLE = 'mwaa_task'
    REDSHIFT_HOST = 'xxxx'
    REDSHIFT_PORT = 5439
    REDSHIFT_USER = 'xxxx'
   RDSHIFT_PWD = 'xxxx’

3.3 创建 MWAA 集群,并部署 DAG 代码

创建集群参考文档:

https://docs.aws.amazon.com/zh_cn/mwaa/latest/userguide/get-started.html

部署后点击进入 Airflow UI,编辑环境参数,指明环境是生产还是测试,区分不同环境的参数获取。

3.4 在 DynamoDB 表格中编辑任务

在 DynamoDB 创建 Task,通过 DynamoDB console 进入表格添加项目填写对应任务参数

在 DynamoDB 创建 Job,通过 DynamoDB console 进入表格添加项目填写对应任务参数

在 DynamoDB 创建 Scheduler,通过 DynamoDB console 进入表格添加项目填写对应任务参数

等待 2 分钟左右,可以在 Airflow UI 看到自动生成的 DAG

查看 Scheduler 的 Job 依赖

查看 Job 的 Task 依赖

等待预定的运行时间或手动触发 schedule DAG 执行任务

3.5 自定义开发 Task 模版

本方案提供了一个基础的 BaseTaskFactory 类,在此基础上,用户可以自定义实现不同 Task 类型的实现。

首先在 task_factory 中创建一个 task 实现类并继承 BaseTaskFactory,参考示例代码编写 task 执行逻辑。

接着在 config/conf.py 中编写任务类型与 Task 类映射关系


TASK_FACTORY_MAPPING_DICT = {
        'REDSHIFT-SQL': 'BasicRedshiftTaskFactory'
    }

最后,通过在 DynamoDB 中 task 管理表中指定 Task 类型即可实现对于 task 的调用。

4. 总结

本文介绍了如何使用 DynamoDB,利用 MWAA Airflow 动态 DAG,简化 MWAA 开发工作,用户仅需实现通用 Task 的逻辑编写,通过 DynamoDB 表格数据管理任务执行及互相依赖。并且提供了 Redshift 调用的实现逻辑参考,将大大简化用户在使用 Airflow 进行任务开发的工作,提升生产效率。


联系我们
提交工单