"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "pkg/tsdb/cloudwatch/time_series_query.go" between
grafana-8.0.0.tar.gz and grafana-8.0.1.tar.gz

About: Grafana is a visualization tool for monitoring, metric analytics and dashboards for Graphite, InfluxDB, Prometheus and many more.

time_series_query.go  (grafana-8.0.0):time_series_query.go  (grafana-8.0.1)
package cloudwatch package cloudwatch
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/infra/log"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
) )
type responseWrapper struct {
DataResponse *backend.DataResponse
RefId string
}
func (e *cloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, req *ba ckend.QueryDataRequest) (*backend.QueryDataResponse, error) { func (e *cloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, req *ba ckend.QueryDataRequest) (*backend.QueryDataResponse, error) {
plog.Debug("Executing time series query") plog.Debug("Executing time series query")
resp := backend.NewQueryDataResponse() resp := backend.NewQueryDataResponse()
for _, q := range req.Queries { if len(req.Queries) == 0 {
startTime := q.TimeRange.From return nil, fmt.Errorf("request contains no queries")
endTime := q.TimeRange.To }
if !startTime.Before(endTime) {
return nil, fmt.Errorf("invalid time range: start time mu
st be before end time")
}
requestQueriesByRegion, err := e.parseQueries(req, startTime, end
Time)
if err != nil {
return nil, err
}
if len(requestQueriesByRegion) == 0 {
return backend.NewQueryDataResponse(), nil
}
resultChan := make(chan *backend.DataResponse, len(req.Queries))
eg, ectx := errgroup.WithContext(ctx)
for r, q := range requestQueriesByRegion {
requestQueries := q
region := r
eg.Go(func() error {
defer func() {
if err := recover(); err != nil {
plog.Error("Execute Get Metric Da
ta Query Panic", "error", err, "stack", log.Stack(1))
if theErr, ok := err.(error); ok
{
resultChan <- &backend.Da
taResponse{
Error: theErr,
}
}
}
}()
client, err := e.getCWClient(region, req.PluginCo // startTime and endTime are always the same for all queries
ntext) startTime := req.Queries[0].TimeRange.From
if err != nil { endTime := req.Queries[0].TimeRange.To
return err if !startTime.Before(endTime) {
} return nil, fmt.Errorf("invalid time range: start time must be be
fore end time")
}
queries, err := e.transformRequestQueriesToCloudW requestQueriesByRegion, err := e.parseQueries(req.Queries, startTime, end
atchQueries(requestQueries) Time)
if err != nil { if err != nil {
for _, query := range requestQueries { return nil, err
resultChan <- &backend.DataRespon }
se{
Frames: data.Frames{data.
NewFrame(query.RefId)},
Error: err,
}
}
return nil
}
metricDataInput, err := e.buildMetricDataInput(st if len(requestQueriesByRegion) == 0 {
artTime, endTime, queries) return backend.NewQueryDataResponse(), nil
if err != nil { }
return err
}
cloudwatchResponses := make([]*cloudwatchResponse resultChan := make(chan *responseWrapper, len(req.Queries))
, 0) eg, ectx := errgroup.WithContext(ctx)
mdo, err := e.executeRequest(ectx, client, metric for r, q := range requestQueriesByRegion {
DataInput) requestQueries := q
if err != nil { region := r
for _, query := range requestQueries { eg.Go(func() error {
resultChan <- &backend.DataRespon defer func() {
se{ if err := recover(); err != nil {
Frames: data.Frames{data. plog.Error("Execute Get Metric Data Query
NewFrame(query.RefId)}, Panic", "error", err, "stack", log.Stack(1))
Error: err, if theErr, ok := err.(error); ok {
resultChan <- &responseWrapper{
DataResponse: &backend.Da
taResponse{
Error: theErr,
},
} }
} }
return nil
} }
}()
responses, err := e.parseResponse(mdo, queries) client, err := e.getCWClient(region, req.PluginContext)
if err != nil { if err != nil {
for _, query := range requestQueries { return err
resultChan <- &backend.DataRespon }
se{
Frames: data.Frames{data. queries, err := e.transformRequestQueriesToCloudWatchQuer
NewFrame(query.RefId)}, ies(requestQueries)
Error: err, if err != nil {
} return err
} }
return nil
} metricDataInput, err := e.buildMetricDataInput(startTime,
endTime, queries)
if err != nil {
return err
}
mdo, err := e.executeRequest(ectx, client, metricDataInpu
t)
if err != nil {
return err
}
responses, err := e.parseResponse(mdo, queries)
if err != nil {
return err
}
res, err := e.transformQueryResponsesToQueryResult(respon
ses, requestQueries, startTime, endTime)
if err != nil {
return err
}
for refID, queryRes := range res {
resultChan <- &responseWrapper{
DataResponse: queryRes,
RefId: refID,
}
}
return nil
})
}
cloudwatchResponses = append(cloudwatchResponses, if err := eg.Wait(); err != nil {
responses...) return nil, err
res, err := e.transformQueryResponsesToQueryResul }
t(cloudwatchResponses, requestQueries, startTime, endTime) close(resultChan)
if err != nil {
for _, query := range requestQueries {
resultChan <- &backend.DataRespon
se{
Frames: data.Frames{data.
NewFrame(query.RefId)},
Error: err,
}
}
return nil
}
for _, queryRes := range res { for result := range resultChan {
resultChan <- queryRes resp.Responses[result.RefId] = *result.DataResponse
}
return nil
})
}
if err := eg.Wait(); err != nil {
return nil, err
}
close(resultChan)
for result := range resultChan {
resp.Responses[q.RefID] = *result
}
} }
return resp, nil return resp, nil
} }
 End of changes. 13 change blocks. 
116 lines changed or deleted 89 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)