MCP Batching Strategies: Efficient Bulk Operations
Master MCP batching techniques for bulk creation, updates, and queries: optimal batch sizes, error handling, and optimization strategies.
Why Batch Operations?
Batching improves performance and avoids rate limits when you work with many tasks or projects. Instead of making hundreds of individual API calls, group them into batches.
Benefits
- Avoid rate limits (429 errors)
- Faster execution
- Better error handling
- More predictable performance
- Simpler progress tracking
Use Cases
- Bulk task creation from specs
- Status updates across many tasks
- Batch title/description cleanup
- Project migration
- Reporting across many tasks
Recommended Batch Sizes
| Operation Type | Batch Size | Delay Between Batches | Reason |
|---|---|---|---|
| Read (list_tasks) | 50-100 | None | Lightweight; use pagination |
| Get (get_task) | 20-30 | 100ms | Individual lookups, moderate cost |
| Create (create_task) | 10-20 | 500ms | Write operations with validation |
| Update (update_task) | 10-20 | 500ms | Write operations with validation |
| Delete (delete_task) | 5-10 | 1000ms | Expensive, cascading logic |
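These defaults are easy to keep in one place as a small lookup table; a sketch mirroring the table above (delays in seconds), which you can plug into the batching helpers shown below:

```python
# Recommended batch parameters per operation type
BATCH_CONFIG = {
    'list_tasks':  {'batch_size': 100, 'delay': 0.0},  # lightweight reads; use pagination
    'get_task':    {'batch_size': 30,  'delay': 0.1},  # individual lookups
    'create_task': {'batch_size': 20,  'delay': 0.5},  # writes with validation
    'update_task': {'batch_size': 20,  'delay': 0.5},  # writes with validation
    'delete_task': {'batch_size': 10,  'delay': 1.0},  # expensive, cascading logic
}

config = BATCH_CONFIG['create_task']
# e.g. batch_process(items, batch_size=config['batch_size'], delay=config['delay'])
```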
Basic Batching Pattern
Simple Batch Loop
```python
import time

def batch_process(items, batch_size=10, delay=0.5):
    """Process items in batches with a delay between batches."""
    results = []
    for i in range(0, len(items), batch_size):
        batch = items[i:i+batch_size]
        print(f"Processing batch {i//batch_size + 1}/{(len(items)-1)//batch_size + 1}")
        for item in batch:
            result = process_item(item)
            results.append(result)
        # Delay between batches (except after the last batch)
        if i + batch_size < len(items):
            time.sleep(delay)
    return results
```
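A minimal usage sketch, assuming a hypothetical `process_item` that wraps a single MCP call (here the `update_task` tool used later in this guide):

```python
def process_item(item):
    # Hypothetical single-item handler; swap in whichever MCP call you need
    return update_task(item['id'], status=item['status'])

updates = [{'id': n, 'status': 'done'} for n in range(1, 101)]
results = batch_process(updates, batch_size=10, delay=0.5)
print(f"Processed {len(results)} items")
```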
Batching Strategies
1. Sequential Batching
Process batches one after another:
Example: Bulk Task Creation
```python
import time

def create_tasks_batch(task_data_list):
    """Create multiple tasks in batches."""
    BATCH_SIZE = 10
    DELAY = 0.5  # 500ms between batches
    created_tasks = []
    for i in range(0, len(task_data_list), BATCH_SIZE):
        batch = task_data_list[i:i+BATCH_SIZE]
        for task_data in batch:
            try:
                task = create_task(
                    title=task_data['title'],
                    project_id=task_data['project_id'],
                    description=task_data.get('description')
                )
                created_tasks.append(task)
                print(f"✓ Created: {task_data['title']}")
            except Exception as e:
                print(f"✗ Failed: {task_data['title']} - {e}")
        time.sleep(DELAY)
    return created_tasks
```
2. Parallel Batching
Process the items within each batch concurrently, while staying under the concurrency limit:
Example: Concurrent Updates
```python
import asyncio

async def update_tasks_parallel(task_updates):
    """Update tasks with controlled concurrency."""
    BATCH_SIZE = 10
    MAX_CONCURRENT = 5  # Don't exceed the concurrent request limit
    DELAY_BETWEEN_BATCHES = 1.0
    semaphore = asyncio.Semaphore(MAX_CONCURRENT)

    async def update_with_semaphore(task_id, updates):
        async with semaphore:
            return await update_task_async(task_id, **updates)

    all_results = []
    for i in range(0, len(task_updates), BATCH_SIZE):
        batch = task_updates[i:i+BATCH_SIZE]
        tasks = [
            update_with_semaphore(item['id'], item['updates'])
            for item in batch
        ]
        # Accumulate results from every batch, not just the last one
        all_results.extend(await asyncio.gather(*tasks, return_exceptions=True))
        await asyncio.sleep(DELAY_BETWEEN_BATCHES)
    return all_results
```
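To drive it, build the updates list and hand the coroutine to `asyncio.run`. A sketch, assuming the same `update_task_async` wrapper as above; with `return_exceptions=True`, failures come back as exception objects rather than raising:

```python
updates = [
    {'id': 101, 'updates': {'status': 'done'}},
    {'id': 102, 'updates': {'status': 'done', 'priority': 'low'}},
]
results = asyncio.run(update_tasks_parallel(updates))
failures = [r for r in results if isinstance(r, Exception)]
print(f"{len(results) - len(failures)} succeeded, {len(failures)} failed")
```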
3. Progressive Batching
Adjust batch size based on success rate:
Adaptive Batch Size
```python
import time

def adaptive_batch_process(items):
    """Adjust batch size based on errors."""
    batch_size = 20  # Start optimistic
    min_batch = 5
    max_batch = 50
    i = 0
    while i < len(items):
        batch = items[i:i+batch_size]
        errors = 0
        for item in batch:
            try:
                process_item(item)
            except Exception:
                errors += 1
        # Adjust batch size based on error rate
        if errors > len(batch) * 0.2:  # >20% errors: back off
            batch_size = max(min_batch, batch_size // 2)
        elif errors == 0:  # No errors: grow slowly
            batch_size = min(max_batch, batch_size + 5)
        i += len(batch)
        time.sleep(0.5)
```
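To make the adjustment rule concrete: with the 20% threshold above, 5 failures in a 20-item batch (25%) halve the next batch to 10, while a clean batch grows it by 5, capped at 50. A quick illustration of the rule in isolation, as a hypothetical helper that is not part of the function above:

```python
def next_batch_size(errors, batch_len, current, min_batch=5, max_batch=50):
    """Same adjustment rule as adaptive_batch_process, extracted for illustration."""
    if errors > batch_len * 0.2:   # >20% errors: back off
        return max(min_batch, current // 2)
    if errors == 0:                # clean batch: grow slowly
        return min(max_batch, current + 5)
    return current                 # some errors, but under threshold: hold steady

assert next_batch_size(5, 20, 20) == 10  # 25% errors -> halve
assert next_batch_size(0, 20, 20) == 25  # no errors -> +5
assert next_batch_size(2, 20, 20) == 20  # 10% errors -> unchanged
```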
Error Handling
Robust Error Handling Pattern
```python
import time

# RateLimitError and ValidationError are assumed to come from your MCP client library
def batch_with_retry(items, max_retries=3):
    """Batch processing with retry logic."""
    results = []
    failed = []
    for i in range(0, len(items), 10):
        batch = items[i:i+10]
        for item in batch:
            retry_count = 0
            while retry_count < max_retries:
                try:
                    result = process_item(item)
                    results.append({'item': item, 'result': result, 'success': True})
                    break  # Success, exit retry loop
                except RateLimitError:
                    # Rate limited - exponential backoff (1s, 2s, 4s, ...)
                    wait_time = 2 ** retry_count
                    retry_count += 1
                    if retry_count >= max_retries:
                        # Record the failure instead of dropping the item silently
                        failed.append({'item': item, 'error': 'rate limit retries exhausted'})
                    else:
                        time.sleep(wait_time)
                except ValidationError as e:
                    # Validation error - don't retry
                    failed.append({'item': item, 'error': str(e)})
                    break
                except Exception as e:
                    # Other error - retry
                    retry_count += 1
                    if retry_count >= max_retries:
                        failed.append({'item': item, 'error': str(e)})
        time.sleep(0.5)  # Delay between batches
    return {'success': results, 'failed': failed}
```
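The returned structure makes it easy to inspect failures and feed them through a second pass; a sketch, assuming `items` is the list you just processed:

```python
outcome = batch_with_retry(items)
print(f"Succeeded: {len(outcome['success'])}, failed: {len(outcome['failed'])}")

# Optionally retry the failed items once more after a cool-down
if outcome['failed']:
    time.sleep(5)
    second_pass = batch_with_retry([f['item'] for f in outcome['failed']])
```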
Progress Tracking
Track Batch Progress
```python
import time

def batch_with_progress(items, batch_size=10):
    """Batch processing with progress indicator."""
    total_batches = (len(items) - 1) // batch_size + 1
    processed = 0
    for batch_num in range(total_batches):
        start_idx = batch_num * batch_size
        batch = items[start_idx:start_idx+batch_size]
        print(f"\nBatch {batch_num + 1}/{total_batches}")
        print(f"Progress: {processed}/{len(items)} items")
        for item in batch:
            process_item(item)
            processed += 1
            # Progress bar
            percent = (processed / len(items)) * 100
            bar = '█' * int(percent / 2) + '░' * (50 - int(percent / 2))
            print(f"\r[{bar}] {percent:.1f}%", end='')
        time.sleep(0.5)
    print(f"\n\nCompleted: {processed}/{len(items)} items")
```
Real-World Examples
Example 1: Sprint Planning
Create 50 Tasks from Spec
```python
# From meeting notes, create sprint tasks
tasks_to_create = [
    {"title": "Setup API endpoints", "project_id": 123},
    {"title": "Design database schema", "project_id": 123},
    # ... 48 more tasks
]

# Batch create with 10 tasks per batch
created = create_tasks_batch(tasks_to_create)
print(f"Created {len(created)} tasks")
```
Example 2: Status Update
Mark 100 Completed Tasks as Done
```python
import time

# Get all in_progress tasks
tasks = list_tasks(status="in_progress", limit=100)

# Batch update to "done"
for i in range(0, len(tasks), 10):
    batch = tasks[i:i+10]
    for task in batch:
        update_task(task['id'], status="done")
    time.sleep(0.5)  # 500ms between batches
```
Best Practices
Batching Guidelines
- Start conservative: Begin with smaller batches and increase if stable
- Add delays: Always add a delay between batches to avoid rate limits
- Handle errors: Implement retry logic and collect failures
- Track progress: Log batch numbers and success/failure counts
- Respect limits: Stay under concurrent request limits
- Test first: Try with 5-10 items before running the full batch (see the sketch after this list)
- Monitor responses: Watch for rate limit headers and slow responses
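A minimal dry-run sketch for the "test first" guideline, assuming the `batch_process` helper from earlier and an `items` list:

```python
# Dry run: validate the pipeline on a small slice before the full batch
sample = items[:5]
try:
    batch_process(sample, batch_size=5, delay=0.5)
except Exception as e:
    print(f"Sample run failed, fix before scaling up: {e}")
else:
    # Sample looks healthy; process the remaining items
    batch_process(items[5:], batch_size=10, delay=0.5)
```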
