Advanced Pipeline

This guide explains how to create an example pipeline that’s closer to a typical real-world Nextflow bioinformatics pipeline. It uses a mixture of scripting languages and merges all events together with a join.

Creating the modules

The sections below describe how to create the VDSL3 modules in preparation for creating the pipeline.

Preparation

Create a new folder named advanced_pipeline and add a src folder with a nextflow_modules folder inside it. Now create three folders inside the nextflow_modules folder, one per component needed for the pipeline:

  • combine_columns
  • remove_comments
  • take_column

The folder structure should look like this now:

advanced_pipeline
└── src
    └── nextflow_modules
        ├── combine_columns
        ├── remove_comments
        └── take_column
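The structure above can be created in one go from the command line, with paths exactly as shown in the tree:

```shell
# Create the three module folders; -p also creates the parent directories
mkdir -p advanced_pipeline/src/nextflow_modules/combine_columns
mkdir -p advanced_pipeline/src/nextflow_modules/remove_comments
mkdir -p advanced_pipeline/src/nextflow_modules/take_column

# Verify the resulting tree
find advanced_pipeline -type d | sort
```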

Creating the remove_comments component

This component removes all comments (lines starting with a hash character, #) from a TSV file.
Download or copy the contents of config.vsh.yaml and script.sh below into the remove_comments folder.

Contents of config.vsh.yaml
functionality:
  name: remove_comments
  description: Remove comments from a TSV file.
  namespace: nextflow_modules
  arguments:
    - name: "--input"
      alternatives: [ "-i" ]
      type: file
      required: true
      example: "file.tsv"
    - name: "--output"
      alternatives: [ "-o" ]
      type: file
      required: true
      direction: output
      example: "file.tsv"
    - type: string
      name: --id
      default: "remove_comments"
  resources:
  - type: bash_script
    path: ./script.sh
platforms:
  - type: docker
    image: ubuntu:20.04
  - type: nextflow
    variant: vdsl3
Contents of script.sh
#!/bin/bash

grep -v '^#' "$par_input" > "$par_output"
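You can check what this grep invocation does on a quick mock file (sample data made up here for illustration; Viash fills in $par_input and $par_output at runtime):

```shell
# Build a tiny TSV with two comment lines, mimicking the course data
printf '# a header comment\n# another comment\none\t0.11\ntwo\t0.23\n' > sample.tsv

# Same filter the component runs: keep every line NOT starting with '#'
grep -v '^#' sample.tsv
```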

Creating the take_column component

This component subsets an incoming TSV file by extracting a certain column from it.
Download or copy the contents of config.vsh.yaml and script.py below into the take_column folder.

Contents of config.vsh.yaml
functionality:
  name: take_column
  namespace: nextflow_modules
  arguments:
    - name: "--input"
      alternatives: [ "-i" ]
      type: file
      required: true
    - name: "--output"
      alternatives: [ "-o" ]
      type: file
      required: true
      direction: output
    - name: "--column"
      type: integer
      required: false
      default: 2
    - type: string
      name: --id
      default: "take_column"
  resources:
  - type: python_script
    path: ./script.py
platforms:
  - type: nextflow
    variant: vdsl3
    directives:
      container: amancevice/pandas:slim
Contents of script.py
import pandas as pd

## VIASH START
par = {
    "input": "data/file1.tsv",
    "column": 2,
    "output": "temp/foo"
}
## VIASH END

# read the tsv file
tab = pd.read_csv(par["input"], sep="\t", comment="#")

# subset a column
tab_filt = tab.iloc[:, par["column"]-1]

# write to file
tab_filt.to_csv(par["output"], index=False)
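The pandas calls above roughly boil down to "skip comment lines, keep one tab-separated field". A shell sketch on mock data shows the idea, with cut -f2 playing the role of --column 2 (the pandas version additionally treats the first data row as a header):

```shell
# Mock input with one comment line and three tab-separated columns
printf '# header\none\t0.11\t123\ntwo\t0.23\t456\n' > sample.tsv

# Drop comments, then keep the second tab-separated field
grep -v '^#' sample.tsv | cut -f2
```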

Creating the combine_columns component

This component combines multiple TSV files into one by concatenating all of their columns. It assumes each incoming TSV file has the same number of rows.
Download or copy the contents of config.vsh.yaml and script.R below into the combine_columns folder.

Contents of config.vsh.yaml
functionality:
  name: combine_columns
  namespace: nextflow_modules
  arguments:
    - name: "--input"
      alternatives: [ "-i" ]
      type: file
      multiple: true
      required: true
    - name: "--output"
      alternatives: [ "-o" ]
      type: file
      required: true
      direction: output
    - type: string
      name: --id
      default: "combine_columns"
  resources:
    - type: r_script
      path: ./script.R
platforms:
  - type: nextflow
    variant: vdsl3
    directives:
      container: rocker/r-ver:4.1
Contents of script.R
## VIASH START
par <- list(
    input = c("data/file1.tsv", "data/file2.tsv"),
    output = "temp/foo.tsv"
)
## VIASH END

outs <- lapply(par$input, function(file) {
  read.delim(file, comment.char = "#", sep = "\t", header = FALSE)
})

table <- do.call(cbind, outs)

write.table(table, par$output, col.names = FALSE, sep = "\t")
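Conceptually, the do.call(cbind, ...) is a column-wise paste of the input files. A shell sketch with two mock single-column files:

```shell
# Two single-column inputs, as produced by take_column
printf '0.11\n0.23\n' > col1.txt
printf '0.99\n0.88\n' > col2.txt

# cbind-like: concatenate the files column-wise, tab-separated
paste col1.txt col2.txt
```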

Building the modules

The basic pipeline guide describes how to generate an individual VDSL3 Nextflow module using the viash build command, but there’s a better way to build multiple modules at once: viash ns build. Each of the three components has its namespace defined as nextflow_modules inside its config file. Execute this command from the root of the advanced_pipeline directory:

viash ns build

This will result in the following output:

Exporting take_column (nextflow_modules) =nextflow=> target/nextflow/nextflow_modules/take_column
Exporting combine_columns (nextflow_modules) =nextflow=> target/nextflow/nextflow_modules/combine_columns
Exporting remove_comments (nextflow_modules) =nextflow=> target/nextflow/nextflow_modules/remove_comments

The VDSL3 Nextflow modules were generated inside of a target folder. The directory structure now looks like this:

advanced_pipeline
├── src
│   └── nextflow_modules
│       ├── combine_columns
│       │   ├── config.vsh.yaml
│       │   └── script.R
│       ├── remove_comments
│       │   ├── config.vsh.yaml
│       │   └── script.sh
│       └── take_column
│           ├── config.vsh.yaml
│           └── script.py
└── target
    └── nextflow
        └── nextflow_modules
            ├── combine_columns
            │   ├── main.nf
            │   └── nextflow.config
            ├── remove_comments
            │   ├── main.nf
            │   └── nextflow.config
            └── take_column
                ├── main.nf
                └── nextflow.config

Creating the pipeline

Create a new file named main.nf in the root of the advanced_pipeline directory and use the following as its contents:

targetDir = "./target/nextflow" // 1

// 2
include { remove_comments } from "$targetDir/nextflow_modules/remove_comments/main.nf"
include { take_column } from "$targetDir/nextflow_modules/take_column/main.nf"
include { combine_columns } from "$targetDir/nextflow_modules/combine_columns/main.nf"

workflow {
  Channel.fromPath(params.input) // 3
  
    // 4
    // File -> (String, File)
    | map{ file -> [ file.baseName, file ] }
    
    // 5
    // (String, File) -> (String, File)
    | remove_comments

    // 6
    // (String, File) -> (String, File)
    | take_column

    // 7
    // (String, File)* -> List[(String, File)]
    | toList()

    // 8
    // List[(String, File)] -> (String, {input: List[File]})
    | map{ tups -> 
      def files = tups.collect{ id, file -> file }
      [ "combined", [ input: files ] ] 
    }

    // 9
    // (String, {input: List[File]}) -> (String, File)
    | combine_columns.run(
      auto: [ publish: true ]
      )

    // 10
    | view{ file -> "Output: $file" }
}
  1. Target directory where the modules are located
  2. Include the modules from the target directory
  3. Create a channel based on the input parameter’s path
  4. Assign a unique ID to each event using map{}
  5. Run remove_comments to remove the comments from the TSV
  6. Extract a single column from TSV by running take_column
  7. Combine all events into a single List event using toList()
  8. Add a unique ID to the tuple
  9. Concatenate the TSVs into one by running the combine_columns module with auto publishing enabled using the auto directive
  10. View the channel contents by printing them to the console using view()
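Step 4 uses file.baseName as the unique event ID: the file name with its directory and final extension stripped. In shell terms (hypothetical path):

```shell
f=data/file1.tsv

# basename with a suffix argument strips both the directory and the
# extension, matching what file.baseName yields in the workflow
echo "$(basename "$f" .tsv)"
```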

Running the pipeline

Before being able to run the pipeline, you’ll need some TSV files to work with. Download the files below and place them in a new directory named data in the root of advanced_pipeline.

Download file1.tsv

Download file2.tsv

Now run the following command to run the pipeline using Nextflow:

nextflow run main.nf --input "data/file?.tsv" --publishDir output
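Note that the quotes around data/file?.tsv keep the shell from expanding the glob, so Nextflow’s Channel.fromPath resolves it itself. Either way, the pattern matches both data files, since ? matches exactly one character:

```shell
# Mock the data directory to show what the glob matches
mkdir -p data
touch data/file1.tsv data/file2.tsv

ls data/file?.tsv
```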

You should get an output similar to this:

N E X T F L O W  ~  version 22.04.3
Launching `workflows/310-realistic_pipeline/main.nf` [stupefied_saha] DSL2 - revision: 6669aefc6c
[93/232aba] Submitted process > remove_comments:remove_comments_process (2)
[ef/a28e89] Submitted process > remove_comments:remove_comments_process (1)
[63/279f98] Submitted process > take_column:take_column_process (1)
[50/2a17ef] Submitted process > take_column:take_column_process (2)
[8d/6eeff5] Submitted process > combine_columns:combine_columns_process
Output: [combined, /home/runner/work/viash_nxf_course/viash_nxf_course/work/8d/6eeff571e9ff2c5389851c6ab3001c/combined.combine_columns.output]

You can find the final TSV in the output directory:

# this is a header      
# this is also a header     
one     0.11    123
two     0.23    456
three   0.35    789
four    0.47    123