123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386 |
- package supervisor
- import (
- "encoding/json"
- "io"
- "io/ioutil"
- "os"
- "path/filepath"
- "sync"
- "time"
- "github.com/Sirupsen/logrus"
- "github.com/docker/containerd/runtime"
- )
- const (
- defaultBufferSize = 2048 // size of queue in eventloop
- )
- // New returns an initialized Process supervisor.
- func New(stateDir string, runtimeName, shimName string, runtimeArgs []string, timeout time.Duration, retainCount int) (*Supervisor, error) {
- startTasks := make(chan *startTask, 10)
- if err := os.MkdirAll(stateDir, 0755); err != nil {
- return nil, err
- }
- machine, err := CollectMachineInformation()
- if err != nil {
- return nil, err
- }
- monitor, err := NewMonitor()
- if err != nil {
- return nil, err
- }
- s := &Supervisor{
- stateDir: stateDir,
- containers: make(map[string]*containerInfo),
- startTasks: startTasks,
- machine: machine,
- subscribers: make(map[chan Event]struct{}),
- tasks: make(chan Task, defaultBufferSize),
- monitor: monitor,
- runtime: runtimeName,
- runtimeArgs: runtimeArgs,
- shim: shimName,
- timeout: timeout,
- }
- if err := setupEventLog(s, retainCount); err != nil {
- return nil, err
- }
- go s.exitHandler()
- go s.oomHandler()
- if err := s.restore(); err != nil {
- return nil, err
- }
- return s, nil
- }
- type containerInfo struct {
- container runtime.Container
- }
- func setupEventLog(s *Supervisor, retainCount int) error {
- if err := readEventLog(s); err != nil {
- return err
- }
- logrus.WithField("count", len(s.eventLog)).Debug("containerd: read past events")
- events := s.Events(time.Time{})
- return eventLogger(s, filepath.Join(s.stateDir, "events.log"), events, retainCount)
- }
- func eventLogger(s *Supervisor, path string, events chan Event, retainCount int) error {
- f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND|os.O_TRUNC, 0755)
- if err != nil {
- return err
- }
- go func() {
- var (
- count = len(s.eventLog)
- enc = json.NewEncoder(f)
- )
- for e := range events {
- // if we have a specified retain count make sure the truncate the event
- // log if it grows past the specified number of events to keep.
- if retainCount > 0 {
- if count > retainCount {
- logrus.Debug("truncating event log")
- // close the log file
- if f != nil {
- f.Close()
- }
- slice := retainCount - 1
- l := len(s.eventLog)
- if slice >= l {
- slice = l
- }
- s.eventLock.Lock()
- s.eventLog = s.eventLog[len(s.eventLog)-slice:]
- s.eventLock.Unlock()
- if f, err = os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND|os.O_TRUNC, 0755); err != nil {
- logrus.WithField("error", err).Error("containerd: open event to journal")
- continue
- }
- enc = json.NewEncoder(f)
- count = 0
- for _, le := range s.eventLog {
- if err := enc.Encode(le); err != nil {
- logrus.WithField("error", err).Error("containerd: write event to journal")
- }
- }
- }
- }
- s.eventLock.Lock()
- s.eventLog = append(s.eventLog, e)
- s.eventLock.Unlock()
- count++
- if err := enc.Encode(e); err != nil {
- logrus.WithField("error", err).Error("containerd: write event to journal")
- }
- }
- }()
- return nil
- }
- func readEventLog(s *Supervisor) error {
- f, err := os.Open(filepath.Join(s.stateDir, "events.log"))
- if err != nil {
- if os.IsNotExist(err) {
- return nil
- }
- return err
- }
- defer f.Close()
- dec := json.NewDecoder(f)
- for {
- var e Event
- if err := dec.Decode(&e); err != nil {
- if err == io.EOF {
- break
- }
- return err
- }
- s.eventLog = append(s.eventLog, e)
- }
- return nil
- }
- type Supervisor struct {
- // stateDir is the directory on the system to store container runtime state information.
- stateDir string
- // name of the OCI compatible runtime used to execute containers
- runtime string
- runtimeArgs []string
- shim string
- containers map[string]*containerInfo
- startTasks chan *startTask
- // we need a lock around the subscribers map only because additions and deletions from
- // the map are via the API so we cannot really control the concurrency
- subscriberLock sync.RWMutex
- subscribers map[chan Event]struct{}
- machine Machine
- tasks chan Task
- monitor *Monitor
- eventLog []Event
- eventLock sync.Mutex
- timeout time.Duration
- }
- // Stop closes all startTasks and sends a SIGTERM to each container's pid1 then waits for they to
- // terminate. After it has handled all the SIGCHILD events it will close the signals chan
- // and exit. Stop is a non-blocking call and will return after the containers have been signaled
- func (s *Supervisor) Stop() {
- // Close the startTasks channel so that no new containers get started
- close(s.startTasks)
- }
- // Close closes any open files in the supervisor but expects that Stop has been
- // callsed so that no more containers are started.
- func (s *Supervisor) Close() error {
- return nil
- }
- type Event struct {
- ID string `json:"id"`
- Type string `json:"type"`
- Timestamp time.Time `json:"timestamp"`
- PID string `json:"pid,omitempty"`
- Status int `json:"status,omitempty"`
- }
- // Events returns an event channel that external consumers can use to receive updates
- // on container events
- func (s *Supervisor) Events(from time.Time) chan Event {
- s.subscriberLock.Lock()
- defer s.subscriberLock.Unlock()
- c := make(chan Event, defaultBufferSize)
- EventSubscriberCounter.Inc(1)
- s.subscribers[c] = struct{}{}
- if !from.IsZero() {
- // replay old event
- s.eventLock.Lock()
- past := s.eventLog[:]
- s.eventLock.Unlock()
- for _, e := range past {
- if e.Timestamp.After(from) {
- c <- e
- }
- }
- // Notify the client that from now on it's live events
- c <- Event{
- Type: StateLive,
- Timestamp: time.Now(),
- }
- }
- return c
- }
- // Unsubscribe removes the provided channel from receiving any more events
- func (s *Supervisor) Unsubscribe(sub chan Event) {
- s.subscriberLock.Lock()
- defer s.subscriberLock.Unlock()
- delete(s.subscribers, sub)
- close(sub)
- EventSubscriberCounter.Dec(1)
- }
- // notifySubscribers will send the provided event to the external subscribers
- // of the events channel
- func (s *Supervisor) notifySubscribers(e Event) {
- s.subscriberLock.RLock()
- defer s.subscriberLock.RUnlock()
- for sub := range s.subscribers {
- // do a non-blocking send for the channel
- select {
- case sub <- e:
- default:
- logrus.WithField("event", e.Type).Warn("containerd: event not sent to subscriber")
- }
- }
- }
- // Start is a non-blocking call that runs the supervisor for monitoring contianer processes and
- // executing new containers.
- //
- // This event loop is the only thing that is allowed to modify state of containers and processes
- // therefore it is save to do operations in the handlers that modify state of the system or
- // state of the Supervisor
- func (s *Supervisor) Start() error {
- logrus.WithFields(logrus.Fields{
- "stateDir": s.stateDir,
- "runtime": s.runtime,
- "runtimeArgs": s.runtimeArgs,
- "memory": s.machine.Memory,
- "cpus": s.machine.Cpus,
- }).Debug("containerd: supervisor running")
- go func() {
- for i := range s.tasks {
- s.handleTask(i)
- }
- }()
- return nil
- }
- // Machine returns the machine information for which the
- // supervisor is executing on.
- func (s *Supervisor) Machine() Machine {
- return s.machine
- }
- // SendTask sends the provided event the the supervisors main event loop
- func (s *Supervisor) SendTask(evt Task) {
- TasksCounter.Inc(1)
- s.tasks <- evt
- }
- func (s *Supervisor) exitHandler() {
- for p := range s.monitor.Exits() {
- e := &ExitTask{
- Process: p,
- }
- s.SendTask(e)
- }
- }
- func (s *Supervisor) oomHandler() {
- for id := range s.monitor.OOMs() {
- e := &OOMTask{
- ID: id,
- }
- s.SendTask(e)
- }
- }
- func (s *Supervisor) monitorProcess(p runtime.Process) error {
- return s.monitor.Monitor(p)
- }
- func (s *Supervisor) restore() error {
- dirs, err := ioutil.ReadDir(s.stateDir)
- if err != nil {
- return err
- }
- for _, d := range dirs {
- if !d.IsDir() {
- continue
- }
- id := d.Name()
- container, err := runtime.Load(s.stateDir, id, s.timeout)
- if err != nil {
- return err
- }
- processes, err := container.Processes()
- if err != nil {
- return err
- }
- ContainersCounter.Inc(1)
- s.containers[id] = &containerInfo{
- container: container,
- }
- if err := s.monitor.MonitorOOM(container); err != nil && err != runtime.ErrContainerExited {
- logrus.WithField("error", err).Error("containerd: notify OOM events")
- }
- logrus.WithField("id", id).Debug("containerd: container restored")
- var exitedProcesses []runtime.Process
- for _, p := range processes {
- if p.State() == runtime.Running {
- if err := s.monitorProcess(p); err != nil {
- return err
- }
- } else {
- exitedProcesses = append(exitedProcesses, p)
- }
- }
- if len(exitedProcesses) > 0 {
- // sort processes so that init is fired last because that is how the kernel sends the
- // exit events
- sortProcesses(exitedProcesses)
- for _, p := range exitedProcesses {
- e := &ExitTask{
- Process: p,
- }
- s.SendTask(e)
- }
- }
- }
- return nil
- }
- func (s *Supervisor) handleTask(i Task) {
- var err error
- switch t := i.(type) {
- case *AddProcessTask:
- err = s.addProcess(t)
- case *CreateCheckpointTask:
- err = s.createCheckpoint(t)
- case *DeleteCheckpointTask:
- err = s.deleteCheckpoint(t)
- case *StartTask:
- err = s.start(t)
- case *DeleteTask:
- err = s.delete(t)
- case *ExitTask:
- err = s.exit(t)
- case *ExecExitTask:
- err = s.execExit(t)
- case *GetContainersTask:
- err = s.getContainers(t)
- case *SignalTask:
- err = s.signal(t)
- case *StatsTask:
- err = s.stats(t)
- case *UpdateTask:
- err = s.updateContainer(t)
- case *UpdateProcessTask:
- err = s.updateProcess(t)
- case *OOMTask:
- err = s.oom(t)
- default:
- err = ErrUnknownTask
- }
- if err != errDeferredResponse {
- i.ErrorCh() <- err
- close(i.ErrorCh())
- }
- }
|