rkljw 4 hónapja
szülő
commit
b8cecb28bd

+ 73 - 0
app/Job/GatherExampleJob.php

@@ -0,0 +1,73 @@
+<?php
+
+declare(strict_types=1);
+
+namespace App\Job;
+
+use Hyperf\AsyncQueue\Job;
+use App\JsonRpc\CollectorService;
+use Hyperf\Coroutine\Exception\ParallelExecutionException;
+use Hyperf\Coroutine\Coroutine;
+use Hyperf\Coroutine\Parallel;
+/**
+ * @Job(name="import")
+ */
+class GatherExampleJob extends Job
+{
+    public $params;
+
+    /**
+     * 任务执行失败后的重试次数,即最大执行次数为 $maxAttempts+1 次
+     */
+    protected int $maxAttempts = 2;
+    public function __construct($params)
+    {
+        // 这里最好是普通数据,不要使用携带 IO 的对象,比如 PDO 对象
+        $this->params = $params;
+    }
+
+
+    public function handle()
+    {
+
+//        var_dump("消费进程");
+//        $parallel = new Parallel(15);
+        $collector = new CollectorService();
+//        $parallel->add(function () use ($collector){
+//            sleep(1);
+//
+            $collector->goCrawler($this->params);
+////            var_dump("消费消息队列:",$this->params,$result);
+//            $id = Coroutine::id();
+//            var_dump("开启携程:",$id);
+//            return $id;
+//        });
+//        try {
+//            $results = $parallel->wait();
+//            var_dump("协程返回数据:",$results);
+//        }catch (ParallelExecutionException  $e){
+//             var_dump($e->getResults());// 获取协程中的返回值。
+//             var_dump($e->getThrowables());// 获取协程中出现的异常。
+//        }
+    }
+    public function BeforeHandle()
+    {
+        var_dump("========BeforeHandle=========");
+    }
+    public function AfterHandle()
+    {
+        var_dump("========AfterHandle==========");
+    }
+    public function FailedHandle()
+    {
+        var_dump("=======FailedHandle=======");
+    }
+    public function RetryHandle()
+    {
+        var_dump("=========RetryHandle=======");
+    }
+    public function QueueLength()
+    {
+        var_dump("=====QueueLength==========");
+    }
+}

+ 34 - 17
app/JsonRpc/CollectorService.php

@@ -7,16 +7,20 @@ use App\Model\Article;
 use App\Model\Rule;
 use App\Model\Rule;
 use App\Model\Web;
 use App\Model\Web;
 use Hyperf\DbConnection\Db;
 use Hyperf\DbConnection\Db;
+use Hyperf\Di\Annotation\Inject;
 use Hyperf\RpcServer\Annotation\RpcService;
 use Hyperf\RpcServer\Annotation\RpcService;
 use App\Tools\Result;
 use App\Tools\Result;
 use QL\QueryList;
 use QL\QueryList;
 use Swoole\Coroutine;
 use Swoole\Coroutine;
