Skip to content
Snippets Groups Projects
main.nf 7.07 KiB
Newer Older
nextflow.enable.dsl = 2

include {read_inputs} from './pipeline/inputs.nf'
include {validation} from './pipeline/validation.nf'
params.pipeline_project_id = null
params.pipeline_project_version = null

process merge_fastqs {
    tuple(val(indv_id), val(family_id), val(father), val(mother), val(sex), val(affected), path(r1), path(r2))

    output:
    tuple(
        val(indv_id),
        path("${indv_id}_merged_r1.fastq.gz"),
        path("${indv_id}_merged_r2.fastq.gz")
    )

    script:
    // todo: pigz if gzip becomes a bottleneck
    """
    zcat ${r1.join(' ')} | gzip -c > ${indv_id}_merged_r1.fastq.gz &
    zcat ${r2.join(' ')} | gzip -c > ${indv_id}_merged_r2.fastq.gz
    """
}

process write_bcbio_csv {
    input:
    tuple(val(family_id), val(individual_info))

    output:
    tuple(val(family_id), path("${family_id}.csv"))

    script:
    """
    #!/usr/bin/env python

    individual_info = '$individual_info'
    lines = individual_info.lstrip('[').rstrip(']').split('], [')
    
    with open('${family_id}.csv', 'w') as f:
        f.write('samplename,description,batch,sex,phenotype,variant_regions\\n')
        for l in lines:
            f.write(l.replace(', ', ',') + '\\n')
    tuple(val(family_id), val(individuals), path(family_csv))

    output:
    tuple(val(family_id), val(individuals), path("${family_id}-merged"))
    ${params.bcbio}/anaconda/bin/bcbio_prepare_samples.py --out . --csv $family_csv &&
    ${params.bcbio}/anaconda/bin/bcbio_nextgen.py -w template ${params.bcbio_template} ${family_csv.getBaseName()}-merged.csv ${individuals.collect({"${it}.fastq.gz"}).join(' ')} &&
    cd ${family_id}-merged &&
    ${params.bcbio}/anaconda/bin/bcbio_nextgen.py config/${family_id}-merged.yaml -n 16 -t local

    """
}


