mirror of
https://github.com/Oxalide/vsphere-influxdb-go.git
synced 2023-10-10 13:36:51 +02:00
1819 lines
49 KiB
Cheetah
1819 lines
49 KiB
Cheetah
package influxql
|
|
|
|
import (
|
|
"container/heap"
|
|
"encoding/binary"
|
|
"fmt"
|
|
"io"
|
|
"sort"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/gogo/protobuf/proto"
|
|
internal "github.com/influxdata/influxdb/influxql/internal"
|
|
)
|
|
|
|
// DefaultStatsInterval is the default value for IteratorEncoder.StatsInterval:
// ten seconds.
|
|
const DefaultStatsInterval = 10 * time.Second
|
|
|
|
{{with $types := .}}{{range $k := $types}}
|
|
|
|
// {{$k.Name}}Iterator represents a stream of {{$k.name}} points.
|
|
type {{$k.Name}}Iterator interface {
|
|
Iterator
|
|
Next() (*{{$k.Name}}Point, error)
|
|
}
|
|
|
|
// new{{$k.Name}}Iterators converts a slice of Iterator to a slice of {{$k.Name}}Iterator.
|
|
// Drop and closes any iterator in itrs that is not a {{$k.Name}}Iterator and cannot
|
|
// be cast to a {{$k.Name}}Iterator.
|
|
func new{{$k.Name}}Iterators(itrs []Iterator) []{{$k.Name}}Iterator {
|
|
a := make([]{{$k.Name}}Iterator, 0, len(itrs))
|
|
for _, itr := range itrs {
|
|
switch itr := itr.(type) {
|
|
case {{$k.Name}}Iterator:
|
|
a = append(a, itr)
|
|
{{if eq .Name "Float"}}
|
|
case IntegerIterator:
|
|
a = append(a, &integerFloatCastIterator{input: itr})
|
|
{{end}}
|
|
default:
|
|
itr.Close()
|
|
}
|
|
}
|
|
return a
|
|
}
|
|
|
|
|
|
// buf{{$k.Name}}Iterator represents a buffered {{$k.Name}}Iterator.
|
|
type buf{{$k.Name}}Iterator struct {
|
|
itr {{$k.Name}}Iterator
|
|
buf *{{$k.Name}}Point
|
|
}
|
|
|
|
// newBuf{{$k.Name}}Iterator returns a buffered {{$k.Name}}Iterator.
|
|
func newBuf{{$k.Name}}Iterator(itr {{$k.Name}}Iterator) *buf{{$k.Name}}Iterator {
|
|
return &buf{{$k.Name}}Iterator{itr: itr}
|
|
}
|
|
|
|
// Stats returns statistics from the input iterator.
|
|
func (itr *buf{{$k.Name}}Iterator) Stats() IteratorStats { return itr.itr.Stats() }
|
|
|
|
// Close closes the underlying iterator.
|
|
func (itr *buf{{$k.Name}}Iterator) Close() error { return itr.itr.Close() }
|
|
|
|
// peek returns the next point without removing it from the iterator.
|
|
func (itr *buf{{$k.Name}}Iterator) peek() (*{{$k.Name}}Point, error) {
|
|
p, err := itr.Next()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
itr.unread(p)
|
|
return p, nil
|
|
}
|
|
|
|
// peekTime returns the time of the next point.
|
|
// Returns zero time if no more points available.
|
|
func (itr *buf{{$k.Name}}Iterator) peekTime() (int64, error) {
|
|
p, err := itr.peek()
|
|
if p == nil || err != nil {
|
|
return ZeroTime, err
|
|
}
|
|
return p.Time, nil
|
|
}
|
|
|
|
// Next returns the current buffer, if exists, or calls the underlying iterator.
|
|
func (itr *buf{{$k.Name}}Iterator) Next() (*{{$k.Name}}Point, error) {
|
|
buf := itr.buf
|
|
if buf != nil {
|
|
itr.buf = nil
|
|
return buf, nil
|
|
}
|
|
return itr.itr.Next()
|
|
}
|
|
|
|
// NextInWindow returns the next value if it is between [startTime, endTime).
|
|
// If the next value is outside the range then it is moved to the buffer.
|
|
func (itr *buf{{$k.Name}}Iterator) NextInWindow(startTime, endTime int64) (*{{$k.Name}}Point, error) {
|
|
v, err := itr.Next()
|
|
if v == nil || err != nil {
|
|
return nil, err
|
|
} else if t := v.Time; t >= endTime || t < startTime {
|
|
itr.unread(v)
|
|
return nil, nil
|
|
}
|
|
return v, nil
|
|
}
|
|
|
|
// unread sets v to the buffer. It is read on the next call to Next().
|
|
func (itr *buf{{$k.Name}}Iterator) unread(v *{{$k.Name}}Point) { itr.buf = v }
|
|
|
|
// {{$k.name}}MergeIterator represents an iterator that combines multiple {{$k.name}} iterators.
|
|
type {{$k.name}}MergeIterator struct {
|
|
inputs []{{$k.Name}}Iterator
|
|
heap *{{$k.name}}MergeHeap
|
|
init bool
|
|
|
|
// Current iterator and window.
|
|
curr *{{$k.name}}MergeHeapItem
|
|
window struct {
|
|
name string
|
|
tags string
|
|
startTime int64
|
|
endTime int64
|
|
}
|
|
}
|
|
|
|
// new{{$k.Name}}MergeIterator returns a new instance of {{$k.name}}MergeIterator.
|
|
func new{{$k.Name}}MergeIterator(inputs []{{$k.Name}}Iterator, opt IteratorOptions) *{{$k.name}}MergeIterator {
|
|
itr := &{{$k.name}}MergeIterator{
|
|
inputs: inputs,
|
|
heap: &{{$k.name}}MergeHeap{
|
|
items: make([]*{{$k.name}}MergeHeapItem, 0, len(inputs)),
|
|
opt: opt,
|
|
},
|
|
}
|
|
|
|
// Initialize heap items.
|
|
for _, input := range inputs {
|
|
// Wrap in buffer, ignore any inputs without anymore points.
|
|
bufInput := newBuf{{$k.Name}}Iterator(input)
|
|
|
|
// Append to the heap.
|
|
itr.heap.items = append(itr.heap.items, &{{$k.name}}MergeHeapItem{itr: bufInput})
|
|
}
|
|
|
|
return itr
|
|
}
|
|
|
|
// Stats returns an aggregation of stats from the underlying iterators.
|
|
func (itr *{{$k.name}}MergeIterator) Stats() IteratorStats {
|
|
var stats IteratorStats
|
|
for _, input := range itr.inputs {
|
|
stats.Add(input.Stats())
|
|
}
|
|
return stats
|
|
}
|
|
|
|
// Close closes the underlying iterators.
|
|
func (itr *{{$k.name}}MergeIterator) Close() error {
|
|
for _, input := range itr.inputs {
|
|
input.Close()
|
|
}
|
|
itr.curr = nil
|
|
itr.inputs = nil
|
|
itr.heap.items = nil
|
|
return nil
|
|
}
|
|
|
|
// Next returns the next point from the iterator.
|
|
func (itr *{{$k.name}}MergeIterator) Next() (*{{$k.Name}}Point, error) {
|
|
// Initialize the heap. This needs to be done lazily on the first call to this iterator
|
|
// so that iterator initialization done through the Select() call returns quickly.
|
|
// Queries can only be interrupted after the Select() call completes so any operations
|
|
// done during iterator creation cannot be interrupted, which is why we do it here
|
|
// instead so an interrupt can happen while initializing the heap.
|
|
if !itr.init {
|
|
items := itr.heap.items
|
|
itr.heap.items = make([]*{{$k.name}}MergeHeapItem, 0, len(items))
|
|
for _, item := range items {
|
|
if p, err := item.itr.peek(); err != nil {
|
|
return nil, err
|
|
} else if p == nil {
|
|
continue
|
|
}
|
|
itr.heap.items = append(itr.heap.items, item)
|
|
}
|
|
heap.Init(itr.heap)
|
|
itr.init = true
|
|
}
|
|
|
|
for {
|
|
// Retrieve the next iterator if we don't have one.
|
|
if itr.curr == nil {
|
|
if len(itr.heap.items) == 0 {
|
|
return nil, nil
|
|
}
|
|
itr.curr = heap.Pop(itr.heap).(*{{$k.name}}MergeHeapItem)
|
|
|
|
// Read point and set current window.
|
|
p, err := itr.curr.itr.Next()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
tags := p.Tags.Subset(itr.heap.opt.Dimensions)
|
|
itr.window.name, itr.window.tags = p.Name, tags.ID()
|
|
itr.window.startTime, itr.window.endTime = itr.heap.opt.Window(p.Time)
|
|
return p, nil
|
|
}
|
|
|
|
// Read the next point from the current iterator.
|
|
p, err := itr.curr.itr.Next()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// If there are no more points then remove iterator from heap and find next.
|
|
if p == nil {
|
|
itr.curr = nil
|
|
continue
|
|
}
|
|
|
|
// Check if the point is inside of our current window.
|
|
inWindow := true
|
|
if window := itr.window; window.name != p.Name {
|
|
inWindow = false
|
|
} else if tags := p.Tags.Subset(itr.heap.opt.Dimensions); window.tags != tags.ID() {
|
|
inWindow = false
|
|
} else if opt := itr.heap.opt; opt.Ascending && p.Time >= window.endTime {
|
|
inWindow = false
|
|
} else if !opt.Ascending && p.Time < window.startTime {
|
|
inWindow = false
|
|
}
|
|
|
|
// If it's outside our window then push iterator back on the heap and find new iterator.
|
|
if !inWindow {
|
|
itr.curr.itr.unread(p)
|
|
heap.Push(itr.heap, itr.curr)
|
|
itr.curr = nil
|
|
continue
|
|
}
|
|
|
|
return p, nil
|
|
}
|
|
}
|
|
|
|
// {{$k.name}}MergeHeap represents a heap of {{$k.name}}MergeHeapItems.
|
|
// Items are sorted by their next window and then by name/tags.
|
|
type {{$k.name}}MergeHeap struct {
|
|
opt IteratorOptions
|
|
items []*{{$k.name}}MergeHeapItem
|
|
}
|
|
|
|
// Len returns the number of items in the heap.
func (h *{{$k.name}}MergeHeap) Len() int {
	return len(h.items)
}
|
|
// Swap exchanges the items at positions i and j.
func (h *{{$k.name}}MergeHeap) Swap(i, j int) {
	items := h.items
	items[i], items[j] = items[j], items[i]
}
|
|
// Less orders two heap items by the next point of each: first by name, then
// by the ID of the tags restricted to the configured dimensions, and finally
// by the start of the window containing the point's time. All comparisons are
// reversed for descending iteration. If peeking item i fails, Less reports
// true; if peeking item j fails, it reports false.
func (h *{{$k.name}}MergeHeap) Less(i, j int) bool {
	x, err := h.items[i].itr.peek()
	if err != nil {
		return true
	}
	y, err := h.items[j].itr.peek()
	if err != nil {
		return false
	}

	asc := h.opt.Ascending

	if x.Name != y.Name {
		if asc {
			return x.Name < y.Name
		}
		return x.Name > y.Name
	}

	xID := x.Tags.Subset(h.opt.Dimensions).ID()
	yID := y.Tags.Subset(h.opt.Dimensions).ID()
	if xID != yID {
		if asc {
			return xID < yID
		}
		return xID > yID
	}

	// Same series: compare the start times of the two windows.
	xt, _ := h.opt.Window(x.Time)
	yt, _ := h.opt.Window(y.Time)
	if asc {
		return xt < yt
	}
	return xt > yt
}
|
|
|
|
|
|
// Push appends x, which must be a *{{$k.name}}MergeHeapItem, to the items.
func (h *{{$k.name}}MergeHeap) Push(x interface{}) {
	item := x.(*{{$k.name}}MergeHeapItem)
	h.items = append(h.items, item)
}
|
|
|
|
// Pop removes the last item from the items slice and returns it.
func (h *{{$k.name}}MergeHeap) Pop() interface{} {
	last := len(h.items) - 1
	item := h.items[last]
	h.items = h.items[:last]
	return item
}
|
|
|
|
// {{$k.name}}MergeHeapItem is one entry of a {{$k.name}}MergeHeap: a buffered
// input iterator.
type {{$k.name}}MergeHeapItem struct {
	itr *buf{{$k.Name}}Iterator
}
|
|
|
|
// {{$k.name}}SortedMergeIterator is an iterator that sorts and merges multiple iterators into one.
|
|
type {{$k.name}}SortedMergeIterator struct {
|
|
inputs []{{$k.Name}}Iterator
|
|
heap *{{$k.name}}SortedMergeHeap
|
|
init bool
|
|
}
|
|
|
|
// new{{$k.Name}}SortedMergeIterator returns an instance of {{$k.name}}SortedMergeIterator.
|
|
func new{{$k.Name}}SortedMergeIterator(inputs []{{$k.Name}}Iterator, opt IteratorOptions) Iterator {
|
|
itr := &{{$k.name}}SortedMergeIterator{
|
|
inputs: inputs,
|
|
heap: &{{$k.name}}SortedMergeHeap{
|
|
items: make([]*{{$k.name}}SortedMergeHeapItem, 0, len(inputs)),
|
|
opt: opt,
|
|
},
|
|
}
|
|
|
|
// Initialize heap items.
|
|
for _, input := range inputs {
|
|
// Append to the heap.
|
|
itr.heap.items = append(itr.heap.items, &{{$k.name}}SortedMergeHeapItem{itr: input})
|
|
}
|
|
|
|
return itr
|
|
}
|
|
|
|
// Stats returns an aggregation of stats from the underlying iterators.
|
|
func (itr *{{$k.name}}SortedMergeIterator) Stats() IteratorStats {
|
|
var stats IteratorStats
|
|
for _, input := range itr.inputs {
|
|
stats.Add(input.Stats())
|
|
}
|
|
return stats
|
|
}
|
|
|
|
// Close closes the underlying iterators.
|
|
func (itr *{{$k.name}}SortedMergeIterator) Close() error {
|
|
for _, input := range itr.inputs {
|
|
input.Close()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Next returns the next points from the iterator.
|
|
func (itr *{{$k.name}}SortedMergeIterator) Next() (*{{$k.Name}}Point, error) { return itr.pop() }
|
|
|
|
// pop returns the next point from the heap.
|
|
// Reads the next point from item's cursor and puts it back on the heap.
|
|
func (itr *{{$k.name}}SortedMergeIterator) pop() (*{{$k.Name}}Point, error) {
|
|
// Initialize the heap. See the MergeIterator to see why this has to be done lazily.
|
|
if !itr.init {
|
|
items := itr.heap.items
|
|
itr.heap.items = make([]*{{$k.name}}SortedMergeHeapItem, 0, len(items))
|
|
for _, item := range items {
|
|
var err error
|
|
if item.point, err = item.itr.Next(); err != nil {
|
|
return nil, err
|
|
} else if item.point == nil {
|
|
continue
|
|
}
|
|
itr.heap.items = append(itr.heap.items, item)
|
|
}
|
|
heap.Init(itr.heap)
|
|
itr.init = true
|
|
}
|
|
|
|
if len(itr.heap.items) == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
// Read the next item from the heap.
|
|
item := heap.Pop(itr.heap).(*{{$k.name}}SortedMergeHeapItem)
|
|
if item.err != nil {
|
|
return nil, item.err
|
|
} else if item.point == nil {
|
|
return nil, nil
|
|
}
|
|
|
|
// Copy the point for return.
|
|
p := item.point.Clone()
|
|
|
|
// Read the next item from the cursor. Push back to heap if one exists.
|
|
if item.point, item.err = item.itr.Next(); item.point != nil {
|
|
heap.Push(itr.heap, item)
|
|
}
|
|
|
|
return p, nil
|
|
}
|
|
|
|
// {{$k.name}}SortedMergeHeap represents a heap of {{$k.name}}SortedMergeHeapItems.
|
|
type {{$k.name}}SortedMergeHeap struct {
|
|
opt IteratorOptions
|
|
items []*{{$k.name}}SortedMergeHeapItem
|
|
}
|
|
|
|
// Len returns the number of items in the heap.
func (h *{{$k.name}}SortedMergeHeap) Len() int {
	return len(h.items)
}
|
|
// Swap exchanges the items at positions i and j.
func (h *{{$k.name}}SortedMergeHeap) Swap(i, j int) {
	items := h.items
	items[i], items[j] = items[j], items[i]
}
|
|
// Less orders two heap items by the point each holds: first by name, then by
// the tags restricted to the configured dimensions (compared through their
// ID), and finally by time. All comparisons are reversed for descending
// iteration.
func (h *{{$k.name}}SortedMergeHeap) Less(i, j int) bool {
	x, y := h.items[i].point, h.items[j].point
	asc := h.opt.Ascending

	if x.Name != y.Name {
		if asc {
			return x.Name < y.Name
		}
		return x.Name > y.Name
	}

	xTags, yTags := x.Tags.Subset(h.opt.Dimensions), y.Tags.Subset(h.opt.Dimensions)
	if !xTags.Equals(&yTags) {
		if asc {
			return xTags.ID() < yTags.ID()
		}
		return xTags.ID() > yTags.ID()
	}

	if asc {
		return x.Time < y.Time
	}
	return x.Time > y.Time
}
|
|
|
|
// Push appends x, which must be a *{{$k.name}}SortedMergeHeapItem, to the items.
func (h *{{$k.name}}SortedMergeHeap) Push(x interface{}) {
	item := x.(*{{$k.name}}SortedMergeHeapItem)
	h.items = append(h.items, item)
}
|
|
|
|
// Pop removes the last item from the items slice and returns it.
func (h *{{$k.name}}SortedMergeHeap) Pop() interface{} {
	last := len(h.items) - 1
	item := h.items[last]
	h.items = h.items[:last]
	return item
}
|
|
|
|
// {{$k.name}}SortedMergeHeapItem is one entry of a {{$k.name}}SortedMergeHeap:
// an input iterator together with the result of the most recent read from it.
type {{$k.name}}SortedMergeHeapItem struct {
	point *{{$k.Name}}Point   // point most recently read from itr
	err   error               // error most recently returned by itr
	itr   {{$k.Name}}Iterator // input the point was read from
}
|
|
|
|
// {{$k.name}}ParallelIterator represents an iterator that pulls data in a separate goroutine.
|
|
type {{$k.name}}ParallelIterator struct {
|
|
input {{$k.Name}}Iterator
|
|
ch chan {{$k.name}}PointError
|
|
|
|
once sync.Once
|
|
closing chan struct{}
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
// new{{$k.Name}}ParallelIterator returns a new instance of {{$k.name}}ParallelIterator.
|
|
func new{{$k.Name}}ParallelIterator(input {{$k.Name}}Iterator) *{{$k.name}}ParallelIterator {
|
|
itr := &{{$k.name}}ParallelIterator{
|
|
input: input,
|
|
ch: make(chan {{$k.name}}PointError, 256),
|
|
closing: make(chan struct{}),
|
|
}
|
|
itr.wg.Add(1)
|
|
go itr.monitor()
|
|
return itr
|
|
}
|
|
|
|
// Stats returns stats from the underlying iterator.
|
|
func (itr *{{$k.name}}ParallelIterator) Stats() IteratorStats { return itr.input.Stats() }
|
|
|
|
// Close closes the underlying iterators.
|
|
func (itr *{{$k.name}}ParallelIterator) Close() error {
|
|
itr.once.Do(func() { close(itr.closing) })
|
|
itr.wg.Wait()
|
|
return itr.input.Close()
|
|
}
|
|
|
|
// Next returns the next point from the iterator.
|
|
func (itr *{{$k.name}}ParallelIterator) Next() (*{{$k.Name}}Point, error) {
|
|
v, ok := <-itr.ch
|
|
if !ok {
|
|
return nil, io.EOF
|
|
}
|
|
return v.point, v.err
|
|
}
|
|
|
|
// monitor runs in a separate goroutine and actively pulls the next point.
|
|
func (itr *{{$k.name}}ParallelIterator) monitor() {
|
|
defer close(itr.ch)
|
|
defer itr.wg.Done()
|
|
|
|
for {
|
|
// Read next point.
|
|
p, err := itr.input.Next()
|
|
if p != nil {
|
|
p = p.Clone()
|
|
}
|
|
|
|
select {
|
|
case <-itr.closing:
|
|
return
|
|
case itr.ch <- {{$k.name}}PointError{point: p, err: err}:
|
|
}
|
|
}
|
|
}
|
|
|
|
// {{$k.name}}PointError pairs a point with the error returned by the read that
// produced it.
type {{$k.name}}PointError struct {
	point *{{$k.Name}}Point
	err   error
}
|
|
|
|
// {{$k.name}}LimitIterator represents an iterator that limits points per group.
|
|
type {{$k.name}}LimitIterator struct {
|
|
input {{$k.Name}}Iterator
|
|
opt IteratorOptions
|
|
n int
|
|
|
|
prev struct {
|
|
name string
|
|
tags Tags
|
|
}
|
|
}
|
|
|
|
// new{{$k.Name}}LimitIterator returns a new instance of {{$k.name}}LimitIterator.
|
|
func new{{$k.Name}}LimitIterator(input {{$k.Name}}Iterator, opt IteratorOptions) *{{$k.name}}LimitIterator {
|
|
return &{{$k.name}}LimitIterator{
|
|
input: input,
|
|
opt: opt,
|
|
}
|
|
}
|
|
|
|
// Stats returns stats from the underlying iterator.
|
|
func (itr *{{$k.name}}LimitIterator) Stats() IteratorStats { return itr.input.Stats() }
|
|
|
|
// Close closes the underlying iterators.
|
|
func (itr *{{$k.name}}LimitIterator) Close() error { return itr.input.Close() }
|
|
|
|
// Next returns the next point from the iterator.
|
|
func (itr *{{$k.name}}LimitIterator) Next() (*{{$k.Name}}Point, error) {
|
|
for {
|
|
p, err := itr.input.Next()
|
|
if p == nil || err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Reset window and counter if a new window is encountered.
|
|
if p.Name != itr.prev.name || !p.Tags.Equals(&itr.prev.tags) {
|
|
itr.prev.name = p.Name
|
|
itr.prev.tags = p.Tags
|
|
itr.n = 0
|
|
}
|
|
|
|
// Increment counter.
|
|
itr.n++
|
|
|
|
// Read next point if not beyond the offset.
|
|
if itr.n <= itr.opt.Offset {
|
|
continue
|
|
}
|
|
|
|
// Read next point if we're beyond the limit.
|
|
if itr.opt.Limit > 0 && (itr.n-itr.opt.Offset) > itr.opt.Limit {
|
|
continue
|
|
}
|
|
|
|
return p, nil
|
|
}
|
|
}
|
|
|
|
// {{$k.name}}FillIterator emits a point for every interval between its start
// and end time, synthesizing points for intervals the input skipped.
type {{$k.name}}FillIterator struct {
	input     *buf{{$k.Name}}Iterator
	prev      {{$k.Name}}Point // last point read from the input for the current series
	startTime int64
	endTime   int64
	auxFields []interface{} // Aux value attached to synthesized points
	init      bool          // set once the first window has been established
	opt       IteratorOptions

	// Series and time of the next expected point.
	window struct {
		name   string
		tags   Tags
		time   int64
		offset int64
	}
}
|
|
|
|
// new{{$k.Name}}FillIterator returns a {{$k.name}}FillIterator reading from input.
//
// When the fill option is NullFill and expr is a call to "count", the fill is
// changed to a NumberFill of the type's zero value. The start and end times
// are the window starts of the option's time bounds, swapped for descending
// iteration.
func new{{$k.Name}}FillIterator(input {{$k.Name}}Iterator, expr Expr, opt IteratorOptions) *{{$k.name}}FillIterator {
	if call, ok := expr.(*Call); opt.Fill == NullFill && ok && call.Name == "count" {
		opt.Fill = NumberFill
		opt.FillValue = {{$k.Zero}}
	}

	// Iteration begins at the option's start time when ascending and at its
	// end time when descending.
	first, last := opt.StartTime, opt.EndTime
	if !opt.Ascending {
		first, last = last, first
	}
	startTime, _ := opt.Window(first)
	endTime, _ := opt.Window(last)

	var auxFields []interface{}
	if n := len(opt.Aux); n > 0 {
		auxFields = make([]interface{}, n)
	}

	return &{{$k.name}}FillIterator{
		input:     newBuf{{$k.Name}}Iterator(input),
		prev:      {{$k.Name}}Point{Nil: true},
		startTime: startTime,
		endTime:   endTime,
		auxFields: auxFields,
		opt:       opt,
	}
}
|
|
|
|
// Stats reports the statistics of the input iterator.
func (itr *{{$k.name}}FillIterator) Stats() IteratorStats {
	return itr.input.Stats()
}
|
|
// Close closes the input iterator and returns its error.
func (itr *{{$k.name}}FillIterator) Close() error {
	return itr.input.Close()
}
|
|
|
|
// Next returns the next point of the filled stream. The point is either read
// from the input or, when the input has no point for the expected window
// time, synthesized according to the fill option. It returns nil when the
// input is exhausted and no interval remains to be filled.
func (itr *{{$k.name}}FillIterator) Next() (*{{$k.Name}}Point, error) {
	asc := itr.opt.Ascending

	// On the first call, anchor the window at the series of the first point.
	if !itr.init {
		first, err := itr.input.peek()
		if err != nil || first == nil {
			return nil, err
		}
		itr.window.name, itr.window.tags = first.Name, first.Tags
		itr.window.time = itr.startTime
		if itr.opt.Location != nil {
			_, itr.window.offset = itr.opt.Zone(itr.window.time)
		}
		itr.init = true
	}

	p, err := itr.input.Next()
	if err != nil {
		return nil, err
	}

	// The input is exhausted or has moved on to a different series.
	if p == nil || p.Name != itr.window.name || p.Tags.ID() != itr.window.tags.ID() {
		// Intervals of the current series remain while the expected time has
		// not passed the end time.
		var remaining bool
		if asc {
			remaining = itr.window.time <= itr.endTime
		} else {
			remaining = itr.window.time >= itr.endTime
		}

		switch {
		case remaining:
			// Put the point back and fall through to constructing a fill
			// point for the current series below.
			itr.input.unread(p)
			p = nil
		case p == nil:
			// No interval left and no further point: end of all intervals.
			return nil, nil
		default:
			// Start over with the series of the new point.
			itr.window.name, itr.window.tags = p.Name, p.Tags
			itr.window.time = itr.startTime
			if itr.opt.Location != nil {
				_, itr.window.offset = itr.opt.Zone(itr.window.time)
			}
			itr.prev = {{$k.Name}}Point{Nil: true}
		}
	}

	// A fill point is needed when there is no point, or when the point lies
	// beyond the expected window time.
	needFill := p == nil ||
		(asc && p.Time > itr.window.time) ||
		(!asc && p.Time < itr.window.time)

	if !needFill {
		itr.prev = *p
	} else {
		if p != nil {
			itr.input.unread(p)
		}

		p = &{{$k.Name}}Point{
			Name: itr.window.name,
			Tags: itr.window.tags,
			Time: itr.window.time,
			Aux:  itr.auxFields,
		}

		switch itr.opt.Fill {
		case LinearFill:
			{{- if or (eq $k.Name "Float") (eq $k.Name "Integer")}}
			if itr.prev.Nil {
				p.Nil = true
			} else {
				// Interpolation needs a following point of the same series.
				next, err := itr.input.peek()
				if err != nil {
					return nil, err
				}
				if next == nil || next.Name != itr.window.name || next.Tags.ID() != itr.window.tags.ID() {
					p.Nil = true
				} else {
					interval := int64(itr.opt.Interval.Duration)
					start := itr.window.time / interval
					p.Value = linear{{$k.Name}}(start, itr.prev.Time/interval, next.Time/interval, itr.prev.Value, next.Value)
				}
			}
			{{else}}
			fallthrough
			{{- end}}
		case NullFill:
			p.Nil = true
		case NumberFill:
			p.Value = castTo{{$k.Name}}(itr.opt.FillValue)
		case PreviousFill:
			if itr.prev.Nil {
				p.Nil = true
			} else {
				p.Value = itr.prev.Value
				p.Nil = itr.prev.Nil
			}
		}
	}

	// Advance the expected time. Do not advance to a new window here as there
	// may be lingering points with the same timestamp in the previous window.
	step := int64(itr.opt.Interval.Duration)
	if asc {
		itr.window.time += step
	} else {
		itr.window.time -= step
	}

	// If the zone offset changed across this step, shift the expected time by
	// the difference, unless the difference is a whole interval or more.
	if itr.opt.Location != nil {
		if _, offset := itr.opt.Zone(itr.window.time - 1); offset != itr.window.offset {
			diff := itr.window.offset - offset
			if abs(diff) < step {
				itr.window.time += diff
			}
			itr.window.offset = offset
		}
	}
	return p, nil
}
|
|
|
|
// {{$k.name}}IntervalIterator represents a {{$k.name}} implementation of IntervalIterator.
|
|
type {{$k.name}}IntervalIterator struct {
|
|
input {{$k.Name}}Iterator
|
|
opt IteratorOptions
|
|
}
|
|
|
|
// new{{$k.Name}}IntervalIterator returns a {{$k.name}}IntervalIterator that
// reads from input and computes windows with opt.
func new{{$k.Name}}IntervalIterator(input {{$k.Name}}Iterator, opt IteratorOptions) *{{$k.name}}IntervalIterator {
	itr := new({{$k.name}}IntervalIterator)
	itr.input = input
	itr.opt = opt
	return itr
}
|
|
|
|
// Stats reports the statistics of the input iterator.
func (itr *{{$k.name}}IntervalIterator) Stats() IteratorStats {
	return itr.input.Stats()
}
|
|
// Close closes the input iterator and returns its error.
func (itr *{{$k.name}}IntervalIterator) Close() error {
	return itr.input.Close()
}
|
|
|
|
// Next returns the next input point with its time replaced by the start of
// the window containing it. It returns nil when the input is exhausted or
// fails.
func (itr *{{$k.name}}IntervalIterator) Next() (*{{$k.Name}}Point, error) {
	p, err := itr.input.Next()
	if err != nil || p == nil {
		return nil, err
	}

	// The minimum allowable time is reported as zero so that the default
	// time returned for aggregate queries without times is not broken.
	if p.Time, _ = itr.opt.Window(p.Time); p.Time == MinTime {
		p.Time = 0
	}
	return p, nil
}
|
|
|
|
// {{$k.name}}InterruptIterator represents a {{$k.name}} implementation of InterruptIterator.
|
|
type {{$k.name}}InterruptIterator struct {
|
|
input {{$k.Name}}Iterator
|
|
closing <-chan struct{}
|
|
count int
|
|
}
|
|
|
|
// new{{$k.Name}}InterruptIterator returns a {{$k.name}}InterruptIterator that
// reads from input until closing is observed to be closed.
func new{{$k.Name}}InterruptIterator(input {{$k.Name}}Iterator, closing <-chan struct{}) *{{$k.name}}InterruptIterator {
	itr := new({{$k.name}}InterruptIterator)
	itr.input = input
	itr.closing = closing
	return itr
}
|
|
|
|
// Stats reports the statistics of the input iterator.
func (itr *{{$k.name}}InterruptIterator) Stats() IteratorStats {
	return itr.input.Stats()
}
|
|
// Close closes the input iterator and returns its error.
func (itr *{{$k.name}}InterruptIterator) Close() error {
	return itr.input.Close()
}
|
|
|
|
// Next returns the next point from the input. If the closing channel is found
// to be closed, it closes the iterator instead and returns a nil point
// together with the result of Close.
//
// To keep the check cheap the closing channel is polled only when the low
// eight bits of count are all set (count&0xFF == 0xFF); after a poll that
// finds the channel open the counter is reset. Since count starts at zero,
// the channel is not polled before the first point is read.
func (itr *{{$k.name}}InterruptIterator) Next() (*{{$k.Name}}Point, error) {
	const pollMask = 0xFF

	if itr.count&pollMask == pollMask {
		select {
		case <-itr.closing:
			return nil, itr.Close()
		default:
			// Not interrupted: restart the count and emit the next point.
			itr.count = 0
		}
	}

	// Count every point read.
	itr.count++
	return itr.input.Next()
}
|
|
|
|
// {{$k.name}}CloseInterruptIterator represents a {{$k.name}} implementation of CloseInterruptIterator.
|
|
type {{$k.name}}CloseInterruptIterator struct {
|
|
input {{$k.Name}}Iterator
|
|
closing <-chan struct{}
|
|
done chan struct{}
|
|
once sync.Once
|
|
}
|
|
|
|
// new{{$k.Name}}CloseInterruptIterator returns a {{$k.name}}CloseInterruptIterator
// reading from input and starts the goroutine that watches closing.
func new{{$k.Name}}CloseInterruptIterator(input {{$k.Name}}Iterator, closing <-chan struct{}) *{{$k.name}}CloseInterruptIterator {
	itr := new({{$k.name}}CloseInterruptIterator)
	itr.input = input
	itr.closing = closing
	itr.done = make(chan struct{})

	go itr.monitor()
	return itr
}
|
|
|
|
// monitor blocks until either the closing channel or the done channel is
// closed. In the former case it closes the iterator; in the latter the
// iterator has already been closed and there is nothing left to do.
func (itr *{{$k.name}}CloseInterruptIterator) monitor() {
	select {
	case <-itr.done:
		// Closed by the consumer.
	case <-itr.closing:
		itr.Close()
	}
}
|
|
|
|
// Stats reports the statistics of the input iterator.
func (itr *{{$k.name}}CloseInterruptIterator) Stats() IteratorStats {
	stats := itr.input.Stats()
	return stats
}
|
|
|
|
// Close closes the done channel and then the input iterator. The work happens
// only on the first call; the error from the input is ignored and Close
// always returns nil.
func (itr *{{$k.name}}CloseInterruptIterator) Close() error {
	shutdown := func() {
		close(itr.done)
		itr.input.Close()
	}
	itr.once.Do(shutdown)
	return nil
}
|
|
|
|
// Next returns the next point from the input. An error from the input is
// suppressed, and nil returned for both results, when the iterator has
// already been closed; otherwise the error is returned.
func (itr *{{$k.name}}CloseInterruptIterator) Next() (*{{$k.Name}}Point, error) {
	p, err := itr.input.Next()
	if err == nil {
		return p, nil
	}

	// Distinguish a failure caused by closing from a real error.
	select {
	case <-itr.done:
		return nil, nil
	default:
		return nil, err
	}
}
|
|
|
|
// aux{{$k.Name}}Point represents a combination of a point and an error for the AuxIterator.
|
|
type aux{{$k.Name}}Point struct {
|
|
point *{{$k.Name}}Point
|
|
err error
|
|
}
|
|
|
|
// {{$k.name}}AuxIterator represents a {{$k.name}} implementation of AuxIterator.
|
|
type {{$k.name}}AuxIterator struct {
|
|
input *buf{{$k.Name}}Iterator
|
|
output chan aux{{$k.Name}}Point
|
|
fields *auxIteratorFields
|
|
background bool
|
|
}
|
|
|
|
// new{{$k.Name}}AuxIterator returns a {{$k.name}}AuxIterator reading from a
// buffered wrapper around input. Its output channel holds one entry and its
// field iterators are created from opt. Streaming does not begin until Start
// or Background is called.
func new{{$k.Name}}AuxIterator(input {{$k.Name}}Iterator, opt IteratorOptions) *{{$k.name}}AuxIterator {
	itr := new({{$k.name}}AuxIterator)
	itr.input = newBuf{{$k.Name}}Iterator(input)
	itr.output = make(chan aux{{$k.Name}}Point, 1)
	itr.fields = newAuxIteratorFields(opt)
	return itr
}
|
|
|
|
// Background marks the iterator as running in the background, starts
// streaming and drains the iterator's own output in a separate goroutine.
func (itr *{{$k.name}}AuxIterator) Background() {
	itr.background = true
	itr.Start()

	go DrainIterator(itr)
}
|
|
|
|
// Start begins streaming the input in a new goroutine.
func (itr *{{$k.name}}AuxIterator) Start() {
	go itr.stream()
}
|
|
// Stats reports the statistics of the input iterator.
func (itr *{{$k.name}}AuxIterator) Stats() IteratorStats {
	return itr.input.Stats()
}
|
|
// Close closes the input iterator and returns its error.
func (itr *{{$k.name}}AuxIterator) Close() error {
	return itr.input.Close()
}
|
|
// Next receives the next entry from the output channel and returns its point
// and error. Once the channel is closed and drained, both results are nil.
func (itr *{{$k.name}}AuxIterator) Next() (*{{$k.Name}}Point, error) {
	out := <-itr.output
	return out.point, out.err
}
|
|
// Iterator returns the field iterator registered for the auxiliary field with
// the given name and type.
func (itr *{{$k.name}}AuxIterator) Iterator(name string, typ DataType) Iterator {
	return itr.fields.iterator(name, typ)
}
|
|
|
|
// stream reads the input until it is exhausted or fails and forwards every
// point to the output channel and to the field iterators. An input error is
// forwarded to both as well. When running in the background, streaming also
// stops as soon as the field iterators report that a send did not succeed.
// On exit the output channel and the field iterators are closed.
func (itr *{{$k.name}}AuxIterator) stream() {
	for {
		// Read next point.
		p, err := itr.input.Next()
		if err != nil {
			itr.output <- aux{{$k.Name}}Point{err: err}
			itr.fields.sendError(err)
			break
		} else if p == nil {
			break
		}

		// Send point to output and to each field iterator.
		itr.output <- aux{{$k.Name}}Point{point: p}
		if ok := itr.fields.send(p); !ok && itr.background {
			break
		}
	}

	close(itr.output)
	itr.fields.close()
}
|
|
|
|
// {{$k.name}}ChanIterator represents a new instance of {{$k.name}}ChanIterator.
|
|
type {{$k.name}}ChanIterator struct {
|
|
buf struct {
|
|
i int
|
|
filled bool
|
|
points [2]{{$k.Name}}Point
|
|
}
|
|
err error
|
|
cond *sync.Cond
|
|
done bool
|
|
}
|
|
|
|
// Stats always returns empty statistics.
func (itr *{{$k.name}}ChanIterator) Stats() IteratorStats {
	return IteratorStats{}
}
|
|
|
|
// Close marks the iterator as done and wakes every goroutine waiting on it.
// It always returns nil.
func (itr *{{$k.name}}ChanIterator) Close() error {
	// The unlock is deliberately not deferred, to avoid an unnecessary
	// allocation.
	itr.cond.L.Lock()
	itr.done = true
	// Wake all waiters so they can observe that the iterator is done.
	itr.cond.Broadcast()
	itr.cond.L.Unlock()
	return nil
}
|
|
|
|
// setBuf stores a point built from name, tags, time and value in the buffer,
// blocking while the previous point has not been read yet. A value of an
// unsupported type is stored as a nil point. It returns false, without
// storing anything, when the iterator has been closed.
func (itr *{{$k.name}}ChanIterator) setBuf(name string, tags Tags, time int64, value interface{}) bool {
	itr.cond.L.Lock()
	defer itr.cond.L.Unlock()

	// Block until the buffer has been read or the iterator is closed.
	for itr.buf.filled && !itr.done {
		itr.cond.Wait()
	}

	// The wait may have ended because the iterator was closed; in that case
	// the value is dropped and the caller is told so.
	if itr.done {
		return false
	}

	p := {{$k.Name}}Point{Name: name, Tags: tags, Time: time}
	switch v := value.(type) {
	case {{$k.Type}}:
		p.Value = v
{{if eq $k.Name "Float"}}
	case int64:
		p.Value = float64(v)
{{end}}
	default:
		p.Nil = true
	}
	itr.buf.points[itr.buf.i] = p
	itr.buf.filled = true

	// Wake a waiting goroutine: a new value is ready to be read.
	itr.cond.Signal()
	return true
}
|
|
|
|
// setErr records err on the iterator and wakes a waiting goroutine.
func (itr *{{$k.name}}ChanIterator) setErr(err error) {
	itr.cond.L.Lock()
	itr.err = err
	// Wake a waiter so that it can re-examine the iterator's state.
	itr.cond.Signal()
	itr.cond.L.Unlock()
}
|
|
|
|
// Next returns the next point written by setBuf, blocking until a point is
// available, an error has been recorded by setErr, or the iterator is closed.
//
// A recorded error takes priority and is returned with a nil point. This also
// holds when the error is recorded while Next is already waiting: the wait
// ends and the error is returned instead of being missed. Once the iterator
// is closed and the buffer is empty, Next returns nil for both results.
//
// The returned pointer refers to one of the iterator's two internal slots.
func (itr *{{$k.name}}ChanIterator) Next() (*{{$k.Name}}Point, error) {
	itr.cond.L.Lock()
	defer itr.cond.L.Unlock()

	// Wait until either a value is available in the buffer, an error has been
	// set, or the iterator is closed.
	for itr.err == nil && !itr.done && !itr.buf.filled {
		itr.cond.Wait()
	}

	// Check for an error and return one if there. This covers both an error
	// set before the call and one set while waiting above.
	if itr.err != nil {
		return nil, itr.err
	}

	// Return nil once the channel is done and the buffer is empty.
	if itr.done && !itr.buf.filled {
		return nil, nil
	}

	// Always read from the buffer if it exists, even if the iterator
	// is closed. This prevents the last value from being truncated by
	// the parent iterator.
	p := &itr.buf.points[itr.buf.i]
	// Switch to the other slot so the next write does not overwrite the
	// point that is being returned.
	itr.buf.i = (itr.buf.i + 1) % len(itr.buf.points)
	itr.buf.filled = false
	itr.cond.Signal()
	return p, nil
}
|
|
|
|
{{range $v := $types}}
|
|
|
|
// {{$k.name}}Reduce{{$v.Name}}Iterator executes a reducer for every interval and buffers the result.
|
|
type {{$k.name}}Reduce{{$v.Name}}Iterator struct {
|
|
input *buf{{$k.Name}}Iterator
|
|
create func() ({{$k.Name}}PointAggregator, {{$v.Name}}PointEmitter)
|
|
dims []string
|
|
opt IteratorOptions
|
|
points []{{$v.Name}}Point
|
|
keepTags bool
|
|
}
|
|
|
|
// new{{$k.Name}}Reduce{{$v.Name}}Iterator returns a reduce iterator that reads
// from a buffered wrapper around input and uses createFn to build the
// aggregator and emitter it reduces with.
func new{{$k.Name}}Reduce{{$v.Name}}Iterator(input {{$k.Name}}Iterator, opt IteratorOptions, createFn func() ({{$k.Name}}PointAggregator, {{$v.Name}}PointEmitter)) *{{$k.name}}Reduce{{$v.Name}}Iterator {
	itr := new({{$k.name}}Reduce{{$v.Name}}Iterator)
	itr.input = newBuf{{$k.Name}}Iterator(input)
	itr.create = createFn
	itr.dims = opt.GetDimensions()
	itr.opt = opt
	return itr
}
|
|
|
|
// Stats returns stats from the input iterator.
|
|
func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) Stats() IteratorStats { return itr.input.Stats() }
|
|
|
|
// Close closes the iterator and all child iterators.
|
|
func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) Close() error { return itr.input.Close() }
|
|
|
|
// Next returns the minimum value for the next available interval.
|
|
func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) Next() (*{{$v.Name}}Point, error) {
|
|
// Calculate next window if we have no more points.
|
|
if len(itr.points) == 0 {
|
|
var err error
|
|
itr.points, err = itr.reduce()
|
|
if len(itr.points) == 0 {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
// Pop next point off the stack.
|
|
p := &itr.points[len(itr.points)-1]
|
|
itr.points = itr.points[:len(itr.points)-1]
|
|
return p, nil
|
|
}
|
|
|
|
// {{$k.name}}Reduce{{$v.Name}}Point stores the reduced data for a name/tag combination.
|
|
type {{$k.name}}Reduce{{$v.Name}}Point struct {
|
|
Name string
|
|
Tags Tags
|
|
Aggregator {{$k.Name}}PointAggregator
|
|
Emitter {{$v.Name}}PointEmitter
|
|
}
|
|
|
|
// reduce executes fn once for every point in the next window.
|
|
// The previous value for the dimension is passed to fn.
|
|
func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) reduce() ([]{{$v.Name}}Point, error) {
|
|
// Calculate next window.
|
|
var (
|
|
startTime, endTime int64
|
|
window struct {
|
|
name string
|
|
tags string
|
|
}
|
|
)
|
|
for {
|
|
p, err := itr.input.Next()
|
|
if err != nil || p == nil {
|
|
return nil, err
|
|
} else if p.Nil {
|
|
continue
|
|
}
|
|
|
|
// Unread the point so it can be processed.
|
|
itr.input.unread(p)
|
|
startTime, endTime = itr.opt.Window(p.Time)
|
|
window.name, window.tags = p.Name, p.Tags.Subset(itr.opt.Dimensions).ID()
|
|
break
|
|
}
|
|
|
|
// Create points by tags.
|
|
m := make(map[string]*{{$k.name}}Reduce{{$v.Name}}Point)
|
|
for {
|
|
// Read next point.
|
|
curr, err := itr.input.NextInWindow(startTime, endTime)
|
|
if err != nil {
|
|
return nil, err
|
|
} else if curr == nil {
|
|
break
|
|
} else if curr.Nil {
|
|
continue
|
|
} else if curr.Name != window.name {
|
|
itr.input.unread(curr)
|
|
break
|
|
}
|
|
|
|
// Ensure this point is within the same final window.
|
|
if curr.Name != window.name {
|
|
itr.input.unread(curr)
|
|
break
|
|
} else if tags := curr.Tags.Subset(itr.opt.Dimensions); tags.ID() != window.tags {
|
|
itr.input.unread(curr)
|
|
break
|
|
}
|
|
|
|
// Retrieve the tags on this point for this level of the query.
|
|
// This may be different than the bucket dimensions.
|
|
tags := curr.Tags.Subset(itr.dims)
|
|
id := tags.ID()
|
|
|
|
// Retrieve the aggregator for this name/tag combination or create one.
|
|
rp := m[id]
|
|
if rp == nil {
|
|
aggregator, emitter := itr.create()
|
|
rp = &{{$k.name}}Reduce{{$v.Name}}Point{
|
|
Name: curr.Name,
|
|
Tags: tags,
|
|
Aggregator: aggregator,
|
|
Emitter: emitter,
|
|
}
|
|
m[id] = rp
|
|
}
|
|
rp.Aggregator.Aggregate{{$k.Name}}(curr)
|
|
}
|
|
|
|
// Reverse sort points by name & tag if our output is supposed to be ordered.
|
|
keys := make([]string, 0, len(m))
|
|
for k := range m {
|
|
keys = append(keys, k)
|
|
}
|
|
if len(keys) > 1 && itr.opt.Ordered {
|
|
sort.Sort(reverseStringSlice(keys))
|
|
}
|
|
|
|
// Assume the points are already sorted until proven otherwise.
|
|
sortedByTime := true
|
|
// Emit the points for each name & tag combination.
|
|
a := make([]{{$v.Name}}Point, 0, len(m))
|
|
for _, k := range keys {
|
|
rp := m[k]
|
|
points := rp.Emitter.Emit()
|
|
for i := len(points)-1; i >= 0; i-- {
|
|
points[i].Name = rp.Name
|
|
if !itr.keepTags {
|
|
points[i].Tags = rp.Tags
|
|
}
|
|
// Set the points time to the interval time if the reducer didn't provide one.
|
|
if points[i].Time == ZeroTime {
|
|
points[i].Time = startTime
|
|
} else {
|
|
sortedByTime = false
|
|
}
|
|
a = append(a, points[i])
|
|
}
|
|
}
|
|
|
|
// Points may be out of order. Perform a stable sort by time if requested.
|
|
if !sortedByTime && itr.opt.Ordered {
|
|
sort.Stable(sort.Reverse({{$v.name}}PointsByTime(a)))
|
|
}
|
|
|
|
return a, nil
|
|
}
|
|
|
|
// {{$k.name}}Stream{{$v.Name}}Iterator streams inputs into the iterator and emits points gradually.
|
|
type {{$k.name}}Stream{{$v.Name}}Iterator struct {
|
|
input *buf{{$k.Name}}Iterator
|
|
create func() ({{$k.Name}}PointAggregator, {{$v.Name}}PointEmitter)
|
|
dims []string
|
|
opt IteratorOptions
|
|
m map[string]*{{$k.name}}Reduce{{$v.Name}}Point
|
|
points []{{$v.Name}}Point
|
|
}
|
|
|
|
// new{{$k.Name}}Stream{{$v.Name}}Iterator returns a new instance of {{$k.name}}Stream{{$v.Name}}Iterator.
|
|
func new{{$k.Name}}Stream{{$v.Name}}Iterator(input {{$k.Name}}Iterator, createFn func() ({{$k.Name}}PointAggregator, {{$v.Name}}PointEmitter), opt IteratorOptions) *{{$k.name}}Stream{{$v.Name}}Iterator {
|
|
return &{{$k.name}}Stream{{$v.Name}}Iterator{
|
|
input: newBuf{{$k.Name}}Iterator(input),
|
|
create: createFn,
|
|
dims: opt.GetDimensions(),
|
|
opt: opt,
|
|
m: make(map[string]*{{$k.name}}Reduce{{$v.Name}}Point),
|
|
}
|
|
}
|
|
|
|
// Stats returns stats from the input iterator.
|
|
func (itr *{{$k.name}}Stream{{$v.Name}}Iterator) Stats() IteratorStats { return itr.input.Stats() }
|
|
|
|
// Close closes the iterator and all child iterators.
|
|
func (itr *{{$k.name}}Stream{{$v.Name}}Iterator) Close() error { return itr.input.Close() }
|
|
|
|
// Next returns the next value for the stream iterator.
|
|
func (itr *{{$k.name}}Stream{{$v.Name}}Iterator) Next() (*{{$v.Name}}Point, error) {
|
|
// Calculate next window if we have no more points.
|
|
if len(itr.points) == 0 {
|
|
var err error
|
|
itr.points, err = itr.reduce()
|
|
if len(itr.points) == 0 {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
// Pop next point off the stack.
|
|
p := &itr.points[len(itr.points)-1]
|
|
itr.points = itr.points[:len(itr.points)-1]
|
|
return p, nil
|
|
}
|
|
|
|
// reduce creates and manages aggregators for every point from the input.
|
|
// After aggregating a point, it always tries to emit a value using the emitter.
|
|
func (itr *{{$k.name}}Stream{{$v.Name}}Iterator) reduce() ([]{{$v.Name}}Point, error) {
|
|
for {
|
|
// Read next point.
|
|
curr, err := itr.input.Next()
|
|
if curr == nil {
|
|
// Close all of the aggregators to flush any remaining points to emit.
|
|
var points []{{$v.Name}}Point
|
|
for _, rp := range itr.m {
|
|
if aggregator, ok := rp.Aggregator.(io.Closer); ok {
|
|
if err := aggregator.Close(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
pts := rp.Emitter.Emit()
|
|
if len(pts) == 0 {
|
|
continue
|
|
}
|
|
|
|
for i := range pts {
|
|
pts[i].Name = rp.Name
|
|
pts[i].Tags = rp.Tags
|
|
}
|
|
points = append(points, pts...)
|
|
}
|
|
}
|
|
|
|
// Eliminate the aggregators and emitters.
|
|
itr.m = nil
|
|
return points, nil
|
|
} else if err != nil {
|
|
return nil, err
|
|
} else if curr.Nil {
|
|
continue
|
|
}
|
|
tags := curr.Tags.Subset(itr.dims)
|
|
|
|
id := curr.Name
|
|
if len(tags.m) > 0 {
|
|
id += "\x00" + tags.ID()
|
|
}
|
|
|
|
// Retrieve the aggregator for this name/tag combination or create one.
|
|
rp := itr.m[id]
|
|
if rp == nil {
|
|
aggregator, emitter := itr.create()
|
|
rp = &{{$k.name}}Reduce{{.Name}}Point{
|
|
Name: curr.Name,
|
|
Tags: tags,
|
|
Aggregator: aggregator,
|
|
Emitter: emitter,
|
|
}
|
|
itr.m[id] = rp
|
|
}
|
|
rp.Aggregator.Aggregate{{$k.Name}}(curr)
|
|
|
|
// Attempt to emit points from the aggregator.
|
|
points := rp.Emitter.Emit()
|
|
if len(points) == 0 {
|
|
continue
|
|
}
|
|
|
|
for i := range points {
|
|
points[i].Name = rp.Name
|
|
points[i].Tags = rp.Tags
|
|
}
|
|
return points, nil
|
|
}
|
|
}
|
|
|
|
// {{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprIterator executes a function to modify an existing point
|
|
// for every output of the input iterator.
|
|
type {{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprIterator struct {
|
|
left *buf{{$k.Name}}Iterator
|
|
right *buf{{$k.Name}}Iterator
|
|
fn {{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprFunc
|
|
points []{{$k.Name}}Point // must be size 2
|
|
storePrev bool
|
|
}
|
|
|
|
// new{{$k.Name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprIterator returns an expression iterator over left and
// right that combines matching values with fn. The fill points are derived
// from opt.Fill: nil points for null/previous fill, opt.FillValue for number
// fill, and no fill points at all for any other fill option.
func new{{$k.Name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprIterator(left, right {{$k.Name}}Iterator, opt IteratorOptions, fn func(a, b {{$k.Type}}) {{$v.Type}}) *{{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprIterator {
	var fill []{{$k.Name}}Point
	if opt.Fill == NullFill || opt.Fill == PreviousFill {
		fill = []{{$k.Name}}Point{ {Nil: true}, {Nil: true} }
	} else if opt.Fill == NumberFill {
		value := castTo{{$k.Name}}(opt.FillValue)
		fill = []{{$k.Name}}Point{ {Value: value}, {Value: value} }
	}

	itr := new({{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprIterator)
	itr.left = newBuf{{$k.Name}}Iterator(left)
	itr.right = newBuf{{$k.Name}}Iterator(right)
	itr.points = fill
	itr.fn = fn
	itr.storePrev = opt.Fill == PreviousFill
	return itr
}
|
|
|
|
// Stats returns the combined statistics of the left and right iterators.
func (itr *{{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprIterator) Stats() IteratorStats {
	total := itr.left.Stats()
	total.Add(itr.right.Stats())
	return total
}
|
|
|
|
// Close closes both input iterators. The right iterator is closed even when
// closing the left one fails; the first error encountered (left before right)
// is returned instead of being silently dropped.
func (itr *{{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprIterator) Close() error {
	lerr := itr.left.Close()
	rerr := itr.right.Close()
	if lerr != nil {
		return lerr
	}
	return rerr
}
|
|
|
|
// Next returns the result of applying fn to the next pair of matching points.
// A side that is missing or nil is substituted from the fill points when
// there are any; without fill points such pairs are skipped. A nil point is
// returned once both inputs are exhausted.
func (itr *{{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprIterator) Next() (*{{$v.Name}}Point, error) {
	for {
		a, b, err := itr.next()
		if err != nil {
			return nil, err
		}
		if a == nil && b == nil {
			return nil, nil
		}

		// Without fill points (fill(none)), any pair with a missing or nil
		// side is dropped.
		incomplete := a == nil || a.Nil || b == nil || b.Nil
		if incomplete && itr.points == nil {
			continue
		}

		// When one side is missing, stand in a nil point carrying the name,
		// tags, and time of the other side. Both sides are never missing
		// here.
		switch {
		case a == nil:
			filler := *b
			filler.Value, filler.Nil = {{$k.Nil}}, true
			a = &filler
		case b == nil:
			filler := *a
			filler.Value, filler.Nil = {{$k.Nil}}, true
			b = &filler
		}

		// Replace nil values with the fill value when that one is non-nil.
		if a.Nil && !itr.points[0].Nil {
			a.Value, a.Nil = itr.points[0].Value, false
		}
		if b.Nil && !itr.points[1].Nil {
			b.Value, b.Nil = itr.points[1].Value, false
		}

		// Remember this pair as the fill source for the next one.
		if itr.storePrev {
			itr.points[0], itr.points[1] = *a, *b
		}

{{if eq $k.Name $v.Name}}
		// Same input and output type: reuse the left point for the result.
		switch {
		case a.Nil:
			return a, nil
		case b.Nil:
			return b, nil
		}
		a.Value = itr.fn(a.Value, b.Value)
		return a, nil
{{else}}
		// Different output type: build a new point from the left one.
		isNil := a.Nil || b.Nil
		out := &{{$v.Name}}Point{
			Name:       a.Name,
			Tags:       a.Tags,
			Time:       a.Time,
			Nil:        isNil,
			Aggregated: a.Aggregated,
		}
		if !isNil {
			out.Value = itr.fn(a.Value, b.Value)
		}
		return out, nil
{{end}}
	}
}
|
|
|
|
// next returns the next points within each iterator. If the iterators are
|
|
// uneven, it organizes them so only matching points are returned.
|
|
func (itr *{{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprIterator) next() (a, b *{{$k.Name}}Point, err error) {
|
|
// Retrieve the next value for both the left and right.
|
|
a, err = itr.left.Next()
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
b, err = itr.right.Next()
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// If we have a point from both, make sure that they match each other.
|
|
if a != nil && b != nil {
|
|
if a.Name > b.Name {
|
|
itr.left.unread(a)
|
|
return nil, b, nil
|
|
} else if a.Name < b.Name {
|
|
itr.right.unread(b)
|
|
return a, nil, nil
|
|
}
|
|
|
|
if ltags, rtags := a.Tags.ID(), b.Tags.ID(); ltags > rtags {
|
|
itr.left.unread(a)
|
|
return nil, b, nil
|
|
} else if ltags < rtags {
|
|
itr.right.unread(b)
|
|
return a, nil, nil
|
|
}
|
|
|
|
if a.Time > b.Time {
|
|
itr.left.unread(a)
|
|
return nil, b, nil
|
|
} else if a.Time < b.Time {
|
|
itr.right.unread(b)
|
|
return a, nil, nil
|
|
}
|
|
}
|
|
return a, b, nil
|
|
}
|
|
|
|
// {{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprFunc creates or modifies a point by combining two
|
|
// points. The point passed in may be modified and returned rather than
|
|
// allocating a new point if possible. One of the points may be nil, but at
|
|
// least one of the points will be non-nil.
|
|
type {{$k.name}}{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}ExprFunc func(a, b {{$k.Type}}) {{$v.Type}}
|
|
{{end}}
|
|
|
|
// {{$k.name}}TransformIterator executes a function to modify an existing point for every
|
|
// output of the input iterator.
|
|
type {{$k.name}}TransformIterator struct {
|
|
input {{$k.Name}}Iterator
|
|
fn {{$k.name}}TransformFunc
|
|
}
|
|
|
|
// Stats returns stats from the input iterator.
|
|
func (itr *{{$k.name}}TransformIterator) Stats() IteratorStats { return itr.input.Stats() }
|
|
|
|
// Close closes the iterator and all child iterators.
|
|
func (itr *{{$k.name}}TransformIterator) Close() error { return itr.input.Close() }
|
|
|
|
// Next returns the minimum value for the next available interval.
|
|
func (itr *{{$k.name}}TransformIterator) Next() (*{{$k.Name}}Point, error) {
|
|
p, err := itr.input.Next()
|
|
if err != nil {
|
|
return nil, err
|
|
} else if p != nil {
|
|
p = itr.fn(p)
|
|
}
|
|
return p, nil
|
|
}
|
|
|
|
// {{$k.name}}TransformFunc creates or modifies a point.
|
|
// The point passed in may be modified and returned rather than allocating a
|
|
// new point if possible.
|
|
type {{$k.name}}TransformFunc func(p *{{$k.Name}}Point) *{{$k.Name}}Point
|
|
|
|
// {{$k.name}}BoolTransformIterator executes a function to modify an existing point for every
|
|
// output of the input iterator.
|
|
type {{$k.name}}BoolTransformIterator struct {
|
|
input {{$k.Name}}Iterator
|
|
fn {{$k.name}}BoolTransformFunc
|
|
}
|
|
|
|
// Stats returns stats from the input iterator.
|
|
func (itr *{{$k.name}}BoolTransformIterator) Stats() IteratorStats { return itr.input.Stats() }
|
|
|
|
// Close closes the iterator and all child iterators.
|
|
func (itr *{{$k.name}}BoolTransformIterator) Close() error { return itr.input.Close() }
|
|
|
|
// Next returns the minimum value for the next available interval.
|
|
func (itr *{{$k.name}}BoolTransformIterator) Next() (*BooleanPoint, error) {
|
|
p, err := itr.input.Next()
|
|
if err != nil {
|
|
return nil, err
|
|
} else if p != nil {
|
|
return itr.fn(p), nil
|
|
}
|
|
return nil, nil
|
|
}
|
|
|
|
// {{$k.name}}BoolTransformFunc creates or modifies a point.
|
|
// The point passed in may be modified and returned rather than allocating a
|
|
// new point if possible.
|
|
type {{$k.name}}BoolTransformFunc func(p *{{$k.Name}}Point) *BooleanPoint
|
|
|
|
// {{$k.name}}DedupeIterator only outputs unique points.
|
|
// This differs from the DistinctIterator in that it compares all aux fields too.
|
|
// This iterator is relatively inefficient and should only be used on small
|
|
// datasets such as meta query results.
|
|
type {{$k.name}}DedupeIterator struct {
|
|
input {{$k.Name}}Iterator
|
|
m map[string]struct{} // lookup of points already sent
|
|
}
|
|
|
|
// {{$k.name}}IteratorMapper reads rows from several iterators through an
// Emitter and maps them onto a single {{$k.name}} point with aux fields.
type {{$k.name}}IteratorMapper struct {
	// e reads the underlying iterators row by row.
	e *Emitter

	// buf receives the values of the current row, one slot per iterator.
	buf []interface{}

	// driver selects the primary value of the point; it may be nil.
	driver IteratorMap

	// fields selects the value of each aux field.
	fields []IteratorMap

	// point is reused for every call to Next.
	point {{$k.Name}}Point
}
|
|
|
|
// new{{$k.Name}}IteratorMapper returns a mapper over itrs. The emitter is
// created with opt.Ascending and has OmitTime set. The row buffer has one slot
// per iterator and the point has one aux slot per field.
func new{{$k.Name}}IteratorMapper(itrs []Iterator, driver IteratorMap, fields []IteratorMap, opt IteratorOptions) *{{$k.name}}IteratorMapper {
	emitter := NewEmitter(itrs, opt.Ascending, 0)
	emitter.OmitTime = true

	mapper := &{{$k.name}}IteratorMapper{e: emitter, driver: driver, fields: fields}
	mapper.buf = make([]interface{}, len(itrs))
	mapper.point.Aux = make([]interface{}, len(fields))
	return mapper
}
|
|
|
|
// Next loads the next row from the emitter and maps it onto the reusable
// point. It returns a nil point when loading fails or reports ZeroTime. The
// returned pointer refers to the mapper's own point and is overwritten by the
// next call.
func (itr *{{$k.name}}IteratorMapper) Next() (*{{$k.Name}}Point, error) {
	t, name, tags, err := itr.e.loadBuf()
	if err != nil || t == ZeroTime {
		return nil, err
	}
	pt := &itr.point
	pt.Time, pt.Name, pt.Tags = t, name, tags

	itr.e.readInto(t, name, tags, itr.buf)
	if itr.driver != nil {
		// A missing driver value, or one of another type, becomes a nil value.
		if v, ok := itr.driver.Value(tags, itr.buf).({{$k.Type}}); ok {
			pt.Value, pt.Nil = v, false
		} else {
			pt.Value, pt.Nil = {{$k.Nil}}, true
		}
	}
	for i := range itr.fields {
		pt.Aux[i] = itr.fields[i].Value(tags, itr.buf)
	}
	return pt, nil
}
|
|
|
|
// Stats returns the sum of the statistics of every iterator held by the
// emitter.
func (itr *{{$k.name}}IteratorMapper) Stats() IteratorStats {
	var total IteratorStats
	for _, child := range itr.e.itrs {
		total.Add(child.Stats())
	}
	return total
}
|
|
|
|
// Close closes the emitter and returns its error, if any.
func (itr *{{$k.name}}IteratorMapper) Close() error { return itr.e.Close() }
|
|
|
|
// {{$k.name}}FilterIterator returns only those input points for which cond
// evaluates to true.
type {{$k.name}}FilterIterator struct {
	// input supplies the points to filter.
	input {{$k.Name}}Iterator

	// cond is the condition evaluated against each point.
	cond Expr

	// opt supplies the aux field references used to name the point's aux values.
	opt IteratorOptions

	// m is the reusable value map (aux fields and tags) passed to EvalBool.
	m map[string]interface{}
}
|
|
|
|
// new{{$k.Name}}FilterIterator returns an iterator that filters input by cond.
// Binary expressions whose left-hand side is "time" are replaced by a true
// literal first. If the remaining condition is nil or a true literal, input is
// returned unchanged.
func new{{$k.Name}}FilterIterator(input {{$k.Name}}Iterator, cond Expr, opt IteratorOptions) {{$k.Name}}Iterator {
	// Strip out time conditions from the WHERE clause.
	// TODO(jsternberg): This should really be done for us when creating the IteratorOptions struct.
	stripTime := func(n Node) Node {
		if expr, ok := n.(*BinaryExpr); ok && expr.LHS.String() == "time" {
			return &BooleanLiteral{Val: true}
		}
		return n
	}

	cond, _ = RewriteFunc(CloneExpr(cond), stripTime).(Expr)
	if cond == nil {
		return input
	}
	if lit, ok := cond.(*BooleanLiteral); ok && lit.Val {
		return input
	}

	filter := &{{$k.name}}FilterIterator{input: input, cond: cond, opt: opt}
	filter.m = make(map[string]interface{})
	return filter
}
|
|
|
|
// Stats reports the statistics of the input iterator.
func (itr *{{$k.name}}FilterIterator) Stats() IteratorStats {
	return itr.input.Stats()
}
|
|
// Close closes the input iterator and returns its error, if any.
func (itr *{{$k.name}}FilterIterator) Close() error {
	return itr.input.Close()
}
|
|
|
|
// Next returns the next input point for which the condition holds. For each
// candidate the point's aux values (named by opt.Aux) and tags are written
// into the value map before the condition is evaluated. A nil point is
// returned when the input is exhausted or fails.
func (itr *{{$k.name}}FilterIterator) Next() (*{{$k.Name}}Point, error) {
	for {
		p, err := itr.input.Next()
		if err != nil || p == nil {
			return nil, err
		}

		// Expose aux fields and tags to the condition by name.
		for i := range itr.opt.Aux {
			itr.m[itr.opt.Aux[i].Val] = p.Aux[i]
		}
		for key, val := range p.Tags.KeyValues() {
			itr.m[key] = val
		}

		if EvalBool(itr.cond, itr.m) {
			return p, nil
		}
	}
}
|
|
|
|
// new{{$k.Name}}DedupeIterator returns a new instance of {{$k.name}}DedupeIterator.
|
|
func new{{$k.Name}}DedupeIterator(input {{$k.Name}}Iterator) *{{$k.name}}DedupeIterator {
|
|
return &{{$k.name}}DedupeIterator{
|
|
input: input,
|
|
m: make(map[string]struct{}),
|
|
}
|
|
}
|
|
|
|
// Stats returns stats from the input iterator.
|
|
func (itr *{{$k.name}}DedupeIterator) Stats() IteratorStats { return itr.input.Stats() }
|
|
|
|
// Close closes the iterator and all child iterators.
|
|
func (itr *{{$k.name}}DedupeIterator) Close() error { return itr.input.Close() }
|
|
|
|
// Next returns the next unique point from the input iterator.
|
|
func (itr *{{$k.name}}DedupeIterator) Next() (*{{$k.Name}}Point, error) {
|
|
for {
|
|
// Read next point.
|
|
p, err := itr.input.Next()
|
|
if p == nil || err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Serialize to bytes to store in lookup.
|
|
buf, err := proto.Marshal(encode{{$k.Name}}Point(p))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// If the point has already been output then move to the next point.
|
|
if _, ok := itr.m[string(buf)]; ok {
|
|
continue
|
|
}
|
|
|
|
// Otherwise mark it as emitted and return point.
|
|
itr.m[string(buf)] = struct{}{}
|
|
return p, nil
|
|
}
|
|
}
|
|
|
|
// {{$k.name}}ReaderIterator represents an iterator that streams from a reader.
|
|
type {{$k.name}}ReaderIterator struct {
|
|
r io.Reader
|
|
dec *{{$k.Name}}PointDecoder
|
|
}
|
|
|
|
// new{{$k.Name}}ReaderIterator returns a new instance of {{$k.name}}ReaderIterator.
|
|
func new{{$k.Name}}ReaderIterator(r io.Reader, stats IteratorStats) *{{$k.name}}ReaderIterator {
|
|
dec := New{{$k.Name}}PointDecoder(r)
|
|
dec.stats = stats
|
|
|
|
return &{{$k.name}}ReaderIterator{
|
|
r: r,
|
|
dec: dec,
|
|
}
|
|
}
|
|
|
|
// Stats returns stats about points processed.
|
|
func (itr *{{$k.name}}ReaderIterator) Stats() IteratorStats { return itr.dec.stats }
|
|
|
|
// Close closes the underlying reader, if applicable.
|
|
func (itr *{{$k.name}}ReaderIterator) Close() error {
|
|
if r, ok := itr.r.(io.ReadCloser); ok {
|
|
return r.Close()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Next returns the next point from the iterator.
|
|
func (itr *{{$k.name}}ReaderIterator) Next() (*{{$k.Name}}Point, error) {
|
|
// OPTIMIZE(benbjohnson): Reuse point on iterator.
|
|
|
|
// Unmarshal next point.
|
|
p := &{{$k.Name}}Point{}
|
|
if err := itr.dec.Decode{{$k.Name}}Point(p); err == io.EOF {
|
|
return nil, nil
|
|
} else if err != nil {
|
|
return nil, err
|
|
}
|
|
return p, nil
|
|
}
|
|
{{end}}
|
|
|
|
|
|
// IteratorEncoder writes the points of an iterator, interleaved with
// periodic stats, to a writer.
type IteratorEncoder struct {
	// w is the destination for encoded points and stats.
	w io.Writer

	// StatsInterval is the frequency with which stats are emitted while
	// encoding.
	StatsInterval time.Duration
}
|
|
|
|
// NewIteratorEncoder encodes an iterator's points to w.
|
|
func NewIteratorEncoder(w io.Writer) *IteratorEncoder {
|
|
return &IteratorEncoder{
|
|
w: w,
|
|
|
|
StatsInterval: DefaultStatsInterval,
|
|
}
|
|
}
|
|
|
|
// EncodeIterator encodes and writes all of itr's points to the underlying writer.
|
|
func (enc *IteratorEncoder) EncodeIterator(itr Iterator) error {
|
|
switch itr := itr.(type) {
|
|
case FloatIterator:
|
|
return enc.encodeFloatIterator(itr)
|
|
case IntegerIterator:
|
|
return enc.encodeIntegerIterator(itr)
|
|
case StringIterator:
|
|
return enc.encodeStringIterator(itr)
|
|
case BooleanIterator:
|
|
return enc.encodeBooleanIterator(itr)
|
|
default:
|
|
panic(fmt.Sprintf("unsupported iterator for encoder: %T", itr))
|
|
}
|
|
}
|
|
|
|
{{range .}}
|
|
// encode{{.Name}}Iterator encodes all points from itr to the underlying writer.
|
|
func (enc *IteratorEncoder) encode{{.Name}}Iterator(itr {{.Name}}Iterator) error {
|
|
ticker := time.NewTicker(enc.StatsInterval)
|
|
defer ticker.Stop()
|
|
|
|
// Emit initial stats.
|
|
if err := enc.encodeStats(itr.Stats()); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Continually stream points from the iterator into the encoder.
|
|
penc := New{{.Name}}PointEncoder(enc.w)
|
|
for {
|
|
// Emit stats periodically.
|
|
select {
|
|
case <-ticker.C:
|
|
if err := enc.encodeStats(itr.Stats()); err != nil {
|
|
return err
|
|
}
|
|
default:
|
|
}
|
|
|
|
// Retrieve the next point from the iterator.
|
|
p, err := itr.Next()
|
|
if err != nil {
|
|
return err
|
|
} else if p == nil {
|
|
break
|
|
}
|
|
|
|
// Write the point to the point encoder.
|
|
if err := penc.Encode{{.Name}}Point(p); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Emit final stats.
|
|
if err := enc.encodeStats(itr.Stats()); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
{{end}}
|
|
|
|
// encode a stats object in the point stream.
|
|
func (enc *IteratorEncoder) encodeStats(stats IteratorStats) error {
|
|
buf, err := proto.Marshal(&internal.Point{
|
|
Name: proto.String(""),
|
|
Tags: proto.String(""),
|
|
Time: proto.Int64(0),
|
|
Nil: proto.Bool(false),
|
|
|
|
Stats: encodeIteratorStats(&stats),
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := binary.Write(enc.w, binary.BigEndian, uint32(len(buf))); err != nil {
|
|
return err
|
|
}
|
|
if _, err := enc.w.Write(buf); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
{{end}}
|