来自 计算机编程 2019-12-09 01:12 的文章
当前位置: 澳门威尼斯人平台 > 计算机编程 > 正文

Python基础(十三) 为什么说python多线程没有真正实现多现程

Python中的多线程没有真正实现多现程! 为什么这么说,我们了解一个概念,全局解释器锁(GIL)。

Python多线程实现同步的四种方式,python多线程

临界资源即那些一次只能被一个线程访问的资源,典型例子就是打印机,它一次只能被一个程序用来执行打印功能,因为不能多个线程同时操作,而访问这部分资源的代码通常称之为临界区。

锁机制

threading的Lock类,用该类的acquire函数进行加锁,用realease函数进行解锁

import threading
import time

class Num:
  def __init__(self):
    self.num = 0
    self.lock = threading.Lock()
  def add(self):
    self.lock.acquire()#加锁,锁住相应的资源
    self.num += 1
    num = self.num
    self.lock.release()#解锁,离开该资源
    return num

n = Num()
class jdThread(threading.Thread):
  def __init__(self,item):
    threading.Thread.__init__(self)
    self.item = item
  def run(self):
    time.sleep(2)
    value = n.add()#将num加1,并输出原来的数据和+1之后的数据
    print(self.item,value)

for item in range(5):
  t = jdThread(item)
  t.start()
  t.join()#使线程一个一个执行

当一个线程调用锁的acquire()方法获得锁时,锁就进入“locked”状态。每次只有一个线程可以获得锁。如果此时另一个线程试图获得这个锁,该线程就会变为“blocked”状态,称为“同步阻塞”(参见多线程的基本概念)。

直到拥有锁的线程调用锁的release()方法释放锁之后,锁进入“unlocked”状态。线程调度程序从处于同步阻塞状态的线程中选择一个来获得锁,并使得该线程进入运行(running)状态。

信号量

信号量也提供acquire方法和release方法,每当调用acquire方法的时候,如果内部计数器大于0,则将其减1,如果内部计数器等于0,则会阻塞该线程,知道有线程调用了release方法将内部计数器更新到大于1位置。

import threading
import time
class Num:
  def __init__(self):
    self.num = 0
    self.sem = threading.Semaphore(value = 3)
    #允许最多三个线程同时访问资源

  def add(self):
    self.sem.acquire()#内部计数器减1
    self.num += 1
    num = self.num
    self.sem.release()#内部计数器加1
    return num

n = Num()
class jdThread(threading.Thread):
  def __init__(self,item):
    threading.Thread.__init__(self)
    self.item = item
  def run(self):
    time.sleep(2)
    value = n.add()
    print(self.item,value)

for item in range(100):
  t = jdThread(item)
  t.start()
  t.join()

条件判断

所谓条件变量,即这种机制是在满足了特定的条件后,线程才可以访问相关的数据。

它使用Condition类来完成,由于它也可以像锁机制那样用,所以它也有acquire方法和release方法,而且它还有wait,notify,notifyAll方法。

"""
一个简单的生产消费者模型,通过条件变量的控制产品数量的增减,调用一次生产者产品就是+1,调用一次消费者产品就会-1.
"""

"""
使用 Condition 类来完成,由于它也可以像锁机制那样用,所以它也有 acquire 方法和 release 方法,而且它还有
wait, notify, notifyAll 方法。
"""

import threading
import queue,time,random

class Goods:#产品类
  def __init__(self):
    self.count = 0
  def add(self,num = 1):
    self.count += num
  def sub(self):
    if self.count>=0:
      self.count -= 1
  def empty(self):
    return self.count <= 0

class Producer(threading.Thread):#生产者类
  def __init__(self,condition,goods,sleeptime = 1):#sleeptime=1
    threading.Thread.__init__(self)
    self.cond = condition
    self.goods = goods
    self.sleeptime = sleeptime
  def run(self):
    cond = self.cond
    goods = self.goods
    while True:
      cond.acquire()#锁住资源
      goods.add()
      print("产品数量:",goods.count,"生产者线程")
      cond.notifyAll()#唤醒所有等待的线程--》其实就是唤醒消费者进程
      cond.release()#解锁资源
      time.sleep(self.sleeptime)

