Skip to content
Snippets Groups Projects
Commit ab4f1f5d authored by mwham's avatar mwham
Browse files

Adding BCBio config, connecting up workflows properly

parent 3dc7bc2f
No related branches found
No related tags found
1 merge request!1NextFlow
Pipeline #8461 passed
......@@ -8,4 +8,4 @@ channels:
dependencies:
- coreutils=8.25 # gives us cross-platform md5sum
- nextflow=21.04.0
# - bcbio-nextgen=1.2.8
\ No newline at end of file
# - bcbio-nextgen=1.2.8
......@@ -86,7 +86,6 @@ workflow read_inputs {
ch_individuals_by_family = ch_ped_file_info
.join(ch_samplesheet_info)
.map({[it[1], it]})
.groupTuple()
emit:
ch_samplesheet_info
......
nextflow.enable.dsl = 2
include {read_inputs} from './inputs.nf'
include {validation} from './validation.nf'
process merge_fastqs {
    // Merge all R1 fastqs (and all R2 fastqs) for one individual into a
    // single gzipped file each, published per-individual.
    publishDir "outputs/individuals/$indv_id/merged_fastqs", mode: 'copy'
    input:
    tuple(val(indv_id), val(r1), val(r2))
    output:
    tuple(
        val(indv_id),
        path("${indv_id}_merged_r1.fastq.gz"),
        path("${indv_id}_merged_r2.fastq.gz")
    )
    script:
    // todo: pigz if gzip becomes a bottleneck
    // The R1 merge is backgrounded so both compressions run in parallel.
    // 'wait' is required: without it the task can exit while the
    // backgrounded gzip is still writing, leaving a truncated R1 output.
    """
    cat ${r1.join(' ')} | gzip -c > ${indv_id}_merged_r1.fastq.gz &
    cat ${r2.join(' ')} | gzip -c > ${indv_id}_merged_r2.fastq.gz
    wait
    """
}
process write_bcbio_csv {
    // Write the per-family sample CSV consumed by BCBio, one row per
    // individual, published under outputs/families/<family_id>.
    publishDir "outputs/families/$family_id", mode: 'copy'
    input:
    tuple(val(family_id), val(individual_info))
    output:
    path("${family_id}.csv")
    script:
    // individual_info arrives as the Groovy string form of a list of lists,
    // e.g. '[[sample, father, mother, sex, phenotype, r1, r2], [...]]', which
    // the inline Python re-parses by stripping/splitting brackets.
    // NOTE(review): this parsing breaks if any field contains ', ' or
    // brackets — confirm upstream fields are always free of them, or pass
    // structured data instead. Also confirm the 6-column header matches the
    // 7 fields emitted per individual by prepare_bcbio_config.
    """
    #!/usr/bin/env python
    individual_info = '$individual_info'
    lines = individual_info.lstrip('[').rstrip(']').split('], [')
    with open('${family_id}.csv', 'w') as f:
        f.write('samplename,description,batch,sex,phenotype,variant_regions\\n')
        for l in lines:
            l = l.lstrip('[').rstrip(']').split(', ')
            f.write(','.join(l) + '\\n')  # fix: newline per row, else all rows concatenate
    """
}
// Run BCBio on a prepared per-family config file.
// -n 16: total cores (NOTE(review): hard-coded — consider a pipeline param).
// -t local: run on the local machine rather than a scheduler.
process run_bcbio {
    input:
    path(bcbio_config)
    script:
    // NOTE(review): upstream this channel carries the CSV produced by
    // write_bcbio_csv, but bcbio_nextgen.py conventionally takes a YAML
    // project config — confirm whether a 'bcbio_nextgen.py -w template'
    // step is missing here.
    """
    bcbio_nextgen.py $bcbio_config -n 16 -t local
    """
}
workflow prepare_bcbio_config {
    /*
    - For each individual, merge all R1 and R2 fastqs
    - For each family:
      - Read the relevant information from the samplesheet, ped files and merged fastqs
      - Write a BCBio config file
      - Run BCBio on it
    */
    take:
        // ch_samplesheet_info: per-individual tuples of [indv_id, r1_fastqs, r2_fastqs]
        ch_samplesheet_info
        // ch_individuals_by_family: grouped tuples keyed by family id
        // (presumably [family_id, [per-individual records]] — TODO confirm
        // against read_inputs in inputs.nf)
        ch_individuals_by_family
    main:
        // [indv_id, merged_r1, merged_r2] per individual
        ch_merged_fastqs = merge_fastqs(ch_samplesheet_info)
        // Drop the grouping key, then join each individual record with its
        // merged fastqs on the record's first element (Nextflow 'join'
        // matches on element 0 by default).
        ch_merged_data = ch_individuals_by_family.map(
            { k, v -> v }
        )
        .join(ch_merged_fastqs)
        // Re-key by family and collect one compact record per individual,
        // dropping the raw per-lane fastq lists (r1s/r2s) in favour of the
        // merged files.
        .map(
            { sample_id, family_id, father, mother, sex, phenotype, r1s, r2s, merged_r1, merged_r2 ->
                [family_id, [sample_id, father, mother, sex, phenotype, merged_r1, merged_r2]]
            }).groupTuple()
        // NOTE(review): the CSV path emitted here is passed straight to
        // run_bcbio as the BCBio config — confirm this is intended (see
        // note in run_bcbio).
        ch_bcbio_config = write_bcbio_csv(ch_merged_data)
        run_bcbio(ch_bcbio_config)
}
// Entry point: parse the samplesheet/ped inputs, then fan out to fastq
// checksum validation and BCBio preparation. Outputs of read_inputs are
// positional: [0] samplesheet info ([indv, r1, r2] tuples), [1] ped file
// info, [2] individuals grouped by family.
workflow {
    read_inputs()
    ch_samplesheet_info = read_inputs.out[0]
    ch_ped_file_info = read_inputs.out[1]  // unused here; consumed via read_inputs elsewhere
    ch_individuals_by_family = read_inputs.out[2]
    validation(ch_samplesheet_info)
    prepare_bcbio_config(ch_samplesheet_info, ch_individuals_by_family)
}
......@@ -47,37 +47,38 @@ process raise_errors {
exit 1, "MD5 mismatches found"
}
workflow {
workflow validation {
/*
Take a parsed samplesheet, flatten it and parse into a channel of observed vs.
expected checksums. Calls check_errors above to raise an exception upon any
mismatches.
*/
read_inputs()
ch_samplesheet_info = read_inputs.out[0]
take:
ch_samplesheet_info
ch_fastqs = ch_samplesheet_info
.map(
{ indv, r1, r2 ->
[r1, r2]
}
).flatten()
.map({file(it)})
ch_md5_files = ch_fastqs.map(
{ fastq -> fastq.getParent().getParent() + '/md5sums.txt' }
)
ch_obs = observed_md5(ch_fastqs)
ch_exp = expected_md5(ch_fastqs, ch_md5_files)
main:
ch_fastqs = ch_samplesheet_info
.map(
{ indv, r1, r2 ->
[r1, r2]
}
).flatten()
.map({file(it)})
ch_md5_files = ch_fastqs.map(
{ fastq -> fastq.getParent().getParent() + '/md5sums.txt' }
)
ch_obs = observed_md5(ch_fastqs)
ch_exp = expected_md5(ch_fastqs, ch_md5_files)
ch_mismatches = ch_obs.concat(ch_exp)
.map({fastq, md5 -> [fastq, md5.strip()]})
.groupTuple()
.filter({it[1][0] != it[1][1]})
.collect({"${it[0]}: ${it[1][0]} != ${it[1][1]}"})
ch_mismatches.view({"\nChecksum mismatches:\n${it.join('\n')}"})
raise_errors(ch_mismatches)
ch_mismatches = ch_obs.concat(ch_exp)
.map({fastq, md5 -> [fastq, md5.strip()]})
.groupTuple()
.filter({it[1][0] != it[1][1]})
.collect({"${it[0]}: ${it[1][0]} != ${it[1][1]}"})
ch_mismatches.view({"\nChecksum mismatches:\n${it.join('\n')}"})
raise_errors(ch_mismatches)
}
#!/bin/bash
function run_nextflow {
    # Run NextFlow in the background, wait for termination and return an exit
    # status based on error occurrence. We have to do this because NextFlow
    # doesn't like CI environments with detached stdin/stdout.
    #
    # Args: forwarded verbatim to 'nextflow run'.
    # Returns: number of ERROR lines in .nextflow.log, clamped to 255
    #          (0 == success); 1 if the pid file never appeared.
    nextflow run -bg "$@"  # "$@" preserves args with whitespace; unquoted $* would re-split them
    local nextflow_pid
    nextflow_pid=$(cat .nextflow.pid) || return 1
    # kill -0 only tests process existence; poll until the run terminates
    while kill -0 "$nextflow_pid" 2> /dev/null
    do
        sleep 1
    done
    rm .nextflow.pid
    # janky, but couldn't find any other way of getting the exit status.
    # Clamp to 255: 'return' is modulo 256, so e.g. 256 errors would
    # otherwise report success.
    local errors
    errors=$(grep -c ERROR .nextflow.log)
    (( errors > 255 )) && errors=255
    return "$errors"
}
export PATH=$PATH:$PWD/scripts
source scripts/test_config.sh
test_exit_status=0
nextflow clean -f
for d in ./outputs/*
do
rm -r $d
done
rm -r ./outputs/* ./work/*
echo "Test case 1: simple trio"
run_nextflow ../pipeline/validation.nf --ped_file assets/input_data/ped_files/batch_1.ped --sample_sheet assets/input_data/sample_sheets/batch_1.tsv
run_nextflow ../pipeline/main.nf --ped_file assets/input_data/ped_files/batch_1.ped --sample_sheet assets/input_data/sample_sheets/batch_1.tsv
test_exit_status=$(( $test_exit_status + $? ))
for f in "
outputs/individuals/000001/merged_fastqs/000001_merged_r1.fastq.gz
outputs/individuals/000001/merged_fastqs/000001_merged_r2.fastq.gz
outputs/individuals/000002/merged_fastqs/000002_merged_r1.fastq.gz
outputs/individuals/000002/merged_fastqs/000002_merged_r2.fastq.gz
outputs/individuals/000003/merged_fastqs/000003_merged_r1.fastq.gz
outputs/individuals/000003/merged_fastqs/000003_merged_r2.fastq.gz
outputs/families/000001/000001.csv
"
do
ls $f > /dev/null
test_exit_status=$(( $test_exit_status + $? ))
done
echo "Test case 2: MD5 errors"
run_nextflow ../pipeline/validation.nf --ped_file assets/input_data/ped_files/batch_2_md5_errors.ped --sample_sheet assets/input_data/sample_sheets/batch_2_md5_errors.tsv
run_nextflow ../pipeline/main.nf --ped_file assets/input_data/ped_files/batch_2_md5_errors.ped --sample_sheet assets/input_data/sample_sheets/batch_2_md5_errors.tsv
if [ $? == 0 ]
then
test_exit_status=$(( $test_exit_status + 1 ))
......
#!/usr/bin/env python
"""
Fake BCBio for testing with the empty datasets from tests/assets/
"""
function run_nextflow {
# Run NextFlow in the background, wait for termination and return an exit status based on
# error occurrence. We have to do this because NextFlow doesn't like CI environments with
# detached stdin/stdout.
nextflow run -bg $*
nextflow_pid=$(cat .nextflow.pid)
while kill -0 $nextflow_pid 2> /dev/null
do
sleep 1
done
rm .nextflow.pid
# janky, but couldn't find any other way of getting the exit status
return "$(grep -c ERROR .nextflow.log)"
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment