blob: de0f55f6cb2286124ac608e6d1712325eef2702c [file] [log] [blame]
// Copyright 2020 CUE Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package flow
// This file contains logic for running tasks.
//
// This implementation anticipates that workflows can also be used for defining
// servers, not just batch scripts. In the future, tasks may be long running and
// provide streams of results.
//
// The implementation starts a goroutine for each user-defined task, instead of
// having a fixed pool of workers. The main reason for this is that tasks are
// inherently heterogeneous and may be blocking on top of that. Also, in the
// future tasks may be long running, as discussed above.
import (
"cuelang.org/go/cue"
"cuelang.org/go/cue/errors"
"cuelang.org/go/internal"
"cuelang.org/go/internal/core/adt"
"cuelang.org/go/internal/core/eval"
"cuelang.org/go/internal/core/runtime"
)
func (c *Controller) runLoop() {
_, x := internal.CoreValue(c.inst)
root := x.(*adt.Vertex)
// Copy the initial conjuncts.
n := len(root.Conjuncts)
c.conjuncts = make([]adt.Conjunct, n, n+len(c.tasks))
copy(c.conjuncts, root.Conjuncts)
c.markReady(nil)
for c.errs == nil {
// Dispatch all unblocked tasks to workers. Only update
// the configuration when all have been dispatched.
waiting := false
running := false
// Mark tasks as Ready.
for _, t := range c.tasks {
switch t.state {
case Waiting:
waiting = true
case Ready:
running = true
t.state = Running
c.updateTaskValue(t)
rx, nx := internal.CoreValue(t.v)
t.ctxt = eval.NewContext(rx.(*runtime.Runtime), nx.(*adt.Vertex))
go func(t *Task) {
if err := t.r.Run(t, nil); err != nil {
t.err = errors.Promote(err, "task failed")
}
t.c.taskCh <- t
}(t)
case Running:
running = true
case Terminated:
}
}
if !running {
if waiting {
// Should not happen ever, as cycle detection should have caught
// this. But keep this around as a defensive measure.
c.addErr(errors.New("deadlock"), "run loop")
}
break
}
select {
case <-c.context.Done():
return
case t := <-c.taskCh:
t.state = Terminated
switch t.err {
case nil:
c.updateTaskResults(t)
case ErrAbort:
// TODO: do something cleverer.
fallthrough
default:
c.addErr(t.err, "task failure")
return
}
// Recompute the configuration, if necessary.
if c.updateValue() {
// initTasks was already called in New to catch initialization
// errors earlier.
c.initTasks()
}
c.updateTaskValue(t)
c.markReady(t)
}
}
}
func (c *Controller) markReady(t *Task) {
for _, x := range c.tasks {
if x.state == Waiting && x.isReady() {
x.state = Ready
}
}
if c.cfg.UpdateFunc != nil {
if err := c.cfg.UpdateFunc(c, t); err != nil {
c.addErr(err, "task completed")
c.cancel()
return
}
}
}
// updateValue recomputes the workflow configuration if it is out of date. It
// reports whether the values were updated.
func (c *Controller) updateValue() bool {
if c.valueSeqNum == c.conjunctSeq {
return false
}
// TODO: incrementally update results. Currently, the entire tree is
// recomputed on every update. This should not be necessary with the right
// notification structure in place.
v := &adt.Vertex{Conjuncts: c.conjuncts}
v.Finalize(c.opCtx)
c.inst = cue.MakeValue(c.opCtx, v)
c.valueSeqNum = c.conjunctSeq
return true
}
// updateTaskValue updates the value of the task in the configuration if it is
// out of date.
func (c *Controller) updateTaskValue(t *Task) {
required := t.conjunctSeq
for _, dep := range t.depTasks {
if dep.conjunctSeq > required {
required = dep.conjunctSeq
}
}
if t.valueSeq == required {
return
}
if c.valueSeqNum < required {
c.updateValue()
}
t.v = c.inst.LookupPath(t.path)
t.valueSeq = required
}
// updateTaskResults updates the result status of the task and adds any result
// values to the overall configuration.
func (c *Controller) updateTaskResults(t *Task) bool {
if t.update == nil {
return false
}
expr := t.update
for i := len(t.labels) - 1; i >= 0; i-- {
expr = &adt.StructLit{
Decls: []adt.Decl{
&adt.Field{
Label: t.labels[i],
Value: expr,
},
},
}
}
t.update = nil
// TODO: replace rather than add conjunct if this task already added a
// conjunct before. This will allow for serving applications.
c.conjuncts = append(c.conjuncts, adt.MakeRootConjunct(c.env, expr))
c.conjunctSeq++
t.conjunctSeq = c.conjunctSeq
return true
}