diff options
-rw-r--r-- | batchd.c | 110 | ||||
-rw-r--r-- | newbatch.c | 70 |
2 files changed, 180 insertions, 0 deletions
diff --git a/batchd.c b/batchd.c new file mode 100644 index 0000000..f3480ba --- /dev/null +++ b/batchd.c @@ -0,0 +1,110 @@ +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <sys/types.h> +#include <sys/event.h> +#include <sys/time.h> +#include <errno.h> +#include <err.h> +#include <dirent.h> +#include <string.h> +#include <fcntl.h> +#include <sys/wait.h> + +#define DEFAULT_QUEUE_DIR "." + +void +run_job(char *queuedir, int fd) +{ + char *run; + switch(fork()) { + case -1: + return; + case 0: + asprintf(&run, "%s/run", queuedir); + dup2(fd, STDIN_FILENO); + close(fd); + execl(run, "run", (char*)NULL); + perror("execle()"); + _exit(1); + default: + wait(NULL); + } +} + +void +process_queue(char *queuedir, int dfd) +{ + int fd; + DIR *dir; + struct dirent *de; + char *name; + + dir = fdopendir(dfd); + while ((de = readdir(dir))) { + if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0) + continue; + + asprintf(&name, "new/%s", de->d_name); + + fd = open(name, O_RDONLY); + if (fd == -1) { + free(name); + continue; + } + unlink(name); + run_job(queuedir, fd); + close(fd); + free(name); + } + + rewinddir(dir); + fdclosedir(dir); +} + +int +main(int argc, char **argv) +{ + int kq, dfd, ret; + struct kevent kv; + char *queuedir = NULL, *newdir = NULL; + + if (argc > 1) { + queuedir = strdup(argv[1]); + } else { + queuedir = strdup(DEFAULT_QUEUE_DIR); + } + + asprintf(&newdir, "%s/new", queuedir); + + dfd = open(newdir, O_RDONLY | O_DIRECTORY); + if (dfd == -1) + err(1, "open()"); + + kq = kqueue(); + if (kq == -1) + err(1, "kqueue()"); + + EV_SET(&kv, dfd, EVFILT_VNODE, EV_ADD | EV_CLEAR, NOTE_WRITE, 0, NULL); + + if (kevent(kq, &kv, 1, NULL, 0, NULL) == -1) + err(1, "kevent()"); + + process_queue(queuedir, dfd); + + for (;;) { + ret = kevent(kq, NULL, 0, &kv, 1, NULL); + if (ret == 0) + continue; + else if (ret == -1) { + if (errno == EINTR) + continue; + else + break; + } + + process_queue(queuedir, dfd); + } + + return 0; +} diff --git a/newbatch.c b/newbatch.c new file mode 100644 index 0000000..6b27234 --- /dev/null +++ b/newbatch.c @@ -0,0 +1,70 @@ +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <sys/time.h> +#include <errno.h> +#include <err.h> +#include <string.h> +#include <fcntl.h> + +#define DEFAULT_QUEUE_DIR "." + +int +main(int argc, char **argv) +{ + pid_t mypid; + struct timeval tv; + int64_t microtime; + char *timestr = NULL, *queuedir = NULL, *tmpfile = NULL, *newfile = NULL; + char buf[1024], *tmp; + int fd, l, m; + + mypid = getpid(); + gettimeofday(&tv, NULL); + microtime = tv.tv_sec * 1000000 + tv.tv_usec; + asprintf(×tr, "%li.%u", microtime, mypid); + + if (argc > 1) { + queuedir = strdup(argv[1]); + } else { + queuedir = strdup(DEFAULT_QUEUE_DIR); + } + + asprintf(&tmpfile, "%s/tmp/%s", queuedir, timestr); + asprintf(&newfile, "%s/new/%s", queuedir, timestr); + free(timestr); + free(queuedir); + + fd = open(tmpfile, O_WRONLY | O_CREAT | O_EXCL, 0660); + if (fd == -1) + err(1, "open()"); + + while ((l = read(STDIN_FILENO, buf, sizeof(buf))) != 0) { + if (l == -1) { + unlink(tmpfile); + err(1, NULL); + } + + tmp = buf; + while (l > 0) { + m = write(fd, tmp, l); + if (m == -1) { + unlink(tmpfile); + err(1, NULL); + } + l -= m; + tmp += m; + } + } + close(fd); + + if (rename(tmpfile, newfile) == -1) { + unlink(tmpfile); + err(1, NULL); + } + + free(tmpfile); + free(newfile); + + return 0; +} |