Azure Service-Bus

The Azure Service Bus integration publishes all the events to a Service Bus Topic or Queue to which applications can subscribe.

Events

The Azure Service Bus integration exposes all events as documented by Event types.

User properties

The following user properties are added to each published message:

  • event - the event type
  • dev_eui - the device EUI
  • application_id - the ChirpStack Application Server application ID

Example code

The following code example demonstrates how to consume integration events using an Azure Service-Bus Queue.

Go

main.go:

package main

import (
	"context"
	"encoding/hex"
	"log"

	servicebus "github.com/Azure/azure-service-bus-go"
	"google.golang.org/protobuf/encoding/protojson"
	"google.golang.org/protobuf/proto"

	"github.com/chirpstack/chirpstack/api/go/v4/integration"
)

type handler struct {
	json bool

	ns    *servicebus.Namespace
	queue *servicebus.Queue
}

func (h *handler) receive() error {
	for {
		if err := h.queue.Receive(context.TODO(), servicebus.HandlerFunc(h.receiveFunc)); err != nil {
			return err
		}
	}
}

func (h *handler) receiveFunc(ctx context.Context, msg *servicebus.Message) error {
	ev, ok := msg.UserProperties["Event"]
	if !ok {
		log.Println("event attribute is missing")
	}

	event, ok := ev.(string)
	if !ok {
		log.Println("event must be of type string")
	}

	var err error

	switch event {
	case "up":
		err = h.up(msg.Data)
	case "join":
		err = h.join(msg.Data)
	default:
		log.Printf("handler for event %s is not implemented", event)
	}

	if err != nil {
		log.Printf("handling event '%s' returned error: %s", event, err)
	}

	return msg.Complete(ctx)
}

func (h *handler) up(b []byte) error {
	var up integration.UplinkEvent
	if err := h.unmarshal(b, &up); err != nil {
		return err
	}
	log.Printf("Uplink received from %s with payload: %s", up.GetDeviceInfo().DevEui, hex.EncodeToString(up.Data))
	return nil
}

func (h *handler) join(b []byte) error {
	var join integration.JoinEvent
	if err := h.unmarshal(b, &join); err != nil {
		return err
	}
	log.Printf("Device %s joined with DevAddr %s", join.GetDeviceInfo().DevEui, join.DevAddr)
	return nil
}

func (h *handler) unmarshal(b []byte, v proto.Message) error {
	if h.json {
		return protojson.UnmarshalOptions{
			DiscardUnknown: true,
			AllowPartial:   true,
		}.Unmarshal(b, v)
	}
	return proto.Unmarshal(b, v)
}

func newHandler(json bool, connStr string, queueName string) (*handler, error) {
	ns, err := servicebus.NewNamespace(
		servicebus.NamespaceWithConnectionString(connStr),
	)
	if err != nil {
		panic(err)
	}

	queue, err := ns.NewQueue(queueName)
	if err != nil {
		panic(err)
	}

	return &handler{
		json:  json,
		ns:    ns,
		queue: queue,
	}, nil

}

func main() {
	h, err := newHandler(
		// set true when using JSON encoding
		false,

		// service-bus connection string
		"Endpoint=sb://example.servicebus.windows.net/;SharedAccessKeyName=example-policy;SharedAccessKey=...",

		// queue name
		"events",
	)
	if err != nil {
		panic(err)
	}
	panic(h.receive())
}

Python

main.py:

from azure.servicebus import ServiceBusClient
from chirpstack_api import integration
from google.protobuf.json_format import Parse


class Handler:
    def __init__(self, json, connection_string, queue_name):
        self.json = json
        self.connection_string = connection_string
        self.queue_name = queue_name

    def receive(self):
        client = ServiceBusClient.from_connection_string(self.connection_string)
        queue_client = client.get_queue(self.queue_name)
        messages = queue_client.get_receiver()

        for message in messages:
            message.complete()
            body = b''.join(message.body)

            event = message.user_properties[b'Event']

            if event == b'up':
                self.up(body)
            elif event == b'join':
                self.join(body)
            else:
                print('handler for event %s is not implemented' % event)

    def up(self, body):
        up = self.unmarshal(body, integration.UplinkEvent())
        print('Uplink received from: %s with payload: %s' % (up.device_info.dev_eui, up.data.hex()))

    def join(self, body):
        join = self.unmarshal(body, integration.JoinEvent())
        print('Device: %s joined with DevAddr: %s' % (join.device_info.dev_eui.hex, join.dev_addr))

    def unmarshal(self, body, pl):
        if self.json:
            return Parse(body, pl)
        
        pl.ParseFromString(body)
        return pl


h = Handler(
    # True -  JSON marshaler
    # False - Protobuf marshaler (binary)
    False,

    # Service-Bus connection string
    'Endpoint=sb://example.servicebus.windows.net/;SharedAccessKeyName=example-policy;SharedAccessKey=...',

    # Service-Bus queue name
    'events',
)
h.receive()