Ops
If you are just getting started with Dagster, we strongly recommend you use assets rather than ops to build your data pipelines. The ops documentation is for Dagster users who need to manage existing ops, or who have complex use cases.
Ops are the core unit of computation in Dagster.
An individual op should perform relatively simple tasks, such as:
- Deriving a dataset from other datasets
- Executing a database query
- Initiating a Spark job in a remote cluster
- Querying an API and storing the result in a data warehouse
- Sending an email or Slack message
The computational core of an asset definition is an op. Collections of ops can also be assembled to create a graph.
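For example, a minimal sketch (op and graph names invented for illustration) of two ops assembled into a graph with the `@dg.graph` decorator:

```python
import dagster as dg


@dg.op
def extract_records():
    # Produce some upstream data
    return [1, 2, 3]


@dg.op
def double_records(records):
    # Consume the upstream op's output
    return [record * 2 for record in records]


@dg.graph
def records_graph():
    # Calling one op with another's result defines the dependency between them
    double_records(extract_records())
```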
Ops support a variety of useful features for data orchestration, such as:
- Flexible execution strategies: Painlessly transition from development to production with ops, as they are sealed units of logic independent of execution strategy. Collections of ops, called graphs, can be bound via jobs to an appropriate executor for single-process execution or distribution across a cluster.
- Pluggable external systems: If your data pipeline interfaces with external systems, you may want to use local substitutes during development over a cloud-based production system. Dagster provides resources as an abstraction layer for this purpose. Ops can be written against abstract resources (e.g. `database`), with resource definitions later bound at the job level. Op logic can thus remain uncoupled to any particular implementation of an external system (see the resource sketch after this list).
- Input and output management: Ops have defined inputs and outputs, analogous to the arguments and return value(s) of a Python function. An input or output can be annotated with a Dagster type for arbitrarily complex runtime validation. Outputs can additionally be tagged with an IO manager to manage storage of the associated data in between ops. This enables easy swapping of I/O strategy depending on the execution environment, as well as efficient caching of data intermediates.
- Configuration: Operations in a data pipeline are often parameterized both by upstream data (e.g. a stream of database records) and by configuration parameters independent of upstream data (e.g. a "chunk size" of incoming records to operate on). Define configuration parameters by providing an associated config schema to the op.
- Event streams: Ops emit a stream of events during execution. Certain events are emitted by default, such as indicating the start of an op's execution, but op authors are additionally given access to an event API. This can be used to report data asset creation or modification (`AssetMaterialization`), the result of a data quality check (`ExpectationResult`), or other arbitrary information. Event streams can be visualized in the Dagster UI. This rich log of execution facilitates debugging, inspection, and real-time monitoring of running jobs (see the event example after this list).
- Testability: The properties that enable flexible execution of ops also facilitate versatile testing. Ops can be tested in isolation or as part of a pipeline. Further, the resource API allows external systems (e.g. databases) to be stubbed or substituted as needed.
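To illustrate the resource pattern, here is a minimal sketch assuming a hypothetical `Database` resource and `row_count` op (names invented for illustration):

```python
import dagster as dg


class Database(dg.ConfigurableResource):
    # Hypothetical resource wrapping an external database
    connection_string: str

    def execute(self, query: str):
        # A real implementation would run the query against the database
        ...


@dg.op
def row_count(database: Database):
    # The op is written against the abstract Database resource; a local or
    # production implementation is bound later, when the job is defined
    return database.execute("SELECT COUNT(*) FROM users")
```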
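And a minimal sketch of the event API, assuming a hypothetical `write_users_table` op that reports an asset materialization and a data quality result:

```python
import dagster as dg


@dg.op
def write_users_table(context: dg.OpExecutionContext):
    # ... write the table to storage here ...
    context.log_event(
        dg.AssetMaterialization(
            asset_key="users_table", description="Persisted users table"
        )
    )
    context.log_event(
        dg.ExpectationResult(
            success=True, description="users_table has at least one row"
        )
    )
```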
Relevant APIs
| Name | Description |
| --- | --- |
| `@dg.op` | A decorator used to define ops. Returns an `OpDefinition`. The decorated function is called the "compute function". |
| `In` | An input to an op. Defined on the `ins` argument to the `@dg.op` decorator. |
| `Out` | An output of an op. Defined on the `out` argument to the `@dg.op` decorator. |
| `OpExecutionContext` | An object exposing Dagster system APIs for resource access, logging, and more. Can be injected into an op by specifying `context` as the first argument of the compute function. |
| `OpDefinition` | The class for ops. You will rarely want to instantiate this class directly; instead, use the `@dg.op` decorator. |
Defining an op
To define an op, use the `@dg.op` decorator. The decorated function is called the `compute_fn`.

```python
import dagster as dg


@dg.op
def my_op():
    return "hello"
```
Inputs and outputs
Each op has a set of inputs and outputs, which define the data it consumes and produces. Inputs and outputs are used to define dependencies between ops and to pass data between ops.
Both definitions have a few important properties:
- They are named.
- They are optionally typed. These types are validated at runtime (see the sketch below).
- (Advanced) They can be linked to an `IOManager`, which defines how the output or input is stored and loaded. See the IO manager concept page for more info.
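For example, a minimal sketch (names invented for illustration) of an op with a named, typed input and output:

```python
import dagster as dg


@dg.op(
    ins={"num": dg.In(int)},
    out={"doubled": dg.Out(int)},
)
def double_number(num):
    # "num" and "doubled" are the names; int is checked at runtime
    return num * 2
```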
Inputs
Inputs are passed as arguments to an op's `compute_fn`. The value of an input can be passed from the output of another op, or stubbed (hardcoded) using config.
The most common way to define inputs is just to add arguments to the decorated function:
```python
import dagster as dg


@dg.op
def my_input_op(abc, xyz):
    pass
```
An op only starts to execute once all of its inputs have been resolved. Inputs can be resolved in two ways:
- The upstream output that the input depends on has been successfully emitted and stored (see the example below).
- The input was stubbed through config.
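As a sketch of the first case (op and job names invented for illustration), one op's output resolves another op's input when they are wired together in a job:

```python
import dagster as dg


@dg.op
def emit_number():
    return 5


@dg.op
def add_one(number):
    return number + 1


@dg.job
def numbers_job():
    # add_one's "number" input resolves from emit_number's output at runtime
    add_one(emit_number())
```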
You can use a Dagster Type to provide a function that validates an op's input every time the op runs. In this case, you pass a dictionary of `In`s to the `ins` argument, with keys corresponding to the decorated function's arguments.
```python
import dagster as dg

MyDagsterType = dg.DagsterType(
    type_check_fn=lambda _, value: value % 2 == 0, name="MyDagsterType"
)


@dg.op(ins={"abc": dg.In(dagster_type=MyDagsterType)})
def my_typed_input_op(abc):
    pass
```