# app/services/document_indexer.py
"""
Document Indexer Service
Automatically indexes Markdown documents from docs/ directories
Tracks changes via file hashes (DSGVO-compliant)
"""

import json
import hashlib
from pathlib import Path
from typing import List, Dict, Any, Optional
from datetime import datetime
from pymysql import Connection
from pymysql.cursors import DictCursor
from qdrant_client import QdrantClient
from lib.embedding_providers.base import BaseProvider
from services.rag_service import RAGService


class DocumentIndexer:
    """
    Indexes Markdown documents from docs/ directories.
    Tracks changes and only re-indexes modified files.
    """
    def __init__(
        self,
        db_conn: Connection,
        qdrant_client: QdrantClient,
        embedding_provider: BaseProvider,
        docs_base_path: str = "docs"
    ):
        """
        Initialize document indexer.

        Args:
            db_conn: Database connection
            qdrant_client: Qdrant client
            embedding_provider: Provider for embeddings
            docs_base_path: Base path to docs directory
        """
        self.db_conn = db_conn
        self.qdrant = qdrant_client
        self.embedding_provider = embedding_provider
        self.docs_base_path = Path(docs_base_path)
        # Supported document categories, mapped to Qdrant collection names
        self.categories = {
            "rz-nullfeld": "docs_rz_nullfeld",
            "crumbforest": "docs_crumbforest"
        }
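        # Expected on-disk layout (an assumed convention, not enforced
        # here): one subdirectory per category under docs/, searched
        # recursively for Markdown files.
        #
        #   docs/
        #     rz-nullfeld/ ...  -> collection 'docs_rz_nullfeld'
        #     crumbforest/ ...  -> collection 'docs_crumbforest'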
    def get_file_hash(self, file_path: Path) -> str:
        """Calculate MD5 hash of file content (change detection only, not a security measure)."""
        with open(file_path, 'rb') as f:
            return hashlib.md5(f.read()).hexdigest()
    def find_markdown_files(self, category: str) -> List[Path]:
        """
        Find all Markdown files in a category directory.

        Args:
            category: Category name (e.g., 'rz-nullfeld')

        Returns:
            List of Markdown file paths
        """
        category_path = self.docs_base_path / category
        if not category_path.exists():
            return []
        # Find all .md files recursively
        return list(category_path.glob("**/*.md"))
    def get_doc_id(self, file_path: Path) -> int:
        """
        Derive a stable 31-bit integer ID from a file path.

        Python's built-in hash() is salted per process, so it is useless
        as a persistent identifier; an MD5-based digest is reproducible
        across runs and keeps lookups and writes in post_vectors consistent.
        """
        digest = hashlib.md5(str(file_path).encode('utf-8')).hexdigest()
        return int(digest[:8], 16) % (2**31)

    def should_reindex(self, file_path: Path, collection_name: str) -> bool:
        """
        Check if file should be re-indexed based on hash comparison.

        Args:
            file_path: Path to markdown file
            collection_name: Qdrant collection name

        Returns:
            True if file should be re-indexed
        """
        # Calculate current hash
        current_hash = self.get_file_hash(file_path)
        # Get stored hash from database, looking up by the same stable ID
        # that index_document() writes (not by the raw path string).
        with self.db_conn.cursor(DictCursor) as cur:
            cur.execute(
                """
                SELECT file_hash FROM post_vectors
                WHERE post_id = %s AND collection_name = %s AND post_type = 'document'
                """,
                (self.get_doc_id(file_path), collection_name)
            )
            result = cur.fetchone()
        # Re-index if no record exists or hash changed
        if not result:
            return True
        return result['file_hash'] != current_hash
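    # The SQL in this class assumes a post_vectors table with at least the
    # following columns (inferred from usage in this file; the authoritative
    # DDL lives elsewhere in the project, so the types are assumptions):
    #
    #   post_id          INT       -- stable document ID (see get_doc_id)
    #   collection_name  VARCHAR   -- Qdrant collection holding the vectors
    #   post_type        VARCHAR   -- 'document' for files indexed here
    #   file_hash        CHAR(32)  -- MD5 of the file at index time
    #   chunk_count      INT       -- number of chunks/vectors written
    #   indexed_at       DATETIME  -- timestamp of the last indexing run
    #   metadata         JSON      -- category/file info (added on demand)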
    def index_document(
        self,
        file_path: Path,
        category: str,
        force: bool = False
    ) -> Dict[str, Any]:
        """
        Index a single Markdown document.

        Args:
            file_path: Path to markdown file
            category: Category name (e.g., 'rz-nullfeld')
            force: Force re-indexing even if unchanged

        Returns:
            Indexing result dictionary
        """
        collection_name = self.categories.get(category)
        if not collection_name:
            raise ValueError(f"Unknown category: {category}")
        # Check if re-indexing needed
        if not force and not self.should_reindex(file_path, collection_name):
            return {
                'file': str(file_path),
                'status': 'unchanged',
                'message': 'File unchanged, skipping'
            }
        # Read file content
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        if not content.strip():
            return {
                'file': str(file_path),
                'status': 'skipped',
                'message': 'Empty file'
            }
        # Create RAG service with document-specific collection
        rag_service = RAGService(
            db_conn=self.db_conn,
            qdrant_client=self.qdrant,
            embedding_provider=self.embedding_provider,
            collection_prefix=collection_name
        )
        # Index as post, using a stable integer ID derived from the path
        doc_id = self.get_doc_id(file_path)
        result = rag_service.index_post(
            post_id=doc_id,
            title=file_path.stem,  # Filename without extension
            slug=f"{category}/{file_path.stem}",
            locale="",  # Documents are locale-agnostic
            body_md=content
        )
        # Update post_vectors to mark as document type
        file_hash = self.get_file_hash(file_path)
        with self.db_conn.cursor(DictCursor) as cur:
            # Ensure the metadata column exists *before* writing to it.
            # (ADD COLUMN IF NOT EXISTS is MariaDB syntax; on servers
            # without it the statement raises and we rely on the column
            # already existing.)
            try:
                cur.execute(
                    "ALTER TABLE post_vectors ADD COLUMN IF NOT EXISTS metadata JSON NULL"
                )
            except Exception:
                pass  # Column already exists or server lacks IF NOT EXISTS
            cur.execute(
                """
                UPDATE post_vectors
                SET post_type='document',
                    metadata=%s,
                    file_hash=%s
                WHERE post_id=%s AND collection_name=%s
                """,
                (
                    json.dumps({
                        'category': category,
                        'file_path': str(file_path),
                        'file_name': file_path.name
                    }),
                    file_hash,
                    doc_id,
                    result['collection']
                )
            )
            # DSGVO audit log
            cur.execute(
                """
                INSERT INTO audit_log (action, entity_type, entity_id, user_id, metadata)
                VALUES ('document_indexed', 'document', %s, NULL, %s)
                """,
                (
                    doc_id,
                    json.dumps({
                        'category': category,
                        'file_path': str(file_path),
                        'provider': self.embedding_provider.provider_name,
                        'chunks': result.get('chunks', 0),
                        'timestamp': datetime.now().isoformat()
                    })
                )
            )
        self.db_conn.commit()  # pymysql does not autocommit by default
        return {
            'file': str(file_path),
            'status': 'indexed',
            'chunks': result.get('chunks', 0),
            'collection': result['collection'],
            'provider': result.get('provider')
        }
    def index_category(
        self,
        category: str,
        force: bool = False
    ) -> Dict[str, Any]:
        """
        Index all documents in a category.

        Args:
            category: Category name (e.g., 'rz-nullfeld')
            force: Force re-indexing of all files

        Returns:
            Summary of indexing results
        """
        files = self.find_markdown_files(category)
        if not files:
            return {
                'category': category,
                'total': 0,
                'indexed': 0,
                'unchanged': 0,
                'skipped': 0,
                'errors': 0,
                'message': f'No markdown files found in docs/{category}/'
            }
        results = {
            'category': category,
            'total': len(files),
            'indexed': 0,
            'unchanged': 0,
            'skipped': 0,
            'errors': 0,
            'files': []
        }
        for file_path in files:
            try:
                result = self.index_document(file_path, category, force=force)
                results['files'].append(result)
                if result['status'] == 'indexed':
                    results['indexed'] += 1
                elif result['status'] == 'unchanged':
                    results['unchanged'] += 1
                elif result['status'] == 'skipped':
                    # Empty files are reported but not counted as errors
                    results['skipped'] += 1
            except Exception as e:
                results['errors'] += 1
                results['files'].append({
                    'file': str(file_path),
                    'status': 'error',
                    'message': str(e)
                })
                print(f"Error indexing {file_path}: {e}")
        return results
    def index_all_categories(self, force: bool = False) -> Dict[str, Any]:
        """
        Index all documents in all categories.

        Args:
            force: Force re-indexing of all files

        Returns:
            Summary of indexing results for all categories
        """
        results = {
            'categories': {},
            'total_files': 0,
            'total_indexed': 0,
            'total_unchanged': 0,
            'total_skipped': 0,
            'total_errors': 0
        }
        for category in self.categories.keys():
            category_result = self.index_category(category, force=force)
            results['categories'][category] = category_result
            results['total_files'] += category_result['total']
            results['total_indexed'] += category_result['indexed']
            results['total_unchanged'] += category_result['unchanged']
            results['total_skipped'] += category_result['skipped']
            results['total_errors'] += category_result['errors']
        return results
    def get_indexing_status(self, category: Optional[str] = None) -> Dict[str, Any]:
        """
        Get indexing status for documents.

        Args:
            category: Optional category filter

        Returns:
            Status information
        """
        # Restrict to one category if requested, else report on all
        if category:
            collection_name = self.categories.get(category)
            if not collection_name:
                raise ValueError(f"Unknown category: {category}")
            targets = {category: collection_name}
        else:
            targets = self.categories
        status = {}
        with self.db_conn.cursor(DictCursor) as cur:
            for cat, coll_name in targets.items():
                cur.execute(
                    """
                    SELECT COUNT(*) as count,
                           SUM(chunk_count) as total_vectors,
                           MAX(indexed_at) as last_indexed
                    FROM post_vectors
                    WHERE collection_name=%s AND post_type='document'
                    """,
                    (coll_name,)
                )
                result = cur.fetchone()
                status[cat] = {
                    'collection': coll_name,
                    'documents': result['count'] or 0,
                    'vectors': result['total_vectors'] or 0,
                    'last_indexed': result['last_indexed'].isoformat() if result['last_indexed'] else None
                }
        return status
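

# Minimal wiring sketch. The connection parameters, the concrete embedding
# provider, and the surrounding RAGService behavior are project-specific,
# so everything below is an assumed example configuration, not the
# authoritative entry point.
if __name__ == "__main__":
    import pymysql

    # Hypothetical local credentials -- replace with the real settings.
    conn = pymysql.connect(
        host="localhost",
        user="crumb",
        password="secret",
        database="crumb_core",
    )
    qdrant = QdrantClient(url="http://localhost:6333")

    # A concrete BaseProvider subclass must be supplied; its interface is
    # defined in lib/embedding_providers/base.py and is not shown here.
    provider = ...  # placeholder: instantiate your embedding provider

    indexer = DocumentIndexer(conn, qdrant, provider)
    print(json.dumps(indexer.index_all_categories(), indent=2))
    print(json.dumps(indexer.get_indexing_status(), indent=2))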