gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

timeseries.go (14949B)


      1 // Copyright 2015 The Go Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style
      3 // license that can be found in the LICENSE file.
      4 
      5 // Package timeseries implements a time series structure for stats collection.
      6 package timeseries // import "golang.org/x/net/internal/timeseries"
      7 
      8 import (
      9 	"fmt"
     10 	"log"
     11 	"time"
     12 )
     13 
// Number of buckets kept at every level of each series flavour.
const (
	timeSeriesNumBuckets       = 64 // passed to init by NewTimeSeriesWithClock
	minuteHourSeriesNumBuckets = 60 // passed to init by NewMinuteHourSeriesWithClock
)
     18 
     19 var timeSeriesResolutions = []time.Duration{
     20 	1 * time.Second,
     21 	10 * time.Second,
     22 	1 * time.Minute,
     23 	10 * time.Minute,
     24 	1 * time.Hour,
     25 	6 * time.Hour,
     26 	24 * time.Hour,          // 1 day
     27 	7 * 24 * time.Hour,      // 1 week
     28 	4 * 7 * 24 * time.Hour,  // 4 weeks
     29 	16 * 7 * 24 * time.Hour, // 16 weeks
     30 }
     31 
     32 var minuteHourSeriesResolutions = []time.Duration{
     33 	1 * time.Second,
     34 	1 * time.Minute,
     35 }
     36 
     37 // An Observable is a kind of data that can be aggregated in a time series.
     38 type Observable interface {
     39 	Multiply(ratio float64)    // Multiplies the data in self by a given ratio
     40 	Add(other Observable)      // Adds the data from a different observation to self
     41 	Clear()                    // Clears the observation so it can be reused.
     42 	CopyFrom(other Observable) // Copies the contents of a given observation to self
     43 }
     44 
     45 // Float attaches the methods of Observable to a float64.
     46 type Float float64
     47 
     48 // NewFloat returns a Float.
     49 func NewFloat() Observable {
     50 	f := Float(0)
     51 	return &f
     52 }
     53 
     54 // String returns the float as a string.
     55 func (f *Float) String() string { return fmt.Sprintf("%g", f.Value()) }
     56 
     57 // Value returns the float's value.
     58 func (f *Float) Value() float64 { return float64(*f) }
     59 
     60 func (f *Float) Multiply(ratio float64) { *f *= Float(ratio) }
     61 
     62 func (f *Float) Add(other Observable) {
     63 	o := other.(*Float)
     64 	*f += *o
     65 }
     66 
     67 func (f *Float) Clear() { *f = 0 }
     68 
     69 func (f *Float) CopyFrom(other Observable) {
     70 	o := other.(*Float)
     71 	*f = *o
     72 }
     73 
     74 // A Clock tells the current time.
     75 type Clock interface {
     76 	Time() time.Time
     77 }
     78 
     79 type defaultClock int
     80 
     81 var defaultClockInstance defaultClock
     82 
     83 func (defaultClock) Time() time.Time { return time.Now() }
     84 
     85 // Information kept per level. Each level consists of a circular list of
     86 // observations. The start of the level may be derived from end and the
     87 // len(buckets) * sizeInMillis.
     88 type tsLevel struct {
     89 	oldest   int               // index to oldest bucketed Observable
     90 	newest   int               // index to newest bucketed Observable
     91 	end      time.Time         // end timestamp for this level
     92 	size     time.Duration     // duration of the bucketed Observable
     93 	buckets  []Observable      // collections of observations
     94 	provider func() Observable // used for creating new Observable
     95 }
     96 
     97 func (l *tsLevel) Clear() {
     98 	l.oldest = 0
     99 	l.newest = len(l.buckets) - 1
    100 	l.end = time.Time{}
    101 	for i := range l.buckets {
    102 		if l.buckets[i] != nil {
    103 			l.buckets[i].Clear()
    104 			l.buckets[i] = nil
    105 		}
    106 	}
    107 }
    108 
    109 func (l *tsLevel) InitLevel(size time.Duration, numBuckets int, f func() Observable) {
    110 	l.size = size
    111 	l.provider = f
    112 	l.buckets = make([]Observable, numBuckets)
    113 }
    114 
