package coordinator import ( "io" "time" "github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/services/meta" "github.com/influxdata/influxdb/tsdb" ) // IteratorCreator is an interface that combines mapping fields and creating iterators. type IteratorCreator interface { influxql.IteratorCreator influxql.FieldMapper io.Closer } // ShardMapper retrieves and maps shards into an IteratorCreator that can later be // used for executing queries. type ShardMapper interface { MapShards(sources influxql.Sources, opt *influxql.SelectOptions) (IteratorCreator, error) } // LocalShardMapper implements a ShardMapper for local shards. type LocalShardMapper struct { MetaClient interface { ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error) } TSDBStore interface { ShardGroup(ids []uint64) tsdb.ShardGroup } } // MapShards maps the sources to the appropriate shards into an IteratorCreator. func (e *LocalShardMapper) MapShards(sources influxql.Sources, opt *influxql.SelectOptions) (IteratorCreator, error) { a := &LocalShardMapping{ ShardMap: make(map[Source]tsdb.ShardGroup), } if err := e.mapShards(a, sources, opt); err != nil { return nil, err } return a, nil } func (e *LocalShardMapper) mapShards(a *LocalShardMapping, sources influxql.Sources, opt *influxql.SelectOptions) error { for _, s := range sources { switch s := s.(type) { case *influxql.Measurement: source := Source{ Database: s.Database, RetentionPolicy: s.RetentionPolicy, } // Retrieve the list of shards for this database. This list of // shards is always the same regardless of which measurement we are // using. if _, ok := a.ShardMap[source]; !ok { groups, err := e.MetaClient.ShardGroupsByTimeRange(s.Database, s.RetentionPolicy, opt.MinTime, opt.MaxTime) if err != nil { return err } if len(groups) == 0 { a.ShardMap[source] = nil continue } shardIDs := make([]uint64, 0, len(groups[0].Shards)*len(groups)) for _, g := range groups { for _, si := range g.Shards { shardIDs = append(shardIDs, si.ID) } } a.ShardMap[source] = e.TSDBStore.ShardGroup(shardIDs) } case *influxql.SubQuery: if err := e.mapShards(a, s.Statement.Sources, opt); err != nil { return err } } } return nil } // ShardMapper maps data sources to a list of shard information. type LocalShardMapping struct { ShardMap map[Source]tsdb.ShardGroup } func (a *LocalShardMapping) FieldDimensions(m *influxql.Measurement) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) { source := Source{ Database: m.Database, RetentionPolicy: m.RetentionPolicy, } sg := a.ShardMap[source] if sg == nil { return } fields = make(map[string]influxql.DataType) dimensions = make(map[string]struct{}) var measurements []string if m.Regex != nil { measurements = sg.MeasurementsByRegex(m.Regex.Val) } else { measurements = []string{m.Name} } f, d, err := sg.FieldDimensions(measurements) if err != nil { return nil, nil, err } for k, typ := range f { fields[k] = typ } for k := range d { dimensions[k] = struct{}{} } return } func (a *LocalShardMapping) MapType(m *influxql.Measurement, field string) influxql.DataType { source := Source{ Database: m.Database, RetentionPolicy: m.RetentionPolicy, } sg := a.ShardMap[source] if sg == nil { return influxql.Unknown } var names []string if m.Regex != nil { names = sg.MeasurementsByRegex(m.Regex.Val) } else { names = []string{m.Name} } var typ influxql.DataType for _, name := range names { t := sg.MapType(name, field) if typ.LessThan(t) { typ = t } } return typ } func (a *LocalShardMapping) CreateIterator(m *influxql.Measurement, opt influxql.IteratorOptions) (influxql.Iterator, error) { source := Source{ Database: m.Database, RetentionPolicy: m.RetentionPolicy, } sg := a.ShardMap[source] if sg == nil { return nil, nil } if m.Regex != nil { measurements := sg.MeasurementsByRegex(m.Regex.Val) inputs := make([]influxql.Iterator, 0, len(measurements)) if err := func() error { for _, measurement := range measurements { input, err := sg.CreateIterator(measurement, opt) if err != nil { return err } inputs = append(inputs, input) } return nil }(); err != nil { influxql.Iterators(inputs).Close() return nil, err } return influxql.Iterators(inputs).Merge(opt) } return sg.CreateIterator(m.Name, opt) } // Close does nothing for a LocalShardMapping. func (a *LocalShardMapping) Close() error { return nil } // Source contains the database and retention policy source for data. type Source struct { Database string RetentionPolicy string }