+use App\Service\GatherQueueService;
+
 
 
 
 
 #[RpcService(name: "CollectorService", protocol: "jsonrpc-http", server: "jsonrpc-http")]
 #[RpcService(name: "CollectorService", protocol: "jsonrpc-http", server: "jsonrpc-http")]
 class CollectorService implements CollectorServiceInterface
 class CollectorService implements CollectorServiceInterface
 {
 {
-
+    #[Inject]
+    protected GatherQueueService $Gservice;
     /**
     /**
      * 添加网站
      * 添加网站
      * @param array $data
      * @param array $data
@@ -108,11 +112,23 @@ class CollectorService implements CollectorServiceInterface
         }
         }
         return Result::success($id);
         return Result::success($id);
     }
     }
+
     /**
     /**
+     * 发送数据
      * @param array $data
      * @param array $data
      * @return array
      * @return array
      */
      */
     public function sendCrawler(array $data): array
     public function sendCrawler(array $data): array
+    {
+       $result =  $this->Gservice->push($data,rand(5,20));
+       return  Result::success([$result]);
+    }
+
+    /**
+     * @param array $data
+     * @return array
+     */
+    public function goCrawler(array $data): array
     {
     {
         //通过规则id 查询规则类型
         //通过规则id 查询规则类型
         $where = [
         $where = [
@@ -125,10 +141,10 @@ class CollectorService implements CollectorServiceInterface
 
 
         switch ($info['web_type']){
         switch ($info['web_type']){
             case 1:
             case 1:
-                var_dump("wojinlailaile======",$info);
+                var_dump("===========规则采集======",$info);
                 Rule::where(['id'=>$data['id']])->update(['status'=>1]);
                 Rule::where(['id'=>$data['id']])->update(['status'=>1]);
                 $data['copyfrom'] = $info['web_name'];
                 $data['copyfrom'] = $info['web_name'];
-                $data['author'] = '刘德华';
+                $data['author'] = $info['writer'];;
                 $data['first_url'] = $info['first_url'];
                 $data['first_url'] = $info['first_url'];
                 $data['second_start'] = $info['second_start'];
                 $data['second_start'] = $info['second_start'];
                 $data['second_num'] = $info['second_num'];
                 $data['second_num'] = $info['second_num'];
@@ -140,7 +156,7 @@ class CollectorService implements CollectorServiceInterface
                 $urlList = $this->addUrlArr($data);
                 $urlList = $this->addUrlArr($data);
                 if($urlList){
                 if($urlList){
                     foreach ($urlList as $val){
                     foreach ($urlList as $val){
-                        var_dump("单列表地址:",$val);
+//                        var_dump("单列表地址:",$val);
                         $this->ruleCollection($val,$data);
                         $this->ruleCollection($val,$data);
                     }
                     }
                 }
                 }
@@ -158,9 +174,10 @@ class CollectorService implements CollectorServiceInterface
                     'web_url'=>$info['web_url'],
                     'web_url'=>$info['web_url'],
                     'copyfrom'=>$info['web_name'],
                     'copyfrom'=>$info['web_name'],
                     'admin_user_id'=>$data['admin_user_id'],
                     'admin_user_id'=>$data['admin_user_id'],
-                    'rule_id'=>$data['id']
+                    'rule_id'=>$data['id'],
+                    'writer'=>$info['writer'],
                 ];
                 ];
-                var_dump("开始调用接口方法====",$parames);
+                var_dump("=======开始接口采集====",$parames);
 //                die;
 //                die;
                 $this->foreachCurl($wecUrl,$parames,$other);
                 $this->foreachCurl($wecUrl,$parames,$other);
                 Rule::where(['id'=>$data['id']])->update(['status'=>2]);
                 Rule::where(['id'=>$data['id']])->update(['status'=>2]);
@@ -183,7 +200,7 @@ class CollectorService implements CollectorServiceInterface
                 $i++;
                 $i++;
                 $url = $data['second_start'].$i.$data['second_end'];
                 $url = $data['second_start'].$i.$data['second_end'];
                 $respon1 = Result::pageExists($url);
                 $respon1 = Result::pageExists($url);
-                var_dump("采集地址:",$respon1,$url);
+//                var_dump("采集地址:",$respon1,$url);
 //                Coroutine::sleep(2);
 //                Coroutine::sleep(2);
                 if ($i==intval($data['end_pagenum'])-1) {
                 if ($i==intval($data['end_pagenum'])-1) {
                     $exit = true;
                     $exit = true;
@@ -203,13 +220,13 @@ class CollectorService implements CollectorServiceInterface
      */
      */
     public function ruleCollection($url,$data)
     public function ruleCollection($url,$data)
     {
     {
-        var_dump("采集参数:",$data);
+//        var_dump("采集参数:",$data);
         $list = QueryList::get($url);
         $list = QueryList::get($url);
         $dataList = $list->rules([
         $dataList = $list->rules([
             'title' => ['a', 'text'],
             'title' => ['a', 'text'],
             'link'  => ['a', 'href'],
             'link'  => ['a', 'href'],
         ])->range('.list1 li')->query()->getData();
         ])->range('.list1 li')->query()->getData();
-        var_dump("采集的内容:",$dataList);
+//        var_dump("采集的内容:",$dataList);
 //        var_dump("====",$dataList);die;
 //        var_dump("====",$dataList);die;
         $firstUrlArr =  explode("/", $url);
         $firstUrlArr =  explode("/", $url);
         array_pop($firstUrlArr);
         array_pop($firstUrlArr);
@@ -227,10 +244,10 @@ class CollectorService implements CollectorServiceInterface
                 ])->range(".news-details")->query()->getData();
                 ])->range(".news-details")->query()->getData();
 
 
                 $detailData = $detailData->toArray();
                 $detailData = $detailData->toArray();
-                var_dump("内容详情:",$detailData,$newUrlStr);
+//                var_dump("内容详情:",$detailData,$newUrlStr);
                 if($detailData){
                 if($detailData){
                     foreach ($detailData as $val){
                     foreach ($detailData as $val){
-                        var_dump("进没进foreach:",$newUrlStr,$val);
+//                        var_dump("进没进foreach:",$newUrlStr,$val);
                         $data['fromurl'] = $newUrlStr;
                         $data['fromurl'] = $newUrlStr;
                         $data['title'] = $val['title'];
                         $data['title'] = $val['title'];
                         $data['content'] = $val['content'];
                         $data['content'] = $val['content'];
@@ -243,7 +260,7 @@ class CollectorService implements CollectorServiceInterface
                         $data['admin_user_id'] = $data['admin_user_id']??'';
                         $data['admin_user_id'] = $data['admin_user_id']??'';
                         $data['rule_id'] = $data['rule_id']??'';
                         $data['rule_id'] = $data['rule_id']??'';
 //                        $data['copyfrom'] = $data['copyfrom'];
 //                        $data['copyfrom'] = $data['copyfrom'];
-                        var_dump("要插入的数据:",$data);
+//                        var_dump("要插入的数据:",$data);
                         $this->insertArticleData($data);
                         $this->insertArticleData($data);
                     }
                     }
                 }
                 }
@@ -264,7 +281,7 @@ class CollectorService implements CollectorServiceInterface
             Db::beginTransaction();
             Db::beginTransaction();
             try{
             try{
                 $articleInfo =  Article::where(['title'=>$data['title']])->first();
                 $articleInfo =  Article::where(['title'=>$data['title']])->first();
-                var_dump("获取详情:",$articleInfo,$data);
+//                var_dump("获取详情:",$articleInfo,$data);
                 if(empty($articleInfo)){
                 if(empty($articleInfo)){
                     $insertData = [];
                     $insertData = [];
                     $insertData['fromurl'] =$data['newUrlStr'];
                     $insertData['fromurl'] =$data['newUrlStr'];
@@ -282,7 +299,7 @@ class CollectorService implements CollectorServiceInterface
                     $insertDataDetail = [];
                     $insertDataDetail = [];
                     $insertDataDetail['article_id'] = $article_id;
                     $insertDataDetail['article_id'] = $article_id;
                     $insertDataDetail['content'] = $data['content'];
                     $insertDataDetail['content'] = $data['content'];
-                    var_dump("插入ArticleData:",$insertDataDetail);
+//                    var_dump("插入ArticleData:",$insertDataDetail);
                     ArticleData::insertGetId($insertDataDetail);
                     ArticleData::insertGetId($insertDataDetail);
 //                        Coroutine::sleep(2);
 //                        Coroutine::sleep(2);
 //                    var_dump("插入成功一次:",$article_id,$insertDataDetail);
 //                    var_dump("插入成功一次:",$article_id,$insertDataDetail);
@@ -311,7 +328,7 @@ class CollectorService implements CollectorServiceInterface
         ];
         ];
         $result = Result::http_post($wecUrl,$parames,$options);
         $result = Result::http_post($wecUrl,$parames,$options);
         $result = json_decode($result['response'],true);
         $result = json_decode($result['response'],true);
-        var_dump("获取数据:",$result);
+//        var_dump("获取数据:",$result);
         if($result['data'] && $result['data']['results']){
         if($result['data'] && $result['data']['results']){
             $dataList  = $result['data']['results'];
             $dataList  = $result['data']['results'];
 //            var_dump("取数据结构体:",$dataList);
 //            var_dump("取数据结构体:",$dataList);
@@ -328,7 +345,7 @@ class CollectorService implements CollectorServiceInterface
                     'content'=>$val['source']['content']['content']??'',
                     'content'=>$val['source']['content']['content']??'',
                     'admin_user_id'=>$other['admin_user_id']??'',
                     'admin_user_id'=>$other['admin_user_id']??'',
                     'rule_id'=>$other['rule_id']??'',
                     'rule_id'=>$other['rule_id']??'',
-                    'author'=>'冯蕊'
+                    'author'=>$other['writer']??''
                 ];
                 ];
 //                var_dump("调用插入数据方法,组装数据:",$insertData);
 //                var_dump("调用插入数据方法,组装数据:",$insertData);
                 $this->insertArticleData($insertData);
                 $this->insertArticleData($insertData);
@@ -339,7 +356,7 @@ class CollectorService implements CollectorServiceInterface
         $parames['current'] = $pages;
         $parames['current'] = $pages;
         $twoResult = Result::http_post($wecUrl,$parames,$options);
         $twoResult = Result::http_post($wecUrl,$parames,$options);
         if($result['data'] && $result['data']['results'] && count($result['data']['results'])>0){
         if($result['data'] && $result['data']['results'] && count($result['data']['results'])>0){
-            var_dump("分页测试:",$parames,$parames['current']);
+//            var_dump("分页测试:",$parames,$parames['current']);
             $this->foreachCurl($wecUrl,$parames,$other,$pages);
             $this->foreachCurl($wecUrl,$parames,$other,$pages);
         }
         }
 //        var_dump("正确的数据:",$result);
 //        var_dump("正确的数据:",$result);

+ 1 - 0
app/JsonRpc/CollectorServiceInterface.php

@@ -29,6 +29,7 @@ interface CollectorServiceInterface
      */
      */
     public function sendCrawler(array $data): array;
     public function sendCrawler(array $data): array;
 
 
+
 }
 }
 
 
 
 

+ 34 - 0
app/Service/GatherQueueService.php

@@ -0,0 +1,34 @@
+<?php
+
+declare(strict_types=1);
+
+namespace App\Service;
+
+use App\Job\GatherExampleJob;
+use Hyperf\AsyncQueue\Driver\DriverFactory;
+use Hyperf\AsyncQueue\Driver\DriverInterface;
+/**
+ * @Job(name="default")
+ */
+class GatherQueueService
+{
+    protected DriverInterface $driver;
+
+    public function __construct(DriverFactory $driverFactory)
+    {
+        $this->driver = $driverFactory->get('default');
+    }
+
+    /**
+     * 生产消息.
+     * @param $params 数据
+     * @param int $delay 延时时间 单位秒
+     */
+    public function push($params, int $delay = 0): bool
+    {
+        // 这里的 `ExampleJob` 会被序列化存到 Redis 中,所以内部变量最好只传入普通数据
+        // 同理,如果内部使用了注解 @Value 会把对应对象一起序列化,导致消息体变大。
+        // 所以这里也不推荐使用 `make` 方法来创建 `Job` 对象。
+        return $this->driver->push(new GatherExampleJob($params), $delay);
+    }
+}

+ 5 - 5
config/autoload/async_queue.php

@@ -17,13 +17,13 @@ return [
         'redis' => [
         'redis' => [
             'pool' => 'default',
             'pool' => 'default',
         ],
         ],
-        'channel' => '{queue}',
-        'timeout' => 2,
-        'retry_seconds' => 5,
-        'handle_timeout' => 10,
+        'channel' => '{gather}',
+        'timeout' => 2000,
+        'retry_seconds' => [1,5,10,20],
+        'handle_timeout' => 10000,
         'processes' => 1,
         'processes' => 1,
         'concurrent' => [
         'concurrent' => [
-            'limit' => 10,
+            'limit' => 15,
         ],
         ],
         'max_messages' => 0,
         'max_messages' => 0,
     ],
     ],

+ 1 - 1
config/autoload/processes.php

@@ -10,5 +10,5 @@ declare(strict_types=1);
  * @license  https://github.com/hyperf/hyperf/blob/master/LICENSE
  * @license  https://github.com/hyperf/hyperf/blob/master/LICENSE
  */
  */
 return [
 return [
-//    Hyperf\AsyncQueue\Process\ConsumerProcess::class, //异步消费进程
+    Hyperf\AsyncQueue\Process\ConsumerProcess::class, //异步消费进程
 ];
 ];

+ 9 - 6
config/autoload/redis.php

@@ -13,17 +13,20 @@ use function Hyperf\Support\env;
 
 
 return [
 return [
     'default' => [
     'default' => [
-        'host' => env('REDIS_HOST', 'localhost'),
-        'auth' => env('REDIS_AUTH', null),
+        'host' => env('REDIS_HOST', '127.0.0.1'),
+        'auth' => env('REDIS_AUTH', '123456'),
         'port' => (int) env('REDIS_PORT', 6379),
         'port' => (int) env('REDIS_PORT', 6379),
         'db' => (int) env('REDIS_DB', 0),
         'db' => (int) env('REDIS_DB', 0),
         'pool' => [
         'pool' => [
             'min_connections' => 1,
             'min_connections' => 1,
-            'max_connections' => 10,
-            'connect_timeout' => 10.0,
-            'wait_timeout' => 3.0,
+            'max_connections' => 100,
+            'connect_timeout' => 1000.0,
+            'wait_timeout' => 30000.0,
             'heartbeat' => -1,
             'heartbeat' => -1,
-            'max_idle_time' => (float) env('REDIS_MAX_IDLE_TIME', 60),
+            'max_idle_time' => (float) env('REDIS_MAX_IDLE_TIME', 600),
+            'max_attempts' => 5,
         ],
         ],
+
     ],
     ],
+
 ];
 ];

A különbségek nem kerülnek megjelenítésre, a fájl túl nagy
+ 0 - 0
runtime/container/classes.cache


A különbségek nem kerülnek megjelenítésre, a fájl túl nagy
+ 0 - 0
runtime/container/scan.cache


+ 1 - 1
runtime/hyperf.pid

@@ -1 +1 @@
-59669
+6759

A különbségek nem kerülnek megjelenítésre, a fájl túl nagy
+ 722 - 11449
runtime/logs/hyperf.log


Nem az összes módosított fájl került megjelenítésre, mert túl sok fájl változott