MCP-Batching-Strategien: Effiziente Stapeloperationen
MCP-Batching für Massenerstellung, -updates und -abfragen von Aufgaben. Optimale Batch-Größen, Fehlerbehandlungsmuster und Performance-Strategien.
Warum Stapeloperationen?
Stapelverarbeitung verbessert die Performance und hilft, Rate Limits zu vermeiden, wenn Sie mit vielen Aufgaben oder Projekten arbeiten. Statt hunderter einzelner API-Aufrufe werden die Operationen in Gruppen gebündelt.
Vorteile
- Rate Limits (429-Fehler) vermeiden
- Schnellere Gesamtausführung
- Bessere Fehlerbehandlung
- Vorhersehbarere Performance
- Einfacheres Fortschritts-Tracking
Anwendungsfälle
- Massenanlage von Aufgaben aus Specs
- Status-Updates für mehrere Aufgaben
- Stapel-Bereinigung von Titel/Beschreibung
- Projektmigration
- Berichte über viele Aufgaben
Empfohlene Batch-Größen
| Operationstyp | Batch-Größe | Pause zwischen Batches | Begründung |
|---|---|---|---|
| Lesen (list_tasks) | 50-100 | Keine | Leichtgewichtig, Pagination nutzen |
| Abrufen (get_task) | 20-30 | 100ms | Einzelabfragen, moderat |
| Anlegen (create_task) | 10-20 | 500ms | Schreiboperationen, Validierung |
| Aktualisieren (update_task) | 10-20 | 500ms | Schreiboperationen, Validierung |
| Löschen (delete_task) | 5-10 | 1000ms | Aufwändig, Kaskaden-Logik |
Grundmuster Batching
Einfache Batch-Schleife
import time
def batch_process(items, batch_size=10, delay=0.5):
"""Elemente in Stapeln mit Verzögerung zwischen den Stapeln verarbeiten."""
results = []
for i in range(0, len(items), batch_size):
batch = items[i:i+batch_size]
print(f"Stapel {i//batch_size + 1}/{(len(items)-1)//batch_size + 1} wird verarbeitet")
for item in batch:
result = process_item(item)
results.append(result)
# Verzögerung zwischen Stapeln (außer nach dem letzten)
if i + batch_size < len(items):
time.sleep(delay)
return resultsBatching-Strategien
1. Sequenzielles Batching
Batches nacheinander abarbeiten:
Beispiel: Stapel-Aufgabenerstellung
def create_tasks_batch(task_data_list):
"""Mehrere Aufgaben in Stapeln anlegen."""
BATCH_SIZE = 10
DELAY = 0.5 # 500 ms zwischen Stapeln
created_tasks = []
for i in range(0, len(task_data_list), BATCH_SIZE):
batch = task_data_list[i:i+BATCH_SIZE]
for task_data in batch:
try:
task = create_task(
title=task_data['title'],
project_id=task_data['project_id'],
description=task_data.get('description')
)
created_tasks.append(task)
print(f"✓ Erstellt: {task_data['title']}")
except Exception as e:
print(f"✗ Fehlgeschlagen: {task_data['title']} - {e}")
time.sleep(DELAY)
return created_tasks2. Paralleles Batching
Elemente innerhalb eines Batches parallel verarbeiten (unter Einhaltung der Parallelitätslimits):
Beispiel: Parallele Updates
import asyncio
async def update_tasks_parallel(task_updates):
"""Aufgaben mit kontrollierter Parallelität aktualisieren."""
BATCH_SIZE = 10
MAX_CONCURRENT = 5 # Rate-Limit nicht überschreiten
DELAY_BETWEEN_BATCHES = 1.0
semaphore = asyncio.Semaphore(MAX_CONCURRENT)
async def update_with_semaphore(task_id, updates):
async with semaphore:
return await update_task_async(task_id, **updates)
for i in range(0, len(task_updates), BATCH_SIZE):
batch = task_updates[i:i+BATCH_SIZE]
tasks = [
update_with_semaphore(item['id'], item['updates'])
for item in batch
]
results = await asyncio.gather(*tasks, return_exceptions=True)
await asyncio.sleep(DELAY_BETWEEN_BATCHES)
return results3. Progressives Batching
Batch-Größe anhand der Erfolgsrate anpassen:
Adaptive Batch-Größe
def adaptive_batch_process(items):
"""Stapelgröße anhand der Fehlerrate anpassen."""
batch_size = 20 # Optimistisch starten
min_batch = 5
max_batch = 50
i = 0
while i < len(items):
batch = items[i:i+batch_size]
errors = 0
for item in batch:
try:
process_item(item)
except Exception:
errors += 1
# Stapelgröße anhand der Fehlerrate anpassen
if errors > len(batch) * 0.2: # >20 % Fehler
batch_size = max(min_batch, batch_size // 2)
elif errors == 0: # Keine Fehler
batch_size = min(max_batch, batch_size + 5)
i += len(batch)
time.sleep(0.5)Fehlerbehandlung
Robustes Fehlerbehandlungs-Muster
def batch_with_retry(items, max_retries=3):
"""Stapelverarbeitung mit Wiederholungslogik."""
results = []
failed = []
for i in range(0, len(items), 10):
batch = items[i:i+10]
for item in batch:
retry_count = 0
while retry_count < max_retries:
try:
result = process_item(item)
results.append({'item': item, 'result': result, 'success': True})
break # Erfolg, Retry-Schleife verlassen
except RateLimitError:
# Rate-Limit – exponentielle Wartezeit
wait_time = 2 ** retry_count
time.sleep(wait_time)
retry_count += 1
except ValidationError as e:
# Validierungsfehler – nicht erneut versuchen
failed.append({'item': item, 'error': str(e)})
break
except Exception as e:
# Anderer Fehler – erneut versuchen
retry_count += 1
if retry_count >= max_retries:
failed.append({'item': item, 'error': str(e)})
time.sleep(0.5) # Verzögerung zwischen Stapeln
return {'success': results, 'failed': failed}Fortschrittsverfolgung
Batch-Fortschritt verfolgen
def batch_with_progress(items, batch_size=10):
"""Stapelverarbeitung mit Fortschrittsanzeige."""
total_batches = (len(items) - 1) // batch_size + 1
processed = 0
for batch_num in range(total_batches):
start_idx = batch_num * batch_size
batch = items[start_idx:start_idx+batch_size]
print(f"\nStapel {batch_num + 1}/{total_batches}")
print(f"Fortschritt: {processed}/{len(items)} Elemente")
for item in batch:
process_item(item)
processed += 1
# Fortschrittsbalken
percent = (processed / len(items)) * 100
bar = '█' * int(percent / 2) + '░' * (50 - int(percent / 2))
print(f"\r[{bar}] {percent:.1f}%", end='')
time.sleep(0.5)
print(f"\n\nAbgeschlossen: {processed}/{len(items)} Elemente")Praktische Beispiele
Beispiel 1: Sprint-Planung
50 Aufgaben aus Spec anlegen
# Aus Besprechungsnotizen Sprint-Aufgaben anlegen
tasks_to_create = [
{"title": "API-Endpoints einrichten", "project_id": 123},
{"title": "Datenbankschema entwerfen", "project_id": 123},
# ... 48 weitere Aufgaben
]
# Stapel mit 10 Aufgaben pro Stapel erstellen
created = create_tasks_batch(tasks_to_create)
print(f"Erstellt: {len(created)} Aufgaben")Beispiel 2: Status-Update
100 erledigte Aufgaben als „Done“ markieren
# Alle Aufgaben mit Status in_progress laden
tasks = list_tasks(status="in_progress", limit=100)
# Stapelweise auf „done“ setzen
for i in range(0, len(tasks), 10):
batch = tasks[i:i+10]
for task in batch:
update_task(task['id'], status="done")
time.sleep(0.5) # 500 ms zwischen StapelnBewährte Praktiken
Batching-Richtlinien
- Konservativ starten: Mit kleineren Batches beginnen, bei Stabilität erhöhen
- Verzögerungen einbauen: Immer Pause zwischen Batches, um Rate Limits zu vermeiden
- Fehler behandeln: Retry-Logik implementieren und Fehlschläge sammeln
- Fortschritt protokollieren: Batch-Nummern und Erfolgs-/Fehlanzahl loggen
- Limits einhalten: Unter den gleichzeitigen Request-Limits bleiben
- Zuerst testen: Mit 5–10 Einträgen testen, bevor der volle Stapel läuft
- Antworten überwachen: Auf Rate-Limit-Header und langsame Antworten achten
