#!/usr/bin/env python3
"""
Integration Test: PHP <-> FastAPI <-> Qdrant

This test simulates the complete workflow:
1. PHP creates a child + token (simulated via direct DB insert)
2. PHP creates a diary entry in MySQL (simulated)
3. PHP triggers FastAPI indexing (via HTTP)
4. FastAPI indexes the entry in Qdrant
5. Search finds the entry
6. RAG answers a question

Prerequisites:
- Docker Compose must be running
- Database must be initialized
- At least one API key configured (OpenAI, Anthropic, or OpenRouter)

Optional environment:
- API_TIMEOUT: per-request timeout in seconds for the API calls (default 120)

Usage:
    python tests/test_integration.py
"""

import requests
import pymysql
import os
import sys
import time
from datetime import datetime

# NOTE(review): the status glyphs and emoji in the output strings below
# (e.g. "βœ“") look like mis-decoded UTF-8 (probably check marks / crosses).
# They are runtime output and are left untouched here - confirm the file's
# encoding before normalizing them.

# Configuration
API_BASE_URL = os.getenv("API_BASE_URL", "http://localhost:8000")
DB_HOST = os.getenv("MARIADB_HOST", "localhost")
DB_USER = os.getenv("MARIADB_USER", "crumb")
DB_PASSWORD = os.getenv("MARIADB_PASSWORD", "secret")
DB_NAME = os.getenv("MARIADB_DATABASE", "crumbcrm")
PROVIDER = os.getenv("TEST_PROVIDER", "openai")  # openai, claude, or openrouter

# Without a timeout, requests waits indefinitely on a stalled server, which
# would hang the whole test run.
REQUEST_TIMEOUT = float(os.getenv("API_TIMEOUT", "120"))
HEALTH_CHECK_TIMEOUT = 10.0

# Token of the test child; used both to create it and to find it again for
# cleanup on the next run.
TEST_CHILD_TOKEN = "test-token-12345"


def get_db_connection():
    """Open a PyMySQL connection using the module-level DB settings.

    Rows are returned as dicts (``DictCursor``).
    """
    return pymysql.connect(
        host=DB_HOST,
        user=DB_USER,
        password=DB_PASSWORD,
        database=DB_NAME,
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor,
    )


def cleanup_test_data(conn, child_id=None):
    """Delete test data left over from previous runs and commit.

    Args:
        conn: Open database connection (as returned by get_db_connection()).
        child_id: When given, only that child's diary vectors, diary entries
            and child row are removed. When ``None``, a global cleanup runs:
            all ``post_vectors`` rows of type 'diary', the child registered
            under the test token, and every child/diary entry with an
            ID >= 9000.
    """
    with conn.cursor() as cur:
        # "is not None" so that an explicit ID of 0 is never mistaken for
        # "no ID given" and escalated to the global cleanup.
        if child_id is not None:
            # Clean up specific child's data
            cur.execute(
                "DELETE FROM post_vectors WHERE child_id=%s AND post_type='diary'",
                (child_id,),
            )
            cur.execute("DELETE FROM diary_entries WHERE child_id=%s", (child_id,))
            cur.execute("DELETE FROM children WHERE id=%s", (child_id,))
        else:
            # Clean up all test data.
            # NOTE(review): this removes every diary vector row, not only
            # those created by this test - only run against a test database.
            cur.execute("DELETE FROM post_vectors WHERE post_type='diary'")

            # Clean up specific test user by token (in case ID < 9000)
            cur.execute(
                "SELECT id FROM children WHERE token = %s",
                (TEST_CHILD_TOKEN,),
            )
            result = cur.fetchone()
            if result:
                cur.execute(
                    "DELETE FROM diary_entries WHERE child_id = %s",
                    (result['id'],),
                )
                cur.execute("DELETE FROM children WHERE id = %s", (result['id'],))

            # Legacy cleanup
            cur.execute("DELETE FROM diary_entries WHERE child_id >= 9000")
            cur.execute("DELETE FROM children WHERE id >= 9000")

    conn.commit()
    print("βœ“ Cleaned up test data")


def _response_ok(response, action):
    """Return True for an HTTP 200 response; otherwise report it and return False."""
    if response.status_code == 200:
        return True
    print(f"βœ— {action} failed: {response.status_code} - {response.text}")
    return False


