Source Code for Me (s-c.me)

Allows you to paste souce code to blogs! Adapted for Twitter! Here is Search Form in case you missed your code.
Tags: Php,$in,$processors_states,$processors,$proc_signal_pipes,$proc_data_pipes,$descriptorspec, Created At: 10/24/2016 7:54:12 PMViews:

HTML view:
Copy Source | Copy HTML
#!/usr/bin/php
<?php
// кол-во обработчиков
define('PROCESSORS_TO_SPAWN', 5);
// полный путь к обработчику
define('PROCESSOR_PATH', '/path/to/processor.php');
 
$in=fopen("php://stdin",'r');
// переводим стандартный ввод в неблокирующий режим
stream_set_blocking($in, 0);
 
// список флагов готовности обработчиков
$processors_states = array_fill(0, PROCESSORS_TO_SPAWN, true);
$processors = array();
$proc_signal_pipes = array();
$proc_data_pipes = array();
$descriptorspec = array(
  0 => array("pipe", "r"),
  1 => array("pipe", "w"),
  2 => array("file", "/dev/null", "a")
);
$buffer = array();
 
// запускаем обработчики и переводим канал чтения в неблокирующий режим
while (count($processors) < PROCESSORS_TO_SPAWN) {
  $processors[] = proc_open(PROCESSOR_PATH, $descriptorspec, $pipes, NULL, NULL);
  stream_set_blocking($pipes[1], 0);
  $proc_data_pipes[] = $pipes[0];
  $proc_signal_pipes[] = $pipes[1];
}
 
while(true) {
  $in_str = fgets($in);
  if($in_str !== false) {
    // тут можно проверять валидность строки лога
    if (true) {
    $buffer[] = $in_str;
    }
  }
  foreach ($processors_states as $proc_num => $processor_state) {
    // в неблокирующем режиме проверяем готовность обработчика
    if (fgets($proc_signal_pipes[$proc_num]) == '1') {
      $processors_states[$proc_num] = true;
    }
  }
  // пока есть свободные обработчики и очередь не пуста - скармливаем им данные
  while (count($buffer) > 0 and
    ($selected_proc = array_search(true, $processors_states)) !== false) {
    $item = array_shift($buffer);
    fwrite($proc_data_pipes[$selected_proc], $item);
    $processors_states[$selected_proc] = false;
  }
  // если стандартный ввод закрыт и очередь пуста - завершаем работу
  if (feof($in) and count($buffer) == 0) {
    break;
  }
  $check_list = $proc_signal_pipes;
  $check_list[] = $in;
  // ожидаем данных для чтения на стандартном вводе или из одного из обработчиков
  stream_select($check_list, $w = NULL, $e = NULL, 1);
}
 
// закрываем обработчики и прибираемся
foreach($processors as $proc_num => $proc) {
  fclose($proc_data_pipes[$proc_num]);
  fclose($proc_signal_pipes[$proc_num]);
  proc_close($proc);
}
fclose($in);
 
?>

Based on Manoli.Net's CodeFormatter. Made by Topbot (c) 2008-2018