Skip to content
Snippets Groups Projects
var_calling.nf 13.5 KiB
Newer Older
nextflow.enable.dsl = 2

include {read_inputs} from './inputs.nf'
include {validation} from './validation.nf'


process merge_fastqs {
    tuple(val(indv_id), path(r1), path(r2))

    output:
    tuple(
        val(indv_id),
        path("${indv_id}_merged_r1.fastq.gz"),
        path("${indv_id}_merged_r2.fastq.gz")
    )

    script:
    // todo: pigz if gzip becomes a bottleneck
    """
    zcat ${r1.join(' ')} | gzip -c > ${indv_id}_merged_r1.fastq.gz &
    zcat ${r2.join(' ')} | gzip -c > ${indv_id}_merged_r2.fastq.gz
    """
}

process write_bcbio_csv {
    input:
    tuple(val(family_id), val(individual_info))
    tuple(val(family_id), path("${family_id}.csv"))
    #!/usr/bin/env python3
    target_bed = os.path.realpath('${target_bed}')
    individual_info = '$individual_info'
    lines = individual_info.lstrip('[').rstrip(']').split('], [')
    with open('${family_id}.csv', 'w') as f:
        f.write('samplename,description,batch,sex,phenotype,variant_regions\\n')
        for l in lines:
            f.write(l.replace(', ', ',') + ',' + target_bed + '\\n')
process bcbio_family_processing {
    tuple(val(family_id), val(individuals), path(family_csv))

    output:
    tuple(val(family_id), val(individuals), path("${family_id}-merged"))
    ${bcbio}/anaconda/bin/bcbio_prepare_samples.py --out . --csv $family_csv &&
    ${bcbio}/anaconda/bin/bcbio_nextgen.py -w template ${bcbio_template} ${family_csv.getBaseName()}-merged.csv ${individuals.collect({"${it}.fastq.gz"}).join(' ')} &&
    ../${bcbio}/anaconda/bin/bcbio_nextgen.py config/${family_id}-merged.yaml -n 16 -t local

    stub:
    """
    output_dir=${family_id}-merged/results
    family_dir="${family_id}-merged/results/\$(date '+%Y-%m-%d')_${family_id}-merged"
    mkdir -p \$family_dir
    mkdir ${family_id}-merged/config
    touch ${family_id}-merged/config/${family_id}-merged{.csv,.yaml,-template.yaml}
    cd \$family_dir
    touch "\$(echo ${family_id} | sed 's/_//g')-gatk-haplotype-annotated.vcf.gz{,.tbi}" bcbio-nextgen{,-commands}.log data_versions.csv
    touch project-summary.yaml metadata.csv programs.txt
    mkdir multiqc
    touch list_files_final.txt multiqc_config.yaml multiqc_report.html
    mkdir multiqc_data report
    cd ..

    for i in ${individuals.collect().join(' ')}
    do
        mkdir -p \$i/qc
        cd \$i
        touch \$i-{callable.bed,ready.bam,ready.bam.bai}
        cd qc
        mkdir contamination coverage fastqc peddy samtools variants
        touch contamination/\$i-verifybamid.{out,selfSM}
        touch coverage/cleaned-Twist_Exome_RefSeq_targets_hg38.plus15bp-merged-padded.bed
        touch fastqc/{\$i.zip,fastqc_data.txt,fastqc_report.html}
        touch peddy/{\$i.ped_check.csv,\$i.peddy.ped,\$i.sex_check.csv}
        touch samtools/{\$i-idxstats.txt,\$i.txt}
        touch variants/\${i}_bcftools_stats.txt
        cd ../..
    done
    """
}


process format_bcbio_individual_outputs {
    input:
    tuple(val(family_id), val(individuals), path(bcbio_output_dir))

    output:
    tuple(val(family_id), path('individual_outputs'))

    script:
    """
    samtools=${bcbio}/anaconda/bin/samtools &&
    mkdir individual_outputs
    for i in ${individuals.join(' ')}
    do
        indv_input=\$PWD/${bcbio_output_dir}/results/\$i
        indv_output=individual_outputs/\$i &&
        mkdir -p \$indv_output &&

        ln -s \$indv_input/\${i}-callable.bed \$indv_output/\${i}-callable.bed &&
        ln -s \$indv_input/qc \$indv_output/qc &&
        # todo: make cram compression its own process
        bam=\$indv_input/\$i-ready.bam
        cram="\$indv_output/\$i-ready.cram" &&
        \$samtools view -@ ${task.cpus} -T ${reference_genome} -C -o \$cram \$bam &&
        \$samtools index \$cram &&
        bam_flagstat=./\$i-ready.bam.flagstat.txt &&
        cram_flagstat=\$cram.flagstat.txt &&
        \$samtools flagstat \$bam > \$bam_flagstat &&
        \$samtools flagstat \$cram > \$cram_flagstat &&
        diff \$bam_flagstat \$cram_flagstat

        if [ \$? -ne 0 ]
        then
            echo "Unexpected flagstat results in \$cram_flagstat"
            exit 1
        fi
    done

    stub:
    """
    mkdir individual_outputs
    for i in ${individuals.join(' ')}
    do
        indv_input=\$PWD/${bcbio_output_dir}/results/\$i
        indv_output=individual_outputs/\$i &&
        mkdir -p \$indv_output &&

        ln -s \$indv_input/\${i}-callable.bed \$indv_output/\${i}-callable.bed &&
        ln -s \$indv_input/qc \$indv_output/qc &&

        bam=\$indv_input/\$i-ready.bam
        cram="\$indv_output/\$i-ready.cram" &&
        cp \$bam \$cram &&
        touch \$cram.crai &&
        bam_flagstat=./\$i-ready.bam.flagstat.txt &&
        cram_flagstat=\$cram.flagstat.txt &&
        touch \$bam_flagstat &&
        touch \$cram_flagstat
    done
    """
process format_bcbio_family_outputs {
    label 'small'

    input:
    tuple(val(family_id), val(individuals), path(bcbio_output_dir), path(formatted_individual_outputs))

    output:
mwham's avatar
mwham committed
    // we don't use bcbio_output_dir here, but we need it later
    tuple(val(family_id), path("*_${family_id}"), path(bcbio_output_dir))
    merged_bcbio_family_output="\$(ls -d \$(readlink -f $bcbio_output_dir)/results/????-??-??_* | head -n 1)" &&
    bcbio_family_output="\$(echo \$merged_bcbio_family_output | sed 's/-merged\\/*\$//g')" &&
    run_date="\$(basename \$bcbio_family_output | sed -E 's/^([0-9]{4}-[0-9]{2}-[0-9]{2})_.+\$/\\1/g')" &&
    broken_family_id="\$(echo ${family_id} | sed 's/_//g')" &&

    bcbio_family_output="\$(basename \$merged_bcbio_family_output | sed 's/-merged\\/*\$//g')" &&
    mkdir \$bcbio_family_output &&

    for suffix in gatk-haplotype-annotated.vcf.gz{,.tbi}
    do
        src=\$merged_bcbio_family_output/\${broken_family_id}-\${suffix}
        if [ -f \$src ]
        then
            ln -s \$src \$bcbio_family_output/${family_id}-\${suffix}
    for f in \$merged_bcbio_family_output/*{.log,.csv,.yaml,multiqc}
        ln -s \$f \$bcbio_family_output/
    for i in ${individuals.join(' ')}
        ln -s \$(readlink -f $formatted_individual_outputs)/\$i \$bcbio_family_output/\$i
    for f in \$(find . -type f | grep -v '\\.bam')
    do
        md5sum \$f >> md5sum.txt
    done
}


process collate_pipeline_outputs {
    label 'small'

    publishDir "${params.output_dir}", mode: 'move', pattern: "${params.pipeline_project_id}_${params.pipeline_project_version}"
    val(bcbio_family_output_dirs)
mwham's avatar
mwham committed
    val(raw_bcbio_output_dirs)
    path(ped_file)
    path(samplesheet)
    path(bcbio)
    output:
    path("${params.pipeline_project_id}_${params.pipeline_project_version}")

    outputs=${params.pipeline_project_id}_${params.pipeline_project_version}
    mkdir \$outputs &&
    mkdir \$outputs/{config,families,params,prioritization,qc} &&
    for d in ${bcbio_family_output_dirs.join(' ')}
    do
        cp -rL \$d \$outputs/families/\$(basename \$d)
    for f in ${family_ids.join(' ')}
    do
        grep \$f ${ped_file} > \$outputs/params/\$f.ped
    done &&


    # todo: make multiqc its own process
    ../../${bcbio}/anaconda/bin/multiqc \
        --title "Trio whole exome QC report: ${params.pipeline_project_id}_${params.pipeline_project_version}" \
        --outdir ../qc \
        --filename ${params.pipeline_project_id}_${params.pipeline_project_version}_qc_report.html \
        . &&
    peddy_validation_output=../qc/${params.pipeline_project_id}_${params.pipeline_project_version}.ped_check.txt &&
    peddy_validation.pl \
user name's avatar
user name committed
        --output \$peddy_validation_output \
        --project ${params.pipeline_project_id} \
        --version ${params.pipeline_project_version} \

    # no && here - exit status checked below
user name's avatar
user name committed
#    grep -v 'False\$' \$peddy_validation_output
#    if [ \$? -ne 0 ]
#    then
#        echo "Found Peddy mismatches in \$peddy_validation_output"
#        exit 1
#    fi &&
mwham's avatar
mwham committed

    cd ../.. &&

    for d in ${raw_bcbio_output_dirs.join(' ')}
    do
        family_id_merged=\$(basename \$d)
        family_id=\$(echo \$family_id_merged | sed 's/-merged//') &&
        dest_basename=${params.pipeline_project_id}_${params.pipeline_project_version}_\$family_id &&
        cp -L \$d/config/\$family_id_merged.csv \$outputs/params/\$dest_basename.csv &&
        cp -L \$d/config/\$family_id_merged.yaml \$outputs/config/\$dest_basename.yaml &&
        cp -L ${ped_file} \$outputs/params/ &&
        cp -L ${samplesheet} \$outputs/params/
mwham's avatar
mwham committed
    done

    stub:
    """
    outputs=${params.pipeline_project_id}_${params.pipeline_project_version}
    mkdir \$outputs &&
    mkdir \$outputs/{config,families,params,prioritization,qc} &&

    for d in ${bcbio_family_output_dirs.join(' ')}
    do
        cp -rL \$d \$outputs/families/\$(basename \$d)
    done &&

    for f in ${family_ids.join(' ')}
    do
        grep \$f ${ped_file} > \$outputs/params/\$f.ped
    done &&

    cd \$outputs/families &&

    # todo: make multiqc its own process
    echo "Trio whole exome QC report: ${params.pipeline_project_id}_${params.pipeline_project_version}" > ../qc/${params.pipeline_project_id}_${params.pipeline_project_version}_qc_report.html &&

    peddy_validation_output=../qc/${params.pipeline_project_id}_${params.pipeline_project_version}.ped_check.txt &&
    peddy_validation.pl \
        --output \$peddy_validation_output \
        --project ${params.pipeline_project_id} \
        --version ${params.pipeline_project_version} \
        --ped ../../${ped_file} \
        --families . &&

    cd ../.. &&

    for d in ${raw_bcbio_output_dirs.join(' ')}
    do
        family_id_merged=\$(basename \$d)
        family_id=\$(echo \$family_id_merged | sed 's/-merged//') &&
        dest_basename=${params.pipeline_project_id}_${params.pipeline_project_version}_\$family_id &&
        cp -L \$d/config/\$family_id_merged.csv \$outputs/params/\$dest_basename.csv &&
        cp -L \$d/config/\$family_id_merged.yaml \$outputs/config/\$dest_basename.yaml &&
        cp -L ${ped_file} \$outputs/params/ &&
        cp -L ${samplesheet} \$outputs/params/
    done
    """
    - For each individual in the family, merge all R1 and R2 fastqs
    - For the family:
      - Read the relevant information from the samplesheet, ped files and merged fastqs
      - Write a BCBio config file
      - Run BCBio on it
      - Move files around into the right places
      - Run CRAM compression, MultiQC and MD5 checks
        ch_individuals
        ch_bcbio = file(params.bcbio, checkIfExists: true)
        ch_bcbio_template = file(params.bcbio_template, checkIfExists: true)
        ch_target_bed = file(params.target_bed, checkIfExists: true)
        ch_reference_genome = file(params.reference_genome, checkIfExists: true)

        ch_merged_fastqs = merge_fastqs(
            ch_individuals.map(
                { indv, family, father, mother, sex, affected, r1, r2 ->
                    [ indv, r1, r2 ]
            })
        )
        ch_joined_indv_info = ch_individuals.join(ch_merged_fastqs)
        ch_joined_indv_info.map(
            { sample_id, family_id, father, mother, sex, phenotype, r1s, r2s, merged_r1, merged_r2 ->
                [family_id, sample_id, father, mother, sex, phenotype, merged_r1]
        })
        .tap {ch_read1_meta}  // I hate this syntax so much
        ch_joined_indv_info.map(
            { sample_id, family_id, father, mother, sex, phenotype, r1s, r2s, merged_r1, merged_r2 ->
                [family_id, sample_id, father, mother, sex, phenotype, merged_r2]
        })
        .tap {ch_read2_meta}

        ch_bcbio_csvs = write_bcbio_csv(
            ch_read1_meta.mix(ch_read2_meta).map(
                { family_id, sample_id, father, mother, sex, phenotype, merged_fastq ->
                    [family_id, [merged_fastq, sample_id, family_id, sex, phenotype]]
        ch_bcbio_inputs = ch_joined_indv_info.map(
            { sample_id, family_id, father, mother, sex, phenotype, r1s, r2s, merged_r1, merged_r2 ->
                [family_id, sample_id]
        }).groupTuple().join(ch_bcbio_csvs)

        ch_bcbio_family_outputs = bcbio_family_processing(
            ch_bcbio_inputs,
            ch_bcbio,
            ch_bcbio_template
        )

        ch_individual_folders = format_bcbio_individual_outputs(
            ch_bcbio_family_outputs,
            ch_bcbio,
            ch_reference_genome
        ch_formatted_bcbio_outputs = format_bcbio_family_outputs(
            ch_bcbio_family_outputs.join(ch_individual_folders)
mwham's avatar
mwham committed
        collate_pipeline_outputs(
            ch_formatted_bcbio_outputs.map({it[0]}).collect(),
mwham's avatar
mwham committed
            ch_formatted_bcbio_outputs.map({it[1]}).collect(),
            ch_formatted_bcbio_outputs.map({it[2]}).collect(),
            ch_ped_file,
            ch_samplesheet,
user name's avatar
user name committed
            ch_bcbio
    validation(read_inputs.out.ch_individuals)
    process_families(
        read_inputs.out.ch_individuals,
        read_inputs.out.ch_ped_file,
        read_inputs.out.ch_samplesheet
    )