2021-02-08 20:47:07 +01:00
|
|
|
package dagger
|
|
|
|
|
|
|
|
import (
	"context"
	"encoding/json"
	"fmt"
	"sort"

	"github.com/moby/buildkit/client/llb"
	"github.com/rs/zerolog/log"
	"gopkg.in/yaml.v3"

	"dagger.io/go/dagger/compiler"
)
|
|
|
|
|
|
|
|
// An execution pipeline
|
|
|
|
type Pipeline struct {
|
|
|
|
s Solver
|
|
|
|
fs FS
|
|
|
|
out *Fillable
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewPipeline(s Solver, out *Fillable) *Pipeline {
|
|
|
|
return &Pipeline{
|
|
|
|
s: s,
|
|
|
|
fs: s.Scratch(),
|
|
|
|
out: out,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p *Pipeline) FS() FS {
|
|
|
|
return p.fs
|
|
|
|
}
|
|
|
|
|
2021-02-19 23:04:40 +01:00
|
|
|
func isComponent(v *compiler.Value) bool {
|
|
|
|
return v.Get("#dagger.compute").Exists()
|
|
|
|
}
|
|
|
|
|
2021-02-17 03:31:03 +01:00
|
|
|
func ops(code ...*compiler.Value) ([]*compiler.Value, error) {
|
|
|
|
ops := []*compiler.Value{}
|
2021-02-08 20:47:07 +01:00
|
|
|
// 1. Decode 'code' into a single flat array of operations.
|
|
|
|
for _, x := range code {
|
|
|
|
// 1. attachment array
|
2021-02-19 23:04:40 +01:00
|
|
|
if isComponent(x) {
|
|
|
|
xops, err := x.Get("#dagger.compute").List()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2021-02-08 20:47:07 +01:00
|
|
|
// 'from' has an executable attached
|
|
|
|
ops = append(ops, xops...)
|
2021-02-19 23:04:40 +01:00
|
|
|
// 2. individual op
|
|
|
|
} else if _, err := x.Get("do").String(); err == nil {
|
2021-02-08 20:47:07 +01:00
|
|
|
ops = append(ops, x)
|
2021-02-19 23:04:40 +01:00
|
|
|
// 3. op array
|
|
|
|
} else if xops, err := x.List(); err == nil {
|
2021-02-08 20:47:07 +01:00
|
|
|
ops = append(ops, xops...)
|
2021-02-19 23:04:40 +01:00
|
|
|
} else {
|
|
|
|
// 4. error
|
|
|
|
return nil, fmt.Errorf("not executable: %s", x.SourceUnsafe())
|
2021-02-08 20:47:07 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
return ops, nil
|
|
|
|
}
|
|
|
|
|
2021-02-17 03:31:03 +01:00
|
|
|
func Analyze(fn func(*compiler.Value) error, code ...*compiler.Value) error {
|
2021-02-08 20:47:07 +01:00
|
|
|
ops, err := ops(code...)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
for _, op := range ops {
|
|
|
|
if err := analyzeOp(fn, op); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-02-17 03:31:03 +01:00
|
|
|
func analyzeOp(fn func(*compiler.Value) error, op *compiler.Value) error {
|
2021-02-08 20:47:07 +01:00
|
|
|
if err := fn(op); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
do, err := op.Get("do").String()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
switch do {
|
|
|
|
case "load", "copy":
|
|
|
|
return Analyze(fn, op.Get("from"))
|
|
|
|
case "exec":
|
2021-02-17 03:31:03 +01:00
|
|
|
return op.Get("mount").RangeStruct(func(dest string, mnt *compiler.Value) error {
|
2021-02-08 20:47:07 +01:00
|
|
|
if from := mnt.Get("from"); from.Exists() {
|
|
|
|
return Analyze(fn, from)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
})
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// x may be:
|
|
|
|
// 1) a single operation
|
|
|
|
// 2) an array of operations
|
|
|
|
// 3) a value with an attached array of operations
|
2021-02-17 03:31:03 +01:00
|
|
|
func (p *Pipeline) Do(ctx context.Context, code ...*compiler.Value) error {
|
2021-02-08 20:47:07 +01:00
|
|
|
ops, err := ops(code...)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
// 2. Execute each operation in sequence
|
|
|
|
for idx, op := range ops {
|
|
|
|
// If op not concrete, interrupt without error.
|
|
|
|
// This allows gradual resolution:
|
|
|
|
// compute what you can compute.. leave the rest incomplete.
|
|
|
|
if err := op.IsConcreteR(); err != nil {
|
|
|
|
log.
|
|
|
|
Ctx(ctx).
|
2021-02-17 02:54:43 +01:00
|
|
|
Warn().
|
2021-02-08 20:47:07 +01:00
|
|
|
Str("original_cue_error", err.Error()).
|
|
|
|
Int("op", idx).
|
2021-02-17 02:54:43 +01:00
|
|
|
Msg("pipeline was partially executed because of missing inputs")
|
|
|
|
return nil
|
2021-02-08 20:47:07 +01:00
|
|
|
}
|
|
|
|
if err := p.doOp(ctx, op); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
// Force a buildkit solve request at each operation,
|
|
|
|
// so that errors map to the correct cue path.
|
|
|
|
// FIXME: might as well change FS to make every operation
|
|
|
|
// synchronous.
|
|
|
|
fs, err := p.fs.Solve(ctx)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
p.fs = fs
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-02-17 03:31:03 +01:00
|
|
|
func (p *Pipeline) doOp(ctx context.Context, op *compiler.Value) error {
|
2021-02-08 20:47:07 +01:00
|
|
|
do, err := op.Get("do").String()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
switch do {
|
|
|
|
case "copy":
|
|
|
|
return p.Copy(ctx, op)
|
|
|
|
case "exec":
|
|
|
|
return p.Exec(ctx, op)
|
|
|
|
case "export":
|
|
|
|
return p.Export(ctx, op)
|
|
|
|
case "fetch-container":
|
|
|
|
return p.FetchContainer(ctx, op)
|
|
|
|
case "fetch-git":
|
|
|
|
return p.FetchGit(ctx, op)
|
|
|
|
case "local":
|
|
|
|
return p.Local(ctx, op)
|
|
|
|
case "load":
|
|
|
|
return p.Load(ctx, op)
|
|
|
|
case "subdir":
|
|
|
|
return p.Subdir(ctx, op)
|
|
|
|
default:
|
|
|
|
return fmt.Errorf("invalid operation: %s", op.JSON())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Spawn a temporary pipeline with the same solver.
|
|
|
|
// Output values are discarded: the parent pipeline's values are not modified.
|
|
|
|
func (p *Pipeline) Tmp() *Pipeline {
|
|
|
|
return NewPipeline(p.s, nil)
|
|
|
|
}
|
|
|
|
|
2021-02-17 03:31:03 +01:00
|
|
|
func (p *Pipeline) Subdir(ctx context.Context, op *compiler.Value) error {
|
2021-02-08 20:47:07 +01:00
|
|
|
// FIXME: this could be more optimized by carrying subdir path as metadata,
|
|
|
|
// and using it in copy, load or mount.
|
|
|
|
|
|
|
|
dir, err := op.Get("dir").String()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
p.fs = p.fs.Change(func(st llb.State) llb.State {
|
|
|
|
return st.File(llb.Copy(
|
|
|
|
p.fs.LLB(),
|
|
|
|
dir,
|
|
|
|
"/",
|
|
|
|
&llb.CopyInfo{
|
|
|
|
CopyDirContentsOnly: true,
|
|
|
|
},
|
|
|
|
))
|
|
|
|
})
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-02-17 03:31:03 +01:00
|
|
|
func (p *Pipeline) Copy(ctx context.Context, op *compiler.Value) error {
|
2021-02-08 20:47:07 +01:00
|
|
|
// Decode copy options
|
|
|
|
src, err := op.Get("src").String()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
dest, err := op.Get("dest").String()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
// Execute 'from' in a tmp pipeline, and use the resulting fs
|
|
|
|
from := p.Tmp()
|
|
|
|
if err := from.Do(ctx, op.Get("from")); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
p.fs = p.fs.Change(func(st llb.State) llb.State {
|
|
|
|
return st.File(llb.Copy(
|
|
|
|
from.FS().LLB(),
|
|
|
|
src,
|
|
|
|
dest,
|
|
|
|
// FIXME: allow more configurable llb options
|
|
|
|
// For now we define the following convenience presets:
|
|
|
|
&llb.CopyInfo{
|
|
|
|
CopyDirContentsOnly: true,
|
|
|
|
CreateDestPath: true,
|
|
|
|
AllowWildcard: true,
|
|
|
|
},
|
|
|
|
))
|
|
|
|
})
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-02-17 03:31:03 +01:00
|
|
|
func (p *Pipeline) Local(ctx context.Context, op *compiler.Value) error {
|
2021-02-08 20:47:07 +01:00
|
|
|
dir, err := op.Get("dir").String()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
var include []string
|
2021-02-18 02:19:49 +01:00
|
|
|
if inc := op.Get("include"); inc.Exists() {
|
|
|
|
if err := inc.Decode(&include); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2021-02-08 20:47:07 +01:00
|
|
|
}
|
2021-02-18 02:19:49 +01:00
|
|
|
|
2021-02-12 23:41:55 +01:00
|
|
|
p.fs = p.fs.Change(func(st llb.State) llb.State {
|
|
|
|
return st.File(llb.Copy(
|
|
|
|
llb.Local(dir, llb.FollowPaths(include)),
|
|
|
|
"/",
|
|
|
|
"/",
|
|
|
|
))
|
|
|
|
})
|
2021-02-08 20:47:07 +01:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-02-17 03:31:03 +01:00
|
|
|
func (p *Pipeline) Exec(ctx context.Context, op *compiler.Value) error {
|
2021-02-08 20:47:07 +01:00
|
|
|
opts := []llb.RunOption{}
|
|
|
|
var cmd struct {
|
|
|
|
Args []string
|
|
|
|
Env map[string]string
|
|
|
|
Dir string
|
|
|
|
Always bool
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := op.Decode(&cmd); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
// marker for status events
|
|
|
|
// FIXME
|
|
|
|
opts = append(opts, llb.WithCustomName(op.Path().String()))
|
|
|
|
// args
|
|
|
|
opts = append(opts, llb.Args(cmd.Args))
|
|
|
|
// dir
|
|
|
|
opts = append(opts, llb.Dir(cmd.Dir))
|
|
|
|
// env
|
|
|
|
for k, v := range cmd.Env {
|
|
|
|
opts = append(opts, llb.AddEnv(k, v))
|
|
|
|
}
|
|
|
|
// always?
|
|
|
|
// FIXME: initialize once for an entire compute job, to avoid cache misses
|
|
|
|
if cmd.Always {
|
|
|
|
cacheBuster, err := randomID(8)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
opts = append(opts, llb.AddEnv("DAGGER_CACHEBUSTER", cacheBuster))
|
|
|
|
}
|
|
|
|
// mounts
|
|
|
|
if mounts := op.Lookup("mount"); mounts.Exists() {
|
|
|
|
mntOpts, err := p.mountAll(ctx, mounts)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
opts = append(opts, mntOpts...)
|
|
|
|
}
|
|
|
|
// --> Execute
|
|
|
|
p.fs = p.fs.Change(func(st llb.State) llb.State {
|
|
|
|
return st.Run(opts...).Root()
|
|
|
|
})
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-02-17 03:31:03 +01:00
|
|
|
func (p *Pipeline) mountAll(ctx context.Context, mounts *compiler.Value) ([]llb.RunOption, error) {
|
2021-02-08 20:47:07 +01:00
|
|
|
opts := []llb.RunOption{}
|
2021-02-17 03:31:03 +01:00
|
|
|
err := mounts.RangeStruct(func(dest string, mnt *compiler.Value) error {
|
2021-02-08 20:47:07 +01:00
|
|
|
o, err := p.mount(ctx, dest, mnt)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
opts = append(opts, o)
|
|
|
|
return nil
|
|
|
|
})
|
|
|
|
return opts, err
|
|
|
|
}
|
|
|
|
|
2021-02-17 03:31:03 +01:00
|
|
|
func (p *Pipeline) mount(ctx context.Context, dest string, mnt *compiler.Value) (llb.RunOption, error) {
|
2021-02-08 20:47:07 +01:00
|
|
|
if s, err := mnt.String(); err == nil {
|
|
|
|
// eg. mount: "/foo": "cache"
|
|
|
|
switch s {
|
|
|
|
case "cache":
|
|
|
|
return llb.AddMount(
|
|
|
|
dest,
|
|
|
|
llb.Scratch(),
|
|
|
|
llb.AsPersistentCacheDir(
|
|
|
|
mnt.Path().String(),
|
|
|
|
llb.CacheMountShared,
|
|
|
|
),
|
|
|
|
), nil
|
|
|
|
case "tmp":
|
|
|
|
return llb.AddMount(
|
|
|
|
dest,
|
|
|
|
llb.Scratch(),
|
|
|
|
llb.Tmpfs(),
|
|
|
|
), nil
|
|
|
|
default:
|
|
|
|
return nil, fmt.Errorf("invalid mount source: %q", s)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// eg. mount: "/foo": { from: www.source }
|
|
|
|
from := p.Tmp()
|
|
|
|
if err := from.Do(ctx, mnt.Get("from")); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
// possibly construct mount options for LLB from
|
|
|
|
var mo []llb.MountOption
|
|
|
|
// handle "path" option
|
|
|
|
if mp := mnt.Lookup("path"); mp.Exists() {
|
|
|
|
mps, err := mp.String()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
mo = append(mo, llb.SourcePath(mps))
|
|
|
|
}
|
|
|
|
return llb.AddMount(dest, from.FS().LLB(), mo...), nil
|
|
|
|
}
|
|
|
|
|
2021-02-17 03:31:03 +01:00
|
|
|
func (p *Pipeline) Export(ctx context.Context, op *compiler.Value) error {
|
2021-02-08 20:47:07 +01:00
|
|
|
source, err := op.Get("source").String()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
format, err := op.Get("format").String()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
contents, err := p.fs.ReadFile(ctx, source)
|
|
|
|
if err != nil {
|
2021-02-17 05:13:51 +01:00
|
|
|
return fmt.Errorf("export %s: %w", source, err)
|
2021-02-08 20:47:07 +01:00
|
|
|
}
|
|
|
|
switch format {
|
|
|
|
case "string":
|
|
|
|
log.
|
|
|
|
Ctx(ctx).
|
|
|
|
Debug().
|
|
|
|
Bytes("contents", contents).
|
|
|
|
Msg("exporting string")
|
|
|
|
|
|
|
|
if err := p.out.Fill(string(contents)); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
case "json":
|
|
|
|
var o interface{}
|
|
|
|
o, err := unmarshalAnything(contents, json.Unmarshal)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
log.
|
|
|
|
Ctx(ctx).
|
|
|
|
Debug().
|
|
|
|
Interface("contents", o).
|
|
|
|
Msg("exporting json")
|
|
|
|
|
|
|
|
if err := p.out.Fill(o); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
case "yaml":
|
|
|
|
var o interface{}
|
|
|
|
o, err := unmarshalAnything(contents, yaml.Unmarshal)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
log.
|
|
|
|
Ctx(ctx).
|
|
|
|
Debug().
|
|
|
|
Interface("contents", o).
|
|
|
|
Msg("exporting yaml")
|
|
|
|
|
|
|
|
if err := p.out.Fill(o); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
default:
|
|
|
|
return fmt.Errorf("unsupported export format: %q", format)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// unmarshaller decodes data into the value pointed to by v. Export passes
// json.Unmarshal and yaml.Unmarshal as unmarshallers.
type unmarshaller func(data []byte, v interface{}) error

// unmarshalAnything decodes data with fn, without knowing in advance whether
// it holds a map or something else.
//
// It first decodes into a map[string]interface{} and returns that map if fn
// succeeds. Otherwise it decodes into an interface{} and returns the result
// of that second attempt, error included.
func unmarshalAnything(data []byte, fn unmarshaller) (interface{}, error) {
	// Unmarshalling a map into interface{} yields an error:
	// "unsupported Go type for map key (interface {})"
	// so a map with string keys is attempted first.
	var asMap map[string]interface{}
	if mapErr := fn(data, &asMap); mapErr == nil {
		return asMap, nil
	}

	// The map attempt failed: we might be facing a scalar (e.g. bool).
	// Unmarshal to interface{} directly.
	var generic interface{}
	genericErr := fn(data, &generic)
	return generic, genericErr
}
|
|
|
|
|
2021-02-17 03:31:03 +01:00
|
|
|
func (p *Pipeline) Load(ctx context.Context, op *compiler.Value) error {
|
2021-02-08 20:47:07 +01:00
|
|
|
// Execute 'from' in a tmp pipeline, and use the resulting fs
|
|
|
|
from := p.Tmp()
|
|
|
|
if err := from.Do(ctx, op.Get("from")); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
p.fs = p.fs.Set(from.FS().LLB())
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-02-17 03:31:03 +01:00
|
|
|
func (p *Pipeline) FetchContainer(ctx context.Context, op *compiler.Value) error {
|
2021-02-08 20:47:07 +01:00
|
|
|
ref, err := op.Get("ref").String()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
// FIXME: preserve docker image metadata
|
|
|
|
p.fs = p.fs.Set(llb.Image(ref))
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-02-17 03:31:03 +01:00
|
|
|
func (p *Pipeline) FetchGit(ctx context.Context, op *compiler.Value) error {
|
2021-02-08 20:47:07 +01:00
|
|
|
remote, err := op.Get("remote").String()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
ref, err := op.Get("ref").String()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
p.fs = p.fs.Set(llb.Git(remote, ref))
|
|
|
|
return nil
|
|
|
|
}
|