AWS SNS

The Simple Notification Service (SNS) integration publishes all the events to a SNS Topic to which other applications or AWS services can subscribe for further processing.

Events

The AWS Simple Notification Service integration exposes all events as documented by Event types.

Message attributes

The following message attributes are added to each published message:

  • event - the event type
  • dev_eui - the device EUI
  • application_id - the ChirpStack Application Server application ID

Example code

The following code example demonstrates how to consume integration events using an AWS SQS subscription.

!!! important Make sure the Enable raw message delivery option is enabled on the subscription. If not enabled, the SQS messages will not have the expected attributes.

Go

main.go:

package main

import (
	"encoding/base64"
	"encoding/hex"
	"log"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/sqs"
	"google.golang.org/protobuf/encoding/protojson"
	"google.golang.org/protobuf/proto"

	"github.com/chirpstack/chirpstack/api/go/v4/integration"
)

type handler struct {
	json bool

	sqs      *sqs.SQS
	queueURL string
}

func (h *handler) receive() error {
	for {
		result, err := h.sqs.ReceiveMessage(&sqs.ReceiveMessageInput{
			MessageAttributeNames: []*string{
				aws.String(sqs.QueueAttributeNameAll),
			},
			QueueUrl:            &h.queueURL,
			MaxNumberOfMessages: aws.Int64(1),
		})
		if err != nil {
			return err
		}

		for _, msg := range result.Messages {
			_, err := h.sqs.DeleteMessage(&sqs.DeleteMessageInput{
				QueueUrl:      &h.queueURL,
				ReceiptHandle: msg.ReceiptHandle,
			})
			if err != nil {
				log.Printf("delete message error: %s", err)
			}

			event, ok := msg.MessageAttributes["event"]
			if !ok || event.StringValue == nil {
				log.Printf("event attribute is missing")
				continue
			}

			switch *event.StringValue {
			case "up":
				err = h.up(*msg.Body)
			case "join":
				err = h.join(*msg.Body)
			default:
				log.Printf("handler for event %s is not implemented", *event.StringValue)
				err = nil
			}

			if err != nil {
				log.Printf("handling event '%s' returned error: %s", *event.StringValue, err)
			}

		}
	}

	return nil
}

func (h *handler) up(body string) error {
	var up integration.UplinkEvent
	if err := h.unmarshal(body, &up); err != nil {
		return err
	}
	log.Printf("Uplink received from %s with payload: %s", up.GetDeviceInfo().DevEui, hex.EncodeToString(up.Data))
	return nil
}

func (h *handler) join(body string) error {
	var join integration.JoinEvent
	if err := h.unmarshal(body, &join); err != nil {
		return err
	}
	log.Printf("Device %s joined with DevAddr %s", join.GetDeviceInfo().DevEui, join.DevAddr)
	return nil
}

func (h *handler) unmarshal(body string, v proto.Message) error {
	if h.json {
		return protojson.UnmarshalOptions{
			DiscardUnknown: true,
			AllowPartial:   true,
		}.Unmarshal([]byte(body), v)
	}

	b, err := base64.StdEncoding.DecodeString(body)
	if err != nil {
		return err
	}

	return proto.Unmarshal(b, v)
}

func newHandler(json bool, accessKeyID, secretAccessKey, region, queueURL string) (*handler, error) {
	sess, err := session.NewSession(&aws.Config{
		Region:      aws.String(region),
		Credentials: credentials.NewStaticCredentials(accessKeyID, secretAccessKey, ""),
	})
	if err != nil {
		return nil, err
	}

	return &handler{
		json:     json,
		sqs:      sqs.New(sess),
		queueURL: queueURL,
	}, nil
}

func main() {
	h, err := newHandler(
		// set true when using JSON encoding
		false,

		// AWS AccessKeyID
		"...",

		// AWS SecretAccessKey
		"...",

		// AWS region
		"eu-west-1",

		// SQS queue url
		"https://sqs...",
	)
	if err != nil {
		panic(err)
	}

	panic(h.receive())
}

Python

Please refer to the Boto3 Configuration for setting up the API credentials.

main.py:

import boto3
from chirpstack_api import integration
from google.protobuf.json_format import Parse


class Handler:
    def __init__(self, json, queue_url):
        self.json = json
        self.queue_url = queue_url

    def receive(self):
        sqs = boto3.client('sqs')

        while True:
            resp = sqs.receive_message(
                QueueUrl=self.queue_url,
                MessageAttributeNames=[
                    'All',
                ],
                MaxNumberOfMessages=1,
                WaitTimeSeconds=10,
            )

            if not 'Messages' in resp:
                continue

            msg = resp['Messages'][0]
            receipt_handle = msg['ReceiptHandle']

            sqs.delete_message(
                QueueUrl=self.queue_url,
                ReceiptHandle=receipt_handle,
            )

            event = msg['MessageAttributes']['event']['StringValue']

            if event == "up":
                self.up(msg['Body'])
            elif event == "join":
                self.join(msg['Body'])
            else:
                print('handler for event %s is not implemented' % event)

    def up(self, body):
        up = self.unmarshal(body, integration.UplinkEvent())
        print('Uplink received from: %s with payload: %s' % (up.device_info.dev_eui, up.data.hex()))

    def join(self, body):
        join = self.unmarshal(body, integration.JoinEvent())
        print('Device: %s joined with DevAddr: %s' % (join.device_info.dev_eui, join.dev_addr))

    def unmarshal(self, body, pl):
        if self.json:
            return Parse(body, pl)
        
        pl.ParseFromString(body)
        return pl


h = Handler(
    # True -  JSON marshaler
    # False - Protobuf marshaler (binary)
    False,

    # SQS queue url
    'https://sqs....',
)
h.receive()