From ab4f1f5dd86ffeb11aa0ac5029b341d0782d6183 Mon Sep 17 00:00:00 2001
From: Murray Wham <murray.wham@ed.ac.uk>
Date: Mon, 15 Nov 2021 18:11:48 +0000
Subject: [PATCH] Adding BCBio config, connecting up workflows properly

---
 environment.yml                |   2 +-
 pipeline/inputs.nf             |   1 -
 pipeline/main.nf               | 100 +++++++++++++++++++++++++++++++++
 pipeline/validation.nf         |  51 ++++++++---------
 tests/run_tests.sh             |  41 ++++++--------
 tests/scripts/bcbio_nextgen.py |   4 ++
 tests/scripts/test_config.sh   |  18 ++++++
 7 files changed, 167 insertions(+), 50 deletions(-)
 create mode 100644 pipeline/main.nf
 create mode 100755 tests/scripts/bcbio_nextgen.py
 create mode 100644 tests/scripts/test_config.sh

diff --git a/environment.yml b/environment.yml
index ed0f0cf..eceee14 100644
--- a/environment.yml
+++ b/environment.yml
@@ -8,4 +8,4 @@ channels:
 dependencies:
   - coreutils=8.25  # gives us cross-platform md5sum
   - nextflow=21.04.0
-#  - bcbio-nextgen=1.2.8
\ No newline at end of file
+#  - bcbio-nextgen=1.2.8
diff --git a/pipeline/inputs.nf b/pipeline/inputs.nf
index cf4094f..3a0263f 100644
--- a/pipeline/inputs.nf
+++ b/pipeline/inputs.nf
@@ -86,7 +86,6 @@ workflow read_inputs {
         ch_individuals_by_family = ch_ped_file_info
             .join(ch_samplesheet_info)
             .map({[it[1], it]})
-            .groupTuple()
     
     emit:
         ch_samplesheet_info
diff --git a/pipeline/main.nf b/pipeline/main.nf
new file mode 100644
index 0000000..9ee2571
--- /dev/null
+++ b/pipeline/main.nf
@@ -0,0 +1,100 @@
+nextflow.enable.dsl = 2
+
+include {read_inputs} from './inputs.nf'
+include {validation} from './validation.nf'
+
+
+process merge_fastqs {
+    publishDir "outputs/individuals/$indv_id/merged_fastqs", mode: 'copy'
+
+    input:
+    tuple(val(indv_id), val(r1), val(r2))
+
+    output:
+    tuple(
+        val(indv_id),
+        path("${indv_id}_merged_r1.fastq.gz"),
+        path("${indv_id}_merged_r2.fastq.gz")
+    )
+
+    script:
+    // todo: pigz if gzip becomes a bottleneck
+    """
+    cat ${r1.join(' ')} | gzip -c > ${indv_id}_merged_r1.fastq.gz &
+    cat ${r2.join(' ')} | gzip -c > ${indv_id}_merged_r2.fastq.gz; wait
+    """
+}
+
+process write_bcbio_csv {
+    publishDir "outputs/families/$family_id", mode: 'copy'
+
+    input:
+    tuple(val(family_id), val(individual_info))
+
+    output:
+    path("${family_id}.csv")
+
+    script:
+    """
+    #!/usr/bin/env python
+
+    individual_info = '$individual_info'
+    lines = individual_info.lstrip('[').rstrip(']').split('], [')
+    
+    with open('${family_id}.csv', 'w') as f:
+        f.write('samplename,description,batch,sex,phenotype,variant_regions\\n')
+        for l in lines:
+            l = l.lstrip('[').rstrip(']').split(', ')
+            f.write(','.join(l) + '\\n')
+    """
+}
+
+process run_bcbio {
+    input:
+    path(bcbio_config)
+
+    script:
+    """
+    bcbio_nextgen.py $bcbio_config -n 16 -t local
+    """
+
+}
+
+workflow prepare_bcbio_config {
+    /*
+    - For each individual, merge all R1 and R2 fastqs
+    - For each family:
+      - Read the relevant information from the samplesheet, ped files and merged fastqs
+      - Write a BCBio config file
+      - Run BCBio on it
+    */
+
+    take:
+        ch_samplesheet_info
+        ch_individuals_by_family
+    
+    main:
+        ch_merged_fastqs = merge_fastqs(ch_samplesheet_info)
+        ch_merged_data = ch_individuals_by_family.map(
+            { k, v -> v }
+        )
+        .join(ch_merged_fastqs)
+        .map(
+            { sample_id, family_id, father, mother, sex, phenotype, r1s, r2s, merged_r1, merged_r2 ->
+                [family_id, [sample_id, father, mother, sex, phenotype, merged_r1, merged_r2]]
+        }).groupTuple()
+
+        ch_bcbio_config = write_bcbio_csv(ch_merged_data)
+        run_bcbio(ch_bcbio_config)
+}
+
+
+workflow {
+    read_inputs()
+    ch_samplesheet_info = read_inputs.out[0]
+    ch_ped_file_info = read_inputs.out[1]
+    ch_individuals_by_family = read_inputs.out[2]
+
+    validation(ch_samplesheet_info)
+    prepare_bcbio_config(ch_samplesheet_info, ch_individuals_by_family)
+}
diff --git a/pipeline/validation.nf b/pipeline/validation.nf
index 81b7c21..965008f 100644
--- a/pipeline/validation.nf
+++ b/pipeline/validation.nf
@@ -47,37 +47,38 @@ process raise_errors {
     exit 1, "MD5 mismatches found"
 }
 
-workflow {
+workflow validation {
     /*
     Take a parsed samplesheet, flatten it and parse into a channel of observed vs.
     expected checksums. Calls check_errors above to raise an exception upon any
     mismatches.
     */
 
-    read_inputs()
-    ch_samplesheet_info = read_inputs.out[0]
+    take:
+        ch_samplesheet_info
     
-    ch_fastqs = ch_samplesheet_info
-    .map(
-        { indv, r1, r2 ->
-            [r1, r2]
-        }
-    ).flatten()
-    .map({file(it)})
-    
-    ch_md5_files = ch_fastqs.map(
-        { fastq -> fastq.getParent().getParent() + '/md5sums.txt' }
-    )
-    
-    ch_obs = observed_md5(ch_fastqs)
-    ch_exp = expected_md5(ch_fastqs, ch_md5_files)
+    main:
+        ch_fastqs = ch_samplesheet_info
+        .map(
+            { indv, r1, r2 ->
+                [r1, r2]
+            }
+        ).flatten()
+        .map({file(it)})
+        
+        ch_md5_files = ch_fastqs.map(
+            { fastq -> fastq.getParent().getParent() + '/md5sums.txt' }
+        )
+        
+        ch_obs = observed_md5(ch_fastqs)
+        ch_exp = expected_md5(ch_fastqs, ch_md5_files)
 
-    ch_mismatches = ch_obs.concat(ch_exp)
-        .map({fastq, md5 -> [fastq, md5.strip()]})
-        .groupTuple()
-        .filter({it[1][0] != it[1][1]})
-        .collect({"${it[0]}: ${it[1][0]} != ${it[1][1]}"})
-    
-    ch_mismatches.view({"\nChecksum mismatches:\n${it.join('\n')}"})
-    raise_errors(ch_mismatches)
+        ch_mismatches = ch_obs.concat(ch_exp)
+            .map({fastq, md5 -> [fastq, md5.strip()]})
+            .groupTuple()
+            .filter({it[1][0] != it[1][1]})
+            .collect({"${it[0]}: ${it[1][0]} != ${it[1][1]}"})
+        
+        ch_mismatches.view({"\nChecksum mismatches:\n${it.join('\n')}"})
+        raise_errors(ch_mismatches)
 }
diff --git a/tests/run_tests.sh b/tests/run_tests.sh
index 59a53a4..bc83de5 100755
--- a/tests/run_tests.sh
+++ b/tests/run_tests.sh
@@ -1,37 +1,32 @@
 #!/bin/bash
 
-function run_nextflow {
-    # Run NextFlow in the background, wait for termination and return an exit status based on
-    # error occurrence. We have to do this because NextFlow doesn't like CI environments with
-    # detached stdin/stdout.
-
-    nextflow run -bg $*
-    nextflow_pid=$(cat .nextflow.pid)
-
-    while kill -0 $nextflow_pid 2> /dev/null
-    do
-        sleep 1
-    done
-    rm .nextflow.pid
-
-    # janky, but couldn't find any other way of getting the exit status
-    return "$(grep -c ERROR .nextflow.log)"
-}
+export PATH=$PATH:$PWD/scripts
+source scripts/test_config.sh
 
 test_exit_status=0
 
 nextflow clean -f
-for d in ./outputs/*
-do
-    rm -r $d
-done
+rm -rf ./outputs/* ./work/*
 
 echo "Test case 1: simple trio"
-run_nextflow ../pipeline/validation.nf --ped_file assets/input_data/ped_files/batch_1.ped  --sample_sheet assets/input_data/sample_sheets/batch_1.tsv
+run_nextflow ../pipeline/main.nf --ped_file assets/input_data/ped_files/batch_1.ped  --sample_sheet assets/input_data/sample_sheets/batch_1.tsv
 test_exit_status=$(( $test_exit_status + $? ))
+for f in "
+    outputs/individuals/000001/merged_fastqs/000001_merged_r1.fastq.gz
+    outputs/individuals/000001/merged_fastqs/000001_merged_r2.fastq.gz
+    outputs/individuals/000002/merged_fastqs/000002_merged_r1.fastq.gz
+    outputs/individuals/000002/merged_fastqs/000002_merged_r2.fastq.gz
+    outputs/individuals/000003/merged_fastqs/000003_merged_r1.fastq.gz
+    outputs/individuals/000003/merged_fastqs/000003_merged_r2.fastq.gz
+    outputs/families/000001/000001.csv
+"
+do
+    ls $f > /dev/null
+    test_exit_status=$(( $test_exit_status + $? ))
+done
 
 echo "Test case 2: MD5 errors"
-run_nextflow ../pipeline/validation.nf --ped_file assets/input_data/ped_files/batch_2_md5_errors.ped  --sample_sheet assets/input_data/sample_sheets/batch_2_md5_errors.tsv
+run_nextflow ../pipeline/main.nf --ped_file assets/input_data/ped_files/batch_2_md5_errors.ped  --sample_sheet assets/input_data/sample_sheets/batch_2_md5_errors.tsv
 if [ $? == 0 ]
 then
     test_exit_status=$(( $test_exit_status + 1 ))
diff --git a/tests/scripts/bcbio_nextgen.py b/tests/scripts/bcbio_nextgen.py
new file mode 100755
index 0000000..c8da36a
--- /dev/null
+++ b/tests/scripts/bcbio_nextgen.py
@@ -0,0 +1,4 @@
+#!/usr/bin/env python
+"""
+Fake BCBio for testing with the empty datasets from tests/assets/
+"""
diff --git a/tests/scripts/test_config.sh b/tests/scripts/test_config.sh
new file mode 100644
index 0000000..5ddda73
--- /dev/null
+++ b/tests/scripts/test_config.sh
@@ -0,0 +1,18 @@
+
+function run_nextflow {
+    # Run NextFlow in the background, wait for termination and return an exit status based on
+    # error occurrence. We have to do this because NextFlow doesn't like CI environments with
+    # detached stdin/stdout.
+
+    nextflow run -bg $*
+    nextflow_pid=$(cat .nextflow.pid)
+
+    while kill -0 $nextflow_pid 2> /dev/null
+    do
+        sleep 1
+    done
+    rm .nextflow.pid
+
+    # janky, but couldn't find any other way of getting the exit status
+    return "$(grep -c ERROR .nextflow.log)"
+}
-- 
GitLab