mongodao.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718
  1. package dao
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "sort"
  9. "strings"
  10. "time"
  11. "wkla.no-ip.biz/gogs/Willie/MsgService/MessageService/config"
  12. "wkla.no-ip.biz/gogs/Willie/MsgService/MessageService/internal"
  13. "wkla.no-ip.biz/gogs/Willie/MsgService/MessageService/internal/crypt"
  14. "wkla.no-ip.biz/gogs/Willie/MsgService/MessageService/internal/slicesutils"
  15. "wkla.no-ip.biz/gogs/Willie/MsgService/MessageService/logging"
  16. "wkla.no-ip.biz/gogs/Willie/MsgService/MessageService/model"
  17. "go.mongodb.org/mongo-driver/bson"
  18. "go.mongodb.org/mongo-driver/bson/primitive"
  19. "go.mongodb.org/mongo-driver/mongo"
  20. "go.mongodb.org/mongo-driver/mongo/gridfs"
  21. "go.mongodb.org/mongo-driver/mongo/options"
  22. )
  23. const timeout = 1 * time.Minute
  24. const attachmentsCollectionName = "attachments"
  25. const usersCollectionName = "users"
  26. // MongoDAO a mongodb based dao
  27. type MongoDAO struct {
  28. initialised bool
  29. client *mongo.Client
  30. mongoConfig config.MongoDB
  31. bucket gridfs.Bucket
  32. database mongo.Database
  33. ticker time.Ticker
  34. done chan bool
  35. }
  36. var log logging.ServiceLogger
  37. //InitDAO initialise the mongodb connection, build up all collections and indexes
  38. func (m *MongoDAO) InitDAO(MongoConfig config.MongoDB) {
  39. m.initialised = false
  40. m.mongoConfig = MongoConfig
  41. // uri := fmt.Sprintf("mongodb://%s:%s@%s:%d", mongoConfig.Username, mongoConfig.Password, mongoConfig.Host, mongoConfig.Port)
  42. uri := fmt.Sprintf("mongodb://%s:%d", m.mongoConfig.Host, m.mongoConfig.Port)
  43. clientOptions := options.Client()
  44. clientOptions.ApplyURI(uri)
  45. clientOptions.Auth = &options.Credential{Username: m.mongoConfig.Username, Password: m.mongoConfig.Password, AuthSource: m.mongoConfig.AuthDB}
  46. var err error
  47. m.client, err = mongo.NewClient(clientOptions)
  48. if err != nil {
  49. fmt.Printf("error: %s\n", err.Error())
  50. }
  51. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  52. defer cancel()
  53. err = m.client.Connect(ctx)
  54. if err != nil {
  55. fmt.Printf("error: %s\n", err.Error())
  56. }
  57. m.database = *m.client.Database(m.mongoConfig.Database)
  58. myBucket, err := gridfs.NewBucket(&m.database, options.GridFSBucket().SetName(attachmentsCollectionName))
  59. if err != nil {
  60. fmt.Printf("error: %s\n", err.Error())
  61. }
  62. m.bucket = *myBucket
  63. m.initialised = true
  64. }
  65. //ProcessFiles adding a file to the storage, stream like
  66. func (m *MongoDAO) ProcessFiles(RemoveCallback func(fileInfo model.FileInfo) bool) error {
  67. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  68. defer cancel()
  69. cursor, err := m.bucket.Find(bson.M{}, &options.GridFSFindOptions{})
  70. if err != nil {
  71. fmt.Printf("error: %s\n", err.Error())
  72. return err
  73. }
  74. defer cursor.Close(ctx)
  75. count := 0
  76. for cursor.Next(ctx) {
  77. var file bson.M
  78. if err = cursor.Decode(&file); err != nil {
  79. log.Alertf("%v", err)
  80. continue
  81. }
  82. metadata := file["metadata"].(bson.M)
  83. info := model.FileInfo{}
  84. info.Filename = file["filename"].(string)
  85. info.Backend = metadata["backend"].(string)
  86. info.ID = file["_id"].(primitive.ObjectID).Hex()
  87. info.UploadDate = file["uploadDate"].(primitive.DateTime).Time()
  88. RemoveCallback(info)
  89. count++
  90. }
  91. return nil
  92. }
  93. //AddFile adding a file to the storage, stream like
  94. func (m *MongoDAO) AddFile(backend string, filename string, reader io.Reader) (string, error) {
  95. uploadOpts := options.GridFSUpload().SetMetadata(bson.D{{Key: "backend", Value: backend}})
  96. fileID, err := m.bucket.UploadFromStream(filename, reader, uploadOpts)
  97. if err != nil {
  98. fmt.Printf("error: %s\n", err.Error())
  99. return "", err
  100. }
  101. log.Infof("Write file to DB was successful. File id: %s \n", fileID)
  102. id := fileID.Hex()
  103. return id, nil
  104. }
  105. //GetFilename getting the filename of an attachment from the database with the id
  106. func (m *MongoDAO) GetFilename(backend string, fileid string) (string, error) {
  107. objectID, err := primitive.ObjectIDFromHex(fileid)
  108. if err != nil {
  109. log.Alertf("%v", err)
  110. return "", err
  111. }
  112. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  113. defer cancel()
  114. cursor, err := m.bucket.Find(bson.M{"_id": objectID, "metadata.backend": backend})
  115. if err != nil {
  116. log.Alertf("%v", err)
  117. return "", err
  118. }
  119. defer cursor.Close(ctx)
  120. cursor.Next(ctx)
  121. var file bson.M
  122. var filename string
  123. if err = cursor.Decode(&file); err != nil {
  124. log.Alertf("%v", err)
  125. return "", err
  126. }
  127. filename = file["filename"].(string)
  128. return filename, nil
  129. }
  130. //GetFile getting a single file from the database with the id
  131. func (m *MongoDAO) GetFile(backend string, fileid string, stream io.Writer) error {
  132. _, err := m.GetFilename(backend, fileid)
  133. if err != nil {
  134. log.Alertf("%v", err)
  135. return err
  136. }
  137. objectID, err := primitive.ObjectIDFromHex(fileid)
  138. if err != nil {
  139. log.Alertf("%v", err)
  140. return err
  141. }
  142. _, err = m.bucket.DownloadToStream(objectID, stream)
  143. if err != nil {
  144. log.Alertf("%v", err)
  145. return err
  146. }
  147. return nil
  148. }
  149. //DeleteFile getting a single from the database with the id
  150. func (m *MongoDAO) DeleteFile(backend string, fileid string) error {
  151. _, err := m.GetFilename(backend, fileid)
  152. if err != nil {
  153. log.Alertf("%v", err)
  154. return err
  155. }
  156. objectID, err := primitive.ObjectIDFromHex(fileid)
  157. if err != nil {
  158. log.Alertf("%v", err)
  159. return err
  160. }
  161. err = m.bucket.Delete(objectID)
  162. if err != nil {
  163. log.Alertf("%v", err)
  164. return err
  165. }
  166. return nil
  167. }
  168. //GetUsers getting a list of users
  169. func (m *MongoDAO) GetUsers() ([]model.User, error) {
  170. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  171. defer cancel()
  172. usersCollection := m.database.Collection(usersCollectionName)
  173. filter := bson.M{}
  174. cursor, err := usersCollection.Find(ctx, filter)
  175. if err != nil {
  176. fmt.Printf("error: %s\n", err.Error())
  177. return nil, err
  178. }
  179. defer cursor.Close(ctx)
  180. users := make([]model.User, 0)
  181. for cursor.Next(ctx) {
  182. var user model.User
  183. if err = cursor.Decode(&user); err != nil {
  184. log.Alertf("%v", err)
  185. return nil, err
  186. }
  187. users = append(users, user)
  188. }
  189. sort.Slice(users, func(i, j int) bool {
  190. return users[i].Name < users[j].Name
  191. })
  192. return users, nil
  193. }
  194. //GetUser getting the usermodel
  195. func (m *MongoDAO) GetUser(username string) (model.User, bool) {
  196. username = strings.ToLower(username)
  197. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  198. defer cancel()
  199. usersCollection := m.database.Collection(usersCollectionName)
  200. var user model.User
  201. filter := bson.M{"name": username}
  202. err := usersCollection.FindOne(ctx, filter).Decode(&user)
  203. if err != nil {
  204. fmt.Printf("error: %s\n", err.Error())
  205. return model.User{}, false
  206. }
  207. password := user.Password
  208. hash := BuildPasswordHash(password, user.Salt)
  209. user.Password = hash
  210. return user, true
  211. }
  212. // AddUser adding a new user to the system
  213. func (m *MongoDAO) AddUser(user model.User) (model.User, error) {
  214. if user.Name == "" {
  215. return model.User{}, errors.New("username should not be empty")
  216. }
  217. user.Name = strings.ToLower(user.Name)
  218. _, ok := m.GetUser(user.Name)
  219. if ok {
  220. return model.User{}, errors.New("username already exists")
  221. }
  222. user.Salt, _ = crypt.GenerateRandomBytes(20)
  223. user.Password = BuildPasswordHash(user.Password, user.Salt)
  224. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  225. defer cancel()
  226. collection := m.database.Collection(usersCollectionName)
  227. _, err := collection.InsertOne(ctx, user)
  228. if err != nil {
  229. fmt.Printf("error: %s\n", err.Error())
  230. return model.User{}, err
  231. }
  232. return user, nil
  233. }
  234. // DeleteUser deletes one user from the system
  235. func (m *MongoDAO) DeleteUser(username string) error {
  236. if username == "" {
  237. return errors.New("username should not be empty")
  238. }
  239. username = strings.ToLower(username)
  240. _, ok := m.GetUser(username)
  241. if !ok {
  242. return errors.New("username not exists")
  243. }
  244. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  245. defer cancel()
  246. collection := m.database.Collection(usersCollectionName)
  247. filter := bson.M{"name": username}
  248. _, err := collection.DeleteOne(ctx, filter)
  249. if err != nil {
  250. fmt.Printf("error: %s\n", err.Error())
  251. return err
  252. }
  253. return nil
  254. }
  255. // ChangePWD changes the apssword of a single user
  256. func (m *MongoDAO) ChangePWD(username string, newpassword string) (model.User, error) {
  257. if username == "" {
  258. return model.User{}, errors.New("username should not be empty")
  259. }
  260. username = strings.ToLower(username)
  261. userModel, ok := m.GetUser(username)
  262. if !ok {
  263. return model.User{}, errors.New("username not registered")
  264. }
  265. newsalt, _ := crypt.GenerateRandomBytes(20)
  266. newpassword = BuildPasswordHash(newpassword, newsalt)
  267. userModel.Password = newpassword
  268. userModel.Salt = newsalt
  269. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  270. defer cancel()
  271. collection := m.database.Collection(usersCollectionName)
  272. filter := bson.M{"name": username}
  273. update := bson.D{{Key: "$set", Value: bson.D{{Key: "password", Value: newpassword}, {Key: "salt", Value: newsalt}}}}
  274. result := collection.FindOneAndUpdate(ctx, filter, update)
  275. if result.Err() != nil {
  276. fmt.Printf("error: %s\n", result.Err().Error())
  277. return model.User{}, result.Err()
  278. }
  279. return userModel, nil
  280. }
  281. //CreateModel creating a new model
  282. func (m *MongoDAO) CreateModel(route model.Route, data model.JSONMap) (string, error) {
  283. collectionName := route.GetRouteName()
  284. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  285. defer cancel()
  286. collection := m.database.Collection(collectionName)
  287. result, err := collection.InsertOne(ctx, data)
  288. if err != nil {
  289. switch v := err.(type) {
  290. case mongo.WriteException:
  291. if v.WriteErrors[0].Code == 11000 {
  292. return "", ErrUniqueIndexError
  293. }
  294. }
  295. fmt.Printf("error: %s\n", err.Error())
  296. return "", err
  297. }
  298. switch v := result.InsertedID.(type) {
  299. case primitive.ObjectID:
  300. return v.Hex(), nil
  301. }
  302. return "", ErrUnknownError
  303. }
  304. //CreateModels creates a bunch of models
  305. func (m *MongoDAO) CreateModels(route model.Route, datas []model.JSONMap) ([]string, error) {
  306. collectionName := route.GetRouteName()
  307. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  308. defer cancel()
  309. collection := m.database.Collection(collectionName)
  310. models := make([]interface{}, 0)
  311. for _, data := range datas {
  312. models = append(models, data)
  313. }
  314. result, err := collection.InsertMany(ctx, models, &options.InsertManyOptions{})
  315. if err != nil {
  316. fmt.Printf("error: %s\n", err.Error())
  317. return nil, err
  318. }
  319. modelids := make([]string, 0)
  320. for _, id := range result.InsertedIDs {
  321. switch v := id.(type) {
  322. case primitive.ObjectID:
  323. modelids = append(modelids, v.Hex())
  324. }
  325. }
  326. return modelids, nil
  327. }
  328. //GetModel getting requested model from the storage
  329. func (m *MongoDAO) GetModel(route model.Route) (model.JSONMap, error) {
  330. collectionName := route.GetRouteName()
  331. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  332. defer cancel()
  333. collection := m.database.Collection(collectionName)
  334. objectID, _ := primitive.ObjectIDFromHex(route.Identity)
  335. result := collection.FindOne(ctx, bson.M{"_id": objectID})
  336. err := result.Err()
  337. if err == mongo.ErrNoDocuments {
  338. log.Alertf("%v", err)
  339. return nil, ErrNoDocument
  340. }
  341. if err != nil {
  342. log.Alertf("%v", err)
  343. return nil, err
  344. }
  345. var bemodel model.JSONMap
  346. if err := result.Decode(&bemodel); err != nil {
  347. log.Alertf("%v", err)
  348. return nil, err
  349. }
  350. // bemodel[internal.AttributeID] = bemodel[internal.AttributeID].(primitive.ObjectID).Hex()
  351. bemodel, _ = m.convertModel(bemodel)
  352. return bemodel, nil
  353. }
  354. //CountModel counting all medelsin this collection
  355. func (m *MongoDAO) CountModel(route model.Route) (int, error) {
  356. collectionName := route.GetRouteName()
  357. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  358. defer cancel()
  359. collection := m.database.Collection(collectionName)
  360. n, err := collection.CountDocuments(ctx, bson.M{}, &options.CountOptions{})
  361. if err == mongo.ErrNoDocuments {
  362. log.Alertf("%v", err)
  363. return 0, ErrNoDocument
  364. }
  365. if err != nil {
  366. log.Alertf("%v", err)
  367. return 0, err
  368. }
  369. return int(n), nil
  370. }
  371. //QueryModel query for the right models
  372. func (m *MongoDAO) QueryModel(route model.Route, query string, offset int, limit int) (int, []model.JSONMap, error) {
  373. collectionName := route.GetRouteName()
  374. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  375. defer cancel()
  376. collection := m.database.Collection(collectionName)
  377. var queryM map[string]interface{}
  378. if query == "" {
  379. queryM = make(map[string]interface{})
  380. } else {
  381. err := json.Unmarshal([]byte(query), &queryM)
  382. if err != nil {
  383. log.Alertf("%v", err)
  384. return 0, nil, err
  385. }
  386. }
  387. queryDoc := bson.M{}
  388. for k, v := range queryM {
  389. if k == "$fulltext" {
  390. queryDoc["$text"] = bson.M{"$search": v}
  391. } else {
  392. switch v := v.(type) {
  393. // case float64:
  394. // case int:
  395. // case bool:
  396. case string:
  397. queryDoc[k] = bson.M{"$regex": v}
  398. }
  399. //queryDoc[k] = v
  400. }
  401. }
  402. //data, _ := json.Marshal(queryDoc)
  403. //log.Infof("mongoquery: %s", string(data))
  404. n, err := collection.CountDocuments(ctx, queryDoc, &options.CountOptions{Collation: &options.Collation{Locale: "en", Strength: 2}})
  405. if err != nil {
  406. log.Alertf("%v", err)
  407. return 0, nil, err
  408. }
  409. cursor, err := collection.Find(ctx, queryDoc, &options.FindOptions{Collation: &options.Collation{Locale: "en", Strength: 2}})
  410. if err != nil {
  411. log.Alertf("%v", err)
  412. return 0, nil, err
  413. }
  414. defer cursor.Close(ctx)
  415. models := make([]model.JSONMap, 0)
  416. count := 0
  417. docs := 0
  418. for cursor.Next(ctx) {
  419. if count >= offset {
  420. if (docs < limit) || (limit <= 0) {
  421. var model model.JSONMap
  422. if err = cursor.Decode(&model); err != nil {
  423. log.Alertf("%v", err)
  424. return 0, nil, err
  425. }
  426. models = append(models, model)
  427. docs++
  428. } else {
  429. break
  430. }
  431. }
  432. count++
  433. }
  434. return int(n), models, nil
  435. }
  436. //UpdateModel updateing an existing datamodel in the mongo db
  437. func (m *MongoDAO) UpdateModel(route model.Route, data model.JSONMap) (model.JSONMap, error) {
  438. collectionName := route.GetRouteName()
  439. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  440. defer cancel()
  441. collection := m.database.Collection(collectionName)
  442. objectID, _ := primitive.ObjectIDFromHex(route.Identity)
  443. delete(data, internal.AttributeID)
  444. filter := bson.M{internal.AttributeID: objectID}
  445. updateResult, err := collection.ReplaceOne(ctx, filter, data)
  446. if err != nil {
  447. return nil, err
  448. }
  449. if updateResult.ModifiedCount == 0 {
  450. return nil, ErrUnknownError
  451. }
  452. newModel, err := m.GetModel(route)
  453. if err != nil {
  454. return nil, err
  455. }
  456. return newModel, nil
  457. }
  458. //DeleteModel deleting the requested model from the storage
  459. func (m *MongoDAO) DeleteModel(route model.Route) error {
  460. collectionName := route.GetRouteName()
  461. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  462. defer cancel()
  463. collection := m.database.Collection(collectionName)
  464. objectID, _ := primitive.ObjectIDFromHex(route.Identity)
  465. filter := bson.M{internal.AttributeID: objectID}
  466. result, err := collection.DeleteOne(ctx, filter)
  467. if err != nil {
  468. return err
  469. }
  470. if result.DeletedCount != 1 {
  471. return ErrUnknownError
  472. }
  473. return nil
  474. }
  475. // GetIndexNames getting a list of index names
  476. func (m *MongoDAO) GetIndexNames(route model.Route) ([]string, error) {
  477. collection := m.database.Collection(route.GetRouteName())
  478. indexView := collection.Indexes()
  479. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  480. defer cancel()
  481. cursor, err := indexView.List(ctx)
  482. if err != nil {
  483. log.Alertf("%v", err)
  484. return nil, err
  485. }
  486. defer cursor.Close(ctx)
  487. myIndexes := make([]string, 0)
  488. for cursor.Next(ctx) {
  489. var index bson.M
  490. if err = cursor.Decode(&index); err != nil {
  491. log.Alertf("%v", err)
  492. return nil, err
  493. }
  494. name := index["name"].(string)
  495. if !strings.HasPrefix(name, "_") {
  496. if name == "$text" {
  497. name = FulltextIndexName
  498. }
  499. myIndexes = append(myIndexes, name)
  500. }
  501. }
  502. return myIndexes, nil
  503. }
  504. // DeleteIndex delete one search index
  505. func (m *MongoDAO) DeleteIndex(route model.Route, name string) error {
  506. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  507. defer cancel()
  508. collection := m.database.Collection(route.GetRouteName())
  509. _, err := collection.Indexes().DropOne(ctx, name)
  510. if err != nil {
  511. log.Alertf("%v", err)
  512. return err
  513. }
  514. return nil
  515. }
  516. //UpdateIndex create or update an index
  517. func (m *MongoDAO) UpdateIndex(route model.Route, index model.Index) error {
  518. myIndexes, err := m.GetIndexNames(route)
  519. if err != nil {
  520. log.Alertf("%v", err)
  521. return err
  522. }
  523. collection := m.database.Collection(route.GetRouteName())
  524. indexView := collection.Indexes()
  525. if !slicesutils.Contains(myIndexes, index.Name) {
  526. var indexmodel mongo.IndexModel
  527. if index.Name == FulltextIndexName {
  528. keys := bson.D{}
  529. for _, field := range index.Fields {
  530. //TODO here must be implemented the right field type
  531. keys = append(keys, primitive.E{
  532. Key: field,
  533. Value: "text",
  534. })
  535. }
  536. indexmodel = mongo.IndexModel{
  537. Keys: keys,
  538. Options: options.Index().SetName("$text"),
  539. }
  540. } else {
  541. keys := bson.D{}
  542. for _, field := range index.Fields {
  543. keys = append(keys, primitive.E{
  544. Key: field,
  545. Value: 1,
  546. })
  547. }
  548. // TODO here must be implemented the right language
  549. idxOptions := options.Index().SetName(index.Name).SetCollation(&options.Collation{Locale: "en", Strength: 2})
  550. if index.Unique {
  551. idxOptions = idxOptions.SetUnique(true)
  552. }
  553. indexmodel = mongo.IndexModel{
  554. Keys: keys,
  555. Options: idxOptions,
  556. }
  557. }
  558. // Specify the MaxTime option to limit the amount of time the operation can run on the server
  559. opts := options.CreateIndexes().SetMaxTime(2 * time.Second)
  560. name, err := indexView.CreateOne(context.TODO(), indexmodel, opts)
  561. if err != nil {
  562. log.Alertf("%v", err)
  563. return err
  564. }
  565. log.Infof("Index %s for route %s created.", name, route.GetRouteName())
  566. }
  567. return nil
  568. }
  569. // Ping pinging the mongoDao
  570. func (m *MongoDAO) Ping() error {
  571. if !m.initialised {
  572. return errors.New("mongo client not initialised")
  573. }
  574. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  575. defer cancel()
  576. return m.database.Client().Ping(ctx, nil)
  577. }
  578. // DeleteBackend dropping all data from the backend
  579. func (m *MongoDAO) DeleteBackend(backend string) error {
  580. if backend == attachmentsCollectionName || backend == usersCollectionName {
  581. return errors.New("wrong backend name")
  582. }
  583. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  584. defer cancel()
  585. collectionNames, err := m.database.ListCollectionNames(ctx, bson.D{}, &options.ListCollectionsOptions{})
  586. if err != nil {
  587. log.Alertf("%v", err)
  588. return err
  589. }
  590. for _, name := range collectionNames {
  591. if strings.HasPrefix(name, backend+".") {
  592. collection := m.database.Collection(name)
  593. err = collection.Drop(ctx)
  594. if err != nil {
  595. log.Alertf("%v", err)
  596. return err
  597. }
  598. }
  599. }
  600. filter := bson.M{"metadata.backend": backend}
  601. cursor, err := m.bucket.Find(filter, &options.GridFSFindOptions{})
  602. if err != nil {
  603. log.Alertf("%v", err)
  604. return err
  605. }
  606. defer cursor.Close(ctx)
  607. for cursor.Next(ctx) {
  608. var file bson.M
  609. if err = cursor.Decode(&file); err != nil {
  610. log.Alertf("%v", err)
  611. return err
  612. }
  613. if err = m.bucket.Delete(file["_id"]); err != nil {
  614. log.Alertf("%v", err)
  615. return err
  616. }
  617. }
  618. return nil
  619. }
  620. // DropAll dropping all data from the database
  621. func (m *MongoDAO) DropAll() {
  622. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  623. defer cancel()
  624. collectionNames, err := m.database.ListCollectionNames(ctx, bson.D{}, &options.ListCollectionsOptions{})
  625. if err != nil {
  626. log.Alertf("%v", err)
  627. return
  628. }
  629. for _, name := range collectionNames {
  630. if name != usersCollectionName {
  631. collection := m.database.Collection(name)
  632. err = collection.Drop(ctx)
  633. if err != nil {
  634. log.Alertf("%v", err)
  635. return
  636. }
  637. }
  638. }
  639. }
  640. // Stop stopping the mongodao
  641. func (m *MongoDAO) Stop() {
  642. m.ticker.Stop()
  643. m.done <- true
  644. }
  645. func (m *MongoDAO) convertModel(srcModel model.JSONMap) (model.JSONMap, error) {
  646. dstModel := srcModel
  647. for k, v := range srcModel {
  648. dstModel[k] = m.convertValue(v)
  649. }
  650. return dstModel, nil
  651. }
  652. func (m *MongoDAO) convertValue(value interface{}) interface{} {
  653. switch v := value.(type) {
  654. case primitive.ObjectID:
  655. return v.Hex()
  656. case primitive.A:
  657. items := make([]interface{}, 0)
  658. for _, itemValue := range v {
  659. items = append(items, m.convertValue(itemValue))
  660. }
  661. return items
  662. }
  663. return value
  664. }