123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594 |
- package runtime
- import (
- "encoding/json"
- "fmt"
- "io"
- "io/ioutil"
- "os"
- "path/filepath"
- "syscall"
- "time"
- "github.com/Sirupsen/logrus"
- "github.com/docker/containerd/specs"
- "github.com/docker/containerd/subreaper/exec"
- ocs "github.com/opencontainers/runtime-spec/specs-go"
- )
- type Container interface {
- // ID returns the container ID
- ID() string
- // Path returns the path to the bundle
- Path() string
- // Start starts the init process of the container
- Start(checkpointPath string, s Stdio) (Process, error)
- // Exec starts another process in an existing container
- Exec(string, specs.ProcessSpec, Stdio) (Process, error)
- // Delete removes the container's state and any resources
- Delete() error
- // Processes returns all the containers processes that have been added
- Processes() ([]Process, error)
- // State returns the containers runtime state
- State() State
- // Resume resumes a paused container
- Resume() error
- // Pause pauses a running container
- Pause() error
- // RemoveProcess removes the specified process from the container
- RemoveProcess(string) error
- // Checkpoints returns all the checkpoints for a container
- Checkpoints(checkpointDir string) ([]Checkpoint, error)
- // Checkpoint creates a new checkpoint
- Checkpoint(checkpoint Checkpoint, checkpointDir string) error
- // DeleteCheckpoint deletes the checkpoint for the provided name
- DeleteCheckpoint(name string, checkpointDir string) error
- // Labels are user provided labels for the container
- Labels() []string
- // Pids returns all pids inside the container
- Pids() ([]int, error)
- // Stats returns realtime container stats and resource information
- Stats() (*Stat, error)
- // Name or path of the OCI compliant runtime used to execute the container
- Runtime() string
- // OOM signals the channel if the container received an OOM notification
- OOM() (OOM, error)
- // UpdateResource updates the containers resources to new values
- UpdateResources(*Resource) error
- // Status return the current status of the container.
- Status() (State, error)
- }
// OOM is a closable handle for a container's out-of-memory notifications.
type OOM interface {
	io.Closer

	// FD returns the file descriptor associated with the notification.
	FD() int
	// ContainerID returns the ID of the container being watched.
	ContainerID() string
	// Flush drains any pending data from the notification descriptor.
	Flush()
	// Removed reports whether the watched resource no longer exists.
	Removed() bool
}
// Stdio holds the paths used for the three standard streams of a process.
type Stdio struct {
	// Stdin is the path used for standard input.
	Stdin string
	// Stdout is the path used for standard output.
	Stdout string
	// Stderr is the path used for standard error.
	Stderr string
}
- func NewStdio(stdin, stdout, stderr string) Stdio {
- for _, s := range []*string{
- &stdin, &stdout, &stderr,
- } {
- if *s == "" {
- *s = "/dev/null"
- }
- }
- return Stdio{
- Stdin: stdin,
- Stdout: stdout,
- Stderr: stderr,
- }
- }
// ContainerOpts carries the settings New uses to create a container.
type ContainerOpts struct {
	// Root is the directory under which the container's state directory
	// (named after ID) is created.
	Root string
	// ID is the container ID.
	ID string
	// Bundle is the path to the container's bundle.
	Bundle string
	// Runtime is the name or path of the runtime binary.
	Runtime string
	// RuntimeArgs are extra arguments passed to the runtime binary.
	RuntimeArgs []string
	// Shim is the shim setting recorded in the container's state.
	Shim string
	// Labels are user provided labels for the container.
	Labels []string
	// NoPivotRoot is recorded in the container's state.
	NoPivotRoot bool
	// Timeout is stored on the container as its timeout.
	Timeout time.Duration
}
- // New returns a new container
- func New(opts ContainerOpts) (Container, error) {
- c := &container{
- root: opts.Root,
- id: opts.ID,
- bundle: opts.Bundle,
- labels: opts.Labels,
- processes: make(map[string]Process),
- runtime: opts.Runtime,
- runtimeArgs: opts.RuntimeArgs,
- shim: opts.Shim,
- noPivotRoot: opts.NoPivotRoot,
- timeout: opts.Timeout,
- }
- if err := os.Mkdir(filepath.Join(c.root, c.id), 0755); err != nil {
- return nil, err
- }
- f, err := os.Create(filepath.Join(c.root, c.id, StateFile))
- if err != nil {
- return nil, err
- }
- defer f.Close()
- if err := json.NewEncoder(f).Encode(state{
- Bundle: c.bundle,
- Labels: c.labels,
- Runtime: c.runtime,
- RuntimeArgs: c.runtimeArgs,
- Shim: c.shim,
- NoPivotRoot: opts.NoPivotRoot,
- }); err != nil {
- return nil, err
- }
- return c, nil
- }
- func Load(root, id string, timeout time.Duration) (Container, error) {
- var s state
- f, err := os.Open(filepath.Join(root, id, StateFile))
- if err != nil {
- return nil, err
- }
- defer f.Close()
- if err := json.NewDecoder(f).Decode(&s); err != nil {
- return nil, err
- }
- c := &container{
- root: root,
- id: id,
- bundle: s.Bundle,
- labels: s.Labels,
- runtime: s.Runtime,
- runtimeArgs: s.RuntimeArgs,
- shim: s.Shim,
- noPivotRoot: s.NoPivotRoot,
- processes: make(map[string]Process),
- timeout: timeout,
- }
- dirs, err := ioutil.ReadDir(filepath.Join(root, id))
- if err != nil {
- return nil, err
- }
- for _, d := range dirs {
- if !d.IsDir() {
- continue
- }
- pid := d.Name()
- s, err := readProcessState(filepath.Join(root, id, pid))
- if err != nil {
- return nil, err
- }
- p, err := loadProcess(filepath.Join(root, id, pid), pid, c, s)
- if err != nil {
- logrus.WithField("id", id).WithField("pid", pid).Debug("containerd: error loading process %s", err)
- continue
- }
- c.processes[pid] = p
- }
- return c, nil
- }
- func readProcessState(dir string) (*ProcessState, error) {
- f, err := os.Open(filepath.Join(dir, "process.json"))
- if err != nil {
- return nil, err
- }
- defer f.Close()
- var s ProcessState
- if err := json.NewDecoder(f).Decode(&s); err != nil {
- return nil, err
- }
- return &s, nil
- }
- type container struct {
- // path to store runtime state information
- root string
- id string
- bundle string
- runtime string
- runtimeArgs []string
- shim string
- processes map[string]Process
- labels []string
- oomFds []int
- noPivotRoot bool
- timeout time.Duration
- }
- func (c *container) ID() string {
- return c.id
- }
- func (c *container) Path() string {
- return c.bundle
- }
- func (c *container) Labels() []string {
- return c.labels
- }
- func (c *container) readSpec() (*specs.Spec, error) {
- var spec specs.Spec
- f, err := os.Open(filepath.Join(c.bundle, "config.json"))
- if err != nil {
- return nil, err
- }
- defer f.Close()
- if err := json.NewDecoder(f).Decode(&spec); err != nil {
- return nil, err
- }
- return &spec, nil
- }
- func (c *container) Delete() error {
- err := os.RemoveAll(filepath.Join(c.root, c.id))
- args := c.runtimeArgs
- args = append(args, "delete", c.id)
- if derr := exec.Command(c.runtime, args...).Run(); err == nil {
- err = derr
- }
- return err
- }
- func (c *container) Processes() ([]Process, error) {
- out := []Process{}
- for _, p := range c.processes {
- out = append(out, p)
- }
- return out, nil
- }
- func (c *container) RemoveProcess(pid string) error {
- delete(c.processes, pid)
- return os.RemoveAll(filepath.Join(c.root, c.id, pid))
- }
- func (c *container) State() State {
- proc := c.processes["init"]
- if proc == nil {
- return Stopped
- }
- return proc.State()
- }
- func (c *container) Runtime() string {
- return c.runtime
- }
- func (c *container) Pause() error {
- args := c.runtimeArgs
- args = append(args, "pause", c.id)
- b, err := exec.Command(c.runtime, args...).CombinedOutput()
- if err != nil {
- return fmt.Errorf("%s: %q", err.Error(), string(b))
- }
- return nil
- }
- func (c *container) Resume() error {
- args := c.runtimeArgs
- args = append(args, "resume", c.id)
- b, err := exec.Command(c.runtime, args...).CombinedOutput()
- if err != nil {
- return fmt.Errorf("%s: %q", err.Error(), string(b))
- }
- return nil
- }
- func (c *container) Checkpoints(checkpointDir string) ([]Checkpoint, error) {
- if checkpointDir == "" {
- checkpointDir = filepath.Join(c.bundle, "checkpoints")
- }
- dirs, err := ioutil.ReadDir(checkpointDir)
- if err != nil {
- return nil, err
- }
- var out []Checkpoint
- for _, d := range dirs {
- if !d.IsDir() {
- continue
- }
- path := filepath.Join(checkpointDir, d.Name(), "config.json")
- data, err := ioutil.ReadFile(path)
- if err != nil {
- return nil, err
- }
- var cpt Checkpoint
- if err := json.Unmarshal(data, &cpt); err != nil {
- return nil, err
- }
- out = append(out, cpt)
- }
- return out, nil
- }
- func (c *container) Checkpoint(cpt Checkpoint, checkpointDir string) error {
- if checkpointDir == "" {
- checkpointDir = filepath.Join(c.bundle, "checkpoints")
- }
- if err := os.MkdirAll(checkpointDir, 0755); err != nil {
- return err
- }
- path := filepath.Join(checkpointDir, cpt.Name)
- if err := os.Mkdir(path, 0755); err != nil {
- return err
- }
- f, err := os.Create(filepath.Join(path, "config.json"))
- if err != nil {
- return err
- }
- cpt.Created = time.Now()
- err = json.NewEncoder(f).Encode(cpt)
- f.Close()
- if err != nil {
- return err
- }
- args := []string{
- "checkpoint",
- "--image-path", path,
- }
- add := func(flags ...string) {
- args = append(args, flags...)
- }
- add(c.runtimeArgs...)
- if !cpt.Exit {
- add("--leave-running")
- }
- if cpt.Shell {
- add("--shell-job")
- }
- if cpt.Tcp {
- add("--tcp-established")
- }
- if cpt.UnixSockets {
- add("--ext-unix-sk")
- }
- add(c.id)
- out, err := exec.Command(c.runtime, args...).CombinedOutput()
- if err != nil {
- return fmt.Errorf("%s: %q", err.Error(), string(out))
- }
- return err
- }
- func (c *container) DeleteCheckpoint(name string, checkpointDir string) error {
- if checkpointDir == "" {
- checkpointDir = filepath.Join(c.bundle, "checkpoints")
- }
- return os.RemoveAll(filepath.Join(checkpointDir, name))
- }
- func (c *container) Start(checkpointPath string, s Stdio) (Process, error) {
- processRoot := filepath.Join(c.root, c.id, InitProcessID)
- if err := os.Mkdir(processRoot, 0755); err != nil {
- return nil, err
- }
- spec, err := c.readSpec()
- if err != nil {
- return nil, err
- }
- config := &processConfig{
- checkpoint: checkpointPath,
- root: processRoot,
- id: InitProcessID,
- c: c,
- stdio: s,
- spec: spec,
- processSpec: specs.ProcessSpec(spec.Process),
- }
- p, err := c.newProcess(config)
- if err != nil {
- return nil, err
- }
- if err := p.Start(); err != nil {
- return nil, err
- }
- c.processes[InitProcessID] = p
- return p, nil
- }
- func (c *container) Exec(pid string, pspec specs.ProcessSpec, s Stdio) (pp Process, err error) {
- processRoot := filepath.Join(c.root, c.id, pid)
- if err := os.Mkdir(processRoot, 0755); err != nil {
- return nil, err
- }
- defer func() {
- if err != nil {
- c.RemoveProcess(pid)
- }
- }()
- spec, err := c.readSpec()
- if err != nil {
- return nil, err
- }
- config := &processConfig{
- exec: true,
- id: pid,
- root: processRoot,
- c: c,
- processSpec: pspec,
- spec: spec,
- stdio: s,
- }
- p, err := c.newProcess(config)
- if err != nil {
- return nil, err
- }
- if err := p.Start(); err != nil {
- return nil, err
- }
- c.processes[pid] = p
- return p, nil
- }
- func hostIDFromMap(id uint32, mp []ocs.IDMapping) int {
- for _, m := range mp {
- if (id >= m.ContainerID) && (id <= (m.ContainerID + m.Size - 1)) {
- return int(m.HostID + (id - m.ContainerID))
- }
- }
- return 0
- }
- func (c *container) Pids() ([]int, error) {
- args := c.runtimeArgs
- args = append(args, "ps", "--format=json", c.id)
- out, err := exec.Command(c.runtime, args...).CombinedOutput()
- if err != nil {
- return nil, fmt.Errorf("%s: %q", err.Error(), out)
- }
- var pids []int
- if err := json.Unmarshal(out, &pids); err != nil {
- return nil, err
- }
- return pids, nil
- }
- func (c *container) Stats() (*Stat, error) {
- now := time.Now()
- args := c.runtimeArgs
- args = append(args, "events", "--stats", c.id)
- out, err := exec.Command(c.runtime, args...).CombinedOutput()
- if err != nil {
- return nil, fmt.Errorf("%s: %q", err.Error(), out)
- }
- s := struct {
- Data *Stat `json:"data"`
- }{}
- if err := json.Unmarshal(out, &s); err != nil {
- return nil, err
- }
- s.Data.Timestamp = now
- return s.Data, nil
- }
- // Status implements the runtime Container interface.
- func (c *container) Status() (State, error) {
- args := c.runtimeArgs
- args = append(args, "state", c.id)
- out, err := exec.Command(c.runtime, args...).CombinedOutput()
- if err != nil {
- return "", fmt.Errorf("%s: %q", err.Error(), out)
- }
- // We only require the runtime json output to have a top level Status field.
- var s struct {
- Status State `json:"status"`
- }
- if err := json.Unmarshal(out, &s); err != nil {
- return "", err
- }
- return s.Status, nil
- }
- func (c *container) writeEventFD(root string, cfd, efd int) error {
- f, err := os.OpenFile(filepath.Join(root, "cgroup.event_control"), os.O_WRONLY, 0)
- if err != nil {
- return err
- }
- defer f.Close()
- _, err = f.WriteString(fmt.Sprintf("%d %d", efd, cfd))
- return err
- }
- func (c *container) newProcess(config *processConfig) (Process, error) {
- if c.shim == "" {
- return newDirectProcess(config)
- }
- return newProcess(config)
- }
// waitArgs pairs a pid with an error value.
type waitArgs struct {
	// pid is the process id.
	pid int
	// err is the error associated with pid, if any.
	err error
}
- // isAlive checks if the shim that launched the container is still alive
- func isAlive(cmd *exec.Cmd) (bool, error) {
- if err := syscall.Kill(cmd.Process.Pid, 0); err != nil {
- if err == syscall.ESRCH {
- return false, nil
- }
- return false, err
- }
- return true, nil
- }
// oom is the OOM implementation backed by an event file descriptor and a
// control file.
type oom struct {
	// id is the ID of the container being watched.
	id string
	// root is the directory expected to contain "cgroup.event_control".
	root string
	// control is closed together with eventfd in Close.
	control *os.File
	// eventfd is the descriptor returned by FD and drained by Flush.
	eventfd int
}
- func (o *oom) ContainerID() string {
- return o.id
- }
- func (o *oom) FD() int {
- return o.eventfd
- }
- func (o *oom) Flush() {
- buf := make([]byte, 8)
- syscall.Read(o.eventfd, buf)
- }
- func (o *oom) Removed() bool {
- _, err := os.Lstat(filepath.Join(o.root, "cgroup.event_control"))
- return os.IsNotExist(err)
- }
- func (o *oom) Close() error {
- err := syscall.Close(o.eventfd)
- if cerr := o.control.Close(); err == nil {
- err = cerr
- }
- return err
- }
// message is one JSON log entry as decoded by readLogMessages.
type message struct {
	// Level is the entry's "level" field.
	Level string `json:"level"`
	// Msg is the entry's "msg" field.
	Msg string `json:"msg"`
}
- func readLogMessages(path string) ([]message, error) {
- var out []message
- f, err := os.Open(path)
- if err != nil {
- return nil, err
- }
- defer f.Close()
- dec := json.NewDecoder(f)
- for {
- var m message
- if err := dec.Decode(&m); err != nil {
- if err == io.EOF {
- break
- }
- return nil, err
- }
- out = append(out, m)
- }
- return out, nil
- }
|