428 lines
12 KiB
Go
428 lines
12 KiB
Go
package dockerclient
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
cerrdefs "github.com/containerd/errdefs"
|
|
"github.com/moby/moby/api/types/container"
|
|
"github.com/moby/moby/api/types/mount"
|
|
"github.com/moby/moby/api/types/network"
|
|
mobyclient "github.com/moby/moby/client"
|
|
)
|
|
|
|
// enginePort is the in-container HTTP port the engine listens on. Galaxy
|
|
// never publishes the port to the host; it is reachable only through
|
|
// Docker DNS on the user-defined network.
|
|
const enginePort = 8080
|
|
|
|
// Adapter is the production *Client implementation backed by
|
|
// `github.com/moby/moby/client`. Use NewAdapter to construct it.
|
|
type Adapter struct {
|
|
docker *mobyclient.Client
|
|
clock func() time.Time
|
|
}
|
|
|
|
// AdapterConfig configures an Adapter.
|
|
type AdapterConfig struct {
|
|
// Docker is the underlying Moby client. Must be non-nil.
|
|
Docker *mobyclient.Client
|
|
|
|
// Clock supplies the wall-clock used when the daemon does not
|
|
// return a parseable started_at value. Defaults to time.Now.
|
|
Clock func() time.Time
|
|
}
|
|
|
|
// NewAdapter wraps a moby client with the dockerclient port surface.
|
|
func NewAdapter(cfg AdapterConfig) (*Adapter, error) {
|
|
if cfg.Docker == nil {
|
|
return nil, errors.New("dockerclient: nil moby client")
|
|
}
|
|
clock := cfg.Clock
|
|
if clock == nil {
|
|
clock = time.Now
|
|
}
|
|
return &Adapter{docker: cfg.Docker, clock: clock}, nil
|
|
}
|
|
|
|
// EnsureNetwork returns nil when the named user-defined network exists
|
|
// on the daemon; ErrNetworkMissing otherwise. Adapter never creates
|
|
// networks itself — operators provision the network ahead of time.
|
|
func (a *Adapter) EnsureNetwork(ctx context.Context, name string) error {
|
|
if _, err := a.docker.NetworkInspect(ctx, name, mobyclient.NetworkInspectOptions{}); err != nil {
|
|
if cerrdefs.IsNotFound(err) {
|
|
return ErrNetworkMissing
|
|
}
|
|
return fmt.Errorf("dockerclient: inspect network %q: %w", name, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// PullImage pulls ref according to policy. The pull stream is fully
|
|
// drained synchronously so callers know the image is ready when this
|
|
// returns nil.
|
|
func (a *Adapter) PullImage(ctx context.Context, ref string, policy PullPolicy) error {
|
|
if !policy.IsKnown() {
|
|
return ErrInvalidPullPolicy
|
|
}
|
|
switch policy {
|
|
case PullPolicyNever:
|
|
if _, err := a.InspectImage(ctx, ref); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
case PullPolicyIfMissing:
|
|
if _, err := a.InspectImage(ctx, ref); err == nil {
|
|
return nil
|
|
} else if !errors.Is(err, ErrImageNotFound) {
|
|
return err
|
|
}
|
|
}
|
|
resp, err := a.docker.ImagePull(ctx, ref, mobyclient.ImagePullOptions{})
|
|
if err != nil {
|
|
return fmt.Errorf("%w: pull %q: %v", ErrImagePullFailed, ref, err)
|
|
}
|
|
if _, drainErr := io.Copy(io.Discard, resp); drainErr != nil {
|
|
_ = resp.Close()
|
|
return fmt.Errorf("%w: drain %q: %v", ErrImagePullFailed, ref, drainErr)
|
|
}
|
|
if closeErr := resp.Close(); closeErr != nil {
|
|
return fmt.Errorf("%w: close %q: %v", ErrImagePullFailed, ref, closeErr)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// InspectImage returns the labels of ref. Maps daemon `not found` to
|
|
// ErrImageNotFound.
|
|
func (a *Adapter) InspectImage(ctx context.Context, ref string) (ImageInspect, error) {
|
|
res, err := a.docker.ImageInspect(ctx, ref)
|
|
if err != nil {
|
|
if cerrdefs.IsNotFound(err) {
|
|
return ImageInspect{}, ErrImageNotFound
|
|
}
|
|
return ImageInspect{}, fmt.Errorf("dockerclient: inspect image %q: %w", ref, err)
|
|
}
|
|
out := ImageInspect{Ref: ref}
|
|
if res.Config != nil {
|
|
out.Labels = cloneStringMap(res.Config.Labels)
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
// InspectContainer returns the metadata for idOrName. Maps daemon
|
|
// `not found` to ErrContainerNotFound.
|
|
func (a *Adapter) InspectContainer(ctx context.Context, idOrName string) (ContainerInspect, error) {
|
|
res, err := a.docker.ContainerInspect(ctx, idOrName, mobyclient.ContainerInspectOptions{})
|
|
if err != nil {
|
|
if cerrdefs.IsNotFound(err) {
|
|
return ContainerInspect{}, ErrContainerNotFound
|
|
}
|
|
return ContainerInspect{}, fmt.Errorf("dockerclient: inspect container %q: %w", idOrName, err)
|
|
}
|
|
return mapContainerInspect(res.Container), nil
|
|
}
|
|
|
|
// Run pulls the image (per spec.PullPolicy), creates the container with
|
|
// the documented label set, attaches it to spec.Network, starts it, and
|
|
// returns the canonical engine endpoint URL.
|
|
func (a *Adapter) Run(ctx context.Context, spec RunSpec) (RunResult, error) {
|
|
if strings.TrimSpace(spec.Name) == "" {
|
|
return RunResult{}, errors.New("dockerclient: run: name must not be empty")
|
|
}
|
|
if strings.TrimSpace(spec.Image) == "" {
|
|
return RunResult{}, errors.New("dockerclient: run: image must not be empty")
|
|
}
|
|
if strings.TrimSpace(spec.Network) == "" {
|
|
return RunResult{}, errors.New("dockerclient: run: network must not be empty")
|
|
}
|
|
if strings.TrimSpace(spec.Hostname) == "" {
|
|
return RunResult{}, errors.New("dockerclient: run: hostname must not be empty")
|
|
}
|
|
policy := spec.PullPolicy
|
|
if policy == "" {
|
|
policy = PullPolicyIfMissing
|
|
}
|
|
if err := a.PullImage(ctx, spec.Image, policy); err != nil {
|
|
return RunResult{}, err
|
|
}
|
|
|
|
envSlice := make([]string, 0, len(spec.Env))
|
|
for k, v := range spec.Env {
|
|
envSlice = append(envSlice, k+"="+v)
|
|
}
|
|
|
|
labels := make(map[string]string, len(spec.Labels)+1)
|
|
for k, v := range spec.Labels {
|
|
labels[k] = v
|
|
}
|
|
labels[ManagedLabel] = ManagedLabelValue
|
|
|
|
mounts := make([]mount.Mount, 0, len(spec.BindMounts))
|
|
for _, b := range spec.BindMounts {
|
|
mounts = append(mounts, mount.Mount{
|
|
Type: mount.TypeBind,
|
|
Source: b.HostPath,
|
|
Target: b.MountPath,
|
|
ReadOnly: b.ReadOnly,
|
|
})
|
|
}
|
|
|
|
resources := container.Resources{}
|
|
if spec.CPUQuota > 0 {
|
|
// Convert decimal cpus into NanoCPUs (1.0 = 1e9).
|
|
resources.NanoCPUs = int64(spec.CPUQuota * 1e9)
|
|
}
|
|
if mem, err := parseMemoryString(spec.Memory); err != nil {
|
|
return RunResult{}, err
|
|
} else if mem > 0 {
|
|
resources.Memory = mem
|
|
}
|
|
if spec.PIDsLimit > 0 {
|
|
pl := int64(spec.PIDsLimit)
|
|
resources.PidsLimit = &pl
|
|
}
|
|
|
|
logConfig := container.LogConfig{}
|
|
if spec.LogDriver != "" {
|
|
logConfig.Type = spec.LogDriver
|
|
}
|
|
if spec.LogOpts != "" {
|
|
opts, err := parseLogOpts(spec.LogOpts)
|
|
if err != nil {
|
|
return RunResult{}, err
|
|
}
|
|
logConfig.Config = opts
|
|
}
|
|
|
|
hostCfg := &container.HostConfig{
|
|
NetworkMode: container.NetworkMode(spec.Network),
|
|
Mounts: mounts,
|
|
LogConfig: logConfig,
|
|
Resources: resources,
|
|
AutoRemove: false,
|
|
ReadonlyRootfs: false,
|
|
RestartPolicy: container.RestartPolicy{
|
|
Name: container.RestartPolicyOnFailure,
|
|
},
|
|
}
|
|
|
|
netCfg := &network.NetworkingConfig{
|
|
EndpointsConfig: map[string]*network.EndpointSettings{
|
|
spec.Network: {
|
|
Aliases: []string{spec.Hostname},
|
|
},
|
|
},
|
|
}
|
|
|
|
created, err := a.docker.ContainerCreate(ctx, mobyclient.ContainerCreateOptions{
|
|
Name: spec.Name,
|
|
Config: &container.Config{
|
|
Hostname: spec.Hostname,
|
|
Image: spec.Image,
|
|
Env: envSlice,
|
|
Cmd: spec.Cmd,
|
|
Labels: labels,
|
|
},
|
|
HostConfig: hostCfg,
|
|
NetworkingConfig: netCfg,
|
|
})
|
|
if err != nil {
|
|
return RunResult{}, fmt.Errorf("dockerclient: create container %q: %w", spec.Name, err)
|
|
}
|
|
|
|
if _, err := a.docker.ContainerStart(ctx, created.ID, mobyclient.ContainerStartOptions{}); err != nil {
|
|
// Best-effort: try to remove the freshly-created container so we
|
|
// do not leak a half-started one.
|
|
_, _ = a.docker.ContainerRemove(ctx, created.ID, mobyclient.ContainerRemoveOptions{Force: true})
|
|
return RunResult{}, fmt.Errorf("dockerclient: start container %q: %w", spec.Name, err)
|
|
}
|
|
|
|
startedAt := a.clock()
|
|
if inspect, err := a.docker.ContainerInspect(ctx, created.ID, mobyclient.ContainerInspectOptions{}); err == nil {
|
|
if inspect.Container.State != nil && inspect.Container.State.StartedAt != "" {
|
|
if parsed, perr := time.Parse(time.RFC3339Nano, inspect.Container.State.StartedAt); perr == nil {
|
|
startedAt = parsed
|
|
}
|
|
}
|
|
}
|
|
|
|
return RunResult{
|
|
ContainerID: created.ID,
|
|
EngineEndpoint: fmt.Sprintf("http://%s:%d", spec.Hostname, enginePort),
|
|
StartedAt: startedAt,
|
|
}, nil
|
|
}
|
|
|
|
// Stop sends SIGTERM to idOrName and waits up to timeoutSeconds before
|
|
// forcibly killing it. Maps daemon `not found` to ErrContainerNotFound.
|
|
func (a *Adapter) Stop(ctx context.Context, idOrName string, timeoutSeconds int) error {
|
|
opts := mobyclient.ContainerStopOptions{}
|
|
if timeoutSeconds >= 0 {
|
|
t := timeoutSeconds
|
|
opts.Timeout = &t
|
|
}
|
|
if _, err := a.docker.ContainerStop(ctx, idOrName, opts); err != nil {
|
|
if cerrdefs.IsNotFound(err) {
|
|
return ErrContainerNotFound
|
|
}
|
|
return fmt.Errorf("dockerclient: stop %q: %w", idOrName, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Remove deletes idOrName. Idempotent: nil when the container is
|
|
// already gone.
|
|
func (a *Adapter) Remove(ctx context.Context, idOrName string) error {
|
|
if _, err := a.docker.ContainerRemove(ctx, idOrName, mobyclient.ContainerRemoveOptions{Force: true}); err != nil {
|
|
if cerrdefs.IsNotFound(err) {
|
|
return nil
|
|
}
|
|
return fmt.Errorf("dockerclient: remove %q: %w", idOrName, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// List returns container summaries that match filter.
|
|
func (a *Adapter) List(ctx context.Context, filter ListFilter) ([]ContainerSummary, error) {
|
|
filters := mobyclient.Filters{}
|
|
for k, v := range filter.Labels {
|
|
if v == "" {
|
|
filters.Add("label", k)
|
|
continue
|
|
}
|
|
filters.Add("label", k+"="+v)
|
|
}
|
|
res, err := a.docker.ContainerList(ctx, mobyclient.ContainerListOptions{
|
|
All: true,
|
|
Filters: filters,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("dockerclient: list: %w", err)
|
|
}
|
|
out := make([]ContainerSummary, 0, len(res.Items))
|
|
for _, item := range res.Items {
|
|
out = append(out, mapContainerSummary(item))
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
func mapContainerInspect(c container.InspectResponse) ContainerInspect {
|
|
out := ContainerInspect{
|
|
ID: c.ID,
|
|
Name: strings.TrimPrefix(c.Name, "/"),
|
|
ImageRef: c.Image,
|
|
}
|
|
if c.Config != nil {
|
|
out.Hostname = c.Config.Hostname
|
|
out.Labels = cloneStringMap(c.Config.Labels)
|
|
if out.ImageRef == "" {
|
|
out.ImageRef = c.Config.Image
|
|
}
|
|
}
|
|
if c.State != nil {
|
|
out.Status = string(c.State.Status)
|
|
out.ExitCode = c.State.ExitCode
|
|
if t, err := time.Parse(time.RFC3339Nano, c.State.StartedAt); err == nil && !t.IsZero() {
|
|
out.StartedAt = t
|
|
}
|
|
if t, err := time.Parse(time.RFC3339Nano, c.State.FinishedAt); err == nil && !t.IsZero() {
|
|
out.FinishedAt = t
|
|
}
|
|
if c.State.Health != nil {
|
|
out.Health = string(c.State.Health.Status)
|
|
}
|
|
}
|
|
return out
|
|
}
|
|
|
|
func mapContainerSummary(s container.Summary) ContainerSummary {
|
|
out := ContainerSummary{
|
|
ID: s.ID,
|
|
ImageRef: s.Image,
|
|
Status: string(s.State),
|
|
Labels: cloneStringMap(s.Labels),
|
|
}
|
|
if len(s.Names) > 0 {
|
|
out.Name = strings.TrimPrefix(s.Names[0], "/")
|
|
}
|
|
out.StartedAt = time.Unix(s.Created, 0).UTC()
|
|
return out
|
|
}
|
|
|
|
func cloneStringMap(in map[string]string) map[string]string {
|
|
if len(in) == 0 {
|
|
return nil
|
|
}
|
|
out := make(map[string]string, len(in))
|
|
for k, v := range in {
|
|
out[k] = v
|
|
}
|
|
return out
|
|
}
|
|
|
|
// parseMemoryString accepts the docker `--memory` short forms (e.g.
|
|
// `512m`, `1g`) and returns the corresponding byte count. An empty
|
|
// string yields 0 (no memory limit). Unknown formats produce an error.
|
|
func parseMemoryString(raw string) (int64, error) {
|
|
raw = strings.TrimSpace(raw)
|
|
if raw == "" {
|
|
return 0, nil
|
|
}
|
|
multiplier := int64(1)
|
|
last := raw[len(raw)-1]
|
|
digits := raw
|
|
switch last {
|
|
case 'b', 'B':
|
|
multiplier = 1
|
|
digits = raw[:len(raw)-1]
|
|
case 'k', 'K':
|
|
multiplier = 1024
|
|
digits = raw[:len(raw)-1]
|
|
case 'm', 'M':
|
|
multiplier = 1024 * 1024
|
|
digits = raw[:len(raw)-1]
|
|
case 'g', 'G':
|
|
multiplier = 1024 * 1024 * 1024
|
|
digits = raw[:len(raw)-1]
|
|
default:
|
|
if last < '0' || last > '9' {
|
|
return 0, fmt.Errorf("dockerclient: invalid memory suffix in %q", raw)
|
|
}
|
|
}
|
|
n, err := strconv.ParseInt(digits, 10, 64)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("dockerclient: parse memory %q: %w", raw, err)
|
|
}
|
|
if n < 0 {
|
|
return 0, fmt.Errorf("dockerclient: memory must be non-negative, got %q", raw)
|
|
}
|
|
return n * multiplier, nil
|
|
}
|
|
|
|
// parseLogOpts splits a comma-separated `key=value` list into a map.
|
|
func parseLogOpts(raw string) (map[string]string, error) {
|
|
out := make(map[string]string)
|
|
for _, pair := range strings.Split(raw, ",") {
|
|
pair = strings.TrimSpace(pair)
|
|
if pair == "" {
|
|
continue
|
|
}
|
|
k, v, ok := strings.Cut(pair, "=")
|
|
if !ok {
|
|
return nil, fmt.Errorf("dockerclient: log opt %q must be key=value", pair)
|
|
}
|
|
k = strings.TrimSpace(k)
|
|
v = strings.TrimSpace(v)
|
|
if k == "" {
|
|
return nil, fmt.Errorf("dockerclient: log opt %q has empty key", pair)
|
|
}
|
|
out[k] = v
|
|
}
|
|
return out, nil
|
|
}
|