subscription.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package dbus
  15. import (
  16. "errors"
  17. "time"
  18. "github.com/godbus/dbus"
  19. )
  20. const (
  21. cleanIgnoreInterval = int64(10 * time.Second)
  22. ignoreInterval = int64(30 * time.Millisecond)
  23. )
  24. // Subscribe sets up this connection to subscribe to all systemd dbus events.
  25. // This is required before calling SubscribeUnits. When the connection closes
  26. // systemd will automatically stop sending signals so there is no need to
  27. // explicitly call Unsubscribe().
  28. func (c *Conn) Subscribe() error {
  29. c.sigconn.BusObject().Call("org.freedesktop.DBus.AddMatch", 0,
  30. "type='signal',interface='org.freedesktop.systemd1.Manager',member='UnitNew'")
  31. c.sigconn.BusObject().Call("org.freedesktop.DBus.AddMatch", 0,
  32. "type='signal',interface='org.freedesktop.DBus.Properties',member='PropertiesChanged'")
  33. err := c.sigobj.Call("org.freedesktop.systemd1.Manager.Subscribe", 0).Store()
  34. if err != nil {
  35. return err
  36. }
  37. return nil
  38. }
  39. // Unsubscribe this connection from systemd dbus events.
  40. func (c *Conn) Unsubscribe() error {
  41. err := c.sigobj.Call("org.freedesktop.systemd1.Manager.Unsubscribe", 0).Store()
  42. if err != nil {
  43. return err
  44. }
  45. return nil
  46. }
  47. func (c *Conn) dispatch() {
  48. ch := make(chan *dbus.Signal, signalBuffer)
  49. c.sigconn.Signal(ch)
  50. go func() {
  51. for {
  52. signal, ok := <-ch
  53. if !ok {
  54. return
  55. }
  56. if signal.Name == "org.freedesktop.systemd1.Manager.JobRemoved" {
  57. c.jobComplete(signal)
  58. }
  59. if c.subscriber.updateCh == nil {
  60. continue
  61. }
  62. var unitPath dbus.ObjectPath
  63. switch signal.Name {
  64. case "org.freedesktop.systemd1.Manager.JobRemoved":
  65. unitName := signal.Body[2].(string)
  66. c.sysobj.Call("org.freedesktop.systemd1.Manager.GetUnit", 0, unitName).Store(&unitPath)
  67. case "org.freedesktop.systemd1.Manager.UnitNew":
  68. unitPath = signal.Body[1].(dbus.ObjectPath)
  69. case "org.freedesktop.DBus.Properties.PropertiesChanged":
  70. if signal.Body[0].(string) == "org.freedesktop.systemd1.Unit" {
  71. unitPath = signal.Path
  72. }
  73. }
  74. if unitPath == dbus.ObjectPath("") {
  75. continue
  76. }
  77. c.sendSubStateUpdate(unitPath)
  78. }
  79. }()
  80. }
  81. // Returns two unbuffered channels which will receive all changed units every
  82. // interval. Deleted units are sent as nil.
  83. func (c *Conn) SubscribeUnits(interval time.Duration) (<-chan map[string]*UnitStatus, <-chan error) {
  84. return c.SubscribeUnitsCustom(interval, 0, func(u1, u2 *UnitStatus) bool { return *u1 != *u2 }, nil)
  85. }
  86. // SubscribeUnitsCustom is like SubscribeUnits but lets you specify the buffer
  87. // size of the channels, the comparison function for detecting changes and a filter
  88. // function for cutting down on the noise that your channel receives.
  89. func (c *Conn) SubscribeUnitsCustom(interval time.Duration, buffer int, isChanged func(*UnitStatus, *UnitStatus) bool, filterUnit func(string) bool) (<-chan map[string]*UnitStatus, <-chan error) {
  90. old := make(map[string]*UnitStatus)
  91. statusChan := make(chan map[string]*UnitStatus, buffer)
  92. errChan := make(chan error, buffer)
  93. go func() {
  94. for {
  95. timerChan := time.After(interval)
  96. units, err := c.ListUnits()
  97. if err == nil {
  98. cur := make(map[string]*UnitStatus)
  99. for i := range units {
  100. if filterUnit != nil && filterUnit(units[i].Name) {
  101. continue
  102. }
  103. cur[units[i].Name] = &units[i]
  104. }
  105. // add all new or changed units
  106. changed := make(map[string]*UnitStatus)
  107. for n, u := range cur {
  108. if oldU, ok := old[n]; !ok || isChanged(oldU, u) {
  109. changed[n] = u
  110. }
  111. delete(old, n)
  112. }
  113. // add all deleted units
  114. for oldN := range old {
  115. changed[oldN] = nil
  116. }
  117. old = cur
  118. if len(changed) != 0 {
  119. statusChan <- changed
  120. }
  121. } else {
  122. errChan <- err
  123. }
  124. <-timerChan
  125. }
  126. }()
  127. return statusChan, errChan
  128. }
  129. type SubStateUpdate struct {
  130. UnitName string
  131. SubState string
  132. }
  133. // SetSubStateSubscriber writes to updateCh when any unit's substate changes.
  134. // Although this writes to updateCh on every state change, the reported state
  135. // may be more recent than the change that generated it (due to an unavoidable
  136. // race in the systemd dbus interface). That is, this method provides a good
  137. // way to keep a current view of all units' states, but is not guaranteed to
  138. // show every state transition they go through. Furthermore, state changes
  139. // will only be written to the channel with non-blocking writes. If updateCh
  140. // is full, it attempts to write an error to errCh; if errCh is full, the error
  141. // passes silently.
  142. func (c *Conn) SetSubStateSubscriber(updateCh chan<- *SubStateUpdate, errCh chan<- error) {
  143. c.subscriber.Lock()
  144. defer c.subscriber.Unlock()
  145. c.subscriber.updateCh = updateCh
  146. c.subscriber.errCh = errCh
  147. }
  148. func (c *Conn) sendSubStateUpdate(path dbus.ObjectPath) {
  149. c.subscriber.Lock()
  150. defer c.subscriber.Unlock()
  151. if c.shouldIgnore(path) {
  152. return
  153. }
  154. info, err := c.GetUnitProperties(string(path))
  155. if err != nil {
  156. select {
  157. case c.subscriber.errCh <- err:
  158. default:
  159. }
  160. }
  161. name := info["Id"].(string)
  162. substate := info["SubState"].(string)
  163. update := &SubStateUpdate{name, substate}
  164. select {
  165. case c.subscriber.updateCh <- update:
  166. default:
  167. select {
  168. case c.subscriber.errCh <- errors.New("update channel full!"):
  169. default:
  170. }
  171. }
  172. c.updateIgnore(path, info)
  173. }
  174. // The ignore functions work around a wart in the systemd dbus interface.
  175. // Requesting the properties of an unloaded unit will cause systemd to send a
  176. // pair of UnitNew/UnitRemoved signals. Because we need to get a unit's
  177. // properties on UnitNew (as that's the only indication of a new unit coming up
  178. // for the first time), we would enter an infinite loop if we did not attempt
  179. // to detect and ignore these spurious signals. The signal themselves are
  180. // indistinguishable from relevant ones, so we (somewhat hackishly) ignore an
  181. // unloaded unit's signals for a short time after requesting its properties.
  182. // This means that we will miss e.g. a transient unit being restarted
  183. // *immediately* upon failure and also a transient unit being started
  184. // immediately after requesting its status (with systemctl status, for example,
  185. // because this causes a UnitNew signal to be sent which then causes us to fetch
  186. // the properties).
  187. func (c *Conn) shouldIgnore(path dbus.ObjectPath) bool {
  188. t, ok := c.subscriber.ignore[path]
  189. return ok && t >= time.Now().UnixNano()
  190. }
  191. func (c *Conn) updateIgnore(path dbus.ObjectPath, info map[string]interface{}) {
  192. c.cleanIgnore()
  193. // unit is unloaded - it will trigger bad systemd dbus behavior
  194. if info["LoadState"].(string) == "not-found" {
  195. c.subscriber.ignore[path] = time.Now().UnixNano() + ignoreInterval
  196. }
  197. }
  198. // without this, ignore would grow unboundedly over time
  199. func (c *Conn) cleanIgnore() {
  200. now := time.Now().UnixNano()
  201. if c.subscriber.cleanIgnore < now {
  202. c.subscriber.cleanIgnore = now + cleanIgnoreInterval
  203. for p, t := range c.subscriber.ignore {
  204. if t < now {
  205. delete(c.subscriber.ignore, p)
  206. }
  207. }
  208. }
  209. }