package dbm

import (
	"bufio"
	"context"
	"encoding/json"
	"fmt"
	"os"
	"runtime"
	"sync"
	"time"

	"github.com/tecamino/tecamino-logger/logging"
	"github.com/zuadi/tecamino-dbm/args"
	"github.com/zuadi/tecamino-dbm/models"
	serverModels "github.com/zuadi/tecamino-dbm/server/models"
)

// DBMHandler bundles the datapoint tree, the connected clients, the logger
// and the command line arguments of the database manager. It embeds a
// sync.RWMutex, so it must always be passed around by pointer.
type DBMHandler struct {
	filePath string
	DB       models.Datapoint
	Clients  serverModels.Clients
	sync.RWMutex
	Log *logging.Logger
	arg *args.Args
}

// NewDbmHandler initializes a new database manager from the given command
// line arguments.
//
// It creates the logger, registers the system datapoints (which also starts
// their background updater, see AddSystemDps) and, if the database file
// "<RootDir>/<DMAFile>.dma" exists, imports its content.
//
// It returns an error when the logger cannot be created, the system
// datapoints cannot be registered, or the database file cannot be opened,
// read or decoded.
func NewDbmHandler(a *args.Args) (*DBMHandler, error) {
	logger, err := logging.NewLogger("dbmServer.log", &logging.Config{
		MaxSize:     1,
		MaxBackup:   3,
		MaxAge:      28,
		Debug:       a.Debug,
		TerminalOut: true,
	})
	if err != nil {
		return nil, err
	}
	logger.Info("main", "start dma")

	// Initialize database manager handler.
	dmaHandler := DBMHandler{
		arg:      a,
		filePath: fmt.Sprintf("%s/%s.dma", a.RootDir, a.DMAFile),
		Log:      logger,
		Clients:  serverModels.NewClients(),
	}

	// Initialize system datapoints and periodically update them.
	if err := dmaHandler.AddSystemDps(); err != nil {
		return nil, err
	}

	// Load previously saved data, if any.
	if err := dmaHandler.loadDatabaseFile(); err != nil {
		return nil, err
	}
	return &dmaHandler, nil
}

// loadDatabaseFile imports the database file line by line; every non-empty
// line is expected to hold one JSON encoded datapoint (the format written by
// SaveDb). When the file cannot be stat'ed there is nothing to load and nil
// is returned.
func (d *DBMHandler) loadDatabaseFile() error {
	if _, err := os.Stat(d.filePath); err != nil {
		// Any stat failure is treated as "no database file to load".
		return nil
	}

	f, err := os.Open(d.filePath)
	if err != nil {
		return err
	}
	defer f.Close()

	scanner := bufio.NewScanner(f)
	for scanner.Scan() {
		line := scanner.Bytes()
		if len(line) == 0 {
			continue
		}
		dp := models.Datapoint{}
		if err := json.Unmarshal(line, &dp); err != nil {
			return err
		}
		// A datapoint that cannot be imported does not abort the start-up,
		// but the failure is no longer dropped silently.
		if err := d.ImportDatapoints(&dp); err != nil {
			d.Log.Error("dmb.Handler.loadDatabaseFile.ImportDatapoints", err.Error())
		}
	}
	// Scan returns false on read errors and on over-long lines as well as on
	// EOF; without this check a partially loaded file would go unnoticed.
	if err := scanner.Err(); err != nil {
		return fmt.Errorf("reading database file %q: %w", d.filePath, err)
	}
	return nil
}

// SaveDb writes all datapoints returned by DB.GetAllDatapoints(0) to the
// database file, one JSON document per line. An existing file is truncated.
//
// The returned error is the first marshal, write, flush or close error.
func (d *DBMHandler) SaveDb() (err error) {
	f, err := os.OpenFile(d.filePath, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0666)
	if err != nil {
		return err
	}
	// A failing Close on a written file can mean lost data, so report it
	// unless an earlier error is already being returned.
	defer func() {
		if cerr := f.Close(); cerr != nil && err == nil {
			err = cerr
		}
	}()

	// Buffer the output instead of issuing two write syscalls per datapoint.
	w := bufio.NewWriter(f)
	for _, dp := range d.DB.GetAllDatapoints(0) {
		b, merr := json.Marshal(dp)
		if merr != nil {
			return merr
		}
		if _, werr := w.Write(b); werr != nil {
			return werr
		}
		if werr := w.WriteByte('\n'); werr != nil {
			return werr
		}
	}
	return w.Flush()
}

// CreateDatapoint creates a datapoint of the given type, value and rights at
// path and increments the "System:Datapoints" counter on success.
func (d *DBMHandler) CreateDatapoint(typ models.Type, value any, right models.Rights, path string) error {
	if err := d.DB.CreateDatapoint(typ, value, right, path); err != nil {
		return err
	}
	d.adjustDatapointCount(true)
	return nil
}

