server.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469
  1. package server
  2. import (
  3. "bufio"
  4. "errors"
  5. "fmt"
  6. "os"
  7. "strconv"
  8. "strings"
  9. "syscall"
  10. "time"
  11. "google.golang.org/grpc"
  12. "google.golang.org/grpc/codes"
  13. "github.com/docker/containerd"
  14. "github.com/docker/containerd/api/grpc/types"
  15. "github.com/docker/containerd/runtime"
  16. "github.com/docker/containerd/supervisor"
  17. "golang.org/x/net/context"
  18. )
  19. type apiServer struct {
  20. sv *supervisor.Supervisor
  21. }
  22. // NewServer returns grpc server instance
  23. func NewServer(sv *supervisor.Supervisor) types.APIServer {
  24. return &apiServer{
  25. sv: sv,
  26. }
  27. }
  28. func (s *apiServer) GetServerVersion(ctx context.Context, c *types.GetServerVersionRequest) (*types.GetServerVersionResponse, error) {
  29. return &types.GetServerVersionResponse{
  30. Major: containerd.VersionMajor,
  31. Minor: containerd.VersionMinor,
  32. Patch: containerd.VersionPatch,
  33. Revision: containerd.GitCommit,
  34. }, nil
  35. }
  36. func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContainerRequest) (*types.CreateContainerResponse, error) {
  37. if c.BundlePath == "" {
  38. return nil, errors.New("empty bundle path")
  39. }
  40. e := &supervisor.StartTask{}
  41. e.ID = c.Id
  42. e.BundlePath = c.BundlePath
  43. e.Stdin = c.Stdin
  44. e.Stdout = c.Stdout
  45. e.Stderr = c.Stderr
  46. e.Labels = c.Labels
  47. e.NoPivotRoot = c.NoPivotRoot
  48. e.Runtime = c.Runtime
  49. e.RuntimeArgs = c.RuntimeArgs
  50. e.StartResponse = make(chan supervisor.StartResponse, 1)
  51. if c.Checkpoint != "" {
  52. e.CheckpointDir = c.CheckpointDir
  53. e.Checkpoint = &runtime.Checkpoint{
  54. Name: c.Checkpoint,
  55. }
  56. }
  57. s.sv.SendTask(e)
  58. if err := <-e.ErrorCh(); err != nil {
  59. return nil, err
  60. }
  61. r := <-e.StartResponse
  62. apiC, err := createAPIContainer(r.Container, false)
  63. if err != nil {
  64. return nil, err
  65. }
  66. return &types.CreateContainerResponse{
  67. Container: apiC,
  68. }, nil
  69. }
  70. func (s *apiServer) CreateCheckpoint(ctx context.Context, r *types.CreateCheckpointRequest) (*types.CreateCheckpointResponse, error) {
  71. e := &supervisor.CreateCheckpointTask{}
  72. e.ID = r.Id
  73. e.CheckpointDir = r.CheckpointDir
  74. e.Checkpoint = &runtime.Checkpoint{
  75. Name: r.Checkpoint.Name,
  76. Exit: r.Checkpoint.Exit,
  77. Tcp: r.Checkpoint.Tcp,
  78. UnixSockets: r.Checkpoint.UnixSockets,
  79. Shell: r.Checkpoint.Shell,
  80. }
  81. s.sv.SendTask(e)
  82. if err := <-e.ErrorCh(); err != nil {
  83. return nil, err
  84. }
  85. return &types.CreateCheckpointResponse{}, nil
  86. }
  87. func (s *apiServer) DeleteCheckpoint(ctx context.Context, r *types.DeleteCheckpointRequest) (*types.DeleteCheckpointResponse, error) {
  88. if r.Name == "" {
  89. return nil, errors.New("checkpoint name cannot be empty")
  90. }
  91. e := &supervisor.DeleteCheckpointTask{}
  92. e.ID = r.Id
  93. e.CheckpointDir = r.CheckpointDir
  94. e.Checkpoint = &runtime.Checkpoint{
  95. Name: r.Name,
  96. }
  97. s.sv.SendTask(e)
  98. if err := <-e.ErrorCh(); err != nil {
  99. return nil, err
  100. }
  101. return &types.DeleteCheckpointResponse{}, nil
  102. }
  103. func (s *apiServer) ListCheckpoint(ctx context.Context, r *types.ListCheckpointRequest) (*types.ListCheckpointResponse, error) {
  104. e := &supervisor.GetContainersTask{}
  105. s.sv.SendTask(e)
  106. if err := <-e.ErrorCh(); err != nil {
  107. return nil, err
  108. }
  109. var container runtime.Container
  110. for _, c := range e.Containers {
  111. if c.ID() == r.Id {
  112. container = c
  113. break
  114. }
  115. }
  116. if container == nil {
  117. return nil, grpc.Errorf(codes.NotFound, "no such containers")
  118. }
  119. var out []*types.Checkpoint
  120. checkpoints, err := container.Checkpoints(r.CheckpointDir)
  121. if err != nil {
  122. return nil, err
  123. }
  124. for _, c := range checkpoints {
  125. out = append(out, &types.Checkpoint{
  126. Name: c.Name,
  127. Tcp: c.Tcp,
  128. Shell: c.Shell,
  129. UnixSockets: c.UnixSockets,
  130. // TODO: figure out timestamp
  131. //Timestamp: c.Timestamp,
  132. })
  133. }
  134. return &types.ListCheckpointResponse{Checkpoints: out}, nil
  135. }
  136. func (s *apiServer) Signal(ctx context.Context, r *types.SignalRequest) (*types.SignalResponse, error) {
  137. e := &supervisor.SignalTask{}
  138. e.ID = r.Id
  139. e.PID = r.Pid
  140. e.Signal = syscall.Signal(int(r.Signal))
  141. s.sv.SendTask(e)
  142. if err := <-e.ErrorCh(); err != nil {
  143. return nil, err
  144. }
  145. return &types.SignalResponse{}, nil
  146. }
  147. func (s *apiServer) State(ctx context.Context, r *types.StateRequest) (*types.StateResponse, error) {
  148. e := &supervisor.GetContainersTask{}
  149. e.ID = r.Id
  150. s.sv.SendTask(e)
  151. if err := <-e.ErrorCh(); err != nil {
  152. return nil, err
  153. }
  154. m := s.sv.Machine()
  155. state := &types.StateResponse{
  156. Machine: &types.Machine{
  157. Cpus: uint32(m.Cpus),
  158. Memory: uint64(m.Memory),
  159. },
  160. }
  161. for _, c := range e.Containers {
  162. apiC, err := createAPIContainer(c, true)
  163. if err != nil {
  164. return nil, err
  165. }
  166. state.Containers = append(state.Containers, apiC)
  167. }
  168. return state, nil
  169. }
  170. func createAPIContainer(c runtime.Container, getPids bool) (*types.Container, error) {
  171. processes, err := c.Processes()
  172. if err != nil {
  173. return nil, grpc.Errorf(codes.Internal, "get processes for container: "+err.Error())
  174. }
  175. var procs []*types.Process
  176. for _, p := range processes {
  177. oldProc := p.Spec()
  178. stdio := p.Stdio()
  179. proc := &types.Process{
  180. Pid: p.ID(),
  181. SystemPid: uint32(p.SystemPid()),
  182. Terminal: oldProc.Terminal,
  183. Args: oldProc.Args,
  184. Env: oldProc.Env,
  185. Cwd: oldProc.Cwd,
  186. Stdin: stdio.Stdin,
  187. Stdout: stdio.Stdout,
  188. Stderr: stdio.Stderr,
  189. }
  190. proc.User = &types.User{
  191. Uid: oldProc.User.UID,
  192. Gid: oldProc.User.GID,
  193. AdditionalGids: oldProc.User.AdditionalGids,
  194. }
  195. proc.Capabilities = oldProc.Capabilities
  196. proc.ApparmorProfile = oldProc.ApparmorProfile
  197. proc.SelinuxLabel = oldProc.SelinuxLabel
  198. proc.NoNewPrivileges = oldProc.NoNewPrivileges
  199. for _, rl := range oldProc.Rlimits {
  200. proc.Rlimits = append(proc.Rlimits, &types.Rlimit{
  201. Type: rl.Type,
  202. Soft: rl.Soft,
  203. Hard: rl.Hard,
  204. })
  205. }
  206. procs = append(procs, proc)
  207. }
  208. var pids []int
  209. state := c.State()
  210. if getPids && (state == runtime.Running || state == runtime.Paused) {
  211. if pids, err = c.Pids(); err != nil {
  212. return nil, grpc.Errorf(codes.Internal, "get all pids for container: "+err.Error())
  213. }
  214. }
  215. return &types.Container{
  216. Id: c.ID(),
  217. BundlePath: c.Path(),
  218. Processes: procs,
  219. Labels: c.Labels(),
  220. Status: string(state),
  221. Pids: toUint32(pids),
  222. Runtime: c.Runtime(),
  223. }, nil
  224. }
  225. func toUint32(its []int) []uint32 {
  226. o := []uint32{}
  227. for _, i := range its {
  228. o = append(o, uint32(i))
  229. }
  230. return o
  231. }
  232. func (s *apiServer) UpdateContainer(ctx context.Context, r *types.UpdateContainerRequest) (*types.UpdateContainerResponse, error) {
  233. e := &supervisor.UpdateTask{}
  234. e.ID = r.Id
  235. e.State = runtime.State(r.Status)
  236. if r.Resources != nil {
  237. rs := r.Resources
  238. e.Resources = &runtime.Resource{}
  239. if rs.CpuShares != 0 {
  240. e.Resources.CPUShares = int64(rs.CpuShares)
  241. }
  242. if rs.BlkioWeight != 0 {
  243. e.Resources.BlkioWeight = uint16(rs.BlkioWeight)
  244. }
  245. if rs.CpuPeriod != 0 {
  246. e.Resources.CPUPeriod = int64(rs.CpuPeriod)
  247. }
  248. if rs.CpuQuota != 0 {
  249. e.Resources.CPUQuota = int64(rs.CpuQuota)
  250. }
  251. if rs.CpusetCpus != "" {
  252. e.Resources.CpusetCpus = rs.CpusetCpus
  253. }
  254. if rs.CpusetMems != "" {
  255. e.Resources.CpusetMems = rs.CpusetMems
  256. }
  257. if rs.KernelMemoryLimit != 0 {
  258. e.Resources.KernelMemory = int64(rs.KernelMemoryLimit)
  259. }
  260. if rs.MemoryLimit != 0 {
  261. e.Resources.Memory = int64(rs.MemoryLimit)
  262. }
  263. if rs.MemoryReservation != 0 {
  264. e.Resources.MemoryReservation = int64(rs.MemoryReservation)
  265. }
  266. if rs.MemorySwap != 0 {
  267. e.Resources.MemorySwap = int64(rs.MemorySwap)
  268. }
  269. }
  270. s.sv.SendTask(e)
  271. if err := <-e.ErrorCh(); err != nil {
  272. return nil, err
  273. }
  274. return &types.UpdateContainerResponse{}, nil
  275. }
  276. func (s *apiServer) UpdateProcess(ctx context.Context, r *types.UpdateProcessRequest) (*types.UpdateProcessResponse, error) {
  277. e := &supervisor.UpdateProcessTask{}
  278. e.ID = r.Id
  279. e.PID = r.Pid
  280. e.Height = int(r.Height)
  281. e.Width = int(r.Width)
  282. e.CloseStdin = r.CloseStdin
  283. s.sv.SendTask(e)
  284. if err := <-e.ErrorCh(); err != nil {
  285. return nil, err
  286. }
  287. return &types.UpdateProcessResponse{}, nil
  288. }
  289. func (s *apiServer) Events(r *types.EventsRequest, stream types.API_EventsServer) error {
  290. t := time.Time{}
  291. if r.Timestamp != 0 {
  292. t = time.Unix(int64(r.Timestamp), 0)
  293. }
  294. events := s.sv.Events(t)
  295. defer s.sv.Unsubscribe(events)
  296. for e := range events {
  297. if err := stream.Send(&types.Event{
  298. Id: e.ID,
  299. Type: e.Type,
  300. Timestamp: uint64(e.Timestamp.Unix()),
  301. Pid: e.PID,
  302. Status: uint32(e.Status),
  303. }); err != nil {
  304. return err
  305. }
  306. }
  307. return nil
  308. }
  309. func convertToPb(st *runtime.Stat) *types.StatsResponse {
  310. pbSt := &types.StatsResponse{
  311. Timestamp: uint64(st.Timestamp.Unix()),
  312. CgroupStats: &types.CgroupStats{},
  313. }
  314. systemUsage, _ := getSystemCPUUsage()
  315. pbSt.CgroupStats.CpuStats = &types.CpuStats{
  316. CpuUsage: &types.CpuUsage{
  317. TotalUsage: st.Cpu.Usage.Total,
  318. PercpuUsage: st.Cpu.Usage.Percpu,
  319. UsageInKernelmode: st.Cpu.Usage.Kernel,
  320. UsageInUsermode: st.Cpu.Usage.User,
  321. },
  322. ThrottlingData: &types.ThrottlingData{
  323. Periods: st.Cpu.Throttling.Periods,
  324. ThrottledPeriods: st.Cpu.Throttling.ThrottledPeriods,
  325. ThrottledTime: st.Cpu.Throttling.ThrottledTime,
  326. },
  327. SystemUsage: systemUsage,
  328. }
  329. pbSt.CgroupStats.MemoryStats = &types.MemoryStats{
  330. Cache: st.Memory.Cache,
  331. Usage: &types.MemoryData{
  332. Usage: st.Memory.Usage.Usage,
  333. MaxUsage: st.Memory.Usage.Max,
  334. Failcnt: st.Memory.Usage.Failcnt,
  335. Limit: st.Memory.Usage.Limit,
  336. },
  337. SwapUsage: &types.MemoryData{
  338. Usage: st.Memory.Swap.Usage,
  339. MaxUsage: st.Memory.Swap.Max,
  340. Failcnt: st.Memory.Swap.Failcnt,
  341. Limit: st.Memory.Swap.Limit,
  342. },
  343. KernelUsage: &types.MemoryData{
  344. Usage: st.Memory.Kernel.Usage,
  345. MaxUsage: st.Memory.Kernel.Max,
  346. Failcnt: st.Memory.Kernel.Failcnt,
  347. Limit: st.Memory.Kernel.Limit,
  348. },
  349. Stats: st.Memory.Raw,
  350. }
  351. pbSt.CgroupStats.BlkioStats = &types.BlkioStats{
  352. IoServiceBytesRecursive: convertBlkioEntryToPb(st.Blkio.IoServiceBytesRecursive),
  353. IoServicedRecursive: convertBlkioEntryToPb(st.Blkio.IoServicedRecursive),
  354. IoQueuedRecursive: convertBlkioEntryToPb(st.Blkio.IoQueuedRecursive),
  355. IoServiceTimeRecursive: convertBlkioEntryToPb(st.Blkio.IoServiceTimeRecursive),
  356. IoWaitTimeRecursive: convertBlkioEntryToPb(st.Blkio.IoWaitTimeRecursive),
  357. IoMergedRecursive: convertBlkioEntryToPb(st.Blkio.IoMergedRecursive),
  358. IoTimeRecursive: convertBlkioEntryToPb(st.Blkio.IoTimeRecursive),
  359. SectorsRecursive: convertBlkioEntryToPb(st.Blkio.SectorsRecursive),
  360. }
  361. pbSt.CgroupStats.HugetlbStats = make(map[string]*types.HugetlbStats)
  362. for k, st := range st.Hugetlb {
  363. pbSt.CgroupStats.HugetlbStats[k] = &types.HugetlbStats{
  364. Usage: st.Usage,
  365. MaxUsage: st.Max,
  366. Failcnt: st.Failcnt,
  367. }
  368. }
  369. pbSt.CgroupStats.PidsStats = &types.PidsStats{
  370. Current: st.Pids.Current,
  371. Limit: st.Pids.Limit,
  372. }
  373. return pbSt
  374. }
  375. func convertBlkioEntryToPb(b []runtime.BlkioEntry) []*types.BlkioStatsEntry {
  376. var pbEs []*types.BlkioStatsEntry
  377. for _, e := range b {
  378. pbEs = append(pbEs, &types.BlkioStatsEntry{
  379. Major: e.Major,
  380. Minor: e.Minor,
  381. Op: e.Op,
  382. Value: e.Value,
  383. })
  384. }
  385. return pbEs
  386. }
  387. const nanoSecondsPerSecond = 1e9
  388. // getSystemCPUUsage returns the host system's cpu usage in
  389. // nanoseconds. An error is returned if the format of the underlying
  390. // file does not match.
  391. //
  392. // Uses /proc/stat defined by POSIX. Looks for the cpu
  393. // statistics line and then sums up the first seven fields
  394. // provided. See `man 5 proc` for details on specific field
  395. // information.
  396. func getSystemCPUUsage() (uint64, error) {
  397. var line string
  398. f, err := os.Open("/proc/stat")
  399. if err != nil {
  400. return 0, err
  401. }
  402. bufReader := bufio.NewReaderSize(nil, 128)
  403. defer func() {
  404. bufReader.Reset(nil)
  405. f.Close()
  406. }()
  407. bufReader.Reset(f)
  408. err = nil
  409. for err == nil {
  410. line, err = bufReader.ReadString('\n')
  411. if err != nil {
  412. break
  413. }
  414. parts := strings.Fields(line)
  415. switch parts[0] {
  416. case "cpu":
  417. if len(parts) < 8 {
  418. return 0, fmt.Errorf("bad format of cpu stats")
  419. }
  420. var totalClockTicks uint64
  421. for _, i := range parts[1:8] {
  422. v, err := strconv.ParseUint(i, 10, 64)
  423. if err != nil {
  424. return 0, fmt.Errorf("error parsing cpu stats")
  425. }
  426. totalClockTicks += v
  427. }
  428. return (totalClockTicks * nanoSecondsPerSecond) /
  429. clockTicksPerSecond, nil
  430. }
  431. }
  432. return 0, fmt.Errorf("bad stats format")
  433. }
  434. func (s *apiServer) Stats(ctx context.Context, r *types.StatsRequest) (*types.StatsResponse, error) {
  435. e := &supervisor.StatsTask{}
  436. e.ID = r.Id
  437. e.Stat = make(chan *runtime.Stat, 1)
  438. s.sv.SendTask(e)
  439. if err := <-e.ErrorCh(); err != nil {
  440. return nil, err
  441. }
  442. stats := <-e.Stat
  443. t := convertToPb(stats)
  444. return t, nil
  445. }