123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526 |
- // Copyright 2015 The Go Authors. All rights reserved.
- // Use of this source code is governed by a BSD-style
- // license that can be found in the LICENSE file.
- // Package timeseries implements a time series structure for stats collection.
- package timeseries // import "golang.org/x/net/internal/timeseries"
- import (
- "fmt"
- "log"
- "time"
- )
- const (
- timeSeriesNumBuckets = 64
- minuteHourSeriesNumBuckets = 60
- )
- var timeSeriesResolutions = []time.Duration{
- 1 * time.Second,
- 10 * time.Second,
- 1 * time.Minute,
- 10 * time.Minute,
- 1 * time.Hour,
- 6 * time.Hour,
- 24 * time.Hour, // 1 day
- 7 * 24 * time.Hour, // 1 week
- 4 * 7 * 24 * time.Hour, // 4 weeks
- 16 * 7 * 24 * time.Hour, // 16 weeks
- }
- var minuteHourSeriesResolutions = []time.Duration{
- 1 * time.Second,
- 1 * time.Minute,
- }
- // An Observable is a kind of data that can be aggregated in a time series.
- type Observable interface {
- Multiply(ratio float64) // Multiplies the data in self by a given ratio
- Add(other Observable) // Adds the data from a different observation to self
- Clear() // Clears the observation so it can be reused.
- CopyFrom(other Observable) // Copies the contents of a given observation to self
- }
- // Float attaches the methods of Observable to a float64.
- type Float float64
- // NewFloat returns a Float.
- func NewFloat() Observable {
- f := Float(0)
- return &f
- }
- // String returns the float as a string.
- func (f *Float) String() string { return fmt.Sprintf("%g", f.Value()) }
- // Value returns the float's value.
- func (f *Float) Value() float64 { return float64(*f) }
- func (f *Float) Multiply(ratio float64) { *f *= Float(ratio) }
- func (f *Float) Add(other Observable) {
- o := other.(*Float)
- *f += *o
- }
- func (f *Float) Clear() { *f = 0 }
- func (f *Float) CopyFrom(other Observable) {
- o := other.(*Float)
- *f = *o
- }
- // A Clock tells the current time.
- type Clock interface {
- Time() time.Time
- }
- type defaultClock int
- var defaultClockInstance defaultClock
- func (defaultClock) Time() time.Time { return time.Now() }
- // Information kept per level. Each level consists of a circular list of
- // observations. The start of the level may be derived from end and the
- // len(buckets) * sizeInMillis.
- type tsLevel struct {
- oldest int // index to oldest bucketed Observable
- newest int // index to newest bucketed Observable
- end time.Time // end timestamp for this level
- size time.Duration // duration of the bucketed Observable
- buckets []Observable // collections of observations
- provider func() Observable // used for creating new Observable
- }
- func (l *tsLevel) Clear() {
- l.oldest = 0
- l.newest = len(l.buckets) - 1
- l.end = time.Time{}
- for i := range l.buckets {
- if l.buckets[i] != nil {
- l.buckets[i].Clear()
- l.buckets[i] = nil
- }
- }
- }
- func (l *tsLevel) InitLevel(size time.Duration, numBuckets int, f func() Observable) {
- l.size = size
- l.provider = f
- l.buckets = make([]Observable, numBuckets)
- }
- // Keeps a sequence of levels. Each level is responsible for storing data at
- // a given resolution. For example, the first level stores data at a one
- // minute resolution while the second level stores data at a one hour
- // resolution.
- // Each level is represented by a sequence of buckets. Each bucket spans an
- // interval equal to the resolution of the level. New observations are added
- // to the last bucket.
- type timeSeries struct {
- provider func() Observable // make more Observable
- numBuckets int // number of buckets in each level
- levels []*tsLevel // levels of bucketed Observable
- lastAdd time.Time // time of last Observable tracked
- total Observable // convenient aggregation of all Observable
- clock Clock // Clock for getting current time
- pending Observable // observations not yet bucketed
- pendingTime time.Time // what time are we keeping in pending
- dirty bool // if there are pending observations
- }
- // init initializes a level according to the supplied criteria.
- func (ts *timeSeries) init(resolutions []time.Duration, f func() Observable, numBuckets int, clock Clock) {
- ts.provider = f
- ts.numBuckets = numBuckets
- ts.clock = clock
- ts.levels = make([]*tsLevel, len(resolutions))
- for i := range resolutions {
- if i > 0 && resolutions[i-1] >= resolutions[i] {
- log.Print("timeseries: resolutions must be monotonically increasing")
- break
- }
- newLevel := new(tsLevel)
- newLevel.InitLevel(resolutions[i], ts.numBuckets, ts.provider)
- ts.levels[i] = newLevel
- }
- ts.Clear()
- }
- // Clear removes all observations from the time series.
- func (ts *timeSeries) Clear() {
- ts.lastAdd = time.Time{}
- ts.total = ts.resetObservation(ts.total)
- ts.pending = ts.resetObservation(ts.pending)
- ts.pendingTime = time.Time{}
- ts.dirty = false
- for i := range ts.levels {
- ts.levels[i].Clear()
- }
- }
- // Add records an observation at the current time.
- func (ts *timeSeries) Add(observation Observable) {
- ts.AddWithTime(observation, ts.clock.Time())
- }
- // AddWithTime records an observation at the specified time.
- func (ts *timeSeries) AddWithTime(observation Observable, t time.Time) {
- smallBucketDuration := ts.levels[0].size
- if t.After(ts.lastAdd) {
- ts.lastAdd = t
- }
- if t.After(ts.pendingTime) {
- ts.advance(t)
- ts.mergePendingUpdates()
- ts.pendingTime = ts.levels[0].end
- ts.pending.CopyFrom(observation)
- ts.dirty = true
- } else if t.After(ts.pendingTime.Add(-1 * smallBucketDuration)) {
- // The observation is close enough to go into the pending bucket.
- // This compensates for clock skewing and small scheduling delays
- // by letting the update stay in the fast path.
- ts.pending.Add(observation)
- ts.dirty = true
- } else {
- ts.mergeValue(observation, t)
- }
- }
- // mergeValue inserts the observation at the specified time in the past into all levels.
- func (ts *timeSeries) mergeValue(observation Observable, t time.Time) {
- for _, level := range ts.levels {
- index := (ts.numBuckets - 1) - int(level.end.Sub(t)/level.size)
- if 0 <= index && index < ts.numBuckets {
- bucketNumber := (level.oldest + index) % ts.numBuckets
- if level.buckets[bucketNumber] == nil {
- level.buckets[bucketNumber] = level.provider()
- }
- level.buckets[bucketNumber].Add(observation)
- }
- }
- ts.total.Add(observation)
- }
- // mergePendingUpdates applies the pending updates into all levels.
- func (ts *timeSeries) mergePendingUpdates() {
- if ts.dirty {
- ts.mergeValue(ts.pending, ts.pendingTime)
- ts.pending = ts.resetObservation(ts.pending)
- ts.dirty = false
- }
- }
- // advance cycles the buckets at each level until the latest bucket in
- // each level can hold the time specified.
- func (ts *timeSeries) advance(t time.Time) {
- if !t.After(ts.levels[0].end) {
- return
- }
- for i := 0; i < len(ts.levels); i++ {
- level := ts.levels[i]
- if !level.end.Before(t) {
- break
- }
- // If the time is sufficiently far, just clear the level and advance
- // directly.
- if !t.Before(level.end.Add(level.size * time.Duration(ts.numBuckets))) {
- for _, b := range level.buckets {
- ts.resetObservation(b)
- }
- level.end = time.Unix(0, (t.UnixNano()/level.size.Nanoseconds())*level.size.Nanoseconds())
- }
- for t.After(level.end) {
- level.end = level.end.Add(level.size)
- level.newest = level.oldest
- level.oldest = (level.oldest + 1) % ts.numBuckets
- ts.resetObservation(level.buckets[level.newest])
- }
- t = level.end
- }
- }
- // Latest returns the sum of the num latest buckets from the level.
- func (ts *timeSeries) Latest(level, num int) Observable {
- now := ts.clock.Time()
- if ts.levels[0].end.Before(now) {
- ts.advance(now)
- }
- ts.mergePendingUpdates()
- result := ts.provider()
- l := ts.levels[level]
- index := l.newest
- for i := 0; i < num; i++ {
- if l.buckets[index] != nil {
- result.Add(l.buckets[index])
- }
- if index == 0 {
- index = ts.numBuckets
- }
- index--
- }
- return result
- }
- // LatestBuckets returns a copy of the num latest buckets from level.
- func (ts *timeSeries) LatestBuckets(level, num int) []Observable {
- if level < 0 || level > len(ts.levels) {
- log.Print("timeseries: bad level argument: ", level)
- return nil
- }
- if num < 0 || num >= ts.numBuckets {
- log.Print("timeseries: bad num argument: ", num)
- return nil
- }
- results := make([]Observable, num)
- now := ts.clock.Time()
- if ts.levels[0].end.Before(now) {
- ts.advance(now)
- }
- ts.mergePendingUpdates()
- l := ts.levels[level]
- index := l.newest
- for i := 0; i < num; i++ {
- result := ts.provider()
- results[i] = result
- if l.buckets[index] != nil {
- result.CopyFrom(l.buckets[index])
- }
- if index == 0 {
- index = ts.numBuckets
- }
- index -= 1
- }
- return results
- }
- // ScaleBy updates observations by scaling by factor.
- func (ts *timeSeries) ScaleBy(factor float64) {
- for _, l := range ts.levels {
- for i := 0; i < ts.numBuckets; i++ {
- l.buckets[i].Multiply(factor)
- }
- }
- ts.total.Multiply(factor)
- ts.pending.Multiply(factor)
- }
- // Range returns the sum of observations added over the specified time range.
- // If start or finish times don't fall on bucket boundaries of the same
- // level, then return values are approximate answers.
- func (ts *timeSeries) Range(start, finish time.Time) Observable {
- return ts.ComputeRange(start, finish, 1)[0]
- }
- // Recent returns the sum of observations from the last delta.
- func (ts *timeSeries) Recent(delta time.Duration) Observable {
- now := ts.clock.Time()
- return ts.Range(now.Add(-delta), now)
- }
- // Total returns the total of all observations.
- func (ts *timeSeries) Total() Observable {
- ts.mergePendingUpdates()
- return ts.total
- }
- // ComputeRange computes a specified number of values into a slice using
- // the observations recorded over the specified time period. The return
- // values are approximate if the start or finish times don't fall on the
- // bucket boundaries at the same level or if the number of buckets spanning
- // the range is not an integral multiple of num.
- func (ts *timeSeries) ComputeRange(start, finish time.Time, num int) []Observable {
- if start.After(finish) {
- log.Printf("timeseries: start > finish, %v>%v", start, finish)
- return nil
- }
- if num < 0 {
- log.Printf("timeseries: num < 0, %v", num)
- return nil
- }
- results := make([]Observable, num)
- for _, l := range ts.levels {
- if !start.Before(l.end.Add(-l.size * time.Duration(ts.numBuckets))) {
- ts.extract(l, start, finish, num, results)
- return results
- }
- }
- // Failed to find a level that covers the desired range. So just
- // extract from the last level, even if it doesn't cover the entire
- // desired range.
- ts.extract(ts.levels[len(ts.levels)-1], start, finish, num, results)
- return results
- }
- // RecentList returns the specified number of values in slice over the most
- // recent time period of the specified range.
- func (ts *timeSeries) RecentList(delta time.Duration, num int) []Observable {
- if delta < 0 {
- return nil
- }
- now := ts.clock.Time()
- return ts.ComputeRange(now.Add(-delta), now, num)
- }
- // extract returns a slice of specified number of observations from a given
- // level over a given range.
- func (ts *timeSeries) extract(l *tsLevel, start, finish time.Time, num int, results []Observable) {
- ts.mergePendingUpdates()
- srcInterval := l.size
- dstInterval := finish.Sub(start) / time.Duration(num)
- dstStart := start
- srcStart := l.end.Add(-srcInterval * time.Duration(ts.numBuckets))
- srcIndex := 0
- // Where should scanning start?
- if dstStart.After(srcStart) {
- advance := dstStart.Sub(srcStart) / srcInterval
- srcIndex += int(advance)
- srcStart = srcStart.Add(advance * srcInterval)
- }
- // The i'th value is computed as show below.
- // interval = (finish/start)/num
- // i'th value = sum of observation in range
- // [ start + i * interval,
- // start + (i + 1) * interval )
- for i := 0; i < num; i++ {
- results[i] = ts.resetObservation(results[i])
- dstEnd := dstStart.Add(dstInterval)
- for srcIndex < ts.numBuckets && srcStart.Before(dstEnd) {
- srcEnd := srcStart.Add(srcInterval)
- if srcEnd.After(ts.lastAdd) {
- srcEnd = ts.lastAdd
- }
- if !srcEnd.Before(dstStart) {
- srcValue := l.buckets[(srcIndex+l.oldest)%ts.numBuckets]
- if !srcStart.Before(dstStart) && !srcEnd.After(dstEnd) {
- // dst completely contains src.
- if srcValue != nil {
- results[i].Add(srcValue)
- }
- } else {
- // dst partially overlaps src.
- overlapStart := maxTime(srcStart, dstStart)
- overlapEnd := minTime(srcEnd, dstEnd)
- base := srcEnd.Sub(srcStart)
- fraction := overlapEnd.Sub(overlapStart).Seconds() / base.Seconds()
- used := ts.provider()
- if srcValue != nil {
- used.CopyFrom(srcValue)
- }
- used.Multiply(fraction)
- results[i].Add(used)
- }
- if srcEnd.After(dstEnd) {
- break
- }
- }
- srcIndex++
- srcStart = srcStart.Add(srcInterval)
- }
- dstStart = dstStart.Add(dstInterval)
- }
- }
- // resetObservation clears the content so the struct may be reused.
- func (ts *timeSeries) resetObservation(observation Observable) Observable {
- if observation == nil {
- observation = ts.provider()
- } else {
- observation.Clear()
- }
- return observation
- }
- // TimeSeries tracks data at granularities from 1 second to 16 weeks.
- type TimeSeries struct {
- timeSeries
- }
- // NewTimeSeries creates a new TimeSeries using the function provided for creating new Observable.
- func NewTimeSeries(f func() Observable) *TimeSeries {
- return NewTimeSeriesWithClock(f, defaultClockInstance)
- }
- // NewTimeSeriesWithClock creates a new TimeSeries using the function provided for creating new Observable and the clock for
- // assigning timestamps.
- func NewTimeSeriesWithClock(f func() Observable, clock Clock) *TimeSeries {
- ts := new(TimeSeries)
- ts.timeSeries.init(timeSeriesResolutions, f, timeSeriesNumBuckets, clock)
- return ts
- }
- // MinuteHourSeries tracks data at granularities of 1 minute and 1 hour.
- type MinuteHourSeries struct {
- timeSeries
- }
- // NewMinuteHourSeries creates a new MinuteHourSeries using the function provided for creating new Observable.
- func NewMinuteHourSeries(f func() Observable) *MinuteHourSeries {
- return NewMinuteHourSeriesWithClock(f, defaultClockInstance)
- }
- // NewMinuteHourSeriesWithClock creates a new MinuteHourSeries using the function provided for creating new Observable and the clock for
- // assigning timestamps.
- func NewMinuteHourSeriesWithClock(f func() Observable, clock Clock) *MinuteHourSeries {
- ts := new(MinuteHourSeries)
- ts.timeSeries.init(minuteHourSeriesResolutions, f,
- minuteHourSeriesNumBuckets, clock)
- return ts
- }
- func (ts *MinuteHourSeries) Minute() Observable {
- return ts.timeSeries.Latest(0, 60)
- }
- func (ts *MinuteHourSeries) Hour() Observable {
- return ts.timeSeries.Latest(1, 60)
- }
- func minTime(a, b time.Time) time.Time {
- if a.Before(b) {
- return a
- }
- return b
- }
- func maxTime(a, b time.Time) time.Time {
- if a.After(b) {
- return a
- }
- return b
- }
|