rkljw 4 місяців тому
батько
коміт
32a7466ce9

+ 25 - 0
app/Amqp/Consumer/GatherConsumer.php

@@ -0,0 +1,25 @@
+<?php
+
+declare(strict_types=1);
+
+namespace App\Amqp\Consumer;
+
+use Hyperf\Amqp\Result;
+use Hyperf\Amqp\Annotation\Consumer;
+use Hyperf\Amqp\Message\ConsumerMessage;
+use PhpAmqpLib\Message\AMQPMessage;
+use App\JsonRpc\CollectorService;
+#[Consumer(exchange: 'gather', routingKey: 'gather', queue: 'gather', name: "GatherConsumer", nums: 1, enable: true)]
+class GatherConsumer extends ConsumerMessage
+{
+    public function consumeMessage($data, AMQPMessage $message): Result
+    {
+        $collectorService = new CollectorService();
+        $collectorService->goCrawler($data);
+        return Result::ACK;
+    }
+    public function isEnable(): bool
+    {
+        return parent::isEnable();
+    }
+}

+ 30 - 0
app/Amqp/Consumer/ImportConsumer.php

@@ -0,0 +1,30 @@
+<?php
+
+declare(strict_types=1);
+
+namespace App\Amqp\Consumer;
+
+use App\JsonRpc\CollectorService;
+use Hyperf\Amqp\Result;
+use Hyperf\Amqp\Annotation\Consumer;
+use Hyperf\Amqp\Message\ConsumerMessage;
+use PhpAmqpLib\Message\AMQPMessage;
+
+#[Consumer(exchange: 'import', routingKey: 'import', queue: 'import', name: "ImportConsumer", nums: 1, enable: true)]
+class ImportConsumer extends ConsumerMessage
+{
+    public function consumeMessage($data, AMQPMessage $message): Result
+    {
+
+        $collector = new CollectorService();
+        var_dump('文章---------------------',$data);
+        $result = $collector->goAddArt($data);
+        
+
+        return Result::ACK;
+    }
+    public function isEnable(): bool
+    {
+        return parent::isEnable();
+    }
+}

+ 17 - 0
app/Amqp/Producer/GatherProducer.php

@@ -0,0 +1,17 @@
+<?php
+
+declare(strict_types=1);
+
+namespace App\Amqp\Producer;
+
+use Hyperf\Amqp\Annotation\Producer;
+use Hyperf\Amqp\Message\ProducerMessage;
+
+#[Producer(exchange: 'gather', routingKey: 'gather')]
+class GatherProducer extends ProducerMessage
+{
+    public function __construct($data)
+    {
+        $this->payload = $data;
+    }
+}

+ 17 - 0
app/Amqp/Producer/ImportProducer.php

@@ -0,0 +1,17 @@
+<?php
+
+declare(strict_types=1);
+
+namespace App\Amqp\Producer;
+
+use Hyperf\Amqp\Annotation\Producer;
+use Hyperf\Amqp\Message\ProducerMessage;
+
+#[Producer(exchange: 'import', routingKey: 'import')]
+class ImportProducer extends ProducerMessage
+{
+    public function __construct($data)
+    {
+        $this->payload = $data;
+    }
+}

+ 338 - 35
app/JsonRpc/CollectorService.php

@@ -1,22 +1,33 @@
 <?php
 namespace App\JsonRpc;
 
+use App\Amqp\Producer\GatherProducer;
+use App\Model\ArticleData;
 use App\Model\OldModel\Article as OldArticle;
 use App\Model\OldModel\ArticleData as OldArticleData;
 use App\Model\OldModel\Category;
 use App\Model\Article;
-use App\Model\Web;
 use App\Model\Rule;
-use App\Model\ArticleData;
+use App\Model\Web;
+use Hyperf\Amqp\Producer;
+use Hyperf\Context\ApplicationContext as ContextApplicationContext;
 use Hyperf\DbConnection\Db;