class Consumer(threading.Thread):#消费者类
  def __init__(self,condition,goods,sleeptime = 2):#sleeptime=2
    threading.Thread.__init__(self)
    self.cond = condition
    self.goods = goods
    self.sleeptime = sleeptime
  def run(self):
    cond = self.cond
    goods = self.goods
    while True:
      time.sleep(self.sleeptime)
      cond.acquire()#锁住资源
      while goods.empty():#如无产品则让线程等待
        cond.wait()
      goods.sub()
      print("产品数量:",goods.count,"消费者线程")
      cond.release()#解锁资源

g = Goods()
c = threading.Condition()

pro = Producer(c,g)
pro.start()

con = Consumer(c,g)
con.start()

同步队列

put方法和task_done方法,queue有一个未完成任务数量num,put依次num+1,task依次num-1.任务都完成时任务结束。

import threading
import queue
import time
import random

'''
1.创建一个 Queue.Queue() 的实例,然后使用数据对它进行填充。
2.将经过填充数据的实例传递给线程类,后者是通过继承 threading.Thread 的方式创建的。
3.每次从队列中取出一个项目,并使用该线程中的数据和 run 方法以执行相应的工作。
4.在完成这项工作之后,使用 queue.task_done() 函数向任务已经完成的队列发送一个信号。
5.对队列执行 join 操作,实际上意味着等到队列为空,再退出主程序。
'''

class jdThread(threading.Thread):
  def __init__(self,index,queue):
    threading.Thread.__init__(self)
    self.index = index
    self.queue = queue

  def run(self):
    while True:
      time.sleep(1)
      item = self.queue.get()
      if item is None:
        break
      print("序号:",self.index,"任务",item,"完成")
      self.queue.task_done()#task_done方法使得未完成的任务数量-1

q = queue.Queue(0)
'''
初始化函数接受一个数字来作为该队列的容量,如果传递的是
一个小于等于0的数,那么默认会认为该队列的容量是无限的.
'''
for i in range(2):
  jdThread(i,q).start()#两个线程同时完成任务

for i in range(10):
  q.put(i)#put方法使得未完成的任务数量+1

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持帮客之家。

临界资源即那些一次只能被一个线程访问的资源,典型例子就是打印机,它一次只能被一个程...

Python多线程和Python的锁,python多线程

Python多线程 
Python中实现多线程有两种方式,一种基于_thread模块(在Python2.x版本中为thread模块,没有下划线)的start_new_thread()函数,另一种基于threading模块的Thread类。 
其实Python的多线程编程不能真正利用多核的CPU,但是用开源模块使你的计算压力分布到多核CPU上......... 

一.使用start_new_thread()实现线程,是比较底层的实现方式,所有线程共享他们global数据,为了达到同步,模块也提供了简单的锁机制 

_thread.start_new_thread(function, args[, kwargs])
启动一个新的进程,并返回其标识符. 线程执行的函数需要的参数由args(必须为一个元组)提供,亦可通过可选参数kwargs提供关键字参数组  成的字典。当函数返回时,启动的线程也   停止退出。如果函数中存在未处理异常,会打印堆栈跟踪后线程停止退出(其他线程继续执行)。

其中线程标识符是一个非0整数,并没有直接意思,可以当作从一个线程组成的特殊字典中索引本线程的一个key,也可用_thread.get_ident()得到,在线程退出后,标识符会被系统回收。在线程执行过程中可以调用_thread.exit()终止本线程的执行。 

Java代码  图片 1

  1. import _thread  
  2. import time  
  3. def threadFunction(count):  
  4.     for i in range(count):  
  5.       print('进程id为%d的打印%d'%(_thread.get_ident(),i))  
  6.       i-=1  
  7.       time.sleep(0.1)  
  8.   
  9. def begin():  
  10.   ident1=_thread.start_new_thread(threadFunction,(100,))  
  11.   print('启动标识符为%d的进程'%(ident1,))  
  12.   ident2=_thread.start_new_thread(threadFunction,(100,))  
  13.   print('启动标识符为%d的进程'%(ident2,))  
  14.   
  15.    
  16. if __name__ == '__main__':  
  17.   begin()  

