#!/usr/bin/env python3
# vim:tabstop=4 softtabstop=4 shiftwidth=4 textwidth=160 smarttab expandtab colorcolumn=160
# Copyright © 2023 by Birte Kristina Friesel
"""Copy recently updated Pixel 3 sensor entities from Home Assistant's REST API to an InfluxDB write endpoint.

The script fetches ``{hass_url}/api/states`` once, picks a fixed set of ``sensor.pixel_3_*`` entities, drops every
entity whose ``last_updated`` timestamp is older than ``delta_seconds``, and posts the remaining values as two
line-protocol lines (``host,name=pixel3 ...`` and ``sensor,name=pixel3 ...``) to ``influx_url``.
"""

from datetime import datetime
import sys

import requests

hass_url = "https://homeassistant.example.org"
hass_token = "enter REST API token here"
influx_url = "https://user:password@example.org/write?db=hosts"

# primary format of the "last_updated" field in the /api/states response
strp_format = "%Y-%m-%dT%H:%M:%S.%f%z"

# configure to slightly higher than this script's run interval (here: every 2 minutes)
delta_seconds = 150

# seconds to wait for either server before giving up; keeps a stalled connection from blocking the next run
request_timeout = 30

# NOTE(review): placeholder states Home Assistant documents for entities without a reading -- confirm the list
# is complete for the sensors used here. Such states are neither numeric nor valid line-protocol field values.
unusable_states = ("unavailable", "unknown")


def _last_updated_epoch(entity):
    """Return the entity's ``last_updated`` timestamp as seconds since the Unix epoch.

    Raises ``ValueError`` if the timestamp matches neither ``strp_format`` nor ``datetime.fromisoformat``.
    """
    stamp = entity["last_updated"]
    try:
        return datetime.strptime(stamp, strp_format).timestamp()
    except ValueError:
        # datetime.isoformat() omits the fractional part when microseconds are zero, which %f does not accept.
        # Fall back to the ISO parser so that such a timestamp does not abort the whole run.
        return datetime.fromisoformat(stamp).timestamp()


def get_if_recent(hass, epoch, entity_id):
    """Return the state object of ``entity_id`` if it was updated recently, else ``None``.

    :param hass: dict mapping entity IDs to the state objects returned by ``/api/states``
    :param epoch: reference time in seconds since the Unix epoch
    :param entity_id: key to look up in ``hass``
    :returns: the state object, or ``None`` if the entity is absent or its ``last_updated`` timestamp lies more
        than ``delta_seconds`` before ``epoch``
    """
    entity = hass.get(entity_id)
    if entity is None:
        return None
    if epoch - _last_updated_epoch(entity) > delta_seconds:
        return None
    return entity


def get_state_if_recent(hass, epoch, entity_id, getter):
    """Return ``[getter(state)]`` for a recently updated entity with a usable state, else an empty list.

    :param hass: dict mapping entity IDs to state objects
    :param epoch: reference time in seconds since the Unix epoch
    :param entity_id: entity whose ``state`` is read
    :param getter: callable turning the raw state into the value to report
    :returns: a list with zero or one element, so that callers can ``extend`` unconditionally
    """
    entity = get_if_recent(hass, epoch, entity_id)
    if entity is None:
        return []
    state = entity["state"]
    if state in unusable_states:
        # skip the reading instead of crashing in float() or emitting a malformed field
        return []
    return [getter(state)]


def get_attr_if_recent(hass, epoch, entity_id, attr, getter):
    """Return ``[getter(attribute)]`` for a recently updated entity that has attribute ``attr``, else an empty list.

    :param hass: dict mapping entity IDs to state objects
    :param epoch: reference time in seconds since the Unix epoch
    :param entity_id: entity whose ``attributes`` are read
    :param attr: name of the attribute to report
    :param getter: callable turning the raw attribute value into the value to report
    :returns: a list with zero or one element; empty if the entity is absent or stale, or if the attribute is
        missing or ``None``
    """
    entity = get_if_recent(hass, epoch, entity_id)
    if entity is None:
        return []
    value = entity.get("attributes", {}).get(attr)
    if value is None:
        return []
    return [getter(value)]


def _write_line(measurement, fields):
    """POST ``fields`` as one ``<measurement>,name=pixel3`` line to ``influx_url``; do nothing if ``fields`` is empty."""
    if not fields:
        return
    response = requests.post(
        influx_url,
        data=f"{measurement},name=pixel3 {','.join(fields)}",
        timeout=request_timeout,
    )
    if not response.ok:
        # Report the rejected write, but keep going so that the remaining measurement is still submitted.
        print(
            f"write of measurement '{measurement}' failed: HTTP {response.status_code} {response.text.strip()}",
            file=sys.stderr,
        )


def main():
    """Fetch all Home Assistant states and forward the recent Pixel 3 readings to InfluxDB.

    Raises ``requests.HTTPError`` if the Home Assistant request is answered with an error status; connection
    errors and timeouts raised by ``requests`` propagate as well.
    """
    epoch = datetime.now().timestamp()

    response = requests.get(
        f"{hass_url}/api/states",
        headers={"Accept": "application/json", "Authorization": f"Bearer {hass_token}"},
        timeout=request_timeout,
    )
    # fail with the HTTP status (e.g. a rejected token) rather than with a confusing JSON or key error below
    response.raise_for_status()
    hass = {entry["entity_id"]: entry for entry in response.json()}

    host_fields = []
    host_fields += get_state_if_recent(
        hass, epoch, "sensor.pixel_3_battery_level", lambda r: f"battery0_capacity_percent={r}"
    )
    # state and voltage are scaled by 1e6 to fit the *_uw / *_uv field names (presumably W -> µW and V -> µV)
    host_fields += get_state_if_recent(
        hass, epoch, "sensor.pixel_3_battery_power", lambda r: f"battery0_power_uw={float(r) * 1e6}"
    )
    host_fields += get_attr_if_recent(
        hass, epoch, "sensor.pixel_3_battery_power", "voltage", lambda r: f"battery0_voltage_uv={float(r) * 1e6}"
    )
    host_fields += get_state_if_recent(
        hass, epoch, "sensor.pixel_3_battery_temperature", lambda r: f"battery0_degc={float(r)}"
    )
    # NOTE(review): 100 - state implies that the storage sensor reports the free percentage -- confirm
    host_fields += get_state_if_recent(
        hass, epoch, "sensor.pixel_3_storage_sensor", lambda r: f"__used_percent={100 - float(r)}"
    )
    _write_line("host", host_fields)

    sensor_fields = []
    sensor_fields += get_state_if_recent(hass, epoch, "sensor.pixel_3_pressure_sensor", lambda r: f"pressure_hpa={r}")
    sensor_fields += get_state_if_recent(hass, epoch, "sensor.pixel_3_light_sensor", lambda r: f"illuminance_lux={r}")
    sensor_fields += get_state_if_recent(hass, epoch, "sensor.pixel_3_steps_sensor", lambda r: f"steps_count={r}")
    _write_line("sensor", sensor_fields)


if __name__ == "__main__":
    main()