monitor_linux.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. package supervisor
  2. import (
  3. "sync"
  4. "syscall"
  5. "github.com/Sirupsen/logrus"
  6. "github.com/docker/containerd/archutils"
  7. "github.com/docker/containerd/runtime"
  8. )
  9. func NewMonitor() (*Monitor, error) {
  10. m := &Monitor{
  11. receivers: make(map[int]interface{}),
  12. exits: make(chan runtime.Process, 1024),
  13. ooms: make(chan string, 1024),
  14. }
  15. fd, err := archutils.EpollCreate1(0)
  16. if err != nil {
  17. return nil, err
  18. }
  19. m.epollFd = fd
  20. go m.start()
  21. return m, nil
  22. }
  23. type Monitor struct {
  24. m sync.Mutex
  25. receivers map[int]interface{}
  26. exits chan runtime.Process
  27. ooms chan string
  28. epollFd int
  29. }
  30. func (m *Monitor) Exits() chan runtime.Process {
  31. return m.exits
  32. }
  33. func (m *Monitor) OOMs() chan string {
  34. return m.ooms
  35. }
  36. func (m *Monitor) Monitor(p runtime.Process) error {
  37. m.m.Lock()
  38. defer m.m.Unlock()
  39. fd := p.ExitFD()
  40. event := syscall.EpollEvent{
  41. Fd: int32(fd),
  42. Events: syscall.EPOLLHUP,
  43. }
  44. if err := archutils.EpollCtl(m.epollFd, syscall.EPOLL_CTL_ADD, fd, &event); err != nil {
  45. return err
  46. }
  47. EpollFdCounter.Inc(1)
  48. m.receivers[fd] = p
  49. return nil
  50. }
  51. func (m *Monitor) MonitorOOM(c runtime.Container) error {
  52. m.m.Lock()
  53. defer m.m.Unlock()
  54. o, err := c.OOM()
  55. if err != nil {
  56. return err
  57. }
  58. fd := o.FD()
  59. event := syscall.EpollEvent{
  60. Fd: int32(fd),
  61. Events: syscall.EPOLLHUP | syscall.EPOLLIN,
  62. }
  63. if err := archutils.EpollCtl(m.epollFd, syscall.EPOLL_CTL_ADD, fd, &event); err != nil {
  64. return err
  65. }
  66. EpollFdCounter.Inc(1)
  67. m.receivers[fd] = o
  68. return nil
  69. }
  70. func (m *Monitor) Close() error {
  71. return syscall.Close(m.epollFd)
  72. }
  73. func (m *Monitor) start() {
  74. var events [128]syscall.EpollEvent
  75. for {
  76. n, err := archutils.EpollWait(m.epollFd, events[:], -1)
  77. if err != nil {
  78. if err == syscall.EINTR {
  79. continue
  80. }
  81. logrus.WithField("error", err).Fatal("containerd: epoll wait")
  82. }
  83. // process events
  84. for i := 0; i < n; i++ {
  85. fd := int(events[i].Fd)
  86. m.m.Lock()
  87. r := m.receivers[fd]
  88. switch t := r.(type) {
  89. case runtime.Process:
  90. if events[i].Events == syscall.EPOLLHUP {
  91. delete(m.receivers, fd)
  92. if err = syscall.EpollCtl(m.epollFd, syscall.EPOLL_CTL_DEL, fd, &syscall.EpollEvent{
  93. Events: syscall.EPOLLHUP,
  94. Fd: int32(fd),
  95. }); err != nil {
  96. logrus.WithField("error", err).Error("containerd: epoll remove fd")
  97. }
  98. if err := t.Close(); err != nil {
  99. logrus.WithField("error", err).Error("containerd: close process IO")
  100. }
  101. EpollFdCounter.Dec(1)
  102. m.exits <- t
  103. }
  104. case runtime.OOM:
  105. // always flush the event fd
  106. t.Flush()
  107. if t.Removed() {
  108. delete(m.receivers, fd)
  109. // epoll will remove the fd from its set after it has been closed
  110. t.Close()
  111. EpollFdCounter.Dec(1)
  112. } else {
  113. m.ooms <- t.ContainerID()
  114. }
  115. }
  116. m.m.Unlock()
  117. }
  118. }
  119. }