fix panic-based error reporting
diff --git a/para.go b/para.go
index ab90825..6993dfb 100644
--- a/para.go
+++ b/para.go
@@ -29,38 +29,46 @@
return 0
}
-func sendMsg(w io.Writer, data []byte) {
- for len(data) != 0 {
- written, err := w.Write(data)
- if err == io.EOF {
- return
- }
- if err != nil {
- panic(err)
- }
- data = data[written:]
- }
+type paraConn struct {
+ w io.WriteCloser
+ r *bufio.Reader
+ err error
}
-func sendInt(w io.Writer, i int) {
+func (c *paraConn) sendMsg(data []byte) error {
+ if c.err != nil {
+ return c.err
+ }
+ _, err := c.w.Write(data)
+ c.err = err
+ return err
+}
+
+func (c *paraConn) sendInt(i int) error {
+ if c.err != nil {
+ return c.err
+ }
v := int32(i)
- binary.Write(w, binary.LittleEndian, &v)
+ c.err = binary.Write(c.w, binary.LittleEndian, &v)
+ return c.err
}
-func sendString(w io.Writer, s string) {
- sendInt(w, len(s))
- sendMsg(w, []byte(s))
+func (c *paraConn) sendString(s string) error {
+ c.sendInt(len(s))
+ c.sendMsg([]byte(s))
+ return c.err
}
-func sendRunners(w io.Writer, runners []runner) {
- sendInt(w, len(runners))
+func (c *paraConn) sendRunners(runners []runner) error {
+ c.sendInt(len(runners))
for _, r := range runners {
- sendString(w, r.output)
- sendString(w, r.cmd)
- sendString(w, r.shell)
- sendInt(w, btoi(r.echo))
- sendInt(w, btoi(r.ignoreError))
+ c.sendString(r.output)
+ c.sendString(r.cmd)
+ c.sendString(r.shell)
+ c.sendInt(btoi(r.echo))
+ c.sendInt(btoi(r.ignoreError))
}
+ return c.err
}
type paraResult struct {
@@ -71,49 +79,37 @@
signal int
}
-func recvInt(r *bufio.Reader) (int, error) {
+func (c *paraConn) recvInt() (int, error) {
+ if c.err != nil {
+ return 0, c.err
+ }
var v int32
- err := binary.Read(r, binary.LittleEndian, &v)
- return int(v), err
+ c.err = binary.Read(c.r, binary.LittleEndian, &v)
+ return int(v), c.err
}
-func recvString(r *bufio.Reader) (string, error) {
- l, err := recvInt(r)
+func (c *paraConn) recvString() (string, error) {
+ l, err := c.recvInt()
if err != nil {
+ c.err = err
return "", err
}
buf := make([]byte, l)
- read := 0
- for read < len(buf) {
- r, err := r.Read(buf[read:])
- if err != nil {
- return "", err
- }
- read += r
+ _, c.err = io.ReadFull(c.r, buf)
+ if c.err != nil {
+ return "", c.err
}
return string(buf), nil
}
-func recvResult(r *bufio.Reader) (*paraResult, error) {
- output, err := recvString(r)
- if err != nil {
- return nil, err
- }
- stdout, err := recvString(r)
- if err != nil {
- return nil, err
- }
- stderr, err := recvString(r)
- if err != nil {
- return nil, err
- }
- status, err := recvInt(r)
- if err != nil {
- return nil, err
- }
- signal, err := recvInt(r)
- if err != nil {
- return nil, err
+func (c *paraConn) recvResult() (*paraResult, error) {
+ output, _ := c.recvString()
+ stdout, _ := c.recvString()
+ stderr, _ := c.recvString()
+ status, _ := c.recvInt()
+ signal, _ := c.recvInt()
+ if c.err != nil {
+ return nil, c.err
}
return &paraResult{
output: output,
@@ -127,55 +123,58 @@
type paraWorker struct {
para *exec.Cmd
paraChan chan *paraResult
- stdin io.WriteCloser
- stdout *bufio.Reader
+ c *paraConn
doneChan chan bool
}
-func newParaWorker(paraChan chan *paraResult, numJobs int, paraPath string) *paraWorker {
+func newParaWorker(paraChan chan *paraResult, numJobs int, paraPath string) (*paraWorker, error) {
para := exec.Command(paraPath, fmt.Sprintf("-j%d", numJobs), "--kati")
stdin, err := para.StdinPipe()
if err != nil {
- panic(err)
+ return nil, err
}
stdout, err := para.StdoutPipe()
if err != nil {
- panic(err)
+ return nil, err
}
err = para.Start()
if err != nil {
- panic(err)
+ return nil, err
}
return &paraWorker{
para: para,
paraChan: paraChan,
- stdin: stdin,
- stdout: bufio.NewReader(stdout),
+ c: &paraConn{
+ w: stdin,
+ r: bufio.NewReader(stdout),
+ },
doneChan: make(chan bool),
- }
+ }, nil
}
-func (para *paraWorker) Run() {
+func (para *paraWorker) Run() error {
for {
- r, err := recvResult(para.stdout)
- if err == io.EOF {
- break
- }
+ r, err := para.c.recvResult()
if err != nil {
- panic(err)
+ break
}
para.paraChan <- r
}
para.para.Process.Kill()
para.para.Process.Wait()
para.doneChan <- true
+ return para.c.err
}
-func (para *paraWorker) Wait() {
- para.stdin.Close()
+func (para *paraWorker) Wait() error {
+ para.c.w.Close()
<-para.doneChan
+ if para.c.err == io.EOF {
+ return nil
+ }
+ return para.c.err
}
-func (para *paraWorker) RunCommand(runners []runner) {
- sendRunners(para.stdin, runners)
+func (para *paraWorker) RunCommand(runners []runner) error {
+ return para.c.sendRunners(runners)
}