cmd/cue/cmd: streaming instance iteration

This provides an abstraction for instance generation,
allowing more consistent behavior between modes.

Streaming iteration is also necessary to incrementally
process a large number of data files.

This allows data files to be used in eval and export.

Change-Id: I938f9b4380bd00af3f2671249f9e8ef3945c76cc
Reviewed-on: https://cue-review.googlesource.com/c/cue/+/5094
Reviewed-by: Marcel van Lohuizen <mpvl@golang.org>
diff --git a/cmd/cue/cmd/common.go b/cmd/cue/cmd/common.go
index 4471014..b1abf47 100644
--- a/cmd/cue/cmd/common.go
+++ b/cmd/cue/cmd/common.go
@@ -16,6 +16,7 @@
 
 import (
 	"bytes"
+	"fmt"
 	"io"
 	"os"
 	"path/filepath"
@@ -33,6 +34,7 @@
 	"cuelang.org/go/cue/load"
 	"cuelang.org/go/cue/parser"
 	"cuelang.org/go/cue/token"
+	"cuelang.org/go/internal"
 	"cuelang.org/go/internal/encoding"
 )
 
@@ -137,11 +139,162 @@
 	merge     []*build.Instance
 }
 
-func (b *buildPlan) instances() []*cue.Instance {
-	if len(b.insts) == 0 {
-		return nil
+// instances iterates either over a list of instances, or a list of
+// data files. In the latter case, there must be either 0 or 1 other
+// instance, with which the data instance may be merged.
+func (b *buildPlan) instances() iterator {
+	if len(b.orphanedData) == 0 && len(b.orphanedSchema) == 0 {
+		return &instanceIterator{a: buildInstances(b.cmd, b.insts), i: -1}
 	}
-	return buildInstances(b.cmd, b.insts)
+	return newStreamingIterator(b)
+}
+
+type iterator interface {
+	scan() bool
+	instance() *cue.Instance
+	file() *ast.File // may return nil
+	err() error
+	close()
+	id() string
+}
+
+type instanceIterator struct {
+	a []*cue.Instance
+	i int
+	e error
+}
+
+func (i *instanceIterator) scan() bool {
+	i.i++
+	return i.i < len(i.a) && i.e == nil
+}
+
+func (i *instanceIterator) close()                  {}
+func (i *instanceIterator) err() error              { return i.e }
+func (i *instanceIterator) instance() *cue.Instance { return i.a[i.i] }
+func (i *instanceIterator) file() *ast.File         { return nil }
+func (i *instanceIterator) id() string              { return i.a[i.i].Dir }
+
+type streamingIterator struct {
+	r    *cue.Runtime
+	inst *cue.Instance
+	base cue.Value
+	b    *buildPlan
+	cfg  *encoding.Config
+	a    []*build.File
+	dec  *encoding.Decoder
+	i    *cue.Instance
+	f    *ast.File
+	e    error
+}
+
+func newStreamingIterator(b *buildPlan) *streamingIterator {
+	i := &streamingIterator{
+		cfg: b.encConfig,
+		a:   b.orphanedData,
+		b:   b,
+	}
+
+	// TODO: use orphanedSchema
+	switch len(b.insts) {
+	case 0:
+		i.r = &cue.Runtime{}
+	case 1:
+		p := b.insts[0]
+		inst := buildInstances(b.cmd, []*build.Instance{p})[0]
+		if inst.Err != nil {
+			return &streamingIterator{e: inst.Err}
+		}
+		i.r = internal.GetRuntime(inst).(*cue.Runtime)
+		if b.schema == nil {
+			i.base = inst.Value()
+		} else {
+			i.base = inst.Eval(b.schema)
+			if err := i.base.Err(); err != nil {
+				return &streamingIterator{e: err}
+			}
+		}
+	default:
+		return &streamingIterator{e: errors.Newf(token.NoPos,
+			"cannot combine data streaming with multiple instances")}
+	}
+
+	return i
+}
+
+func (i *streamingIterator) file() *ast.File         { return i.f }
+func (i *streamingIterator) instance() *cue.Instance { return i.i }
+
+func (i *streamingIterator) id() string {
+	if i.inst != nil {
+		return i.inst.Dir
+	}
+	return ""
+}
+
+func (i *streamingIterator) scan() bool {
+	if i.e != nil {
+		return false
+	}
+
+	// advance to next value
+	if i.dec != nil && !i.dec.Done() {
+		i.dec.Next()
+	}
+
+	// advance to next stream if necessary
+	for i.dec == nil || i.dec.Done() {
+		if i.dec != nil {
+			i.dec.Close()
+			i.dec = nil
+		}
+		if len(i.a) == 0 {
+			return false
+		}
+
+		i.dec = encoding.NewDecoder(i.a[0], i.cfg)
+		if i.e = i.dec.Err(); i.e != nil {
+			return false
+		}
+		i.a = i.a[1:]
+	}
+
+	// compose value
+	i.f = i.dec.File()
+	inst, err := i.r.CompileFile(i.f)
+	if err != nil {
+		i.e = err
+		return false
+	}
+	i.i = inst
+	if i.base.Exists() {
+		i.e = i.base.Err()
+		if i.e == nil {
+			i.i, i.e = i.i.Fill(i.base)
+			i.i.DisplayName = internal.DebugStr(i.b.schema)
+			if inst.DisplayName != "" {
+				i.i.DisplayName = fmt.Sprintf("%s|%s", inst.DisplayName, i.i.DisplayName)
+			}
+		}
+		i.f = nil
+	}
+	return i.e == nil
+}
+
+func (i *streamingIterator) close() {
+	if i.dec != nil {
+		i.dec.Close()
+		i.dec = nil
+	}
+}
+
+func (i *streamingIterator) err() error {
+	if i.dec != nil {
+		if err := i.dec.Err(); err != nil {
+			return err
+		}
+	}
+	return i.e
 }
 
 func parseArgs(cmd *Command, args []string, cfg *load.Config) (p *buildPlan, err error) {
@@ -234,21 +387,6 @@
 	return nil
 }
 
-func (b *buildPlan) singleInstance() *cue.Instance {
-	var p *build.Instance
-	switch len(b.insts) {
-	case 0:
-		return nil
-	case 1:
-		p = b.insts[0]
-	default:
-		exitOnErr(b.cmd, errors.Newf(token.NoPos,
-			"cannot combine data streaming with multiple instances"), true)
-		return nil
-	}
-	return buildInstances(b.cmd, []*build.Instance{p})[0]
-}
-
 func buildInstances(cmd *Command, binst []*build.Instance) []*cue.Instance {
 	// TODO:
 	// If there are no files and User is true, then use those?
diff --git a/cmd/cue/cmd/eval.go b/cmd/cue/cmd/eval.go
index 62a71ed..8b1ea7c 100644
--- a/cmd/cue/cmd/eval.go
+++ b/cmd/cue/cmd/eval.go
@@ -23,6 +23,7 @@
 	"cuelang.org/go/cue"
 	"cuelang.org/go/cue/ast"
 	"cuelang.org/go/cue/format"
+	"cuelang.org/go/internal"
 )
 
 // newEvalCmd creates a new eval command
@@ -97,12 +98,17 @@
 		}
 	}
 
