Corcava logoDas einzige Business-Tool, das Sie brauchenCorcava
Menü

MCP-Batching-Strategien: Effiziente Stapeloperationen

MCP-Batching für Massenerstellung, -updates und -abfragen von Aufgaben. Optimale Batch-Größen, Fehlerbehandlungsmuster und Performance-Strategien.

Warum Stapeloperationen?

Stapelverarbeitung verbessert die Performance und hilft, Rate Limits zu vermeiden, wenn Sie mit vielen Aufgaben oder Projekten arbeiten. Statt hunderter einzelner API-Aufrufe werden die Operationen in Gruppen gebündelt.

Vorteile

  • Rate Limits (429-Fehler) vermeiden
  • Schnellere Gesamtausführung
  • Bessere Fehlerbehandlung
  • Vorhersehbarere Performance
  • Einfacheres Fortschritts-Tracking

Anwendungsfälle

  • Massenanlage von Aufgaben aus Specs
  • Status-Updates für mehrere Aufgaben
  • Stapel-Bereinigung von Titel/Beschreibung
  • Projektmigration
  • Berichte über viele Aufgaben

Empfohlene Batch-Größen

Operationstyp Batch-Größe Pause zwischen Batches Begründung
Lesen (list_tasks) 50-100 Keine Leichtgewichtig, Pagination nutzen
Abrufen (get_task) 20-30 100ms Einzelabfragen, moderat
Anlegen (create_task) 10-20 500ms Schreiboperationen, Validierung
Aktualisieren (update_task) 10-20 500ms Schreiboperationen, Validierung
Löschen (delete_task) 5-10 1000ms Aufwändig, Kaskaden-Logik

Grundmuster Batching

Einfache Batch-Schleife

import time

def batch_process(items, batch_size=10, delay=0.5):
    """Elemente in Stapeln mit Verzögerung zwischen den Stapeln verarbeiten."""
    results = []
    
    for i in range(0, len(items), batch_size):
        batch = items[i:i+batch_size]
        print(f"Stapel {i//batch_size + 1}/{(len(items)-1)//batch_size + 1} wird verarbeitet")
        
        for item in batch:
            result = process_item(item)
            results.append(result)
        
        # Verzögerung zwischen Stapeln (außer nach dem letzten)
        if i + batch_size < len(items):
            time.sleep(delay)
    
    return results

Batching-Strategien

1. Sequenzielles Batching

Batches nacheinander abarbeiten:

Beispiel: Stapel-Aufgabenerstellung

def create_tasks_batch(task_data_list):
    """Mehrere Aufgaben in Stapeln anlegen."""
    BATCH_SIZE = 10
    DELAY = 0.5  # 500 ms zwischen Stapeln
    created_tasks = []
    
    for i in range(0, len(task_data_list), BATCH_SIZE):
        batch = task_data_list[i:i+BATCH_SIZE]
        
        for task_data in batch:
            try:
                task = create_task(
                    title=task_data['title'],
                    project_id=task_data['project_id'],
                    description=task_data.get('description')
                )
                created_tasks.append(task)
                print(f"✓ Erstellt: {task_data['title']}")
            except Exception as e:
                print(f"✗ Fehlgeschlagen: {task_data['title']} - {e}")
        
        time.sleep(DELAY)
    
    return created_tasks

2. Paralleles Batching

Elemente innerhalb eines Batches parallel verarbeiten (unter Einhaltung der Parallelitätslimits):

Beispiel: Parallele Updates

import asyncio

async def update_tasks_parallel(task_updates):
    """Aufgaben mit kontrollierter Parallelität aktualisieren."""
    BATCH_SIZE = 10
    MAX_CONCURRENT = 5  # Rate-Limit nicht überschreiten
    DELAY_BETWEEN_BATCHES = 1.0
    
    semaphore = asyncio.Semaphore(MAX_CONCURRENT)
    
    async def update_with_semaphore(task_id, updates):
        async with semaphore:
            return await update_task_async(task_id, **updates)
    
    for i in range(0, len(task_updates), BATCH_SIZE):
        batch = task_updates[i:i+BATCH_SIZE]
        
        tasks = [
            update_with_semaphore(item['id'], item['updates'])
            for item in batch
        ]
        
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        await asyncio.sleep(DELAY_BETWEEN_BATCHES)
    
    return results

3. Progressives Batching

Batch-Größe anhand der Erfolgsrate anpassen:

Adaptive Batch-Größe

