Azure Service-Bus

The Azure Service Bus integration publishes all the events to a Service Bus Topic or Queue to which applications can subscribe.


The Azure Service Bus integration exposes all events as documented by Event types.

User properties

The following user properties are added to each published message:

  • event - the event type
  • dev_eui - the device EUI
  • application_id - the ChirpStack Application Server application ID

Example code

The following code example demonstrates how to consume integration events using an Azure Service-Bus Queue.



package main

import (

	servicebus ""


type handler struct {
	json bool

	ns    *servicebus.Namespace
	queue *servicebus.Queue

func (h *handler) receive() error {
	for {
		if err := h.queue.Receive(context.TODO(), servicebus.HandlerFunc(h.receiveFunc)); err != nil {
			return err

func (h *handler) receiveFunc(ctx context.Context, msg *servicebus.Message) error {
	ev, ok := msg.UserProperties["Event"]
	if !ok {
		log.Println("event attribute is missing")

	event, ok := ev.(string)
	if !ok {
		log.Println("event must be of type string")

	var err error

	switch event {
	case "up":
		err = h.up(msg.Data)
	case "join":
		err = h.join(msg.Data)
		log.Printf("handler for event %s is not implemented", event)

	if err != nil {
		log.Printf("handling event '%s' returned error: %s", event, err)

	return msg.Complete(ctx)

func (h *handler) up(b []byte) error {
	var up integration.UplinkEvent
	if err := h.unmarshal(b, &up); err != nil {
		return err
	log.Printf("Uplink received from %s with payload: %s", up.GetDeviceInfo().DevEui, hex.EncodeToString(up.Data))
	return nil

func (h *handler) join(b []byte) error {
	var join integration.JoinEvent
	if err := h.unmarshal(b, &join); err != nil {
		return err
	log.Printf("Device %s joined with DevAddr %s", join.GetDeviceInfo().DevEui, join.DevAddr)
	return nil

func (h *handler) unmarshal(b []byte, v proto.Message) error {
	if h.json {
		return protojson.UnmarshalOptions{
			DiscardUnknown: true,
			AllowPartial:   true,
		}.Unmarshal(b, v)
	return proto.Unmarshal(b, v)

func newHandler(json bool, connStr string, queueName string) (*handler, error) {
	ns, err := servicebus.NewNamespace(
	if err != nil {

	queue, err := ns.NewQueue(queueName)
	if err != nil {

	return &handler{
		json:  json,
		ns:    ns,
		queue: queue,
	}, nil


func main() {
	h, err := newHandler(
		// set true when using JSON encoding

		// service-bus connection string

		// queue name
	if err != nil {


from azure.servicebus import ServiceBusClient
from chirpstack_api import integration
from google.protobuf.json_format import Parse

class Handler:
    def __init__(self, json, connection_string, queue_name):
        self.json = json
        self.connection_string = connection_string
        self.queue_name = queue_name

    def receive(self):
        client = ServiceBusClient.from_connection_string(self.connection_string)
        queue_client = client.get_queue(self.queue_name)
        messages = queue_client.get_receiver()

        for message in messages:
            body = b''.join(message.body)

            event = message.user_properties[b'Event']

            if event == b'up':
            elif event == b'join':
                print('handler for event %s is not implemented' % event)

    def up(self, body):
        up = self.unmarshal(body, integration.UplinkEvent())
        print('Uplink received from: %s with payload: %s' % (up.device_info.dev_eui,

    def join(self, body):
        join = self.unmarshal(body, integration.JoinEvent())
        print('Device: %s joined with DevAddr: %s' % (join.device_info.dev_eui.hex, join.dev_addr))

    def unmarshal(self, body, pl):
        if self.json:
            return Parse(body, pl)
        return pl

h = Handler(
    # True -  JSON marshaler
    # False - Protobuf marshaler (binary)

    # Service-Bus connection string

    # Service-Bus queue name