|
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [Xen-devel] [PATCH RFC 02/59] controller: Revamp communication structure
From: George Dunlap <george.dunlap@xxxxxxxxxx>
Two general-purporse channels rather than one per worker.
Set up two workers, gather and collate the information at the central
station.
Have the worker print out a report at start-of-day, so we get timing
information for the first actual report.
Catch SIGINT as a shorthand testing way of managing tear-down
gracefully.
Signed-off-by: George Dunlap <george.dunlap@xxxxxxxxxx>
---
main.go | 96 ++++++++++++++++++++++++++++++++++++++++++++++++++++-------------
1 file changed, 78 insertions(+), 18 deletions(-)
diff --git a/main.go b/main.go
index 6e90754..0cb9f51 100644
--- a/main.go
+++ b/main.go
@@ -2,13 +2,18 @@ package main
import (
"fmt"
+ "os"
"os/exec"
+ "os/signal"
"encoding/json"
"bufio"
"io"
+
)
type Worker struct {
+ Id int
+
c *exec.Cmd
stdout io.ReadCloser
@@ -17,14 +22,14 @@ type Worker struct {
}
type WorkerReport struct {
+ Id int
Now int
Mops int
MaxDelta int
}
-func (w *Worker) Start() (err error) {
+func (w *Worker) Init() (err error) {
w.c = exec.Command("../worker/worker-proc", "burnwait", "20",
"20000000")
-
w.stdout, err = w.c.StdoutPipe()
if err != nil {
@@ -32,47 +37,102 @@ func (w *Worker) Start() (err error) {
return
}
- w.c.Start()
-
- b, err := json.Marshal(WorkerReport{5,6,7})
- fmt.Print("Example json: ", string(b))
-
return
}
-func (w *Worker) Wait() {
- w.c.Wait()
+func (w *Worker) Shutdown() {
+ w.c.Process.Kill()
}
-func (w *Worker) Process() {
+func (w *Worker) Process(report chan WorkerReport, done chan bool) {
+ w.c.Start()
+
scanner := bufio.NewScanner(w.stdout)
for scanner.Scan() {
s := scanner.Text()
- fmt.Println("Got these bytes: ", s);
+ //fmt.Println("Got these bytes: ", s);
if w.jsonStarted {
var r WorkerReport
-
json.Unmarshal([]byte(s), &r)
- fmt.Println(r)
+ r.Id = w.Id
+ report <- r
} else {
if s == "START JSON" {
- fmt.Println("Got token to start parsing json")
+ //fmt.Println("Got token to start parsing json")
w.jsonStarted = true
}
}
}
+
+ done <- true
+
+ w.c.Wait()
+}
+
+const (
+ USEC = 1000
+ MSEC = USEC * 1000
+ SEC = MSEC * 1000
+)
+
+type WorkerState struct {
+ Worker
+ LastReport WorkerReport
+}
+
+func Report(ws *WorkerState, r WorkerReport) {
+ //fmt.Println(r)
+
+ lr := ws.LastReport
+
+ if (lr.Now > 0) {
+ time := float64(r.Now - lr.Now) / SEC
+ mops := r.Mops - lr.Mops
+
+ tput := float64(mops) / time
+
+ fmt.Printf("%d Time: %2.3f Mops: %d Tput: %4.2f\n", r.Id, time,
mops, tput);
+ }
+
+ ws.LastReport = r
}
func main() {
+ count := 2
+
+ report := make(chan WorkerReport)
+ done := make(chan bool)
+ signals := make(chan os.Signal, 1)
- w:=Worker{}
+ signal.Notify(signals, os.Interrupt)
- w.Start()
+ i := 0
- w.Process()
+ Workers := make([]WorkerState, count)
+
+ for i = 0; i< count; i++ {
+ Workers[i].Id = i
+
+ Workers[i].Init()
+
+ go Workers[i].Process(report, done)
+ }
- w.Wait()
+ for i > 0 {
+ select {
+ case r := <-report:
+ Report(&Workers[r.Id], r)
+ case <-done:
+ i--;
+ fmt.Println(i, "workers left");
+ case <-signals:
+ fmt.Println("SIGINT receieved, shutting down workers")
+ for j := range Workers {
+ Workers[j].Shutdown()
+ }
+ }
+ }
}
--
2.7.4
_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxx
https://lists.xen.org/xen-devel
|
![]() |
Lists.xenproject.org is hosted with RackSpace, monitoring our |