def test_step_1_create_child(conn):
    """
    Step 1: Create a test child (simulating PHP child creation).

    Inserts one row into ``children``, commits, and returns the new row ID.
    """
    print("\n=== Step 1: Create Child ===")

    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO children (name, age, parent_email, token, created_at)
            VALUES (%s, %s, %s, %s, NOW())
            """,
            ("Max Mustermann", 8, "parent@test.local", TEST_CHILD_TOKEN),
        )
        child_id = cur.lastrowid

    conn.commit()
    print(f"βœ“ Created child with ID: {child_id}")
    return child_id


def test_step_2_create_diary_entry(conn, child_id):
    """
    Step 2: Create a diary entry (simulating PHP diary entry creation).

    Inserts one Markdown diary entry for ``child_id``, commits, and returns
    ``(entry_id, diary_content)``.
    """
    print("\n=== Step 2: Create Diary Entry ===")

    diary_content = """# Heute im Wald

Ich war heute mit Papa im Wald spazieren. Wir haben einen **Igel** gesehen!
Er war ganz klein und süß. Papa hat gesagt, dass Igel nachtaktiv sind,
aber dieser war wohl auf der Suche nach Futter.

## Was ich gelernt habe
- Igel fressen Insekten und Schnecken
- Sie rollen sich zu einer Kugel zusammen, wenn sie Angst haben
- Im Winter halten sie Winterschlaf

Das war ein toller Tag! πŸ¦”
"""

    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO diary_entries (child_id, entry_text, created_at)
            VALUES (%s, %s, NOW())
            """,
            (child_id, diary_content),
        )
        entry_id = cur.lastrowid

    conn.commit()
    print(f"βœ“ Created diary entry with ID: {entry_id}")
    return entry_id, diary_content


def test_step_3_index_diary_entry(child_id, entry_id, content):
    """
    Step 3: Trigger FastAPI indexing (simulating PHP calling FastAPI).

    POSTs the entry to ``/api/diary/index``. Returns True on HTTP 200,
    False otherwise.
    """
    print("\n=== Step 3: Index Diary Entry ===")

    url = f"{API_BASE_URL}/api/diary/index"
    payload = {
        "entry_id": entry_id,
        "child_id": child_id,
        "content": content,
        "provider": PROVIDER,
    }

    response = requests.post(url, json=payload, timeout=REQUEST_TIMEOUT)
    if not _response_ok(response, "Indexing"):
        return False

    data = response.json()
    print("βœ“ Indexed successfully:")
    print(f" - Status: {data['status']}")
    print(f" - Chunks: {data.get('chunks')}")
    print(f" - Collection: {data.get('collection')}")
    print(f" - Provider: {data.get('provider')}")

    # Wait a bit for Qdrant to process
    time.sleep(1)
    return True


def test_step_4_search_diary(child_id):
    """
    Step 4: Search the diary (semantic search).

    POSTs the query "Igel" to ``/api/diary/search`` and prints each hit.
    Returns True only when the call succeeds and yields at least one result.
    """
    print("\n=== Step 4: Search Diary ===")

    url = f"{API_BASE_URL}/api/diary/search"
    payload = {
        "child_id": child_id,
        "query": "Igel",
        "provider": PROVIDER,
        "limit": 3,
    }

    response = requests.post(url, json=payload, timeout=REQUEST_TIMEOUT)
    if not _response_ok(response, "Search"):
        return False

    data = response.json()
    results = data['results']
    print("βœ“ Search successful:")
    print(f" - Query: {data['query']}")
    print(f" - Results: {len(results)}")

    for idx, result in enumerate(results, 1):
        print(f"\n Result {idx}:")
        print(f" - Entry ID: {result['entry_id']}")
        print(f" - Score: {result['score']:.4f}")
        print(f" - Content preview: {result['content'][:100]}...")

    return len(results) > 0


