# Message Service # Einleitung Dieser Service ust en einfacher cloud native Message Service, wie es ja schon so viele gibt. Zunächst einmal ist er als reiner Test für eine Go Implementierung und verschiedene Studien gedacht. Längerfristig soll dieser Service den GAP zwischen den vielen kleinen selbstgestrickten Lösungen und den doch recht umfangreichen und komplexen Servicen wie Kafka, RocketMQ, RabbitMQ... schliessen. Ein kurzer Vergleich: Kafka: Mit Abstand der am weitesten vorgeschrittene Service. Die Eierlegende Woll-Milch-Sau. Kafka kann so ziemlich alles, was man braucht und eben noch viel mehr. Leider ist der Betrieb aber auch eher schwergewichtig. Neben dem Kafka CLuster selber, wernden och weitere zahreiche Komponeneten für den Betrieb benötigt, die auch nicht gerade einfach sind. Als ein Beispiel Zookeeper. (https://kafka.apache.org/) RocketMQ: Etwas weniger komplex, aber leider doch mit dem Nameserver und Broker Konzept recht komplex, gerade für kleine Installation für meine Begriffe etwas oversized. (http://rocketmq.apache.org/) RabbitMQ: Zwar werden mich die Entwickler nun schlagen, aber RabbitMQ ist nicht Cloud Native. Dazu gibt es da viel zu viele Sonderlocken. Das Discovery der Nodes ist eher fragwürdig, vor allem, wenn es um eine Docker/Kubernetes Integration geht. (https://www.rabbitmq.com/) Am nähesten für einen solchen Service käme für einen solchen "kleinen" Service wäre NSQ. Bei diesen sind jedoch der Lookup beim Start, und die nicht vorhandene Persistenz eines Nodes zu beanstanden. Vor allem, da die Messages nicht im Cluster verteilt werden, sondern nur auf dem Node, auf dem Sie gesendet wurden gespeichert. Geht der Node hops, sind die Messages weg. Auch schwierig, der Client macht zu allen Nodes eine Verbindung auf und sucht dann in den verschiedenen Shards nach der zu konsumierenden Message. Ein Splitbrain ist da schon vorprogrammiert. Aber NSQ ist ja noch relativ jung. (V 1.2) (https://nsq.io/) # Architekturansatz Da der Service eher für kleine bis mittlere Installation vorgesehen ist, werden folgende Annahmen/Voraussetzungen gemacht. - Der Service hat keinerlei Abhängigkeiten zu anderen Servicen. Er kann somit auch stand-alone betrieben werden. - Ein Cluster bildet sich, wenn die Knoten sich gegenseitig finden. Dazu kann eine DNS/IP Table übergeben werden. Einmal im Cluster, werden die IPs aller beteiligten geshared. Ebenso wie deren Status. Beim Start kann in der Konfiguration eine Mindest-Clustergröße mitgegeben werden. Sind weniger Nodes aktiv als in der Konfiguration angegeben, wird ein Splitbrain vermutet und der Service verweigert seinen Dienst. -> unhealthy - Topics werden in shards verwaltet. Je nach Konfiguration (QoS) kann die Annahme einer Meldung entsprechend des QuS Wertes zugesichert werden. - QoS = 0: Die Nachricht ist auf dem 1. Knoten empfangen worden, keine Persistenz. - QoS = 1: Die Nachricht ist in zumindest einem Shard aufgenommen worden. - QoS = 2: Die Nachricht ist in zumindest zwei Shards (Topic Shard + Backup) aufgenommen worden. - QoS = 3: Die Nachricht wurde zusätzlich auch noch persistiert. - ein Topic ist die eigentliche Struktur. Eine MEldung wird zunächst in einem Topic vom Producer empfangen. Dann wird die Nachricht an die angeschlossenen Consumer Groups parallel weiter geleitet. Innerhalb einer Consumer Group kann dann ein Consumer diese Nachricht erhalten (lock) und nach Verarbeitung entweder zurückweisen (Reject), dann geht Sie an einen anderen Consumer, oder aber als verarbeitet (acknowledge) markieren. - Nachrichten inenrhalb eines Topics sind grundsätzlich ordered. Einzig bei der parallelen Verarbeitung innerhalb einer Consumergroup, kann die Reihenfolge variieren. (Wenn eine Consumer eine Meldung Rejected hat) - Die ID einer Message ist innerhalb eines Topics fortlaufend. - Der Client kann jederzeit auch auf bereits konsumierte Nachrichten per ID zugreifen, wenn diese nicht explizite (oder implizit) gelöscht worden sind. Auch ein Peek ist jederzeit möglich. - Consumer erhalten nur die ausgewählte Meldung. Eine Persistenz im Client ist unerwünscht. - Auch der Consumer kann auf ein Topic mit einem QoS zugreifen. - QoS = 0: Sobald der Client eine Message anfordert, wird diese automatisch als verarbeitet gekennzeichnet - QoS = 1: Der Client muss explizite ein Nachricht als verarbeitet kennzeichnen - Pro Topic können bestimmte Verhaltensweisen beim Erstellen vom Producer und Consumer eingestellt werden. - Producer: - Generell: - MAX_CAPACITY: wie viele Nachrichten kann das Topic maximal aufnehmen. Default: -1 - DEFAULT_QOS: Default QoS für alle Producer in diesem Topic - Client-Session: - DEFAULT_QOS: Default QoS für diesen Producer in diesem Topic - Consumer: - Generell - DEFAULT_QOS: Default QoS für alle Consumer in diesem Topic - Gruppe - DEFAULT_QOS: Default QoS für alle Consumer einer Gruppe in diesem Topic - DELETE_AFTER_ACKNOWLEDGE: Nach dem Acknowledge wird die Nachricht für diese Gruppe automatisch gelöscht. - Client-Session - DEFAULT_QOS: Default QoS für diesen Consumer in diesem Topic - Jeder Client hat eine eindeutige ID ## Konfiguration des Service ```yaml # port of the http server port: 8080 # port of the https server sslport: 8443 # this is the servicURL from outside serviceURL: http://127.0.0.1:8080 # this is the registry URL from inside this service for consul as service registry registryURL: # this is the system id of this service. services in a cluster mode should have the same system id. systemID: autorest-srv #sercret file for storing usernames and passwords secretfile: /tmp/storage/config/secret.yaml #where the configuration files of the backends are backendpath: configs/backends #allow data saving without a registered backend allowAnonymousBackend: true # configuration of the gelf logging server logging: gelf-url: gelf-port: # healthcheck configuration healthcheck: # automatically check the health of this service every ## seconds period: 30 ``` ## Storage Als Storage wird derzeit nur MongoDB unterstützt. Bei der Mongo Storage Implementierung werden die verschiedenen Backends allerdings in einer Datenbank abgelegt. Einzelne Modelle werden in jeweils einer Collection abgelegt. Der Collectionname besteht aus dem Backendnamen "." und dem Modellnamen. ### Hint Um eine neue Mongodatenbank anzulegen, müssen folgende Kommandos auf der Mongo Console ausgeführt werden: ```json #create a new db named backend1 use backend1 #create a db admin on db backend1 with user backend1 with password backend1 db.createUser({ user: "backend1", pwd: "backend1", roles: [ "readWrite", "dbAdmin", { role: "dbOwner", db: "backend1" } ]}) ``` ## User Folgende User mit folgenden Rollen werden automatisch angelegt: - Admin, pwd: admin, roles: admin - Editor, pwd: editor, roles: edit - guest, pwd: guest, roles: read ## REST Interface Hier nun folgt die Beschreibung des REST Interfaces. Beispielhafte REST Calls sind als Postman Collection im Ordner test/postman vorhanden. Beispielhaft werden hier alle Calls als Calls auf den lokalen Server (127.0.0.1) mit dem Port 9443 bereitgestellt. Bei einer anderen Serverinstanz bitte entsprechend ändern. ### Admin API Security: Ja, Authentifizierung derzeit als BasicAuth. Role: admin zus. Header: **X-mcs-system**: autorest-srv (Konfigurationseinstellung: systemID) **X-mcs-apikey**: {uuid} Wird beim Starten des Servers auf der Konsole ausgegeben. ``` ... 2020/04/29 08:43:04 systemid: autorest-srv 2020/04/29 08:43:04 apikey: 5854d123dd25f310395954f7c450171c 2020/04/29 08:43:04 ssl: true ... ``` #### Liste alle Backends **Request**: **GET**: https://127.0.0.1:9443/api/v1/admin/backends **Beschreibung**: Liefert eine Liste mit allen Backenddefinitioninformationen. D.h. pro Backend werden nur der Name, die Beschreibung und die URL auf die Definition ausgeliefert. **Request**: **GET**: https://127.0.0.1:9443/api/v1/admin/backends **Response**: ```JSON [ { "Name": "sensors", "Description": "sensor model für storing and retrieving sensor data", "URL": "https://127.0.0.1:9443/api/v1/admin/backends/sensors/" }, { "Name": "mybe", "Description": "", "URL": "https://127.0.0.1:9443/api/v1/admin/backends/mybe/" } ] ``` #### Definition eines Backends **GET**: https://127.0.0.1:9443/api/v1/admin/backends/{backendname}/ **Beschreibung**: Liefert die Definition eines Backends. **Request**: https://127.0.0.1:9443/api/v1/admin/backends/sensors/ **Response**: ```JSON { "backendname": "sensors", "description": "sensor model für storing and retrieving sensor data", "models": [ { "name": "temperatur", "description": "", "fields": [ { "name": "temperatur", "type": "float", "mandatory": false, "collection": false }, { "name": "source", "type": "string", "mandatory": false, "collection": false } ], "indexes": null } ], "datasources": [ { "name": "temp_wohnzimmer", "type": "mqtt", "destination": "temperatur", "rule": "tasmota_ds18b20", "config": { "broker": "127.0.0.1:1883", "topic": "stat/temperatur/wohnzimmer", "payload": "application/json", "username": "temp", "password": "temp", "addTopicAsAttribute": "topic", "simpleValueAttribute": "" } }, { "name": "temp_kueche", "type": "mqtt", "destination": "temperatur", "rule": "tasmota_ds18b20", "config": { "broker": "127.0.0.1:1883", "topic": "tele/tasmota_63E6F8/SENSOR", "payload": "application/json", "username": "temp", "password": "temp", "addTopicAsAttribute": "topic", "simpleValueAttribute": "" } } ], "rules": [ { "name": "tasmota_ds18b20", "description": "transforming the tasmota json structure of the DS18B20 into my simple structure", "transform": [ { "operation": "shift", "spec": { "TempUnit": "TempUnit", "Temperature": "DS18B20.Temperature" } } ] }, { "name": "hm_temp_simple", "description": "handle homematic temperatur rightly", "transform": [ { "operation": "shift", "spec": { "Datetime": "ts", "Temperature": "val", "Timestamp": "ts" } }, { "operation": "default", "spec": { "TempUnit": "°C" } }, { "operation": "timestamp", "spec": { "Datetime": { "inputFormat": "$unixext", "outputFormat": "2006-01-02T15:04:05-0700" } } } ] } ] } ``` #### Neues Backends anlegen **POST**: https://127.0.0.1:9443/api/v1/admin/backends/ **Beschreibung**: Liefert die Definition eines Backends. **Request**: https://127.0.0.1:9443/api/v1/admin/backends/sensors/ **Response**: **Not implemented Yet** #### Daten eines Backends löschen **DELETE**: https://127.0.0.1:9443/api/v1/admin/backends/ **Beschreibung**: Löscht alle Daten eines Backends. **Request**: https://127.0.0.1:9443/api/v1/admin/backends/sensors/dropdata **Response**: ```json { "backend": "sensors", "msg": "backend sensors deleted. All data destroyed." } ``` #### Liste aller Modelle eines Backends **GET**: https://127.0.0.1:9443/api/v1/admin/backends/{backendname}/models **Beschreibung**: Liefert eine Liste aller Modelle eines Backends. **Request**: https://127.0.0.1:9443/api/v1/admin/backends/sensors/models **Response**: ```json [ { "name": "temperatur", "description": "", "fields": [ { "name": "temperatur", "type": "float", "mandatory": false, "collection": false }, { "name": "source", "type": "string", "mandatory": false, "collection": false } ], "indexes": null } ] ``` #### Liste aller Datenquellen eines Backends **GET**: https://127.0.0.1:9443/api/v1/admin/backends/{backendname}/datasources **Beschreibung**: Liefert eine Liste aller Datenquellen eines Backends. **Request**: https://127.0.0.1:9443/api/v1/admin/backends/sensors/datasources **Response**: ```json [ { "name": "temp_wohnzimmer", "type": "mqtt", "destination": "temperatur", "rule": "tasmota_ds18b20", "config": { "broker": "127.0.0.1:1883", "topic": "stat/temperatur/wohnzimmer", "payload": "application/json", "username": "temp", "password": "temp", "addTopicAsAttribute": "topic", "simpleValueAttribute": "" } }, { "name": "temp_kueche", "type": "mqtt", "destination": "temperatur", "rule": "tasmota_ds18b20", "config": { "broker": "127.0.0.1:1883", "topic": "tele/tasmota_63E6F8/SENSOR", "payload": "application/json", "username": "temp", "password": "temp", "addTopicAsAttribute": "topic", "simpleValueAttribute": "" } } ] ``` #### Liste aller Transformationsregeln eines Backends **GET**: https://127.0.0.1:9443/api/v1/admin/backends/{backendname}/rules **Beschreibung**: Liefert eine Liste aller Transformationsregelen eines Backends. **Request**: https://127.0.0.1:9443/api/v1/admin/backends/sensors/rules **Response**: ```json [ { "name": "tasmota_ds18b20", "description": "transforming the tasmota json structure of the DS18B20 into my simple structure", "transform": [ { "operation": "shift", "spec": { "TempUnit": "TempUnit", "Temperature": "DS18B20.Temperature" } } ] }, { "name": "hm_temp_simple", "description": "handle homematic temperatur rightly", "transform": [ { "operation": "shift", "spec": { "Datetime": "ts", "Temperature": "val", "Timestamp": "ts" } }, { "operation": "default", "spec": { "TempUnit": "°C" } }, { "operation": "timestamp", "spec": { "Datetime": { "inputFormat": "$unixext", "outputFormat": "2006-01-02T15:04:05-0700" } } } ] } ] ``` #### Definition einer Transformationsregel eines Backends **GET**: https://127.0.0.1:9443/api/v1/admin/backends/{backendname}/rules/{rulename} **Beschreibung**: Definition einer Transformationsregel eines Backends. **Request**: https://127.0.0.1:9443/api/v1/admin/backends/sensors/rules/hm_temp_simple **Response**: ```json { "name": "hm_temp_simple", "description": "handle homematic temperatur rightly", "transform": [ { "operation": "shift", "spec": { "Datetime": "ts", "Temperature": "val", "Timestamp": "ts" } }, { "operation": "default", "spec": { "TempUnit": "°C" } }, { "operation": "timestamp", "spec": { "Datetime": { "inputFormat": "$unixext", "outputFormat": "2006-01-02T15:04:05-0700" } } } ] } ``` #### Testen einer Transformationsregel eines Backends **POST**: https://127.0.0.1:9443/api/v1/admin/backends/{backendname}/rules/{rulename}/test **Beschreibung**: Testen einer Transformationsregel eines Backends. **Request**: https://127.0.0.1:9443/api/v1/admin/backends/sensors/rules/hm_temp_simple/test Payload: ```json { "val": 22.8, "ts": 1588142598973, "lc": 1588142598973 } ``` **Response**: ```json { "Timestamp": 1588142598973, "Datetime": "2020-04-29T08:43:18+0200", "Temperature": 22.8, "TempUnit": "°C" } ``` ### Tasks API Security: Ja #### Taskliste Request**: **GET**: https://127.0.0.1:9443/api/v1/admin/tasks **Beschreibung**: Liste der Service Tasks **Security role**: admin **Request**: **GET**: https://127.0.0.1:9443/api/v1/admin/tasks **Response**: ```json { "count": 1, "data": [ { "_id": "5eca0c72cfe02b227085e242", "tdata": "reporting", "tfile": "1234456789", "tstatus": "finished", "ttype": "report" } ], "found": 1, "limit": 0, "offset": 0, "query": "" } ``` #### Starten eines Servicetask **Request**: **POST**: https://127.0.0.1:9443/api/v1/admin/tasks **Beschreibung**: Starten eines Service tasks **Security role**: admin **Request**: **POST**: https://127.0.0.1:9443/api/v1/admin/tasks/ ​ **Payload**: ```json { "ttype": "report", "tdata": "[optional data for the task, if necessary]" } ``` **Response**: ```JSON { "tasksid": "5ea92df4d015d95201f6b4b8" } ``` ​ **Headers**: `Location: /api/v1/admin/tasks/5ea92df4d015d95201f6b4b8` ### Files API Security: Ja #### Upload einer Datei **Request**: **POST**: https://127.0.0.1:9443/api/v1/files/{backendname}/ **Beschreibung**: Upload einer Datei auf den Server. Dateien dürfen nicht größer sein als 10MB. **Security role**: edit **Request**: **POST**: https://127.0.0.1:9443/api/v1/files/sensors/ ​ **Payload**: Http Formbased File Upload. Name des Formfeldes: file **Response**: ```JSON { "fileid": "5ea92df4d015d95201f6b4b8", "filename": "readme.md" } ``` ​ **Headers**: `Location: /api/v1/files/sensors/5ea92df4d015d95201f6b4b8` #### Download einer Datei **Request**: **GET**: https://127.0.0.1:9443/api/v1/files/{backendname}/{fileid} **Beschreibung**: Download einer Datei vom Server. **Security role**: read **Request**: **GET**: https://127.0.0.1:9443/api/v1/files/sensors/5ea92df4d015d95201f6b4b8 **Response**: Die Datei als Download.