Apr. 02, 2020
In a world where an ever-increasing amount of data is being gathered, companies often find themselves without the tools to make optimal use of the largely unstructured data they have collected. To get a better idea of what such scenarios typically look like, let us consider the example of Company X. Company X favors a NoSQL database because it serves as an easy and effective way to ingest high volumes of data. This works well for a while; the data is coming in “the easy way” and things seem good. Before long, however, Company X runs into a genuine challenge: the consumers of data in the company realize they need to make structured and relational queries against the gathered data. The problem? They aren’t using a relational database. The data in the NoSQL database is unstructured and unsuited to such queries. So what can Company X do?
How can Company X manage to salvage the unstructured data it has gathered and still make use of it in a structured way?
The solution to Company X’s predicament is an ETL pipeline: a pipeline that automatically Extracts data from its NoSQL database, Transforms it into a format suitable for querying, and Loads it into a destination relational database. Nothing needs to change in terms of how the data is ingested. Company X still has all its processes ingesting data “the easy way” into the NoSQL database. But with an effective ETL pipeline in place, this unstructured data is also correctly loaded into an SQL database, which Company X can then use to make structured and relational queries. As a result, Company X derives the benefits of both types of databases: the enhanced data ingestion capabilities of the NoSQL database along with the ability to make relational queries with the relational database.
AWS already offers services such as AWS Data Pipeline that can help you clone and migrate databases. However, these services do not provide the ability to extract, transform, and load data between databases in real time, and that’s where an ETL pipeline comes in.
Companies are increasingly finding themselves in scenarios resembling that of Company X due to the growing use of NoSQL databases, driven by an appreciation of their data ingestion capabilities. Others, of course, may have started off with an SQL database in order to be able to query their data, at the cost of making unstructured data harder to ingest and use. Regardless of which type of database a company started with, the problem that arises in both scenarios is the same: the need to move data from one type of database to another. (In other words, the challenge is moving from unstructured data to structured data or vice versa.) ETL pipelines help bridge this gap.
ETL stands for Extract, Transform, and Load. As the name implies, an ETL pipeline refers to a set of processes that:
Extract data from a source database
Transform that data into a format suited to the destination
Load the transformed data into a destination database
In the pipeline illustrated below, we demonstrate an architecture for a DynamoDB-to-Aurora ETL. DynamoDB is our source NoSQL database, and the destination is Amazon Aurora, a PostgreSQL-compatible relational database.
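The first Lambda in the pipeline, shown below, reads change records from the table’s DynamoDB stream and forwards them to a Kinesis Data Firehose delivery stream. The wiring between the table and the function is not part of the Lambda code itself; as a rough boto3 sketch (the table name, function name, and batch size here are placeholders rather than the values used in the actual deployment), it could look like this:

import boto3

dynamodb = boto3.client('dynamodb')
lambda_client = boto3.client('lambda')

# Enable a stream that carries both the new and the old version of each item
response = dynamodb.update_table(
    TableName='users',                              # placeholder table name
    StreamSpecification={
        'StreamEnabled': True,
        'StreamViewType': 'NEW_AND_OLD_IMAGES',
    },
)
stream_arn = response['TableDescription']['LatestStreamArn']

# Invoke the forwarding Lambda for each batch of stream records
lambda_client.create_event_source_mapping(
    EventSourceArn=stream_arn,
    FunctionName='dynamodb-to-firehose-forwarder',  # placeholder function name
    StartingPosition='LATEST',
    BatchSize=100,
)

The NEW_AND_OLD_IMAGES view type matters here: the forwarding function relies on NewImage for inserts and updates and falls back to OldImage for deletions.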
'use strict';

const AWS = require('aws-sdk');

// Triggered by the DynamoDB stream: forwards each change record to Kinesis Data Firehose
exports.handler = (event, context, callback) => {
  const firehose = new AWS.Firehose({
    region: process.env.region,
  });
  const records = event.Records;
  if (records.length) {
    firehose.putRecordBatch(
      {
        DeliveryStreamName: process.env.stream_name,
        Records: records.map(record => ({
          // NewImage covers inserts/updates; OldImage covers deletions
          Data: JSON.stringify({
            data: record.dynamodb.NewImage || record.dynamodb.OldImage,
            eventName: record.eventName,
          }),
        })),
      },
      (err, data) => {
        if (err) {
          console.log(err, err.stack);
          callback(err);
        } else {
          console.log(data);
          callback(null, 'Successfully processed records.');
        }
      }
    );
  } else {
    callback(null, 'Nothing to process.');
  }
};
'use strict';

const AWS = require('aws-sdk');
const { parse } = require('json2csv');
const uuid = require('uuid');

const decode = (b) => Buffer.from(b, 'base64').toString('utf8');
const encode = (s) => Buffer.from(s, 'utf8').toString('base64');

// Formats a date as "YYYY-M-D H:M:S.ms" so PostgreSQL can parse it
const formatDate = (date) => {
  if (date === null) return null;
  const parsedDate = new Date(date);
  return isNaN(parsedDate)
    ? null
    : parsedDate.getFullYear() + "-" + (parsedDate.getMonth() + 1) + "-" + parsedDate.getDate() +
      " " + parsedDate.getHours() + ":" + parsedDate.getMinutes() + ":" + parsedDate.getSeconds() +
      "." + parsedDate.getMilliseconds();
};

// Mapping between the unmarshalled DynamoDB attributes and the CSV columns
const fields = [
  { label: "id", value: () => uuid.v4() },
  { label: "creation_date", value: () => formatDate(new Date()) },
  { label: "marker", value: (row) => JSON.stringify(row['__eventName']) },
  "first_name",
  "last_name",
  "phone",
  { label: "$created_at", value: (row) => formatDate(row['$created_at']) },
  "$created_by",
  { label: "$updated_at", value: (row) => formatDate(row['$updated_at']) },
  "$updated_by",
  "$request_id",
  "$is_deleted",
  { label: "$deleted_at", value: (row) => formatDate(row['$deleted_at']) },
];

const opts = {
  fields,
  header: false,
};

// Firehose transformation Lambda: converts each base64-encoded JSON record into a CSV line
exports.handler = (event, context, callback) => {
  const output = event.records.map((record) => {
    const entry = JSON.parse(decode(record.data));
    const rec = AWS.DynamoDB.Converter.unmarshall(entry.data);
    rec["__eventName"] = entry.eventName;
    const csv = parse(rec, opts);
    return {
      recordId: record.recordId,
      result: 'Ok',
      data: encode(csv + '\n'),
    };
  });
  callback(null, { records: output });
};
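This transformation function runs inside the Firehose delivery stream itself: Firehose hands it batches of base64-encoded records and it returns each record flattened into a CSV line, which the stream then buffers into S3. How the function gets attached to the delivery stream depends on how the stream is provisioned; as a hedged boto3 sketch (the stream name, bucket ARN, role ARN, function ARN, and buffering values are placeholders), it could look like this:

import boto3

firehose = boto3.client('firehose')

firehose.create_delivery_stream(
    DeliveryStreamName='dynamodb-to-aurora-stream',                              # placeholder
    DeliveryStreamType='DirectPut',
    ExtendedS3DestinationConfiguration={
        'RoleARN': 'arn:aws:iam::123456789012:role/firehose-delivery-role',      # placeholder
        'BucketARN': 'arn:aws:s3:::etl-staging-bucket',                          # placeholder
        # How much data Firehose accumulates before writing a CSV object to S3
        'BufferingHints': {'SizeInMBs': 1, 'IntervalInSeconds': 60},
        # Run the transformation Lambda above on every batch of records
        'ProcessingConfiguration': {
            'Enabled': True,
            'Processors': [{
                'Type': 'Lambda',
                'Parameters': [{
                    'ParameterName': 'LambdaArn',
                    'ParameterValue': 'arn:aws:lambda:us-west-2:123456789012:function:firehose-csv-transform',  # placeholder
                }],
            }],
        },
    },
)

The buffering hints determine how quickly CSV objects land in S3, and therefore how close to real time records reach Aurora.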
import logging
import json
import pg8000
import boto3
import sys
import os

# csv header: must match the column order produced by the transformation Lambda
header = [
    "id", "creation_date", "marker", "first_name", "last_name", "phone",
    "_created_at", "_created_by", "_updated_at", "_updated_by",
    "_request_id", "_is_deleted", "_deleted_at"
]

# rds settings
db_endpoint = os.environ['db_endpoint']
db_port = int(os.environ['db_port'])
db_name = os.environ['db_name']
db_table = os.environ['db_table']
db_user = os.environ['db_user']
db_password = os.environ['db_password']

# logger settings
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# s3 settings
s3_client = boto3.client('s3')


def ingest(conn, record):
    bucket = record['s3']['bucket']['name']
    key = record['s3']['object']['key']
    region = record['awsRegion']
    # Ask Aurora to import the CSV file directly from S3 (aws_s3 extension)
    psql = "SELECT aws_s3.table_import_from_s3('%s', '%s', '(format csv)', aws_commons.create_s3_uri('%s', '%s', '%s'))" % (
        db_table, ','.join(header), bucket, key, region
    )
    logger.info(psql)
    try:
        with conn.cursor() as cur:
            cur.execute(psql)
        logger.info('Data loaded from S3 into Aurora')
    except pg8000.IntegrityError as err:
        if err.args[0]['C'] == '23505':
            logger.info('File already loaded (duplicate key)')
        else:
            raise


def handler(event, context):
    try:
        conn = pg8000.connect(
            host=db_endpoint,
            port=db_port,
            user=db_user,
            password=db_password,
            database=db_name,
            timeout=900
        )
        conn.autocommit = True
    except Exception as e:
        logger.error("ERROR: Unexpected error: Could not connect to Aurora instance.")
        logger.error(e)
        sys.exit()
    logger.info("SUCCESS: Connection to RDS Aurora instance succeeded")

    for record in event['Records']:
        print(record)
        if 'body' in record:
            # The event comes from SQS
            body = json.loads(record['body'])
            if not ('Event' in body) or body['Event'] != 's3:TestEvent':
                for rec in body['Records']:
                    ingest(conn, rec)
        else:
            # The event comes directly from S3
            ingest(conn, record)

    conn.close()
    logger.info("SUCCESS: Connection to RDS Aurora instance successfully closed")
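The aws_s3.table_import_from_s3 call used above assumes some one-time setup on the Aurora PostgreSQL side: the aws_s3 extension must be installed, the cluster must have an IAM role attached that lets it read from the staging bucket, and the destination table must already exist with columns matching the CSV header. A minimal sketch of that setup, with the users table name and the column types being assumptions inferred from the CSV produced by the transformation Lambda:

import os
import pg8000

conn = pg8000.connect(
    host=os.environ['db_endpoint'],
    port=int(os.environ['db_port']),
    user=os.environ['db_user'],
    password=os.environ['db_password'],
    database=os.environ['db_name'],
)
conn.autocommit = True

with conn.cursor() as cur:
    # Provides aws_s3.table_import_from_s3(); CASCADE also installs aws_commons
    cur.execute("CREATE EXTENSION IF NOT EXISTS aws_s3 CASCADE;")
    # Assumed destination table; the primary key on id is what raises the
    # 23505 duplicate-key error handled above when the same file is re-ingested
    cur.execute("""
        CREATE TABLE IF NOT EXISTS users (
            id            uuid PRIMARY KEY,
            creation_date timestamp,
            marker        text,
            first_name    text,
            last_name     text,
            phone         text,
            _created_at   timestamp,
            _created_by   text,
            _updated_at   timestamp,
            _updated_by   text,
            _request_id   text,
            _is_deleted   boolean,
            _deleted_at   timestamp
        );
    """)
conn.close()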
The question of how updates are propagated from the source database to the destination database is not necessarily a simple one to answer, and the details vary from case to case. In general, however, the models for propagating updates fall into two main types: non-destructive and destructive.
In this scenario, when you change or delete an item in the source database, a new entry is stored in the destination database to register the change or deletion of that item. When an item is deleted, its last known version remains in the destination database, providing a trace of the information that existed before deletion.
Pros and Cons: The obvious benefit of this model is the ability to track changes and access previous versions of data. However, tracking multiple versions requires additional storage space and obliges you to write slightly more involved queries to retrieve only the current version of each entry.
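For instance, with the append-only table produced by the pipeline above, fetching only the current state of each entry could look like the sketch below (the users table name and the use of phone as a business key are assumptions for illustration):

import pg8000

def latest_versions(conn):
    # One row per business key, keeping only the most recent version
    query = """
        SELECT DISTINCT ON (phone) *
        FROM users
        ORDER BY phone, creation_date DESC;
    """
    with conn.cursor() as cur:
        cur.execute(query)
        return cur.fetchall()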
An example of this scenario would be a mirror database, where every change that happens in your source database is propagated into the destination database. You will not, however, be able to track changes over time; the destination database only stores the exact equivalent of what currently exists in the source database. In this scenario, any update to an item in the source database forces an update to the corresponding item in the destination database. For example: with a mirror database model, if you change the name of a user in DynamoDB (the source database), you won’t save a new entry in Aurora (the destination database) to register the change. Instead, the corresponding user name is directly updated in Aurora.
Pros and Cons: The primary appeal of this model is the minimization of storage space, and it spares you from having to deal with multiple versions of the same entry. You won’t have to write specific queries to get only the latest version of each entry, because the destination always holds the latest version, updated automatically. However, this model does not allow you to revert back to previous versions.
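For comparison with the load step shown earlier, a mirror pipeline would typically upsert or delete rows in place rather than appending new ones. A hedged sketch, assuming a destination table keyed on the DynamoDB partition key (called user_id here) rather than the actual schema used in the pipeline above:

import pg8000

def apply_event(conn, event_name, item):
    # item is the unmarshalled DynamoDB record; event_name is INSERT, MODIFY, or REMOVE
    with conn.cursor() as cur:
        if event_name == 'REMOVE':
            cur.execute("DELETE FROM users_mirror WHERE user_id = %s", (item['user_id'],))
        else:
            cur.execute(
                """
                INSERT INTO users_mirror (user_id, first_name, last_name, phone)
                VALUES (%s, %s, %s, %s)
                ON CONFLICT (user_id) DO UPDATE
                SET first_name = EXCLUDED.first_name,
                    last_name  = EXCLUDED.last_name,
                    phone      = EXCLUDED.phone
                """,
                (item['user_id'], item['first_name'], item['last_name'], item['phone']),
            )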
Two Lambda functions help evaluate and control the quality of the ETL pipeline. The first Lambda queries DynamoDB for the number of entries in the source table; the second queries Aurora for the number of rows in the destination table. A notification is sent to the user to flag any difference between the two numbers.
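Collapsed into a single function for brevity, that reconciliation check could be sketched as follows (the table, topic, and environment variable names are placeholders):

import os
import boto3
import pg8000

def handler(event, context):
    # Count items in the source DynamoDB table without returning the items themselves
    dynamodb = boto3.client('dynamodb')
    source_count = 0
    for page in dynamodb.get_paginator('scan').paginate(
            TableName=os.environ['ddb_table'], Select='COUNT'):
        source_count += page['Count']

    # Count rows in the destination Aurora table
    conn = pg8000.connect(
        host=os.environ['db_endpoint'],
        port=int(os.environ['db_port']),
        user=os.environ['db_user'],
        password=os.environ['db_password'],
        database=os.environ['db_name'],
    )
    with conn.cursor() as cur:
        cur.execute("SELECT COUNT(*) FROM %s" % os.environ['db_table'])
        destination_count = cur.fetchone()[0]
    conn.close()

    # Flag any discrepancy to the user
    if source_count != destination_count:
        boto3.client('sns').publish(
            TopicArn=os.environ['sns_topic_arn'],
            Subject='ETL pipeline count mismatch',
            Message='DynamoDB has %d items but Aurora has %d rows.'
                    % (source_count, destination_count),
        )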
Deploying ETL pipelines can be a tricky business. Designing and implementing an ETL process is prone to mistakes such as underestimating the transformation requirements, incorrectly mapping and matching data, poor performance (both in the extraction process and, particularly, after the fact in the target database or data warehouse), and an inability to scale. If you’re looking for assistance and specialized expertise in implementing your ETL pipeline, TrackIt is here to help.
The ETL pipeline showcased in this article was implemented by TrackIt for a client company in the financial industry that was initially using a NoSQL database to ingest transactional data. The unstructured data from transactions was being brought in ‘the easy way’ but could not be queried and, as a result, was not serving any purpose. Having sought TrackIt’s assistance in building the solution described above, the company now has an ETL pipeline functioning in real time that allows it to derive the benefits of both NoSQL and SQL databases.
TrackIt is an Amazon Web Services Advanced Consulting Partner specializing in cloud management, consulting, and software development solutions based in Venice, CA.
TrackIt specializes in Modern Software Development, DevOps, Infrastructure-As-Code, Serverless, CI/CD, and Containerization with specialized expertise in Media & Entertainment workflows, High-Performance Computing environments, and data storage.
TrackIt’s forté is cutting-edge software design with deep expertise in containerization, serverless architectures, and innovative pipeline development. The TrackIt team can help you architect, design, build and deploy a customized solution tailored to your exact requirements.
In addition to providing cloud management, consulting, and modern software development services, TrackIt also provides an open-source AWS cost management tool that allows users to optimize their costs and resources on AWS.