Monitoring The Things Indoor Gateway (TTIG) via InfluxDB

The Things Indoor Gateway (TTIG) is an affordable LoRaWAN gateway, ideal for getting started with The Things Network or other setups. Here are two ways of monitoring its radio performance and feeding data into e.g. InfluxDB, so you can display the results in a small Grafana dashboard.

TTN Gateway Server API

The Things Stack's Gateway Server API allows requesting uplink and downlink stats of a gateway if you have an appropriate API key.

First, you need to navigate to the gateway page in your TTN console and create a new API key with “View gateway status” rights. Using this key and your gateway ID, you can request connection statistics:

> curl -H "Authorization: Bearer GATEWAY_KEY" \
  https://eu1.cloud.thethings.network/api/v3/gs/gateways/GATEWAY_ID/connection/stats | jq
{
  "last_uplink_received_at": "2021-09-12T11:00:41.490891018Z",
  "uplink_count": "115",
  "last_downlink_received_at": "2021-09-12T00:05:45.008438327Z",
  "downlink_count": "2"
}

With a cronjob running every few minutes, you can pass the data to InfluxDB. I'm using the following Python script for this:

#!/usr/bin/env python3
# vim:tabstop=4 softtabstop=4 shiftwidth=4 textwidth=160 smarttab expandtab colorcolumn=160

import requests

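def main(auth_token, gateway_id):
    # Query the Gateway Server for the gateway's connection statistics.
    response = requests.get(
        f"https://eu1.cloud.thethings.network/api/v3/gs/gateways/{gateway_id}/connection/stats",
        headers={
            "Authorization": f"Bearer {auth_token}"
        },
    )
    # Abort on HTTP errors (e.g. an invalid key) instead of logging zeroes.
    response.raise_for_status()

    data = response.json()

    # Counters missing from the response default to 0.
    uplink_count = data.get("uplink_count", 0)
    downlink_count = data.get("downlink_count", 0)

    # Write both counters to InfluxDB using the line protocol.
    requests.post(
        "http://influxdb:8086/write?db=hosts",
        data=f"ttn_gateway,name={gateway_id} uplink_count={uplink_count},downlink_count={downlink_count}",
    )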


if __name__ == "__main__":
    main("GATEWAY_KEY", "GATEWAY_ID")

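As the sample response shows, the counters arrive as JSON strings rather than numbers. The following is a minimal sketch of how the line-protocol record could be built with an explicit integer conversion, so that an unexpected value raises an error instead of ending up in the database. The function name `build_gateway_line` and the gateway ID `my-gateway` are made up for this illustration.

```python
def build_gateway_line(gateway_id, stats):
    """Build an InfluxDB line-protocol record from a connection/stats response."""
    # The counters are JSON strings (e.g. "115"); int() rejects anything that
    # is not a plain integer. Absent keys are treated as zero.
    uplink_count = int(stats.get("uplink_count", 0))
    downlink_count = int(stats.get("downlink_count", 0))
    return (
        f"ttn_gateway,name={gateway_id} "
        f"uplink_count={uplink_count},downlink_count={downlink_count}"
    )


if __name__ == "__main__":
    sample = {"uplink_count": "115", "downlink_count": "2"}
    # prints: ttn_gateway,name=my-gateway uplink_count=115,downlink_count=2
    print(build_gateway_line("my-gateway", sample))
```

The returned string can be passed as the request body of the InfluxDB write call.
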
It's also possible to assign “Read gateway traffic” rights to an API key. I haven't experimented with that yet.

USB-UART Logs

By soldering a 1 kΩ resistor onto R86 on the TTIG PCB, you can enable its built-in CP2102N USB-UART converter. This lets you use the USB port not just for power, but also for observing the gateway's debug output. See Xose Pérez' blog post “Hacking the TTI Indoor Gateway” for details.

With this hack, connecting the TTIG to a Linux computer capable of sourcing up to 900 mA via USB will cause a /dev/ttyUSB* serial interface (e.g. /dev/ttyUSB0) to appear. You can use tools such as screen or picocom with a baud rate of 115200 to observe the output. Apart from memory usage and time synchronization logs, it includes a line similar to the following for each received LoRa transmission:

RX 868.3MHz DR5 SF7/BW125 snr=9.0 rssi=-46 xtime=0x43000FB11517C3 - updf mhdr=40 DevAddr=01234567 FCtrl=00 FCnt=502 FOpts=[] 0151B4 mic=-1842874694 (15 bytes)

This lets you log statistics such as received signal strength (RSSI), signal-to-noise ratio (SNR), and spreading factor for each transmission.

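To illustrate which fields can be pulled out of such a line, here is a small standalone sketch that turns it into a dictionary. The pattern is derived only from the sample line above, so other firmware versions may format the output differently. The names `RX_PATTERN` and `parse_rx_line` and the dictionary keys are chosen for this example.

```python
import re

# Named groups mirror the fields visible in the sample log line.
RX_PATTERN = re.compile(
    r"RX (?P<freq>[0-9.]+)MHz DR(?P<dr>[0-9]+) SF(?P<sf>[0-9]+)/BW(?P<bw>[0-9]+) "
    r"snr=(?P<snr>-?[0-9.]+) rssi=(?P<rssi>-?[0-9]+) .* DevAddr=(?P<devaddr>\S+)"
)


def parse_rx_line(line):
    """Return the radio parameters of an RX log line as a dict, or None if it is not one."""
    match = RX_PATTERN.search(line)
    if match is None:
        return None
    return {
        "freq_mhz": float(match["freq"]),
        "dr": int(match["dr"]),
        "sf": int(match["sf"]),
        "bw_khz": int(match["bw"]),
        "snr": float(match["snr"]),
        "rssi": int(match["rssi"]),
        "devaddr": match["devaddr"],
    }
```

For the sample line, this yields a frequency of 868.3 MHz, DR 5, SF 7, a bandwidth of 125 kHz, an SNR of 9.0, an RSSI of -46, and the device address 01234567. Lines that do not describe a received transmission return None.
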
The Python script I'm using for this is somewhat more involved:

#!/usr/bin/env python3
# vim:tabstop=4 softtabstop=4 shiftwidth=4 textwidth=160 smarttab expandtab colorcolumn=160

import re
import requests
import serial
import serial.threaded
import sys
import time


class SerialReader(serial.threaded.Protocol):
    def __init__(self, callback):
        self.callback = callback
        self.recv_buf = ""

    def __call__(self):
        return self

    def data_received(self, data):
        try:
            str_data = data.decode("UTF-8")
            self.recv_buf += str_data

            lines = self.recv_buf.split("\n")
            if len(lines) > 1:
                self.recv_buf = lines[-1]
                for line in lines[:-1]:
                    self.callback(line.strip())

        except UnicodeDecodeError:
            pass
            # sys.stderr.write('UART output contains garbage: {data}\n'.format(data = data))


class SerialMonitor:
    def __init__(self, port: str, baud: int, callback):
        self.ser = serial.serial_for_url(port, do_not_open=True)
        self.ser.baudrate = baud
        self.ser.parity = "N"
        self.ser.rtscts = False
        self.ser.xonxoff = False

        try:
            self.ser.open()
        except serial.SerialException as e:
            sys.stderr.write(
                "Could not open serial port {}: {}\n".format(self.ser.name, e)
            )
            sys.exit(1)

        self.reader = SerialReader(callback=callback)
        self.worker = serial.threaded.ReaderThread(self.ser, self.reader)
        self.worker.start()

    def close(self):
        self.worker.stop()
        self.ser.close()


if __name__ == "__main__":

    def parse_line(line):

        match = re.search(
            "RX ([0-9.]+)MHz DR([0-9]+) SF([0-9]+)/BW([0-9]+) snr=([0-9.-]+) rssi=([0-9-]+) .* DevAddr=([^ ]*)",
            line,
        )

        if match:
            requests.post(
                "http://influxdb:8086/write?db=hosts",
                data=f"ttn_rx,gateway=GATEWAY_ID,devaddr={match.group(7)} dr={match.group(2)},sf={match.group(3)},bw={match.group(4)},snr={match.group(5)},rssi={match.group(6)}",
            )

    monitor = SerialMonitor(
        "/dev/ttyUSB0",
        115200,
        parse_line,
    )

    try:
        while True:
            time.sleep(60)
    except KeyboardInterrupt:
        monitor.close()
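
The line handling in SerialReader decodes each chunk as it arrives and drops chunks that fail to decode. A possible variation, sketched here without any serial hardware involved, is to buffer raw bytes and decode only complete lines. The class name `LineAssembler` and its `feed` method are invented for this example and are not part of pyserial.

```python
class LineAssembler:
    """Collect raw byte chunks and hand out complete, decoded lines."""

    def __init__(self):
        self.buf = b""

    def feed(self, chunk):
        # Buffer bytes rather than text, so a multi-byte character split across
        # two chunks is decoded only once the whole line has arrived.
        self.buf += chunk
        # Everything before the last newline is complete; the rest stays buffered.
        *complete, self.buf = self.buf.split(b"\n")
        # errors="replace" keeps a line with stray bytes instead of dropping it.
        return [line.decode("utf-8", errors="replace").strip() for line in complete]


if __name__ == "__main__":
    assembler = LineAssembler()
    for chunk in (b"RX 868.3MHz DR5", b" SF7/BW125\r\nnext"):
        for line in assembler.feed(chunk):
            print(line)
```

A data_received method could pass each chunk to feed and invoke the callback for every returned line.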