blob: 44d0dde55dddc0d73fda3841d2b4ec483d6e4e2b [file] [log] [blame]
Marcel van Lohuizen00f345b2020-10-30 22:08:53 +01001// Copyright 2020 CUE Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Package flow provides a low-level workflow manager based on a CUE Instance.
16//
17// A Task defines an operational unit in a Workflow and corresponds to a struct
18// in a CUE instance. This package does not define what a Task looks like in a
19// CUE Instance. Instead, the user of this package must supply a TaskFunc that
20// creates a Runner for cue.Values that are deemed to be a Task.
21//
// Tasks may depend on other tasks. Cyclic dependencies are not allowed.
23// A Task A depends on another Task B if A, directly or indirectly, has a
24// reference to any field of Task B, including its root.
25//
26// Example:
//	var inst *cue.Instance
28//
29// // taskFunc takes a Value v and returns a Runner if v is a Task.
//	w := flow.New(nil, inst, taskFunc)
31//
32// err := w.Run(context.Background())
33// if err != nil {
34// ...
35// }
36//
37package flow
38
39// TODO: Add hooks. This would allow UIs, for instance, to report on progress.
40//
41// - New(inst *cue.Instance, options ...Option)
42// - AddTask(v cue.Value, r Runner) *Task
43// - AddDependency(a, b *Task)
44// - AddTaskGraph(root cue.Value, fn taskFunc)
45// - AddSequence(list cue.Value, fn taskFunc)
46// - Err()
47
48// TODO:
49// Should we allow lists as a shorthand for a sequence of tasks?
50// If so, how do we specify termination behavior?
51
52// TODO:
53// Should we allow tasks to be a child of another task? Currently, the search
// for tasks ends once a task root is found.
55//
56// Semantically it is somewhat unclear to do so: for instance, if an $after
57// is used to refer to an explicit task dependency, it is logically
58// indistinguishable whether this should be a subtask or is a dependency.
59// Using higher-order constructs for analysis is generally undesirable.
60//
61// A possible solution would be to define specific "grouping tasks" whose sole
62// purpose is to define sub tasks. The user of this package would then need
63// to explicitly distinguish between tasks that are dependencies and tasks that
64// are subtasks.
65
66// TODO: streaming tasks/ server applications
67//
68// Workflows are currently implemented for batch processing, for instance to
69// implement shell scripting or other kinds of batch processing.
70//
71// This API has been designed, however, to also allow for streaming
72// applications. For instance, a streaming Task could listen for Etcd changes
73// or incoming HTTP requests and send updates each time an input changes.
74// Downstream tasks could then alternate between a Waiting and Running state.
75//
// Note that such streaming applications could also mean that configurations
// no longer become increasingly more specific. Instead, a Task would
78// replace its old result each time it is updated. This would require tracking
79// of which conjunct was previously created by a task.
80
import (
	"context"
	"strconv"

	"cuelang.org/go/cue"
	"cuelang.org/go/cue/errors"
	"cuelang.org/go/internal"
	"cuelang.org/go/internal/core/adt"
	"cuelang.org/go/internal/core/convert"
	"cuelang.org/go/internal/core/eval"
	"cuelang.org/go/internal/core/runtime"
)
92
var (
	// ErrAbort may be returned by a task to avoid processing downstream tasks.
	// This can be used by control nodes to influence execution: if a Runner
	// returns an error x for which errors.Is(x, ErrAbort) holds, the tasks
	// that depend on it are not run, and this is not treated as a failure.
	ErrAbort = errors.New("abort dependant tasks without failure")

	// TODO: ErrUpdate: update and run a dependency, but don't complete a
	// dependency as more results may come. This is useful in server mode.
)
101
// A TaskFunc creates a Runner for v if v defines a task, or returns a nil
// Runner otherwise. It returns an error for ill-formed tasks.
type TaskFunc func(v cue.Value) (Runner, error)
105
// A Runner executes a Task.
type Runner interface {
	// Run runs a Task. If any of the tasks it depends on returned an error,
	// that error is passed to this task as err. Run reports an error upon
	// failure.
	//
	// Any results to be returned can be set by calling Fill on the passed task.
	//
	// TODO: what is a good contract for receiving and passing errors and abort.
	//
	// If for a returned error x errors.Is(x, ErrAbort), all dependent tasks
	// will not be run, without this being an error.
	Run(t *Task, err error) error
}
119
120// A RunnerFunc runs a Task.
121type RunnerFunc func(t *Task) error
122
123func (f RunnerFunc) Run(t *Task, err error) error {
124 return f(t)
125}
126
// A Config defines options for interpreting an Instance as a Workflow.
type Config struct {
	// Root limits the search for tasks to the path indicated by Root.
	// For the cue command, this is set to ["command"]. The zero value (an
	// empty path) imposes no such restriction.
	Root cue.Path

	// InferTasks allows tasks to be defined outside of the Root. Such tasks
	// will only be included in the workflow if any of their fields is
	// referenced by any of the tasks defined within Root.
	//
	// CAVEAT EMPTOR: this feature is mostly provided for backwards
	// compatibility with v0.2. A problem with this approach is that it will
	// look for task structs within arbitrary data. So if not careful, there may
	// be spurious matches.
	InferTasks bool

	// IgnoreConcrete ignores references for which the values are already
	// concrete and cannot change.
	IgnoreConcrete bool

	// UpdateFunc is called whenever the information in the controller is
	// updated. This includes directly after initialization. The task may be
	// nil if this call is not the result of a task completing.
	UpdateFunc func(c *Controller, t *Task) error
}
153
// A Controller defines a set of Tasks to be executed.
type Controller struct {
	cfg    Config   // copy of the Config passed to New; zero value if it was nil
	isTask TaskFunc // creates Runners for values that are tasks

	inst        cue.Value // value of the Instance passed to New
	valueSeqNum int64

	env *adt.Environment

	conjuncts   []adt.Conjunct
	conjunctSeq int64

	taskCh chan *Task

	opCtx      *adt.OpContext
	context    context.Context    // set by Run, derived from the ctx passed to it
	cancelFunc context.CancelFunc // cancels context; nil until Run is called

	// keys maps task keys to their Task. This allows a recreation of the
	// Instance while retaining the original task indices.
	//
	// TODO: do instance updating in place to allow for more efficient
	// processing.
	keys  map[string]*Task
	tasks []*Task

	// Only used during task initialization.
	nodes map[*adt.Vertex]*Task

	errs errors.Error // accumulated by addErr and returned by Run
}
186
187// Tasks reports the tasks that are currently registered with the controller.
188//
189// This may currently only be called before Run is called or from within
190// a call to UpdateFunc. Task pointers returned by this call are not guaranteed
191// to be the same between successive calls to this method.
192func (c *Controller) Tasks() []*Task {
193 return c.tasks
194}
195
196func (c *Controller) cancel() {
197 if c.cancelFunc != nil {
198 c.cancelFunc()
199 }
200}
201
202func (c *Controller) addErr(err error, msg string) {
203 c.errs = errors.Append(c.errs, errors.Promote(err, msg))
204}
205
206// New creates a Controller for a given Instance and TaskFunc.
207func New(cfg *Config, inst *cue.Instance, f TaskFunc) *Controller {
208 v := inst.Value()
209 rx, nx := internal.CoreValue(v)
210 ctx := eval.NewContext(rx.(*runtime.Runtime), nx.(*adt.Vertex))
211
212 c := &Controller{
213 isTask: f,
214 inst: v,
215 opCtx: ctx,
216
217 taskCh: make(chan *Task),
218 keys: map[string]*Task{},
219 }
220
221 if cfg != nil {
222 c.cfg = *cfg
223 }
224
225 c.initTasks()
226 return c
227
228}
229
230// Run runs the tasks of a workflow until completion.
231func (c *Controller) Run(ctx context.Context) error {
232 c.context, c.cancelFunc = context.WithCancel(ctx)
233 defer c.cancelFunc()
234
235 c.runLoop()
236 return c.errs
237}
238
// A State indicates the state of a Task.
//
// The following state diagram indicates the possible state transitions:
//
//	        Ready
//	     ↗︎        ↘︎
//	Waiting   ←   Running
//	     ↘︎        ↙︎
//	      Terminated
//
// A Task may move from Waiting to Terminated if one of the tasks on which
// it depends fails.
//
// NOTE: transitions from Running to Waiting are currently not supported. In
// the future this may be possible if a task depends on continuously running
// tasks that send updates.
type State int

