timeseries.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526
  1. // Copyright 2015 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Package timeseries implements a time series structure for stats collection.
  5. package timeseries // import "golang.org/x/net/internal/timeseries"
  6. import (
  7. "fmt"
  8. "log"
  9. "time"
  10. )
  11. const (
  12. timeSeriesNumBuckets = 64
  13. minuteHourSeriesNumBuckets = 60
  14. )
  15. var timeSeriesResolutions = []time.Duration{
  16. 1 * time.Second,
  17. 10 * time.Second,
  18. 1 * time.Minute,
  19. 10 * time.Minute,
  20. 1 * time.Hour,
  21. 6 * time.Hour,
  22. 24 * time.Hour, // 1 day
  23. 7 * 24 * time.Hour, // 1 week
  24. 4 * 7 * 24 * time.Hour, // 4 weeks
  25. 16 * 7 * 24 * time.Hour, // 16 weeks
  26. }
  27. var minuteHourSeriesResolutions = []time.Duration{
  28. 1 * time.Second,
  29. 1 * time.Minute,
  30. }
  31. // An Observable is a kind of data that can be aggregated in a time series.
  32. type Observable interface {
  33. Multiply(ratio float64) // Multiplies the data in self by a given ratio
  34. Add(other Observable) // Adds the data from a different observation to self
  35. Clear() // Clears the observation so it can be reused.
  36. CopyFrom(other Observable) // Copies the contents of a given observation to self
  37. }
  38. // Float attaches the methods of Observable to a float64.
  39. type Float float64
  40. // NewFloat returns a Float.
  41. func NewFloat() Observable {
  42. f := Float(0)
  43. return &f
  44. }
  45. // String returns the float as a string.
  46. func (f *Float) String() string { return fmt.Sprintf("%g", f.Value()) }
  47. // Value returns the float's value.
  48. func (f *Float) Value() float64 { return float64(*f) }
  49. func (f *Float) Multiply(ratio float64) { *f *= Float(ratio) }
  50. func (f *Float) Add(other Observable) {
  51. o := other.(*Float)
  52. *f += *o
  53. }
  54. func (f *Float) Clear() { *f = 0 }
  55. func (f *Float) CopyFrom(other Observable) {
  56. o := other.(*Float)
  57. *f = *o
  58. }
  59. // A Clock tells the current time.
  60. type Clock interface {
  61. Time() time.Time
  62. }
  63. type defaultClock int
  64. var defaultClockInstance defaultClock
  65. func (defaultClock) Time() time.Time { return time.Now() }
  66. // Information kept per level. Each level consists of a circular list of
  67. // observations. The start of the level may be derived from end and the
  68. // len(buckets) * sizeInMillis.
  69. type tsLevel struct {
  70. oldest int // index to oldest bucketed Observable
  71. newest int // index to newest bucketed Observable
  72. end time.Time // end timestamp for this level
  73. size time.Duration // duration of the bucketed Observable
  74. buckets []Observable // collections of observations
  75. provider func() Observable // used for creating new Observable
  76. }
  77. func (l *tsLevel) Clear() {
  78. l.oldest = 0
  79. l.newest = len(l.buckets) - 1
  80. l.end = time.Time{}
  81. for i := range l.buckets {
  82. if l.buckets[i] != nil {
  83. l.buckets[i].Clear()
  84. l.buckets[i] = nil
  85. }
  86. }
  87. }
  88. func (l *tsLevel) InitLevel(size time.Duration, numBuckets int, f func() Observable) {
  89. l.size = size
  90. l.provider = f
  91. l.buckets = make([]Observable, numBuckets)
  92. }
  93. // Keeps a sequence of levels. Each level is responsible for storing data at
  94. // a given resolution. For example, the first level stores data at a one
  95. // minute resolution while the second level stores data at a one hour
  96. // resolution.
  97. // Each level is represented by a sequence of buckets. Each bucket spans an
  98. // interval equal to the resolution of the level. New observations are added
  99. // to the last bucket.
  100. type timeSeries struct {
  101. provider func() Observable // make more Observable
  102. numBuckets int // number of buckets in each level
  103. levels []*tsLevel // levels of bucketed Observable
  104. lastAdd time.Time // time of last Observable tracked
  105. total Observable // convenient aggregation of all Observable
  106. clock Clock // Clock for getting current time
  107. pending Observable // observations not yet bucketed
  108. pendingTime time.Time // what time are we keeping in pending
  109. dirty bool // if there are pending observations
  110. }
  111. // init initializes a level according to the supplied criteria.
  112. func (ts *timeSeries) init(resolutions []time.Duration, f func() Observable, numBuckets int, clock Clock) {
  113. ts.provider = f
  114. ts.numBuckets = numBuckets
  115. ts.clock = clock
  116. ts.levels = make([]*tsLevel, len(resolutions))
  117. for i := range resolutions {
  118. if i > 0 && resolutions[i-1] >= resolutions[i] {
  119. log.Print("timeseries: resolutions must be monotonically increasing")
  120. break
  121. }
  122. newLevel := new(tsLevel)
  123. newLevel.InitLevel(resolutions[i], ts.numBuckets, ts.provider)
  124. ts.levels[i] = newLevel
  125. }
  126. ts.Clear()
  127. }
  128. // Clear removes all observations from the time series.
  129. func (ts *timeSeries) Clear() {
  130. ts.lastAdd = time.Time{}
  131. ts.total = ts.resetObservation(ts.total)
  132. ts.pending = ts.resetObservation(ts.pending)
  133. ts.pendingTime = time.Time{}
  134. ts.dirty = false
  135. for i := range ts.levels {
  136. ts.levels[i].Clear()
  137. }
  138. }
  139. // Add records an observation at the current time.
  140. func (ts *timeSeries) Add(observation Observable) {
  141. ts.AddWithTime(observation, ts.clock.Time())
  142. }
  143. // AddWithTime records an observation at the specified time.
  144. func (ts *timeSeries) AddWithTime(observation Observable, t time.Time) {
  145. smallBucketDuration := ts.levels[0].size
  146. if t.After(ts.lastAdd) {
  147. ts.lastAdd = t
  148. }
  149. if t.After(ts.pendingTime) {
  150. ts.advance(t)
  151. ts.mergePendingUpdates()
  152. ts.pendingTime = ts.levels[0].end
  153. ts.pending.CopyFrom(observation)
  154. ts.dirty = true
  155. } else if t.After(ts.pendingTime.Add(-1 * smallBucketDuration)) {
  156. // The observation is close enough to go into the pending bucket.
  157. // This compensates for clock skewing and small scheduling delays
  158. // by letting the update stay in the fast path.
  159. ts.pending.Add(observation)
  160. ts.dirty = true
  161. } else {
  162. ts.mergeValue(observation, t)
  163. }
  164. }
  165. // mergeValue inserts the observation at the specified time in the past into all levels.
  166. func (ts *timeSeries) mergeValue(observation Observable, t time.Time) {
  167. for _, level := range ts.levels {
  168. index := (ts.numBuckets - 1) - int(level.end.Sub(t)/level.size)
  169. if 0 <= index && index < ts.numBuckets {
  170. bucketNumber := (level.oldest + index) % ts.numBuckets
  171. if level.buckets[bucketNumber] == nil {
  172. level.buckets[bucketNumber] = level.provider()
  173. }
  174. level.buckets[bucketNumber].Add(observation)
  175. }
  176. }
  177. ts.total.Add(observation)
  178. }
  179. // mergePendingUpdates applies the pending updates into all levels.
  180. func (ts *timeSeries) mergePendingUpdates() {
  181. if ts.dirty {
  182. ts.mergeValue(ts.pending, ts.pendingTime)
  183. ts.pending = ts.resetObservation(ts.pending)
  184. ts.dirty = false
  185. }
  186. }
  187. // advance cycles the buckets at each level until the latest bucket in
  188. // each level can hold the time specified.
  189. func (ts *timeSeries) advance(t time.Time) {
  190. if !t.After(ts.levels[0].end) {
  191. return
  192. }
  193. for i := 0; i < len(ts.levels); i++ {
  194. level := ts.levels[i]
  195. if !level.end.Before(t) {
  196. break
  197. }
  198. // If the time is sufficiently far, just clear the level and advance
  199. // directly.
  200. if !t.Before(level.end.Add(level.size * time.Duration(ts.numBuckets))) {
  201. for _, b := range level.buckets {
  202. ts.resetObservation(b)
  203. }
  204. level.end = time.Unix(0, (t.UnixNano()/level.size.Nanoseconds())*level.size.Nanoseconds())
  205. }
  206. for t.After(level.end) {
  207. level.end = level.end.Add(level.size)
  208. level.newest = level.oldest
  209. level.oldest = (level.oldest + 1) % ts.numBuckets
  210. ts.resetObservation(level.buckets[level.newest])
  211. }
  212. t = level.end
  213. }
  214. }
  215. // Latest returns the sum of the num latest buckets from the level.
  216. func (ts *timeSeries) Latest(level, num int) Observable {
  217. now := ts.clock.Time()
  218. if ts.levels[0].end.Before(now) {
  219. ts.advance(now)
  220. }
  221. ts.mergePendingUpdates()
  222. result := ts.provider()
  223. l := ts.levels[level]
  224. index := l.newest
  225. for i := 0; i < num; i++ {
  226. if l.buckets[index] != nil {
  227. result.Add(l.buckets[index])
  228. }
  229. if index == 0 {
  230. index = ts.numBuckets
  231. }
  232. index--
  233. }
  234. return result
  235. }
  236. // LatestBuckets returns a copy of the num latest buckets from level.
  237. func (ts *timeSeries) LatestBuckets(level, num int) []Observable {
  238. if level < 0 || level > len(ts.levels) {
  239. log.Print("timeseries: bad level argument: ", level)
  240. return nil
  241. }
  242. if num < 0 || num >= ts.numBuckets {
  243. log.Print("timeseries: bad num argument: ", num)
  244. return nil
  245. }
  246. results := make([]Observable, num)
  247. now := ts.clock.Time()
  248. if ts.levels[0].end.Before(now) {
  249. ts.advance(now)
  250. }
  251. ts.mergePendingUpdates()
  252. l := ts.levels[level]
  253. index := l.newest
  254. for i := 0; i < num; i++ {
  255. result := ts.provider()
  256. results[i] = result
  257. if l.buckets[index] != nil {
  258. result.CopyFrom(l.buckets[index])
  259. }
  260. if index == 0 {
  261. index = ts.numBuckets
  262. }
  263. index -= 1
  264. }
  265. return results
  266. }
  267. // ScaleBy updates observations by scaling by factor.
  268. func (ts *timeSeries) ScaleBy(factor float64) {
  269. for _, l := range ts.levels {
  270. for i := 0; i < ts.numBuckets; i++ {
  271. l.buckets[i].Multiply(factor)
  272. }
  273. }
  274. ts.total.Multiply(factor)
  275. ts.pending.Multiply(factor)
  276. }
  277. // Range returns the sum of observations added over the specified time range.
  278. // If start or finish times don't fall on bucket boundaries of the same
  279. // level, then return values are approximate answers.
  280. func (ts *timeSeries) Range(start, finish time.Time) Observable {
  281. return ts.ComputeRange(start, finish, 1)[0]
  282. }
  283. // Recent returns the sum of observations from the last delta.
  284. func (ts *timeSeries) Recent(delta time.Duration) Observable {
  285. now := ts.clock.Time()
  286. return ts.Range(now.Add(-delta), now)
  287. }
  288. // Total returns the total of all observations.
  289. func (ts *timeSeries) Total() Observable {
  290. ts.mergePendingUpdates()
  291. return ts.total
  292. }
  293. // ComputeRange computes a specified number of values into a slice using
  294. // the observations recorded over the specified time period. The return
  295. // values are approximate if the start or finish times don't fall on the
  296. // bucket boundaries at the same level or if the number of buckets spanning
  297. // the range is not an integral multiple of num.
  298. func (ts *timeSeries) ComputeRange(start, finish time.Time, num int) []Observable {
  299. if start.After(finish) {
  300. log.Printf("timeseries: start > finish, %v>%v", start, finish)
  301. return nil
  302. }
  303. if num < 0 {
  304. log.Printf("timeseries: num < 0, %v", num)
  305. return nil
  306. }
  307. results := make([]Observable, num)
  308. for _, l := range ts.levels {
  309. if !start.Before(l.end.Add(-l.size * time.Duration(ts.numBuckets))) {
  310. ts.extract(l, start, finish, num, results)
  311. return results
  312. }
  313. }
  314. // Failed to find a level that covers the desired range. So just
  315. // extract from the last level, even if it doesn't cover the entire
  316. // desired range.
  317. ts.extract(ts.levels[len(ts.levels)-1], start, finish, num, results)
  318. return results
  319. }
  320. // RecentList returns the specified number of values in slice over the most
  321. // recent time period of the specified range.
  322. func (ts *timeSeries) RecentList(delta time.Duration, num int) []Observable {
  323. if delta < 0 {
  324. return nil
  325. }
  326. now := ts.clock.Time()
  327. return ts.ComputeRange(now.Add(-delta), now, num)
  328. }
  329. // extract returns a slice of specified number of observations from a given
  330. // level over a given range.
  331. func (ts *timeSeries) extract(l *tsLevel, start, finish time.Time, num int, results []Observable) {
  332. ts.mergePendingUpdates()
  333. srcInterval := l.size
  334. dstInterval := finish.Sub(start) / time.Duration(num)
  335. dstStart := start
  336. srcStart := l.end.Add(-srcInterval * time.Duration(ts.numBuckets))
  337. srcIndex := 0
  338. // Where should scanning start?
  339. if dstStart.After(srcStart) {
  340. advance := dstStart.Sub(srcStart) / srcInterval
  341. srcIndex += int(advance)
  342. srcStart = srcStart.Add(advance * srcInterval)
  343. }
  344. // The i'th value is computed as show below.
  345. // interval = (finish/start)/num
  346. // i'th value = sum of observation in range
  347. // [ start + i * interval,
  348. // start + (i + 1) * interval )
  349. for i := 0; i < num; i++ {
  350. results[i] = ts.resetObservation(results[i])
  351. dstEnd := dstStart.Add(dstInterval)
  352. for srcIndex < ts.numBuckets && srcStart.Before(dstEnd) {
  353. srcEnd := srcStart.Add(srcInterval)
  354. if srcEnd.After(ts.lastAdd) {
  355. srcEnd = ts.lastAdd
  356. }
  357. if !srcEnd.Before(dstStart) {
  358. srcValue := l.buckets[(srcIndex+l.oldest)%ts.numBuckets]
  359. if !srcStart.Before(dstStart) && !srcEnd.After(dstEnd) {
  360. // dst completely contains src.
  361. if srcValue != nil {
  362. results[i].Add(srcValue)
  363. }
  364. } else {
  365. // dst partially overlaps src.
  366. overlapStart := maxTime(srcStart, dstStart)
  367. overlapEnd := minTime(srcEnd, dstEnd)
  368. base := srcEnd.Sub(srcStart)
  369. fraction := overlapEnd.Sub(overlapStart).Seconds() / base.Seconds()
  370. used := ts.provider()
  371. if srcValue != nil {
  372. used.CopyFrom(srcValue)
  373. }
  374. used.Multiply(fraction)
  375. results[i].Add(used)
  376. }
  377. if srcEnd.After(dstEnd) {
  378. break
  379. }
  380. }
  381. srcIndex++
  382. srcStart = srcStart.Add(srcInterval)
  383. }
  384. dstStart = dstStart.Add(dstInterval)
  385. }
  386. }
  387. // resetObservation clears the content so the struct may be reused.
  388. func (ts *timeSeries) resetObservation(observation Observable) Observable {
  389. if observation == nil {
  390. observation = ts.provider()
  391. } else {
  392. observation.Clear()
  393. }
  394. return observation
  395. }
  396. // TimeSeries tracks data at granularities from 1 second to 16 weeks.
  397. type TimeSeries struct {
  398. timeSeries
  399. }
  400. // NewTimeSeries creates a new TimeSeries using the function provided for creating new Observable.
  401. func NewTimeSeries(f func() Observable) *TimeSeries {
  402. return NewTimeSeriesWithClock(f, defaultClockInstance)
  403. }
  404. // NewTimeSeriesWithClock creates a new TimeSeries using the function provided for creating new Observable and the clock for
  405. // assigning timestamps.
  406. func NewTimeSeriesWithClock(f func() Observable, clock Clock) *TimeSeries {
  407. ts := new(TimeSeries)
  408. ts.timeSeries.init(timeSeriesResolutions, f, timeSeriesNumBuckets, clock)
  409. return ts
  410. }
  411. // MinuteHourSeries tracks data at granularities of 1 minute and 1 hour.
  412. type MinuteHourSeries struct {
  413. timeSeries
  414. }
  415. // NewMinuteHourSeries creates a new MinuteHourSeries using the function provided for creating new Observable.
  416. func NewMinuteHourSeries(f func() Observable) *MinuteHourSeries {
  417. return NewMinuteHourSeriesWithClock(f, defaultClockInstance)
  418. }
  419. // NewMinuteHourSeriesWithClock creates a new MinuteHourSeries using the function provided for creating new Observable and the clock for
  420. // assigning timestamps.
  421. func NewMinuteHourSeriesWithClock(f func() Observable, clock Clock) *MinuteHourSeries {
  422. ts := new(MinuteHourSeries)
  423. ts.timeSeries.init(minuteHourSeriesResolutions, f,
  424. minuteHourSeriesNumBuckets, clock)
  425. return ts
  426. }
  427. func (ts *MinuteHourSeries) Minute() Observable {
  428. return ts.timeSeries.Latest(0, 60)
  429. }
  430. func (ts *MinuteHourSeries) Hour() Observable {
  431. return ts.timeSeries.Latest(1, 60)
  432. }
  433. func minTime(a, b time.Time) time.Time {
  434. if a.Before(b) {
  435. return a
  436. }
  437. return b
  438. }
  439. func maxTime(a, b time.Time) time.Time {
  440. if a.After(b) {
  441. return a
  442. }
  443. return b
  444. }