// timeSeries keeps a sequence of levels. Each level is responsible for
// storing data at a given resolution. For example, the first level might
// store data at a one minute resolution while the second level stores data
// at a one hour resolution.
//
// Each level is represented by a sequence of buckets. Each bucket spans an
// interval equal to the resolution of the level. New observations are added
// to the last bucket.
type timeSeries struct {
	provider    func() Observable // make more Observable
	numBuckets  int               // number of buckets in each level
	levels      []*tsLevel        // levels of bucketed Observable
	lastAdd     time.Time         // time of last Observable tracked
	total       Observable        // convenient aggregation of all Observable
	clock       Clock             // Clock for getting current time
	pending     Observable        // observations not yet bucketed
	pendingTime time.Time         // what time are we keeping in pending
	dirty       bool              // if there are pending observations
}
    134 
    135 // init initializes a level according to the supplied criteria.
    136 func (ts *timeSeries) init(resolutions []time.Duration, f func() Observable, numBuckets int, clock Clock) {
    137 	ts.provider = f
    138 	ts.numBuckets = numBuckets
    139 	ts.clock = clock
    140 	ts.levels = make([]*tsLevel, len(resolutions))
    141 
    142 	for i := range resolutions {
    143 		if i > 0 && resolutions[i-1] >= resolutions[i] {
    144 			log.Print("timeseries: resolutions must be monotonically increasing")
    145 			break
    146 		}
    147 		newLevel := new(tsLevel)
    148 		newLevel.InitLevel(resolutions[i], ts.numBuckets, ts.provider)
    149 		ts.levels[i] = newLevel
    150 	}
    151 
    152 	ts.Clear()
    153 }
    154 
    155 // Clear removes all observations from the time series.
    156 func (ts *timeSeries) Clear() {
    157 	ts.lastAdd = time.Time{}
    158 	ts.total = ts.resetObservation(ts.total)
    159 	ts.pending = ts.resetObservation(ts.pending)
    160 	ts.pendingTime = time.Time{}
    161 	ts.dirty = false
    162 
    163 	for i := range ts.levels {
    164 		ts.levels[i].Clear()
    165 	}
    166 }
    167 
    168 // Add records an observation at the current time.
    169 func (ts *timeSeries) Add(observation Observable) {
    170 	ts.AddWithTime(observation, ts.clock.Time())
    171 }
    172 
    173 // AddWithTime records an observation at the specified time.
    174 func (ts *timeSeries) AddWithTime(observation Observable, t time.Time) {
    175 
    176 	smallBucketDuration := ts.levels[0].size
    177 
    178 	if t.After(ts.lastAdd) {
    179 		ts.lastAdd = t
    180 	}
    181 
    182 	if t.After(ts.pendingTime) {
    183 		ts.advance(t)
    184 		ts.mergePendingUpdates()
    185 		ts.pendingTime = ts.levels[0].end
    186 		ts.pending.CopyFrom(observation)
    187 		ts.dirty = true
    188 	} else if t.After(ts.pendingTime.Add(-1 * smallBucketDuration)) {
    189 		// The observation is close enough to go into the pending bucket.
    190 		// This compensates for clock skewing and small scheduling delays
    191 		// by letting the update stay in the fast path.
    192 		ts.pending.Add(observation)
    193 		ts.dirty = true
    194 	} else {
    195 		ts.mergeValue(observation, t)
    196 	}
    197 }
    198 
    199 // mergeValue inserts the observation at the specified time in the past into all levels.
    200 func (ts *timeSeries) mergeValue(observation Observable, t time.Time) {
    201 	for _, level := range ts.levels {
    202 		index := (ts.numBuckets - 1) - int(level.end.Sub(t)/level.size)
    203 		if 0 <= index && index < ts.numBuckets {
    204 			bucketNumber := (level.oldest + index) % ts.numBuckets
    205 			if level.buckets[bucketNumber] == nil {
    206 				level.buckets[bucketNumber] = level.provider()
    207 			}
    208 			level.buckets[bucketNumber].Add(observation)
    209 		}
    210 	}
    211 	ts.total.Add(observation)
    212 }
    213 
    214 // mergePendingUpdates applies the pending updates into all levels.
    215 func (ts *timeSeries) mergePendingUpdates() {
    216 	if ts.dirty {
    217 		ts.mergeValue(ts.pending, ts.pendingTime)
    218 		ts.pending = ts.resetObservation(ts.pending)
    219 		ts.dirty = false
    220 	}
    221 }
    222 
    223 // advance cycles the buckets at each level until the latest bucket in
    224 // each level can hold the time specified.
    225 func (ts *timeSeries) advance(t time.Time) {
    226 	if !t.After(ts.levels[0].end) {
    227 		return
    228 	}
    229 	for i := 0; i < len(ts.levels); i++ {
    230 		level := ts.levels[i]
    231 		if !level.end.Before(t) {
    232 			break
    233 		}
    234 
    235 		// If the time is sufficiently far, just clear the level and advance
    236 		// directly.
    237 		if !t.Before(level.end.Add(level.size * time.Duration(ts.numBuckets))) {
    238 			for _, b := range level.buckets {
    239 				ts.resetObservation(b)
    240 			}
    241 			level.end = time.Unix(0, (t.UnixNano()/level.size.Nanoseconds())*level.size.Nanoseconds())
    242 		}
    243 
    244 		for t.After(level.end) {
    245 			level.end = level.end.Add(level.size)
    246 			level.newest = level.oldest
    247 			level.oldest = (level.oldest + 1) % ts.numBuckets
    248 			ts.resetObservation(level.buckets[level.newest])
    249 		}
    250 
    251 		t = level.end
    252 	}
    253 }
    254 
    255 // Latest returns the sum of the num latest buckets from the level.
    256 func (ts *timeSeries) Latest(level, num int) Observable {
    257 	now := ts.clock.Time()
    258 	if ts.levels[0].end.Before(now) {
    259 		ts.advance(now)
    260 	}
    261 
    262 	ts.mergePendingUpdates()
    263 
    264 	result := ts.provider()
    265 	l := ts.levels[level]
    266 	index := l.newest
    267 
    268 	for i := 0; i < num; i++ {
    269 		if l.buckets[index] != nil {
    270 			result.Add(l.buckets[index])
    271 		}
    272 		if index == 0 {
    273 			index = ts.numBuckets
    274 		}
    275 		index--
    276 	}
    277 
    278 	return result
    279 }
    280 
    281 // LatestBuckets returns a copy of the num latest buckets from level.
    282 func (ts *timeSeries) LatestBuckets(level, num int) []Observable {
    283 	if level < 0 || level > len(ts.levels) {
    284 		log.Print("timeseries: bad level argument: ", level)
    285 		return nil
    286 	}
    287 	if num < 0 || num >= ts.numBuckets {
    288 		log.Print("timeseries: bad num argument: ", num)
    289 		return nil
    290 	}
    291 
    292 	results := make([]Observable, num)
    293 	now := ts.clock.Time()
    294 	if ts.levels[0].end.Before(now) {
    295 		ts.advance(now)
    296 	}
    297 
    298 	ts.mergePendingUpdates()
    299 
    300 	l := ts.levels[level]
    301 	index := l.newest
    302 
    303 	for i := 0; i < num; i++ {
    304 		result := ts.provider()
    305 		results[i] = result
    306 		if l.buckets[index] != nil {
    307 			result.CopyFrom(l.buckets[index])
    308 		}
    309 
    310 		if index == 0 {
    311 			index = ts.numBuckets
    312 		}
    313 		index -= 1
    314 	}
    315 	return results
    316 }
    317 
    318 // ScaleBy updates observations by scaling by factor.
    319 func (ts *timeSeries) ScaleBy(factor float64) {
    320 	for _, l := range ts.levels {
    321 		for i := 0; i < ts.numBuckets; i++ {
    322 			l.buckets[i].Multiply(factor)
    323 		}
    324 	}
    325 
    326 	ts.total.Multiply(factor)
    327 	ts.pending.Multiply(factor)
    328 }
    329 
    330 // Range returns the sum of observations added over the specified time range.
    331 // If start or finish times don't fall on bucket boundaries of the same
    332 // level, then return values are approximate answers.
    333 func (ts *timeSeries) Range(start, finish time.Time) Observable {
    334 	return ts.ComputeRange(start, finish, 1)[0]
    335 }
    336 
    337 // Recent returns the sum of observations from the last delta.
    338 func (ts *timeSeries) Recent(delta time.Duration) Observable {
    339 	now := ts.clock.Time()
    340 	return ts.Range(now.Add(-delta), now)
    341 }
    342 
    343 // Total returns the total of all observations.
    344 func (ts *timeSeries) Total() Observable {
    345 	ts.mergePendingUpdates()
    346 	return ts.total
    347 }
    348 
    349 // ComputeRange computes a specified number of values into a slice using
    350 // the observations recorded over the specified time period. The return
    351 // values are approximate if the start or finish times don't fall on the
    352 // bucket boundaries at the same level or if the number of buckets spanning
    353 // the range is not an integral multiple of num.
    354 func (ts *timeSeries) ComputeRange(start, finish time.Time, num int) []Observable {
    355 	if start.After(finish) {
    356 		log.Printf("timeseries: start > finish, %v>%v", start, finish)
    357 		return nil
    358 	}
    359 
    360 	if num < 0 {
    361 		log.Printf("timeseries: num < 0, %v", num)
    362 		return nil
    363 	}
    364 
    365 	results := make([]Observable, num)
    366 
    367 	for _, l := range ts.levels {
    368 		if !start.Before(l.end.Add(-l.size * time.Duration(ts.numBuckets))) {
    369 			ts.extract(l, start, finish, num, results)
    370 			return results
    371 		}
    372 	}
    373 
    374 	// Failed to find a level that covers the desired range. So just
    375 	// extract from the last level, even if it doesn't cover the entire
    376 	// desired range.
    377 	ts.extract(ts.levels[len(ts.levels)-1], start, finish, num, results)
    378 
    379 	return results
    380 }
    381 
    382 // RecentList returns the specified number of values in slice over the most
    383 // recent time period of the specified range.
    384 func (ts *timeSeries) RecentList(delta time.Duration, num int) []Observable {
    385 	if delta < 0 {
    386 		return nil
    387 	}
    388 	now := ts.clock.Time()
    389 	return ts.ComputeRange(now.Add(-delta), now, num)
    390 }
    391 
