- test.sh: .venv Support und verbesserte API Key Erkennung - test_integration.py: Cleanup Logik repariert (Duplicate Token) - docker-compose.yml: Port 3306 für lokale Tests exposed - rag_service.py: NameError und Collection Naming Bug gefixt
327 lines
9.8 KiB
Python
327 lines
9.8 KiB
Python
#!/usr/bin/env python3
"""
Integration Test: PHP <-> FastAPI <-> Qdrant

This test simulates the complete workflow:
1. PHP creates a child + token (simulated via direct DB insert)
2. PHP creates a diary entry in MySQL (simulated)
3. PHP triggers FastAPI indexing (via HTTP)
4. FastAPI indexes the entry in Qdrant
5. Search finds the entry
6. RAG answers a question

Prerequisites:
- Docker Compose must be running
- Database must be initialized
- At least one API key configured (OpenAI, Anthropic, or OpenRouter)

Usage:
    python tests/test_integration.py
"""
|
|
|
|
import requests
|
|
import pymysql
|
|
import os
|
|
import sys
|
|
import time
|
|
from datetime import datetime
|
|
|
|
# Configuration
# Each setting is read from the environment and falls back to a default
# suitable for a local docker-compose setup.
API_BASE_URL = os.environ.get("API_BASE_URL", "http://localhost:8000")

# Database credentials (MARIADB_* variables).
DB_HOST = os.environ.get("MARIADB_HOST", "localhost")
DB_USER = os.environ.get("MARIADB_USER", "crumb")
DB_PASSWORD = os.environ.get("MARIADB_PASSWORD", "secret")
DB_NAME = os.environ.get("MARIADB_DATABASE", "crumbcrm")

# Provider name forwarded to the API: openai, claude, or openrouter.
PROVIDER = os.environ.get("TEST_PROVIDER", "openai")
|
|
|
|
|
|
def get_db_connection():
    """Open and return a new database connection.

    The connection parameters are taken from the module-level ``DB_*``
    constants. A ``DictCursor`` is configured as the cursor class, so rows
    fetched through this connection are dictionaries keyed by column name.
    """
    connection_settings = {
        "host": DB_HOST,
        "user": DB_USER,
        "password": DB_PASSWORD,
        "database": DB_NAME,
        "charset": 'utf8mb4',
        "cursorclass": pymysql.cursors.DictCursor,
    }
    return pymysql.connect(**connection_settings)
|
|
|
|
|
|
def cleanup_test_data(conn, child_id=None):
    """Clean up test data from previous runs.

    Args:
        conn: Open database connection. The deletions are committed before
            this function returns.
        child_id: When given, only the diary vectors, diary entries and the
            child row for that ID are removed. When ``None``, a broad cleanup
            runs instead (see the ``else`` branch).
    """
    with conn.cursor() as cur:
        # Compare against None explicitly: a plain truthiness test would send
        # a falsy ID such as 0 down the broad-cleanup path by mistake.
        if child_id is not None:
            # Clean up specific child's data
            cur.execute(
                "DELETE FROM post_vectors WHERE child_id=%s AND post_type='diary'",
                (child_id,),
            )
            cur.execute("DELETE FROM diary_entries WHERE child_id=%s", (child_id,))
            cur.execute("DELETE FROM children WHERE id=%s", (child_id,))
        else:
            # Clean up all test data
            # NOTE(review): this statement has no child filter, so it removes
            # every diary vector row in the table, not only rows created by
            # this test. Confirm that is acceptable for the target database.
            cur.execute("DELETE FROM post_vectors WHERE post_type='diary'")

            # Clean up specific test user by token (in case ID < 9000)
            cur.execute("SELECT id FROM children WHERE token = 'test-token-12345'")
            result = cur.fetchone()
            if result:
                cur.execute("DELETE FROM diary_entries WHERE child_id = %s", (result['id'],))
                cur.execute("DELETE FROM children WHERE id = %s", (result['id'],))

            # Legacy cleanup
            cur.execute("DELETE FROM diary_entries WHERE child_id >= 9000")
            cur.execute("DELETE FROM children WHERE id >= 9000")

    conn.commit()
    print("✓ Cleaned up test data")
