blob: ab90825bbb41d679eecd80a048eda46523170285 [file] [log] [blame]
// Copyright 2015 Google Inc. All rights reserved
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
Fumitoshi Ukai744bb2b2015-06-25 00:10:52 +090015package kati
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +090016
17import (
18 "bufio"
19 "encoding/binary"
20 "fmt"
21 "io"
22 "os/exec"
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +090023)
24
// btoi converts a bool to an int: true becomes 1 and false becomes 0.
func btoi(b bool) int {
	if !b {
		return 0
	}
	return 1
}
31
// sendMsg writes all of data to w, issuing further Write calls for whatever
// a previous call left unwritten.
//
// If a Write reports io.EOF, sendMsg gives up silently and returns; any
// other write error causes a panic with that error.
func sendMsg(w io.Writer, data []byte) {
	for remaining := data; len(remaining) > 0; {
		n, err := w.Write(remaining)
		switch {
		case err == io.EOF:
			return
		case err != nil:
			panic(err)
		}
		// Drop the bytes that were accepted and retry with the rest.
		remaining = remaining[n:]
	}
}
44
// sendInt writes i to w as a 32-bit signed little-endian integer.
//
// The wire format only has room for an int32, so a value that does not fit
// causes a panic instead of being silently truncated (a truncated length
// prefix would desynchronize the stream for the reader).
//
// Write failures are handled the same way sendMsg in this file handles
// them: io.EOF is ignored, any other error causes a panic with that error.
func sendInt(w io.Writer, i int) {
	v := int32(i)
	if int(v) != i {
		panic(fmt.Errorf("sendInt: value %d does not fit in int32", i))
	}
	err := binary.Write(w, binary.LittleEndian, v)
	if err == nil || err == io.EOF {
		return
	}
	panic(err)
}
49
50func sendString(w io.Writer, s string) {
51 sendInt(w, len(s))
52 sendMsg(w, []byte(s))
53}
54
55func sendRunners(w io.Writer, runners []runner) {
56 sendInt(w, len(runners))
57 for _, r := range runners {
58 sendString(w, r.output)
59 sendString(w, r.cmd)
60 sendString(w, r.shell)
61 sendInt(w, btoi(r.echo))
62 sendInt(w, btoi(r.ignoreError))
63 }
64}
65
// paraResult is one result record decoded by recvResult.
//
// The fields are declared in the order recvResult reads them from the
// stream: three strings (output, stdout, stderr) followed by two integers
// (status, signal).
type paraResult struct {
	output, stdout, stderr string
	status, signal         int
}
73
// recvInt reads a 32-bit signed little-endian integer from r and returns it
// widened to int, together with any error reported by binary.Read.
func recvInt(r *bufio.Reader) (int, error) {
	var raw int32
	if err := binary.Read(r, binary.LittleEndian, &raw); err != nil {
		return int(raw), err
	}
	return int(raw), nil
}
79
80func recvString(r *bufio.Reader) (string, error) {
81 l, err := recvInt(r)
82 if err != nil {
83 return "", err
84 }
85 buf := make([]byte, l)
86 read := 0
87 for read < len(buf) {
88 r, err := r.Read(buf[read:])
89 if err != nil {
90 return "", err
91 }
92 read += r
93 }
94 return string(buf), nil
95}
96
Fumitoshi Ukaiff4e5802015-06-25 13:12:26 +090097func recvResult(r *bufio.Reader) (*paraResult, error) {
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +090098 output, err := recvString(r)
99 if err != nil {
100 return nil, err
101 }
102 stdout, err := recvString(r)
103 if err != nil {
104 return nil, err
105 }
106 stderr, err := recvString(r)
107 if err != nil {
108 return nil, err
109 }
110 status, err := recvInt(r)
111 if err != nil {
112 return nil, err
113 }
Shinichiro Hamajia6808422015-05-13 18:00:50 +0900114 signal, err := recvInt(r)
115 if err != nil {
116 return nil, err
117 }
Fumitoshi Ukaiff4e5802015-06-25 13:12:26 +0900118 return &paraResult{
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +0900119 output: output,
120 stdout: stdout,
121 stderr: stderr,
122 status: status,
Shinichiro Hamajia6808422015-05-13 18:00:50 +0900123 signal: signal,
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +0900124 }, nil
125}
126
Fumitoshi Ukaiff4e5802015-06-25 13:12:26 +0900127type paraWorker struct {
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +0900128 para *exec.Cmd
Fumitoshi Ukaiff4e5802015-06-25 13:12:26 +0900129 paraChan chan *paraResult
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +0900130 stdin io.WriteCloser
131 stdout *bufio.Reader
132 doneChan chan bool
133}
134
Fumitoshi Ukaiff4e5802015-06-25 13:12:26 +0900135func newParaWorker(paraChan chan *paraResult, numJobs int, paraPath string) *paraWorker {
Fumitoshi Ukai744bb2b2015-06-25 00:10:52 +0900136 para := exec.Command(paraPath, fmt.Sprintf("-j%d", numJobs), "--kati")
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +0900137 stdin, err := para.StdinPipe()
138 if err != nil {
139 panic(err)
140 }
141 stdout, err := para.StdoutPipe()
142 if err != nil {
143 panic(err)
144 }
145 err = para.Start()
146 if err != nil {
147 panic(err)
148 }
Fumitoshi Ukaiff4e5802015-06-25 13:12:26 +0900149 return &paraWorker{
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +0900150 para: para,
151 paraChan: paraChan,
152 stdin: stdin,
153 stdout: bufio.NewReader(stdout),
154 doneChan: make(chan bool),
155 }
156}
157
Fumitoshi Ukaiff4e5802015-06-25 13:12:26 +0900158func (para *paraWorker) Run() {
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +0900159 for {
160 r, err := recvResult(para.stdout)
161 if err == io.EOF {
162 break
163 }
164 if err != nil {
165 panic(err)
166 }
167 para.paraChan <- r
168 }
169 para.para.Process.Kill()
170 para.para.Process.Wait()
171 para.doneChan <- true
172}
173
Fumitoshi Ukaiff4e5802015-06-25 13:12:26 +0900174func (para *paraWorker) Wait() {
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +0900175 para.stdin.Close()
176 <-para.doneChan
177}
178
Fumitoshi Ukaiff4e5802015-06-25 13:12:26 +0900179func (para *paraWorker) RunCommand(runners []runner) {
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +0900180 sendRunners(para.stdin, runners)
181}