Advanced Pipeline
This guide explains how to create an example pipeline that’s closer to a typical use case of a Nextflow bioinformatics pipeline. It mixes several scripting languages and merges all events into a single output.
Creating the modules
The sections below describe how to create the VDSL3 modules in preparation for creating the pipeline.
Preparation
Create a new folder named advanced_pipeline, then add a src folder with a nextflow_modules folder inside. Now create three folders inside the nextflow_modules folder, one per component needed for the pipeline:
- combine_columns
- remove_comments
- take_column
The folder structure should look like this now:
advanced_pipeline
└── src
    └── nextflow_modules
        ├── combine_columns
        ├── remove_comments
        └── take_column
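The structure above can be created in one go from a shell (a minimal sketch using `mkdir -p`, which creates intermediate directories as needed):

```shell
# create the three component folders, including all parent directories
mkdir -p advanced_pipeline/src/nextflow_modules/combine_columns
mkdir -p advanced_pipeline/src/nextflow_modules/remove_comments
mkdir -p advanced_pipeline/src/nextflow_modules/take_column
```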
Creating the remove_comments component
This component removes all comments (lines starting with a hashtag) from a TSV file.
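To preview the filtering logic in isolation, you can run the same `grep` invocation the script uses on a small made-up file (`demo.tsv` is hypothetical here):

```shell
# create a tiny TSV with two comment lines and one data row
printf '# a header comment\n# another comment\none\t0.11\t123\n' > demo.tsv

# keep only lines that do not start with a hashtag
grep -v '^#' demo.tsv   # only the data row remains
```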
Download or copy the contents of config.vsh.yaml and script.sh below into the remove_comments folder.
Contents of config.vsh.yaml
functionality:
  name: remove_comments
  description: Remove comments from a TSV file.
  namespace: nextflow_modules
  arguments:
    - name: "--input"
      alternatives: [ "-i" ]
      type: file
      required: true
      example: "file.tsv"
    - name: "--output"
      alternatives: [ "-o" ]
      type: file
      required: true
      direction: output
      example: "file.tsv"
    - type: string
      name: --id
      default: "remove_comments"
  resources:
    - type: bash_script
      path: ./script.sh
platforms:
  - type: docker
    image: ubuntu:20.04
  - type: nextflow
    variant: vdsl3
Contents of script.sh
#!/bin/bash
grep -v '^#' "$par_input" > "$par_output"
Creating the take_column component
This component subsets an incoming TSV file by extracting a single column from it.
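The core idea can be sketched with standard shell tools (`cut -f2` grabs the second tab-separated field; the actual component uses pandas, which additionally skips comment lines and treats the first remaining row as a header):

```shell
# two rows, three tab-separated columns
printf 'one\t0.11\t123\ntwo\t0.23\t456\n' > cols.tsv

# extract the second tab-delimited column
cut -f2 cols.tsv
```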
Download or copy the contents of config.vsh.yaml and script.py below into the take_column folder.
Contents of config.vsh.yaml
functionality:
  name: take_column
  namespace: nextflow_modules
  arguments:
    - name: "--input"
      alternatives: [ "-i" ]
      type: file
      required: true
    - name: "--output"
      alternatives: [ "-o" ]
      type: file
      required: true
      direction: output
    - name: "--column"
      type: integer
      required: false
      default: 2
    - type: string
      name: --id
      default: "take_column"
  resources:
    - type: python_script
      path: ./script.py
platforms:
  - type: nextflow
    variant: vdsl3
    directives:
      container: amancevice/pandas:slim
Contents of script.py
import pandas as pd

## VIASH START
par = {
  "input": "data/file1.tsv",
  "column": 2,
  "output": "temp/foo"
}
## VIASH END

# read the tsv file
tab = pd.read_csv(par["input"], sep="\t", comment="#")
# subset a column
tab_filt = tab.iloc[:, par["column"]-1]
# write to file
tab_filt.to_csv(par["output"], index=False)
Creating the combine_columns component
This component combines multiple TSV files into one by concatenating all of their columns. It assumes each incoming TSV file has an equal number of rows.
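The `cbind` call in the R script behaves much like the shell utility `paste`, which glues files together column-wise (a rough analogy; the R version also skips comment lines):

```shell
# two single-column files with the same number of rows
printf 'a\nb\n' > left.tsv
printf '1\n2\n' > right.tsv

# concatenate the files side by side, tab-separated
paste left.tsv right.tsv
```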
Download or copy the contents of config.vsh.yaml and script.R below into the combine_columns folder.
Contents of config.vsh.yaml
functionality:
  name: combine_columns
  namespace: nextflow_modules
  arguments:
    - name: "--input"
      alternatives: [ "-i" ]
      type: file
      multiple: true
      required: true
    - name: "--output"
      alternatives: [ "-o" ]
      type: file
      required: true
      direction: output
    - type: string
      name: --id
      default: "combine_columns"
  resources:
    - type: r_script
      path: ./script.R
platforms:
  - type: nextflow
    variant: vdsl3
    directives:
      container: rocker/r-ver:4.1
Contents of script.R
## VIASH START
par <- list(
  input = c("data/file1.tsv", "data/file2.tsv"),
  output = "temp/foo.tsv"
)
## VIASH END

outs <- lapply(par$input, function(file) {
  read.delim(file, comment.char = "#", sep = "\t", header = FALSE)
})
table <- do.call(cbind, outs)
write.table(table, par$output, col.names = FALSE, sep = "\t")
Building the modules
The basic pipeline guide describes how to generate an individual VDSL3 Nextflow module using the viash build command, but there’s a better way when it comes to building multiple modules at once: viash ns build. Each of the three components has its namespace defined as nextflow_modules inside its config file. Execute this command from the root of the advanced_pipeline directory:
viash ns build
This will result in the following output:
Exporting take_column (nextflow_modules) =nextflow=> target/nextflow/nextflow_modules/take_column
Exporting combine_columns (nextflow_modules) =nextflow=> target/nextflow/nextflow_modules/combine_columns
Exporting remove_comments (nextflow_modules) =nextflow=> target/nextflow/nextflow_modules/remove_comments
The VDSL3 Nextflow modules were generated inside a target folder. The directory structure now looks like this:
advanced_pipeline
├── src
│   └── nextflow_modules
│       ├── combine_columns
│       │   ├── config.vsh.yaml
│       │   └── script.R
│       ├── remove_comments
│       │   ├── config.vsh.yaml
│       │   └── script.sh
│       └── take_column
│           ├── config.vsh.yaml
│           └── script.py
└── target
    └── nextflow
        └── nextflow_modules
            ├── combine_columns
            │   ├── main.nf
            │   └── nextflow.config
            ├── remove_comments
            │   ├── main.nf
            │   └── nextflow.config
            └── take_column
                ├── main.nf
                └── nextflow.config
Creating the pipeline
Create a new file named main.nf and add it to the root of the advanced_pipeline directory. Use the following as its contents:
targetDir = "../target/nextflow" // 1
// 2
include { remove_comments } from "$targetDir/nextflow_modules/remove_comments/main.nf"
include { take_column } from "$targetDir/nextflow_modules/take_column/main.nf"
include { combine_columns } from "$targetDir/nextflow_modules/combine_columns/main.nf"
workflow {
  Channel.fromPath(params.input) // 3
    // 4
    // File -> (String, File)
    | map{ file -> [ file.baseName, file ] }
    // 5
    // (String, File) -> (String, File)
    | remove_comments
    // 6
    // (String, File) -> (String, File)
    | take_column
    // 7
    // (String, File)* -> List[(String, File)]
    | toList()
    // 8
    // List[(String, File)] -> (String, {input: List[File]})
    | map{ tups ->
      files = tups.collect{ id, file -> file }
      [ "combined", [ input: files ] ]
    }
    // 9
    // (String, {input: List[File]}) -> (String, File)
    | combine_columns.run(
      auto: [ publish: true ]
    )
    // 10
    | view{ file -> "Output: $file" }
}
1. Target directory where the modules are located
2. Include the modules from the target directory
3. Create a channel based on the input parameter’s path
4. Assign a unique ID to each event using map{}
5. Run remove_comments to remove the comments from the TSV
6. Extract a single column from the TSV by running take_column
7. Combine all events into a single List event using toList()
8. Add a unique ID to the tuple
9. Concatenate the TSVs into one by running the combine_columns module, with auto publishing enabled using the auto directive
10. View the channel contents by printing them to the console using view()
Running the pipeline
Before being able to run the pipeline, you’ll need some TSV files to work with. Download the files below and place them in a new directory named data in the root of advanced_pipeline.
Now run the following command to run the pipeline using Nextflow:
nextflow run main.nf --input "data/file?.tsv" --publishDir output
You should get an output similar to this:
N E X T F L O W ~ version 22.04.3
Launching `workflows/310-realistic_pipeline/main.nf` [stupefied_saha] DSL2 - revision: 6669aefc6c
[93/232aba] Submitted process > remove_comments:remove_comments_process (2)
[ef/a28e89] Submitted process > remove_comments:remove_comments_process (1)
[63/279f98] Submitted process > take_column:take_column_process (1)
[50/2a17ef] Submitted process > take_column:take_column_process (2)
[8d/6eeff5] Submitted process > combine_columns:combine_columns_process
Output: [combined, /home/runner/work/viash_nxf_course/viash_nxf_course/work/8d/6eeff571e9ff2c5389851c6ab3001c/combined.combine_columns.output]
You can find the final TSV in the output directory:
# this is a header
# this is also a header
one 0.11 123
two 0.23 456
three 0.35 789
four 0.47 123