二.使用Thread类来实现多线程,这种方式是对_thread模块(如果没有_thread,则为dummy_threading)的高级封装,在这种方式下我们需创建新类继承threading.Thread,和java一样重写threading.Thread的run方法即可.启动线程用线程的start方法,它会调用我们重写的run方法. 

Java代码  图片 2

  1. class MyThread(threading.Thread):  
  2.     '''只能重写__init__ 和 run 两个方法'''  
  3.     def __init__(self,name):  
  4.         threading.Thread.__init__(self)  
  5.         self.name=name  
  6.         self.bool_stop=False  
  7.     def run(self):  
  8.         while not self.bool_stop:  
  9.             print('进程%s,于%s'%(self.name,time.asctime()))  
  10.             time.sleep(1)  
  11.     def stop(self):  
  12.         self.bool_stop = True  
  13.   
  14.   
  15. if __name__ == '__main__':  
  16.     th1=MyThread('one')  
  17.     th2=MyThread('two')  
  18.     th1.start()  
  19.     th2.start()  

Thread类还定义了以下常用方法与属性: 

Thread.getName() Thread.setName()
老方式用于获取和设置线程的名称,官方建议用Thread.name替代
Thread.ident
获取线程的标识符。只有在调用start()方法执行后才有效,否则返回None。
Thread.is_alive()
判断线程是否是激活的。
Thread.join([timeout])
调用Thread.join将会使主调线程堵塞,直到被调用线程运行结束或超时。参数timeout是一个数值类型,表示超时时间,如果未提供该参数,那么主调线程将一直堵塞到被调线程结束。

Python中的锁 

先用_thread模块的Lock锁来实现生产者消费者问题,Lock对象是Python提供的低级线程控制工具,使用起来非常简单,只需下面3条语句即可: 

_thread.allocate_lock()返回一个新Lock对象,即为一个新锁
lock.acquire() 相当于P操作,得到一个锁,
lock.release()相当于V操作,释放一个锁

代码如下: 

Java代码  图片 3

  1. import _thread,time,random  
  2. dish=0  
  3. lock = _thread.allocate_lock()  
  4. def producerFunction():  
  5.    '''如果投的筛子比0.2大,则向盘子中增加一个苹果'''  
  6.    global lock,dish  
  7.    while True:  
  8.        if(random.random() > 0.1):  
  9.          lock.acquire()  
  10.          if dish < 100:  
  11.            dish+=1  
  12.            print('生产者增加了一个苹果,现在有%d个苹果'%(dish,))  
  13.          lock.release()  
  14.          time.sleep(random.random()*3)  
  15.   
  16. def consumerFunction():  
  17.   '''如果投的筛子比0.5大,则从盘子中取一个苹果'''  
  18.   global lock,dish  
  19.   while True:  
  20.     if(random.random() > 0.9):  
  21.       lock.acquire()  
  22.       if dish > 0:  
  23.         dish-=1  
  24.         print('消费者拿走一个苹果现,在有%d个苹果'%(dish,))  
  25.       lock.release()  
  26.       time.sleep(random.random()*3)  
  27.   
  28. def begin():  
  29.   ident1=_thread.start_new_thread(producerFunction,())  
  30.   ident2=_thread.start_new_thread(consumerFunction,())   
  31. if __name__ == '__main__':  
  32.   begin()  

另一个较高级的锁为RLock锁,RLock对象内部维护着一个Lock对象,它是一种可重入的对象。对于Lock对象而言,如果一个线程连续两次进行acquire操作,那么由于第一次acquire之后没有release,第二次acquire将挂起线程。这会导致Lock对象永远不会release,使得线程死锁。RLock对象允许一个线程多次对其进行acquire操作,因为在其内部通过一个counter变量维护着线程acquire的次数。而且每一次的acquire操作必须有一个release操作与之对应,在所有的release操作完成之后,别的线程才能申请该RLock对象。 