+use Hyperf\Di\Annotation\Inject;
 use Hyperf\RpcServer\Annotation\RpcService;
 use App\Tools\Result;
+use QL\QueryList;
+use Swoole\Coroutine;
+//use App\Service\GatherQueueService;
+use App\Amqp\Producer\ImportProducer;
+
 
 use function Hyperf\Support\retry;
 
 #[RpcService(name: "CollectorService", protocol: "jsonrpc-http", server: "jsonrpc-http")]
 class CollectorService implements CollectorServiceInterface
 {
+//    #[Inject]
+//    protected GatherQueueService $Gservice;
     /**
      * 添加网站
      * @param array $data
@@ -138,6 +149,12 @@ class CollectorService implements CollectorServiceInterface
         }
         return Result::success($result);
     }
+
+    /**
+     * 添加任务规则
+     * @param array $data
+     * @return array|mixed
+     */
     public function addRule(array $data): array
     {
         $web = Web::where('id',$data['web_id'])->get();
@@ -186,7 +203,8 @@ class CollectorService implements CollectorServiceInterface
                         // var_dump("============3============");       
                         break;              
                 }
-                if(isset($data['source']) && $data['type'] != 3){
+                if(!empty($data['source']) && $data['type'] != 3){
+
                     $rule ['source'] = $data['source'];
                 }
                 if(isset($data['writer_class']) && $data['type'] != 3){
@@ -201,44 +219,62 @@ class CollectorService implements CollectorServiceInterface
                 
             }else{
                 return Result::error('此任务已存在!');
-            }  
+            } 
+             
         }
         return Result::success($result);
     }
-    
     /**
      * 获取并搜索规则任务
      * @param array $data
      * @return array|mixed
      */
