Skip to content

vllm.v1.worker.gpu.kv_connector

NO_OP_KV_CONNECTOR module-attribute

NO_OP_KV_CONNECTOR = KVConnector()

ActiveKVConnector

Bases: KVConnector

Source code in vllm/v1/worker/gpu/kv_connector.py
class ActiveKVConnector(KVConnector):
    """KVConnector implementation backed by the process-wide KV transfer group.

    Bridges the scheduler's connector metadata to the underlying transfer
    agent around each model forward pass.
    """

    def __init__(
        self, vllm_config: VllmConfig, kv_caches_dict: dict[str, torch.Tensor]
    ):
        self.vllm_config = vllm_config
        self.kv_connector = get_kv_transfer_group()
        # Hand the per-layer KV cache tensors to the connector so it can
        # transfer them directly.
        # TODO: support cross_layers_kv_cache
        # (see https://github.com/vllm-project/vllm/pull/27743)
        self.kv_connector.register_kv_caches(kv_caches_dict)
        self.kv_connector.set_host_xfer_buffer_ops(copy_kv_blocks)

        self._disabled = False

    def pre_forward(self, scheduler_output: "SchedulerOutput") -> None:
        """Bind connector metadata and kick off KV loads before the forward pass."""
        if self._disabled:
            return

        preempted = scheduler_output.preempted_req_ids
        if preempted:
            self.kv_connector.handle_preemptions(preempted)
        metadata = scheduler_output.kv_connector_metadata
        assert metadata is not None
        self.kv_connector.bind_connector_metadata(metadata)
        # TODO: sort out KV Connectors' use of forward_context
        if not is_forward_context_available():
            with set_forward_context(None, self.vllm_config):
                self.kv_connector.start_load_kv(get_forward_context())
            return
        self.kv_connector.start_load_kv(get_forward_context())

    def post_forward(
        self, scheduler_output: "SchedulerOutput", wait_for_save: bool = True
    ) -> KVConnectorOutput | None:
        """Collect transfer results after the forward pass.

        Returns None when the connector is disabled; otherwise a populated
        KVConnectorOutput.
        """
        if self._disabled:
            return None

        connector = self.kv_connector
        if wait_for_save:
            connector.wait_for_save()
        result = KVConnectorOutput()
        finished = connector.get_finished(scheduler_output.finished_req_ids)
        result.finished_sending, result.finished_recving = finished
        result.invalid_block_ids = connector.get_block_ids_with_load_errors()
        result.kv_connector_stats = connector.get_kv_connector_stats()
        result.kv_cache_events = connector.get_kv_connector_kv_cache_events()
        connector.clear_connector_metadata()
        return result

    def no_forward(self, scheduler_output: "SchedulerOutput") -> ModelRunnerOutput:
        """Run only the connector hooks for a step with no model execution."""
        if self._disabled:
            return EMPTY_MODEL_RUNNER_OUTPUT

        self.pre_forward(scheduler_output)
        connector_out = self.post_forward(scheduler_output, wait_for_save=False)
        if connector_out is None or connector_out.is_empty():
            return EMPTY_MODEL_RUNNER_OUTPUT
        # Shallow-copy the empty template and attach the connector results.
        output = copy.copy(EMPTY_MODEL_RUNNER_OUTPUT)
        output.kv_connector_output = connector_out
        return output

    def set_disabled(self, disabled: bool) -> None:
        # Ensure that layer-wise connector hooks aren't called when disabled.
        kv_transfer_state._KV_CONNECTOR_AGENT = (
            None if disabled else self.kv_connector
        )
        self._disabled = disabled

_disabled instance-attribute

_disabled = False

kv_connector instance-attribute

kv_connector = get_kv_transfer_group()

vllm_config instance-attribute

vllm_config = vllm_config

__init__

__init__(
    vllm_config: VllmConfig,
    kv_caches_dict: dict[str, Tensor],
)
Source code in vllm/v1/worker/gpu/kv_connector.py
def __init__(
    self, vllm_config: VllmConfig, kv_caches_dict: dict[str, torch.Tensor]
):
    """Wire this connector to the process-wide KV transfer group.

    Args:
        vllm_config: Engine configuration; retained because pre_forward
            needs it to build a forward context.
        kv_caches_dict: Mapping from layer name to its KV-cache tensor,
            registered with the underlying connector.
    """
    self.vllm_config = vllm_config
    self.kv_connector = get_kv_transfer_group()
    # Register kv caches with KV Connector if applicable.
    # TODO: support cross_layers_kv_cache
    # (see https://github.com/vllm-project/vllm/pull/27743)
    self.kv_connector.register_kv_caches(kv_caches_dict)
    self.kv_connector.set_host_xfer_buffer_ops(copy_kv_blocks)

    # Set via set_disabled(); when True every hook becomes a no-op.
    self._disabled = False
no_forward

no_forward(
    scheduler_output: SchedulerOutput,
) -> ModelRunnerOutput
Source code in vllm/v1/worker/gpu/kv_connector.py
def no_forward(self, scheduler_output: "SchedulerOutput") -> ModelRunnerOutput:
    """Run only the connector hooks for a step that skips model execution."""
    if self._disabled:
        return EMPTY_MODEL_RUNNER_OUTPUT

    self.pre_forward(scheduler_output)
    connector_out = self.post_forward(scheduler_output, wait_for_save=False)
    if connector_out is None or connector_out.is_empty():
        return EMPTY_MODEL_RUNNER_OUTPUT
    # Shallow-copy the empty template and attach the connector results.
    result = copy.copy(EMPTY_MODEL_RUNNER_OUTPUT)
    result.kv_connector_output = connector_out
    return result

