feat(crumbforest): wire up docs, missions, and history indexer
This commit is contained in:
134
app/services/history_indexer.py
Normal file
134
app/services/history_indexer.py
Normal file
@@ -0,0 +1,134 @@
|
||||
# app/services/history_indexer.py
|
||||
"""
|
||||
History Indexer Service
|
||||
Indexes chat history from .jsonl logs into Qdrant for semantic search.
|
||||
"""
|
||||
import json
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from typing import List, Dict, Any, Optional
|
||||
from datetime import datetime
|
||||
|
||||
from pymysql import Connection
|
||||
from qdrant_client import QdrantClient
|
||||
from qdrant_client.http import models
|
||||
|
||||
from lib.embedding_providers.base import BaseProvider
|
||||
from services.rag_service import RAGService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class HistoryIndexer:
    """Index chat history from line-delimited JSON (.jsonl) logs into Qdrant.

    Each log line is expected to be a JSON object containing at least
    'question' and 'answer' keys; a "Q: ...\\nA: ..." text is embedded via
    the configured embedding provider and upserted into a Qdrant collection.
    """

    def __init__(
        self,
        db_conn: "Connection",
        qdrant_client: "QdrantClient",
        embedding_provider: "BaseProvider",
        collection_name: str = "chat_history"
    ):
        """
        Args:
            db_conn: Open MySQL connection (not used by indexing yet;
                kept for parity with sibling services — TODO confirm).
            qdrant_client: Client for the target Qdrant instance.
            embedding_provider: Provider exposing ``get_embeddings()`` and
                a ``dimension`` attribute used for the vector size.
            collection_name: Qdrant collection to write points into.
        """
        self.db_conn = db_conn
        self.qdrant = qdrant_client
        self.embedding_provider = embedding_provider
        self.collection_name = collection_name
        # NOTE(review): hard-coded log path — consider injecting via config.
        self.log_path = Path("/var/log/crumbforest/chat_history.jsonl")

    def ensure_collection(self):
        """Create the Qdrant collection if it does not already exist."""
        collections = self.qdrant.get_collections()
        exists = any(c.name == self.collection_name for c in collections.collections)

        if not exists:
            logger.info(f"Creating collection {self.collection_name}")
            self.qdrant.create_collection(
                collection_name=self.collection_name,
                vectors_config=models.VectorParams(
                    size=self.embedding_provider.dimension,
                    distance=models.Distance.COSINE
                )
            )

    def parse_line(self, line: str) -> Optional[Dict[str, Any]]:
        """Parse one log line; return None for malformed or non-object JSON."""
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            return None
        # Valid JSON that is not an object (e.g. a bare number) would break
        # the downstream `'question' in entry` membership test — reject it.
        return entry if isinstance(entry, dict) else None

    def index_history(self, batch_size: int = 50) -> Dict[str, int]:
        """Read the log file and index Q/A entries into Qdrant.

        Ideally this should track progress (e.g. last read line) to avoid
        re-indexing. For V1 we naively read everything and rely on
        deterministic point IDs so re-runs upsert rather than duplicate.

        Args:
            batch_size: Number of entries to embed/upsert per Qdrant call.

        Returns:
            Dict with 'indexed' (points written) and 'errors' (count of
            unparseable lines, or 1 when the log file is missing).
        """
        # Hoisted out of the per-line loop, where the original re-ran
        # `import hashlib` on every iteration.
        import hashlib
        import uuid

        if not self.log_path.exists():
            logger.warning(f"Log file not found: {self.log_path}")
            return {"indexed": 0, "errors": 1}

        self.ensure_collection()

        indexed_count = 0
        errors = 0
        batch = []

        with open(self.log_path, 'r', encoding='utf-8') as f:
            for line in f:
                entry = self.parse_line(line)
                if not entry:
                    errors += 1
                    continue

                # We expect entry to have 'question', 'answer', 'role', 'timestamp'
                if 'question' not in entry or 'answer' not in entry:
                    continue

                text_content = f"Q: {entry.get('question')}\nA: {entry.get('answer')}"

                # Deterministic ID from the raw line so re-runs upsert the
                # same point. BUG FIX: Qdrant accepts only unsigned ints or
                # UUID-formatted strings as point IDs — a bare md5 hexdigest
                # string is rejected, so format the 128-bit digest as a UUID.
                line_hash = hashlib.md5(line.encode('utf-8')).hexdigest()
                point_id = str(uuid.UUID(hex=line_hash))

                batch.append({
                    "id": point_id,
                    "payload": entry,
                    "text": text_content
                })

                if len(batch) >= batch_size:
                    self._flush_batch(batch)
                    indexed_count += len(batch)
                    batch = []

        # Flush the trailing partial batch.
        if batch:
            self._flush_batch(batch)
            indexed_count += len(batch)

        return {"indexed": indexed_count, "errors": errors}

    def _flush_batch(self, batch: List[Dict[str, Any]]):
        """Embed and upsert one batch of prepared entries into Qdrant."""
        texts = [b["text"] for b in batch]
        embeddings = self.embedding_provider.get_embeddings(texts)

        points = [
            models.PointStruct(
                id=b["id"],
                vector=embedding,
                payload=b["payload"]
            )
            for b, embedding in zip(batch, embeddings)
        ]

        self.qdrant.upsert(
            collection_name=self.collection_name,
            points=points
        )
|
||||
Reference in New Issue
Block a user