// Copyright 2020 CUE Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package flow provides a low-level workflow manager based on a CUE Instance.
//
// A Task defines an operational unit in a Workflow and corresponds to a struct
// in a CUE instance. This package does not define what a Task looks like in a
// CUE Instance. Instead, the user of this package must supply a TaskFunc that
// creates a Runner for cue.Values that are deemed to be a Task.
//
// Tasks may depend on other tasks. Cyclic dependencies are thereby not allowed.
// A Task A depends on another Task B if A, directly or indirectly, has a
// reference to any field of Task B, including its root.
//
// Example:
//   var inst *cue.Instance
//
//   // taskFunc takes a Value v and returns a Runner if v is a Task.
//   // A nil Config selects the default configuration.
//   w := flow.New(nil, inst, taskFunc)
//
//   err := w.Run(context.Background())
//   if err != nil {
//       ...
//   }
//
package flow

// TODO: Add hooks. This would allow UIs, for instance, to report on progress.
//
// - New(inst *cue.Instance, options ...Option)
// - AddTask(v cue.Value, r Runner) *Task
// - AddDependency(a, b *Task)
// - AddTaskGraph(root cue.Value, fn taskFunc)
// - AddSequence(list cue.Value, fn taskFunc)
// - Err()

// TODO:
// Should we allow lists as a shorthand for a sequence of tasks?
// If so, how do we specify termination behavior?

// TODO:
// Should we allow tasks to be a child of another task? Currently, the search
// for tasks ends once a task root is found.
//
// Semantically it is somewhat unclear to do so: for instance, if an $after
// is used to refer to an explicit task dependency, it is logically
// indistinguishable whether this should be a subtask or is a dependency.
// Using higher-order constructs for analysis is generally undesirable.
//
// A possible solution would be to define specific "grouping tasks" whose sole
// purpose is to define sub tasks. The user of this package would then need
// to explicitly distinguish between tasks that are dependencies and tasks that
// are subtasks.

// TODO: streaming tasks/ server applications
//
// Workflows are currently implemented for batch processing, for instance to
// implement shell scripting or other kinds of batch processing.
//
// This API has been designed, however, to also allow for streaming
// applications. For instance, a streaming Task could listen for Etcd changes
// or incoming HTTP requests and send updates each time an input changes.
// Downstream tasks could then alternate between a Waiting and Running state.
//
// Note that such streaming applications would also cause configurations to
// potentially not become increasingly more specific. Instead, a Task would
// replace its old result each time it is updated. This would require tracking
// of which conjunct was previously created by a task.

import (
	"context"

	"cuelang.org/go/cue"
	"cuelang.org/go/cue/errors"
	"cuelang.org/go/internal"
	"cuelang.org/go/internal/core/adt"
	"cuelang.org/go/internal/core/convert"
	"cuelang.org/go/internal/core/eval"
	"cuelang.org/go/internal/core/runtime"
)

var (
	// ErrAbort may be returned by a task to avoid processing downstream tasks.
	// This can be used by control nodes to influence execution.
	//
	// As documented on Runner.Run, when errors.Is reports that a returned
	// error is ErrAbort, dependent tasks are not run and this is not treated
	// as an error.
	ErrAbort = errors.New("abort dependant tasks without failure")

	// TODO: ErrUpdate: update and run a dependency, but don't complete a
	// dependency as more results may come. This is useful in server mode.
)

// A TaskFunc creates a Runner for v if v defines a task or reports nil
// otherwise. It reports an error for ill-formed tasks.
//
// The TaskFunc passed to New is stored on the resulting Controller.
type TaskFunc func(v cue.Value) (Runner, error)

// A Runner executes a Task.
type Runner interface {
	// Run runs a Task. If any of the tasks it depends on returned an error it
	// is passed to this task. It reports an error upon failure.
	//
	// Any results to be returned can be set by calling Fill on the passed task.
	//
	// TODO: what is a good contract for receiving and passing errors and abort.
	//
	// If errors.Is(x, ErrAbort) holds for a returned error x, none of the
	// dependent tasks will be run, without this being an error.
	Run(t *Task, err error) error
}

// A RunnerFunc runs a Task.
//
// It adapts a plain function to the Runner interface.
type RunnerFunc func(t *Task) error

// Run calls f(t) and returns its result. The err argument is not passed on to
// f and is otherwise ignored.
func (f RunnerFunc) Run(t *Task, err error) error {
	return f(t)
}

// A Config defines options for interpreting an Instance as a Workflow.
//
// New accepts a nil *Config, in which case the zero Config is used.
type Config struct {
	// Root limits the search for tasks to be within the path indicated to root.
	// For the cue command, this is set to ["command"]. The default value is
	// for all tasks to be root.
	Root cue.Path

	// InferTasks allows tasks to be defined outside of the Root. Such tasks
	// will only be included in the workflow if any of their fields is
	// referenced by any of the tasks defined within Root.
	//
	// CAVEAT EMPTOR: this feature is mostly provided for backwards
	// compatibility with v0.2. A problem with this approach is that it will
	// look for task structs within arbitrary data. So if not careful, there may
	// be spurious matches.
	InferTasks bool

	// IgnoreConcrete ignores references for which the values are already
	// concrete and cannot change.
	IgnoreConcrete bool

	// UpdateFunc is called whenever the information in the controller is
	// updated. This includes directly after initialization. The task may be
	// nil if this call is not the result of a task completing.
	UpdateFunc func(c *Controller, t *Task) error
}

