nextflow.enable.dsl = 2 include {read_inputs} from './inputs.nf' include {validation} from './validation.nf' params.bcbio = null params.bcbio_template = null params.output_dir = null params.pipeline_project_id = null params.pipeline_project_version = null params.target_bed = null params.parse_peddy_output = null process merge_fastqs { label 'medium' input: tuple(val(indv_id), path(r1), path(r2)) output: tuple( val(indv_id), path("${indv_id}_merged_r1.fastq.gz"), path("${indv_id}_merged_r2.fastq.gz") ) script: // todo: pigz if gzip becomes a bottleneck """ zcat ${r1.join(' ')} | gzip -c > ${indv_id}_merged_r1.fastq.gz & zcat ${r2.join(' ')} | gzip -c > ${indv_id}_merged_r2.fastq.gz """ } process write_bcbio_csv { label 'small' input: tuple(val(family_id), val(individual_info)) output: tuple(val(family_id), path("${family_id}.csv")) script: """ #!/usr/bin/env python individual_info = '$individual_info' lines = individual_info.lstrip('[').rstrip(']').split('], [') with open('${family_id}.csv', 'w') as f: f.write('samplename,description,batch,sex,phenotype,variant_regions\\n') for l in lines: f.write(l.replace(', ', ',') + '\\n') """ } process bcbio_family_processing { label 'large' input: tuple(val(family_id), val(individuals), path(family_csv)) output: tuple(val(family_id), val(individuals), path("${family_id}-merged")) script: """ ${params.bcbio}/anaconda/bin/bcbio_prepare_samples.py --out . --csv $family_csv && ${params.bcbio}/anaconda/bin/bcbio_nextgen.py -w template ${params.bcbio_template} ${family_csv.getBaseName()}-merged.csv ${individuals.collect({"${it}.fastq.gz"}).join(' ')} && cd ${family_id}-merged && ${params.bcbio}/anaconda/bin/bcbio_nextgen.py config/${family_id}-merged.yaml -n 16 -t local """ } process format_bcbio_individual_outputs { input: tuple(val(family_id), val(individuals), path(bcbio_output_dir)) output: tuple(val(family_id), path('individual_outputs')) script: """ samtools=${params.bcbio}/anaconda/bin/samtools && mkdir individual_outputs for i in ${individuals.join(' ')} do indv_input=\$PWD/${bcbio_output_dir}/results/\$i indv_output=individual_outputs/\$i && mkdir -p \$indv_output && ln -s \$indv_input/\${i}-callable.bed \$indv_output/\${i}-callable.bed && ln -s \$indv_input/qc \$indv_output/qc && bam=\$indv_input/\$i-ready.bam cram="\$indv_output/\$i-ready.cram" && \$samtools view -@ ${task.cpus} -T ${params.reference_genome} -C -o \$cram \$bam && \$samtools index \$cram && bam_flagstat=./\$i-ready.bam.flagstat.txt && cram_flagstat=\$cram.flagstat.txt && \$samtools flagstat \$bam > \$bam_flagstat && \$samtools flagstat \$cram > \$cram_flagstat && diff \$bam_flagstat \$cram_flagstat if [ \$? -ne 0 ] then echo "Unexpected flagstat results in \$cram_flagstat" exit 1 fi done """ } process format_bcbio_family_outputs { label 'small' input: tuple(val(family_id), val(individuals), path(bcbio_output_dir), path(formatted_individual_outputs)) output: tuple(val(family_id), path("*_${family_id}")) script: """ merged_bcbio_family_output="\$(ls -d \$(readlink -f $bcbio_output_dir)/results/????-??-??_* | head -n 1)" && bcbio_family_output="\$(echo \$merged_bcbio_family_output | sed 's/-merged\\/*\$//g')" && run_date="\$(basename \$bcbio_family_output | sed -E 's/^([0-9]{4}-[0-9]{2}-[0-9]{2})_.+\$/\\1/g')" && broken_family_id="\$(echo ${family_id} | sed 's/_//g')" && bcbio_family_output="\$(basename \$merged_bcbio_family_output | sed 's/-merged\\/*\$//g')" && mkdir \$bcbio_family_output && for suffix in gatk-haplotype-annotated.vcf.gz{,.tbi} do src=\$merged_bcbio_family_output/\${broken_family_id}-\${suffix} if [ -f \$src ] then ln -s \$src \$bcbio_family_output/${family_id}-\${suffix} fi done && for f in \$merged_bcbio_family_output/*{.log,.csv,.yaml,multiqc} do ln -s \$f \$bcbio_family_output/ done && for i in ${individuals.join(' ')} do ln -s \$(readlink -f $formatted_individual_outputs)/\$i \$bcbio_family_output/\$i done && cd \$bcbio_family_output && for f in \$(find . -type f | grep -v '\\.bam') do md5sum \$f >> md5sum.txt done """ } process collate_pipeline_outputs { label 'small' publishDir "${params.output_dir}", mode: 'move', pattern: "${params.pipeline_project_id}_${params.pipeline_project_version}" input: val(bcbio_family_output_dirs) output: path("${params.pipeline_project_id}_${params.pipeline_project_version}") script: """ outputs=${params.pipeline_project_id}_${params.pipeline_project_version} mkdir \$outputs && mkdir \$outputs/{config,families,params,prioritization,qc} && for d in ${bcbio_family_output_dirs.join(' ')} do cp -rL \$d \$outputs/families/\$(basename \$d) done && cd \$outputs/families && ${params.bcbio}/anaconda/bin/multiqc \ --title "Trio whole exome QC report: ${params.pipeline_project_id}_${params.pipeline_project_version}" \ --outdir ../qc \ --filename ${params.pipeline_project_id}_${params.pipeline_project_version}_qc_report.html \ . && peddy_output=../qc/${params.pipeline_project_id}_${params.pipeline_project_version}.ped_check.txt && perl ${params.parse_peddy_output} \ --output \$peddy_output \ --project ${params.pipeline_project_id} \ --batch ${bcbio_family_output_dirs[0].getName().split('_')[1]} \ --version ${params.pipeline_project_version} \ --ped ${params.ped_file} \ --families . && # no && here - exit status checked below grep -v 'False\$' \$peddy_output if [ \$? -ne 0 ] then echo "Found Peddy mismatches in \$peddy_output" exit 1 fi """ } workflow process_families { /* - For each individual in the family, merge all R1 and R2 fastqs - For the family: - Read the relevant information from the samplesheet, ped files and merged fastqs - Write a BCBio config file - Run BCBio on it - Move files around into the right places - Run CRAM compression, MultiQC and MD5 checks */ take: ch_individuals main: ch_merged_fastqs = merge_fastqs( ch_individuals.map( { indv, family, father, mother, sex, affected, r1, r2 -> [ indv, r1, r2 ] }) ) ch_joined_indv_info = ch_individuals.join(ch_merged_fastqs) ch_joined_indv_info.map( { sample_id, family_id, father, mother, sex, phenotype, r1s, r2s, merged_r1, merged_r2 -> [family_id, sample_id, father, mother, sex, phenotype, merged_r1] }) .tap {ch_read1_meta} // I hate this syntax so much ch_joined_indv_info.map( { sample_id, family_id, father, mother, sex, phenotype, r1s, r2s, merged_r1, merged_r2 -> [family_id, sample_id, father, mother, sex, phenotype, merged_r2] }) .tap {ch_read2_meta} ch_bcbio_csvs = write_bcbio_csv( ch_read1_meta.mix(ch_read2_meta).map( { family_id, sample_id, father, mother, sex, phenotype, merged_fastq -> [family_id, [merged_fastq, sample_id, family_id, sex, phenotype, params.target_bed]] } ).groupTuple() ) ch_bcbio_inputs = ch_joined_indv_info.map( { sample_id, family_id, father, mother, sex, phenotype, r1s, r2s, merged_r1, merged_r2 -> [family_id, sample_id] }).groupTuple().join(ch_bcbio_csvs) ch_bcbio_family_outputs = bcbio_family_processing(ch_bcbio_inputs) ch_individual_folders = format_bcbio_individual_outputs(ch_bcbio_family_outputs) ch_formatted_bcbio_outputs = format_bcbio_family_outputs( ch_bcbio_family_outputs.join(ch_individual_folders) ) collate_pipeline_outputs(ch_formatted_bcbio_outputs.map({it[1]}).collect()) } workflow { read_inputs() ch_individuals = read_inputs.out[0] ch_individuals_by_family = read_inputs.out[1] validation(ch_individuals) process_families(ch_individuals) }