summaryrefslogtreecommitdiffstats
path: root/batchd.c
diff options
context:
space:
mode:
authorMarius Halden <marius.h@lden.org>2016-08-30 21:42:30 +0200
committerMarius Halden <marius.h@lden.org>2016-08-30 21:42:30 +0200
commit3039a05f3d0164cf3001de52d872d59ebcacff96 (patch)
tree8f7da04af2340c8dc59707fe950a01412b12c8dc /batchd.c
parent0b7fd61b6d460cc47ac639c9d6d23b8cc05d4e68 (diff)
downloadrunq-3039a05f3d0164cf3001de52d872d59ebcacff96.tar.gz
runq-3039a05f3d0164cf3001de52d872d59ebcacff96.tar.bz2
runq-3039a05f3d0164cf3001de52d872d59ebcacff96.tar.xz
Keep track of running jobs
Diffstat (limited to 'batchd.c')
-rw-r--r--batchd.c81
1 files changed, 68 insertions, 13 deletions
diff --git a/batchd.c b/batchd.c
index ceb3911..fb37b49 100644
--- a/batchd.c
+++ b/batchd.c
@@ -15,15 +15,22 @@
#define DEFAULT_QUEUE_DIR "."
#define MAX_JOBS 2
+struct job {
+ pid_t pid;
+ char *file;
+};
+
int cur_jobs = 0;
+struct job jobs[MAX_JOBS];
-void
+pid_t
run_job(char *queuedir, int fd)
{
char *run;
- switch(fork()) {
+ pid_t pid;
+ switch((pid = fork())) {
case -1:
- return;
+ return -1;
case 0:
asprintf(&run, "%s/run", queuedir);
dup2(fd, STDIN_FILENO);
@@ -32,16 +39,19 @@ run_job(char *queuedir, int fd)
perror("execle()");
_exit(1);
break;
+ default:
+ return pid;
}
}
void
process_queue(char *queuedir, int dfd)
{
- int fd;
+ int fd, i;
DIR *dir;
struct dirent *de;
- char *name;
+ char *name, *work, *failed;
+ pid_t pid;
dir = fdopendir(dfd);
while ((de = readdir(dir))) {
@@ -52,17 +62,32 @@ process_queue(char *queuedir, int dfd)
continue;
asprintf(&name, "new/%s", de->d_name);
+ asprintf(&work, "work/%s", de->d_name);
+ asprintf(&failed, "failed/%s", de->d_name);
fd = open(name, O_RDONLY);
if (fd == -1) {
- free(name);
- continue;
+ goto end;
+ }
+ rename(name, work);
+ pid = run_job(queuedir, fd);
+ if (pid == -1) {
+ rename(work, failed);
+ return;
+ }
+ for (i = 0; i < MAX_JOBS; i++) {
+ if (jobs[i].pid == -1) {
+ jobs[i].pid = pid;
+ jobs[i].file = strdup(de->d_name);
+ }
}
- unlink(name);
- run_job(queuedir, fd);
close(fd);
- free(name);
cur_jobs++;
+
+end:
+ free(name);
+ free(work);
+ free(failed);
}
rewinddir(dir);
@@ -72,8 +97,11 @@ process_queue(char *queuedir, int dfd)
void
wait_all()
{
+ int i, status;
+ char *work, *done, *failed;
+
for (;;) {
- int r = waitpid(-1, NULL, WNOHANG);
+ pid_t r = waitpid(-1, &status, WNOHANG);
if (r == 0)
break;
else if (r == -1) {
@@ -82,17 +110,44 @@ wait_all()
else
err(1, "waitpid()");
}
- cur_jobs--;
+
+ for (i = 0; i < MAX_JOBS; i++) {
+ if (jobs[i].pid == r) {
+ asprintf(&work, "work/%s", jobs[i].file);
+ asprintf(&done, "done/%s", jobs[i].file);
+ asprintf(&failed, "failed/%s", jobs[i].file);
+
+ if (WEXITSTATUS(status) == 0)
+ rename(work, done);
+ else
+ rename(work, failed);
+
+ jobs[i].pid = -1;
+ free(jobs[i].file);
+ jobs[i].file = NULL;
+
+ cur_jobs--;
+
+ free(work);
+ free(done);
+ free(failed);
+ }
+ }
}
}
int
main(int argc, char **argv)
{
- int kq, dfd, ret;
+ int kq, dfd, ret, i;
struct kevent kv[2];
char *queuedir = NULL, *newdir = NULL;
+ for (i = 0; i < MAX_JOBS; i++) {
+ jobs[i].pid = -1;
+ jobs[i].file = NULL;
+ }
+
if (argc > 1) {
queuedir = strdup(argv[1]);
} else {