Your ComfyUI workflows are deployed as APIs—now let’s integrate them into Python applications. This SDK transforms your creative AI work into application features that users can access through web services, data pipelines, automation systems, and interactive tools.
What you’re building: Instead of asking users to understand ComfyUI or API endpoints, you’re creating seamless experiences where AI capabilities feel like native Python functions. Your workflows become the invisible intelligence powering data processing, web applications, and automated systems.
🚀 What’s New in v1.1.4
  • 🔄 Updated Response Format: Both execute_workflow and execute_workflow_async now return workflow_id alongside run_id
  • ✨ Enhanced Polling: Improved execute_workflow_async with better run status detection and failure handling
  • 📚 Comprehensive Documentation: Advanced usage examples for FastAPI, Django, Flask, and Jupyter integration
  • 🖼️ Dynamic Parameters: Full support for dynamic parameter names matching your ComfyUI workflow structure
  • 📤 Multiple File Support: Handle multiple images, videos, and files in single workflow execution
  • 📊 Pagination & Filtering: Enhanced get_runs() with sorting, filtering, and pagination capabilities

Installation: Adding AI Power to Your Python Projects

Ready to bring your AI workflows into your Python applications? Install the FlowScale SDK to start integrating your deployed ComfyUI workflows into any Python project.
What this enables: Your users won’t interact with ComfyUI or see API endpoints—they’ll experience AI capabilities as seamless Python functions. Image generation, style transfer, content creation, and other AI workflows become as simple as calling a method.
pip install flowscale

Quick Start: From Deployment to Integration

Prerequisites: You should have already deployed your ComfyUI workflow as an API through the FlowScale AI platform. If you haven’t, complete the Deploy Workflows guide first.

Connecting Your Python Application to Your AI Infrastructure

This is where your deployed AI workflows become Python application features. Import the SDK and connect to the production APIs you’ve already created:
from flowscale import FlowscaleAPI
import os

# Store API keys in environment variables
api_key = os.environ.get("FLOWSCALE_API_KEY")
api_url = os.environ.get("FLOWSCALE_API_URL")

if not api_key or not api_url:
    print("FLOWSCALE_API_KEY or FLOWSCALE_API_URL not set in environment")
    exit(1)

flowscale = FlowscaleAPI(api_key, api_url)

Environment Variables

Add the following to your .env file:
FLOWSCALE_API_KEY=your-api-key
FLOWSCALE_API_URL=https://your-api-url.pod.flowscale.ai
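
One way to fail fast on a missing or malformed setting is to validate both variables in one place before constructing the client. The sketch below is illustrative only: `FlowscaleSettings` and `load_settings` are hypothetical names, not part of the SDK, and the HTTPS check is an assumption based on the example URL above.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class FlowscaleSettings:
    """Hypothetical container for the two values the client needs."""
    api_key: str
    api_url: str


def load_settings(env: Optional[Mapping[str, str]] = None) -> FlowscaleSettings:
    """Read and validate FLOWSCALE_* variables from a mapping (default: os.environ)."""
    env = os.environ if env is None else env
    api_key = env.get("FLOWSCALE_API_KEY", "").strip()
    api_url = env.get("FLOWSCALE_API_URL", "").strip()

    missing = [
        name
        for name, value in (("FLOWSCALE_API_KEY", api_key), ("FLOWSCALE_API_URL", api_url))
        if not value
    ]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    if not api_url.startswith("https://"):
        # Assumption: deployments are served over HTTPS, as in the example URL
        raise RuntimeError("FLOWSCALE_API_URL should start with https://")

    # Drop a trailing slash so later path joins stay predictable
    return FlowscaleSettings(api_key=api_key, api_url=api_url.rstrip("/"))
```

The result could then be passed on as `FlowscaleAPI(settings.api_key, settings.api_url)`.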

Integration Patterns for Production Applications

Choose the integration pattern that matches your application’s needs:
Best for: API endpoints, web applications, microservices
Benefits:
  • Production-ready web framework integration
  • Built-in authentication and validation
  • Scalable request handling
  • Easy deployment and monitoring
Use cases: REST APIs, web applications, background job processing

Configuration

Client Options

from flowscale import FlowscaleAPI

flowscale = FlowscaleAPI(
    api_key="your-api-key",
    api_url="https://your-api-url.pod.flowscale.ai",
    timeout=300,           # Request timeout in seconds (default: 300)
    max_retries=3,         # Number of retry attempts (default: 3)
    retry_delay=1.0,       # Delay between retries in seconds (default: 1.0) 
    enable_logging=True,   # Enable SDK logging (default: True)
    log_level="info",      # Log level: debug, info, warn, error (default: info)
    user_agent="MyApp/1.0" # Custom user agent string
)

Logging Configuration

The SDK includes configurable logging to help with debugging and monitoring:
import logging
from flowscale import FlowscaleAPI

# Configure logging during initialization
flowscale = FlowscaleAPI(
    api_key="your-api-key",
    api_url="https://your-api-url.pod.flowscale.ai",
    enable_logging=True,
    log_level="debug"
)

# Change logging settings after initialization
flowscale.set_logging_config(enabled=True, level="debug")

# Disable logging completely
flowscale.set_logging_config(enabled=False)

# Use custom logger
custom_logger = logging.getLogger("my_app.flowscale")
flowscale.set_logger(custom_logger)
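
A logger passed to `set_logger` is an ordinary `logging.Logger`, so it can be prepared with the standard library before it is handed over. A minimal sketch, assuming you want SDK messages on their own handler; the `build_sdk_logger` helper and the format string are illustrative choices, not SDK requirements.

```python
import io
import logging


def build_sdk_logger(name="my_app.flowscale", level=logging.DEBUG, stream=None):
    """Return a logger with one stream handler and a timestamped format."""
    logger = logging.getLogger(name)
    logger.setLevel(level)
    logger.propagate = False  # Avoid duplicate lines via the root logger

    # Only attach a handler once, even if this function is called repeatedly
    if not logger.handlers:
        handler = logging.StreamHandler(stream)  # stream=None means stderr
        handler.setFormatter(
            logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
        )
        logger.addHandler(handler)
    return logger


# Demonstration with an in-memory stream instead of stderr
buffer = io.StringIO()
demo_logger = build_sdk_logger("my_app.flowscale.demo", stream=buffer)
demo_logger.debug("client configured")
```

The returned object would be passed along as `flowscale.set_logger(build_sdk_logger())`.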

Core Methods

Platform Health and Status

Check Health Status

Monitor the health of the FlowScale platform, including container status:
health = flowscale.check_health()
print(f"API Health: {health}")

Get Queue Status

Retrieve the current workflow queue status:
queue = flowscale.get_queue()
print(f"Queue Details: {queue}")
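
Before submitting a large batch it can be useful to wait until the platform reports healthy. The response shape of `check_health()` is not specified here, so the sketch below takes both the check and the "is this healthy?" test as plain callables; `wait_until_healthy` and the `status` field in the demonstration are hypothetical.

```python
import time
from typing import Any, Callable


def wait_until_healthy(
    check: Callable[[], Any],
    is_healthy: Callable[[Any], bool],
    attempts: int = 5,
    delay: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    """Call check() up to `attempts` times and return the first healthy response."""
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            response = check()
            if is_healthy(response):
                return response
            last_error = RuntimeError(f"unhealthy response: {response!r}")
        except Exception as exc:  # The check itself may fail while the platform starts
            last_error = exc
        if attempt < attempts:
            sleep(delay)
    raise RuntimeError(f"Platform not healthy after {attempts} attempts") from last_error


# Stand-in for flowscale.check_health: unhealthy twice, then healthy
_responses = iter([{"status": "starting"}, {"status": "starting"}, {"status": "ok"}])
health = wait_until_healthy(
    check=lambda: next(_responses),
    is_healthy=lambda r: r.get("status") == "ok",  # Assumed field name
    sleep=lambda seconds: None,  # Skip real waiting in this demonstration
)
```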

Workflow Execution

Execute Workflow (Synchronous)

Trigger a workflow execution and get an immediate response that includes the run_id and workflow_id:
workflow_id = "bncu0a1kipv"
group_id = "test_group"  # Optional for organizing runs

# Basic usage with text inputs
inputs = {
    "text_51536": "Prompt test",
    "another_param": "some value"
}

# Advanced usage with files and dynamic parameters
inputs = {
    "text_51536": "A beautiful sunset over mountains",
    "image_35728": open("path/to/image.png", "rb"),      # File object
    "image_42561": "path/to/another_image.jpg",          # File path (auto-detected)
    "video_1239": open("path/to/video.mp4", "rb")       # Multiple file types
}

result = flowscale.execute_workflow(workflow_id, inputs, group_id)
print(f"Workflow Result: {result}")
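
The example above opens files inline and never closes them. One way to keep handles tidy is to open every file inside a `contextlib.ExitStack` and run the workflow before the stack exits. This is a sketch under assumptions: `run_with_files` is a hypothetical helper, and `execute` stands for any callable with the `(workflow_id, inputs)` shape, such as `flowscale.execute_workflow`.

```python
from contextlib import ExitStack
from typing import Any, Callable, Dict


def run_with_files(
    execute: Callable[[str, Dict[str, Any]], Any],
    workflow_id: str,
    values: Dict[str, Any],
    file_paths: Dict[str, str],
) -> Any:
    """Open each path in binary mode, call execute, then close every handle."""
    with ExitStack() as stack:
        inputs = dict(values)  # Copy so the caller's dict is not modified
        for name, path in file_paths.items():
            inputs[name] = stack.enter_context(open(path, "rb"))
        # Files are still open here and are closed when the block exits,
        # whether execute returns normally or raises
        return execute(workflow_id, inputs)
```

A call might look like `run_with_files(flowscale.execute_workflow, "bncu0a1kipv", {"text_51536": "Prompt test"}, {"image_35728": "path/to/image.png"})`.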

Execute Workflow (Asynchronous with Auto-Polling)

Execute a workflow and automatically poll for its output until completion:
workflow_id = "bncu0a1kipv"

# Simple usage
inputs = {
    "text_51536": "A serene lake at dawn",
    "image_1234": open("path/to/image.png", "rb")
}

# Auto-polls every 2 seconds, times out after 10 minutes by default
result = flowscale.execute_workflow_async(workflow_id, inputs)
print(f"Workflow Result: {result}")

# With custom timeout and polling interval
result = flowscale.execute_workflow_async(
    workflow_id, 
    inputs,
    group_id="my_group",
    timeout=600,           # 10 minutes
    polling_interval=5     # Check every 5 seconds
)

# With multiple dynamic image parameters
inputs = {
    "prompt_text": "A beautiful landscape",
    "image_35728": open("reference1.png", "rb"),
    "image_42561": open("reference2.jpg", "rb"),
    "style_image_789": "path/to/style.png"
}

from flowscale.exceptions import FlowScaleTimeoutError

try:
    result = flowscale.execute_workflow_async(
        workflow_id, 
        inputs, 
        group_id="batch_001",
        timeout=600,           # 10 minutes
        polling_interval=2     # Check every 2 seconds
    )
    print("Success:", result)
except FlowScaleTimeoutError:
    # Raised when the run does not finish within the timeout
    print("Workflow took too long to complete")
except Exception as e:
    print("Workflow error:", e)
If the workflow doesn’t complete within the timeout period, a FlowScaleTimeoutError exception will be raised.
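
Conceptually, auto-polling is a loop that checks the run status at a fixed interval until the run finishes or the deadline passes. The sketch below illustrates that idea only. It is not the SDK's implementation, and the status strings, `poll_until_done`, and `PollTimeout` are assumptions made for the example.

```python
import time
from typing import Any, Callable, Dict


class PollTimeout(Exception):
    """Raised by this sketch when the deadline passes (stand-in for a timeout error)."""


def poll_until_done(
    get_status: Callable[[], Dict[str, Any]],
    timeout: float = 600.0,
    polling_interval: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> Dict[str, Any]:
    """Poll get_status() until it reports a terminal state or the timeout elapses."""
    deadline = clock() + timeout
    while True:
        run = get_status()
        status = run.get("status")  # Assumed field name
        if status == "completed":
            return run
        if status in ("failed", "cancelled"):
            raise RuntimeError(f"Run ended with status {status!r}")
        if clock() >= deadline:
            raise PollTimeout(f"Run did not finish within {timeout} seconds")
        sleep(polling_interval)


# Demonstration with canned statuses and no real waiting
_states = iter([
    {"status": "queued"},
    {"status": "running"},
    {"status": "completed", "output": "result.png"},
])
final_run = poll_until_done(lambda: next(_states), sleep=lambda s: None)
```

Injecting `sleep` and `clock` keeps the loop testable without real delays.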

Dynamic Parameter Support

The SDK supports dynamic parameter names that match your workflow’s requirements:
# Example with ComfyUI dynamic parameter names
inputs = {
    "image_35728": open("input.png", "rb"),      # Dynamic image parameter
    "image_42561": "path/to/second_image.jpg",   # Another dynamic image
    "text_input_123": "Your prompt here",        # Dynamic text parameter
    "seed_456": 12345                            # Dynamic numeric parameter
}

result = flowscale.execute_workflow_async("workflow_id", inputs)

Managing Workflow Runs

Get Run Details

Retrieve detailed information about a specific workflow run:
run_details = flowscale.get_run("808f34d0-ef97-4b78-a00f-1268077ea6db")
print(f"Run Details: {run_details}")

Get Multiple Runs with Pagination ✨

Retrieve workflow runs with advanced pagination, sorting, and filtering capabilities:
# Basic usage - get recent runs
runs = flowscale.get_runs()
print(f"Recent Runs: {runs}")

# Filter by group
group_runs = flowscale.get_runs(group_id="test_group")
print(f"Group Runs: {group_runs}")

# Pagination with custom page size
paginated_runs = flowscale.get_runs(page=2, page_size=20)
print(f"Page 2 Runs: {paginated_runs}")

# Sort by completion time (oldest first)
completed_runs = flowscale.get_runs(
    sort_by="completed_at", 
    sort_order="asc",
    page_size=50
)

# Advanced filtering and sorting
filtered_runs = flowscale.get_runs(
    group_id="production_batch",
    sort_by="started_at",
    sort_order="desc",
    page=1,
    page_size=25
)
Parameters:
  • group_id (string, optional): Filter runs by group identifier
  • sort_by (string, optional): Field to sort by - "created_at", "started_at", or "completed_at" (default: "created_at")
  • sort_order (string, optional): Sort order - "asc" or "desc" (default: "desc")
  • page (int, optional): Page number starting from 1 (default: 1)
  • page_size (int, optional): Number of items per page, 1-100 (default: 10)
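
The limits listed above can be checked on the client side before a request is sent. A small sketch: `build_runs_query` is a hypothetical helper that validates the documented ranges and returns keyword arguments suitable for `flowscale.get_runs(**query)`.

```python
from typing import Any, Dict, Optional

ALLOWED_SORT_FIELDS = {"created_at", "started_at", "completed_at"}
ALLOWED_SORT_ORDERS = {"asc", "desc"}


def build_runs_query(
    group_id: Optional[str] = None,
    sort_by: str = "created_at",
    sort_order: str = "desc",
    page: int = 1,
    page_size: int = 10,
) -> Dict[str, Any]:
    """Validate pagination arguments against the documented limits."""
    if sort_by not in ALLOWED_SORT_FIELDS:
        raise ValueError(f"sort_by must be one of {sorted(ALLOWED_SORT_FIELDS)}")
    if sort_order not in ALLOWED_SORT_ORDERS:
        raise ValueError("sort_order must be 'asc' or 'desc'")
    if page < 1:
        raise ValueError("page starts at 1")
    if not 1 <= page_size <= 100:
        raise ValueError("page_size must be between 1 and 100")

    query: Dict[str, Any] = {
        "sort_by": sort_by,
        "sort_order": sort_order,
        "page": page,
        "page_size": page_size,
    }
    if group_id is not None:  # Leave the filter out entirely when unused
        query["group_id"] = group_id
    return query
```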

Cancel Running Workflow

Cancel a workflow execution using its run ID:
result = flowscale.cancel_run("808f34d0-ef97-4b78-a00f-1268077ea6db")
print(f"Cancellation Result: {result}")

Retrieving Outputs

Get Workflow Output

Fetch the output of a completed workflow using its filename:
output = flowscale.get_output("filename_prefix_58358_5WWF7GQUYF.png")
print(f"Workflow Output: {output}")

Advanced Usage Examples

Batch Processing with Groups

import time
from pathlib import Path

def process_image_batch(image_folder, workflow_id):
    """Process multiple images in batches with grouping"""
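    # pathlib's glob() does not expand braces, so filter by suffix instead
    image_files = sorted(
        p for p in Path(image_folder).iterdir()
        if p.suffix.lower() in {".png", ".jpg", ".jpeg"}
    )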
    group_id = f"batch_{int(time.time())}"
    
    results = []
    for i, image_path in enumerate(image_files):
        inputs = {
            "input_image": str(image_path),
            "prompt": f"Process image {i+1}",
            "batch_index": i
        }
        
        result = flowscale.execute_workflow_async(
            workflow_id, 
            inputs, 
            group_id=group_id
        )
        results.append(result)
        
    # Get all results for this batch
    batch_runs = flowscale.get_runs(group_id=group_id, page_size=100)
    return batch_runs

# Usage
results = process_image_batch("./input_images", "image-processor-v1")
print(f"Processed {len(results['data']['runs'])} images")

Error Handling and Retries

import time
from flowscale.exceptions import FlowScaleError, FlowScaleTimeoutError

def robust_workflow_execution(workflow_id, inputs, max_retries=3):
    """Execute workflow with retry logic"""
    for attempt in range(max_retries):
        try:
            result = flowscale.execute_workflow_async(
                workflow_id, 
                inputs,
                timeout=600,  # 10 minutes
                polling_interval=2
            )
            return result
            
        except FlowScaleTimeoutError as e:
            print(f"Attempt {attempt + 1} timed out: {e}")
            if attempt < max_retries - 1:
                print("Retrying after a short pause...")
                time.sleep(5)
            else:
                raise
                
        except FlowScaleError as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # Exponential backoff
            else:
                raise
                
        except Exception as e:
            print(f"Unexpected error on attempt {attempt + 1}: {e}")
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)
            else:
                raise

# Usage
try:
    result = robust_workflow_execution("workflow_id", {"prompt": "Test"})
    print("Success:", result)
except Exception as e:
    print("Failed after all retries:", e)
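
The retry loop above sleeps for `2 ** attempt` seconds. When many workers retry at once, a capped delay with random jitter spreads the retries out. This is a generic sketch rather than an SDK feature; `backoff_delay` and its defaults are illustrative.

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    base: float = 1.0,
    cap: float = 60.0,
    rng: Optional[random.Random] = None,
) -> float:
    """Return a delay in seconds for a zero-based attempt number.

    The upper bound doubles with each attempt up to `cap`; the actual delay is
    drawn uniformly between 0 and that bound ("full jitter").
    """
    if attempt < 0:
        raise ValueError("attempt must be non-negative")
    upper = min(cap, base * (2 ** attempt))
    source = rng if rng is not None else random
    return source.uniform(0, upper)
```

In the loop above, `time.sleep(backoff_delay(attempt))` would take the place of `time.sleep(2 ** attempt)`.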

Pagination Helper

def get_all_runs(group_id=None, sort_by="created_at"):
    """Example function to get all runs across multiple pages"""
    all_runs = []
    page = 1
    
    while True:
        response = flowscale.get_runs(
            group_id=group_id,
            sort_by=sort_by,
            page=page,
            page_size=100  # Max page size
        )
        
        runs = response["data"]["runs"]
        all_runs.extend(runs)
        
        # Check if we've reached the last page
        if page >= response["meta"]["total_pages"]:
            break
            
        page += 1
    
    return all_runs

# Usage
all_runs = get_all_runs(group_id="production_batch")
print(f"Total runs: {len(all_runs)}")

Framework Integration

FastAPI Integration

The SDK works seamlessly with FastAPI for building API endpoints:
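import os

from fastapi import FastAPI, UploadFile, File, Form
from flowscale import FlowscaleAPI

app = FastAPI()
# Read credentials from the environment, as in the Quick Start
flowscale = FlowscaleAPI(
    os.environ["FLOWSCALE_API_KEY"],
    os.environ["FLOWSCALE_API_URL"]
)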

@app.post("/execute_with_images")
async def execute_with_multiple_images(
    workflow_id: str = Form(...),
    image_35728: UploadFile = File(...),      # Dynamic parameter names
    image_42561: UploadFile = File(...),      # Multiple images supported
    text_prompt: str = Form("default prompt"),
    group_id: str = Form(None)
):
    """Execute workflow with multiple dynamic image parameters"""
    data = {
        "image_35728": image_35728.file,
        "image_42561": image_42561.file,
        "text_prompt": text_prompt
    }
    
    result = flowscale.execute_workflow_async(workflow_id, data, group_id)
    return result

@app.get("/runs")
async def get_runs_paginated(
    group_id: str = None,
    sort_by: str = "created_at",
    sort_order: str = "desc",
    page: int = 1,
    page_size: int = 10
):
    """Get paginated runs with sorting"""
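    # Keyword arguments keep the call independent of parameter order
    return flowscale.get_runs(
        group_id=group_id,
        sort_by=sort_by,
        sort_order=sort_order,
        page=page,
        page_size=page_size
    )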
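
`execute_workflow_async` polls until the run finishes and is called without `await` throughout this guide, which suggests it is a blocking call. Calling it directly inside an `async def` endpoint would then hold up the event loop for the whole run. One common remedy is to move the call to a worker thread with `asyncio.to_thread` (Python 3.9+). The sketch uses a stand-in function in place of the SDK call; `blocking_execute` and `run_in_thread` are hypothetical names.

```python
import asyncio
import time
from typing import Any, Dict, List


def blocking_execute(workflow_id: str, inputs: Dict[str, Any]) -> Dict[str, Any]:
    """Stand-in for a blocking SDK call such as flowscale.execute_workflow_async."""
    time.sleep(0.05)  # Simulates waiting for the workflow to finish
    return {"workflow_id": workflow_id, "echo": inputs}


async def run_in_thread(workflow_id: str, inputs: Dict[str, Any]) -> Dict[str, Any]:
    """Run the blocking call in a worker thread so the event loop stays responsive."""
    return await asyncio.to_thread(blocking_execute, workflow_id, inputs)


async def _demo() -> List[Dict[str, Any]]:
    # Both calls overlap because neither one blocks the event loop
    return await asyncio.gather(
        run_in_thread("wf-1", {"prompt": "a"}),
        run_in_thread("wf-2", {"prompt": "b"}),
    )


demo_results = asyncio.run(_demo())
```

Inside an endpoint, the same pattern would read `result = await asyncio.to_thread(flowscale.execute_workflow_async, workflow_id, data, group_id)`.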

Django Integration

# views.py
from django.conf import settings
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods
import json
from flowscale import FlowscaleAPI

flowscale = FlowscaleAPI(
    api_key=settings.FLOWSCALE_API_KEY,
    api_url=settings.FLOWSCALE_API_URL
)

@csrf_exempt
@require_http_methods(["POST"])
def generate_image(request):
    try:
        data = json.loads(request.body)
        
        result = flowscale.execute_workflow_async(
            workflow_id=data['workflow_id'],
            inputs=data['inputs'],
            group_id=data.get('group_id')
        )
        
        return JsonResponse(result)
        
    except Exception as e:
        return JsonResponse({'error': str(e)}, status=500)

Flask Integration

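import os

from flask import Flask, request, jsonify
from flowscale import FlowscaleAPI

app = Flask(__name__)
# Read credentials from the environment, as in the Quick Start
flowscale = FlowscaleAPI(
    os.environ["FLOWSCALE_API_KEY"],
    os.environ["FLOWSCALE_API_URL"]
)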

@app.route('/generate', methods=['POST'])
def generate():
    try:
        data = request.get_json()
        
        result = flowscale.execute_workflow_async(
            workflow_id=data['workflow_id'],
            inputs=data['inputs'],
            group_id=data.get('group_id')
        )
        
        return jsonify(result)
        
    except Exception as e:
        return jsonify({'error': str(e)}), 500

if __name__ == '__main__':
    app.run(debug=True)

Jupyter Notebook Integration

# Cell 1: Setup
from flowscale import FlowscaleAPI
import os
from IPython.display import Image, display
import requests

flowscale = FlowscaleAPI(
    api_key=os.environ.get("FLOWSCALE_API_KEY"),
    api_url=os.environ.get("FLOWSCALE_API_URL")
)

# Cell 2: Generate image
inputs = {
    "text_51536": "A serene mountain landscape at sunset",
    "width": 1024,
    "height": 1024
}

result = flowscale.execute_workflow_async("text-to-image-v1", inputs)
print("Generation complete!")

# Cell 3: Display result
if result["status"] == "success":
    download_url = result["data"]["download_url"]
    
    # Download and display image
    response = requests.get(download_url)
    if response.status_code == 200:
        with open("generated_image.png", "wb") as f:
            f.write(response.content)
        display(Image("generated_image.png"))

File Handling

Working with Different File Types

# Using file objects directly
with open("input_image.png", "rb") as f:
    inputs = {
        "image_input": f,
        "prompt": "Transform this image"
    }
    result = flowscale.execute_workflow_async("workflow_id", inputs)

Downloading and Saving Results

import requests
from pathlib import Path

def download_result(result, output_dir="./outputs"):
    """Download workflow result to local file"""
    if result["status"] != "success":
        raise Exception(f"Workflow failed: {result}")
    
    download_url = result["data"]["download_url"]
    
    # Create output directory
    Path(output_dir).mkdir(exist_ok=True)
    
    # Download file
    response = requests.get(download_url)
    response.raise_for_status()
    
    # Extract filename from URL or generate one
    filename = download_url.split("/")[-1]
    if "?" in filename:
        filename = filename.split("?")[0]
    
    output_path = Path(output_dir) / filename
    
    with open(output_path, "wb") as f:
        f.write(response.content)
    
    return str(output_path)

# Usage
result = flowscale.execute_workflow_async("workflow_id", inputs)
local_path = download_result(result)
print(f"Downloaded to: {local_path}")
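
Splitting the URL on `/` and `?` works for simple links, but it keeps percent-encoding in the name and returns an empty name for URLs that end in a slash. A slightly more defensive sketch using only the standard library; `filename_from_url` and the fallback name are illustrative.

```python
from pathlib import PurePosixPath
from urllib.parse import unquote, urlparse


def filename_from_url(url: str, default: str = "output.bin") -> str:
    """Derive a local filename from the path component of a URL."""
    path = unquote(urlparse(url).path)  # Ignores the query string and fragment
    name = PurePosixPath(path).name     # Last path segment, without directories
    # Fall back when the URL has no usable final segment
    return name if name not in ("", ".", "..") else default
```

In `download_result`, the three lines that compute `filename` could be replaced by `filename = filename_from_url(download_url)`.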

Error Handling

Exception Types

from flowscale.exceptions import (
    FlowScaleError,
    FlowScaleTimeoutError,
    FlowScaleValidationError,
    FlowScaleNetworkError
)

try:
    result = flowscale.execute_workflow_async("workflow_id", inputs)
    
except FlowScaleTimeoutError as e:
    print(f"Workflow timed out: {e}")
    # Handle timeout - maybe retry with longer timeout
    
except FlowScaleValidationError as e:
    print(f"Invalid inputs: {e}")
    # Handle validation error - fix inputs and retry
    
except FlowScaleNetworkError as e:
    print(f"Network error: {e}")
    # Handle network issue - maybe retry with backoff
    
except FlowScaleError as e:
    print(f"FlowScale API error: {e}")
    # Handle general API error
    
except Exception as e:
    print(f"Unexpected error: {e}")
    # Handle any other error

Comprehensive Error Handling Pattern

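import logging
import time
from typing import Optional, Dict, Any

from flowscale.exceptions import (
    FlowScaleTimeoutError,
    FlowScaleValidationError,
    FlowScaleNetworkError
)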

def execute_with_comprehensive_error_handling(
    workflow_id: str,
    inputs: Dict[str, Any],
    group_id: Optional[str] = None,
    max_retries: int = 3,
    timeout: int = 300
) -> Dict[str, Any]:
    """
    Execute workflow with comprehensive error handling and logging
    """
    logger = logging.getLogger(__name__)
    
    for attempt in range(max_retries):
        try:
            logger.info(f"Attempting workflow execution (attempt {attempt + 1}/{max_retries})")
            
            result = flowscale.execute_workflow_async(
                workflow_id=workflow_id,
                inputs=inputs,
                group_id=group_id,
                timeout=timeout
            )
            
            logger.info("Workflow executed successfully")
            return result
            
        except FlowScaleTimeoutError as e:
            logger.warning(f"Timeout on attempt {attempt + 1}: {e}")
            if attempt < max_retries - 1:
                # Increase timeout for next attempt
                timeout = min(int(timeout * 1.5), 1800)  # Keep an int; max 30 minutes
                logger.info(f"Retrying with increased timeout: {timeout}s")
            else:
                logger.error("Max retries reached for timeout")
                raise
                
        except FlowScaleValidationError as e:
            logger.error(f"Validation error (not retrying): {e}")
            raise  # Don't retry validation errors
            
        except FlowScaleNetworkError as e:
            logger.warning(f"Network error on attempt {attempt + 1}: {e}")
            if attempt < max_retries - 1:
                delay = 2 ** attempt  # Exponential backoff
                logger.info(f"Retrying after {delay}s delay")
                time.sleep(delay)
            else:
                logger.error("Max retries reached for network error")
                raise
                
        except Exception as e:
            logger.error(f"Unexpected error on attempt {attempt + 1}: {e}")
            if attempt < max_retries - 1:
                time.sleep(1)
            else:
                raise
    
    raise Exception("Max retries exceeded")

Best Practices

Environment Configuration

Always store sensitive information such as API keys in environment variables:
from dotenv import load_dotenv
import os

# Load environment variables from .env file
load_dotenv()

from flowscale import FlowscaleAPI

flowscale = FlowscaleAPI(
    api_key=os.environ.get("FLOWSCALE_API_KEY"),
    api_url=os.environ.get("FLOWSCALE_API_URL")
)

Performance Optimization

import os
from concurrent.futures import ThreadPoolExecutor
from flowscale import FlowscaleAPI

# Reuse client instances
flowscale = FlowscaleAPI(
    api_key=os.environ["FLOWSCALE_API_KEY"],
    api_url=os.environ["FLOWSCALE_API_URL"]
)

# Use connection pooling for high-volume applications
import requests

session = requests.Session()
session.mount('https://', requests.adapters.HTTPAdapter(
    pool_connections=10,
    pool_maxsize=20
))

flowscale = FlowscaleAPI(
    api_key=os.environ["FLOWSCALE_API_KEY"],
    api_url=os.environ["FLOWSCALE_API_URL"],
    session=session  # Use custom session with connection pooling
)

# Process multiple workflows concurrently
def process_concurrent_workflows(workflow_configs):
    """Process multiple workflows concurrently using ThreadPoolExecutor"""
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = []
        
        for config in workflow_configs:
            future = executor.submit(
                flowscale.execute_workflow_async,
                config['workflow_id'],
                config['inputs'],
                config.get('group_id')
            )
            futures.append(future)
        
        results = []
        for future in futures:
            try:
                result = future.result(timeout=600)  # 10 minute timeout per workflow
                results.append(result)
            except Exception as e:
                print(f"Workflow failed: {e}")
                results.append({'error': str(e)})
        
        return results

# Cache results when appropriate
import hashlib
import json

# Simple in-memory cache - in production, use a proper cache like Redis
_result_cache = {}

def execute_with_cache(workflow_id: str, inputs: dict, group_id: str = None):
    """Execute workflow with caching for identical, JSON-serializable inputs"""
    # Create hash of inputs for cache key (file objects cannot be serialized this way)
    inputs_str = json.dumps(inputs, sort_keys=True)
    inputs_hash = hashlib.md5(inputs_str.encode()).hexdigest()
    cache_key = (workflow_id, inputs_hash)
    
    # Check cache first
    if cache_key in _result_cache:
        return _result_cache[cache_key]
    
    # Execute workflow and store the result for later calls
    result = flowscale.execute_workflow_async(workflow_id, inputs, group_id)
    _result_cache[cache_key] = result
    return result
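
The `process_concurrent_workflows` loop earlier in this section waits for futures in submission order, so one slow workflow delays the handling of faster ones. `concurrent.futures.as_completed` hands back each future as soon as it is ready. A sketch with a stand-in `execute` callable; `run_concurrently` is a hypothetical helper, and outcomes are stored by position so they can be matched to their configs.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, Dict, List


def run_concurrently(
    execute: Callable[..., Any],
    workflow_configs: List[Dict[str, Any]],
    max_workers: int = 5,
) -> List[Dict[str, Any]]:
    """Run every config through execute and return outcomes in input order."""
    outcomes: List[Dict[str, Any]] = [{} for _ in workflow_configs]
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to the index of the config that produced it
        future_to_index = {
            executor.submit(
                execute, cfg["workflow_id"], cfg["inputs"], cfg.get("group_id")
            ): index
            for index, cfg in enumerate(workflow_configs)
        }
        for future in as_completed(future_to_index):
            index = future_to_index[future]
            try:
                outcomes[index] = {"ok": True, "result": future.result()}
            except Exception as exc:  # Keep going when one workflow fails
                outcomes[index] = {"ok": False, "error": str(exc)}
    return outcomes
```

With the SDK, `execute` would be `flowscale.execute_workflow_async`, assuming the client can be shared across threads.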

Input Validation

from typing import Dict, Any, Union
import os
from pathlib import Path

def validate_workflow_inputs(inputs: Dict[str, Any]) -> Dict[str, Any]:
    """Validate and sanitize workflow inputs"""
    validated_inputs = {}
    
    for key, value in inputs.items():
        if hasattr(value, 'read'):  # File-like object
            validated_inputs[key] = value
            
        elif isinstance(value, (str, Path)) and os.path.isfile(value):  # File path
            # Checked before plain strings so that paths are not treated as text
            if Path(value).stat().st_size > 50 * 1024 * 1024:  # 50MB limit
                raise ValueError(f"File '{key}' is too large (max 50MB)")
            validated_inputs[key] = value
            
        elif isinstance(value, str):
            # Validate text inputs
            if len(value) > 10000:  # Reasonable limit for text inputs
                raise ValueError(f"Input '{key}' is too long ({len(value)} chars)")
            if not value.strip():  # Don't allow empty strings
                raise ValueError(f"Input '{key}' cannot be empty")
            validated_inputs[key] = value.strip()
            
        elif isinstance(value, (int, float)):
            # Validate numeric inputs; match "width" as well as names like "image_width"
            if key in ('width', 'height') or key.endswith(('_width', '_height')):
                if not 64 <= value <= 2048:  # Reasonable image size limits
                    raise ValueError(f"Dimension '{key}' must be between 64 and 2048")
            validated_inputs[key] = value
            
        else:
            # Accept other types as-is but log for debugging
            print(f"Warning: Unvalidated input type for '{key}': {type(value)}")
            validated_inputs[key] = value
    
    return validated_inputs

# Usage
try:
    inputs = {
        "prompt": "A beautiful landscape",
        "width": 1024,
        "height": 1024,
        "image_input": "input.png"
    }
    
    validated_inputs = validate_workflow_inputs(inputs)
    result = flowscale.execute_workflow_async("workflow_id", validated_inputs)
    
except ValueError as e:
    print(f"Input validation failed: {e}")

Logging and Monitoring

import logging
import time
from functools import wraps

# Set up structured logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

logger = logging.getLogger(__name__)

def monitor_workflow_execution(func):
    """Decorator to monitor workflow execution metrics"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        workflow_id = args[0] if args else kwargs.get('workflow_id', 'unknown')
        
        logger.info(f"Starting workflow execution: {workflow_id}")
        
        try:
            result = func(*args, **kwargs)
            
            execution_time = time.time() - start_time
            logger.info(f"Workflow completed successfully: {workflow_id} (took {execution_time:.2f}s)")
            
            return result
            
        except Exception as e:
            execution_time = time.time() - start_time
            logger.error(f"Workflow failed: {workflow_id} after {execution_time:.2f}s - {e}")
            raise
    
    return wrapper

# Usage
@monitor_workflow_execution
def execute_monitored_workflow(workflow_id, inputs, group_id=None):
    return flowscale.execute_workflow_async(workflow_id, inputs, group_id)

# Example usage
result = execute_monitored_workflow("text-to-image-v1", {"prompt": "Test"})

Testing and Debugging

Unit Testing

import unittest
from unittest.mock import patch, MagicMock
from flowscale import FlowscaleAPI

class TestFlowScaleIntegration(unittest.TestCase):
    def setUp(self):
        self.flowscale = FlowscaleAPI(
            api_key="test_key",
            api_url="https://test.flowscale.ai"
        )
    
    @patch('flowscale.FlowscaleAPI.execute_workflow_async')
    def test_workflow_execution(self, mock_execute):
        # Mock successful response
        mock_execute.return_value = {
            "status": "success",
            "data": {
                "download_url": "https://example.com/result.png",
                "generation_status": "success"
            }
        }
        
        result = self.flowscale.execute_workflow_async(
            "test_workflow",
            {"prompt": "test"}
        )
        
        self.assertEqual(result["status"], "success")
        mock_execute.assert_called_once_with("test_workflow", {"prompt": "test"})
    
    @patch('flowscale.FlowscaleAPI.execute_workflow_async')
    def test_workflow_timeout(self, mock_execute):
        # Mock timeout error
        from flowscale.exceptions import FlowScaleTimeoutError
        mock_execute.side_effect = FlowScaleTimeoutError("Workflow timed out")
        
        with self.assertRaises(FlowScaleTimeoutError):
            self.flowscale.execute_workflow_async(
                "test_workflow",
                {"prompt": "test"},
                timeout=1
            )

if __name__ == '__main__':
    unittest.main()
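
Patching works, but tests can also inject a small fake object wherever application code accepts a client. A sketch: `FakeFlowscale` is a hypothetical test double that mimics only the `execute_workflow_async` call shape used in this guide and records each call for later assertions, and `generate_thumbnail` is made-up application code. The response fields follow the mocked response in the test above.

```python
from typing import Any, Dict, List, Optional


class FakeFlowscale:
    """Test double that returns a canned result and records every call."""

    def __init__(self, result: Dict[str, Any]):
        self.result = result
        self.calls: List[Dict[str, Any]] = []

    def execute_workflow_async(
        self,
        workflow_id: str,
        inputs: Dict[str, Any],
        group_id: Optional[str] = None,
        **options: Any,
    ) -> Dict[str, Any]:
        self.calls.append(
            {"workflow_id": workflow_id, "inputs": inputs, "group_id": group_id, **options}
        )
        return self.result


def generate_thumbnail(client: Any, prompt: str) -> str:
    """Made-up application code that depends on a client passed in by the caller."""
    result = client.execute_workflow_async("thumbnail-v1", {"prompt": prompt})
    if result["status"] != "success":
        raise RuntimeError("generation failed")
    return result["data"]["download_url"]


fake = FakeFlowscale(
    {"status": "success", "data": {"download_url": "https://example.com/result.png"}}
)
url = generate_thumbnail(fake, "a red bicycle")
```

Because the client is passed in as an argument, the same function runs unchanged against the real `FlowscaleAPI` instance in production.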

Debug Mode Usage

from flowscale import FlowscaleAPI
import logging
import os

# Enable debug logging
logging.basicConfig(level=logging.DEBUG)

flowscale = FlowscaleAPI(
    api_key=os.environ["FLOWSCALE_API_KEY"],
    api_url=os.environ["FLOWSCALE_API_URL"],
    enable_logging=True,
    log_level="debug"
)

# Test with debug information
try:
    result = flowscale.execute_workflow_async(
        "workflow_id",
        {"prompt": "debug test"},
        timeout=60
    )
    print("Success:", result)
    
except Exception as e:
    print("Error details:", e)
    # Debug logs will show detailed information about the failure