-	instances := b.instances()
-	for _, inst := range instances {
+	iter := b.instances()
+	defer iter.close()
+	for iter.scan() {
+		inst := iter.instance().Value()
+
 		// TODO: use ImportPath or some other sanitized path.
-		if len(instances) > 1 {
-			fmt.Fprintf(w, "\n// %s\n", inst.Dir)
+		if len(b.insts) > 1 {
+			fmt.Fprintf(w, "\n// %s\n", iter.id())
 		}
+		v := iter.instance().Value()
+
 		syn := []cue.Option{
 			cue.Final(), // for backwards compatibility
 			cue.Definitions(true),
@@ -124,10 +130,10 @@
 		}
 
 		if b.expressions == nil {
-			v := inst.Value()
+			v := v
 			if flagConcrete.Bool(cmd) && !flagIgnore.Bool(cmd) {
 				if err := v.Validate(cue.Concrete(true)); err != nil {
-					exitIfErr(cmd, inst, err, false)
+					exitOnErr(cmd, err, false)
 					continue
 				}
 			}
@@ -138,19 +144,20 @@
 				fmt.Fprint(w, "// ")
 				writeNode(format.Node(e))
 			}
-			v := inst.Eval(e)
+			v := internal.EvalExpr(inst, e).(cue.Value)
 			if err := v.Err(); err != nil {
 				return err
 			}
 			if flagConcrete.Bool(cmd) && !flagIgnore.Bool(cmd) {
 				if err := v.Validate(cue.Concrete(true)); err != nil {
-					exitIfErr(cmd, inst, err, false)
+					exitOnErr(cmd, err, false)
 					continue
 				}
 			}
 			writeNode(format.Node(getSyntax(v, syn), opts...))
 		}
 	}
+	exitOnErr(cmd, iter.err(), true)
 	return nil
 }
 
