123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423 |
- package worker
- import (
- "encoding/json"
- "errors"
- "fmt"
- orglog "log"
- "os"
- "strconv"
- "strings"
- "time"
- mqtt "github.com/eclipse/paho.mqtt.golang"
- "wkla.no-ip.biz/gogs/Willie/MsgService/MessageService/model"
- )
- const processorPrefix = "processor"
- const datasourcePrefix = "datasource"
- func CreateMQTTDestinationProcessor(backend string, destination model.Destination) (DestinationProcessor, error) {
- processor := MQTTDestinationProcessor{
- Backend: backend,
- Destination: destination,
- }
- return &processor, nil
- }
- //MQTTDestinationProcessor does nothing
- type MQTTDestinationProcessor struct {
- Backend string
- Destination model.Destination
- }
- //Initialise the mqtt processor
- func (m *MQTTDestinationProcessor) Initialise(backend string, destination model.Destination) error {
- // delete an already connected mqtt processor
- // if m.Destination != nil {
- datasinkNsName := GetMQTTClientNsName(processorPrefix, backend, m.Destination.Name)
- datasink, ok := mqttClients[datasinkNsName]
- if ok {
- if datasink.Client != nil {
- datasink.Client.Disconnect(1000)
- }
- }
- delete(mqttClients, datasinkNsName)
- // }
- m.Destination = destination
- // now initilaise the new connection
- datasink, err := getDatasinkMQTTClient(datasinkNsName, backend, destination.Config.(model.DataSourceConfigMQTT))
- if err != nil {
- log.Alertf("%v", err)
- return err
- }
- return nil
- }
- //Destroy the mqtt processor
- func (m *MQTTDestinationProcessor) Destroy(backend string, destination model.Destination) error {
- datasinkNsName := GetMQTTClientNsName(processorPrefix, backend, m.Destination.Name)
- datasink, ok := mqttClients[datasinkNsName]
- if ok {
- if datasink.Client != nil {
- datasink.Client.Disconnect(1000)
- }
- delete(mqttClients, datasinkNsName)
- }
- return nil
- }
- //Store stores the message to a topic
- func (m *MQTTDestinationProcessor) Store(data model.JSONMap) (string, error) {
- datasinkNsName := GetMQTTClientNsName(processorPrefix, m.Backend, m.Destination.Name)
- datasink, ok := mqttClients[datasinkNsName]
- if !ok {
- err := m.Initialise(m.Backend, m.Destination)
- if err != nil {
- return "", err
- }
- datasinkNsName = GetMQTTClientNsName(processorPrefix, m.Backend, m.Destination.Name)
- datasink, ok = mqttClients[datasinkNsName]
- if !ok {
- return "", errors.New("destination client is not ready")
- }
- }
- payload, err := json.Marshal(data)
- if err != nil {
- return "", err
- }
- config := m.Destination.Config.(model.DataSourceConfigMQTT)
- token := datasink.Client.Publish(datasink.Topic, byte(config.QoS), false, payload)
- token.Wait()
- if token.Error() != nil {
- return "", token.Error()
- }
- return fmt.Sprintf("brk: %s, tpc: %s", datasink.Broker, datasink.Topic), nil
- }
- type MqttDatasource struct {
- Client mqtt.Client
- Broker string
- Backend string
- Destinations []string
- Topic string
- QoS int
- Payload string
- TopicAttribute string
- SimpleValueAttribute string
- SimpleValueAttributeType string
- Rule string
- }
- var mqttClients = make(map[string]MqttDatasource)
- //GetMQTTClients
- func GetMQTTClients() map[string]MqttDatasource {
- return mqttClients
- }
- func init() {
- // mqtt.DEBUG = orglog.New(os.Stdout, "DEBUG", 0)
- mqtt.ERROR = orglog.New(os.Stdout, "ERROR", 0)
- }
- func mqttStoreMessage(datasource MqttDatasource, msg mqtt.Message) {
- //log.Infof("MODEL: %s.%s TOPIC: %s MSG: %s", datasource.Backend, datasource.Model, msg.Topic(), msg.Payload())
- data, err := prepareMessage(datasource, msg)
- if err != nil {
- log.Alertf("%v", err)
- return
- }
- if datasource.TopicAttribute != "" {
- data[datasource.TopicAttribute] = datasource.Topic
- }
- data, err = executeTransformationrule(datasource, data)
- if err != nil {
- log.Alertf("%v", err)
- return
- }
- for _, destination := range datasource.Destinations {
- if strings.HasPrefix(destination, "$model.") {
- modelname := strings.TrimPrefix(destination, "$model.")
- route := model.Route{
- Backend: datasource.Backend,
- Model: modelname,
- }
- Store(route, data)
- } else {
- err := Destinations.Store(datasource.Backend, destination, data)
- if err != nil {
- log.Alertf("%v", err)
- return
- }
- }
- }
- }
- func executeTransformationrule(datasource MqttDatasource, data model.JSONMap) (model.JSONMap, error) {
- if datasource.Rule != "" {
- jsonBytes, err := json.Marshal(data)
- if err != nil {
- log.Alertf("%v", err)
- return nil, err
- }
- newJson, err := Rules.TransformJSON(datasource.Backend, datasource.Rule, jsonBytes)
- if err != nil {
- log.Alertf("%v", err)
- return nil, err
- }
- data = nil
- err = json.Unmarshal(newJson, &data)
- if err != nil {
- log.Alertf("%v", err)
- return nil, err
- }
- //fmt.Printf("src: %s\ndst: %s\n", string(jsonBytes), string(newJson))
- }
- return data, nil
- }
- func getSimpleDataAsModel(fieldname, fieldtype string, payload string) (model.JSONMap, error) {
- data := model.JSONMap{}
- var err error
- switch fieldtype {
- case model.FieldTypeInt:
- value, err := strconv.Atoi(payload)
- if err == nil {
- data[fieldname] = value
- }
- case model.FieldTypeFloat:
- value, err := strconv.ParseFloat(payload, 64)
- if err == nil {
- data[fieldname] = value
- }
- case model.FieldTypeTime:
- value, err := time.Parse(time.RFC3339, payload)
- if err != nil {
- saveerr := err
- var vint int
- vint, err = strconv.Atoi(payload)
- if err == nil {
- value = time.Unix(0, int64(vint)*int64(time.Millisecond))
- } else {
- err = saveerr
- }
- }
- if err == nil {
- data[fieldname] = value
- }
- case model.FieldTypeBool:
- value, err := strconv.ParseBool(payload)
- if err == nil {
- data[fieldname] = value
- }
- default:
- data[fieldname] = payload
- }
- if err != nil {
- return nil, err
- }
- return data, nil
- }
- func prepareMessage(datasource MqttDatasource, msg mqtt.Message) (model.JSONMap, error) {
- var data model.JSONMap
- data = nil
- switch strings.ToLower(datasource.Payload) {
- case "application/json":
- err := json.Unmarshal(msg.Payload(), &data)
- if err != nil {
- log.Alertf("%v", err)
- return nil, err
- }
- case "application/x.simple":
- added := false
- payload := string(msg.Payload())
- var err error
- data, err = getSimpleDataAsModel(datasource.SimpleValueAttribute, datasource.SimpleValueAttributeType, payload)
- if err != nil {
- log.Alertf("converting error on topic %s: %v", datasource.Topic, err)
- return nil, err
- }
- if !added {
- data[datasource.SimpleValueAttribute] = string(msg.Payload())
- }
- }
- return data, nil
- }
- func mqttConnectionLost(datasource MqttDatasource, c mqtt.Client, e error) {
- connected := false
- for !connected {
- err := mqttReconnect(c)
- if err != nil {
- log.Alertf("%v", err)
- time.Sleep(10 * time.Second)
- continue
- }
- connected = c.IsConnected()
- }
- subscribed := false
- for !subscribed {
- if !c.IsConnected() {
- mqttReconnect(c)
- }
- err := mqttSubscribe(datasource)
- if err != nil {
- log.Alertf("%v", err)
- time.Sleep(10 * time.Second)
- continue
- }
- subscribed = true
- }
- log.Infof("registering topic %s on %s for model %v", datasource.Topic, datasource.Broker, datasource.Destinations)
- }
- func mqttReconnect(c mqtt.Client) error {
- if !c.IsConnected() {
- token := c.Connect()
- token.Wait()
- err := token.Error()
- return err
- }
- return nil
- }
- func mqttSubscribe(datasource MqttDatasource) error {
- token := datasource.Client.Subscribe(datasource.Topic, byte(datasource.QoS), func(c mqtt.Client, m mqtt.Message) {
- mqttStoreMessage(datasource, m)
- })
- token.Wait()
- err := token.Error()
- return err
- }
- func mqttUnsubscribe(datasource MqttDatasource) error {
- token := datasource.Client.Unsubscribe(datasource.Topic)
- token.Wait()
- err := token.Error()
- return err
- }
- func getDatasinkMQTTClient(datasinkNsName string, backendname string, config model.DataSourceConfigMQTT) (MqttDatasource, error) {
- datasourceMqtt := MqttDatasource{
- Broker: config.Broker,
- Backend: backendname,
- Topic: config.Topic,
- QoS: config.QoS,
- Payload: config.Payload,
- TopicAttribute: config.AddTopicAsAttribute,
- SimpleValueAttribute: config.SimpleValueAttribute,
- SimpleValueAttributeType: config.SimpleValueAttributeType,
- }
- opts := mqtt.NewClientOptions().AddBroker(config.Broker).SetClientID(datasinkNsName)
- opts.SetKeepAlive(2 * time.Second)
- //opts.SetDefaultPublishHandler(f)
- opts.SetPingTimeout(1 * time.Second)
- opts.AutoReconnect = true
- opts.SetConnectionLostHandler(func(c mqtt.Client, err error) {
- mqttConnectionLost(datasourceMqtt, c, err)
- })
- if config.Username != "" {
- opts.CredentialsProvider = func() (string, string) {
- return config.Username, config.Password
- }
- }
- c := mqtt.NewClient(opts)
- datasourceMqtt.Client = c
- err := mqttReconnect(c)
- if err != nil {
- return MqttDatasource{}, err
- }
- mqttClients[datasinkNsName] = datasourceMqtt
- return datasourceMqtt, nil
- }
- func getDatasourceMQTTClient(clientID string, backendname string, datasource model.DataSource) (MqttDatasource, error) {
- destinationmodel := datasource.Destinations
- config := datasource.Config.(model.DataSourceConfigMQTT)
- datasourceMqtt := MqttDatasource{
- Broker: config.Broker,
- Backend: backendname,
- Destinations: destinationmodel,
- Topic: config.Topic,
- QoS: config.QoS,
- Payload: config.Payload,
- TopicAttribute: config.AddTopicAsAttribute,
- SimpleValueAttribute: config.SimpleValueAttribute,
- SimpleValueAttributeType: config.SimpleValueAttributeType,
- Rule: datasource.Rule,
- }
- opts := mqtt.NewClientOptions().AddBroker(config.Broker).SetClientID(clientID)
- opts.SetKeepAlive(2 * time.Second)
- //opts.SetDefaultPublishHandler(f)
- opts.SetPingTimeout(1 * time.Second)
- opts.AutoReconnect = true
- opts.SetConnectionLostHandler(func(c mqtt.Client, err error) {
- mqttConnectionLost(datasourceMqtt, c, err)
- })
- if config.Username != "" {
- opts.CredentialsProvider = func() (string, string) {
- return config.Username, config.Password
- }
- }
- c := mqtt.NewClient(opts)
- datasourceMqtt.Client = c
- err := mqttReconnect(c)
- if err != nil {
- return MqttDatasource{}, err
- }
- datasourceNSName := GetMQTTClientNsName(datasourcePrefix, datasourceMqtt.Backend, datasource.Name)
- mqttClients[datasourceNSName] = datasourceMqtt
- return datasourceMqtt, nil
- }
- func mqttRegisterTopic(clientID string, backendname string, datasource model.DataSource) error {
- datasourceMqtt, err := getDatasourceMQTTClient(clientID, backendname, datasource)
- if err != nil {
- return err
- }
- err = mqttSubscribe(datasourceMqtt)
- if err != nil {
- return err
- }
- log.Infof("registering topic %s on %s for model %s", datasourceMqtt.Topic, datasourceMqtt.Broker, datasource.Destinations)
- return nil
- }
- func mqttDeregisterTopic(clientID string, backendname string, datasource model.DataSource) error {
- datasourceNSName := GetMQTTClientNsName(datasourcePrefix, backendname, datasource.Name)
- datasourceMqtt, ok := mqttClients[datasourceNSName]
- if ok {
- err := mqttUnsubscribe(datasourceMqtt)
- if err != nil {
- return err
- }
- datasourceMqtt.Client.Disconnect(1000)
- delete(mqttClients, datasourceNSName)
- log.Infof("deregistering topic %s on %s for model %s", datasourceMqtt.Topic, datasourceMqtt.Broker, datasource.Destinations)
- }
- return nil
- }
- func GetMQTTClientNsName(prefix, backend, dataname string) string {
- return fmt.Sprintf("%s.%s.%s", prefix, backend, dataname)
- }
|