I am the developer of Ruffus. I am not sure I entirely understand what you are trying to do but here goes:
Waiting for jobs which take a different amount of time to finish in order to run the next stage of your pipeline is exactly what Ruffus is about so this hopefully is straightforward.
The first question is do you know which files are being created up front, i.e. before the pipeline is run? Lets start by assuming you do.
from ruffus import *
filenames = ["one.file", "two.file", "three.file"]
Let us write a dummy function which creates a file each time it is called. In Ruffus, any input and output file names are contained in the first two parameters respectively. We have no input file name, so our function calls should look like this:
create_file(None, "one.file")
create_file(None, "two.file")
create_file(None, "three.file")
The definition of create_file would look like this:
@files([(None, fn) for fn in filenames])
def create_file(no_input_file_name, output_file_name):
open(output_file_name, "w").write("dummy file")
Each of these files would be created in 3 separate calls to create_file. These can be run in parallel if you wish.
pipeline_run([create_file], multiprocess = 5)
Now to combine the files. The "@Merge" decorator is indeed set up precisely for this. We just need to link it up to the previous function :
@merge(create_file, "merge.file")
def merge_file(input_file_names, output_file_name):
output_file = open(output_file_name, "w")
for i in input_file_names:
output_file.write(open(i).read())
This will only call merge_file when all the files are ready from the three calls to create_file().
The entire code is as follows:
from ruffus import *
filenames = ["one.file", "two.file", "three.file"]
from random import randint
from time import sleep
@files([(None, fn) for fn in filenames])
def create_file(no_input_file_name, output_file_name):
# simulate create file process of indeterminate complexity
sleep(randint(1,5))
open(output_file_name, "w").write("dummy file")
@merge(create_file, "merge.file")
def merge_file(input_file_names, output_file_name):
output_file = open(output_file_name, "w")
for i in input_file_names:
output_file.write(open(i).read())
pipeline_run([merge_file], multiprocess = 5)
And this is the result:
>>> pipeline_run([merge_file], multiprocess = 5)
Job = [None -> two.file] completed
Job = [None -> three.file] completed
Job = [None -> one.file] completed
Completed Task = create_file
Job = [[one.file, three.file, two.file] -> merge.file] completed
Completed Task = merge_file