implement new json_data model

This commit is contained in:
Adrian Zürcher
2025-04-29 08:31:06 +02:00
parent 0a137c9d86
commit 49d8d03d8a
25 changed files with 609 additions and 572 deletions

View File

@@ -2,24 +2,23 @@ package dbm
import (
"bufio"
"context"
"encoding/json"
"fmt"
"os"
"runtime"
"sync"
"time"
"github.com/tecamino/tecamino-dbm/args"
"github.com/tecamino/tecamino-dbm/models"
serverModels "github.com/tecamino/tecamino-dbm/server/models"
json_dataModels "github.com/tecamino/tecamino-json_data/models"
"github.com/tecamino/tecamino-logger/logging"
"github.com/zuadi/tecamino-dbm/args"
"github.com/zuadi/tecamino-dbm/models"
serverModels "github.com/zuadi/tecamino-dbm/server/models"
)
type DBMHandler struct {
filePath string
DB models.Datapoint
Clients serverModels.Clients
Conns *serverModels.Connections
sync.RWMutex
Log *logging.Logger
arg *args.Args
@@ -40,14 +39,14 @@ func NewDbmHandler(a *args.Args) (*DBMHandler, error) {
if err != nil {
return nil, err
}
logger.Info("main", "start dma")
logger.Info("main", "start dma handler")
// Initialize dtabase manager handler
dmaHandler := DBMHandler{
arg: a,
filePath: fmt.Sprintf("%s/%s.dma", a.RootDir, a.DMAFile),
Log: logger,
Clients: serverModels.NewClients(),
Conns: serverModels.NewConnections(),
}
// initialize system datapoint and periodically update it
@@ -56,6 +55,7 @@ func NewDbmHandler(a *args.Args) (*DBMHandler, error) {
}
// check if dtabase file exists to load data
s := time.Now()
if _, err := os.Stat(dmaHandler.filePath); err == nil {
f, err := os.Open(dmaHandler.filePath)
@@ -69,14 +69,13 @@ func NewDbmHandler(a *args.Args) (*DBMHandler, error) {
for scanner.Scan() {
dp := models.Datapoint{}
err = json.Unmarshal(scanner.Bytes(), &dp)
if err != nil {
if err = json.Unmarshal(scanner.Bytes(), &dp); err != nil {
return nil, err
}
dmaHandler.ImportDatapoints(&dp)
dmaHandler.ImportDatapoints(dp)
}
}
dmaHandler.Log.Info("dmbHandler.NewDmbHandler", fmt.Sprintf("%d datapoint imported in %v", dmaHandler.GetNumbersOfDatapoints(), time.Since(s)))
return &dmaHandler, nil
}
@@ -105,18 +104,30 @@ func (d *DBMHandler) SaveDb() (err error) {
return
}
func (d *DBMHandler) CreateDatapoint(typ models.Type, value any, right models.Rights, path string) error {
if err := d.DB.CreateDatapoint(typ, value, right, path); err != nil {
return err
func (d *DBMHandler) CreateDatapoints(sets ...json_dataModels.Set) ([]json_dataModels.Set, error) {
if len(sets) == 0 {
return nil, nil
}
dps, err := d.DB.CreateDatapoints(d.Conns, sets...)
if err != nil {
return nil, err
}
var ndp uint64
for _, dp := range dps {
if !dp.Updated {
ndp++
}
}
dp := d.QueryDatapoints(1, "System:Datapoints")
d.UpdateDatapointValue("System:Datapoints", dp[0].GetValueUint64()+1)
return nil
d.UpdateDatapointValue("System:Datapoints", dp[0].GetValueUint64()+ndp)
return dps, nil
}
func (d *DBMHandler) ImportDatapoints(dps ...*models.Datapoint) error {
func (d *DBMHandler) ImportDatapoints(dps ...models.Datapoint) error {
for _, dp := range dps {
err := d.DB.ImportDatapoint(dp, dp.Path)
err := d.DB.ImportDatapoint(d.Conns, dp, dp.Path)
if err != nil {
return err
}
@@ -127,71 +138,24 @@ func (d *DBMHandler) ImportDatapoints(dps ...*models.Datapoint) error {
}
func (d *DBMHandler) UpdateDatapointValue(path string, value any) error {
return d.DB.UpdateDatapointValue(value, path)
return d.DB.UpdateDatapointValue(d.Conns, value, path)
}
func (d *DBMHandler) RemoveDatapoint(path string) error {
if err := d.DB.RemoveDatapoint(path); err != nil {
return err
func (d *DBMHandler) RemoveDatapoint(sets ...json_dataModels.Set) ([]json_dataModels.Set, error) {
var lsRemoved []json_dataModels.Set
for _, set := range sets {
removed, err := d.DB.RemoveDatapoint(d.Conns, set)
if err != nil {
return lsRemoved, err
}
lsRemoved = append(lsRemoved, removed)
dp := d.QueryDatapoints(1, "System:Datapoints")
d.UpdateDatapointValue("System:Datapoints", dp[0].GetValueUint64()-1)
}
dp := d.QueryDatapoints(1, "System:Datapoints")
d.UpdateDatapointValue("System:Datapoints", dp[0].GetValueUint64()+1)
return nil
return lsRemoved, nil
}
func (d *DBMHandler) QueryDatapoints(depth int, key string) []*models.Datapoint {
func (d *DBMHandler) QueryDatapoints(depth uint, key string) []*models.Datapoint {
return d.DB.QueryDatapoints(depth, key)
}
func (d *DBMHandler) AddSystemDps() (err error) {
tim := "System:Time"
memory := "System:UsedMemory"
var m runtime.MemStats
var mOld uint64
var tOld int64
err = d.DB.CreateDatapoint(models.LOU, 0, models.Read, "System:Datapoints")
if err != nil {
d.Log.Error("dmb.Handler.AddSystemDps.CreateDatapoint", err.Error())
return
}
err = d.DB.CreateDatapoint(models.STR, nil, models.Read, tim)
if err != nil {
d.Log.Error("dmb.Handler.AddSystemDps.CreateDatapoint", err.Error())
return
}
err = d.DB.CreateDatapoint(models.STR, nil, models.Read, memory)
if err != nil {
d.Log.Error("dmb.Handler.AddSystemDps.CreateDatapoint", err.Error())
return
}
go func() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()
for {
t := time.Now().UnixMilli()
if tOld != t {
if er := d.DB.UpdateDatapointValue(t, tim); er != nil {
d.Log.Error("dmb.Handler.AddSystemDps.UpdateDatapointValue", er.Error())
}
d.Publish(ctx, OnChange, tim, t)
tOld = t
}
runtime.ReadMemStats(&m)
if m.Sys != mOld {
mem := fmt.Sprintf("%.2f MB", float64(m.Sys)/1024/1024)
if er := d.DB.UpdateDatapointValue(mem, memory); er != nil {
d.Log.Error("dmb.Handler.AddSystemDps.UpdateDatapointValue", er.Error())
}
d.Publish(ctx, OnChange, memory, mem)
mOld = m.Sys
}
time.Sleep(1 * time.Second)
}
}()
return
}