// A Controller defines a set of Tasks to be executed.
//
// A Controller is created with New and executed with Run.
type Controller struct {
	// cfg is a copy of the Config passed to New, or the zero Config if nil
	// was passed. isTask is the TaskFunc passed to New.
	cfg    Config
	isTask TaskFunc

	// inst is the value of the Instance passed to New.
	inst        cue.Value
	valueSeqNum int64

	env *adt.Environment

	conjuncts   []adt.Conjunct
	conjunctSeq int64

	// taskCh is created as an unbuffered channel by New.
	taskCh chan *Task

	// opCtx is the evaluation context created by New. context and cancelFunc
	// are assigned by Run: a cancelable context derived from Run's argument
	// and the function that cancels it.
	opCtx      *adt.OpContext
	context    context.Context
	cancelFunc context.CancelFunc

	// keys maps task keys to their index. This allows a recreation of the
	// Instance while retaining the original task indices.
	//
	// TODO: do instance updating in place to allow for more efficient
	// processing.
	keys  map[string]*Task
	tasks []*Task

	// Only used during task initialization.
	nodes map[*adt.Vertex]*Task

	// errs accumulates the errors recorded through addErr; Run returns it.
	errs errors.Error
}

// Tasks reports the tasks that are currently registered with the controller.
//
// This may currently only be called before Run is called or from within
// a call to UpdateFunc. Task pointers returned by this call are not guaranteed
// to be the same between successive calls to this method.
//
// The returned slice is the controller's own slice, not a copy, so callers
// should treat it as read-only.
func (c *Controller) Tasks() []*Task {
	return c.tasks
}

// cancel calls the controller's cancel function, which Run installs. It does
// nothing if no cancel function has been set.
func (c *Controller) cancel() {
	if c.cancelFunc == nil {
		return
	}
	c.cancelFunc()
}

// addErr records err on the controller. The error is first passed through
// errors.Promote together with msg and the result is appended to the errors
// collected so far.
func (c *Controller) addErr(err error, msg string) {
	promoted := errors.Promote(err, msg)
	c.errs = errors.Append(c.errs, promoted)
}

// New creates a Controller for a given Instance and TaskFunc.
//
// cfg may be nil, in which case the zero Config is used; otherwise the
// Controller works on a copy of *cfg. The tasks of the controller are
// initialized before New returns.
func New(cfg *Config, inst *cue.Instance, f TaskFunc) *Controller {
	root := inst.Value()
	r, n := internal.CoreValue(root)
	opCtx := eval.NewContext(r.(*runtime.Runtime), n.(*adt.Vertex))

	// Resolve the effective configuration up front so that the controller
	// can be constructed in one go.
	var config Config
	if cfg != nil {
		config = *cfg
	}

	ctrl := &Controller{
		cfg:    config,
		isTask: f,
		inst:   root,
		opCtx:  opCtx,
		taskCh: make(chan *Task),
		keys:   make(map[string]*Task),
	}
	ctrl.initTasks()
	return ctrl
}

// Run runs the tasks of a workflow until completion.
//
// The tasks run under a cancelable context derived from ctx; that context is
// canceled when Run returns. Run reports the errors accumulated on the
// controller.
func (c *Controller) Run(ctx context.Context) error {
	runCtx, cancel := context.WithCancel(ctx)
	c.context, c.cancelFunc = runCtx, cancel
	defer cancel()

	c.runLoop()
	return c.errs
}

// A State describes where a Task is in its life cycle.
//
// The possible state transitions are shown in the following diagram:
//
//          Ready
//       ↗︎        ↘︎
//   Waiting  ←  Running
//       ↘︎        ↙︎
//       Terminated
//
// A Task may move directly from Waiting to Terminated if one of the tasks on
// which it depends fails.
//
// NOTE: transitions from Running to Waiting are currently not supported. In
// the future this may be possible if a task depends on continuously running
// tasks that send updates.
//
type State int

const (
	// Waiting means a task is blocked on input from another task.
	//
	// NOTE: although this is currently not implemented, a task could
	// theoretically move from the Running to the Waiting state.
	Waiting State = iota

	// Ready means a task is ready to run, but is currently not running.
	Ready

	// Running means a goroutine is currently active for a task and that the
	// task is not Waiting.
	Running

	// Terminated means a task has stopped running, either because it
	// terminated while Running or because it was aborted by a task on which
	// it depends. The error value of a Task indicates the reason for the
	// termination.
	Terminated
)

// stateStrings holds the display name of every defined State.
var stateStrings = map[State]string{
	Terminated: "Terminated",
	Running:    "Running",
	Ready:      "Ready",
	Waiting:    "Waiting",
}

