AMAZON Kinesis Firehose 2019(1)Firehose Buffer to S3

 

References I used to create the project
https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Firehose.html
https://github.com/danielsan/firehose-nodejs-example
https://forum.serverless.com/t/creating-a-kinesis-firehose-stream-in-serverless-yaml-with-iamrolelambdaexecution-role/2366
https://github.com/otofu-square/serverless-kinesis-firehose/blob/master/serverless.yml
https://github.com/mikepatrick/kinesis-log-aggregator-demo
https://github.com/phodal/serverless/blob/02a067088cb37d294074b334777a4ca4175f737c/firehose/handler.js

Examples
http://serverless.ink/#serverless-%E6%95%B0%E6%8D%AE%E5%88%86%E6%9E%90kinesis-firehose-%E6%8C%81%E4%B9%85%E5%8C%96%E6%95%B0%E6%8D%AE%E5%88%B0-s3
http://serverless.ink/#serverless-kinesis-firehose-%E4%BB%A3%E7%A0%81
http://serverless.ink/#%E5%AE%89%E8%A3%85%E5%8F%8A%E6%B5%8B%E8%AF%95

Pass the region to the constructor when creating the client
https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Kinesis.html#constructor-property

More examples here
https://github.com/phodal/serverless

Nothing special in the package.json dependencies
  "devDependencies": {
    "@types/aws-lambda": "^8.10.31",
    "@types/node": "^8.0.57",
    "@typescript-eslint/eslint-plugin": "^2.0.0",
    "@typescript-eslint/parser": "^2.0.0",
    "aws-sdk": "^2.518.0",
    "eslint": "^6.2.2",
    "eslint-config-prettier": "^6.1.0",
    "eslint-plugin-prettier": "^3.1.0",
    "prettier": "^1.18.2",
    "serverless": "^1.50.1",
    "serverless-webpack": "^4.4.0",
    "ts-loader": "^2.3.7",
    "tslint": "^5.19.0",
    "typescript": "^3.5.3",
    "webpack": "^3.12.0"
  }

Use serverless.yml to create the resources
plugins:
  - serverless-webpack
custom:
  stage: ${opt:stage, 'stage'}
  regionByStage:
    int: us-west-1
    stage: us-west-1
    prod: us-west-2
  resource_region: ${self:custom.regionByStage.${self:custom.stage}}
  deploy_region: ${opt:region, self:custom.regionByStage.${self:custom.stage}}
  eventBusArn: ${cf:eventBus-${opt:stage, self:provider.stage}.SNSTopic}
  datawarehouses3: 'datawarehouse-${self:custom.stage}-events'
  persists3bucketname: 'datawarehouse-${self:custom.stage}-events'

functions:
  postEventHandler:
    handler: src/eventHandler.postEvents
   
resources:
  Resources:
    FirehoseToS3Role:
      Type: AWS::IAM::Role
      Properties:
        RoleName: FirehoseToS3Role
        AssumeRolePolicyDocument:
          Statement:
          - Effect: Allow
            Principal:
              Service:
              - firehose.amazonaws.com
            Action:
            - sts:AssumeRole
        Policies:
        - PolicyName: FirehoseToS3Policy
          PolicyDocument:
            Statement:
              - Effect: Allow
                Action:
                - s3:AbortMultipartUpload
                - s3:GetBucketLocation
                - s3:GetObject
                - s3:ListBucket
                - s3:ListBucketMultipartUploads
                - s3:PutObject
                Resource: '*'
    ServerlessKinesisFirehoseBucket:
      Type: AWS::S3::Bucket
      DeletionPolicy: Retain
      Properties:
        BucketName: ${self:provider.environment.PERSIST_S3_BUCKET_NAME}
    ServerlessKinesisFirehose:
      Type: AWS::KinesisFirehose::DeliveryStream
      Properties:
        DeliveryStreamName: ${self:provider.environment.DELIVERY_STREAM_NAME}
        S3DestinationConfiguration:
          BucketARN:
            Fn::Join:
            - ''
            - - 'arn:aws:s3:::'
              - Ref: ServerlessKinesisFirehoseBucket
          BufferingHints:
            IntervalInSeconds: "60"
            SizeInMBs: "1"
          CompressionFormat: "UNCOMPRESSED"
          Prefix: "raw/"
          RoleARN: { Fn::GetAtt: [ FirehoseToS3Role, Arn ] }
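
The BufferingHints above tell Firehose to deliver to S3 once roughly 1 MiB has been buffered or 60 seconds have passed, whichever comes first. A minimal sketch of that rule, only to illustrate the two thresholds; the `BufferingHints` interface and `shouldFlush` function are hypothetical names for this note, not part of the AWS SDK or of Firehose itself:

```typescript
// Hypothetical model of the buffering rule; this is not an AWS API.
interface BufferingHints {
  intervalInSeconds: number;
  sizeInMBs: number;
}

// Same values as in the serverless.yml above.
const hints: BufferingHints = { intervalInSeconds: 60, sizeInMBs: 1 };

// Returns true once either the size threshold or the time threshold is reached.
const shouldFlush = (
  bufferedBytes: number,
  elapsedSeconds: number,
  h: BufferingHints
): boolean =>
  bufferedBytes >= h.sizeInMBs * 1024 * 1024 ||
  elapsedSeconds >= h.intervalInSeconds;

console.log(shouldFlush(200 * 1024, 10, hints)); // neither threshold reached
console.log(shouldFlush(200 * 1024, 60, hints)); // interval reached
console.log(shouldFlush(1024 * 1024, 5, hints)); // size reached
```

With low traffic the interval is usually what triggers delivery, so with these settings a test event should appear under raw/ in the bucket after about a minute.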

The util.ts that sends a string to Firehose is as simple as the following:
import * as AWS from 'aws-sdk';

const region = process.env.REGION;
const deliveryStreamName = process.env.DELIVERY_STREAM_NAME;

const firehose = new AWS.Firehose( { region: region } );

export const sendFirehose = async (bodyMsg: string) => {
    const params = {
        DeliveryStreamName: deliveryStreamName,
        Record: {
          // Buffer.from replaces the deprecated new Buffer() constructor
          Data: Buffer.from(bodyMsg)
        }
    };
    return firehose.putRecord(params).promise()
};
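
One thing to watch: Firehose concatenates the records it buffers into a single S3 object and does not add a separator between them, so several JSON events end up back to back on one line. A common workaround is to append a newline before sending. A minimal sketch, assuming the payload is either an object or an already serialized string; `toFirehoseLine` is a hypothetical helper name, not part of the original util.ts:

```typescript
// Hypothetical helper: serialize a payload and make sure it ends with a newline,
// so each record lands on its own line in the S3 object.
const toFirehoseLine = (payload: object | string): string => {
  const json = typeof payload === 'string' ? payload : JSON.stringify(payload);
  return json.slice(-1) === '\n' ? json : json + '\n';
};

console.log(JSON.stringify(toFirehoseLine({ type: 'click', id: 1 })));
```

The result can be passed straight to sendFirehose, for example `sendFirehose(toFirehoseLine(payload))`.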

Handler to receive the POST events
import { SNSEvent, Handler, APIGatewayEvent, Context, Callback } from 'aws-lambda';
import { get } from 'lodash';
import { EventAction } from '@sillycat/eventbus';
import { sendFirehose } from './util';

export const postEvents: Handler = async (event: APIGatewayEvent, context: Context, callBack: Callback) => {
  console.log("event received:" + JSON.stringify(event));
  try {
      // event.body is already a string; JSON.stringify would wrap it in quotes and escape it again
      await sendFirehose(event.body || '');
      callBack(null, { 'statusCode': 200, 'body': 'Successful POST' });
  } catch (err) {
      callBack(err);
  }
}
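
The handler above is async but still reports its result through the callback. The Node.js Lambda runtime also accepts an async handler that simply returns the response, which avoids mixing the two styles. A minimal sketch of that variant, with the sender injected so it can be tried without AWS; the `ProxyEvent` and `ProxyResult` shapes, the `makePostEvents` factory, and the 400/500 responses are assumptions for illustration (the real types come from 'aws-lambda'):

```typescript
// Minimal shapes assumed for this sketch; the real ones are in 'aws-lambda'.
interface ProxyEvent {
  body: string | null;
}
interface ProxyResult {
  statusCode: number;
  body: string;
}
type Sender = (bodyMsg: string) => Promise<unknown>;

// Hypothetical factory: wraps any sender (for example sendFirehose from util.ts)
// in an async handler that returns the API Gateway response directly.
const makePostEvents = (send: Sender) => async (
  event: ProxyEvent
): Promise<ProxyResult> => {
  if (!event.body) {
    return { statusCode: 400, body: 'Missing request body' };
  }
  try {
    await send(event.body);
    return { statusCode: 200, body: 'Successful POST' };
  } catch (err) {
    console.log('failed to send to Firehose: ' + String(err));
    return { statusCode: 500, body: 'Failed to store event' };
  }
};
```

In the real project this would be wired up as something like `export const postEvents = makePostEvents(sendFirehose);`, and a fake sender can be passed in for local testing.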


References:
https://stackoverflow.com/questions/55714834/push-from-lambda-to-s3-or-push-from-lambda-to-kinesis-firehose-to-s3
https://fivetran.com/docs/files/aws-kinesis
https://github.com/SumoLogic/sumologic-aws-lambda/tree/master/kinesisfirehose-processor
https://towardsdatascience.com/delivering-real-time-streaming-data-to-amazon-s3-using-amazon-kinesis-data-firehose-2cda5c4d1efe
https://aws.amazon.com/blogs/compute/amazon-kinesis-firehose-data-transformation-with-aws-lambda/
https://aws.amazon.com/kinesis/data-firehose/faqs/
https://github.com/alexcasalboni/serverless-data-pipeline-sam

Node.js SDK documentation
https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Firehose.html
Older example
https://github.com/otofu-square/serverless-kinesis-firehose