todo.c (5859B)
1 #include "mxf-send.h" 2 /* this file is too long ---------------------------------------------- TODO */ 3 4 datetime_sec nexttodorun; 5 DIR *tododir; /* if 0, have to opendir again */ 6 stralloc todoline = {0}; 7 char todobuf[SUBSTDIO_INSIZE]; 8 char todobufinfo[512]; 9 char todobufchan[CHANNELS][1024]; 10 11 void 12 todo_init() 13 { 14 tododir = 0; 15 nexttodorun = now(); 16 trigger_set(); 17 } 18 19 void 20 todo_selprep(int *nfds, fd_set * rfds, datetime_sec * wakeup) 21 { 22 if (flagexitasap) 23 return; 24 trigger_selprep(nfds, rfds); 25 if (tododir) 26 *wakeup = 0; 27 if (*wakeup > nexttodorun) 28 *wakeup = nexttodorun; 29 } 30 31 //XXX XXX - need to refactor this into a todo_doone 32 void 33 todo_do(fd_set * rfds) 34 { 35 struct stat st; 36 substdio ss; 37 int fd; 38 substdio ssinfo; 39 int fdinfo; 40 substdio sschan[CHANNELS]; 41 int fdchan[CHANNELS]; 42 int flagchan[CHANNELS]; 43 struct prioq_elt pe; 44 char ch; 45 int match; 46 unsigned long id; 47 unsigned int len; 48 direntry *dent; 49 int c; 50 unsigned long uid; 51 unsigned long pid; 52 53 fd = -1; 54 fdinfo = -1; 55 for (c = 0; c < CHANNELS; ++c) 56 fdchan[c] = -1; 57 58 if (flagexitasap) 59 return; 60 61 if (!tododir) { 62 if (!trigger_pulled(rfds)) 63 if (recent < nexttodorun) 64 return; 65 trigger_set(); 66 tododir = opendir("todo"); 67 if (!tododir) { 68 pausedir("todo"); 69 return; 70 } 71 nexttodorun = recent + SLEEP_TODO; 72 } 73 74 dent = readdir(tododir); 75 if (!dent) { 76 closedir(tododir); 77 tododir = 0; 78 return; 79 } 80 if (str_equal(dent->d_name, ".")) 81 return; 82 if (str_equal(dent->d_name, "..")) 83 return; 84 len = scan_ulong(dent->d_name, &id); 85 if (!len || dent->d_name[len]) 86 return; 87 88 fnmake_todo(id, &fn); 89 90 fd = open_read(fn.s); /* fn just comes out of nowhere. */ 91 if (fd == -1) { 92 log3("warning: unable to open ", fn.s, "\n"); 93 return; 94 } 95 96 fnmake_mess(id, &fn); 97 /* just for the statistics */ 98 if (stat(fn.s, &st) == -1) { 99 log3("warning: unable to stat ", fn.s, "\n"); 100 goto fail; 101 } 102 103 for (c = 0; c < CHANNELS; ++c) { 104 fnmake_chanaddr(id, c, &fn); 105 if (unlink(fn.s) == -1) 106 if (errno != error_noent) { 107 log3("warning: unable to unlink ", fn.s, "\n"); 108 goto fail; 109 } 110 } 111 112 fnmake_info(id, &fn); 113 if (unlink(fn.s) == -1) 114 if (errno != error_noent) { 115 log3("warning: unable to unlink ", fn.s, "\n"); 116 goto fail; 117 } 118 119 fdinfo = open_excl(fn.s); 120 if (fdinfo == -1) { 121 log3("warning: unable to create ", fn.s, "\n"); 122 goto fail; 123 } 124 125 strnum3[fmt_ulong(strnum3, id)] = 0; 126 log3("new msg ", strnum3, "\n"); 127 128 for (c = 0; c < CHANNELS; ++c) 129 flagchan[c] = 0; 130 131 substdio_fdbuf(&ss, read, fd, todobuf, sizeof(todobuf)); 132 substdio_fdbuf(&ssinfo, write, fdinfo, todobufinfo, sizeof(todobufinfo)); 133 134 uid = 0; 135 pid = 0; 136 137 for (;;) { 138 if (getln(&ss, &todoline, &match, '\0') == -1) { 139 /* perhaps we're out of memory, perhaps an I/O error */ 140 fnmake_todo(id, &fn); 141 log3("warning: trouble reading ", fn.s, "\n"); 142 goto fail; 143 } 144 if (!match) 145 break; 146 147 switch (todoline.s[0]) { 148 case 'u': 149 scan_ulong(todoline.s + 1, &uid); 150 break; 151 case 'p': 152 scan_ulong(todoline.s + 1, &pid); 153 break; 154 case 'F': 155 if (substdio_putflush(&ssinfo, todoline.s, todoline.len) == -1) { 156 fnmake_info(id, &fn); 157 log3("warning: trouble writing to ", fn.s, "\n"); 158 goto fail; 159 } 160 qslog2("info msg ", strnum3); 161 strnum2[fmt_ulong(strnum2, (unsigned long)st.st_size)] = 0; 162 qslog2(": bytes ", strnum2); 163 log1(" from <"); 164 logsafe(todoline.s + 1); 165 strnum2[fmt_ulong(strnum2, pid)] = 0; 166 qslog2("> qp ", strnum2); 167 strnum2[fmt_ulong(strnum2, uid)] = 0; 168 qslog2(" uid ", strnum2); 169 log1("\n"); 170 break; 171 case 'T': 172 switch (c = rewrite(todoline.s + 1)) { 173 case -1: 174 if (errno == ENOMEM) nomem(); 175 goto fail; 176 default: 177 //c = 0; 178 break; 179 } 180 if (fdchan[c] == -1) { 181 fnmake_chanaddr(id, c, &fn); 182 fdchan[c] = open_excl(fn.s); 183 if (fdchan[c] == -1) { 184 log3("warning: unable to create ", fn.s, "\n"); 185 goto fail; 186 } 187 substdio_fdbuf(&sschan[c] 188 ,write, fdchan[c], todobufchan[c], sizeof(todobufchan[c])); 189 flagchan[c] = 1; 190 } 191 if (substdio_bput(&sschan[c], rwline.s, rwline.len) == -1) { 192 fnmake_chanaddr(id, c, &fn); 193 log3("warning: trouble writing to ", fn.s, "\n"); 194 goto fail; 195 } 196 break; 197 default: 198 fnmake_todo(id, &fn); 199 log3("warning: unknown record type in ", fn.s, "\n"); 200 goto fail; 201 } 202 } 203 204 close(fd); 205 fd = -1; 206 207 fnmake_info(id, &fn); 208 if (substdio_flush(&ssinfo) == -1) { 209 log3("warning: trouble writing to ", fn.s, "\n"); 210 goto fail; 211 } 212 if (fsync(fdinfo) == -1) { 213 log3("warning: trouble fsyncing ", fn.s, "\n"); 214 goto fail; 215 } 216 close(fdinfo); 217 fdinfo = -1; 218 219 for (c = 0; c < CHANNELS; ++c) 220 if (fdchan[c] != -1) { 221 fnmake_chanaddr(id, c, &fn); 222 if (substdio_flush(&sschan[c]) == -1) { 223 log3("warning: trouble writing to ", fn.s, "\n"); 224 goto fail; /* jump... */ 225 } 226 if (fsync(fdchan[c]) == -1) { 227 log3("warning: trouble fsyncing ", fn.s, "\n"); 228 goto fail; 229 } 230 close(fdchan[c]); 231 fdchan[c] = -1; 232 } 233 234 fnmake_todo(id, &fn); 235 if (substdio_putflush(&sstoqc, fn.s, fn.len) == -1) { 236 cleandied(); /* the clean pipe is no longer writable. flag that up, and exit ASAP. ~Mel */ 237 return; 238 } 239 if (substdio_get(&ssfromqc, &ch, 1) != 1) { 240 cleandied(); /* ^ */ 241 return; 242 } 243 if (ch != '+') { 244 log3("warning: qmail-clean unable to clean up ", fn.s, "\n"); 245 return; 246 } 247 248 pe.id = id; 249 pe.dt = now(); 250 for (c = 0; c < CHANNELS; ++c) 251 if (flagchan[c]) 252 while (!prioq_insert(&pqchan[c], &pe)) 253 nomem(); 254 255 for (c = 0; c < CHANNELS; ++c) 256 if (flagchan[c]) 257 break; 258 if (c == CHANNELS) 259 while (!prioq_insert(&pqdone, &pe)) 260 nomem(); 261 262 return; 263 264 fail: 265 /* this little number is called under exception. It cleans up 266 * the state of the message. */ 267 if (fd != -1) 268 close(fd); 269 if (fdinfo != -1) 270 close(fdinfo); 271 for (c = 0; c < CHANNELS; ++c) 272 if (fdchan[c] != -1) 273 close(fdchan[c]); 274 } 275 276 int todo_do_onemsg(unsigned long msgid) 277 { 278 }