MCP Batching Strategies: Efficient Bulk Operations

Master MCP batching techniques for bulk task creation, updates, and queries. Learn optimal batch sizes, error handling patterns, and performance optimization strategies.

Why Batch Operations?

Batching improves performance and avoids rate limits when working with multiple tasks or projects. Instead of making hundreds of individual API calls, batch them into groups.

Benefits

  • Avoid rate limits (429 errors)
  • Faster overall execution
  • Better error handling
  • More predictable performance
  • Easier progress tracking

Use Cases

  • Bulk task creation from specs
  • Status updates for multiple tasks
  • Batch title/description cleanup
  • Project migration
  • Reporting across many tasks

Recommended Batch Sizes

Operation Type Batch Size Delay Between Batches Reasoning
Read (list_tasks) 50-100 None Lightweight, use pagination
Get (get_task) 20-30 100ms Individual lookups, moderate
Create (create_task) 10-20 500ms Write operations, validation
Update (update_task) 10-20 500ms Write operations, validation
Delete (delete_task) 5-10 1000ms Expensive, cascade logic

Basic Batching Pattern

Simple Batch Loop

import time

def batch_process(items, batch_size=10, delay=0.5):
    """Process items in batches with delay between batches."""
    results = []
    
    for i in range(0, len(items), batch_size):
        batch = items[i:i+batch_size]
        print(f"Processing batch {i//batch_size + 1}/{(len(items)-1)//batch_size + 1}")
        
        for item in batch:
            result = process_item(item)
            results.append(result)
        
        # Delay between batches (except after last batch)
        if i + batch_size < len(items):
            time.sleep(delay)
    
    return results

Batching Strategies

1. Sequential Batching

Process batches one after another:

Example: Bulk Task Creation

def create_tasks_batch(task_data_list):
    """Create multiple tasks in batches."""
    BATCH_SIZE = 10
    DELAY = 0.5  # 500ms between batches
    created_tasks = []
    
    for i in range(0, len(task_data_list), BATCH_SIZE):
        batch = task_data_list[i:i+BATCH_SIZE]
        
        for task_data in batch:
            try:
                task = create_task(
                    title=task_data['title'],
                    project_id=task_data['project_id'],
                    description=task_data.get('description')
                )
                created_tasks.append(task)
                print(f"✓ Created: {task_data['title']}")
            except Exception as e:
                print(f"✗ Failed: {task_data['title']} - {e}")
        
        time.sleep(DELAY)
    
    return created_tasks

2. Parallel Batching

Process items within a batch concurrently (respecting concurrent limits):

Example: Concurrent Updates

import asyncio

async def update_tasks_parallel(task_updates):
    """Update tasks with controlled concurrency."""
    BATCH_SIZE = 10
    MAX_CONCURRENT = 5  # Don't exceed rate limit
    DELAY_BETWEEN_BATCHES = 1.0
    
    semaphore = asyncio.Semaphore(MAX_CONCURRENT)
    
    async def update_with_semaphore(task_id, updates):
        async with semaphore:
            return await update_task_async(task_id, **updates)
    
    for i in range(0, len(task_updates), BATCH_SIZE):
        batch = task_updates[i:i+BATCH_SIZE]
        
        tasks = [
            update_with_semaphore(item['id'], item['updates'])
            for item in batch
        ]
        
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        await asyncio.sleep(DELAY_BETWEEN_BATCHES)
    
    return results

3. Progressive Batching

Adjust batch size based on success rate:

Adaptive Batch Size

def adaptive_batch_process(items):
    """Adjust batch size based on errors."""
    batch_size = 20  # Start optimistic
    min_batch = 5
    max_batch = 50
    
    i = 0
    while i < len(items):
        batch = items[i:i+batch_size]
        errors = 0
        
        for item in batch:
            try:
                process_item(item)
            except Exception:
                errors += 1
        
        # Adjust batch size based on error rate
        if errors > len(batch) * 0.2:  # >20% errors
            batch_size = max(min_batch, batch_size // 2)
        elif errors == 0:  # No errors
            batch_size = min(max_batch, batch_size + 5)
        
        i += len(batch)
        time.sleep(0.5)

Error Handling

Robust Error Handling Pattern

def batch_with_retry(items, max_retries=3):
    """Batch processing with retry logic."""
    results = []
    failed = []
    
    for i in range(0, len(items), 10):
        batch = items[i:i+10]
        
        for item in batch:
            retry_count = 0
            
            while retry_count < max_retries:
                try:
                    result = process_item(item)
                    results.append({'item': item, 'result': result, 'success': True})
                    break  # Success, exit retry loop
                    
                except RateLimitError:
                    # Rate limited - exponential backoff
                    wait_time = 2 ** retry_count
                    time.sleep(wait_time)
                    retry_count += 1
                    
                except ValidationError as e:
                    # Validation error - don't retry
                    failed.append({'item': item, 'error': str(e)})
                    break
                    
                except Exception as e:
                    # Other error - retry
                    retry_count += 1
                    if retry_count >= max_retries:
                        failed.append({'item': item, 'error': str(e)})
        
        time.sleep(0.5)  # Delay between batches
    
    return {'success': results, 'failed': failed}

Progress Tracking

Track Batch Progress

def batch_with_progress(items, batch_size=10):
    """Batch processing with progress indicator."""
    total_batches = (len(items) - 1) // batch_size + 1
    processed = 0
    
    for batch_num in range(total_batches):
        start_idx = batch_num * batch_size
        batch = items[start_idx:start_idx+batch_size]
        
        print(f"\nBatch {batch_num + 1}/{total_batches}")
        print(f"Progress: {processed}/{len(items)} items")
        
        for item in batch:
            process_item(item)
            processed += 1
            
            # Progress bar
            percent = (processed / len(items)) * 100
            bar = '█' * int(percent / 2) + '░' * (50 - int(percent / 2))
            print(f"\r[{bar}] {percent:.1f}%", end='')
        
        time.sleep(0.5)
    
    print(f"\n\nCompleted: {processed}/{len(items)} items")

Real-World Examples

Example 1: Sprint Planning

Create 50 Tasks from Spec

# From meeting notes, create sprint tasks
tasks_to_create = [
    {"title": "Setup API endpoints", "project_id": 123},
    {"title": "Design database schema", "project_id": 123},
    # ... 48 more tasks
]

# Batch create with 10 tasks per batch
created = create_tasks_batch(tasks_to_create)
print(f"Created {len(created)} tasks")

Example 2: Status Update

Mark 100 Completed Tasks as Done

# Get all in_progress tasks
tasks = list_tasks(status="in_progress", limit=100)

# Batch update to "done"
for i in range(0, len(tasks), 10):
    batch = tasks[i:i+10]
    
    for task in batch:
        update_task(task['id'], status="done")
    
    time.sleep(0.5)  # 500ms between batches

Best Practices

Batching Guidelines

  • Start conservative: Begin with smaller batches, increase if stable
  • Add delays: Always add delay between batches to avoid rate limits
  • Handle errors: Implement retry logic and collect failures
  • Track progress: Log batch numbers and success/failure counts
  • Respect limits: Stay under concurrent request limits
  • Test first: Try with 5-10 items before running full batch
  • Monitor responses: Watch for rate limit headers and slow responses

Related Resources