#!/usr/bin/env python3
"""Stdio MCP-style server exposing LightRAG (query/insert/health) over line-delimited JSON."""
import asyncio
import json
import os
import sys
from functools import partial

import numpy as np

from lightrag import LightRAG, QueryParam
from lightrag.llm.ollama import ollama_model_complete, ollama_embed
from lightrag.utils import setup_logger, EmbeddingFunc

setup_logger("lightrag", level="INFO")
class LightRAGMCPServer:
    """JSON message server wrapping a lazily-initialized LightRAG instance.

    The LightRAG engine (Ollama LLM + custom Ollama embedding function +
    optional Ollama-based reranker) is built from environment variables on
    the first request.  Messages are dicts with a ``method`` key:
    ``"query"``, ``"insert"`` or ``"health"``.
    """

    def __init__(self):
        self.rag = None           # LightRAG instance, created by initialize()
        self.initialized = False  # guards lazy one-time initialization

    async def initialize(self):
        """Build the LightRAG instance from environment variables and init storages.

        Reads DATA_DIR, OLLAMA_BASE_URL, OLLAMA_NUM_CTX, LLM_TIMEOUT,
        EMBEDDING_TIMEOUT, EMBEDDING_DIM, MAX_EMBED_TOKENS, LLM_MODEL,
        EMBEDDING_MODEL, RERANKER_MODEL, GRAPH_STORE and VECTOR_STORE.
        """
        working_dir = os.getenv("DATA_DIR", "./rag_storage")
        os.makedirs(working_dir, exist_ok=True)

        ollama_host = os.getenv("OLLAMA_BASE_URL", "http://127.0.0.1:11434")
        num_ctx = int(os.getenv("OLLAMA_NUM_CTX", "8192"))
        timeout = int(os.getenv("LLM_TIMEOUT", "600"))
        # NOTE(review): embed_timeout is read but never wired into the
        # embedding calls below — confirm whether it should be.
        embed_timeout = int(os.getenv("EMBEDDING_TIMEOUT", "120"))
        embedding_dim = int(os.getenv("EMBEDDING_DIM", "1024"))
        max_embed_tokens = int(os.getenv("MAX_EMBED_TOKENS", "8192"))
        llm_model = os.getenv("LLM_MODEL", "my-qwen3-coder:latest")
        embed_model = os.getenv("EMBEDDING_MODEL", "bge-m3:latest")
        reranker_model = os.getenv("RERANKER_MODEL", "dengcao/Qwen3-Reranker-4B:Q5_K_M")

        print(f"Initializing LightRAG: ctx={num_ctx}, timeout={timeout}s, embed_dim={embedding_dim}",
              file=sys.stderr, flush=True)

        # Custom embedding function: calls Ollama directly so that NaN/Inf
        # embeddings and transient server errors degrade to zero vectors
        # instead of aborting the whole ingestion.
        async def _embedding_impl(texts: list[str]) -> np.ndarray:
            import ollama
            import numpy as np

            # Use an explicit client so OLLAMA_BASE_URL is honored (the
            # module-level ollama.embeddings() always hits the default host).
            client = ollama.Client(host=ollama_host)
            embeddings = []
            for text in texts:
                cleaned = text.strip()
                if not cleaned:
                    # Empty input: deterministic zero vector.
                    embeddings.append([0.0] * embedding_dim)
                    continue
                try:
                    response = client.embeddings(model=embed_model, prompt=cleaned)
                    emb = response['embedding']
                    if any(np.isnan(emb)) or any(np.isinf(emb)):
                        print("⚠️ NaN in embedding, using zero vector", file=sys.stderr, flush=True)
                        embeddings.append([0.0] * embedding_dim)
                    else:
                        embeddings.append(emb)
                except Exception as e:
                    # Best effort: tolerate known transient failures
                    # (NaN payloads, HTTP 500); re-raise everything else.
                    if "nan" in str(e).lower() or "500" in str(e):
                        print(f"⚠️ Ollama error, using zero vector: {e}", file=sys.stderr, flush=True)
                        embeddings.append([0.0] * embedding_dim)
                    else:
                        raise
            return np.array(embeddings, dtype=np.float32)

        embedding_func = EmbeddingFunc(
            embedding_dim=embedding_dim,
            max_token_size=max_embed_tokens,
            func=_embedding_impl,
        )

        async def rerank_model_func(query: str, documents: list[str]) -> list[float]:
            """Score each document's relevance to *query* in [0, 1] via an Ollama LLM."""
            import ollama

            # BUGFIX: ollama.generate() takes no `host` kwarg — passing it
            # raised TypeError and forced every score to the 0.5 fallback.
            # The host belongs on the Client.
            client = ollama.Client(host=ollama_host)
            scores = []
            for doc in documents:
                prompt = f"Query: {query}\nDocument: {doc}\nRelevance score (0-1):"
                try:
                    response = client.generate(
                        model=reranker_model,
                        prompt=prompt,
                        options={"temperature": 0, "num_predict": 5},
                    )
                    # Parse a number out of the model's free-text reply.
                    score_text = response['response'].strip()
                    score = float(''.join(c for c in score_text if c.isdigit() or c == '.'))
                    score = max(0.0, min(1.0, score))  # clamp to [0, 1]
                except Exception:
                    # Unparseable reply or request failure: neutral score.
                    score = 0.5
                scores.append(score)
            return scores

        self.rag = LightRAG(
            working_dir=working_dir,
            llm_model_func=ollama_model_complete,
            llm_model_name=llm_model,
            llm_model_kwargs={
                "host": ollama_host,
                # Russian-language entity-extraction system prompt
                # (instructs the model to keep spaces in entity names and
                # to emit valid JSON only).  Kept verbatim — it is runtime data.
                "system_prompt": (
                    "Ты — эксперт по извлечению сущностей из текстов на русском языке. "
                    "Извлекай сущности и отношения строго в формате JSON. "
                    "КРИТИЧЕСКИ ВАЖНО: Сохраняй пробелы между словами в названиях сущностей. "
                    "Пиши 'сухое голодание', а не 'сухоеголодание'. "
                    "Пиши 'время года', а не 'времягода'. "
                    "Не используй слитное написание, camelCase, snake_case или нижние подчёркивания. "
                    "Соблюдай естественное русское написание с пробелами. "
                    "Без <think>, без пояснений, без markdown. Только валидный JSON."
                ),
                "options": {
                    "num_ctx": num_ctx,
                    "temperature": 0.1,
                    "top_p": 0.8,
                    "top_k": 20,
                    "num_predict": 4096,
                    # NOTE(review): "num_think" is not a documented Ollama
                    # option — confirm the target model actually honors it.
                    "num_think": 0,
                    "stop": ["</answer>", "```", "\n\n", "<|end|>"],
                },
                "timeout": timeout,
            },
            llm_model_max_async=1,
            embedding_func=embedding_func,
            embedding_func_max_async=2,
            chunk_token_size=1000,
            chunk_overlap_token_size=80,
            entity_extract_max_gleaning=1,
            rerank_model_func=rerank_model_func if reranker_model else None,
            graph_storage=os.getenv("GRAPH_STORE", "Neo4JStorage"),
            vector_storage=os.getenv("VECTOR_STORE", "FaissVectorDBStorage"),
        )
        await self.rag.initialize_storages()
        self.initialized = True
        print("LightRAG MCP Server initialized successfully", file=sys.stderr)

    async def process_message(self, message: dict) -> dict:
        """Dispatch one request dict and return a JSON-serializable response dict.

        Supported methods: "query" (RAG query), "insert" (ingest a document),
        "health" (liveness probe).  Errors are reported as {"error": ...}.
        """
        if not self.initialized:
            await self.initialize()
        method = message.get("method")
        params = message.get("params", {})
        try:
            if method == "query":
                result = await self.rag.aquery(
                    params.get("query", ""),
                    param=QueryParam(
                        mode=params.get("mode", "hybrid"),
                        stream=False,
                        only_need_context=params.get("only_context", False),
                        # Rerank only when a reranker was configured.
                        enable_rerank=bool(self.rag.rerank_model_func),
                    ),
                )
                return {"result": result}
            elif method == "insert":
                text = params.get("text", "")
                if not text:
                    return {"error": "No text provided"}
                await self.rag.ainsert(text)
                return {"result": "Document inserted successfully"}
            elif method == "health":
                return {"status": "ok", "initialized": self.initialized}
            else:
                return {"error": f"Unknown method: {method}"}
        except Exception as e:
            # Top-level boundary: surface the failure to the client as JSON.
            return {"error": str(e)}
async def main():
    """Serve line-delimited JSON requests from stdin, one JSON response per line on stdout.

    Malformed JSON and unexpected failures are reported as {"error": ...}
    responses so the client side never hangs waiting for output.
    """
    server = LightRAGMCPServer()
    # NOTE: iterating sys.stdin blocks the event loop between requests;
    # acceptable for a single-client stdio transport.
    for line in sys.stdin:
        try:
            line = line.strip()
            if not line:
                continue  # skip blank keep-alive lines
            message = json.loads(line)
            response = await server.process_message(message)
            print(json.dumps(response), flush=True)
        except json.JSONDecodeError as e:
            print(json.dumps({"error": f"Invalid JSON: {e}"}), flush=True)
        except Exception as e:
            print(json.dumps({"error": f"Unexpected error: {e}"}), flush=True)
if __name__ == "__main__":
    # Script entry point: run the stdio server until stdin closes.
    asyncio.run(main())