Kinesis-Stream Scripts

What Is Amazon Kinesis Streams?

Use Amazon Kinesis Streams to collect and process large streams of data records in real time.

What Can I Do with Streams?

You can use Streams for rapid and continuous data intake and aggregation. Typical data includes IT infrastructure log data, application logs, social media, market data feeds, and web clickstream data. Because data intake and processing happen in real time, the processing is typically lightweight.

Amazon Kinesis Streams

An Amazon Kinesis stream is an ordered sequence of data records. Each record in the stream has a sequence number that is assigned by Streams. The data records in the stream are distributed into shards.

Data Records

A data record is the unit of data stored in an Amazon Kinesis stream. Data records are composed of a sequence number, partition key, and data blob, which is an immutable sequence of bytes. Streams does not inspect, interpret, or change the data in the blob in any way. A data blob can be up to 1 MB.

Shards

A shard is a uniquely identified group of data records in a stream. A stream is composed of one or more shards, each of which provides a fixed unit of capacity. Each shard supports up to 5 read transactions per second, up to a maximum total data read rate of 2 MB per second, and up to 1,000 records per second for writes, up to a maximum total data write rate of 1 MB per second (including partition keys). The data capacity of your stream is a function of the number of shards that you specify for the stream; the total capacity of the stream is the sum of the capacities of its shards.

If your data rate increases, then you just add more shards to increase the size of your stream. Similarly, you can remove shards if the data rate decreases.

For 1 shard:

                              Read        Write
Total stream capacity:        2 MB/s      1 MB/s
Max transactions/second:      5           1,000
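Because capacity scales linearly with shard count, the stream-level limits are simple arithmetic; a minimal sketch of that calculation (the per-shard constants are the limits quoted above):

def stream_capacity(shard_count):
    # Per-shard limits from the table above, multiplied by shard count
    return {
        'read_mb_per_sec': 2 * shard_count,
        'read_tx_per_sec': 5 * shard_count,
        'write_mb_per_sec': 1 * shard_count,
        'write_records_per_sec': 1000 * shard_count,
    }

print(stream_capacity(4))
# {'read_mb_per_sec': 8, 'read_tx_per_sec': 20, 'write_mb_per_sec': 4, 'write_records_per_sec': 4000}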

Partition Keys

A partition key is used to group data by shard within a stream. The Streams service segregates the data records belonging to a stream into multiple shards, using the partition key associated with each data record to determine which shard a given data record belongs to. A partition key is specified by the applications putting the data into a stream.
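Under the hood, the partition key is hashed with MD5 into a 128-bit integer, and the record is routed to the shard whose hash key range contains that value. A minimal sketch of the mapping (the shard ranges here are hypothetical, splitting the hash space in half):

import hashlib

def shard_for_key(partition_key, shards):
    # MD5 of the partition key, read as a 128-bit integer
    h = int(hashlib.md5(partition_key.encode('utf-8')).hexdigest(), 16)
    for shard_id, start, end in shards:
        if start <= h <= end:
            return shard_id

shards = [
    ('shardId-000000000000', 0, 2**127 - 1),
    ('shardId-000000000001', 2**127, 2**128 - 1),
]
print(shard_for_key('name', shards))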

What Is a Shard Iterator?

A shard iterator specifies the position in the shard from which to start reading data records sequentially. The position is specified using the sequence number of a data record in the shard. A shard iterator expires five minutes after it is returned to the requester.

ShardIteratorType

Determines how the shard iterator is used to start reading data records from the shard.

The following are the valid Amazon Kinesis shard iterator types (a boto3 sketch follows the list):

  • AT_SEQUENCE_NUMBER – Start reading from the position denoted by a specific sequence number, provided in the value StartingSequenceNumber.

  • AFTER_SEQUENCE_NUMBER – Start reading right after the position denoted by a specific sequence number, provided in the value StartingSequenceNumber.

  • AT_TIMESTAMP – Start reading from the position denoted by a specific timestamp, provided in the value Timestamp.

  • TRIM_HORIZON – Start reading at the last untrimmed record in the shard in the system, which is the oldest data record in the shard.

  • LATEST – Start reading just after the most recent record in the shard, so that you always read the most recent data in the shard.
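A minimal boto3 sketch of requesting an iterator (the stream and shard names are assumptions):

import boto3

client = boto3.client('kinesis')

# TRIM_HORIZON starts at the oldest record still in the shard;
# swap in LATEST, AT_SEQUENCE_NUMBER, etc. as needed
response = client.get_shard_iterator(
    StreamName='family',
    ShardId='shardId-000000000000',
    ShardIteratorType='TRIM_HORIZON'
)
shard_iterator = response['ShardIterator']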

 

Amazon Kinesis Client Library

The Amazon Kinesis Client Library (KCL) is compiled into your application to enable fault-tolerant consumption of data from the stream. It ensures that for every shard there is a record processor running and processing that shard, and it simplifies reading data from the stream. The KCL uses an Amazon DynamoDB table to store control data, creating one table per application that is processing data.

1. Create Stream using the boto3 library

import boto3

client = boto3.client('kinesis')

response = client.create_stream(
    StreamName='create01',
    ShardCount=2
)
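create_stream returns before the stream is ready; a minimal sketch of blocking until it reaches ACTIVE, using the built-in boto3 waiter (it polls describe_stream internally):

import boto3

client = boto3.client('kinesis')

waiter = client.get_waiter('stream_exists')
waiter.wait(StreamName='create01')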

1.1 Create Stream using AWS CLI

$ aws kinesis create-stream --stream-name awscli --shard-count 1

$ aws kinesis describe-stream --stream-name awscli
SHARDS    shardId-000000000000
HASHKEYRANGE    340282366920938463463374607431768211455    0
SEQUENCENUMBERRANGE    49564427759838483454612901031789331947837672521652502530

$ aws kinesis list-streams
STREAMNAMES    awscli
STREAMNAMES    awscli-01
STREAMNAMES    cli
STREAMNAMES    console

2. Put Record using the boto3 library

import boto3

client = boto3.client('kinesis')

response = client.put_record(
    StreamName='create01',
    Data='name - sakthi, place - ky, 001 - 862',
    PartitionKey='name',
    SequenceNumberForOrdering='888'
)
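put_record writes a single record per call; for bulk writes there is the put_records batch API, which accepts up to 500 records per request. A minimal sketch:

import boto3

client = boto3.client('kinesis')

response = client.put_records(
    StreamName='create01',
    Records=[
        {'Data': b'name - sakthi, place - ky', 'PartitionKey': 'name'},
        {'Data': b'name - hari, place - frisco', 'PartitionKey': 'name'},
    ]
)
# A FailedRecordCount above 0 means some records must be retried
print(response['FailedRecordCount'])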

2.1 Put Record using AWS CLI

aws kinesis put-record --stream-name cli-stream --partition-key name --data "name : hari, address: frisco"
49564427864384376945324462320314605491932595514234634242    shardId-000000000000 

3. Describe Stream
import boto3
client = boto3.client('kinesis')
response = client.describe_stream(
    StreamName='botostream',
    Limit=123    # maximum number of shards to return in one call
)
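The shard list comes back under StreamDescription; a minimal sketch of reading the shard IDs and hash key ranges out of the response above:

for shard in response['StreamDescription']['Shards']:
    print(shard['ShardId'], shard['HashKeyRange'])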

4. Enable enhanced monitoring

import boto3
client = boto3.client('kinesis')
response = client.enable_enhanced_monitoring(
    StreamName='mass',
    ShardLevelMetrics=[
        'ALL'
    ]
)

5. Change the Stream Retention Period

import boto3
client = boto3.client('kinesis')
response = client.increase_stream_retention_period(
    StreamName='botostream',
    RetentionPeriodHours=120
)
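increase_stream_retention_period can only raise the retention period (the default is 24 hours); lowering it again uses the separate decrease_stream_retention_period call:

import boto3

client = boto3.client('kinesis')

# Drop retention back down to the 24-hour default
response = client.decrease_stream_retention_period(
    StreamName='botostream',
    RetentionPeriodHours=24
)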

6.  Add tag

import boto3
client = boto3.client('kinesis')
response = client.add_tags_to_stream(
    StreamName='botostream',
    Tags={
        'project': 'bototag'    # tag key/value pair; 'project' is an example key
    }
)
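Tags can be read back with list_tags_for_stream:

import boto3

client = boto3.client('kinesis')

response = client.list_tags_for_stream(StreamName='botostream')
print(response['Tags'])    # e.g. [{'Key': 'project', 'Value': 'bototag'}]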

7.  Upload the JSON file (for loop)

import json
import decimal
import base64
import boto3

client = boto3.client('kinesis', region_name="us-east-1")

with open("jeopardy-questions.json") as json_file:
    jq = json.load(json_file, parse_float=decimal.Decimal)

for i in jq:
    val = i['value']
    print(val)
    if val is None:
        val = '$0'
    # default=str keeps the decimal.Decimal values serializable
    data = json.dumps(i, default=str)
    response = client.put_record(
        StreamName='mass',
        Data=base64.b64encode(data.encode('utf-8')),
        PartitionKey=val
    )
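To read the uploaded records back in Python, pair get_shard_iterator with get_records and undo the extra base64 encoding applied in the loop above; a minimal sketch (the shard ID is an assumption):

import base64
import boto3

client = boto3.client('kinesis', region_name="us-east-1")

iterator = client.get_shard_iterator(
    StreamName='mass',
    ShardId='shardId-000000000000',
    ShardIteratorType='TRIM_HORIZON'
)['ShardIterator']

records = client.get_records(ShardIterator=iterator, Limit=25)
for record in records['Records']:
    # boto3 already strips the transport-level base64; this undoes
    # the b64encode done explicitly in the upload loop
    print(base64.b64decode(record['Data']))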




Scenario 1:
Create a stream to store family records using 4 shards and partition key "name", then check each record:

Create stream "family":
 $ aws kinesis create-stream --stream-name family --shard-count 4

Describe stream:
$ aws kinesis describe-stream --stream-name family
SHARDS    shardId-000000000000
HASHKEYRANGE    85070591730234615865843651857942052863    0
SEQUENCENUMBERRANGE    49564427969643894282389003547357369456913497976968577026
SHARDS    shardId-000000000001
HASHKEYRANGE    170141183460469231731687303715884105727    85070591730234615865843651857942052864
SEQUENCENUMBERRANGE    49564427969666195027587534170498905175186146338474557458
SHARDS    shardId-000000000002
HASHKEYRANGE    255211775190703847597530955573826158591    170141183460469231731687303715884105728
SEQUENCENUMBERRANGE    49564427969688495772786064793640440893458794699980537890
SHARDS    shardId-000000000003
HASHKEYRANGE    340282366920938463463374607431768211455    255211775190703847597530955573826158592
SEQUENCENUMBERRANGE    49564427969710796517984595416781976611731443061486518322

Put record:
$ aws kinesis put-record --stream-name family --partition-key name --data "name:Senthil, Age:38, Place: Chennai"
49564427969688495772786064797730236941215101828124901410    shardId-000000000002

Get records: (2 steps, get shard_iterator and get records)
$ SHARD_ITERATOR=$(aws kinesis get-shard-iterator --shard-id shardId-000000000002 --shard-iterator-type TRIM_HORIZON --stream-name family)

0    AAAAAAAAAAFQTZslNgrxmplVIKF2AbJFXFsakj22GvPKK5Onr8Y8sVPojfDPMCSVG4+doF+Fx5WkmTUdoLMm7XHB2zRk5hDxBGtys0t14BOVf+s4n7LC1pbtfeq4So5PO2q0l4jE6SuwSFknyvsLZbGh8GLHL5sr0qLZqYMKZzKs5Egh+6pZZ1W7kNfsmsOHwZWlXLu3fBMB88P7AmcwiVJgWbHLwPxx
RECORDS    1470160369.5        name    49564427969688495772786064797730236941215101828124901410
RECORDS    1470160385.66    bmFtZTpQcml5YSwgQWdlOjMyLCBQbGFjZTogQ2hlbm5haQ==    name    49564427969688495772786064798115884277672169634367799330
RECORDS    1470160409.35    bmFtZTpTYWt0aGksIEFnZTo4LCBQbGFjZTogS1k=    name    49564427969688495772786064798361296219053941006100594722
RECORDS    1470160502.14    cGFsYW5pLCBBZ2U6OCwgUGxhY2U6IEtZ    name    49564427969688495772786064798725182890757950709879013410
RECORDS    1470160530.64    cm9qYSwgeWVhcjo2NQ==    year    49564427969688495772786064798798927365754445151120392226

*** To read the data, decode the Base64 output.
https://www.base64decode.org/ --> paste "bmFtZTpTZW50aGlsLCBBZ2U6MzgsIFBsYWNlOiBDaGVubmFp"
or decode on the command line:
 $ aws kinesis get-records --shard-iterator $SHARD_ITERATOR | awk '{print $3}' | base64 --decode


Merge two shards into one (to reduce shard capacity):
aws kinesis merge-shards --stream-name family --shard-to-merge shardId-000000000000 --adjacent-shard-to-merge shardId-000000000001 

Split a shard into two shards (to increase shard capacity):
aws kinesis split-shard --stream-name family --shard-to-split shardId-000000000000 --new-starting-hash-key 42535295865117307932921825928971026431

*** How do you get the new-starting-hash-key?

aws kinesis describe-stream --stream-name family | egrep "ShardId|Key"
                "ShardId": "shardId-000000000000", 
                 "HashKeyRange": {
                    "EndingHashKey": "85070591730234615865843651857942052863",   --> divide this by 2 to get the new starting hash key
                    "StartingHashKey": "0"