// ImportDatapoints imports the given datapoints at their own Path and
// increments the "System:Datapoints" counter once per imported datapoint.
// Nil entries are skipped. It stops at and returns the first import error.
func (d *DBMHandler) ImportDatapoints(dps ...*models.Datapoint) error {
	for _, dp := range dps {
		if dp == nil {
			continue
		}
		if err := d.DB.ImportDatapoint(dp, dp.Path); err != nil {
			return err
		}
		d.adjustDatapointCount(true)
	}
	return nil
}

// UpdateDatapointValue sets the value of the datapoint at path.
func (d *DBMHandler) UpdateDatapointValue(path string, value any) error {
	return d.DB.UpdateDatapointValue(value, path)
}

// RemoveDatapoint removes the datapoint at path and decrements the
// "System:Datapoints" counter on success.
//
// NOTE(review): the counter is reduced by exactly one; if removing a path can
// drop several datapoints at once the counter will drift — confirm against
// models.Datapoint.RemoveDatapoint.
func (d *DBMHandler) RemoveDatapoint(path string) error {
	if err := d.DB.RemoveDatapoint(path); err != nil {
		return err
	}
	d.adjustDatapointCount(false)
	return nil
}

// QueryDatapoints returns the datapoints matching key up to the given depth.
func (d *DBMHandler) QueryDatapoints(depth int, key string) []*models.Datapoint {
	return d.DB.QueryDatapoints(depth, key)
}

// adjustDatapointCount increments (increment == true) or decrements the
// "System:Datapoints" counter by one. Decrementing never goes below zero.
// A missing counter datapoint or a failing update is logged instead of
// causing a panic or being dropped silently.
func (d *DBMHandler) adjustDatapointCount(increment bool) {
	dps := d.QueryDatapoints(1, "System:Datapoints")
	if len(dps) == 0 || dps[0] == nil {
		d.Log.Error("dmb.Handler.adjustDatapointCount.QueryDatapoints", "datapoint System:Datapoints not found")
		return
	}

	count := dps[0].GetValueUint64()
	switch {
	case increment:
		count++
	case count > 0:
		count--
	}

	if err := d.UpdateDatapointValue("System:Datapoints", count); err != nil {
		d.Log.Error("dmb.Handler.adjustDatapointCount.UpdateDatapointValue", err.Error())
	}
}

// AddSystemDps creates the system datapoints "System:Datapoints",
// "System:Time" and "System:UsedMemory" and starts a goroutine that, once per
// second, refreshes the time and memory datapoints and publishes an OnChange
// event for each value that changed.
//
// A failing datapoint creation is logged and returned; in that case the
// goroutine is not started. The goroutine itself has no stop condition.
func (d *DBMHandler) AddSystemDps() (err error) {
	tim := "System:Time"
	memory := "System:UsedMemory"

	err = d.DB.CreateDatapoint(models.LOU, 0, models.Read, "System:Datapoints")
	if err != nil {
		d.Log.Error("dmb.Handler.AddSystemDps.CreateDatapoint", err.Error())
		return err
	}

	err = d.DB.CreateDatapoint(models.STR, nil, models.Read, tim)
	if err != nil {
		d.Log.Error("dmb.Handler.AddSystemDps.CreateDatapoint", err.Error())
		return err
	}

	err = d.DB.CreateDatapoint(models.STR, nil, models.Read, memory)
	if err != nil {
		d.Log.Error("dmb.Handler.AddSystemDps.CreateDatapoint", err.Error())
		return err
	}

	go func() {
		// State that is only ever touched by this goroutine.
		var (
			m    runtime.MemStats
			mOld uint64
			tOld int64
		)

		refresh := func() {
			// Use a fresh context per tick: a single context created before
			// the endless loop expires after its timeout and would hand an
			// already cancelled context to every later Publish call.
			ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
			defer cancel()

			// NOTE(review): System:Time is created as models.STR but is
			// updated with an int64 millisecond timestamp — confirm intended.
			t := time.Now().UnixMilli()
			if tOld != t {
				if er := d.DB.UpdateDatapointValue(t, tim); er != nil {
					d.Log.Error("dmb.Handler.AddSystemDps.UpdateDatapointValue", er.Error())
				}
				d.Publish(ctx, OnChange, tim, t)
				tOld = t
			}

			runtime.ReadMemStats(&m)
			if m.Sys != mOld {
				mem := fmt.Sprintf("%.2f MB", float64(m.Sys)/1024/1024)
				if er := d.DB.UpdateDatapointValue(mem, memory); er != nil {
					d.Log.Error("dmb.Handler.AddSystemDps.UpdateDatapointValue", er.Error())
				}
				d.Publish(ctx, OnChange, memory, mem)
				mOld = m.Sys
			}
		}

		for {
			refresh()
			time.Sleep(1 * time.Second)
		}
	}()
	return nil
}