pass.c (5235B)
1 #include "mxf-send.h" 2 3 /* this file is too long -------------------------------------------- PASSES */ 4 5 struct { 6 unsigned long id; /* if 0, need a new pass */ 7 int j; /* defined if id; job number */ 8 int fd; /* defined if id; reading from {local,remote} */ 9 seek_pos mpos; /* defined if id; mark position */ 10 substdio ss; 11 char buf[128]; 12 } 13 pass[CHANNELS]; 14 15 void 16 pass_init() 17 { 18 int c; 19 for (c = 0; c < CHANNELS; ++c) 20 pass[c].id = 0; 21 } 22 23 void 24 pass_selprep(datetime_sec * wakeup) 25 { 26 /* -Lightning looks confused- Huh? Frick, I need to write a dictionary. */ 27 int c; 28 struct prioq_elt pe; 29 if (flagexitasap) 30 return; 31 for (c = 0; c < CHANNELS; ++c) 32 if (pass[c].id) 33 if (del_avail(c)) { 34 *wakeup = 0; 35 return; 36 } 37 if (job_avail()) 38 for (c = 0; c < CHANNELS; ++c) 39 if (!pass[c].id) 40 if (prioq_min(&pqchan[c], &pe)) 41 if (*wakeup > pe.dt) 42 *wakeup = pe.dt; 43 if (prioq_min(&pqfail, &pe)) 44 if (*wakeup > pe.dt) 45 *wakeup = pe.dt; 46 if (prioq_min(&pqdone, &pe)) 47 if (*wakeup > pe.dt) 48 *wakeup = pe.dt; 49 } 50 51 static datetime_sec squareroot(datetime_sec x) { 52 /* result^2 <= x < (result + 1)^2 */ 53 /* assuming: >= 0 */ 54 datetime_sec y; 55 datetime_sec yy; 56 datetime_sec y21; 57 int j; 58 59 y = 0; 60 yy = 0; 61 for (j = 15; j >= 0; --j) { 62 y21 = (y << (j + 1)) + (1 << (j + j)); 63 if (y21 <= x - yy) { 64 y += (1 << j); 65 yy += y21; 66 } 67 } 68 return y; 69 } 70 71 datetime_sec 72 nextretry(datetime_sec birth, int c) 73 { 74 datetime_sec n; 75 76 if (birth > recent) 77 n = 0; 78 else 79 n = squareroot(recent - birth); /* no need to add fuzz to 80 * recent */ 81 n += chanskip[c]; 82 return birth + n * n; 83 } 84 85 void 86 pass_dochan(int c) 87 { 88 datetime_sec birth; 89 struct prioq_elt pe; 90 static stralloc line = {0}; /* TA_ZERO */ 91 int match; 92 93 if (flagexitasap) /* reporting to demob soon, no need to do this */ 94 return; 95 96 if (!pass[c].id) { 97 if (!job_avail()) 98 return; 99 if (!prioq_min(&pqchan[c], &pe)) 100 return; 101 if (pe.dt > recent) 102 return; 103 fnmake_chanaddr(pe.id, c, &fn); 104 105 prioq_delmin(&pqchan[c]); 106 pass[c].mpos = 0; 107 pass[c].fd = open_read(fn.s); 108 if (pass[c].fd == -1) 109 goto trouble; 110 if (!getinfo(&line, &birth, pe.id)) { 111 close(pass[c].fd); 112 goto trouble; 113 } 114 pass[c].id = pe.id; 115 substdio_fdbuf(&pass[c].ss, read, pass[c].fd, pass[c].buf, sizeof(pass[c].buf)); 116 pass[c].j = job_open(pe.id, c); 117 jo[pass[c].j].retry = nextretry(birth, c); 118 jo[pass[c].j].flagdying = (recent > birth + lifetime); 119 while (!stralloc_copy(&jo[pass[c].j].sender, &line)) 120 nomem(); 121 } 122 123 if (!del_avail(c)) 124 return; 125 126 if (getln(&pass[c].ss, &line, &match, '\0') == -1) { 127 fnmake_chanaddr(pass[c].id, c, &fn); 128 log3("warning: trouble reading ", fn.s, "; will try again later\n"); 129 close(pass[c].fd); 130 job_close(pass[c].j); 131 pass[c].id = 0; 132 return; 133 } 134 if (!match) { 135 close(pass[c].fd); 136 jo[pass[c].j].flaghiteof = 1; 137 job_close(pass[c].j); 138 pass[c].id = 0; 139 return; 140 } 141 switch (line.s[0]) { 142 case 'T': 143 ++jo[pass[c].j].numtodo; 144 del_start(pass[c].j, pass[c].mpos, line.s + 1); 145 break; 146 case 'D': 147 break; 148 default: 149 fnmake_chanaddr(pass[c].id, c, &fn); 150 log3("warning: unknown record type in ", fn.s, "!\n"); 151 close(pass[c].fd); 152 job_close(pass[c].j); 153 pass[c].id = 0; 154 return; 155 } 156 157 pass[c].mpos += line.len; 158 return; 159 160 trouble: 161 log3("warning: trouble opening ", fn.s, "; will try again later\n"); 162 pe.dt = recent + SLEEP_SYSFAIL; 163 while (!prioq_insert(&pqchan[c], &pe)) 164 nomem(); 165 } 166 167 void 168 messdone(unsigned long id) 169 { 170 char ch; 171 int c; 172 struct prioq_elt pe; 173 struct stat st; 174 175 for (c = 0; c < CHANNELS; ++c) { 176 fnmake_chanaddr(id, c, &fn); 177 if (stat(fn.s, &st) == 0) 178 return; /* false alarm; consequence of HOPEFULLY */ 179 if (errno != error_noent) { 180 log3("warning: unable to stat ", fn.s, "; will try again later\n"); 181 goto fail; 182 } 183 } 184 185 fnmake_todo(id, &fn); 186 if (stat(fn.s, &st) == 0) 187 return; 188 if (errno != error_noent) { 189 log3("warning: unable to stat ", fn.s, "; will try again later\n"); 190 goto fail; 191 } 192 193 fnmake_info(id, &fn); 194 if (stat(fn.s, &st) == -1) { 195 if (errno == error_noent) 196 return; 197 log3("warning: unable to stat ", fn.s, "; will try again later\n"); 198 goto fail; 199 } 200 201 /* -todo +info -local -remote ?bounce */ 202 if (!injectbounce(id)) 203 goto fail; /* injectbounce() produced error message */ 204 205 strnum3[fmt_ulong(strnum3, id)] = 0; 206 log3("end msg ", strnum3, "\n"); 207 208 /* -todo +info -local -remote -bounce */ 209 fnmake_info(id, &fn); 210 if (unlink(fn.s) == -1) { 211 log3("warning: unable to unlink ", fn.s, "; will try again later\n"); 212 goto fail; 213 } 214 215 /* -todo -info -local -remote -bounce; we can relax */ 216 fnmake_foop(id, &fn); 217 if (substdio_putflush(&sstoqc, fn.s, fn.len) == -1) { 218 cleandied(); 219 return; 220 } 221 if (substdio_get(&ssfromqc, &ch, 1) != 1) { 222 cleandied(); 223 return; 224 } 225 if (ch != '+') 226 log3("warning: qmail-clean unable to clean up ", fn.s, "\n"); 227 228 return; 229 230 fail: 231 pe.id = id; 232 pe.dt = now() + SLEEP_SYSFAIL; 233 while (!prioq_insert(&pqdone, &pe)) 234 nomem(); 235 } 236 237 void 238 pass_do() 239 { 240 int c; 241 struct prioq_elt pe; 242 243 for (c = 0; c < CHANNELS; ++c) 244 pass_dochan(c); 245 if (prioq_min(&pqfail, &pe)) 246 if (pe.dt <= recent) { 247 prioq_delmin(&pqfail); 248 pqadd(pe.id); 249 } 250 if (prioq_min(&pqdone, &pe)) 251 if (pe.dt <= recent) { 252 prioq_delmin(&pqdone); 253 messdone(pe.id); 254 } 255 }