@@ -69,6 +69,8 @@ const (
6969
7070 // the path pattern to search for specific artifacts in the debug zip directory
7171 zippedProfilePattern = "nodes/*/*.pprof"
72+ zippedCPUProfilePattern = "nodes/*/cpuprof/*.pprof"
73+ zippedHeapProfilePattern = "nodes/*/heapprof/*.pprof"
7274 zippedLogsPattern = "nodes/*/logs/*"
7375 zippedNodeTableDumpsPattern = "nodes/*/*.txt"
7476
@@ -83,6 +85,7 @@ const (
8385 clusterTag = "cluster"
8486 ddTagsTag = "ddtags"
8587 tableTag = "table"
88+ fileNameTag = "file_name"
8689
8790 // datadog endpoint URLs
8891 datadogProfileUploadURLTmpl = "https://intake.profile.%s/api/v2/profile"
@@ -296,61 +299,147 @@ func validateZipUploadReadiness() error {
296299 return nil
297300}
298301
// profilePathInfo is the unit of work handed to the profile upload workers
// over the upload channel: one profile file together with the node it was
// collected from.
type profilePathInfo struct {
	// nodeID is the ID of the node that produced the profile; filepath is the
	// location of the profile file on disk.
	nodeID, filepath string
}
309+
299310func uploadZipProfiles (ctx context.Context , uploadID string , debugDirPath string ) error {
300- paths , err := expandPatterns ([]string {path .Join (debugDirPath , zippedProfilePattern )})
311+
312+ paths , err := expandPatterns ([]string {
313+ path .Join (debugDirPath , zippedProfilePattern ),
314+ path .Join (debugDirPath , zippedCPUProfilePattern ),
315+ path .Join (debugDirPath , zippedHeapProfilePattern )})
316+
301317 if err != nil {
302318 return err
303319 }
304320
321+ if len (paths ) == 0 {
322+ return nil
323+ }
324+
325+ var (
326+ noOfWorkers = min (debugZipUploadOpts .maxConcurrentUploads , len (paths ))
327+ uploadChan = make (chan profilePathInfo , noOfWorkers * 2 ) // 2x the number of workers to keep them busy
328+ uploadWG = sync.WaitGroup {}
329+ profileUploadState struct {
330+ syncutil.Mutex
331+ isSingleUploadSucceeded bool
332+ }
333+ // regex to match the profile directories. This is used to extract the node ID.
334+ reProfileDirectories = regexp .MustCompile (`.*(heapprof|cpuprof).*\.pprof$` )
335+ )
336+
337+ markSuccessOnce := sync .OnceFunc (func () {
338+ profileUploadState .isSingleUploadSucceeded = true
339+ })
340+
305341 pathsByNode := make (map [string ][]string )
342+ maxProfilesOfNode := 0
306343 for _ , path := range paths {
307- nodeID := filepath .Base (filepath .Dir (path ))
344+ // extract the node ID from the zippedProfilePattern. If it does not match the
345+ // nodeID (integer) then we assume the path is from zippedCPUProfilePattern
346+ // and zippedHeapProfilePattern and try to extract the node ID from the suffix.
347+ var nodeID = ""
348+ if reProfileDirectories .MatchString (path ) {
349+ nodeID = filepath .Base (filepath .Dir (filepath .Dir (path )))
350+ } else {
351+ nodeID = filepath .Base (filepath .Dir (path ))
352+ }
353+
308354 if _ , ok := pathsByNode [nodeID ]; ! ok {
309355 pathsByNode [nodeID ] = []string {}
310356 }
311357
312358 pathsByNode [nodeID ] = append (pathsByNode [nodeID ], path )
359+ maxProfilesOfNode = max (maxProfilesOfNode , len (pathsByNode [nodeID ]))
313360 }
314361
315- retryOpts := base .DefaultRetryOptions ()
316- retryOpts .MaxRetries = zipUploadRetries
317- var req * http.Request
318- for nodeID , paths := range pathsByNode {
319- for retry := retry .Start (retryOpts ); retry .Next (); {
320- req , err = newProfileUploadReq (
321- ctx , paths , appendUserTags (
322- append (
323- defaultDDTags , makeDDTag (nodeIDTag , nodeID ), makeDDTag (uploadIDTag , uploadID ),
324- makeDDTag (clusterTag , debugZipUploadOpts .clusterName ),
325- ), // system generated tags
326- debugZipUploadOpts .tags ... , // user provided tags
327- ),
328- )
329- if err != nil {
330- continue
331- }
362+ // start the upload pool
363+ noOfWorkers = min (noOfWorkers , maxProfilesOfNode )
364+ for i := 0 ; i < noOfWorkers ; i ++ {
365+ go func () {
366+ for pathInfo := range uploadChan {
367+ profilePath := pathInfo .filepath
368+ nodeID := pathInfo .nodeID
332369
333- if _ , err = doUploadReq (req ); err == nil {
334- break
370+ func () {
371+ defer uploadWG .Done ()
372+ fileName , err := uploadProfile (profilePath , ctx , nodeID , uploadID )
373+ if err != nil {
374+ fmt .Fprintf (os .Stderr , "failed to upload profile %s of node %s: %s\n " , fileName , nodeID , err )
375+ return
376+ }
377+ markSuccessOnce ()
378+ }()
335379 }
336- }
380+ }()
381+ }
337382
338- if err != nil {
339- return fmt .Errorf ("failed to upload profiles of node %s: %w" , nodeID , err )
383+ for nodeID , paths := range pathsByNode {
384+ for _ , path := range paths {
385+ uploadWG .Add (1 )
386+ uploadChan <- profilePathInfo {nodeID : nodeID , filepath : path }
340387 }
341388
342- fmt .Fprintf (os .Stderr , "Uploaded profiles of node %s to datadog (%s)\n " , nodeID , strings .Join (paths , ", " ))
343- fmt .Fprintf (os .Stderr , "Explore the profiles on datadog: " +
344- "https://%s/profiling/explorer?query=%s:%s\n " , ddSiteToHostMap [debugZipUploadOpts .ddSite ],
345- uploadIDTag , uploadID ,
346- )
389+ uploadWG .Wait ()
390+ fmt .Fprintf (os .Stderr , "Uploaded profiles of node %s to datadog\n " , nodeID )
391+ }
392+
393+ uploadWG .Wait ()
394+ close (uploadChan )
395+
396+ if ! profileUploadState .isSingleUploadSucceeded {
397+ return errors .Newf ("failed to upload profiles to Datadog" )
347398 }
348399
400+ toUnixTimestamp := getCurrentTime ().UnixMilli ()
401+ //create timestamp for T-30 days.
402+ fromUnixTimestamp := toUnixTimestamp - (30 * 24 * 60 * 60 * 1000 )
403+
404+ fmt .Fprintf (os .Stderr , "Explore the profiles on datadog: " +
405+ "https://%s/profiling/explorer?query=%s:%s&viz=stream&from_ts=%d&to_ts=%d&live=false\n " , ddSiteToHostMap [debugZipUploadOpts .ddSite ],
406+ uploadIDTag , uploadID , fromUnixTimestamp , toUnixTimestamp ,
407+ )
408+
349409 return nil
350410}
351411
412+ func uploadProfile (
413+ profilePath string , ctx context.Context , nodeID string , uploadID string ,
414+ ) (string , error ) {
415+ fileName := filepath .Base (profilePath )
416+
417+ req , err := newProfileUploadReq (
418+ ctx , profilePath , appendUserTags (
419+ append (
420+ defaultDDTags , makeDDTag (nodeIDTag , nodeID ), makeDDTag (uploadIDTag , uploadID ),
421+ makeDDTag (clusterTag , debugZipUploadOpts .clusterName ), makeDDTag (fileNameTag , fileName ),
422+ ), // system generated tags
423+ debugZipUploadOpts .tags ... , // user provided tags
424+ ),
425+ )
426+
427+ retryOpts := base .DefaultRetryOptions ()
428+ retryOpts .MaxRetries = zipUploadRetries
429+ for retry := retry .Start (retryOpts ); retry .Next (); {
430+ if err != nil {
431+ continue
432+ }
433+
434+ if _ , err = doUploadReq (req ); err == nil {
435+ break
436+ }
437+ }
438+ return fileName , err
439+ }
440+
352441func newProfileUploadReq (
353- ctx context.Context , profilePaths [] string , tags []string ,
442+ ctx context.Context , profilePath string , tags []string ,
354443) (* http.Request , error ) {
355444 var (
356445 body bytes.Buffer
@@ -370,26 +459,36 @@ func newProfileUploadReq(
370459 }
371460 )
372461
373- for _ , profilePath := range profilePaths {
374- fileName := filepath .Base (profilePath )
375- event .Attachments = append (event .Attachments , fileName )
462+ fileName := filepath .Base (profilePath )
376463
377- f , err := mw .CreateFormFile (fileName , fileName )
378- if err != nil {
379- return nil , err
380- }
464+ // Datadog only accepts CPU and heap profiles with filename as "cpu.pprof" or "heap.pprof".
465+ // The cpu profile files has "cpu" in the filename prefix and heap profile files
466+ // has "memprof/heap" in the filename prefix. Hence we are renaming the files accordingly
467+ // so that Datadog can recognize and accept them correctly.
468+ if strings .HasPrefix (fileName , "cpu" ) {
469+ fileName = "cpu.pprof"
470+ } else {
471+ // If the file is not a CPU profile, we assume it is a heap/memory profile.
472+ fileName = "heap.pprof"
473+ }
381474
382- data , err := os .ReadFile (profilePath )
383- if err != nil {
384- return nil , err
385- }
475+ event .Attachments = append (event .Attachments , fileName )
386476
387- if _ , err := f .Write (data ); err != nil {
388- return nil , err
389- }
477+ f , err := mw .CreateFormFile (fileName , fileName )
478+ if err != nil {
479+ return nil , err
480+ }
481+
482+ data , err := os .ReadFile (profilePath )
483+ if err != nil {
484+ return nil , err
485+ }
486+
487+ if _ , err := f .Write (data ); err != nil {
488+ return nil , err
390489 }
391490
392- f , err : = mw .CreatePart (textproto.MIMEHeader {
491+ f , err = mw .CreatePart (textproto.MIMEHeader {
393492 httputil .ContentDispositionHeader : []string {`form-data; name="event"; filename="event.json"` },
394493 httputil .ContentTypeHeader : []string {httputil .JSONContentType },
395494 })
0 commit comments