绑定完请刷新页面
取消
刷新

分享好友

×
取消 复制
分布式爬虫系统设计、实现与实战:爬取京东、苏宁易购全网手机商品数据+MySQL、HBase存储-2
2020-01-10 17:52:52

4.2 URL调度器

所谓url调度器,其实说白了就是url仓库java代码的调度策略,不过因为其核心在于调度,所以将其放到URL调度器中来进行说明,目前其调度基于以下接口开发:

/**
 * url 仓库
 * 主要功能:
 *      向仓库中添加url(高优先级的列表,低优先级的商品url)
 *      从仓库中获取url(优先获取高优先级的url,如果没有,再获取低优先级的url)
 *
 */
public interface IRepository {

    /**
     * 获取url的方法
     * 从仓库中获取url(优先获取高优先级的url,如果没有,再获取低优先级的url)
     * @return
     */
    public String poll();

    /**
     * 向高优先级列表中添加商品列表url
     * @param highUrl
     */
    public void offerHigher(String highUrl);

    /**
     * 向低优先级列表中添加商品url
     * @param lowUrl
     */
    public void offerLower(String lowUrl);

}

其基于Redis作为URL仓库的实现如下:

/**
 * 基于Redis的全网爬虫,随机获取爬虫url:
 *
 * Redis中用来保存url的数据结构如下:
 * 1.需要爬取的域名集合(存储数据类型为set,这个需要先在Redis中添加)
 *      key
 *          spider.website.domains
 *      value(set)
 *          jd.com  suning.com  gome.com
 *      key由常量对象SpiderConstants.SPIDER_WEBSITE_DOMAINS_KEY 获得
 * 2.各个域名所对应的高低优先url队列(存储数据类型为list,这个由爬虫程序解析种子url后动态添加)
 *      key
 *          jd.com.higher
 *          jd.com.lower
 *          suning.com.higher
 *          suning.com.lower
 *          gome.com.higher
 *          gome.come.lower
 *      value(list)
 *          相对应需要解析的url列表
 *      key由随机的域名 + 常量 SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX或者SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX获得
 * 3.种子url列表
 *      key
 *          spider.seed.urls
 *      value(list)
 *          需要爬取的数据的种子url
 *       key由常量SpiderConstants.SPIDER_SEED_URLS_KEY获得
 *
 *       种子url列表中的url会由url调度器定时向高低优先url队列中
 */
public class RandomRedisRepositoryImpl implements IRepository {

    /**
     * 构造方法
     */
    public RandomRedisRepositoryImpl() {
        init();
    }

    /**
     * 初始化方法,初始化时,先将redis中存在的高低优先级url队列全部删除
     * 否则上一次url队列中的url没有消耗完时,再停止启动跑下一次,就会导致url仓库中有重复的url
     */
    public void init() {
        Jedis jedis = JedisUtil.getJedis();
        Set<String> domains = jedis.smembers(SpiderConstants.SPIDER_WEBSITE_DOMAINS_KEY);
        String higherUrlKey;
        String lowerUrlKey;
        for(String domain : domains) {
            higherUrlKey = domain + SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX;
            lowerUrlKey = domain + SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX;
            jedis.del(higherUrlKey, lowerUrlKey);
        }
        JedisUtil.returnJedis(jedis);
    }

    /**
     * 从队列中获取url,目前的策略是:
     *      1.先从高优先级url队列中获取
     *      2.再从低优先级url队列中获取
     *  对应我们的实际场景,应该是先解析完列表url再解析商品url
     *  但是需要注意的是,在分布式多线程的环境下,肯定是不能完全保证的,因为在某个时刻高优先级url队列中
     *  的url消耗完了,但实际上程序还在解析下一个高优先级url,此时,其它线程去获取高优先级队列url肯定获取不到
     *  这时就会去获取低优先级队列中的url,在实际考虑分析时,这点尤其需要注意
     * @return
     */
    @Override
    public String poll() {
        // 从set中随机获取一个域名
        Jedis jedis = JedisUtil.getJedis();
        String randomDomain = jedis.srandmember(SpiderConstants.SPIDER_WEBSITE_DOMAINS_KEY);    // jd.com
        String key = randomDomain + SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX;                // jd.com.higher
        String url = jedis.lpop(key);
        if(url == null) {   // 如果为null,则从低优先级中获取
            key = randomDomain + SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX;    // jd.com.lower
            url = jedis.lpop(key);
        }
        JedisUtil.returnJedis(jedis);
        return url;
    }