def adaptive_batch_process(items):
    """Stapelgröße anhand der Fehlerrate anpassen."""
    batch_size = 20  # Optimistisch starten
    min_batch = 5
    max_batch = 50
    
    i = 0
    while i < len(items):
        batch = items[i:i+batch_size]
        errors = 0
        
        for item in batch:
            try:
                process_item(item)
            except Exception:
                errors += 1
        
        # Stapelgröße anhand der Fehlerrate anpassen
        if errors > len(batch) * 0.2:  # >20 % Fehler
            batch_size = max(min_batch, batch_size // 2)
        elif errors == 0:  # Keine Fehler
            batch_size = min(max_batch, batch_size + 5)
        
        i += len(batch)
        time.sleep(0.5)

Fehlerbehandlung

Robustes Fehlerbehandlungs-Muster

def batch_with_retry(items, max_retries=3):
    """Stapelverarbeitung mit Wiederholungslogik."""
    results = []
    failed = []
    
    for i in range(0, len(items), 10):
        batch = items[i:i+10]
        
        for item in batch:
            retry_count = 0
            
            while retry_count < max_retries:
                try:
                    result = process_item(item)
                    results.append({'item': item, 'result': result, 'success': True})
                    break  # Erfolg, Retry-Schleife verlassen
                    
                except RateLimitError:
                    # Rate-Limit – exponentielle Wartezeit
                    wait_time = 2 ** retry_count
                    time.sleep(wait_time)
                    retry_count += 1
                    
                except ValidationError as e:
                    # Validierungsfehler – nicht erneut versuchen
                    failed.append({'item': item, 'error': str(e)})
                    break
                    
                except Exception as e:
                    # Anderer Fehler – erneut versuchen
                    retry_count += 1
                    if retry_count >= max_retries:
                        failed.append({'item': item, 'error': str(e)})
        
        time.sleep(0.5)  # Verzögerung zwischen Stapeln
    
    return {'success': results, 'failed': failed}

Fortschrittsverfolgung

Batch-Fortschritt verfolgen

def batch_with_progress(items, batch_size=10):
    """Stapelverarbeitung mit Fortschrittsanzeige."""
    total_batches = (len(items) - 1) // batch_size + 1
    processed = 0
    
    for batch_num in range(total_batches):
        start_idx = batch_num * batch_size
        batch = items[start_idx:start_idx+batch_size]
        
        print(f"\nStapel {batch_num + 1}/{total_batches}")
        print(f"Fortschritt: {processed}/{len(items)} Elemente")
        
        for item in batch:
            process_item(item)
            processed += 1
            
            # Fortschrittsbalken
            percent = (processed / len(items)) * 100
            bar = '█' * int(percent / 2) + '░' * (50 - int(percent / 2))
            print(f"\r[{bar}] {percent:.1f}%", end='')
        
        time.sleep(0.5)
    
    print(f"\n\nAbgeschlossen: {processed}/{len(items)} Elemente")

Praktische Beispiele

Beispiel 1: Sprint-Planung

50 Aufgaben aus Spec anlegen

# Aus Besprechungsnotizen Sprint-Aufgaben anlegen
tasks_to_create = [
    {"title": "API-Endpoints einrichten", "project_id": 123},
    {"title": "Datenbankschema entwerfen", "project_id": 123},
    # ... 48 weitere Aufgaben
]

# Stapel mit 10 Aufgaben pro Stapel erstellen
created = create_tasks_batch(tasks_to_create)
print(f"Erstellt: {len(created)} Aufgaben")

Beispiel 2: Status-Update

100 erledigte Aufgaben als „Done“ markieren

# Alle Aufgaben mit Status in_progress laden
tasks = list_tasks(status="in_progress", limit=100)

# Stapelweise auf „done“ setzen
for i in range(0, len(tasks), 10):
    batch = tasks[i:i+10]
    
    for task in batch:
        update_task(task['id'], status="done")
    
    time.sleep(0.5)  # 500 ms zwischen Stapeln

Bewährte Praktiken

Batching-Richtlinien

  • Konservativ starten: Mit kleineren Batches beginnen, bei Stabilität erhöhen
  • Verzögerungen einbauen: Immer Pause zwischen Batches, um Rate Limits zu vermeiden
  • Fehler behandeln: Retry-Logik implementieren und Fehlschläge sammeln
  • Fortschritt protokollieren: Batch-Nummern und Erfolgs-/Fehlanzahl loggen
  • Limits einhalten: Unter den gleichzeitigen Request-Limits bleiben
  • Zuerst testen: Mit 5–10 Einträgen testen, bevor der volle Stapel läuft
  • Antworten überwachen: Auf Rate-Limit-Header und langsame Antworten achten

Weitere Ressourcen