123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336 |
- package worker
- import (
- "encoding/json"
- "errors"
- "fmt"
- "time"
- "go.mongodb.org/mongo-driver/bson/primitive"
- "gopkg.in/yaml.v2"
- "wkla.no-ip.biz/gogs/Willie/MsgService/MessageService/dao"
- "wkla.no-ip.biz/gogs/Willie/MsgService/MessageService/model"
- )
- var BackendStorageRoute model.Route
- func init() {
- }
- //ValidateBackend validate if a backend definition is valid
- func ValidateBackend(be model.Backend) error {
- // checking backendname format
- // checking models
- // checking indexes
- // checking datasources
- // checking rules
- // checking destinations
- return nil
- }
- //PrepareBackend will mmaily prepare the configuration of the datasources and destination to the right config type
- func PrepareBackend(backend model.Backend) (model.Backend, error) {
- for i, dataSource := range backend.DataSources {
- configJSON, err := json.Marshal(dataSource.Config)
- switch dataSource.Type {
- case "mqtt":
- var config model.DataSourceConfigMQTT
- if err = json.Unmarshal(configJSON, &config); err != nil {
- return backend, errors.New(fmt.Sprintf("backend: %s, unmarshall mqtt config: %q", backend.Backendname, dataSource.Type))
- }
- backend.DataSources[i].Config = config
- default:
- return backend, errors.New(fmt.Sprintf("backend: %s, unknown datasource type: %q", backend.Backendname, dataSource.Type))
- }
- }
- for i, destination := range backend.Destinations {
- configJSON, err := json.Marshal(destination.Config)
- switch destination.Type {
- case "mqtt":
- var config model.DataSourceConfigMQTT
- if err = json.Unmarshal(configJSON, &config); err != nil {
- return backend, errors.New(fmt.Sprintf("backend: %s, unmarshall mqtt config: %q", backend.Backendname, destination.Type))
- }
- backend.Destinations[i].Config = config
- default:
- return backend, errors.New(fmt.Sprintf("backend: %s, unknown destination type: %q", backend.Backendname, destination.Type))
- }
- }
- return backend, nil
- }
- //RegisterBackend will create the needed indexes for the models and create the datasources, rules and destinations
- func RegisterBackend(backend model.Backend) error {
- // create indexes if missing
- models := backend.Models
- for _, bemodel := range models {
- err := createIndex(bemodel, backend.Backendname)
- if err != nil {
- log.Fatalf("%v", err)
- }
- }
- // creating source plugins
- for _, datasource := range backend.DataSources {
- ok := false
- for !ok {
- err := createDatasource(datasource, backend.Backendname)
- if err != nil {
- log.Fatalf("%v", err)
- time.Sleep(10 * time.Second)
- continue
- }
- ok = true
- }
- }
- for _, rule := range backend.Rules {
- ok := false
- for !ok {
- err := createRule(rule, backend.Backendname)
- if err != nil {
- log.Fatalf("%v", err)
- time.Sleep(10 * time.Second)
- continue
- }
- ok = true
- }
- }
- for _, destination := range backend.Destinations {
- ok := false
- for !ok {
- err := Destinations.Register(backend.Backendname, destination)
- if err != nil {
- log.Fatalf("%v", err)
- time.Sleep(10 * time.Second)
- continue
- }
- ok = true
- }
- }
- return nil
- }
- func createDatasource(datasource model.DataSource, backendname string) error {
- switch datasource.Type {
- case "mqtt":
- clientID := fmt.Sprintf("autorestIoT.%s.%s", backendname, datasource.Name)
- err := mqttRegisterTopic(clientID, backendname, datasource)
- if err != nil {
- return err
- }
- default:
- log.Alertf("type \"%s\" is not availble as data source type", datasource.Type)
- }
- return nil
- }
- func destroyDatasource(datasource model.DataSource, backendname string) error {
- switch datasource.Type {
- case "mqtt":
- clientID := fmt.Sprintf("autorestIoT.%s.%s", backendname, datasource.Name)
- err := mqttDeregisterTopic(clientID, backendname, datasource)
- if err != nil {
- return err
- }
- }
- return nil
- }
- func createRule(rule model.Rule, backendname string) error {
- json, err := json.Marshal(rule.Transform)
- if err != nil {
- return err
- }
- err = Rules.Register(backendname, rule.Name, string(json))
- if err != nil {
- return err
- }
- return nil
- }
- func destroyRule(rule model.Rule, backendname string) error {
- err := Rules.Deregister(backendname, rule.Name)
- if err != nil {
- return err
- }
- return nil
- }
- func createIndex(bemodel model.Model, backendname string) error {
- indexes := bemodel.Indexes
- // define stardard fulltext index
- _, ok := bemodel.GetIndex(dao.FulltextIndexName)
- if !ok {
- fulltextIndex := model.Index{
- Name: dao.FulltextIndexName,
- Fields: bemodel.GetFieldNames(),
- }
- indexes = append(indexes, fulltextIndex)
- }
- // define stardard indexes
- for _, field := range bemodel.Fields {
- _, ok := bemodel.GetIndex(dao.FulltextIndexName)
- if !ok {
- index := model.Index{
- Name: field.Name,
- Fields: []string{field.Name},
- }
- indexes = append(indexes, index)
- }
- }
- // Delete unused indexes
- route := model.Route{
- Backend: backendname,
- Model: bemodel.Name,
- }
- names, err := dao.GetStorage().GetIndexNames(route)
- if err != nil {
- return err
- }
- for _, idxName := range names {
- found := false
- for _, index := range indexes {
- if idxName == index.Name {
- found = true
- break
- }
- }
- if !found {
- err = dao.GetStorage().DeleteIndex(route, idxName)
- }
- }
- for _, index := range indexes {
- err := dao.GetStorage().UpdateIndex(route, index)
- if err != nil {
- return err
- }
- }
- return nil
- }
- //DeregisterBackend will destroy all datasources, Rules and destinations and will remove the backend from the internal backendlist.
- func DeregisterBackend(backendname string) error {
- backend, ok := model.BackendList.Get(backendname)
- if ok {
- for _, datasource := range backend.DataSources {
- ok := false
- for !ok {
- err := destroyDatasource(datasource, backend.Backendname)
- if err != nil {
- log.Fatalf("%v", err)
- return err
- }
- ok = true
- }
- }
- for _, rule := range backend.Rules {
- ok := false
- for !ok {
- err := destroyRule(rule, backend.Backendname)
- if err != nil {
- log.Fatalf("%v", err)
- return err
- }
- ok = true
- }
- }
- for _, destination := range backend.Destinations {
- ok := false
- for !ok {
- err := Destinations.Deregister(backend.Backendname, destination)
- if err != nil {
- log.Fatalf("%v", err)
- return err
- }
- ok = true
- }
- }
- model.BackendList.Remove(backendname)
- }
- return nil
- }
- //StoreBackend will save the backend definition to the storage. If its already there, it will be updated
- func StoreBackend(backend model.Backend) (string, error) {
- update := false
- id := ""
- query := fmt.Sprintf("{\"backendname\": \"%s\"}", backend.Backendname)
- count, bemodels, err := dao.GetStorage().QueryModel(BackendStorageRoute, query, 0, 10)
- if err != nil {
- log.Alertf("%v", err)
- return "", err
- }
- if count > 0 {
- update = true
- bemodel := model.JSONMap(bemodels[0])
- id = bemodel["_id"].(primitive.ObjectID).Hex()
- log.Infof("found backend with id: %s", id)
- }
- jsonString, err := json.Marshal(backend)
- if err != nil {
- return "", err
- }
- jsonModel := model.JSONMap{}
- err = yaml.Unmarshal(jsonString, &jsonModel)
- if err != nil {
- return "", err
- }
- if update {
- route := model.Route{
- Backend: BackendStorageRoute.Backend,
- Apikey: BackendStorageRoute.Apikey,
- Identity: id,
- Model: BackendStorageRoute.Model,
- SystemID: BackendStorageRoute.SystemID,
- Username: BackendStorageRoute.Username,
- }
- _, err = dao.GetStorage().UpdateModel(route, jsonModel)
- if err != nil {
- return "", err
- }
- log.Infof("backend updated: %s", id)
- } else {
- id, err = dao.GetStorage().CreateModel(BackendStorageRoute, jsonModel)
- if err != nil {
- return "", err
- }
- log.Infof("backend created: %s", id)
- }
- return id, nil
- }
- //DeleteBackend deleting the backend from the storage, no data will be deleted
- func DeleteBackend(backendname string) error {
- query := fmt.Sprintf("{\"backendname\": \"%s\"}", backendname)
- count, bemodels, err := dao.GetStorage().QueryModel(BackendStorageRoute, query, 0, 10)
- if err != nil {
- log.Alertf("%v", err)
- return err
- }
- if count > 0 {
- bemodel := model.JSONMap(bemodels[0])
- id := bemodel["_id"].(primitive.ObjectID).Hex()
- log.Infof("found backend with id: %s", id)
- route := model.Route{
- Backend: BackendStorageRoute.Backend,
- Apikey: BackendStorageRoute.Apikey,
- Identity: id,
- Model: BackendStorageRoute.Model,
- SystemID: BackendStorageRoute.SystemID,
- Username: BackendStorageRoute.Username,
- }
- err = dao.GetStorage().DeleteModel(route)
- if err != nil {
- return err
- }
- log.Infof("backend deleted: %s", id)
- }
- return nil
- }
|