+
     public function getRule(array $data): array
     {
-        $web = Web::where('id',$data['web_id'])->get();
-        if(empty($web->toArray())){
-            return Result::error('请输入正确的网站id!');
-            
-        }else{
-            $where = [
-                ['web_id','=', $data['web_id']]
-            ];
-            if(isset($data['keyWord'])){
-                //若存在搜索词,则存到条件数组$where
+
+        $where = [];
+        
+        if(isset($data['web_id'])){
+            $web = Web::where('id',$data['web_id'])->get();
+            if(empty($web->toArray())){
+                return Result::error('请输入正确的网站id!');
+                
+            }else{
+                //若是根据网站跳转到的规则任务则存到$where数组
                 $where = [
-                    ['name','like','%'.$data['keyWord'].'%']
+                    ['web_id','=', $data['web_id']]
                 ];
             }
+        } 
+
+        if(isset($data['keyWord'])){
+            //若存在搜索词,则存到条件数组$where中
+            $where = [
+                ['name','like','%'.$data['keyWord'].'%']
+            ];
+        }
+        if(empty($where)){
+            $rep = Rule::withCount(relations:'arts')->limit($data['pageSize'])->orderBy("created_at","desc")->offset(($data['page']-1)*$data['pageSize'])->get();
+        }else{
             $rep = Rule::withCount(relations:'arts')->where($where)->limit($data['pageSize'])->orderBy("created_at","desc")->offset(($data['page']-1)*$data['pageSize'])->get();
-            $count = Rule::where($where)->count();
-            if($count==0){
-                return Result::error('暂无相关规则任务!');
-            }       
         }
+        
+        $count = Rule::where($where)->count();
+        if($count==0){
+            return Result::error('暂无相关规则任务!');
+        }       
+        
         $data = [
             'rep' => $rep->toArray(),
             'count' => $count
         ];
+
         return Result::success($data);
+
     }
+    
+
      /**
      * 获取某个任务规则
      * @param array $data
@@ -253,6 +289,7 @@ class CollectorService implements CollectorServiceInterface
         }else{
             return Result::success($result);
         } 
+
     }
     /**
      * 修改规则任务
@@ -322,15 +359,260 @@ class CollectorService implements CollectorServiceInterface
      * @param array $data
      * @return array
      */
+
     public function sendCrawler(array $data): array
     {
-        $result =  Article::get();
-        $b = OldArticle::get();
-        $a = [
-            'old'=>$b,
-            'new'=>$result
+        var_dump("接收到的数据:",$data);
+        $message = new GatherProducer($data);
+        $producer = ContextApplicationContext::getContainer()->get(Producer::class);
+        $a = $producer->produce($message);
+        var_dump("生产者:",$a);
+//       $result =  $this->Gservice->push($data,rand(5,20));
+       return  Result::success([]);
+    }
+
+    /**
+     * @param array $data
+     * @return array
+     */
+    public function goCrawler(array $data): array
+    {
+        //通过规则id 查询规则类型
+        $where = [
+            'rule.id'=>$data['id']
+        ];
+        $info = Rule::where($where)->leftJoin('web','rule.web_id','web.id')
+            ->select("rule.*","web.name as web_name","web.url as web_url","web.type as web_type")
+            ->first();
+        $info = $info->toArray();
+
+        switch ($info['web_type']){
+            case 1:
+                var_dump("===========规则采集======",$info);
+                Rule::where(['id'=>$data['id']])->update(['status'=>1]);
+                $data['copyfrom'] = $info['web_name'];
+                $data['author'] = $info['writer'];;
+                $data['first_url'] = $info['first_url'];
+                $data['second_start'] = $info['second_start'];
+                $data['second_num'] = $info['second_num'];
+                $data['second_end'] = $info['second_end'];
+                $data['end_pagenum']= $info['end_pagenum'];
+                $data['rule_id']= $data['id'];
+                $data['admin_user_id']= $data['admin_user_id'];
+//                $data['newUrlStr'] =
+                $urlList = $this->addUrlArr($data);
+                if($urlList){
+                    foreach ($urlList as $val){
+//                        var_dump("单列表地址:",$val);
+                        $this->ruleCollection($val,$data);
+                    }
+                }
+                Rule::where(['id'=>$data['id']])->update(['status'=>2]);
+                break;
+            case 2:
+                Rule::where(['id'=>$data['id']])->update(['status'=>1]);
+                $wecUrl = $info['first_url'];//'https://www.ndcpa.gov.cn/queryList';
+                $parames = json_decode($info['parameter'],true);
+
+//                var_dump($parames);die;
+
+                $parames['webSiteCode'] = [trim($parames['webSiteCode'], "[]")]; //['jbkzzx'];//
+                $parames['channelCode'] =  [trim($parames['channelCode'], "[]")]; // ['c100008'];//
+                $other = [
+                    'web_url'=>$info['web_url'],
+                    'copyfrom'=>$info['web_name'],
+                    'admin_user_id'=>$data['admin_user_id'],
+                    'rule_id'=>$data['id'],
+                    'writer'=>$info['writer'],
+                ];
+                var_dump("=======开始接口采集====",$parames);
+//                die;
+                $this->foreachCurl($wecUrl,$parames,$other);
+
+                Rule::where(['id'=>$data['id']])->update(['status'=>2]);
+
+
+
+        }
+        return  Result::success([]);
+    }
+
+    /**
+     * 把可采集的列表页连接 打包成一个大数组
+     * @return void
+     */
+    public function addUrlArr($data)
+    {
+        $arrList = [];
+        array_push($arrList,$data['first_url']);
+
+            $exit = false;
+            $i = 0;
+            while(!$exit){
+                $i++;
+                $url = $data['second_start'].$i.$data['second_end'];
+                $respon1 = Result::pageExists($url);
+//                var_dump("采集地址:",$respon1,$url);
+//                Coroutine::sleep(2);
+                if ($i==intval($data['end_pagenum'])-1) {
+                    $exit = true;
+//                    Coroutine::exit(); // 退出循环
+                }else{
+                    array_push($arrList,$url);
+                }
+            }
+
+
+        return $arrList;
+    }
+
+    
+    /**
+     * 按照规则采集数据
+     * @return void
+     */
+    public function ruleCollection($url,$data)
+    {
+//        var_dump("采集参数:",$data);
+        $list = QueryList::get($url);
+        $dataList = $list->rules([
+            'title' => ['a', 'text'],
+            'link'  => ['a', 'href'],
+        ])->range('.list1 li')->query()->getData();
+//        var_dump("采集的内容:",$dataList);
+//        var_dump("====",$dataList);die;
+        $firstUrlArr =  explode("/", $url);
+        array_pop($firstUrlArr);
+        $firstUrlArr = implode('/',$firstUrlArr);
+
+        $dataList = $dataList->toArray();
+        if($dataList){
+            foreach ($dataList as $tiem){
+                $newUrl =  substr($tiem['link'], 1);
+                $newUrlStr = $firstUrlArr.$newUrl;
+                $detailContent = QueryList::get($newUrlStr);
+                $detailData = $detailContent->rules([
+                    'title'=>['h1','text'],
+                    'content'=>['.TRS_UEDITOR','html'],
+                ])->range(".news-details")->query()->getData();
+
+                $detailData = $detailData->toArray();
+//                var_dump("内容详情:",$detailData,$newUrlStr);
+                if($detailData){
+                    foreach ($detailData as $val){
+//                        var_dump("进没进foreach:",$newUrlStr,$val);
+                        $data['fromurl'] = $newUrlStr;
+                        $data['title'] = $val['title'];
+                        $data['content'] = $val['content'];
+                        $data['newUrlStr'] = $newUrlStr;
+                        $data['source'] = '';
+                        $data['introduce'] = $val['title']??'';
+                        $data['keyword'] = $val['title']??'';
+                        $data['copyfrom'] = $data['copyfrom'];
+                        $data['source'] = $data['source']??$data['copyfrom'];
+                        $data['admin_user_id'] = $data['admin_user_id']??'';
+                        $data['rule_id'] = $data['rule_id']??'';
+//                        $data['copyfrom'] = $data['copyfrom'];
+//                        var_dump("要插入的数据:",$data);
+                        $this->insertArticleData($data);
+                    }
+                }
+
+            }
+//
+        }
+    }
+
+    /**
+     * 插入数据
+     * @param $data
+     * @return void
+     */
+    public function insertArticleData($data=[])
+    {
+        if($data){
+            Db::beginTransaction();
+            try{
+                $articleInfo =  Article::where(['title'=>$data['title']])->first();
+//                var_dump("获取详情:",$articleInfo,$data);
+                if(empty($articleInfo)){
+                    $insertData = [];
+                    $insertData['fromurl'] =$data['newUrlStr'];
+                    $insertData['oldtitle'] =$data['title'];
+                    $insertData['title'] = $data['title'];
+                    $insertData['copyfrom'] =  $data['copyfrom'];
+                    $insertData['author'] = $data['author'];
+                    $insertData['introduce'] = $data['title'];
+                    $insertData['keyword'] = $data['title'];
+                    $insertData['source'] = isset($data['source']) && $data['source']!=''? $data['source']:$data['copyfrom'];
+                    $insertData['admin_user_id'] = $data['admin_user_id'];
+                    $insertData['rule_id'] = $data['rule_id'];
+//                    var_dump("插入Article:",$insertData);
+                    $article_id = Article::insertGetId($insertData);
+                    $insertDataDetail = [];
+                    $insertDataDetail['article_id'] = $article_id;
+                    $insertDataDetail['content'] = $data['content'];
+//                    var_dump("插入ArticleData:",$insertDataDetail);
+                    ArticleData::insertGetId($insertDataDetail);
+//                        Coroutine::sleep(2);
+//                    var_dump("插入成功一次:",$article_id,$insertDataDetail);
+                }
+                Db::commit();
+            }catch (\Exception $e){
+                Db::rollBack();
+                var_dump("插入失败:",$e->getMessage());
+            }
+
+        }else{
+            var_dump("没有数据可以插入:");
+        }
+
+    }
+
+    /**
+     * 分页采集
+     * @return void
+     */
+    public function foreachCurl($wecUrl,$parames,$other,&$page=1)
+    {
+        $options = [
+            CURLOPT_HEADER => true, // 如果想包含头部信息在响应中,可以设置为true
+            CURLOPT_TIMEOUT => 30 // 设置请求超时时间为30秒
         ];
-        return  Result::success($a);
+        $result = Result::http_post($wecUrl,$parames,$options);
+        $result = json_decode($result['response'],true);
+//        var_dump("获取数据:",$result);
+        if($result['data'] && $result['data']['results']){
+            $dataList  = $result['data']['results'];
+//            var_dump("取数据结构体:",$dataList);
+            foreach ($dataList as $val){
+//                var_dump("进入循环插入:",$val);
+                $newUrlStr = json_decode($val['source']['urls'],true);
+                $newUrlStr = $other['web_url'].$newUrlStr['common'];
+//                var_dump("来源地址:",$newUrlStr);
+                $insertData = [
+                    'newUrlStr'=>$newUrlStr,
+                    'title'=>$val['source']['title']??'',
+                    'source'=>$val['source']['contentSource']??'',
+                    'copyfrom'=>$other['copyfrom']??'',
+                    'content'=>$val['source']['content']['content']??'',
+                    'admin_user_id'=>$other['admin_user_id']??'',
+                    'rule_id'=>$other['rule_id']??'',
+                    'author'=>$other['writer']??''
+                ];
+//                var_dump("调用插入数据方法,组装数据:",$insertData);
+                $this->insertArticleData($insertData);
+            }
+        }
+        $pages = intval($parames['current']);
+        $pages =  $pages+1;
+        $parames['current'] = $pages;
+        $twoResult = Result::http_post($wecUrl,$parames,$options);
+        if($result['data'] && $result['data']['results'] && count($result['data']['results'])>0){
+//            var_dump("分页测试:",$parames,$parames['current']);
+            $this->foreachCurl($wecUrl,$parames,$other,$pages);
+        }
+//        var_dump("正确的数据:",$result);
     }
     /**
      * 获取并搜索资讯
@@ -358,9 +640,7 @@ class CollectorService implements CollectorServiceInterface
         //跨库查询栏目导航及采集的新闻
         $info = Article::query()
         ->where($where)
-        ->with(['category' => function ($query) {
-            $query->select('name');
-        }])
+        ->with('category')
         ->orderBy("article.id","desc")
         ->limit($data['pageSize'])
         ->offset(($data['page']-1)*$data['pageSize'])->get();
@@ -476,8 +756,11 @@ class CollectorService implements CollectorServiceInterface
     public function addCatid(array $data): array
     {
         $id = $data['rule_id'];
-        //查找此规则任务下的文章是否都已经导入
-        $info = Article::where('rule_id',$id)->where('state',0)->select('id')->get();
+        $art = Article::where('rule_id',$id)->select('id')->count();
+        if($art==0){
+            return Result::error('还未采集,请采集');
+        }else{
+            $info = Article::where('rule_id',$id)->where('state',0)->select('id')->get();
         if(empty($info->toArray())){
             return Result::error('所有文章都已导入,不可修改关联的导航池!'); 
         }else{
@@ -506,25 +789,45 @@ class CollectorService implements CollectorServiceInterface
             }
               
         }
+        }
+        //查找此规则任务下的文章是否都已经导入
+        
         return  Result::success($result);
     }
     /**
-    * 导入文章
+    * 导入文章(生产者)
     * @param array $data
     * @return array
     */
     public function addArt(array $data): array
     {
+        var_dump("接收到的数据:",$data);
+        $message = new ImportProducer($data);
+        $producer = ContextApplicationContext::getContainer()->get(Producer::class);
+        $a = $producer->produce($message);
+        var_dump("生产者:",$a);
+//       $result =  $this->Gservice->push($data,rand(5,20));
+       return  Result::success([]);
+    }
+    /**
+    * 导入文章(消费者)
+    * @param array $data
+    * @return array
+    */
+    public function goAddArt(array $data): array
+    {
+        // var_dump('准备去消费------',$data);
         // var_dump("======@@@====");
         $where = [
             'rule_id' => $data['rule_id'],
             'state' => 0
         ];
         //获取某个规则任务下的已采集未导入的文章及文章详情
-        $arts_id = Article::where($where)->wherenotNull('catid')->orderBy('id')->select('id')->get();
+        $arts_id = Article::where($where)->wherenotNull('catid')->select('id')->orderBy('id')->get()->toArray();
         $arts = Article::where($where)->wherenotNull('catid')->select('title','catid','level','introduce','keyword','author','copyfrom','fromurl','hits','islink','imgurl','admin_user_id','is_original')->orderBy('id')->get()->toArray();       
-        $arts_data = ArticleData::whereIn('article_id',$arts_id)->select('content')->orderBy('article_id')->get()->toArray();
-        // var_dump($article_data);
+        // var_dump('=============:::',$arts_id);
+        $arts_data = ArticleData::whereIn('article_id',$arts_id)->select('content')->orderBy('article_id','desc')->get()->toArray();
+        // var_dump('=============',$arts);
         $data = [
             'articles' => $arts,
             'art_content' => $arts_data

+ 2 - 1
composer.json

@@ -14,6 +14,7 @@
     "require": {
         "php": ">=8.1",
         "doctrine/annotations": "^2.0",
+        "hyperf/amqp": "^3.1",
         "hyperf/cache": "~3.1.0",
         "hyperf/command": "~3.1.0",
         "hyperf/config": "~3.1.0",
@@ -32,7 +33,7 @@
         "hyperf/nacos": "^3.1",
         "hyperf/process": "~3.1.0",
         "hyperf/redis": "~3.1.0",
-        "hyperf/rpc-server": "*",
+        "hyperf/rpc-server": "^3.1",
         "hyperf/service-governance": "^3.1",
         "hyperf/service-governance-consul": "^3.1",
         "hyperf/service-governance-nacos": "^3.1",

Різницю між файлами не показано, бо вона завелика
+ 488 - 101
composer.lock


+ 34 - 0
config/autoload/amqp.php

@@ -0,0 +1,34 @@
+<?php
+declare(strict_types=1);
+
+use function Hyperf\Support\env;
+return [
+    'enable' => true,
+    'default' => [
+        'host' =>  env('AMQP_HOST', '192.168.1.201'),
+        'port' =>  (int)env('AMQP_PORT', 5672),
+        'user' =>  env('AMQP_USER', 'admin'),
+        'password' =>  env('AMQP_PASSWORD', '123456'),
+        'vhost' => '/',
+        'concurrent' => [
+            'limit' => 1,
+        ],
+        'pool' => [
+            'connections' => 1,
+        ],
+        'params' => [
+            'insist' => false,
+            'login_method' => 'AMQPLAIN',
+            'login_response' => null,
+            'locale' => 'en_US',
+            'connection_timeout' => 300.0,
+            // 尽量保持是 heartbeat 数值的两倍
+            'read_write_timeout' => 600.0,
+            'context' => null,
+            'keepalive' => false,
+            // 尽量保证每个消息的消费时间小于心跳时间
+            'heartbeat' => 3,
+            'close_on_destruct' => false,
+        ],
+    ],
+];

Різницю між файлами не показано, бо вона завелика
+ 0 - 0
runtime/container/classes.cache


Різницю між файлами не показано, бо вона завелика
+ 0 - 0
runtime/container/scan.cache


Різницю між файлами не показано, бо вона завелика
+ 0 - 1002
runtime/logs/hyperf.log


Деякі файли не було показано, через те що забагато файлів було змінено