Multiprocessing

Proc Open Multiprocessing

Multiprocessing (MP) is a simple, structured PHP library for multiprocessing and asynchronous communication.
MP communicates with child processes over streams and does not require the PCNTL extension. On Windows, MP requires the PHP sockets extension for inter-process communication.



Features

Other Benefits

8 Process Fibonacci Example:

  1. Create a child process script fibo.php using the Task class.
     (new class extends Task
     {
         private function fibo($number)
         {
             if ($number == 0) {
                 return 0;
             } else if ($number == 1) {
                 return 1;
             }
        
             return ($this->fibo($number - 1) + $this->fibo($number - 2));
         }
        
         public function consume($number)
         {
             $this->write($this->fibo($number));
         }
     })->listen();
    
  2. Use the PoolFactory class to create a Pool instance with 8 workers.
     $collected = PoolFactory::create([
         "task" => "php fibo.php",
         "queue" => new WorkQueue(range(1, 30)),
         "num_processes" => 8,
         "done" => fn($data) => print($data . PHP_EOL),
         "error" => fn(Exception $e) => print($e->getTraceAsString() . PHP_EOL)
     ])->start();
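To make the idea behind the pool concrete, here is a minimal sketch in plain PHP of handing a queue of numbers to several child processes over streams. It is not MP's implementation: `runPool` and `CHILD_CODE` are hypothetical names, the children are started with `proc_open` and `php -r`, and a Unix-like system is assumed.

```php
<?php
// Hypothetical child script: reads one number per line, replies "n=fib(n)".
const CHILD_CODE = <<<'PHP'
$in = fopen("php://stdin", "r");
while (($line = fgets($in)) !== false) {
    $n = (int) trim($line);
    [$a, $b] = [0, 1];
    for ($i = 0; $i < $n; $i++) {
        [$a, $b] = [$b, $a + $b];
    }
    echo $n, "=", $a, PHP_EOL;
}
PHP;

// Hypothetical helper (not part of MP): feeds $jobs to $numWorkers children.
function runPool(array $jobs, int $numWorkers): array
{
    $spec = [0 => ['pipe', 'r'], 1 => ['pipe', 'w']];
    $workers = []; // keyed by the resource id of each worker's output pipe
    for ($i = 0; $i < $numWorkers; $i++) {
        $proc = proc_open([PHP_BINARY, '-r', CHILD_CODE], $spec, $pipes);
        if (!is_resource($proc)) {
            throw new RuntimeException('Could not start worker');
        }
        $workers[(int) $pipes[1]] = ['proc' => $proc, 'in' => $pipes[0], 'out' => $pipes[1]];
    }

    $results = [];
    $busy = []; // output pipes of workers that currently hold a job
    foreach ($workers as $id => $worker) {
        if ($jobs) {
            fwrite($worker['in'], array_shift($jobs) . PHP_EOL);
            $busy[$id] = $worker['out'];
        }
    }

    while ($busy) {
        $read = array_values($busy);
        $write = $except = null;
        // Block until at least one busy worker has written a result.
        if (stream_select($read, $write, $except, null) === false) {
            break;
        }
        foreach ($read as $stream) {
            $id = (int) $stream;
            $line = fgets($stream);
            if ($line === false) { // worker exited unexpectedly
                unset($busy[$id]);
                continue;
            }
            [$n, $value] = explode('=', trim($line));
            $results[(int) $n] = (int) $value;
            if ($jobs) {
                // Hand the next job to the worker that just finished.
                fwrite($workers[$id]['in'], array_shift($jobs) . PHP_EOL);
            } else {
                unset($busy[$id]);
            }
        }
    }

    foreach ($workers as $worker) {
        fclose($worker['in']); // EOF on stdin ends the child's read loop
        fclose($worker['out']);
        proc_close($worker['proc']);
    }
    ksort($results);
    return $results;
}

print_r(runPool(range(1, 10), 4));
```

Each worker holds at most one job at a time, so faster workers naturally pick up more of the queue. A real pool also needs error handling and a way to replace dead workers, which this sketch leaves out.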
    

More Examples:

Main Script                                   | Workers
----------------------------------------------|-------------------------------------------------------------
Simple Proc Pool                              | Fibo Proc
WorkerPool Pipeline                           | Step 1 Fibo -> Step 2 Waste Time -> Step 3 Waste More Time
AlphaVantage API - Draw Stock Charts Pipeline | Download Stock Data -> Draw Stock Charts
Fibo Rate Limited                             | Fibo Proc
Rate Limited Pipeline                         | Step 1 Fibo -> Step 2 Waste Time -> Step 3 Waste More Time

Debugging Child Processes

If you use Xdebug to debug your PHP code with a remote interpreter, you can debug both the parent and child scripts by adding the debug line provided by the Debug class.

//create the debug string
$phpCommand = Debug::cli(["remote_port" => 9000, "serverName" => "SomeName"]);
//use this in your command string
$cmd = "$phpCommand fibo.php";
$collected = PoolFactory::create([
    "task" => $cmd,
    "queue" => new WorkQueue(range(1, 30)),
    "num_processes" => 8
])->start();

If you are using PhpStorm to debug, you can now use the serverName as your server configuration and set up path mappings.
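For orientation, the sketch below shows the kind of command line that typically enables Xdebug for a CLI child process. It is an assumption, not the actual output of `Debug::cli`: `buildDebugCommand` is a hypothetical helper, the settings are the Xdebug 2 names (matching the `remote_port` option above), and the environment-variable prefix assumes a POSIX shell.

```php
<?php
// Hypothetical helper, not part of MP: builds a PHP command with Xdebug 2
// remote-debugging settings passed as -d flags.
function buildDebugCommand(int $remotePort, string $serverName, string $php = 'php'): string
{
    $settings = [
        'xdebug.remote_enable' => 1,
        'xdebug.remote_autostart' => 1,
        'xdebug.remote_port' => $remotePort,
    ];

    $flags = [];
    foreach ($settings as $name => $value) {
        $flags[] = "-d{$name}={$value}";
    }

    // PHP_IDE_CONFIG tells PhpStorm which server configuration (and therefore
    // which path mappings) to apply to the incoming debug connection.
    return sprintf('PHP_IDE_CONFIG="serverName=%s" %s %s', $serverName, $php, implode(' ', $flags));
}

echo buildDebugCommand(9000, 'SomeName') . ' fibo.php', PHP_EOL;
```

Xdebug 3 renamed these settings, so a setup on a newer Xdebug version would need the equivalent `xdebug.mode`, `xdebug.start_with_request` and `xdebug.client_port` values instead.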

IPC

Inter-process communication uses either streams or sockets. By default, MP chooses sockets on Windows and streams on all other operating systems. On Windows you need to enable the PHP sockets extension. On Linux or macOS you can choose either IPC mechanism.

//use the socket IPC
$collected = PoolFactory::create([
    "type" => Task::TYPE_SOCKET,
    "task" => "php fibo.php",
    "queue" => new WorkQueue(range(1, 30)),
    "num_processes" => 8
])->start();

//use the stream IPC
$collected = PoolFactory::create([
    "type" => Task::TYPE_STREAM,
    "task" => "php fibo.php",
    "queue" => new WorkQueue(range(1, 30)),
    "num_processes" => 8
])->start();
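The default selection described in this section can be pictured with the sketch below. It only illustrates the rule and is not MP's code: `chooseIpcType` is a hypothetical function, and the strings `'socket'` and `'stream'` stand in for `Task::TYPE_SOCKET` and `Task::TYPE_STREAM`, whose actual values are not shown here.

```php
<?php
// Hypothetical helper, not part of MP: picks an IPC type from the OS family.
function chooseIpcType(string $osFamily, bool $socketsLoaded): string
{
    if ($osFamily === 'Windows') {
        if (!$socketsLoaded) {
            throw new RuntimeException('Enable the PHP sockets extension to use IPC on Windows');
        }
        return 'socket'; // placeholder for Task::TYPE_SOCKET
    }

    return 'stream'; // placeholder for Task::TYPE_STREAM
}

// PHP_OS_FAMILY is a built-in constant (PHP 7.2+), e.g. 'Windows', 'Linux', 'Darwin'.
echo chooseIpcType(PHP_OS_FAMILY, extension_loaded('sockets')), PHP_EOL;
```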

IPC adds communication overhead to process execution. We measured the difference between the two mechanisms using the Speed Test parent script and the Echo Proc child script. Streams are the faster IPC type in this simple echo test, but the difference in overhead matters less with real-world workloads.
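A rough way to see this overhead for yourself is to time round trips to an echo child. The sketch below is not the Speed Test or Echo Proc script mentioned above: `measureEchoRoundTrips` is a hypothetical function that uses plain `proc_open` pipes, and the first round trip also includes the child's startup time.

```php
<?php
// Hypothetical helper, not part of MP: average time per echo round trip,
// in microseconds, over a pipe-connected child process.
function measureEchoRoundTrips(int $messages): float
{
    // The child echoes every line it receives until stdin is closed.
    $childCode = '$in = fopen("php://stdin", "r"); while (($l = fgets($in)) !== false) { echo $l; }';
    $spec = [0 => ['pipe', 'r'], 1 => ['pipe', 'w']];

    $proc = proc_open([PHP_BINARY, '-r', $childCode], $spec, $pipes);
    if (!is_resource($proc)) {
        throw new RuntimeException('Could not start echo child');
    }

    $start = hrtime(true); // nanoseconds, monotonic clock
    for ($i = 0; $i < $messages; $i++) {
        fwrite($pipes[0], "ping {$i}" . PHP_EOL);
        fgets($pipes[1]); // wait for the echoed line
    }
    $elapsedNs = hrtime(true) - $start;

    fclose($pipes[0]);
    fclose($pipes[1]);
    proc_close($proc);

    return $elapsedNs / $messages / 1000;
}

printf("%.1f microseconds per round trip\n", measureEchoRoundTrips(1000));
```

Comparing this figure with the time a single unit of real work takes gives a feel for when the choice of IPC type is worth worrying about.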