process bcbio_family_outputs {
    label 'small'

    input:
    tuple(val(family_id), val(individuals), path(bcbio_output_dir))

    script:
    """
    merged_bcbio_family_output="\$(ls -d $bcbio_output_dir/results/????-??-??_* | head -n 1)" &&
    bcbio_family_output="\$(echo \$merged_bcbio_family_output | sed 's/-merged\\/*\$//g')" &&
    run_date="\$(basename \$bcbio_family_output | sed -E 's/^([0-9]{4}-[0-9]{2}-[0-9]{2})_.+\$/\\1/g')" &&
    broken_family_id="\$(echo ${family_id} | sed 's/_//g')" &&

    if [ \$merged_bcbio_family_output != \$bcbio_family_output ] && [ -d \$merged_bcbio_family_output ]
    then
        mv \$merged_bcbio_family_output \$bcbio_family_output
    fi

    for suffix in gatk-haplotype-annotated.vcf.gz{,.tbi}
    do
        src=\$bcbio_family_output/\${broken_family_id}-\${suffix}
        if [ -f \$src ]
        then
            mv \$src \$bcbio_family_output/${family_id}-\${suffix}
        fi
    done &&

    for sample in ${individuals.join(' ')}
    do
        if [ -d "$bcbio_output_dir/results/\$sample" ]
        then
            mv "$bcbio_output_dir/results/\$sample" "\$bcbio_family_output"
        fi
    done &&

    cd \$bcbio_family_output &&

    samtools=${params.bcbio}/anaconda/bin/samtools &&
    for bam in */*.bam
    do
        cram=\${bam%.bam}.cram &&
        \$samtools view -@ ${task.cpus} -T ${params.reference_genome} -C -o \$cram \$bam &&
        rm \$bam &&
        \$samtools index \$cram &&

        \$samtools flagstat \$bam > \$bam.flagstat.txt &&
        \$samtools flagstat \$cram > \$cram.flagstat.txt &&
        diff \$bam.flagstat.txt \$cram.flagstat.txt

        if [ \$? -ne 0 ]
        then
            echo "Unexpected flagstat results in \$cram.flagstat.txt"
            exit 1
        fi
    done &&

    for f in \$(find . -type f | grep -v '\\.bam')
    do
        md5sum \$f >> md5sum.txt
    done
}


process collate_pipeline_outputs {
    label 'small'

    publishDir "${params.output_dir}/${params.pipeline_project_id}/families/$family_id", mode: 'copy', pattern: "${bcbio_output_dir}"
    input:
    val(family_ids)
    val(bcbio_family_output_dirs)

    script:
    """
    mkdir -p outputs/{config,families,params,prioritization,qc} &&
    
    for d in ${bcbio_family_output_dirs.join(' ')}
    do
        ln -s \$d outputs/families/\$(basename \$d)
    done &&

    cd outputs/families &&
    multiqc \
        --title "Trio whole exome QC report: ${params.pipeline_project_id}_${params.pipeline_project_version}" \
        --outdir ../qc \
        --filename ${params.pipeline_project_id}_${params.pipeline_project_version}_qc_report.html \
        . &&
    
    peddy_output=../qc/${params.pipeline_project_id}_${params.pipeline_project_version}.ped_check.txt
    # trio_whole_exome_parse_peddy_ped_csv.pl \
    #     --output \$peddy_output \
    #     --project ${params.pipeline_project_id} \
    #     --batch ${family_ids[0].split('_')[0]} \
    #     --version ${params.pipeline_project_version} \
    #     --ped ${params.ped_file} \
    #     --families . &&

    # no && here - exit status checked below
    # grep -v 'False\$' \$peddy_output
    # if [ \$? -ne 0 ]
    # then
    #     echo "Found Peddy mismatches in \$peddy_output"
    #     exit 1
    # fi

    """

workflow run_bcbio {
    /*
    - For each individual, merge all R1 and R2 fastqs
    - For each family:
      - Read the relevant information from the samplesheet, ped files and merged fastqs
      - Write a BCBio config file
      - Run BCBio on it
    */

    take:
        ch_individuals
        ch_merged_fastqs = merge_fastqs(ch_individuals)
        ch_joined_indv_info = ch_individuals.join(ch_merged_fastqs)
        ch_joined_indv_info.map(
            { sample_id, family_id, father, mother, sex, phenotype, r1s, r2s, merged_r1, merged_r2 ->
                [family_id, sample_id, father, mother, sex, phenotype, merged_r1]
        })
        .tap {ch_read1_meta}  // I hate this syntax so much
        ch_joined_indv_info.map(
            { sample_id, family_id, father, mother, sex, phenotype, r1s, r2s, merged_r1, merged_r2 ->
                [family_id, sample_id, father, mother, sex, phenotype, merged_r2]
        })
        .tap {ch_read2_meta}

        ch_metadata = ch_read1_meta.mix(ch_read2_meta).map(
            { family_id, sample_id, father, mother, sex, phenotype, merged_fastq ->
                [family_id, [merged_fastq, sample_id, family_id, sex, phenotype, params.target_bed]]
        ch_bcbio_csvs = write_bcbio_csv(ch_metadata)
        ch_bcbio_inputs = ch_joined_indv_info.map(
            { sample_id, family_id, father, mother, sex, phenotype, r1s, r2s, merged_r1, merged_r2 ->
                [family_id, sample_id]
        }).groupTuple().join(ch_bcbio_csvs)

        ch_bcbio_outputs = bcbio(ch_bcbio_inputs)
        bcbio_family_outputs(ch_bcbio_outputs)

        collate_pipeline_outputs(
            ch_bcbio_outputs.map({it[0]}).collect(),
            ch_bcbio_outputs.map({it[2]}).collect()
        )
    ch_individual_info = read_inputs.out[0]
    validation(ch_individual_info)
    run_bcbio(ch_individual_info)