Snakemake: 搭建生信工作流程 (3)

Published at 2021-06-10 18:48

Author:zhixy

View:780


输入输出流,及wildcards

输入输出(文件/文件夹)流,是snakemake工作流能否顺利执行的关键。

输入输出流根据形式可分为:

  1. file_to_file,即输入一个文件,经规则定义的操作,输出一个文件
  2. files_to_files,即输入多个文件,经规则定义的操作,输出多个文件
  3. file_to_files,即输入一个文件,经规则定义的操作,输出多个文件
  4. files_to_file,即输入多个文件,经规则定义的操作,输出一个文件
  5. dir_to_dir,即输入一个文件夹,经规则定义的操作,输出一个文件夹
  6. file/files_to_dir,即输入一个或多个文件,经规则定义的操作,输出一个文件夹
  7. dir_to_file/files,即输入一个文件夹,经规则定义的操作,输出一个或多个文件

首先,只要出现多个文件,不论是输入还是输出,snakemake提供了wildcardsexpand函数两种常用机制来实现文件名的匹配。

wildcards是一种基于正则表达式的文件名匹配。而expand函数只是实现了一种变量组合(以列表形式返回),而每种变量的取值实际上是确定的,本质上是Python的列表推导式。两者的关键区别并不在形式上,而在于功能上,wildcards 输入输出的一对一关系,也就说通过wildcards可以将该规则中的操作(如shell命令)执行多次,而expand函数是将列表内的元素串联成字符串,然后交给规则中的操作,且相应操作仅执行一次。

特别地,当wildcards出现在输入时,输出也必须有相对应的wildcards,否则会报错:

Wildcards in input files cannot be determined from output files:

而当wildcards出现在输出时,输入不必有相对应的wildcards

对于expand函数则没有这样的限制,所以expand函数可用于files_to_filefile/files_to_dir的情形。

要在expand函数中使用wildcards,需要双花括号。

当规则的输入不确定时

工作流中,有时会出现某一个规则,其生成的输出是不确定的,包括文件数量不确定和文件名不确定。

例如在利用orthofinder进行基因家族分析时,单拷贝的直系同源基因对应的结果文件,名称和数量在orthofinder运行完毕之前是未知的。

那么如果想要以orthofinder规则的输出,作为下一个规则的输入,通过wildcardsexpand函数都是不能实现的。

要解决这一问题,需要将orthofinder规则声明为checkpoint,即用checkpoint替换rule

snakemake工作流的运行,分为三个阶段:

  • 在初始化阶段,工作流程会被解析,所有规则都会被实例化
  • 在DAG阶段,也就是生成有向无环图,确定依赖关系的时候,所有的通配名部分都会被真正的文件名代替。
  • 在调度阶段,DAG的任务按照顺序执行

声明为checkpoint的规则,可让snakemake在DAG阶段暂时忽略未知文件不能前后串联的问题,而在完成checkpoint规则后,再完成DAG阶段。

重要的是,在checkpoint规则之后,必须至少有一个rule规则明确的与checkpoint规则串联起来。

在下例中,checkpoint clustering模拟了一次聚类分析。然后rule intermediate从checkpoint clustering的结果中拷贝文件到post目录。

因为checkpoint clustering的输出,只是clustering目录的下一级目录(以通配符{sample}为名),所以如果clustering被声明为一般的规则时,rule intermediate是不能通过DAG阶段的。

而当clustering声明为checkpoint时,DAG分析会在完成clustering后更新,此时wildcards就能发现clustering/{sample}/下的文件内容了。

# a target rule to define the desired final output
rule all:
    input:
        "aggregated/a.txt",
        "aggregated/b.txt"


# the checkpoint that shall trigger re-evaluation of the DAG
checkpoint clustering:
    input:
        "samples/{sample}.txt"
    output:
        clusters=directory("clustering/{sample}")
    shell:
        "mkdir clustering/{wildcards.sample}; "
        "for i in 1 2 3; do echo $i > clustering/{wildcards.sample}/$i.txt; done"


# an intermediate rule
rule intermediate:
    input:
        "clustering/{sample}/{i}.txt"
    output:
        "post/{sample}/{i}.txt"
    shell:
        "cp {input} {output}"


def aggregate_input(wildcards):
    checkpoint_output = checkpoints.clustering.get(**wildcards).output[0]
    return expand("post/{sample}/{i}.txt", sample=wildcards.sample, i=glob_wildcards(os.path.join(checkpoint_output, "{i}.txt")).i)


# an aggregation over all produced clusters
rule aggregate:
    input:
        aggregate_input
    output:
        "aggregated/{sample}.txt"
    shell:
        "cat {input}{output}"

这里明确的与checkpointclustering串联起来是rule aggregate。

通过checkpoints.clustering.get()函数,rule aggregate逻辑上与checkpoint clustering联系了起来。但操作流程的顺序为:

clustering > intermediate > aggregate > all

除此之外,checkpoint规则,的主要应用场景是数据以来的条件执行