From 8f754358e536de8c2565814ea42dd2f931d06deb Mon Sep 17 00:00:00 2001
From: Murray Wham <murray.wham@ed.ac.uk>
Date: Fri, 1 Apr 2022 13:09:35 +0100
Subject: [PATCH] Moving BCBio outputs into the right places

---
 pipeline/inputs.nf     |  32 +++++---
 pipeline/main.nf       | 173 +++++++++++++++++++++++++++++++++--------
 pipeline/validation.nf |  19 ++---
 3 files changed, 172 insertions(+), 52 deletions(-)

diff --git a/pipeline/inputs.nf b/pipeline/inputs.nf
index e5f6b31..d9e1a63 100644
--- a/pipeline/inputs.nf
+++ b/pipeline/inputs.nf
@@ -64,8 +64,25 @@ workflow read_inputs {
             .groupTuple()
 
         /*
-        ch_individuals_by_family - merge of samplesheet and ped file information,
-        grouped by family ID:
+        ch_individuals - merge of samplesheet and ped file information:
+        [
+            [
+                indv_id,
+                family_id,
+                father_id,
+                mother_id,
+                sex,
+                affected,
+                [r1_fastqs],
+                [r2_fastqs]
+            ],  
+        ...
+        ]
+        */
+        ch_individuals = ch_ped_file_info.join(ch_samplesheet_info)
+
+        /*
+        ch_individuals_by_family - as ch_individuals, but grouped by family:
         [
             [
                 indv1,
@@ -76,19 +93,16 @@ workflow read_inputs {
                     mother_id,
                     sex,
                     affected,
-                    r1_fastqs,
-                    r2_fastqs
+                    [r1_fastqs],
+                    [r2_fastqs]
                 ]
             ],
             ...
         ]
         */
-        ch_individuals_by_family = ch_ped_file_info
-            .join(ch_samplesheet_info)
-            .map({[it[1], it]})
+        ch_individuals_by_family = ch_individuals.map({[it[1], it]})
     
     emit:
-        ch_samplesheet_info
-        ch_ped_file_info
+        ch_individuals
         ch_individuals_by_family
 }
diff --git a/pipeline/main.nf b/pipeline/main.nf
index cae26d8..c7df05e 100644
--- a/pipeline/main.nf
+++ b/pipeline/main.nf
@@ -1,20 +1,20 @@
 nextflow.enable.dsl = 2
 
-include {read_inputs} from './inputs.nf'
-include {validation} from './validation.nf'
+include {read_inputs} from './pipeline/inputs.nf'
+include {validation} from './pipeline/validation.nf'
 
 params.bcbio = null
 params.bcbio_template = null
 params.output_dir = null
