supervisor.go 9.6 KB


  1. package supervisor
  2. import (
  3. "encoding/json"
  4. "io"
  5. "io/ioutil"
  6. "os"
  7. "path/filepath"
  8. "sync"
  9. "time"
  10. "github.com/Sirupsen/logrus"
  11. "github.com/docker/containerd/runtime"
  12. )
  // defaultBufferSize is the capacity used for the supervisor's task queue and
  // for event subscriber channels.
  const defaultBufferSize = 2048
  16. // New returns an initialized Process supervisor.
  17. func New(stateDir string, runtimeName, shimName string, runtimeArgs []string, timeout time.Duration, retainCount int) (*Supervisor, error) {
  18. startTasks := make(chan *startTask, 10)
  19. if err := os.MkdirAll(stateDir, 0755); err != nil {
  20. return nil, err
  21. }
  22. machine, err := CollectMachineInformation()
  23. if err != nil {
  24. return nil, err
  25. }
  26. monitor, err := NewMonitor()
  27. if err != nil {
  28. return nil, err
  29. }
  30. s := &Supervisor{
  31. stateDir: stateDir,
  32. containers: make(map[string]*containerInfo),
  33. startTasks: startTasks,
  34. machine: machine,
  35. subscribers: make(map[chan Event]struct{}),
  36. tasks: make(chan Task, defaultBufferSize),
  37. monitor: monitor,
  38. runtime: runtimeName,
  39. runtimeArgs: runtimeArgs,
  40. shim: shimName,
  41. timeout: timeout,
  42. }
  43. if err := setupEventLog(s, retainCount); err != nil {
  44. return nil, err
  45. }
  46. go s.exitHandler()
  47. go s.oomHandler()
  48. if err := s.restore(); err != nil {
  49. return nil, err
  50. }
  51. return s, nil
  52. }
  53. type containerInfo struct {
  54. container runtime.Container
  55. }
  56. func setupEventLog(s *Supervisor, retainCount int) error {
  57. if err := readEventLog(s); err != nil {
  58. return err
  59. }
  60. logrus.WithField("count", len(s.eventLog)).Debug("containerd: read past events")
  61. events := s.Events(time.Time{})
  62. return eventLogger(s, filepath.Join(s.stateDir, "events.log"), events, retainCount)
  63. }
  64. func eventLogger(s *Supervisor, path string, events chan Event, retainCount int) error {
  65. f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND|os.O_TRUNC, 0755)
  66. if err != nil {
  67. return err
  68. }
  69. go func() {
  70. var (
  71. count = len(s.eventLog)
  72. enc = json.NewEncoder(f)
  73. )
  74. for e := range events {
  75. // if we have a specified retain count make sure the truncate the event
  76. // log if it grows past the specified number of events to keep.
  77. if retainCount > 0 {
  78. if count > retainCount {
  79. logrus.Debug("truncating event log")
  80. // close the log file
  81. if f != nil {
  82. f.Close()
  83. }
  84. slice := retainCount - 1
  85. l := len(s.eventLog)
  86. if slice >= l {
  87. slice = l
  88. }
  89. s.eventLock.Lock()
  90. s.eventLog = s.eventLog[len(s.eventLog)-slice:]
  91. s.eventLock.Unlock()
  92. if f, err = os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND|os.O_TRUNC, 0755); err != nil {
  93. logrus.WithField("error", err).Error("containerd: open event to journal")
  94. continue
  95. }
  96. enc = json.NewEncoder(f)
  97. count = 0
  98. for _, le := range s.eventLog {
  99. if err := enc.Encode(le); err != nil {
  100. logrus.WithField("error", err).Error("containerd: write event to journal")
  101. }
  102. }
  103. }
  104. }
  105. s.eventLock.Lock()
  106. s.eventLog = append(s.eventLog, e)
  107. s.eventLock.Unlock()
  108. count++
  109. if err := enc.Encode(e); err != nil {
  110. logrus.WithField("error", err).Error("containerd: write event to journal")
  111. }
  112. }
  113. }()
  114. return nil
  115. }
  116. func readEventLog(s *Supervisor) error {
  117. f, err := os.Open(filepath.Join(s.stateDir, "events.log"))
  118. if err != nil {
  119. if os.IsNotExist(err) {
  120. return nil
  121. }
  122. return err
  123. }
  124. defer f.Close()
  125. dec := json.NewDecoder(f)
  126. for {
  127. var e Event
  128. if err := dec.Decode(&e); err != nil {
  129. if err == io.EOF {
  130. break
  131. }
  132. return err
  133. }
  134. s.eventLog = append(s.eventLog, e)
  135. }
  136. return nil
  137. }
  138. type Supervisor struct {
  139. // stateDir is the directory on the system to store container runtime state information.
  140. stateDir string
  141. // name of the OCI compatible runtime used to execute containers
  142. runtime string
  143. runtimeArgs []string
  144. shim string
  145. containers map[string]*containerInfo
  146. startTasks chan *startTask
  147. // we need a lock around the subscribers map only because additions and deletions from
  148. // the map are via the API so we cannot really control the concurrency
  149. subscriberLock sync.RWMutex
  150. subscribers map[chan Event]struct{}
  151. machine Machine
  152. tasks chan Task
  153. monitor *Monitor
  154. eventLog []Event
  155. eventLock sync.Mutex
  156. timeout time.Duration
  157. }
  158. // Stop closes all startTasks and sends a SIGTERM to each container's pid1 then waits for they to
  159. // terminate. After it has handled all the SIGCHILD events it will close the signals chan
  160. // and exit. Stop is a non-blocking call and will return after the containers have been signaled
  161. func (s *Supervisor) Stop() {
  162. // Close the startTasks channel so that no new containers get started
  163. close(s.startTasks)
  164. }
  165. // Close closes any open files in the supervisor but expects that Stop has been
  166. // callsed so that no more containers are started.
  167. func (s *Supervisor) Close() error {
  168. return nil
  169. }
  // Event is a single entry of the supervisor's event stream. It is what
  // subscribers receive and what is stored, JSON encoded, in the event journal.
  type Event struct {
  	// ID is the identifier the event refers to
  	// (presumably the container id; confirm against the emitters).
  	ID string `json:"id"`
  	// Type is the kind of event, for example StateLive which marks the end of
  	// a replay of past events.
  	Type string `json:"type"`
  	// Timestamp is when the event was created; replay filters on it.
  	Timestamp time.Time `json:"timestamp"`
  	// PID is left out of the JSON form when empty.
  	PID string `json:"pid,omitempty"`
  	// Status is left out of the JSON form when zero
  	// (presumably an exit status; confirm against the emitters).
  	Status int `json:"status,omitempty"`
  }
  177. // Events returns an event channel that external consumers can use to receive updates
  178. // on container events
  179. func (s *Supervisor) Events(from time.Time) chan Event {
  180. s.subscriberLock.Lock()
  181. defer s.subscriberLock.Unlock()
  182. c := make(chan Event, defaultBufferSize)
  183. EventSubscriberCounter.Inc(1)
  184. s.subscribers[c] = struct{}{}
  185. if !from.IsZero() {
  186. // replay old event
  187. s.eventLock.Lock()
  188. past := s.eventLog[:]
  189. s.eventLock.Unlock()
  190. for _, e := range past {
  191. if e.Timestamp.After(from) {
  192. c <- e
  193. }
  194. }
  195. // Notify the client that from now on it's live events
  196. c <- Event{
  197. Type: StateLive,
  198. Timestamp: time.Now(),
  199. }
  200. }
  201. return c
  202. }
  203. // Unsubscribe removes the provided channel from receiving any more events
  204. func (s *Supervisor) Unsubscribe(sub chan Event) {
  205. s.subscriberLock.Lock()
  206. defer s.subscriberLock.Unlock()
  207. delete(s.subscribers, sub)
  208. close(sub)
  209. EventSubscriberCounter.Dec(1)
  210. }
  211. // notifySubscribers will send the provided event to the external subscribers
  212. // of the events channel
  213. func (s *Supervisor) notifySubscribers(e Event) {
  214. s.subscriberLock.RLock()
  215. defer s.subscriberLock.RUnlock()
  216. for sub := range s.subscribers {
  217. // do a non-blocking send for the channel
  218. select {
  219. case sub <- e:
  220. default:
  221. logrus.WithField("event", e.Type).Warn("containerd: event not sent to subscriber")
  222. }
  223. }
  224. }
  225. // Start is a non-blocking call that runs the supervisor for monitoring contianer processes and
  226. // executing new containers.
  227. //
  228. // This event loop is the only thing that is allowed to modify state of containers and processes
  229. // therefore it is save to do operations in the handlers that modify state of the system or
  230. // state of the Supervisor
  231. func (s *Supervisor) Start() error {
  232. logrus.WithFields(logrus.Fields{
  233. "stateDir": s.stateDir,
  234. "runtime": s.runtime,
  235. "runtimeArgs": s.runtimeArgs,
  236. "memory": s.machine.Memory,
  237. "cpus": s.machine.Cpus,
  238. }).Debug("containerd: supervisor running")
  239. go func() {
  240. for i := range s.tasks {
  241. s.handleTask(i)
  242. }
  243. }()
  244. return nil
  245. }
  246. // Machine returns the machine information for which the
  247. // supervisor is executing on.
  248. func (s *Supervisor) Machine() Machine {
  249. return s.machine
  250. }
  251. // SendTask sends the provided event the the supervisors main event loop
  252. func (s *Supervisor) SendTask(evt Task) {
  253. TasksCounter.Inc(1)
  254. s.tasks <- evt
  255. }
  256. func (s *Supervisor) exitHandler() {
  257. for p := range s.monitor.Exits() {
  258. e := &ExitTask{
  259. Process: p,
  260. }
  261. s.SendTask(e)
  262. }
  263. }
  264. func (s *Supervisor) oomHandler() {
  265. for id := range s.monitor.OOMs() {
  266. e := &OOMTask{
  267. ID: id,
  268. }
  269. s.SendTask(e)
  270. }
  271. }
  272. func (s *Supervisor) monitorProcess(p runtime.Process) error {
  273. return s.monitor.Monitor(p)
  274. }
  275. func (s *Supervisor) restore() error {
  276. dirs, err := ioutil.ReadDir(s.stateDir)
  277. if err != nil {
  278. return err
  279. }
  280. for _, d := range dirs {
  281. if !d.IsDir() {
  282. continue
  283. }
  284. id := d.Name()
  285. container, err := runtime.Load(s.stateDir, id, s.timeout)
  286. if err != nil {
  287. return err
  288. }
  289. processes, err := container.Processes()
  290. if err != nil {
  291. return err
  292. }
  293. ContainersCounter.Inc(1)
  294. s.containers[id] = &containerInfo{
  295. container: container,
  296. }
  297. if err := s.monitor.MonitorOOM(container); err != nil && err != runtime.ErrContainerExited {
  298. logrus.WithField("error", err).Error("containerd: notify OOM events")
  299. }
  300. logrus.WithField("id", id).Debug("containerd: container restored")
  301. var exitedProcesses []runtime.Process
  302. for _, p := range processes {
  303. if p.State() == runtime.Running {
  304. if err := s.monitorProcess(p); err != nil {
  305. return err
  306. }
  307. } else {
  308. exitedProcesses = append(exitedProcesses, p)
  309. }
  310. }
  311. if len(exitedProcesses) > 0 {
  312. // sort processes so that init is fired last because that is how the kernel sends the
  313. // exit events
  314. sortProcesses(exitedProcesses)
  315. for _, p := range exitedProcesses {
  316. e := &ExitTask{
  317. Process: p,
  318. }
  319. s.SendTask(e)
  320. }
  321. }
  322. }
  323. return nil
  324. }
  325. func (s *Supervisor) handleTask(i Task) {
  326. var err error
  327. switch t := i.(type) {
  328. case *AddProcessTask:
  329. err = s.addProcess(t)
  330. case *CreateCheckpointTask:
  331. err = s.createCheckpoint(t)
  332. case *DeleteCheckpointTask:
  333. err = s.deleteCheckpoint(t)
  334. case *StartTask:
  335. err = s.start(t)
  336. case *DeleteTask:
  337. err = s.delete(t)
  338. case *ExitTask:
  339. err = s.exit(t)
  340. case *ExecExitTask:
  341. err = s.execExit(t)
  342. case *GetContainersTask:
  343. err = s.getContainers(t)
  344. case *SignalTask:
  345. err = s.signal(t)
  346. case *StatsTask:
  347. err = s.stats(t)
  348. case *UpdateTask:
  349. err = s.updateContainer(t)
  350. case *UpdateProcessTask:
  351. err = s.updateProcess(t)
  352. case *OOMTask:
  353. err = s.oom(t)
  354. default:
  355. err = ErrUnknownTask
  356. }
  357. if err != errDeferredResponse {
  358. i.ErrorCh() <- err
  359. close(i.ErrorCh())
  360. }
  361. }