# Source: Crumb-Core-v.1/app/services/history_indexer.py (144 lines, 4.8 KiB, Python)
# app/services/history_indexer.py
"""
History Indexer Service
Indexes chat history from .jsonl logs into Qdrant for semantic search.
"""
from __future__ import annotations

import hashlib
import json
import logging
import uuid
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any, Optional

from pymysql import Connection
from qdrant_client import QdrantClient
from qdrant_client.http import models

from lib.embedding_providers.base import BaseProvider
from services.rag_service import RAGService
# Module-level logger, named after the module per the stdlib logging convention.
logger = logging.getLogger(__name__)
class HistoryIndexer:
    """
    Indexes chat history from line-delimited JSON files into Qdrant.

    Reads ChatLogger output (.jsonl) from a fixed log path, embeds each
    question/answer pair via the configured embedding provider, and upserts
    the vectors into a Qdrant collection for semantic search.
    """

    def __init__(
        self,
        db_conn: Connection,
        qdrant_client: QdrantClient,
        embedding_provider: BaseProvider,
        collection_name: str = "chat_history"
    ):
        """
        Args:
            db_conn: MySQL connection (not used by the indexing path shown
                here; presumably kept for parity with sibling services —
                TODO confirm).
            qdrant_client: Client for the Qdrant vector store.
            embedding_provider: Provider exposing ``dimension`` and
                ``get_embeddings(texts)``.
            collection_name: Target Qdrant collection name.
        """
        self.db_conn = db_conn
        self.qdrant = qdrant_client
        self.embedding_provider = embedding_provider
        self.collection_name = collection_name
        self.log_path = Path("/var/log/crumbforest/chat_history.jsonl")

    def ensure_collection(self):
        """Create the Qdrant collection if it does not already exist."""
        collections = self.qdrant.get_collections()
        exists = any(c.name == self.collection_name for c in collections.collections)
        if not exists:
            logger.info(f"Creating collection {self.collection_name}")
            self.qdrant.create_collection(
                collection_name=self.collection_name,
                vectors_config=models.VectorParams(
                    size=self.embedding_provider.dimension,
                    distance=models.Distance.COSINE
                )
            )

    def parse_line(self, line: str) -> Optional[Dict[str, Any]]:
        """Parse a single JSON log line; return None on malformed input."""
        try:
            return json.loads(line)
        except json.JSONDecodeError:
            return None

    def index_history(self, batch_size: int = 50) -> Dict[str, int]:
        """
        Read the log file and index entries.

        Ideally, this should track progress (e.g. last read line) to avoid
        re-indexing. For V1, we naively read all lines and upsert, relying on
        deterministic (content-derived) point IDs so re-runs overwrite
        existing points rather than duplicating them.

        Args:
            batch_size: Number of entries to embed/upsert per Qdrant call.

        Returns:
            Dict with "indexed" and "errors" counts. A missing log file
            reports ``{"indexed": 0, "errors": 1}``.
        """
        if not self.log_path.exists():
            logger.warning(f"Log file not found: {self.log_path}")
            return {"indexed": 0, "errors": 1}
        self.ensure_collection()
        indexed_count = 0
        errors = 0
        batch: List[Dict[str, Any]] = []
        # Embedding is done via the provider directly (see _flush_batch)
        # rather than through RAGService, so we control the point payloads.
        with open(self.log_path, 'r', encoding='utf-8') as f:
            for line in f:
                entry = self.parse_line(line)
                if not entry:
                    errors += 1
                    continue
                # Check for nested interaction structure (from ChatLogger).
                interaction = entry.get('interaction', {})
                question = interaction.get('question')
                answer = interaction.get('answer')
                if not question or not answer:
                    # Fallback for older flat logs if they exist.
                    question = entry.get('question')
                    answer = entry.get('answer')
                if not question or not answer:
                    continue
                text_content = f"Q: {question}\nA: {answer}"
                batch.append({
                    "id": self._point_id_for(line),
                    "payload": entry,
                    "text": text_content
                })
                if len(batch) >= batch_size:
                    self._flush_batch(batch)
                    indexed_count += len(batch)
                    batch = []
        # Flush the trailing partial batch.
        if batch:
            self._flush_batch(batch)
            indexed_count += len(batch)
        return {"indexed": indexed_count, "errors": errors}

    @staticmethod
    def _point_id_for(line: str) -> str:
        """
        Deterministic Qdrant point ID for a raw log line.

        Fix: Qdrant accepts only unsigned integers or UUID-formatted strings
        as point IDs; the previous raw MD5 hexdigest (32 hex chars, no
        dashes) is rejected by the server. We reinterpret the 128-bit MD5 of
        the line as a UUID string, which remains deterministic so
        re-indexing upserts in place instead of duplicating.
        """
        digest = hashlib.md5(line.encode('utf-8')).hexdigest()
        return str(uuid.UUID(hex=digest))

    def _flush_batch(self, batch: List[Dict[str, Any]]):
        """Embed and upsert a batch of prepared entries into Qdrant.

        Args:
            batch: Items with "id", "payload", and "text" keys as built by
                ``index_history``.
        """
        texts = [b["text"] for b in batch]
        embeddings = self.embedding_provider.get_embeddings(texts)
        points = [
            models.PointStruct(
                id=b["id"],
                vector=embedding,
                payload=b["payload"]
            )
            for b, embedding in zip(batch, embeddings)
        ]
        self.qdrant.upsert(
            collection_name=self.collection_name,
            points=points
        )