4.2 URL调度器
所谓url调度器,其实说白了就是url仓库java代码的调度策略,不过因为其核心在于调度,所以将其放到URL调度器中来进行说明,目前其调度基于以下接口开发:
/**
* url 仓库
* 主要功能:
* 向仓库中添加url(高优先级的列表,低优先级的商品url)
* 从仓库中获取url(优先获取高优先级的url,如果没有,再获取低优先级的url)
*
*/
public interface IRepository {
/**
* 获取url的方法
* 从仓库中获取url(优先获取高优先级的url,如果没有,再获取低优先级的url)
* @return
*/
public String poll();
/**
* 向高优先级列表中添加商品列表url
* @param highUrl
*/
public void offerHigher(String highUrl);
/**
* 向低优先级列表中添加商品url
* @param lowUrl
*/
public void offerLower(String lowUrl);
}
其基于Redis作为URL仓库的实现如下:
/**
* 基于Redis的全网爬虫,随机获取爬虫url:
*
* Redis中用来保存url的数据结构如下:
* 1.需要爬取的域名集合(存储数据类型为set,这个需要先在Redis中添加)
* key
* spider.website.domains
* value(set)
* jd.com suning.com gome.com
* key由常量对象SpiderConstants.SPIDER_WEBSITE_DOMAINS_KEY 获得
* 2.各个域名所对应的高低优先url队列(存储数据类型为list,这个由爬虫程序解析种子url后动态添加)
* key
* jd.com.higher
* jd.com.lower
* suning.com.higher
* suning.com.lower
* gome.com.higher
* gome.come.lower
* value(list)
* 相对应需要解析的url列表
* key由随机的域名 + 常量 SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX或者SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX获得
* 3.种子url列表
* key
* spider.seed.urls
* value(list)
* 需要爬取的数据的种子url
* key由常量SpiderConstants.SPIDER_SEED_URLS_KEY获得
*
* 种子url列表中的url会由url调度器定时向高低优先url队列中
*/
public class RandomRedisRepositoryImpl implements IRepository {
/**
* 构造方法
*/
public RandomRedisRepositoryImpl() {
init();
}
/**
* 初始化方法,初始化时,先将redis中存在的高低优先级url队列全部删除
* 否则上一次url队列中的url没有消耗完时,再停止启动跑下一次,就会导致url仓库中有重复的url
*/
public void init() {
Jedis jedis = JedisUtil.getJedis();
Set<String> domains = jedis.smembers(SpiderConstants.SPIDER_WEBSITE_DOMAINS_KEY);
String higherUrlKey;
String lowerUrlKey;
for(String domain : domains) {
higherUrlKey = domain + SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX;
lowerUrlKey = domain + SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX;
jedis.del(higherUrlKey, lowerUrlKey);
}
JedisUtil.returnJedis(jedis);
}
/**
* 从队列中获取url,目前的策略是:
* 1.先从高优先级url队列中获取
* 2.再从低优先级url队列中获取
* 对应我们的实际场景,应该是先解析完列表url再解析商品url
* 但是需要注意的是,在分布式多线程的环境下,肯定是不能完全保证的,因为在某个时刻高优先级url队列中
* 的url消耗完了,但实际上程序还在解析下一个高优先级url,此时,其它线程去获取高优先级队列url肯定获取不到
* 这时就会去获取低优先级队列中的url,在实际考虑分析时,这点尤其需要注意
* @return
*/
@Override
public String poll() {
// 从set中随机获取一个域名
Jedis jedis = JedisUtil.getJedis();
String randomDomain = jedis.srandmember(SpiderConstants.SPIDER_WEBSITE_DOMAINS_KEY); // jd.com
String key = randomDomain + SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX; // jd.com.higher
String url = jedis.lpop(key);
if(url == null) { // 如果为null,则从低优先级中获取
key = randomDomain + SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX; // jd.com.lower
url = jedis.lpop(key);
}
JedisUtil.returnJedis(jedis);
return url;
}
/**
* 向高优先级url队列中添加url
* @param highUrl
*/
@Override
public void offerHigher(String highUrl) {
offerUrl(highUrl, SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX);
}
/**
* 向低优先url队列中添加url
* @param lowUrl
*/
@Override
public void offerLower(String lowUrl) {
offerUrl(lowUrl, SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX);
}
/**
* 添加url的通用方法,通过offerHigher和offerLower抽象而来
* @param url 需要添加的url
* @param url*uffix url类型后缀.higher或.lower
*/
public void offerUrl(String url, String url*uffix) {
Jedis jedis = JedisUtil.getJedis();
String domain = SpiderUtil.getTopDomain(url); // 获取url对应的域名,如jd.com
String key = domain + url*uffix; // 拼接url队列的key,如jd.com.higher
jedis.lpush(key, url); // 向url队列中添加url
JedisUtil.returnJedis(jedis);
}
}
通过代码分析也是可以知道,其核心就在如何调度url仓库(Redis)中的url。
4.3 URL定时器
一段时间后,高优先级URL队列和低优先URL队列中的url都会被消费完,为了让程序可以继续爬取数据,同时减少人为的干预,可以预先在Redis中插入种子url,之后定时让URL定时器从种子url中取出url定存放到高优先级URL队列中,以此达到程序定时不间断爬取数据的目的。
url消费完毕后,是否需要循环不断爬取数据根据个人业务需求而不同,因此这一步不是必需的,只是也提供了这样的操作。因为事实上,我们需要爬取的数据也是每隔一段时间就会更新的,如果希望我们爬取的数据也跟着定时更新,那么这时定时器就有非常重要的作用了。不过需要注意的是,一旦决定需要循环重复爬取数据,则在设计存储器实现时需要考虑重复数据的问题,即重复数据应该是更新操作,目前在我设计的存储器不包括这个功能,有兴趣的朋友可以自己实现,只需要在插入数据前判断数据库中是否存在该数据即可。
另外需要注意的一点是,URL定时器是一个独立的进程,需要单独启动。
定时器基于Quartz实现,下面是其job的代码:
/**
* 每天定时从url仓库中获取种子url,添加进高优先级列表
*/
public class UrlJob implements Job {
// log4j日志记录
private Logger logger = LoggerFactory.getLogger(UrlJob.class);
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
/**
* 1.从指定url种子仓库获取种子url
* 2.将种子url添加进高优先级列表
*/
Jedis jedis = JedisUtil.getJedis();
Set<String> seedUrls = jedis.smembers(SpiderConstants.SPIDER_SEED_URLS_KEY); // spider.seed.urls Redis数据类型为set,防止重复添加种子url
for(String seedUrl : seedUrls) {
String domain = SpiderUtil.getTopDomain(seedUrl); // 种子url的域名
jedis.sadd(domain + SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX, seedUrl);
logger.info("获取种子:{}", seedUrl);
}
JedisUtil.returnJedis(jedis);
// System.out.println("Scheduler Job Test...");
}
}
调度器的实现如下:
/**
* url定时调度器,定时向url对应仓库中存放种子url
*
* 业务规定:每天凌晨1点10分向仓库中存放种子url
*/
public class UrlJobScheduler {
public UrlJobScheduler() {
init();
}
/**
* 初始化调度器
*/
public void init() {
try {
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
// 如果没有以下start方法的执行,则是不会开启任务的调度
scheduler.start();
String name = "URL_SCHEDULER_JOB";
String group = "URL_SCHEDULER_JOB_GROUP";
JobDetail jobDetail = new JobDetail(name, group, UrlJob.class);
String cronExpression = "0 10 1 * * ?";
Trigger trigger = new CronTrigger(name, group, cronExpression);
// 调度任务
scheduler.scheduleJob(jobDetail, trigger);
} catch (SchedulerException e) {
e.printStackTrace();
} catch (ParseException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
UrlJobScheduler urlJobScheduler = new UrlJobScheduler();
urlJobScheduler.start();
}
/**
* 定时调度任务
* 因为我们每天要定时从指定的仓库中获取种子url,并存放到高优先级的url列表中
* 所以是一个不间断的程序,所以不能停止
*/
private void start() {
while (true) {
}
}
}
5 监控报警系统
监控报警系统的加入主要是为了让使用者可以主动发现节点宕机,而不是被动地发现,因为实际中爬虫程序可能是持续不断运行的,并且我们会在多个节点上部署我们的爬虫程序,因此很有必要对节点进行监控,并且在节点出现问题时可以及时发现并修正,需要注意的是,监控报警系统是一个独立的进程,需要单独启动。
5.1 基本原理
首先需要先在zookeeper中创建一个/ispider
节点:
[zk: localhost:2181(CONNECTED) 1] create /ispider ispider
Created /ispider
监控报警系统的开发主要依赖于zookeeper实现,监控程序对zookeeper下面的这个节点目录进行监听:
[zk: localhost:2181(CONNECTED) 0] ls /ispider
[]
爬虫程序启动时会在该节点目录下注册一个临时节点目录:
[zk: localhost:2181(CONNECTED) 0] ls /ispider
[192.168.43.166]
当节点出现宕机时,该临时节点目录就会被zookeeper删除
[zk: localhost:2181(CONNECTED) 0] ls /ispider
[]
同时因为我们监听了节点目录/ispider
,所以当zookeeper删除其下的节点目录时(或增加一个节点目录),zookeeper会给我们的监控程序发送通知,即我们的监控程序会得到回调,这样便可以在回调程序中执行报警的系统动作,从而完成监控报警的功能。
5.2 zookeeper Java API使用说明
可以使用zookeeper原生的Java API,我在另外写的一个RPC框架(底层基于Netty实现远程通信)中就是使用原生的API,不过显然代码会复杂很多,并且本身需要对zookeeper有更多的学习和了解,这样用起来才会容易一些。
所以为了降低开发的难度,这里使用第三方封装的API,即curator,来进行zookeeper客户端程序的开发。
5.3 爬虫系统zookeeper注册
在启动爬虫系统时,我们的程序都会启动一个zookeeper客户端来向zookeeper来注册自身的节点信息,主要是ip地址,并在/ispider
节点目录以创建一个以该爬虫程序所在的节点IP地址命名的节点,如/ispider/192.168.43.116
,实现的代码如下:
/**
* 注册zk
*/
private void registerZK() {
String zkStr = "uplooking01:2181,uplooking02:2181,uplooking03:2181";
int baseSleepTimeMs = 1000;
int maxRetries = 3;
RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
CuratorFramework curator = CuratorFrameworkFactory.newClient(zkStr, retryPolicy);
curator.start();
String ip = null;
try {
// 向zk的具体目录注册 写节点 创建节点
ip = InetAddress.getLocalHost().getHostAddress();
curator.create().withMode(CreateMode.EPHEMERAL).forPath("/ispider/" + ip, ip.getBytes());
} catch (UnknownHostException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}