integration.vertex
initialize_comet_logger¶
```python
comet_ml.integration.vertex.initialize_comet_logger(
    experiment: BaseExperiment, pipeline_job_name: str,
    pipeline_task_name: str, pipeline_task_uuid: str) -> BaseExperiment
```
Logs the Vertex task identifiers needed to track your pipeline status in Comet.ml. You need to call this function from every component you want to track with Comet.ml.
Deprecated: If you are using CometVertexPipelineLogger.track_task, you don't need to use this function anymore.
Args:
- experiment: An already created Experiment object.
- pipeline_job_name: string. The Vertex pipeline job name; see below how to get it automatically from Vertex.
- pipeline_task_name: string. The Vertex task name; see below how to get it automatically from Vertex.
- pipeline_task_uuid: string. The Vertex task unique id; see below how to get it automatically from Vertex.
For example:

```python
@kfp.dsl.v2.component
def my_component() -> None:
    import comet_ml.integration.vertex

    experiment = comet_ml.Experiment()
    pipeline_run_name = "{{$.pipeline_job_name}}"
    pipeline_task_name = "{{$.pipeline_task_name}}"
    pipeline_task_id = "{{$.pipeline_task_uuid}}"

    comet_ml.integration.vertex.initialize_comet_logger(
        experiment, pipeline_run_name, pipeline_task_name, pipeline_task_id
    )
```
comet_logger_component¶
```python
comet_ml.integration.vertex.comet_logger_component(
    api_key: Optional[str] = None, project_name: Optional[str] = None,
    workspace: Optional[str] = None,
    packages_to_install: Optional[List[str]] = None,
    base_image: Optional[str] = None,
    custom_experiment: Optional[BaseExperiment] = None) -> Any
```
Inject the Comet Logger component, which continuously tracks and reports the current pipeline status to Comet.ml.
Deprecated: Use CometVertexPipelineLogger instead.
Args:
- api_key: string, optional. Your Comet API Key. If not provided, the value set in the configuration system will be used.
- project_name: string, optional. The project name where all pipeline tasks are logged. If not provided, the value set in the configuration system will be used.
- workspace: string, optional. The workspace name where all pipeline tasks are logged. If not provided, the value set in the configuration system will be used.
- packages_to_install: List of string, optional. Which packages to install, given directly to kfp.components.create_component_from_func. Default is ["google-cloud-aiplatform", "comet_ml"].
- base_image: string, optional. Which Docker image to use. If not provided, the default Kubeflow base image will be used.
- custom_experiment: Experiment, optional. A Comet Experiment with custom configuration to be used instead of the Experiment that would otherwise be implicitly created with default options.
Example:
```python
@dsl.pipeline(name='ML training pipeline')
def ml_training_pipeline():
    import comet_ml.integration.vertex

    comet_ml.integration.vertex.comet_logger_component()
```
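As a hedged sketch of the custom_experiment parameter (the project name and the auto_metric_logging setting are illustrative assumptions, not values from this reference):

```python
import comet_ml
import comet_ml.integration.vertex
from kfp import dsl

@dsl.pipeline(name='ML training pipeline')
def ml_training_pipeline():
    # Assumption for illustration: an Experiment with custom options,
    # created up front and used instead of the implicit default one.
    experiment = comet_ml.Experiment(auto_metric_logging=False)

    comet_ml.integration.vertex.comet_logger_component(
        project_name="my-pipeline-project",  # hypothetical project name
        custom_experiment=experiment,
    )
```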
CometVertexPipelineLogger¶
```python
class comet_ml.integration.vertex.CometVertexPipelineLogger(
    self, api_key: Optional[str] = None,
    workspace: Optional[str] = None,
    project_name: Optional[str] = None,
    packages_to_install: Optional[List[str]] = None,
    base_image: Optional[str] = None,
    custom_experiment: Optional["comet_ml.BaseExperiment"] = None,
    share_api_key_to_workers: bool = False)
```
Creates a local experiment for tracking the Vertex pipeline run and provides an API to track Vertex tasks with their own Comet experiments.
comet_ml.integration.vertex.CometVertexPipelineLogger.__init__¶
```python
__init__(api_key: Optional[str] = None, workspace: Optional[str] = None,
         project_name: Optional[str] = None,
         packages_to_install: Optional[List[str]] = None,
         base_image: Optional[str] = None,
         custom_experiment: Optional["comet_ml.BaseExperiment"] = None,
         share_api_key_to_workers: bool = False) -> None
```
Args:
- api_key: str (optional). Your Comet API Key. If not provided, the value set in the configuration system will be used.
- project_name: str (optional). The project name where all pipeline tasks are logged. If not provided, the value set in the configuration system will be used.
- workspace: str (optional). The workspace name where all pipeline tasks are logged. If not provided, the value set in the configuration system will be used.
- packages_to_install: List[str] (optional). Which packages to install, given directly to kfp.components.create_component_from_func. Default is ["google-cloud-aiplatform", "comet_ml"].
- base_image: str (optional). Which Docker image to use. If not provided, the default Kubeflow base image will be used.
- custom_experiment: Experiment (optional). A Comet Experiment with custom configuration to be used instead of the Experiment that would otherwise be implicitly created with default options.
- share_api_key_to_workers: bool (optional). If True, the Comet API Key will be shared with workers by setting the COMET_API_KEY environment variable. This is an unsafe solution; we recommend a more secure way of setting up your API Key in your cluster.
Example:
```python
from kfp import components, dsl

@dsl.pipeline(name='ML training pipeline')
def ml_training_pipeline():
    import comet_ml.integration.vertex

    logger = comet_ml.integration.vertex.CometVertexPipelineLogger()

    data_preprocessing_op = components.load_component_from_file(
        "data_preprocessing.yaml"
    )
```
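A minimal sketch of the share_api_key_to_workers flag, taken from the signature above; keep the security warning from the Args list in mind:

```python
import comet_ml.integration.vertex
from kfp import dsl

@dsl.pipeline(name='ML training pipeline')
def ml_training_pipeline():
    # Sets COMET_API_KEY in each tracked task's environment.
    # Prefer a cluster secret mechanism in production (see warning above).
    logger = comet_ml.integration.vertex.CometVertexPipelineLogger(
        share_api_key_to_workers=True
    )
```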
comet_ml.integration.vertex.CometVertexPipelineLogger.track_task¶
```python
track_task(task: Any,
           additional_environment: Optional[Dict[str, str]] = None) -> Any
```
Inject all the information required to track the given Vertex task with Comet. You still need to create an Experiment inside that task.
Args:
- task: Any (required). The Vertex task to be tracked with Comet.
- additional_environment: Dict[str, str] (optional). A dictionary of additional environment variables to set in the tracked task.
Example:
```python
import kfp
from kfp import dsl


def data_preprocessing(input: str) -> str:
    import math
    import random
    import time

    from comet_ml import Experiment

    # The `track_task` method automatically injects the workspace name and
    # project name. If `share_api_key_to_workers` is set to True, the Comet
    # API Key can also be injected by `CometVertexPipelineLogger`.
    # All Vertex information is automatically logged to the Experiment when
    # the task is wrapped with the `track_task` method.
    experiment = Experiment()

    for i in range(60):
        experiment.log_metric("accuracy", math.log(i + random.random()))
        time.sleep(0.1)

    experiment.end()

    return input


@dsl.pipeline(name='ML training pipeline')
def ml_training_pipeline():
    import comet_ml.integration.vertex

    logger = comet_ml.integration.vertex.CometVertexPipelineLogger()

    data_preprocessing_op = kfp.components.create_component_from_func(
        func=data_preprocessing, packages_to_install=["comet_ml"]
    )

    task_1 = logger.track_task(data_preprocessing_op("test"))
```
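Continuing inside the same pipeline function, a hedged sketch of additional_environment (the variable name and value are hypothetical):

```python
    # Hypothetical environment variable injected into the tracked task.
    task_2 = logger.track_task(
        data_preprocessing_op("test"),
        additional_environment={"MY_FEATURE_FLAG": "1"},
    )
```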