    /**
     * 向高优先级url队列中添加url
     * @param highUrl
     */
    @Override
    public void offerHigher(String highUrl) {
        offerUrl(highUrl, SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX);
    }

    /**
     * 向低优先url队列中添加url
     * @param lowUrl
     */
    @Override
    public void offerLower(String lowUrl) {
        offerUrl(lowUrl, SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX);
    }

    /**
     * 添加url的通用方法,通过offerHigher和offerLower抽象而来
     * @param url   需要添加的url
     * @param url*uffix  url类型后缀.higher或.lower
     */
    public void offerUrl(String url, String url*uffix) {
        Jedis jedis = JedisUtil.getJedis();
        String domain = SpiderUtil.getTopDomain(url);   // 获取url对应的域名,如jd.com
        String key = domain + url*uffix;            // 拼接url队列的key,如jd.com.higher
        jedis.lpush(key, url);                          // 向url队列中添加url
        JedisUtil.returnJedis(jedis);
    }
}

通过代码分析也是可以知道,其核心就在如何调度url仓库(Redis)中的url。

4.3 URL定时器

一段时间后,高优先级URL队列和低优先URL队列中的url都会被消费完,为了让程序可以继续爬取数据,同时减少人为的干预,可以预先在Redis中插入种子url,之后定时让URL定时器从种子url中取出url定存放到高优先级URL队列中,以此达到程序定时不间断爬取数据的目的。

url消费完毕后,是否需要循环不断爬取数据根据个人业务需求而不同,因此这一步不是必需的,只是也提供了这样的操作。因为事实上,我们需要爬取的数据也是每隔一段时间就会更新的,如果希望我们爬取的数据也跟着定时更新,那么这时定时器就有非常重要的作用了。不过需要注意的是,一旦决定需要循环重复爬取数据,则在设计存储器实现时需要考虑重复数据的问题,即重复数据应该是更新操作,目前在我设计的存储器不包括这个功能,有兴趣的朋友可以自己实现,只需要在插入数据前判断数据库中是否存在该数据即可。

另外需要注意的一点是,URL定时器是一个独立的进程,需要单独启动。

定时器基于Quartz实现,下面是其job的代码:

/**
 * 每天定时从url仓库中获取种子url,添加进高优先级列表
 */
public class UrlJob implements Job {

    // log4j日志记录
    private Logger logger = LoggerFactory.getLogger(UrlJob.class);

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        /**
         * 1.从指定url种子仓库获取种子url
         * 2.将种子url添加进高优先级列表
         */
        Jedis jedis = JedisUtil.getJedis();
        Set<String> seedUrls = jedis.smembers(SpiderConstants.SPIDER_SEED_URLS_KEY);  // spider.seed.urls Redis数据类型为set,防止重复添加种子url
        for(String seedUrl : seedUrls) {
            String domain = SpiderUtil.getTopDomain(seedUrl);   // 种子url的域名
            jedis.sadd(domain + SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX, seedUrl);
            logger.info("获取种子:{}", seedUrl);
        }
        JedisUtil.returnJedis(jedis);
//        System.out.println("Scheduler Job Test...");
    }

}

调度器的实现如下:


/**
 * url定时调度器,定时向url对应仓库中存放种子url
 *
 * 业务规定:每天凌晨1点10分向仓库中存放种子url
 */
public class UrlJobScheduler {

    public UrlJobScheduler() {
        init();
    }

