AWS Glue – simple demo (copy data between Redshift clusters)

Lab Scenario:

An EC2 Linux instance with the Kinesis agent installed will capture "tcpdump" logs, ingest them into Kinesis Firehose, and write the data into redshift-cluster-01.

In the same region, we will create another cluster, redshift-cluster-02, with an empty database and table.

Using Glue, we will create crawlers to connect to redshift-cluster-01 and read the schema, and then create a job to copy the data records from redshift-cluster-01 to redshift-cluster-02.

 


AWS Resources/Services involved:

1. EC2 instance with an IAM role to read/write to S3 and Kinesis Firehose

2. Kinesis Firehose to receive “tcpdump” logs from EC2 instance

3. Redshift-cluster-01 to allow Kinesis Firehose to store the records

4. Redshift-cluster-02 to copy records from redshift-cluster-01

5. AWS Glue to copy data between Redshift clusters.

6. VPC endpoint to access S3 bucket
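A gateway VPC endpoint for S3 lets the Glue job (which runs inside your VPC) reach the intermediate S3 bucket without going over the public internet. A minimal CLI sketch, where the VPC and route-table IDs are placeholders:

aws ec2 create-vpc-endpoint --vpc-id vpc-xxxxxxxx --service-name com.amazonaws.us-east-1.s3 --route-table-ids rtb-xxxxxxxx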

Section-A: 

In this section, we are going to configure the Kinesis agent on an EC2 instance to capture tcpdump logs and stream them to Kinesis Firehose and on to the Redshift table:

Step:1 Create the source and target Redshift clusters (redshift-cluster-01 & redshift-cluster-02)

This document does not explain how to create a Redshift cluster. Please refer to the AWS public documentation for details.

Ref: http://docs.aws.amazon.com/redshift/latest/gsg/rs-gsg-launch-sample-cluster.html

For this exercise, we create the databases and tables with the following specifications:

For Redshift Cluster-01

Database Name: redshift01

For Redshift Cluster-02

Database Name: redshift02

** Note down the user name/password & connection endpoint.

 

Step:2 Launch an EC2 instance, connect to the Redshift databases, and create tables

Launch an EC2 instance in the same VPC where the Redshift clusters were created.

Also, attach an IAM role/policy that gives the EC2 instance read/write access to S3, Redshift, and Kinesis Firehose.
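A rough CLI sketch of that role setup (the role and profile names, such as ec2-lab-role, are placeholders, and the full-access managed policies are used only to keep the lab simple):

aws iam create-role --role-name ec2-lab-role --assume-role-policy-document '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"Service":"ec2.amazonaws.com"},"Action":"sts:AssumeRole"}]}'
aws iam attach-role-policy --role-name ec2-lab-role --policy-arn arn:aws:iam::aws:policy/AmazonS3FullAccess
aws iam attach-role-policy --role-name ec2-lab-role --policy-arn arn:aws:iam::aws:policy/AmazonKinesisFirehoseFullAccess
aws iam attach-role-policy --role-name ec2-lab-role --policy-arn arn:aws:iam::aws:policy/AmazonRedshiftFullAccess
aws iam create-instance-profile --instance-profile-name ec2-lab-profile
aws iam add-role-to-instance-profile --instance-profile-name ec2-lab-profile --role-name ec2-lab-role

Attach the instance profile "ec2-lab-profile" to the EC2 instance when you launch it.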

Log in to the EC2 Linux instance and follow the steps below:

a. Install the PostgreSQL packages (for the psql client):

$ sudo  yum install -y postgresql-server

b. Connect to redshift01 and create a table "tcpdump" with a single column "col1" to store the tcpdump data.

$ psql -h redshift01.us-east-1.redshift.amazonaws.com -U testuser -d redshift01 -p 5439

redshift01# create table tcpdump (col1 varchar(10000));

c. Connect to redshift02 and create a table "tcpdump" with a single column "col1"; Glue will copy data into it from the redshift01 database.

$ psql -h redshift02.us-east-1.redshift.amazonaws.com -U testuser -d redshift02 -p 5439

redshift02# create table tcpdump (col1 varchar(10000));

 

Step:3 Create a Kinesis Firehose delivery stream to receive "tcpdump" logs from the EC2 instance:

Follow the instructions -> http://docs.aws.amazon.com/firehose/latest/dev/basic-create.html

Note: select "Destination" as "Amazon Redshift" and enter your Redshift cluster, user name, password, database, etc.

Example:

for “Amazon Redshift destination” section:

Cluster: redshift01
User Name: user01
Password: *******
Database: redshift01
Table: tcpdump
Columns: col1

for “Intermediate S3 destination” section:

– Give any existing or new S3 bucket

Note: Before data reaches Redshift, Firehose first stages it in the intermediate S3 bucket and then issues a COPY from S3 into the table.

for “Amazon Redshift COPY command” section: 

— Leave it empty —
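For reference, the same delivery stream can also be created from the CLI. A rough sketch, assuming the table and column above; the stream name, delivery role ARN, and bucket are placeholders:

aws firehose create-delivery-stream \
  --delivery-stream-name FirehoseStreamName \
  --redshift-destination-configuration '{
    "RoleARN": "arn:aws:iam::111122223333:role/firehose-delivery-role",
    "ClusterJDBCURL": "jdbc:redshift://redshift01.xxxxx.us-east-1.redshift.amazonaws.com:5439/redshift01",
    "CopyCommand": {"DataTableName": "tcpdump", "DataTableColumns": "col1"},
    "Username": "user01",
    "Password": "********",
    "S3Configuration": {
      "RoleARN": "arn:aws:iam::111122223333:role/firehose-delivery-role",
      "BucketARN": "arn:aws:s3:::tcpdump2525",
      "Prefix": "firehose/",
      "CompressionFormat": "UNCOMPRESSED"
    }
  }'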

Step:4 Configure the AWS Kinesis agent and tcpdump to stream data to Kinesis Firehose, and from there to S3 and the "redshift01" database:

a. Install “aws-kinesis-agent”:

sudo yum install aws-kinesis-agent -y

sudo service aws-kinesis-agent start

sudo chkconfig aws-kinesis-agent on

b. Configure tcpdump to write a continuous log file (the file name must match the agent's filePattern in the next step):

sudo yum install -y tcpdump

sudo nohup tcpdump >> /tmp/tcpdump.out &

tail -f /tmp/tcpdump.out  (check to ensure you are getting logs)

c. Configure the aws-kinesis-agent to forward the tcpdump log records to Kinesis Firehose.

Update:  sudo vi /etc/aws-kinesis/agent.json

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",
  "flows": [
    {
      "filePattern": "/tmp/tcpdump.out*",
      "deliveryStream": "FirehoseStreamName"
    }
  ]
}

d. Restart the kinesis-agent and verify the logs:

  • $ sudo  service aws-kinesis-agent restart
    aws-kinesis-agent shutdown                                 [  OK  ]
    aws-kinesis-agent startup                                  [  OK  ]

Check the Kinesis agent log to confirm that records are being delivered to Firehose:

  • $ tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log  (it takes a few minutes… wait until you see records sent successfully to the destination)

2017-08-11 16:52:53.017+0000 ip-172-31-4-2 (Agent STARTING) com.amazon.kinesis.streaming.agent.Agent [INFO] Agent: Startup completed in 78 ms.
2017-08-11 16:53:23.022+0000 ip-172-31-4-2 (FileTailer[fh:FireHoseStreamNamet:/tmp/tcpdump.out].MetricsEmitter RUNNING) com.amazon.kinesis.streaming.agent.tailing.FileTailer [INFO] FileTailer[fh:FireHoseStreamNamet:/tmp/tcpdump.out] …
2016-08-11 17:03:11.646+0000 ip-172-31-4-2 (Agent.MetricsEmitter RUNNING) com.amazon.kinesis.streaming.agent.Agent [INFO] Agent: Progress: 1371 records parsed (4256040 bytes), and 1000 records sent successfully to destinations. Uptime: 90084ms

  • Verify the S3 bucket "tcpdump2525" to ensure data is being streamed into it.
  • Verify the Redshift table ("select * from tcpdump;") to ensure the data exists.
  • Verify the Firehose console and check "Monitoring / S3 Logs / Redshift Logs" for any errors.
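For example, a quick row count from the EC2 instance (same endpoint and credentials as in Step:2) confirms that records are arriving:

$ psql -h redshift01.us-east-1.redshift.amazonaws.com -U testuser -d redshift01 -p 5439 -c "select count(*) from tcpdump;"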

 

Section-B

In this section, we are going to configure AWS Glue Data Catalog databases, crawlers, and an ETL job to copy data between the Redshift clusters.

1. Create an IAM role "glueRole" and attach the following policies (a CLI sketch follows the reference below):

– S3 full access
– Cloudwatch full access
– AmazonKinesisFirehose full access

Ref: http://docs.aws.amazon.com/glue/latest/dg/create-an-iam-role.html
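A CLI sketch of that role setup; the trust policy must allow glue.amazonaws.com, and the referenced doc also recommends attaching the managed AWSGlueServiceRole policy:

aws iam create-role --role-name glueRole --assume-role-policy-document '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"Service":"glue.amazonaws.com"},"Action":"sts:AssumeRole"}]}'
aws iam attach-role-policy --role-name glueRole --policy-arn arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
aws iam attach-role-policy --role-name glueRole --policy-arn arn:aws:iam::aws:policy/AmazonS3FullAccess
aws iam attach-role-policy --role-name glueRole --policy-arn arn:aws:iam::aws:policy/CloudWatchFullAccess
aws iam attach-role-policy --role-name glueRole --policy-arn arn:aws:iam::aws:policy/AmazonKinesisFirehoseFullAccess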

2. Add databases

To add a database, click "Add database" in the AWS Glue console and create the two databases below:

-> redshift01

-> redshift02

3. Add "Connections" (tables will be created in the next step)

To add a connection, click "Connections" under "Databases" -> "Add connection"

Connection Name : redshift01-Connection

Connection Type: JDBC (yes… don't pick Amazon Redshift)

-> NEXT

Under “Set up access to your data store”:

JDBC URL – jdbc:redshift://redshift01.xxxxx.us-east-1.redshift.amazonaws.com:5439/redshift01

UserName : user01

Password: xxxxx

VPC: Choose the VPC where the Redshift cluster exists.

Subnet: Choose the subnet where the redshift01 cluster exists.

Security Groups: Choose the security group(s) attached to the redshift01 cluster.

-> NEXT -> Review-> FINISH

** Repeat the same steps to create another connection, "redshift02-Connection".
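The same connection can be created from the CLI; a minimal sketch, with the subnet and security group IDs as placeholders:

aws glue create-connection --connection-input '{
  "Name": "redshift01-Connection",
  "ConnectionType": "JDBC",
  "ConnectionProperties": {
    "JDBC_CONNECTION_URL": "jdbc:redshift://redshift01.xxxxx.us-east-1.redshift.amazonaws.com:5439/redshift01",
    "USERNAME": "user01",
    "PASSWORD": "xxxxx"
  },
  "PhysicalConnectionRequirements": {
    "SubnetId": "subnet-xxxxxxxx",
    "SecurityGroupIdList": ["sg-xxxxxxxx"]
  }
}'
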
4. Let us add tables by clicking "Tables" under "Databases", then "Add tables" -> "Add tables using a crawler":

Crawler name:  “redshift01-Crawler”

 IAM role:  “glueRole”

-> NEXT

DataStore: JDBC

Connection: Select “redshift01-Connection”

Include Path: redshift01/public/%  (here, redshift01 – database name, public – database schema name, % – wildcard for all tables)

Frequency : Run on Demand

Database: redshift01 (select 1st DB)

Review -> FINISH

** Repeat the same steps to create another crawler, "redshift02-Crawler".

 

5. Run the crawlers to connect to the databases and discover the schema.

-> Under “Crawlers” -> select “redshift01-crawler” -> hit “Run crawler”

** This process may take a few minutes.
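Equivalently, the crawler can be created and started from the CLI; a rough sketch using the names above:

aws glue create-crawler --name redshift01-Crawler --role glueRole --database-name redshift01 --targets '{"JdbcTargets": [{"ConnectionName": "redshift01-Connection", "Path": "redshift01/public/%"}]}'

aws glue start-crawler --name redshift01-Crawler

aws glue get-crawler --name redshift01-Crawler  (check the "State" field until the crawler returns to READY)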

 

6. Create “Jobs”

Add Job  -> enter the following under “Job properties”

Job Name: redshift01-redshift02

IAM Role – glueRole

This job runs : A proposed script generated by AWS Glue

Script file name : default

S3 path where script is stored: default

Temporary directory : <your S3 bucket>

 

-> NEXT

Under "Choose your data sources", select the table crawled from the "redshift01" database as the source.

-> NEXT

Under "Choose your data target", select the table from the "redshift02" database as the target.

-> NEXT

Under "Map the source columns to target columns", leave the defaults.

** Note: we are moving records as-is from redshift01 to redshift02, so no changes to column names or data types are required.

-> NEXT

Review and hit “FINISH”

 

Sample generated job script:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "redshift01", table_name = "glue_public_glue", redshift_tmp_dir = TempDir, transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "redshift01", table_name = "glue_public_glue", redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("col1", "string", "col1", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("col1", "string", "col1", "string")], transformation_ctx = "applymapping1")
## @type: SelectFields
## @args: [paths = ["col1"], transformation_ctx = "selectfields2"]
## @return: selectfields2
## @inputs: [frame = applymapping1]
selectfields2 = SelectFields.apply(frame = applymapping1, paths = ["col1"], transformation_ctx = "selectfields2")
## @type: ResolveChoice
## @args: [choice = "MATCH_CATALOG", database = "redshift01", table_name = "glue_public_glue", transformation_ctx = "resolvechoice3"]
## @return: resolvechoice3
## @inputs: [frame = selectfields2]
resolvechoice3 = ResolveChoice.apply(frame = selectfields2, choice = "MATCH_CATALOG", database = "redshift01", table_name = "glue_public_glue", transformation_ctx = "resolvechoice3")
## @type: ResolveChoice
## @args: [choice = "make_cols", transformation_ctx = "resolvechoice4"]
## @return: resolvechoice4
## @inputs: [frame = resolvechoice3]
resolvechoice4 = ResolveChoice.apply(frame = resolvechoice3, choice = "make_cols", transformation_ctx = "resolvechoice4")
## @type: DataSink
## @args: [database = "redshift01", table_name = "glue_public_glue", redshift_tmp_dir = TempDir, transformation_ctx = "datasink5"]
## @return: datasink5
## @inputs: [frame = resolvechoice4]
datasink5 = glueContext.write_dynamic_frame.from_catalog(frame = resolvechoice4, database = "redshift01", table_name = "glue_public_glue", redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink5")
job.commit()

Finally, hit "Run job" and watch the "Logs" in the console.
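The job can also be started and monitored from the CLI; a minimal sketch:

aws glue start-job-run --job-name redshift01-redshift02

aws glue get-job-run --job-name redshift01-redshift02 --run-id <JobRunId returned by start-job-run>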

 

7. Verify the job Status:

  • Check the Logs & Error Logs in Cloudwatch.
  • If there are no errors, connect to the target Redshift database and run "select * from tcpdump;" to confirm that the records from the source database were copied to the target.

 

Note: Optionally, you can drive the job with a trigger (for example, a Lambda function or a Glue trigger) that detects new records in S3/Redshift, re-runs the crawler to refresh the schema, and copies the new records to the target DB.
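One simple alternative to a Lambda-driven setup is a scheduled Glue trigger that re-runs the copy job on an interval; a sketch, where the trigger name and schedule are placeholders:

aws glue create-trigger --name hourly-redshift-copy --type SCHEDULED --schedule "cron(0 * * * ? *)" --actions '[{"JobName": "redshift01-redshift02"}]' --start-on-creation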
