Creating a Nextflow Data Pipeline
Introduction
Pipelines / Workflows
It’s possible to convert a Viash component into a Nextflow module. Viash uses Nextflow’s DSL2 for this, effectively creating modules that can be imported in a main.nf pipeline definition that deals with the logic of the pipeline rather than the low-level machinery. For more on this low-level machinery and the way Viash creates a module, we refer to the step-by-step introduction about DiFlow.
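As a minimal sketch (the component name filter and the path target/nextflow/filter/main.nf are illustrative assumptions, not prescribed by Viash), importing and running such a generated module could look like this:
nextflow.enable.dsl=2

// Import a Viash-generated module (path and component name are illustrative)
include { filter } from 'target/nextflow/filter/main.nf'

workflow {
  // Build the DiFlow triplet [ id, file, params ] and hand it to the module
  Channel.fromPath(params.input) \
    | map{ f -> [ f.baseName, f, params ] } \
    | filter \
    | view{ "Output for " + it[0] + ": " + it[1] }
}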
Parallelization
Nextflow, like any other pipeline platform, is able to run tasks in parallel if the pipeline logic allows for it. In order to keep the different parallel branches apart, we add a unique identifier id. This identifier can be a sample identifier, a plate id (sequencing), versions of reference files to consider, etc.
In a pipeline, one is often interested in running some computation on different datasets, inputs or parts of an input. This means we need to keep track of where the input chunks are located. More importantly, we cannot simply hard-code an output file name, because multiple parallel processes might overwrite each other’s output files.
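A short sketch (the file pattern and the mapping module are illustrative assumptions): by putting one triplet per sample on the channel, the sample name acts as the id that keeps the parallel branches apart:
// One event per sample; the id (here the sample name) keeps parallel branches apart
samples_ = Channel.fromPath("data/*.fastq") \
  | map{ f -> [ f.simpleName, f, params ] }

samples_ | mapping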
Output Filenames
Therefore, it’s important to keep output files distinct across different steps of the pipeline but also between different parallel runs. In order to assure this, a module will define its own output file name. It is constructed from 3 ingredients:
- The unique id of the data going into the process
- The name of the current component/task
- An extension
The extension is derived from the component configuration if a default: ...
attribute is present for the corresponding output argument. If no
such default value is provided, the name of the option is used.
As an example, the following output argument for a component comp:
name: --log
type: file
direction: Output
will result in <id>.comp.log, while the following:
name: --log
type: file
direction: Output
default: log.txt
will result in <id>.comp.txt.
The config can define a directory as output as well; it will be named accordingly.
Remark: If the output is a directory, type: file
should still be
used, but the corresponding script/code should take care of writing the
content to that directory.
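As a hedged illustration, reusing the comp component with the --log argument from above: the renamed file simply travels along as the second element of the output triplet:
input_ \
  | comp \
  | view{ "log for " + it[0] + ": " + it[1] }   // prints something like <id>.comp.log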
Tips
How to specify multiple inputs?
If a component deals with just one input file, that input file should be provided as the second element in the DiFlow triplet. In other words, if this is the first component in a (sub)workflow, two options are available:
Channel.fromPath(<...>).map{ file -> [ <id>, file, params ] }
or
Channel.from(<...>).map{ filename -> [ <id>, file(filename), params ] }
It is crucial that this second element in the triplet is of type Path.
If multiple inputs are to be provided corresponding to the same option for the underlying process or tool, a List of Path objects can be provided.
For example, say we ran multiple parallel workflows for a single sample and want to join the results. The way to express this in Nextflow would be something like:
concatenate_ = singleSample_ \
  | toList \
  | map{ it -> [ it.collect{ b -> b[0] }, it.collect{ a -> a[1] }, params ] } \
  | concatenate
In other words, we pass a List
of Path objects to the concatenate
module.
In some cases, multiple input arguments deal with different input files, for instance fastq files and a reference file for mapping and counting. One can pass these to the module by means of a Map. This approach can, for instance, be used to merge meta information (from a file) into an h5ad file:
singleSample_ = input_ \
  ...
  | combine(meta_) \
  | map{ id, output, params, meta ->
      [ id, [ "input" : output, "meta" : meta ], params ]
    } \
  | annotate
Where the meta_
Channel
points to the meta file to be used.
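The meta_ channel could, for instance, be constructed as follows (the path is purely illustrative); combine then appends the meta file to every event coming from the input channel:
// A channel holding the single meta file to be combined with every sample
meta_ = Channel.fromPath("data/metadata.tsv")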
In other words, we either provide a List of Path values or, in the case where multiple options take different files, we use a HashMap.
Remark: Be sure to mark the arguments at hand as being of type: file and direction: input.
Multiple outputs
Internally, DiFlow uses a similar approach to keeping track of outputs as discussed for inputs. What comes out of a module, however, is slightly different. Since a workflow cannot emit a multi-channel object, we are forced to put all outputs on the same Channel, and so we use a Map again to distinguish them. This is only done for multiple outputs, though.
By means of an example: Say a module outputs one file, then the triplet that is returned from the module looks like this:
[ <id>, file, params ]
If our tool has two output files, say outputfile.txt and logfile.txt (as indicated by a tool command line such as .... --output outputfile.txt --log logfile.txt), we still get one Channel back, but that Channel now carries two events, which look like this:
[ <id>, [ output: outputfile.txt ], params ]
[ <id>, [ log: logfile.txt ], params ]
It’s up to the receiving end of the module to split this downstream. The implicit workflow defined in all generated modules contains some example code to do that, for instance:
result \
  | filterOutput \
  | view{ "Output for output: " + it[1] }

result \
  | filterLog \
  | view{ "Output for log: " + it[1] }
Where the filterOutput process, for instance, is defined like so:
// A process that extracts the "output" entry from the output Map
process filterOutput {
  input:
    tuple val(id), val(input), val(_params)
  output:
    tuple val(id), val(output), val(_params)
  when:
    input.keySet().contains("output")
  exec:
    output = input["output"]
}
Alternatively, one could also use methods on the Channel
itself:
result \
  | filter{ it[1].keySet().contains("output") } \
  | map{ [ it[0], it[1]["output"], it[2] ] } \
  | view{ "Output for output: " + it[1] }
One more option is to use the branch or multiMap Channel forking operators in Nextflow.
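A hedged sketch using branch (the labels output and logs are arbitrary names chosen here; the triplet layout is the one described above):
result
  .branch{
    output: it[1].keySet().contains("output")
    logs: it[1].keySet().contains("log")
  }
  .set{ split_ }

split_.output | view{ "Output for output: " + it[1] }
split_.logs | view{ "Output for log: " + it[1] }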
Access arguments from the nextflow CLI (v0.4.1+)
We provide a way to access the values of all arguments of every component from the Nextflow CLI. All components store their respective arguments under a component-specific key that corresponds to the component name.
Furthermore, the value that will effectively be passed to the process depends on the following attributes for an argument:
- required: true/false
- default: ...
If an argument is required: true, it can have:
- a default: ... value: it will use this default value unless you override it on the CLI
- no default value: in this case, one has to provide a value when starting a Nextflow pipeline, otherwise Viash will give a warning and Nextflow will throw an error
If an argument is required: false, it can have:
- a default: ... value: it will use this default value unless you override it from the CLI
- no default value: in this case, this argument will not be present by default, but it can still be overridden from the CLI
Please refer to this issue for more information.
Given the above, it’s possible to override any of these using the following scheme:
--<component_name>__<argument_name> <value>
For instance:
nextflow run main.nf --filter__threshold 0.2