threading模块对Lock也提供和封装,提供了更高级的同步方式(可以理解为更高级的锁),包括threading.Event和threading.Condition,其中threading.Event为提供了简单的同步方式:一个进程标记event,其他进程等待,只需下面的几个方法即可: 

Event.wait([timeout])
堵塞线程,直到Event对象内部标识位被设为True或超时(如果提供了参数timeout)。
Event.set()
将标识号设为Ture
Event.clear()
设为标识符False

threading.Condition 可以把Condiftion理解为一把高级的琐,它提供了比Lock, RLock更高级的功能,允许我们能够控制复杂的线程同步问题。threadiong.Condition在内部维护一个琐对象(默认是RLock),可以在创建Condigtion对象的时候把琐对象作为参数传入。Condition也提供了acquire, release方法,其含义与琐的acquire, release方法一致,其实它只是简单的调用内部琐对象的对应的方法而已。Condition还提供了如下方法(特别要注意:这些方法只有在占用琐(acquire)之后才能调用,否则将会报RuntimeError异常。): 

Condition.wait([timeout]):    
wait方法释放内部所占用的琐,同时线程被挂起,直至接收到通知被唤醒或超时(如果提供了timeout参数的话)。当线程被唤醒并重新占有琐的时候,程序才会继续执行下去。
Condition.notify():  
唤醒一个挂起的线程(如果存在挂起的线程)。注意:notify()方法不会释放所占用的琐。
Condition.notify_all()
唤醒所有挂起的线程(如果存在挂起的线程)。注意:这些方法不会释放所占用的琐。

Python多线程、异步+多进程爬虫实现代码,python多线程

安装Tornado
省事点可以直接用grequests库,下面用的是tornado的异步client。 异步用到了tornado,根据官方文档的例子修改得到一个简单的异步爬虫类。可以参考下最新的文档学习下。
pip install tornado

异步爬虫

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import time
from datetime import timedelta
from tornado import httpclient, gen, ioloop, queues
import traceback


class AsySpider(object):
  """A simple class of asynchronous spider."""
  def __init__(self, urls, concurrency=10, **kwargs):
    urls.reverse()
    self.urls = urls
    self.concurrency = concurrency
    self._q = queues.Queue()
    self._fetching = set()
    self._fetched = set()

  def fetch(self, url, **kwargs):
    fetch = getattr(httpclient.AsyncHTTPClient(), 'fetch')
    return fetch(url, **kwargs)

  def handle_html(self, url, html):
    """handle html page"""
    print(url)

  def handle_response(self, url, response):
    """inherit and rewrite this method"""
    if response.code == 200:
      self.handle_html(url, response.body)

    elif response.code == 599:  # retry
      self._fetching.remove(url)
      self._q.put(url)

  @gen.coroutine
  def get_page(self, url):
    try:
      response = yield self.fetch(url)
      print('######fetched %s' % url)
    except Exception as e:
      print('Exception: %s %s' % (e, url))
      raise gen.Return(e)
    raise gen.Return(response)

  @gen.coroutine
  def _run(self):
    @gen.coroutine
    def fetch_url():
      current_url = yield self._q.get()
      try:
        if current_url in self._fetching:
          return

        print('fetching****** %s' % current_url)
        self._fetching.add(current_url)

        response = yield self.get_page(current_url)
        self.handle_response(current_url, response)  # handle reponse

        self._fetched.add(current_url)

        for i in range(self.concurrency):
          if self.urls:
            yield self._q.put(self.urls.pop())

      finally:
        self._q.task_done()

    @gen.coroutine
    def worker():
      while True:
        yield fetch_url()

    self._q.put(self.urls.pop())  # add first url

    # Start workers, then wait for the work queue to be empty.
    for _ in range(self.concurrency):
      worker()

    yield self._q.join(timeout=timedelta(seconds=300000))
    assert self._fetching == self._fetched

  def run(self):
    io_loop = ioloop.IOLoop.current()
    io_loop.run_sync(self._run)


