CollectorService.php 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. <?php
  2. namespace App\JsonRpc;
  3. use App\Model\ArticleData;
  4. use App\Model\OldModel\Article as OldArticle;
  5. use App\Model\Article;
  6. use App\Model\Rule;
  7. use App\Model\Web;
  8. use Hyperf\DbConnection\Db;
  9. use Hyperf\Di\Annotation\Inject;
  10. use Hyperf\RpcServer\Annotation\RpcService;
  11. use App\Tools\Result;
  12. use QL\QueryList;
  13. use Swoole\Coroutine;
  14. use App\Service\GatherQueueService;
  15. #[RpcService(name: "CollectorService", protocol: "jsonrpc-http", server: "jsonrpc-http")]
  16. class CollectorService implements CollectorServiceInterface
  17. {
  18. #[Inject]
  19. protected GatherQueueService $Gservice;
  /**
   * Register a new website.
   *
   * Looks up an existing row by `name`; when one exists the request is rejected.
   * Otherwise the payload is stamped with a `created_at` value (formatted after
   * switching the process default timezone to Asia/Shanghai) and inserted as-is.
   *
   * @param array $data Row to insert; must contain a `name` key.
   * @return array Result::error() payload on duplicate or failed insert,
   *               Result::success() payload otherwise.
   */
  public function addWeb(array $data): array
  {
      $existing = Web::where(['name' => $data['name']])->first();
      if (!empty($existing)) {
          return Result::error('此网站已存在,不可重复添加!');
      }

      // NOTE(review): this changes the default timezone for the whole process, not just this call.
      date_default_timezone_set('Asia/Shanghai');
      $data['created_at'] = date('Y-m-d H:i:s', time());

      $inserted = Web::insert($data);
      if (empty($inserted)) {
          return Result::error('添加失败');
      }

      return Result::success('添加成功');
  }
  /**
   * List websites, optionally filtered by a name keyword.
   *
   * When `keyWord` is present, websites whose `name` contains it are returned;
   * otherwise every website is returned.
   *
   * Fixes over the previous version:
   *  - the search branch stored its result in a different variable than the one
   *    that was checked and returned, so every search ended in the
   *    "no websites added" error;
   *  - `empty()` was applied to the collection returned by `get()`, which is an
   *    object and therefore never "empty"; `isEmpty()` is used instead.
   *
   * @param array $data Optional key `keyWord` (substring to match against `name`).
   * @return array Result::success() payload carrying the matching websites, or a
   *               Result::error() payload when nothing matches / nothing exists.
   */
  public function getWeb(array $data): array
  {
      if (isset($data['keyWord'])) {
          $matches = Web::where([
              ['name', 'like', '%' . $data['keyWord'] . '%'],
          ])->get();

          if ($matches->isEmpty()) {
              return Result::error('未查找到相关网站!');
          }

          return Result::success($matches);
      }

      $web = Web::get();
      if ($web->isEmpty()) {
          return Result::error('您还未添加网站,请先去添加!');
      }

      return Result::success($web);
  }
  /**
   * Update a website row.
   *
   * The row is first looked up by `id`; if it exists, the whole payload is
   * applied as the update and the value returned by `update()` is sent back.
   *
   * @param array $data Column values to write; must contain `id`.
   * @return array Result::error() payload when the id is unknown or `update()`
   *               returns an empty value, Result::success() payload otherwise.
   */
  public function upWeb(array $data): array
  {
      $current = Web::where('id', $data['id'])->first();
      if (empty($current)) {
          return Result::error('请输入正确的网站id!');
      }

      $affected = Web::where('id', $data['id'])->update($data);
      if (empty($affected)) {
          return Result::error('无法修改!');
      }

      return Result::success($affected);
  }
  /**
   * Delete a website row.
   *
   * The row is first looked up by `id`; if it exists it is deleted and the
   * value returned by `delete()` is sent back.
   *
   * @param array $data Must contain `id`.
   * @return array Result::error() payload when the id is unknown or `delete()`
   *               returns an empty value, Result::success() payload otherwise.
   */
  public function delWeb(array $data): array
  {
      $current = Web::where('id', $data['id'])->first();
      if (empty($current)) {
          return Result::error('请输入正确的网站id!');
      }

      $removed = Web::where('id', $data['id'])->delete();
      if (empty($removed)) {
          return Result::error('无法删除!');
      }

      return Result::success($removed);
  }
  /**
   * Hand a payload to the gather queue service.
   *
   * @param array $data Payload forwarded unchanged to GatherQueueService::push().
   * @return array Result::success() payload wrapping the value returned by push()
   *               in a one-element list.
   */
  public function sendCrawler(array $data): array
  {
      // Second argument is a random integer in [5, 20] — presumably a delay; confirm against GatherQueueService.
      $randomArg = rand(5, 20);
      $pushed = $this->Gservice->push($data, $randomArg);

      return Result::success([$pushed]);
  }
  /**
   * Run the collection job described by one rule.
   *
   * The rule is loaded together with its website (name, url, type). Depending on
   * the website type the job is dispatched to:
   *  - type 1: page scraping — builds the list-page URLs via addUrlArr() and
   *    scrapes each with ruleCollection();
   *  - type 2: API collection — posts the rule's JSON `parameter` to `first_url`
   *    page by page via foreachCurl().
   * The rule's `status` is set to 1 before the work starts and to 2 after it
   * finishes. Any other website type does nothing and still reports success.
   *
   * Changes over the previous version: an unknown or missing rule id and an
   * unparsable `parameter` now produce an error result instead of a fatal error,
   * missing `admin_user_id` defaults to '' (the same default the downstream
   * methods use), and debug dumps of the rule row were removed.
   *
   * @param array $data Must contain `id` (rule id); may contain `admin_user_id`.
   * @return array Result::error() payload for an invalid rule, otherwise
   *               Result::success([]).
   */
  public function goCrawler(array $data): array
  {
      $ruleId = $data['id'] ?? null;
      if (empty($ruleId)) {
          return Result::error('请输入正确的规则id!');
      }

      // Look up the rule together with the website it belongs to.
      $rule = Rule::where(['rule.id' => $ruleId])
          ->leftJoin('web', 'rule.web_id', 'web.id')
          ->select("rule.*", "web.name as web_name", "web.url as web_url", "web.type as web_type")
          ->first();
      if (empty($rule)) {
          return Result::error('请输入正确的规则id!');
      }

      $info = $rule->toArray();
      $adminUserId = $data['admin_user_id'] ?? '';

      // Loose comparison is kept on purpose: the column may arrive as int or numeric string.
      switch ($info['web_type']) {
          case 1:
              Rule::where(['id' => $ruleId])->update(['status' => 1]);

              $data['copyfrom'] = $info['web_name'];
              $data['author'] = $info['writer'];
              $data['first_url'] = $info['first_url'];
              $data['second_start'] = $info['second_start'];
              $data['second_num'] = $info['second_num'];
              $data['second_end'] = $info['second_end'];
              $data['end_pagenum'] = $info['end_pagenum'];
              $data['rule_id'] = $ruleId;
              $data['admin_user_id'] = $adminUserId;

              $urlList = $this->addUrlArr($data);
              if ($urlList) {
                  foreach ($urlList as $listUrl) {
                      $this->ruleCollection($listUrl, $data);
                  }
              }

              Rule::where(['id' => $ruleId])->update(['status' => 2]);
              break;

          case 2:
              // Validate the stored parameters before flagging the rule as running,
              // so a malformed rule does not get stuck in status 1.
              $parames = json_decode($info['parameter'] ?? '', true);
              if (!is_array($parames)) {
                  return Result::error('规则参数格式错误!');
              }

              Rule::where(['id' => $ruleId])->update(['status' => 1]);

              // The codes are stored like "[abc]"; strip the brackets and send one-element lists.
              $parames['webSiteCode'] = [trim($parames['webSiteCode'] ?? '', "[]")];
              $parames['channelCode'] = [trim($parames['channelCode'] ?? '', "[]")];

              $other = [
                  'web_url' => $info['web_url'],
                  'copyfrom' => $info['web_name'],
                  'admin_user_id' => $adminUserId,
                  'rule_id' => $ruleId,
                  'writer' => $info['writer'],
              ];

              $this->foreachCurl($info['first_url'], $parames, $other);

              Rule::where(['id' => $ruleId])->update(['status' => 2]);
              break;
      }

      return Result::success([]);
  }
  /**
   * Build the list of list-page URLs to scrape for a rule.
   *
   * The list always starts with `first_url`, followed by
   * `second_start . $i . second_end` for $i = 1 .. end_pagenum - 2
   * (the same pages the previous implementation produced).
   *
   * Fixes over the previous version:
   *  - the loop only stopped when the counter was exactly `end_pagenum - 1`, so an
   *    `end_pagenum` of 1 or less never terminated; the loop is now bounded and
   *    such values yield just `first_url`;
   *  - every iteration called Result::pageExists() and threw the result away;
   *    that unused probe is no longer made.
   *
   * @param array $data Needs `first_url`, `second_start`, `second_end`, `end_pagenum`.
   * @return array List of URL strings, `first_url` first.
   */
  public function addUrlArr($data)
  {
      $urls = [$data['first_url']];

      $lastIndex = intval($data['end_pagenum']) - 2;
      for ($i = 1; $i <= $lastIndex; $i++) {
          $urls[] = $data['second_start'] . $i . $data['second_end'];
      }

      return $urls;
  }
  /**
   * Scrape one list page and store every article it links to.
   *
   * The list page is queried for `a` text/href inside `.list1 li`. Each link has
   * its first character dropped and is appended to the list page URL minus its
   * last path segment to form the detail URL. The detail page is queried for the
   * `h1` text and `.TRS_UEDITOR` HTML inside `.news-details`, and each hit is
   * passed to insertArticleData() merged into the job data.
   *
   * @param string $url  List page URL.
   * @param array  $data Job data; `copyfrom` is read here, `admin_user_id` and
   *                     `rule_id` default to '' when absent.
   * @return void
   */
  public function ruleCollection($url,$data)
  {
      $listRows = QueryList::get($url)
          ->rules([
              'title' => ['a', 'text'],
              'link' => ['a', 'href'],
          ])
          ->range('.list1 li')
          ->query()
          ->getData()
          ->toArray();

      // Directory part of the list URL: everything before the final "/".
      $segments = explode("/", $url);
      array_pop($segments);
      $baseUrl = implode('/', $segments);

      if (!$listRows) {
          return;
      }

      foreach ($listRows as $row) {
          // Drop the first character of the href before joining — presumably a leading "." of "./x.html"; confirm against the target site.
          $detailUrl = $baseUrl . substr($row['link'], 1);

          $detailRows = QueryList::get($detailUrl)
              ->rules([
                  'title' => ['h1', 'text'],
                  'content' => ['.TRS_UEDITOR', 'html'],
              ])
              ->range(".news-details")
              ->query()
              ->getData()
              ->toArray();

          if (!$detailRows) {
              continue;
          }

          foreach ($detailRows as $detail) {
              // `source` is always sent empty from here; insertArticleData() falls back to `copyfrom`.
              $data = array_replace($data, [
                  'fromurl' => $detailUrl,
                  'title' => $detail['title'],
                  'content' => $detail['content'],
                  'newUrlStr' => $detailUrl,
                  'source' => '',
                  'introduce' => $detail['title'] ?? '',
                  'keyword' => $detail['title'] ?? '',
                  'copyfrom' => $data['copyfrom'],
                  'admin_user_id' => $data['admin_user_id'] ?? '',
                  'rule_id' => $data['rule_id'] ?? '',
              ]);

              $this->insertArticleData($data);
          }
      }
  }
  /**
   * Store one collected article unless an article with the same title exists.
   *
   * Inside a transaction, an Article row is inserted (title is also used for
   * `oldtitle`, `introduce` and `keyword`; `source` falls back to `copyfrom`
   * when absent or empty) followed by an ArticleData row holding the content.
   * Failures are rolled back and dumped; they are deliberately not rethrown so
   * that the surrounding collection loop carries on with the next article.
   *
   * Fix over the previous version: the catch now covers \Throwable, so an
   * \Error (for example a TypeError) also rolls the transaction back instead of
   * leaving it open.
   *
   * @param array $data Needs `title`, `newUrlStr`, `copyfrom`, `author`, `content`,
   *                    `admin_user_id`, `rule_id`; `source` is optional.
   * @return void
   */
  public function insertArticleData($data=[])
  {
      if (!$data) {
          var_dump("没有数据可以插入:");
          return;
      }

      Db::beginTransaction();
      try {
          $articleInfo = Article::where(['title' => $data['title']])->first();

          // Titles act as the de-duplication key: an existing title means nothing to do.
          if (empty($articleInfo)) {
              $hasSource = isset($data['source']) && $data['source'] != '';

              $article_id = Article::insertGetId([
                  'fromurl' => $data['newUrlStr'],
                  'oldtitle' => $data['title'],
                  'title' => $data['title'],
                  'copyfrom' => $data['copyfrom'],
                  'author' => $data['author'],
                  'introduce' => $data['title'],
                  'keyword' => $data['title'],
                  'source' => $hasSource ? $data['source'] : $data['copyfrom'],
                  'admin_user_id' => $data['admin_user_id'],
                  'rule_id' => $data['rule_id'],
              ]);

              ArticleData::insertGetId([
                  'article_id' => $article_id,
                  'content' => $data['content'],
              ]);
          }

          Db::commit();
      } catch (\Throwable $e) {
          Db::rollBack();
          var_dump("插入失败:", $e->getMessage());
      }
  }
  /**
   * Collect articles from a paginated JSON API.
   *
   * Posts `$parames` to `$wecUrl`, stores every entry of `data.results` via
   * insertArticleData(), then increments `$parames['current']` and repeats until
   * a page comes back without results.
   *
   * Fixes over the previous version:
   *  - each following page was requested twice (once into an unused variable,
   *    once more by the recursive call); every page is now requested once;
   *  - recursion (one stack frame per page) is replaced by a loop;
   *  - an undecodable response, or entries lacking `source.urls` / `common`, no
   *    longer trigger "offset on null" errors.
   *
   * @param string $wecUrl  API endpoint.
   * @param array  $parames Request parameters; `current` is the page number.
   * @param array  $other   Per-job values: `web_url`, `copyfrom`, `admin_user_id`,
   *                        `rule_id`, `writer`.
   * @param int    $page    Unused; kept so existing callers keep working.
   * @return void
   */
  public function foreachCurl($wecUrl,$parames,$other,&$page=1)
  {
      $options = [
          CURLOPT_HEADER => true,  // ask curl to include response headers
          CURLOPT_TIMEOUT => 30,   // request timeout in seconds
      ];

      while (true) {
          $response = Result::http_post($wecUrl, $parames, $options);
          $result = json_decode($response['response'] ?? '', true);

          $dataList = $result['data']['results'] ?? [];
          if (!is_array($dataList) || count($dataList) === 0) {
              // An empty (or unreadable) page marks the end of the listing.
              break;
          }

          foreach ($dataList as $val) {
              // `source.urls` is itself a JSON string; its `common` entry is the path on the website.
              $urls = json_decode($val['source']['urls'] ?? '', true);
              $newUrlStr = $other['web_url'] . ($urls['common'] ?? '');

              $this->insertArticleData([
                  'newUrlStr' => $newUrlStr,
                  'title' => $val['source']['title'] ?? '',
                  'source' => $val['source']['contentSource'] ?? '',
                  'copyfrom' => $other['copyfrom'] ?? '',
                  'content' => $val['source']['content']['content'] ?? '',
                  'admin_user_id' => $other['admin_user_id'] ?? '',
                  'rule_id' => $other['rule_id'] ?? '',
                  'author' => $other['writer'] ?? '',
              ]);
          }

          $parames['current'] = intval($parames['current'] ?? 0) + 1;
      }
  }
  337. }