def test_step_5_ask_diary(child_id):
    """
    Step 5: RAG query (ask a question about the diary).

    POSTs a question to ``/api/diary/ask`` and prints the answer.
    Returns True on HTTP 200, False otherwise.
    """
    print("\n=== Step 5: RAG Query (Ask Question) ===")

    url = f"{API_BASE_URL}/api/diary/ask"
    payload = {
        "child_id": child_id,
        "question": "Was habe ich im Wald gesehen?",
        "provider": PROVIDER,
        "context_limit": 3,
    }

    response = requests.post(url, json=payload, timeout=REQUEST_TIMEOUT)
    if not _response_ok(response, "RAG query"):
        return False

    data = response.json()
    print("βœ“ RAG query successful:")
    print(f" - Question: {data['question']}")
    print(f" - Provider: {data['provider']}")
    print(f" - Model: {data['model']}")
    print(f"\n Answer:\n {data['answer']}\n")
    print(f" Sources: {len(data['sources'])} diary entries")
    return True


def test_step_6_check_status(child_id):
    """
    Step 6: Check indexing status for the child.

    GETs ``/api/diary/{child_id}/status`` and prints the reported counters.
    Returns True on HTTP 200, False otherwise.
    """
    print("\n=== Step 6: Check Indexing Status ===")

    url = f"{API_BASE_URL}/api/diary/{child_id}/status"

    response = requests.get(url, timeout=REQUEST_TIMEOUT)
    if not _response_ok(response, "Status check"):
        return False

    data = response.json()
    print("βœ“ Status:")
    print(f" - Child ID: {data['child_id']}")
    print(f" - Total entries: {data['total_entries']}")
    print(f" - Indexed entries: {data['indexed_entries']}")
    print(f" - Total vectors: {data['total_vectors']}")
    print(f" - Collection: {data['collection_name']}")
    print(f" - Last indexed: {data.get('last_indexed', 'N/A')}")
    return True


def main():
    """Run the complete integration test.

    Exits with status 1 when the API is unreachable or unhealthy, the
    database connection fails, or any test step fails or raises.
    """
    print("=" * 60)
    print("Crumbforest Integration Test")
    print("PHP <-> FastAPI <-> Qdrant")
    print("=" * 60)

    # Check if API is reachable
    try:
        health_response = requests.get(
            f"{API_BASE_URL}/health",
            timeout=HEALTH_CHECK_TIMEOUT,
        )
    except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
        # A server that accepts no connection and one that never answers are
        # reported the same way instead of surfacing a traceback.
        print(f"βœ— Cannot connect to API at {API_BASE_URL}")
        print(" Make sure docker-compose is running!")
        sys.exit(1)

    if health_response.status_code != 200:
        print("βœ— API health check failed. Is the server running?")
        sys.exit(1)
    print("βœ“ API is healthy")

    # Connect to database
    try:
        conn = get_db_connection()
        print("βœ“ Connected to database")
    except Exception as e:
        print(f"βœ— Database connection failed: {e}")
        sys.exit(1)

    try:
        # Clean up previous test data
        cleanup_test_data(conn)

        # Run test steps
        child_id = test_step_1_create_child(conn)
        entry_id, content = test_step_2_create_diary_entry(conn, child_id)

        # Each remaining step returns False on failure; the first failing
        # step aborts the run with its message.
        api_steps = (
            ("Indexing failed",
             lambda: test_step_3_index_diary_entry(child_id, entry_id, content)),
            ("Search failed", lambda: test_step_4_search_diary(child_id)),
            ("RAG query failed", lambda: test_step_5_ask_diary(child_id)),
            ("Status check failed", lambda: test_step_6_check_status(child_id)),
        )
        for failure_message, run_step in api_steps:
            if not run_step():
                raise RuntimeError(failure_message)

        # Success!
        print("\n" + "=" * 60)
        print("βœ“ ALL TESTS PASSED!")
        print("=" * 60)
        print("\nThe complete integration flow works:")
        print(" 1. Child creation βœ“")
        print(" 2. Diary entry creation βœ“")
        print(" 3. FastAPI indexing βœ“")
        print(" 4. Semantic search βœ“")
        print(" 5. RAG query βœ“")
        print(" 6. Status check βœ“")
        print("\nWuuuuhuuu! πŸ’š")

    except Exception as e:
        # Top-level boundary: any step failure or unexpected error (DB error,
        # timeout, missing response key) fails the run.
        print(f"\nβœ— TEST FAILED: {e}")
        sys.exit(1)

    finally:
        # Cleanup
        print("\n--- Cleanup ---")
        # The test data is deliberately left in place so it can be inspected;
        # the cleanup_test_data(conn) call at the start of the next run
        # removes it.
        conn.close()
        print("βœ“ Closed database connection")


if __name__ == "__main__":
    main()