@@ -19,8 +19,11 @@ import (
1919 promapi "github.com/prometheus/client_golang/api"
2020 promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
2121 "github.com/prometheus/common/model"
22+ "github.com/prometheus/prometheus/model/labels"
2223 "github.com/prometheus/prometheus/model/rulefmt"
2324 "github.com/prometheus/prometheus/prompb"
25+ "github.com/prometheus/prometheus/storage"
26+ "github.com/prometheus/prometheus/storage/remote"
2427 yaml "gopkg.in/yaml.v3"
2528
2629 "github.com/cortexproject/cortex/pkg/ruler"
@@ -153,6 +156,72 @@ func (c *Client) QueryRaw(query string) (*http.Response, []byte, error) {
153156 return c .query (addr )
154157}
155158
159+ // RemoteRead runs a remote read query.
160+ func (c * Client ) RemoteRead (matchers []* labels.Matcher , start , end time.Time , step time.Duration ) (* prompb.ReadResponse , error ) {
161+ startMs := start .UnixMilli ()
162+ endMs := end .UnixMilli ()
163+ stepMs := step .Milliseconds ()
164+
165+ q , err := remote .ToQuery (startMs , endMs , matchers , & storage.SelectHints {
166+ Step : stepMs ,
167+ Start : startMs ,
168+ End : endMs ,
169+ })
170+ if err != nil {
171+ return nil , err
172+ }
173+
174+ req := & prompb.ReadRequest {
175+ Queries : []* prompb.Query {q },
176+ AcceptedResponseTypes : []prompb.ReadRequest_ResponseType {prompb .ReadRequest_STREAMED_XOR_CHUNKS },
177+ }
178+
179+ data , err := proto .Marshal (req )
180+ if err != nil {
181+ return nil , err
182+ }
183+ compressed := snappy .Encode (nil , data )
184+
185+ // Call the remote read API endpoint with a timeout.
186+ httpReqCtx , cancel := context .WithTimeout (context .Background (), 5 * time .Second )
187+ defer cancel ()
188+
189+ httpReq , err := http .NewRequestWithContext (httpReqCtx , "POST" , "http://" + c .querierAddress + "/prometheus/api/v1/read" , bytes .NewReader (compressed ))
190+ if err != nil {
191+ return nil , err
192+ }
193+ httpReq .Header .Set ("X-Scope-OrgID" , "user-1" )
194+ httpReq .Header .Add ("Content-Encoding" , "snappy" )
195+ httpReq .Header .Add ("Accept-Encoding" , "snappy" )
196+ httpReq .Header .Set ("Content-Type" , "application/x-protobuf" )
197+ httpReq .Header .Set ("User-Agent" , "Prometheus/1.8.2" )
198+ httpReq .Header .Set ("X-Prometheus-Remote-Read-Version" , "0.1.0" )
199+
200+ httpResp , err := c .httpClient .Do (httpReq )
201+ if err != nil {
202+ return nil , err
203+ }
204+ if httpResp .StatusCode != http .StatusOK {
205+ return nil , fmt .Errorf ("unexpected status code %d" , httpResp .StatusCode )
206+ }
207+
208+ compressed , err = io .ReadAll (httpResp .Body )
209+ if err != nil {
210+ return nil , err
211+ }
212+
213+ uncompressed , err := snappy .Decode (nil , compressed )
214+ if err != nil {
215+ return nil , err
216+ }
217+
218+ var resp prompb.ReadResponse
219+ if err = proto .Unmarshal (uncompressed , & resp ); err != nil {
220+ return nil , err
221+ }
222+ return & resp , nil
223+ }
224+
156225func (c * Client ) query (addr string ) (* http.Response , []byte , error ) {
157226 ctx , cancel := context .WithTimeout (context .Background (), c .timeout )
158227 defer cancel ()
0 commit comments