I have an application where a bit of parallel processing would be of benefit. For the purposes of the discussion, let's say there is a directory with 10 text files in it, and I want to start a program that forks off 10 processes, each taking one of the files and uppercasing its contents. I acknowledge that the parent program can wait for the children to complete using one of the wait functions, or using the select function.

What I would like to do is have the parent process monitor the progress of each forked process, and display something like a progress bar as the processes run.

My Question.

What reasonable alternatives do I have for the forked processes to communicate this information back to the parent? What IPC techniques would be reasonable to use?

A: 

Just earlier today someone told me that they always use a pipe, by which the children can send notification to the parent process that all is going well. This seems a decent solution, and is especially useful in places where you would want to print an error, but no longer have access to stdout/stderr, etc.

MattJ
+1  A: 

A few options (no idea which, if any, will suit you--a lot depends on what you are actually doing, as opposed to the "uppercasing files" analogy):

  • signals
  • fifos / named pipes
  • the STDOUT of the children or other passed handles
  • message queues (if appropriate)
MarkusQ
+2  A: 

In this kind of situation, where you only want to monitor progress, the easiest alternative is to use shared memory. Every process updates its progress value (e.g. an integer) in a shared memory block, and the master process reads the block regularly. Basically, you don't need any locking in this scheme. Also, it is a "polling" style application because the master can read the information whenever it wants, so you do not need any event processing for handling the progress data.

antti.huima
If the progress value is multi-byte you may need locking (depending on all sorts of things from cache to alignment to...). This used to be called "the odometer problem"; basically (and in big-endian decimal) going from 1999 to 2000 the reader might see 1900, 2099, etc.
MarkusQ
A: 

Boost.MPI should be useful in this scenario. You may consider it overkill but it's definitely worth investigating:
www.boost.org/doc/html/mpi.html

Functastic
+1  A: 

If all you want is a progress update, by far the easiest way is probably to use an anonymous pipe. The pipe(2) call will give you two file descriptors, one for each end of the pipe. Call it just before you fork, and have the parent listen to the first fd and the child write to the second. (This works because the child inherits copies of the parent's open file descriptors across fork(), and its copy of the two-element array holds the same descriptor numbers. That is not shared memory per se: the array is copy-on-write, so both processes see the same values unless one of them overwrites it.)

Dan Ellis
+2  A: 

If the only progress you need is "how many jobs have completed?", then a simple

while (jobs_running) {
    pid = wait(&status);
    for (i = 0; i < num_jobs; i++)
        if (pid == jobs[i]) {
            jobs_running--;
            break;
        }
    printf("%i/%i\n", num_jobs - jobs_running, num_jobs);
}

will do. For reporting progress while, well, in progress, here are dumb implementations of some of the other suggestions.

Pipes:

#include <poll.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>

int child(int fd) {
    int i;
    struct timespec ts;
    for (i = 0; i < 100; i++) {
        write(fd, &i, sizeof(i));
        ts.tv_sec = 0;
        ts.tv_nsec = rand() % 512 * 1000000;
        nanosleep(&ts, NULL);
    }
    write(fd, &i, sizeof(i));
    exit(0);
}

int main() {
    int fds[10][2];
    int i, j, total, status[10] = {0};
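    for (i = 0; i < 10; i++) {
        pipe(fds[i]);
        if (!fork()) {
            close(fds[i][0]);       /* child only writes */
            child(fds[i][1]);       /* never returns: child() calls exit() */
        }
        close(fds[i][1]);           /* parent only reads */
    }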
    for (total = 0; total < 1000; sleep(1)) {
        for (i = 0; i < 10; i++) {
            struct pollfd pfds = {fds[i][0], POLLIN};
            for (poll(&pfds, 1, 0); pfds.revents & POLLIN; poll(&pfds, 1, 0)) {
                read(fds[i][0], &status[i], sizeof(status[i]));
                for (total = j = 0; j < 10; j++)
                    total += status[j];
            }
        }
        printf("%i/1000\n", total);
    }
    return 0;
}

Shared memory:

#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <time.h>
#include <unistd.h>

int child(int *o, sem_t *sem) {
    int i;
    struct timespec ts;
    for (i = 0; i < 100; i++) {
        sem_wait(sem);
        *o = i;
        sem_post(sem);
        ts.tv_sec = 0;
        ts.tv_nsec = rand() % 512 * 1000000;
        nanosleep(&ts, NULL);
    }
    sem_wait(sem);
    *o = i;
    sem_post(sem);
    exit(0);
}

int main() {
    int i, j, size, total;
    void *page;
    int *status;
    sem_t *sems;
    size = sysconf(_SC_PAGESIZE);
    /* round the bytes needed up to a whole number of pages */
    size = (10 * sizeof(*status) + 10 * sizeof(*sems) + size - 1) & ~(size - 1);
    page = mmap(0, size, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);
    status = page;
    sems = (void *)&status[10];
    for (i = 0; i < 10; i++) {
        status[i] = 0;
        sem_init(&sems[i], 1, 1);
        if (!fork())
            child(&status[i], &sems[i]);
    }
    for (total = 0; total < 1000; sleep(1)) {
        for (total = i = 0; i < 10; i++) {
            sem_wait(&sems[i]);
            total += status[i];
            sem_post(&sems[i]);
        }
        printf("%i/1000\n", total);
    }
    return 0;
}

Error handling etc. elided for clarity.

ephemient