vllm.entrypoints.openai.responses.api_router

logger module-attribute

logger = init_logger(__name__)

router module-attribute

router = APIRouter()

_convert_stream_to_sse_events async

_convert_stream_to_sse_events(
    generator: AsyncGenerator[
        StreamingResponsesResponse, None
    ],
) -> AsyncGenerator[str, None]

Convert the generator to a stream of events in SSE format

Source code in vllm/entrypoints/openai/responses/api_router.py
async def _convert_stream_to_sse_events(
    generator: AsyncGenerator[StreamingResponsesResponse, None],
) -> AsyncGenerator[str, None]:
    """Convert the generator to a stream of events in SSE format"""
    async for event in generator:
        event_type = getattr(event, "type", "unknown")
        # https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#event_stream_format
        event_data = (
            f"event: {event_type}\ndata: {event.model_dump_json(indent=None)}\n\n"
        )
        yield event_data
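
The following is a minimal, self-contained sketch (not part of vLLM) that reproduces the same SSE framing on a stand-in Pydantic event model, so the wire format produced by this helper is easy to see. The FakeEvent class and its fields are illustrative assumptions.

import asyncio
from collections.abc import AsyncGenerator

from pydantic import BaseModel


class FakeEvent(BaseModel):
    # Stand-in for a StreamingResponsesResponse event (illustrative only).
    type: str
    delta: str


async def fake_events() -> AsyncGenerator[FakeEvent, None]:
    yield FakeEvent(type="response.output_text.delta", delta="Hello")
    yield FakeEvent(type="response.completed", delta="")


async def main() -> None:
    async for event in fake_events():
        event_type = getattr(event, "type", "unknown")
        # Same "event: <type>\ndata: <json>\n\n" framing as above.
        print(
            f"event: {event_type}\ndata: {event.model_dump_json(indent=None)}\n\n",
            end="",
        )


asyncio.run(main())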

attach_router

attach_router(app: FastAPI)
Source code in vllm/entrypoints/openai/responses/api_router.py
def attach_router(app: FastAPI):
    app.include_router(router)
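
A minimal sketch of wiring this router into a FastAPI app. In a real vLLM server the startup code also populates app.state with the serving handlers; that setup is assumed here and only the attach step is shown.

from fastapi import FastAPI

from vllm.entrypoints.openai.responses.api_router import attach_router

app = FastAPI()
attach_router(app)  # registers /v1/responses, /v1/responses/{response_id}, and the cancel route

# Note: app.state.openai_serving_responses must be set before requests arrive;
# otherwise the handlers return "The model does not support Responses API".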

cancel_responses async

cancel_responses(response_id: str, raw_request: Request)
Source code in vllm/entrypoints/openai/responses/api_router.py
@router.post("/v1/responses/{response_id}/cancel")
@load_aware_call
async def cancel_responses(response_id: str, raw_request: Request):
    handler = responses(raw_request)
    if handler is None:
        base_server = raw_request.app.state.openai_serving_tokenization
        return base_server.create_error_response(
            message="The model does not support Responses API"
        )

    try:
        response = await handler.cancel_responses(response_id)
    except Exception as e:
        return handler.create_error_response(e)

    if isinstance(response, ErrorResponse):
        return JSONResponse(
            content=response.model_dump(), status_code=response.error.code
        )
    return JSONResponse(content=response.model_dump())
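
An illustrative client call against this endpoint, assuming httpx is installed and a vLLM server is running locally; the base URL and response id are placeholders.

import httpx

BASE_URL = "http://localhost:8000"   # assumption: local vLLM server
response_id = "resp_abc123"          # placeholder id returned by /v1/responses

# POST /v1/responses/{response_id}/cancel
resp = httpx.post(f"{BASE_URL}/v1/responses/{response_id}/cancel")
print(resp.status_code, resp.json())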

create_responses async

create_responses(
    request: ResponsesRequest, raw_request: Request
)
Source code in vllm/entrypoints/openai/responses/api_router.py
@router.post(
    "/v1/responses",
    dependencies=[Depends(validate_json_request)],
    responses={
        HTTPStatus.OK.value: {"content": {"text/event-stream": {}}},
        HTTPStatus.BAD_REQUEST.value: {"model": ErrorResponse},
        HTTPStatus.NOT_FOUND.value: {"model": ErrorResponse},
        HTTPStatus.INTERNAL_SERVER_ERROR.value: {"model": ErrorResponse},
    },
)
@with_cancellation
@load_aware_call
async def create_responses(request: ResponsesRequest, raw_request: Request):
    handler = responses(raw_request)
    if handler is None:
        base_server = raw_request.app.state.openai_serving_tokenization
        return base_server.create_error_response(
            message="The model does not support Responses API"
        )
    try:
        generator = await handler.create_responses(request, raw_request)
    except Exception as e:
        return handler.create_error_response(e)

    if isinstance(generator, ErrorResponse):
        return JSONResponse(
            content=generator.model_dump(), status_code=generator.error.code
        )
    elif isinstance(generator, ResponsesResponse):
        return JSONResponse(content=generator.model_dump())

    return StreamingResponse(
        content=_convert_stream_to_sse_events(generator), media_type="text/event-stream"
    )
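
An illustrative pair of client calls, assuming httpx and a locally running server; the model name, base URL, and request body are placeholders that mirror the OpenAI Responses API fields (model, input, stream). With stream enabled the endpoint returns text/event-stream framed by _convert_stream_to_sse_events above.

import httpx

BASE_URL = "http://localhost:8000"   # assumption: local vLLM server

# Non-streaming: returns a ResponsesResponse serialized as JSON.
resp = httpx.post(
    f"{BASE_URL}/v1/responses",
    json={"model": "my-model", "input": "Say hello"},
)
print(resp.json())

# Streaming: consume the SSE lines as they arrive.
with httpx.stream(
    "POST",
    f"{BASE_URL}/v1/responses",
    json={"model": "my-model", "input": "Say hello", "stream": True},
) as stream_resp:
    for line in stream_resp.iter_lines():
        if line:
            print(line)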

responses

responses(
    request: Request,
) -> OpenAIServingResponses | None
Source code in vllm/entrypoints/openai/responses/api_router.py
def responses(request: Request) -> OpenAIServingResponses | None:
    return request.app.state.openai_serving_responses

retrieve_responses async

retrieve_responses(
    response_id: str,
    raw_request: Request,
    starting_after: int | None = None,
    stream: bool | None = False,
)
Source code in vllm/entrypoints/openai/responses/api_router.py
@router.get("/v1/responses/{response_id}")
@load_aware_call
async def retrieve_responses(
    response_id: str,
    raw_request: Request,
    starting_after: int | None = None,
    stream: bool | None = False,
):
    handler = responses(raw_request)
    if handler is None:
        base_server = raw_request.app.state.openai_serving_tokenization
        return base_server.create_error_response(
            message="The model does not support Responses API"
        )

    try:
        response = await handler.retrieve_responses(
            response_id,
            starting_after=starting_after,
            stream=stream,
        )
    except Exception as e:
        return handler.create_error_response(e)

    if isinstance(response, ErrorResponse):
        return JSONResponse(
            content=response.model_dump(), status_code=response.error.code
        )
    elif isinstance(response, ResponsesResponse):
        return JSONResponse(content=response.model_dump())
    return StreamingResponse(
        content=_convert_stream_to_sse_events(response), media_type="text/event-stream"
    )
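
An illustrative client call, assuming httpx and a locally running server; the base URL and response id are placeholders, and starting_after / stream map to the query parameters above.

import httpx

BASE_URL = "http://localhost:8000"   # assumption: local vLLM server
response_id = "resp_abc123"          # placeholder id returned by /v1/responses

# Plain retrieval: returns the stored ResponsesResponse as JSON.
resp = httpx.get(f"{BASE_URL}/v1/responses/{response_id}")
print(resp.json())

# Replay the response as SSE, skipping events up to a given sequence number.
with httpx.stream(
    "GET",
    f"{BASE_URL}/v1/responses/{response_id}",
    params={"stream": True, "starting_after": 5},
) as stream_resp:
    for line in stream_resp.iter_lines():
        if line:
            print(line)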