nos.server

Docker Runtime

The Docker runtime wraps the docker-py client to run containerized inference workloads. It supports starting and stopping containers, querying container status and logs, and running containers programmatically with hardware support for accelerators such as GPUs and ASICs (e.g. AWS Inferentia).
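
As a quick orientation, here is a minimal sketch of acquiring the runtime singleton and listing running containers (the printed fields are illustrative):

from nos.server._docker import DockerRuntime

runtime = DockerRuntime.get()          # process-wide singleton wrapping a docker-py client
for container in runtime.list():       # thin wrapper over `docker ps`
    print(container.name, container.status)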

nos.server._docker.DeviceRequest dataclass

Device request mappings for docker-py.

For the given key, we map to the corresponding docker.types.DeviceRequest or docker.types.Device object. This makes it easy to add new devices in the future.

Source code in nos/server/_docker.py
@dataclass
class DeviceRequest:
    """Device request mappings for docker-py.

    For the given key, we map to the corresponding
    `docker.types.DeviceRequest` or `docker.types.Device` object.
    This makes it easy to add new devices in the future.
    """

    configs = {
        "gpu": {
            "device_requests": [
                docker.types.DeviceRequest(
                    device_ids=["all"],
                    capabilities=[["gpu"]],
                )
            ],
        },
        "inf2": {
            "devices": ["/dev/neuron0:/dev/neuron0:rwm"],
        },
    }

    @classmethod
    def get(cls, device: str) -> Dict[str, Any]:
        """Get device request."""
        try:
            return cls.configs[device]
        except KeyError:
            raise ValueError(f"Invalid DeviceRequest: {device}")

get classmethod

get(device: str) -> Dict[str, Any]

Get device request.

Source code in nos/server/_docker.py
@classmethod
def get(cls, device: str) -> Dict[str, Any]:
    """Get device request."""
    try:
        return cls.configs[device]
    except KeyError:
        raise ValueError(f"Invalid DeviceRequest: {device}")
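
For example, requesting the gpu configuration returns the device_requests keyword arguments that are later merged into containers.run(...), while inf2 maps to a devices list; unknown keys raise a ValueError:

from nos.server._docker import DeviceRequest

gpu_kwargs = DeviceRequest.get("gpu")    # {"device_requests": [docker.types.DeviceRequest(device_ids=["all"], capabilities=[["gpu"]])]}
inf2_kwargs = DeviceRequest.get("inf2")  # {"devices": ["/dev/neuron0:/dev/neuron0:rwm"]}
DeviceRequest.get("tpu")                 # raises ValueError: Invalid DeviceRequest: tpu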

nos.server._docker.DockerRuntime dataclass

Docker runtime for running containerized inference workloads.

Source code in nos/server/_docker.py
@dataclass
class DockerRuntime:
    """
    Docker runtime for running containerized inference workloads.
    """

    _instance: "DockerRuntime" = None
    _client: docker.DockerClient = None

    def __init__(self):
        """Initialize DockerRuntime."""
        self._client = docker.from_env()

    @classmethod
    def get(cls: "DockerRuntime") -> "DockerRuntime":
        """Get DockerRuntime instance."""
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    @classmethod
    def list(cls, **kwargs) -> Iterable[docker.models.containers.Container]:
        """List docker containers."""
        return cls.get()._client.containers.list(**kwargs)

    def start(
        self,
        image: str,
        command: Optional[Union[str, List[str]]] = None,
        name: str = None,
        device: Optional[str] = None,
        **kwargs: Any,
    ) -> docker.models.containers.Container:
        """Start docker container.

        Args:
            **kwargs: See https://docker-py.readthedocs.io/en/stable/containers.html#docker.models.containers.ContainerCollection.run
            ports (Optional[Dict[int, int]], optional): Port mapping. Defaults to None.
            environment (Optional[Dict[str, str]], optional): Environment variables. Defaults to None.
            volumes (Optional[Dict[str, str]], optional): Volume mapping. Defaults to None.
            shm_size (Optional[int], optional): Shared memory size. Defaults to None.
            detach (bool, optional): Whether to run the container in detached mode. Defaults to True.
            remove (bool, optional): Whether to remove the container when it exits. Defaults to True.
            device (str, optional): Device to request (e.g. "gpu", "inf2"). Defaults to None (i.e. cpu).

        Note (Non-standard arguments):
            gpu (bool): Whether to start the container with GPU support.

        """
        # Check if container is already running, raise error if it is
        if name and self.get_container(name) is not None:
            container = self.get_container(name)
            if container.status == "running":
                raise RuntimeError(f"Container with same name already running (name={name}).")
            else:
                logger.warning(f"Container with same name already exists, removing it (name={name}).")
                self.stop(name)

        # Validate kwargs before passing to `containers.run(...)`
        if "devices" in kwargs:
            raise ValueError("Use `device='inf2'` instead of `devices`.")
        if "device_requests" in kwargs:
            raise ValueError("Use `device='gpu'` instead of `device_requests`.")

        # Handle device requests (gpu=True, or inf2=True)
        if device is not None:
            assert device in DeviceRequest.configs, f"Invalid device: {device}, available: {DeviceRequest.configs}"
            device_kwargs = DeviceRequest.get(device)
            logger.debug(f"Adding device [device={device}, {device_kwargs}]")
            kwargs.update(device_kwargs)

        # Log the container startup parameters before starting
        logger.debug(f"Starting container: {name}")
        logger.debug(f"\timage: {image}")
        logger.debug(f"\tcommand: {command}")
        logger.debug(f"\tname: {name}")
        for k, v in kwargs.items():
            logger.debug(f"\t{k}: {v}")

        # Start container (pass through kwargs)
        try:
            container = self._client.containers.run(
                image,
                command=command,
                name=name,
                **kwargs,
            )
            logger.debug(f"Started container [name={name}, image={container.image}, id={container.id[:12]}]")
            logger.debug(f"Get logs using `docker logs -f {container.id[:12]}`")
        except (docker.errors.APIError, docker.errors.DockerException) as exc:
            self.stop(name)
            raise ServerException(f"Failed to start container [image={image}]", exc=exc)
        return container

    def stop(self, name: str, timeout: int = 30) -> docker.models.containers.Container:
        """Stop docker container."""
        try:
            container = self.get_container(name)
            if container is None:
                logger.debug(f"Container not running: {name}, exiting early.")
                return
            logger.debug(f"Removing container: [name={name}, image={container.image}, id={container.id[:12]}]")
            container.remove(force=True)
            logger.debug(f"Removed container: [name={name}, image={container.image}, id={container.id[:12]}]")
        except (docker.errors.APIError, docker.errors.DockerException) as exc:
            raise ServerException(f"Failed to stop container [name={name}]", exc=exc)
        return container

    def get_container_id(self, name: str) -> Optional[str]:
        """Get the runtime container ID."""
        container = self.get_container(name)
        return container.id if container else None

    def get_container(self, id_or_name: str) -> docker.models.containers.Container:
        """Get container by id or name."""
        try:
            return self._client.containers.get(id_or_name)
        except docker.errors.NotFound:
            return None

    def get_container_status(self, id_or_name: str) -> Optional[str]:
        """Get container status by id or name."""
        container = self.get_container(id_or_name)
        return container.status if container else None

    def get_container_logs(self, name: str, **kwargs) -> Iterable[str]:
        """Get container logs."""
        try:
            container = self.get_container(name)
            if container is None:
                return iter([])

            for line in container.logs(stream=True):
                yield line.decode("utf-8")
        except (docker.errors.APIError, docker.errors.DockerException) as exc:
            logger.error(f"Failed to get container logs: {exc}")
            raise ServerException(f"Failed to get container logs [name={name}]", exc=exc)

__init__

__init__()

Initialize DockerRuntime.

Source code in nos/server/_docker.py
def __init__(self):
    """Initialize DockerRuntime."""
    self._client = docker.from_env()

get classmethod

get() -> DockerRuntime

Get DockerRuntime instance.

Source code in nos/server/_docker.py
@classmethod
def get(cls: "DockerRuntime") -> "DockerRuntime":
    """Get DockerRuntime instance."""
    if cls._instance is None:
        cls._instance = cls()
    return cls._instance

list classmethod

list(**kwargs) -> Iterable[Container]

List docker containers.

Source code in nos/server/_docker.py
@classmethod
def list(cls, **kwargs) -> Iterable[docker.models.containers.Container]:
    """List docker containers."""
    return cls.get()._client.containers.list(**kwargs)

start

start(image: str, command: Optional[Union[str, List[str]]] = None, name: str = None, device: Optional[str] = None, **kwargs: Any) -> Container

Start docker container.

Parameters:

  • **kwargs (Any, default: {} ) –

    See https://docker-py.readthedocs.io/en/stable/containers.html#docker.models.containers.ContainerCollection.run

  • ports (Optional[Dict[int, int]]) –

    Port mapping. Defaults to None.

  • environment (Optional[Dict[str, str]]) –

    Environment variables. Defaults to None.

  • volumes (Optional[Dict[str, str]]) –

    Volume mapping. Defaults to None.

  • shm_size (Optional[int]) –

    Shared memory size. Defaults to None.

  • detach (bool) –

    Whether to run the container in detached mode. Defaults to True.

  • remove (bool) –

    Whether to remove the container when it exits. Defaults to True.

  • device (Optional[str], default: None ) –

    Device to request (e.g. "gpu", "inf2"). Defaults to None (i.e. cpu).

Note (Non-standard arguments): gpu (bool): Whether to start the container with GPU support.

Source code in nos/server/_docker.py
def start(
    self,
    image: str,
    command: Optional[Union[str, List[str]]] = None,
    name: str = None,
    device: Optional[str] = None,
    **kwargs: Any,
) -> docker.models.containers.Container:
    """Start docker container.

    Args:
        **kwargs: See https://docker-py.readthedocs.io/en/stable/containers.html#docker.models.containers.ContainerCollection.run
        ports (Optional[Dict[int, int]], optional): Port mapping. Defaults to None.
        environment (Optional[Dict[str, str]], optional): Environment variables. Defaults to None.
        volumes (Optional[Dict[str, str]], optional): Volume mapping. Defaults to None.
        shm_size (Optional[int], optional): Shared memory size. Defaults to None.
        detach (bool, optional): Whether to run the container in detached mode. Defaults to True.
        remove (bool, optional): Whether to remove the container when it exits. Defaults to True.
        device (str, optional): Device to request (e.g. "gpu", "inf2"). Defaults to None (i.e. cpu).

    Note (Non-standard arguments):
        gpu (bool): Whether to start the container with GPU support.

    """
    # Check if container is already running, raise error if it is
    if name and self.get_container(name) is not None:
        container = self.get_container(name)
        if container.status == "running":
            raise RuntimeError(f"Container with same name already running (name={name}).")
        else:
            logger.warning(f"Container with same name already exists, removing it (name={name}).")
            self.stop(name)

    # Validate kwargs before passing to `containers.run(...)`
    if "devices" in kwargs:
        raise ValueError("Use `device='inf2'` instead of `devices`.")
    if "device_requests" in kwargs:
        raise ValueError("Use `device='gpu'` instead of `device_requests`.")

    # Handle device requests (gpu=True, or inf2=True)
    if device is not None:
        assert device in DeviceRequest.configs, f"Invalid device: {device}, available: {DeviceRequest.configs}"
        device_kwargs = DeviceRequest.get(device)
        logger.debug(f"Adding device [device={device}, {device_kwargs}]")
        kwargs.update(device_kwargs)

    # Log the container startup parameters before starting
    logger.debug(f"Starting container: {name}")
    logger.debug(f"\timage: {image}")
    logger.debug(f"\tcommand: {command}")
    logger.debug(f"\tname: {name}")
    for k, v in kwargs.items():
        logger.debug(f"\t{k}: {v}")

    # Start container (pass through kwargs)
    try:
        container = self._client.containers.run(
            image,
            command=command,
            name=name,
            **kwargs,
        )
        logger.debug(f"Started container [name={name}, image={container.image}, id={container.id[:12]}]")
        logger.debug(f"Get logs using `docker logs -f {container.id[:12]}`")
    except (docker.errors.APIError, docker.errors.DockerException) as exc:
        self.stop(name)
        raise ServerException(f"Failed to start container [image={image}]", exc=exc)
    return container
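
Putting start() and stop() together, a minimal sketch of launching a GPU container and tearing it down; the image tag, container name, and port mapping below are illustrative rather than defaults:

runtime = DockerRuntime.get()
container = runtime.start(
    image="autonomi/nos:latest-gpu",     # illustrative image tag
    name="nos-example",                  # illustrative container name
    device="gpu",                        # expands to docker.types.DeviceRequest(device_ids=["all"], ...)
    ports={50051: 50051},                # forwarded to containers.run(...)
    detach=True,
)
print(runtime.get_container_status("nos-example"))  # e.g. "running"
runtime.stop("nos-example")                          # force-removes the container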

stop

stop(name: str, timeout: int = 30) -> Container

Stop docker container.

Source code in nos/server/_docker.py
def stop(self, name: str, timeout: int = 30) -> docker.models.containers.Container:
    """Stop docker container."""
    try:
        container = self.get_container(name)
        if container is None:
            logger.debug(f"Container not running: {name}, exiting early.")
            return
        logger.debug(f"Removing container: [name={name}, image={container.image}, id={container.id[:12]}]")
        container.remove(force=True)
        logger.debug(f"Removed container: [name={name}, image={container.image}, id={container.id[:12]}]")
    except (docker.errors.APIError, docker.errors.DockerException) as exc:
        raise ServerException(f"Failed to stop container [name={name}]", exc=exc)
    return container

get_container_id

get_container_id(name: str) -> Optional[str]

Get the runtime container ID.

Source code in nos/server/_docker.py
def get_container_id(self, name: str) -> Optional[str]:
    """Get the runtime container ID."""
    container = self.get_container(name)
    return container.id if container else None

get_container

get_container(id_or_name: str) -> Container

Get container by id or name.

Source code in nos/server/_docker.py
def get_container(self, id_or_name: str) -> docker.models.containers.Container:
    """Get container by id or name."""
    try:
        return self._client.containers.get(id_or_name)
    except docker.errors.NotFound:
        return None

get_container_status

get_container_status(id_or_name: str) -> Optional[str]

Get container status by id or name.

Source code in nos/server/_docker.py
def get_container_status(self, id_or_name: str) -> Optional[str]:
    """Get container status by id or name."""
    container = self.get_container(id_or_name)
    return container.status if container else None

get_container_logs

get_container_logs(name: str, **kwargs) -> Iterable[str]

Get container logs.

Source code in nos/server/_docker.py
def get_container_logs(self, name: str, **kwargs) -> Iterable[str]:
    """Get container logs."""
    try:
        container = self.get_container(name)
        if container is None:
            return iter([])

        for line in container.logs(stream=True):
            yield line.decode("utf-8")
    except (docker.errors.APIError, docker.errors.DockerException) as exc:
        logger.error(f"Failed to get container logs: {exc}")
        raise ServerException(f"Failed to get container logs [name={name}]", exc=exc)
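
Since get_container_logs() yields decoded lines from a streaming log handle, it can be consumed directly in a loop (the container name is illustrative):

runtime = DockerRuntime.get()
for line in runtime.get_container_logs("nos-example"):
    print(line, end="")                  # lines arrive already utf-8 decoded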

InferenceServiceRuntime

nos.server._runtime.InferenceServiceRuntime

Inference service runtime.

This class is responsible for handling the lifecycle of the inference service docker runtime.

Attributes:

  • configs (InferenceServiceConfig) –

    Inference service configuration.

Source code in nos/server/_runtime.py
class InferenceServiceRuntime:
    """Inference service runtime.

    This class is responsible for handling the lifecycle of the
    inference service docker runtime.

    Attributes:
        configs (InferenceServiceConfig): Inference service configuration.
    """

    configs = {
        "cpu": InferenceServiceRuntimeConfig(
            image=NOS_DOCKER_IMAGE_CPU,
            name=f"{NOS_INFERENCE_SERVICE_CONTAINER_NAME}-cpu",
            kwargs={
                "nano_cpus": int(6e9),
                "mem_limit": "6g",
                "log_config": {"type": LogConfig.types.JSON, "config": {"max-size": "100m", "max-file": "10"}},
            },
        ),
        "gpu": InferenceServiceRuntimeConfig(
            image=NOS_DOCKER_IMAGE_GPU,
            name=f"{NOS_INFERENCE_SERVICE_CONTAINER_NAME}-gpu",
            device="gpu",
            kwargs={
                "nano_cpus": int(8e9),
                "mem_limit": "12g",
                "log_config": {"type": LogConfig.types.JSON, "config": {"max-size": "100m", "max-file": "10"}},
            },
        ),
        "trt": InferenceServiceRuntimeConfig(
            image="autonomi/nos:latest-trt",
            name=f"{NOS_INFERENCE_SERVICE_CONTAINER_NAME}-trt",
            device="gpu",
            kwargs={
                "nano_cpus": int(8e9),
                "mem_limit": "12g",
                "log_config": {"type": LogConfig.types.JSON, "config": {"max-size": "100m", "max-file": "10"}},
            },
        ),
        "inf2": InferenceServiceRuntimeConfig(
            image="autonomi/nos:latest-inf2",
            name=f"{NOS_INFERENCE_SERVICE_CONTAINER_NAME}-inf2",
            device="inf2",
            environment=_default_environment({"NEURON_RT_VISIBLE_CORES": 2}),
            kwargs={
                "nano_cpus": int(8e9),
                "log_config": {"type": LogConfig.types.JSON, "config": {"max-size": "100m", "max-file": "10"}},
            },
        ),
    }

    def __init__(self, runtime: str = "cpu", name: str = None):
        """Initialize the inference runtime.

        Args:
            runtime (str, optional): Inference runtime. Defaults to "cpu".
            name (str, optional): Inference runtime name. Defaults to "nos-inference-service".
        """
        if runtime not in self.configs:
            raise ValueError(f"Invalid inference runtime: {runtime}, available: {list(self.configs.keys())}")
        self.cfg = copy.deepcopy(self.configs[runtime])
        if name is not None:
            self.cfg.name = name

        self._runtime = DockerRuntime.get()

    def __repr__(self) -> str:
        return f"InferenceServiceRuntime(image={self.cfg.image}, name={self.cfg.name}, device={self.cfg.device})"

    @staticmethod
    def detect() -> str:
        """Auto-detect inference runtime."""
        from nos.common.system import has_gpu, is_aws_inf2

        if is_aws_inf2():
            return "inf2"
        elif has_gpu():
            return "gpu"
        else:
            return "cpu"

    @staticmethod
    def devices() -> List[str]:
        """Auto-detect devices to use."""
        from nos.common.system import has_gpu, is_aws_inf2

        if is_aws_inf2():
            return [str(p) for p in Path("/dev/").glob("neuron*")]
        elif has_gpu():
            return []
        else:
            return []

    @staticmethod
    def list(**kwargs) -> List[docker.models.containers.Container]:
        """List running docker containers."""
        containers = DockerRuntime.get().list(**kwargs)
        return [
            container for container in containers if container.name.startswith(NOS_INFERENCE_SERVICE_CONTAINER_NAME)
        ]

    @classmethod
    def supported_runtimes(cls) -> List[str]:
        """Get supported runtimes."""
        return list(cls.configs.keys())

    def start(self, **kwargs: Any) -> docker.models.containers.Container:
        """Start the inference runtime.

        Args:
            **kwargs: Additional keyword-arguments to pass to `DockerRuntime.start`.
        """
        logger.debug(f"Starting inference runtime with image: {self.cfg.image}")

        # Override dict values
        for k in ("ports", "volumes", "environment"):
            if k in kwargs:
                self.cfg.__dict__[k].update(kwargs.pop(k))
                logger.debug(f"Updating runtime configuration [key={k}, value={self.cfg.__dict__[k]}]")

        # Override config with supplied kwargs
        for k in list(kwargs.keys()):
            value = kwargs[k]
            if hasattr(self.cfg, k):
                setattr(self.cfg, k, value)
                logger.debug(f"Overriding inference runtime config: {k}={value}")
            else:
                self.cfg.kwargs[k] = value

        # Start inference runtime
        container = self._runtime.start(
            image=self.cfg.image,
            name=self.cfg.name,
            command=self.cfg.command,
            ports=self.cfg.ports,
            environment=self.cfg.environment,
            volumes=self.cfg.volumes,
            detach=self.cfg.detach,
            device=self.cfg.device,
            ipc_mode=self.cfg.ipc_mode,
            **self.cfg.kwargs,
        )
        logger.debug(f"Started inference runtime: {self}")
        return container

    def stop(self, timeout: int = 30) -> docker.models.containers.Container:
        return self._runtime.stop(self.cfg.name, timeout=timeout)

    def get_container(self) -> docker.models.containers.Container:
        return self._runtime.get_container(self.cfg.name)

    def get_container_name(self) -> Optional[str]:
        return self._runtime.get_container(self.cfg.name).name

    def get_container_id(self) -> Optional[str]:
        return self._runtime.get_container_id(self.cfg.name)

    def get_container_status(self) -> Optional[str]:
        return self._runtime.get_container_status(self.cfg.name)

    def get_container_logs(self, **kwargs) -> Iterable[str]:
        return self._runtime.get_container_logs(self.cfg.name, **kwargs)

__init__

__init__(runtime: str = 'cpu', name: str = None)

Initialize the inference runtime.

Parameters:

  • runtime (str, default: 'cpu' ) –

    Inference runtime. Defaults to "cpu".

  • name (str, default: None ) –

    Inference runtime name. Defaults to "nos-inference-service".

Source code in nos/server/_runtime.py
def __init__(self, runtime: str = "cpu", name: str = None):
    """Initialize the inference runtime.

    Args:
        runtime (str, optional): Inference runtime. Defaults to "cpu".
        name (str, optional): Inference runtime name. Defaults to "nos-inference-service".
    """
    if runtime not in self.configs:
        raise ValueError(f"Invalid inference runtime: {runtime}, available: {list(self.configs.keys())}")
    self.cfg = copy.deepcopy(self.configs[runtime])
    if name is not None:
        self.cfg.name = name

    self._runtime = DockerRuntime.get()

detect staticmethod

detect() -> str

Auto-detect inference runtime.

Source code in nos/server/_runtime.py
@staticmethod
def detect() -> str:
    """Auto-detect inference runtime."""
    from nos.common.system import has_gpu, is_aws_inf2

    if is_aws_inf2():
        return "inf2"
    elif has_gpu():
        return "gpu"
    else:
        return "cpu"

devices staticmethod

devices() -> List[str]

Auto-detect devices to use.

Source code in nos/server/_runtime.py
@staticmethod
def devices() -> List[str]:
    """Auto-detect devices to use."""
    from nos.common.system import has_gpu, is_aws_inf2

    if is_aws_inf2():
        return [str(p) for p in Path("/dev/").glob("neuron*")]
    elif has_gpu():
        return []
    else:
        return []

list staticmethod

list(**kwargs) -> List[Container]

List running docker containers.

Source code in nos/server/_runtime.py
@staticmethod
def list(**kwargs) -> List[docker.models.containers.Container]:
    """List running docker containers."""
    containers = DockerRuntime.get().list(**kwargs)
    return [
        container for container in containers if container.name.startswith(NOS_INFERENCE_SERVICE_CONTAINER_NAME)
    ]

supported_runtimes classmethod

supported_runtimes() -> List[str]

Get supported runtimes.

Source code in nos/server/_runtime.py
@classmethod
def supported_runtimes(cls) -> List[str]:
    """Get supported runtimes."""
    return list(cls.configs.keys())

start

start(**kwargs: Any) -> Container

Start the inference runtime.

Parameters:

  • **kwargs (Any, default: {} ) –

    Additional keyword-arguments to pass to DockerRuntime.start.

Source code in nos/server/_runtime.py
def start(self, **kwargs: Any) -> docker.models.containers.Container:
    """Start the inference runtime.

    Args:
        **kwargs: Additional keyword-arguments to pass to `DockerRuntime.start`.
    """
    logger.debug(f"Starting inference runtime with image: {self.cfg.image}")

    # Override dict values
    for k in ("ports", "volumes", "environment"):
        if k in kwargs:
            self.cfg.__dict__[k].update(kwargs.pop(k))
            logger.debug(f"Updating runtime configuration [key={k}, value={self.cfg.__dict__[k]}]")

    # Override config with supplied kwargs
    for k in list(kwargs.keys()):
        value = kwargs[k]
        if hasattr(self.cfg, k):
            setattr(self.cfg, k, value)
            logger.debug(f"Overriding inference runtime config: {k}={value}")
        else:
            self.cfg.kwargs[k] = value

    # Start inference runtime
    container = self._runtime.start(
        image=self.cfg.image,
        name=self.cfg.name,
        command=self.cfg.command,
        ports=self.cfg.ports,
        environment=self.cfg.environment,
        volumes=self.cfg.volumes,
        detach=self.cfg.detach,
        device=self.cfg.device,
        ipc_mode=self.cfg.ipc_mode,
        **self.cfg.kwargs,
    )
    logger.debug(f"Started inference runtime: {self}")
    return container
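
A minimal sketch of how the inference runtime is typically selected and launched; detect() picks "inf2", "gpu", or "cpu" based on the host, while the port mapping shown here is an assumption rather than a documented default:

from nos.server._runtime import InferenceServiceRuntime

runtime_name = InferenceServiceRuntime.detect()      # "inf2", "gpu", or "cpu"
runtime = InferenceServiceRuntime(runtime=runtime_name)
container = runtime.start(ports={50051: 50051})      # extra kwargs are merged into the runtime config
print(runtime.get_container_status())                # e.g. "running"
runtime.stop()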

InferenceService

The InferenceService, along with the InferenceServiceImpl gRPC service implementation, provides a fully wrapped inference service over gRPC/HTTP2. InferenceServiceImpl wraps the relevant API endpoints such as ListModels(), GetModelInfo(), and crucially Run(), and executes each inference request via the InferenceService class. The InferenceService class manages models via the ModelManager and sets up the necessary execution backend via RayExecutor. It is also responsible for managing shared-memory regions (if requested) for high-performance inference when running locally on a single machine.
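
To make the request path concrete, here is a hedged sketch of a bare gRPC client calling two of the endpoints above. The generated module names, the InferenceServiceStub class, and the 50051 port are assumptions based on standard gRPC codegen conventions rather than documented NOS client API:

import grpc
from google.protobuf import empty_pb2

# Assumption: adjust this import to wherever the generated protobuf stubs live in your install.
import nos_service_pb2_grpc as pb2_grpc

channel = grpc.insecure_channel("localhost:50051")   # port is an assumption
stub = pb2_grpc.InferenceServiceStub(channel)        # conventional codegen stub name
print(stub.Ping(empty_pb2.Empty()).status)           # -> "ok"
info = stub.GetServiceInfo(empty_pb2.Empty())
print(info.version, info.runtime)                    # e.g. "0.x.y", "gpu"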

nos.server._service.ModelHandle dataclass

Model handles for distributed model execution.

Usage
# Initialize a model handle
>> model = ModelHandle(spec, num_replicas=1)

# Call the task immediately
>> response = model(**model_inputs)

# Call a method on the model handle
>> response = model.process_images(**model_inputs)

# Submit a task to the model handle,
# this will add results to the queue
>> model.submit(**model_inputs)
# Fetch the next result from the queue
>> response = model.get()

# Submit a task to a specific model handle method
>> model.submit(**model_inputs, _method="process_images")

# Submit a task to the model handle,
# this will add results to the queue
>> model_handle.submit(**model_inputs)
# Fetch the next result from the queue
>> response = model_handle.get()

# Cleanup model resources
>> model_handle.cleanup()
Source code in nos/managers/model.py
@dataclass
class ModelHandle:
    """Model handles for distributed model execution.

    Usage:
        ```python
        # Initialize a model handle
        >> model = ModelHandle(spec, num_replicas=1)

        # Call the task immediately
        >> response = model(**model_inputs)

        # Call a method on the model handle
        >> response = model.process_images(**model_inputs)

        # Submit a task to the model handle,
        # this will add results to the queue
        >> model.submit(**model_inputs)
        # Fetch the next result from the queue
        >> response = model.get()

        # Submit a task to a specific model handle method
        >> model.submit(**model_inputs, _method="process_images")

        # Submit a task to the model handle,
        # this will add results to the queue
        >> model_handle.submit(**model_inputs)
        # Fetch the next result from the queue
        >> response = model_handle.get()

        # Cleanup model resources
        >> model_handle.cleanup()
        ```
    """

    spec: ModelSpec
    """Model specification."""
    deployment: ModelDeploymentSpec = field(default_factory=ModelDeploymentSpec)
    """Model deployment specification (e.g. number of replicas)."""
    _actors: List[Union[ray.remote, ray.actor.ActorHandle]] = field(init=False, default=None)
    """Ray actor handle."""
    _actor_pool: ActorPool = field(init=False, default=None)
    """Ray actor pool."""
    _actor_options: Dict[str, Any] = field(init=False, default=None)
    """Ray actor options."""

    def __post_init__(self):
        """Initialize the actor handles."""
        self._actor_options = self._get_actor_options(self.spec, self.deployment)
        self._actors = [self._get_actor() for _ in range(self.deployment.num_replicas)]
        self._actor_pool = ActorPool(self._actors)

        # Patch the model handle with methods from the model spec signature
        for method in self.spec.signature:
            # Note (spillai): We do not need to patch the __call__ method
            # since it is already re-directed in the model handle.
            if hasattr(self, method):
                logger.debug(f"Model handle ({self}) already has method ({method}), skipping ....")
                continue

            # Methods:
            #   >> handle.process_images: ModelHandlePartial
            #   >> handle.process_images(images=...) => handle.__call__(images=..., _method="process_images")
            #   >> handle.process_images.submit(images=...) => handle.submit(images=..., _method="process_images")
            setattr(self, method, ModelHandlePartial(self, method))

    def __repr__(self) -> str:
        assert len(self._actors) == self.num_replicas
        opts_str = ", ".join([f"{k}={v}" for k, v in self._actor_options.items()])
        return f"ModelHandle(name={self.spec.name}, replicas={len(self._actors)}, opts=({opts_str}))"

    @property
    def num_replicas(self) -> int:
        """Get the number of replicas."""
        return self.deployment.num_replicas

    @classmethod
    def _get_actor_options(cls, spec: ModelSpec, deployment: ModelDeploymentSpec) -> Dict[str, Any]:
        """Get actor options from model specification."""
        # TOFIX (spillai): When considering CPU-only models with num_cpus specified,
        # OMP_NUM_THREADS will be set to the number of CPUs requested. Otherwise,
        # if num_cpus is not specified, OMP_NUM_THREADS will default to 1.
        # Instead, for now, we manually set the environment variable in `InferenceServiceRuntime`
        # to the number of CPU threads available.

        # If deployment resources are not specified, get the model resources from the catalog
        if deployment.resources is None:
            try:
                catalog = ModelSpecMetadataCatalog.get()
                resources: ModelResources = catalog._resources_catalog[f"{spec.id}/{spec.default_method}"]
            except Exception:
                resources = ModelResources()
                logger.debug(f"Failed to get model resources [model={spec.id}, method={spec.default_method}]")

        # Otherwise, use the deployment resources provided
        else:
            resources = deployment.resources

        # For GPU models, we need to set the number of fractional GPUs to use
        if (resources.device == "auto" or resources.device == "gpu") and torch.cuda.is_available():
            try:
                # TODO (spillai): This needs to be resolved differently for
                # multi-node clusters.
                # Determine the current device id by checking the number of GPUs used.
                total, available = ray.cluster_resources(), ray.available_resources()
                gpus_used = total["GPU"] - available["GPU"]
                device_id = int(gpus_used)

                if isinstance(resources.device_memory, str) and resources.device_memory == "auto":
                    gpu_frac = 1.0 / NOS_MAX_CONCURRENT_MODELS
                    actor_opts = {"num_gpus": gpu_frac}
                elif isinstance(resources.device_memory, int):
                    # Fractional GPU memory needed within the current device
                    device_memory = torch.cuda.get_device_properties(device_id).total_memory
                    gpu_frac = float(resources.device_memory) / device_memory
                    gpu_frac = round(gpu_frac * 10) / 10.0

                    # Fractional GPU used for the current device
                    gpu_frac_used = gpus_used - int(gpus_used)
                    gpu_frac_avail = (1 - gpu_frac_used) * device_memory
                    logger.debug(
                        f"""actor_opts [model={spec.id}, """
                        f"""mem={humanize.naturalsize(resources.device_memory, binary=True)}, device={device_id}, device_mem={humanize.naturalsize(device_memory, binary=True)}, """
                        f"""gpu_frac={gpu_frac}, gpu_frac_avail={gpu_frac_avail}, """
                        f"""gpu_frac_used={gpu_frac_used}]"""
                    )
                    if gpu_frac > gpu_frac_avail:
                        logger.debug(
                            f"Insufficient GPU memory for model [model={spec.id}, "
                            f"method={spec.default_method}, gpu_frac={gpu_frac}, "
                            f"gpu_frac_avail={gpu_frac_avail}, gpu_frac_used={gpu_frac_used}]"
                        )
                        if device_id == torch.cuda.device_count() - 1:
                            # TOFIX (spillai): evict models to make space for the current model
                            logger.debug("All GPUs are fully utilized, this may result in undesirable behavior.")
                    actor_opts = {"num_gpus": gpu_frac}
                else:
                    raise ValueError(f"Invalid device memory: {resources.device_memory}")
            except Exception as exc:
                logger.debug(f"Failed to get GPU memory [e={exc}].")
                actor_opts = {"num_gpus": 1.0 / NOS_MAX_CONCURRENT_MODELS}

        elif resources.device == "cpu":
            actor_opts = {"num_cpus": resources.cpus, "memory": resources.memory}

        else:
            actor_opts = {"num_cpus": resources.cpus, "memory": resources.memory}

        if spec.runtime_env is not None:
            logger.debug("Using custom runtime environment, this may take a while to build.")
            actor_opts["runtime_env"] = RuntimeEnv(**spec.runtime_env.model_dump())
        logger.debug(f"Actor options [id={spec.id}, opts={actor_opts}]")

        return actor_opts

    def _get_actor(self) -> Union[ray.remote, ray.actor.ActorHandle]:
        """Get an actor handle from model specification.

        Returns:
            Union[ray.remote, ray.actor.ActorHandle]: Ray actor handle.
        """
        # TODO (spillai): Use the auto-tuned model spec to instantiate an
        # actor the desired memory requirements. Fractional GPU amounts
        # will need to be calculated from the target HW and model spec
        # (i.e. 0.5 on A100 vs. T4 are different).
        # NOTE (spillai): Using default signature here is OK, since
        # all the signatures for a model spec have the same `func_or_cls`.
        model_cls = self.spec.default_signature.func_or_cls

        # Get the actor options from the model spec
        actor_options = self._actor_options
        actor_cls = ray.remote(**actor_options)(model_cls)

        # Check if the model class has the required method
        logger.debug(
            f"Creating actor [actor={actor_cls}, opts={actor_options}, cls={model_cls}, init_args={self.spec.default_signature.init_args}, init_kwargs={self.spec.default_signature.init_kwargs}]"
        )
        actor = actor_cls.remote(*self.spec.default_signature.init_args, **self.spec.default_signature.init_kwargs)

        # Note: Only check if default signature method is implemented
        # even though other methods may be implemented and used.
        if not hasattr(actor, self.spec.default_method):
            raise NotImplementedError(f"Model class {model_cls} does not have {self.spec.default_method} implemented.")
        logger.debug(f"Creating actor [actor={actor}, opts={actor_options}, cls={model_cls}]")

        # Add some memory logs to this actor
        if NOS_MEMRAY_ENABLED:
            # Replace all non-alphanumeric characters with underscores
            actor_name = re.sub(r"\W+", "_", str(actor))
            log_name = Path(NOS_RAY_LOGS_DIR) / f"{actor_name}_mem_profile.bin"
            if log_name.exists():
                log_name.unlink()
            try:
                memray.Tracker(log_name).__enter__()
            except Exception:
                logger.error("Failed to initialize memray tracker.")
        return actor

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        """Call the task immediately.

        Args:
            *args: Model arguments.
            **kwargs: Model keyword arguments
                (except for special `_method` keyword that is
                used to call different class methods).
        Returns:
            Model response.
        """
        assert len(self._actors) >= 1, "Model should have at least one replica."
        if self.num_replicas > 1:
            logger.warning("Model has >1 replicas, use `.submit()` instead to fully utilize them.")

        method: str = kwargs.pop("_method", self.spec.default_method)
        # TODO (spillai): We should be able to determine if the output
        # is an iterable or not from the signature, and set the default
        stream: bool = kwargs.pop("_stream", False)
        actor_method_func = getattr(self._actors[0], method)
        if not stream:
            response_ref: ray.ObjectRef = actor_method_func.remote(**kwargs)
            return ray.get(response_ref)
        else:
            response_refs: Iterable[ray.ObjectRef] = actor_method_func.options(num_returns="streaming").remote(
                **kwargs
            )
            return _StreamingModelHandleResponse(response_refs)

    def scale(self, num_replicas: Union[int, str] = 1) -> "ModelHandle":
        """Scale the model handle to a new number of replicas.

        Args:
            num_replicas (int or str): Number of replicas, or set to "auto" to
                automatically scale the model to the number of GPUs available.
        """
        if isinstance(num_replicas, str) and num_replicas == "auto":
            raise NotImplementedError("Automatic scaling not implemented.")
        if not isinstance(num_replicas, int):
            raise ValueError(f"Invalid replicas: {num_replicas}")

        # Check if there are any pending futures
        if self._actor_pool.has_next():
            logger.warning(f"Pending futures detected, this may result in dropped queue items [name={self.spec.name}]")
        logger.debug(f"Waiting for pending futures to complete before scaling [name={self.spec.name}].")
        logger.debug(f"Scaling model [name={self.spec.name}].")

        if num_replicas == len(self._actors):
            logger.debug(f"Model already scaled appropriately [name={self.spec.name}, replicas={num_replicas}].")
            return self
        elif num_replicas > len(self._actors):
            self._actors += [self._get_actor() for _ in range(num_replicas - len(self._actors))]
            logger.debug(f"Scaling up model [name={self.spec.name}, replicas={num_replicas}].")
        else:
            actors_to_remove = self._actors[num_replicas:]
            for actor in actors_to_remove:
                ray.kill(actor)
            self._actors = self._actors[:num_replicas]

            logger.debug(f"Scaling down model [name={self.spec.name}, replicas={num_replicas}].")

        # Update replicas and queue size
        self.deployment.num_replicas = num_replicas

        # Re-create the actor pool
        logger.debug(f"Removing actor pool [replicas={len(self._actors)}].")
        del self._actor_pool
        self._actor_pool = None

        # Re-create the actor pool
        logger.debug(f"Re-creating actor pool [name={self.spec.name}, replicas={num_replicas}].")
        self._actor_pool = ActorPool(self._actors)
        assert len(self._actors) == num_replicas, "Model scaling failed."
        gc.collect()
        return self

    def submit(self, *args: Any, **kwargs: Any) -> ray.ObjectRef:
        """Submit a task to the actor pool.

        Note (spillai): Caveats for `.submit()` with custom methods:
            ModelHandles have a single result queue to which results
            are added asynchronously on task completion. Calling `submit()`
            with different methods interchangeably will result in
            the results queue being populated with results from
            different methods. In other words, it is advised to
            use `submit()` with the same method for a given model
            and then use `get()` to fetch all the results, before
            calling `submit()` with a different method.

        Args:
            *args: Model arguments.
            **kwargs: Model keyword arguments
                (except for special `_method` keyword that is
                used to call different class methods).

        Returns:
            ray.ObjectRef: Ray object reference as a string.
        """
        # Submit the task to the actor pool, leveraging all replicas
        method: str = kwargs.pop("_method", self.spec.default_method)
        # TODO (spillai): We should be able to determine if the output
        # is an iterable or not from the signature, and set the default
        stream: bool = kwargs.pop("_stream", False)
        remote_opts = {"num_returns": "streaming"} if stream else {}
        if not self._actor_pool._idle_actors:
            logger.warning(f"Actor pool is full, this may result in dropped queue items [name={self.spec.name}]")
        future_ref = self._actor_pool.submit(
            lambda a, v: getattr(a, method).options(**remote_opts).remote(**v), kwargs
        )
        logger.info(f"Submitted task [name={self.spec.name}, method={method}, kwargs={kwargs}]")
        return future_ref

    def cleanup(self) -> None:
        """Kill all the actor handles and garbage collect."""
        for actor_handle in self._actors:
            ray.kill(actor_handle)
        self._actors = []
        gc.collect()

    def get(self, future_ref: ray.ObjectRef = None, timeout: int = None) -> Any:
        """Get the result future."""
        return self._actor_pool.get(future_ref)

    async def async_get(self, future_ref: ray.ObjectRef = None, timeout: int = None) -> Any:
        """Get the result future asynchronously."""
        return await self._actor_pool.async_get(future_ref)
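
For intuition on the fractional-GPU bookkeeping in _get_actor_options above, here is a small worked example with made-up numbers (a 16 GiB device and a 3 GiB per-model memory request):

# Illustrative numbers only; in the real code these come from torch.cuda and ray.
device_memory = 16 * 1024**3              # total memory of the current GPU (bytes)
requested = 3 * 1024**3                   # resources.device_memory for the model (bytes)

gpu_frac = requested / device_memory      # 0.1875
gpu_frac = round(gpu_frac * 10) / 10.0    # rounded to one decimal -> 0.2
actor_opts = {"num_gpus": gpu_frac}       # later passed to ray.remote(**actor_opts)
print(actor_opts)                         # {'num_gpus': 0.2}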

spec instance-attribute

spec: ModelSpec

Model specification.

deployment class-attribute instance-attribute

deployment: ModelDeploymentSpec = field(default_factory=ModelDeploymentSpec)

Model deployment specification (e.g. number of replicas).

num_replicas property

num_replicas: int

Get the number of replicas.

__post_init__

__post_init__()

Initialize the actor handles.

Source code in nos/managers/model.py
def __post_init__(self):
    """Initialize the actor handles."""
    self._actor_options = self._get_actor_options(self.spec, self.deployment)
    self._actors = [self._get_actor() for _ in range(self.deployment.num_replicas)]
    self._actor_pool = ActorPool(self._actors)

    # Patch the model handle with methods from the model spec signature
    for method in self.spec.signature:
        # Note (spillai): We do not need to patch the __call__ method
        # since it is already re-directed in the model handle.
        if hasattr(self, method):
            logger.debug(f"Model handle ({self}) already has method ({method}), skipping ....")
            continue

        # Methods:
        #   >> handle.process_images: ModelHandlePartial
        #   >> handle.process_images(images=...) => handle.__call__(images=..., _method="process_images")
        #   >> handle.process_images.submit(images=...) => handle.submit(images=..., _method="process_images")
        setattr(self, method, ModelHandlePartial(self, method))

__call__

__call__(*args: Any, **kwargs: Any) -> Any

Call the task immediately.

Parameters:

  • *args (Any, default: () ) –

    Model arguments.

  • **kwargs (Any, default: {} ) –

    Model keyword arguments (except for special _method keyword that is used to call different class methods).

Returns: Model response.

Source code in nos/managers/model.py
def __call__(self, *args: Any, **kwargs: Any) -> Any:
    """Call the task immediately.

    Args:
        *args: Model arguments.
        **kwargs: Model keyword arguments
            (except for special `_method` keyword that is
            used to call different class methods).
    Returns:
        Model response.
    """
    assert len(self._actors) >= 1, "Model should have at least one replica."
    if self.num_replicas > 1:
        logger.warning("Model has >1 replicas, use `.submit()` instead to fully utilize them.")

    method: str = kwargs.pop("_method", self.spec.default_method)
    # TODO (spillai): We should be able to determine if the output
    # is an iterable or not from the signature, and set the default
    stream: bool = kwargs.pop("_stream", False)
    actor_method_func = getattr(self._actors[0], method)
    if not stream:
        response_ref: ray.ObjectRef = actor_method_func.remote(**kwargs)
        return ray.get(response_ref)
    else:
        response_refs: Iterable[ray.ObjectRef] = actor_method_func.options(num_returns="streaming").remote(
            **kwargs
        )
        return _StreamingModelHandleResponse(response_refs)

scale

scale(num_replicas: Union[int, str] = 1) -> ModelHandle

Scale the model handle to a new number of replicas.

Parameters:

  • num_replicas (int or str, default: 1 ) –

    Number of replicas, or set to "auto" to automatically scale the model to the number of GPUs available.

Source code in nos/managers/model.py
def scale(self, num_replicas: Union[int, str] = 1) -> "ModelHandle":
    """Scale the model handle to a new number of replicas.

    Args:
        num_replicas (int or str): Number of replicas, or set to "auto" to
            automatically scale the model to the number of GPUs available.
    """
    if isinstance(num_replicas, str) and num_replicas == "auto":
        raise NotImplementedError("Automatic scaling not implemented.")
    if not isinstance(num_replicas, int):
        raise ValueError(f"Invalid replicas: {num_replicas}")

    # Check if there are any pending futures
    if self._actor_pool.has_next():
        logger.warning(f"Pending futures detected, this may result in dropped queue items [name={self.spec.name}]")
    logger.debug(f"Waiting for pending futures to complete before scaling [name={self.spec.name}].")
    logger.debug(f"Scaling model [name={self.spec.name}].")

    if num_replicas == len(self._actors):
        logger.debug(f"Model already scaled appropriately [name={self.spec.name}, replicas={num_replicas}].")
        return self
    elif num_replicas > len(self._actors):
        self._actors += [self._get_actor() for _ in range(num_replicas - len(self._actors))]
        logger.debug(f"Scaling up model [name={self.spec.name}, replicas={num_replicas}].")
    else:
        actors_to_remove = self._actors[num_replicas:]
        for actor in actors_to_remove:
            ray.kill(actor)
        self._actors = self._actors[:num_replicas]

        logger.debug(f"Scaling down model [name={self.spec.name}, replicas={num_replicas}].")

    # Update replicas and queue size
    self.deployment.num_replicas = num_replicas

    # Re-create the actor pool
    logger.debug(f"Removing actor pool [replicas={len(self._actors)}].")
    del self._actor_pool
    self._actor_pool = None

    # Re-create the actor pool
    logger.debug(f"Re-creating actor pool [name={self.spec.name}, replicas={num_replicas}].")
    self._actor_pool = ActorPool(self._actors)
    assert len(self._actors) == num_replicas, "Model scaling failed."
    gc.collect()
    return self

submit

submit(*args: Any, **kwargs: Any) -> ObjectRef

Submit a task to the actor pool.

Note (spillai): Caveats for .submit() with custom methods: ModelHandles have a single result queue to which results are added asynchronously on task completion. Calling submit() with different methods interchangeably will result in the results queue being populated with results from different methods. In other words, it is advised to use submit() with the same method for a given model and then use get() to fetch all the results, before calling submit() with a different method.

Parameters:

  • *args (Any, default: () ) –

    Model arguments.

  • **kwargs (Any, default: {} ) –

    Model keyword arguments (except for special _method keyword that is used to call different class methods).

Returns:

  • ObjectRef

    ray.ObjectRef: Ray object reference as a string.

Source code in nos/managers/model.py
def submit(self, *args: Any, **kwargs: Any) -> ray.ObjectRef:
    """Submit a task to the actor pool.

    Note (spillai): Caveats for `.submit()` with custom methods:
        ModelHandles have a single result queue to which results
        are added asynchronously on task completion. Calling `submit()`
        with different methods interchangeably will result in
        the results queue being populated with results from
        different methods. In other words, it is advised to
        use `submit()` with the same method for a given model
        and then use `get()` to fetch all the results, before
        calling `submit()` with a different method.

    Args:
        *args: Model arguments.
        **kwargs: Model keyword arguments
            (except for special `_method` keyword that is
            used to call different class methods).

    Returns:
        ray.ObjectRef: Ray object reference as a string.
    """
    # Submit the task to the actor pool, leveraging all replicas
    method: str = kwargs.pop("_method", self.spec.default_method)
    # TODO (spillai): We should be able to determine if the output
    # is an iterable or not from the signature, and set the default
    stream: bool = kwargs.pop("_stream", False)
    remote_opts = {"num_returns": "streaming"} if stream else {}
    if not self._actor_pool._idle_actors:
        logger.warning(f"Actor pool is full, this may result in dropped queue items [name={self.spec.name}]")
    future_ref = self._actor_pool.submit(
        lambda a, v: getattr(a, method).options(**remote_opts).remote(**v), kwargs
    )
    logger.info(f"Submitted task [name={self.spec.name}, method={method}, kwargs={kwargs}]")
    return future_ref
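
Given the single-result-queue caveat above, a common pattern is to submit a batch of calls for one method and drain the queue before switching methods. A hedged sketch, where `spec` is an existing ModelSpec, `batches` is a list of keyword-argument dicts, and the process_images method name is carried over from the usage docstring:

from nos.managers.model import ModelHandle   # module path from the source listing above

model = ModelHandle(spec)                    # one replica by default
model.scale(2)                               # two replicas backing one actor pool

# Submit every input with the same _method, leveraging both replicas ...
for inputs in batches:
    model.submit(**inputs, _method="process_images")

# ... then drain the single result queue before submitting a different method.
results = [model.get() for _ in batches]
model.cleanup()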

cleanup

cleanup() -> None

Kill all the actor handles and garbage collect.

Source code in nos/managers/model.py
def cleanup(self) -> None:
    """Kill all the actor handles and garbage collect."""
    for actor_handle in self._actors:
        ray.kill(actor_handle)
    self._actors = []
    gc.collect()

get

get(future_ref: ObjectRef = None, timeout: int = None) -> Any

Get the result future.

Source code in nos/managers/model.py
def get(self, future_ref: ray.ObjectRef = None, timeout: int = None) -> Any:
    """Get the result future."""
    return self._actor_pool.get(future_ref)

async_get async

async_get(future_ref: ObjectRef = None, timeout: int = None) -> Any

Get the result future asynchronously.

Source code in nos/managers/model.py
async def async_get(self, future_ref: ray.ObjectRef = None, timeout: int = None) -> Any:
    """Get the result future asynchronously."""
    return await self._actor_pool.async_get(future_ref)

nos.server._service.InferenceServiceImpl

Experimental gRPC-based inference service.

This service is used to serve models over gRPC.

Refer to the bring-your-own-schema section: https://docs.ray.io/en/master/serve/direct-ingress.html?highlight=grpc#bring-your-own-schema

Source code in nos/server/_service.py
class InferenceServiceImpl(nos_service_pb2_grpc.InferenceServiceServicer, InferenceService):
    """
    Experimental gRPC-based inference service.

    This service is used to serve models over gRPC.

    Refer to the bring-your-own-schema section:
    https://docs.ray.io/en/master/serve/direct-ingress.html?highlight=grpc#bring-your-own-schema
    """

    def __init__(self, catalog_filename: str = None):
        super().__init__()
        self._tmp_files = {}

        if catalog_filename is None:
            return

        if not Path(catalog_filename).exists():
            raise ValueError(f"Model catalog not found [catalog={catalog_filename}]")

        # Register models from the catalog
        services: List[ModelServiceSpec] = hub.register_from_yaml(catalog_filename)
        for svc in services:
            logger.debug(f"Servicing model [svc={svc}, replicas={svc.deployment.num_replicas}]")
            self.load_model_spec(svc.model, svc.deployment)
            logger.debug(f"Deployed model [svc={svc}]. \n{self.model_manager}")

    async def Ping(self, _: empty_pb2.Empty, context: grpc.ServicerContext) -> nos_service_pb2.PingResponse:
        """Health check."""
        return nos_service_pb2.PingResponse(status="ok")

    def GetServiceInfo(self, _: empty_pb2.Empty, context: grpc.ServicerContext) -> nos_service_pb2.ServiceInfoResponse:
        """Get information on the service."""
        from nos.common.system import has_gpu, is_inside_docker

        if is_inside_docker():
            runtime = "gpu" if has_gpu() else "cpu"
        else:
            runtime = "local"
        return nos_service_pb2.ServiceInfoResponse(version=__version__, runtime=runtime)

    def ListModels(self, _: empty_pb2.Empty, context: grpc.ServicerContext) -> nos_service_pb2.GenericResponse:
        """List all models."""
        models = list(hub.list())
        logger.debug(f"ListModels() [models={len(models)}]")
        return nos_service_pb2.GenericResponse(response_bytes=dumps(models))

    def GetModelCatalog(self, _: empty_pb2.Empty, context: grpc.ServicerContext) -> nos_service_pb2.GenericResponse:
        """Get the model catalog."""
        catalog = ModelSpecMetadataCatalog.get()
        logger.debug(f"GetModelCatalog() [catalog={catalog._metadata_catalog}]")
        return nos_service_pb2.GenericResponse(response_bytes=dumps(catalog))

    def GetModelInfo(
        self, request: wrappers_pb2.StringValue, context: grpc.ServicerContext
    ) -> nos_service_pb2.GenericResponse:
        """Get model information."""
        try:
            model_id = request.value
            spec: ModelSpec = hub.load_spec(model_id)
        except KeyError as e:
            logger.error(f"Failed to load spec [request={request}, e={e}]")
            context.abort(grpc.StatusCode.NOT_FOUND, str(e))
        return spec._to_proto()

    def LoadModel(self, request: nos_service_pb2.GenericRequest, context: grpc.ServicerContext) -> empty_pb2.Empty:
        """Load / scale the model to the specified number of replicas."""
        request: Dict[str, Any] = loads(request.request_bytes)
        logger.debug(f"LoadModel() [request={request}]")
        try:
            model_id = request["id"]
            num_replicas = request.get("num_replicas", 1)
            self.load_model(model_id, num_replicas=num_replicas)
            return empty_pb2.Empty()
        except Exception as e:
            err_msg = f"Failed to scale model [model_id={model_id}, num_replicas={num_replicas}, e={e}]"
            logger.error(err_msg)
            context.abort(grpc.StatusCode.INTERNAL, err_msg)

    def RegisterSystemSharedMemory(
        self, request: nos_service_pb2.GenericRequest, context: grpc.ServicerContext
    ) -> nos_service_pb2.GenericResponse:
        """Register system shared memory under a specific namespace `<client_id>/<object_id>`."""
        if not NOS_SHM_ENABLED:
            context.abort(grpc.StatusCode.UNIMPLEMENTED, "Shared memory not enabled.")

        metadata = dict(context.invocation_metadata())
        client_id = metadata.get("client_id", None)
        object_id = metadata.get("object_id", None)
        namespace = f"{client_id}/{object_id}"
        logger.debug(f"Registering shm [client_id={client_id}, object_id={object_id}]")
        try:
            # Create a shared memory segment for the inputs
            # Note: The returned keys for shared memory segments are identical to the
            # keys in the input dictionary (i.e. <key>), and are not prefixed with the
            # namespace `<client_id>/<object_id>`.
            shm_map = self.shm_manager.create(loads(request.request_bytes), namespace=namespace)
            # Here, dumps() is used to serialize the shared memory numpy objects via __getstate__().
            # The serialized data is then sent back to the client, where it can be deserialized
            # and set via __setstate__() on the client side, so that the client can access the shared
            # memory segments.
            logger.debug(f"Registered shm [client_id={client_id}, object_id={object_id}, shm_map={shm_map}]")
            return nos_service_pb2.GenericResponse(response_bytes=dumps(shm_map))
        except Exception as e:
            logger.error(f"Failed to register system shared memory: {e}")
            context.abort(grpc.StatusCode.INTERNAL, str(e))

    def UnregisterSystemSharedMemory(
        self, request: nos_service_pb2.GenericRequest, context: grpc.ServicerContext
    ) -> nos_service_pb2.GenericResponse:
        """Unregister system shared memory for specific namespace `<client_id>/<object_id>`."""
        if not NOS_SHM_ENABLED:
            context.abort(grpc.StatusCode.UNIMPLEMENTED, "Shared memory not enabled.")

        metadata = dict(context.invocation_metadata())
        client_id = metadata.get("client_id", None)
        object_id = metadata.get("object_id", None)
        namespace = f"{client_id}/{object_id}"
        # TODO (spillai): Currently, we can ignore the `request` provided
        # by the client, since all the shared memory segments under the namespace are deleted.
        logger.debug(f"Unregistering shm [client_id={client_id}, object_id={object_id}]")
        try:
            self.shm_manager.cleanup(namespace=namespace)
        except Exception as e:
            logger.error(f"Failed to unregister shm [e{e}]")
            context.abort(grpc.StatusCode.INTERNAL, str(e))
        return nos_service_pb2.GenericResponse()

    def UploadFile(self, request_iterator: Any, context: grpc.ServicerContext) -> nos_service_pb2.GenericResponse:
        """Upload a file."""
        for _chunk_idx, chunk_request in enumerate(request_iterator):
            chunk = loads(chunk_request.request_bytes)
            chunk_bytes = chunk["chunk_bytes"]
            path = Path(chunk["filename"])
            if str(path) not in self._tmp_files:
                tmp_file = NamedTemporaryFile(delete=False, dir="/tmp", suffix=path.suffix)
                self._tmp_files[str(path)] = tmp_file
                logger.debug(
                    f"Streaming upload [path={tmp_file.name}, size={Path(tmp_file.name).stat().st_size / (1024 * 1024):.2f} MB]"
                )
            else:
                tmp_file = self._tmp_files[str(path)]
            with open(tmp_file.name, "ab") as f:
                f.write(chunk_bytes)
        return nos_service_pb2.GenericResponse(response_bytes=dumps({"filename": tmp_file.name}))

    def DeleteFile(self, request: nos_service_pb2.GenericRequest, context: grpc.ServicerContext) -> empty_pb2.Empty:
        """Delete a file by its file-identifier."""
        request = loads(request.request_bytes)

        filename = str(request["filename"])
        try:
            tmp_file = self._tmp_files[str(filename)]
            path = Path(tmp_file.name)
            assert path.exists(), f"File handle {filename} not found"
        except Exception as e:
            err_msg = f"Failed to delete file [filename={filename}, e={e}]"
            logger.error(err_msg)
            context.abort(grpc.StatusCode.NOT_FOUND, err_msg)

        logger.debug(f"Deleting file [path={path}]")
        path.unlink()
        del self._tmp_files[str(filename)]
        return empty_pb2.Empty()

    async def Run(
        self, request: nos_service_pb2.GenericRequest, context: grpc.ServicerContext
    ) -> nos_service_pb2.GenericResponse:
        """Main model prediction interface."""
        request: Dict[str, Any] = loads(request.request_bytes)
        try:
            st = time.perf_counter()
            logger.info(f"Executing request [model={request['id']}, method={request['method']}]")
            response = await self.execute_model(request["id"], method=request["method"], inputs=request["inputs"])
            logger.info(
                f"Executed request [model={request['id']}, method={request['method']}, elapsed={(time.perf_counter() - st) * 1e3:.1f}ms]"
            )
            return nos_service_pb2.GenericResponse(response_bytes=dumps(response))
        except (grpc.RpcError, Exception) as e:
            msg = f"Failed to execute request [model={request['id']}, method={request['method']}]"
            msg += f"{traceback.format_exc()}"
            logger.error(f"{msg}, e={e}")
            context.abort(grpc.StatusCode.INTERNAL, "Internal Server Error")

    async def Stream(
        self, request: nos_service_pb2.GenericRequest, context: grpc.ServicerContext
    ) -> Iterator[nos_service_pb2.GenericResponse]:
        """Main streaming model prediction interface."""
        request: Dict[str, Any] = loads(request.request_bytes)
        try:
            logger.info(f"Executing request [model={request['id']}, method={request['method']}]")
            response_stream = await self.execute_model(
                request["id"], method=request["method"], inputs=request["inputs"], stream=True
            )
            for response in response_stream:
                yield nos_service_pb2.GenericResponse(response_bytes=dumps(response))
            logger.info(f"Executed request [model={request['id']}, method={request['method']}]")
        except (grpc.RpcError, Exception) as e:
            msg = f"Failed to execute request [model={request['id']}, method={request['method']}]"
            msg += f"{traceback.format_exc()}"
            logger.error(f"{msg}, e={e}")
            context.abort(grpc.StatusCode.INTERNAL, "Internal Server Error")

Ping async

Ping(_: Empty, context: ServicerContext) -> PingResponse

Health check.

Source code in nos/server/_service.py
async def Ping(self, _: empty_pb2.Empty, context: grpc.ServicerContext) -> nos_service_pb2.PingResponse:
    """Health check."""
    return nos_service_pb2.PingResponse(status="ok")

GetServiceInfo

GetServiceInfo(_: Empty, context: ServicerContext) -> ServiceInfoResponse

Get information on the service.

Source code in nos/server/_service.py
def GetServiceInfo(self, _: empty_pb2.Empty, context: grpc.ServicerContext) -> nos_service_pb2.ServiceInfoResponse:
    """Get information on the service."""
    from nos.common.system import has_gpu, is_inside_docker

    if is_inside_docker():
        runtime = "gpu" if has_gpu() else "cpu"
    else:
        runtime = "local"
    return nos_service_pb2.ServiceInfoResponse(version=__version__, runtime=runtime)
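
For reference, a minimal client-side health check against a running server might look like the sketch below. The address, the import path, and the stub class name (`InferenceServiceStub`) are assumptions; substitute the stub generated from the nos service proto in your installation.

import grpc
from google.protobuf import empty_pb2

from nos.server import nos_service_pb2_grpc  # assumed import path for the generated stub module

channel = grpc.insecure_channel("localhost:50051")         # placeholder for DEFAULT_GRPC_ADDRESS
stub = nos_service_pb2_grpc.InferenceServiceStub(channel)  # hypothetical stub class name

print(stub.Ping(empty_pb2.Empty()).status)  # "ok"
info = stub.GetServiceInfo(empty_pb2.Empty())
print(info.version, info.runtime)           # runtime is "gpu"/"cpu" inside Docker, "local" otherwise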

ListModels

ListModels(_: Empty, context: ServicerContext) -> GenericResponse

List all models.

Source code in nos/server/_service.py
def ListModels(self, _: empty_pb2.Empty, context: grpc.ServicerContext) -> nos_service_pb2.GenericResponse:
    """List all models."""
    models = list(hub.list())
    logger.debug(f"ListModels() [models={len(models)}]")
    return nos_service_pb2.GenericResponse(response_bytes=dumps(models))
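
The model list comes back pickled inside response_bytes. A minimal decoding sketch, assuming the server's dumps()/loads() helpers are cloudpickle-compatible and reusing the stub from the health-check sketch above:

import cloudpickle  # assumption: matches the server-side dumps()/loads()
from google.protobuf import empty_pb2

resp = stub.ListModels(empty_pb2.Empty())        # stub: see the health-check sketch above
models = cloudpickle.loads(resp.response_bytes)  # list of registered model identifiers
print(f"{len(models)} models registered")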

GetModelCatalog

GetModelCatalog(_: Empty, context: ServicerContext) -> GenericResponse

Get the model catalog.

Source code in nos/server/_service.py
def GetModelCatalog(self, _: empty_pb2.Empty, context: grpc.ServicerContext) -> nos_service_pb2.GenericResponse:
    """Get the model catalog."""
    catalog = ModelSpecMetadataCatalog.get()
    logger.debug(f"GetModelCatalog() [catalog={catalog._metadata_catalog}]")
    return nos_service_pb2.GenericResponse(response_bytes=dumps(catalog))

GetModelInfo

GetModelInfo(request: StringValue, context: ServicerContext) -> GenericResponse

Get model information.

Source code in nos/server/_service.py
def GetModelInfo(
    self, request: wrappers_pb2.StringValue, context: grpc.ServicerContext
) -> nos_service_pb2.GenericResponse:
    """Get model information."""
    try:
        model_id = request.value
        spec: ModelSpec = hub.load_spec(model_id)
    except KeyError as e:
        logger.error(f"Failed to load spec [request={request}, e={e}]")
        context.abort(grpc.StatusCode.NOT_FOUND, str(e))
    return spec._to_proto()
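
GetModelInfo takes a plain StringValue holding the model identifier and returns the serialized ModelSpec; unknown identifiers are rejected with NOT_FOUND. A minimal sketch (the model id is a placeholder, stub as in the health-check sketch above):

from google.protobuf import wrappers_pb2

request = wrappers_pb2.StringValue(value="<model-id>")  # placeholder: use an id returned by ListModels
spec_response = stub.GetModelInfo(request)              # serialized ModelSpec proto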

LoadModel

LoadModel(request: GenericRequest, context: ServicerContext) -> Empty

Load / scale the model to the specified number of replicas.

Source code in nos/server/_service.py
def LoadModel(self, request: nos_service_pb2.GenericRequest, context: grpc.ServicerContext) -> empty_pb2.Empty:
    """Load / scale the model to the specified number of replicas."""
    request: Dict[str, Any] = loads(request.request_bytes)
    logger.debug(f"ScaleModel() [request={request}]")
    try:
        model_id = request["id"]
        num_replicas = request.get("num_replicas", 1)
        self.load_model(model_id, num_replicas=num_replicas)
        return empty_pb2.Empty()
    except Exception as e:
        err_msg = f"Failed to scale model [model_id={model_id}, num_replicas={num_replicas}, e={e}]"
        logger.error(err_msg)
        context.abort(grpc.StatusCode.INTERNAL, err_msg)
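
The handler above expects a pickled dict with an "id" key and an optional "num_replicas" (defaulting to 1). A minimal request-construction sketch, assuming a cloudpickle-compatible serializer and the generated nos_service_pb2 module:

import cloudpickle  # assumption: matches the server-side dumps()/loads()

from nos.server import nos_service_pb2  # assumed import path for the generated message module

payload = {"id": "<model-id>", "num_replicas": 2}  # placeholder model id
request = nos_service_pb2.GenericRequest(request_bytes=cloudpickle.dumps(payload))
stub.LoadModel(request)  # returns google.protobuf.Empty on success; aborts with INTERNAL on failure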

RegisterSystemSharedMemory

RegisterSystemSharedMemory(request: GenericRequest, context: ServicerContext) -> GenericResponse

Register system shared memory under a specific namespace <client_id>/<object_id>.

Source code in nos/server/_service.py
def RegisterSystemSharedMemory(
    self, request: nos_service_pb2.GenericRequest, context: grpc.ServicerContext
) -> nos_service_pb2.GenericResponse:
    """Register system shared memory under a specific namespace `<client_id>/<object_id>`."""
    if not NOS_SHM_ENABLED:
        context.abort(grpc.StatusCode.UNIMPLEMENTED, "Shared memory not enabled.")

    metadata = dict(context.invocation_metadata())
    client_id = metadata.get("client_id", None)
    object_id = metadata.get("object_id", None)
    namespace = f"{client_id}/{object_id}"
    logger.debug(f"Registering shm [client_id={client_id}, object_id={object_id}]")
    try:
        # Create a shared memory segment for the inputs
        # Note: The returned keys for shared memory segments are identical to the
        # keys in the input dictionary (i.e. <key>), and are not prefixed with the
        # namespace `<client_id>/<object_id>`.
        shm_map = self.shm_manager.create(loads(request.request_bytes), namespace=namespace)
        # Here, dumps() is used to serialize the shared memory numpy objects via __getstate__().
        # The serialized data is then sent back to the client, where it can be deserialized
        # and set via __setstate__() on the client side, so that the client can access the shared
        # memory segments.
        logger.debug(f"Registered shm [client_id={client_id}, object_id={object_id}, shm_map={shm_map}]")
        return nos_service_pb2.GenericResponse(response_bytes=dumps(shm_map))
    except Exception as e:
        logger.error(f"Failed to register system shared memory: {e}")
        context.abort(grpc.StatusCode.INTERNAL, str(e))
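
The namespace is derived from gRPC invocation metadata, so the client must attach "client_id" and "object_id" entries to the call. The metadata keys are grounded in the handler above; the values, the serializer, and the shared-memory specification are placeholders:

import cloudpickle  # assumption: matches the server-side dumps()/loads()

from nos.server import nos_service_pb2  # assumed import path for the generated message module

metadata = [("client_id", "client-0"), ("object_id", "images")]  # placeholder values
shm_spec = {}  # placeholder: the shape expected by shm_manager.create() is not shown in this excerpt
request = nos_service_pb2.GenericRequest(request_bytes=cloudpickle.dumps(shm_spec))
resp = stub.RegisterSystemSharedMemory(request, metadata=metadata)  # stub: see the health-check sketch above
shm_map = cloudpickle.loads(resp.response_bytes)  # keyed by the input dict's keys, not namespaced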

UnregisterSystemSharedMemory

UnregisterSystemSharedMemory(request: GenericRequest, context: ServicerContext) -> GenericResponse

Unregister system shared memory for a specific namespace <client_id>/<object_id>.

Source code in nos/server/_service.py
def UnregisterSystemSharedMemory(
    self, request: nos_service_pb2.GenericRequest, context: grpc.ServicerContext
) -> nos_service_pb2.GenericResponse:
    """Unregister system shared memory for specific namespace `<client_id>/<object_id>`."""
    if not NOS_SHM_ENABLED:
        context.abort(grpc.StatusCode.UNIMPLEMENTED, "Shared memory not enabled.")

    metadata = dict(context.invocation_metadata())
    client_id = metadata.get("client_id", None)
    object_id = metadata.get("object_id", None)
    namespace = f"{client_id}/{object_id}"
    # TODO (spillai): Currently, we can ignore the `request` provided
    # by the client, since all the shared memory segments under the namespace are deleted.
    logger.debug(f"Unregistering shm [client_id={client_id}, object_id={object_id}]")
    try:
        self.shm_manager.cleanup(namespace=namespace)
    except Exception as e:
        logger.error(f"Failed to unregister shm [e{e}]")
        context.abort(grpc.StatusCode.INTERNAL, str(e))
    return nos_service_pb2.GenericResponse()

UploadFile

UploadFile(request_iterator: Any, context: ServicerContext) -> GenericResponse

Upload a file.

Source code in nos/server/_service.py
def UploadFile(self, request_iterator: Any, context: grpc.ServicerContext) -> nos_service_pb2.GenericResponse:
    """Upload a file."""
    for _chunk_idx, chunk_request in enumerate(request_iterator):
        chunk = loads(chunk_request.request_bytes)
        chunk_bytes = chunk["chunk_bytes"]
        path = Path(chunk["filename"])
        if str(path) not in self._tmp_files:
            tmp_file = NamedTemporaryFile(delete=False, dir="/tmp", suffix=path.suffix)
            self._tmp_files[str(path)] = tmp_file
            logger.debug(
                f"Streaming upload [path={tmp_file.name}, size={Path(tmp_file.name).stat().st_size / (1024 * 1024):.2f} MB]"
            )
        else:
            tmp_file = self._tmp_files[str(path)]
        with open(tmp_file.name, "ab") as f:
            f.write(chunk_bytes)
    return nos_service_pb2.GenericResponse(response_bytes=dumps({"filename": tmp_file.name}))

DeleteFile

DeleteFile(request: GenericRequest, context: ServicerContext) -> Empty

Delete a file by its file-identifier.

Source code in nos/server/_service.py
def DeleteFile(self, request: nos_service_pb2.GenericRequest, context: grpc.ServicerContext) -> empty_pb2.Empty:
    """Delete a file by its file-identifier."""
    request = loads(request.request_bytes)

    filename = str(request["filename"])
    try:
        tmp_file = self._tmp_files[str(filename)]
        path = Path(tmp_file.name)
        assert path.exists(), f"File handle {filename} not found"
    except Exception as e:
        err_msg = f"Failed to delete file [filename={filename}, e={e}]"
        logger.error(err_msg)
        context.abort(grpc.StatusCode.NOT_FOUND, err_msg)

    logger.debug(f"Deleting file [path={path}]")
    path.unlink()
    del self._tmp_files[str(filename)]
    return empty_pb2.Empty()

Run async

Run(request: GenericRequest, context: ServicerContext) -> GenericResponse

Main model prediction interface.

Source code in nos/server/_service.py
async def Run(
    self, request: nos_service_pb2.GenericRequest, context: grpc.ServicerContext
) -> nos_service_pb2.GenericResponse:
    """Main model prediction interface."""
    request: Dict[str, Any] = loads(request.request_bytes)
    try:
        st = time.perf_counter()
        logger.info(f"Executing request [model={request['id']}, method={request['method']}]")
        response = await self.execute_model(request["id"], method=request["method"], inputs=request["inputs"])
        logger.info(
            f"Executed request [model={request['id']}, method={request['method']}, elapsed={(time.perf_counter() - st) * 1e3:.1f}ms]"
        )
        return nos_service_pb2.GenericResponse(response_bytes=dumps(response))
    except (grpc.RpcError, Exception) as e:
        msg = f"Failed to execute request [model={request['id']}, method={request['method']}]"
        msg += f"{traceback.format_exc()}"
        logger.error(f"{msg}, e={e}")
        context.abort(grpc.StatusCode.INTERNAL, "Internal Server Error")

Stream async

Stream(request: GenericRequest, context: ServicerContext) -> Iterator[GenericResponse]

Main streaming model prediction interface.

Source code in nos/server/_service.py
async def Stream(
    self, request: nos_service_pb2.GenericRequest, context: grpc.ServicerContext
) -> Iterator[nos_service_pb2.GenericResponse]:
    """Main streaming model prediction interface."""
    request: Dict[str, Any] = loads(request.request_bytes)
    try:
        logger.info(f"Executing request [model={request['id']}, method={request['method']}]")
        response_stream = await self.execute_model(
            request["id"], method=request["method"], inputs=request["inputs"], stream=True
        )
        for response in response_stream:
            yield nos_service_pb2.GenericResponse(response_bytes=dumps(response))
        logger.info(f"Executed request [model={request['id']}, method={request['method']}]")
    except (grpc.RpcError, Exception) as e:
        msg = f"Failed to execute request [model={request['id']}, method={request['method']}]"
        msg += f"{traceback.format_exc()}"
        logger.error(f"{msg}, e={e}")
        context.abort(grpc.StatusCode.INTERNAL, "Internal Server Error")

nos.server._service.async_serve

async_serve(address: str = DEFAULT_GRPC_ADDRESS, max_workers: int = GRPC_MAX_WORKER_THREADS, wait_for_termination: bool = True, catalog: str = None)

Start the gRPC server.

Source code in nos/server/_service.py
def async_serve(
    address: str = DEFAULT_GRPC_ADDRESS,
    max_workers: int = GRPC_MAX_WORKER_THREADS,
    wait_for_termination: bool = True,
    catalog: str = None,
):
    """Start the gRPC server."""
    import asyncio

    loop = asyncio.new_event_loop()
    task = loop.create_task(async_serve_impl(address, wait_for_termination, catalog))
    loop.run_until_complete(task)
    return task.result()
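
Despite its name, async_serve drives the event loop itself and, with the default wait_for_termination=True, blocks until the server shuts down. A minimal usage sketch relying on the defaults shown in the signature above:

from nos.server._service import async_serve

# Uses DEFAULT_GRPC_ADDRESS and GRPC_MAX_WORKER_THREADS unless overridden;
# pass wait_for_termination=False to return once the server has started.
async_serve()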