|
|
|
|
|
|
def test_step_1_create_child(conn):
    """
    Step 1: Insert the test child row (stands in for PHP child creation).

    The row is committed on ``conn`` and the ID reported by the cursor's
    ``lastrowid`` is returned.
    """
    print("\n=== Step 1: Create Child ===")

    insert_child_sql = """
            INSERT INTO children (name, age, parent_email, token, created_at)
            VALUES ('Max Mustermann', 8, 'parent@test.local', 'test-token-12345', NOW())
            """

    with conn.cursor() as cursor:
        cursor.execute(insert_child_sql)
        new_child_id = cursor.lastrowid
    conn.commit()

    print(f"✓ Created child with ID: {new_child_id}")
    return new_child_id
|
|
|
|
|
|
def test_step_2_create_diary_entry(conn, child_id):
    """
    Step 2: Insert a diary entry (stands in for PHP diary entry creation).

    The entry is a fixed German markdown text, stored for ``child_id`` and
    committed on ``conn``.

    Returns:
        A ``(entry_id, diary_content)`` tuple: the ID reported by the
        cursor's ``lastrowid`` and the text that was inserted.
    """
    print("\n=== Step 2: Create Diary Entry ===")

    diary_content = """# Heute im Wald

Ich war heute mit Papa im Wald spazieren. Wir haben einen **Igel** gesehen!
Er war ganz klein und süß. Papa hat gesagt, dass Igel nachtaktiv sind, aber
dieser war wohl auf der Suche nach Futter.

## Was ich gelernt habe

- Igel fressen Insekten und Schnecken
- Sie rollen sich zu einer Kugel zusammen, wenn sie Angst haben
- Im Winter halten sie Winterschlaf

Das war ein toller Tag! 🦔
"""

    insert_entry_sql = """
            INSERT INTO diary_entries (child_id, entry_text, created_at)
            VALUES (%s, %s, NOW())
            """
    insert_params = (child_id, diary_content)

    with conn.cursor() as cursor:
        cursor.execute(insert_entry_sql, insert_params)
        new_entry_id = cursor.lastrowid
    conn.commit()

    print(f"✓ Created diary entry with ID: {new_entry_id}")
    return new_entry_id, diary_content
|
|
|
|
|
|
def test_step_3_index_diary_entry(child_id, entry_id, content, timeout=60):
    """
    Step 3: Trigger FastAPI indexing (simulating PHP calling FastAPI).

    Args:
        child_id: Child ID sent in the request payload.
        entry_id: Diary entry ID sent in the request payload.
        content: Diary text sent in the request payload.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        True when the endpoint answered with HTTP 200, False when the request
        could not be completed or returned another status code.

    Raises:
        KeyError: If a 200 response body has no ``status`` field.
    """
    print("\n=== Step 3: Index Diary Entry ===")

    url = f"{API_BASE_URL}/api/diary/index"
    payload = {
        "entry_id": entry_id,
        "child_id": child_id,
        "content": content,
        "provider": PROVIDER,
    }

    # Without a timeout, requests waits indefinitely on an unresponsive
    # server. Transport errors are reported like any other failed step.
    try:
        response = requests.post(url, json=payload, timeout=timeout)
    except requests.exceptions.RequestException as exc:
        print(f"✗ Indexing failed: {exc}")
        return False

    if response.status_code != 200:
        print(f"✗ Indexing failed: {response.status_code} - {response.text}")
        return False

    data = response.json()
    print("✓ Indexed successfully:")
    print(f" - Status: {data['status']}")
    print(f" - Chunks: {data.get('chunks')}")
    print(f" - Collection: {data.get('collection')}")
    print(f" - Provider: {data.get('provider')}")

    # Wait a bit for Qdrant to process
    time.sleep(1)
    return True