class MySpider(AsySpider):

  def fetch(self, url, **kwargs):
    """重写父类fetch方法可以添加cookies,headers,timeout等信息"""
    cookies_str = "PHPSESSID=j1tt66a829idnms56ppb70jri4; pspt=%7B%22id%22%3A%2233153%22%2C%22pswd%22%3A%228835d2c1351d221b4ab016fbf9e8253f%22%2C%22_code%22%3A%22f779dcd011f4e2581c716d1e1b945861%22%7D; key=%E9%87%8D%E5%BA%86%E5%95%84%E6%9C%A8%E9%B8%9F%E7%BD%91%E7%BB%9C%E7%A7%91%E6%8A%80%E6%9C%89%E9%99%90%E5%85%AC%E5%8F%B8; think_language=zh-cn; SERVERID=a66d7d08fa1c8b2e37dbdc6ffff82d9e|1444973193|1444967835; CNZZDATA1254842228=1433864393-1442810831-%7C1444972138"  # 从浏览器拷贝cookie字符串
    headers = {
      'User-Agent': 'mozilla/5.0 (compatible; baiduspider/2.0; +http://www.baidu.com/search/spider.html)',
      'cookie': cookies_str
    }
    return super(MySpider, self).fetch(  # 参数参考tornado文档
      url, headers=headers, request_timeout=1
    )

  def handle_html(self, url, html):
    print(url, html)


def main():
  urls = []
  for page in range(1, 100):
    urls.append('http://www.baidu.com?page=%s' % page)
  s = MySpider(urls)
  s.run()


if __name__ == '__main__':
  main()

可以继承这个类,塞一些url进去,然后重写handle_page处理得到的页面。

异步+多进程爬虫
还可以再变态点,加个进程池,使用了multiprocessing模块。效率飕飕的,

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import time
from multiprocessing import Pool
from datetime import timedelta
from tornado import httpclient, gen, ioloop, queues


class AsySpider(object):
  """A simple class of asynchronous spider."""
  def __init__(self, urls, concurrency):
    urls.reverse()
    self.urls = urls
    self.concurrency = concurrency
    self._q = queues.Queue()
    self._fetching = set()
    self._fetched = set()

  def handle_page(self, url, html):
    filename = url.rsplit('/', 1)[1]
    with open(filename, 'w+') as f:
      f.write(html)

  @gen.coroutine
  def get_page(self, url):
    try:
      response = yield httpclient.AsyncHTTPClient().fetch(url)
      print('######fetched %s' % url)
    except Exception as e:
      print('Exception: %s %s' % (e, url))
      raise gen.Return('')
    raise gen.Return(response.body)

  @gen.coroutine
  def _run(self):

    @gen.coroutine
    def fetch_url():
      current_url = yield self._q.get()
      try:
        if current_url in self._fetching:
          return

        print('fetching****** %s' % current_url)
        self._fetching.add(current_url)
        html = yield self.get_page(current_url)
        self._fetched.add(current_url)

        self.handle_page(current_url, html)

        for i in range(self.concurrency):
          if self.urls:
            yield self._q.put(self.urls.pop())

      finally:
        self._q.task_done()

    @gen.coroutine
    def worker():
      while True:
        yield fetch_url()

    self._q.put(self.urls.pop())

    # Start workers, then wait for the work queue to be empty.
    for _ in range(self.concurrency):
      worker()
    yield self._q.join(timeout=timedelta(seconds=300000))
    assert self._fetching == self._fetched

  def run(self):
    io_loop = ioloop.IOLoop.current()
    io_loop.run_sync(self._run)


def run_spider(beg, end):
  urls = []
  for page in range(beg, end):
    urls.append('http://127.0.0.1/%s.htm' % page)
  s = AsySpider(urls, 10)
  s.run()


def main():
  _st = time.time()
  p = Pool()
  all_num = 73000
  num = 4  # number of cpu cores
  per_num, left = divmod(all_num, num)
  s = range(0, all_num, per_num)
  res = []
  for i in range(len(s)-1):
    res.append((s[i], s[i+1]))
  res.append((s[len(s)-1], all_num))
  print res

  for i in res:
    p.apply_async(run_spider, args=(i[0], i[1],))
  p.close()
  p.join()

  print time.time()-_st


