Handling Multiple AWS Lambda Event Types with Go


Here at Crimson Macaw, we have typically created AWS Lambda functions in Python and used the abstract factory design pattern as a way to handle different AWS event structures. Achieving the same principle within Go requires a different approach.

Being able to trigger a lambda function from multiple sources gives great flexibility to deploy our solutions into different configurations depending on our clients' requirements.

An excellent example of this is a Lambda function that handles AWS S3 events: it can be triggered directly by S3, or S3 events can be sent to AWS SNS or AWS SQS, with that topic or queue then configured as an event source for the Lambda function.

Expected Data Structures

When S3, SNS or SQS triggers a Lambda function, the JSON payload contains a top-level Records array; however, the structure of each record differs depending on the source.

{
  "Records": []
}
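Because only the top-level Records array is shared, one option is to decode just that envelope first and keep each record as raw JSON until its source is known. A minimal standard-library sketch; envelope and countRecords are hypothetical names used only for illustration:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// envelope captures only the top-level shape shared by S3, SNS and SQS
// events. Each record stays as raw JSON because its structure depends on
// the source.
type envelope struct {
	Records []json.RawMessage `json:"Records"`
}

// countRecords is a hypothetical helper that decodes just the envelope.
func countRecords(payload []byte) (int, error) {
	var env envelope
	if err := json.Unmarshal(payload, &env); err != nil {
		return 0, err
	}
	return len(env.Records), nil
}

func main() {
	n, err := countRecords([]byte(`{"Records": [{"eventSource": "aws:s3"}, {"EventSource": "aws:sns"}]}`))
	fmt.Println(n, err) // 2 <nil>
}
```

Deferring the per-record decode like this is what makes it possible to inspect a record before choosing which struct to decode it into.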

Direct From S3

In this setup, an S3 bucket is configured for its events to invoke a Lambda function directly.

Diagram showing AWS Lambda Function triggered by AWS S3

When S3 asynchronously invokes the Lambda function, an example of the JSON structure would be:

{
  "Records": [
    {
      "eventVersion": "2.1",
      "eventSource": "aws:s3",
      "awsRegion": "eu-west-1",
      "eventTime": "2020-04-05T19:37:27.192Z",
      "eventName": "ObjectCreated:Put",
      "userIdentity": {
        "principalId": "AWS:AIDAINPONIXQXHT3IKHL2"
      },
      "requestParameters": {
        "sourceIPAddress": "205.255.255.255"
      },
      "responseElements": {
        "x-amz-request-id": "D82B88E5F771F645",
        "x-amz-id-2": "vlR7PnpV2Ce81l0PRw6jlUpck7Jo5ZsQjryTjKlc5aLWGVHPZLj5NeC6qMa0emYBDXOo6QBU0Wo="
      },
      "s3": {
        "s3SchemaVersion": "1.0",
        "configurationId": "828aa6fc-f7b5-4305-8584-487c791949c1",
        "bucket": {
          "name": "lambda-artifacts-deafc19498e3f2df",
          "ownerIdentity": {
            "principalId": "A3I5XTEXAMAI3E"
          },
          "arn": "arn:aws:s3:::lambda-artifacts-deafc19498e3f2df"
        },
        "object": {
          "key": "b21b84d653bb07b05b1e6b33684dc11b",
          "size": 1305107,
          "eTag": "b21b84d653bb07b05b1e6b33684dc11b",
          "sequencer": "0C0F6F405D6ED209E1"
        }
      }
    }
  ]
}

Referenced from https://docs.aws.amazon.com/lambda/latest/dg/with-s3.html

AWS S3 events via AWS SNS

In this setup, S3 is configured to publish events to an SNS topic, and the Lambda function is subscribed to that topic.

Diagram showing AWS Lambda Function being triggered by AWS S3 via AWS SNS

When events occur on the S3 bucket, messages are pushed to the SNS Topic in the same format as above.

When SNS asynchronously invokes the Lambda function, after S3 has published a message, an example of the JSON structure would be:

{
  "Records": [
    {
      "EventVersion": "1.0",
      "EventSubscriptionArn": "arn:aws:sns:us-east-2:123456789012:sns-lambda:21be56ed-a058-49f5-8c98-aedd2564c486",
      "EventSource": "aws:sns",
      "Sns": {
        "SignatureVersion": "1",
        "Timestamp": "2020-04-05T19:37:27.318Z",
        "Signature": "tcc6faL2yUC6dgZdmrwh1Y4cGa/ebXEkAi6RibDsvpi+tE/1+82j...65r==",
        "SigningCertUrl": "https://sns.us-east-2.amazonaws.com/SimpleNotificationService-ac565b8b1a6c5d002d285f9598aa1d9b.pem",
        "MessageId": "95df01b4-ee98-5cb9-9903-4c221d41eb5e",
        "Message": "{\"Records\":[{\"eventVersion\":\"2.1\",\"eventSource\":\"aws:s3\",\"awsRegion\":\"eu-west-1\",\"eventTime\":\"2020-04-05T19:37:27.192Z\",\"eventName\":\"ObjectCreated:Put\",\"userIdentity\":{\"principalId\":\"AWS:AIDAINPONIXQXHT3IKHL2\"},\"requestParameters\":{\"sourceIPAddress\":\"205.255.255.255\"},\"responseElements\":{\"x-amz-request-id\":\"D82B88E5F771F645\",\"x-amz-id-2\":\"vlR7PnpV2Ce81l0PRw6jlUpck7Jo5ZsQjryTjKlc5aLWGVHPZLj5NeC6qMa0emYBDXOo6QBU0Wo=\"},\"s3\":{\"s3SchemaVersion\":\"1.0\",\"configurationId\":\"828aa6fc-f7b5-4305-8584-487c791949c1\",\"bucket\":{\"name\":\"lambda-artifacts-deafc19498e3f2df\",\"ownerIdentity\":{\"principalId\":\"A3I5XTEXAMAI3E\"},\"arn\":\"arn:aws:s3:::lambda-artifacts-deafc19498e3f2df\"},\"object\":{\"key\":\"b21b84d653bb07b05b1e6b33684dc11b\",\"size\":1305107,\"eTag\":\"b21b84d653bb07b05b1e6b33684dc11b\",\"sequencer\":\"0C0F6F405D6ED209E1\"}}}]}",
        "MessageAttributes": {},
        "Type": "Notification",
        "UnsubscribeUrl": "https://sns.eu-west-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:eu-west-1:123456789012:test-lambda:21be56ed-a058-49f5-8c98-aedd2564c486",
        "TopicArn":"arn:aws:sns:eu-west-1:123456789012:sns-lambda",
        "Subject": "TestInvoke"
      }
    }
  ]
}

Referenced from https://docs.aws.amazon.com/lambda/latest/dg/with-sns.html

As you can see, the S3 event keeps its original JSON structure but is encoded as a string in the Message property.
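Because Message is a string, reaching the S3 records takes two decoding passes: one for the SNS envelope and one for the string itself. A minimal sketch, assuming hypothetical trimmed stand-in types (snsEvent, s3Event) rather than the aws-lambda-go ones:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Hypothetical, trimmed stand-ins for the SNS and S3 event types.
type snsEvent struct {
	Records []struct {
		Sns struct {
			Message string `json:"Message"` // a JSON document encoded as a string
		} `json:"Sns"`
	} `json:"Records"`
}

type s3Event struct {
	Records []struct {
		S3 struct {
			Object struct {
				Key string `json:"key"`
			} `json:"object"`
		} `json:"s3"`
	} `json:"Records"`
}

// s3KeysFromSNS decodes the SNS envelope, then decodes each Message string a
// second time to reach the embedded S3 records.
func s3KeysFromSNS(payload []byte) ([]string, error) {
	var outer snsEvent
	if err := json.Unmarshal(payload, &outer); err != nil {
		return nil, err
	}
	var keys []string
	for _, rec := range outer.Records {
		var inner s3Event
		if err := json.Unmarshal([]byte(rec.Sns.Message), &inner); err != nil {
			return nil, err
		}
		for _, r := range inner.Records {
			keys = append(keys, r.S3.Object.Key)
		}
	}
	return keys, nil
}

const snsPayload = `{"Records":[{"EventSource":"aws:sns","Sns":{"TopicArn":"arn:aws:sns:eu-west-1:123456789012:sns-lambda","Message":"{\"Records\":[{\"eventSource\":\"aws:s3\",\"s3\":{\"object\":{\"key\":\"b21b84d653bb07b05b1e6b33684dc11b\"}}}]}"}}]}`

func main() {
	keys, err := s3KeysFromSNS([]byte(snsPayload))
	fmt.Println(keys, err) // [b21b84d653bb07b05b1e6b33684dc11b] <nil>
}
```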

AWS S3 events via AWS SQS

In this setup, S3 is configured to send events to an SQS queue, and the queue is added as an event source for the Lambda function.

Diagram showing AWS Lambda Function being triggered by AWS S3 via AWS SQS

When events occur on the S3 bucket, messages are pushed to the SQS Queue in the same format as above.

When Lambda polls the SQS queue and synchronously invokes the function, after S3 has sent a message, an example of the JSON structure would be:

{
  "Records": [
    {
      "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
      "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
      "body": "{\"Records\":[{\"eventVersion\":\"2.1\",\"eventSource\":\"aws:s3\",\"awsRegion\":\"eu-west-1\",\"eventTime\":\"2020-04-05T19:37:27.192Z\",\"eventName\":\"ObjectCreated:Put\",\"userIdentity\":{\"principalId\":\"AWS:AIDAINPONIXQXHT3IKHL2\"},\"requestParameters\":{\"sourceIPAddress\":\"205.255.255.255\"},\"responseElements\":{\"x-amz-request-id\":\"D82B88E5F771F645\",\"x-amz-id-2\":\"vlR7PnpV2Ce81l0PRw6jlUpck7Jo5ZsQjryTjKlc5aLWGVHPZLj5NeC6qMa0emYBDXOo6QBU0Wo=\"},\"s3\":{\"s3SchemaVersion\":\"1.0\",\"configurationId\":\"828aa6fc-f7b5-4305-8584-487c791949c1\",\"bucket\":{\"name\":\"lambda-artifacts-deafc19498e3f2df\",\"ownerIdentity\":{\"principalId\":\"A3I5XTEXAMAI3E\"},\"arn\":\"arn:aws:s3:::lambda-artifacts-deafc19498e3f2df\"},\"object\":{\"key\":\"b21b84d653bb07b05b1e6b33684dc11b\",\"size\":1305107,\"eTag\":\"b21b84d653bb07b05b1e6b33684dc11b\",\"sequencer\":\"0C0F6F405D6ED209E1\"}}}]}",
      "attributes": {
        "ApproximateReceiveCount": "1",
        "SentTimestamp": "1586111847318",
        "SenderId": "AIDAIENQZJOLO23YVJ4VO",
        "ApproximateFirstReceiveTimestamp": "15861118483091"
      },
      "messageAttributes": {},
      "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
      "eventSource": "aws:sqs",
      "eventSourceARN": "arn:aws:sqs:eu-west-1:123456789012:my-queue",
      "awsRegion": "eu-west-1"
    }
  ]
}

Referenced from https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html

Just as with SNS, the S3 event keeps its original JSON structure but is encoded as a string, this time in the body of the SQS message.

AWS Lambda in Python

Decoding the variant structures in Python is relatively simple: the event arrives as a dict, so Python classes that accept **kwargs in their __init__ method are enough to recursively decode the structure.

An incomplete example:

from abc import ABC

class Record(ABC):
  @staticmethod
  def factory(record: dict) -> 'Record':
    try:
      return S3Record(**record)
    except (KeyError, ValueError):
      pass

    try:
      return SNSRecord(**record)
    except (KeyError, ValueError):
      pass

    try:
      return SQSRecord(**record)
    except (KeyError, ValueError):
      pass

    # none of the record types recognised the dict
    raise TypeError('unrecognised record type')


class S3Record(Record):
  __slots__ = ('event_source',)

  def __init__(self, **kwargs):
    self.event_source = kwargs.pop('eventSource')
    if 'aws:s3' != self.event_source:
      raise ValueError


class SNSRecord(Record):
  __slots__ = ('event_source',)

  def __init__(self, **kwargs):
    self.event_source = kwargs.pop('EventSource')
    if 'aws:sns' != self.event_source:
      raise ValueError


class SQSRecord(Record):
  __slots__ = ('event_source',)

  def __init__(self, **kwargs):
    self.event_source = kwargs.pop('eventSource')
    if 'aws:sqs' != self.event_source:
      raise ValueError

AWS Lambda in Go

With the AWS Lambda for Go library, it is not so simple, because the library decodes the incoming event into the handler's argument type itself, using the encoding/json package internally.

Fortunately, the library's events package already defines the expected data structures as struct types with the json tags needed for decoding. These can do much of the legwork, but to handle the variant structure, the json.Unmarshaler interface needs to be implemented on the type used as the handler function's event argument.

Set up the main package

If you are unfamiliar with writing Lambda functions in Go, then I would recommend reading the AWS Lambda Function Handler in Go documentation.

I create a custom struct type called Event, because I know the incoming payload will always be a JSON object with a field called Records.

Each Record will always describe an S3 event, but it could arrive via any of the sources above. To keep the original data, I store the event source information and, if an SNS or SQS payload is received, save that too in additional fields.

package main

import (
  "context"
  "github.com/aws/aws-lambda-go/events"
  "github.com/aws/aws-lambda-go/lambda"
)

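// Record is a single S3 event record, along with details of the source
// that delivered it. SQS and SNS are only populated when the S3 event
// arrived wrapped in one of those services' payloads.
type Record struct {
  EventSource    string
  EventSourceArn string
  AWSRegion      string
  S3             events.S3Entity
  SQS            events.SQSMessage
  SNS            events.SNSEntity
}

// Event is the handler's input: a normalised list of records, whichever
// service invoked the function.
type Event struct {
  Records []Record
}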

func handle(ctx context.Context, event Event) {
  // execute the lambda
}

func main() {
  lambda.Start(handle)
}

Implement the json.Unmarshaler interface

The json.Unmarshaler interface requires only a single method, UnmarshalJSON. At first, Record looks like the obvious type to implement it on; however, because a single SNS or SQS record could in theory contain multiple S3 records, one incoming record may become several of our Records, so the method must live on the Event type.

func (event *Event) UnmarshalJSON(data []byte) error {
	// the decoding logic is filled in over the next few steps
	return nil
}

Detecting the Event Source

Before decoding the data, I first need to detect the structure type. I add another method on *Event to get the event type; our actual implementation has much more error handling, but a simplified version would be:

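type eventType int

const (
	unknownEventType eventType = iota
	s3EventType
	snsEventType
	sqsEventType
)

// getEventType inspects the first record to work out which AWS service
// produced the payload. It returns unknownEventType, rather than panicking,
// when the payload is malformed or has no records.
func (event *Event) getEventType(data []byte) eventType {
	temp := make(map[string]interface{})
	if err := json.Unmarshal(data, &temp); err != nil {
		return unknownEventType
	}

	recordsList, _ := temp["Records"].([]interface{})
	if len(recordsList) == 0 {
		return unknownEventType
	}
	record, _ := recordsList[0].(map[string]interface{})

	// SNS capitalises the key as "EventSource"; S3 and SQS use "eventSource".
	var eventSource string
	if es, ok := record["EventSource"].(string); ok {
		eventSource = es
	} else if es, ok := record["eventSource"].(string); ok {
		eventSource = es
	}

	switch eventSource {
	case "aws:s3":
		return s3EventType
	case "aws:sns":
		return snsEventType
	case "aws:sqs":
		return sqsEventType
	}

	return unknownEventType
}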
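As an alternative to the map[string]interface{} lookup, a typed probe can do the same job. encoding/json prefers an exact key match but also accepts a case-insensitive one, so a single field tagged eventSource reads both SNS's EventSource and the eventSource key used by S3 and SQS. A minimal sketch; detectEventType is a hypothetical name, and unlike getEventType it reports an error instead of silently falling back to unknownEventType:

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

type eventType int

const (
	unknownEventType eventType = iota
	s3EventType
	snsEventType
	sqsEventType
)

// detectEventType is a hypothetical alternative to getEventType. The single
// field tagged "eventSource" also matches SNS's "EventSource", because
// encoding/json falls back to a case-insensitive match.
func detectEventType(data []byte) (eventType, error) {
	var probe struct {
		Records []struct {
			EventSource string `json:"eventSource"`
		} `json:"Records"`
	}
	if err := json.Unmarshal(data, &probe); err != nil {
		return unknownEventType, err
	}
	if len(probe.Records) == 0 {
		return unknownEventType, errors.New("event has no records")
	}

	switch source := probe.Records[0].EventSource; source {
	case "aws:s3":
		return s3EventType, nil
	case "aws:sns":
		return snsEventType, nil
	case "aws:sqs":
		return sqsEventType, nil
	default:
		return unknownEventType, fmt.Errorf("unrecognised event source %q", source)
	}
}

func main() {
	for _, payload := range []string{
		`{"Records":[{"eventSource":"aws:s3"}]}`,
		`{"Records":[{"EventSource":"aws:sns"}]}`,
		`{"Records":[{"eventSource":"aws:sqs"}]}`,
	} {
		t, err := detectEventType([]byte(payload))
		fmt.Println(t, err) // 1 <nil>, then 2 <nil>, then 3 <nil>
	}
}
```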

The first step in the UnmarshalJSON method is now to call getEventType.

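func (event *Event) UnmarshalJSON(data []byte) error {
	if event.getEventType(data) == unknownEventType {
		return errors.New("unable to detect the event source")
	}

	// decoding for each known event type follows in the next step
	return nil
}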

Mapping the Data

Now that I know the event type, I can safely decode into the event structures from the events package of the AWS Lambda for Go library, as these have json struct tags for decoding the data.

func (event *Event) UnmarshalJSON(data []byte) error {
	var err error

	switch event.getEventType(data) {
	case s3EventType:
		s3Event := &events.S3Event{}
		err = json.Unmarshal(data, s3Event)

		if err == nil {
			return event.mapS3EventRecords(s3Event)
		}

	case snsEventType:
		snsEvent := &events.SNSEvent{}
		err = json.Unmarshal(data, snsEvent)

		if err == nil {
			return event.mapSNSEventRecords(snsEvent)
		}

	case sqsEventType:
		sqsEvent := &events.SQSEvent{}
		err = json.Unmarshal(data, sqsEvent)

		if err == nil {
			return event.mapSQSEventRecords(sqsEvent)
		}

	default:
		// without this case, an unrecognised payload would be decoded
		// into an empty Event with no error
		return errors.New("unable to detect the event source")
	}

	return err
}

The code above references three mapping methods. Each takes the decoded source event and maps its records onto our Record structure.

Mapping S3 Records

The mapS3EventRecords method is the simplest: each S3EventRecord maps directly onto my Record structure.

func (event *Event) mapS3EventRecords(s3Event *events.S3Event) error {
	event.Records = make([]Record, 0)

	for _, s3Record := range s3Event.Records {
		event.Records = append(event.Records, Record{
			EventSource:    s3Record.EventSource,
			EventSourceArn: s3Record.S3.Bucket.Arn,
			AWSRegion:      s3Record.AWSRegion,
			S3:             s3Record.S3,
		})
	}

	return nil
}

Mapping SNS Records

The mapSNSEventRecords function requires a little bit extra.

An SNS record has no dedicated region field in its payload, but by using the arn.Parse function from the AWS SDK for Go (package github.com/aws/aws-sdk-go/aws/arn), I can extract the region from the SNS TopicArn.

I also use json.Unmarshal to decode the SNS message into an S3Event. Because that event itself holds a Records array, mapping a single SNSEventRecord can produce multiple records.

You may also notice the use of the github.com/pkg/errors package here: errors.Wrap adds a message to an underlying error while preserving it.
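If you would rather not pull in the AWS SDK just for this, the region can also be read with the standard library, since ARNs follow the format arn:partition:service:region:account-id:resource. A minimal sketch with a hypothetical regionFromARN helper; note that S3 bucket ARNs, like the one in the direct S3 example, leave the region field empty.

```go
package main

import (
	"fmt"
	"strings"
)

// regionFromARN is a hypothetical stdlib-only helper. It splits the ARN into
// its six colon-separated fields (the resource may itself contain colons,
// hence SplitN) and returns the fourth, which is the region.
func regionFromARN(arn string) (string, error) {
	parts := strings.SplitN(arn, ":", 6)
	if len(parts) != 6 || parts[0] != "arn" {
		return "", fmt.Errorf("invalid ARN %q", arn)
	}
	return parts[3], nil
}

func main() {
	region, err := regionFromARN("arn:aws:sns:eu-west-1:123456789012:sns-lambda")
	fmt.Println(region, err) // eu-west-1 <nil>
}
```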

func (event *Event) mapSNSEventRecords(snsEvent *events.SNSEvent) error {
	event.Records = make([]Record, 0)

	for _, snsRecord := range snsEvent.Records {
		// the region is the same for every S3 record in this message,
		// so parse the topic ARN once per SNS record
		topicArn, err := arn.Parse(snsRecord.SNS.TopicArn)
		if err != nil {
			return errors.Wrap(err, "failed to parse sns topic arn")
		}

		// decode sns message to s3 event
		s3Event := &events.S3Event{}
		err = json.Unmarshal([]byte(snsRecord.SNS.Message), s3Event)
		if err != nil {
			return errors.Wrap(err, "failed to decode sns message to an S3 event")
		}

		if len(s3Event.Records) == 0 {
			return errors.New("S3 event records are empty")
		}

		for _, s3Record := range s3Event.Records {
			event.Records = append(event.Records, Record{
				EventSource:    snsRecord.EventSource,
				EventSourceArn: snsRecord.SNS.TopicArn,
				AWSRegion:      topicArn.Region,
				SNS:            snsRecord.SNS,
				S3:             s3Record.S3,
			})
		}
	}

	return nil
}

Mapping SQS Records

The mapSQSEventRecords method is similar to mapSNSEventRecords, except that the region is already part of the SQS message structure (its awsRegion field), so no ARN parsing is needed.

func (event *Event) mapSQSEventRecords(sqsEvent *events.SQSEvent) error {
	event.Records = make([]Record, 0)

	for _, sqsRecord := range sqsEvent.Records {
		// decode sqs body to s3 event
		s3Event := &events.S3Event{}
		err := json.Unmarshal([]byte(sqsRecord.Body), s3Event)
		if err != nil {
			return errors.Wrap(err, "failed to decode sqs body to an S3 event")
		}

		if len(s3Event.Records) == 0 {
			return errors.New("S3 event records are empty")
		}

		for _, s3Record := range s3Event.Records {
			event.Records = append(event.Records, Record{
				EventSource:    sqsRecord.EventSource,
				EventSourceArn: sqsRecord.EventSourceARN,
				AWSRegion:      sqsRecord.AWSRegion,
				SQS:            sqsRecord,
				S3:             s3Record.S3,
			})
		}
	}

	return nil
}

Conclusion

As you can see from the above, by implementing the json.Unmarshaler interface, which Go's encoding/json package calls during decoding, it is possible to populate your Lambda function's event type dynamically.

This approach, of course, can be expanded for any underlying data type you wish.

Thanks for reading!