#sst

Erik Robertson

06/10/2022, 10:08 AM
I'm curious whether anyone has already set up a Kinesis Data Firehose using SST? I'm simply looking for a straightforward way to log audit data into S3 and, if needed, query it with Athena later down the road. I didn't find any example for this.

Arpad

06/10/2022, 10:28 AM
I'm not familiar with Kinesis, but it looks like there's a construct in the CDK that seems to do what you need; note that it is experimental: https://docs.aws.amazon.com/cdk/api/v2/docs/aws-kinesisfirehose-alpha-readme.html

Erik Robertson

06/10/2022, 2:10 PM
@Jonathan Poissant those links don't work for me (I do have a Bitbucket account). Are you sure it's a public repository?

Jonathan Poissant

06/10/2022, 2:34 PM
Sorry, it’s a company-internal repo.
Here is the Firehose config:
# Firehose used to handle writing all data into S3
  IoTFirehoseDataLake:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: !Sub ${namespace}-IoTFirehose
      DeliveryStreamType: DirectPut
      ExtendedS3DestinationConfiguration:
        BucketARN: !Ref dataLakeBucketArn
        BufferingHints:
          IntervalInSeconds: 300
          SizeInMBs: 32
        CompressionFormat: GZIP
        Prefix: "iot/"
        RoleARN: !GetAtt IoTFirehoseDataLakeStreamRole.Arn
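
For readers unfamiliar with `BufferingHints`: Firehose buffers incoming records and delivers a batch to S3 once either the size or the interval threshold is reached, whichever comes first. The sketch below only models that rule with the values from the config above (300 seconds, 32 MB). The names `BufferState` and `shouldFlush` are hypothetical and not part of any AWS API, and treating one MB as 1024 * 1024 bytes is an assumption.

```typescript
// Hypothetical model of the buffering rule; this is not an AWS SDK or CDK type.
interface BufferingHints {
  intervalInSeconds: number;
  sizeInMBs: number;
}

// Hypothetical snapshot of what has accumulated since the last delivery.
interface BufferState {
  bufferedBytes: number;
  secondsSinceLastFlush: number;
}

// Assumption: one MB is counted as 1024 * 1024 bytes.
const MB = 1024 * 1024;

// Returns true as soon as either threshold is reached, whichever happens first.
function shouldFlush(hints: BufferingHints, state: BufferState): boolean {
  const sizeReached = state.bufferedBytes >= hints.sizeInMBs * MB;
  const intervalReached =
    state.secondsSinceLastFlush >= hints.intervalInSeconds;
  return sizeReached || intervalReached;
}

// Values taken from the CloudFormation config above.
const hints: BufferingHints = { intervalInSeconds: 300, sizeInMBs: 32 };

// A low-volume audit log will usually be flushed by the interval, not the size.
const quietStream: BufferState = { bufferedBytes: 4 * 1024, secondsSinceLastFlush: 300 };
const flushNow = shouldFlush(hints, quietStream);
```

With low-volume audit data this means roughly one small S3 object every five minutes, which is worth keeping in mind when choosing the interval.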

Erik Robertson

06/10/2022, 6:04 PM
Thanks @Arpad, I'm trying to use the CDK construct you pointed me to.

Timothée Clain

06/17/2022, 6:54 PM
@Erik Robertson it works pretty well
// Imports assumed from the identifiers used below (not part of the original paste);
// package names depend on your SST and CDK versions.
import * as sst from "@serverless-stack/resources";
import {
  Effect,
  ManagedPolicy,
  PolicyDocument,
  PolicyStatement,
  Role,
  ServicePrincipal,
} from "aws-cdk-lib/aws-iam";
import * as Glue from "aws-cdk-lib/aws-glue";
import * as Athena from "aws-cdk-lib/aws-athena";
import * as KinesisFirehoseConstruct from "@aws-cdk/aws-kinesisfirehose-alpha";
import * as KinesisFirehoseDestinationsConstruct from "@aws-cdk/aws-kinesisfirehose-destinations-alpha";

// Excerpt from inside a stack constructor:
const glueRole = new Role(this, "glue-catalog-role", {
      assumedBy: new ServicePrincipal("glue.amazonaws.com"),
      managedPolicies: [
        ManagedPolicy.fromAwsManagedPolicyName(
          "service-role/AWSGlueServiceRole"
        ),
      ],
      inlinePolicies: {
        readS3Files: new PolicyDocument({
          statements: [
            new PolicyStatement({
              actions: ["s3:*"],
              effect: Effect.ALLOW,
              resources: [`${this.datalakeBucket.s3Bucket.bucketArn}/*`],
            }),
          ],
        }),
      },
    });

    const eventsCrawler = new Glue.CfnCrawler(
      this,
      "crawler",
      {
        role: glueRole.roleArn,
        schedule: {
          scheduleExpression: "cron(0/30 * * * ? *)",
        },

        databaseName: "lake",
        targets: {
          s3Targets: [
            {
              path: `s3://${datalakeBucket.bucketName}/${rootS3Paths.platformEvents}`,
            },
          ],
        },
      }
    );

    // athena
    const athenaWg = new Athena.CfnWorkGroup(this, `athenawg-${scope.stage}`, {
      name: `pillardatalake-${scope.stage}`,
      workGroupConfiguration: {
        enforceWorkGroupConfiguration: true,
        publishCloudWatchMetricsEnabled: true,
        resultConfiguration: {
          outputLocation: `s3://${datalakeBucket.bucketName}/athena-results`,
        },
      },
    });

    // kinesis
    const productEventsdatalakeBucketDestination =
      // FIXME: types are not compatible
      new KinesisFirehoseDestinationsConstruct.S3Bucket(
        datalakeBucket.s3Bucket,
        {
          dataOutputPrefix: `${rootS3Paths.platformEvents}/`,
        }
      );

    // event streams
    this.productEventStream = new sst.KinesisStream(
      this,
      `productEventsStream-${scope.stage}`,
      {
        kinesisStream: {
          streamName: `productEventsStream-${scope.stage}`,
        },
      }
    );
    // s3 delivery routing for product events
    new KinesisFirehoseConstruct.DeliveryStream(this, "Delivery Stream", {
      sourceStream: this.productEventStream.kinesisStream,
      destinations: [productEventsdatalakeBucketDestination],
    });
  }
That's a snippet with support for Athena / Kinesis / S3 / Firehose. Hope it’s helpful.
There is a Glue crawler that auto-infers the schema (please note that there are costs associated with this setup).
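
One detail that matters for the audit-log use case: Firehose writes record payloads to S3 back to back without adding a delimiter, while Athena's JSON SerDes expect one JSON object per line. A common workaround is to end every record with a newline before sending it. The sketch below shows only the serialization side. The `AuditEvent` shape and both helper names are hypothetical, and the call that actually puts the record on the stream is left out.

```typescript
// Hypothetical audit event shape; the field names are illustrative only.
interface AuditEvent {
  actor: string;
  action: string;
  timestamp: string; // ISO 8601
}

// Serialize one event as a single JSON line, terminated by a newline.
function toJsonLine(event: AuditEvent): string {
  return JSON.stringify(event) + "\n";
}

// Parse a delivered object body back into events, one JSON object per line.
function parseJsonLines(body: string): AuditEvent[] {
  return body
    .split("\n")
    .filter((line) => line.length > 0)
    .map((line) => JSON.parse(line) as AuditEvent);
}

const events: AuditEvent[] = [
  { actor: "alice", action: "login", timestamp: "2022-06-17T18:54:00Z" },
  { actor: "bob", action: "delete-report", timestamp: "2022-06-17T18:55:00Z" },
];

// Simulates what ends up in one S3 object: the payloads concatenated as-is.
const s3ObjectBody = events.map(toJsonLine).join("");
const roundTripped = parseJsonLines(s3ObjectBody);
```

Without the trailing newline the two objects would land on a single line (`{...}{...}`), and a line-oriented JSON reader would not be able to split them into separate rows.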