feat(data-integration): add Jumbo API setup tools

Adds a management command and helper script to configure the Jumbo API external data source and fetch chat sessions.
Ensures idempotent creation and linkage to the Jumbo company while reading API credentials from environment variables.
Prints data source details and a post-fetch summary to ease setup and verification.
This commit is contained in:
2025-11-05 20:43:35 +01:00
parent 45f003eafa
commit d82be93da2
2 changed files with 146 additions and 0 deletions

View File

@@ -0,0 +1,69 @@
"""
Management command to set up Jumbo API external data source and fetch data.
"""
import os
from accounts.models import Company
from data_integration.models import ChatSession, ExternalDataSource
from django.core.management.base import BaseCommand
class Command(BaseCommand):
help = "Set up Jumbo API external data source and fetch chat data"
def handle(self, **options): # noqa: ARG002
self.stdout.write("Setting up Jumbo API data source...")
# Get Jumbo company
try:
jumbo = Company.objects.get(name="Jumbo")
self.stdout.write(f"✓ Found Jumbo company (ID: {jumbo.id})")
except Company.DoesNotExist:
self.stdout.write(self.style.ERROR("✗ Jumbo company not found. Run setup_jumbo command first."))
return
# Create or get Jumbo API external data source
source, created = ExternalDataSource.objects.get_or_create(
name="Jumbo API",
defaults={
"company": jumbo,
"api_url": "https://proto.notso.ai/jumbo/chats",
"auth_username": os.environ.get("EXTERNAL_API_USERNAME", ""),
"auth_password": os.environ.get("EXTERNAL_API_PASSWORD", ""),
"is_active": True,
},
)
# Ensure company is set if already existed
if not created and not source.company:
source.company = jumbo
source.save()
self.stdout.write(self.style.SUCCESS(f"✓ Linked existing Jumbo API source to {jumbo.name}"))
elif created:
self.stdout.write(self.style.SUCCESS("✓ Created Jumbo API external data source"))
else:
self.stdout.write("✓ Jumbo API source already exists")
self.stdout.write(
f"\nData source details:"
f"\n ID: {source.id}"
f"\n Company: {source.company.name if source.company else 'None'}"
f"\n Endpoint: {source.api_url}"
f"\n Active: {source.is_active}"
)
# Fetch data (call the task synchronously)
self.stdout.write("\nFetching Jumbo chat data...")
try:
# Use .apply() or direct function call to run synchronously
from data_integration.utils import fetch_and_store_chat_data
result = fetch_and_store_chat_data(source.id)
self.stdout.write(self.style.SUCCESS(f"✓ Data fetch completed: {result}"))
except Exception as e:
self.stdout.write(self.style.ERROR(f"✗ Error fetching data: {e}"))
# Show summary
session_count = ChatSession.objects.filter(company=jumbo).count()
self.stdout.write(self.style.SUCCESS(f"\n✓ Setup complete!\n Total Jumbo chat sessions: {session_count}"))