// extract fills results[0:num] with observations computed from level l over
// the range [start, finish). The range is split into num equal destination
// intervals and each result is the sum of the source buckets that fall into
// its interval, scaled by the overlapping fraction where a bucket is only
// partly covered.
//
// num must be positive (the range is divided by num) and results must hold
// at least num elements. Pending observations are merged first.
func (ts *timeSeries) extract(l *tsLevel, start, finish time.Time, num int, results []Observable) {
	ts.mergePendingUpdates()

	srcInterval := l.size
	dstInterval := finish.Sub(start) / time.Duration(num)
	dstStart := start
	// Start time of the oldest bucket of the level.
	srcStart := l.end.Add(-srcInterval * time.Duration(ts.numBuckets))

	srcIndex := 0

	// Where should scanning start? Skip the whole buckets that end before
	// the requested range begins.
	if dstStart.After(srcStart) {
		advance := int(dstStart.Sub(srcStart) / srcInterval)
		srcIndex += advance
		srcStart = srcStart.Add(time.Duration(advance) * srcInterval)
	}

	// The i'th value is computed as shown below.
	// interval = (finish - start)/num
	// i'th value = sum of observation in range
	//   [ start + i       * interval,
	//     start + (i + 1) * interval )
	for i := 0; i < num; i++ {
		results[i] = ts.resetObservation(results[i])
		dstEnd := dstStart.Add(dstInterval)
		for srcIndex < ts.numBuckets && srcStart.Before(dstEnd) {
			srcEnd := srcStart.Add(srcInterval)
			// Never count time after the most recent observation.
			if srcEnd.After(ts.lastAdd) {
				srcEnd = ts.lastAdd
			}

			if !srcEnd.Before(dstStart) {
				// srcIndex counts from the oldest bucket; map it onto the ring.
				srcValue := l.buckets[(srcIndex+l.oldest)%ts.numBuckets]
				if !srcStart.Before(dstStart) && !srcEnd.After(dstEnd) {
					// dst completely contains src.
					if srcValue != nil {
						results[i].Add(srcValue)
					}
				} else {
					// dst partially overlaps src.
					overlapStart := maxTime(srcStart, dstStart)
					overlapEnd := minTime(srcEnd, dstEnd)
					base := srcEnd.Sub(srcStart)
					fraction := overlapEnd.Sub(overlapStart).Seconds() / base.Seconds()

					// Scale a copy so the stored bucket is left untouched.
					used := ts.provider()
					if srcValue != nil {
						used.CopyFrom(srcValue)
					}
					used.Multiply(fraction)
					results[i].Add(used)
				}

				// The bucket reaches into the next destination interval;
				// keep srcIndex so that interval sees this bucket as well.
				if srcEnd.After(dstEnd) {
					break
				}
			}
			srcIndex++
			srcStart = srcStart.Add(srcInterval)
		}
		dstStart = dstStart.Add(dstInterval)
	}
}
    457 
    458 // resetObservation clears the content so the struct may be reused.
    459 func (ts *timeSeries) resetObservation(observation Observable) Observable {
    460 	if observation == nil {
    461 		observation = ts.provider()
    462 	} else {
    463 		observation.Clear()
    464 	}
    465 	return observation
    466 }
    467 
    468 // TimeSeries tracks data at granularities from 1 second to 16 weeks.
    469 type TimeSeries struct {
    470 	timeSeries
    471 }
    472 
    473 // NewTimeSeries creates a new TimeSeries using the function provided for creating new Observable.
    474 func NewTimeSeries(f func() Observable) *TimeSeries {
    475 	return NewTimeSeriesWithClock(f, defaultClockInstance)
    476 }
    477 
    478 // NewTimeSeriesWithClock creates a new TimeSeries using the function provided for creating new Observable and the clock for
    479 // assigning timestamps.
    480 func NewTimeSeriesWithClock(f func() Observable, clock Clock) *TimeSeries {
    481 	ts := new(TimeSeries)
    482 	ts.timeSeries.init(timeSeriesResolutions, f, timeSeriesNumBuckets, clock)
    483 	return ts
    484 }
    485 
    486 // MinuteHourSeries tracks data at granularities of 1 minute and 1 hour.
    487 type MinuteHourSeries struct {
    488 	timeSeries
    489 }
    490 
    491 // NewMinuteHourSeries creates a new MinuteHourSeries using the function provided for creating new Observable.
    492 func NewMinuteHourSeries(f func() Observable) *MinuteHourSeries {
    493 	return NewMinuteHourSeriesWithClock(f, defaultClockInstance)
    494 }
    495 
    496 // NewMinuteHourSeriesWithClock creates a new MinuteHourSeries using the function provided for creating new Observable and the clock for
    497 // assigning timestamps.
    498 func NewMinuteHourSeriesWithClock(f func() Observable, clock Clock) *MinuteHourSeries {
    499 	ts := new(MinuteHourSeries)
    500 	ts.timeSeries.init(minuteHourSeriesResolutions, f,
    501 		minuteHourSeriesNumBuckets, clock)
    502 	return ts
    503 }
    504 
    505 func (ts *MinuteHourSeries) Minute() Observable {
    506 	return ts.timeSeries.Latest(0, 60)
    507 }
    508 
    509 func (ts *MinuteHourSeries) Hour() Observable {
    510 	return ts.timeSeries.Latest(1, 60)
    511 }
    512 
    513 func minTime(a, b time.Time) time.Time {
    514 	if a.Before(b) {
    515 		return a
    516 	}
    517 	return b
    518 }
    519 
    520 func maxTime(a, b time.Time) time.Time {
    521 	if a.After(b) {
    522 		return a
    523 	}
    524 	return b
    525 }