backend.go 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336
  1. package worker
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "time"
  7. "go.mongodb.org/mongo-driver/bson/primitive"
  8. "gopkg.in/yaml.v2"
  9. "wkla.no-ip.biz/gogs/Willie/MsgService/MessageService/dao"
  10. "wkla.no-ip.biz/gogs/Willie/MsgService/MessageService/model"
  11. )
  12. var BackendStorageRoute model.Route
  13. func init() {
  14. }
  15. //ValidateBackend validate if a backend definition is valid
  16. func ValidateBackend(be model.Backend) error {
  17. // checking backendname format
  18. // checking models
  19. // checking indexes
  20. // checking datasources
  21. // checking rules
  22. // checking destinations
  23. return nil
  24. }
  25. //PrepareBackend will mmaily prepare the configuration of the datasources and destination to the right config type
  26. func PrepareBackend(backend model.Backend) (model.Backend, error) {
  27. for i, dataSource := range backend.DataSources {
  28. configJSON, err := json.Marshal(dataSource.Config)
  29. switch dataSource.Type {
  30. case "mqtt":
  31. var config model.DataSourceConfigMQTT
  32. if err = json.Unmarshal(configJSON, &config); err != nil {
  33. return backend, errors.New(fmt.Sprintf("backend: %s, unmarshall mqtt config: %q", backend.Backendname, dataSource.Type))
  34. }
  35. backend.DataSources[i].Config = config
  36. default:
  37. return backend, errors.New(fmt.Sprintf("backend: %s, unknown datasource type: %q", backend.Backendname, dataSource.Type))
  38. }
  39. }
  40. for i, destination := range backend.Destinations {
  41. configJSON, err := json.Marshal(destination.Config)
  42. switch destination.Type {
  43. case "mqtt":
  44. var config model.DataSourceConfigMQTT
  45. if err = json.Unmarshal(configJSON, &config); err != nil {
  46. return backend, errors.New(fmt.Sprintf("backend: %s, unmarshall mqtt config: %q", backend.Backendname, destination.Type))
  47. }
  48. backend.Destinations[i].Config = config
  49. default:
  50. return backend, errors.New(fmt.Sprintf("backend: %s, unknown destination type: %q", backend.Backendname, destination.Type))
  51. }
  52. }
  53. return backend, nil
  54. }
  55. //RegisterBackend will create the needed indexes for the models and create the datasources, rules and destinations
  56. func RegisterBackend(backend model.Backend) error {
  57. // create indexes if missing
  58. models := backend.Models
  59. for _, bemodel := range models {
  60. err := createIndex(bemodel, backend.Backendname)
  61. if err != nil {
  62. log.Fatalf("%v", err)
  63. }
  64. }
  65. // creating source plugins
  66. for _, datasource := range backend.DataSources {
  67. ok := false
  68. for !ok {
  69. err := createDatasource(datasource, backend.Backendname)
  70. if err != nil {
  71. log.Fatalf("%v", err)
  72. time.Sleep(10 * time.Second)
  73. continue
  74. }
  75. ok = true
  76. }
  77. }
  78. for _, rule := range backend.Rules {
  79. ok := false
  80. for !ok {
  81. err := createRule(rule, backend.Backendname)
  82. if err != nil {
  83. log.Fatalf("%v", err)
  84. time.Sleep(10 * time.Second)
  85. continue
  86. }
  87. ok = true
  88. }
  89. }
  90. for _, destination := range backend.Destinations {
  91. ok := false
  92. for !ok {
  93. err := Destinations.Register(backend.Backendname, destination)
  94. if err != nil {
  95. log.Fatalf("%v", err)
  96. time.Sleep(10 * time.Second)
  97. continue
  98. }
  99. ok = true
  100. }
  101. }
  102. return nil
  103. }
  104. func createDatasource(datasource model.DataSource, backendname string) error {
  105. switch datasource.Type {
  106. case "mqtt":
  107. clientID := fmt.Sprintf("autorestIoT.%s.%s", backendname, datasource.Name)
  108. err := mqttRegisterTopic(clientID, backendname, datasource)
  109. if err != nil {
  110. return err
  111. }
  112. default:
  113. log.Alertf("type \"%s\" is not availble as data source type", datasource.Type)
  114. }
  115. return nil
  116. }
  117. func destroyDatasource(datasource model.DataSource, backendname string) error {
  118. switch datasource.Type {
  119. case "mqtt":
  120. clientID := fmt.Sprintf("autorestIoT.%s.%s", backendname, datasource.Name)
  121. err := mqttDeregisterTopic(clientID, backendname, datasource)
  122. if err != nil {
  123. return err
  124. }
  125. }
  126. return nil
  127. }
  128. func createRule(rule model.Rule, backendname string) error {
  129. json, err := json.Marshal(rule.Transform)
  130. if err != nil {
  131. return err
  132. }
  133. err = Rules.Register(backendname, rule.Name, string(json))
  134. if err != nil {
  135. return err
  136. }
  137. return nil
  138. }
  139. func destroyRule(rule model.Rule, backendname string) error {
  140. err := Rules.Deregister(backendname, rule.Name)
  141. if err != nil {
  142. return err
  143. }
  144. return nil
  145. }
  146. func createIndex(bemodel model.Model, backendname string) error {
  147. indexes := bemodel.Indexes
  148. // define stardard fulltext index
  149. _, ok := bemodel.GetIndex(dao.FulltextIndexName)
  150. if !ok {
  151. fulltextIndex := model.Index{
  152. Name: dao.FulltextIndexName,
  153. Fields: bemodel.GetFieldNames(),
  154. }
  155. indexes = append(indexes, fulltextIndex)
  156. }
  157. // define stardard indexes
  158. for _, field := range bemodel.Fields {
  159. _, ok := bemodel.GetIndex(dao.FulltextIndexName)
  160. if !ok {
  161. index := model.Index{
  162. Name: field.Name,
  163. Fields: []string{field.Name},
  164. }
  165. indexes = append(indexes, index)
  166. }
  167. }
  168. // Delete unused indexes
  169. route := model.Route{
  170. Backend: backendname,
  171. Model: bemodel.Name,
  172. }
  173. names, err := dao.GetStorage().GetIndexNames(route)
  174. if err != nil {
  175. return err
  176. }
  177. for _, idxName := range names {
  178. found := false
  179. for _, index := range indexes {
  180. if idxName == index.Name {
  181. found = true
  182. break
  183. }
  184. }
  185. if !found {
  186. err = dao.GetStorage().DeleteIndex(route, idxName)
  187. }
  188. }
  189. for _, index := range indexes {
  190. err := dao.GetStorage().UpdateIndex(route, index)
  191. if err != nil {
  192. return err
  193. }
  194. }
  195. return nil
  196. }
  197. //DeregisterBackend will destroy all datasources, Rules and destinations and will remove the backend from the internal backendlist.
  198. func DeregisterBackend(backendname string) error {
  199. backend, ok := model.BackendList.Get(backendname)
  200. if ok {
  201. for _, datasource := range backend.DataSources {
  202. ok := false
  203. for !ok {
  204. err := destroyDatasource(datasource, backend.Backendname)
  205. if err != nil {
  206. log.Fatalf("%v", err)
  207. return err
  208. }
  209. ok = true
  210. }
  211. }
  212. for _, rule := range backend.Rules {
  213. ok := false
  214. for !ok {
  215. err := destroyRule(rule, backend.Backendname)
  216. if err != nil {
  217. log.Fatalf("%v", err)
  218. return err
  219. }
  220. ok = true
  221. }
  222. }
  223. for _, destination := range backend.Destinations {
  224. ok := false
  225. for !ok {
  226. err := Destinations.Deregister(backend.Backendname, destination)
  227. if err != nil {
  228. log.Fatalf("%v", err)
  229. return err
  230. }
  231. ok = true
  232. }
  233. }
  234. model.BackendList.Remove(backendname)
  235. }
  236. return nil
  237. }
  238. //StoreBackend will save the backend definition to the storage. If its already there, it will be updated
  239. func StoreBackend(backend model.Backend) (string, error) {
  240. update := false
  241. id := ""
  242. query := fmt.Sprintf("{\"backendname\": \"%s\"}", backend.Backendname)
  243. count, bemodels, err := dao.GetStorage().QueryModel(BackendStorageRoute, query, 0, 10)
  244. if err != nil {
  245. log.Alertf("%v", err)
  246. return "", err
  247. }
  248. if count > 0 {
  249. update = true
  250. bemodel := model.JSONMap(bemodels[0])
  251. id = bemodel["_id"].(primitive.ObjectID).Hex()
  252. log.Infof("found backend with id: %s", id)
  253. }
  254. jsonString, err := json.Marshal(backend)
  255. if err != nil {
  256. return "", err
  257. }
  258. jsonModel := model.JSONMap{}
  259. err = yaml.Unmarshal(jsonString, &jsonModel)
  260. if err != nil {
  261. return "", err
  262. }
  263. if update {
  264. route := model.Route{
  265. Backend: BackendStorageRoute.Backend,
  266. Apikey: BackendStorageRoute.Apikey,
  267. Identity: id,
  268. Model: BackendStorageRoute.Model,
  269. SystemID: BackendStorageRoute.SystemID,
  270. Username: BackendStorageRoute.Username,
  271. }
  272. _, err = dao.GetStorage().UpdateModel(route, jsonModel)
  273. if err != nil {
  274. return "", err
  275. }
  276. log.Infof("backend updated: %s", id)
  277. } else {
  278. id, err = dao.GetStorage().CreateModel(BackendStorageRoute, jsonModel)
  279. if err != nil {
  280. return "", err
  281. }
  282. log.Infof("backend created: %s", id)
  283. }
  284. return id, nil
  285. }
  286. //DeleteBackend deleting the backend from the storage, no data will be deleted
  287. func DeleteBackend(backendname string) error {
  288. query := fmt.Sprintf("{\"backendname\": \"%s\"}", backendname)
  289. count, bemodels, err := dao.GetStorage().QueryModel(BackendStorageRoute, query, 0, 10)
  290. if err != nil {
  291. log.Alertf("%v", err)
  292. return err
  293. }
  294. if count > 0 {
  295. bemodel := model.JSONMap(bemodels[0])
  296. id := bemodel["_id"].(primitive.ObjectID).Hex()
  297. log.Infof("found backend with id: %s", id)
  298. route := model.Route{
  299. Backend: BackendStorageRoute.Backend,
  300. Apikey: BackendStorageRoute.Apikey,
  301. Identity: id,
  302. Model: BackendStorageRoute.Model,
  303. SystemID: BackendStorageRoute.SystemID,
  304. Username: BackendStorageRoute.Username,
  305. }
  306. err = dao.GetStorage().DeleteModel(route)
  307. if err != nil {
  308. return err
  309. }
  310. log.Infof("backend deleted: %s", id)
  311. }
  312. return nil
  313. }