@@ -4,8 +4,10 @@ import (
44 "context"
55 "encoding/json"
66 "fmt"
7+ "sort"
78 "strconv"
89 "strings"
10+ "sync"
911 "time"
1012
1113 "github.com/NeedleInAJayStack/haystack"
@@ -164,14 +166,21 @@ func (datasource *Datasource) query(ctx context.Context, pCtx backend.PluginCont
164166 return response
165167
166168 case "hisReadFilter" :
167- points , readErr := datasource .read (model .HisReadFilter , variables )
169+ pointsGrid , readErr := datasource .read (model .HisReadFilter + " and hisStart" , variables )
168170 if readErr != nil {
169171 log .DefaultLogger .Error (readErr .Error ())
170172 return backend .ErrDataResponse (backend .StatusBadRequest , fmt .Sprintf ("HisReadFilter failure: %v" , readErr .Error ()))
171173 }
174+ points := pointsGrid .Rows ()
175+ recLimit := 300
176+ if len (points ) > recLimit {
177+ errMsg := fmt .Sprintf ("Query exceeded record limit of %d: %d records" , recLimit , len (points ))
178+ log .DefaultLogger .Error (errMsg )
179+ return backend .ErrDataResponse (backend .StatusBadRequest , errMsg )
180+ }
172181
173- grids := []haystack. Grid {}
174- for _ , point := range points . Rows ( ) {
182+ // Function to read a single point and send it to a channel.
183+ readPoint := func ( point haystack. Row , hisReadChannel chan haystack. Grid , wg * sync. WaitGroup ) {
175184 id := point .Get ("id" )
176185 var ref haystack.Ref
177186 switch id .(type ) {
@@ -180,15 +189,38 @@ func (datasource *Datasource) query(ctx context.Context, pCtx backend.PluginCont
180189 default :
181190 errMsg := fmt .Sprintf ("id is not a ref: %v" , id )
182191 log .DefaultLogger .Error (errMsg )
183- return backend . ErrDataResponse ( backend . StatusBadRequest , errMsg )
192+ hisReadChannel <- haystack . EmptyGrid ( )
184193 }
185194 hisRead , err := datasource .hisRead (ref , query .TimeRange )
186195 if err != nil {
187196 log .DefaultLogger .Error (err .Error ())
188- return backend . ErrDataResponse ( backend . StatusBadRequest , fmt . Sprintf ( "HisReadFilter failure: %v" , err . Error ()) )
197+ hisReadChannel <- haystack . EmptyGrid ( )
189198 }
190- grids = append (grids , hisRead )
199+ hisReadChannel <- hisRead
200+ wg .Done ()
191201 }
202+
203+ // Start a goroutine to collect all the grids into a slice.
204+ hisReadChannel := make (chan haystack.Grid )
205+ combinedChannel := make (chan []haystack.Grid )
206+ go func () {
207+ grids := []haystack.Grid {}
208+ for grid := range hisReadChannel {
209+ grids = append (grids , grid )
210+ }
211+ combinedChannel <- grids
212+ }()
213+
214+ // Read all the points in parallel using goroutines.
215+ var wg sync.WaitGroup
216+ wg .Add (len (points ))
217+ for _ , point := range points {
218+ go readPoint (point , hisReadChannel , & wg )
219+ }
220+ wg .Wait ()
221+ close (hisReadChannel )
222+
223+ grids := <- combinedChannel
192224 response := responseFromGrids (grids )
193225 // Make the display name on the "val" fields the names of the points.
194226 for _ , frame := range response .Frames {
@@ -213,19 +245,25 @@ func (datasource *Datasource) query(ctx context.Context, pCtx backend.PluginCont
213245 }
214246}
215247
248+ // Creates a response from the input grids. The frames in the result are sorted by display name.
216249func responseFromGrids (grids []haystack.Grid ) backend.DataResponse {
217- var response backend. DataResponse
250+ frames := data. Frames {}
218251 for _ , grid := range grids {
219252 frame , frameErr := dataFrameFromGrid (grid )
220253 if frameErr != nil {
221254 log .DefaultLogger .Error (frameErr .Error ())
222255 return backend .ErrDataResponse (backend .StatusBadRequest , fmt .Sprintf ("Frame conversion failure: %v" , frameErr .Error ()))
223256 }
224-
225- // add the frames to the response.
226- response .Frames = append (response .Frames , frame )
227- response .Status = backend .StatusOK
257+ frames = append (frames , frame )
228258 }
259+
260+ sort .Slice (frames , func (i , j int ) bool {
261+ return frames [i ].Name < frames [j ].Name
262+ })
263+
264+ var response backend.DataResponse
265+ response .Frames = frames
266+ response .Status = backend .StatusOK
229267 return response
230268}
231269
0 commit comments