Skip to content

Quickstart

Installation

pip install smartjob

Very basic usage as a library

Note

In this tutorial, we only use Cloud Run Jobs but ideas and concepts are exactly the sames with Vertex AI CustomJobs.

Let's start to use the library with simple synchronous Python code:

from smartjob import SmartJob, get_executor_service
from smartjob.app.execution import ExecutionConfig

# Get a JobExecutorService object (for executing jobs with Cloud Run Jobs)
job_executor_service = get_executor_service("cloudrun")

# Let's define a Cloud Run job that runs a Python 3.12 container with the command "python --version"
job = SmartJob(
    name="foo", docker_image="python:3.12", overridden_args=["python", "--version"]
)

# Let's define an ExecutionConfig object
execution_config = ExecutionConfig(
    project="your-gcp-project",
    region="us-east1",
    staging_bucket="gs://a-bucket-name",
)

# Let's execute the job (and wait for the result) in a blocking synchronous way
result = job_executor_service.run(job, execution_config=execution_config)

# Let's print the execution result
# => it will print something like:
# ExecutionResult(
#     job_name=foo, job_namespace=default,
#     execution_id=ce5dc480b392434a833a7dbbfae8dcd9,
#     state=SUCCESS, duration_seconds=14,
#     log_url=https://...
# )
print(result)

Launch 10 jobs in parallel

import concurrent.futures

import stlog

from smartjob import SmartJob, get_executor_service
from smartjob.app.execution import ExecutionConfig
from smartjob.app.executor import ExecutionResult

# Get a JobExecutorService object
job_executor_service = get_executor_service(type="cloudrun")

# Let's define an ExecutionConfig object
execution_config = ExecutionConfig(
    project="your-gcp-project",
    region="us-east1",
    staging_bucket="gs://a-bucket-name",
)

if __name__ == "__main__":
    stlog.setup(level="INFO")  # setup a better logging

    # Let's define a Cloud Run job that runs a Python 3.12 container with the command "python --version"
    job = SmartJob(
        name="foo", docker_image="python:3.12", overridden_args=["python", "--version"]
    )

    # Let's launch 10 jobs (in parallel!) and get 10 futures on the results
    futures: list[concurrent.futures.Future[ExecutionResult]] = []
    for i in range(10):
        scheduling_result, execution_result_future = job_executor_service.schedule(
            job, execution_config=execution_config, add_envs={"JOB_NUMBER": str(i)}
        )
        print(
            f"You can follow job: {scheduling_result.execution_id} at {scheduling_result.log_url}"
        )
        assert execution_result_future is not None
        futures.append(execution_result_future)

    # Let's wait for all the results
    # (this is blocking until all the jobs are done)
    waited_futures = concurrent.futures.wait(futures)

    # Let's print the execution results for each job
    for f in waited_futures.done:
        result: ExecutionResult = f.result()
        print(f"Job: {result.execution_id} => {result.success}")

Let's execute a local script in a Cloud Run Job without rebuilding/repushing a docker image

Let's start with a basic Python local script local_script.py:

import os

execution_id = os.environ.get("EXECUTION_ID")

print("Hello world!")
print("I'm going to be executed in a (remote) Cloud Run Job")
print(f"My unique execution id is {execution_id}")

Then, let's use the SmartJob library to upload/execute it:

import stlog

from smartjob import SmartJob, get_executor_service
from smartjob.app.execution import ExecutionConfig

# Setup logging
stlog.setup(level="INFO")

# Get a JobExecutorService object
job_executor_service = get_executor_service("cloudrun")

# Let's define an ExecutionConfig object
execution_config = ExecutionConfig(
    project="your-gcp-project",
    region="us-east1",
    staging_bucket="gs://a-bucket-name",
)

# Let's define a Cloud Run job that runs a local Python script
# (that will be automatically uploaded) into a Python 3.12 container
job = SmartJob(
    name="foo", docker_image="python:3.12", python_script_path="./local_script.py"
)

# Let's execute the job (and wait for the result) in a blocking synchronous way
result = job_executor_service.run(job, execution_config=execution_config)

# Let's print the execution result
print(result)

Adding input/output to the job

Let's start with a basic Python local script local_script2.py:

import hashlib
import json
import os

# These 2 environment variables are automatically injected by SmartJob
# They are pointing to a local mount of the staging bucket
# input_path and output_path are two unique directories specific
# to this job execution
input_path = os.environ.get("INPUT_PATH")
output_path = os.environ.get("OUTPUT_PATH")

# Let's read the input file: 'my-input-file'
with open(f"{input_path}/my-input-file", "rb") as f:
    content = f.read()

# Compute the hash of the input file
h = hashlib.sha1(content).hexdigest()

# Let's write the hash to the output file
# Note: you can write any file you want in the output directory
#       but the smartjob.json file is a special file that will be
#       automatically parsed by the SmartJob lib and the content will be
#       injected in the Python result object.
with open(f"{output_path}/smartjob.json", "w") as f:
    json.dump({"hash": h}, f)

Then, the corresponding SmartJob code to execute it.

import stlog

from smartjob import SmartJob, get_executor_service
from smartjob.app.execution import ExecutionConfig
from smartjob.app.input import LocalPathInput

# Setup logging
stlog.setup(level="INFO")

# Get a JobExecutorService object
job_executor_service = get_executor_service("cloudrun")

# Let's define an ExecutionConfig object
execution_config = ExecutionConfig(
    project="your-gcp-project",
    region="us-east1",
    staging_bucket="gs://a-bucket-name",
)

# Let's define a Cloud Run job that runs a local Python script
# (that will be automatically uploaded) into a Python 3.12 container
job = SmartJob(
    name="foo", docker_image="python:3.12", python_script_path="./local_script2.py"
)

# Let's execute the job (and wait for the result) in a blocking synchronous way
# and pass an input as a local file
#
# 'my-input-file' will be the filename in the input directory of the container
# './local_script2.py' is the local path to the input file (you want to send)
# (here we are sending the same script as input)
result = job_executor_service.run(
    job,
    execution_config=execution_config,
    inputs=[LocalPathInput("my-input-file", "./local_script2.py")],
)

# Let's print the execution result
# => It will print something like:
# ExecutionResult(
#     job_name=foo, job_namespace=default,
#     execution_id=48d23af998f54f0a812cdea85429850a,
#     state=SUCCESS, duration_seconds=21,
#     log_url=https://...,
#     json_output={
#         "hash": "087c01ffad8fb2cd580c63896e0c75d9ac6b028d"
#     }
# )
print(result)

What about the CLI tool?

Let's do the same but with the CLI tool:

# Note: you can also use CLI options but it's probably less convenient
export SMARTJOB_PROJECT="your-gcp-project"
export SMARTJOB_REGION="us-east1"
export SMARTJOB_STAGING_BUCKET="gs://a-bucket-name" 

smartjob run --executor=cloudrun --docker-image docker.io/python:3.12 --python-script-path ./local_script.py foo

And if we need more CPU/RAM to execute this job?

smartjob run --executor=cloudrun --docker-image docker.io/python:3.12 --python-script-path ./local_script.py --cpu 2 --memory-gb 4 foo