公司新闻

使用 AWS 数据库迁移服务 (DMS)、AWS Step Functions 和 Redshift

使用 AWS 数据库迁移服务 (DMS)、AWS Step Functions 和 Redshift

使用 AWS 实现从数据库自动加载数据到 Amazon Redshift

主要内容

本文介绍了如何使用 AWS 数据库迁移服务DMS、AWS Step Functions 和 Redshift Data API 将数据从数据库自动加载到 Amazon Redshift。这种方法实际上是为了简化数据仓库实施过程,确保以自动化方式处理和加载运行时数据,同时保持数据的及时更新和纪律性。

Amazon Redshift 是一个快速、可扩展、安全且完全管理的云数据仓库,使用户能够使用标准 SQL 和现有的 ETL、商业智能BI及报表工具以简单且具成本效益的方式分析所有数据。

随着数据增长,确保最终用户能在合适的时间和合适的地点获取数据显得至关重要。完全自动化并高度可扩展的 ETL 过程有助于减少管理常规 ETL 流水线所需的运营工作。此外,及时刷新数据对于数据仓库至关重要。

数据集成方法

数据集成过程有两种主要方法:

全量加载 这一方法会完全重新加载特定数据仓库表或数据集中的所有数据。增量加载 这一方法仅专注于更新或添加已更改或新生成的数据到现有数据集中。

本文主要探讨如何自动处理那些完全改变且无法追踪变化的数据。

解决方案概述

该工作流包括以下步骤:

使用 AWS DMS 迁移任务从配置好的 SQL Server 数据源完全加载数据集到 Redshift 集群的暂存区。AWS DMS 在迁移任务完成后向 EventBridge 发布复制作业已停止事件。EventBridge 将事件路由到 Step Functions 状态机。状态机通过 Redshift Data API 调用 Redshift 存储过程,将数据集从暂存区加载到生产表中。

以下架构图展示了使用 AWS 服务的端到端解决方案。

速云梯下载

接下来的章节中,我们将展示如何创建完整加载的 AWS DMS 任务、配置 ETL 编排、创建 EventBridge 规则并测试该解决方案。

前提条件

在完成本实例之前,须满足以下条件:

拥有 AWS 账户一个已配置为 AWS DMS 复制源的 SQL Server 数据库一个作为目标数据库的 Redshift 集群一个用于将数据从源迁移到目标的 AWS DMS 复制实例一个指向 SQL Server 数据库的源终端节点一个指向 Redshift 集群的目标终端节点

创建全量加载的 AWS DMS 任务

按照以下步骤设置迁移任务:

在 AWS DMS 控制台中,选择导航窗格中的 数据库迁移任务。选择 创建任务。在 任务标识符 中,输入一个任务名称例如 dmsfulldumptask。选择您的复制实例。选择您的源终端节点。选择您的目标终端节点。在 迁移类型 中,选择 迁移现有数据。

在 表映射 部分下,选择 添加新选择规则。在 架构 中,选择 输入架构。在 架构名称 中,输入一个名称例如,dmssample。保持默认设置,然后选择 创建任务。

以下截屏显示您在 AWS DMS 控制台上完成的任务。

创建 Redshift 表

使用 Redshift 查询编辑器 在 Redshift 集群上创建以下表:

dbodimcust 存储客户属性:

sqlCREATE TABLE dbodimcust ( custkey integer ENCODE az64 custid character varying(10) ENCODE lzo custname character varying(100) ENCODE lzo custcity character varying(50) ENCODE lzo custrevflg character varying(1) ENCODE lzo)DISTSTYLE AUTO

dbofactsales 存储客户销售交易记录:

sqlCREATE TABLE dbofactsales ( ordernumber character varying(20) ENCODE lzo custkey integer ENCODE az64 orderamt numeric(182) ENCODE az64)DISTSTYLE AUTO

dbofactsalesstg 存储每日客户增量销售交易:

sqlCREATE TABLE dbofactsalesstg ( ordernumber character varying(20) ENCODE lzo custid character varying(10) ENCODE lzo orderamt numeric(182) ENCODE az64)DISTSTYLE AUTO

以下插入语句可用于将示例数据加载到销售暂存表中:

sqlINSERT INTO dbofactsalesstg(ordernumber custid orderamt) VALUES (100 1 200)INSERT INTO dbofactsalesstg(ordernumber custid orderamt) VALUES (101 1 300)INSERT INTO dbofactsalesstg(ordernumber custid orderamt) VALUES (102 2 25)INSERT INTO dbofactsalesstg(ordernumber custid orderamt) VALUES (103 2 35)INSERT INTO dbofactsalesstg(ordernumber custid orderamt) VALUES (104 3 80)INSERT INTO dbofactsalesstg(ordernumber custid orderamt) VALUES (105 3 45)

创建存储过程

在 Redshift 查询编辑器中,创建以下存储过程以处理客户和销售交易数据:

sploadcustdim() 该过程将客户维度与暂存中的增量客户数据进行比较,并填充客户维度:

sqlCREATE OR REPLACE PROCEDURE dbosploadcustdim()LANGUAGE plpgsqlAS BEGIN TRUNCATE TABLE dbodimcust INSERT INTO dbodimcust(custkey custid custname custcity) VALUES (1 100 abc chicago) INSERT INTO dbodimcust(custkey custid custname custcity) VALUES (2 101 xyz dallas) INSERT INTO dbodimcust(custkey custid custname custcity) VALUES (3 102 yrt new york) UPDATE dbodimcust SET custrevflg = CASE WHEN custcity = new york THEN Y ELSE N END WHERE custrevflg IS NULLEND

sploadfactsales() 该过程对增量订单数据进行转换,加入日期维度和客户维度,并在最终销售事实表中填充相应的主键:

sqlCREATE OR REPLACE PROCEDURE dbosploadfactsales()LANGUAGE plpgsqlAS BEGIN 处理销售事实表 INSERT INTO dbofactsales SELECT salesfctordernumber custcustkey AS custkey salesfctorderamt FROM dbofactsalesstg salesfct INNER JOIN (SELECT FROM dbodimcust) cust ON salesfctcustid = custcustidEND

创建 Step Functions 状态机

按照以下步骤创建状态机 redshifteltloadcustomersales,该状态机在 AWS DMS 客户表的全量加载任务完成后被调用。

在 Step Functions 控制台中,选择导航窗格中的 状态机。选择 创建状态机。在 模板 中,选择 空白。在 操作 下拉菜单中,选择 导入定义,以导入状态机的工作流定义。

在您喜欢的文本编辑器中保存以下代码为 ASL 文件扩展名例如,redshifteltloadcustomersalesASL。提供您的 Redshift 集群 ID 和 Redshift 集群的秘密 ARN。

json{ Comment State Machine to process ETL for Customer Sales Transactions StartAt LoadCustomerDim States { LoadCustomerDim { Type Task Parameters { ClusterIdentifier redshiftclusterabcd Database dev Sql call dbosploadcustdim() SecretArn arnawssecretsmanageruswest2xxxsecretrsclustersecretabcd } Resource arnawsstatesawssdkredshiftdataexecuteStatement Next Wait on LoadCustomerDim } Wait on LoadCustomerDim { Type Wait Seconds 30 Next CheckStatusLoadCustomerDim } CheckStatusLoadCustomerDim { Type Task Next Choice Parameters { Id Id } Resource arnawsstatesawssdkredshiftdatadescribeStatement } Choice { Type Choice Choices [ { Not { Variable Status StringEquals FINISHED } Next Wait on LoadCustomerDim } ] Default LoadSalesFact } LoadSalesFact { Type Task End true Parameters { ClusterIdentifier redshiftclusterabcdef Database dev Sql call dbosploadfactsales() SecretArn arnawssecretsmanageruswest2xxxsecretrsclustersecretabcd } Resource arnawsstatesawssdkredshiftdataexecuteStatement } }}

选择 选择文件 并上传 ASL 文件以创建一个新状态机。

使用 AWS 数据库迁移服务 (DMS)、AWS Step Functions 和 Redshift

在 状态机名称 中,输入状态机名称例如,redshifteltloadcustomersales。选择 创建。

成功创建状态机后,您可以验证详细信息,如下所示的截屏。

以下图示展示了状态机工作流。

状态机包括以下步骤:

LoadCustomerDim 通过 executestatement API 将存储过程 sploadcustdim 传递到 Redshift 集群,以加载客户维度的增量数据,并将 SQL 语句的标识符传回状态机。WaitonLoadCustomerDim 等待至少 30 秒。CheckStatusLoadCustomerDim 调用 Data API 的 describeStatement 来获取 API 调用状态。isrunLoadCustomerDimcomplete 根据其状态路由下一个 ETL 工作流步骤:FINISHED 将存储过程 LoadSalesFact 传递给 executestatement API,以在 Redshift 集群中运行,从而加载增量数据并填充销售事实表中的主键。其他状态 返回到 waitonloadcustomerdim 步骤,等待 SQL 语句完成。

状态机 redshifteltloadcustomersales 在 EventBridge 规则触发时加载 dimcust、factsalesstg 和 factsales 表。

您还可以选择在状态机完成时设置事件通知,以触发任何下游操作,例如 Amazon Simple Notification ServiceAmazon SNS或其他 ETL 处理。

创建 EventBridge 规则

EventBridge 在全量加载完成时向 Step Functions 状态机发送事件通知。您也可以在 EventBridge 中开启或关闭事件通知。

完成以下步骤以创建 EventBridge 规则:

在 EventBridge 控制台中,选择导航窗格中的 规则。选择 创建规则。在 名称 中,输入一个名称例如,dmstest。可选地,为规则输入描述。在 事件总线 中,选择与此规则关联的事件总线。如果您希望该规则匹配来自您账户的事件,请选择 AWS 默认事件总线。在 规则类型 中,选择 带有事件模式的规则。选择 下一步。在 事件源 中,选择 AWS 事件或 EventBridge 合作伙伴事件。选择 使用模式表单。在 事件源 中,选择 AWS 服务。在 AWS 服务 中,选择 Database Migration Service。在 事件类型 中,选择 所有事件。在 事件模式 中,输入以下 JSON 表达式,以查找 AWS DMS 任务状态为 REPLICATIONTASKSTOPPED 的事件:

json{ source [awsdms] detail { eventId [DMSEVENT0079] eventType [REPLICATIONTASKSTOPPED] detailMessage [Stop Reason FULLLOADONLYFINISHED] type [REPLICATIONTASK] category [StateChange] }}

在 目标类型 中,选择 AWS 服务。在 AWS 服务 中,选择 Step Functions 状态机。在 状态机名称 中,输入 redshifteltloadcustomersales。选择 创建规则。

以下截屏显示为本帖子创建的规则的详细信息。

测试解决方案

运行任务,等待工作量完成。该工作流将完整数据从源数据库移动到 Redshift 集群。

以下截屏显示客户表全量加载的载入统计信息。

AWS DMS 提供在发生 AWS DMS 事件时的通知,例如全量加载完成或复制任务停止的情况。

全量加载完成后,AWS DMS 将事件发送到您账户的默认 事件总线。以下截屏示例展示了使用您创建的规则调用目标 Step Functions 状态机的情况。

![调用状态机](https//d2908q01vomqb2cloudfrontnet/b6692ea5df920cad691c20319a6fffd7a4a766b