Grok Client

The Grok client provides access to xAI's Grok models. It inherits from OpenAIClient since Grok uses an OpenAI-compatible API.

Configuration

from padwan_llm.grok import GrokClient

client = GrokClient(
    api_key="...",      # or set GROK_API_KEY env var
    model="grok-3",     # default model
)
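If `api_key` is omitted, the client falls back to the `GROK_API_KEY` environment variable (per the comment above). A sketch of environment-based configuration; the placeholder value and the explicit `os.environ` staging are illustrative only:

```python
import os

# Illustrative only: stage the key via the environment so no secret
# appears in code. "sk-placeholder" is obviously not a real key.
os.environ.setdefault("GROK_API_KEY", "sk-placeholder")

# With GROK_API_KEY set, GrokClient() can be constructed without an
# explicit api_key argument.
print("GROK_API_KEY" in os.environ)  # True
```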

Usage

Basic Chat

from padwan_llm.conversation import Message

async with GrokClient() as client:
    response, usage = await client.complete_chat([
        Message(role="user", content="Hello!")
    ])
    print(response["content"])

Streaming

from padwan_llm.conversation import Message

async with GrokClient() as client:
    stream = client.stream_chat([
        Message(role="user", content="Tell me a story")
    ])
    async for chunk in stream:
        print(chunk, end="")
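A common variation on the loop above is to accumulate the chunks so the full response text is available after streaming finishes. A minimal, self-contained sketch; `fake_stream` is a stand-in for `client.stream_chat(...)`, not part of the library:

```python
import asyncio

async def fake_stream():
    # Stand-in for client.stream_chat(...): yields text chunks the
    # same way the example above prints them.
    for chunk in ["Once ", "upon ", "a ", "time."]:
        yield chunk

async def collect(stream) -> str:
    # Accumulate chunks so the full response text is available
    # once streaming finishes.
    parts = []
    async for chunk in stream:
        parts.append(chunk)
    return "".join(parts)

full = asyncio.run(collect(fake_stream()))
print(full)  # Once upon a time.
```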

With System Prompt

from padwan_llm import ConversationState

state = ConversationState(system="You are a helpful assistant.")
state.add_user_message("Hello!")

async with GrokClient() as client:
    response, usage = await client.complete_chat(state.messages)
    state.add_assistant_message(response["content"])
    state.accumulate_usage(usage)

Batch API

Grok uses xAI's native batch API, which differs from OpenAI's file-upload approach. Requests are submitted directly via JSON and results are fetched per-batch.

Create a batch

from padwan_llm.grok import GrokClient, GrokBatchRequest

requests = [
    GrokBatchRequest(
        body={"messages": [{"role": "user", "content": "Summarize X"}]},
        custom_id="req-1",
    ),
    GrokBatchRequest(
        body={"messages": [{"role": "user", "content": "Summarize Y"}]},
        custom_id="req-2",
    ),
]

async with GrokClient() as client:
    job = await client.create_batch(requests, name="summaries")
    print(job.batch_id, job.num_requests)
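As plain JSON, each submitted entry pairs the chat body with its `custom_id` so results can be matched back to requests. This wire shape is an assumption inferred from the `GrokBatchRequest` fields, not a documented payload:

```python
import json

# Hypothetical wire shape for one batch entry, inferred from the
# GrokBatchRequest fields above: an OpenAI-style chat body plus a
# custom_id for correlating results with requests.
entry = {
    "custom_id": "req-1",
    "body": {"messages": [{"role": "user", "content": "Summarize X"}]},
}
payload = json.dumps(entry)
print(json.loads(payload)["custom_id"])  # req-1
```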

Check status and fetch results

async with GrokClient() as client:
    job = await client.get_batch("batch-abc")

    if job.succeeded:
        results, _ = await client.get_batch_results(job.batch_id)
        for r in results:
            print(r.custom_id, r.content)
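Batch jobs complete asynchronously, so a common pattern is to poll `get_batch` until `is_terminal` is set. A minimal sketch; the `wait_for_batch` helper is not part of the library, and `StubClient`/`Job` stand in for a live `GrokClient` and `GrokBatchJob`:

```python
import asyncio
from dataclasses import dataclass

@dataclass
class Job:
    # Minimal stand-in mirroring the GrokBatchJob flags used here.
    batch_id: str
    is_terminal: bool
    succeeded: bool

class StubClient:
    # Stands in for a live GrokClient: reports pending twice, then done.
    def __init__(self):
        self.calls = 0

    async def get_batch(self, batch_id):
        self.calls += 1
        done = self.calls >= 3
        return Job(batch_id, is_terminal=done, succeeded=done)

async def wait_for_batch(client, batch_id, interval=0.01):
    # Poll get_batch until the job reaches a terminal state.
    while True:
        job = await client.get_batch(batch_id)
        if job.is_terminal:
            return job
        await asyncio.sleep(interval)

job = asyncio.run(wait_for_batch(StubClient(), "batch-abc"))
print(job.succeeded)  # True
```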

List and cancel batches

async with GrokClient() as client:
    jobs, next_token = await client.list_batches(limit=10)
    job = await client.cancel_batch("batch-abc")
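Since `list_batches` returns a page of jobs plus a continuation token, listing everything means looping until the token is exhausted. A sketch with a stub in place of a live client; the `page_token` keyword is an assumption for illustration, not a confirmed parameter name:

```python
import asyncio

class StubClient:
    # Stand-in that serves two pages of batch ids; the real client
    # returns (jobs, next_token) as shown above. The page_token
    # keyword is an assumption for illustration.
    _pages = {None: (["b1", "b2"], "tok"), "tok": (["b3"], None)}

    async def list_batches(self, limit=10, page_token=None):
        return self._pages[page_token]

async def all_batches(client):
    # Follow next_token until the listing is exhausted.
    jobs, token = [], None
    while True:
        page, token = await client.list_batches(limit=10, page_token=token)
        jobs.extend(page)
        if token is None:
            return jobs

print(asyncio.run(all_batches(StubClient())))  # ['b1', 'b2', 'b3']
```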

Batch types reference

Type              Fields
GrokBatchRequest  Single request: body, custom_id
GrokBatchJob      Job state: batch_id, name, num_requests, num_pending, num_success, num_error, num_cancelled, is_terminal, succeeded
GrokBatchResult   Parsed result: custom_id, content, input_tokens, output_tokens, total_tokens, error_message

Method Outputs

response, usage = await client.complete_chat(messages)  # response dict plus usage stats

stream = client.stream_chat(messages)
async for chunk in stream:
    ...
usage = stream.usage  # available once the stream has been consumed
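Because `stream.usage` is read after the `async for` loop, it is only meaningful once iteration has finished. A self-contained stub illustrating that contract; `StubStream` is illustrative and not the library's stream type:

```python
import asyncio

class StubStream:
    # Mimics the stream_chat return value: async-iterable chunks,
    # with usage populated once iteration has finished.
    def __init__(self, chunks):
        self._chunks = chunks
        self.usage = None

    def __aiter__(self):
        return self._iterate()

    async def _iterate(self):
        for chunk in self._chunks:
            yield chunk
        self.usage = {"output_tokens": len(self._chunks)}

async def main():
    stream = StubStream(["a", "b", "c"])
    async for _chunk in stream:
        pass
    return stream.usage

print(asyncio.run(main()))  # {'output_tokens': 3}
```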