Launch a new cluster and run a Databricks notebook
Suppose you have a notebook named `example.ipynb` on Databricks that accepts a `name` parameter:
```python
name = dbutils.widgets.get("name")
message = f"Don't worry {name}, I got your request! Welcome to prefect-databricks!"
print(message)
```
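When the notebook runs as a job task, the values passed under `base_parameters` populate these widgets. If you also want to run the notebook interactively, you can register the widget with a default value first; a minimal sketch, where the default `"stranger"` is just an illustration:

```python
# create the widget with a default so interactive runs work too;
# job runs override it via base_parameters
dbutils.widgets.text("name", "stranger")
```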
A Prefect flow that launches a new cluster to run `example.ipynb`:
```python
from prefect import flow
from prefect_databricks import DatabricksCredentials
from prefect_databricks.jobs import jobs_runs_submit
from prefect_databricks.models.jobs import (
    AutoScale,
    AwsAttributes,
    JobTaskSettings,
    NotebookTask,
    NewCluster,
)


@flow
def jobs_runs_submit_flow(notebook_path, **base_parameters):
    databricks_credentials = DatabricksCredentials.load("my-block")

    # specify new cluster settings
    aws_attributes = AwsAttributes(
        availability="SPOT",
        zone_id="us-west-2a",
        ebs_volume_type="GENERAL_PURPOSE_SSD",
        ebs_volume_count=3,
        ebs_volume_size=100,
    )
    auto_scale = AutoScale(min_workers=1, max_workers=2)
    new_cluster = NewCluster(
        aws_attributes=aws_attributes,
        autoscale=auto_scale,
        node_type_id="m4.large",
        spark_version="10.4.x-scala2.12",
        spark_conf={"spark.speculation": True},
    )

    # specify notebook to use and parameters to pass
    notebook_task = NotebookTask(
        notebook_path=notebook_path,
        base_parameters=base_parameters,
    )

    # compile job task settings
    job_task_settings = JobTaskSettings(
        new_cluster=new_cluster,
        notebook_task=notebook_task,
        task_key="prefect-task",
    )

    run = jobs_runs_submit(
        databricks_credentials=databricks_credentials,
        run_name="prefect-job",
        tasks=[job_task_settings],
    )

    return run


jobs_runs_submit_flow("/Users/username@gmail.com/example.ipynb", name="Marvin")
```
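`jobs_runs_submit` returns the Jobs API response, which includes a `run_id` you can use to check on the run afterwards. A minimal sketch, assuming the collection also exposes a `jobs_runs_get` task mirroring the Databricks `GET /jobs/runs/get` endpoint (verify the task name and response shape against your installed version):

```python
from prefect import flow
from prefect_databricks import DatabricksCredentials
from prefect_databricks.jobs import jobs_runs_get


@flow
def check_run_state(run_id: str):
    databricks_credentials = DatabricksCredentials.load("my-block")

    # fetch the current state of a previously submitted run
    run = jobs_runs_get(
        databricks_credentials=databricks_credentials,
        run_id=run_id,
    )

    # life_cycle_state progresses e.g. PENDING -> RUNNING -> TERMINATED
    return run["state"]["life_cycle_state"]
```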
Note: instead of using the built-in models, you may also pass in valid JSON. For example, `AutoScale(min_workers=1, max_workers=2)` is equivalent to `{"min_workers": 1, "max_workers": 2}`.
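For instance, the cluster specification from the flow above could be written entirely as plain dictionaries; a sketch of the same settings in JSON form:

```python
# equivalent to the NewCluster / AwsAttributes / AutoScale models above
new_cluster = {
    "aws_attributes": {
        "availability": "SPOT",
        "zone_id": "us-west-2a",
        "ebs_volume_type": "GENERAL_PURPOSE_SSD",
        "ebs_volume_count": 3,
        "ebs_volume_size": 100,
    },
    "autoscale": {"min_workers": 1, "max_workers": 2},
    "node_type_id": "m4.large",
    "spark_version": "10.4.x-scala2.12",
    "spark_conf": {"spark.speculation": True},
}
```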