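// Trio whole-exome analysis pipeline: merges per-individual fastqs, runs bcbio
// per family, then formats, checksums and collates the outputs with QC reporting.
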
include {read_inputs} from './inputs.nf'
include {validation} from './validation.nf'
params.bcbio = null
params.bcbio_template = null
params.output_dir = null
params.pipeline_project_id = null
params.pipeline_project_version = null
params.target_bed = null
params.ped_file = null
params.reference_genome = null
params.parse_peddy_output = null
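
// Concatenate all R1 and all R2 fastq.gz files for an individual into single
// merged R1/R2 files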
process merge_fastqs {
label 'medium'
input:
tuple(val(indv_id), path(r1), path(r2))
output:
tuple(
val(indv_id),
path("${indv_id}_merged_r1.fastq.gz"),
path("${indv_id}_merged_r2.fastq.gz")
)
script:
// todo: pigz if gzip becomes a bottleneck
"""
# merge the two reads in parallel; wait for the backgrounded job to finish
zcat ${r1.join(' ')} | gzip -c > ${indv_id}_merged_r1.fastq.gz &
zcat ${r2.join(' ')} | gzip -c > ${indv_id}_merged_r2.fastq.gz
wait
"""
}
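
// Write the per-family sample CSV that bcbio_prepare_samples.py expects.
// Example row (hypothetical paths/values):
// samplename,description,batch,sex,phenotype,variant_regions
// sample1_merged_r1.fastq.gz,sample1,FAM001,1,2,targets.bed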
process write_bcbio_csv {
label 'small'
input:
tuple(val(family_id), val(individual_info))
output:
tuple(val(family_id), path("${family_id}.csv"))
script:
"""
#!/usr/bin/env python
# individual_info is a Groovy list of lists interpolated as a string;
# strip the outer brackets and split into one line per individual
individual_info = '$individual_info'
lines = individual_info.lstrip('[').rstrip(']').split('], [')
with open('${family_id}.csv', 'w') as f:
f.write('samplename,description,batch,sex,phenotype,variant_regions\\n')
for l in lines:
f.write(l.replace(', ', ',') + '\\n')
"""
}
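
// Merge prepared fastqs, build a bcbio project from the template and run the
// family through bcbio in local mode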
process bcbio_family_processing {
label 'large'
input:
tuple(val(family_id), val(individuals), path(family_csv))
output:
tuple(val(family_id), val(individuals), path("${family_id}-merged"))
script:
"""
${params.bcbio}/anaconda/bin/bcbio_prepare_samples.py --out . --csv $family_csv &&
${params.bcbio}/anaconda/bin/bcbio_nextgen.py -w template ${params.bcbio_template} ${family_csv.getBaseName()}-merged.csv ${individuals.collect({"${it}.fastq.gz"}).join(' ')} &&
cd ${family_id}-merged &&
${params.bcbio}/anaconda/bin/bcbio_nextgen.py config/${family_id}-merged.yaml -n 16 -t local
"""
}
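
// For each individual: convert the ready BAM to CRAM, verify the conversion by
// comparing flagstat output, and link callable-region and QC files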
process format_bcbio_individual_outputs {
input:
tuple(val(family_id), val(individuals), path(bcbio_output_dir))
output:
tuple(val(family_id), path('individual_outputs'))
script:
"""
samtools=${params.bcbio}/anaconda/bin/samtools &&
mkdir individual_outputs
for i in ${individuals.join(' ')}
do
indv_input=\$PWD/${bcbio_output_dir}/results/\$i
indv_output=individual_outputs/\$i &&
mkdir -p \$indv_output &&
ln -s \$indv_input/\${i}-callable.bed \$indv_output/\${i}-callable.bed &&
ln -s \$indv_input/qc \$indv_output/qc &&
bam=\$indv_input/\$i-ready.bam
cram="\$indv_output/\$i-ready.cram" &&
\$samtools view -@ ${task.cpus} -T ${params.reference_genome} -C -o \$cram \$bam &&
\$samtools index \$cram &&
bam_flagstat=./\$i-ready.bam.flagstat.txt &&
cram_flagstat=\$cram.flagstat.txt &&
\$samtools flagstat \$bam > \$bam_flagstat &&
\$samtools flagstat \$cram > \$cram_flagstat &&
# no && here - exit status checked below
diff \$bam_flagstat \$cram_flagstat
if [ \$? -ne 0 ]
then
echo "Unexpected flagstat results in \$cram_flagstat"
exit 1
fi
done
"""
}
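
// Link bcbio's family-level outputs and the formatted individual outputs into
// a single per-family results folder, with the original family IDs restored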
process format_bcbio_family_outputs {
input:
tuple(val(family_id), val(individuals), path(bcbio_output_dir), path(formatted_individual_outputs))
output:
tuple(val(family_id), path("*_${family_id}"))
script:
"""
merged_bcbio_family_output="\$(ls -d \$(readlink -f $bcbio_output_dir)/results/????-??-??_* | head -n 1)" &&
bcbio_family_output="\$(echo \$merged_bcbio_family_output | sed 's/-merged\\/*\$//g')" &&
run_date="\$(basename \$bcbio_family_output | sed -E 's/^([0-9]{4}-[0-9]{2}-[0-9]{2})_.+\$/\\1/g')" &&
# bcbio output files are named with underscores stripped from the family ID
broken_family_id="\$(echo ${family_id} | sed 's/_//g')" &&
bcbio_family_output="\$(basename \$merged_bcbio_family_output | sed 's/-merged\\/*\$//g')" &&
mkdir \$bcbio_family_output &&
for suffix in gatk-haplotype-annotated.vcf.gz{,.tbi}
do
src=\$merged_bcbio_family_output/\${broken_family_id}-\${suffix}
ln -s \$src \$bcbio_family_output/${family_id}-\${suffix}
done &&
for f in \$merged_bcbio_family_output/*{.log,.csv,.yaml,multiqc}
do
ln -s \$f \$bcbio_family_output/
done &&
for i in ${individuals.join(' ')}
do
ln -s \$(readlink -f $formatted_individual_outputs)/\$i \$bcbio_family_output/\$i
done &&
cd \$bcbio_family_output &&
# checksum everything except the BAMs, which have been converted to CRAMs
for f in \$(find . -type f | grep -v '\\.bam')
do
md5sum \$f >> md5sum.txt
done
"""
}
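
// Copy every family's results into the final project directory, then run
// project-level MultiQC and check the Peddy relatedness results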
process collate_pipeline_outputs {
label 'small'
publishDir "${params.output_dir}", mode: 'move', pattern: "${params.pipeline_project_id}_${params.pipeline_project_version}"
input:
val(bcbio_family_output_dirs)
output:
path("${params.pipeline_project_id}_${params.pipeline_project_version}")
script:
"""
outputs=${params.pipeline_project_id}_${params.pipeline_project_version}
mkdir \$outputs &&
mkdir \$outputs/{config,families,params,prioritization,qc} &&
for d in ${bcbio_family_output_dirs.join(' ')}
do
cp -rL \$d \$outputs/families/\$(basename \$d)
done &&
cd \$outputs/families &&
${params.bcbio}/anaconda/bin/multiqc \
--title "Trio whole exome QC report: ${params.pipeline_project_id}_${params.pipeline_project_version}" \
--outdir ../qc \
--filename ${params.pipeline_project_id}_${params.pipeline_project_version}_qc_report.html \
. &&
peddy_output=../qc/${params.pipeline_project_id}_${params.pipeline_project_version}.ped_check.txt &&
perl ${params.parse_peddy_output} \
--output \$peddy_output \
--project ${params.pipeline_project_id} \
--batch ${bcbio_family_output_dirs[0].getName().split('_')[1]} \
--version ${params.pipeline_project_version} \
--ped ${params.ped_file} \
--families . &&
# no && here - exit status checked below
grep -v 'False\$' \$peddy_output
if [ \$? -ne 0 ]
then
echo "Found Peddy mismatches in \$peddy_output"
exit 1
fi
"""
}
workflow process_families {
/*
- For each individual in the family, merge all R1 and R2 fastqs
- For the family:
  - Read the relevant information from the samplesheet, ped files and merged fastqs
  - Write a BCBio config file
  - Run BCBio on it
  - Move files around into the right places
  - Run CRAM compression, MultiQC and MD5 checks
*/
take:
ch_individuals

main:
ch_merged_fastqs = merge_fastqs(
ch_individuals.map(
{ indv, family, father, mother, sex, affected, r1, r2 ->
[ indv, r1, r2 ]
})
)
ch_joined_indv_info = ch_individuals.join(ch_merged_fastqs)

ch_joined_indv_info.map(
{ sample_id, family_id, father, mother, sex, phenotype, r1s, r2s, merged_r1, merged_r2 ->
[family_id, sample_id, father, mother, sex, phenotype, merged_r1]
})
.tap {ch_read1_meta} // I hate this syntax so much

ch_joined_indv_info.map(
{ sample_id, family_id, father, mother, sex, phenotype, r1s, r2s, merged_r1, merged_r2 ->
[family_id, sample_id, father, mother, sex, phenotype, merged_r2]
})
.tap {ch_read2_meta}
ch_bcbio_csvs = write_bcbio_csv(
ch_read1_meta.mix(ch_read2_meta).map(
{ family_id, sample_id, father, mother, sex, phenotype, merged_fastq ->
[family_id, [merged_fastq, sample_id, family_id, sex, phenotype, params.target_bed]]
}
).groupTuple()
)
ch_bcbio_inputs = ch_joined_indv_info.map(
{ sample_id, family_id, father, mother, sex, phenotype, r1s, r2s, merged_r1, merged_r2 ->
[family_id, sample_id]
}).groupTuple().join(ch_bcbio_csvs)
ch_bcbio_family_outputs = bcbio_family_processing(ch_bcbio_inputs)
ch_individual_folders = format_bcbio_individual_outputs(ch_bcbio_family_outputs)
ch_formatted_bcbio_outputs = format_bcbio_family_outputs(
ch_bcbio_family_outputs.join(ch_individual_folders)
)
collate_pipeline_outputs(ch_formatted_bcbio_outputs.map({it[1]}).collect())
}
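
// Top-level entry point: read and validate the samplesheet/ped inputs, then
// process each family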
workflow {
read_inputs()
ch_individuals = read_inputs.out[0]
ch_individuals_by_family = read_inputs.out[1]
validation(ch_individuals)
process_families(ch_individuals)
}