Como utilizar AWS DMS para CDC no seu Data Lake
Introdução
Neste artigo será relatado alguns dos desafios e lições aprendidas ao se utilizar o AWS DMS para replicação de bancos de dados relacionais para o S3, e mantendo um change data capture (CDC) ligado. O nosso desafio era disponibilizar os dados para os analistas, cientistas de dados e todos os demais interessados, de forma que eles passassem utilizar esta plataforma em vez das fontes de dado transacionais em instâncias de ready only.
Para subir a infraestrutura necessária foi utilizado os seguintes recursos:
- Github Actions
- AWS CloudFormation
AWS DMS
AWS database migration service (DMS) é um serviço de migração de dados a partir de diversos tipos de bancos de dados para a cloud. É possível utilizar deste serviço para fazer uma carga completa de migração do status atual do seu banco de dados para o destino e depois manter a sua fonte e destino sincronizadas com tarefas de on going replication ligadas.
Com o DMS é possível rapidamente configurar uma instância de migração e os endpoints de origem e destino, tendo a infraestrutura pronta para as tarefas de migração. Nestas serão configuradas as tabelas que serão migradas, qual o tipo de migração que será feito, entre outros detalhes. A seguir uma imagem da arquitetura necessária para ficar mais claro.
Para o nosso caso utilizamos deste serviço para migrar os bancos de dados relacionais para o S3, configurando dois tipos de tarefas de migração para cada banco:
- tarefa de full load: carrega os dados atuais do banco no s3 e só é ativa quando são criadas novas tabelas/colunas que estão dentro da regra,
- tarefa de on going replication: faz o CDC entre a fonte e o destino e está sempre ligada
Pré-requisitos para o DMS
Para se utilizar esse serviço da AWS da melhor forma possível, algumas configurações e recursos são necessários:
- A instância de replicação tem que estar dentro da VPC para garantir um endereço de IP privado.
- A instância de replicação precisa ter um subnet group com pelo menos duas subnets em diferentes availability zones.
- É necessário ter a logical replication ativada no banco de dados de origem (para mais informações).
- O endpoint de destino precisa de uma role contendo policies que tenham permissão de escrever, ler e deletar arquivos no s3.
- O parâmetro HeartbeatEnable deve estar setado como true no endpoint de origem para evitar problemas com armazenamento cheio no banco de dados de origem.
Código da Infraestrutura
Utilizando o CloudFormation podemos criar uma stack contendo todos os recursos ilustrados na figura acima, além de todas as roles e demais recursos necessários. O código completo está no github neste link.
Parâmetros
Para que o código possa ser utilizado de forma mais genérica para vários casos de uso, vamos precisar de alguns parâmetros.
Parameters:
VPC:
Description: "VPC where to setup DMS instance."
Type: "AWS::EC2::VPC::Id"
PrivateSubnetOne:
Description: "Private subnet 1 id preconfigurated in the account."
Type: String
PrivateSubnetTwo:
Description: "Private subnet 2 id preconfigurated in the account."
Type: String
SecurityGroupIds:
Description: "Security Group IDs for DMS subnet group."
Type: List<AWS::EC2::SecurityGroup::Id>
ReplicationInstanceAllocatedStorage:
Description: "The amount of storage (in gigabytes) to be initially allocated for the replication instance."
Type: Number
Default: 100
ReplicationInstanceClass:
Description: "The compute and memory capacity of the replication instance as specified by the replication instance class."
Type: String
Default: dms.r5.large
AllowedValues:
- dms.t3.micro
- dms.t3.small
- dms.t3.medium
- dms.t3.large
- dms.c5.large
- dms.c5.xlarge
- dms.c5.2xlarge
- dms.c5.4xlarge
- dms.c5.9xlarge
- dms.c5.12xlarge
- dms.c5.18xlarge
- dms.c5.24xlarge
- dms.r5.large
- dms.r5.xlarge
- dms.r5.2xlarge
- dms.r5.4xlarge
- dms.r5.8xlarge
- dms.r5.12xlarge
- dms.r5.16xlarge
- dms.r5.24xlarge
SourceEndpointName:
Description: "Source endpoint name."
Type: String
SourceSecretsManager:
Description: "DB secrets manager to get credentials."
Type: String
SourceTableMapping:
Description: "Json table mapping for source database."
Type: String
DstBucketArn:
Description: "Destination S3 bucket arn."
Type: String
DstBucketName:
Description: "Destination S3 bucket name."
Type: String
DstBucketFolder:
Description: "Destination S3 folder in which the data is to written on."
Type: String
Endpoints
Os endpoints de origem necessitam das credenciais de acesso ao banco de origem com permissão de escrita, portanto pode ser interessante fazer uso do secrets manager para manter essa credencial segura, sem expô-la diretamente no código. Para o endpoint de destilo precisaremos também criar a role que dá as devidas permissões no S3.
DmsSrcEndpoint:
Type: AWS::DMS::Endpoint
Properties:
EndpointIdentifier: !Ref SourceEndpointName
DatabaseName: !Sub '{{resolve:secretsmanager:${SourceSecretsManager}:SecretString:dbname}}'
EndpointType: "source"
EngineName: "postgres"
ServerName: !Sub '{{resolve:secretsmanager:${SourceSecretsManager}:SecretString:host}}'
Port: !Sub '{{resolve:secretsmanager:${SourceSecretsManager}:SecretString:port}}'
Username: !Sub '{{resolve:secretsmanager:${SourceSecretsManager}:SecretString:username}}'
Password: !Sub '{{resolve:secretsmanager:${SourceSecretsManager}:SecretString:password}}'
PostgreSqlSettings:
HeartbeatEnable: true
SslMode: require
S3IAMRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
-
Effect: "Allow"
Principal:
Service:
- "dms.amazonaws.com"
Action:
- "sts:AssumeRole"
Path: "/"
Policies:
-
PolicyName: DMSPolicy
PolicyDocument:
Version: "2012-10-17"
Statement:
-
Effect: Allow
Action:
- s3:PutObject
- s3:DeleteObject
- s3:ListBucket
Resource:
- !Ref DstBucketArn
- !Sub "${DstBucketArn}/*"
DmsEndpointTarget:
Type: AWS::DMS::Endpoint
Properties:
EndpointType: "target"
EngineName: "S3"
S3Settings:
BucketName: !Ref DstBucketName
BucketFolder: !Ref DstBucketFolder
ServiceAccessRoleArn: !GetAtt S3IAMRole.Arn
DataFormat: "parquet"
DatePartitionEnabled: false
IncludeOpForFullLoad: true
TimestampColumnName: "TIMESTAMP"
Para o endpoint de destino precisamos setar algumas configurações, sendo elas:
- O nome do bucket onde os dados serão jogados (BucketName)
- O nome da pasta dentro do bucket onde os dados serão jogados (BucketFolder)
- O formato que os dados serão jogados (DataFormat)
- Se os dados serão ou não particionados (DatePartitionEnabled)
- Incluir a coluna de Op no full load, cuja será usada para o tratamento dos dados para próxima camada (IncludeOpForFullLoad)
- O nome da coluna que informa o datetime que a alteração foi feita no banco de origem (TimestampColumnName)
Replication Instance
Agora para setar a infraestrutura que fará uso destes endpoints para realizar a migração, precisamos primeiramente setar a máquina que realizará isso: nossa replication instance. Para que ela exista é necessário ter o subnet group também.
ReplicationInstanceSubnetGroup:
Type: AWS::DMS::ReplicationSubnetGroup
Properties:
ReplicationSubnetGroupDescription: !Sub '${AWS::StackName} DMS Subnet Group'
SubnetIds:
- !Ref PrivateSubnetOne
- !Ref PrivateSubnetTwo
ReplicationInstance:
Type: AWS::DMS::ReplicationInstance
Properties:
AllocatedStorage: !Ref ReplicationInstanceAllocatedStorage
AllowMajorVersionUpgrade: false
AutoMinorVersionUpgrade: false
MultiAZ: false
PubliclyAccessible: false
ReplicationInstanceClass: !Sub '${ReplicationInstanceClass}'
ReplicationSubnetGroupIdentifier: !Ref ReplicationInstanceSubnetGroup
VpcSecurityGroupIds: !Ref SecurityGroupIds
Replications Tasks
Tendo a máquina e os endpoints, podemos ter agora as tarefas de migração, sendo uma para full-load e outra para cdc.
DmsFullLoadReplicationTask:
Type: AWS::DMS::ReplicationTask
Properties:
MigrationType: full-load
ReplicationInstanceArn: !Ref ReplicationInstance
ReplicationTaskSettings: '{
"TargetMetadata": {
"TargetSchema": "",
"SupportLobs": true,
"FullLobMode": false,
"LobChunkSize": 0,
"LimitedSizeLobMode": true,
"LobMaxSize": 4096,
"InlineLobMaxSize": 0,
"LoadMaxFileSize": 0,
"ParallelLoadThreads": 0,
"ParallelLoadBufferSize": 0,
"BatchApplyEnabled": false,
"TaskRecoveryTableEnabled": false,
"ParallelLoadQueuesPerThread": 0,
"ParallelApplyThreads": 0,
"ParallelApplyBufferSize": 0,
"ParallelApplyQueuesPerThread": 0
},
"FullLoadSettings": {
"TargetTablePrepMode": "DROP_AND_CREATE"
},
"Logging": {
"EnableLogging": true,
"LogComponents": [
{
"Id": "FILE_FACTORY",
"Severity": "LOGGER_SEVERITY_DEFAULT"
},{
"Id": "METADATA_MANAGER",
"Severity": "LOGGER_SEVERITY_DEFAULT"
},{
"Id": "SORTER",
"Severity": "LOGGER_SEVERITY_DEFAULT"
},{
"Id": "SOURCE_CAPTURE",
"Severity": "LOGGER_SEVERITY_DEFAULT"
},{
"Id": "SOURCE_UNLOAD",
"Severity": "LOGGER_SEVERITY_DEFAULT"
},{
"Id": "TABLES_MANAGER",
"Severity": "LOGGER_SEVERITY_DEFAULT"
},{
"Id": "TARGET_APPLY",
"Severity": "LOGGER_SEVERITY_DEFAULT"
},{
"Id": "TARGET_LOAD",
"Severity": "LOGGER_SEVERITY_INFO"
},{
"Id": "TASK_MANAGER",
"Severity": "LOGGER_SEVERITY_DEBUG"
},{
"Id": "TRANSFORMATION",
"Severity": "LOGGER_SEVERITY_DEBUG"
}
],
"CloudWatchLogGroup": null,
"CloudWatchLogStream": null
},
"ChangeProcessingDdlHandlingPolicy": {
"HandleSourceTableDropped": false,
"HandleSourceTableTruncated": false,
"HandleSourceTableAltered": true
}
}'
TableMappings: !Ref SourceTableMapping
SourceEndpointArn: !Ref DmsSrcEndpoint
TargetEndpointArn: !Ref DmsEndpointTarget
DmsCdcReplicationTask:
Type: AWS::DMS::ReplicationTask
Properties:
MigrationType: cdc
ReplicationInstanceArn: !Ref ReplicationInstance
ReplicationTaskSettings: '{
"TargetMetadata": {
"TargetSchema": "",
"SupportLobs": true,
"FullLobMode": false,
"LobChunkSize": 0,
"LimitedSizeLobMode": true,
"LobMaxSize": 4096,
"InlineLobMaxSize": 0,
"LoadMaxFileSize": 0,
"ParallelLoadThreads": 0,
"ParallelLoadBufferSize": 0,
"BatchApplyEnabled": false,
"TaskRecoveryTableEnabled": false,
"ParallelLoadQueuesPerThread": 0,
"ParallelApplyThreads": 0,
"ParallelApplyBufferSize": 0,
"ParallelApplyQueuesPerThread": 0
},
"Logging": {
"EnableLogging": true,
"LogComponents": [
{
"Id": "FILE_FACTORY",
"Severity": "LOGGER_SEVERITY_DEFAULT"
},{
"Id": "METADATA_MANAGER",
"Severity": "LOGGER_SEVERITY_DEFAULT"
},{
"Id": "SORTER",
"Severity": "LOGGER_SEVERITY_DEFAULT"
},{
"Id": "SOURCE_CAPTURE",
"Severity": "LOGGER_SEVERITY_DEFAULT"
},{
"Id": "SOURCE_UNLOAD",
"Severity": "LOGGER_SEVERITY_DEFAULT"
},{
"Id": "TABLES_MANAGER",
"Severity": "LOGGER_SEVERITY_DEFAULT"
},{
"Id": "TARGET_APPLY",
"Severity": "LOGGER_SEVERITY_DEFAULT"
},{
"Id": "TARGET_LOAD",
"Severity": "LOGGER_SEVERITY_INFO"
},{
"Id": "TASK_MANAGER",
"Severity": "LOGGER_SEVERITY_DEBUG"
},{
"Id": "TRANSFORMATION",
"Severity": "LOGGER_SEVERITY_DEBUG"
}
],
"CloudWatchLogGroup": null,
"CloudWatchLogStream": null
},
"ChangeProcessingDdlHandlingPolicy": {
"HandleSourceTableDropped": false,
"HandleSourceTableTruncated": false,
"HandleSourceTableAltered": true
}
}'
TableMappings: !Ref SourceTableMapping
SourceEndpointArn: !Ref DmsSrcEndpoint
TargetEndpointArn: !Ref DmsEndpointTarget
Código do Github Actions
Podemos automatizar o deploy da stack com o Github Actions. Para isso precisamos de alguns secrets configurados, incluindo uma credencial da AWS (ACCESS_KEY, SECRET_KEY), uma role na AWS para o pipeline (PIPELINE_ROLE_ARN) e uma role na AWS com as permissões para o CloudFormation (CLOUDFORMATION_ROLE_ARN).
Segue o código para automatizar o deploy da stack criada:
name: dms-migration-pipeline
on:
pull_request:
branches:
- main
paths:
- 'template/dms*'
- '.github/workflows/dms-db-migration.yml'
jobs:
deploy-stage:
env:
PIPELINE_ROLE_ARN: ${{ secrets.PIPELINE_ROLE_ARN }}
CLOUDFORMATION_ROLE_ARN: ${{ secrets.CLOUDFORMATION_ROLE_ARN }}
ACCOUNT_REGION: us-east-1
ACCOUNT_NUMBER: ${{ secrets.ACCOUNT_NUMBER }}
USER_ACCESS_KEY_ID: ${{ secrets.ACCESS_KEY }}
USER_SECRET_KEY: ${{ secrets.SECRET_KEY }}
ARTIFACTS_BUCKET: cf-artifacts
VPC_ID: ${{ secrets.VPC }}
PRIV_SUBNET_ONE: ${{ secrets.PRIV_SUBNET_ONE }}
PRIV_SUBNET_TWO: ${{ secrets.PRIV_SUBNET_TWO }}
SECURITY_GROUP: ${{ secrets.SECURITY_GROUP }}
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
with:
submodules: true
- uses: actions/setup-python@v2
- uses: aws-actions/setup-sam@v2
with:
version: 1.59.0
- name: Assume user role
uses: aws-actions/configure-aws-credentials@v1
with:
aws-access-key-id: ${{ env.USER_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ env.USER_SECRET_KEY }}
aws-region: ${{ env.ACCOUNT_REGION }}
role-to-assume: ${{ env.PIPELINE_ROLE_ARN }}
role-session-name: deploy-db-migration
role-duration-seconds: 3600
role-skip-session-tagging: true
- name: Db Migration Deployment
run: |
export SOURCE_TABLE_MAPPING=$(cat "./table-mapping/migration/general.json" | jq -R '.')
sam deploy --stack-name db-migration \
--template template/dms_migration.yml \
--capabilities CAPABILITY_NAMED_IAM \
--region ${ACCOUNT_REGION} \
--parameter-overrides \
VPC=${VPC_ID} \
PrivateSubnetOne=${PRIV_SUBNET_ONE} \
PrivateSubnetTwo=${PRIV_SUBNET_TWO} \
SecurityGroupIds=${SECURITY_GROUP} \
SourceEndpointName=postgres-source-db \
SourceSecretsManager=source-db-credential \
SourceTableMapping=$(echo ${SOURCE_TABLE_MAPPING}) \
DstBucketName=landinglayer-${ACCOUNT_REGION}-${ACCOUNT_NUMBER} \
DstBucketArn=arn:aws:s3:::landinglayer-${ACCOUNT_REGION}-${ACCOUNT_NUMBER} \
DstBucketFolder=db-name \
--s3-bucket ${ARTIFACTS_BUCKET} \
--no-fail-on-empty-changeset \
--role-arn ${CLOUDFORMATION_ROLE_ARN}
Esse workflow é ativado toda vez que:
- é feito um pull request para a branch main
- os arquivos do template ou do workflows são modificados.
Lembrando que para se ter o deploy automatizado com esse arquivo de workflow é necessário ter actions ativas no repositório.
Conclusão
Com isso temos o que é necessário para criar uma stack de migração e automatizar o deploy da mesma na AWS. Espero que seja se ajuda pra quem está buscando a melhor forma de ter CDC no Data Lake.