123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475 |
- package supervisor
- import (
- "sync"
- "time"
- "github.com/Sirupsen/logrus"
- "github.com/docker/containerd/runtime"
- )
- type Worker interface {
- Start()
- }
- type startTask struct {
- Container runtime.Container
- CheckpointPath string
- Stdin string
- Stdout string
- Stderr string
- Err chan error
- StartResponse chan StartResponse
- }
- func NewWorker(s *Supervisor, wg *sync.WaitGroup) Worker {
- return &worker{
- s: s,
- wg: wg,
- }
- }
- type worker struct {
- wg *sync.WaitGroup
- s *Supervisor
- }
- func (w *worker) Start() {
- defer w.wg.Done()
- for t := range w.s.startTasks {
- started := time.Now()
- process, err := t.Container.Start(t.CheckpointPath, runtime.NewStdio(t.Stdin, t.Stdout, t.Stderr))
- if err != nil {
- logrus.WithFields(logrus.Fields{
- "error": err,
- "id": t.Container.ID(),
- }).Error("containerd: start container")
- t.Err <- err
- evt := &DeleteTask{
- ID: t.Container.ID(),
- NoEvent: true,
- }
- w.s.SendTask(evt)
- continue
- }
- if err := w.s.monitor.MonitorOOM(t.Container); err != nil && err != runtime.ErrContainerExited {
- if process.State() != runtime.Stopped {
- logrus.WithField("error", err).Error("containerd: notify OOM events")
- }
- }
- if err := w.s.monitorProcess(process); err != nil {
- logrus.WithField("error", err).Error("containerd: add process to monitor")
- }
- ContainerStartTimer.UpdateSince(started)
- t.Err <- nil
- t.StartResponse <- StartResponse{
- Container: t.Container,
- }
- w.s.notifySubscribers(Event{
- Timestamp: time.Now(),
- ID: t.Container.ID(),
- Type: StateStart,
- })
- }
- }
|