MCP Batching Strategies: Efficient Bulk Operations
Master MCP batching techniques for bulk task creation, updates, and queries. Learn optimal batch sizes, error handling patterns, and performance optimization strategies.
Why Batch Operations?
Batching improves performance and avoids rate limits when working with multiple tasks or projects. Instead of making hundreds of individual API calls, batch them into groups.
Benefits
- Avoid rate limits (429 errors)
- Faster overall execution
- Better error handling
- More predictable performance
- Easier progress tracking
Use Cases
- Bulk task creation from specs
- Status updates for multiple tasks
- Batch title/description cleanup
- Project migration
- Reporting across many tasks
Recommended Batch Sizes
| Operation Type | Batch Size | Delay Between Batches | Reasoning |
|---|---|---|---|
| Read (list_tasks) | 50-100 | None | Lightweight, use pagination |
| Get (get_task) | 20-30 | 100ms | Individual lookups, moderate |
| Create (create_task) | 10-20 | 500ms | Write operations, validation |
| Update (update_task) | 10-20 | 500ms | Write operations, validation |
| Delete (delete_task) | 5-10 | 1000ms | Expensive, cascade logic |
Basic Batching Pattern
Simple Batch Loop
import time
def batch_process(items, batch_size=10, delay=0.5):
"""Process items in batches with delay between batches."""
results = []
for i in range(0, len(items), batch_size):
batch = items[i:i+batch_size]
print(f"Processing batch {i//batch_size + 1}/{(len(items)-1)//batch_size + 1}")
for item in batch:
result = process_item(item)
results.append(result)
# Delay between batches (except after last batch)
if i + batch_size < len(items):
time.sleep(delay)
return resultsBatching Strategies
1. Sequential Batching
Process batches one after another:
Example: Bulk Task Creation
def create_tasks_batch(task_data_list):
"""Create multiple tasks in batches."""
BATCH_SIZE = 10
DELAY = 0.5 # 500ms between batches
created_tasks = []
for i in range(0, len(task_data_list), BATCH_SIZE):
batch = task_data_list[i:i+BATCH_SIZE]
for task_data in batch:
try:
task = create_task(
title=task_data['title'],
project_id=task_data['project_id'],
description=task_data.get('description')
)
created_tasks.append(task)
print(f"✓ Created: {task_data['title']}")
except Exception as e:
print(f"✗ Failed: {task_data['title']} - {e}")
time.sleep(DELAY)
return created_tasks2. Parallel Batching
Process items within a batch concurrently (respecting concurrent limits):
Example: Concurrent Updates
import asyncio
async def update_tasks_parallel(task_updates):
"""Update tasks with controlled concurrency."""
BATCH_SIZE = 10
MAX_CONCURRENT = 5 # Don't exceed rate limit
DELAY_BETWEEN_BATCHES = 1.0
semaphore = asyncio.Semaphore(MAX_CONCURRENT)
async def update_with_semaphore(task_id, updates):
async with semaphore:
return await update_task_async(task_id, **updates)
for i in range(0, len(task_updates), BATCH_SIZE):
batch = task_updates[i:i+BATCH_SIZE]
tasks = [
update_with_semaphore(item['id'], item['updates'])
for item in batch
]
results = await asyncio.gather(*tasks, return_exceptions=True)
await asyncio.sleep(DELAY_BETWEEN_BATCHES)
return results3. Progressive Batching
Adjust batch size based on success rate:
Adaptive Batch Size
def adaptive_batch_process(items):
"""Adjust batch size based on errors."""
batch_size = 20 # Start optimistic
min_batch = 5
max_batch = 50
i = 0
while i < len(items):
batch = items[i:i+batch_size]
errors = 0
for item in batch:
try:
process_item(item)
except Exception:
errors += 1
# Adjust batch size based on error rate
if errors > len(batch) * 0.2: # >20% errors
batch_size = max(min_batch, batch_size // 2)
elif errors == 0: # No errors
batch_size = min(max_batch, batch_size + 5)
i += len(batch)
time.sleep(0.5)Error Handling
Robust Error Handling Pattern
def batch_with_retry(items, max_retries=3):
"""Batch processing with retry logic."""
results = []
failed = []
for i in range(0, len(items), 10):
batch = items[i:i+10]
for item in batch:
retry_count = 0
while retry_count < max_retries:
try:
result = process_item(item)
results.append({'item': item, 'result': result, 'success': True})
break # Success, exit retry loop
except RateLimitError:
# Rate limited - exponential backoff
wait_time = 2 ** retry_count
time.sleep(wait_time)
retry_count += 1
except ValidationError as e:
# Validation error - don't retry
failed.append({'item': item, 'error': str(e)})
break
except Exception as e:
# Other error - retry
retry_count += 1
if retry_count >= max_retries:
failed.append({'item': item, 'error': str(e)})
time.sleep(0.5) # Delay between batches
return {'success': results, 'failed': failed}Progress Tracking
Track Batch Progress
def batch_with_progress(items, batch_size=10):
"""Batch processing with progress indicator."""
total_batches = (len(items) - 1) // batch_size + 1
processed = 0
for batch_num in range(total_batches):
start_idx = batch_num * batch_size
batch = items[start_idx:start_idx+batch_size]
print(f"\nBatch {batch_num + 1}/{total_batches}")
print(f"Progress: {processed}/{len(items)} items")
for item in batch:
process_item(item)
processed += 1
# Progress bar
percent = (processed / len(items)) * 100
bar = '█' * int(percent / 2) + '░' * (50 - int(percent / 2))
print(f"\r[{bar}] {percent:.1f}%", end='')
time.sleep(0.5)
print(f"\n\nCompleted: {processed}/{len(items)} items")Real-World Examples
Example 1: Sprint Planning
Create 50 Tasks from Spec
# From meeting notes, create sprint tasks
tasks_to_create = [
{"title": "Setup API endpoints", "project_id": 123},
{"title": "Design database schema", "project_id": 123},
# ... 48 more tasks
]
# Batch create with 10 tasks per batch
created = create_tasks_batch(tasks_to_create)
print(f"Created {len(created)} tasks")Example 2: Status Update
Mark 100 Completed Tasks as Done
# Get all in_progress tasks
tasks = list_tasks(status="in_progress", limit=100)
# Batch update to "done"
for i in range(0, len(tasks), 10):
batch = tasks[i:i+10]
for task in batch:
update_task(task['id'], status="done")
time.sleep(0.5) # 500ms between batchesBest Practices
Batching Guidelines
- Start conservative: Begin with smaller batches, increase if stable
- Add delays: Always add delay between batches to avoid rate limits
- Handle errors: Implement retry logic and collect failures
- Track progress: Log batch numbers and success/failure counts
- Respect limits: Stay under concurrent request limits
- Test first: Try with 5-10 items before running full batch
- Monitor responses: Watch for rate limit headers and slow responses