post_forward

post_forward(
    scheduler_output: SchedulerOutput,
    wait_for_save: bool = True,
) -> KVConnectorOutput | None
Source code in vllm/v1/worker/gpu/kv_connector.py
def post_forward(
    self, scheduler_output: "SchedulerOutput", wait_for_save: bool = True
) -> KVConnectorOutput | None:
    """Collect transfer results after the forward pass.

    Returns None when the connector is disabled; otherwise a populated
    KVConnectorOutput.
    """
    if self._disabled:
        return None

    connector = self.kv_connector
    if wait_for_save:
        connector.wait_for_save()
    result = KVConnectorOutput()
    finished = connector.get_finished(scheduler_output.finished_req_ids)
    result.finished_sending, result.finished_recving = finished
    result.invalid_block_ids = connector.get_block_ids_with_load_errors()
    result.kv_connector_stats = connector.get_kv_connector_stats()
    result.kv_cache_events = connector.get_kv_connector_kv_cache_events()
    connector.clear_connector_metadata()
    return result

pre_forward

pre_forward(scheduler_output: SchedulerOutput) -> None
Source code in vllm/v1/worker/gpu/kv_connector.py
def pre_forward(self, scheduler_output: "SchedulerOutput") -> None:
    """Bind connector metadata and kick off KV loads before the forward pass."""
    if self._disabled:
        return

    preempted = scheduler_output.preempted_req_ids
    if preempted:
        self.kv_connector.handle_preemptions(preempted)
    metadata = scheduler_output.kv_connector_metadata
    assert metadata is not None
    self.kv_connector.bind_connector_metadata(metadata)
    # TODO: sort out KV Connectors' use of forward_context
    if not is_forward_context_available():
        with set_forward_context(None, self.vllm_config):
            self.kv_connector.start_load_kv(get_forward_context())
        return
    self.kv_connector.start_load_kv(get_forward_context())

set_disabled

set_disabled(disabled: bool) -> None
Source code in vllm/v1/worker/gpu/kv_connector.py
def set_disabled(self, disabled: bool) -> None:
    # Detach the global agent while disabled so that layer-wise connector
    # hooks become no-ops; reattach it when re-enabled.
    agent = None if disabled else self.kv_connector
    kv_transfer_state._KV_CONNECTOR_AGENT = agent
    self._disabled = disabled

KVConnector

KVConnector interface used by GPUModelRunner.

Source code in vllm/v1/worker/gpu/kv_connector.py
class KVConnector:
    """No-op KVConnector interface consumed by GPUModelRunner.

    Subclasses override these hooks; every default implementation does
    nothing and reports nothing.
    """

    def pre_forward(self, scheduler_output: "SchedulerOutput") -> None:
        """Hook invoked before the model forward pass; default is a no-op."""
        return None

    def post_forward(
        self, scheduler_output: "SchedulerOutput", wait_for_save: bool = True
    ) -> KVConnectorOutput | None:
        """Hook invoked after the forward pass; default reports no output."""
        return None

    def no_forward(self, scheduler_output: "SchedulerOutput") -> ModelRunnerOutput:
        """Hook for steps that skip the model; default yields the empty output."""
        return EMPTY_MODEL_RUNNER_OUTPUT

    def set_disabled(self, disabled: bool) -> None:
        """Toggle the connector; the no-op connector has nothing to toggle."""
        return None

no_forward

no_forward(
    scheduler_output: SchedulerOutput,
) -> ModelRunnerOutput
Source code in vllm/v1/worker/gpu/kv_connector.py
def no_forward(self, scheduler_output: "SchedulerOutput") -> ModelRunnerOutput:
    """No-op default: a skipped forward pass yields the canonical empty output."""
    return EMPTY_MODEL_RUNNER_OUTPUT

post_forward

post_forward(
    scheduler_output: SchedulerOutput,
    wait_for_save: bool = True,
) -> KVConnectorOutput | None
Source code in vllm/v1/worker/gpu/kv_connector.py
def post_forward(
    self, scheduler_output: "SchedulerOutput", wait_for_save: bool = True
) -> KVConnectorOutput | None:
    """No-op default: no connector output to report after the forward pass."""
    return None

pre_forward

pre_forward(scheduler_output: SchedulerOutput) -> None
Source code in vllm/v1/worker/gpu/kv_connector.py
def pre_forward(self, scheduler_output: "SchedulerOutput") -> None:
    """No-op default: nothing to do before the forward pass."""
    pass

set_disabled

set_disabled(disabled: bool) -> None
Source code in vllm/v1/worker/gpu/kv_connector.py
def set_disabled(self, disabled: bool) -> None:
    """No-op default: the base connector has no state to toggle."""
    pass

get_kv_connector

get_kv_connector(
    vllm_config: VllmConfig,
    kv_caches_dict: dict[str, Tensor],
) -> KVConnector
Source code in vllm/v1/worker/gpu/kv_connector.py
def get_kv_connector(
    vllm_config: VllmConfig, kv_caches_dict: dict[str, torch.Tensor]
) -> KVConnector:
    """Return an active connector when a KV transfer group exists, else a no-op.

    Args:
        vllm_config: Engine configuration forwarded to the active connector.
        kv_caches_dict: Layer-name -> KV-cache tensor mapping to register.
    """
    if has_kv_transfer_group():
        return ActiveKVConnector(vllm_config, kv_caches_dict)
    # No transfer group configured: share the stateless no-op connector.
    return NO_OP_KV_CONNECTOR