123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718 |
- package dao
- import (
- "context"
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "sort"
- "strings"
- "time"
- "wkla.no-ip.biz/gogs/Willie/MsgService/MessageService/config"
- "wkla.no-ip.biz/gogs/Willie/MsgService/MessageService/internal"
- "wkla.no-ip.biz/gogs/Willie/MsgService/MessageService/internal/crypt"
- "wkla.no-ip.biz/gogs/Willie/MsgService/MessageService/internal/slicesutils"
- "wkla.no-ip.biz/gogs/Willie/MsgService/MessageService/logging"
- "wkla.no-ip.biz/gogs/Willie/MsgService/MessageService/model"
- "go.mongodb.org/mongo-driver/bson"
- "go.mongodb.org/mongo-driver/bson/primitive"
- "go.mongodb.org/mongo-driver/mongo"
- "go.mongodb.org/mongo-driver/mongo/gridfs"
- "go.mongodb.org/mongo-driver/mongo/options"
- )
- const timeout = 1 * time.Minute
- const attachmentsCollectionName = "attachments"
- const usersCollectionName = "users"
- // MongoDAO a mongodb based dao
- type MongoDAO struct {
- initialised bool
- client *mongo.Client
- mongoConfig config.MongoDB
- bucket gridfs.Bucket
- database mongo.Database
- ticker time.Ticker
- done chan bool
- }
- var log logging.ServiceLogger
- //InitDAO initialise the mongodb connection, build up all collections and indexes
- func (m *MongoDAO) InitDAO(MongoConfig config.MongoDB) {
- m.initialised = false
- m.mongoConfig = MongoConfig
- // uri := fmt.Sprintf("mongodb://%s:%s@%s:%d", mongoConfig.Username, mongoConfig.Password, mongoConfig.Host, mongoConfig.Port)
- uri := fmt.Sprintf("mongodb://%s:%d", m.mongoConfig.Host, m.mongoConfig.Port)
- clientOptions := options.Client()
- clientOptions.ApplyURI(uri)
- clientOptions.Auth = &options.Credential{Username: m.mongoConfig.Username, Password: m.mongoConfig.Password, AuthSource: m.mongoConfig.AuthDB}
- var err error
- m.client, err = mongo.NewClient(clientOptions)
- if err != nil {
- fmt.Printf("error: %s\n", err.Error())
- }
- ctx, cancel := context.WithTimeout(context.Background(), timeout)
- defer cancel()
- err = m.client.Connect(ctx)
- if err != nil {
- fmt.Printf("error: %s\n", err.Error())
- }
- m.database = *m.client.Database(m.mongoConfig.Database)
- myBucket, err := gridfs.NewBucket(&m.database, options.GridFSBucket().SetName(attachmentsCollectionName))
- if err != nil {
- fmt.Printf("error: %s\n", err.Error())
- }
- m.bucket = *myBucket
- m.initialised = true
- }
- //ProcessFiles adding a file to the storage, stream like
- func (m *MongoDAO) ProcessFiles(RemoveCallback func(fileInfo model.FileInfo) bool) error {
- ctx, cancel := context.WithTimeout(context.Background(), timeout)
- defer cancel()
- cursor, err := m.bucket.Find(bson.M{}, &options.GridFSFindOptions{})
- if err != nil {
- fmt.Printf("error: %s\n", err.Error())
- return err
- }
- defer cursor.Close(ctx)
- count := 0
- for cursor.Next(ctx) {
- var file bson.M
- if err = cursor.Decode(&file); err != nil {
- log.Alertf("%v", err)
- continue
- }
- metadata := file["metadata"].(bson.M)
- info := model.FileInfo{}
- info.Filename = file["filename"].(string)
- info.Backend = metadata["backend"].(string)
- info.ID = file["_id"].(primitive.ObjectID).Hex()
- info.UploadDate = file["uploadDate"].(primitive.DateTime).Time()
- RemoveCallback(info)
- count++
- }
- return nil
- }
- //AddFile adding a file to the storage, stream like
- func (m *MongoDAO) AddFile(backend string, filename string, reader io.Reader) (string, error) {
- uploadOpts := options.GridFSUpload().SetMetadata(bson.D{{Key: "backend", Value: backend}})
- fileID, err := m.bucket.UploadFromStream(filename, reader, uploadOpts)
- if err != nil {
- fmt.Printf("error: %s\n", err.Error())
- return "", err
- }
- log.Infof("Write file to DB was successful. File id: %s \n", fileID)
- id := fileID.Hex()
- return id, nil
- }
- //GetFilename getting the filename of an attachment from the database with the id
- func (m *MongoDAO) GetFilename(backend string, fileid string) (string, error) {
- objectID, err := primitive.ObjectIDFromHex(fileid)
- if err != nil {
- log.Alertf("%v", err)
- return "", err
- }
- ctx, cancel := context.WithTimeout(context.Background(), timeout)
- defer cancel()
- cursor, err := m.bucket.Find(bson.M{"_id": objectID, "metadata.backend": backend})
- if err != nil {
- log.Alertf("%v", err)
- return "", err
- }
- defer cursor.Close(ctx)
- cursor.Next(ctx)
- var file bson.M
- var filename string
- if err = cursor.Decode(&file); err != nil {
- log.Alertf("%v", err)
- return "", err
- }
- filename = file["filename"].(string)
- return filename, nil
- }
- //GetFile getting a single file from the database with the id
- func (m *MongoDAO) GetFile(backend string, fileid string, stream io.Writer) error {
- _, err := m.GetFilename(backend, fileid)
- if err != nil {
- log.Alertf("%v", err)
- return err
- }
- objectID, err := primitive.ObjectIDFromHex(fileid)
- if err != nil {
- log.Alertf("%v", err)
- return err
- }
- _, err = m.bucket.DownloadToStream(objectID, stream)
- if err != nil {
- log.Alertf("%v", err)
- return err
- }
- return nil
- }
- //DeleteFile getting a single from the database with the id
- func (m *MongoDAO) DeleteFile(backend string, fileid string) error {
- _, err := m.GetFilename(backend, fileid)
- if err != nil {
- log.Alertf("%v", err)
- return err
- }
- objectID, err := primitive.ObjectIDFromHex(fileid)
- if err != nil {
- log.Alertf("%v", err)
- return err
- }
- err = m.bucket.Delete(objectID)
- if err != nil {
- log.Alertf("%v", err)
- return err
- }
- return nil
- }
- //GetUsers getting a list of users
- func (m *MongoDAO) GetUsers() ([]model.User, error) {
- ctx, cancel := context.WithTimeout(context.Background(), timeout)
- defer cancel()
- usersCollection := m.database.Collection(usersCollectionName)
- filter := bson.M{}
- cursor, err := usersCollection.Find(ctx, filter)
- if err != nil {
- fmt.Printf("error: %s\n", err.Error())
- return nil, err
- }
- defer cursor.Close(ctx)
- users := make([]model.User, 0)
- for cursor.Next(ctx) {
- var user model.User
- if err = cursor.Decode(&user); err != nil {
- log.Alertf("%v", err)
- return nil, err
- }
- users = append(users, user)
- }
- sort.Slice(users, func(i, j int) bool {
- return users[i].Name < users[j].Name
- })
- return users, nil
- }
- //GetUser getting the usermodel
- func (m *MongoDAO) GetUser(username string) (model.User, bool) {
- username = strings.ToLower(username)
- ctx, cancel := context.WithTimeout(context.Background(), timeout)
- defer cancel()
- usersCollection := m.database.Collection(usersCollectionName)
- var user model.User
- filter := bson.M{"name": username}
- err := usersCollection.FindOne(ctx, filter).Decode(&user)
- if err != nil {
- fmt.Printf("error: %s\n", err.Error())
- return model.User{}, false
- }
- password := user.Password
- hash := BuildPasswordHash(password, user.Salt)
- user.Password = hash
- return user, true
- }
- // AddUser adding a new user to the system
- func (m *MongoDAO) AddUser(user model.User) (model.User, error) {
- if user.Name == "" {
- return model.User{}, errors.New("username should not be empty")
- }
- user.Name = strings.ToLower(user.Name)
- _, ok := m.GetUser(user.Name)
- if ok {
- return model.User{}, errors.New("username already exists")
- }
- user.Salt, _ = crypt.GenerateRandomBytes(20)
- user.Password = BuildPasswordHash(user.Password, user.Salt)
- ctx, cancel := context.WithTimeout(context.Background(), timeout)
- defer cancel()
- collection := m.database.Collection(usersCollectionName)
- _, err := collection.InsertOne(ctx, user)
- if err != nil {
- fmt.Printf("error: %s\n", err.Error())
- return model.User{}, err
- }
- return user, nil
- }
- // DeleteUser deletes one user from the system
- func (m *MongoDAO) DeleteUser(username string) error {
- if username == "" {
- return errors.New("username should not be empty")
- }
- username = strings.ToLower(username)
- _, ok := m.GetUser(username)
- if !ok {
- return errors.New("username not exists")
- }
- ctx, cancel := context.WithTimeout(context.Background(), timeout)
- defer cancel()
- collection := m.database.Collection(usersCollectionName)
- filter := bson.M{"name": username}
- _, err := collection.DeleteOne(ctx, filter)
- if err != nil {
- fmt.Printf("error: %s\n", err.Error())
- return err
- }
- return nil
- }
- // ChangePWD changes the apssword of a single user
- func (m *MongoDAO) ChangePWD(username string, newpassword string) (model.User, error) {
- if username == "" {
- return model.User{}, errors.New("username should not be empty")
- }
- username = strings.ToLower(username)
- userModel, ok := m.GetUser(username)
- if !ok {
- return model.User{}, errors.New("username not registered")
- }
- newsalt, _ := crypt.GenerateRandomBytes(20)
- newpassword = BuildPasswordHash(newpassword, newsalt)
- userModel.Password = newpassword
- userModel.Salt = newsalt
- ctx, cancel := context.WithTimeout(context.Background(), timeout)
- defer cancel()
- collection := m.database.Collection(usersCollectionName)
- filter := bson.M{"name": username}
- update := bson.D{{Key: "$set", Value: bson.D{{Key: "password", Value: newpassword}, {Key: "salt", Value: newsalt}}}}
- result := collection.FindOneAndUpdate(ctx, filter, update)
- if result.Err() != nil {
- fmt.Printf("error: %s\n", result.Err().Error())
- return model.User{}, result.Err()
- }
- return userModel, nil
- }
- //CreateModel creating a new model
- func (m *MongoDAO) CreateModel(route model.Route, data model.JSONMap) (string, error) {
- collectionName := route.GetRouteName()
- ctx, cancel := context.WithTimeout(context.Background(), timeout)
- defer cancel()
- collection := m.database.Collection(collectionName)
- result, err := collection.InsertOne(ctx, data)
- if err != nil {
- switch v := err.(type) {
- case mongo.WriteException:
- if v.WriteErrors[0].Code == 11000 {
- return "", ErrUniqueIndexError
- }
- }
- fmt.Printf("error: %s\n", err.Error())
- return "", err
- }
- switch v := result.InsertedID.(type) {
- case primitive.ObjectID:
- return v.Hex(), nil
- }
- return "", ErrUnknownError
- }
- //CreateModels creates a bunch of models
- func (m *MongoDAO) CreateModels(route model.Route, datas []model.JSONMap) ([]string, error) {
- collectionName := route.GetRouteName()
- ctx, cancel := context.WithTimeout(context.Background(), timeout)
- defer cancel()
- collection := m.database.Collection(collectionName)
- models := make([]interface{}, 0)
- for _, data := range datas {
- models = append(models, data)
- }
- result, err := collection.InsertMany(ctx, models, &options.InsertManyOptions{})
- if err != nil {
- fmt.Printf("error: %s\n", err.Error())
- return nil, err
- }
- modelids := make([]string, 0)
- for _, id := range result.InsertedIDs {
- switch v := id.(type) {
- case primitive.ObjectID:
- modelids = append(modelids, v.Hex())
- }
- }
- return modelids, nil
- }
- //GetModel getting requested model from the storage
- func (m *MongoDAO) GetModel(route model.Route) (model.JSONMap, error) {
- collectionName := route.GetRouteName()
- ctx, cancel := context.WithTimeout(context.Background(), timeout)
- defer cancel()
- collection := m.database.Collection(collectionName)
- objectID, _ := primitive.ObjectIDFromHex(route.Identity)
- result := collection.FindOne(ctx, bson.M{"_id": objectID})
- err := result.Err()
- if err == mongo.ErrNoDocuments {
- log.Alertf("%v", err)
- return nil, ErrNoDocument
- }
- if err != nil {
- log.Alertf("%v", err)
- return nil, err
- }
- var bemodel model.JSONMap
- if err := result.Decode(&bemodel); err != nil {
- log.Alertf("%v", err)
- return nil, err
- }
- // bemodel[internal.AttributeID] = bemodel[internal.AttributeID].(primitive.ObjectID).Hex()
- bemodel, _ = m.convertModel(bemodel)
- return bemodel, nil
- }
- //CountModel counting all medelsin this collection
- func (m *MongoDAO) CountModel(route model.Route) (int, error) {
- collectionName := route.GetRouteName()
- ctx, cancel := context.WithTimeout(context.Background(), timeout)
- defer cancel()
- collection := m.database.Collection(collectionName)
- n, err := collection.CountDocuments(ctx, bson.M{}, &options.CountOptions{})
- if err == mongo.ErrNoDocuments {
- log.Alertf("%v", err)
- return 0, ErrNoDocument
- }
- if err != nil {
- log.Alertf("%v", err)
- return 0, err
- }
- return int(n), nil
- }
- //QueryModel query for the right models
- func (m *MongoDAO) QueryModel(route model.Route, query string, offset int, limit int) (int, []model.JSONMap, error) {
- collectionName := route.GetRouteName()
- ctx, cancel := context.WithTimeout(context.Background(), timeout)
- defer cancel()
- collection := m.database.Collection(collectionName)
- var queryM map[string]interface{}
- if query == "" {
- queryM = make(map[string]interface{})
- } else {
- err := json.Unmarshal([]byte(query), &queryM)
- if err != nil {
- log.Alertf("%v", err)
- return 0, nil, err
- }
- }
- queryDoc := bson.M{}
- for k, v := range queryM {
- if k == "$fulltext" {
- queryDoc["$text"] = bson.M{"$search": v}
- } else {
- switch v := v.(type) {
- // case float64:
- // case int:
- // case bool:
- case string:
- queryDoc[k] = bson.M{"$regex": v}
- }
- //queryDoc[k] = v
- }
- }
- //data, _ := json.Marshal(queryDoc)
- //log.Infof("mongoquery: %s", string(data))
- n, err := collection.CountDocuments(ctx, queryDoc, &options.CountOptions{Collation: &options.Collation{Locale: "en", Strength: 2}})
- if err != nil {
- log.Alertf("%v", err)
- return 0, nil, err
- }
- cursor, err := collection.Find(ctx, queryDoc, &options.FindOptions{Collation: &options.Collation{Locale: "en", Strength: 2}})
- if err != nil {
- log.Alertf("%v", err)
- return 0, nil, err
- }
- defer cursor.Close(ctx)
- models := make([]model.JSONMap, 0)
- count := 0
- docs := 0
- for cursor.Next(ctx) {
- if count >= offset {
- if (docs < limit) || (limit <= 0) {
- var model model.JSONMap
- if err = cursor.Decode(&model); err != nil {
- log.Alertf("%v", err)
- return 0, nil, err
- }
- models = append(models, model)
- docs++
- } else {
- break
- }
- }
- count++
- }
- return int(n), models, nil
- }
- //UpdateModel updateing an existing datamodel in the mongo db
- func (m *MongoDAO) UpdateModel(route model.Route, data model.JSONMap) (model.JSONMap, error) {
- collectionName := route.GetRouteName()
- ctx, cancel := context.WithTimeout(context.Background(), timeout)
- defer cancel()
- collection := m.database.Collection(collectionName)
- objectID, _ := primitive.ObjectIDFromHex(route.Identity)
- delete(data, internal.AttributeID)
- filter := bson.M{internal.AttributeID: objectID}
- updateResult, err := collection.ReplaceOne(ctx, filter, data)
- if err != nil {
- return nil, err
- }
- if updateResult.ModifiedCount == 0 {
- return nil, ErrUnknownError
- }
- newModel, err := m.GetModel(route)
- if err != nil {
- return nil, err
- }
- return newModel, nil
- }
- //DeleteModel deleting the requested model from the storage
- func (m *MongoDAO) DeleteModel(route model.Route) error {
- collectionName := route.GetRouteName()
- ctx, cancel := context.WithTimeout(context.Background(), timeout)
- defer cancel()
- collection := m.database.Collection(collectionName)
- objectID, _ := primitive.ObjectIDFromHex(route.Identity)
- filter := bson.M{internal.AttributeID: objectID}
- result, err := collection.DeleteOne(ctx, filter)
- if err != nil {
- return err
- }
- if result.DeletedCount != 1 {
- return ErrUnknownError
- }
- return nil
- }
- // GetIndexNames getting a list of index names
- func (m *MongoDAO) GetIndexNames(route model.Route) ([]string, error) {
- collection := m.database.Collection(route.GetRouteName())
- indexView := collection.Indexes()
- ctx, cancel := context.WithTimeout(context.Background(), timeout)
- defer cancel()
- cursor, err := indexView.List(ctx)
- if err != nil {
- log.Alertf("%v", err)
- return nil, err
- }
- defer cursor.Close(ctx)
- myIndexes := make([]string, 0)
- for cursor.Next(ctx) {
- var index bson.M
- if err = cursor.Decode(&index); err != nil {
- log.Alertf("%v", err)
- return nil, err
- }
- name := index["name"].(string)
- if !strings.HasPrefix(name, "_") {
- if name == "$text" {
- name = FulltextIndexName
- }
- myIndexes = append(myIndexes, name)
- }
- }
- return myIndexes, nil
- }
- // DeleteIndex delete one search index
- func (m *MongoDAO) DeleteIndex(route model.Route, name string) error {
- ctx, cancel := context.WithTimeout(context.Background(), timeout)
- defer cancel()
- collection := m.database.Collection(route.GetRouteName())
- _, err := collection.Indexes().DropOne(ctx, name)
- if err != nil {
- log.Alertf("%v", err)
- return err
- }
- return nil
- }
- //UpdateIndex create or update an index
- func (m *MongoDAO) UpdateIndex(route model.Route, index model.Index) error {
- myIndexes, err := m.GetIndexNames(route)
- if err != nil {
- log.Alertf("%v", err)
- return err
- }
- collection := m.database.Collection(route.GetRouteName())
- indexView := collection.Indexes()
- if !slicesutils.Contains(myIndexes, index.Name) {
- var indexmodel mongo.IndexModel
- if index.Name == FulltextIndexName {
- keys := bson.D{}
- for _, field := range index.Fields {
- //TODO here must be implemented the right field type
- keys = append(keys, primitive.E{
- Key: field,
- Value: "text",
- })
- }
- indexmodel = mongo.IndexModel{
- Keys: keys,
- Options: options.Index().SetName("$text"),
- }
- } else {
- keys := bson.D{}
- for _, field := range index.Fields {
- keys = append(keys, primitive.E{
- Key: field,
- Value: 1,
- })
- }
- // TODO here must be implemented the right language
- idxOptions := options.Index().SetName(index.Name).SetCollation(&options.Collation{Locale: "en", Strength: 2})
- if index.Unique {
- idxOptions = idxOptions.SetUnique(true)
- }
- indexmodel = mongo.IndexModel{
- Keys: keys,
- Options: idxOptions,
- }
- }
- // Specify the MaxTime option to limit the amount of time the operation can run on the server
- opts := options.CreateIndexes().SetMaxTime(2 * time.Second)
- name, err := indexView.CreateOne(context.TODO(), indexmodel, opts)
- if err != nil {
- log.Alertf("%v", err)
- return err
- }
- log.Infof("Index %s for route %s created.", name, route.GetRouteName())
- }
- return nil
- }
- // Ping pinging the mongoDao
- func (m *MongoDAO) Ping() error {
- if !m.initialised {
- return errors.New("mongo client not initialised")
- }
- ctx, cancel := context.WithTimeout(context.Background(), timeout)
- defer cancel()
- return m.database.Client().Ping(ctx, nil)
- }
- // DeleteBackend dropping all data from the backend
- func (m *MongoDAO) DeleteBackend(backend string) error {
- if backend == attachmentsCollectionName || backend == usersCollectionName {
- return errors.New("wrong backend name")
- }
- ctx, cancel := context.WithTimeout(context.Background(), timeout)
- defer cancel()
- collectionNames, err := m.database.ListCollectionNames(ctx, bson.D{}, &options.ListCollectionsOptions{})
- if err != nil {
- log.Alertf("%v", err)
- return err
- }
- for _, name := range collectionNames {
- if strings.HasPrefix(name, backend+".") {
- collection := m.database.Collection(name)
- err = collection.Drop(ctx)
- if err != nil {
- log.Alertf("%v", err)
- return err
- }
- }
- }
- filter := bson.M{"metadata.backend": backend}
- cursor, err := m.bucket.Find(filter, &options.GridFSFindOptions{})
- if err != nil {
- log.Alertf("%v", err)
- return err
- }
- defer cursor.Close(ctx)
- for cursor.Next(ctx) {
- var file bson.M
- if err = cursor.Decode(&file); err != nil {
- log.Alertf("%v", err)
- return err
- }
- if err = m.bucket.Delete(file["_id"]); err != nil {
- log.Alertf("%v", err)
- return err
- }
- }
- return nil
- }
- // DropAll dropping all data from the database
- func (m *MongoDAO) DropAll() {
- ctx, cancel := context.WithTimeout(context.Background(), timeout)
- defer cancel()
- collectionNames, err := m.database.ListCollectionNames(ctx, bson.D{}, &options.ListCollectionsOptions{})
- if err != nil {
- log.Alertf("%v", err)
- return
- }
- for _, name := range collectionNames {
- if name != usersCollectionName {
- collection := m.database.Collection(name)
- err = collection.Drop(ctx)
- if err != nil {
- log.Alertf("%v", err)
- return
- }
- }
- }
- }
- // Stop stopping the mongodao
- func (m *MongoDAO) Stop() {
- m.ticker.Stop()
- m.done <- true
- }
- func (m *MongoDAO) convertModel(srcModel model.JSONMap) (model.JSONMap, error) {
- dstModel := srcModel
- for k, v := range srcModel {
- dstModel[k] = m.convertValue(v)
- }
- return dstModel, nil
- }
- func (m *MongoDAO) convertValue(value interface{}) interface{} {
- switch v := value.(type) {
- case primitive.ObjectID:
- return v.Hex()
- case primitive.A:
- items := make([]interface{}, 0)
- for _, itemValue := range v {
- items = append(items, m.convertValue(itemValue))
- }
- return items
- }
- return value
- }
|