Python's threading module provides two kinds of lock: the mutual-exclusion lock (threading.Lock) and the reentrant lock (threading.RLock). Basic usage is the same for both:
lock = threading.Lock()
lock.acquire()
dosomething……
lock.release()
To use RLock, replace threading.Lock() with threading.RLock(). To make the need for a lock easier to understand, start with this code:
[root@361way lock]# cat lock1.py
#!/usr/bin/env python
# coding=utf-8
import threading                        # import the threading module
import time                             # import the time module

class mythread(threading.Thread):       # create the class by subclassing Thread
    def __init__(self, threadname):     # initializer
        # call the parent class initializer
        threading.Thread.__init__(self, name=threadname)

    def run(self):                      # override the run method
        global x                        # declare x as a global variable
        for i in range(3):
            x = x + 1
        time.sleep(5)                   # put the thread to sleep for 5 seconds
        print x

tl = []                                 # list that holds the threads
for i in range(10):
    t = mythread(str(i))                # instantiate the class
    tl.append(t)                        # add the thread object to the list
x = 0                                   # set x to 0
for i in tl:
    i.start()                           # start each thread in turn
The result is not what you might expect:
[root@361way lock]# python lock1.py
30
30
30
30
30
30
30
30
30
30
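Why is every result 30? The key lies in the global line and the time.sleep line.
1. Because x is a global variable shared by all threads, every thread's additions accumulate in the same x.
2. Because the code is multithreaded, a thread that has finished its additions waits at the sleep, and during those 5 seconds the remaining threads finish their additions too and then wait to print. Since x is global, it has been reassigned by every thread by that point, so every thread prints 30.
3. To see this for yourself, comment out the sleep line and run the script again; the output will be different.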
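The three points above can be tried out with a shorter sleep. This is a sketch in Python 3, not the article's script: `worker`, `run_threads` and the `results` list are names introduced here, and the delay is shortened to half a second so the experiment finishes quickly. Values are collected in a list instead of printed so they can be compared afterwards.

```python
import threading
import time

x = 0
results = []  # the value of x each thread sees when it would print


def worker(delay):
    global x
    for _ in range(3):
        x = x + 1
    if delay:
        time.sleep(delay)  # the other threads finish their additions meanwhile
    results.append(x)      # stands in for "print x" in lock1.py


def run_threads(delay, n=10):
    """Start n workers, wait for them, and return what each one saw."""
    global x
    x = 0
    del results[:]
    threads = [threading.Thread(target=worker, args=(delay,)) for _ in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return list(results)


if __name__ == "__main__":
    print(run_threads(0.5))  # with the sleep: every thread sees the same final x
    print(run_threads(0))    # without it: values usually differ per thread
```

Without the sleep, each thread tends to finish before the next one starts, so the values usually climb in steps of 3, but that ordering is not guaranteed.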
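Real programs such as web crawlers run into waits much like this sleep. Whenever calls depend on ordering, or threads print output, concurrent contention appears and the results or output get jumbled. This is where locks come in. Here is the code above with a lock added: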
[root@361way lock]# cat lock2.py
#!/usr/bin/env python
# coding=utf-8
import threading                        # import the threading module
import time                             # import the time module

class mythread(threading.Thread):       # create the class by subclassing Thread
    def __init__(self, threadname):     # initializer
        threading.Thread.__init__(self, name=threadname)

    def run(self):                      # override the run method
        global x                        # declare x as a global variable
        lock.acquire()                  # acquire the lock
        for i in range(3):
            x = x + 1
        time.sleep(5)                   # put the thread to sleep for 5 seconds
        print x
        lock.release()                  # release the lock

lock = threading.Lock()                 # create the lock
tl = []                                 # list that holds the threads
for i in range(10):
    t = mythread(str(i))                # instantiate the class
    tl.append(t)                        # add the thread object to the list
x = 0                                   # set x to 0
for i in tl:
    i.start()                           # start each thread in turn
The result is now:
[root@361way lock]# python lock2.py
3
6
9
12
15
18
21
24
27
30
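Locking causes blocking, and the overhead is significant: here each thread holds the lock through its 5-second sleep, so the ten threads effectively run one after another and the whole run takes about 50 seconds instead of about 5. The concurrent threads produce their output one at a time, in sequence; a later thread, however fast it runs, has to wait for the earlier thread to release the lock before it can finish. This starts to sound like a queue, and indeed many situations that call for a lock can be handled with a queue instead.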
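The cost of blocking can be measured. The sketch below (Python 3; `wide`, `narrow` and `timed` are names invented for this illustration, and the sleep is cut to 0.1 seconds) compares holding the lock across the sleep, as lock2.py does, with holding it only while x is being modified. It is one possible way to shrink the critical section, not the only one.

```python
import threading
import time

lock = threading.Lock()
x = 0


def wide(delay, out):
    """Lock held across the sleep, as in lock2.py: threads run one by one."""
    global x
    with lock:
        for _ in range(3):
            x += 1
        time.sleep(delay)
        out.append(x)


def narrow(delay, out):
    """Lock held only while x is touched: the sleeps overlap."""
    global x
    with lock:
        for _ in range(3):
            x += 1
        mine = x           # snapshot taken while still holding the lock
    time.sleep(delay)
    out.append(mine)


def timed(target, delay=0.1, n=10):
    """Run n threads of target; return the collected values and elapsed seconds."""
    global x
    x = 0
    out = []
    threads = [threading.Thread(target=target, args=(delay, out)) for _ in range(n)]
    start = time.monotonic()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return out, time.monotonic() - start


if __name__ == "__main__":
    for fn in (wide, narrow):
        values, elapsed = timed(fn)
        print(fn.__name__, values, round(elapsed, 2))
```

With the narrow version each thread still gets its own consistent value of x, but the threads may append their values in any order, since they no longer wait for each other while sleeping.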
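Finally, one more example. The article 股票量化分析(二)PE和流通市值篇 (Quantitative Stock Analysis, Part 2: PE and Circulating Market Value) showed how to collect two indicators for each stock and print the matches. The printed output, however, sometimes came out jumbled.
For example, the stock IDs 600131 and 000708 ended up on the same line. Multithreading made the script run much faster, but output like this looks untidy and is awkward to process afterwards.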
1. The code whose output gets jumbled
#!/usr/bin/python
#coding=utf-8
# 1. companies with a PE between 0 and 20
# 2. companies with circulating share capital under 5 billion
import urllib2
import time
import json
from threading import Thread

def get_pe(stockid):
    try:
        url = 'http://d.10jqka.com.cn/v2/realhead/hs_%s/last.js' % stockid
        send_headers = {
            'Host':'d.10jqka.com.cn',
            'Referer':'http://stock.10jqka.com.cn/',
            'Accept':'application/json, text/javascript, */*; q=0.01',
            'Connection':'keep-alive',
            'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/38.0.2125.122 Safari/537.36',
            'X-Forwarded-For':'124.160.148.178',
            'X-Requested-With':'XMLHttpRequest'
        }
        req = urllib2.Request(url,headers=send_headers)
        f = urllib2.urlopen(req)
        # cut the JSON object out of the JavaScript wrapper
        data = f.read().split('items":',1)[1]
        data = data.split('})',1)[0]
        J_data = json.loads(data)
        #J_data = json.dumps(data,indent=4,encoding='utf-8')
        stockpe = J_data['2034120']
        stockname = J_data['name']
        sumvalue = J_data['3475914']
        currentprice = J_data['10']
        #print stockid,stockname,stockpe
        return stockname,stockpe,sumvalue,currentprice
    except urllib2.HTTPError as e:
        #return stockid ,'get happed httperror'
        return e.code

def cond(stockid,pe,asset):
    pe = int(pe)
    asset = int(asset)
    try:
        stockname,stockpe,sumvalue,currentprice = get_pe(stockid)
        if sumvalue:
            # convert to units of 100 million
            Billvalue = round(float(sumvalue)/1000/1000/100)
        else:
            Billvalue = 0
        if stockpe:
            if float(stockpe) > 0 and float(stockpe) < pe and Billvalue < asset :
                # several threads print here at once, so lines can interleave
                print stockid,stockname,currentprice,stockpe,Billvalue
            #else:
            #    print stockid
    except TypeError as e:
        # get_pe returned an HTTP status code instead of a 4-tuple
        print stockid ,'get is error'

if __name__ == '__main__':
    threads = []
    print 'stockid stockname currentprice stockpe Billvalue'
    stockids = [line.strip() for line in open("stock_exp.txt", 'r')]
    nloops = range(len(stockids))
    for stockid in stockids:
        t = Thread(target=cond, args=(stockid,28,80))
        threads.append(t)
    for i in nloops:
        threads[i].start()
    for i in nloops:
        threads[i].join()
2. The code with a lock added
#!/usr/bin/python
#coding=utf-8
# 1. companies with a PE between 0 and 20
# 2. companies with circulating share capital under 5 billion
import threading
import urllib2
import time
import json

lock = threading.Lock()

def get_pe(stockid):
    try:
        url = 'http://d.10jqka.com.cn/v2/realhead/hs_%s/last.js' % stockid
        send_headers = {
            'Host':'d.10jqka.com.cn',
            'Referer':'http://stock.10jqka.com.cn/',
            'Accept':'application/json, text/javascript, */*; q=0.01',
            'Connection':'keep-alive',
            'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/38.0.2125.122 Safari/537.36',
            'X-Forwarded-For':'124.160.148.178',
            'X-Requested-With':'XMLHttpRequest'
        }
        req = urllib2.Request(url,headers=send_headers)
        f = urllib2.urlopen(req)
        # cut the JSON object out of the JavaScript wrapper
        data = f.read().split('items":',1)[1]
        data = data.split('})',1)[0]
        J_data = json.loads(data)
        #J_data = json.dumps(data,indent=4,encoding='utf-8')
        stockpe = J_data['2034120']
        stockname = J_data['name']
        sumvalue = J_data['3475914']
        currentprice = J_data['10']
        #print stockid,stockname,stockpe
        return stockname,stockpe,sumvalue,currentprice
    except urllib2.HTTPError as e:
        #return stockid ,'get happed httperror'
        return e.code

def cond(stockid,pe,asset):
    pe = int(pe)
    asset = int(asset)
    try:
        stockname,stockpe,sumvalue,currentprice = get_pe(stockid)
        if sumvalue:
            # convert to units of 100 million
            Billvalue = round(float(sumvalue)/1000/1000/100)
        else:
            Billvalue = 0
        if stockpe:
            if float(stockpe) > 0 and float(stockpe) < pe and Billvalue < asset :
                lock.acquire()      # only one thread prints at a time
                print stockid,stockname,currentprice,stockpe,Billvalue
                lock.release()
            #else:
            #    print stockid
    except TypeError as e:
        # get_pe returned an HTTP status code instead of a 4-tuple
        print stockid ,'get is error'

if __name__ == '__main__':
    threads = []
    print 'stockid stockname currentprice stockpe Billvalue'
    stockids = [line.strip() for line in open("stock_exp.txt", 'r')]
    for stockid in stockids:
        t = threading.Thread(target=cond, args=(stockid,25,50))
        threads.append(t)
        t.start()
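As mentioned earlier, a queue can often take the place of a lock. The following is a rough sketch of that idea in Python 3, not the script above: `fake_lookup` is a hypothetical stand-in for get_pe() that returns made-up placeholder values instead of calling the network, and `worker` and `collect` are likewise names introduced here. Each worker puts one complete line on a queue.Queue, and only the main thread prints, so lines cannot be mixed together.

```python
import queue
import threading


def fake_lookup(stockid):
    """Hypothetical stand-in for get_pe(); the returned values are placeholders."""
    return "name-" + stockid, 10.0


def worker(stockid, out):
    name, pe = fake_lookup(stockid)
    out.put("%s %s %s" % (stockid, name, pe))  # one whole line per queue item


def collect(stockids):
    """Run one worker per stock ID and return the lines they produced."""
    out = queue.Queue()
    threads = [threading.Thread(target=worker, args=(s, out)) for s in stockids]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    lines = []
    while not out.empty():  # safe here: every producer has already finished
        lines.append(out.get())
    return lines


if __name__ == "__main__":
    for line in collect(["600131", "000708"]):
        print(line)  # only the main thread writes to stdout
```

The lines arrive in whatever order the workers finish, so sort them afterwards if a fixed order matters.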