+params.pipeline_project_id = null
+params.pipeline_project_version = null
 params.target_bed = null
 
 process merge_fastqs {
     label 'medium'
 
-    publishDir "${params.output_dir}/individuals/$indv_id/merged_fastqs", mode: 'copy'
-
     input:
-    tuple(val(indv_id), path(r1), path(r2))
+    tuple(val(indv_id), val(family_id), val(father), val(mother), val(sex), val(affected), path(r1), path(r2))
 
     output:
     tuple(
@@ -34,13 +34,11 @@ process merge_fastqs {
 process write_bcbio_csv {
     label 'small'
 
-    publishDir "${params.output_dir}/families/$family_id", mode: 'copy'
-
     input:
     tuple(val(family_id), val(individual_info))
 
     output:
-    path("${family_id}.csv")
+    tuple(val(family_id), path("${family_id}.csv"))
 
     script:
     """
@@ -60,26 +58,135 @@ process write_bcbio_csv {
 process bcbio {
     label 'large'
 
-    publishDir "${params.output_dir}/families/$family_id", mode: 'copy'
-
     input:
-    tuple(val(family_id), val(individuals))
-    path(family_csv)
+    tuple(val(family_id), val(individuals), path(family_csv))
+
+    output:
+    tuple(val(family_id), val(individuals), path("${family_id}-merged"))
 
     script:
     """
     ${params.bcbio}/anaconda/bin/bcbio_prepare_samples.py --out . --csv $family_csv &&
-
     ${params.bcbio}/anaconda/bin/bcbio_nextgen.py -w template ${params.bcbio_template} ${family_csv.getBaseName()}-merged.csv ${individuals.collect({"${it}.fastq.gz"}).join(' ')} &&
 
     cd ${family_id}-merged &&
-    export PATH=$PATH:${params.bcbio}/tools/bin &&
     ${params.bcbio}/anaconda/bin/bcbio_nextgen.py config/${family_id}-merged.yaml -n 16 -t local
+
+    """
+}
+
+
+process bcbio_family_outputs {
+    label 'small'
+
+    input:
+    tuple(val(family_id), val(individuals), path(bcbio_output_dir))
+
+    script:
+    """
+    merged_bcbio_family_output="\$(ls -d $bcbio_output_dir/results/????-??-??_* | head -n 1)" &&
+    bcbio_family_output="\$(echo \$merged_bcbio_family_output | sed 's/-merged\\/*\$//g')" &&
+    run_date="\$(basename \$bcbio_family_output | sed -E 's/^([0-9]{4}-[0-9]{2}-[0-9]{2})_.+\$/\\1/g')" &&
+    broken_family_id="\$(echo ${family_id} | sed 's/_//g')" &&
+
+    if [ \$merged_bcbio_family_output != \$bcbio_family_output ] && [ -d \$merged_bcbio_family_output ]
+    then
+        mv \$merged_bcbio_family_output \$bcbio_family_output
+    fi
+
+    for suffix in gatk-haplotype-annotated.vcf.gz{,.tbi}
+    do
+        src=\$bcbio_family_output/\${broken_family_id}-\${suffix}
+        if [ -f \$src ]
+        then
+            mv \$src \$bcbio_family_output/${family_id}-\${suffix}
+        fi
+    done &&
+
+    for sample in ${individuals.join(' ')}
+    do
+        if [ -d "$bcbio_output_dir/results/\$sample" ]
+        then
+            mv "$bcbio_output_dir/results/\$sample" "\$bcbio_family_output"
+        fi
+    done &&
+
+    cd \$bcbio_family_output &&
+
+    samtools=${params.bcbio}/anaconda/bin/samtools &&
+    for bam in */*.bam
+    do
+        cram=\${bam%.bam}.cram &&
+        \$samtools view -@ ${task.cpus} -T ${params.reference_genome} -C -o \$cram \$bam &&
+        rm \$bam &&
+        \$samtools index \$cram &&
+
+        \$samtools flagstat \$bam > \$bam.flagstat.txt &&
+        \$samtools flagstat \$cram > \$cram.flagstat.txt &&
+        diff \$bam.flagstat.txt \$cram.flagstat.txt
+
+        if [ \$? -ne 0 ]
+        then
+            echo "Unexpected flagstat results in \$cram.flagstat.txt"
+            exit 1
+        fi
+    done &&
+
+    for f in \$(find . -type f | grep -v '\\.bam')
+    do
+        md5sum \$f >> md5sum.txt
+    done
     """
+}
+
+
+process collate_pipeline_outputs {
+    label 'small'
+
+    publishDir "${params.output_dir}/${params.pipeline_project_id}/families/$family_id", mode: 'copy', pattern: "${bcbio_output_dir}"
 
+    input:
+    val(family_ids)
+    val(bcbio_family_output_dirs)
+
+    script:
+    """
+    mkdir -p outputs/{config,families,params,prioritization,qc} &&
+    
+    for d in ${bcbio_family_output_dirs.join(' ')}
+    do
+        ln -s \$d outputs/families/\$(basename \$d)
+    done &&
+
+    cd outputs/families &&
+    multiqc \
+        --title "Trio whole exome QC report: ${params.pipeline_project_id}_${params.pipeline_project_version}" \
+        --outdir ../qc \
+        --filename ${params.pipeline_project_id}_${params.pipeline_project_version}_qc_report.html \
+        . &&
+    
+    peddy_output=../qc/${params.pipeline_project_id}_${params.pipeline_project_version}.ped_check.txt
+    # trio_whole_exome_parse_peddy_ped_csv.pl \
+    #     --output \$peddy_output \
+    #     --project ${params.pipeline_project_id} \
+    #     --batch ${family_ids[0].split('_')[0]} \
+    #     --version ${params.pipeline_project_version} \
+    #     --ped ${params.ped_file} \
+    #     --families . &&
+
+    # no && here - exit status checked below
+    # grep -v 'False\$' \$peddy_output
+    # if [ \$? -ne 0 ]
+    # then
+    #     echo "Found Peddy mismatches in \$peddy_output"
+    #     exit 1
+    # fi
+
+    """
 }
 
-workflow prepare_bcbio_config {
+
+workflow run_bcbio {
     /*
     - For each individual, merge all R1 and R2 fastqs
     - For each family:
@@ -89,21 +196,19 @@ workflow prepare_bcbio_config {
     */
 
     take:
-        ch_samplesheet_info
-        ch_individuals_by_family
+        ch_individuals
     
     main:
-        ch_merged_fastqs = merge_fastqs(ch_samplesheet_info)
-        ch_joined_family_info = ch_individuals_by_family.map({ k, v -> v })
-        .join(ch_merged_fastqs)
+        ch_merged_fastqs = merge_fastqs(ch_individuals)
+        ch_joined_indv_info = ch_individuals.join(ch_merged_fastqs)
 
-        ch_joined_family_info.map(
+        ch_joined_indv_info.map(
             { sample_id, family_id, father, mother, sex, phenotype, r1s, r2s, merged_r1, merged_r2 ->
                 [family_id, sample_id, father, mother, sex, phenotype, merged_r1]
         })
         .tap {ch_read1_meta}  // I hate this syntax so much
 
-        ch_joined_family_info.map(
+        ch_joined_indv_info.map(
             { sample_id, family_id, father, mother, sex, phenotype, r1s, r2s, merged_r1, merged_r2 ->
                 [family_id, sample_id, father, mother, sex, phenotype, merged_r2]
         })
@@ -114,22 +219,28 @@ workflow prepare_bcbio_config {
                 [family_id, [merged_fastq, sample_id, family_id, sex, phenotype, params.target_bed]]
             }
         ).groupTuple()
-        ch_bcbio_csv = write_bcbio_csv(ch_metadata)
+        ch_bcbio_csvs = write_bcbio_csv(ch_metadata)
 
-        ch_individuals = ch_joined_family_info.map(
+        ch_bcbio_inputs = ch_joined_indv_info.map(
             { sample_id, family_id, father, mother, sex, phenotype, r1s, r2s, merged_r1, merged_r2 ->
                 [family_id, sample_id]
-        }).groupTuple()
-        bcbio(ch_individuals, ch_bcbio_csv)
+        }).groupTuple().join(ch_bcbio_csvs)
+
+        ch_bcbio_outputs = bcbio(ch_bcbio_inputs)
+        bcbio_family_outputs(ch_bcbio_outputs)
+
+        collate_pipeline_outputs(
+            ch_bcbio_outputs.map({it[0]}).collect(),
+            ch_bcbio_outputs.map({it[2]}).collect()
+        )
 }
 
 
 workflow {
     read_inputs()
-    ch_samplesheet_info = read_inputs.out[0]
-    ch_ped_file_info = read_inputs.out[1]
-    ch_individuals_by_family = read_inputs.out[2]
+    ch_individual_info = read_inputs.out[0]
 
-    validation(ch_samplesheet_info)
-    prepare_bcbio_config(ch_samplesheet_info, ch_individuals_by_family)
+    validation(ch_individual_info)
+    run_bcbio(ch_individual_info)
 }
+
diff --git a/pipeline/validation.nf b/pipeline/validation.nf
index c02547a..18058f8 100644
--- a/pipeline/validation.nf
+++ b/pipeline/validation.nf
@@ -1,7 +1,5 @@
 nextflow.enable.dsl = 2
 
-include {read_inputs} from './inputs.nf'
-
 process check_md5s {
     label 'small'
 
@@ -51,20 +49,17 @@ workflow validation {
     */
 
     take:
-        ch_samplesheet_info
-    
+        ch_indv_info
+
     main:
-        ch_fastqs = ch_samplesheet_info
-        .map(
-            { indv, r1, r2 ->
+        ch_fastqs = ch_indv_info.map(
+            { indv, family, father, mother, sex, affected, r1, r2 ->
                 [r1, r2]
             }
-        ).flatten()
+        )
+        .flatten()
         .map({file(it)})
-        
-        ch_md5_files = ch_fastqs.map(
-            { fastq -> fastq.getParent().getParent() + '/md5sums.txt' }
-        ).unique()
 
         check_md5s(ch_fastqs)
 }
+
-- 
GitLab