diff --git a/environment.yml b/environment.yml
index ed0f0cfe9caf08a99b27443c5951828bec6263a0..eceee147bb0b29868596a34ebeae4165a6052d86 100644
--- a/environment.yml
+++ b/environment.yml
@@ -8,4 +8,4 @@ channels:
 dependencies:
   - coreutils=8.25 # gives us cross-platform md5sum
   - nextflow=21.04.0
-# - bcbio-nextgen=1.2.8
\ No newline at end of file
+# - bcbio-nextgen=1.2.8
diff --git a/pipeline/inputs.nf b/pipeline/inputs.nf
index cf4094f9a6e3dc0547c8cec7039673db77d6b7c4..3a0263f1b2927506a891654bac1c32e6bd5e8fc4 100644
--- a/pipeline/inputs.nf
+++ b/pipeline/inputs.nf
@@ -86,7 +86,6 @@ workflow read_inputs {
     ch_individuals_by_family = ch_ped_file_info
         .join(ch_samplesheet_info)
         .map({[it[1], it]})
-        .groupTuple()
 
     emit:
     ch_samplesheet_info
diff --git a/pipeline/main.nf b/pipeline/main.nf
new file mode 100644
index 0000000000000000000000000000000000000000..9ee2571b22e71d7ea53aa3f5847637add3748bda
--- /dev/null
+++ b/pipeline/main.nf
@@ -0,0 +1,100 @@
+nextflow.enable.dsl = 2
+
+include {read_inputs} from './inputs.nf'
+include {validation} from './validation.nf'
+
+
+process merge_fastqs {
+    publishDir "outputs/individuals/$indv_id/merged_fastqs", mode: 'copy'
+
+    input:
+    tuple(val(indv_id), val(r1), val(r2))
+
+    output:
+    tuple(
+        val(indv_id),
+        path("${indv_id}_merged_r1.fastq.gz"),
+        path("${indv_id}_merged_r2.fastq.gz")
+    )
+
+    script:
+    // todo: pigz if gzip becomes a bottleneck
+    """
+    cat ${r1.join(' ')} | gzip -c > ${indv_id}_merged_r1.fastq.gz &
+    cat ${r2.join(' ')} | gzip -c > ${indv_id}_merged_r2.fastq.gz
+    """
+}
+
+process write_bcbio_csv {
+    publishDir "outputs/families/$family_id", mode: 'copy'
+
+    input:
+    tuple(val(family_id), val(individual_info))
+
+    output:
+    path("${family_id}.csv")
+
+    script:
+    """
+    #!/usr/bin/env python
+
+    individual_info = '$individual_info'
+    lines = individual_info.lstrip('[').rstrip(']').split('], [')
+
+    with open('${family_id}.csv', 'w') as f:
+        f.write('samplename,description,batch,sex,phenotype,variant_regions\\n')
+        for l in lines:
+            l = l.lstrip('[').rstrip(']').split(', ')
+            f.write(','.join(l) + '\\n')
+    """
+}
+
+process run_bcbio {
+    input:
+    path(bcbio_config)
+
+    script:
+    """
+    bcbio_nextgen.py $bcbio_config -n 16 -t local
+    """
+
+}
+
+workflow prepare_bcbio_config {
+    /*
+    - For each individual, merge all R1 and R2 fastqs
+    - For each family:
+      - Read the relevant information from the samplesheet, ped files and merged fastqs
+      - Write a BCBio config file
+      - Run BCBio on it
+    */
+
+    take:
+    ch_samplesheet_info
+    ch_individuals_by_family
+
+    main:
+    ch_merged_fastqs = merge_fastqs(ch_samplesheet_info)
+    ch_merged_data = ch_individuals_by_family.map(
+        { k, v -> v }
+    )
+    .join(ch_merged_fastqs)
+    .map(
+        { sample_id, family_id, father, mother, sex, phenotype, r1s, r2s, merged_r1, merged_r2 ->
+            [family_id, [sample_id, father, mother, sex, phenotype, merged_r1, merged_r2]]
+        }).groupTuple()
+
+    ch_bcbio_config = write_bcbio_csv(ch_merged_data)
+    run_bcbio(ch_bcbio_config)
+}
+
+
+workflow {
+    read_inputs()
+    ch_samplesheet_info = read_inputs.out[0]
+    ch_ped_file_info = read_inputs.out[1]
+    ch_individuals_by_family = read_inputs.out[2]
+
+    validation(ch_samplesheet_info)
+    prepare_bcbio_config(ch_samplesheet_info, ch_individuals_by_family)
+}
diff --git a/pipeline/validation.nf b/pipeline/validation.nf
index 81b7c2107074585512e15cd4c4e4c075745c4062..965008f4053c3f32b76da5ca90d9420a03ee48d6 100644
--- a/pipeline/validation.nf
+++ b/pipeline/validation.nf
@@ -47,37 +47,38 @@ process raise_errors {
     exit 1, "MD5 mismatches found"
 }
 
 
-workflow {
+workflow validation {
     /*
     Take a parsed samplesheet, flatten it and parse into a channel of observed vs. expected
     checksums. Calls check_errors above to raise an exception upon any mismatches.
     */
-    read_inputs()
-    ch_samplesheet_info = read_inputs.out[0]
+    take:
+    ch_samplesheet_info
 
-    ch_fastqs = ch_samplesheet_info
-        .map(
-            { indv, r1, r2 ->
-                [r1, r2]
-            }
-        ).flatten()
-        .map({file(it)})
-
-    ch_md5_files = ch_fastqs.map(
-        { fastq -> fastq.getParent().getParent() + '/md5sums.txt' }
-    )
-
-    ch_obs = observed_md5(ch_fastqs)
-    ch_exp = expected_md5(ch_fastqs, ch_md5_files)
+    main:
+    ch_fastqs = ch_samplesheet_info
+        .map(
+            { indv, r1, r2 ->
+                [r1, r2]
+            }
+        ).flatten()
+        .map({file(it)})
+
+    ch_md5_files = ch_fastqs.map(
+        { fastq -> fastq.getParent().getParent() + '/md5sums.txt' }
+    )
+
+    ch_obs = observed_md5(ch_fastqs)
+    ch_exp = expected_md5(ch_fastqs, ch_md5_files)
 
-    ch_mismatches = ch_obs.concat(ch_exp)
-        .map({fastq, md5 -> [fastq, md5.strip()]})
-        .groupTuple()
-        .filter({it[1][0] != it[1][1]})
-        .collect({"${it[0]}: ${it[1][0]} != ${it[1][1]}"})
-
-    ch_mismatches.view({"\nChecksum mismatches:\n${it.join('\n')}"})
-    raise_errors(ch_mismatches)
+    ch_mismatches = ch_obs.concat(ch_exp)
+        .map({fastq, md5 -> [fastq, md5.strip()]})
+        .groupTuple()
+        .filter({it[1][0] != it[1][1]})
+        .collect({"${it[0]}: ${it[1][0]} != ${it[1][1]}"})
+
+    ch_mismatches.view({"\nChecksum mismatches:\n${it.join('\n')}"})
+    raise_errors(ch_mismatches)
 }
 
diff --git a/tests/run_tests.sh b/tests/run_tests.sh
index 59a53a4b069d685fed86a35b4edcdbc6d3e2c48d..bc83de57c9d3ef2693c5bd1a473fd1473678c8b8 100755
--- a/tests/run_tests.sh
+++ b/tests/run_tests.sh
@@ -1,37 +1,32 @@
 #!/bin/bash
 
-function run_nextflow {
-    # Run NextFlow in the background, wait for termination and return an exit status based on
-    # error occurrence. We have to do this because NextFlow doesn't like CI environments with
-    # detached stdin/stdout.
-
-    nextflow run -bg $*
-    nextflow_pid=$(cat .nextflow.pid)
-
-    while kill -0 $nextflow_pid 2> /dev/null
-    do
-        sleep 1
-    done
-    rm .nextflow.pid
-
-    # janky, but couldn't find any other way of getting the exit status
-    return "$(grep -c ERROR .nextflow.log)"
-}
+export PATH=$PATH:$PWD/scripts
+source scripts/test_config.sh
 
 test_exit_status=0
 nextflow clean -f
 
-for d in ./outputs/*
-do
-    rm -r $d
-done
+rm -r ./outputs/* ./work/*
 
 echo "Test case 1: simple trio"
-run_nextflow ../pipeline/validation.nf --ped_file assets/input_data/ped_files/batch_1.ped --sample_sheet assets/input_data/sample_sheets/batch_1.tsv
+run_nextflow ../pipeline/main.nf --ped_file assets/input_data/ped_files/batch_1.ped --sample_sheet assets/input_data/sample_sheets/batch_1.tsv
 test_exit_status=$(( $test_exit_status + $? ))
+for f in "
+    outputs/individuals/000001/merged_fastqs/000001_merged_r1.fastq.gz
+    outputs/individuals/000001/merged_fastqs/000001_merged_r2.fastq.gz
+    outputs/individuals/000002/merged_fastqs/000002_merged_r1.fastq.gz
+    outputs/individuals/000002/merged_fastqs/000002_merged_r2.fastq.gz
+    outputs/individuals/000003/merged_fastqs/000003_merged_r1.fastq.gz
+    outputs/individuals/000003/merged_fastqs/000003_merged_r2.fastq.gz
+    outputs/families/000001/000001.csv
+"
+do
+    ls $f > /dev/null
+    test_exit_status=$(( $test_exit_status + $? ))
+done
 
 echo "Test case 2: MD5 errors"
-run_nextflow ../pipeline/validation.nf --ped_file assets/input_data/ped_files/batch_2_md5_errors.ped --sample_sheet assets/input_data/sample_sheets/batch_2_md5_errors.tsv
+run_nextflow ../pipeline/main.nf --ped_file assets/input_data/ped_files/batch_2_md5_errors.ped --sample_sheet assets/input_data/sample_sheets/batch_2_md5_errors.tsv
 if [ $? == 0 ]
 then
     test_exit_status=$(( $test_exit_status + 1 ))
diff --git a/tests/scripts/bcbio_nextgen.py b/tests/scripts/bcbio_nextgen.py
new file mode 100755
index 0000000000000000000000000000000000000000..c8da36adc591810f50d69215b14dba4443c63f7f
--- /dev/null
+++ b/tests/scripts/bcbio_nextgen.py
@@ -0,0 +1,4 @@
+#!/usr/bin/env python
+"""
+Fake BCBio for testing with the empty datasets from tests/assets/
+"""
diff --git a/tests/scripts/test_config.sh b/tests/scripts/test_config.sh
new file mode 100644
index 0000000000000000000000000000000000000000..5ddda73556a1a299df097abb5b8cdc64d96f08c9
--- /dev/null
+++ b/tests/scripts/test_config.sh
@@ -0,0 +1,18 @@
+
+function run_nextflow {
+    # Run NextFlow in the background, wait for termination and return an exit status based on
+    # error occurrence. We have to do this because NextFlow doesn't like CI environments with
+    # detached stdin/stdout.
+
+    nextflow run -bg $*
+    nextflow_pid=$(cat .nextflow.pid)
+
+    while kill -0 $nextflow_pid 2> /dev/null
+    do
+        sleep 1
+    done
+    rm .nextflow.pid
+
+    # janky, but couldn't find any other way of getting the exit status
+    return "$(grep -c ERROR .nextflow.log)"
+}