I want to know how to distribute N independent tasks to exactly M processors on a machine that has L cores, where L>M. I don't want to use all the processors because I still want to have I/O available. The solutions I've tried seem to get distributed to all processors, bogging down the system.

I assume the multiprocessing module is the way to go.

I do numerical simulations. My background is in physics, not computer science, so unfortunately, I often don't fully understand discussions involving standard tasking models like server/client, producer/consumer, etc.

Here are some simplified models that I've tried:

Suppose I have a function run_sim(kwargs) (defined further below) that runs one simulation, a long list of kwargs dicts (one per simulation), and an 8-core machine.

from multiprocessing import Pool, Process

# using Pool: at most 4 worker processes
p = Pool(4)
p.map(run_sim, kwargs)

# using Process: try to keep at most 4 jobs alive at a time
number_of_live_jobs = 0
all_jobs = []
sim_index = 0
while sim_index < len(kwargs):
   number_of_live_jobs = len([1 for job in all_jobs if job.is_alive()])
   if number_of_live_jobs < 4:
      # run_sim takes the parameter dict as its single positional argument
      p = Process(target=run_sim, args=(kwargs[sim_index],))
      print("starting job %s" % kwargs[sim_index]["data_file_name"])
      print("number of live jobs: %d" % number_of_live_jobs)
      p.start()
      all_jobs.append(p)
      sim_index += 1

# wait for the remaining jobs once everything has been started
for job in all_jobs:
   job.join()

When I look at the processor usage with "top" and then press "1", all processors seem to get used anyway in either case. It is not out of the question that I am misinterpreting the output of "top", but if run_sim() is processor intensive, the machine bogs down heavily.

Hypothetical simulation and data:

# simulation kwargs
numbers_of_steps = range(0,10000000, 1000000)
sigmas = [x for x in range(11)]
kwargs = []
for number_of_steps in numbers_of_steps:
   for sigma in sigmas:
      kwargs.append(
         dict(
            number_of_steps=number_of_steps,
            sigma=sigma,
            # why do I need to cast to int?
            data_file_name="walk_steps=%i_sigma=%i" % (number_of_steps, sigma),
            )
         )

import random, time
random.seed(time.time())

# simulation of random walk
def run_sim(kwargs):
   number_of_steps = kwargs["number_of_steps"]
   sigma = kwargs["sigma"]
   data_file_name = kwargs["data_file_name"]
   current_position = 0
   print("running simulation %s" % data_file_name)
   # the with block closes the file even if the simulation raises
   with open(data_file_name + ".dat", "w") as data_file:
      for n in range(int(number_of_steps)+1):
         data_file.write("step number %i   position=%f\n" % (n, current_position))
         random_step = random.gauss(0, sigma)
         current_position += random_step
+1  A: 

You might want to look into the following package:

http://pypi.python.org/pypi/affinity

It is a package that uses sched_setaffinity and sched_getaffinity.

The drawback is that it is highly Linux-specific.
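
For what it's worth, on Python 3.3 and later the standard library exposes the same two system calls as os.sched_setaffinity and os.sched_getaffinity (on platforms that support them), so the extra package may not be needed. A rough sketch, where limit_to_first_cpus is a made-up helper name and "the first m allowed CPUs" is just one arbitrary way to choose the set:

```python
import os


def limit_to_first_cpus(m):
    """Restrict the calling process to at most m of its currently allowed CPUs.

    limit_to_first_cpus is a hypothetical helper, not part of any library.
    Returns the resulting CPU set, or None where the affinity API is missing.
    """
    if m < 1:
        raise ValueError("m must be at least 1")
    if not hasattr(os, "sched_setaffinity"):
        return None  # platform without the affinity calls in the os module
    # pid 0 means "the calling process"
    allowed = sorted(os.sched_getaffinity(0))
    os.sched_setaffinity(0, allowed[:m])
    return os.sched_getaffinity(0)
```

Calling limit_to_first_cpus(4) in the parent before creating Pool(4) should be enough when the workers are created by fork, since forked children inherit the parent's mask; with other start methods it could be passed as the Pool initializer instead.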

terminus
A: 

If you are on Linux, use taskset when you launch the program. From the man pages:

A child created via fork(2) inherits its parent’s CPU affinity mask. The affinity mask is preserved across an execve(2).

TASKSET(1)                Linux User’s Manual                TASKSET(1)

NAME
       taskset - retrieve or set a process’s CPU affinity

SYNOPSIS
       taskset [options] mask command [arg]...
       taskset [options] -p [mask] pid

DESCRIPTION
       taskset is used to set or retrieve the CPU affinity of a running process given its PID or to launch a new COMMAND with a given CPU affinity. CPU affinity is a scheduler property that "bonds" a process to a given set of CPUs on the system. The Linux scheduler will honor the given CPU affinity and the process will not run on any other CPUs. Note that the Linux scheduler also supports natural CPU affinity: the scheduler attempts to keep processes on the same CPU as long as practical for performance reasons. Therefore, forcing a specific CPU affinity is useful only in certain applications.

       The CPU affinity is represented as a bitmask, with the lowest order bit corresponding to the first logical CPU and the highest order bit corresponding to the last logical CPU. Not all CPUs may exist on a given system but a mask may specify more CPUs than are present. A retrieved mask will reflect only the bits that correspond to CPUs physically on the system. If an invalid mask is given (i.e., one that corresponds to no valid CPUs on the current system) an error is returned. The masks are typically given in hexadecimal.
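
To make the bitmask concrete, here is a small sketch that builds the mask for a list of CPU indices and assembles the matching taskset command line. cpu_mask and taskset_command are made-up helper names, and sim.py is a placeholder for whatever script launches the simulations:

```python
def cpu_mask(cpus):
    """Return the affinity bitmask with one bit set per CPU index."""
    mask = 0
    for cpu in cpus:
        mask |= 1 << cpu  # bit 0 is the first logical CPU
    return mask


def taskset_command(cpus, command):
    """Build an argument list that runs command under taskset on cpus."""
    return ["taskset", "0x%x" % cpu_mask(cpus)] + list(command)


# CPUs 0-3 of an 8-core machine give mask 0xf; sim.py is a hypothetical script
print(" ".join(taskset_command([0, 1, 2, 3], ["python", "sim.py"])))
```

The resulting list can be handed to subprocess.call, or the printed line typed into a shell. taskset also accepts a CPU list through its -c option (for example taskset -c 0-3 ...), which avoids computing the mask by hand.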

gnibbler
+1  A: 

On my dual-core machine the total number of processes is honoured, i.e. if I do

p = Pool(1)

Then I only see one CPU in use at any given time. The process is free to migrate to a different processor, but then the other processor is idle. I don't see how all your processors can be in use at the same time, so I don't follow how this can be related to your I/O issues. Of course, if your simulation is I/O bound, then you will see sluggish I/O regardless of core usage...

ire_and_curses
It looks like top is showing all the CPUs in use, so I'm guessing that the user interface is getting sluggish, rather than there being an actual I/O bottleneck.
gnibbler
A: 

Probably a dumb observation, please forgive my inexperience in Python.

But your while loop polling for the finished tasks never sleeps, so it is consuming one core all the time, isn't it?
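
Something along these lines might avoid the spinning: check the live jobs, and sleep for a moment whenever the limit is reached. This is only a sketch; run_throttled, max_live and poll_interval are names I made up, and the jobs can be any unstarted objects with start(), is_alive() and join(), such as multiprocessing.Process instances:

```python
import time


def run_throttled(jobs, max_live, poll_interval=0.1):
    """Start each job, keeping at most max_live of them alive at once.

    run_throttled is a hypothetical helper written for this sketch.
    """
    if max_live < 1:
        raise ValueError("max_live must be at least 1")
    live = []
    for job in jobs:
        while True:
            # drop jobs that have finished since the last check
            live = [j for j in live if j.is_alive()]
            if len(live) < max_live:
                break
            # sleep instead of spinning, so the scheduler loop stays idle
            time.sleep(poll_interval)
        job.start()
        live.append(job)
    for j in live:
        j.join()  # wait for the last jobs to finish
```

With the question's names it would be called roughly as run_throttled((Process(target=run_sim, args=(kw,)) for kw in kwargs), max_live=4).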

The other thing to notice is that if your tasks are I/O bound, your M should be adjusted to the number of parallel disks(?) you have ... if they are NFS mounted from different machines you could potentially have M>L.

g'luck!

janpf