add dbm new model and fixed json_data

This commit is contained in:
Adrian Zürcher
2025-05-12 17:12:25 +02:00
parent 5ee97416dd
commit 836a69f914
14 changed files with 260 additions and 167 deletions

View File

@@ -21,7 +21,7 @@ func (d *DBMHandler) SaveData(c *gin.Context) {
}
r := json_dataModels.NewResponse()
r.SetMessage(fmt.Sprintf("DBM %d datapoints saved in: %v", d.GetNumbersOfDatapoints(), time.Since(s)))
d.Log.Info("db.SaveData", fmt.Sprintf("DBM %d datapoints saved in: %v", d.GetNumbersOfDatapoints(), time.Since(s)))
r.SetMessage(fmt.Sprintf("DBM %d datapoints saved in: %v", d.DBM.GetNumbersOfDatapoints(), time.Since(s)))
d.Log.Info("db.SaveData", fmt.Sprintf("DBM %d datapoints saved in: %v", d.DBM.GetNumbersOfDatapoints(), time.Since(s)))
c.JSON(http.StatusOK, r)
}

View File

@@ -12,23 +12,23 @@ import (
"github.com/tecamino/tecamino-dbm/args"
"github.com/tecamino/tecamino-dbm/models"
serverModels "github.com/tecamino/tecamino-dbm/server/models"
json_dataModels "github.com/tecamino/tecamino-json_data/models"
"github.com/tecamino/tecamino-logger/logging"
)
type DBMHandler struct {
filePath string
DB models.Datapoint
Conns *serverModels.Connections
DBM *models.DBM
Conns *serverModels.Connections
sync.RWMutex
Log *logging.Logger
arg *args.Args
Log *logging.Logger
arg *args.Args
filePath string
}
// initialze new Database Manager
// it will call cli arguments
func NewDbmHandler(a *args.Args) (*DBMHandler, error) {
//initialize new logger
logger, err := logging.NewLogger("dbmServer.log", &logging.Config{
MaxSize: 1,
MaxBackup: 3,
@@ -42,12 +42,16 @@ func NewDbmHandler(a *args.Args) (*DBMHandler, error) {
}
logger.Info("main", "start dma handler")
//initialize connection map
conns := serverModels.NewConnections()
// Initialize dtabase manager handler
dmaHandler := DBMHandler{
arg: a,
filePath: fmt.Sprintf("%s/%s.dbm", a.RootDir, a.DBMFile),
DBM: models.NewDBM(conns, logger),
Log: logger,
Conns: serverModels.NewConnections(),
Conns: conns,
}
// initialize system datapoint and periodically update it
@@ -67,16 +71,20 @@ func NewDbmHandler(a *args.Args) (*DBMHandler, error) {
// read in dtaabase file content
scanner := bufio.NewScanner(f)
var line int
for scanner.Scan() {
line++
dp := models.Datapoint{}
if err = json.Unmarshal(scanner.Bytes(), &dp); err != nil {
dmaHandler.Log.Error("dmbHandler.NewDmbHandler", "error in line "+fmt.Sprint(line)+" "+scanner.Text())
dmaHandler.Log.Error("dmbHandler.NewDmbHandler", err.Error())
return nil, err
}
dmaHandler.ImportDatapoints(dp)
dmaHandler.DBM.ImportDatapoints(dp)
}
}
dmaHandler.Log.Info("dmbHandler.NewDmbHandler", fmt.Sprintf("%d datapoint imported in %v", dmaHandler.GetNumbersOfDatapoints(), time.Since(s)))
dmaHandler.Log.Info("dmbHandler.NewDmbHandler", fmt.Sprintf("%d datapoint imported in %v", dmaHandler.DBM.GetNumbersOfDatapoints(), time.Since(s)))
return &dmaHandler, nil
}
@@ -87,7 +95,7 @@ func (d *DBMHandler) SaveDb() (err error) {
}
defer f.Close()
for _, dp := range d.DB.GetAllDatapoints(0) {
for _, dp := range d.DBM.GetAllDatapoints(0) {
//exclude System datapoints from saving
//System datapoints are used for internal purposes and should not be saved in the database
if strings.Contains(dp.Path, "System:") {
@@ -110,59 +118,3 @@ func (d *DBMHandler) SaveDb() (err error) {
}
return
}
func (d *DBMHandler) CreateDatapoints(sets ...json_dataModels.Set) ([]json_dataModels.Set, error) {
if len(sets) == 0 {
return nil, nil
}
dps, err := d.DB.CreateDatapoints(d.Conns, sets...)
if err != nil {
return nil, err
}
var ndp uint64
for _, dp := range dps {
if !dp.Updated {
ndp++
}
}
dp := d.QueryDatapoints(1, "System:Datapoints")
d.UpdateDatapointValue("System:Datapoints", dp[0].GetValueUint64()+ndp)
return dps, nil
}
func (d *DBMHandler) ImportDatapoints(dps ...models.Datapoint) error {
for _, dp := range dps {
err := d.DB.ImportDatapoint(d.Conns, dp, dp.Path)
if err != nil {
return err
}
dps := d.QueryDatapoints(1, "System:Datapoints")
d.UpdateDatapointValue("System:Datapoints", dps[0].GetValueUint64()+1)
}
return nil
}
func (d *DBMHandler) UpdateDatapointValue(path string, value any) error {
return d.DB.UpdateDatapointValue(d.Conns, value, path)
}
func (d *DBMHandler) RemoveDatapoint(sets ...json_dataModels.Set) ([]json_dataModels.Set, error) {
var lsRemoved []json_dataModels.Set
for _, set := range sets {
removed, err := d.DB.RemoveDatapoint(d.Conns, set)
if err != nil {
return lsRemoved, err
}
lsRemoved = append(lsRemoved, removed)
dp := d.QueryDatapoints(1, "System:Datapoints")
d.UpdateDatapointValue("System:Datapoints", dp[0].GetValueUint64()-1)
}
return lsRemoved, nil
}
func (d *DBMHandler) QueryDatapoints(depth uint, key string) []*models.Datapoint {
return d.DB.QueryDatapoints(depth, key)
}

View File

@@ -22,7 +22,7 @@ func (d *DBMHandler) Get(req *json_dataModels.Request, id string) {
depth = get.Query.Depth
}
for _, dp := range d.DB.QueryDatapoints(depth, get.Path) {
for _, dp := range d.DBM.QueryDatapoints(depth, get.Uuid, get.Path) {
resp.AddGet(json_dataModels.Get{
Uuid: dp.Uuid,
Path: dp.Path,

View File

@@ -9,6 +9,7 @@ import (
)
func (d *DBMHandler) Json_Data(c *gin.Context) {
var err error
payload, err := json_data.ParseRequest(c.Request.Body)
if err != nil {
@@ -27,8 +28,7 @@ func (d *DBMHandler) Json_Data(c *gin.Context) {
if get.Query != nil {
depth = get.Query.Depth
}
for _, res := range d.QueryDatapoints(depth, get.Path) {
for _, res := range d.DBM.QueryDatapoints(depth, get.Uuid, get.Path) {
respond.AddGet(json_dataModels.Get{
Uuid: res.Uuid,
Path: res.Path,
@@ -42,7 +42,7 @@ func (d *DBMHandler) Json_Data(c *gin.Context) {
}
if payload.Set != nil {
respond.Set, err = d.CreateDatapoints(payload.Set...)
respond.Set, err = d.DBM.CreateDatapoints(payload.Set...)
if err != nil {
r := json_data.NewResponse()
r.SetError()
@@ -70,7 +70,7 @@ func (d *DBMHandler) Delete(c *gin.Context) {
if payload.Set != nil {
response.Set, err = d.RemoveDatapoint(payload.Set...)
response.Set, err = d.DBM.RemoveDatapoint(payload.Set...)
if err != nil {
r := json_data.NewResponse()
r.SetError()

View File

@@ -14,7 +14,7 @@ func (d *DBMHandler) Set(req *json_dataModels.Request) {
defer d.RUnlock()
for _, set := range req.Set {
for _, dp := range d.DB.QueryDatapoints(1, set.Path) {
for _, dp := range d.DBM.QueryDatapoints(1, set.Uuid, set.Path) {
dp.UpdateValue(d.Conns, set.Value)
}
}

View File

@@ -18,7 +18,7 @@ func (d *DBMHandler) Subscribe(req *json_dataModels.Request, id string) {
resp.Id = req.Id
for _, sub := range req.Subscribe {
for _, dp := range d.DB.QueryDatapoints(sub.Depth, sub.Path) {
for _, dp := range d.DBM.QueryDatapoints(sub.Depth, sub.Uuid, sub.Path) {
if sub.Driver != "" {
if dp.Drivers == nil || dp.Drivers[sub.Driver] == nil {
continue
@@ -51,9 +51,10 @@ func (d *DBMHandler) Unsubscribe(req *json_dataModels.Request, id string) {
defer d.RUnlock()
resp := json_dataModels.NewResponse()
resp.Id = req.Id
for _, sub := range req.Unsubscribe {
for _, dp := range d.DB.QueryDatapoints(sub.Depth, sub.Path) {
for _, dp := range d.DBM.QueryDatapoints(sub.Depth, sub.Uuid, sub.Path) {
if _, ok := dp.Subscriptions[id]; !ok {
continue
}
@@ -64,7 +65,6 @@ func (d *DBMHandler) Unsubscribe(req *json_dataModels.Request, id string) {
})
}
}
if err := d.Conns.SendResponse(id, resp); err != nil {
d.Log.Error("subscribe.Unsubscribe", err.Error())
}

View File

@@ -1,11 +1,8 @@
package dbm
import (
"fmt"
"runtime"
"time"
"github.com/tecamino/tecamino-dbm/utils"
"github.com/google/uuid"
"github.com/tecamino/tecamino-dbm/models"
json_dataModels "github.com/tecamino/tecamino-json_data/models"
)
@@ -14,84 +11,19 @@ func (d *DBMHandler) AddSystemDps() (err error) {
typ := json_dataModels.LOU
rights := json_dataModels.Read
_, err = d.DB.CreateDatapoints(d.Conns, json_dataModels.Set{Path: path, Value: 0, Type: typ, Rights: rights})
_, err = d.DBM.CreateDatapoints(json_dataModels.Set{Path: path, Value: 0, Type: typ, Rights: rights})
if err != nil {
d.Log.Error("dmb.Handler.AddSystemDps", err.Error())
return err
}
dp := d.QueryDatapoints(1, path)
d.UpdateDatapointValue(path, dp[0].GetValueUint64()+1)
models.SystemDatapoints = d.DBM.QueryDatapoints(1, uuid.Nil, path)[0].Uuid
if err = d.GoSystemTime(); err != nil {
if err = d.DBM.GoSystemTime(); err != nil {
return err
}
if err = d.GoSystemMemory(); err != nil {
if err = d.DBM.GoSystemMemory(); err != nil {
return err
}
return
}
func (d *DBMHandler) GetNumbersOfDatapoints() uint64 {
return utils.Uint64From(d.DB.Datapoints["System"].Datapoints["Datapoints"].Value)
}
func (d *DBMHandler) GoSystemTime() error {
path := "System:Time"
var tOld int64
typ := json_dataModels.STR
rights := json_dataModels.Read
_, err := d.DB.CreateDatapoints(d.Conns, json_dataModels.Set{Path: path, Type: typ, Rights: rights})
if err != nil {
d.Log.Error("system.GoSystemTime", err.Error())
return err
}
dp := d.QueryDatapoints(1, "System:Datapoints")
d.UpdateDatapointValue("System:Datapoints", dp[0].GetValueUint64()+1)
go func() {
for {
t := time.Now().UnixMilli()
if tOld != t {
if er := d.DB.UpdateDatapointValue(d.Conns, time.UnixMilli(t).Format("2006-01-02 15:04:05"), path); er != nil {
d.Log.Error("dmb.Handler.AddSystemDps.UpdateDatapointValue", er.Error())
}
tOld = t
}
time.Sleep(time.Second)
}
}()
return nil
}
func (d *DBMHandler) GoSystemMemory() error {
path := "System:UsedMemory"
var m runtime.MemStats
var mOld uint64
typ := json_dataModels.STR
rights := json_dataModels.Read
_, err := d.DB.CreateDatapoints(d.Conns, json_dataModels.Set{Path: path, Type: typ, Rights: rights})
if err != nil {
d.Log.Error("system.GoSystemMemory", err.Error())
return err
}
dp := d.QueryDatapoints(1, "System:Datapoints")
d.UpdateDatapointValue("System:Datapoints", dp[0].GetValueUint64()+1)
go func() {
for {
runtime.ReadMemStats(&m)
if m.Sys != mOld {
mem := fmt.Sprintf("%.2f MB", float64(m.Sys)/1024/1024)
if er := d.DB.UpdateDatapointValue(d.Conns, mem, path); er != nil {
d.Log.Error("dmb.Handler.AddSystemDps.UpdateDatapointValue", er.Error())
}
mOld = m.Sys
}
time.Sleep(time.Second)
}
}()
return nil
}

View File

@@ -72,10 +72,12 @@ func (d *DBMHandler) readJsonData(id string) (request *json_dataModels.Request,
_, reader, err := client.Conn.Reader(client.Ctx)
if err != nil {
return request, err
d.Log.Info("webSocket.readJsonData", fmt.Sprintf("WebSocket reader: %v\n", err))
return request, nil
}
b, err := io.ReadAll(reader)
if err != nil {
code := websocket.CloseStatus(err)