Python 版本是3.7.4
前面的文章记录了网络请求(urllib,requests)、数据提取(beautiful,xpath,正则)、数据存储(json,csv)的学习,下面进行一个多线程的学习。
多线程爬虫
有些时候,比如下载图片,因为下载图片是一个耗时的操作,如果采用之前那种同步的方式下载,那效率会特别慢。这时候我们就可以考虑使用多线程的方式来下载图片。
多线程介绍
多线程是为了同步完成多项任务,通过提高资源使用来提高系统的效率,线程是在同一时间需要完成多项任务的时候是西纳的,简单的比喻多线程就像火车的每一节车厢,二进程就是火车。车厢离开火车是无法跑动的,同理火车可以有多节车厢,多线程的出现是为了提高效率,同时他的出现也带来一些问题。
简单来讲,多线程就相当于你原来开了一个窗口爬取,限制开了十个窗口来爬取。
threading模块介绍
threading模块是Python中专门提供用来做多线程的模块。threading模块中常用的类是Thread。下面一个简单的多线程程序:
# 引入所需库
import threading
import time
def coding():
"""
coding函数
:return:
"""
for x in range(5):
print('%s 号程序员正则写代码...' % x)
time.sleep(1)
def drawing():
"""
drawing函数
:return:
"""
for x in range(5):
print('%s 号设计师正在设计图片...' % x)
time.sleep(1)
def single_thread():
"""
单线程执行
:return:
"""
coding()
drawing()
def multi_thread():
"""
多线程执行
:return:
"""
# 创建线程
# 注意:target参数是函数名,不能带括号
t1 = threading.Thread(target=coding, name='coding')
t2 = threading.Thread(target=drawing, name='drawing')
# 启动线程
t1.start()
t2.start()
if __name__ == '__main__':
# single_thread()
multi_thread()
查看线程数:
num = threading.enumerate()
print(num)
查看当前进程名字:
threading.current_thread()
Thread类的使用
为了让线程代码更好的封装,可以使用threading模块下的Thread类,继承自这个类,然后实现run()方法,线程就会自动运行run()方法中的代码,示例代码如下:
# 引入所需库
import threading
import time
class CodingThread(threading.Thread):
"""
写程序进程类
"""
def run(self):
for x in range(5):
print('%s 号程序员正则写代码...' % threading.current_thread())
time.sleep(1)
class DrawingThread(threading.Thread):
"""
设计进程类
"""
def run(self):
for x in range(5):
print('%s 号设计师正在设计图片...' % threading.current_thread())
time.sleep(1)
def multi_thread():
t1 = CodingThread()
t2 = DrawingThread()
t1.start()
t2.start()
if __name__ == '__main__':
multi_thread()
多线程共享全局变量问题
多线程都是在同一个进程中运行的,因此在进程中的全局变量所有的线程都是可以共享的。这就造就了一个问题,因为线程执行的顺序是无序的,有可能会造成数据错误。例如如下代码:
# 引入threading库
import threading
# 定义全局变量
VALUE = 0
def add_value():
"""
增加数值
:return:
"""
global VALUE
for x in range(1000000):
VALUE += 1
print(VALUE)
def main():
for x in range(2):
t = threading.Thread(target=add_value)
t.start()
if __name__ == '__main__':
main()
以上的代码结果正常来讲应该是:
1000000
2000000
但是由于多线程运行的不确定性,因此结果可能是随机的。
锁机制
为了解决上述问题由于多线程运行的不确定性,threading库增加了Lock类锁机制进行处理,当某个线程对全局变量进行修改时则将此变量加锁不允许其他线程进行修改,知道当前线程修改完这个变量之后再进行解锁释放,之后其他线程才可进行修改,这就保证了数据的安全性。修改上述代码如下:
# 引入threading库
import threading
# 定义全局变量
VALUE = 0
# 创建锁
gLock = threading.Lock()
def add_value():
"""
增加数值
:return:
"""
global VALUE
# 加锁
gLock.acquire()
for x in range(1000000):
VALUE += 1
# 解锁
gLock.release()
print(VALUE)
def main():
for x in range(2):
t = threading.Thread(target=add_value)
t.start()
if __name__ == '__main__':
main()
Lock版生产者和消费者模式
生产者和消费者模式时多线程开发中的经常见到的一种模式。生产者的线程专门用来生产一些数据,然后存放到一个中间的变量中。消费者再从这个中间的变量中取出数据进行消费,但是因为要使用中间变量,中间变量经常是一些全局变量,因此需要使用锁来保证数据的完整性。以下是使用threading.Lock()锁实现“生产者与消费者模式”的一个例子:
# 引入所需库
import random
import threading
import time
gMoney = 1000
gTimes = 0
# 定义锁
gLock = threading.Lock()
class Producer(threading.Thread):
"""
生产者
"""
def run(self):
global gMoney
global gTimes
while True:
money = random.randint(100, 1000)
gLock.acquire()
# 仅允许生产10次
if gTimes >= 10:
gLock.release()
break
gMoney += money
print('%s生产了%d元钱,剩余%d元钱' % (threading.current_thread(), money, gMoney))
gTimes += 1
gLock.release()
time.sleep(0.5)
class Consumer(threading.Thread):
"""
消费者
"""
def run(self):
global gMoney
while True:
money = random.randint(100, 1000)
gLock.acquire()
if gMoney >= money:
gMoney -= money
print('%s消费了%d元钱,剩余%d元钱' % (threading.current_thread(), money, gMoney))
else:
if gTimes >= 10:
gLock.release()
break
print("%s消费者消费钱不够,不消费" % threading.current_thread())
gLock.release()
time.sleep(0.5)
def main():
# 定义三个消费者
for x in range(3):
t = Consumer(name='消费者线程%d' % x)
t.start()
# 定义五个生产者
for x in range(5):
t = Producer(name='生产者线程%d' % x)
t.start()
if __name__ == '__main__':
main()
Condition版生产者与消费者模式
Lock()版的生产者与消费者模式可以正常的运行,但是存在一个不足,在消费者中,总是通过while True死循环并且上锁的方法去判断钱够不够,上锁是一个很好CPU资源的行为。因此这种方式不是好的,还有一种更好的方式便是使用threading.Condition来实现。
threading.Condition可以在没有数据的时候处于阻塞等等状态。一旦有合适的数据了,还可以使用notify相关的函数来通知其他处于等待状态的线程,这样就可以不用做一些无用的上锁和解锁的操作,可以提高程序的性能。
首先对threading.Condition相关的函数做个介绍,threading.Condition类似threading.Lock,可以在修改全部数据的时候进行上锁,也可以在修改完毕后进行解锁。以下将一些常用的函数做个简单的介绍:
acquire : 上锁
release : 解锁
wait : 将当前线程处于等待状态,并且会释放锁。可以被其他线程使用notify和notify_all函数唤醒,被唤醒后会继续等待上锁,上锁后继续执行后续的代码
notify : 通知某个正在等待的线程,默认是第1个等待的线程
notify_all : 通知所有正在等待的线程。notify和notify_all不会释放锁,并且需要在release之前掉用
Condition版生产者与消费者模式示例代码如下:
# 引入所需库
import random
import threading
import time
gMoney = 1000
gTimes = 0
# 定义Condition
gCondition = threading.Condition()
class Producer(threading.Thread):
"""
生产者
"""
def run(self):
global gMoney
global gTimes
while True:
money = random.randint(100, 1000)
gCondition.acquire()
# 仅允许生产10次
if gTimes >= 10:
gCondition.release()
break
gMoney += money
print('%s生产了%d元钱,剩余%d元钱' % (threading.current_thread(), money, gMoney))
gTimes += 1
gCondition.notify_all()
gCondition.release()
time.sleep(0.5)
class Consumer(threading.Thread):
"""
消费者
"""
def run(self):
global gMoney
while True:
money = random.randint(100, 1000)
gCondition.acquire()
while gMoney < money:
if gTimes >= 10:
gCondition.release()
return
print("%s消费者消费钱不够,不消费" % threading.current_thread())
gCondition.wait()
gMoney -= money
print('%s消费了%d元钱,剩余%d元钱' % (threading.current_thread(), money, gMoney))
gCondition.release()
time.sleep(0.5)
def main():
# 定义三个消费者
for x in range(3):
t = Consumer(name='消费者线程%d' % x)
t.start()
# 定义五个生产者
for x in range(5):
t = Producer(name='生产者线程%d' % x)
t.start()
if __name__ == '__main__':
main()
Queue线程安全队列
在线程中,访问一些全局变量,加锁是一个经常的过程,如果你想把一些数据存储到莫格队列中,那么Python内置了一个线程安全的模块叫queue模块。Python中的queue模块中提供了同步的、线程安全的对咧咧,包括FIFO(先进先出)队列Queue,LIFO(后入先出)队列LifoQueue。这些队列都实现了所原理(可以理解为原子操作,即要么不做,要么都做完),能够在多线程中直接使用。可以使用队列来实现线程间的同步,相关函数如下:
初始化Queue(maxsize) : 创建一个先进先出的队列
qsize() : 返回队列的大小
empty() : 判断队列是否为空
full() : 判断队列是否已满
get() : 从队列中获取后一个数据
put() : 将一个数据放到队列中
使用代码示例:
# 引入所需库
import threading
import time
from queue import Queue
def set_value(q):
"""
写入队列
:param q:
:return:
"""
index = 1
while True:
q.put(index)
index += 1
time.sleep(3)
def get_value(q):
"""
从队列取值
:param q:
:return:
"""
while True:
print(q.get())
# time.sleep(4)
# print("qsize:", q.qsize())
def main():
"""
主函数
:return:
"""
q = Queue(5)
t1 = threading.Thread(target=set_value, args=[q])
t2 = threading.Thread(target=get_value, args=[q])
t1.start()
t2.start()
if __name__ == '__main__':
main()
使用实例
单线程爬取表情包,实例代码如下:
# 引入所需库
import os
import re
import requests
from lxml import etree
def parse_page(url):
"""
请求 解析 下载
:param url:
:return:
"""
# 声明定义请求头
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.110 Safari/537.36',
} 无锡人流医院哪家好 http://www.bhnfk.com
req = requests.get(url=url, headers=headers)
html = req.text
tree = etree.HTML(html)
imgs = tree.xpath('//div[@class="page-content text-center"]//img[@class!="gif"]')
for img in imgs:
img_url = img.get('data-original')
alt = img.get('alt')
alt = re.sub(r'[\??\..,!!]]', '', alt)
suffix = os.path.splitext(img_url)[1]
file_name = alt + suffix
req_img = requests.get(url=img_url, headers=headers)
with open('images/' + file_name, 'wb') as fp:
fp.write(req_img.content)
print(file_name)
def main():
"""
主函数
:return:
"""
for x in range(1, 101):
print("第%d页开始下载..." % x)
url = 'http://www.doutula.com/photo/list/?page=%d' % x
parse_page(url)
print("第%d页结束下载..." % x)
if __name__ == '__main__':
main()
多线程爬取表情包,实例代码如下:
# 引入所需库
import os
import re
import threading
from queue import Queue
import requests
from lxml import etree
class Producer(threading.Thread):
"""
生产者 - 手机表情包图片地址
"""
def __init__(self, page_queue, img_queue):
super(Producer, self).__init__()
self.page_queue = page_queue
self.img_queue = img_queue
def run(self):
while True:
if self.page_queue.empty():
break
url = self.page_queue.get()
self.parse_page(url)
def parse_page(self, url):
"""
请求 解析 下载
:param url:
:return:
"""
# 声明定义请求头
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.110 Safari/537.36',
}
req = requests.get(url=url, headers=headers)
html = req.text
tree = etree.HTML(html)
imgs = tree.xpath('//div[@class="page-content text-center"]//img[@class!="gif"]')
for img in imgs:
img_url = img.get('data-original')
alt = img.get('alt')
alt = re.sub(r'[\??\..,!!\*]]', '', alt)
suffix = os.path.splitext(img_url)[1]
file_name = alt + suffix
self.img_queue.put((img_url, file_name))
class Consumer(threading.Thread):
"""
消费者 - 下载表情包图片
"""
def __init__(self, page_queue, img_queue):
super(Consumer, self).__init__()
self.page_queue = page_queue
self.img_queue = img_queue
def run(self):
# 声明定义请求头
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.110 Safari/537.36',
}
while True:
if self.img_queue.empty() and self.page_queue.empty():
break
img_url, file_name = self.img_queue.get()
req_img = requests.get(url=img_url, headers=headers)
with open('images/' + file_name, 'wb') as fp:
fp.write(req_img.content)
print(file_name + '下载完成...')
def main():
"""
主函数
:return:
"""
page_queue = Queue(100)
img_queue = Queue(1000)
for x in range(1, 101):
url = 'http://www.doutula.com/photo/list/?page=%d' % x
page_queue.put(url)
# 定义五个生产者
for x in range(6):
t = Producer(page_queue=page_queue, img_queue=img_queue)
t.start()
# 定义三个消费者
for x in range(4):
t = Consumer(page_queue=page_queue, img_queue=img_queue)
t.start()
if __name__ == '__main__':
main()
GIL全局解释器锁
Python自带的解释器是CPython。CPython解释器的多线程实际上是一个家的多线程(在多核CPU中,只能利用一核,不能利用多核)。同一时刻只有一个线程在执行,为了保证同一时刻只有一个线程在执行,在CPython解释器中有一个功能叫做GIL,叫做全局解释器锁。这个解释器锁是有必要的,因为CPython解释器的内存管理不是线程安全的,当然除了CPython解释器,还有其他的解释器,有些解释器是没有GIL锁的,见下面:
Jpython : 用Java实现的Python解释器,不存在GIL锁。更多详情请见:https://zh.wikipedia.org/wiki/Jpython
IronPython : 用.NET实现的Python解释器,不存在GIL锁。更多详情请见:https://zh.wikipedia.org/wiki/IronPython
PyPy : 用Python实现的Python解释器,存在GIL锁。更多详情请见:https://zh.wikipedia.org/wiki/PyPy
GIL虽然是一个假的多线程,但是在处理IO操作