    /**
     * 初始化调度器
     */
    public void init() {
        try {
            Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();

            // 如果没有以下start方法的执行,则是不会开启任务的调度
            scheduler.start();

            String name = "URL_SCHEDULER_JOB";
            String group = "URL_SCHEDULER_JOB_GROUP";
            JobDetail jobDetail = new JobDetail(name, group, UrlJob.class);
            String cronExpression = "0 10 1 * * ?";
            Trigger trigger = new CronTrigger(name, group, cronExpression);

            // 调度任务
            scheduler.scheduleJob(jobDetail, trigger);

        } catch (SchedulerException e) {
            e.printStackTrace();
        } catch (ParseException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        UrlJobScheduler urlJobScheduler = new UrlJobScheduler();
        urlJobScheduler.start();
    }

    /**
     * 定时调度任务
     * 因为我们每天要定时从指定的仓库中获取种子url,并存放到高优先级的url列表中
     * 所以是一个不间断的程序,所以不能停止
     */
    private void start() {
        while (true) {

        }
    }
}

5 监控报警系统

分布式爬虫系统设计、实现与实战:爬取京东、苏宁易购全网手机商品数据+MySQL、HBase存储

监控报警系统的加入主要是为了让使用者可以主动发现节点宕机,而不是被动地发现,因为实际中爬虫程序可能是持续不断运行的,并且我们会在多个节点上部署我们的爬虫程序,因此很有必要对节点进行监控,并且在节点出现问题时可以及时发现并修正,需要注意的是,监控报警系统是一个独立的进程,需要单独启动。

5.1 基本原理

首先需要先在zookeeper中创建一个/ispider节点:

[zk: localhost:2181(CONNECTED) 1] create /ispider ispider
Created /ispider

监控报警系统的开发主要依赖于zookeeper实现,监控程序对zookeeper下面的这个节点目录进行监听:

[zk: localhost:2181(CONNECTED) 0] ls /ispider
[]

爬虫程序启动时会在该节点目录下注册一个临时节点目录:

[zk: localhost:2181(CONNECTED) 0] ls /ispider
[192.168.43.166]

当节点出现宕机时,该临时节点目录就会被zookeeper删除

[zk: localhost:2181(CONNECTED) 0] ls /ispider
[]

同时因为我们监听了节点目录/ispider,所以当zookeeper删除其下的节点目录时(或增加一个节点目录),zookeeper会给我们的监控程序发送通知,即我们的监控程序会得到回调,这样便可以在回调程序中执行报警的系统动作,从而完成监控报警的功能。

5.2 zookeeper Java API使用说明

可以使用zookeeper原生的Java API,我在另外写的一个RPC框架(底层基于Netty实现远程通信)中就是使用原生的API,不过显然代码会复杂很多,并且本身需要对zookeeper有更多的学习和了解,这样用起来才会容易一些。

所以为了降低开发的难度,这里使用第三方封装的API,即curator,来进行zookeeper客户端程序的开发。

5.3 爬虫系统zookeeper注册

在启动爬虫系统时,我们的程序都会启动一个zookeeper客户端来向zookeeper来注册自身的节点信息,主要是ip地址,并在/ispider节点目录以创建一个以该爬虫程序所在的节点IP地址命名的节点,如/ispider/192.168.43.116,实现的代码如下:

/**
 * 注册zk
 */
private void registerZK() {
    String zkStr = "uplooking01:2181,uplooking02:2181,uplooking03:2181";
    int baseSleepTimeMs = 1000;
    int maxRetries = 3;
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
    CuratorFramework curator = CuratorFrameworkFactory.newClient(zkStr, retryPolicy);
    curator.start();
    String ip = null;
    try {
        // 向zk的具体目录注册 写节点 创建节点
        ip = InetAddress.getLocalHost().getHostAddress();
        curator.create().withMode(CreateMode.EPHEMERAL).forPath("/ispider/" + ip, ip.getBytes());
    } catch (UnknownHostException e) {
        e.printStackTrace();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
分享好友

分享这个小栈给你的朋友们,一起进步吧。

运维部落
创建时间:2019-09-15 22:54:27
关于运维,你想知道的,这里都有
展开
订阅须知

• 所有用户可根据关注领域订阅专区或所有专区

• 付费订阅:虚拟交易,一经交易不退款;若特殊情况,可3日内客服咨询

• 专区发布评论属默认订阅所评论专区(除付费小栈外)

栈主、嘉宾

查看更多
  • stanleylst
    栈主

小栈成员

查看更多
  • 小尾巴鱼
  • Cyj_me
  • lpysky
  • 栈栈
戳我,来吐槽~