|
|
|
|
|
|
def test_step_4_search_diary(child_id, timeout=30):
    """
    Step 4: Search the diary (semantic search).

    Args:
        child_id: Child ID sent in the request payload.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        True when the endpoint answered with HTTP 200 and returned at least
        one result; False otherwise.

    Raises:
        KeyError: If a 200 response body lacks one of the printed fields.
    """
    print("\n=== Step 4: Search Diary ===")

    url = f"{API_BASE_URL}/api/diary/search"
    payload = {
        "child_id": child_id,
        "query": "Igel",
        "provider": PROVIDER,
        "limit": 3,
    }

    # Without a timeout, requests waits indefinitely on an unresponsive
    # server. Transport errors are reported like any other failed step.
    try:
        response = requests.post(url, json=payload, timeout=timeout)
    except requests.exceptions.RequestException as exc:
        print(f"✗ Search failed: {exc}")
        return False

    if response.status_code != 200:
        print(f"✗ Search failed: {response.status_code} - {response.text}")
        return False

    data = response.json()
    results = data['results']
    print("✓ Search successful:")
    print(f" - Query: {data['query']}")
    print(f" - Results: {len(results)}")

    for idx, result in enumerate(results, 1):
        print(f"\n Result {idx}:")
        print(f" - Entry ID: {result['entry_id']}")
        print(f" - Score: {result['score']:.4f}")
        print(f" - Content preview: {result['content'][:100]}...")

    # An empty result list counts as a failure; say so explicitly instead of
    # returning False without any diagnostic.
    if not results:
        print("✗ Search returned no results")
        return False
    return True
|
|
|
|
|
|
def test_step_5_ask_diary(child_id, timeout=120):
    """
    Step 5: RAG query (ask a question about the diary).

    Args:
        child_id: Child ID sent in the request payload.
        timeout: Seconds to wait for the HTTP request before giving up.
            The default is generous because the endpoint has to produce a
            generated answer.

    Returns:
        True when the endpoint answered with HTTP 200, False when the request
        could not be completed or returned another status code.

    Raises:
        KeyError: If a 200 response body lacks one of the printed fields.
    """
    print("\n=== Step 5: RAG Query (Ask Question) ===")

    url = f"{API_BASE_URL}/api/diary/ask"
    payload = {
        "child_id": child_id,
        "question": "Was habe ich im Wald gesehen?",
        "provider": PROVIDER,
        "context_limit": 3,
    }

    # Without a timeout, requests waits indefinitely on an unresponsive
    # server. Transport errors are reported like any other failed step.
    try:
        response = requests.post(url, json=payload, timeout=timeout)
    except requests.exceptions.RequestException as exc:
        print(f"✗ RAG query failed: {exc}")
        return False

    if response.status_code != 200:
        print(f"✗ RAG query failed: {response.status_code} - {response.text}")
        return False

    data = response.json()
    print("✓ RAG query successful:")
    print(f" - Question: {data['question']}")
    print(f" - Provider: {data['provider']}")
    print(f" - Model: {data['model']}")
    print(f"\n Answer:\n {data['answer']}\n")
    print(f" Sources: {len(data['sources'])} diary entries")

    return True
|
|
|
|
|
|
def test_step_6_check_status(child_id, timeout=30):
    """
    Step 6: Check indexing status for the child.

    Args:
        child_id: Child ID placed in the status URL.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        True when the endpoint answered with HTTP 200, False when the request
        could not be completed or returned another status code.

    Raises:
        KeyError: If a 200 response body lacks one of the printed fields.
    """
    print("\n=== Step 6: Check Indexing Status ===")

    url = f"{API_BASE_URL}/api/diary/{child_id}/status"

    # Without a timeout, requests waits indefinitely on an unresponsive
    # server. Transport errors are reported like any other failed step.
    try:
        response = requests.get(url, timeout=timeout)
    except requests.exceptions.RequestException as exc:
        print(f"✗ Status check failed: {exc}")
        return False

    if response.status_code != 200:
        print(f"✗ Status check failed: {response.status_code} - {response.text}")
        return False

    data = response.json()
    print("✓ Status:")
    print(f" - Child ID: {data['child_id']}")
    print(f" - Total entries: {data['total_entries']}")
    print(f" - Indexed entries: {data['indexed_entries']}")
    print(f" - Total vectors: {data['total_vectors']}")
    print(f" - Collection: {data['collection_name']}")
    print(f" - Last indexed: {data.get('last_indexed', 'N/A')}")

    return True
