Loading DynamoDB stream to AWS Elasticsearch service with Lambda
We can load streaming data into our Amazon Elasticsearch Service (ES) domain from many different sources.
We'll use AWS Lambda to send data to our ES domain from Amazon DynamoDB. New data that arrives in the database table triggers an event notification to Lambda, which then runs our code to perform the indexing.
In this post, we'll do the following (see Loading Streaming Data into Amazon Elasticsearch Service):
- Create a DynamoDB table with DynamoDB Streams enabled. The table must reside in the same region as our ES domain and have a stream set to New image.
- Create an IAM Role. This role must have basic Amazon ES, DynamoDB, and Lambda execution permissions.
- Create an Elasticsearch Service (ES) domain which is the destination for data after our Lambda function processes it.
- Create a Lambda function. Note that to send our DynamoDB table data as a stream to Elasticsearch, we are going to use a triggered Lambda function. The Lambda function then runs our code to perform the indexing and add the item to Elasticsearch.
- Set an event trigger on the DynamoDB table, with our Lambda function (ddb-to-es-lambda-function) as the trigger's target.
- Then we create some events with INSERT/MODIFY/DELETE operations on the DynamoDB table, either via the CLI or the console.
- Check that the events have been triggered using the Monitoring section of Lambda and the CloudWatch logs.
Create a movie-ddb table in Amazon DynamoDB with the following details:
Then enable the stream:
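For reference, the table and its stream can also be created with boto3, the library our Lambda code already uses. This is a minimal sketch, assuming the movie-ddb table name and the string id key used later in this post; the billing mode is my own choice:

import boto3

dynamodb = boto3.client('dynamodb', region_name='us-east-1')

# Create the table with a DynamoDB stream emitting the "New image"
# (the item as it appears after the change).
dynamodb.create_table(
    TableName='movie-ddb',
    AttributeDefinitions=[{'AttributeName': 'id', 'AttributeType': 'S'}],
    KeySchema=[{'AttributeName': 'id', 'KeyType': 'HASH'}],
    BillingMode='PAY_PER_REQUEST',   # assumption: on-demand capacity
    StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'NEW_IMAGE'},
)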
During the role creation steps, we will be asked to make a policy for the role.
"Create role" => Select "Lambda" as a use case => Click "Create Policy".
This role must have basic Amazon ES, DynamoDB, and Lambda execution permissions, such as the following:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "es:ESHttpPost", "es:ESHttpPut", "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator", "dynamodb:ListStreams", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "*" } ] }
Attach the policy to the role, name the role "ddb-to-es-lambda-role", and then click "Create".
Note that because we chose "Lambda" as a use case, it has the following Trust Relationship:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }
Let's create the ES domain that is the destination for data after our Lambda function processes it.
Though the recommended "Network Configuration" is to use VPC access, in our case, we chose Public access. For the "Domain access policy" we opted to use "Allow access to the domain".
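Provisioning the domain can also be scripted. Below is a minimal boto3 sketch; only the ddb-to-es domain name comes from this post, while the Elasticsearch version and instance size are assumptions for a small development setup:

import boto3

es = boto3.client('es', region_name='us-east-1')

es.create_elasticsearch_domain(
    DomainName='ddb-to-es',
    ElasticsearchVersion='7.1',                    # assumption
    ElasticsearchClusterConfig={
        'InstanceType': 't2.small.elasticsearch',  # assumption: smallest dev-size node
        'InstanceCount': 1,
    },
)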
It takes a while for our ES domain to become active.
Note that the following endpoint info is needed in our Lambda function code:
https://search-ddb-to-es-r7dcdoy4caeoklst3yseumqmre.us-east-1.es.amazonaws.com
And we will be using the Kibana URL in our testing:
https://search-ddb-to-es-r7dcdoy4caeoklst3yseumqmre.us-east-1.es.amazonaws.com/_plugin/kibana/
Let's create a Lambda function named ddb-to-es-lambda-function:
Create a directory named ddb-to-es and use the following code (sample.py):
import boto3
import requests
from requests_aws4auth import AWS4Auth

region = 'us-east-1'
service = 'es'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key,
                   region, service, session_token=credentials.token)

# the Amazon ES domain, with https://
host = 'https://search-ddb-to-es-r7dcdoy4caeoklst3yseumqmre.us-east-1.es.amazonaws.com'
index = 'lambda-index'
type = 'lambda-type'
url = host + '/' + index + '/' + type + '/'

headers = {"Content-Type": "application/json"}

def handler(event, context):
    count = 0
    for record in event['Records']:
        # Get the primary key for use as the Elasticsearch ID
        id = record['dynamodb']['Keys']['id']['S']

        if record['eventName'] == 'REMOVE':
            # Item deleted in DynamoDB: remove the document from ES
            r = requests.delete(url + id, auth=awsauth)
        else:
            # INSERT or MODIFY: index the new image of the item
            document = record['dynamodb']['NewImage']
            r = requests.put(url + id, auth=awsauth, json=document, headers=headers)
        count += 1
    return str(count) + ' records processed.'
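Once the dependencies below are installed, we can sanity-check handler() locally by feeding it a hand-built event that mimics the DynamoDB Streams record shape. This is just a sketch, assuming local AWS credentials allowed to write to the domain; the item values are simply the sample record we insert later in this post:

# local_test.py -- hypothetical local harness, not part of the deployment zip
from sample import handler

fake_event = {
    'Records': [{
        'eventName': 'INSERT',
        'dynamodb': {
            'Keys': {'id': {'S': '00001'}},
            'NewImage': {
                'id': {'S': '00001'},
                'title': {'S': 'The Postman'},
                'director': {'S': 'Kevin Costner'},
            },
        },
    }]
}

print(handler(fake_event, None))   # expect "1 records processed."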
Note that our Lambda code makes PUT requests via the requests library, which is not included in the Lambda Python runtime. So we need to build our code locally together with its dependencies, zip them, and then upload the archive to AWS Lambda.
Edit the variables for region and host and then use the following commands to install our dependencies:
$ cd ddb-to-es
$ pip install requests -t .
$ pip install requests_aws4auth -t .
$ pip install boto3 -t .
Note that boto3 was installed as well because I got a "Runtime.ImportModuleError":
cannot import name 'AWS4Auth' from 'requests_aws4auth'
Package the application code and dependencies:
$ zip -r lambda.zip *
Upload the zip.
For "Handler", enter sample.handler:
This setting tells Lambda the file (sample.py) and method (handler) that it should run after a trigger.
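Equivalently, the upload can be scripted with boto3. A sketch; the account ID in the role ARN is the one from the access policy shown later in this post, and the runtime choice is an assumption:

import boto3

lambda_client = boto3.client('lambda', region_name='us-east-1')

with open('lambda.zip', 'rb') as f:
    lambda_client.create_function(
        FunctionName='ddb-to-es-lambda-function',
        Runtime='python3.7',      # assumption: any Python 3 runtime should do
        Role='arn:aws:iam::526262051452:role/ddb-to-es-lambda-role',
        Handler='sample.handler', # file (sample.py) and method (handler)
        Code={'ZipFile': f.read()},
        Timeout=60,               # assumption: generous timeout for indexing
    )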
Also, we need to add a Trigger to the Lambda function:
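Besides the console, the trigger can be wired up programmatically by creating an event source mapping between the table's stream and the function. A boto3 sketch, assuming the table and function names used in this post:

import boto3

dynamodb = boto3.client('dynamodb', region_name='us-east-1')
lambda_client = boto3.client('lambda', region_name='us-east-1')

# Look up the ARN of the table's latest stream ...
stream_arn = dynamodb.describe_table(TableName='movie-ddb')['Table']['LatestStreamArn']

# ... and wire it to the function as a trigger.
lambda_client.create_event_source_mapping(
    EventSourceArn=stream_arn,
    FunctionName='ddb-to-es-lambda-function',
    StartingPosition='TRIM_HORIZON',   # start from the oldest available record
    BatchSize=100,                     # assumption: records per invocation
)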
At this point, we have a complete set of resources: a DynamoDB table for our source data, a DynamoDB stream of changes to the table, a function that runs after our source data changes and indexes those changes, and an Amazon ES domain for searching and visualization.
Now, we can test our lambda function by adding a new item to the DynamoDB table using the AWS CLI from our local desktop:
$ aws dynamodb put-item \
    --table-name movie-ddb \
    --item '{"director": {"S": "Kevin Costner"},"id": {"S": "00001"},"title": {"S": "The Postman"}}' \
    --region us-east-1
We can see it's there in our DDB table:
We can confirm it's been triggered from Lambda's Monitoring page as well. Another way is to check the CloudWatch logs:
Then, we need to verify that lambda-index contains a document. But we got a "not authorized" error as shown below:
Use the following request:
$ curl https://search-ddb-to-es-r7dcdoy4caeoklst3yseumqmre.us-east-1.es.amazonaws.com/lambda-index/lambda-type/00001
{"Message":"User: anonymous is not authorized to perform: es:ESHttpGet"}
The same outcome with Kibana:
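The error is about the request being anonymous: our Lambda function's writes go through because it signs its requests with SigV4. A signed GET from an IAM principal the domain policy allows would also succeed; here is a sketch reusing the same libraries as the Lambda code, assuming local AWS credentials with es:ESHttpGet permission:

import boto3
import requests
from requests_aws4auth import AWS4Auth

credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key,
                   'us-east-1', 'es', session_token=credentials.token)

host = 'https://search-ddb-to-es-r7dcdoy4caeoklst3yseumqmre.us-east-1.es.amazonaws.com'
r = requests.get(host + '/lambda-index/lambda-type/00001', auth=awsauth)
print(r.status_code, r.text)

For browser-based Kibana access, though, signing is not an option, so we open the policy by IP instead.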
So, we need to modify the ES access policy to be IP-based, following the AWS guide "I get a "User: anonymous is not authorized" error when I try to access my Elasticsearch cluster":
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "*" }, "Action": "es:ESHttp*", "Resource": "arn:aws:es:us-east-1:526262051452:domain/ddb-to-es/*", "Condition": { "IpAddress": { "aws:SourceIp": "73.70.219.237/24" } } } ] }
Here, the IP of my local desktop can be found from https://checkip.amazonaws.com/.
Now, make the request again:
$ curl https://search-ddb-to-es-r7dcdoy4caeoklst3yseumqmre.us-east-1.es.amazonaws.com/lambda-index/lambda-type/00001
{"_index":"lambda-index","_type":"lambda-type","_id":"00001","_version":10,"_seq_no":9,"_primary_term":1,"found":true,"_source":{"director":{"S":"Kevin Costner"},"id":{"S":"00001"},"title":{"S":"The Postman"}}}
With Kibana:
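We can also exercise the MODIFY and REMOVE branches of the handler by updating and then deleting the same item. A boto3 sketch of the two operations (the update expression is just an example):

import boto3

ddb = boto3.client('dynamodb', region_name='us-east-1')

# MODIFY: the stream record's NewImage carries the updated item,
# so the handler re-PUTs the document into lambda-index.
ddb.update_item(
    TableName='movie-ddb',
    Key={'id': {'S': '00001'}},
    UpdateExpression='SET title = :t',
    ExpressionAttributeValues={':t': {'S': 'The Postman (1997)'}},
)

# REMOVE: the handler takes the DELETE branch and removes the document from ES.
ddb.delete_item(TableName='movie-ddb', Key={'id': {'S': '00001'}})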
References:
- Loading Streaming Data into Amazon Elasticsearch Service
- Tutorial: Process New Items with DynamoDB Streams and Lambda
- Step 2: Write Data to a Table Using the Console or AWS CLI