tools/flow: API for CUE-based workflow engine
This API is to replace the logic in cmd/cue/cmd and make it
available as a package.
It features improved dependency analysis that is based on
the core ADT of the new evaluator, rathter than the old cue API.
The design allows for a smooth transtion for a setup where
an instance is incrementally updated and evaluated, so allowing
for significant performance improvements down the road.
The design also allows for dynamic task dependencies and
insertion, allowing the defined tasks to grow dynamically.
Change-Id: Ib286aadffe1e819659385300debf81280327e327
Reviewed-on: https://cue-review.googlesource.com/c/cue/+/7542
Reviewed-by: CUE cueckoo <cueckoo@gmail.com>
Reviewed-by: Marcel van Lohuizen <mpvl@golang.org>
diff --git a/tools/flow/flow.go b/tools/flow/flow.go
new file mode 100644
index 0000000..5f0b412
--- /dev/null
+++ b/tools/flow/flow.go
@@ -0,0 +1,402 @@
+// Copyright 2020 CUE Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package flow provides a low-level workflow manager based on a CUE Instance.
+//
+// A Task defines an operational unit in a Workflow and corresponds to a struct
+// in a CUE instance. This package does not define what a Task looks like in a
+// CUE Instance. Instead, the user of this package must supply a TaskFunc that
+// creates a Runner for cue.Values that are deemed to be a Task.
+//
+// Tasks may depend on other tasks. Cyclic dependencies are thereby not allowed.
+// A Task A depends on another Task B if A, directly or indirectly, has a
+// reference to any field of Task B, including its root.
+//
+// Example:
+// var inst cue.Instance
+//
+// // taskFunc takes a Value v and returns a Runner if v is a Task.
+// w := flow.New(inst, taskFunc, nil)
+//
+// err := w.Run(context.Background())
+// if err != nil {
+// ...
+// }
+//
+package flow
+
+// TODO: Add hooks. This would allow UIs, for instance, to report on progress.
+//
+// - New(inst *cue.Instance, options ...Option)
+// - AddTask(v cue.Value, r Runner) *Task
+// - AddDependency(a, b *Task)
+// - AddTaskGraph(root cue.Value, fn taskFunc)
+// - AddSequence(list cue.Value, fn taskFunc)
+// - Err()
+
+// TODO:
+// Should we allow lists as a shorthand for a sequence of tasks?
+// If so, how do we specify termination behavior?
+
+// TODO:
+// Should we allow tasks to be a child of another task? Currently, the search
+// for tasks end once a task root is found.
+//
+// Semantically it is somewhat unclear to do so: for instance, if an $after
+// is used to refer to an explicit task dependency, it is logically
+// indistinguishable whether this should be a subtask or is a dependency.
+// Using higher-order constructs for analysis is generally undesirable.
+//
+// A possible solution would be to define specific "grouping tasks" whose sole
+// purpose is to define sub tasks. The user of this package would then need
+// to explicitly distinguish between tasks that are dependencies and tasks that
+// are subtasks.
+
+// TODO: streaming tasks/ server applications
+//
+// Workflows are currently implemented for batch processing, for instance to
+// implement shell scripting or other kinds of batch processing.
+//
+// This API has been designed, however, to also allow for streaming
+// applications. For instance, a streaming Task could listen for Etcd changes
+// or incoming HTTP requests and send updates each time an input changes.
+// Downstream tasks could then alternate between a Waiting and Running state.
+//
+// Note that such streaming applications would also cause configurations to
+// potentially not become increasingly more specific. Instead, a Task would
+// replace its old result each time it is updated. This would require tracking
+// of which conjunct was previously created by a task.
+
+import (
+ "context"
+
+ "cuelang.org/go/cue"
+ "cuelang.org/go/cue/errors"
+ "cuelang.org/go/internal"
+ "cuelang.org/go/internal/core/adt"
+ "cuelang.org/go/internal/core/convert"
+ "cuelang.org/go/internal/core/eval"
+ "cuelang.org/go/internal/core/runtime"
+)
+
+var (
+ // ErrAbort may be returned by a task to avoid processing downstream tasks.
+ // This can be used by control nodes to influence execution.
+ ErrAbort = errors.New("abort dependant tasks without failure")
+
+ // TODO: ErrUpdate: update and run a dependency, but don't complete a
+ // dependency as more results may come. This is useful in server mode.
+)
+
+// A TaskFunc creates a Runner for v if v defines a task or reports nil
+// otherwise. It reports an error for illformed tasks.
+type TaskFunc func(v cue.Value) (Runner, error)
+
+// A Runner executes a Task.
+type Runner interface {
+ // Run runs a Task. If any of the tasks it depends on returned an error it
+ // is passed to this task. It reports an error upon failure.
+ //
+ // Any results to be returned can be set by calling Fill on the passed task.
+ //
+ // TODO: what is a good contract for receiving and passing errors and abort.
+ //
+ // If for a returned error x errors.Is(x, ErrAbort), all dependant tasks
+ // will not be run, without this being an error.
+ Run(t *Task, err error) error
+}
+
+// A RunnerFunc runs a Task.
+type RunnerFunc func(t *Task) error
+
+func (f RunnerFunc) Run(t *Task, err error) error {
+ return f(t)
+}
+
+// A Config defines options for interpreting an Instance as a Workflow.
+type Config struct {
+ // Root limits the search for tasks to be within the path indicated to root.
+ // For the cue command, this is set to ["command"]. The default value is
+ // for all tasks to be root.
+ Root cue.Path
+
+ // InferTasks allows tasks to be defined outside of the Root. Such tasks
+ // will only be included in the workflow if any of its fields is referenced
+ // by any of the tasks defined within Root.
+ InferTasks bool
+
+ // UpdateFunc is called whenever the information in the controller is
+ // updated. This includes directly after initialization. The task may be
+ // nil if this call is not the result of a task completing.
+ UpdateFunc func(c *Controller, t *Task) error
+}
+
+// A Controller defines a set of Tasks to be executed.
+type Controller struct {
+ cfg Config
+ isTask TaskFunc
+
+ inst cue.Value
+ valueSeqNum int64
+
+ env *adt.Environment
+
+ conjuncts []adt.Conjunct
+ conjunctSeq int64
+
+ taskCh chan *Task
+
+ opCtx *adt.OpContext
+ context context.Context
+ cancelFunc context.CancelFunc
+
+ // keys maps task keys to their index. This allows a recreation of the
+ // Instance while retaining the original task indices.
+ //
+ // TODO: do instance updating in place to allow for more efficient
+ // processing.
+ keys map[string]*Task
+ tasks []*Task
+
+ // Only used during task initialization.
+ nodes map[*adt.Vertex]*Task
+
+ errs errors.Error
+}
+
+// Tasks reports the tasks that are currently registered with the controller.
+//
+// This may currently only be called before Run is called or from within
+// a call to UpdateFunc. Task pointers returned by this call are not guaranteed
+// to be the same between successive calls to this method.
+func (c *Controller) Tasks() []*Task {
+ return c.tasks
+}
+
+func (c *Controller) cancel() {
+ if c.cancelFunc != nil {
+ c.cancelFunc()
+ }
+}
+
+func (c *Controller) addErr(err error, msg string) {
+ c.errs = errors.Append(c.errs, errors.Promote(err, msg))
+}
+
+// New creates a Controller for a given Instance and TaskFunc.
+func New(cfg *Config, inst *cue.Instance, f TaskFunc) *Controller {
+ v := inst.Value()
+ rx, nx := internal.CoreValue(v)
+ ctx := eval.NewContext(rx.(*runtime.Runtime), nx.(*adt.Vertex))
+
+ c := &Controller{
+ isTask: f,
+ inst: v,
+ opCtx: ctx,
+
+ taskCh: make(chan *Task),
+ keys: map[string]*Task{},
+ }
+
+ if cfg != nil {
+ c.cfg = *cfg
+ }
+
+ c.initTasks()
+ return c
+
+}
+
+// Run runs the tasks of a workflow until completion.
+func (c *Controller) Run(ctx context.Context) error {
+ c.context, c.cancelFunc = context.WithCancel(ctx)
+ defer c.cancelFunc()
+
+ c.runLoop()
+ return c.errs
+}
+
+// A State indicates the state of a Task.
+//
+// The following state diagram indicates the possible state transitions:
+//
+// Ready
+// ↗︎ ↘︎
+// Waiting ← Running
+// ↘︎ ↙︎
+// Terminated
+//
+// A Task may move from Waiting to Terminating if one of
+// the tasks on which it dependends fails.
+//
+// NOTE: transitions from Running to Waiting are currently not supported. In
+// the future this may be possible if a task depends on continuously running
+// tasks that send updates.
+//
+type State int
+
+const (
+ // Waiting indicates a task is blocked on input from another task.
+ //
+ // NOTE: although this is currently not implemented, a task could
+ // theoretically move from the Running to Waiting state.
+ Waiting State = iota
+
+ // Ready means a tasks is ready to run, but currently not running.
+ Ready
+
+ // Running indicates a goroutine is currently active for a task and that
+ // it is not Waiting.
+ Running
+
+ // Terminated means a task has stopped running either because it terminated
+ // while Running or was aborted by task on which it depends. The error
+ // value of a Task indicates the reason for the termination.
+ Terminated
+)
+
+var stateStrings = map[State]string{
+ Waiting: "Waiting",
+ Ready: "Ready",
+ Running: "Running",
+ Terminated: "Terminated",
+}
+
+// String reports a human readable string of status s.
+func (s State) String() string {
+ return stateStrings[s]
+}
+
+// A Task contains the context for a single task execution.
+type Task struct {
+ // Static
+ c *Controller
+ ctxt *adt.OpContext
+ r Runner
+
+ index int
+ path cue.Path
+ key string
+ labels []adt.Feature
+
+ // Dynamic
+ update adt.Expr
+ deps map[*Task]bool
+
+ conjunctSeq int64
+ valueSeq int64
+ v cue.Value
+ err errors.Error
+ state State
+ depTasks []*Task
+}
+
+// Context reports the Controller's Context.
+func (t *Task) Context() context.Context {
+ return t.c.context
+}
+
+// Path reports the path of Task within the Instance in which it is defined.
+func (t *Task) Path() cue.Path {
+ return t.path
+}
+
+// Index reports the sequence number of the Task. This will not change over
+// time.
+func (t *Task) Index() int {
+ return t.index
+}
+
+func (t *Task) done() bool {
+ return t.state > Running
+}
+
+func (t *Task) isReady() bool {
+ for _, d := range t.depTasks {
+ if !d.done() {
+ return false
+ }
+ }
+ return true
+}
+
+func (t *Task) vertex() *adt.Vertex {
+ _, x := internal.CoreValue(t.v)
+ return x.(*adt.Vertex)
+}
+
+func (t *Task) addDep(dep *Task) {
+ if dep == nil || dep == t {
+ return
+ }
+ if t.deps == nil {
+ t.deps = map[*Task]bool{}
+ }
+ if !t.deps[dep] {
+ t.deps[dep] = true
+ t.depTasks = append(t.depTasks, dep)
+ }
+}
+
+// Fill fills in values of the Controller's configuration for the current task.
+// The changes take effect after the task completes.
+//
+// This method may currently only be called by the runner.
+func (t *Task) Fill(x interface{}) error {
+ expr := convert.GoValueToExpr(t.ctxt, true, x)
+ if t.update == nil {
+ t.update = expr
+ return nil
+ }
+ t.update = &adt.BinaryExpr{
+ Op: adt.AndOp,
+ X: t.update,
+ Y: expr,
+ }
+ return nil
+}
+
+// Value reports the latest value of this task.
+//
+// This method may currently only be called before Run is called or after a
+// Task completed, or from within a call to UpdateFunc.
+func (t *Task) Value() cue.Value {
+ // TODO: synchronize
+ return t.v
+}
+
+// Dependencies reports the Tasks t depends on.
+//
+// This method may currently only be called before Run is called or after a
+// Task completed, or from within a call to UpdateFunc.
+func (t *Task) Dependencies() []*Task {
+ // TODO: add synchronization.
+ return t.depTasks
+}
+
+// Err returns the error of a completed Task.
+//
+// This method may currently only be called before Run is called, after a
+// Task completed, or from within a call to UpdateFunc.
+func (t *Task) Err() error {
+ return t.err
+}
+
+// State is the current state of the Task.
+//
+// This method may currently only be called before Run is called or after a
+// Task completed, or from within a call to UpdateFunc.
+func (t *Task) State() State {
+ return t.state
+}