Published at 2021-06-10 18:48
Author:zhixy
View:1019
输入输出(文件/文件夹)流,是snakemake工作流能否顺利执行的关键。
输入输出流根据形式可分为:
首先,只要出现多个文件,不论是输入还是输出,snakemake提供了wildcards
和expand函数
两种常用机制来实现文件名的匹配。
wildcards
是一种基于正则表达式的文件名匹配。而expand函数
只是实现了一种变量组合(以列表形式返回),而每种变量的取值实际上是确定的,本质上是Python的列表推导式。两者的关键区别并不在形式上,而在于功能上,wildcards
输入输出的一对一关系,也就说通过wildcards
可以将该规则中的操作(如shell命令)执行多次,而expand函数
是将列表内的元素串联成字符串,然后交给规则中的操作,且相应操作仅执行一次。
特别地,当wildcards
出现在输入时,输出也必须有相对应的wildcards
,否则会报错:
Wildcards in input files cannot be determined from output files:
而当wildcards
出现在输出时,输入不必有相对应的wildcards
。
对于expand函数
则没有这样的限制,所以expand函数
可用于files_to_file
和file/files_to_dir
的情形。
要在
expand函数
中使用wildcards
,需要双花括号。
工作流中,有时会出现某一个规则,其生成的输出是不确定的,包括文件数量不确定和文件名不确定。
例如在利用orthofinder
进行基因家族分析时,单拷贝的直系同源基因对应的结果文件,名称和数量在orthofinder
运行完毕之前是未知的。
那么如果想要以orthofinder
规则的输出,作为下一个规则的输入,通过wildcards
和expand函数
都是不能实现的。
要解决这一问题,需要将orthofinder
规则声明为checkpoint,即用checkpoint替换rule。
snakemake工作流的运行,分为三个阶段:
声明为checkpoint的规则,可让snakemake在DAG阶段暂时忽略未知文件不能前后串联的问题,而在完成checkpoint规则后,再完成DAG阶段。
重要的是,在checkpoint规则之后,必须至少有一个rule规则明确的与checkpoint规则串联起来。
在下例中,checkpoint clustering模拟了一次聚类分析。然后rule intermediate从checkpoint clustering的结果中拷贝文件到post目录。
因为checkpoint clustering的输出,只是clustering目录的下一级目录(以通配符{sample}为名),所以如果clustering被声明为一般的规则时,rule intermediate是不能通过DAG阶段的。
而当clustering声明为checkpoint时,DAG分析会在完成clustering后更新,此时wildcards就能发现clustering/{sample}/
下的文件内容了。
# a target rule to define the desired final output
rule all:
input:
"aggregated/a.txt",
"aggregated/b.txt"
# the checkpoint that shall trigger re-evaluation of the DAG
checkpoint clustering:
input:
"samples/{sample}.txt"
output:
clusters=directory("clustering/{sample}")
shell:
"mkdir clustering/{wildcards.sample}; "
"for i in 1 2 3; do echo $i > clustering/{wildcards.sample}/$i.txt; done"
# an intermediate rule
rule intermediate:
input:
"clustering/{sample}/{i}.txt"
output:
"post/{sample}/{i}.txt"
shell:
"cp {input} {output}"
def aggregate_input(wildcards):
checkpoint_output = checkpoints.clustering.get(**wildcards).output[0]
return expand("post/{sample}/{i}.txt", sample=wildcards.sample, i=glob_wildcards(os.path.join(checkpoint_output, "{i}.txt")).i)
# an aggregation over all produced clusters
rule aggregate:
input:
aggregate_input
output:
"aggregated/{sample}.txt"
shell:
"cat {input} > {output}"
这里明确的与checkpointclustering串联起来是rule aggregate。
通过checkpoints.clustering.get()
函数,rule aggregate逻辑上与checkpoint clustering联系了起来。但操作流程的顺序为:
clustering > intermediate > aggregate > all
除此之外,checkpoint规则,的主要应用场景是数据以来的条件执行。