|
@@ -1,6 +1,7 @@
|
|
|
<?php
|
|
|
namespace App\JsonRpc;
|
|
|
|
|
|
+use App\Amqp\Producer\GatherProducer;
|
|
|
use App\Model\ArticleData;
|
|
|
use App\Model\OldModel\Article as OldArticle;
|
|
|
use App\Model\OldModel\ArticleData as OldArticleData;
|
|
@@ -8,18 +9,25 @@ use App\Model\OldModel\Category;
|
|
|
use App\Model\Article;
|
|
|
use App\Model\Rule;
|
|
|
use App\Model\Web;
|
|
|
+use Hyperf\Amqp\Producer;
|
|
|
+use Hyperf\Context\ApplicationContext as ContextApplicationContext;
|
|
|
use Hyperf\DbConnection\Db;
|
|
|
+use Hyperf\Di\Annotation\Inject;
|
|
|
use Hyperf\RpcServer\Annotation\RpcService;
|
|
|
use App\Tools\Result;
|
|
|
use QL\QueryList;
|
|
|
use Swoole\Coroutine;
|
|
|
+use App\Service\GatherQueueService;
|
|
|
+use App\Amqp\Producer\ImportProducer;
|
|
|
+
|
|
|
|
|
|
use function Hyperf\Support\retry;
|
|
|
|
|
|
#[RpcService(name: "CollectorService", protocol: "jsonrpc-http", server: "jsonrpc-http")]
|
|
|
class CollectorService implements CollectorServiceInterface
|
|
|
{
|
|
|
-
|
|
|
+ #[Inject]
|
|
|
+ protected GatherQueueService $Gservice;
|
|
|
/**
|
|
|
* 添加网站
|
|
|
* @param array $data
|
|
@@ -141,6 +149,7 @@ class CollectorService implements CollectorServiceInterface
|
|
|
}
|
|
|
return Result::success($result);
|
|
|
}
|
|
|
+
|
|
|
/**
|
|
|
* 添加任务规则
|
|
|
* @param array $data
|
|
@@ -195,16 +204,15 @@ class CollectorService implements CollectorServiceInterface
|
|
|
break;
|
|
|
}
|
|
|
if(!empty($data['source']) && $data['type'] != 3){
|
|
|
+
|
|
|
$rule ['source'] = $data['source'];
|
|
|
}
|
|
|
if(isset($data['writer_class']) && $data['type'] != 3){
|
|
|
$rule ['writer_class'] = $data['writer_class'];
|
|
|
}
|
|
|
-
|
|
|
if(isset($data['writer']) && $data['type'] != 3){
|
|
|
$rule ['writer'] = $data['writer'];
|
|
|
}
|
|
|
-
|
|
|
date_default_timezone_set('Asia/Shanghai');
|
|
|
//若不存在,根据网站类型添加到不行类型的规则表中
|
|
|
$result = Rule::insertGetId($rule);
|
|
@@ -246,7 +254,6 @@ class CollectorService implements CollectorServiceInterface
|
|
|
['name','like','%'.$data['keyWord'].'%']
|
|
|
];
|
|
|
}
|
|
|
-
|
|
|
if(empty($where)){
|
|
|
$rep = Rule::withCount(relations:'arts')->limit($data['pageSize'])->orderBy("created_at","desc")->offset(($data['page']-1)*$data['pageSize'])->get();
|
|
|
}else{
|
|
@@ -282,6 +289,7 @@ class CollectorService implements CollectorServiceInterface
|
|
|
}else{
|
|
|
return Result::success($result);
|
|
|
}
|
|
|
+
|
|
|
}
|
|
|
/**
|
|
|
* 修改规则任务
|
|
@@ -351,7 +359,23 @@ class CollectorService implements CollectorServiceInterface
|
|
|
* @param array $data
|
|
|
* @return array
|
|
|
*/
|
|
|
+
|
|
|
public function sendCrawler(array $data): array
{
    // Producer side of the crawl request: the payload is wrapped in a
    // GatherProducer message and published through the AMQP producer taken
    // from the container. No crawling is done in this method itself.
    // NOTE(review): presumably a consumer picks the message up and runs the
    // actual crawl (goCrawler) - confirm against the consumer class.
    $message = new GatherProducer($data);
    $producer = ContextApplicationContext::getContainer()->get(Producer::class);

    // produce() reports the publish outcome as a boolean. It used to be
    // var_dump'ed and then ignored, so a task that never reached the queue
    // was still answered with success. Fail loudly instead.
    if (!$producer->produce($message)) {
        throw new \RuntimeException('Failed to publish the crawl task to the gather queue');
    }

    return Result::success([]);
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @param array $data
|
|
|
+ * @return array
|
|
|
+ */
|
|
|
+ public function goCrawler(array $data): array
|
|
|
{
|
|
|
//通过规则id 查询规则类型
|
|
|
$where = [
|
|
@@ -364,10 +388,10 @@ class CollectorService implements CollectorServiceInterface
|
|
|
|
|
|
switch ($info['web_type']){
|
|
|
case 1:
|
|
|
- var_dump("wojinlailaile======",$info);
|
|
|
+ var_dump("===========规则采集======",$info);
|
|
|
Rule::where(['id'=>$data['id']])->update(['status'=>1]);
|
|
|
$data['copyfrom'] = $info['web_name'];
|
|
|
- $data['author'] = '刘德华';
|
|
|
+ $data['author'] = $info['writer'];;
|
|
|
$data['first_url'] = $info['first_url'];
|
|
|
$data['second_start'] = $info['second_start'];
|
|
|
$data['second_num'] = $info['second_num'];
|
|
@@ -379,28 +403,36 @@ class CollectorService implements CollectorServiceInterface
|
|
|
$urlList = $this->addUrlArr($data);
|
|
|
if($urlList){
|
|
|
foreach ($urlList as $val){
|
|
|
- var_dump("单列表地址:",$val);
|
|
|
+// var_dump("单列表地址:",$val);
|
|
|
$this->ruleCollection($val,$data);
|
|
|
}
|
|
|
}
|
|
|
Rule::where(['id'=>$data['id']])->update(['status'=>2]);
|
|
|
break;
|
|
|
case 2:
|
|
|
-
|
|
|
+ Rule::where(['id'=>$data['id']])->update(['status'=>1]);
|
|
|
$wecUrl = $info['first_url'];//'https://www.ndcpa.gov.cn/queryList';
|
|
|
$parames = json_decode($info['parameter'],true);
|
|
|
+
|
|
|
+// var_dump($parames);die;
|
|
|
+
|
|
|
$parames['webSiteCode'] = [trim($parames['webSiteCode'], "[]")]; //['jbkzzx'];//
|
|
|
$parames['channelCode'] = [trim($parames['channelCode'], "[]")]; // ['c100008'];//
|
|
|
$other = [
|
|
|
'web_url'=>$info['web_url'],
|
|
|
'copyfrom'=>$info['web_name'],
|
|
|
'admin_user_id'=>$data['admin_user_id'],
|
|
|
- 'rule_id'=>$data['id']
|
|
|
+ 'rule_id'=>$data['id'],
|
|
|
+ 'writer'=>$info['writer'],
|
|
|
];
|
|
|
- var_dump("开始调用接口方法====",$parames);
|
|
|
+ var_dump("=======开始接口采集====",$parames);
|
|
|
// die;
|
|
|
$this->foreachCurl($wecUrl,$parames,$other);
|
|
|
|
|
|
+ Rule::where(['id'=>$data['id']])->update(['status'=>2]);
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
}
|
|
|
return Result::success([]);
|
|
|
}
|
|
@@ -420,7 +452,7 @@ class CollectorService implements CollectorServiceInterface
|
|
|
$i++;
|
|
|
$url = $data['second_start'].$i.$data['second_end'];
|
|
|
$respon1 = Result::pageExists($url);
|
|
|
- var_dump("采集地址:",$respon1,$url);
|
|
|
+// var_dump("采集地址:",$respon1,$url);
|
|
|
// Coroutine::sleep(2);
|
|
|
if ($i==intval($data['end_pagenum'])-1) {
|
|
|
$exit = true;
|
|
@@ -434,19 +466,20 @@ class CollectorService implements CollectorServiceInterface
|
|
|
return $arrList;
|
|
|
}
|
|
|
|
|
|
+
|
|
|
/**
|
|
|
* 按照规则采集数据
|
|
|
* @return void
|
|
|
*/
|
|
|
public function ruleCollection($url,$data)
|
|
|
{
|
|
|
- var_dump("采集参数:",$data);
|
|
|
+// var_dump("采集参数:",$data);
|
|
|
$list = QueryList::get($url);
|
|
|
$dataList = $list->rules([
|
|
|
'title' => ['a', 'text'],
|
|
|
'link' => ['a', 'href'],
|
|
|
])->range('.list1 li')->query()->getData();
|
|
|
- var_dump("采集的内容:",$dataList);
|
|
|
+// var_dump("采集的内容:",$dataList);
|
|
|
// var_dump("====",$dataList);die;
|
|
|
$firstUrlArr = explode("/", $url);
|
|
|
array_pop($firstUrlArr);
|
|
@@ -464,10 +497,10 @@ class CollectorService implements CollectorServiceInterface
|
|
|
])->range(".news-details")->query()->getData();
|
|
|
|
|
|
$detailData = $detailData->toArray();
|
|
|
- var_dump("内容详情:",$detailData,$newUrlStr);
|
|
|
+// var_dump("内容详情:",$detailData,$newUrlStr);
|
|
|
if($detailData){
|
|
|
foreach ($detailData as $val){
|
|
|
- var_dump("进没进foreach:",$newUrlStr,$val);
|
|
|
+// var_dump("进没进foreach:",$newUrlStr,$val);
|
|
|
$data['fromurl'] = $newUrlStr;
|
|
|
$data['title'] = $val['title'];
|
|
|
$data['content'] = $val['content'];
|
|
@@ -480,7 +513,7 @@ class CollectorService implements CollectorServiceInterface
|
|
|
$data['admin_user_id'] = $data['admin_user_id']??'';
|
|
|
$data['rule_id'] = $data['rule_id']??'';
|
|
|
// $data['copyfrom'] = $data['copyfrom'];
|
|
|
- var_dump("要插入的数据:",$data);
|
|
|
+// var_dump("要插入的数据:",$data);
|
|
|
$this->insertArticleData($data);
|
|
|
}
|
|
|
}
|
|
@@ -501,7 +534,7 @@ class CollectorService implements CollectorServiceInterface
|
|
|
Db::beginTransaction();
|
|
|
try{
|
|
|
$articleInfo = Article::where(['title'=>$data['title']])->first();
|
|
|
- var_dump("获取详情:",$articleInfo,$data);
|
|
|
+// var_dump("获取详情:",$articleInfo,$data);
|
|
|
if(empty($articleInfo)){
|
|
|
$insertData = [];
|
|
|
$insertData['fromurl'] =$data['newUrlStr'];
|
|
@@ -519,7 +552,7 @@ class CollectorService implements CollectorServiceInterface
|
|
|
$insertDataDetail = [];
|
|
|
$insertDataDetail['article_id'] = $article_id;
|
|
|
$insertDataDetail['content'] = $data['content'];
|
|
|
- var_dump("插入ArticleData:",$insertDataDetail);
|
|
|
+// var_dump("插入ArticleData:",$insertDataDetail);
|
|
|
ArticleData::insertGetId($insertDataDetail);
|
|
|
// Coroutine::sleep(2);
|
|
|
// var_dump("插入成功一次:",$article_id,$insertDataDetail);
|
|
@@ -548,7 +581,7 @@ class CollectorService implements CollectorServiceInterface
|
|
|
];
|
|
|
$result = Result::http_post($wecUrl,$parames,$options);
|
|
|
$result = json_decode($result['response'],true);
|
|
|
- var_dump("获取数据:",$result);
|
|
|
+// var_dump("获取数据:",$result);
|
|
|
if($result['data'] && $result['data']['results']){
|
|
|
$dataList = $result['data']['results'];
|
|
|
// var_dump("取数据结构体:",$dataList);
|
|
@@ -565,7 +598,7 @@ class CollectorService implements CollectorServiceInterface
|
|
|
'content'=>$val['source']['content']['content']??'',
|
|
|
'admin_user_id'=>$other['admin_user_id']??'',
|
|
|
'rule_id'=>$other['rule_id']??'',
|
|
|
- 'author'=>'冯蕊'
|
|
|
+ 'author'=>$other['writer']??''
|
|
|
];
|
|
|
// var_dump("调用插入数据方法,组装数据:",$insertData);
|
|
|
$this->insertArticleData($insertData);
|
|
@@ -576,7 +609,7 @@ class CollectorService implements CollectorServiceInterface
|
|
|
$parames['current'] = $pages;
|
|
|
$twoResult = Result::http_post($wecUrl,$parames,$options);
|
|
|
if($result['data'] && $result['data']['results'] && count($result['data']['results'])>0){
|
|
|
- var_dump("分页测试:",$parames,$parames['current']);
|
|
|
+// var_dump("分页测试:",$parames,$parames['current']);
|
|
|
$this->foreachCurl($wecUrl,$parames,$other,$pages);
|
|
|
}
|
|
|
// var_dump("正确的数据:",$result);
|
|
@@ -762,22 +795,39 @@ class CollectorService implements CollectorServiceInterface
|
|
|
return Result::success($result);
|
|
|
}
|
|
|
/**
 * Import articles (producer side).
 *
 * Wraps the request payload in an ImportProducer message and publishes it
 * through the AMQP producer taken from the container. No import work is
 * done in this method itself.
 * NOTE(review): presumably the matching consumer calls goAddArt() with the
 * same payload - confirm against the consumer class.
 *
 * @param array $data Payload passed to the ImportProducer constructor.
 * @return array Result::success([]) once the message has been published.
 * @throws \RuntimeException When the producer reports that publishing failed.
 */
public function addArt(array $data): array
{
    $message = new ImportProducer($data);
    $producer = ContextApplicationContext::getContainer()->get(Producer::class);

    // produce() reports the publish outcome as a boolean. It used to be
    // var_dump'ed and then ignored, so an import request that never reached
    // the queue was still answered with success. Fail loudly instead.
    if (!$producer->produce($message)) {
        throw new \RuntimeException('Failed to publish the article import task to the import queue');
    }

    return Result::success([]);
}
|
|
|
+ /**
|
|
|
+ * 导入文章(消费者)
|
|
|
+ * @param array $data
|
|
|
+ * @return array
|
|
|
+ */
|
|
|
+ public function goAddArt(array $data): array
|
|
|
+ {
|
|
|
+ // var_dump('准备去消费------',$data);
|
|
|
// var_dump("======@@@====");
|
|
|
$where = [
|
|
|
'rule_id' => $data['rule_id'],
|
|
|
'state' => 0
|
|
|
];
|
|
|
//获取某个规则任务下的已采集未导入的文章及文章详情
|
|
|
- $arts_id = Article::where($where)->wherenotNull('catid')->orderBy('id')->select('id')->get();
|
|
|
+ $arts_id = Article::where($where)->wherenotNull('catid')->select('id')->orderBy('id')->get()->toArray();
|
|
|
$arts = Article::where($where)->wherenotNull('catid')->select('title','catid','level','introduce','keyword','author','copyfrom','fromurl','hits','islink','imgurl','admin_user_id','is_original')->orderBy('id')->get()->toArray();
|
|
|
- $arts_data = ArticleData::whereIn('article_id',$arts_id)->select('content')->orderBy('article_id')->get()->toArray();
|
|
|
- // var_dump($article_data);
|
|
|
+ // var_dump('=============:::',$arts_id);
|
|
|
+ $arts_data = ArticleData::whereIn('article_id',$arts_id)->select('content')->orderBy('article_id','desc')->get()->toArray();
|
|
|
+ // var_dump('=============',$arts);
|
|
|
$data = [
|
|
|
'articles' => $arts,
|
|
|
'art_content' => $arts_data
|