rkljw vor 4 Monaten
Ursprung
Commit
e3826a2880

+ 25 - 0
app/Amqp/Consumer/GatherConsumer.php

@@ -0,0 +1,25 @@
+<?php
+
+declare(strict_types=1);
+
+namespace App\Amqp\Consumer;
+
+use Hyperf\Amqp\Result;
+use Hyperf\Amqp\Annotation\Consumer;
+use Hyperf\Amqp\Message\ConsumerMessage;
+use PhpAmqpLib\Message\AMQPMessage;
+use App\JsonRpc\CollectorService;
+#[Consumer(exchange: 'gather', routingKey: 'gather', queue: 'gather', name: "GatherConsumer", nums: 1, enable: true)]
+class GatherConsumer extends ConsumerMessage
+{
+    public function consumeMessage($data, AMQPMessage $message): Result
+    {
+        $collectorService = new CollectorService();
+        $collectorService->goCrawler($data);
+        return Result::ACK;
+    }
+    public function isEnable(): bool
+    {
+        return parent::isEnable();
+    }
+}

+ 17 - 0
app/Amqp/Producer/GatherProducer.php

@@ -0,0 +1,17 @@
+<?php
+
+declare(strict_types=1);
+
+namespace App\Amqp\Producer;
+
+use Hyperf\Amqp\Annotation\Producer;
+use Hyperf\Amqp\Message\ProducerMessage;
+
+#[Producer(exchange: 'gather', routingKey: 'gather')]
+class GatherProducer extends ProducerMessage
+{
+    public function __construct($data)
+    {
+        $this->payload = $data;
+    }
+}

+ 3 - 40
app/Job/GatherExampleJob.php

@@ -19,7 +19,7 @@ class GatherExampleJob extends Job
     /**
      * 任务执行失败后的重试次数,即最大执行次数为 $maxAttempts+1 次
      */
-    protected int $maxAttempts = 2;
+    protected int $maxAttempts = 20;
     public function __construct($params)
     {
         // 这里最好是普通数据,不要使用携带 IO 的对象,比如 PDO 对象
@@ -29,45 +29,8 @@ class GatherExampleJob extends Job
 
     public function handle()
     {
-
-//        var_dump("消费进程");
-//        $parallel = new Parallel(15);
         $collector = new CollectorService();
-//        $parallel->add(function () use ($collector){
-//            sleep(1);
-//
-            $collector->goCrawler($this->params);
-////            var_dump("消费消息队列:",$this->params,$result);
-//            $id = Coroutine::id();
-//            var_dump("开启携程:",$id);
-//            return $id;
-//        });
-//        try {
-//            $results = $parallel->wait();
-//            var_dump("协程返回数据:",$results);
-//        }catch (ParallelExecutionException  $e){
-//             var_dump($e->getResults());// 获取协程中的返回值。
-//             var_dump($e->getThrowables());// 获取协程中出现的异常。
-//        }
-    }
-    public function BeforeHandle()
-    {
-        var_dump("========BeforeHandle=========");
-    }
-    public function AfterHandle()
-    {
-        var_dump("========AfterHandle==========");
-    }
-    public function FailedHandle()
-    {
-        var_dump("=======FailedHandle=======");
-    }
-    public function RetryHandle()
-    {
-        var_dump("=========RetryHandle=======");
-    }
-    public function QueueLength()
-    {
-        var_dump("=====QueueLength==========");
+        $collector->goCrawler($this->params);
     }
+
 }

+ 175 - 3
app/JsonRpc/CollectorService.php

@@ -1,11 +1,14 @@
 <?php
 namespace App\JsonRpc;
 
+use App\Amqp\Producer\GatherProducer;
 use App\Model\ArticleData;
 use App\Model\OldModel\Article as OldArticle;
 use App\Model\Article;
 use App\Model\Rule;
 use App\Model\Web;
+use Hyperf\Amqp\Producer;
+use Hyperf\Context\ApplicationContext as ContextApplicationContext;
 use Hyperf\DbConnection\Db;
 use Hyperf\Di\Annotation\Inject;
 use Hyperf\RpcServer\Annotation\RpcService;