// String reports a human readable string of status s. It reports the empty
// string for a value that is not one of the defined states.
func (s State) String() string {
	name, known := stateStrings[s]
	if !known {
		return ""
	}
	return name
}

// A Task contains the context for a single task execution.
type Task struct {
	// Static
	c    *Controller
	ctxt *adt.OpContext
	r    Runner

	// index and path are reported by Index and Path.
	index  int
	path   cue.Path
	key    string
	labels []adt.Feature

	// Dynamic
	//
	// update accumulates the expressions passed to Fill. deps and pathDeps
	// are allocated on first use and populated by addDep.
	update   adt.Expr
	deps     map[*Task]bool
	pathDeps map[string][]*Task

	// v, err and state are reported by Value, Err and State. depTasks holds
	// each dependency once, in the order in which addDep first recorded it;
	// it is what Dependencies returns and what isReady inspects.
	conjunctSeq int64
	valueSeq    int64
	v           cue.Value
	err         errors.Error
	state       State
	depTasks    []*Task
}

// Context reports the Controller's Context.
//
// Controller.Run sets this to a cancelable context derived from the context
// it was called with.
func (t *Task) Context() context.Context {
	return t.c.context
}

// Path reports the path of the Task within the Instance in which it is
// defined.
func (t *Task) Path() cue.Path {
	return t.path
}

// Index reports the sequence number of the Task. This number does not change
// over time.
func (t *Task) Index() int {
	return t.index
}

// done reports whether the task has moved past the Running state. With the
// states currently defined, that means the task is Terminated.
func (t *Task) done() bool {
	return t.state > Running
}

// isReady reports whether all tasks that t depends on are done. A task
// without dependencies is always ready.
func (t *Task) isReady() bool {
	for _, dep := range t.depTasks {
		if dep.done() {
			continue
		}
		// At least one dependency has not finished yet.
		return false
	}
	return true
}

// vertex returns the core representation of the task's current value as an
// *adt.Vertex. It panics if the core value has a different type.
func (t *Task) vertex() *adt.Vertex {
	_, node := internal.CoreValue(t.v)
	v := node.(*adt.Vertex)
	return v
}

// addDep records that t depends on dep because of a reference found at path.
//
// A nil dep and a dependency of t on itself are ignored. The dependency is
// added at most once to the list kept for path and at most once to the
// overall list of dependencies of t.
func (t *Task) addDep(path string, dep *Task) {
	if dep == nil || dep == t {
		return
	}
	if t.deps == nil {
		t.deps = map[*Task]bool{}
		t.pathDeps = map[string][]*Task{}
	}

	// Add the dependencies for a given path to the controller. We could compute
	// this again later, but this ensures there will be no discrepancies.
	existing := t.pathDeps[path]
	known := false
	for i := range existing {
		if existing[i] == dep {
			known = true
			break
		}
	}
	if !known {
		t.pathDeps[path] = append(existing, dep)
	}

	// Register dep in the path-independent bookkeeping, once per task.
	if t.deps[dep] {
		return
	}
	t.deps[dep] = true
	t.depTasks = append(t.depTasks, dep)
}

// Fill fills in values of the Controller's configuration for the current task.
// The changes take effect after the task completes.
//
// x is converted to an expression. If Fill was called before, the new
// expression is combined with the pending update using adt.AndOp; otherwise
// it becomes the pending update. Fill currently always returns nil.
//
// This method may currently only be called by the runner.
func (t *Task) Fill(x interface{}) error {
	var next adt.Expr = convert.GoValueToExpr(t.ctxt, true, x)
	if t.update != nil {
		// Conjoin with what was filled in by earlier calls.
		next = &adt.BinaryExpr{Op: adt.AndOp, X: t.update, Y: next}
	}
	t.update = next
	return nil
}

// Value reports the latest value of this task.
//
// This method may currently only be called before Run is called or after a
// Task completed, or from within a call to UpdateFunc. The value is read
// without any synchronization.
func (t *Task) Value() cue.Value {
	// TODO: synchronize
	return t.v
}

// Dependencies reports the Tasks t depends on.
//
// This method may currently only be called before Run is called or after a
// Task completed, or from within a call to UpdateFunc.
//
// The returned slice is the task's own slice, not a copy, so callers should
// treat it as read-only.
func (t *Task) Dependencies() []*Task {
	// TODO: add synchronization.
	return t.depTasks
}

// PathDependencies reports the dependencies found for a value at the given
// path.
//
// Dependencies are looked up by the string form of p. The result is nil if no
// dependencies were recorded for that path.
//
// This may currently only be called before Run is called or from within
// a call to UpdateFunc.
func (t *Task) PathDependencies(p cue.Path) []*Task {
	return t.pathDeps[p.String()]
}

// Err returns the error of a completed Task. It returns nil if no error has
// been recorded for the task.
//
// This method may currently only be called before Run is called, after a
// Task completed, or from within a call to UpdateFunc.
func (t *Task) Err() error {
	return t.err
}

// State reports the current state of the Task.
//
// This method may currently only be called before Run is called or after a
// Task completed, or from within a call to UpdateFunc.
func (t *Task) State() State {
	return t.state
}
