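// Nextflow DSL2 pipeline: merges each individual's fastqs, writes a BCBio
// sample CSV per family and runs BCBio on it.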
nextflow.enable.dsl = 2
include {read_inputs} from './inputs.nf'
include {validation} from './validation.nf'
// Pipeline parameters. Each one defaults to null here, so a usable value has to
// be supplied from outside this file (e.g. command line or config).

// Root of the BCBio installation; anaconda/bin and tools/bin are resolved under it.
params.bcbio = null
// Template passed to `bcbio_nextgen.py -w template`.
params.bcbio_template = null
// Root directory under which process results are published.
params.output_dir = null
// Value written to the `variant_regions` column of the BCBio sample CSV.
params.target_bed = null
/*
 * Concatenate all R1 and all R2 fastq.gz files of one individual into a single
 * gzipped file per read direction.
 *
 * Input:  tuple of (individual id, R1 file(s), R2 file(s))
 * Output: tuple of (individual id, merged R1, merged R2); the merged files are
 *         also copied to <output_dir>/individuals/<indv_id>/merged_fastqs
 */
process merge_fastqs {
    label 'medium'

    publishDir "${params.output_dir}/individuals/$indv_id/merged_fastqs", mode: 'copy'

    input:
    tuple(val(indv_id), path(r1), path(r2))

    output:
    tuple(
        val(indv_id),
        path("${indv_id}_merged_r1.fastq.gz"),
        path("${indv_id}_merged_r2.fastq.gz")
    )

    script:
    // A `path` input holding exactly one file is a bare path rather than a
    // list, so normalise to a list before building the zcat argument string.
    def r1_files = (r1 instanceof List ? r1 : [r1]).collect { it.toString() }.join(' ')
    def r2_files = (r2 instanceof List ? r2 : [r2]).collect { it.toString() }.join(' ')
    // todo: pigz if gzip becomes a bottleneck
    """
    zcat ${r1_files} | gzip -c > ${indv_id}_merged_r1.fastq.gz &
    r1_pid=\$!
    zcat ${r2_files} | gzip -c > ${indv_id}_merged_r2.fastq.gz
    # The R1 merge runs in the background: wait for it so the task cannot end
    # with a partially written file, and so its exit status is propagated.
    wait \$r1_pid
    """
}
/*
 * Write the BCBio sample CSV for one family.
 *
 * Input:  tuple of (family id, list of rows), where each row is a list of the
 *         six column values named in the header below
 * Output: <family_id>.csv, also copied to <output_dir>/families/<family_id>
 */
process write_bcbio_csv {
    label 'small'

    publishDir "${params.output_dir}/families/$family_id", mode: 'copy'

    input:
    tuple(val(family_id), val(individual_info))

    output:
    path("${family_id}.csv")

    script:
    """
    #!/usr/bin/env python

    # individual_info arrives as the string form of a Groovy list of lists,
    # i.e. '[[a, b, ...], [c, d, ...]]'. Strip the outer brackets and split on
    # the boundary between inner lists to recover one string per row.
    individual_info = '$individual_info'
    rows = individual_info.lstrip('[').rstrip(']').split('], [')

    with open('${family_id}.csv', 'w') as f:
        f.write('samplename,description,batch,sex,phenotype,variant_regions\\n')
        for row in rows:
            # Groovy separates list items with ', '; the CSV needs a bare ','.
            f.write(row.replace(', ', ',') + '\\n')
    """
}
/*
 * Run BCBio for one family: prepare the samples from the family CSV, generate
 * the BCBio project from the template, then run the generated config.
 *
 * Input: tuple of (family id, list of individual ids) and the family's sample CSV
 *
 * NOTE(review): no `output:` block is declared, so the publishDir directive
 * below presumably has nothing to publish - confirm whether that is intended.
 */
process bcbio {
    label 'large'

    publishDir "${params.output_dir}/families/$family_id", mode: 'copy'

    input:
    tuple(val(family_id), val(individuals))
    path(family_csv)

    script:
    def bcbio_bin = "${params.bcbio}/anaconda/bin"
    // One <individual>.fastq.gz argument per individual in the family.
    def individual_fastqs = individuals.collect { "${it}.fastq.gz" }.join(' ')
    // NOTE(review): `-n 16` is hard-coded; confirm it matches the resources
    // given to the 'large' label.
    """
    ${bcbio_bin}/bcbio_prepare_samples.py --out . --csv $family_csv &&
    ${bcbio_bin}/bcbio_nextgen.py -w template ${params.bcbio_template} ${family_csv.getBaseName()}-merged.csv ${individual_fastqs} &&
    cd ${family_id}-merged &&
    export PATH=\$PATH:${params.bcbio}/tools/bin &&
    ${bcbio_bin}/bcbio_nextgen.py config/${family_id}-merged.yaml -n 16 -t local
    """
}
workflow prepare_bcbio_config {
    /*
    - For each individual, merge all R1 and R2 fastqs
    - For each family:
      - Read the relevant information from the samplesheet, ped files and merged fastqs
      - Write a BCBio config file
      - Run BCBio on it
    */

    take:
    ch_samplesheet_info
    ch_individuals_by_family

    main:
    ch_merged_fastqs = merge_fastqs(ch_samplesheet_info)
    // NOTE(review): ch_merged_fastqs is not consumed below, while the closures
    // expect merged_r1/merged_r2 to already be part of each item coming from
    // ch_individuals_by_family - confirm that no join step has been lost.

    // Drop the key of each (key, value) item and keep only the value.
    ch_joined_family_info = ch_individuals_by_family.map({ k, v -> v })

    // One metadata row per individual for the merged R1 file...
    ch_read1_meta = ch_joined_family_info.map(
        { sample_id, family_id, father, mother, sex, phenotype, r1s, r2s, merged_r1, merged_r2 ->
            [family_id, sample_id, father, mother, sex, phenotype, merged_r1]
        })

    // ...and one for the merged R2 file.
    ch_read2_meta = ch_joined_family_info.map(
        { sample_id, family_id, father, mother, sex, phenotype, r1s, r2s, merged_r1, merged_r2 ->
            [family_id, sample_id, father, mother, sex, phenotype, merged_r2]
        })

    // Collect all CSV rows of a family under its family id. The row layout
    // follows the header written by write_bcbio_csv.
    ch_metadata = ch_read1_meta.mix(ch_read2_meta).map(
        { family_id, sample_id, father, mother, sex, phenotype, merged_fastq ->
            [family_id, [merged_fastq, sample_id, family_id, sex, phenotype, params.target_bed]]
        }
    ).groupTuple()

    ch_bcbio_csv = write_bcbio_csv(ch_metadata)

    ch_individuals = ch_joined_family_info.map(
        { sample_id, family_id, father, mother, sex, phenotype, r1s, r2s, merged_r1, merged_r2 ->
            [family_id, sample_id]
        }).groupTuple()

    // ch_individuals and ch_bcbio_csv are independent queue channels, so feeding
    // them to bcbio directly pairs items purely by arrival order and can match
    // one family's individuals with another family's CSV. Join them on the
    // family id (the CSV is named <family_id>.csv) and split the joined items
    // back into two channels that emit in lockstep.
    ch_bcbio_inputs = ch_individuals
        .map({ family_id, individuals -> [family_id.toString(), family_id, individuals] })
        .join(ch_bcbio_csv.map({ csv -> [csv.getBaseName(), csv] }))
        .multiMap({ key, family_id, individuals, csv ->
            individuals: [family_id, individuals]
            csv: csv
        })

    bcbio(ch_bcbio_inputs.individuals, ch_bcbio_inputs.csv)
}
// Entry workflow: read the inputs, run validation on the samplesheet info,
// then prepare the BCBio configuration and run BCBio.
workflow {
    read_inputs()

    // read_inputs exposes its results positionally.
    samplesheet_info = read_inputs.out[0]
    ped_file_info = read_inputs.out[1]    // not used further in this workflow
    individuals_by_family = read_inputs.out[2]

    validation(samplesheet_info)

    prepare_bcbio_config(
        samplesheet_info,
        individuals_by_family
    )
}