diff --git a/cmd/cue/cmd/export.go b/cmd/cue/cmd/export.go
index 385e482..194a2da 100644
--- a/cmd/cue/cmd/export.go
+++ b/cmd/cue/cmd/export.go
@@ -17,6 +17,8 @@
 import (
 	"github.com/spf13/cobra"
 
+	"cuelang.org/go/cue"
+	"cuelang.org/go/internal"
 	"cuelang.org/go/internal/encoding"
 	"cuelang.org/go/internal/filetypes"
 )
@@ -115,18 +117,23 @@
 	exitOnErr(cmd, err, true)
 	defer enc.Close()
 
-	for _, inst := range b.instances() {
+	iter := b.instances()
+	defer iter.close()
+	for iter.scan() {
+		inst := iter.instance()
+
 		if b.expressions == nil {
-			err = enc.Encode(inst.Value())
+			err = enc.Encode(inst)
 			exitOnErr(cmd, err, true)
 			continue
 		}
 		for _, e := range b.expressions {
-			v := inst.Eval(e)
-			exitOnErr(cmd, v.Err(), true)
+			v := internal.MakeInstance(inst.Eval(e)).(*cue.Instance)
+			exitOnErr(cmd, v.Err, true)
 			err = enc.Encode(v)
 			exitOnErr(cmd, err, true)
 		}
 	}
+	exitOnErr(cmd, iter.err(), true)
 	return nil
 }
diff --git a/cmd/cue/cmd/script_test.go b/cmd/cue/cmd/script_test.go
index 9cbf6f7..736271b 100644
--- a/cmd/cue/cmd/script_test.go
+++ b/cmd/cue/cmd/script_test.go
@@ -93,7 +93,7 @@
 
 	cwd, err := os.Getwd()
 	check(err)