|
|
|
|
|
|
def main():
    """Run the complete integration test.

    Checks the API health endpoint and the database connection, removes data
    left by earlier runs, then executes steps 1-6 in order. The process exits
    with status 1 on the first failure. Once the database connection has been
    opened it is closed on every path.
    """
    banner = "=" * 60
    print(banner)
    print("Crumbforest Integration Test")
    print("PHP <-> FastAPI <-> Qdrant")
    print(banner)

    # Check if API is reachable. The timeout stops a hung server from
    # blocking the run forever; RequestException also covers timeouts, which
    # the narrower ConnectionError would let through as a raw traceback.
    try:
        health_response = requests.get(f"{API_BASE_URL}/health", timeout=10)
    except requests.exceptions.RequestException:
        print(f"✗ Cannot connect to API at {API_BASE_URL}")
        print(" Make sure docker-compose is running!")
        sys.exit(1)
    if health_response.status_code != 200:
        print("✗ API health check failed. Is the server running?")
        sys.exit(1)
    print("✓ API is healthy")

    # Connect to database
    try:
        conn = get_db_connection()
        print("✓ Connected to database")
    except Exception as e:
        print(f"✗ Database connection failed: {e}")
        sys.exit(1)

    # Bound before the try block so the optional cleanup in ``finally`` can
    # refer to it even when step 1 fails before assigning a real ID.
    child_id = None
    try:
        # Clean up previous test data
        cleanup_test_data(conn)

        # Run test steps
        child_id = test_step_1_create_child(conn)
        entry_id, content = test_step_2_create_diary_entry(conn, child_id)

        # Each remaining step returns a bool; the first False aborts the run
        # with the paired message.
        api_steps = (
            ("Indexing failed",
             lambda: test_step_3_index_diary_entry(child_id, entry_id, content)),
            ("Search failed", lambda: test_step_4_search_diary(child_id)),
            ("RAG query failed", lambda: test_step_5_ask_diary(child_id)),
            ("Status check failed", lambda: test_step_6_check_status(child_id)),
        )
        for failure_message, run_step in api_steps:
            if not run_step():
                raise RuntimeError(failure_message)

        # Success!
        print("\n" + banner)
        print("✓ ALL TESTS PASSED!")
        print(banner)
        print("\nThe complete integration flow works:")
        print(" 1. Child creation ✓")
        print(" 2. Diary entry creation ✓")
        print(" 3. FastAPI indexing ✓")
        print(" 4. Semantic search ✓")
        print(" 5. RAG query ✓")
        print(" 6. Status check ✓")
        print("\nWuuuuhuuu! 💚")

    except Exception as e:
        # Top-level boundary: report any failure and exit non-zero. The
        # ``finally`` clause still runs before the process terminates.
        print(f"\n✗ TEST FAILED: {e}")
        sys.exit(1)
    finally:
        # Cleanup
        print("\n--- Cleanup ---")
        # The per-child cleanup is currently disabled, so the test data stays
        # in the database for inspection. To remove it after the run, enable:
        #     if child_id is not None:
        #         cleanup_test_data(conn, child_id)
        conn.close()
        print("✓ Closed database connection")
|
|
|
|
|
|
# Run the integration test only when this file is executed as a script.
if __name__ == "__main__":
    main()
|