mqtt.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423
  1. package worker
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. orglog "log"
  7. "os"
  8. "strconv"
  9. "strings"
  10. "time"
  11. mqtt "github.com/eclipse/paho.mqtt.golang"
  12. "wkla.no-ip.biz/gogs/Willie/MsgService/MessageService/model"
  13. )
  14. const processorPrefix = "processor"
  15. const datasourcePrefix = "datasource"
  16. func CreateMQTTDestinationProcessor(backend string, destination model.Destination) (DestinationProcessor, error) {
  17. processor := MQTTDestinationProcessor{
  18. Backend: backend,
  19. Destination: destination,
  20. }
  21. return &processor, nil
  22. }
  23. //MQTTDestinationProcessor does nothing
  24. type MQTTDestinationProcessor struct {
  25. Backend string
  26. Destination model.Destination
  27. }
  28. //Initialise the mqtt processor
  29. func (m *MQTTDestinationProcessor) Initialise(backend string, destination model.Destination) error {
  30. // delete an already connected mqtt processor
  31. // if m.Destination != nil {
  32. datasinkNsName := GetMQTTClientNsName(processorPrefix, backend, m.Destination.Name)
  33. datasink, ok := mqttClients[datasinkNsName]
  34. if ok {
  35. if datasink.Client != nil {
  36. datasink.Client.Disconnect(1000)
  37. }
  38. }
  39. delete(mqttClients, datasinkNsName)
  40. // }
  41. m.Destination = destination
  42. // now initilaise the new connection
  43. datasink, err := getDatasinkMQTTClient(datasinkNsName, backend, destination.Config.(model.DataSourceConfigMQTT))
  44. if err != nil {
  45. log.Alertf("%v", err)
  46. return err
  47. }
  48. return nil
  49. }
  50. //Destroy the mqtt processor
  51. func (m *MQTTDestinationProcessor) Destroy(backend string, destination model.Destination) error {
  52. datasinkNsName := GetMQTTClientNsName(processorPrefix, backend, m.Destination.Name)
  53. datasink, ok := mqttClients[datasinkNsName]
  54. if ok {
  55. if datasink.Client != nil {
  56. datasink.Client.Disconnect(1000)
  57. }
  58. delete(mqttClients, datasinkNsName)
  59. }
  60. return nil
  61. }
  62. //Store stores the message to a topic
  63. func (m *MQTTDestinationProcessor) Store(data model.JSONMap) (string, error) {
  64. datasinkNsName := GetMQTTClientNsName(processorPrefix, m.Backend, m.Destination.Name)
  65. datasink, ok := mqttClients[datasinkNsName]
  66. if !ok {
  67. err := m.Initialise(m.Backend, m.Destination)
  68. if err != nil {
  69. return "", err
  70. }
  71. datasinkNsName = GetMQTTClientNsName(processorPrefix, m.Backend, m.Destination.Name)
  72. datasink, ok = mqttClients[datasinkNsName]
  73. if !ok {
  74. return "", errors.New("destination client is not ready")
  75. }
  76. }
  77. payload, err := json.Marshal(data)
  78. if err != nil {
  79. return "", err
  80. }
  81. config := m.Destination.Config.(model.DataSourceConfigMQTT)
  82. token := datasink.Client.Publish(datasink.Topic, byte(config.QoS), false, payload)
  83. token.Wait()
  84. if token.Error() != nil {
  85. return "", token.Error()
  86. }
  87. return fmt.Sprintf("brk: %s, tpc: %s", datasink.Broker, datasink.Topic), nil
  88. }
  89. type MqttDatasource struct {
  90. Client mqtt.Client
  91. Broker string
  92. Backend string
  93. Destinations []string
  94. Topic string
  95. QoS int
  96. Payload string
  97. TopicAttribute string
  98. SimpleValueAttribute string
  99. SimpleValueAttributeType string
  100. Rule string
  101. }
  102. var mqttClients = make(map[string]MqttDatasource)
  103. //GetMQTTClients
  104. func GetMQTTClients() map[string]MqttDatasource {
  105. return mqttClients
  106. }
  107. func init() {
  108. // mqtt.DEBUG = orglog.New(os.Stdout, "DEBUG", 0)
  109. mqtt.ERROR = orglog.New(os.Stdout, "ERROR", 0)
  110. }
  111. func mqttStoreMessage(datasource MqttDatasource, msg mqtt.Message) {
  112. //log.Infof("MODEL: %s.%s TOPIC: %s MSG: %s", datasource.Backend, datasource.Model, msg.Topic(), msg.Payload())
  113. data, err := prepareMessage(datasource, msg)
  114. if err != nil {
  115. log.Alertf("%v", err)
  116. return
  117. }
  118. if datasource.TopicAttribute != "" {
  119. data[datasource.TopicAttribute] = datasource.Topic
  120. }
  121. data, err = executeTransformationrule(datasource, data)
  122. if err != nil {
  123. log.Alertf("%v", err)
  124. return
  125. }
  126. for _, destination := range datasource.Destinations {
  127. if strings.HasPrefix(destination, "$model.") {
  128. modelname := strings.TrimPrefix(destination, "$model.")
  129. route := model.Route{
  130. Backend: datasource.Backend,
  131. Model: modelname,
  132. }
  133. Store(route, data)
  134. } else {
  135. err := Destinations.Store(datasource.Backend, destination, data)
  136. if err != nil {
  137. log.Alertf("%v", err)
  138. return
  139. }
  140. }
  141. }
  142. }
  143. func executeTransformationrule(datasource MqttDatasource, data model.JSONMap) (model.JSONMap, error) {
  144. if datasource.Rule != "" {
  145. jsonBytes, err := json.Marshal(data)
  146. if err != nil {
  147. log.Alertf("%v", err)
  148. return nil, err
  149. }
  150. newJson, err := Rules.TransformJSON(datasource.Backend, datasource.Rule, jsonBytes)
  151. if err != nil {
  152. log.Alertf("%v", err)
  153. return nil, err
  154. }
  155. data = nil
  156. err = json.Unmarshal(newJson, &data)
  157. if err != nil {
  158. log.Alertf("%v", err)
  159. return nil, err
  160. }
  161. //fmt.Printf("src: %s\ndst: %s\n", string(jsonBytes), string(newJson))
  162. }
  163. return data, nil
  164. }
  165. func getSimpleDataAsModel(fieldname, fieldtype string, payload string) (model.JSONMap, error) {
  166. data := model.JSONMap{}
  167. var err error
  168. switch fieldtype {
  169. case model.FieldTypeInt:
  170. value, err := strconv.Atoi(payload)
  171. if err == nil {
  172. data[fieldname] = value
  173. }
  174. case model.FieldTypeFloat:
  175. value, err := strconv.ParseFloat(payload, 64)
  176. if err == nil {
  177. data[fieldname] = value
  178. }
  179. case model.FieldTypeTime:
  180. value, err := time.Parse(time.RFC3339, payload)
  181. if err != nil {
  182. saveerr := err
  183. var vint int
  184. vint, err = strconv.Atoi(payload)
  185. if err == nil {
  186. value = time.Unix(0, int64(vint)*int64(time.Millisecond))
  187. } else {
  188. err = saveerr
  189. }
  190. }
  191. if err == nil {
  192. data[fieldname] = value
  193. }
  194. case model.FieldTypeBool:
  195. value, err := strconv.ParseBool(payload)
  196. if err == nil {
  197. data[fieldname] = value
  198. }
  199. default:
  200. data[fieldname] = payload
  201. }
  202. if err != nil {
  203. return nil, err
  204. }
  205. return data, nil
  206. }
  207. func prepareMessage(datasource MqttDatasource, msg mqtt.Message) (model.JSONMap, error) {
  208. var data model.JSONMap
  209. data = nil
  210. switch strings.ToLower(datasource.Payload) {
  211. case "application/json":
  212. err := json.Unmarshal(msg.Payload(), &data)
  213. if err != nil {
  214. log.Alertf("%v", err)
  215. return nil, err
  216. }
  217. case "application/x.simple":
  218. added := false
  219. payload := string(msg.Payload())
  220. var err error
  221. data, err = getSimpleDataAsModel(datasource.SimpleValueAttribute, datasource.SimpleValueAttributeType, payload)
  222. if err != nil {
  223. log.Alertf("converting error on topic %s: %v", datasource.Topic, err)
  224. return nil, err
  225. }
  226. if !added {
  227. data[datasource.SimpleValueAttribute] = string(msg.Payload())
  228. }
  229. }
  230. return data, nil
  231. }
  232. func mqttConnectionLost(datasource MqttDatasource, c mqtt.Client, e error) {
  233. connected := false
  234. for !connected {
  235. err := mqttReconnect(c)
  236. if err != nil {
  237. log.Alertf("%v", err)
  238. time.Sleep(10 * time.Second)
  239. continue
  240. }
  241. connected = c.IsConnected()
  242. }
  243. subscribed := false
  244. for !subscribed {
  245. if !c.IsConnected() {
  246. mqttReconnect(c)
  247. }
  248. err := mqttSubscribe(datasource)
  249. if err != nil {
  250. log.Alertf("%v", err)
  251. time.Sleep(10 * time.Second)
  252. continue
  253. }
  254. subscribed = true
  255. }
  256. log.Infof("registering topic %s on %s for model %v", datasource.Topic, datasource.Broker, datasource.Destinations)
  257. }
  258. func mqttReconnect(c mqtt.Client) error {
  259. if !c.IsConnected() {
  260. token := c.Connect()
  261. token.Wait()
  262. err := token.Error()
  263. return err
  264. }
  265. return nil
  266. }
  267. func mqttSubscribe(datasource MqttDatasource) error {
  268. token := datasource.Client.Subscribe(datasource.Topic, byte(datasource.QoS), func(c mqtt.Client, m mqtt.Message) {
  269. mqttStoreMessage(datasource, m)
  270. })
  271. token.Wait()
  272. err := token.Error()
  273. return err
  274. }
  275. func mqttUnsubscribe(datasource MqttDatasource) error {
  276. token := datasource.Client.Unsubscribe(datasource.Topic)
  277. token.Wait()
  278. err := token.Error()
  279. return err
  280. }
  281. func getDatasinkMQTTClient(datasinkNsName string, backendname string, config model.DataSourceConfigMQTT) (MqttDatasource, error) {
  282. datasourceMqtt := MqttDatasource{
  283. Broker: config.Broker,
  284. Backend: backendname,
  285. Topic: config.Topic,
  286. QoS: config.QoS,
  287. Payload: config.Payload,
  288. TopicAttribute: config.AddTopicAsAttribute,
  289. SimpleValueAttribute: config.SimpleValueAttribute,
  290. SimpleValueAttributeType: config.SimpleValueAttributeType,
  291. }
  292. opts := mqtt.NewClientOptions().AddBroker(config.Broker).SetClientID(datasinkNsName)
  293. opts.SetKeepAlive(2 * time.Second)
  294. //opts.SetDefaultPublishHandler(f)
  295. opts.SetPingTimeout(1 * time.Second)
  296. opts.AutoReconnect = true
  297. opts.SetConnectionLostHandler(func(c mqtt.Client, err error) {
  298. mqttConnectionLost(datasourceMqtt, c, err)
  299. })
  300. if config.Username != "" {
  301. opts.CredentialsProvider = func() (string, string) {
  302. return config.Username, config.Password
  303. }
  304. }
  305. c := mqtt.NewClient(opts)
  306. datasourceMqtt.Client = c
  307. err := mqttReconnect(c)
  308. if err != nil {
  309. return MqttDatasource{}, err
  310. }
  311. mqttClients[datasinkNsName] = datasourceMqtt
  312. return datasourceMqtt, nil
  313. }
  314. func getDatasourceMQTTClient(clientID string, backendname string, datasource model.DataSource) (MqttDatasource, error) {
  315. destinationmodel := datasource.Destinations
  316. config := datasource.Config.(model.DataSourceConfigMQTT)
  317. datasourceMqtt := MqttDatasource{
  318. Broker: config.Broker,
  319. Backend: backendname,
  320. Destinations: destinationmodel,
  321. Topic: config.Topic,
  322. QoS: config.QoS,
  323. Payload: config.Payload,
  324. TopicAttribute: config.AddTopicAsAttribute,
  325. SimpleValueAttribute: config.SimpleValueAttribute,
  326. SimpleValueAttributeType: config.SimpleValueAttributeType,
  327. Rule: datasource.Rule,
  328. }
  329. opts := mqtt.NewClientOptions().AddBroker(config.Broker).SetClientID(clientID)
  330. opts.SetKeepAlive(2 * time.Second)
  331. //opts.SetDefaultPublishHandler(f)
  332. opts.SetPingTimeout(1 * time.Second)
  333. opts.AutoReconnect = true
  334. opts.SetConnectionLostHandler(func(c mqtt.Client, err error) {
  335. mqttConnectionLost(datasourceMqtt, c, err)
  336. })
  337. if config.Username != "" {
  338. opts.CredentialsProvider = func() (string, string) {
  339. return config.Username, config.Password
  340. }
  341. }
  342. c := mqtt.NewClient(opts)
  343. datasourceMqtt.Client = c
  344. err := mqttReconnect(c)
  345. if err != nil {
  346. return MqttDatasource{}, err
  347. }
  348. datasourceNSName := GetMQTTClientNsName(datasourcePrefix, datasourceMqtt.Backend, datasource.Name)
  349. mqttClients[datasourceNSName] = datasourceMqtt
  350. return datasourceMqtt, nil
  351. }
  352. func mqttRegisterTopic(clientID string, backendname string, datasource model.DataSource) error {
  353. datasourceMqtt, err := getDatasourceMQTTClient(clientID, backendname, datasource)
  354. if err != nil {
  355. return err
  356. }
  357. err = mqttSubscribe(datasourceMqtt)
  358. if err != nil {
  359. return err
  360. }
  361. log.Infof("registering topic %s on %s for model %s", datasourceMqtt.Topic, datasourceMqtt.Broker, datasource.Destinations)
  362. return nil
  363. }
  364. func mqttDeregisterTopic(clientID string, backendname string, datasource model.DataSource) error {
  365. datasourceNSName := GetMQTTClientNsName(datasourcePrefix, backendname, datasource.Name)
  366. datasourceMqtt, ok := mqttClients[datasourceNSName]
  367. if ok {
  368. err := mqttUnsubscribe(datasourceMqtt)
  369. if err != nil {
  370. return err
  371. }
  372. datasourceMqtt.Client.Disconnect(1000)
  373. delete(mqttClients, datasourceNSName)
  374. log.Infof("deregistering topic %s on %s for model %s", datasourceMqtt.Topic, datasourceMqtt.Broker, datasource.Destinations)
  375. }
  376. return nil
  377. }
  378. func GetMQTTClientNsName(prefix, backend, dataname string) string {
  379. return fmt.Sprintf("%s.%s.%s", prefix, backend, dataname)
  380. }