summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMarius Halden <marius.h@lden.org>2016-08-26 18:23:25 +0200
committerMarius Halden <marius.h@lden.org>2016-08-26 18:23:25 +0200
commit5b3fe684cfa32a0ea76c3ec0d1cdfb6c71a7d192 (patch)
tree3a18d11e031c16eae49c89fb417026681f8a2654
parentb6125c9eac61eca81eae42fb63a26ce16a8dc8db (diff)
downloadrunq-5b3fe684cfa32a0ea76c3ec0d1cdfb6c71a7d192.tar.gz
runq-5b3fe684cfa32a0ea76c3ec0d1cdfb6c71a7d192.tar.bz2
runq-5b3fe684cfa32a0ea76c3ec0d1cdfb6c71a7d192.tar.xz
Start adding support for parallel jobs
-rw-r--r--.gitignore3
-rw-r--r--batchd.c49
2 files changed, 45 insertions, 7 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..7a2efe7
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,3 @@
+batchd
+newbatch
+run
diff --git a/batchd.c b/batchd.c
index f3480ba..6fde5a8 100644
--- a/batchd.c
+++ b/batchd.c
@@ -10,8 +10,12 @@
#include <string.h>
#include <fcntl.h>
#include <sys/wait.h>
+#include <signal.h>
#define DEFAULT_QUEUE_DIR "."
+#define MAX_JOBS 1
+
+int cur_jobs = 0;
void
run_job(char *queuedir, int fd)
@@ -27,8 +31,7 @@ run_job(char *queuedir, int fd)
execl(run, "run", (char*)NULL);
perror("execle()");
_exit(1);
- default:
- wait(NULL);
+ break;
}
}
@@ -42,6 +45,9 @@ process_queue(char *queuedir, int dfd)
dir = fdopendir(dfd);
while ((de = readdir(dir))) {
+ if (cur_jobs >= MAX_JOBS)
+ break;
+
if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0)
continue;
@@ -56,17 +62,35 @@ process_queue(char *queuedir, int dfd)
run_job(queuedir, fd);
close(fd);
free(name);
+ cur_jobs++;
}
rewinddir(dir);
fdclosedir(dir);
}
+void
+wait_all()
+{
+ for (;;) {
+ int r = waitpid(-1, NULL, WNOHANG);
+ if (r == -1) {
+ if (errno == ECHILD)
+ break;
+ else if (errno == EWOULDBLOCK)
+ break;
+ else
+ err(1, "waitpid()");
+ }
+ cur_jobs--;
+ }
+}
+
int
main(int argc, char **argv)
{
int kq, dfd, ret;
- struct kevent kv;
+ struct kevent kv[2];
char *queuedir = NULL, *newdir = NULL;
if (argc > 1) {
@@ -85,15 +109,16 @@ main(int argc, char **argv)
if (kq == -1)
err(1, "kqueue()");
- EV_SET(&kv, dfd, EVFILT_VNODE, EV_ADD | EV_CLEAR, NOTE_WRITE, 0, NULL);
+ EV_SET(&kv[0], dfd, EVFILT_VNODE, EV_ADD | EV_CLEAR, NOTE_WRITE, 0, NULL);
+ EV_SET(&kv[1], SIGCHLD, EVFILT_SIGNAL, EV_ADD | EV_CLEAR, 0, 0, NULL);
- if (kevent(kq, &kv, 1, NULL, 0, NULL) == -1)
+ if (kevent(kq, kv, 2, NULL, 0, NULL) == -1)
err(1, "kevent()");
process_queue(queuedir, dfd);
for (;;) {
- ret = kevent(kq, NULL, 0, &kv, 1, NULL);
+ ret = kevent(kq, NULL, 0, kv, 2, NULL);
if (ret == 0)
continue;
else if (ret == -1) {
@@ -103,7 +128,17 @@ main(int argc, char **argv)
break;
}
- process_queue(queuedir, dfd);
+ int i;
+ for (i = 0; i < ret; i++) {
+ if (kv[i].filter == EVFILT_SIGNAL) {
+ if (kv[i].ident == SIGCHLD) {
+ wait_all();
+ process_queue(queuedir, dfd);
+ }
+ } else if (kv[i].filter == EVFILT_VNODE) {
+ process_queue(queuedir, dfd);
+ }
+ }
}
return 0;