-	defer os.Chdir(cwd)
+	defer func() { _ = os.Chdir(cwd) }()
 	_ = os.Chdir(tmpdir)
 
 	for s := bufio.NewScanner(bytes.NewReader(a.Comment)); s.Scan(); {
diff --git a/cmd/cue/cmd/testdata/script/vet_altdata.txt b/cmd/cue/cmd/testdata/script/vet_altdata.txt
index 4db9322..b6a3617 100644
--- a/cmd/cue/cmd/testdata/script/vet_altdata.txt
+++ b/cmd/cue/cmd/testdata/script/vet_altdata.txt
@@ -1,8 +1,20 @@
 cue vet schema.cue json: foo.data
 ! stderr .
 
+cue export schema.cue json: foo.data
+cmp stdout export-stdout
+
 -- schema.cue --
 [string]: string
 
 -- foo.data --
 { "a": "b" }
+{ "c": "d" }
+
+-- export-stdout --
+{
+    "a": "b"
+}
+{
+    "c": "d"
+}
diff --git a/cmd/cue/cmd/testdata/script/vet_data.txt b/cmd/cue/cmd/testdata/script/vet_data.txt
index 0bf809d..7831007 100644
--- a/cmd/cue/cmd/testdata/script/vet_data.txt
+++ b/cmd/cue/cmd/testdata/script/vet_data.txt
@@ -1,5 +1,8 @@
 ! cue vet schema.cue data.yaml
-cmp stderr expect-stderr
+cmp stderr vet-stderr
+
+! cue export schema.cue data.yaml
+cmp stderr export-stderr
 
 -- schema.cue --
 Language :: {
@@ -17,10 +20,14 @@
   - tag: no
     name: Norwegian
 
--- expect-stderr --
+-- vet-stderr --
 languages.2.tag: conflicting values string and false (mismatched types string and bool):
     ./data.yaml:6:11
     ./schema.cue:2:8
 languages.1.name: invalid value "dutch" (does not match =~"^\\p{Lu}"):
     ./schema.cue:3:8
     ./data.yaml:5:12
+-- export-stderr --
+languages.1.name: invalid value "dutch" (does not match =~"^\\p{Lu}"):
+    ./schema.cue:3:8
+    ./data.yaml:5:12
diff --git a/cmd/cue/cmd/vet.go b/cmd/cue/cmd/vet.go
index f45b123..8d80a50 100644
--- a/cmd/cue/cmd/vet.go
+++ b/cmd/cue/cmd/vet.go
@@ -20,8 +20,6 @@
 
 	"cuelang.org/go/cue"
 	"cuelang.org/go/cue/errors"
-	"cuelang.org/go/internal"
-	"cuelang.org/go/internal/encoding"
 )
 
 const vetDoc = `vet validates CUE and other data files
@@ -107,7 +105,10 @@
 
 	shown := false
 
-	for _, inst := range b.instances() {
+	iter := b.instances()
+	defer iter.close()
+	for iter.scan() {
+		inst := iter.instance()
 		// TODO: use ImportPath or some other sanitized path.
 
 		concrete := true
@@ -134,46 +135,27 @@
 					"some instances are incomplete; use the -c flag to show errors or suppress this message")
 			}
 		}
-		exitIfErr(cmd, inst, err, false)
+		exitOnErr(cmd, err, false)
 	}
+	exitOnErr(cmd, iter.err(), true)
 	return nil
 }
 
 func vetFiles(cmd *Command, b *buildPlan) {
 	// Use -r type root, instead of -e
-	expr := flagSchema.String(cmd)
 
-	var check cue.Value
-
-	inst := b.singleInstance()
-	if inst == nil {
+	if len(b.insts) == 0 {
 		exitOnErr(cmd, errors.New("data files specified without a schema"), true)
 	}
 
-	if expr == "" {
-		check = inst.Value()
-	} else {
-		check = inst.Eval(b.schema)
-		exitIfErr(cmd, inst, check.Err(), true)
-	}
+	iter := b.instances()
+	defer iter.close()
+	for iter.scan() {
+		v := iter.instance().Value()
 
-	r := internal.GetRuntime(inst).(*cue.Runtime)
-
-	for _, f := range b.orphanedData {
-		i := encoding.NewDecoder(f, b.encConfig)
-		defer i.Close()
-		for ; !i.Done(); i.Next() {
-			body, err := r.CompileExpr(i.Expr())
-			exitIfErr(cmd, inst, err, true)
-			v := body.Value().Unify(check)
-			if err := v.Err(); err != nil {
-				exitIfErr(cmd, inst, err, false)
-			} else {
-				// Always concrete when checking against concrete files.
-				err = v.Validate(cue.Concrete(true))
-				exitIfErr(cmd, inst, err, false)
-			}
-		}
-		exitIfErr(cmd, inst, i.Err(), false)
+		// Always concrete when checking against concrete files.
+		err := v.Validate(cue.Concrete(true))
+		exitOnErr(cmd, err, false)
 	}
+	exitOnErr(cmd, iter.err(), false)
 }
diff --git a/cue/ast_test.go b/cue/ast_test.go
index bb2d77e..dfd8359 100644
--- a/cue/ast_test.go
+++ b/cue/ast_test.go
@@ -515,7 +515,7 @@
 			if err != nil {
 				t.Fatal(err)
 			}
-			evaluated := inst.evalExpr(ctx, expr)
+			evaluated := evalExpr(ctx, inst.eval(ctx), expr)
 			v := testResolve(ctx, evaluated, evalFull)
 			if got := debugStr(ctx, v); got != tc.out {
 				t.Errorf("output differs:\ngot  %q\nwant %q", got, tc.out)
diff --git a/cue/builtin.go b/cue/builtin.go
index 5847d6a..098a5b2 100644
--- a/cue/builtin.go
+++ b/cue/builtin.go
@@ -91,7 +91,7 @@
 		if err != nil {
 			panic(fmt.Errorf("could not parse %v: %v", p.cue, err))
 		}
-		pkg := evalExpr(ctx.index, obj, expr).(*structLit)
+		pkg := evalExpr(ctx, obj, expr).(*structLit)
 		for _, a := range pkg.arcs {
 			// Discard option status and attributes at top level.
 			// TODO: filter on capitalized fields?
diff --git a/cue/instance.go b/cue/instance.go
index ae0cd3c..3abd5fc 100644
--- a/cue/instance.go
+++ b/cue/instance.go
@@ -147,34 +147,19 @@
 		v := value.(Value)
 		e := expr.(ast.Expr)
 		ctx := v.idx.newContext()
-		return newValueRoot(ctx, evalExpr(v.idx, v.eval(ctx), e))
+		return newValueRoot(ctx, evalExpr(ctx, v.eval(ctx), e))
 	}
 }
 
-func evalExpr(idx *index, x value, expr ast.Expr) evaluated {
+func evalExpr(ctx *context, x value, expr ast.Expr) evaluated {
 	if isBottom(x) {
-		return idx.mkErr(x, "error evaluating instance: %v", x)
+		return ctx.mkErr(x, "error evaluating instance: %v", x)
 	}
 	obj, ok := x.(*structLit)
 	if !ok {
-		return idx.mkErr(obj, "instance is not a struct")
+		return ctx.mkErr(x, "instance is not a struct, found %s", x.kind())
 	}
-
-	v := newVisitor(idx, nil, nil, obj, true)
-	return eval(idx, v.walk(expr))
-}
-
-func (inst *Instance) evalExpr(ctx *context, expr ast.Expr) evaluated {
-	root := inst.eval(ctx)
-	if isBottom(root) {
-		return ctx.mkErr(root, "error evaluating instance")
-	}
-	obj, ok := root.(*structLit)
-	if !ok {
-		return ctx.mkErr(root, "instance is not a struct, found %s",
-			root.kind())
-	}
-	v := newVisitor(ctx.index, inst.inst, nil, obj, true)
+	v := newVisitor(ctx.index, nil, nil, obj, true)
 	return v.walk(expr).evalPartial(ctx)
 }
 
@@ -215,7 +200,7 @@
 // Expressions may refer to builtin packages if they can be uniquely identified.
 func (inst *Instance) Eval(expr ast.Expr) Value {
 	ctx := inst.newContext()
-	result := inst.evalExpr(ctx, expr)
+	result := evalExpr(ctx, inst.eval(ctx), expr)
 	return newValueRoot(ctx, result)
 }
 
diff --git a/cue/types.go b/cue/types.go
index 0f8da53..b4398b9 100644
--- a/cue/types.go
+++ b/cue/types.go
@@ -1359,6 +1359,18 @@
 	return f, err
 }
 
+// TODO: expose this API?
+//
+// // EvalExpr evaluates an expression within the scope of v, which must be
+// // a struct.
+// //
+// // Expressions may refer to builtin packages if they can be uniquely identified.
+// func (v Value) EvalExpr(expr ast.Expr) Value {
+// 	ctx := v.ctx()
+// 	result := evalExpr(ctx, v.eval(ctx), expr)
+// 	return newValueRoot(ctx, result)
+// }
+
 // Fill creates a new value by unifying v with the value of x at the given path.
 //
 // Values may be any Go value that can be converted to CUE, an ast.Expr or
diff --git a/internal/encoding/encoder.go b/internal/encoding/encoder.go
index 18d577e..5f7c8f2 100644
--- a/internal/encoding/encoder.go
+++ b/internal/encoding/encoder.go
@@ -36,7 +36,7 @@
 type Encoder struct {
 	cfg          *Config
 	closer       io.Closer
-	interpret    func(cue.Value) (*ast.File, error)
+	interpret    func(*cue.Instance) (*ast.File, error)
 	encFile      func(*ast.File) error
 	encValue     func(cue.Value) error
 	autoSimplify bool
@@ -184,26 +184,22 @@
 	return e.encodeFile(f, e.interpret)
 }
 
-func (e *Encoder) EncodeExpr(x ast.Expr) error {
-	return e.EncodeFile(toFile(x))
-}
-
-func (e *Encoder) Encode(v cue.Value) error {
+func (e *Encoder) Encode(inst *cue.Instance) error {
 	e.autoSimplify = true
 	if e.interpret != nil {
-		f, err := e.interpret(v)
+		f, err := e.interpret(inst)
 		if err != nil {
 			return err
 		}
 		return e.encodeFile(f, nil)
 	}
 	if e.encValue != nil {
-		return e.encValue(v)
+		return e.encValue(inst.Value())
 	}
-	return e.encFile(valueToFile(v))
+	return e.encFile(valueToFile(inst.Value()))
 }
 
-func (e *Encoder) encodeFile(f *ast.File, interpret func(cue.Value) (*ast.File, error)) error {
+func (e *Encoder) encodeFile(f *ast.File, interpret func(*cue.Instance) (*ast.File, error)) error {
 	if interpret == nil && e.encFile != nil {
 		return e.encFile(f)
 	}
@@ -213,7 +209,7 @@
 		return err
 	}
 	if interpret != nil {
-		return e.Encode(inst.Value())
+		return e.Encode(inst)
 	}
 	return e.encValue(inst.Value())
 }