Deriving The Benefits of Both NoSQL and SQL Databases

As the volume of data being gathered keeps growing, companies often find themselves without the tools to make optimal use of the largely unstructured data they have collected. To get a better idea of what such scenarios typically look like, let us consider the example of Company X. Company X favors its NoSQL database because it serves as an easy and effective way to ingest high volumes of data. This works well for a while; the data is coming in “the easy way” and things seem good. Eventually, however, Company X runs into a genuine challenge: the consumers of data within the company start to realize they need to run structured, relational queries on the gathered data. The problem? The company isn’t using a relational database, and the unstructured data in its NoSQL database is not suited to such queries. So what can Company X do?

How can Company X manage to salvage the unstructured data it has gathered and still make use of it in a structured way?

The solution to Company X’s predicament is an ETL pipeline: a pipeline that automatically Extracts data from its NoSQL database, Transforms it into a format suitable for querying, and Loads it into a destination relational database. Nothing needs to change in terms of how the data is ingested. Company X still has all of its processes ingesting data “the easy way” into the NoSQL database. But with an effective ETL pipeline in place, this unstructured data is also correctly loaded into an SQL database, which Company X can then use for structured and relational queries. As a result, Company X derives the benefits of both types of databases: the enhanced data ingestion capabilities of the NoSQL database along with the ability to make relational queries with the relational database.

AWS Services That Allow You To Clone and Migrate Data

AWS already offers services such as AWS Data Pipeline that can help you clone and migrate databases. However, these services do not give you the ability to extract, transform, and load data between databases in real time, and that’s where an ETL pipeline comes in.

Bridging The Gap

Companies are increasingly finding themselves in scenarios resembling that of Company X as NoSQL databases, prized for their data ingestion capabilities, see wider use. Historically, of course, others may have started with an SQL database in order to query their data, at the cost of limiting the usefulness of unstructured data. Regardless of which type of database a company started with, the problem that can arise is one and the same: the need to move data from one type of database to the other. (In other words, the challenge is moving either from unstructured data to structured data or vice versa.) ETL pipelines help bridge this gap.

What is ETL?

ETL stands for Extract, Transform, and Load. As the name implies, an ETL pipeline refers to a set of processes that:

  1. Extract data from a source database,
  2. Transform the data, and
  3. Load the transformed data into a destination database.


High-Level ETL Schema
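
To make the three steps concrete, here is a minimal, generic sketch of an ETL pass in Python. The source items, the transformation rules, and the CSV output shown here are purely illustrative placeholders and are not part of the pipeline described below.

import csv
import io

def extract(source_items):
    # Extract: iterate over raw, possibly unstructured documents from the source.
    for item in source_items:
        yield item

def transform(item):
    # Transform: map a free-form document onto a fixed, relational-friendly row.
    return {
        "first_name": item.get("first_name"),
        "last_name": item.get("last_name"),
        "phone": item.get("phone"),
    }

def load(rows):
    # Load: serialize the rows into CSV, the format the destination database ingests.
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=["first_name", "last_name", "phone"])
    for row in rows:
        writer.writerow(row)
    return buffer.getvalue()

# Hypothetical source documents, for illustration only.
documents = [{"first_name": "Jane", "last_name": "Doe", "phone": "555-0100", "tags": ["vip"]}]
print(load(transform(item) for item in extract(documents)))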

A DynamoDB to Aurora ETL Pipeline Architecture

The pipeline illustrated below demonstrates an architecture for a DynamoDB to Aurora ETL. DynamoDB is the source NoSQL database, and the destination is Amazon Aurora, a PostgreSQL-compatible relational database.

ETL Pipeline Architecture Schema

How The Pipeline Works

  1. DynamoDB Streams captures a time-ordered sequence of item-level modifications made in the database. The Stream Processor Lambda function reads these change records from the stream and forwards them to Kinesis Firehose.
    (See Stream Processor Code Snippet)
  2. The data is then sent to the Kinesis Firehose Lambda transformation function, where the records are converted from JSON to CSV (a format compatible with Aurora). Kinesis Firehose, in this example, uses a buffer interval of 60 seconds during which the data is transformed and aggregated into a single file. As a result, each CSV file corresponds to a 60-second window of the stream.
    (See Firehose Processor Code Snippet)
  3. Records are pushed into their corresponding S3 buckets (transformed or untransformed). The untransformed records S3 bucket can be linked to Amazon SNS (Simple Notification Service) to notify of a transformation failure in the pipeline.
  4. Once the transformed records are pushed into the corresponding S3 bucket, the bucket sends an Event Notification to Amazon SQS (Simple Queue Service), which in turn triggers the Lambda function. (A configuration sketch for this notification is shown after the list.)
  5. The triggered Lambda function passes the file name to Aurora and requests that it ingest the new file.
    (See Aurora Processor Code Snippet)
  6. Aurora securely accesses the S3 bucket through an S3 VPC endpoint, since Aurora sits inside a private subnet and does not have direct access to the internet.
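
As referenced in step 4 above, the transformed-records bucket must be configured to send an event notification to the SQS queue whenever a new object is created. The sketch below shows one way to set this up with boto3; the bucket name and queue ARN are hypothetical placeholders, and the queue's access policy must separately allow s3.amazonaws.com to send messages to it (not shown).

import boto3

s3 = boto3.client('s3')

# Hypothetical resource names; replace with the bucket and queue created for your pipeline.
TRANSFORMED_BUCKET = 'transformed-records-bucket'
INGEST_QUEUE_ARN = 'arn:aws:sqs:us-east-1:123456789012:aurora-ingest-queue'

# Notify the SQS queue whenever Kinesis Firehose delivers a new transformed CSV object.
s3.put_bucket_notification_configuration(
    Bucket=TRANSFORMED_BUCKET,
    NotificationConfiguration={
        'QueueConfigurations': [
            {
                'QueueArn': INGEST_QUEUE_ARN,
                'Events': ['s3:ObjectCreated:*'],
            },
        ],
    },
)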

Stream Processor Code Snippet

'use strict';

const AWS = require('aws-sdk');

exports.handler = (event, context, callback) => {
    const firehose = new AWS.Firehose({
        region: process.env.region,
    });
    const records = event.Records;
    if (records.length) {
        // Forward each stream record (its new or old item image plus the event name) to Firehose.
        firehose.putRecordBatch(
            {
                DeliveryStreamName: process.env.stream_name,
                Records: records.map(record =>  ({ Data: JSON.stringify({
                    data: record.dynamodb.NewImage || record.dynamodb.OldImage,
                    eventName: record.eventName
                })})),
            },
            (err, data) => {
                if (err) {
                    console.log(err, err.stack);
                    callback(err);
                } else {
                    console.log(data);
                    callback(null, 'Successfully processed records.');
                }
            }
        );
    } else callback(null, 'Nothing to process.');
};
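
For reference, the event.Records handled by the Stream Processor follow the DynamoDB Streams record format: each record carries an eventName (INSERT, MODIFY, or REMOVE) and a dynamodb section containing the item images in DynamoDB's attribute-value notation. The trimmed example below is illustrative only, with hypothetical values.

# Trimmed, illustrative DynamoDB Streams event (values are hypothetical).
sample_event = {
    "Records": [
        {
            "eventName": "INSERT",
            "dynamodb": {
                "Keys": {"phone": {"S": "555-0100"}},
                # NewImage is present for INSERT/MODIFY events; REMOVE events carry OldImage instead.
                "NewImage": {
                    "first_name": {"S": "Jane"},
                    "last_name": {"S": "Doe"},
                    "phone": {"S": "555-0100"},
                },
            },
        }
    ]
}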

Firehose Processor Code Snippet

'use strict';

const AWS = require('aws-sdk');
const { parse } = require('json2csv');
const uuid = require('uuid');

const decode = (b) => Buffer.from(b, 'base64').toString('utf8');
const encode = (s) => Buffer.from(s, 'utf8').toString('base64');

const formatDate = (date) => {
    if (date === null) return null;
    const parsedDate = new Date(date);
    if (isNaN(parsedDate)) return null;
    // Format as "YYYY-M-D H:M:S.ms" (components are not zero-padded).
    return parsedDate.getFullYear() + "-" + (parsedDate.getMonth() + 1) + "-" + parsedDate.getDate()
        + " " + parsedDate.getHours() + ":" + parsedDate.getMinutes() + ":" + parsedDate.getSeconds()
        + "." + parsedDate.getMilliseconds();
};

const fields = [
    {
        label: "id",
        value: () => uuid.v4()
    },
    {
        label: "creation_date",
        value: () => formatDate(new Date())
    },
    {
        label: "marker",
        value: (row) => JSON.stringify(row['__eventName'])
    },
    "first_name",
    "last_name",
    "phone",
    {
        label: "$created_at",
        value: (row) => formatDate(row['$created_at'])
    },
    "$created_by",
    {
        label: "$updated_at",
        value: (row) => formatDate(row['$updated_at'])
    },
    "$updated_by",
    "$request_id",
    "$is_deleted",
    {
        label: "$deleted_at",
        value: (row) => formatDate(row['$deleted_at'])
    },
];

const opts = {
    fields,
    header: false,
};

exports.handler = (event, context, callback) => {
    const output = event.records.map((record) => {
        // Each Firehose record wraps the payload produced by the Stream Processor.
        const entry = JSON.parse(decode(record.data));
        // Convert the DynamoDB attribute-value map into a plain JavaScript object.
        const rec = AWS.DynamoDB.Converter.unmarshall(entry.data);
        rec["__eventName"] = entry.eventName;
        // Flatten the object into a single CSV line following the field definitions above.
        const csv = parse(rec, opts);
        return {
            recordId: record.recordId,
            result: 'Ok',
            data: encode(csv + '\n'),
        };
    });
    callback(null, { records: output });
};

Aurora Processor Code Snippet

import logging
import json
import pg8000
import boto3
import sys
import os

# csv header
header = [
    "id",
    "creation_date",
    "marker",
    "first_name",
    "last_name",
    "phone",
    "_created_at",
    "_created_by",
    "_updated_at",
    "_updated_by",
    "_request_id",
    "_is_deleted",
    "_deleted_at"
]

# rds settings
db_endpoint = os.environ['db_endpoint']
db_port = int(os.environ['db_port'])
db_name = os.environ['db_name']
db_table = os.environ['db_table']
db_user = os.environ['db_user']
db_password = os.environ['db_password']

# logger settings
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# s3 settings
s3_client = boto3.client('s3')


def ingest(conn, record):
    bucket = record['s3']['bucket']['name']
    key = record['s3']['object']['key']
    region = record['awsRegion']
    psql = "SELECT aws_s3.table_import_from_s3('%s', '%s', '(format csv)', aws_commons.create_s3_uri('%s', '%s', '%s'))" % (
        db_table, ','.join(header), bucket, key, region
    )
    logger.info(psql)
    try:
        with conn.cursor() as cur:
            cur.execute(psql)
            logger.info('Data loaded from S3 into Aurora')
    except pg8000.IntegrityError as err:
        if err.args[0]['C'] == '23505':
            logger.info('File already loaded (duplicate key)')
        else:
            raise


def handler(event, context):
    try:
        conn = pg8000.connect(
            host=db_endpoint,
            port=db_port,
            user=db_user,
            password=db_password,
            database=db_name,
            timeout=900
        )
        conn.autocommit = True
    except Exception as e:
        logger.error(
            "ERROR: Unexpected error: Could not connect to Aurora instance."
        )
        logger.error(e)
        sys.exit()
    logger.info("SUCCESS: Connection to RDS Aurora instance succeeded")
    for record in event['Records']:
        print(record)
        if 'body' in record:        # The event comes from SQS
            body = json.loads(record['body'])
            if not('Event' in body) or body['Event'] != 's3:TestEvent':
                for rec in body['Records']:
                    ingest(conn, rec)
        else:                       # The event comes from S3
            ingest(conn, record)
    conn.close()
    logger.info("SUCCESS: Connection to RDS Aurora instance successfully closed")

Data Propagation

How updates are propagated from the source database to the destination database is not necessarily a simple question, and the answer varies from case to case. Generally speaking, however, update propagation models fall into two main types: non-destructive and destructive.

Non-destructive updates


Non Destructive Updates Schema

In this scenario, when an item is changed or deleted in the source database, a new entry is stored in the destination database to register the change or deletion. When an item is deleted, its last known state remains in the destination database, providing a trace of the information that existed before the deletion.

Pros and Cons: The obvious benefit of this model is the ability to track changes and access previous versions of the data. However, keeping multiple versions requires additional storage space and may oblige you to write specific queries to retrieve only the current version of each entry.
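
For instance, if each change is appended as a new row with a fresh id and a creation_date timestamp, reading the current state of the data requires a query that keeps only the latest version of each entry. The helper below is a hypothetical sketch: it assumes the destination table is named users, that a phone column identifies an entry, and that it runs through a pg8000 cursor like the one used in the Aurora Processor above.

# Hypothetical helper: with non-destructive updates every change is a new row, so
# reading current data means selecting only the latest version of each entry.
def fetch_latest_versions(cursor, table='users'):
    cursor.execute(
        "SELECT DISTINCT ON (phone) * "
        "FROM {} "
        "ORDER BY phone, creation_date DESC".format(table)
    )
    return cursor.fetchall()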

Destructive updates


Destructive Updates Schema

An example of this scenario is a mirror database, where every change that happens in the source database is propagated to the destination database. You will not be able to track changes over time; the destination database only stores the exact equivalent of what is currently in the source database. In this scenario, any update to an item in the source database forces an update to the corresponding item in the destination database. For example, with a mirror database model, if you change the name of a user in DynamoDB (the source database), you won’t save a new entry in Aurora (the destination database) to register this change. Instead, the corresponding user name is updated directly in Aurora.

Pros and Cons: The primary appeal of this model is that it minimizes storage space. It also spares you from handling version history: you won’t have to write specific queries to get only the latest version of each entry, because the destination always holds the latest version, updated automatically. However, this model does not allow you to revert to previous versions.
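
In code, a destructive update is typically an upsert: insert the row if it does not exist yet, otherwise overwrite it in place. The sketch below is hypothetical: it assumes a users table with a unique constraint on a phone column and relies on PostgreSQL's INSERT ... ON CONFLICT, executed through a pg8000 cursor as in the snippets above.

# Hypothetical helper: with destructive (mirror) updates, each change overwrites
# the corresponding row instead of adding a new version.
def mirror_update(cursor, phone, first_name, last_name):
    cursor.execute(
        "INSERT INTO users (phone, first_name, last_name) "
        "VALUES (%s, %s, %s) "
        "ON CONFLICT (phone) DO UPDATE "
        "SET first_name = EXCLUDED.first_name, last_name = EXCLUDED.last_name",
        (phone, first_name, last_name)
    )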

Lambda Functions for Quality Control

ETL pipeline with Lambda functions

ETL Pipeline Architecture Schema With Quality Control

Two Lambda functions help evaluate and control the quality of the ETL pipeline. The first queries DynamoDB for the number of entries in the source database; the second queries Aurora for the number of entries in the destination database. A notification is sent to the user to flag any difference between the two counts.
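
The sketch below collapses this check into a single hypothetical Lambda for brevity: it compares the DynamoDB item count against the Aurora row count and publishes an SNS notification when they differ. The table name and SNS topic ARN are placeholders, and note that describe_table returns an approximate item count that DynamoDB refreshes only periodically, which is usually sufficient for a drift alarm but not for an exact audit.

import os
import boto3
import pg8000

# Hypothetical resource names; adjust for your deployment.
DYNAMO_TABLE = 'users'
ALERT_TOPIC_ARN = 'arn:aws:sns:us-east-1:123456789012:etl-quality-alerts'


def handler(event, context):
    # Entry count on the source side (approximate, refreshed periodically by DynamoDB).
    dynamodb = boto3.client('dynamodb')
    source_count = dynamodb.describe_table(TableName=DYNAMO_TABLE)['Table']['ItemCount']

    # Entry count on the destination side.
    conn = pg8000.connect(
        host=os.environ['db_endpoint'],
        port=int(os.environ['db_port']),
        user=os.environ['db_user'],
        password=os.environ['db_password'],
        database=os.environ['db_name']
    )
    with conn.cursor() as cur:
        cur.execute("SELECT COUNT(*) FROM {}".format(os.environ['db_table']))
        destination_count = cur.fetchone()[0]
    conn.close()

    # Flag any difference between the two numbers.
    if source_count != destination_count:
        boto3.client('sns').publish(
            TopicArn=ALERT_TOPIC_ARN,
            Subject='ETL pipeline count mismatch',
            Message='DynamoDB has {} items, Aurora has {} rows.'.format(
                source_count, destination_count
            )
        )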

TrackIt and ETL Pipelines

Deploying ETL pipelines can be a tricky business. Designing and implementing an ETL process is prone to mistakes such as underestimating the transformation requirements, incorrectly mapping and matching data, poor performance (both in the extraction process and, particularly, after the fact in the target database or data warehouse), and an inability to scale. If you’re looking for assistance and specialized expertise in implementing your ETL pipeline, TrackIt is here to assist you.

The ETL pipeline showcased in this article was implemented by TrackIt for a client company in the financial industry that was initially using a NoSQL database to ingest transactional data. The unstructured transaction data was being brought in ‘the easy way’ but could not be queried and, as a result, was not serving any purpose. Having sought TrackIt’s assistance in building the solution described above, the company now has an ETL pipeline running in real time that allows it to derive the benefits of both NoSQL and SQL databases.

About TrackIt

TrackIt, based in Venice, CA, is an Amazon Web Services Advanced Consulting Partner specializing in cloud management, consulting, and software development solutions.

TrackIt specializes in Modern Software Development, DevOps, Infrastructure-As-Code, Serverless, CI/CD, and Containerization with specialized expertise in Media & Entertainment workflows, High-Performance Computing environments, and data storage.

TrackIt’s forte is cutting-edge software design with deep expertise in containerization, serverless architectures, and innovative pipeline development. The TrackIt team can help you architect, design, build, and deploy a customized solution tailored to your exact requirements.

In addition to providing cloud management, consulting, and modern software development services, TrackIt also provides an open-source AWS cost management tool that allows users to optimize their costs and resources on AWS.