Skip to content

Commit ab49927

Browse files
committed
fix status bug
1 parent 035468c commit ab49927

File tree

4 files changed

+85
-100
lines changed

4 files changed

+85
-100
lines changed

internal/job/embedding.go

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -38,18 +38,6 @@ type fileProcessResult struct {
3838
path string
3939
}
4040

41-
// extractFileOperations 从同步元数据中提取文件操作类型
42-
func extractFileOperations(metadata *types.SyncMetadata) map[string]string {
43-
operations := make(map[string]string)
44-
45-
// 遍历FileList,该字段存储了文件路径到操作类型的映射
46-
for filePath, operation := range metadata.FileList {
47-
operations[filePath] = operation
48-
}
49-
50-
return operations
51-
}
52-
5341
func (t *embeddingProcessor) Process(ctx context.Context) error {
5442
tracer.WithTrace(ctx).Infof("start to execute embedding task, codebase: %s RequestId %s", t.params.CodebaseName, t.params.RequestId)
5543
start := time.Now()

internal/job/index_task.go

Lines changed: 7 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package job
33
import (
44
"context"
55
"fmt"
6-
"sync"
76
"time"
87

98
"github.com/go-redsync/redsync/v4"
@@ -30,7 +29,7 @@ type IndexTaskParams struct {
3029
TotalFiles int // 文件总数
3130
}
3231

33-
func (i *IndexTask) Run(ctx context.Context) (embedTaskOk bool, graphTaskOk bool) {
32+
func (i *IndexTask) Run(ctx context.Context) (embedTaskOk bool) {
3433
start := time.Now()
3534
tracer.WithTrace(ctx).Infof("index task started")
3635

@@ -40,64 +39,20 @@ func (i *IndexTask) Run(ctx context.Context) (embedTaskOk bool, graphTaskOk bool
4039
tracer.WithTrace(ctx).Errorf("index task unlock failed, key %s, err:%v", i.LockMux.Name(), err)
4140
}
4241
}()
43-
44-
var wg sync.WaitGroup
45-
wg.Add(2) // 两个待等待的任务
46-
47-
var embedErr error
48-
4942
// 启动嵌入任务
50-
go func() {
51-
defer wg.Done()
52-
embedErr = i.buildEmbedding(ctx)
53-
if embedErr != nil {
54-
tracer.WithTrace(ctx).Errorf("embedding task failed:%v", embedErr)
55-
}
56-
}()
57-
58-
// 等待两个任务完成
59-
wg.Wait()
43+
embedErr := i.buildEmbedding(ctx)
44+
if embedErr != nil {
45+
tracer.WithTrace(ctx).Errorf("embedding task failed:%v", embedErr)
46+
}
6047

6148
embedTaskOk = embedErr == nil
6249

63-
tracer.WithTrace(ctx).Infof("index task end, cost %d ms. embedding ok? %t, graph ok? %t",
64-
time.Since(start).Milliseconds(), embedTaskOk, graphTaskOk)
50+
tracer.WithTrace(ctx).Infof("index task end, cost %d ms. embedding ok? %t",
51+
time.Since(start).Milliseconds(), embedTaskOk)
6552
return
6653
}
6754

6855
func (i *IndexTask) buildEmbedding(ctx context.Context) error {
69-
70-
// for filePath, operation := range i.Params.Metadata.FileList {
71-
// tracer.WithTrace(ctx).Infof("------------------------------------, %s :%s ms.", filePath, operation)
72-
// }
73-
74-
fileOperations := make(map[string]string)
75-
if i.Params.Metadata != nil {
76-
fileOperations = extractFileOperations(i.Params.Metadata)
77-
}
78-
79-
// tracer.WithTrace(ctx).Infof("------------------------------------, %v ms.", fileOperations)
80-
81-
// 状态修改为处理中
82-
i.SvcCtx.StatusManager.UpdateFileStatus(ctx, i.Params.RequestId,
83-
func(status *types.FileStatusResponseData) {
84-
status.Process = "processing"
85-
status.TotalProgress = 0
86-
var fileStatusItems []types.FileStatusItem
87-
88-
for path, _ := range i.Params.Files {
89-
fileStatusItem := types.FileStatusItem{
90-
Path: path, // 使用当前处理的文件路径,而不是codebasePath
91-
Status: "processing",
92-
Operate: fileOperations[path],
93-
}
94-
fileStatusItems = append(fileStatusItems, fileStatusItem)
95-
}
96-
97-
status.FileList = fileStatusItems
98-
99-
})
100-
10156
start := time.Now()
10257

10358
// 添加日志来跟踪参数

internal/logic/embedding_task.go

Lines changed: 73 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,17 @@ import (
2626
"github.com/zgsm-ai/codebase-indexer/internal/svc"
2727
)
2828

29+
func extractFileOperations(metadata *types.SyncMetadata) map[string]string {
30+
operations := make(map[string]string)
31+
32+
// 遍历FileList,该字段存储了文件路径到操作类型的映射
33+
for filePath, operation := range metadata.FileList {
34+
operations[filePath] = operation
35+
}
36+
37+
return operations
38+
}
39+
2940
type TaskLogic struct {
3041
logx.Logger
3142
ctx context.Context
@@ -130,6 +141,39 @@ func (l *TaskLogic) SubmitTask(req *types.IndexTaskRequest, r *http.Request) (re
130141
l.Logger.Infof("任务分类统计 - 添加: %d, 删除: %d, 修改: %d",
131142
len(addTasks), len(deleteTasks), len(modifyTasks))
132143

144+
/////////////////////////////////////////////////////////////
145+
//初始化任务状态
146+
/////////////////////////////////////////////
147+
148+
fileOperations := make(map[string]string)
149+
if metadata != nil {
150+
fileOperations = extractFileOperations(metadata)
151+
}
152+
153+
// 状态修改为处理中
154+
l.svcCtx.StatusManager.UpdateFileStatus(ctx, req.RequestId,
155+
func(status *types.FileStatusResponseData) {
156+
status.Process = "processing"
157+
status.TotalProgress = 0
158+
var fileStatusItems []types.FileStatusItem
159+
160+
for path, _ := range files {
161+
fileStatusItem := types.FileStatusItem{
162+
Path: path, // 使用当前处理的文件路径,而不是codebasePath
163+
Status: "processing",
164+
Operate: fileOperations[path],
165+
}
166+
fileStatusItems = append(fileStatusItems, fileStatusItem)
167+
}
168+
169+
status.FileList = fileStatusItems
170+
l.Logger.Infof("初始化状态: - RequestId: %s , %v", req.RequestId, status.FileList)
171+
})
172+
173+
/////////////////////////////////////////////////////////////
174+
//执行任务
175+
/////////////////////////////////////////////
176+
133177
// 如果有删除任务,从向量数据库中删除对应的文件
134178
if len(deleteTasks) > 0 {
135179
l.Logger.Infof("开始从向量数据库中删除 %d 个文件", len(deleteTasks))
@@ -152,25 +196,32 @@ func (l *TaskLogic) SubmitTask(req *types.IndexTaskRequest, r *http.Request) (re
152196
}
153197
l.Logger.Infof("更新代码库信息成功 - RequestId: %s", req.RequestId)
154198

155-
// 生成请求ID
156-
requestId := l.generateRequestId(req.RequestId, clientId, codebase.Name)
157-
l.Logger.Infof("生成请求ID - RequestId: %s, 新RequestId: %s", req.RequestId, requestId)
158-
159199
// 提交索引任务
160-
l.Logger.Infof("开始提交索引任务 - RequestId: %s", req.RequestId)
161-
if err := l.submitIndexTask(ctx, codebase, clientId, requestId, mux, files, metadata); err != nil {
162-
l.Logger.Errorf("提交索引任务失败 - RequestId: %s, 错误: %v", req.RequestId, err)
163-
return nil, err
164-
}
165-
l.Logger.Infof("提交索引任务成功 - RequestId: %s", req.RequestId)
200+
l.Logger.Infof("开始提交索引任务 - RequestId: %s, 文件数量: %d", req.RequestId, len(files))
201+
202+
// 检查文件处理个数是否为0,如果为0则标识完成状态,不提交submitIndexTask任务
203+
if len(files) == 0 {
204+
l.Logger.Infof("文件处理个数为0,直接标识完成状态 - RequestId: %s", req.RequestId)
205+
206+
l.svcCtx.StatusManager.UpdateFileStatus(ctx, req.RequestId, func(status *types.FileStatusResponseData) {
207+
status.Process = "processing"
208+
status.TotalProgress = 100
209+
210+
for i, _ := range status.FileList {
211+
status.FileList[i].Status = "completed"
212+
}
213+
214+
})
166215

167-
// 初始化文件处理状态
168-
l.Logger.Infof("开始初始化文件处理状态 - RequestId: %s", req.RequestId)
169-
if err := l.initializeFileStatus(ctx, req.RequestId); err != nil {
170-
l.Logger.Errorf("初始化文件处理状态失败 - RequestId: %s, 错误: %v", req.RequestId, err)
171-
// 不返回错误,继续处理
216+
l.Logger.Infof("初始化文件处理状态为完成成功 - RequestId: %s", req.RequestId)
217+
} else {
218+
// 文件数量大于0,正常提交索引任务
219+
if err := l.submitIndexTask(ctx, codebase, clientId, req.RequestId, mux, files, metadata); err != nil {
220+
l.Logger.Errorf("提交索引任务失败 - RequestId: %s, 错误: %v", req.RequestId, err)
221+
return nil, err
222+
}
223+
l.Logger.Infof("提交索引任务成功 - RequestId: %s", req.RequestId)
172224
}
173-
l.Logger.Infof("初始化文件处理状态成功 - RequestId: %s", req.RequestId)
174225

175226
return &types.IndexTaskResponseData{TaskId: req.RequestId}, nil
176227
}
@@ -469,14 +520,6 @@ func (l *TaskLogic) updateCodebaseInfo(codebase *model.Codebase, fileCount int,
469520
return nil
470521
}
471522

472-
// generateRequestId 生成请求ID
473-
func (l *TaskLogic) generateRequestId(requestId, clientId, codebaseName string) string {
474-
if requestId == "" {
475-
return fmt.Sprintf("%s-%s-%d", clientId, codebaseName, time.Now().Unix())
476-
}
477-
return requestId
478-
}
479-
480523
// submitIndexTask 提交索引任务
481524
func (l *TaskLogic) submitIndexTask(ctx context.Context, codebase *model.Codebase, clientId, requestId string, mux *redsync.Mutex, files map[string][]byte, metadata *types.SyncMetadata) error {
482525
startTime := time.Now()
@@ -497,6 +540,12 @@ func (l *TaskLogic) submitIndexTask(ctx context.Context, codebase *model.Codebas
497540
},
498541
}
499542

543+
runningTasks := l.svcCtx.TaskPool.Running()
544+
taskCapacity := l.svcCtx.TaskPool.Cap()
545+
l.Logger.Infof("任务池状态 - RequestId: %s, 正在运行任务: %d, 任务容量: %d", requestId, runningTasks, taskCapacity)
546+
547+
// 使用任务池提交任务
548+
500549
l.Logger.Infof("开始提交任务到任务池 - RequestId: %s, 超时时间: %v", requestId, l.svcCtx.Config.IndexTask.GraphTask.Timeout)
501550
err := l.svcCtx.TaskPool.Submit(func() {
502551
taskStartTime := time.Now()
@@ -523,18 +572,6 @@ func (l *TaskLogic) submitIndexTask(ctx context.Context, codebase *model.Codebas
523572
return nil
524573
}
525574

526-
// initializeFileStatus 初始化文件处理状态
527-
func (l *TaskLogic) initializeFileStatus(ctx context.Context, requestId string) error {
528-
initialStatus := &types.FileStatusResponseData{
529-
Process: "pending",
530-
TotalProgress: 0,
531-
FileList: []types.FileStatusItem{},
532-
}
533-
534-
// 使用RequestId作为键存储状态
535-
return l.svcCtx.StatusManager.SetFileStatusByRequestId(ctx, requestId, initialStatus)
536-
}
537-
538575
func (l *TaskLogic) initCodebaseIfNotExists(clientId, clientPath, userUid, codebaseName string) (*model.Codebase, error) {
539576
var codebase *model.Codebase
540577
var err error

internal/store/vector/embedder.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,8 @@ func (e *customEmbedder) EmbedCodeChunks(ctx context.Context, chunks []*types.Co
136136
status.Process = "processing"
137137
status.TotalProgress = progress
138138

139+
tracer.WithTrace(ctx).Infof(" requestid %s ,finish before %v", e.requestId, status.FileList)
140+
139141
// 将这批文件的状态添加到FileList中
140142
for _, filePath := range completedFiles {
141143
// 检查文件是否已在FileList中
@@ -157,6 +159,9 @@ func (e *customEmbedder) EmbedCodeChunks(ctx context.Context, chunks []*types.Co
157159
})
158160
}
159161
}
162+
163+
tracer.WithTrace(ctx).Infof(" requestid %s ,finish %v", e.requestId, status.FileList)
164+
160165
})
161166
if err != nil {
162167
tracer.WithTrace(ctx).Errorf("failed to update progress: %v", err)

0 commit comments

Comments
 (0)