@@ -47,11 +47,11 @@ type PushService interface {
4747 PushBatch (ctx context.Context , req * distirbutormodel.PushRequest ) error
4848}
4949
50- func NewOTLPIngestHandler (cfg server.Config , svc PushService , l log.Logger , me bool ) Handler {
50+ func NewOTLPIngestHandler (cfg server.Config , svc PushService , l log.Logger , multitenancyEnabled bool ) Handler {
5151 h := & ingestHandler {
5252 svc : svc ,
5353 log : l ,
54- multitenancyEnabled : me ,
54+ multitenancyEnabled : multitenancyEnabled ,
5555 }
5656
5757 grpcServer := newGrpcServer (cfg )
@@ -108,9 +108,47 @@ func (h *ingestHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
108108 h .handler .ServeHTTP (w , r )
109109}
110110
111+ // extractTenantIDFromHTTPRequest extracts the tenant ID from HTTP request headers.
112+ // If multitenancy is disabled, it injects the default tenant ID.
113+ // Returns a context with the tenant ID injected.
114+ func (h * ingestHandler ) extractTenantIDFromHTTPRequest (r * http.Request ) (context.Context , error ) {
115+ if ! h .multitenancyEnabled {
116+ return user .InjectOrgID (r .Context (), tenant .DefaultTenantID ), nil
117+ }
118+
119+ _ , ctx , err := user .ExtractOrgIDFromHTTPRequest (r )
120+ if err != nil {
121+ return nil , err
122+ }
123+ return ctx , nil
124+ }
125+
126+ // extractTenantIDFromGRPCRequest extracts the tenant ID from a gRPC request context.
127+ // If multitenancy is disabled, it injects the default tenant ID.
128+ // Returns a context with the tenant ID injected.
129+ func (h * ingestHandler ) extractTenantIDFromGRPCRequest (ctx context.Context ) (context.Context , error ) {
130+ // TODO: ideally should be merged with function above
131+ if ! h .multitenancyEnabled {
132+ return user .InjectOrgID (ctx , tenant .DefaultTenantID ), nil
133+ }
134+
135+ _ , ctx , err := user .ExtractFromGRPCRequest (ctx )
136+ if err != nil {
137+ return nil , err
138+ }
139+ return ctx , nil
140+ }
141+
111142func (h * ingestHandler ) handleHTTPRequest (w http.ResponseWriter , r * http.Request ) {
112143 defer r .Body .Close ()
113144
145+ ctx , err := h .extractTenantIDFromHTTPRequest (r )
146+ if err != nil {
147+ level .Error (h .log ).Log ("msg" , "failed to extract tenant ID from HTTP request" , "err" , err )
148+ http .Error (w , "Failed to extract tenant ID from HTTP request" , http .StatusUnauthorized )
149+ return
150+ }
151+
114152 // Read the request body - we need to read it all for protobuf unmarshaling
115153 // Note: Protobuf wire format requires reading the entire message to determine field boundaries
116154 body , err := io .ReadAll (r .Body )
@@ -137,10 +175,8 @@ func (h *ingestHandler) handleHTTPRequest(w http.ResponseWriter, r *http.Request
137175 }
138176 }
139177
140- ctx := r .Context ()
141- // Process the request using the existing Export method
142- // Injects multitenancy info into context if needed
143- resp , err := h .Export (ctx , req )
178+ // Process the request using the existing export method
179+ resp , err := h .export (ctx , req )
144180 if err != nil {
145181 level .Error (h .log ).Log ("msg" , "failed to process profiles" , "err" , err )
146182 // Convert gRPC status to HTTP status
@@ -178,20 +214,23 @@ func (h *ingestHandler) handleHTTPRequest(w http.ResponseWriter, r *http.Request
178214 }
179215}
180216
217+ // Export is the gRPC handler for the ProfilesService Export RPC.
218+ // Extracts tenant ID from gRPC request metadata before processing.
181219func (h * ingestHandler ) Export (ctx context.Context , er * pprofileotlp.ExportProfilesServiceRequest ) (* pprofileotlp.ExportProfilesServiceResponse , error ) {
182- // TODO: @petethepig This logic is copied from util.AuthenticateUser and should be refactored into a common function
183- // Extracts user ID from the request metadata and returns and injects the user ID in the context
184- if ! h .multitenancyEnabled {
185- ctx = user .InjectOrgID (ctx , tenant .DefaultTenantID )
186- } else {
187- var err error
188- _ , ctx , err = user .ExtractFromGRPCRequest (ctx )
189- if err != nil {
190- level .Error (h .log ).Log ("msg" , "failed to extract tenant ID from gRPC request" , "err" , err )
191- return & pprofileotlp.ExportProfilesServiceResponse {}, fmt .Errorf ("failed to extract tenant ID from GRPC request: %w" , err )
192- }
220+ // Extract tenant ID from gRPC request
221+ ctx , err := h .extractTenantIDFromGRPCRequest (ctx )
222+ if err != nil {
223+ level .Error (h .log ).Log ("msg" , "failed to extract tenant ID from gRPC request" , "err" , err )
224+ return & pprofileotlp.ExportProfilesServiceResponse {}, fmt .Errorf ("failed to extract tenant ID from gRPC request: %w" , err )
193225 }
194226
227+ return h .export (ctx , er )
228+ }
229+
230+ // export is the common implementation for processing OTLP profile export requests.
231+ // The context must already have the tenant ID injected before calling this method.
232+ func (h * ingestHandler ) export (ctx context.Context , er * pprofileotlp.ExportProfilesServiceRequest ) (* pprofileotlp.ExportProfilesServiceResponse , error ) {
233+
195234 dc := er .Dictionary
196235 if dc == nil {
197236 return & pprofileotlp.ExportProfilesServiceResponse {}, status .Errorf (codes .InvalidArgument , "missing profile metadata dictionary" )
0 commit comments