123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469 |
- package server
- import (
- "bufio"
- "errors"
- "fmt"
- "os"
- "strconv"
- "strings"
- "syscall"
- "time"
- "google.golang.org/grpc"
- "google.golang.org/grpc/codes"
- "github.com/docker/containerd"
- "github.com/docker/containerd/api/grpc/types"
- "github.com/docker/containerd/runtime"
- "github.com/docker/containerd/supervisor"
- "golang.org/x/net/context"
- )
- type apiServer struct {
- sv *supervisor.Supervisor
- }
- // NewServer returns grpc server instance
- func NewServer(sv *supervisor.Supervisor) types.APIServer {
- return &apiServer{
- sv: sv,
- }
- }
- func (s *apiServer) GetServerVersion(ctx context.Context, c *types.GetServerVersionRequest) (*types.GetServerVersionResponse, error) {
- return &types.GetServerVersionResponse{
- Major: containerd.VersionMajor,
- Minor: containerd.VersionMinor,
- Patch: containerd.VersionPatch,
- Revision: containerd.GitCommit,
- }, nil
- }
- func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContainerRequest) (*types.CreateContainerResponse, error) {
- if c.BundlePath == "" {
- return nil, errors.New("empty bundle path")
- }
- e := &supervisor.StartTask{}
- e.ID = c.Id
- e.BundlePath = c.BundlePath
- e.Stdin = c.Stdin
- e.Stdout = c.Stdout
- e.Stderr = c.Stderr
- e.Labels = c.Labels
- e.NoPivotRoot = c.NoPivotRoot
- e.Runtime = c.Runtime
- e.RuntimeArgs = c.RuntimeArgs
- e.StartResponse = make(chan supervisor.StartResponse, 1)
- if c.Checkpoint != "" {
- e.CheckpointDir = c.CheckpointDir
- e.Checkpoint = &runtime.Checkpoint{
- Name: c.Checkpoint,
- }
- }
- s.sv.SendTask(e)
- if err := <-e.ErrorCh(); err != nil {
- return nil, err
- }
- r := <-e.StartResponse
- apiC, err := createAPIContainer(r.Container, false)
- if err != nil {
- return nil, err
- }
- return &types.CreateContainerResponse{
- Container: apiC,
- }, nil
- }
- func (s *apiServer) CreateCheckpoint(ctx context.Context, r *types.CreateCheckpointRequest) (*types.CreateCheckpointResponse, error) {
- e := &supervisor.CreateCheckpointTask{}
- e.ID = r.Id
- e.CheckpointDir = r.CheckpointDir
- e.Checkpoint = &runtime.Checkpoint{
- Name: r.Checkpoint.Name,
- Exit: r.Checkpoint.Exit,
- Tcp: r.Checkpoint.Tcp,
- UnixSockets: r.Checkpoint.UnixSockets,
- Shell: r.Checkpoint.Shell,
- }
- s.sv.SendTask(e)
- if err := <-e.ErrorCh(); err != nil {
- return nil, err
- }
- return &types.CreateCheckpointResponse{}, nil
- }
- func (s *apiServer) DeleteCheckpoint(ctx context.Context, r *types.DeleteCheckpointRequest) (*types.DeleteCheckpointResponse, error) {
- if r.Name == "" {
- return nil, errors.New("checkpoint name cannot be empty")
- }
- e := &supervisor.DeleteCheckpointTask{}
- e.ID = r.Id
- e.CheckpointDir = r.CheckpointDir
- e.Checkpoint = &runtime.Checkpoint{
- Name: r.Name,
- }
- s.sv.SendTask(e)
- if err := <-e.ErrorCh(); err != nil {
- return nil, err
- }
- return &types.DeleteCheckpointResponse{}, nil
- }
- func (s *apiServer) ListCheckpoint(ctx context.Context, r *types.ListCheckpointRequest) (*types.ListCheckpointResponse, error) {
- e := &supervisor.GetContainersTask{}
- s.sv.SendTask(e)
- if err := <-e.ErrorCh(); err != nil {
- return nil, err
- }
- var container runtime.Container
- for _, c := range e.Containers {
- if c.ID() == r.Id {
- container = c
- break
- }
- }
- if container == nil {
- return nil, grpc.Errorf(codes.NotFound, "no such containers")
- }
- var out []*types.Checkpoint
- checkpoints, err := container.Checkpoints(r.CheckpointDir)
- if err != nil {
- return nil, err
- }
- for _, c := range checkpoints {
- out = append(out, &types.Checkpoint{
- Name: c.Name,
- Tcp: c.Tcp,
- Shell: c.Shell,
- UnixSockets: c.UnixSockets,
- // TODO: figure out timestamp
- //Timestamp: c.Timestamp,
- })
- }
- return &types.ListCheckpointResponse{Checkpoints: out}, nil
- }
- func (s *apiServer) Signal(ctx context.Context, r *types.SignalRequest) (*types.SignalResponse, error) {
- e := &supervisor.SignalTask{}
- e.ID = r.Id
- e.PID = r.Pid
- e.Signal = syscall.Signal(int(r.Signal))
- s.sv.SendTask(e)
- if err := <-e.ErrorCh(); err != nil {
- return nil, err
- }
- return &types.SignalResponse{}, nil
- }
- func (s *apiServer) State(ctx context.Context, r *types.StateRequest) (*types.StateResponse, error) {
- e := &supervisor.GetContainersTask{}
- e.ID = r.Id
- s.sv.SendTask(e)
- if err := <-e.ErrorCh(); err != nil {
- return nil, err
- }
- m := s.sv.Machine()
- state := &types.StateResponse{
- Machine: &types.Machine{
- Cpus: uint32(m.Cpus),
- Memory: uint64(m.Memory),
- },
- }
- for _, c := range e.Containers {
- apiC, err := createAPIContainer(c, true)
- if err != nil {
- return nil, err
- }
- state.Containers = append(state.Containers, apiC)
- }
- return state, nil
- }
- func createAPIContainer(c runtime.Container, getPids bool) (*types.Container, error) {
- processes, err := c.Processes()
- if err != nil {
- return nil, grpc.Errorf(codes.Internal, "get processes for container: "+err.Error())
- }
- var procs []*types.Process
- for _, p := range processes {
- oldProc := p.Spec()
- stdio := p.Stdio()
- proc := &types.Process{
- Pid: p.ID(),
- SystemPid: uint32(p.SystemPid()),
- Terminal: oldProc.Terminal,
- Args: oldProc.Args,
- Env: oldProc.Env,
- Cwd: oldProc.Cwd,
- Stdin: stdio.Stdin,
- Stdout: stdio.Stdout,
- Stderr: stdio.Stderr,
- }
- proc.User = &types.User{
- Uid: oldProc.User.UID,
- Gid: oldProc.User.GID,
- AdditionalGids: oldProc.User.AdditionalGids,
- }
- proc.Capabilities = oldProc.Capabilities
- proc.ApparmorProfile = oldProc.ApparmorProfile
- proc.SelinuxLabel = oldProc.SelinuxLabel
- proc.NoNewPrivileges = oldProc.NoNewPrivileges
- for _, rl := range oldProc.Rlimits {
- proc.Rlimits = append(proc.Rlimits, &types.Rlimit{
- Type: rl.Type,
- Soft: rl.Soft,
- Hard: rl.Hard,
- })
- }
- procs = append(procs, proc)
- }
- var pids []int
- state := c.State()
- if getPids && (state == runtime.Running || state == runtime.Paused) {
- if pids, err = c.Pids(); err != nil {
- return nil, grpc.Errorf(codes.Internal, "get all pids for container: "+err.Error())
- }
- }
- return &types.Container{
- Id: c.ID(),
- BundlePath: c.Path(),
- Processes: procs,
- Labels: c.Labels(),
- Status: string(state),
- Pids: toUint32(pids),
- Runtime: c.Runtime(),
- }, nil
- }
- func toUint32(its []int) []uint32 {
- o := []uint32{}
- for _, i := range its {
- o = append(o, uint32(i))
- }
- return o
- }
- func (s *apiServer) UpdateContainer(ctx context.Context, r *types.UpdateContainerRequest) (*types.UpdateContainerResponse, error) {
- e := &supervisor.UpdateTask{}
- e.ID = r.Id
- e.State = runtime.State(r.Status)
- if r.Resources != nil {
- rs := r.Resources
- e.Resources = &runtime.Resource{}
- if rs.CpuShares != 0 {
- e.Resources.CPUShares = int64(rs.CpuShares)
- }
- if rs.BlkioWeight != 0 {
- e.Resources.BlkioWeight = uint16(rs.BlkioWeight)
- }
- if rs.CpuPeriod != 0 {
- e.Resources.CPUPeriod = int64(rs.CpuPeriod)
- }
- if rs.CpuQuota != 0 {
- e.Resources.CPUQuota = int64(rs.CpuQuota)
- }
- if rs.CpusetCpus != "" {
- e.Resources.CpusetCpus = rs.CpusetCpus
- }
- if rs.CpusetMems != "" {
- e.Resources.CpusetMems = rs.CpusetMems
- }
- if rs.KernelMemoryLimit != 0 {
- e.Resources.KernelMemory = int64(rs.KernelMemoryLimit)
- }
- if rs.MemoryLimit != 0 {
- e.Resources.Memory = int64(rs.MemoryLimit)
- }
- if rs.MemoryReservation != 0 {
- e.Resources.MemoryReservation = int64(rs.MemoryReservation)
- }
- if rs.MemorySwap != 0 {
- e.Resources.MemorySwap = int64(rs.MemorySwap)
- }
- }
- s.sv.SendTask(e)
- if err := <-e.ErrorCh(); err != nil {
- return nil, err
- }
- return &types.UpdateContainerResponse{}, nil
- }
- func (s *apiServer) UpdateProcess(ctx context.Context, r *types.UpdateProcessRequest) (*types.UpdateProcessResponse, error) {
- e := &supervisor.UpdateProcessTask{}
- e.ID = r.Id
- e.PID = r.Pid
- e.Height = int(r.Height)
- e.Width = int(r.Width)
- e.CloseStdin = r.CloseStdin
- s.sv.SendTask(e)
- if err := <-e.ErrorCh(); err != nil {
- return nil, err
- }
- return &types.UpdateProcessResponse{}, nil
- }
- func (s *apiServer) Events(r *types.EventsRequest, stream types.API_EventsServer) error {
- t := time.Time{}
- if r.Timestamp != 0 {
- t = time.Unix(int64(r.Timestamp), 0)
- }
- events := s.sv.Events(t)
- defer s.sv.Unsubscribe(events)
- for e := range events {
- if err := stream.Send(&types.Event{
- Id: e.ID,
- Type: e.Type,
- Timestamp: uint64(e.Timestamp.Unix()),
- Pid: e.PID,
- Status: uint32(e.Status),
- }); err != nil {
- return err
- }
- }
- return nil
- }
- func convertToPb(st *runtime.Stat) *types.StatsResponse {
- pbSt := &types.StatsResponse{
- Timestamp: uint64(st.Timestamp.Unix()),
- CgroupStats: &types.CgroupStats{},
- }
- systemUsage, _ := getSystemCPUUsage()
- pbSt.CgroupStats.CpuStats = &types.CpuStats{
- CpuUsage: &types.CpuUsage{
- TotalUsage: st.Cpu.Usage.Total,
- PercpuUsage: st.Cpu.Usage.Percpu,
- UsageInKernelmode: st.Cpu.Usage.Kernel,
- UsageInUsermode: st.Cpu.Usage.User,
- },
- ThrottlingData: &types.ThrottlingData{
- Periods: st.Cpu.Throttling.Periods,
- ThrottledPeriods: st.Cpu.Throttling.ThrottledPeriods,
- ThrottledTime: st.Cpu.Throttling.ThrottledTime,
- },
- SystemUsage: systemUsage,
- }
- pbSt.CgroupStats.MemoryStats = &types.MemoryStats{
- Cache: st.Memory.Cache,
- Usage: &types.MemoryData{
- Usage: st.Memory.Usage.Usage,
- MaxUsage: st.Memory.Usage.Max,
- Failcnt: st.Memory.Usage.Failcnt,
- Limit: st.Memory.Usage.Limit,
- },
- SwapUsage: &types.MemoryData{
- Usage: st.Memory.Swap.Usage,
- MaxUsage: st.Memory.Swap.Max,
- Failcnt: st.Memory.Swap.Failcnt,
- Limit: st.Memory.Swap.Limit,
- },
- KernelUsage: &types.MemoryData{
- Usage: st.Memory.Kernel.Usage,
- MaxUsage: st.Memory.Kernel.Max,
- Failcnt: st.Memory.Kernel.Failcnt,
- Limit: st.Memory.Kernel.Limit,
- },
- Stats: st.Memory.Raw,
- }
- pbSt.CgroupStats.BlkioStats = &types.BlkioStats{
- IoServiceBytesRecursive: convertBlkioEntryToPb(st.Blkio.IoServiceBytesRecursive),
- IoServicedRecursive: convertBlkioEntryToPb(st.Blkio.IoServicedRecursive),
- IoQueuedRecursive: convertBlkioEntryToPb(st.Blkio.IoQueuedRecursive),
- IoServiceTimeRecursive: convertBlkioEntryToPb(st.Blkio.IoServiceTimeRecursive),
- IoWaitTimeRecursive: convertBlkioEntryToPb(st.Blkio.IoWaitTimeRecursive),
- IoMergedRecursive: convertBlkioEntryToPb(st.Blkio.IoMergedRecursive),
- IoTimeRecursive: convertBlkioEntryToPb(st.Blkio.IoTimeRecursive),
- SectorsRecursive: convertBlkioEntryToPb(st.Blkio.SectorsRecursive),
- }
- pbSt.CgroupStats.HugetlbStats = make(map[string]*types.HugetlbStats)
- for k, st := range st.Hugetlb {
- pbSt.CgroupStats.HugetlbStats[k] = &types.HugetlbStats{
- Usage: st.Usage,
- MaxUsage: st.Max,
- Failcnt: st.Failcnt,
- }
- }
- pbSt.CgroupStats.PidsStats = &types.PidsStats{
- Current: st.Pids.Current,
- Limit: st.Pids.Limit,
- }
- return pbSt
- }
- func convertBlkioEntryToPb(b []runtime.BlkioEntry) []*types.BlkioStatsEntry {
- var pbEs []*types.BlkioStatsEntry
- for _, e := range b {
- pbEs = append(pbEs, &types.BlkioStatsEntry{
- Major: e.Major,
- Minor: e.Minor,
- Op: e.Op,
- Value: e.Value,
- })
- }
- return pbEs
- }
- const nanoSecondsPerSecond = 1e9
- // getSystemCPUUsage returns the host system's cpu usage in
- // nanoseconds. An error is returned if the format of the underlying
- // file does not match.
- //
- // Uses /proc/stat defined by POSIX. Looks for the cpu
- // statistics line and then sums up the first seven fields
- // provided. See `man 5 proc` for details on specific field
- // information.
- func getSystemCPUUsage() (uint64, error) {
- var line string
- f, err := os.Open("/proc/stat")
- if err != nil {
- return 0, err
- }
- bufReader := bufio.NewReaderSize(nil, 128)
- defer func() {
- bufReader.Reset(nil)
- f.Close()
- }()
- bufReader.Reset(f)
- err = nil
- for err == nil {
- line, err = bufReader.ReadString('\n')
- if err != nil {
- break
- }
- parts := strings.Fields(line)
- switch parts[0] {
- case "cpu":
- if len(parts) < 8 {
- return 0, fmt.Errorf("bad format of cpu stats")
- }
- var totalClockTicks uint64
- for _, i := range parts[1:8] {
- v, err := strconv.ParseUint(i, 10, 64)
- if err != nil {
- return 0, fmt.Errorf("error parsing cpu stats")
- }
- totalClockTicks += v
- }
- return (totalClockTicks * nanoSecondsPerSecond) /
- clockTicksPerSecond, nil
- }
- }
- return 0, fmt.Errorf("bad stats format")
- }
- func (s *apiServer) Stats(ctx context.Context, r *types.StatsRequest) (*types.StatsResponse, error) {
- e := &supervisor.StatsTask{}
- e.ID = r.Id
- e.Stat = make(chan *runtime.Stat, 1)
- s.sv.SendTask(e)
- if err := <-e.ErrorCh(); err != nil {
- return nil, err
- }
- stats := <-e.Stat
- t := convertToPb(stats)
- return t, nil
- }
|