Your ComfyUI workflows are deployed as APIs—now let’s integrate them into Python applications. This SDK transforms your creative AI work into application features that users can access through web services, data pipelines, automation systems, and interactive tools.
What you’re building: Instead of asking users to understand ComfyUI or API endpoints, you’re creating seamless experiences where AI capabilities feel like native Python functions. Your workflows become the invisible intelligence powering data processing, web applications, and automated systems.
🚀 What’s New in v1.1.4
🔄 Updated Response Format: Both execute_workflow and execute_workflow_async now return workflow_id alongside run_id
✨ Enhanced Polling: Improved execute_workflow_async with better run status detection and failure handling
📚 Comprehensive Documentation: Advanced usage examples for FastAPI, Django, Flask, and Jupyter integration
🖼️ Dynamic Parameters: Full support for dynamic parameter names matching your ComfyUI workflow structure
📤 Multiple File Support: Handle multiple images, videos, and files in a single workflow execution
📊 Pagination & Filtering: Enhanced get_runs() with sorting, filtering, and pagination capabilities
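For example, with the updated response format you can read both identifiers from an execute_workflow response (the full response shape is shown under Workflow Execution below):

result = flowscale.execute_workflow(workflow_id, inputs)
run_id = result["data"]["run_id"]            # Identifies this specific run
workflow_uuid = result["data"]["workflow_id"]  # Identifies the deployed workflow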
Installation: Adding AI Power to Your Python Projects
Ready to bring your AI workflows into your Python applications? Install the FlowScale SDK to start integrating your deployed ComfyUI workflows into any Python project.
What this enables: Your users won’t interact with ComfyUI or see API endpoints—they’ll experience AI capabilities as seamless Python functions. Image generation, style transfer, content creation, and other AI workflows become as simple as calling a method.
Quick Start: From Deployment to Integration
Prerequisites: You should have already deployed your ComfyUI workflow as an API through the FlowScale AI platform. If you haven’t, complete the Deploy Workflows guide first.
Connecting Your Python Application to Your AI Infrastructure
This is where your deployed AI workflows become Python application features. Import the SDK and connect to the production APIs you’ve already created:
Basic Usage
With Configuration
from flowscale import FlowscaleAPI
import os

# Store API keys in environment variables
api_key = os.environ.get("FLOWSCALE_API_KEY")
api_url = os.environ.get("FLOWSCALE_API_URL")

if not api_key or not api_url:
    print("FLOWSCALE_API_KEY or FLOWSCALE_API_URL not set in environment")
    exit(1)

flowscale = FlowscaleAPI(api_key, api_url)
Environment Variables
Add the following to your .env file:
FLOWSCALE_API_KEY=your-api-key
FLOWSCALE_API_URL=https://your-api-url.pod.flowscale.ai
Integration Patterns for Production Applications
Choose the integration pattern that matches your application’s needs:
Best for: API endpoints, web applications, microservices. Benefits:
Production-ready web framework integration
Built-in authentication and validation
Scalable request handling
Easy deployment and monitoring
Use cases: REST APIs, web applications, background job processing
Configuration
Client Options
from flowscale import FlowscaleAPI

flowscale = FlowscaleAPI(
    api_key="your-api-key",
    api_url="https://your-api-url.pod.flowscale.ai",
    timeout=300,            # Request timeout in seconds (default: 300)
    max_retries=3,          # Number of retry attempts (default: 3)
    retry_delay=1.0,        # Delay between retries in seconds (default: 1.0)
    enable_logging=True,    # Enable SDK logging (default: True)
    log_level="info",       # Log level: debug, info, warn, error (default: info)
    user_agent="MyApp/1.0"  # Custom user agent string
)
Logging Configuration
The SDK includes configurable logging to help with debugging and monitoring:
import logging
from flowscale import FlowscaleAPI

# Configure logging during initialization
flowscale = FlowscaleAPI(
    api_key="your-api-key",
    api_url="https://your-api-url.pod.flowscale.ai",
    enable_logging=True,
    log_level="debug"
)

# Change logging settings after initialization
flowscale.set_logging_config(enabled=True, level="debug")

# Disable logging completely
flowscale.set_logging_config(enabled=False)

# Use a custom logger
custom_logger = logging.getLogger("my_app.flowscale")
flowscale.set_logger(custom_logger)
debug: Most verbose, includes detailed operation information
info: General information about operations (default)
warn: Warning messages for potential issues
error: Only error messages
All logs are prefixed with a timestamp and level:
[FlowScale INFO] 2025-01-15 10:30:45,123: Executing workflow bncu0a1kipv
[FlowScale DEBUG] 2025-01-15 10:30:46,456: Polling workflow output: {'status': 'running'}
Core Methods
Check Health Status
Monitor the health of the FlowScale platform, including container status:
health = flowscale.check_health()
print(f"API Health: {health}")
{
    "status": "success",
    "data": [
        {
            "container": "container #1",
            "status": "idle"
        },
        {
            "container": "container #2",
            "status": "running"
        }
    ]
}
Get Queue Status
Retrieve the current workflow queue status:
queue = flowscale.get_queue()
print(f"Queue Details: {queue}")
{
    "status": "success",
    "data": [
        {
            "container": "container #1",
            "queue": {
                "queue_running": [
                    [0, "2a0babc4-acce-4521-9576-00fa0e6ecc91"]
                ],
                "queue_pending": [
                    [1, "5d60718a-7e89-4c64-b32d-0d1366b44e2a"]
                ]
            }
        }
    ]
}
Workflow Execution
Execute Workflow (Synchronous)
Trigger a workflow execution and get immediate response:
workflow_id = "bncu0a1kipv"
group_id = "test_group"  # Optional for organizing runs

# Basic usage with text inputs
inputs = {
    "text_51536": "Prompt test",
    "another_param": "some value"
}

# Advanced usage with files and dynamic parameters
inputs = {
    "text_51536": "A beautiful sunset over mountains",
    "image_35728": open("path/to/image.png", "rb"),  # File object
    "image_42561": "path/to/another_image.jpg",      # File path (auto-detected)
    "video_1239": open("path/to/video.mp4", "rb")    # Multiple file types
}

result = flowscale.execute_workflow(workflow_id, inputs, group_id)
print(f"Workflow Result: {result}")
{
    "status": "success",
    "data": {
        "run_id": "808f34d0-ef97-4b78-a00f-1268077ea6db",
        "workflow_id": "550e8400-e29b-41d4-a716-446655440000"
    }
}
Execute Workflow (Asynchronous with Auto-Polling)
Execute a workflow and automatically poll for its output until completion:
workflow_id = "bncu0a1kipv"

# Simple usage
inputs = {
    "text_51536": "A serene lake at dawn",
    "image_1234": open("path/to/image.png", "rb")
}

# Auto-polls every 2 seconds, times out after 10 minutes by default
result = flowscale.execute_workflow_async(workflow_id, inputs)
print(f"Workflow Result: {result}")

# With custom timeout and polling interval
result = flowscale.execute_workflow_async(
    workflow_id,
    inputs,
    group_id="my_group",
    timeout=600,         # 10 minutes
    polling_interval=5   # Check every 5 seconds
)

# With multiple dynamic image parameters
inputs = {
    "prompt_text": "A beautiful landscape",
    "image_35728": open("reference1.png", "rb"),
    "image_42561": open("reference2.jpg", "rb"),
    "style_image_789": "path/to/style.png"
}

try:
    result = flowscale.execute_workflow_async(
        workflow_id,
        inputs,
        group_id="batch_001",
        timeout=600,         # 10 minutes
        polling_interval=2   # Check every 2 seconds
    )
    print("Success:", result)
except Exception as e:
    if "timed out" in str(e):
        print("Workflow took too long to complete")
    else:
        print("Workflow error:", e)
{
    "run_id": "808f34d0-ef97-4b78-a00f-1268077ea6db",
    "workflow_id": "550e8400-e29b-41d4-a716-446655440000",
    "outputs": {
        "filename_prefix_58358_5WWF7GQUYF": {
            "status": "success",
            "data": {
                "download_url": "https://runs.s3.amazonaws.com/generations/...",
                "generation_status": "success"
            }
        }
    },
    "total_outputs": 1,
    "status": "completed"
}
If the workflow doesn’t complete within the timeout period, a FlowScaleTimeoutError exception will be raised.
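For example, you can catch the timeout explicitly; FlowScaleTimeoutError is imported from flowscale.exceptions, as in the error-handling examples later in this guide:

from flowscale.exceptions import FlowScaleTimeoutError

try:
    result = flowscale.execute_workflow_async(workflow_id, inputs, timeout=300)
except FlowScaleTimeoutError:
    # Decide whether to retry, extend the timeout, or inspect the run later via get_run()
    print("Workflow did not complete within 5 minutes")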
Dynamic Parameter Support
The SDK supports dynamic parameter names that match your workflow’s requirements:
Dynamic Image Parameters
Multiple File Types
# Example with ComfyUI dynamic parameter names
inputs = {
    "image_35728": open("input.png", "rb"),     # Dynamic image parameter
    "image_42561": "path/to/second_image.jpg",  # Another dynamic image
    "text_input_123": "Your prompt here",       # Dynamic text parameter
    "seed_456": 12345                           # Dynamic numeric parameter
}

result = flowscale.execute_workflow_async("workflow_id", inputs)
Managing Workflow Runs
Get Run Details
Retrieve detailed information about a specific workflow run:
run_details = flowscale.get_run("808f34d0-ef97-4b78-a00f-1268077ea6db")
print(f"Run Details: {run_details}")
{
    "status": "success",
    "data": {
        "_id": "808f34d0-ef97-4b78-a00f-1268077ea6db",
        "status": "completed",
        "inputs": [
            {
                "path": "text_51536",
                "value": "a man riding a bike"
            }
        ],
        "outputs": [
            {
                "filename": "filename_prefix_58358_5WWF7GQUYF.png",
                "url": "https://runs.s3.amazonaws.com/generations/..."
            }
        ],
        "created_at": "2024-01-15T10:30:00Z",
        "started_at": "2024-01-15T10:30:15Z",
        "completed_at": "2024-01-15T10:32:45Z"
    }
}
Get Workflow Runs
Retrieve workflow runs with advanced pagination, sorting, and filtering capabilities:
# Basic usage - get recent runs
runs = flowscale.get_runs()
print(f"Recent Runs: {runs}")

# Filter by group
group_runs = flowscale.get_runs(group_id="test_group")
print(f"Group Runs: {group_runs}")

# Pagination with custom page size
paginated_runs = flowscale.get_runs(page=2, page_size=20)
print(f"Page 2 Runs: {paginated_runs}")

# Sort by completion time (oldest first)
completed_runs = flowscale.get_runs(
    sort_by="completed_at",
    sort_order="asc",
    page_size=50
)

# Advanced filtering and sorting
filtered_runs = flowscale.get_runs(
    group_id="production_batch",
    sort_by="started_at",
    sort_order="desc",
    page=1,
    page_size=25
)
Parameters:
group_id (string, optional): Filter runs by group identifier
sort_by (string, optional): Field to sort by - "created_at", "started_at", or "completed_at" (default: "created_at")
sort_order (string, optional): Sort order - "asc" or "desc" (default: "desc")
page (int, optional): Page number starting from 1 (default: 1)
page_size (int, optional): Number of items per page, 1-100 (default: 10)
Enhanced Response with Metadata
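A rough sketch of the paginated response shape, inferred from the pagination helper later in this guide (which reads data.runs and meta.total_pages); the exact fields under meta may differ in your SDK version:

{
    "status": "success",
    "data": {
        "runs": [
            {
                "_id": "808f34d0-ef97-4b78-a00f-1268077ea6db",
                "status": "completed",
                "created_at": "2024-01-15T10:30:00Z"
            }
        ]
    },
    "meta": {
        "page": 1,
        "page_size": 10,
        "total_pages": 5
    }
}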
Cancel Running Workflow
Cancel a workflow execution using its run ID:
result = flowscale.cancel_run("808f34d0-ef97-4b78-a00f-1268077ea6db")
print(f"Cancellation Result: {result}")
{
    "status": "success",
    "data": "Run cancelled successfully"
}
Retrieving Outputs
Get Workflow Output
Fetch the output of a completed workflow using its filename:
output = flowscale.get_output("filename_prefix_58358_5WWF7GQUYF.png")
print(f"Workflow Output: {output}")
{
    "status": "success",
    "data": {
        "download_url": "https://runs.s3.amazonaws.com/generations/...",
        "generation_status": "success"
    }
}
Advanced Usage Examples
Batch Processing with Groups
import time
from pathlib import Path

def process_image_batch(image_folder, workflow_id):
    """Process multiple images in batches with grouping"""
    # pathlib.glob() does not support brace patterns, so filter by suffix instead
    image_files = [
        p for p in Path(image_folder).iterdir()
        if p.suffix.lower() in {".png", ".jpg", ".jpeg"}
    ]
    group_id = f"batch_{int(time.time())}"

    results = []
    for i, image_path in enumerate(image_files):
        inputs = {
            "input_image": str(image_path),
            "prompt": f"Process image {i + 1}",
            "batch_index": i
        }
        result = flowscale.execute_workflow_async(
            workflow_id,
            inputs,
            group_id=group_id
        )
        results.append(result)

    # Get all results for this batch
    batch_runs = flowscale.get_runs(group_id=group_id, page_size=100)
    return batch_runs

# Usage
results = process_image_batch("./input_images", "image-processor-v1")
print(f"Processed {len(results['data']['runs'])} images")
Error Handling and Retries
import time
from flowscale.exceptions import FlowScaleError, FlowScaleTimeoutError

def robust_workflow_execution(workflow_id, inputs, max_retries=3):
    """Execute workflow with retry logic"""
    for attempt in range(max_retries):
        try:
            result = flowscale.execute_workflow_async(
                workflow_id,
                inputs,
                timeout=600,        # 10 minutes
                polling_interval=2
            )
            return result
        except FlowScaleTimeoutError as e:
            print(f"Attempt {attempt + 1} timed out: {e}")
            if attempt < max_retries - 1:
                print("Retrying with longer timeout...")
                time.sleep(5)
            else:
                raise
        except FlowScaleError as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # Exponential backoff
            else:
                raise
        except Exception as e:
            print(f"Unexpected error on attempt {attempt + 1}: {e}")
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)
            else:
                raise

# Usage
try:
    result = robust_workflow_execution("workflow_id", {"prompt": "Test"})
    print("Success:", result)
except Exception as e:
    print("Failed after all retries:", e)
def get_all_runs(group_id=None, sort_by="created_at"):
    """Example function to get all runs across multiple pages"""
    all_runs = []
    page = 1

    while True:
        response = flowscale.get_runs(
            group_id=group_id,
            sort_by=sort_by,
            page=page,
            page_size=100  # Max page size
        )

        runs = response["data"]["runs"]
        all_runs.extend(runs)

        # Check if we've reached the last page
        if page >= response["meta"]["total_pages"]:
            break
        page += 1

    return all_runs

# Usage
all_runs = get_all_runs(group_id="production_batch")
print(f"Total runs: {len(all_runs)}")
Framework Integration
FastAPI Integration
The SDK works seamlessly with FastAPI for building API endpoints:
from fastapi import FastAPI, UploadFile, File, Form
from flowscale import FlowscaleAPI

app = FastAPI()
flowscale = FlowscaleAPI(api_key, api_url)

@app.post("/execute_with_images")
async def execute_with_multiple_images(
    workflow_id: str = Form(...),
    image_35728: UploadFile = File(...),  # Dynamic parameter names
    image_42561: UploadFile = File(...),  # Multiple images supported
    text_prompt: str = Form("default prompt"),
    group_id: str = Form(None)
):
    """Execute workflow with multiple dynamic image parameters"""
    data = {
        "image_35728": image_35728.file,
        "image_42561": image_42561.file,
        "text_prompt": text_prompt
    }
    result = flowscale.execute_workflow_async(workflow_id, data, group_id)
    return result

@app.get("/runs")
async def get_runs_paginated(
    group_id: str = None,
    sort_by: str = "created_at",
    sort_order: str = "desc",
    page: int = 1,
    page_size: int = 10
):
    """Get paginated runs with sorting"""
    return flowscale.get_runs(group_id, sort_by, sort_order, page, page_size)
Django Integration
# views.py
from django.conf import settings
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods
import json
from flowscale import FlowscaleAPI

flowscale = FlowscaleAPI(
    api_key=settings.FLOWSCALE_API_KEY,
    api_url=settings.FLOWSCALE_API_URL
)

@csrf_exempt
@require_http_methods(["POST"])
def generate_image(request):
    try:
        data = json.loads(request.body)
        result = flowscale.execute_workflow_async(
            workflow_id=data['workflow_id'],
            inputs=data['inputs'],
            group_id=data.get('group_id')
        )
        return JsonResponse(result)
    except Exception as e:
        return JsonResponse({'error': str(e)}, status=500)
Flask Integration
from flask import Flask, request, jsonify
from flowscale import FlowscaleAPI

app = Flask(__name__)
flowscale = FlowscaleAPI(api_key, api_url)

@app.route('/generate', methods=['POST'])
def generate():
    try:
        data = request.get_json()
        result = flowscale.execute_workflow_async(
            workflow_id=data['workflow_id'],
            inputs=data['inputs'],
            group_id=data.get('group_id')
        )
        return jsonify(result)
    except Exception as e:
        return jsonify({'error': str(e)}), 500

if __name__ == '__main__':
    app.run(debug=True)
Jupyter Notebook Integration
# Cell 1: Setup
from flowscale import FlowscaleAPI
import os
from IPython.display import Image, display
import requests

flowscale = FlowscaleAPI(
    api_key=os.environ.get("FLOWSCALE_API_KEY"),
    api_url=os.environ.get("FLOWSCALE_API_URL")
)

# Cell 2: Generate image
inputs = {
    "text_51536": "A serene mountain landscape at sunset",
    "width": 1024,
    "height": 1024
}

result = flowscale.execute_workflow_async("text-to-image-v1", inputs)
print("Generation complete!")

# Cell 3: Display result
if result["status"] == "success":
    download_url = result["data"]["download_url"]

    # Download and display image
    response = requests.get(download_url)
    if response.status_code == 200:
        with open("generated_image.png", "wb") as f:
            f.write(response.content)
        display(Image("generated_image.png"))
File Handling
Working with Different File Types
File Objects
File Paths
Base64 Data
Bytes Data
# Using file objects directly; keep the file open while the workflow is submitted
with open("input_image.png", "rb") as f:
    inputs = {
        "image_input": f,
        "prompt": "Transform this image"
    }
    result = flowscale.execute_workflow_async("workflow_id", inputs)
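The remaining input forms work as follows. Plain string paths are auto-detected as files, as shown in the execution examples earlier; base64 and raw-bytes support is implied by the tab labels above, so treat those two snippets as a sketch and confirm against your SDK version:

# Using a file path (auto-detected)
inputs = {"image_input": "input_image.png", "prompt": "Transform this image"}

# Using raw bytes (assumed to be accepted; verify with your SDK version)
with open("input_image.png", "rb") as f:
    image_bytes = f.read()
inputs = {"image_input": image_bytes, "prompt": "Transform this image"}

# Using base64-encoded data (assumed; verify with your SDK version)
import base64
encoded = base64.b64encode(image_bytes).decode("utf-8")
inputs = {"image_input": encoded, "prompt": "Transform this image"}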
Downloading and Saving Results
import requests
from pathlib import Path

def download_result(result, output_dir="./outputs"):
    """Download workflow result to local file"""
    if result["status"] != "success":
        raise Exception(f"Workflow failed: {result}")

    download_url = result["data"]["download_url"]

    # Create output directory
    Path(output_dir).mkdir(exist_ok=True)

    # Download file
    response = requests.get(download_url)
    response.raise_for_status()

    # Extract filename from URL or generate one
    filename = download_url.split("/")[-1]
    if "?" in filename:
        filename = filename.split("?")[0]

    output_path = Path(output_dir) / filename
    with open(output_path, "wb") as f:
        f.write(response.content)

    return str(output_path)

# Usage
result = flowscale.execute_workflow_async("workflow_id", inputs)
local_path = download_result(result)
print(f"Downloaded to: {local_path}")
Error Handling
Exception Types
from flowscale.exceptions import (
    FlowScaleError,
    FlowScaleTimeoutError,
    FlowScaleValidationError,
    FlowScaleNetworkError
)

try:
    result = flowscale.execute_workflow_async("workflow_id", inputs)
except FlowScaleTimeoutError as e:
    print(f"Workflow timed out: {e}")
    # Handle timeout - maybe retry with a longer timeout
except FlowScaleValidationError as e:
    print(f"Invalid inputs: {e}")
    # Handle validation error - fix inputs and retry
except FlowScaleNetworkError as e:
    print(f"Network error: {e}")
    # Handle network issue - maybe retry with backoff
except FlowScaleError as e:
    print(f"FlowScale API error: {e}")
    # Handle general API error
except Exception as e:
    print(f"Unexpected error: {e}")
    # Handle any other error
Comprehensive Error Handling Pattern
import logging
import time
from typing import Optional, Dict, Any

from flowscale.exceptions import (
    FlowScaleTimeoutError,
    FlowScaleValidationError,
    FlowScaleNetworkError
)

def execute_with_comprehensive_error_handling(
    workflow_id: str,
    inputs: Dict[str, Any],
    group_id: Optional[str] = None,
    max_retries: int = 3,
    timeout: int = 300
) -> Dict[str, Any]:
    """
    Execute workflow with comprehensive error handling and logging
    """
    logger = logging.getLogger(__name__)

    for attempt in range(max_retries):
        try:
            logger.info(f"Attempting workflow execution (attempt {attempt + 1}/{max_retries})")
            result = flowscale.execute_workflow_async(
                workflow_id=workflow_id,
                inputs=inputs,
                group_id=group_id,
                timeout=timeout
            )
            logger.info("Workflow executed successfully")
            return result
        except FlowScaleTimeoutError as e:
            logger.warning(f"Timeout on attempt {attempt + 1}: {e}")
            if attempt < max_retries - 1:
                # Increase timeout for next attempt
                timeout = min(timeout * 1.5, 1800)  # Max 30 minutes
                logger.info(f"Retrying with increased timeout: {timeout}s")
            else:
                logger.error("Max retries reached for timeout")
                raise
        except FlowScaleValidationError as e:
            logger.error(f"Validation error (not retrying): {e}")
            raise  # Don't retry validation errors
        except FlowScaleNetworkError as e:
            logger.warning(f"Network error on attempt {attempt + 1}: {e}")
            if attempt < max_retries - 1:
                delay = 2 ** attempt  # Exponential backoff
                logger.info(f"Retrying after {delay}s delay")
                time.sleep(delay)
            else:
                logger.error("Max retries reached for network error")
                raise
        except Exception as e:
            logger.error(f"Unexpected error on attempt {attempt + 1}: {e}")
            if attempt < max_retries - 1:
                time.sleep(1)
            else:
                raise

    raise Exception("Max retries exceeded")
Best Practices
Environment Configuration
Always store sensitive information such as API keys in environment variables:
Using python-dotenv
Using os.environ directly
from dotenv import load_dotenv
import os

# Load environment variables from .env file
load_dotenv()

from flowscale import FlowscaleAPI

flowscale = FlowscaleAPI(
    api_key=os.environ.get("FLOWSCALE_API_KEY"),
    api_url=os.environ.get("FLOWSCALE_API_URL")
)
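The "Using os.environ directly" variant needs no extra dependency; just make sure the variables are exported in the shell or process environment before the application starts:

import os
from flowscale import FlowscaleAPI

flowscale = FlowscaleAPI(
    api_key=os.environ["FLOWSCALE_API_KEY"],  # Raises KeyError if unset
    api_url=os.environ["FLOWSCALE_API_URL"]
)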
import os
import threading
from concurrent.futures import ThreadPoolExecutor
from flowscale import FlowscaleAPI

# Reuse client instances
flowscale = FlowscaleAPI(
    api_key=os.environ["FLOWSCALE_API_KEY"],
    api_url=os.environ["FLOWSCALE_API_URL"]
)

# Use connection pooling for high-volume applications
import requests

session = requests.Session()
session.mount('https://', requests.adapters.HTTPAdapter(
    pool_connections=10,
    pool_maxsize=20
))

flowscale = FlowscaleAPI(
    api_key=os.environ["FLOWSCALE_API_KEY"],
    api_url=os.environ["FLOWSCALE_API_URL"],
    session=session  # Use custom session with connection pooling
)

# Process multiple workflows concurrently
def process_concurrent_workflows(workflow_configs):
    """Process multiple workflows concurrently using ThreadPoolExecutor"""
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = []
        for config in workflow_configs:
            future = executor.submit(
                flowscale.execute_workflow_async,
                config['workflow_id'],
                config['inputs'],
                config.get('group_id')
            )
            futures.append(future)

        results = []
        for future in futures:
            try:
                result = future.result(timeout=600)  # 10 minute timeout per workflow
                results.append(result)
            except Exception as e:
                print(f"Workflow failed: {e}")
                results.append({'error': str(e)})

        return results
# Cache results when appropriate
from functools import lru_cache
import hashlib
import json

@lru_cache(maxsize=100)
def get_cached_workflow_result(workflow_id: str, inputs_hash: str):
    """Cache workflow results for identical inputs"""
    # This is a simple example - in production, use a proper cache like Redis
    return None

def execute_with_cache(workflow_id: str, inputs: dict, group_id: str = None):
    """Execute workflow with caching"""
    # Create hash of inputs for cache key
    inputs_str = json.dumps(inputs, sort_keys=True)
    inputs_hash = hashlib.md5(inputs_str.encode()).hexdigest()

    # Check cache first
    cached_result = get_cached_workflow_result(workflow_id, inputs_hash)
    if cached_result:
        return cached_result

    # Execute workflow
    result = flowscale.execute_workflow_async(workflow_id, inputs, group_id)

    # Cache result (implement proper caching logic here)
    return result
from typing import Dict, Any, Union
import os
from pathlib import Path

def validate_workflow_inputs(inputs: Dict[str, Any]) -> Dict[str, Any]:
    """Validate and sanitize workflow inputs"""
    validated_inputs = {}

    for key, value in inputs.items():
        if hasattr(value, 'read'):  # File-like object
            validated_inputs[key] = value
        elif isinstance(value, (str, Path)) and os.path.exists(value):  # File path
            # Check existing paths before generic strings so file inputs aren't treated as text
            file_path = Path(value)
            if file_path.stat().st_size > 50 * 1024 * 1024:  # 50MB limit
                raise ValueError(f"File '{key}' is too large (max 50MB)")
            validated_inputs[key] = value
        elif isinstance(value, str):
            # Validate string inputs
            if len(value) > 10000:  # Reasonable limit for text inputs
                raise ValueError(f"Input '{key}' is too long ({len(value)} chars)")
            if not value.strip():  # Don't allow empty strings
                raise ValueError(f"Input '{key}' cannot be empty")
            validated_inputs[key] = value.strip()
        elif isinstance(value, (int, float)):
            # Validate numeric inputs
            if key.endswith('_width') or key.endswith('_height'):
                if not 64 <= value <= 2048:  # Reasonable image size limits
                    raise ValueError(f"Dimension '{key}' must be between 64 and 2048")
            validated_inputs[key] = value
        else:
            # Accept other types as-is but log for debugging
            print(f"Warning: Unvalidated input type for '{key}': {type(value)}")
            validated_inputs[key] = value

    return validated_inputs

# Usage
try:
    inputs = {
        "prompt": "A beautiful landscape",
        "width": 1024,
        "height": 1024,
        "image_input": "input.png"
    }
    validated_inputs = validate_workflow_inputs(inputs)
    result = flowscale.execute_workflow_async("workflow_id", validated_inputs)
except ValueError as e:
    print(f"Input validation failed: {e}")
Logging and Monitoring
import logging
import time
from functools import wraps

# Set up structured logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

def monitor_workflow_execution(func):
    """Decorator to monitor workflow execution metrics"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        workflow_id = args[0] if args else kwargs.get('workflow_id', 'unknown')

        logger.info(f"Starting workflow execution: {workflow_id}")
        try:
            result = func(*args, **kwargs)
            execution_time = time.time() - start_time
            logger.info(f"Workflow completed successfully: {workflow_id} (took {execution_time:.2f}s)")
            return result
        except Exception as e:
            execution_time = time.time() - start_time
            logger.error(f"Workflow failed: {workflow_id} after {execution_time:.2f}s - {e}")
            raise
    return wrapper

# Usage
@monitor_workflow_execution
def execute_monitored_workflow(workflow_id, inputs, group_id=None):
    return flowscale.execute_workflow_async(workflow_id, inputs, group_id)

# Example usage
result = execute_monitored_workflow("text-to-image-v1", {"prompt": "Test"})
Testing and Debugging
Unit Testing
import unittest
from unittest.mock import patch, MagicMock
from flowscale import FlowscaleAPI

class TestFlowScaleIntegration(unittest.TestCase):
    def setUp(self):
        self.flowscale = FlowscaleAPI(
            api_key="test_key",
            api_url="https://test.flowscale.ai"
        )

    @patch('flowscale.FlowscaleAPI.execute_workflow_async')
    def test_workflow_execution(self, mock_execute):
        # Mock successful response
        mock_execute.return_value = {
            "status": "success",
            "data": {
                "download_url": "https://example.com/result.png",
                "generation_status": "success"
            }
        }

        result = self.flowscale.execute_workflow_async(
            "test_workflow",
            {"prompt": "test"}
        )

        self.assertEqual(result["status"], "success")
        mock_execute.assert_called_once_with("test_workflow", {"prompt": "test"})

    @patch('flowscale.FlowscaleAPI.execute_workflow_async')
    def test_workflow_timeout(self, mock_execute):
        # Mock timeout error
        from flowscale.exceptions import FlowScaleTimeoutError
        mock_execute.side_effect = FlowScaleTimeoutError("Workflow timed out")

        with self.assertRaises(FlowScaleTimeoutError):
            self.flowscale.execute_workflow_async(
                "test_workflow",
                {"prompt": "test"},
                timeout=1
            )

if __name__ == '__main__':
    unittest.main()
Debug Mode Usage
from flowscale import FlowscaleAPI
import logging
import os

# Enable debug logging
logging.basicConfig(level=logging.DEBUG)

flowscale = FlowscaleAPI(
    api_key=os.environ["FLOWSCALE_API_KEY"],
    api_url=os.environ["FLOWSCALE_API_URL"],
    enable_logging=True,
    log_level="debug"
)

# Test with debug information
try:
    result = flowscale.execute_workflow_async(
        "workflow_id",
        {"prompt": "debug test"},
        timeout=60
    )
    print("Success:", result)
except Exception as e:
    print("Error details:", e)
    # Debug logs will show detailed information about the failure
Examples & Recipes
Support
Need help with the Python SDK?