#include #include #include #include #include #include #include #include #include #include #include #include #include #define DEFAULT_QUEUE_DIR "." #define MAX_JOBS 2 struct job { pid_t pid; char *file; }; int cur_jobs = 0; struct job jobs[MAX_JOBS]; pid_t run_job(char *queuedir, int fd) { char *run; pid_t pid; switch((pid = fork())) { case -1: return -1; case 0: asprintf(&run, "%s/run", queuedir); dup2(fd, STDIN_FILENO); close(fd); execl(run, "run", (char*)NULL); perror("execle()"); _exit(1); break; default: return pid; } } void process_queue(char *queuedir, int dfd) { int fd, i; DIR *dir; struct dirent *de; char *name, *work, *failed; pid_t pid; dir = fdopendir(dfd); while ((de = readdir(dir))) { if (cur_jobs >= MAX_JOBS) break; if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0) continue; asprintf(&name, "new/%s", de->d_name); asprintf(&work, "work/%s", de->d_name); asprintf(&failed, "failed/%s", de->d_name); fd = open(name, O_RDONLY); if (fd == -1) { goto next; } rename(name, work); pid = run_job(queuedir, fd); if (pid == -1) { rename(work, failed); free(name); free(work); free(failed); break; } for (i = 0; i < MAX_JOBS; i++) { if (jobs[i].pid == -1) { jobs[i].pid = pid; jobs[i].file = strdup(de->d_name); } } close(fd); cur_jobs++; next: free(name); free(work); free(failed); } rewinddir(dir); fdclosedir(dir); } void wait_all() { int i, status; char *work, *done, *failed; for (;;) { pid_t r = waitpid(-1, &status, WNOHANG); if (r == 0) break; else if (r == -1) { if (errno == ECHILD) break; else err(1, "waitpid()"); } for (i = 0; i < MAX_JOBS; i++) { if (jobs[i].pid == r) { asprintf(&work, "work/%s", jobs[i].file); asprintf(&done, "done/%s", jobs[i].file); asprintf(&failed, "failed/%s", jobs[i].file); if (WEXITSTATUS(status) == 0) rename(work, done); else rename(work, failed); jobs[i].pid = -1; free(jobs[i].file); jobs[i].file = NULL; cur_jobs--; free(work); free(done); free(failed); break; } } } } int main(int argc, char **argv) { int kq, dfd, ret, i; struct kevent kv[2]; char *queuedir = NULL, *newdir = NULL; for (i = 0; i < MAX_JOBS; i++) { jobs[i].pid = -1; jobs[i].file = NULL; } if (argc > 1) { queuedir = strdup(argv[1]); } else { queuedir = strdup(DEFAULT_QUEUE_DIR); } asprintf(&newdir, "%s/new", queuedir); dfd = open(newdir, O_RDONLY | O_DIRECTORY); if (dfd == -1) err(1, "open()"); kq = kqueue(); if (kq == -1) err(1, "kqueue()"); EV_SET(&kv[0], dfd, EVFILT_VNODE, EV_ADD | EV_CLEAR, NOTE_WRITE, 0, NULL); EV_SET(&kv[1], SIGCHLD, EVFILT_SIGNAL, EV_ADD | EV_CLEAR, 0, 0, NULL); if (kevent(kq, kv, 2, NULL, 0, NULL) == -1) err(1, "kevent()"); process_queue(queuedir, dfd); for (;;) { ret = kevent(kq, NULL, 0, kv, 2, NULL); if (ret == 0) continue; else if (ret == -1) { if (errno == EINTR) continue; else break; } int i; for (i = 0; i < ret; i++) { if (kv[i].filter == EVFILT_SIGNAL) { if (kv[i].ident == SIGCHLD) { wait_all(); process_queue(queuedir, dfd); } } else if (kv[i].filter == EVFILT_VNODE) { process_queue(queuedir, dfd); } } } return 0; }