1: <?php
2:
3: chdir(__DIR__);
4:
5:
6: if (!isset($job)) {
7: extract(unserialize(file_get_contents($argv[1])));
8: }
9: require_once $configfile;
10: require_once 'functions.php';
11:
12: execute_job($job);
13: global $return_value;
14: exit($return_value);
15:
16:
17: 18: 19: 20: 21: 22: 23: 24: 25: 26:
27:
28: function execute_job($job) {
29: trigger_error(sprintf("starting job id: %d", $job['running_query_id']), E_USER_NOTICE);
30: global $die_on_timeout;
31: global $job_id;
32: $job_id = $job['running_query_id'];
33: $pdo = pdo_connect();
34: $pdo->prepare('SELECT report_job_pid(?,?)')->execute(array($job_id, getmypid()));
35:
36: $end_time = microtime(true) + $job['max_lifetime'];
37: $send_keepalive = true;
38: $die_on_timeout = true;
39:
40:
41: $die_or_keepalive = function() use (&$end_time, &$send_keepalive, &$die_on_timeout, &$pdo, &$job_id) {
42:
43: if ($die_on_timeout && $end_time < microtime(true)) {
44: trigger_error("timed out, exiting", E_USER_NOTICE);
45: exit(-1);
46: }
47: static $next_keepalive = 0;
48:
49: if ($send_keepalive && $next_keepalive < microtime(true)) {
50:
51: static $keepalive_statement = null;
52: if ($keepalive_statement == null)
53: $keepalive_statement = $pdo->prepare('SELECT keepalive_ping(?)');
54:
55: trigger_error("sending keepalive", E_USER_NOTICE);
56: $keepalive_statement->execute(array($job_id));
57: $keepalive_timeout = $keepalive_statement->fetchColumn();
58: if ($keepalive_timeout == -1) {
59:
60:
61: $end_time = 0;
62:
63: $send_keepalive = false;
64: trigger_error("returned -1, will die on next occasion", E_USER_NOTICE);
65: } else {
66:
67: $next_keepalive = microtime(true) + $keepalive_timeout - 3;
68: trigger_error(sprintf("next keepalive in %d seconds", $keepalive_timeout - 3), E_USER_NOTICE);
69: }
70: }
71: };
72: register_tick_function($die_or_keepalive);
73: declare(ticks = 10);
74: $dbfile = acquire_database($job['target_db'], $job['target_db_md5'], $job['target_db_download_uri']);
75:
76: $supported_programs = unserialize(SUPPORTED_PROGRAMS);
77: $program = $supported_programs[$job['programname']];
78: if (strpos($program, DIRECTORY_SEPARATOR) !== 0)
79: $program = __DIR__ . DIRECTORY_SEPARATOR . $program;
80:
81: $cmd = preg_replace('{([^\\\\]) }', '\\1\\\\ ', $program);
82: $cmd.= ' ' . $job['parameters'];
83: $cmd = str_replace('$DBFILE', $dbfile, $cmd);
84: execute_command(DATABASE_BASEDIR, $cmd, $job['query']);
85: report_results_cleanup();
86: }
87:
88: 89: 90: 91: 92:
93: function acquire_database($target_db, $target_db_md5, $target_db_download_uri) {
94:
95: $basedir = DATABASE_BASEDIR;
96: if (strpos($basedir, DIRECTORY_SEPARATOR) !== 0)
97: $basedir = __DIR__ . DIRECTORY_SEPARATOR . $basedir;
98:
99: $db_dir = $basedir . DIRECTORY_SEPARATOR . $target_db . '.' . $target_db_md5;
100: $db_file = $db_dir . DIRECTORY_SEPARATOR . $target_db;
101: $lockfile = $db_dir . '.lock';
102:
103: if (is_dir($db_dir) && !file_exists($lockfile)) {
104: return $db_file;
105: }
106: if (file_exists($lockfile)) {
107:
108:
109: usleep(1000 * 1000);
110:
111: return acquire_database($target_db, $target_db_md5, $target_db_download_uri);
112: }
113:
114: global $die_on_timeout;
115: $die_on_timeout = false;
116: touch($lockfile);
117: $download_file = $db_dir . '.download';
118: printf('will download %s to %s', $target_db_download_uri, $download_file);
119: try {
120: mkdir($db_dir, 777, true);
121: download($target_db_download_uri, $download_file);
122: if ($target_db_md5 !== ($real_md5 = md5_file($download_file)))
123: throw new Exception(sprintf('download md5 could not be validated. should be %s but was %s', $target_db_md5, $real_md5));
124: unzip($download_file, $db_dir);
125: } catch (Exception $e) {
126: rmdir($db_dir);
127: unlink($download_file);
128: unlink($lockfile);
129: throw $e;
130: }
131: unlink($download_file);
132: unlink($lockfile);
133:
134: $die_on_timeout = true;
135:
136: return $db_file;
137: }
138:
139: function download($uri, $target_file) {
140: if (function_exists('curl_version')) {
141: $fp = fopen($target_file, 'w+');
142: $ch = curl_init($uri);
143: curl_setopt($ch, CURLOPT_TIMEOUT, 50);
144: curl_setopt($ch, CURLOPT_FILE, $fp);
145: curl_setopt($ch, CURLOPT_FOLLOWLOCATION, true);
146:
147: curl_exec($ch);
148: curl_close($ch);
149: fclose($fp);
150: } else {
151: $out = array();
152: $retcode = -1;
153: exec('command -v wget', $out, $retcode);
154: if ($retcode != 0) {
155: throw new Exception('could neither find php-curl nor wget, could not download file');
156: }
157: $cmd = sprintf('command wget -O %2$s %1$s', escapeshellcmd($uri), escapeshellcmd($target_file));
158: exec($cmd, $out);
159: }
160: }
161:
162: function unzip($zipfile, $target_dir) {
163: $zip = new ZipArchive;
164: if ($zip->open($zipfile) === TRUE) {
165: $zip->extractTo($target_dir);
166: $zip->close();
167: }
168: else
169: throw new Exception(sprintf('problems opening zipfile %s', $zipfile));
170: }
171:
172: function execute_command($cwd, $cmd, $query) {
173: trigger_error("will execute $cmd", E_USER_NOTICE);
174: trigger_error("query sequence is \n$query", E_USER_NOTICE);
175:
176:
177: $descriptorspec = array(
178: 0 => array("pipe", "r"),
179: 1 => array("pipe", "w"),
180: 2 => array("pipe", "w")
181: );
182:
183: $pipes = array();
184:
185: global $process;
186: $process = proc_open($cmd, $descriptorspec, $pipes, $cwd, NULL, array('bypass_shell' => true));
187:
188: if (is_resource($process)) {
189:
190: global $stdout_collected, $stderr_collected, $return_value;
191: $return_value = -1;
192: $stdout_collected = $stderr_collected = '';
193:
194: fwrite($pipes[0], $query);
195: fclose($pipes[0]);
196:
197: while (true) {
198: $status = proc_get_status($process);
199:
200: $read = array($pipes[1], $pipes[2]);
201: if (($x = stream_select($read, $write = NULL, $except = NULL, 1, 200000)) > 0) {
202: foreach ($read as $pipe) {
203: switch ($pipe) {
204: case $pipes[1]:
205: $stdout_collected .= fgets($pipe);
206: break;
207: case $pipes[2]:
208: $stderr_collected .= fgets($pipe);
209: break;
210: }
211: }
212: }
213: if (isset($status) && !$status['running']) {
214: $return_value = $status['exitcode'];
215: break;
216: }
217: }
218: fclose($pipes[1]);
219: fclose($pipes[2]);
220:
221: proc_close($process);
222:
223: if (strpos($cmd, 'blast') !== FALSE) {
224:
225:
226:
227:
228: $stdout_collected = strtr($stdout_collected, sprintf('%c', 0xff), 'X');
229: }
230: }
231: }
232:
233: function report_results_cleanup() {
234:
235: global $process;
236: if (is_resource($process)) {
237: $status = proc_get_status($process);
238: if ($status['running'] == true) {
239: proc_terminate($process);
240: }
241: }
242:
243: global $job_id, $stdout_collected, $stderr_collected, $return_value;
244: pdo_connect()->prepare('SELECT report_job_result(?,?,?,?);')->execute(array($job_id, $return_value, $stdout_collected, $stderr_collected));
245:
246: trigger_error("reported processed job back", E_USER_NOTICE);
247: }
248:
249: ?>
250: