php实现多线程(实际是多进程,夸平台)

  • 来源: 互联网 作者: rocket   2008-03-17/10:20
  • 代码实现了一个简单的多进程管理机制,比向WEB服务器发送多个请求要实现多进程要方便很多。只能使用在cli模式。可以用在特殊场合,如邮件发送任务等。
    资源的共享访问使用了文件锁,并不是很可靠,主要是为了能够在Windwos下使用,如果确实有必要可以考虑自己改用相应的信号灯机制(这个扩展只能用于xUNIX)。

    实例

    [复制PHP代码] [ - ]
    PHP代码如下:
    define('DIR_PHP_EXEC''php');
    define('DIR_MAIN_EXEC'__FILE__
    );
    define('DIR_TMP''/tmp'
    );
    require_once(
    'my_process.php'
    );

    class 
    pp extends my_process_base 
    {
        public function 
    run($param null
    ) {
            for (
    $i 0$i 4$i
    ++) {
                echo 
    "111 $param"
    ;
                
    sleep(1
    );
            }
        }
    }

    init_my_process
    ();
    $obj $GLOBALS['gal_obj_process_m'
    ];
    if (
    $obj->is_main
    ()) {
        
    $obj->run_task('pp''a'
    );
        
    $obj->run_task('pp''b'
    );
        
    $obj->run_task('pp''c'
    );
        
    $obj->run_task('pp''d'
    );
        
    //$obj->run_task('pp', 'b');
        
    $obj->set_max_run(10
    );
        
    $obj->run
    ();
    }


    进程管理类

    [复制PHP代码] [ - ]
    PHP代码如下:
    /**
     * @copyright 2007 movivi
     * @author  徐智  <[email=xzfred@gmail.com]xzfred@gmail.com[/email]>
     *
     * $Id: getPage.php 11 2007-09-21 02:15:01Z fred $
     */
    if (!defined('DIR_PHP_EXEC')) define('DIR_PHP_EXEC''php'
    );
    //if (!defined('DIR_MAIN_EXEC')) define('DIR_MAIN_EXEC', '');
    if (!defined('DIR_TMP')) define('DIR_TMP'''
    );
    /*****************************************************************************/
    /* 初始化 */
    define('CMD_MAIN_PROCESS_KEY''main_process_key'
    );
    define('CMD_CHILD_PROCESS_NAME''child_process_name'
    );
    define('CMD_CHILD_PROCESS_PARAM''child_process_param'
    );

    function 
    init_my_process
    () {
        
    $GLOBALS['gal_obj_cmd'] = new my_cmd_argv
    ();
        
    $key $GLOBALS['gal_obj_cmd']->get_value(CMD_MAIN_PROCESS_KEY
    );
        
    $key $key === false '' $key
    ;
        
    $GLOBALS['gal_obj_process_m'] = new my_process_m($key
    );
        if (!
    $GLOBALS['gal_obj_process_m']->is_main()) $GLOBALS['gal_obj_process_m']->run
    () ;
    }

    /**
     * php多进程类
     * 
     * 你需要从这个对象继承,然后实现你自己的run处理
     */
    abstract class my_process_base 
    {
        public function 
    __construct($auto_run=true$name=''
    ) {
        }

        public function 
    __destruct
    () {
            echo 
    "@end"
    ;
        }

        abstract public function 
    run($param null
    );
    }


    class 
    my_cmd_argv 
    {
        private 
    $cmd_argv 
    = array();
        public function 
    __construct
    () {
            
    $argv $_SERVER['argv'
    ];
            for (
    $i 1$i count($argv); $i
    ++) {
                
    $cmd explode('='$argv[$i
    ]);
                
    $this->cmd_argv[$cmd[0]] = isset($cmd[1]) ? $cmd[1] : ''
    ;
            }
        }

        public function 
    get_key($key
    ) {
            return isset(
    $this->cmd_argv[$key
    ]);
        }

        public function 
    get_value($key
    ) {
            return isset(
    $this->cmd_argv[$key]) ? $this->cmd_argv[$key] : false

        }
    }

    /**
     * php多进程管理类
     * 可以在PHP中实现多进程处理,只限在控制台方式使用
     * 当前的信号实现机制采用文件方式
     *
     */
    class my_process_m 
    {
        
    /**
         * @var array $task_list 
         * 进程列表
         */
        
    private $task_list 
    = array();
        private 
    $lock_list 
    = array();
        private 
    $lock null
    ;
        private 
    $is_main false
    ;
        private 
    $max_run 3600000
    ;

        private function 
    release_lock($key null
    ) {
            
    $lock = &$this->lock_list
    ;
            if (!
    is_null($key
    )) {
                
    $key md5($this->build_lock_id($key
    ));
                if (isset(
    $lock[$key
    ])) {
                    if (
    is_resource($lock[$key][0])) fclose($lock[$key][0
    ]);
                    
    unlink($lock[$key][1
    ]);
                    unset(
    $lock[$key
    ]);
                }
                return 
    true
    ;
            }

            foreach (
    $lock as $k => $h
    ) {
                if (
    is_resource($h)) fclose($h
    );
                unset(
    $lock[$k
    ]);
            }
            return 
    true
    ;
        }

        private function 
    release_task($key null
    ) {
            
    $task = &$this->task_list
    ;
            if (!
    is_null($key) && isset($task[$key
    ])) {
                if (
    is_resource($task[$key])) pclose($task[$key
    ]);
                unset(
    $task[$key
    ]);
            } else {
                foreach (
    $task as $k => $h
    ) {
                    if (
    is_resource($h)) pclose($h
    );
                    unset(
    $task[$k
    ]);
                }
            }
            return 
    true
    ;
        }
                
        private function 
    build_lock_id($key
    ) {
            return 
    DIR_TMP DIRECTORY_SEPARATOR $key '_sem.lock'
    ;
        }

        protected function 
    run_child_process
    () {
            
    $class $GLOBALS['gal_obj_cmd']->get_value(CMD_CHILD_PROCESS_NAME
    );
            
    $param $GLOBALS['gal_obj_cmd']->get_value(CMD_CHILD_PROCESS_PARAM
    );
            
    $param $param == '' null unserialize(base64_decode(trim($param
    )));
            
    $obj = new $class
    ();
            
    $obj->run($param
    );
            
    $this->task_list[] = $obj
    ;
        }

        public function 
    __construct($lock=''
    ) {
            if (
    $lock === ''
    ) {
                
    $this->is_main true
    ;
                
    $key md5(uniqid()) . '_main.my_process'
    ;
                
    $lock = array($key$this->get($key
    ));
            } else {
                
    $this->is_main false

                
    $lock = array($lock0
    );
            }
            
    $this->lock $lock
    ;
        }

        public function 
    __destruct
    () {
            
    $this->release_lock
    ();
            
    $this->release_task
    ();
        }

        
    /** 
         * 停止所有进程
         *
         */
        
    public function stop_all
    () {
        }

        
    /**
         * 是否是主进程
         *
         */
        
    public function is_main
    () {
            return 
    $this->is_main
    ;
        }

        
    /**
         * 是不是已经存在一个活动信号
         *
         * @param   string      $key
         * @return  bool        
         */
        
    public function exist($key
    ) {
            return 
    file_exists($this->build_lock_id($key
    ));
        }

        
    /**
         * 获取一个信号
         *
         * @param   string      $key     
         * @param   int         $max_acquire    最大请求阻塞数量
         * @return mix 如果成功返回一个信号ID
         *
         */
        
    public function get($key$max_acquire=5
    ) {
            
    $fn $this->build_lock_id($key
    );
            if (isset(
    $this->lock_list[md5($fn)])) return false
    ;
            
    $id fopen($fn'a+'
    );
            if (
    $id$this->lock_list[md5($fn)] = array($id$fn
    );
            return 
    $id
    ;
        }

        
    /**
         * 释放一个信号
         *
         * @param   string      $key     
         * @return  bool        如果成功返回一个信号true
         *
         */
        
    public function remove($key
    ) {
            return 
    $this->release_lock($key
    );
        }

        
    /**
         * 获取一个信号
         *
         * @param string    $id         信号ID
         * @param bool      $block      是否阻塞
         */
        
    public function acquire($id$block=false
    ) {
            if (
    $block
    ) {
                return 
    flock($idLOCK_EX
    );
            } else {
                return 
    flock($idLOCK_EX LOCK_NB
    );
            }
        }

        
    /**
         * 释放一个信号
         *
         */
        
    public function release($id
    ) {
            
    flock($idLOCK_UN
    );
        }
        public function 
    run_task($process_name$param=null
    ) {
            
    $this->task_list[] = popen(DIR_PHP_EXEC ' -f ' DIR_MAIN_EXEC 
    ' -- ' 
                
    CMD_CHILD_PROCESS_NAME '=' $process_name 
    ' ' 
                
    CMD_CHILD_PROCESS_PARAM '="' base64_encode(serialize($param)) . 
    '" '
                
    CMD_MAIN_PROCESS_KEY '="' $this->lock[0] . '" '

                
    'r'
    );
        }

        public function 
    run($auto_run true
    ) {
            if (
    $this->is_main
    ) {
                
    $ps = &$this->task_list
    ;
                
    $max_run = &$this->max_run
    ;
                
    $id 0
    ;
                do {
                    
    //echo "process----------------------------------------: ";
                    
    $c 0
    ;
                    foreach (
    $ps as $k => $h
    ) {
                        
    $c
    ++;
                        
    $msg fread($h8000
    );
                        if (
    substr($msg, -54) === '@end'
    ) {
                            echo 
    "end process:[$k][$id] echo {$msg} "
    ;
                            
    $this->release_task($k
    );
                        } else {
                            echo 
    "process:[$k][$id] echo {$msg} "
    ;
                        }
                    }
                    
    sleep(1
    );
                } while (
    $auto_run && $id++ < $max_run && $c 0
    );
            } else {
                
    $this->run_child_process
    ();
            }
        }

        public function 
    set_max_run($max=1000
    ) {
            
    $this->max_run $max
    ;
        }
    }

    评论 {{userinfo.comments}}

    {{money}}

    {{question.question}}

    A {{question.A}}
    B {{question.B}}
    C {{question.C}}
    D {{question.D}}
    提交

    驱动号 更多