ETL pipeline from AWS DynamoDB to Aurora PostgreSQL

Apr. 02, 2020
Thibaut Cornolti

TAGS: Aurora, AWS, DynamoDB, PostgreSQL

Deriving The Benefits of Both NoSQL and SQL Databases

In a world where an ever-increasing amount of data is being gathered, companies often find themselves without the tools to make optimal use of the largely unstructured data they’ve collected. To get a better idea of what such scenarios typically look like, consider the example of Company X. Company X has a NoSQL database that it favors because it serves as an easy and effective way to ingest high volumes of data. This works well for a while; the data is coming in “the easy way” and things seem good. Later, however, Company X runs into a genuine challenge: the consumers of data in the company realize they need to make structured, relational queries against the gathered data. The problem? Company X isn’t using a relational database. The data in the NoSQL database is unstructured and not suitable for structured, relational queries. So what can Company X do?

How can Company X manage to salvage the unstructured data it has gathered and still make use of it in a structured way?

The solution to Company X’s predicament is an ETL pipeline: a pipeline that automatically Extracts data from the NoSQL database, Transforms it into a format suitable for querying, and Loads it into a destination relational database. Nothing needs to change in how the data is ingested. Company X still has all of its processes ingesting data “the easy way” into the NoSQL database. But with an effective ETL pipeline in place, this unstructured data is also loaded, correctly formatted, into an SQL database that Company X can query in a structured, relational way. As a result, Company X derives the benefits of both types of databases: the easy, high-volume ingestion of the NoSQL database along with the relational query capabilities of the SQL database.

AWS Services That Allow You To Clone and Migrate Data

AWS already offers specific services such as AWS Data Pipeline that can help you clone and migrate databases. However, these services do not provide the ability to extract, transform, and load data between databases in real time, and that is where an ETL pipeline comes in.

Bridging The Gap

Companies are increasingly finding themselves in scenarios resembling that of Company X, driven by the growing use of NoSQL databases and an appreciation of their data ingestion capabilities. Other companies, of course, may have started with an SQL database so that they could query their data, at the cost of limiting the usefulness of their unstructured data. Regardless of which type of database a company started with, the problem that can arise in both scenarios is the same: the need to move data from one type of database to another. (In other words, the challenge is moving from unstructured data to structured data or vice versa.) ETL pipelines help bridge this gap.

What is ETL?

ETL stands for Extract, Transform, and Load. As the name implies, an ETL pipeline refers to a set of processes that:

  1. Extract data from a source database,
  2. Transform the data, and
  3. Load the transformed data into a destination database.

High-Level ETL Schema
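
At a conceptual level, the three stages can be expressed as three small functions chained together. The Python sketch below is purely illustrative and makes no AWS assumptions; the source, destination, and field names are hypothetical. The rest of this article shows how the same three stages map onto managed AWS services.

# Conceptual sketch only: a single-process ETL loop with hypothetical
# source/destination objects. In the AWS pipeline below, each stage is
# handled by a managed service instead of a local function.

def extract(source):
    """Read raw, unstructured records from the source datastore."""
    return source.read_new_records()

def transform(record):
    """Reshape a raw record into the flat row the destination table expects."""
    return {
        "first_name": record.get("first_name"),
        "last_name": record.get("last_name"),
        "phone": record.get("phone"),
    }

def load(destination, rows):
    """Write the transformed rows into the destination relational database."""
    destination.insert_many(rows)

def run_etl(source, destination):
    rows = [transform(record) for record in extract(source)]
    load(destination, rows)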

The ETL Pipeline Architecture

In the pipeline illustrated below, DynamoDB (a NoSQL database) is the source database and Aurora (a PostgreSQL-compatible database) is the destination.

ETL Pipeline Architecture Schema

How The Pipeline Works

  1. DynamoDB Streams captures a time-ordered sequence of item-level modifications in the database. The Stream Processor Lambda function is used to capture the records from the stream.
    (See Stream Processor Code Snippet)
  2. The data is then sent to the Kinesis Firehose Lambda transformation function, where each record is converted from JSON to CSV (a format Aurora can ingest). Kinesis Firehose, in this example, uses a buffer interval of 60 seconds during which the data is transformed and aggregated into a single file. As a result, each CSV file corresponds to 60 seconds of streamed data.
    (See Firehose Processor Code Snippet)
  3. Records are pushed into their corresponding S3 buckets (transformed or untransformed). The untransformed-records S3 bucket can be linked to Amazon SNS (Simple Notification Service) to notify of a transformation failure in the pipeline.
  4. Once the transformed records are pushed into the corresponding S3 bucket, the bucket sends an Event Notification to Amazon SQS (Simple Queue Service), which in turn triggers the Aurora Processor Lambda function.
    (See Event Notification Wiring Sketch)
  5. The triggered Lambda function sends the file name to Aurora and requests that it ingest the new file.
    (See Aurora Processor Code Snippet)
  6. Aurora securely accesses the S3 bucket through an S3 VPC Endpoint, since Aurora sits inside a private subnet and does not have direct access to the internet.
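
Event Notification Wiring Sketch

The three code snippets below cover steps 1, 2, and 5. Steps 3 and 4 are plumbing rather than code: the transformed-records bucket publishes object-created events to an SQS queue, and that queue triggers the Aurora Processor Lambda. The boto3 sketch below shows one way this wiring might be done; the bucket name, queue ARN, and function name are placeholders, and the SQS queue policy that allows S3 to send messages is assumed to already exist. (Step 1 is wired similarly, with an event source mapping on the DynamoDB table’s stream ARN.) In practice, this configuration is usually managed with an infrastructure-as-code tool such as Terraform or CloudFormation.

import boto3

# Hypothetical resource identifiers; replace with your own.
TRANSFORMED_BUCKET = 'transformed-records-bucket'
QUEUE_ARN = 'arn:aws:sqs:us-west-2:123456789012:transformed-records-queue'
AURORA_PROCESSOR_FUNCTION = 'aurora-processor'

s3 = boto3.client('s3')
lambda_client = boto3.client('lambda')

# Step 4 (first half): have the transformed-records bucket send an event
# notification to SQS whenever Kinesis Firehose delivers a new CSV object.
s3.put_bucket_notification_configuration(
    Bucket=TRANSFORMED_BUCKET,
    NotificationConfiguration={
        'QueueConfigurations': [
            {
                'QueueArn': QUEUE_ARN,
                'Events': ['s3:ObjectCreated:*'],
            }
        ]
    },
)

# Step 4 (second half): trigger the Aurora Processor Lambda from the queue.
lambda_client.create_event_source_mapping(
    EventSourceArn=QUEUE_ARN,
    FunctionName=AURORA_PROCESSOR_FUNCTION,
    BatchSize=1,
)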

Stream Processor Code Snippet

'use strict';

const AWS = require('aws-sdk');

// Forward each batch of DynamoDB Stream records to the Kinesis Firehose delivery stream
exports.handler = (event, context, callback) => {
    const firehose = new AWS.Firehose({
        region: process.env.region,
    });
    const records = event.Records;
    if (records.length) {
        firehose.putRecordBatch(
            {
                DeliveryStreamName: process.env.stream_name,
                Records: records.map(record =>  ({ Data: JSON.stringify({
                    data: record.dynamodb.NewImage || record.dynamodb.OldImage,
                    eventName: record.eventName
                })})),
            },
            (err, data) => {
                if (err) {
                    console.log(err, err.stack);
                    callback(err);
                } else {
                    console.log(data);
                    callback(null, 'Successfully processed records.');
                }
            }
        );
    } else callback(null, 'Nothing to process.');
};

Firehose Processor Code Snippet

'use strict';

const AWS = require('aws-sdk');
const { parse } = require('json2csv');
const uuid = require('uuid');

const decode = (b) => Buffer.from(b, 'base64').toString('utf8');
const encode = (s) => Buffer.from(s, 'utf8').toString('base64');

const formatDate = (date) => {
    if (date === null) return null;
    const parsedDate = new Date(date);
    return (isNaN(parsedDate) ? null : parsedDate.getFullYear() + "-" + (parsedDate.getMonth() + 1) + "-" + parsedDate.getDate() + " " + parsedDate.getHours() + ":" + parsedDate.getMinutes() + ":" + parsedDate.getSeconds() + "." + parsedDate.getMilliseconds());
}

const fields = [
    {
        label: "id",
        value: () => uuid.v4()
    },
    {
        label: "creation_date",
        value: () => formatDate(new Date())
    },
    {
        label: "marker",
        value: (row) => JSON.stringify(row['__eventName'])
    },
    "first_name",
    "last_name",
    "phone",
    {
        label: "$created_at",
        value: (row) => formatDate(row['$created_at'])
    },
    "$created_by",
    {
        label: "$updated_at",
        value: (row) => formatDate(row['$updated_at'])
    },
    "$updated_by",
    "$request_id",
    "$is_deleted",
    {
        label: "$deleted_at",
        value: (row) => formatDate(row['$deleted_at'])
    },
];

const opts = {
    fields,
    header: false,
};

exports.handler = (event, context, callback) => {
    // Decode each Firehose record, unmarshall the DynamoDB JSON document,
    // and flag it with the stream event name before converting it to CSV
    const output = event.records.map((record) => {
        const entry = JSON.parse(decode(record.data));
        const rec = AWS.DynamoDB.Converter.unmarshall(entry.data);
        rec["__eventName"] = entry.eventName;
        const csv = parse(rec, opts);
        return {
            recordId: record.recordId,
            result: 'Ok',
            data: encode(csv + '\n'),
        };
    });
    callback(null, { records: output });
};

 

Aurora Processor Code Snippet

import logging
import json
import pg8000
import boto3
import sys
import os

# csv header
header = [
    "id",
    "creation_date",
    "marker",
    "first_name",
    "last_name",
    "phone",
    "_created_at",
    "_created_by",
    "_updated_at",
    "_updated_by",
    "_request_id",
    "_is_deleted",
    "_deleted_at"
]

# rds settings
db_endpoint = os.environ['db_endpoint']
db_port = int(os.environ['db_port'])
db_name = os.environ['db_name']
db_table = os.environ['db_table']
db_user = os.environ['db_user']
db_password = os.environ['db_password']

# logger settings
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# s3 settings
s3_client = boto3.client('s3')


def ingest(conn, record):
    bucket = record['s3']['bucket']['name']
    key = record['s3']['object']['key']
    region = record['awsRegion']
    psql = "SELECT aws_s3.table_import_from_s3('%s', '%s', '(format csv)', aws_commons.create_s3_uri('%s', '%s', '%s'))" % (
        db_table, ','.join(header), bucket, key, region
    )
    logger.info(psql)
    try:
        with conn.cursor() as cur:
            cur.execute(psql)
            logger.info('Data loaded from S3 into Aurora')
    except pg8000.IntegrityError as err:
        if err.args[0]['C'] == '23505':
            logger.info('File already loaded (duplicate key)')
        else:
            raise


def handler(event, context):
    try:
        conn = pg8000.connect(
            host=db_endpoint,
            port=db_port,
            user=db_user,
            password=db_password,
            database=db_name,
            timeout=900
        )
        conn.autocommit = True
    except Exception as e:
        logger.error(
            "ERROR: Unexpected error: Could not connect to Aurora instance."
        )
        logger.error(e)
        sys.exit()
    logger.info("SUCCESS: Connection to RDS Aurora instance succeeded")
    for record in event['Records']:
        print(record)
        if 'body' in record:        # The event comes from SQS
            body = json.loads(record['body'])
            if body.get('Event') != 's3:TestEvent':    # skip the S3 test event
                for rec in body['Records']:
                    ingest(conn, rec)
        else:                       # The event comes from S3
            ingest(conn, record)
    conn.close()
    logger.info("SUCCESS: Connection to RDS Aurora instance successfully closed")

 

Data Propagation

How updates are propagated from the source database to the destination database is not necessarily a simple question to answer, and the answer varies from case to case. In general, however, update propagation models fall into two main types: non-destructive and destructive.

Non-destructive updates

Non Destructive Updates Schema

In this scenario, when you change or delete an item in the source database, a new entry is stored in the destination database to register the change or deletion of that item. When an item is deleted, its last known state is still stored in the destination database, providing a trace of the information that existed before deletion.

Pros and Cons: The obvious benefit of this model is the ability to track changes and access previous versions of the data. However, keeping multiple versions requires additional storage space and may force you to write specific queries to retrieve only the latest version of each entry.

Destructive updates

Destructive Updates Schema

An example of this scenario is a mirror database, where every change that happens in your source database is propagated to the destination database. You cannot track changes to the data; the destination database only ever stores the exact equivalent of what is in the source database, and any update to an item in the source forces an update to the corresponding item in the destination. For example, with a mirror database model, if you change the name of a user in DynamoDB (the source database), no new entry is saved in Aurora (the destination database) to register this change. Instead, the corresponding user name is updated directly in Aurora.

Pros and Cons: The primary appeal of this model is its minimal storage footprint. It also spares you from handling multiple versions: there is no need for specific queries to retrieve the latest version of each entry, because the destination always holds the latest, automatically updated version. However, this model does not allow you to revert to previous versions.
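
The pipeline described in this article appends every change as a new row in Aurora, which corresponds to the non-destructive model. A destructive (mirror) model could instead be approximated with an upsert keyed on the item’s identifier. The sketch below is only an illustration of the idea, using pg8000 and a hypothetical users table keyed on user_id; it is not part of the pipeline code above.

import pg8000

def mirror_update(conn, user_id, first_name, last_name, phone):
    # Destructive (mirror) propagation: overwrite the row in place so the
    # destination always holds exactly the latest state of the source item.
    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO users (user_id, first_name, last_name, phone)
            VALUES (%s, %s, %s, %s)
            ON CONFLICT (user_id) DO UPDATE
            SET first_name = EXCLUDED.first_name,
                last_name  = EXCLUDED.last_name,
                phone      = EXCLUDED.phone
            """,
            (user_id, first_name, last_name, phone)
        )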

Lambda Functions for Quality Control

ETL Pipeline Architecture Schema With Quality Control

Two Lambda functions help evaluate and control the quality of the ETL pipeline. The first Lambda sends a query to DynamoDB to get the number of entries in the source database. The second Lambda sends a query to Aurora to get the number of entries in the destination database. A notification is sent to the user to flag any difference between the two numbers.
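
A hedged sketch of what such a check might look like is shown below. For brevity it combines both counts into a single function; it reuses the same database connection settings as the Aurora Processor and uses boto3 for DynamoDB and SNS. The environment variable names and the SNS topic are assumptions, and the row count is taken as-is (with non-destructive updates you would count distinct source items instead of raw rows).

import os
import boto3
import pg8000

dynamodb = boto3.client('dynamodb')
sns = boto3.client('sns')


def handler(event, context):
    # Count items in the source table. A paginated Scan with Select='COUNT'
    # is exact but consumes read capacity; describe_table's ItemCount is
    # cheaper but only refreshed roughly every six hours.
    paginator = dynamodb.get_paginator('scan')
    source_count = sum(
        page['Count']
        for page in paginator.paginate(
            TableName=os.environ['dynamodb_table'],
            Select='COUNT'
        )
    )

    # Count rows in the destination table.
    conn = pg8000.connect(
        host=os.environ['db_endpoint'],
        port=int(os.environ['db_port']),
        user=os.environ['db_user'],
        password=os.environ['db_password'],
        database=os.environ['db_name']
    )
    with conn.cursor() as cur:
        # db_table comes from configuration, not user input.
        cur.execute("SELECT COUNT(*) FROM %s" % os.environ['db_table'])
        destination_count = cur.fetchone()[0]
    conn.close()

    # Flag any difference between the two numbers.
    if source_count != destination_count:
        sns.publish(
            TopicArn=os.environ['sns_topic_arn'],
            Subject='ETL pipeline count mismatch',
            Message='DynamoDB has %d items but Aurora has %d rows'
                    % (source_count, destination_count)
        )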

TrackIt and ETL Pipelines

Deploying ETL pipelines can be a tricky business. Designing and implementing an ETL process is prone to mistakes such as underestimating the transformation requirements, incorrectly mapping and matching data, poor performance (both in the extraction process and, particularly, in the target database or data warehouse after the fact), and an inability to scale. If you’re looking for assistance and specialized expertise in implementing your ETL pipeline, TrackIt is here to help.

The ETL pipeline showcased in this article was implemented by TrackIt for a client company in the financial industry that was initially using a NoSQL database to ingest transactional data. The unstructured transaction data was being brought in ‘the easy way’ but could not be queried and, as a result, was not serving any purpose. Having sought TrackIt’s assistance in building the solution described above, the company now has a real-time ETL pipeline that allows it to derive the benefits of both NoSQL and SQL databases.

About TrackIt

TrackIt is an Amazon Web Services Advanced Consulting Partner based in Venice, CA, specializing in cloud management, consulting, and software development solutions.

TrackIt specializes in Modern Software Development, DevOps, Infrastructure-As-Code, Serverless, CI/CD, and Containerization with specialized expertise in Media & Entertainment workflows, High-Performance Computing environments, and data storage.

TrackIt’s forte is cutting-edge software design with deep expertise in containerization, serverless architectures, and innovative pipeline development. The TrackIt team can help you architect, design, build, and deploy a customized solution tailored to your exact requirements.

In addition to providing cloud management, consulting, and modern software development services, TrackIt also provides an open-source AWS cost management tool that allows users to optimize their costs and resources on AWS.
