Compare commits
2 Commits
Author | SHA1 | Date | |
---|---|---|---|
![]() |
567d2fe03d | ||
![]() |
756985882a |
@@ -57,8 +57,11 @@ var upgrader = websocket.Upgrader{
|
|||||||
EnableCompression: false,
|
EnableCompression: false,
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewClient(ip, id string, port uint, logger *logging.Logger) (*Client, error) {
|
func NewClient(id, ip, endpoint string, port uint, logger *logging.Logger) (*Client, error) {
|
||||||
u := url.URL{Scheme: "ws", Host: fmt.Sprintf("%s:%d", ip, port), Path: "status", RawQuery: "id=" + id}
|
if endpoint == "" {
|
||||||
|
endpoint = "status"
|
||||||
|
}
|
||||||
|
u := url.URL{Scheme: "ws", Host: fmt.Sprintf("%s:%d", ip, port), Path: endpoint, RawQuery: "id=" + id}
|
||||||
|
|
||||||
c := &Client{
|
c := &Client{
|
||||||
id: id,
|
id: id,
|
||||||
|
@@ -43,7 +43,7 @@ func NewStatusClient(serviceName, ip string, port uint, debug bool) (*StatusClie
|
|||||||
}
|
}
|
||||||
|
|
||||||
// connect underlying websocket client
|
// connect underlying websocket client
|
||||||
sc.client, err = models.NewClient(ip, serviceName, port, logger)
|
sc.client, err = models.NewClient(serviceName, ip, "", port, logger)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@@ -1,10 +1,8 @@
|
|||||||
package statusServer
|
package statusServer
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"runtime"
|
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -97,56 +95,12 @@ func NewStatusServer(config models.Config) (*StatusServer, error) {
|
|||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ServeHttp starts the HTTP server and continuously publishes runtime metrics
|
// ServeHttp starts the HTTP server
|
||||||
// (memory allocation + GC count) to the "info" topic.
|
|
||||||
//
|
|
||||||
// It blocks until the HTTP server stops. Resources are cleaned up on shutdown:
|
// It blocks until the HTTP server stops. Resources are cleaned up on shutdown:
|
||||||
// PubSub is closed and a shutdown message is logged.
|
// PubSub is closed and a shutdown message is logged.
|
||||||
func (s *StatusServer) ServeHttp() error {
|
func (s *StatusServer) ServeHttp() error {
|
||||||
s.log.Debug("ServeHttp", "start publishing runtime metrics (memory + GC count)")
|
s.log.Debug("ServeHttp", "start publishing runtime metrics (memory + GC count)")
|
||||||
|
|
||||||
// create cancellable context for metrics goroutine
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
// metrics publisher goroutine
|
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
var init bool
|
|
||||||
var m runtime.MemStats
|
|
||||||
var oldM uint64
|
|
||||||
var oldGc uint32
|
|
||||||
var info struct {
|
|
||||||
Memory uint64 `json:"memory"`
|
|
||||||
GC uint32 `json:"gc"`
|
|
||||||
}
|
|
||||||
|
|
||||||
ticker := time.NewTicker(500 * time.Millisecond)
|
|
||||||
defer ticker.Stop()
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
// stop publishing if context is canceled
|
|
||||||
return
|
|
||||||
case <-ticker.C:
|
|
||||||
// read runtime memory statistics
|
|
||||||
runtime.ReadMemStats(&m)
|
|
||||||
info.GC = m.NumGC
|
|
||||||
|
|
||||||
// publish only when values changed or first run
|
|
||||||
if oldGc != m.NumGC || oldM != m.Alloc/1024 || !init {
|
|
||||||
info.Memory = m.Alloc / 1024
|
|
||||||
s.pubSub.Publish("info", info)
|
|
||||||
oldGc = m.NumGC
|
|
||||||
oldM = m.Alloc / 1024
|
|
||||||
init = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
// log startup
|
// log startup
|
||||||
s.log.Info("ServeHttp", fmt.Sprintf("start listening on %s:%d", s.Ip, s.Port))
|
s.log.Info("ServeHttp", fmt.Sprintf("start listening on %s:%d", s.Ip, s.Port))
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user