21 changes: 12 additions & 9 deletions coco.yml
@@ -1,5 +1,5 @@
env:
ES_ENDPOINT: https://localhost:9200
ES_ENDPOINT: http://192.168.3.119:9200
ES_USERNAME: admin
ES_PASSWORD: $[[keystore.ES_PASSWORD]]
WEB_BINDING: 0.0.0.0:9000
@@ -311,7 +311,7 @@ web:
##background jobs
pipeline:
- name: enrich_documents
auto_start: false
auto_start: true
keep_running: true
processor:
- consumer:
@@ -323,21 +323,23 @@ pipeline:
group: enriched_documents
fetch_max_messages: 10
processor:
- document_summarization:
model: $[[env.ENRICHMENT_MODEL]]
input_queue: "indexing_documents"
min_input_document_length: 500
- read_file_content: {}
- extract_file_text: {}
# - document_summarization:
# model_provider: deepseek
# model: deepseek-chat
- document_embedding:
model_provider: qianwen
model: text-embedding-v4
output_queue:
name: "enriched_documents"
label:
tag: "enriched"

- name: merge_documents
auto_start: true
keep_running: true
processor:
- indexing_merge:
input_queue: "indexing_documents"
input_queue: "enriched_documents"
idle_timeout_in_seconds: 1
elasticsearch: "prod"
index_name: "coco_document-v2"
@@ -362,6 +364,7 @@ pipeline:
queues:
type: indexing_merge
tag: "merged"

- name: connector_dispatcher
auto_start: true
keep_running: true
6 changes: 4 additions & 2 deletions core/document.go
@@ -41,8 +41,10 @@ type Document struct {
Title string `json:"title,omitempty" elastic_mapping:"title:{type:text,copy_to:combined_fulltext,fields:{keyword: {type: keyword}, pinyin: {type: text, analyzer: pinyin_analyzer}}}"` // Document title
Summary string `json:"summary,omitempty" elastic_mapping:"summary:{type:text,copy_to:combined_fulltext}"` // Brief summary or description of the document

Lang string `json:"lang,omitempty" elastic_mapping:"lang:{type:keyword,copy_to:combined_fulltext}"` // Language code (e.g., "en", "fr")
Content string `json:"content,omitempty" elastic_mapping:"content:{type:text,copy_to:combined_fulltext}"` // Document content for full-text indexing
Lang string `json:"lang,omitempty" elastic_mapping:"lang:{type:keyword,copy_to:combined_fulltext}"` // Language code (e.g., "en", "fr")
Content string `json:"content,omitempty" elastic_mapping:"content:{type:text,copy_to:combined_fulltext}"` // Document content for full-text indexing
Text string `json:"text,omitempty" elastic_mapping:"text:{type:text,copy_to:combined_fulltext}"` // Plain-text content extracted from the document, for full-text indexing
Embedding []float64 `json:"embedding,omitempty" elastic_mapping:"embedding:{type:knn_dense_float_vector,knn:{dims:1024}}"`

Icon string `json:"icon,omitempty" elastic_mapping:"icon:{enabled:false}"` // Icon Key, need work with datasource's assets to get the icon url, if it is a full url, then use it directly
Thumbnail string `json:"thumbnail,omitempty" elastic_mapping:"thumbnail:{enabled:false}"` // Thumbnail image URL, for preview purposes
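Reviewer note: a minimal sketch of how the two new fields serialize, assuming `core.Document` and `util.MustToJSONBytes` resolve as used elsewhere in this PR; the values are placeholders.

```go
package main

import (
	"fmt"

	"infini.sh/coco/core"
	"infini.sh/framework/core/util"
)

func main() {
	doc := core.Document{
		Title: "report.pdf",
		Text:  "plain text produced by the extract_file_text processor", // new field, copied into combined_fulltext
	}
	// The knn mapping above declares dims:1024, so the vector must hold exactly 1024 values.
	doc.Embedding = make([]float64, 1024)
	fmt.Println(string(util.MustToJSONBytes(doc))) // emits the new "text" and "embedding" keys
}
```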
7 changes: 5 additions & 2 deletions modules/assistant/langchain/llm.go
@@ -5,15 +5,16 @@
package langchain

import (
"net/http"
"net/http/httputil"

log "github.com/cihub/seelog"
"github.com/tmc/langchaingo/llms"
"github.com/tmc/langchaingo/llms/ollama"
"github.com/tmc/langchaingo/llms/openai"
"infini.sh/coco/core"
"infini.sh/coco/modules/common"
"infini.sh/framework/core/global"
"net/http"
"net/http/httputil"
)

type LoggingRoundTripper struct {
@@ -61,12 +62,14 @@ func GetLLM(endpoint, apiType, model, token string, keepalive string) llms.Model
openai.WithToken(token),
openai.WithBaseURL(endpoint),
openai.WithModel(model),
openai.WithEmbeddingModel(model),
)
} else {
llm, err = openai.New(
openai.WithToken(token),
openai.WithBaseURL(endpoint),
openai.WithModel(model),
openai.WithEmbeddingModel(model),
)
}

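Reviewer note: the added `WithEmbeddingModel` option is what makes embedding calls use the configured model; langchaingo routes `CreateEmbedding` through the embedding model, not the chat model. A minimal sketch with a placeholder endpoint and token:

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/tmc/langchaingo/llms/openai"
)

func main() {
	llm, err := openai.New(
		openai.WithToken("sk-placeholder"),               // hypothetical credentials
		openai.WithBaseURL("https://api.example.com/v1"), // hypothetical OpenAI-compatible endpoint
		openai.WithModel("text-embedding-v4"),
		openai.WithEmbeddingModel("text-embedding-v4"), // without this, the library falls back to its default embedding model
	)
	if err != nil {
		log.Fatal(err)
	}
	// *openai.LLM implements embeddings.EmbedderClient through this method,
	// which is exactly what the new document_embedding processor asserts.
	vectors, err := llm.CreateEmbedding(context.Background(), []string{"hello world"})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("dims:", len(vectors[0]))
}
```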
7 changes: 4 additions & 3 deletions plugins/connectors/local_fs/plugin.go
@@ -154,9 +154,10 @@ func (p *Plugin) saveDocument(ctx *pipeline.Context, currentPath, basePath strin
Source: core.DataSourceReference{ID: datasource.ID, Type: "connector", Name: datasource.Name},
Type: connectors.TypeFile,
Category: filepath.Dir(currentPath),
Content: "", // skip content
URL: currentPath,
Size: int(fileInfo.Size()),
// skip content here; it will be populated by the `read_file_content` processor
Content: "",
URL: currentPath,
Size: int(fileInfo.Size()),
}
doc.System = datasource.System
if doc.System == nil {
182 changes: 182 additions & 0 deletions plugins/processors/embedding/processor.go
@@ -0,0 +1,182 @@
/* Copyright © INFINI LTD. All rights reserved.
* Web: https://infinilabs.com
* Email: hello#infini.ltd */

package embedding

import (
"context"
"fmt"
"time"

log "github.com/cihub/seelog"
"github.com/tmc/langchaingo/embeddings"
"github.com/tmc/langchaingo/textsplitter"
"infini.sh/coco/core"
"infini.sh/coco/modules/assistant/langchain"
"infini.sh/coco/modules/common"
"infini.sh/framework/core/config"
"infini.sh/framework/core/errors"
"infini.sh/framework/core/global"
"infini.sh/framework/core/param"
"infini.sh/framework/core/pipeline"
"infini.sh/framework/core/queue"
"infini.sh/framework/core/util"
)

const ProcessorName = "document_embedding"

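// Config holds the document_embedding processor options as declared in the pipeline definition.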
type Config struct {
MessageField param.ParaKey `config:"message_field"`
OutputQueue *queue.QueueConfig `config:"output_queue"`
ModelProviderID string `config:"model_provider"`
ModelName string `config:"model"`
MinInputDocumentLength int `config:"min_input_document_length"`
MaxInputDocumentLength int `config:"max_input_document_length"`
}

type DocumentEmbeddingProcessor struct {
config *Config
outputQueue *queue.QueueConfig
}

func init() {
pipeline.RegisterProcessorPlugin(ProcessorName, New)
}

func New(c *config.Config) (pipeline.Processor, error) {
cfg := Config{MessageField: core.PipelineContextDocuments, MinInputDocumentLength: 10, MaxInputDocumentLength: 100000}

if err := c.Unpack(&cfg); err != nil {
log.Error(err)
return nil, fmt.Errorf("failed to unpack the configuration of %s processor: %s", ProcessorName, err)
}

if cfg.MessageField == "" {
cfg.MessageField = "messages"
}

if cfg.ModelProviderID == "" {
panic(errors.New("model_provider can't be empty"))
}
if cfg.ModelName == "" {
panic(errors.New("model can't be empty"))
}

processor := DocumentEmbeddingProcessor{config: &cfg}

if cfg.OutputQueue != nil {
processor.outputQueue = queue.SmartGetOrInitConfig(cfg.OutputQueue)
}

return &processor, nil
}

func (processor *DocumentEmbeddingProcessor) Name() string {
return ProcessorName
}

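// Process consumes the queue.Message batch from the pipeline context, embeds
// each document's Text (truncated to its first chunk), and, when an output
// queue is configured, pushes the enriched document onward.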
func (processor *DocumentEmbeddingProcessor) Process(ctx *pipeline.Context) error {
log.Trace("DocumentEmbeddingProcessor.Process() invoked")
obj := ctx.Get(processor.config.MessageField)

if obj == nil {
log.Warnf("processor [%v] receives an empty pipeline context", processor.Name())
return nil
}

messages, ok := obj.([]queue.Message)
if !ok {
log.Warnf("processor [%v] received an unexpected payload type in field [%v]", processor.Name(), processor.config.MessageField)
return nil
}
if global.Env().IsDebug {
log.Tracef("get %v messages from context", len(messages))
}

if len(messages) == 0 {
return nil
}

provider, err := common.GetModelProvider(processor.config.ModelProviderID)
if err != nil {
log.Error("failed to get model provider:", err)
return err
}

llm := langchain.GetLLM(provider.BaseURL, provider.APIType, processor.config.ModelName, provider.APIKey, "")
c := context.Background()

// Check if the LLM client supports embeddings
embedder, ok := llm.(embeddings.EmbedderClient)
if !ok {
log.Errorf("Model [%s/%s] does not support embeddings", processor.config.ModelProviderID, processor.config.ModelName)
return nil
}

for i := range messages {
message := &messages[i]
pop := message.Data

doc := core.Document{}
err := util.FromJSONBytes(pop, &doc)
if err != nil {
log.Error("error on handle document:", i, err)
continue
}

// Skip documents whose text is too short; overly long text is handled below by truncation
if len(doc.Text) < processor.config.MinInputDocumentLength {
log.Debugf("skipping document %s: text length %d < min %d", doc.ID, len(doc.Text), processor.config.MinInputDocumentLength)
continue
}
log.Info("start embedding doc: ", doc.ID, ",", doc.Title)
start := time.Now()

// Strategy: Truncation (One Document -> One Vector)
// We use the splitter to safely get the first chunk that fits the model's limit.
chunks, err := splitText(doc.Text)
if err != nil {
log.Errorf("failed to split text for doc %s: %v", doc.ID, err)
continue
}

if len(chunks) > 0 {
// Only use the first chunk to generate the embedding
textToEmbed := chunks[0]
if len(chunks) > 1 {
log.Warnf("doc %s is too long (%d chunks), truncating to first chunk for embedding", doc.ID, len(chunks))
}

vectors, err := embedder.CreateEmbedding(c, []string{textToEmbed}) // renamed from "embeddings" to avoid shadowing the imported package
if err != nil {
log.Errorf("failed to generate embeddings for doc %s: %v", doc.ID, err)
continue
}

if len(vectors) > 0 {
// Convert []float32 to []float64 to match Document.Embedding
embedding64 := make([]float64, 0, len(vectors[0]))
for _, v := range vectors[0] {
embedding64 = append(embedding64, float64(v))
}
doc.Embedding = embedding64
}
}

// Push the original document (with embedding) to the output queue
message.Data = util.MustToJSONBytes(doc)
if processor.outputQueue != nil {
if err := queue.Push(processor.outputQueue, message.Data); err != nil {
log.Errorf("failed to push document to [%s]'s output queue: %v", processor.Name(), err)
}
}
log.Infof("finished embedding doc %s, elapsed: %v", doc.ID, util.Since(start))
}

return nil
}

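// splitText chunks text so that the first chunk fits the embedding model's
// input window; Process deliberately embeds only chunk 0 (truncation strategy).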
func splitText(text string) ([]string, error) {
splitter := textsplitter.NewRecursiveCharacter()
splitter.ChunkSize = 8000 // character budget, a rough proxy for the model's 8192-token input limit; tokens are not characters, so tune per tokenizer
splitter.ChunkOverlap = 0 // No overlap needed for truncation
return splitter.SplitText(text)
}
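Reviewer note: for wiring this processor up outside coco.yml, a minimal sketch of its options using the `config` tags from `Config` above; the provider and model are placeholders that must match a model provider registered in Coco.

```yaml
processor:
  - document_embedding:
      model_provider: qianwen          # resolved via common.GetModelProvider
      model: text-embedding-v4
      min_input_document_length: 10    # documents with shorter text are skipped
      max_input_document_length: 100000
      output_queue:
        name: "enriched_documents"
        label:
          tag: "enriched"
```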