@@ -114,14 +117,183 @@ class CollectorService implements CollectorServiceInterface
     }
 
     /**
-     * 发送数据
+     * 添加任务规则
+     * @param array $data
+     * @return array|mixed
+     */
+    public function addRule(array $data): array
+    {
+        $web = Web::where('id',$data['web_id'])->get();
+        if(empty($web->toArray())){
+            return Result::error('请输入正确的网站id!');
+            
+        }else{
+            $rulename = Rule::where('name',$data['name'])->get();
+            //查找是否存在规则名称重复的
+            if(empty($rulename->toArray())){
+                //(若是多类型参数一起传过来则根据类型,只获取对应类型需要的参数)
+                switch($data['type']){
+                    case 1:
+                        $rule = [
+                            'name' => $data['name'],
+                            'web_id' => $data['web_id'],
+                            'first_url' => $data['first_url'],
+                            'second_start' => $data['second_start'],
+                            'second_num' => $data['second_num'],
+                            'second_end' => $data['second_end'],
+                            'end_pagenum' => $data['end_pagenum'],
+                            'start' => $data['start'],
+                            'title' => $data['title'],
+                            'content' => $data['content']
+                        ];
+                        // var_dump("============1============");       
+                        break;         
+                    case 2:
+                        $rule = [
+                            'name' => $data['name'],
+                            'web_id' => $data['web_id'],
+                            'first_url' => $data['first_url'],
+                            'parameter' => $data['parameter'],
+                            'start' => $data['start'],
+                            'title' => $data['title'],
+                            'content' => $data['content']
+                        ];  
+                        // var_dump("============2============");       
+                        break;        
+                    default:
+                        $rule = [
+                            'name' => $data['name'],
+                            'web_id' => $data['web_id'],
+                            'diy_rule' => $data['diy_rule']
+                        ];        
+                        // var_dump("============3============");       
+                        break;              
+                }
+                if(!empty($data['source']) && $data['type'] != 3){
+
+                    $rule ['source'] = $data['source'];
+                }
+                if(isset($data['writer_class']) && $data['type'] != 3){
+                    $rule ['writer_class'] = $data['writer_class'];
+                }
+                if(isset($data['writer']) && $data['type'] != 3){
+                    $rule ['writer'] = $data['writer'];
+                }
+                date_default_timezone_set('Asia/Shanghai');
+                //若不存在,根据网站类型添加到不行类型的规则表中
+                $result = Rule::insertGetId($rule);
+                
+            }else{
+                return Result::error('此任务已存在!');
+            } 
+             
+        }
+        return Result::success($result);
+    }
+    /**
+     * 获取并搜索规则任务
+     * @param array $data
+     * @return array|mixed
+     */
+
+    public function getRule(array $data): array
+    {
+
+        $where = [];
+        
+        if(isset($data['web_id'])){
+            $web = Web::where('id',$data['web_id'])->get();
+            if(empty($web->toArray())){
+                return Result::error('请输入正确的网站id!');
+                
+            }else{
+                //若是根据网站跳转到的规则任务则存到$where数组中
+                $where = [
+                    ['web_id','=', $data['web_id']]
+                ];
+            }
+        } 
+
+        if(isset($data['keyWord'])){
+            //若存在搜索词,则存到条件数组$where中
+            $where = [
+                ['name','like','%'.$data['keyWord'].'%']
+            ];
+        }
+        if(empty($where)){
+            $rep = Rule::withCount(relations:'arts')->limit($data['pageSize'])->orderBy("created_at","desc")->offset(($data['page']-1)*$data['pageSize'])->get();
+        }else{
+            $rep = Rule::withCount(relations:'arts')->where($where)->limit($data['pageSize'])->orderBy("created_at","desc")->offset(($data['page']-1)*$data['pageSize'])->get();
+        }
+        
+        $count = Rule::where($where)->count();
+        if($count==0){
+            return Result::error('暂无相关规则任务!');
+        }       
+        
+        $data = [
+            'rep' => $rep->toArray(),
+            'count' => $count
+        ];
+
+        return Result::success($data);
+
+    }
+    
+
+     /**
+     * 获取某个任务规则
+     * @param array $data
+     * @return array|mixed
+     */
+    public function getOneRule(array $data): array
+    {
+        $result = Rule::where('id',$data['id'])->first();
+        if(empty($result)){
+            return Result::error('请输入正确的规则任务id!');
+            
+        }else{
+            return Result::success($result);
+        } 
+
+    }
+    /**
+     * 修改规则任务
+     * @param array $data
+     * @return array|mixed
+     */
+    public function upRule(array $data): array
+    {
+        $rule = Rule::where('id',$data['id'])->select('id')->first();
+        unset($data['type']);
+        if(empty($rule)){
+            return Result::error('请输入正确的规则任务id!');
+            
+        }else{
+            $rulename = Rule::where('id','!=',$rule['id'])->where('name',$data['name'])->select('name')->first();
+            if(empty($rulename)){
+                $result = Rule::where('id',$data['id'])->update($data);
+            }else{
+                return Result::error('已存在此任务规则名称!');
+            }
+        }
+       
+        return Result::success($result);
+    }
+    /**
+     * 删除规则任务
      * @param array $data
      * @return array
      */
     public function sendCrawler(array $data): array
     {
-       $result =  $this->Gservice->push($data,rand(5,20));
-       return  Result::success([$result]);
+        var_dump("接收到的数据:",$data);
+        $message = new GatherProducer($data);
+        $producer = ContextApplicationContext::getContainer()->get(Producer::class);
+        $a = $producer->produce($message);
+        var_dump("生产者:",$a);
+//       $result =  $this->Gservice->push($data,rand(5,20));
+       return  Result::success([]);
     }
 
     /**

+ 1 - 0
composer.json

@@ -14,6 +14,7 @@
     "require": {
         "php": ">=8.1",
         "doctrine/annotations": "^2.0",
+        "hyperf/amqp": "^3.1",
         "hyperf/async-queue": "^3.1",
         "hyperf/cache": "~3.1.0",
         "hyperf/command": "~3.1.0",

+ 385 - 1
composer.lock

@@ -4,7 +4,7 @@
         "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies",
         "This file is @generated automatically"
     ],
-    "content-hash": "b6123e07a5d46021d034b563b38afc91",
+    "content-hash": "239ee8acadf3c91412a44beeca7f9a96",
     "packages": [
         {
             "name": "carbonphp/carbon-doctrine-types",
@@ -832,6 +832,82 @@
             ],
             "time": "2024-07-18T11:15:46+00:00"
         },
+        {
+            "name": "hyperf/amqp",
+            "version": "v3.1.42",
+            "source": {
+                "type": "git",
+                "url": "https://github.com/hyperf/amqp.git",
+                "reference": "45f1c42c84af67668040db936e11b1e64e2531d7"
+            },
+            "dist": {
+                "type": "zip",
+                "url": "https://api.github.com/repos/hyperf/amqp/zipball/45f1c42c84af67668040db936e11b1e64e2531d7",
+                "reference": "45f1c42c84af67668040db936e11b1e64e2531d7",
+                "shasum": ""
+            },
+            "require": {
+                "doctrine/instantiator": "^1.2.0",
+                "hyperf/codec": "~3.1.0",
+                "hyperf/contract": "~3.1.0",
+                "hyperf/coroutine": "~3.1.0",
+                "hyperf/pool": "~3.1.0",
+                "hyperf/process": "~3.1.0",
+                "hyperf/support": "~3.1.0",
+                "hyperf/utils": "~3.1.0",
+                "php": ">=8.1",
+                "php-amqplib/php-amqplib": "^3.5",
+                "psr/container": "^1.0 || ^2.0",
+                "psr/event-dispatcher": "^1.0",
+                "psr/log": "^1.0 || ^2.0 || ^3.0"
+            },
+            "suggest": {
+                "hyperf/di": "Required to use annotations.",
+                "hyperf/event": "Declare queue and start consumers automatically."
+            },
+            "type": "library",
+            "extra": {
+                "branch-alias": {
+                    "dev-master": "3.1-dev"
+                },
+                "hyperf": {
+                    "config": "Hyperf\\Amqp\\ConfigProvider"
+                }
+            },
+            "autoload": {
+                "psr-4": {
+                    "Hyperf\\Amqp\\": "src/"
+                }
+            },
+            "notification-url": "https://packagist.org/downloads/",
+            "license": [
+                "MIT"
+            ],
+            "description": "A amqplib for hyperf.",
+            "homepage": "https://hyperf.io",
+            "keywords": [
+                "AMQP",
+                "hyperf",
+                "php"
+            ],
+            "support": {
+                "docs": "https://hyperf.wiki",
+                "issues": "https://github.com/hyperf/hyperf/issues",
+                "pull-request": "https://github.com/hyperf/hyperf/pulls",
+                "source": "https://github.com/hyperf/hyperf"
+            },
+            "funding": [
+                {
+                    "url": "https://hyperf.wiki/#/zh-cn/donate",
+                    "type": "custom"
+                },
+                {
+                    "url": "https://opencollective.com/hyperf",
+                    "type": "open_collective"
+                }
+            ],
+            "time": "2024-09-25T02:54:12+00:00"
+        },
         {
             "name": "hyperf/async-queue",
             "version": "v3.1.42",
@@ -4998,6 +5074,204 @@
             },
             "time": "2024-09-29T15:01:53+00:00"
         },
+        {
+            "name": "paragonie/constant_time_encoding",
+            "version": "v3.0.0",
+            "source": {
+                "type": "git",
+                "url": "https://github.com/paragonie/constant_time_encoding.git",
+                "reference": "df1e7fde177501eee2037dd159cf04f5f301a512"
+            },
+            "dist": {
+                "type": "zip",
+                "url": "https://api.github.com/repos/paragonie/constant_time_encoding/zipball/df1e7fde177501eee2037dd159cf04f5f301a512",
+                "reference": "df1e7fde177501eee2037dd159cf04f5f301a512",
+                "shasum": ""
+            },
+            "require": {
+                "php": "^8"
+            },
+            "require-dev": {
+                "phpunit/phpunit": "^9",
+                "vimeo/psalm": "^4|^5"
+            },
+            "type": "library",
+            "autoload": {
+                "psr-4": {
+                    "ParagonIE\\ConstantTime\\": "src/"
+                }
+            },
+            "notification-url": "https://packagist.org/downloads/",
+            "license": [
+                "MIT"
+            ],
+            "authors": [
+                {
+                    "name": "Paragon Initiative Enterprises",
+                    "email": "security@paragonie.com",
+                    "homepage": "https://paragonie.com",
+                    "role": "Maintainer"
+                },
+                {
+                    "name": "Steve 'Sc00bz' Thomas",
+                    "email": "steve@tobtu.com",
+                    "homepage": "https://www.tobtu.com",
+                    "role": "Original Developer"
+                }
+            ],
+            "description": "Constant-time Implementations of RFC 4648 Encoding (Base-64, Base-32, Base-16)",
+            "keywords": [
+                "base16",
+                "base32",
+                "base32_decode",
+                "base32_encode",
+                "base64",
+                "base64_decode",
+                "base64_encode",
+                "bin2hex",
+                "encoding",
+                "hex",
+                "hex2bin",
+                "rfc4648"
+            ],
+            "support": {
+                "email": "info@paragonie.com",
+                "issues": "https://github.com/paragonie/constant_time_encoding/issues",
+                "source": "https://github.com/paragonie/constant_time_encoding"
+            },
+            "time": "2024-05-08T12:36:18+00:00"
+        },
+        {
+            "name": "paragonie/random_compat",
+            "version": "v9.99.100",
+            "source": {
+                "type": "git",
+                "url": "https://github.com/paragonie/random_compat.git",
+                "reference": "996434e5492cb4c3edcb9168db6fbb1359ef965a"
+            },
+            "dist": {
+                "type": "zip",
+                "url": "https://api.github.com/repos/paragonie/random_compat/zipball/996434e5492cb4c3edcb9168db6fbb1359ef965a",
+                "reference": "996434e5492cb4c3edcb9168db6fbb1359ef965a",
+                "shasum": ""
+            },
+            "require": {
+                "php": ">= 7"
+            },
+            "require-dev": {
+                "phpunit/phpunit": "4.*|5.*",
+                "vimeo/psalm": "^1"
+            },
+            "suggest": {
+                "ext-libsodium": "Provides a modern crypto API that can be used to generate random bytes."
+            },
+            "type": "library",
+            "notification-url": "https://packagist.org/downloads/",
+            "license": [
+                "MIT"
+            ],
+            "authors": [
+                {
+                    "name": "Paragon Initiative Enterprises",
+                    "email": "security@paragonie.com",
+                    "homepage": "https://paragonie.com"
+                }
+            ],
+            "description": "PHP 5.x polyfill for random_bytes() and random_int() from PHP 7",
+            "keywords": [
+                "csprng",
+                "polyfill",
+                "pseudorandom",
+                "random"
+            ],
+            "support": {
+                "email": "info@paragonie.com",
+                "issues": "https://github.com/paragonie/random_compat/issues",
+                "source": "https://github.com/paragonie/random_compat"
+            },
+            "time": "2020-10-15T08:29:30+00:00"
+        },
+        {
+            "name": "php-amqplib/php-amqplib",
+            "version": "v3.7.2",
+            "source": {
+                "type": "git",
+                "url": "https://github.com/php-amqplib/php-amqplib.git",
+                "reference": "738a73eb0019b6c99d9bc25d7a0c0dd8f56a5199"
+            },
+            "dist": {
+                "type": "zip",
+                "url": "https://api.github.com/repos/php-amqplib/php-amqplib/zipball/738a73eb0019b6c99d9bc25d7a0c0dd8f56a5199",
+                "reference": "738a73eb0019b6c99d9bc25d7a0c0dd8f56a5199",
+                "shasum": ""
+            },
+            "require": {
+                "ext-mbstring": "*",
+                "ext-sockets": "*",
+                "php": "^7.2||^8.0",
+                "phpseclib/phpseclib": "^2.0|^3.0"
+            },
+            "conflict": {
+                "php": "7.4.0 - 7.4.1"
+            },
+            "replace": {
+                "videlalvaro/php-amqplib": "self.version"
+            },
+            "require-dev": {
+                "ext-curl": "*",
+                "nategood/httpful": "^0.2.20",
+                "phpunit/phpunit": "^7.5|^9.5",
+                "squizlabs/php_codesniffer": "^3.6"
+            },
+            "type": "library",
+            "extra": {
+                "branch-alias": {
+                    "dev-master": "3.0-dev"
+                }
+            },
+            "autoload": {
+                "psr-4": {
+                    "PhpAmqpLib\\": "PhpAmqpLib/"
+                }
+            },
+            "notification-url": "https://packagist.org/downloads/",
+            "license": [
+                "LGPL-2.1-or-later"
+            ],
+            "authors": [
+                {
+                    "name": "Alvaro Videla",
+                    "role": "Original Maintainer"
+                },
+                {
+                    "name": "Raúl Araya",
+                    "email": "nubeiro@gmail.com",
+                    "role": "Maintainer"
+                },
+                {
+                    "name": "Luke Bakken",
+                    "email": "luke@bakken.io",
+                    "role": "Maintainer"
+                },
+                {
+                    "name": "Ramūnas Dronga",
+                    "email": "github@ramuno.lt",
+                    "role": "Maintainer"
+                }
+            ],
+            "description": "Formerly videlalvaro/php-amqplib.  This library is a pure PHP implementation of the AMQP protocol. It's been tested against RabbitMQ.",
+            "homepage": "https://github.com/php-amqplib/php-amqplib/",
+            "keywords": [
+                "message",
+                "queue",
+                "rabbitmq"
+            ],
+            "support": {
+                "issues": "https://github.com/php-amqplib/php-amqplib/issues",
+                "source": "https://github.com/php-amqplib/php-amqplib/tree/v3.7.2"
+            },
+            "time": "2024-11-21T09:21:41+00:00"
+        },
         {
             "name": "php-di/phpdoc-reader",
             "version": "2.2.1",
@@ -5115,6 +5389,116 @@
             ],
             "time": "2024-07-20T21:41:07+00:00"
         },
+        {
+            "name": "phpseclib/phpseclib",
+            "version": "3.0.42",
+            "source": {
+                "type": "git",
+                "url": "https://github.com/phpseclib/phpseclib.git",
+                "reference": "db92f1b1987b12b13f248fe76c3a52cadb67bb98"
+            },
+            "dist": {
+                "type": "zip",
+                "url": "https://api.github.com/repos/phpseclib/phpseclib/zipball/db92f1b1987b12b13f248fe76c3a52cadb67bb98",
+                "reference": "db92f1b1987b12b13f248fe76c3a52cadb67bb98",
+                "shasum": ""
+            },
+            "require": {
+                "paragonie/constant_time_encoding": "^1|^2|^3",
+                "paragonie/random_compat": "^1.4|^2.0|^9.99.99",
+                "php": ">=5.6.1"
+            },
+            "require-dev": {
+                "phpunit/phpunit": "*"
+            },
+            "suggest": {
+                "ext-dom": "Install the DOM extension to load XML formatted public keys.",
+                "ext-gmp": "Install the GMP (GNU Multiple Precision) extension in order to speed up arbitrary precision integer arithmetic operations.",
+                "ext-libsodium": "SSH2/SFTP can make use of some algorithms provided by the libsodium-php extension.",
+                "ext-mcrypt": "Install the Mcrypt extension in order to speed up a few other cryptographic operations.",
+                "ext-openssl": "Install the OpenSSL extension in order to speed up a wide variety of cryptographic operations."
+            },
+            "type": "library",
+            "autoload": {
+                "files": [
+                    "phpseclib/bootstrap.php"
+                ],
+                "psr-4": {
+                    "phpseclib3\\": "phpseclib/"
+                }
+            },
+            "notification-url": "https://packagist.org/downloads/",
+            "license": [
+                "MIT"
+            ],
+            "authors": [
+                {
+                    "name": "Jim Wigginton",
+                    "email": "terrafrost@php.net",
+                    "role": "Lead Developer"
+                },
+                {
+                    "name": "Patrick Monnerat",
+                    "email": "pm@datasphere.ch",
+                    "role": "Developer"
+                },
+                {
+                    "name": "Andreas Fischer",
+                    "email": "bantu@phpbb.com",
+                    "role": "Developer"
+                },
+                {
+                    "name": "Hans-Jürgen Petrich",
+                    "email": "petrich@tronic-media.com",
+                    "role": "Developer"
+                },
+                {
+                    "name": "Graham Campbell",
+                    "email": "graham@alt-three.com",
+                    "role": "Developer"
+                }
+            ],
+            "description": "PHP Secure Communications Library - Pure-PHP implementations of RSA, AES, SSH2, SFTP, X.509 etc.",
+            "homepage": "http://phpseclib.sourceforge.net",
+            "keywords": [
+                "BigInteger",
+                "aes",
+                "asn.1",
+                "asn1",
+                "blowfish",
+                "crypto",
+                "cryptography",
+                "encryption",
+                "rsa",
+                "security",
+                "sftp",
+                "signature",
+                "signing",
+                "ssh",
+                "twofish",
+                "x.509",
+                "x509"
+            ],
+            "support": {
+                "issues": "https://github.com/phpseclib/phpseclib/issues",
+                "source": "https://github.com/phpseclib/phpseclib/tree/3.0.42"
+            },
+            "funding": [
+                {
+                    "url": "https://github.com/terrafrost",
+                    "type": "github"
+                },
+                {
+                    "url": "https://www.patreon.com/phpseclib",
+                    "type": "patreon"
+                },
+                {
+                    "url": "https://tidelift.com/funding/github/packagist/phpseclib/phpseclib",
+                    "type": "tidelift"
+                }
+            ],
+            "time": "2024-09-16T03:06:04+00:00"
+        },
         {
             "name": "psr/cache",
             "version": "3.0.0",

+ 37 - 0
config/autoload/amqp.php

@@ -0,0 +1,37 @@
+<?php
+
+declare(strict_types=1);
+use function Hyperf\Support\env;
+return [
+    'enable' => true,
+    'default' => [
+        'host' =>  env('AMQP_HOST', '192.168.1.201'),
+        'port' =>  (int)env('AMQP_PORT', 5672),
+        'user' =>  env('AMQP_USER', 'admin'),
+        'password' =>  env('AMQP_PASSWORD', '123456'),
+        'vhost' => '/',
+        'concurrent' => [
+            'limit' => 1,
+        ],
+        'pool' => [
+            'connections' => 1,
+        ],
+        'params' => [
+            'insist' => false,
+            'login_method' => 'AMQPLAIN',
+            'login_response' => null,
+            'locale' => 'en_US',
+            'connection_timeout' => 300.0,
+            // 尽量保持是 heartbeat 数值的两倍
+            'read_write_timeout' => 600.0,
+            'context' => null,
+            'keepalive' => false,
+            // 尽量保证每个消息的消费时间小于心跳时间
+            'heartbeat' => 3,
+            'close_on_destruct' => false,
+        ],
+    ],
+    'pool2' => [
+
+    ]
+];

+ 1 - 1
config/autoload/async_queue.php

@@ -17,7 +17,7 @@ return [
         'redis' => [
             'pool' => 'default',
         ],
-        'channel' => '{gather}',
+        'channel' => 'gather',
         'timeout' => 2,
         'retry_seconds' => 5,
         'handle_timeout' => 10,

+ 1 - 0
config/autoload/processes.php

@@ -11,4 +11,5 @@ declare(strict_types=1);
  */
 return [
     Hyperf\AsyncQueue\Process\ConsumerProcess::class, //异步消费进程
+    Hyperf\AsyncQueue\Listener\QueueLengthListener::class
 ];

Datei-Diff unterdrückt, da er zu groß ist
+ 0 - 0
runtime/container/classes.cache


Datei-Diff unterdrückt, da er zu groß ist
+ 0 - 0
runtime/container/scan.cache


+ 1 - 0
runtime/hyperf.pid

@@ -0,0 +1 @@
+69072

Datei-Diff unterdrückt, da er zu groß ist
+ 0 - 781
runtime/logs/hyperf.log


Einige Dateien werden nicht angezeigt, da zu viele Dateien in diesem Diff geändert wurden.