blob: 6993dfb46772657918d87439f21a2103498c6eee [file] [log] [blame]
Shinichiro Hamajib69bf8a2015-06-10 14:52:06 +09001// Copyright 2015 Google Inc. All rights reserved
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
Fumitoshi Ukai744bb2b2015-06-25 00:10:52 +090015package kati
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +090016
17import (
18 "bufio"
19 "encoding/binary"
20 "fmt"
21 "io"
22 "os/exec"
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +090023)
24
// btoi converts a boolean to its integer wire form: 1 for true, 0 for false.
func btoi(b bool) int {
	n := 0
	if b {
		n = 1
	}
	return n
}
31
// paraConn is the framed, bidirectional byte stream used to talk to the
// para child process.
//
// It uses a sticky-error scheme: the first failure of any send* or recv*
// method is stored in err, and later calls return that error without
// touching the underlying streams.
type paraConn struct {
	// w is the write side of the connection (the child's stdin pipe when
	// the connection is built by newParaWorker).
	w io.WriteCloser

	// r is the buffered read side of the connection (wrapping the child's
	// stdout pipe when the connection is built by newParaWorker).
	r *bufio.Reader

	// err is the first error recorded by a send or receive operation.
	err error
}
37
Fumitoshi Ukai65c72332015-06-26 21:32:50 +090038func (c *paraConn) sendMsg(data []byte) error {
39 if c.err != nil {
40 return c.err
41 }
42 _, err := c.w.Write(data)
43 c.err = err
44 return err
45}
46
47func (c *paraConn) sendInt(i int) error {
48 if c.err != nil {
49 return c.err
50 }
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +090051 v := int32(i)
Fumitoshi Ukai65c72332015-06-26 21:32:50 +090052 c.err = binary.Write(c.w, binary.LittleEndian, &v)
53 return c.err
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +090054}
55
Fumitoshi Ukai65c72332015-06-26 21:32:50 +090056func (c *paraConn) sendString(s string) error {
57 c.sendInt(len(s))
58 c.sendMsg([]byte(s))
59 return c.err
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +090060}
61
Fumitoshi Ukai65c72332015-06-26 21:32:50 +090062func (c *paraConn) sendRunners(runners []runner) error {
63 c.sendInt(len(runners))
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +090064 for _, r := range runners {
Fumitoshi Ukai65c72332015-06-26 21:32:50 +090065 c.sendString(r.output)
66 c.sendString(r.cmd)
67 c.sendString(r.shell)
68 c.sendInt(btoi(r.echo))
69 c.sendInt(btoi(r.ignoreError))
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +090070 }
Fumitoshi Ukai65c72332015-06-26 21:32:50 +090071 return c.err
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +090072}
73
// paraResult is one result record received from the para child process.
//
// recvResult fills the fields in declaration order: three length-prefixed
// strings followed by two integers.
type paraResult struct {
	// output is the first string of the record; presumably the output
	// (target) name of the runner this result belongs to — confirm
	// against the para protocol.
	output string

	// stdout and stderr are the second and third strings of the record;
	// presumably the captured output streams of the command.
	stdout string
	stderr string

	// status and signal are the two trailing integers of the record;
	// presumably the exit status and terminating signal of the command.
	status int
	signal int
}
81
Fumitoshi Ukai65c72332015-06-26 21:32:50 +090082func (c *paraConn) recvInt() (int, error) {
83 if c.err != nil {
84 return 0, c.err
85 }
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +090086 var v int32
Fumitoshi Ukai65c72332015-06-26 21:32:50 +090087 c.err = binary.Read(c.r, binary.LittleEndian, &v)
88 return int(v), c.err
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +090089}
90
Fumitoshi Ukai65c72332015-06-26 21:32:50 +090091func (c *paraConn) recvString() (string, error) {
92 l, err := c.recvInt()
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +090093 if err != nil {
Fumitoshi Ukai65c72332015-06-26 21:32:50 +090094 c.err = err
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +090095 return "", err
96 }
97 buf := make([]byte, l)
Fumitoshi Ukai65c72332015-06-26 21:32:50 +090098 _, c.err = io.ReadFull(c.r, buf)
99 if c.err != nil {
100 return "", c.err
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +0900101 }
102 return string(buf), nil
103}
104
Fumitoshi Ukai65c72332015-06-26 21:32:50 +0900105func (c *paraConn) recvResult() (*paraResult, error) {
106 output, _ := c.recvString()
107 stdout, _ := c.recvString()
108 stderr, _ := c.recvString()
109 status, _ := c.recvInt()
110 signal, _ := c.recvInt()
111 if c.err != nil {
112 return nil, c.err
Shinichiro Hamajia6808422015-05-13 18:00:50 +0900113 }
Fumitoshi Ukaiff4e5802015-06-25 13:12:26 +0900114 return &paraResult{
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +0900115 output: output,
116 stdout: stdout,
117 stderr: stderr,
118 status: status,
Shinichiro Hamajia6808422015-05-13 18:00:50 +0900119 signal: signal,
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +0900120 }, nil
121}
122
Fumitoshi Ukaiff4e5802015-06-25 13:12:26 +0900123type paraWorker struct {
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +0900124 para *exec.Cmd
Fumitoshi Ukaiff4e5802015-06-25 13:12:26 +0900125 paraChan chan *paraResult
Fumitoshi Ukai65c72332015-06-26 21:32:50 +0900126 c *paraConn
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +0900127 doneChan chan bool
128}
129
Fumitoshi Ukai65c72332015-06-26 21:32:50 +0900130func newParaWorker(paraChan chan *paraResult, numJobs int, paraPath string) (*paraWorker, error) {
Fumitoshi Ukai744bb2b2015-06-25 00:10:52 +0900131 para := exec.Command(paraPath, fmt.Sprintf("-j%d", numJobs), "--kati")
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +0900132 stdin, err := para.StdinPipe()
133 if err != nil {
Fumitoshi Ukai65c72332015-06-26 21:32:50 +0900134 return nil, err
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +0900135 }
136 stdout, err := para.StdoutPipe()
137 if err != nil {
Fumitoshi Ukai65c72332015-06-26 21:32:50 +0900138 return nil, err
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +0900139 }
140 err = para.Start()
141 if err != nil {
Fumitoshi Ukai65c72332015-06-26 21:32:50 +0900142 return nil, err
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +0900143 }
Fumitoshi Ukaiff4e5802015-06-25 13:12:26 +0900144 return &paraWorker{
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +0900145 para: para,
146 paraChan: paraChan,
Fumitoshi Ukai65c72332015-06-26 21:32:50 +0900147 c: &paraConn{
148 w: stdin,
149 r: bufio.NewReader(stdout),
150 },
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +0900151 doneChan: make(chan bool),
Fumitoshi Ukai65c72332015-06-26 21:32:50 +0900152 }, nil
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +0900153}
154
Fumitoshi Ukai65c72332015-06-26 21:32:50 +0900155func (para *paraWorker) Run() error {
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +0900156 for {
Fumitoshi Ukai65c72332015-06-26 21:32:50 +0900157 r, err := para.c.recvResult()
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +0900158 if err != nil {
Fumitoshi Ukai65c72332015-06-26 21:32:50 +0900159 break
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +0900160 }
161 para.paraChan <- r
162 }
163 para.para.Process.Kill()
164 para.para.Process.Wait()
165 para.doneChan <- true
Fumitoshi Ukai65c72332015-06-26 21:32:50 +0900166 return para.c.err
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +0900167}
168
Fumitoshi Ukai65c72332015-06-26 21:32:50 +0900169func (para *paraWorker) Wait() error {
170 para.c.w.Close()
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +0900171 <-para.doneChan
Fumitoshi Ukai65c72332015-06-26 21:32:50 +0900172 if para.c.err == io.EOF {
173 return nil
174 }
175 return para.c.err
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +0900176}
177
Fumitoshi Ukai65c72332015-06-26 21:32:50 +0900178func (para *paraWorker) RunCommand(runners []runner) error {
179 return para.c.sendRunners(runners)
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +0900180}