Sometimes your flows or tasks will encounter an exception. Prefect captures all exceptions in order to report states to the orchestrator, but we do not hide them from you (unless you ask us to) as your program needs to know if an unexpected error has occurred.
When calling flows or tasks, the exceptions are raised as in normal Python:
    from prefect import flow, task

    @task
    def my_task():
        raise ValueError()

    @flow
    def my_flow():
        try:
            my_task()
        except ValueError:
            print("Oh no! The task failed.")

        return True

    my_flow()
If you would prefer to check for a failed task without using try/except, you may ask Prefect to return the state:
    from prefect import flow, task

    @task
    def my_task():
        raise ValueError()

    @flow
    def my_flow():
        state = my_task(return_state=True)

        if state.is_failed():
            print("Oh no! The task failed. Falling back to '1'.")
            result = 1
        else:
            result = state.result()

        return result + 1

    result = my_flow()
    assert result == 2
If you retrieve the result from a failed state, the exception will be raised. For this reason, it's often best to check if the state is failed first.
    from prefect import flow, task

    @task
    def my_task():
        raise ValueError()

    @flow
    def my_flow():
        state = my_task(return_state=True)

        try:
            result = state.result()
        except ValueError:
            print("Oh no! The state raised the error!")

        return True

    my_flow()
When retrieving the result from a state, you can ask Prefect not to raise exceptions:
    from prefect import flow, task

    @task
    def my_task():
        raise ValueError()

    @flow
    def my_flow():
        state = my_task(return_state=True)

        maybe_result = state.result(raise_on_failure=False)
        if isinstance(maybe_result, ValueError):
            print("Oh no! The task failed. Falling back to '1'.")
            result = 1
        else:
            result = maybe_result

        return result + 1

    result = my_flow()
    assert result == 2
When submitting tasks to a runner, Future.result() works the same as State.result():
    from prefect import flow, task

    @task
    def my_task():
        raise ValueError()

    @flow
    def my_flow():
        future = my_task.submit()

        try:
            future.result()
        except ValueError:
            print("Ah! Futures will raise the failure as well.")

        # You can ask it not to raise the exception too
        maybe_result = future.result(raise_on_failure=False)
        print(f"Got {type(maybe_result)}")

        return True

    my_flow()
Prefect 2.6.0 added automatic retrieval of persisted results.
Prior to this version, State.result() did not require an await.
For backwards compatibility, when used from an asynchronous context, State.result() returns a raw result type.
You may opt in to the new behavior by passing fetch=True to State.result().
If you would like this behavior to be used automatically, you may enable the PREFECT_ASYNC_FETCH_STATE_RESULT setting.
If you do not opt in to this behavior, you will see a warning.
You may also opt out by setting fetch=False.
This will silence the warning, but you will need to retrieve your result manually from the result type.
When submitting tasks to a runner, the result can be retrieved with the Future.result() method.
The Prefect API does not store your results except in special cases. Instead, the result is persisted to a storage location in your infrastructure and Prefect stores a reference to the result.
The following Prefect features require results to be persisted:
Task cache keys
Flow run retries
If results are not persisted, these features may not be usable.
Persistence of results requires a serializer and a storage location. Prefect sets defaults for these, and you should not need to adjust them until you want to customize behavior. You can configure results on the flow and task decorators with the following options:
persist_result: Whether the result should be persisted to storage.
result_storage: Where to store the result when persisted.
result_serializer: How to convert the result to a storable form.
Persistence of the result of a task or flow can be configured with the persist_result option. The persist_result option defaults to a null value, which will automatically enable persistence if it is needed for a Prefect feature used by the flow or task. Otherwise, persistence is disabled by default.
For example, the following flow has retries enabled. Flow retries require that all task results are persisted, so the task's result will be persisted:
    from prefect import flow, task

    @task
    def my_task():
        return "hello world!"

    @flow(retries=2)
    def my_flow():
        # This task does not have persistence toggled off and it is needed for the flow feature,
        # so Prefect will persist its result at runtime
        my_task()
Flow retries do not require the flow's result to be persisted, so it will not be.
In this next example, one task has caching enabled. Task caching requires that the given task's result is persisted:
    from prefect import flow, task
    from datetime import timedelta

    # The cache key function is called with the task run context and the
    # parameters, so the lambda accepts (and ignores) positional arguments.
    @task(cache_key_fn=lambda *_: "always", cache_expiration=timedelta(seconds=20))
    def my_task():
        # This task uses caching so its result will be persisted by default
        return "hello world!"

    @task
    def my_other_task():
        ...

    @flow
    def my_flow():
        # This task uses a feature that requires result persistence
        my_task()

        # This task does not use a feature that requires result persistence and the
        # flow does not use any features that require task result persistence so its
        # result will not be persisted by default
        my_other_task()
Persistence of results can be manually toggled on or off:
    from prefect import flow, task

    @flow(persist_result=True)
    def my_flow():
        # This flow will persist its result even if not necessary for a feature.
        ...

    @task(persist_result=False)
    def my_task():
        # This task will never persist its result.
        # If persistence is needed for a feature, an error will be raised.
        ...
Toggling persistence manually will always override any behavior that Prefect would infer.
You may also change Prefect's default persistence behavior with the PREFECT_RESULTS_PERSIST_BY_DEFAULT setting. To persist results by default, even if they are not needed for a feature, change the value to a truthy value:
prefect config set PREFECT_RESULTS_PERSIST_BY_DEFAULT=true
Tasks and flows with persist_result=False will not persist their results even if PREFECT_RESULTS_PERSIST_BY_DEFAULT is true.
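The rules above can be summarized as a small decision function. This is only a sketch of the behavior as described in this section, not Prefect's implementation: the function name `resolve_persistence`, its parameters, and the choice of `ValueError` are all assumptions made for illustration.

```python
from typing import Optional


def resolve_persistence(
    persist_result: Optional[bool],
    feature_requires_persistence: bool,
    persist_by_default: bool = False,
) -> bool:
    """Hypothetical helper mirroring the rules described above; not Prefect's code."""
    if persist_result is False:
        if feature_requires_persistence:
            # Per the text, turning persistence off when a feature needs it is an error.
            # The exception type here is an assumption.
            raise ValueError("persistence is disabled but a feature requires it")
        return False
    if persist_result is True:
        # An explicit toggle always overrides inference.
        return True
    # persist_result is None: infer from the features in use,
    # then fall back to the persist-by-default setting.
    return feature_requires_persistence or persist_by_default


print(resolve_persistence(None, feature_requires_persistence=True))
print(resolve_persistence(None, feature_requires_persistence=False))
print(resolve_persistence(False, False, persist_by_default=True))
```

The last call shows that an explicit persist_result=False wins even when the default setting is enabled.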
The result storage location can be configured with the result_storage option. The result_storage option defaults to a null value, which infers storage from the context.
Generally, this means that tasks will use the result storage configured on the flow unless otherwise specified.
If there is no context to load the storage from and results must be persisted, results will be stored in the path specified by the PREFECT_LOCAL_STORAGE_PATH setting (defaults to ~/.prefect/storage).
    from prefect import flow, task
    from prefect.filesystems import LocalFileSystem, S3

    @flow(persist_result=True)
    def my_flow():
        my_task()  # This task will use the flow's result storage

    @task(persist_result=True)
    def my_task():
        ...

    my_flow()  # The flow has no result storage configured and no parent, the local file system will be used.

    # Reconfigure the flow to use a different storage type
    new_flow = my_flow.with_options(result_storage=S3(bucket_path="my-bucket"))

    new_flow()  # The flow and task within it will use S3 for result storage.
You can configure this to use a specific storage using one of the following:
A storage instance, e.g. LocalFileSystem(basepath=".my-results")
The path of the result file in the result storage can be configured with the result_storage_key. The result_storage_key option defaults to a null value, which generates a unique identifier for each result.
    from prefect import flow, task
    from prefect.filesystems import LocalFileSystem, S3

    @flow(result_storage=S3(bucket_path="my-bucket"))
    def my_flow():
        my_task()

    @task(persist_result=True, result_storage_key="my_task.json")
    def my_task():
        ...

    my_flow()  # The task's result will be persisted to 's3://my-bucket/my_task.json'
Result storage keys are formatted with access to all of the modules in prefect.runtime and the run's parameters. In the following example, we will run a flow with three runs of the same task. Each task run will write its result to a unique file based on the name parameter.
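The templating step on its own can be sketched with plain `str.format`. The sketch does not run a Prefect flow. It assumes a hypothetical key template, `hello-{parameters[name]}.json`, and assumes the key is expanded in the manner of `str.format` with a `parameters` mapping; the helper `format_storage_key` is invented for illustration.

```python
# Hypothetical key template; "parameters" is the name the text says is available.
key_template = "hello-{parameters[name]}.json"


def format_storage_key(template: str, parameters: dict) -> str:
    """Expand a key template with str.format (an assumed stand-in for Prefect's formatting)."""
    return template.format(parameters=parameters)


# Three calls of the same task with different `name` values give three distinct keys.
keys = [format_storage_key(key_template, {"name": n}) for n in ("world", "foo", "bar")]
print(keys)  # ['hello-world.json', 'hello-foo.json', 'hello-bar.json']
```

Because the key depends on the parameter value, two runs with the same name would resolve to the same file.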
The result serializer can be configured with the result_serializer option. The result_serializer option defaults to a null value, which infers the serializer from the context.
Generally, this means that tasks will use the result serializer configured on the flow unless otherwise specified.
If there is no context to load the serializer from, the serializer defined by PREFECT_RESULTS_DEFAULT_SERIALIZER will be used. This setting defaults to Prefect's pickle serializer.
You may configure the result serializer using:
A type name, e.g. "json" or "pickle"; this corresponds to an instance with default values
An instance, e.g. JSONSerializer(jsonlib="orjson")
Prefect provides a CompressedSerializer which can be used to wrap other serializers to provide compression over the bytes they generate. The compressed serializer uses lzma compression by default. We test other compression schemes provided in the Python standard library such as bz2 and zlib, but you should be able to use any compression library that provides compress and decompress methods.
You may configure compression of results using:
A type name, prefixed with compressed/ e.g. "compressed/json" or "compressed/pickle"
An instance e.g. CompressedSerializer(serializer="pickle", compressionlib="lzma")
Note that the "compressed/<serializer-type>" shortcut will only work for serializers provided by Prefect.
If you are using custom serializers, you must pass a full instance.
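The wrapping idea can be illustrated with the standard library alone. The class below is a hypothetical stand-in, not Prefect's CompressedSerializer: it takes any pair of dumps/loads callables and the name of a module that exposes `compress` and `decompress`, which is the only requirement the text places on a compression library.

```python
import importlib
import pickle


class CompressedWrapper:
    """Hypothetical wrapper for illustration; not Prefect's CompressedSerializer."""

    def __init__(self, dumps, loads, compressionlib: str = "lzma"):
        self._dumps = dumps
        self._loads = loads
        # Any importable module exposing compress() and decompress() works here.
        self._lib = importlib.import_module(compressionlib)

    def dumps(self, obj) -> bytes:
        # Serialize first, then compress the resulting bytes.
        return self._lib.compress(self._dumps(obj))

    def loads(self, blob: bytes):
        # Decompress first, then deserialize.
        return self._loads(self._lib.decompress(blob))


data = {"text": "hello world " * 500}
raw_size = len(pickle.dumps(data))

for name in ("lzma", "bz2", "zlib"):
    wrapper = CompressedWrapper(pickle.dumps, pickle.loads, compressionlib=name)
    blob = wrapper.dumps(data)
    assert wrapper.loads(blob) == data
    print(f"{name}: {len(blob)} bytes compressed, {raw_size} bytes uncompressed")
```

Highly repetitive data like this compresses well; results for real payloads will vary.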
The Prefect API does not store your results in most cases for the following reasons:
Results can be large and slow to send to and from the API.
Results often contain private information or data.
Results would need to be stored in the database or complex logic implemented to hydrate from another source.
There are a few cases where Prefect will store your results directly in the database. This is an optimization to reduce the overhead of reading and writing to result storage.
The following data types will be stored by the API without persistence to storage:
booleans (True, False)
nulls (None)
If persist_result is set to False, these values will never be stored.
The Prefect API tracks metadata about your results. The value of your result is only stored in specific cases. Result metadata can be seen in the UI on the "Results" page for flows.
When running your workflows, Prefect will keep the results of all tasks and flows in memory so they can be passed downstream. In some cases, it is desirable to override this behavior. For example, if you are returning a large amount of data from a task, it can be costly to keep it in memory for the entire duration of the flow run.
Flows and tasks both include an option to drop the result from memory with cache_result_in_memory:
    from prefect import flow, task

    @flow(cache_result_in_memory=False)
    def foo():
        return "pretend this is large data"

    @task(cache_result_in_memory=False)
    def bar():
        return "pretend this is biiiig data"
When cache_result_in_memory is disabled, the result of your flow or task will be persisted by default. The result will then be pulled from storage when needed.
    from prefect import flow, task

    @flow
    def foo():
        result = bar()
        state = bar(return_state=True)

        # The result will be retrieved from storage here
        state.result()

        future = bar.submit()
        # The result will be retrieved from storage here
        future.result()

    @task(cache_result_in_memory=False)
    def bar():
        # This result will be persisted
        return "pretend this is biiiig data"
If both cache_result_in_memory and persistence are disabled, your results will not be available downstream.
    from prefect import flow, task

    @task(persist_result=False, cache_result_in_memory=False)
    def bar():
        return "pretend this is biiiig data"

    @flow
    def foo():
        # Raises an error
        result = bar()

        # This is okay
        state = bar(return_state=True)

        # Raises an error
        state.result()

        # This is okay
        future = bar.submit()

        # Raises an error
        future.result()
Result storage is responsible for reading and writing serialized data to an external location. At this time, any file system block can be used for result storage.
A result serializer is responsible for converting your Python object to and from bytes. This is necessary to store the object outside of Python and retrieve it later.
Pickle is a standard Python protocol for encoding arbitrary Python objects. We supply a custom pickle serializer at prefect.serializers.PickleSerializer. Prefect's pickle serializer uses the cloudpickle project by default to support more object types. An alternative pickle library can be specified with the serializer's picklelib option.
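As a rough sketch of what a pluggable pickle library means, the hypothetical helper below imports a module by name and only relies on it exposing `dumps` and `loads`. It uses the standard library `pickle` to stay dependency-free, whereas the text says Prefect defaults to cloudpickle; the helper name `roundtrip` and its `picklelib` argument are illustrative assumptions, not Prefect's code.

```python
import importlib


def roundtrip(obj, picklelib: str = "pickle"):
    """Serialize and restore obj using the module named by picklelib.

    Hypothetical helper: the named module only needs dumps() and loads().
    """
    lib = importlib.import_module(picklelib)
    blob = lib.dumps(obj)          # Python object -> bytes
    return blob, lib.loads(blob)   # bytes -> Python object


blob, restored = roundtrip({"a": (1, 2), "b": {3, 4}})
print(type(blob).__name__, restored)
```

Unlike JSON, the tuple and the set survive the round trip with their types intact.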
We supply a custom JSON serializer at prefect.serializers.JSONSerializer. Prefect's JSON serializer uses custom hooks by default to support more object types. Specifically, we add support for all types supported by Pydantic.
By default, we use the standard Python json library. Alternative JSON libraries can be specified, for example JSONSerializer(jsonlib="orjson").
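To illustrate what custom hooks buy you, here is a minimal sketch using only the standard `json` module. It is not Prefect's JSONSerializer: the hook functions, the `__type__` tag, and the two supported types are assumptions chosen to keep the example small.

```python
import json
from datetime import datetime
from uuid import UUID


def encode_extra(obj):
    """`default` hook: json.dumps calls this for objects it cannot encode itself."""
    if isinstance(obj, datetime):
        return {"__type__": "datetime", "value": obj.isoformat()}
    if isinstance(obj, UUID):
        return {"__type__": "uuid", "value": str(obj)}
    raise TypeError(f"Cannot serialize {type(obj).__name__}")


def decode_extra(d):
    """`object_hook`: json.loads calls this for every decoded JSON object."""
    if d.get("__type__") == "datetime":
        return datetime.fromisoformat(d["value"])
    if d.get("__type__") == "uuid":
        return UUID(d["value"])
    return d


original = {"id": UUID(int=1), "created": datetime(2023, 1, 2, 3, 4, 5)}
blob = json.dumps(original, default=encode_extra).encode()
restored = json.loads(blob, object_hook=decode_extra)
assert restored == original
```

Without the hooks, `json.dumps(original)` would raise a TypeError on the UUID and datetime values.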
Prefect uses internal result types to capture information about the result attached to a state. The following types are used:
UnpersistedResult: Stores result metadata but the value is only available when created.
LiteralResult: Stores simple values inline.
PersistedResult: Stores a reference to a result persisted to storage.
All result types include a get() method that can be called to return the value of the result. This is done behind the scenes when the result() method is used on states or futures.
Unpersisted results are used to represent results that have not been and will not be persisted beyond the current flow run. The value associated with the result is stored in memory, but will not be available later. Result metadata is attached to this object for storage in the API and representation in the UI.
The get() method on result references retrieves the data from storage, deserializes it, and returns the original object.
The get() operation will cache the resolved object to reduce the overhead of subsequent calls.
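The read, deserialize, and cache sequence can be sketched as follows. The `ResultReference` class is a hypothetical stand-in rather than Prefect's result type, a plain dict plays the role of result storage, and pickle is assumed as the serializer.

```python
import pickle


class ResultReference:
    """Hypothetical stand-in for a persisted-result reference; not Prefect's class."""

    _MISSING = object()  # sentinel so that a cached value of None is still a hit

    def __init__(self, storage: dict, key: str):
        self.storage = storage
        self.key = key
        self._cache = self._MISSING

    def get(self):
        if self._cache is self._MISSING:
            blob = self.storage[self.key]     # read the serialized bytes from storage
            self._cache = pickle.loads(blob)  # deserialize back to the original object
        return self._cache


storage = {"my-key": pickle.dumps([1, 2, 3])}
ref = ResultReference(storage, "my-key")

first = ref.get()
del storage["my-key"]  # the second call below never touches storage
second = ref.get()
assert first is second
```

The identity check shows that the second call returns the cached object instead of reading and deserializing again.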
When results are persisted to storage, they are always written as a JSON document. The schema for this is described by the PersistedResultBlob type. The document contains:
The serialized data of the result.
A full description of the result serializer that can be used to deserialize the result data.