if __name__ == '__main__':
  main()

多线程爬虫
线程池实现.

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import Queue
import sys
import requests
import os
import threading
import time

class Worker(threading.Thread):  # 处理工作请求
  def __init__(self, workQueue, resultQueue, **kwds):
    threading.Thread.__init__(self, **kwds)
    self.setDaemon(True)
    self.workQueue = workQueue
    self.resultQueue = resultQueue


  def run(self):
    while 1:
      try:
        callable, args, kwds = self.workQueue.get(False)  # get task
        res = callable(*args, **kwds)
        self.resultQueue.put(res)  # put result
      except Queue.Empty:
        break

class WorkManager:  # 线程池管理,创建
  def __init__(self, num_of_workers=10):
    self.workQueue = Queue.Queue()  # 请求队列
    self.resultQueue = Queue.Queue()  # 输出结果的队列
    self.workers = []
    self._recruitThreads(num_of_workers)

  def _recruitThreads(self, num_of_workers):
    for i in range(num_of_workers):
      worker = Worker(self.workQueue, self.resultQueue)  # 创建工作线程
      self.workers.append(worker)  # 加入到线程队列


  def start(self):
    for w in self.workers:
      w.start()

  def wait_for_complete(self):
    while len(self.workers):
      worker = self.workers.pop()  # 从池中取出一个线程处理请求
      worker.join()
      if worker.isAlive() and not self.workQueue.empty():
        self.workers.append(worker)  # 重新加入线程池中
    print 'All jobs were complete.'


  def add_job(self, callable, *args, **kwds):
    self.workQueue.put((callable, args, kwds))  # 向工作队列中加入请求

  def get_result(self, *args, **kwds):
    return self.resultQueue.get(*args, **kwds)


def download_file(url):
  #print 'beg download', url
  requests.get(url).text


def main():
  try:
    num_of_threads = int(sys.argv[1])
  except:
    num_of_threads = 10
  _st = time.time()
  wm = WorkManager(num_of_threads)
  print num_of_threads
  urls = ['http://www.baidu.com'] * 1000
  for i in urls:
    wm.add_job(download_file, i)
  wm.start()
  wm.wait_for_complete()
  print time.time() - _st

if __name__ == '__main__':
  main()

这三种随便一种都有很高的效率,但是这么跑会给网站服务器不小的压力,尤其是小站点,还是有点节操为好。

安装Tornado 省事点可以直接用grequests库,下面用的是tornado的异步client。 异步用到了...

Python代码的执行由Python虚拟机(解释器)来控制。

下面这段python的多线程代码为何运行不起来,说是函数没有加锁的属性

下面的代码可以,因为你那个lock是变量又是函数,会冲突的,另外,你这个实验其实测试不出来lock属性,因为你调用实际上是顺序的

import string,socket,time,os,sys,threading
num=0
class xThread():
def __init__(self):
self._lock=threading.Lock()
def lock(self):
self._lock.acquire()
def unlock(self):
self._lock.release()

def p2(a):
global num
thread.lock()
num+=1
print(a+str(num))
time.sleep(0.1)
thread.unlock()

def p1(a):
for i in range(3):
info='this %s thread'%(i)
p2(info)

def main():
for i in range(3):
p1(i)

if __name__ == '__main__':
thread=xThread()
main()  

Python在设计之初就考虑要在主循环中,同时只有一个线程在执行,

python多线程 可以不可以查看锁的个数

给lock加个变量保存锁的层数吧,每acquire一次就加1,每release一次就减1,也就不用另外找什么函数了。  

Python多线程 Python中实现多线程有两种方式,一种基于_thread模块(在Python2.x版本中为thread模块,没有下划...

就像单CPU的系统中运行多个进程那样,内存中可以存放多个程序,

本文由澳门威尼斯人平台发布于计算机编程,转载请注明出处:Python基础(十三) 为什么说python多线程没有真正实现多现程

关键词: