// worker.go

  1. package supervisor
  2. import (
  3. "sync"
  4. "time"
  5. "github.com/Sirupsen/logrus"
  6. "github.com/docker/containerd/runtime"
  7. )
  8. type Worker interface {
  9. Start()
  10. }
  11. type startTask struct {
  12. Container runtime.Container
  13. CheckpointPath string
  14. Stdin string
  15. Stdout string
  16. Stderr string
  17. Err chan error
  18. StartResponse chan StartResponse
  19. }
  20. func NewWorker(s *Supervisor, wg *sync.WaitGroup) Worker {
  21. return &worker{
  22. s: s,
  23. wg: wg,
  24. }
  25. }
  26. type worker struct {
  27. wg *sync.WaitGroup
  28. s *Supervisor
  29. }
  30. func (w *worker) Start() {
  31. defer w.wg.Done()
  32. for t := range w.s.startTasks {
  33. started := time.Now()
  34. process, err := t.Container.Start(t.CheckpointPath, runtime.NewStdio(t.Stdin, t.Stdout, t.Stderr))
  35. if err != nil {
  36. logrus.WithFields(logrus.Fields{
  37. "error": err,
  38. "id": t.Container.ID(),
  39. }).Error("containerd: start container")
  40. t.Err <- err
  41. evt := &DeleteTask{
  42. ID: t.Container.ID(),
  43. NoEvent: true,
  44. }
  45. w.s.SendTask(evt)
  46. continue
  47. }
  48. if err := w.s.monitor.MonitorOOM(t.Container); err != nil && err != runtime.ErrContainerExited {
  49. if process.State() != runtime.Stopped {
  50. logrus.WithField("error", err).Error("containerd: notify OOM events")
  51. }
  52. }
  53. if err := w.s.monitorProcess(process); err != nil {
  54. logrus.WithField("error", err).Error("containerd: add process to monitor")
  55. }
  56. ContainerStartTimer.UpdateSince(started)
  57. t.Err <- nil
  58. t.StartResponse <- StartResponse{
  59. Container: t.Container,
  60. }
  61. w.s.notifySubscribers(Event{
  62. Timestamp: time.Now(),
  63. ID: t.Container.ID(),
  64. Type: StateStart,
  65. })
  66. }
  67. }