Skip to content

Commit 3b38e4d

Browse files
committed
remove distributed lock
1 parent d29473e commit 3b38e4d

File tree

4 files changed

+6
-168
lines changed

4 files changed

+6
-168
lines changed

internal/job/cleaner.go

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"github.com/robfig/cron/v3"
99
"github.com/zeromicro/go-zero/core/logx"
1010
"github.com/zgsm-ai/codebase-indexer/internal/dao/model"
11-
redisstore "github.com/zgsm-ai/codebase-indexer/internal/store/redis"
1211
"github.com/zgsm-ai/codebase-indexer/internal/store/vector"
1312
"github.com/zgsm-ai/codebase-indexer/internal/svc"
1413
"github.com/zgsm-ai/codebase-indexer/internal/types"
@@ -38,25 +37,6 @@ func NewCleaner(ctx context.Context, svcCtx *svc.ServiceContext) (Job, error) {
3837
cr := cron.New() // create the default Cron instance (standard 5-field spec; second-level precision requires cron.WithSeconds())
3938
// 添加任务(参数:Cron 表达式, 要执行的函数)
4039
_, err := cr.AddFunc(svcCtx.Config.Cleaner.Cron, func() {
41-
// acquire lock
42-
mux, locked, err := svcCtx.DistLock.TryLock(ctx, cleanLockKey, lockTimeout)
43-
if err != nil {
44-
logx.Errorf("cleaner try lock error: %v", err)
45-
return
46-
}
47-
if !locked {
48-
logx.Infof("cleaner lock %s is already locked ,return", cleanLockKey)
49-
return
50-
}
51-
defer func(DistLock redisstore.DistributedLock, ctx context.Context, key string) {
52-
err := DistLock.Unlock(ctx, mux)
53-
if err != nil {
54-
logx.Errorf("cleaner unlock %s error: %v", cleanLockKey, err)
55-
}
56-
logx.Errorf("cleaner unlock successfully.")
57-
}(svcCtx.DistLock, ctx, cleanLockKey)
58-
59-
logx.Infof("cleaner get lock %s successfully, start.", cleanLockKey)
6040

6141
expireDays := time.Duration(svcCtx.Config.Cleaner.CodebaseExpireDays) * 24 * time.Hour
6242
expiredDate := time.Now().Add(-expireDays)

internal/job/index_task.go

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,14 @@ import (
55
"fmt"
66
"time"
77

8-
"github.com/go-redsync/redsync/v4"
98
"github.com/zgsm-ai/codebase-indexer/internal/svc"
109
"github.com/zgsm-ai/codebase-indexer/internal/tracer"
1110
"github.com/zgsm-ai/codebase-indexer/internal/types"
1211
)
1312

1413
type IndexTask struct {
15-
SvcCtx *svc.ServiceContext
16-
LockMux *redsync.Mutex
17-
Params *IndexTaskParams
14+
SvcCtx *svc.ServiceContext
15+
Params *IndexTaskParams
1816
}
1917

2018
type IndexTaskParams struct {
@@ -33,12 +31,6 @@ func (i *IndexTask) Run(ctx context.Context) (embedTaskOk bool) {
3331
start := time.Now()
3432
tracer.WithTrace(ctx).Infof("index task started")
3533

36-
// 解锁
37-
defer func() {
38-
if err := i.SvcCtx.DistLock.Unlock(ctx, i.LockMux); err != nil {
39-
tracer.WithTrace(ctx).Errorf("index task unlock failed, key %s, err:%v", i.LockMux.Name(), err)
40-
}
41-
}()
4234
// 启动嵌入任务
4335
embedErr := i.buildEmbedding(ctx)
4436
if embedErr != nil {

internal/logic/embedding_task.go

Lines changed: 4 additions & 131 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,8 @@ import (
1313
"os"
1414
"strings"
1515

16-
"github.com/go-redsync/redsync/v4"
1716
"github.com/zgsm-ai/codebase-indexer/internal/dao/model"
1817
"github.com/zgsm-ai/codebase-indexer/internal/job"
19-
"github.com/zgsm-ai/codebase-indexer/internal/parser"
2018
"github.com/zgsm-ai/codebase-indexer/internal/store/vector"
2119
"github.com/zgsm-ai/codebase-indexer/internal/tracer"
2220
"github.com/zgsm-ai/codebase-indexer/internal/types"
@@ -102,17 +100,6 @@ func (l *TaskLogic) SubmitTask(req *types.IndexTaskRequest, r *http.Request) (re
102100

103101
ctx := context.WithValue(l.ctx, tracer.Key, tracer.RequestTraceId(int(codebase.ID)))
104102

105-
// 获取分布式锁
106-
// mux, err := l.acquireTaskLock(ctx, codebase.ID)
107-
l.Logger.Infof("开始获取分布式锁 - RequestId: %s", req.RequestId)
108-
mux, err := l.acquireTaskLock(ctx, req.RequestId)
109-
if err != nil {
110-
l.Logger.Errorf("获取分布式锁失败 - RequestId: %s, 错误: %v", req.RequestId, err)
111-
return nil, err
112-
}
113-
l.Logger.Infof("获取分布式锁成功 - RequestId: %s", req.RequestId)
114-
defer l.svcCtx.DistLock.Unlock(ctx, mux)
115-
116103
// 处理上传的ZIP文件
117104
l.Logger.Infof("开始处理上传的ZIP文件 - RequestId: %s", req.RequestId)
118105
files, fileCount, metadata, err := l.processUploadedZipFile(r)
@@ -274,7 +261,7 @@ func (l *TaskLogic) SubmitTask(req *types.IndexTaskRequest, r *http.Request) (re
274261
l.Logger.Infof("初始化文件处理状态为完成成功 - RequestId: %s", req.RequestId)
275262
} else {
276263
// 文件数量大于0,正常提交索引任务
277-
if err := l.submitIndexTask(ctx, codebase, clientId, req.RequestId, mux, files, metadata); err != nil {
264+
if err := l.submitIndexTask(ctx, codebase, clientId, req.RequestId, files, metadata); err != nil {
278265
l.Logger.Errorf("提交索引任务失败 - RequestId: %s, 错误: %v", req.RequestId, err)
279266
return nil, err
280267
}
@@ -289,19 +276,6 @@ func (l *TaskLogic) validateUploadToken(uploadToken string) error {
289276
return nil
290277
}
291278

292-
// acquireTaskLock 获取任务锁
293-
func (l *TaskLogic) acquireTaskLock(ctx context.Context, codebaseID string) (*redsync.Mutex, error) {
294-
lockKey := fmt.Sprintf("codebase_embedder:task:%s", codebaseID)
295-
296-
mux, locked, err := l.svcCtx.DistLock.TryLock(ctx, lockKey, l.svcCtx.Config.IndexTask.LockTimeout)
297-
if err != nil || !locked {
298-
return nil, fmt.Errorf("failed to acquire lock %s to sumit index task, err:%w", lockKey, err)
299-
}
300-
301-
tracer.WithTrace(ctx).Infof("acquire lock %s successfully, start to submit index task.", lockKey)
302-
return mux, nil
303-
}
304-
305279
// processUploadedZipFile 处理上传的ZIP文件
306280
func (l *TaskLogic) processUploadedZipFile(r *http.Request) (map[string][]byte, int, *types.SyncMetadata, error) {
307281
// 解析multipart表单
@@ -641,13 +615,12 @@ func (l *TaskLogic) updateCodebaseInfo(codebase *model.Codebase, fileCount int,
641615
}
642616

643617
// submitIndexTask 提交索引任务
644-
func (l *TaskLogic) submitIndexTask(ctx context.Context, codebase *model.Codebase, clientId, requestId string, mux *redsync.Mutex, files map[string][]byte, metadata *types.SyncMetadata) error {
618+
func (l *TaskLogic) submitIndexTask(ctx context.Context, codebase *model.Codebase, clientId, requestId string, files map[string][]byte, metadata *types.SyncMetadata) error {
645619
startTime := time.Now()
646620
l.Logger.Infof("开始创建索引任务 - RequestId: %s, CodebaseId: %d, 文件数量: %d", requestId, codebase.ID, len(files))
647621

648622
task := &job.IndexTask{
649-
SvcCtx: l.svcCtx,
650-
LockMux: mux,
623+
SvcCtx: l.svcCtx,
651624
Params: &job.IndexTaskParams{
652625
ClientId: clientId,
653626
CodebaseID: codebase.ID,
@@ -811,112 +784,12 @@ func (l *TaskLogic) executeRenameTasks(ctx context.Context, codebase *model.Code
811784
func (l *TaskLogic) renameFileInVectorDB(ctx context.Context, codebase *model.Codebase, sourcePath, targetPath string) error {
812785
l.Logger.Infof("开始执行向量数据库中的文件重命名 - 源路径: %s, 目标路径: %s", sourcePath, targetPath)
813786

814-
// 由于向量存储接口没有直接的更新方法,我们需要:
815-
// 1. 删除源文件的向量
816-
// 2. 重新插入带有新路径的向量(如果文件内容在ZIP中存在)
817-
818787
// 将文件路径转换为Linux格式(正斜杠)
819788
sourceLinuxPath := strings.ReplaceAll(sourcePath, "\\", "/")
820789
targetLinuxPath := strings.ReplaceAll(targetPath, "\\", "/")
821790

822-
// 构建需要删除的 CodeChunk
823-
sourceChunk := &types.CodeChunk{
824-
CodebaseId: codebase.ID,
825-
CodebasePath: codebase.Path,
826-
FilePath: sourceLinuxPath,
827-
}
828-
829-
// 删除源文件的向量
830-
options := vector.Options{
831-
CodebasePath: codebase.Path,
832-
}
833-
err := l.svcCtx.VectorStore.DeleteCodeChunks(ctx, []*types.CodeChunk{sourceChunk}, options)
834-
if err != nil {
835-
l.Logger.Errorf("删除源文件向量失败 - 路径: %s, 错误: %v", sourceLinuxPath, err)
836-
return fmt.Errorf("failed to delete source file vectors: %w", err)
837-
}
838-
839-
l.Logger.Infof("成功删除源文件向量 - 路径: %s", sourceLinuxPath)
840-
841-
// 检查目标文件是否在上传的文件中存在
842-
if files, ok := l.getUploadedFiles(); ok {
843-
if targetContent, exists := files[targetLinuxPath]; exists {
844-
l.Logger.Infof("目标文件存在于上传的ZIP中,重新创建向量 - 路径: %s", targetLinuxPath)
845-
846-
// 为目标文件创建新的向量
847-
if err := l.createEmbeddingForFile(ctx, codebase, targetLinuxPath, targetContent); err != nil {
848-
l.Logger.Errorf("为目标文件创建向量失败 - 路径: %s, 错误: %v", targetLinuxPath, err)
849-
return fmt.Errorf("failed to create embedding for target file: %w", err)
850-
}
851-
852-
l.Logger.Infof("成功为目标文件创建向量 - 路径: %s", targetLinuxPath)
853-
} else {
854-
l.Logger.Infof("目标文件不在上传的ZIP中,仅删除源文件向量 - 路径: %s", targetLinuxPath)
855-
}
856-
}
857-
858-
return nil
859-
}
860-
861-
// getUploadedFiles 获取上传的文件内容
862-
func (l *TaskLogic) getUploadedFiles() (map[string][]byte, bool) {
863-
if len(l.uploadedFiles) > 0 {
864-
return l.uploadedFiles, true
865-
}
866-
return nil, false
867-
}
868-
869-
// createEmbeddingForFile 为文件创建向量
870-
func (l *TaskLogic) createEmbeddingForFile(ctx context.Context, codebase *model.Codebase, filePath string, content []byte) error {
871-
l.Logger.Infof("开始为文件创建向量 - 路径: %s, 大小: %d bytes", filePath, len(content))
872-
873-
// 创建源文件对象
874-
sourceFile := &types.SourceFile{
875-
CodebaseId: codebase.ID,
876-
CodebasePath: codebase.Path,
877-
CodebaseName: codebase.Name,
878-
Path: filePath,
879-
Content: content,
880-
}
881-
882-
// 使用代码分割器分割文件
883-
chunks, err := l.svcCtx.CodeSplitter.Split(sourceFile)
884-
if err != nil {
885-
l.Logger.Errorf("分割文件失败 - 路径: %s, 错误: %v", filePath, err)
886-
if parser.IsNotSupportedFileError(err) {
887-
l.Logger.Infof("文件类型不支持,跳过向量化 - 路径: %s", filePath)
888-
return nil
889-
}
890-
return fmt.Errorf("failed to split file: %w", err)
891-
}
892-
893-
if len(chunks) == 0 {
894-
l.Logger.Infof("文件分割后没有产生代码块,跳过向量化 - 路径: %s", filePath)
895-
return nil
896-
}
897-
898-
// 为每个代码块设置代码库信息
899-
for _, chunk := range chunks {
900-
chunk.CodebaseId = codebase.ID
901-
chunk.CodebasePath = codebase.Path
902-
chunk.CodebaseName = codebase.Name
903-
}
904-
905-
// 创建向量存储选项
906-
options := vector.Options{
907-
CodebaseId: codebase.ID,
908-
CodebasePath: codebase.Path,
909-
CodebaseName: codebase.Name,
910-
}
911-
912-
// 插入向量到数据库
913-
err = l.svcCtx.VectorStore.UpsertCodeChunks(ctx, chunks, options)
914-
if err != nil {
915-
l.Logger.Errorf("插入向量失败 - 路径: %s, 错误: %v", filePath, err)
916-
return fmt.Errorf("failed to upsert code chunks: %w", err)
917-
}
791+
l.svcCtx.VectorStore.UpdateCodeChunksDictionary(ctx, codebase.Path, sourceLinuxPath, targetLinuxPath)
918792

919-
l.Logger.Infof("成功为文件创建向量 - 路径: %s, 代码块数量: %d", filePath, len(chunks))
920793
return nil
921794
}
922795

internal/svc/service_context.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ type ServiceContext struct {
1919
Config config.Config
2020
db *gorm.DB
2121
Querier *query.Query
22-
DistLock redisstore.DistributedLock
2322
Embedder vector.Embedder
2423
VectorStore vector.Store
2524
CodeSplitter *embedding.CodeSplitter
@@ -76,11 +75,6 @@ func NewServiceContext(ctx context.Context, c config.Config) (*ServiceContext, e
7675
}
7776
svcCtx.redisClient = client
7877

79-
lock, err := redisstore.NewRedisDistributedLock(client)
80-
if err != nil {
81-
return nil, err
82-
}
83-
8478
embedder, err := vector.NewEmbedder(c.VectorStore.Embedder)
8579
if err != nil {
8680
return nil, err
@@ -108,7 +102,6 @@ func NewServiceContext(ctx context.Context, c config.Config) (*ServiceContext, e
108102

109103
svcCtx.Embedder = embedder
110104
svcCtx.CodeSplitter = splitter
111-
svcCtx.DistLock = lock
112105
// 状态管理器 - 使用配置中的默认过期时间
113106
svcCtx.StatusManager = redisstore.NewStatusManagerWithExpiration(client, c.Redis.DefaultExpiration)
114107

0 commit comments

Comments
 (0)