CollectorService.php 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553
  1. <?php
  2. namespace App\JsonRpc;
  3. use App\Amqp\Producer\GatherProducer;
  4. use App\Model\ArticleData;
  5. use App\Model\OldModel\Article as OldArticle;
  6. use App\Model\Article;
  7. use App\Model\Rule;
  8. use App\Model\Web;
  9. use Hyperf\Amqp\Producer;
  10. use Hyperf\Context\ApplicationContext as ContextApplicationContext;
  11. use Hyperf\DbConnection\Db;
  12. use Hyperf\Di\Annotation\Inject;
  13. use Hyperf\RpcServer\Annotation\RpcService;
  14. use App\Tools\Result;
  15. use QL\QueryList;
  16. use Swoole\Coroutine;
  17. //use App\Service\GatherQueueService;
  18. #[RpcService(name: "CollectorService", protocol: "jsonrpc-http", server: "jsonrpc-http")]
  19. class CollectorService implements CollectorServiceInterface
  20. {
  21. // #[Inject]
  22. // protected GatherQueueService $Gservice;
  /**
   * Register a new website.
   *
   * Looks for an existing Web row with the same `name`. When none exists the
   * payload is stamped with the current Asia/Shanghai time as `created_at`
   * and inserted as-is.
   *
   * @param array $data Column values for the new row; `name` is read for the duplicate check.
   * @return array|mixed Result::error() for a duplicate or a failed insert, Result::success() otherwise.
   */
  public function addWeb(array $data): array
  {
      $existing = Web::where(['name' => $data['name']])->first();
      if (!empty($existing)) {
          return Result::error('此网站已存在,不可重复添加!');
      }

      // Creation time is recorded in Shanghai local time.
      date_default_timezone_set('Asia/Shanghai');
      $data['created_at'] = date('Y-m-d H:i:s', time());

      $inserted = Web::insert($data);
      if (empty($inserted)) {
          return Result::error('添加失败');
      }

      return Result::success('添加成功');
  }
  /**
   * List websites, optionally filtered by a name keyword.
   *
   * With `keyWord` present, every website whose name contains the keyword is
   * returned; without it, all websites are returned.
   *
   * The previous version stored the search result in one variable and then
   * tested/returned a different, undefined one, so every search ended in the
   * "no websites yet" error. It also tested a Collection with empty(), which
   * is never true for an object; isEmpty() is used instead.
   *
   * @param array $data Optional `keyWord` (string) to match against the website name.
   * @return array|mixed Result::error() when nothing matches, otherwise Result::success() with the rows.
   */
  public function getWeb(array $data): array
  {
      if (isset($data['keyWord'])) {
          $web = Web::where([
              ['name', 'like', '%' . $data['keyWord'] . '%'],
          ])->get();

          if ($web->isEmpty()) {
              return Result::error('未查找到相关网站!');
          }

          return Result::success($web);
      }

      $web = Web::get();
      if ($web->isEmpty()) {
          return Result::error('您还未添加网站,请先去添加!');
      }

      return Result::success($web);
  }
  /**
   * Update an existing website.
   *
   * The row is looked up by `id` first; the whole payload is then passed to
   * update() for that id.
   *
   * @param array $data Column values to write; `id` selects the row.
   * @return array|mixed Result::error() when the id is unknown or update() reports nothing changed,
   *                     otherwise Result::success() with the value returned by update().
   */
  public function upWeb(array $data): array
  {
      $current = Web::where('id', $data['id'])->first();
      if (empty($current)) {
          return Result::error('请输入正确的网站id!');
      }

      $affected = Web::where('id', $data['id'])->update($data);
      if (empty($affected)) {
          return Result::error('无法修改!');
      }

      return Result::success($affected);
  }
  /**
   * Delete a website by id.
   *
   * @param array $data Must contain `id`, the website to remove.
   * @return array|mixed Result::error() when the id is unknown or delete() reports nothing removed,
   *                     otherwise Result::success() with the value returned by delete().
   */
  public function delWeb(array $data): array
  {
      $current = Web::where('id', $data['id'])->first();
      if (empty($current)) {
          return Result::error('请输入正确的网站id!');
      }

      $removed = Web::where('id', $data['id'])->delete();
      if (empty($removed)) {
          return Result::error('无法删除!');
      }

      return Result::success($removed);
  }
  /**
   * Create a collection rule (task) for a website.
   *
   * The website id must exist and the rule name must not be taken. Only the
   * columns that belong to the given `type` are copied from the payload, so
   * callers may send the fields of several types at once.
   *
   * @param array $data Rule payload: `web_id`, `name`, `type`, the type-specific fields, and
   *                    optionally `source`, `writer_class`, `writer`.
   * @return array|mixed Result::error() for an unknown website or duplicate name,
   *                     otherwise Result::success() with the new rule id.
   */
  public function addRule(array $data): array
  {
      $web = Web::where('id', $data['web_id'])->get();
      if (empty($web->toArray())) {
          return Result::error('请输入正确的网站id!');
      }

      // Rule names must be unique.
      $sameName = Rule::where('name', $data['name'])->get();
      if (!empty($sameName->toArray())) {
          return Result::error('此任务已存在!');
      }

      // Pick the columns that belong to this rule type (loose comparison, as
      // the type may arrive as a numeric string).
      $type = $data['type'];
      if ($type == 1) {
          $typeFields = [
              'first_url',
              'second_start',
              'second_num',
              'second_end',
              'end_pagenum',
              'start',
              'title',
              'content',
          ];
      } elseif ($type == 2) {
          $typeFields = ['first_url', 'parameter', 'start', 'title', 'content'];
      } else {
          $typeFields = ['diy_rule'];
      }

      $rule = [
          'name' => $data['name'],
          'web_id' => $data['web_id'],
      ];
      foreach ($typeFields as $field) {
          $rule[$field] = $data[$field];
      }

      // Source and author details are not stored for type 3 rules.
      if ($type != 3) {
          if (!empty($data['source'])) {
              $rule['source'] = $data['source'];
          }
          if (isset($data['writer_class'])) {
              $rule['writer_class'] = $data['writer_class'];
          }
          if (isset($data['writer'])) {
              $rule['writer'] = $data['writer'];
          }
      }

      date_default_timezone_set('Asia/Shanghai');
      $result = Rule::insertGetId($rule);

      return Result::success($result);
  }
  /**
   * List rule tasks with paging, optionally filtered by website and/or name keyword.
   *
   * Fixes over the previous version:
   *  - the keyword filter used to replace the `web_id` filter, so searching
   *    inside one website returned rules of every website; both conditions
   *    are now combined;
   *  - `page` / `pageSize` are optional (defaults 1 / 10) and clamped to >= 1
   *    so a missing or zero value cannot produce a negative offset;
   *  - the list query is skipped when the count is already 0.
   *
   * @param array $data Optional `web_id`, `keyWord`, `page`, `pageSize`.
   * @return array|mixed Result::error() for an unknown website or an empty result, otherwise
   *                     Result::success() with `rep` (rows incl. the `arts` relation count) and `count`.
   */
  public function getRule(array $data): array
  {
      $where = [];

      if (isset($data['web_id'])) {
          $web = Web::where('id', $data['web_id'])->get();
          if (empty($web->toArray())) {
              return Result::error('请输入正确的网站id!');
          }
          // Restrict to the website the caller navigated from.
          $where[] = ['web_id', '=', $data['web_id']];
      }

      if (isset($data['keyWord'])) {
          // Add (not replace) the name search.
          $where[] = ['name', 'like', '%' . $data['keyWord'] . '%'];
      }

      $count = Rule::where($where)->count();
      if ($count == 0) {
          return Result::error('暂无相关规则任务!');
      }

      $pageSize = max(1, (int) ($data['pageSize'] ?? 10));
      $page = max(1, (int) ($data['page'] ?? 1));

      $rep = Rule::withCount('arts')
          ->where($where)
          ->orderBy("created_at", "desc")
          ->offset(($page - 1) * $pageSize)
          ->limit($pageSize)
          ->get();

      return Result::success([
          'rep' => $rep->toArray(),
          'count' => $count,
      ]);
  }
  /**
   * Fetch a single rule task by id.
   *
   * @param array $data Must contain `id`.
   * @return array|mixed Result::error() when no rule has that id, otherwise Result::success() with the rule.
   */
  public function getOneRule(array $data): array
  {
      $rule = Rule::where('id', $data['id'])->first();

      return empty($rule)
          ? Result::error('请输入正确的规则任务id!')
          : Result::success($rule);
  }
  /**
   * Update a rule task.
   *
   * `type` is stripped from the payload before writing. The update is
   * rejected when another rule already uses the requested name.
   *
   * @param array $data Column values to write; `id` selects the rule and `name` is checked for uniqueness.
   * @return array|mixed Result::error() for an unknown id or a name clash,
   *                     otherwise Result::success() with the value returned by update().
   */
  public function upRule(array $data): array
  {
      $rule = Rule::where('id', $data['id'])->select('id')->first();
      unset($data['type']);

      if (empty($rule)) {
          return Result::error('请输入正确的规则任务id!');
      }

      // Any *other* rule carrying the same name blocks the update.
      $clash = Rule::where('id', '!=', $rule['id'])
          ->where('name', $data['name'])
          ->select('name')
          ->first();
      if (!empty($clash)) {
          return Result::error('已存在此任务规则名称!');
      }

      $result = Rule::where('id', $data['id'])->update($data);

      return Result::success($result);
  }
  /**
   * Queue a crawl request by publishing it through the AMQP producer.
   *
   * The payload is wrapped in a GatherProducer message and handed to the
   * Producer resolved from the container. The payload and the value returned
   * by produce() are dumped to stdout; the produce() result does not affect
   * the response.
   *
   * @param array $data Payload forwarded unchanged to GatherProducer.
   * @return array Always Result::success([]).
   */
  public function sendCrawler(array $data): array
  {
      var_dump("接收到的数据:", $data);

      $gatherMessage = new GatherProducer($data);
      $published = ContextApplicationContext::getContainer()
          ->get(Producer::class)
          ->produce($gatherMessage);

      var_dump("生产者:", $published);

      return Result::success([]);
  }
  /**
   * Run the collection job for one rule.
   *
   * Loads the rule joined with its website and dispatches on the website type:
   *  - type 1: rule-based scraping of list pages (addUrlArr + ruleCollection);
   *  - type 2: paged API collection (foreachCurl).
   * The rule's `status` is set to 1 before collecting and to 2 afterwards
   * (presumably "running" / "finished" — confirm against the schema).
   *
   * Fixes over the previous version:
   *  - an unknown rule id used to call ->toArray() on null and crash; it now
   *    returns an error result;
   *  - the status reset to 2 now lives in `finally`, so a failure in the API
   *    branch (or a non-Exception error in the rule branch) no longer leaves
   *    the rule stuck at status 1.
   *
   * @param array $data Must contain `id` (rule id) and `admin_user_id`.
   * @return array Result::error() for an unknown rule id, otherwise Result::success([]).
   */
  public function goCrawler(array $data): array
  {
      $ruleId = $data['id'];

      // Look the rule up together with its website to learn the website type.
      $info = Rule::where(['rule.id' => $ruleId])
          ->leftJoin('web', 'rule.web_id', 'web.id')
          ->select("rule.*", "web.name as web_name", "web.url as web_url", "web.type as web_type")
          ->first();
      if (empty($info)) {
          return Result::error('请输入正确的规则任务id!');
      }

      $info = $info->toArray();
      var_dump("规则信息:", $info);

      switch ($info['web_type']) {
          case 1:
              var_dump("===========规则采集======", $info);
              try {
                  Rule::where(['id' => $ruleId])->update(['status' => 1]);

                  // Extra values the article insert needs.
                  $info['admin_user_id'] = $data['admin_user_id'];
                  $info['rule_id'] = $ruleId;
                  $info['copyfrom'] = $info['web_name'];
                  $info['author'] = $info['writer'];

                  $urlList = $this->addUrlArr($info);
                  if ($urlList) {
                      foreach ($urlList as $listUrl) {
                          $this->ruleCollection($listUrl, $info);
                      }
                  }
              } catch (\Exception $e) {
                  // Best effort: a failed crawl is logged, not propagated.
                  var_dump("采集失败报错:", $e->getMessage());
              } finally {
                  Rule::where(['id' => $ruleId])->update(['status' => 2]);
              }
              break;

          case 2:
              Rule::where(['id' => $ruleId])->update(['status' => 1]);
              try {
                  $wecUrl = $info['first_url']; // e.g. 'https://www.ndcpa.gov.cn/queryList'
                  $parames = json_decode($info['parameter'], true);

                  // The stored values are bracketed strings; strip the
                  // brackets and send each as a one-element list,
                  // e.g. ['jbkzzx'] and ['c100008'].
                  $parames['webSiteCode'] = [trim($parames['webSiteCode'], "[]")];
                  $parames['channelCode'] = [trim($parames['channelCode'], "[]")];

                  $other = [
                      'web_url' => $info['web_url'],
                      'copyfrom' => $info['web_name'],
                      'admin_user_id' => $data['admin_user_id'],
                      'rule_id' => $ruleId,
                      'writer' => $info['writer'],
                  ];
                  var_dump("=======开始接口采集====", $parames);

                  $this->foreachCurl($wecUrl, $parames, $other);
              } finally {
                  // Errors still propagate to the caller, but the rule is
                  // never left at status 1.
                  Rule::where(['id' => $ruleId])->update(['status' => 2]);
              }
              break;
      }

      return Result::success([]);
  }
  /**
   * Build the list of list-page URLs to crawl for a rule.
   *
   * The result starts with `first_url`, followed by
   * `second_start . $i . second_end` for $i = 1 .. end_pagenum - 2
   * (the same pages the previous implementation produced).
   *
   * Fixes over the previous version:
   *  - with `end_pagenum` of 0, negative or non-numeric the old `while` loop
   *    never met its exit condition and grew the array forever; the bounded
   *    loop simply adds no extra pages in that case;
   *  - each URL was passed to Result::pageExists() and the answer thrown
   *    away; that dead call is removed.
   *    NOTE(review): assumes pageExists() has no side effect callers rely on — confirm.
   *
   * @param array $data Rule data with `first_url`, `second_start`, `second_end`, `end_pagenum`.
   * @return array List of URLs, `first_url` first.
   */
  public function addUrlArr($data)
  {
      $arrList = [$data['first_url']];

      $lastIndex = intval($data['end_pagenum']) - 1;
      for ($i = 1; $i < $lastIndex; $i++) {
          $arrList[] = $data['second_start'] . $i . $data['second_end'];
      }

      return $arrList;
  }
  /**
   * Scrape one list page and store every article it links to.
   *
   * The list page is queried for the first anchor (text + href) inside each
   * `$info['start']` range. Each link is resolved to a detail URL, the detail
   * page is queried with the rule's `title` / `content` selectors, and every
   * extracted record is passed to insertArticleData().
   *
   * Link resolution:
   *  - a link containing the website URL is used as-is;
   *  - any other link containing "http" is treated as a third-party
   *    redirect and skipped;
   *  - everything else is taken as relative and prefixed with `con_url`.
   *
   * Fixes over the previous version: items without a link are skipped
   * instead of fetching the bare prefix; `con_url`, `con_start`, `title` and
   * `content` are read with fallbacks (the old debug dump read `con_start`
   * unguarded right after guarding it); the unused URL-splitting code and the
   * redundant http/https filter are removed; the detail rules, identical
   * for every item, are built once.
   *
   * @param string $url  List page URL.
   * @param array  $info Rule data: `start`, `web_url`, `title`, `content`, `copyfrom`, and optionally
   *                     `con_url`, `con_start`, `source`, `admin_user_id`, `rule_id`, `author`.
   * @return void
   */
  public function ruleCollection($url, $info)
  {
      $list = QueryList::get($url);
      $dataList = $list->rules([
          'title' => ['a:eq(0)', 'text'],
          'link' => ['a:eq(0)', 'href'],
      ])->range($info['start'])->query()->getData();
      var_dump("采集的列表:", $dataList);

      $dataList = $dataList->toArray();
      if (!$dataList) {
          return;
      }

      // Detail-page selectors do not depend on the item.
      $rules = [];
      if ($info['title']) {
          $rules['title'] = [$info['title'], 'text'];
      }
      if ($info['content']) {
          $rules['content'] = [$info['content'], 'html'];
      }
      // Range selector for the detail page.
      $detailRange = $info['con_start'] ?? '';

      $webUrl = (string) ($info['web_url'] ?? '');
      $detailPrefix = (string) ($info['con_url'] ?? '');

      foreach ($dataList as $item) {
          $link = (string) ($item['link'] ?? '');
          if ($link === '') {
              continue;
          }

          $newUrlStr = $link;
          if (!str_contains($link, $webUrl)) {
              // Absolute URL on another host: third-party redirect, skip it.
              if (str_contains($link, 'http')) {
                  continue;
              }
              $newUrlStr = $detailPrefix . $link;
          }
          var_dump("详情地址:", $newUrlStr);

          $detailContent = QueryList::get($newUrlStr);
          var_dump("打印规则:", $rules, "详情起始:", $detailRange);
          $detailData = $detailContent->rules($rules)->range($detailRange)->query()->getData();
          $detailData = $detailData->toArray();
          var_dump("内容详情:", $detailData, $newUrlStr);

          foreach ($detailData as $val) {
              $title = $val['title'] ?? '';
              $copyfrom = $info['copyfrom'] ?? '';

              $this->insertArticleData([
                  'fromurl' => $newUrlStr,
                  'title' => $title,
                  'content' => $val['content'] ?? '',
                  'newUrlStr' => $newUrlStr,
                  'introduce' => $title,
                  'keyword' => $title,
                  'copyfrom' => $copyfrom,
                  'source' => $info['source'] ?? $copyfrom,
                  'admin_user_id' => $info['admin_user_id'] ?? '',
                  'rule_id' => $info['rule_id'] ?? '',
                  'author' => $info['author'] ?? '',
              ]);
          }
      }
  }
  /**
   * Store one collected article unless an article with the same title exists.
   *
   * Inside a single transaction the Article row is inserted first and its id
   * is then used for the ArticleData row holding the content. Failures are
   * rolled back and dumped to stdout; they are not propagated, so one bad
   * article does not abort the surrounding crawl.
   *
   * Fix over the previous version: the handler caught only \Exception, so an
   * \Error raised mid-way (e.g. a TypeError) skipped the rollback and left the
   * transaction open. \Throwable is caught instead.
   *
   * @param array $data Article fields: `title`, `content`, `newUrlStr`, `copyfrom`, `author`,
   *                    `admin_user_id`, `rule_id`, and optionally `source`.
   * @return void
   */
  public function insertArticleData($data = [])
  {
      if (!$data) {
          var_dump("没有数据可以插入:");
          return;
      }

      Db::beginTransaction();
      try {
          $articleInfo = Article::where(['title' => $data['title']])->first();
          if (empty($articleInfo)) {
              // Fall back to the website name when no explicit source was collected.
              $source = isset($data['source']) && $data['source'] != ''
                  ? $data['source']
                  : $data['copyfrom'];

              $article_id = Article::insertGetId([
                  'fromurl' => $data['newUrlStr'],
                  'oldtitle' => $data['title'],
                  'title' => $data['title'],
                  'copyfrom' => $data['copyfrom'],
                  'author' => $data['author'],
                  'introduce' => $data['title'],
                  'keyword' => $data['title'],
                  'source' => $source,
                  'admin_user_id' => $data['admin_user_id'],
                  'rule_id' => $data['rule_id'],
              ]);

              ArticleData::insertGetId([
                  'article_id' => $article_id,
                  'content' => $data['content'],
              ]);
          }
          Db::commit();
      } catch (\Throwable $e) {
          Db::rollBack();
          var_dump("插入失败:", $e->getMessage());
      }
  }
  /**
   * Collect articles from a paged JSON API until a page comes back empty.
   *
   * Each round POSTs `$parames` to `$wecUrl`, stores every entry of
   * `data.results` via insertArticleData(), then increments
   * `$parames['current']` and requests the next page.
   *
   * Fixes over the previous version:
   *  - every next page was requested twice (once into a discarded variable,
   *    once again by the recursive call); each page is now requested once;
   *  - recursion (one stack frame per page) is replaced by a loop;
   *  - a failed/invalid JSON response or an entry without `urls` no longer
   *    dereferences null.
   *
   * @param string $wecUrl  API endpoint.
   * @param array  $parames Request parameters; `current` is the page number and is advanced per round.
   * @param array  $other   Fixed article fields: `web_url`, `copyfrom`, `admin_user_id`, `rule_id`, `writer`.
   * @param int    $page    Unused; kept (by reference, default 1) for signature compatibility.
   * @return void
   */
  public function foreachCurl($wecUrl, $parames, $other, &$page = 1)
  {
      $options = [
          CURLOPT_HEADER => true, // include response headers in the response
          CURLOPT_TIMEOUT => 30,  // request timeout: 30 seconds
      ];

      while (true) {
          $response = Result::http_post($wecUrl, $parames, $options);
          $body = $response['response'] ?? '';
          $result = is_string($body) ? json_decode($body, true) : null;

          $dataList = $result['data']['results'] ?? [];
          if (!is_array($dataList) || !$dataList) {
              // Empty page (or unusable response): nothing more to collect.
              break;
          }

          foreach ($dataList as $val) {
              // `urls` is itself a JSON string; its `common` entry is the site-relative path.
              $urls = json_decode($val['source']['urls'] ?? '', true);
              $newUrlStr = $other['web_url'] . ($urls['common'] ?? '');

              $this->insertArticleData([
                  'newUrlStr' => $newUrlStr,
                  'title' => $val['source']['title'] ?? '',
                  'source' => $val['source']['contentSource'] ?? '',
                  'copyfrom' => $other['copyfrom'] ?? '',
                  'content' => $val['source']['content']['content'] ?? '',
                  'admin_user_id' => $other['admin_user_id'] ?? '',
                  'rule_id' => $other['rule_id'] ?? '',
                  'author' => $other['writer'] ?? '',
              ]);
          }

          $parames['current'] = intval($parames['current'] ?? 0) + 1;
      }
  }
  507. }