summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--batchd.c110
-rw-r--r--newbatch.c70
2 files changed, 180 insertions, 0 deletions
diff --git a/batchd.c b/batchd.c
new file mode 100644
index 0000000..f3480ba
--- /dev/null
+++ b/batchd.c
@@ -0,0 +1,110 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/event.h>
+#include <sys/time.h>
+#include <errno.h>
+#include <err.h>
+#include <dirent.h>
+#include <string.h>
+#include <fcntl.h>
+#include <sys/wait.h>
+
+#define DEFAULT_QUEUE_DIR "."
+
+void
+run_job(char *queuedir, int fd)
+{
+ char *run;
+ switch(fork()) {
+ case -1:
+ return;
+ case 0:
+ asprintf(&run, "%s/run", queuedir);
+ dup2(fd, STDIN_FILENO);
+ close(fd);
+ execl(run, "run", (char*)NULL);
+ perror("execle()");
+ _exit(1);
+ default:
+ wait(NULL);
+ }
+}
+
+void
+process_queue(char *queuedir, int dfd)
+{
+ int fd;
+ DIR *dir;
+ struct dirent *de;
+ char *name;
+
+ dir = fdopendir(dfd);
+ while ((de = readdir(dir))) {
+ if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0)
+ continue;
+
+ asprintf(&name, "new/%s", de->d_name);
+
+ fd = open(name, O_RDONLY);
+ if (fd == -1) {
+ free(name);
+ continue;
+ }
+ unlink(name);
+ run_job(queuedir, fd);
+ close(fd);
+ free(name);
+ }
+
+ rewinddir(dir);
+ fdclosedir(dir);
+}
+
+int
+main(int argc, char **argv)
+{
+ int kq, dfd, ret;
+ struct kevent kv;
+ char *queuedir = NULL, *newdir = NULL;
+
+ if (argc > 1) {
+ queuedir = strdup(argv[1]);
+ } else {
+ queuedir = strdup(DEFAULT_QUEUE_DIR);
+ }
+
+ asprintf(&newdir, "%s/new", queuedir);
+
+ dfd = open(newdir, O_RDONLY | O_DIRECTORY);
+ if (dfd == -1)
+ err(1, "open()");
+
+ kq = kqueue();
+ if (kq == -1)
+ err(1, "kqueue()");
+
+ EV_SET(&kv, dfd, EVFILT_VNODE, EV_ADD | EV_CLEAR, NOTE_WRITE, 0, NULL);
+
+ if (kevent(kq, &kv, 1, NULL, 0, NULL) == -1)
+ err(1, "kevent()");
+
+ process_queue(queuedir, dfd);
+
+ for (;;) {
+ ret = kevent(kq, NULL, 0, &kv, 1, NULL);
+ if (ret == 0)
+ continue;
+ else if (ret == -1) {
+ if (errno == EINTR)
+ continue;
+ else
+ break;
+ }
+
+ process_queue(queuedir, dfd);
+ }
+
+ return 0;
+}
diff --git a/newbatch.c b/newbatch.c
new file mode 100644
index 0000000..6b27234
--- /dev/null
+++ b/newbatch.c
@@ -0,0 +1,70 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <sys/time.h>
+#include <errno.h>
+#include <err.h>
+#include <string.h>
+#include <fcntl.h>
+
+#define DEFAULT_QUEUE_DIR "."
+
+int
+main(int argc, char **argv)
+{
+ pid_t mypid;
+ struct timeval tv;
+ int64_t microtime;
+ char *timestr = NULL, *queuedir = NULL, *tmpfile = NULL, *newfile = NULL;
+ char buf[1024], *tmp;
+ int fd, l, m;
+
+ mypid = getpid();
+ gettimeofday(&tv, NULL);
+ microtime = tv.tv_sec * 1000000 + tv.tv_usec;
+ asprintf(&timestr, "%li.%u", microtime, mypid);
+
+ if (argc > 1) {
+ queuedir = strdup(argv[1]);
+ } else {
+ queuedir = strdup(DEFAULT_QUEUE_DIR);
+ }
+
+ asprintf(&tmpfile, "%s/tmp/%s", queuedir, timestr);
+ asprintf(&newfile, "%s/new/%s", queuedir, timestr);
+ free(timestr);
+ free(queuedir);
+
+ fd = open(tmpfile, O_WRONLY | O_CREAT | O_EXCL, 0660);
+ if (fd == -1)
+ err(1, "open()");
+
+ while ((l = read(STDIN_FILENO, buf, sizeof(buf))) != 0) {
+ if (l == -1) {
+ unlink(tmpfile);
+ err(1, NULL);
+ }
+
+ tmp = buf;
+ while (l > 0) {
+ m = write(fd, tmp, l);
+ if (m == -1) {
+ unlink(tmpfile);
+ err(1, NULL);
+ }
+ l -= m;
+ tmp += m;
+ }
+ }
+ close(fd);
+
+ if (rename(tmpfile, newfile) == -1) {
+ unlink(tmpfile);
+ err(1, NULL);
+ }
+
+ free(tmpfile);
+ free(newfile);
+
+ return 0;
+}