SmartJob Executor Service

To get a smartjob.ExecutorService, use the smartjob.get_executor_service function.

You can then call one of its two methods (run or schedule, depending on your use case) to execute smartjob.SmartJob jobs.

These methods return a smartjob.ExecutionResult object (once the job has fully executed) or a future on it.
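A minimal end-to-end sketch (the SmartJob fields used here, such as docker_image, are assumptions for illustration and may be named differently in your version of the library):

```python
from smartjob import SmartJob, get_executor_service

# Get (or reuse) the singleton executor service for the local 'docker' backend
executor = get_executor_service(type="docker")

# Hypothetical job definition: 'docker_image' is assumed for illustration
job = SmartJob(
    name="hello",
    docker_image="python:3.12-slim",
    overridden_args=["python", "-c", "print('hello from smartjob')"],
)

# run() blocks until the job has finished and returns an ExecutionResult
result = executor.run(job)
print(result.success, result.log_url)
```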

Getting the executor service

smartjob.get_executor_service

Return a singleton instance of ExecutorService (initialized on first call with given arguments).

Parameters:

- type (str, default: 'cloudrun'): The type of executor to use ('cloudrun', 'cloudbatch', 'vertex' or 'docker').
- max_workers (int, default: DEFAULT_MAX_WORKERS): Maximum number of workers.
- use_cache (bool, default: True): If set to True, the same instance will be returned for the same type.

Returns:

- ExecutorService: Instance of ExecutorService.
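As a quick illustration of the use_cache behaviour (a sketch based on the source shown below):

```python
from smartjob import get_executor_service

# With use_cache=True (the default), repeated calls with the same type
# return the same ExecutorService instance
a = get_executor_service(type="docker")
b = get_executor_service(type="docker")
assert a is b

# use_cache=False builds a fresh instance (and replaces the cached one)
c = get_executor_service(type="docker", use_cache=False)
assert c is not a
```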

Source code in smartjob/infra/controllers/lib.py
def get_executor_service(
    type: str = "cloudrun",
    max_workers: int = DEFAULT_MAX_WORKERS,
    use_cache: bool = True,
) -> ExecutorService:
    """Return a singleton instance of ExecutorService (initialized on first call with given arguments).

    Args:
        type: The type of executor to use ('cloudrun', 'cloudbatch', 'vertex' or 'docker').
        max_workers: Maximum number of workers
        use_cache: if set to True, the same instance will be returned for the same type.

    Returns:
        Instance of ExecutorService.

    """
    adapter: ExecutorPort
    storage_adapter: StoragePort
    if type == "cloudrun":
        from smartjob.infra.adapters.executor.cloudrun import CloudRunExecutorAdapter

        adapter = CloudRunExecutorAdapter(max_workers=max_workers)
        storage_adapter = GcsStorageAdapter()
    elif type == "cloudbatch":
        from smartjob.infra.adapters.executor.cloudbatch import (
            CloudBatchExecutorAdapter,
        )

        adapter = CloudBatchExecutorAdapter(max_workers=max_workers)
        storage_adapter = GcsStorageAdapter()
    elif type == "vertex":
        from smartjob.infra.adapters.executor.vertex import VertexExecutorAdapter

        adapter = VertexExecutorAdapter(max_workers=max_workers)
        storage_adapter = GcsStorageAdapter()
    elif type == "docker":
        from smartjob.infra.adapters.executor.docker import DockerExecutorAdapter

        adapter = DockerExecutorAdapter(max_workers=max_workers)
        storage_adapter = DockerStorageAdapter()
    else:
        raise ValueError(
            f"Invalid executor type: {type} => must be 'cloudrun', 'cloudbatch', 'vertex' or 'docker'"
        )
    if not use_cache or type not in __cache:
        __cache[type] = ExecutorService(
            adapter=adapter,
            storage_service=StorageService(adapter=storage_adapter),
        )
    return __cache[type]

ExecutorService object

smartjob.ExecutorService dataclass

Source code in smartjob/app/executor.py
@dataclass
class ExecutorService:
    adapter: ExecutorPort
    storage_service: StorageService
    executor_name: str = field(default="", init=False)

    def __post_init__(self):
        self.executor_name = self.adapter.get_name()

    def _get_input_path(self, execution: Execution) -> str:
        return f"{self.adapter.staging_mount_path(execution)}/{execution.input_relative_path}"

    def _get_output_path(self, execution: Execution) -> str:
        return f"{self.adapter.staging_mount_path(execution)}/{execution.output_relative_path}"

    def _update_execution_env(self, execution: Execution):
        execution.add_envs["INPUT_PATH"] = self._get_input_path(execution)
        execution.add_envs["OUTPUT_PATH"] = self._get_output_path(execution)
        execution.add_envs["EXECUTION_ID"] = execution.id

    def _upload_python_script_if_needed_and_update_overridden_args(
        self, execution: Execution
    ):
        job = execution.job
        if not job.python_script_path:
            return
        with open(job.python_script_path) as f:
            content = f.read()
        destination_path = f"{execution.base_dir}/input/script.py"
        logger.info(
            "Uploading python script (%s) to %s/%s...",
            job.python_script_path,
            execution.config._staging_bucket,
            destination_path,
        )
        self.storage_service.upload(
            content.encode("utf8"),
            execution.config._staging_bucket_name,
            destination_path,
        )
        logger.debug(
            "Done uploading python script (%s) to %s/%s",
            job.python_script_path,
            execution.config.staging_bucket,
            destination_path,
        )
        execution.overridden_args = [
            "python",
            f"{self.adapter.staging_mount_path(execution)}/{destination_path}",
        ]

    def _replace_overridden_args_placeholders(self, execution: Execution):
        input_path = (
            f"{self.adapter.staging_mount_path(execution)}/{execution.base_dir}/input"
        )
        execution.overridden_args = [
            x.replace("{{INPUT}}", input_path) for x in execution.overridden_args
        ]

    def _upload_input(self, execution: Execution, input: Input):
        path = f"{execution.config._staging_bucket}/{execution.input_relative_path}/{input.filename}"
        logger.info(f"Uploading input to {path}...")
        input._create(
            execution.config._staging_bucket_name,
            execution.input_relative_path,
            self.storage_service,
        )
        logger.debug(f"Done uploading input: {path}")

    def _upload_inputs(self, execution: Execution):
        inputs = execution.inputs
        for input in inputs:
            self._upload_input(execution, input)

    def _create_input_output_paths_if_needed(self, execution: Execution):
        logger.info(
            "Creating input path %s/%s/...",
            execution.config._staging_bucket,
            execution.input_relative_path,
        )
        self.storage_service.upload(
            b"",
            execution.config._staging_bucket_name,
            execution.input_relative_path + "/",
        )
        logger.info(
            "Creating output path %s/%s/...",
            execution.config._staging_bucket,
            execution.output_relative_path,
        )
        self.storage_service.upload(
            b"",
            execution.config._staging_bucket_name,
            execution.output_relative_path + "/",
        )
        logger.debug("Done creating input/output paths")

    def _prepare(self, execution: Execution):
        self._update_execution_env(execution)
        self._create_input_output_paths_if_needed(execution)
        self._upload_python_script_if_needed_and_update_overridden_args(execution)
        self._replace_overridden_args_placeholders(execution)
        self._upload_inputs(execution)

    def _schedule(
        self,
        job: SmartJob,
        execution_config: ExecutionConfig,
        add_envs: dict[str, str] | None = None,
        inputs: list[Input] | None = None,
        forget: bool = False,
    ) -> tuple[SchedulingDetails, concurrent.futures.Future[ExecutionResult] | None]:
        execution = Execution(
            job,
            overridden_args=list(job.overridden_args),
            add_envs={**job.add_envs, **(add_envs or {})},
            config=execution_config,
            inputs=inputs or [],
        )
        with LogContext.bind(execution_id=execution.id):
            logger.info("Preparing the smartjob execution...")
            self._prepare(execution)
            logger.debug("Smartjob execution prepared")
            logger.info("Scheduling the smartjob execution...")
            scheduling_result, future_or_none = self.adapter.schedule(execution, forget)
            logger.debug("Smartjob execution scheduled")
            if future_or_none is None:
                return scheduling_result, None
            return scheduling_result, ExecutionResultFuture(
                log_context=LogContext.getall(),
                storage_service=self.storage_service,
                execution=execution,
                future=future_or_none,
            )

    def schedule(
        self,
        job: SmartJob,
        add_envs: dict[str, str] | None = None,
        inputs: list[Input] | None = None,
        execution_config: ExecutionConfig | None = None,
        forget: bool = False,
    ) -> tuple[SchedulingDetails, concurrent.futures.Future[ExecutionResult] | None]:
        """Schedule a job.

        This method returns when the job is scheduled (not when it is finished!).

        If we can't schedule the job an exception is raised.

        If forget=False (default), we return a first object about scheduling details
        and a second object about the future result of the job execution.

        If forget=True, we return only the first object about scheduling details
        (and None for the second one).

        Arguments:
            job: The job to run.
            add_envs: Environment variables to add for this particular execution.
            inputs: Inputs to add for this particular execution.
            execution_config: Execution configuration.
            forget: if True, don't return a future on ExecutionResult (but None).

        Returns:
            The result of the job scheduling and (optionally) a future object about the result of the job execution.

        """
        execution_config = execution_config or ExecutionConfig()
        execution_config.fix_for_executor_name(self.executor_name)
        with LogContext.bind(
            job_name=job.name,
            job_namespace=job.namespace,
        ):
            try:
                for attempt in Retrying(
                    reraise=True,
                    wait=wait_exponential(multiplier=1, min=1, max=300),
                    stop=stop_after_attempt(
                        execution_config._retry_config._max_attempts_schedule
                    ),
                ):
                    with attempt:
                        with LogContext.bind(
                            attempt=attempt.retry_state.attempt_number
                        ):
                            schedule_result, future_or_none = self._schedule(
                                job,
                                execution_config=execution_config,
                                add_envs=add_envs,
                                inputs=inputs,
                                forget=forget,
                            )
                            logger.info(
                                "Smartjob execution scheduled",
                                log_url=schedule_result.log_url,
                            )
                            LogContext.remove("attempt")
                            LogContext.add(execution_id=schedule_result.execution_id)
                            return schedule_result, future_or_none
            except RetryError:
                raise
            raise Exception("Unreachable code")

    def run(
        self,
        job: SmartJob,
        add_envs: dict[str, str] | None = None,
        inputs: list[Input] | None = None,
        execution_config: ExecutionConfig | None = None,
    ) -> ExecutionResult:
        """Schedule a job and wait for its completion.

        Arguments:
            job: The job to run.
            add_envs: Environment variables to add for this particular execution.
            inputs: Inputs to add for this particular execution.
            execution_config: Execution configuration.

        Returns:
            The result of the job execution.

        """
        execution_config = execution_config or ExecutionConfig()
        execution_config.fix_timeout_config()
        _, execution_result = self.schedule(
            job,
            add_envs=add_envs,
            inputs=inputs,
            execution_config=execution_config,
            forget=False,
        )
        assert execution_result is not None  # Can't be None because forget=False
        result = execution_result.result()  # FIXME: timeout
        return result
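One detail worth noting from the source above: every '{{INPUT}}' placeholder found in overridden_args is replaced at schedule time with the mounted input directory of the execution. A hedged sketch (docker_image is again an assumed field name):

```python
job = SmartJob(
    name="process",
    docker_image="python:3.12-slim",  # assumed field name, for illustration only
    # '{{INPUT}}' is rewritten to the staging input directory before execution
    overridden_args=["python", "/app/process.py", "--input-dir", "{{INPUT}}"],
)
```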

run(job, add_envs=None, inputs=None, execution_config=None)

Schedule a job and wait for its completion.

Parameters:

- job (SmartJob, required): The job to run.
- add_envs (dict[str, str] | None, default: None): Environment variables to add for this particular execution.
- inputs (list[Input] | None, default: None): Inputs to add for this particular execution.
- execution_config (ExecutionConfig | None, default: None): Execution configuration.

Returns:

- ExecutionResult: The result of the job execution.

Source code in smartjob/app/executor.py
def run(
    self,
    job: SmartJob,
    add_envs: dict[str, str] | None = None,
    inputs: list[Input] | None = None,
    execution_config: ExecutionConfig | None = None,
) -> ExecutionResult:
    """Schedule a job and wait for its completion.

    Arguments:
        job: The job to run.
        add_envs: Environment variables to add for this particular execution.
        inputs: Inputs to add for this particular execution.
        execution_config: Execution configuration.

    Returns:
        The result of the job execution.

    """
    execution_config = execution_config or ExecutionConfig()
    execution_config.fix_timeout_config()
    _, execution_result = self.schedule(
        job,
        add_envs=add_envs,
        inputs=inputs,
        execution_config=execution_config,
        forget=False,
    )
    assert execution_result is not None  # Can't be None because forget=False
    result = execution_result.result()  # FIXME: timeout
    return result
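A short usage sketch (reusing the executor and job objects from the introductory example):

```python
# Schedule the job and block until it completes
result = executor.run(job, add_envs={"MY_VAR": "value"})

if result.success:
    # Decoded content of smartjob.json from the output directory, if the job wrote one
    print(result.json_output)
else:
    print("job failed, logs:", result.log_url)
```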

schedule(job, add_envs=None, inputs=None, execution_config=None, forget=False)

Schedule a job.

This method returns when the job is scheduled (not when it is finished!).

If we can't schedule the job an exception is raised.

If forget=False (default), we return a first object about scheduling details and a second object about the future result of the job execution.

If forget=True, we return only the first object about scheduling details (and None for the second one).

Parameters:

- job (SmartJob, required): The job to run.
- add_envs (dict[str, str] | None, default: None): Environment variables to add for this particular execution.
- inputs (list[Input] | None, default: None): Inputs to add for this particular execution.
- execution_config (ExecutionConfig | None, default: None): Execution configuration.
- forget (bool, default: False): If True, don't return a future on ExecutionResult (return None instead).

Returns:

- tuple[SchedulingDetails, Future[ExecutionResult] | None]: The result of the job scheduling and (optionally) a future object about the result of the job execution.

Source code in smartjob/app/executor.py
def schedule(
    self,
    job: SmartJob,
    add_envs: dict[str, str] | None = None,
    inputs: list[Input] | None = None,
    execution_config: ExecutionConfig | None = None,
    forget: bool = False,
) -> tuple[SchedulingDetails, concurrent.futures.Future[ExecutionResult] | None]:
    """Schedule a job.

    This method returns when the job is scheduled (not when it is finished!).

    If we can't schedule the job an exception is raised.

    If forget=False (default), we return a first object about scheduling details
    and a second object about the future result of the job execution.

    If forget=True, we return only the first object about scheduling details
    (and None for the second one).

    Arguments:
        job: The job to run.
        add_envs: Environment variables to add for this particular execution.
        inputs: Inputs to add for this particular execution.
        execution_config: Execution configuration.
        forget: if True, don't return a future on ExecutionResult (but None).

    Returns:
        The result of the job scheduling and (optionally) a future object about the result of the job execution.

    """
    execution_config = execution_config or ExecutionConfig()
    execution_config.fix_for_executor_name(self.executor_name)
    with LogContext.bind(
        job_name=job.name,
        job_namespace=job.namespace,
    ):
        try:
            for attempt in Retrying(
                reraise=True,
                wait=wait_exponential(multiplier=1, min=1, max=300),
                stop=stop_after_attempt(
                    execution_config._retry_config._max_attempts_schedule
                ),
            ):
                with attempt:
                    with LogContext.bind(
                        attempt=attempt.retry_state.attempt_number
                    ):
                        schedule_result, future_or_none = self._schedule(
                            job,
                            execution_config=execution_config,
                            add_envs=add_envs,
                            inputs=inputs,
                            forget=forget,
                        )
                        logger.info(
                            "Smartjob execution scheduled",
                            log_url=schedule_result.log_url,
                        )
                        LogContext.remove("attempt")
                        LogContext.add(execution_id=schedule_result.execution_id)
                        return schedule_result, future_or_none
        except RetryError:
            raise
        raise Exception("Unreachable code")
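A sketch of both modes (again reusing the executor and job objects from the introductory example):

```python
# Fire-and-forget: only the scheduling details are returned
details, _ = executor.schedule(job, forget=True)
print("scheduled:", details.execution_id, details.log_url)

# Default mode (forget=False): a future on the ExecutionResult is returned as well
details, future = executor.schedule(job)
result = future.result()  # blocks until the job has finished
print(result.success)
```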

ExecutionResult classes

smartjob.SchedulingDetails dataclass

SchedulingDetails holds some data/details about the scheduling of a job.

Attributes:

- created: The datetime when the job has started.
- execution_id (str): The execution id of the job.
- job_name (str): The name of the job.
- job_namespace (str): The namespace of the job.
- log_url (str): The execution log url.

Source code in smartjob/app/executor.py
@dataclass
class SchedulingDetails:
    """SchedulingDetails holds some data/details about the scheduling of a job.

    Attributes:
        created: The datetime when the job has started.
        execution_id: The execution id of the job.
        job_name: The name of the job.
        job_namespace: The namespace of the job.
        log_url: The execution log url.

    """

    scheduled_date: datetime.datetime = field(
        init=False,
        default_factory=lambda: datetime.datetime.now(tz=datetime.timezone.utc),
    )
    execution_id: str
    log_url: str

smartjob.ExecutionResult dataclass

Bases: _ExecutionResult

ExecutionResult is the (final) result of a job execution.

Attributes:

- success: Whether the job has succeeded or not.
- created: The datetime when the job has started.
- stopped: The datetime when the job has stopped.
- execution_id: The execution id of the job.
- job_name: The name of the job.
- job_namespace: The namespace of the job.
- log_url: The execution log url.
- json_output (dict | list | str | float | int | bool | None): If the job has created a JSON file named smartjob.json in the output directory, it will be stored/decoded here.
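For instance, a job can populate json_output by writing a smartjob.json file to its output directory. A minimal sketch of such a job-side script, using the OUTPUT_PATH environment variable injected by the executor service:

```python
# Runs inside the job container/script
import json
import os

output_path = os.environ["OUTPUT_PATH"]  # set by the executor service
with open(os.path.join(output_path, "smartjob.json"), "w") as f:
    # This payload will be decoded into ExecutionResult.json_output
    json.dump({"metric": 0.93, "status": "ok"}, f)
```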

Source code in smartjob/app/executor.py
@dataclass
class ExecutionResult(_ExecutionResult):
    """ExecutionResult is the (final) result of a job execution.

    Attributes:
        success: Whether the job has succeeded or not.
        created: The datetime when the job has started.
        stopped: The datetime when the job has stopped.
        execution_id: The execution id of the job.
        job_name: The name of the job.
        job_namespace: The namespace of the job.
        log_url: The execution log url.
        json_output: if the job has created a json file named smartjob.json in the output directory, it will be stored/decoded here.

    """

    json_output: dict | list | str | float | int | bool | None = None

    def __str__(self) -> str:
        if self.success:
            state = "SUCCESS"
        else:
            state = "FAILURE"
        if self.json_output is not None:
            json_output = json.dumps(self.json_output, indent=4)
        else:
            json_output = "None"
        res = f"""ExecutionResult(
    job_name={self.job_name}, job_namespace={self.job_namespace},
    execution_id={self.execution_id},
    state={state}, duration_seconds={self.duration_seconds},
    json_output={json_output}
)"""
        return res

    @classmethod
    def _from_execution_result(
        cls,
        execution_result: _ExecutionResult,
        json_output: dict | list | str | float | int | bool | None,
    ) -> "ExecutionResult":
        return cls(
            success=execution_result.success,
            created=execution_result.created,
            stopped=execution_result.stopped,
            execution_id=execution_result.execution_id,
            job_name=execution_result.job_name,
            job_namespace=execution_result.job_namespace,
            log_url=execution_result.log_url,
            json_output=json_output,
        )