const (
	// Waiting indicates a task is blocked on input from another task.
	//
	// NOTE: although this is currently not implemented, a task could
	// theoretically move from the Running to Waiting state.
	Waiting State = iota

	// Ready means a task is ready to run, but is currently not running.
	Ready

	// Running indicates a goroutine is currently active for a task and that
	// it is not Waiting.
	Running

	// Terminated means a task has stopped running, either because it
	// terminated while Running or because it was aborted by a task on which
	// it depends. The error value of a Task indicates the reason for the
	// termination.
	Terminated
)

var stateStrings = map[State]string{
	Waiting:    "Waiting",
	Ready:      "Ready",
	Running:    "Running",
	Terminated: "Terminated",
}

// String reports a human-readable name for s. Values outside the defined set
// of states are reported as "State(N)", following the convention of the
// stringer tool, rather than as an empty string.
func (s State) String() string {
	if name, ok := stateStrings[s]; ok {
		return name
	}
	return "State(" + strconv.Itoa(int(s)) + ")"
}
289
// A Task contains the context for a single task execution.
type Task struct {
	// Static
	c    *Controller    // owning controller; its context is reported by Context
	ctxt *adt.OpContext // used by Fill to convert Go values to expressions
	r    Runner

	index  int      // sequence number reported by Index
	path   cue.Path // location within the instance, reported by Path
	key    string
	labels []adt.Feature

	// Dynamic
	update adt.Expr       // values passed to Fill, combined with adt.AndOp
	deps   map[*Task]bool // set view of depTasks; lets addDep skip duplicates

	conjunctSeq int64
	valueSeq    int64
	v           cue.Value    // latest value, reported by Value
	err         errors.Error // reported by Err
	state       State        // reported by State; done() is true past Running
	depTasks    []*Task      // tasks t depends on, in the order they were added
}
313
314// Context reports the Controller's Context.
315func (t *Task) Context() context.Context {
316 return t.c.context
317}
318
319// Path reports the path of Task within the Instance in which it is defined.
320func (t *Task) Path() cue.Path {
321 return t.path
322}
323
324// Index reports the sequence number of the Task. This will not change over
325// time.
326func (t *Task) Index() int {
327 return t.index
328}
329
330func (t *Task) done() bool {
331 return t.state > Running
332}
333
334func (t *Task) isReady() bool {
335 for _, d := range t.depTasks {
336 if !d.done() {
337 return false
338 }
339 }
340 return true
341}
342
343func (t *Task) vertex() *adt.Vertex {
344 _, x := internal.CoreValue(t.v)
345 return x.(*adt.Vertex)
346}
347
348func (t *Task) addDep(dep *Task) {
349 if dep == nil || dep == t {
350 return
351 }
352 if t.deps == nil {
353 t.deps = map[*Task]bool{}
354 }
355 if !t.deps[dep] {
356 t.deps[dep] = true
357 t.depTasks = append(t.depTasks, dep)
358 }
359}
360
361// Fill fills in values of the Controller's configuration for the current task.
362// The changes take effect after the task completes.
363//
364// This method may currently only be called by the runner.
365func (t *Task) Fill(x interface{}) error {
366 expr := convert.GoValueToExpr(t.ctxt, true, x)
367 if t.update == nil {
368 t.update = expr
369 return nil
370 }
371 t.update = &adt.BinaryExpr{
372 Op: adt.AndOp,
373 X: t.update,
374 Y: expr,
375 }
376 return nil
377}
378
379// Value reports the latest value of this task.
380//
381// This method may currently only be called before Run is called or after a
382// Task completed, or from within a call to UpdateFunc.
383func (t *Task) Value() cue.Value {
384 // TODO: synchronize
385 return t.v
386}
387
388// Dependencies reports the Tasks t depends on.
389//
390// This method may currently only be called before Run is called or after a
391// Task completed, or from within a call to UpdateFunc.
392func (t *Task) Dependencies() []*Task {
393 // TODO: add synchronization.
394 return t.depTasks
395}
396
397// Err returns the error of a completed Task.
398//
399// This method may currently only be called before Run is called, after a
400// Task completed, or from within a call to UpdateFunc.
401func (t *Task) Err() error {
402 return t.err
403}
404
405// State is the current state of the Task.
406//
407// This method may currently only be called before Run is called or after a
408// Task completed, or from within a call to UpdateFunc.
409func (t *Task) State() State {
410 return t.state
411}