[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [Xen-devel] [PATCH RFC 02/59] controller: Revamp communication structure
From: George Dunlap <george.dunlap@xxxxxxxxxx> Two general-purporse channels rather than one per worker. Set up two workers, gather and collate the information at the central station. Have the worker print out a report at start-of-day, so we get timing information for the first actual report. Catch SIGINT as a shorthand testing way of managing tear-down gracefully. Signed-off-by: George Dunlap <george.dunlap@xxxxxxxxxx> --- main.go | 96 ++++++++++++++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 78 insertions(+), 18 deletions(-) diff --git a/main.go b/main.go index 6e90754..0cb9f51 100644 --- a/main.go +++ b/main.go @@ -2,13 +2,18 @@ package main import ( "fmt" + "os" "os/exec" + "os/signal" "encoding/json" "bufio" "io" + ) type Worker struct { + Id int + c *exec.Cmd stdout io.ReadCloser @@ -17,14 +22,14 @@ type Worker struct { } type WorkerReport struct { + Id int Now int Mops int MaxDelta int } -func (w *Worker) Start() (err error) { +func (w *Worker) Init() (err error) { w.c = exec.Command("../worker/worker-proc", "burnwait", "20", "20000000") - w.stdout, err = w.c.StdoutPipe() if err != nil { @@ -32,47 +37,102 @@ func (w *Worker) Start() (err error) { return } - w.c.Start() - - b, err := json.Marshal(WorkerReport{5,6,7}) - fmt.Print("Example json: ", string(b)) - return } -func (w *Worker) Wait() { - w.c.Wait() +func (w *Worker) Shutdown() { + w.c.Process.Kill() } -func (w *Worker) Process() { +func (w *Worker) Process(report chan WorkerReport, done chan bool) { + w.c.Start() + scanner := bufio.NewScanner(w.stdout) for scanner.Scan() { s := scanner.Text() - fmt.Println("Got these bytes: ", s); + //fmt.Println("Got these bytes: ", s); if w.jsonStarted { var r WorkerReport - json.Unmarshal([]byte(s), &r) - fmt.Println(r) + r.Id = w.Id + report <- r } else { if s == "START JSON" { - fmt.Println("Got token to start parsing json") + //fmt.Println("Got token to start parsing json") w.jsonStarted = true } } } + + done <- true + + w.c.Wait() +} + +const ( + USEC = 1000 + MSEC = USEC * 1000 + SEC = MSEC * 1000 +) + +type WorkerState struct { + Worker + LastReport WorkerReport +} + +func Report(ws *WorkerState, r WorkerReport) { + //fmt.Println(r) + + lr := ws.LastReport + + if (lr.Now > 0) { + time := float64(r.Now - lr.Now) / SEC + mops := r.Mops - lr.Mops + + tput := float64(mops) / time + + fmt.Printf("%d Time: %2.3f Mops: %d Tput: %4.2f\n", r.Id, time, mops, tput); + } + + ws.LastReport = r } func main() { + count := 2 + + report := make(chan WorkerReport) + done := make(chan bool) + signals := make(chan os.Signal, 1) - w:=Worker{} + signal.Notify(signals, os.Interrupt) - w.Start() + i := 0 - w.Process() + Workers := make([]WorkerState, count) + + for i = 0; i< count; i++ { + Workers[i].Id = i + + Workers[i].Init() + + go Workers[i].Process(report, done) + } - w.Wait() + for i > 0 { + select { + case r := <-report: + Report(&Workers[r.Id], r) + case <-done: + i--; + fmt.Println(i, "workers left"); + case <-signals: + fmt.Println("SIGINT receieved, shutting down workers") + for j := range Workers { + Workers[j].Shutdown() + } + } + } } -- 2.7.4 _______________________________________________ Xen-devel mailing list Xen-devel@xxxxxxxxxxxxx https://lists.xen.org/xen-devel
|
![]() |
Lists.xenproject.org is hosted with RackSpace, monitoring our |