- def select_sort(origin_items, comp=lambda x, y: x < y):
- """简单选择排序"""
- items = origin_items[:]
- for i in range(len(items) - 1):
- min_index = i
- for j in range(i + 1, len(items)):
- if comp(items[j], items[min_index]):
- min_index = j
- items[i], items[min_index] = items[min_index], items[i]
- return items
-
- def bubble_sort(origin_items, comp=lambda x, y: x > y):
- """高质量冒泡排序(搅拌排序)"""
- items = origin_items[:]
- for i in range(len(items) - 1):
- swapped = False
- for j in range(i, len(items) - 1 - i):
- if comp(items[j], items[j + 1]):
- items[j], items[j + 1] = items[j + 1], items[j]
- swapped = True
- if swapped:
- swapped = False
- for j in range(len(items) - 2 - i, i, -1):
- if comp(items[j - 1], items[j]):
- items[j], items[j - 1] = items[j - 1], items[j]
- swapped = True
- if not swapped:
- break
- return items
-
- def merge_sort(items, comp=lambda x, y: x <= y):
- """归并排序(分治法)"""
- if len(items) < 2:
- return items[:]
- mid = len(items) // 2
- left = merge_sort(items[:mid], comp)
- right = merge_sort(items[mid:], comp)
- return merge(left, right, comp)
-
-
- def merge(items1, items2, comp):
- """合并(将两个有序的列表合并成一个有序的列表)"""
- items = []
- index, index2 = 0, 0
- while index1 < len(items1) and index2 < len(items2):
- if comp(items1[index1], items2[index2]):
- items.append(items1[index1])
- index1 += 1
- else:
- items.append(items2[index2])
- index2 += 1
- items += items1[index1:]
- items += items2[index2:]
- return items
-
- def seq_search(items, key):
- """顺序查找"""
- for index, item in enumerate(items):
- if item == key:
- return index
- return -1
-
- def bin_search(items, key):
- """折半查找"""
- start, end = 0, len(items) - 1
- while start <= end:
- mid = (start + end) // 2
- if key > items[mid]:
- start = mid + 1
- elif key < items[mid]:
- end = mid - 1
- else:
- return mid
- return -1
-
- prices = {
- 'AAPL': 191.88,
- 'GOOG': 1186.96,
- 'IBM': 149.24,
- 'ORCL': 48.44,
- 'ACN': 166.89,
- 'FB': 208.09,
- 'SYMC': 21.29
- }
- # 用股票价格大于100元的股票构造一个新的字典
- prices2 = {key: value for key, value in prices.items() if value > 100}
- print(prices2)
-
说明:生成式(推导式)可以用来生成列表、集合和字典。
- names = ['关羽', '张飞', '赵云', '马超', '黄忠']
- courses = ['语文', '数学', '英语']
- # 录入五个学生三门课程的成绩
- # 错误 - 参考http://pythontutor.com/visualize.html#mode=edit
- # scores = [[None] * len(courses)] * len(names)
- scores = [[None] * len(courses) for _ in range(len(names))]
- for row, name in enumerate(names):
- for col, course in enumerate(courses):
- scores[row][col] = float(input(f'请输入{name}的{course}成绩: '))
- print(scores)
-
Python Tutor(link:http://pythontutor.com/) - VISUALIZE CODE AND GET LIVE HELP
- """
- 从列表中找出最大的或最小的N个元素
- 堆结构(大根堆/小根堆)
- """
- import heapq
-
- list1 = [34, 25, 12, 99, 87, 63, 58, 78, 88, 92]
- list2 = [
- {'name': 'IBM', 'shares': 100, 'price': 91.1},
- {'name': 'AAPL', 'shares': 50, 'price': 543.22},
- {'name': 'FB', 'shares': 200, 'price': 21.09},
- {'name': 'HPQ', 'shares': 35, 'price': 31.75},
- {'name': 'YHOO', 'shares': 45, 'price': 16.35},
- {'name': 'ACME', 'shares': 75, 'price': 115.65}
- ]
- print(heapq.nlargest(3, list1))
- print(heapq.nsmallest(3, list1))
- print(heapq.nlargest(2, list2, key=lambda x: x['price']))
- print(heapq.nlargest(2, list2, key=lambda x: x['shares']))
- """
- 迭代工具 - 排列 / 组合 / 笛卡尔积
- """
- import itertools
-
- itertools.permutations('ABCD')
- itertools.combinations('ABCDE', 3)
- itertools.product('ABCD', '123')
-
- """
- 找出序列中出现次数最多的元素
- """
- from collections import Counter
-
- words = [
- 'look', 'into', 'my', 'eyes', 'look', 'into', 'my', 'eyes',
- 'the', 'eyes', 'the', 'eyes', 'the', 'eyes', 'not', 'around',
- 'the', 'eyes', "don't", 'look', 'around', 'the', 'eyes',
- 'look', 'into', 'my', 'eyes', "you're", 'under'
- ]
- counter = Counter(words)
- print(counter.most_common(3))
-
常用算法:
- # 公鸡5元一只 母鸡3元一只 小鸡1元三只
- # 用100元买100只鸡 问公鸡/母鸡/小鸡各多少只
- for x in range(20):
- for y in range(33):
- z = 100 - x - y
- if 5 * x + 3 * y + z // 3 == 100 and z % 3 == 0:
- print(x, y, z)
-
- # A、B、C、D、E五人在某天夜里合伙捕鱼 最后疲惫不堪各自睡觉
- # 第二天A第一个醒来 他将鱼分为5份 扔掉多余的1条 拿走自己的一份
- # B第二个醒来 也将鱼分为5份 扔掉多余的1条 拿走自己的一份
- # 然后C、D、E依次醒来也按同样的方式分鱼 问他们至少捕了多少条鱼
- fish = 1
- while True:
- total = fish
- enough = True
- for _ in range(5):
- if (total - 1) % 5 == 0:
- total = (total - 1) // 5 * 4
- else:
- enough = False
- break
- if enough:
- print(fish)
- break
- fish += 1
-
贪婪法例子:假设小偷有一个背包,最多能装 20 公斤赃物,他闯入一户人家,发现如下表所示的物品。很显然,他不能把所有物品都装进背包,所以必须确定拿走哪些物品,留下哪些物品。
名称 | 价格(美元) | 重量(kg) |
---|---|---|
电脑 | 200 | 20 |
收音机 | 20 | 4 |
钟 | 175 | 10 |
花瓶 | 50 | 2 |
书 | 10 | 1 |
油画 | 90 | 9 |
- """
- 贪婪法:在对问题求解时,总是做出在当前看来是最好的选择,不追求最优解,快速找到满意解。
- 输入:
- 20 6
- 电脑 200 20
- 收音机 20 4
- 钟 175 10
- 花瓶 50 2
- 书 10 1
- 油画 90 9
- """
- class Thing(object):
- """物品"""
-
- def __init__(self, name, price, weight):
- self.name = name
- self.price = price
- self.weight = weight
-
- @property
- def value(self):
- """价格重量比"""
- return self.price / self.weight
-
-
- def input_thing():
- """输入物品信息"""
- name_str, price_str, weight_str = input().split()
- return name_str, int(price_str), int(weight_str)
-
-
- def main():
- """主函数"""
- max_weight, num_of_things = map(int, input().split())
- all_things = []
- for _ in range(num_of_things):
- all_things.append(Thing(*input_thing()))
- all_things.sort(key=lambda x: x.value, reverse=True)
- total_weight = 0
- total_price = 0
- for thing in all_things:
- if total_weight + thing.weight <= max_weight:
- print(f'小偷拿走了{thing.name}')
- total_weight += thing.weight
- total_price += thing.price
- print(f'总价值: {total_price}美元')
-
-
- if __name__ == '__main__':
- main()
-
- """
- 快速排序 - 选择枢轴对元素进行划分,左边都比枢轴小右边都比枢轴大
- """
- def quick_sort(origin_items, comp=lambda x, y: x <= y):
- items = origin_items[:]
- _quick_sort(items, 0, len(items) - 1, comp)
- return items
-
-
- def _quick_sort(items, start, end, comp):
- if start < end:
- pos = _partition(items, start, end, comp)
- _quick_sort(items, start, pos - 1, comp)
- _quick_sort(items, pos + 1, end, comp)
-
-
- def _partition(items, start, end, comp):
- pivot = items[end]
- i = start - 1
- for j in range(start, end):
- if comp(items[j], pivot):
- i += 1
- items[i], items[j] = items[j], items[i]
- items[i + 1], items[end] = items[end], items[i + 1]
- return i + 1
-
- """
- 递归回溯法:叫称为试探法,按选优条件向前搜索,当搜索到某一步,发现原先选择并不优或达不到目标时,就退回一步重新选择,比较经典的问题包括骑士巡逻、八皇后和迷宫寻路等。
- """
- import sys
- import time
-
- SIZE = 5
- total = 0
-
-
- def print_board(board):
- for row in board:
- for col in row:
- print(str(col).center(4), end='')
- print()
-
-
- def patrol(board, row, col, step=1):
- if row >= 0 and row < SIZE and \
- col >= 0 and col < SIZE and \
- board[row][col] == 0:
- board[row][col] = step
- if step == SIZE * SIZE:
- global total
- total += 1
- print(f'第{total}种走法: ')
- print_board(board)
- patrol(board, row - 2, col - 1, step + 1)
- patrol(board, row - 1, col - 2, step + 1)
- patrol(board, row + 1, col - 2, step + 1)
- patrol(board, row + 2, col - 1, step + 1)
- patrol(board, row + 2, col + 1, step + 1)
- patrol(board, row + 1, col + 2, step + 1)
- patrol(board, row - 1, col + 2, step + 1)
- patrol(board, row - 2, col + 1, step + 1)
- board[row][col] = 0
-
-
- def main():
- board = [[0] * SIZE for _ in range(SIZE)]
- patrol(board, SIZE - 1, SIZE - 1)
-
-
- if __name__ == '__main__':
- main()
-
不使用动态规划将会是几何级数复杂度。
- """
- 动态规划 - 适用于有重叠子问题和最优子结构性质的问题
- 使用动态规划方法所耗时间往往远少于朴素解法(用空间换取时间)
- """
- def fib(num, temp={}):
- """用递归计算Fibonacci数"""
- if num in (1, 2):
- return 1
- try:
- return temp[num]
- except KeyError:
- temp[num] = fib(num - 1) + fib(num - 2)
- return temp[num]
-
使用动态规划可以避免二重循环。
说明:子列表指的是列表中索引(下标)连续的元素构成的列表;列表中的元素是int类型,可能包含正整数、0、负整数;程序输入列表中的元素,输出子列表元素求和的最大值,例如:
输入:1 -2 3 5 -3 2
输出:8
输入:0 -2 3 5 -1 2
输出:9
输入:-9 -2 -3 -5 -3
输出:-2
- def main():
- items = list(map(int, input().split()))
- size = len(items)
- overall, partial = {}, {}
- overall[size - 1] = partial[size - 1] = items[size - 1]
- for i in range(size - 2, -1, -1):
- partial[i] = max(items[i], partial[i + 1] + items[i])
- overall[i] = max(partial[i], overall[i + 1])
- print(overall[0])
-
-
- if __name__ == '__main__':
- main()
-
- items1 = list(map(lambda x: x ** 2, filter(lambda x: x % 2, range(1, 10))))
- items2 = [x ** 2 for x in range(1, 10) if x % 2]
-
例子:输出函数执行时间的装饰器。
- def record_time(func):
- """自定义装饰函数的装饰器"""
-
- @wraps(func)
- def wrapper(*args, **kwargs):
- start = time()
- result = func(*args, **kwargs)
- print(f'{func.__name__}: {time() - start}秒')
- return result
-
- return wrapper
-
如果装饰器不希望跟print函数耦合,可以编写带参数的装饰器。
- from functools import wraps
- from time import time
-
-
- def record(output):
- """自定义带参数的装饰器"""
-
- def decorate(func):
-
- @wraps(func)
- def wrapper(*args, **kwargs):
- start = time()
- result = func(*args, **kwargs)
- output(func.__name__, time() - start)
- return result
-
- return wrapper
-
- return decorate
-
- from functools import wraps
- from time import time
-
-
- class Record():
- """自定义装饰器类(通过__call__魔术方法使得对象可以当成函数调用)"""
-
- def __init__(self, output):
- self.output = output
-
- def __call__(self, func):
-
- @wraps(func)
- def wrapper(*args, **kwargs):
- start = time()
- result = func(*args, **kwargs)
- self.output(func.__name__, time() - start)
- return result
-
- return wrapper
-
说明:由于对带装饰功能的函数添加了@wraps装饰器,可以通过func.__wrapped__方式获得被装饰之前的函数或类来取消装饰器的作用。
例子:用装饰器来实现单例模式。
- from functools import wraps
-
-
- def singleton(cls):
- """装饰类的装饰器"""
- instances = {}
-
- @wraps(cls)
- def wrapper(*args, **kwargs):
- if cls not in instances:
- instances[cls] = cls(*args, **kwargs)
- return instances[cls]
-
- return wrapper
-
-
- @singleton
- class President():
- """总统(单例类)"""
- pass
-
说明:上面的代码中用到了闭包(closure),不知道你是否已经意识到了。还没有一个小问题就是,上面的代码并没有实现线程安全的单例,如果要实现线程安全的单例应该怎么做呢?
- from functools import wraps
-
-
- def singleton(cls):
- """线程安全的单例装饰器"""
- instances = {}
- locker = Lock()
-
- @wraps(cls)
- def wrapper(*args, **kwargs):
- if cls not in instances:
- with locker:
- if cls not in instances:
- instances[cls] = cls(*args, **kwargs)
- return instances[cls]
-
- return wrapper
-
例子:工资结算系统。
- """
- 月薪结算系统 - 部门经理每月15000 程序员每小时200 销售员1800底薪加销售额5%提成
- """
- from abc import ABCMeta, abstractmethod
-
-
- class Employee(metaclass=ABCMeta):
- """员工(抽象类)"""
-
- def __init__(self, name):
- self.name = name
-
- @abstractmethod
- def get_salary(self):
- """结算月薪(抽象方法)"""
- pass
-
-
- class Manager(Employee):
- """部门经理"""
-
- def get_salary(self):
- return 15000.0
-
-
- class Programmer(Employee):
- """程序员"""
-
- def __init__(self, name, working_hour=0):
- self.working_hour = working_hour
- super().__init__(name)
-
- def get_salary(self):
- return 200.0 * self.working_hour
-
-
- class Salesman(Employee):
- """销售员"""
-
- def __init__(self, name, sales=0.0):
- self.sales = sales
- super().__init__(name)
-
- def get_salary(self):
- return 1800.0 + self.sales * 0.05
-
-
- class EmployeeFactory():
- """创建员工的工厂(工厂模式 - 通过工厂实现对象使用者和对象之间的解耦合)"""
-
- @staticmethod
- def create(emp_type, *args, **kwargs):
- """创建员工"""
- emp_type = emp_type.upper()
- emp = None
- if emp_type == 'M':
- emp = Manager(*args, **kwargs)
- elif emp_type == 'P':
- emp = Programmer(*args, **kwargs)
- elif emp_type == 'S':
- emp = Salesman(*args, **kwargs)
- return emp
-
-
- def main():
- """主函数"""
- emps = [
- EmployeeFactory.create('M', '曹操'),
- EmployeeFactory.create('P', '荀彧', 120),
- EmployeeFactory.create('P', '郭嘉', 85),
- EmployeeFactory.create('S', '典韦', 123000),
- ]
- for emp in emps:
- print('%s: %.2f元' % (emp.name, emp.get_salary()))
-
-
- if __name__ == '__main__':
- main()
-
例子:扑克游戏。
- """
- 经验:符号常量总是优于字面常量,枚举类型是定义符号常量的最佳选择
- """
- from enum import Enum, unique
-
- import random
-
-
- @unique
- class Suite(Enum):
- """花色"""
-
- SPADE, HEART, CLUB, DIAMOND = range(4)
-
- def __lt__(self, other):
- return self.value < other.value
-
-
- class Card():
- """牌"""
-
- def __init__(self, suite, face):
- """初始化方法"""
- self.suite = suite
- self.face = face
-
- def show(self):
- """显示牌面"""
- suites = ['♠️', '♥️', '♣️', '♦️']
- faces = ['', 'A', '2', '3', '4', '5', '6', '7', '8', '9', '10', 'J', 'Q', 'K']
- return f'{suites[self.suite.value]} {faces[self.face]}'
-
- def __str__(self):
- return self.show()
-
- def __repr__(self):
- return self.show()
-
-
- class Poker():
- """扑克"""
-
- def __init__(self):
- self.index = 0
- self.cards = [Card(suite, face)
- for suite in Suite
- for face in range(1, 14)]
-
- def shuffle(self):
- """洗牌(随机乱序)"""
- random.shuffle(self.cards)
- self.index = 0
-
- def deal(self):
- """发牌"""
- card = self.cards[self.index]
- self.index += 1
- return card
-
- @property
- def has_more(self):
- return self.index < len(self.cards)
-
-
- class Player():
- """玩家"""
-
- def __init__(self, name):
- self.name = name
- self.cards = []
-
- def get_one(self, card):
- """摸一张牌"""
- self.cards.append(card)
-
- def sort(self, comp=lambda card: (card.suite, card.face)):
- """整理手上的牌"""
- self.cards.sort(key=comp)
-
-
- def main():
- """主函数"""
- poker = Poker()
- poker.shuffle()
- players = [Player('东邪'), Player('西毒'), Player('南帝'), Player('北丐')]
- while poker.has_more:
- for player in players:
- player.get_one(poker.deal())
- for player in players:
- player.sort()
- print(player.name, end=': ')
- print(player.cards)
-
-
- if __name__ == '__main__':
- main()
-
Python 使用了自动化内存管理,这种管理机制以引用计数为基础,同时也引入了标记-清除和分代收集两种机制为辅的策略。
- typedef struct_object {
- /* 引用计数 */
- int ob_refcnt;
- /* 对象指针 */
- struct_typeobject *ob_type;
- } PyObject;
-
- /* 增加引用计数的宏定义 */
- #define Py_INCREF(op) ((op)->ob_refcnt++)
- /* 减少引用计数的宏定义 */
- #define Py_DECREF(op) \ //减少计数
- if (--(op)->ob_refcnt != 0) \
- ; \
- else \
- __Py_Dealloc((PyObject *)(op))
-
导致引用计数+1的情况:
导致引用计数-1的情况:
引用计数可能会导致循环引用问题,而循环引用会导致内存泄露,如下面的代码所示。为了解决这个问题,Python中引入了“标记-清除”和“分代收集”。在创建一个对象的时候,对象被放在第一代中,如果在第一代的垃圾检查中对象存活了下来,该对象就会被放到第二代中,同理在第二代的垃圾检查中对象存活下来,该对象就会被放到第三代中。
- # 循环引用会导致内存泄露 - Python除了引用技术还引入了标记清理和分代回收
- # 在Python 3.6以前如果重写__del__魔术方法会导致循环引用处理失效
- # 如果不想造成循环引用可以使用弱引用
- list1 = []
- list2 = []
- list1.append(list2)
- list2.append(list1)
-
以下情况会导致垃圾回收:
如果循环引用中两个对象都定义了__del__方法,gc模块不会销毁这些不可达对象,因为gc模块不知道应该先调用哪个对象的__del__方法,这个问题在Python 3.6中得到了解决。
也可以通过weakref模块构造弱引用的方式来解决循环引用的问题。
有几个小问题请大家思考:
例子:自定义字典限制只有在指定的key不存在时才能在字典中设置键值对。
- class SetOnceMappingMixin():
- """自定义混入类"""
- __slots__ = ()
-
- def __setitem__(self, key, value):
- if key in self:
- raise KeyError(str(key) + ' already set')
- return super().__setitem__(key, value)
-
-
- class SetOnceDict(SetOnceMappingMixin, dict):
- """自定义字典"""
- pass
-
-
- my_dict= SetOnceDict()
- try:
- my_dict['username'] = 'jackfrued'
- my_dict['username'] = 'hellokitty'
- except KeyError:
- pass
- print(my_dict)
-
例子:用元类实现单例模式。
- import threading
-
-
- class SingletonMeta(type):
- """自定义元类"""
-
- def __init__(cls, *args, **kwargs):
- cls.__instance = None
- cls.__lock = threading.Lock()
- super().__init__(*args, **kwargs)
-
- def __call__(cls, *args, **kwargs):
- if cls.__instance is None:
- with cls.__lock:
- if cls.__instance is None:
- cls.__instance = super().__call__(*args, **kwargs)
- return cls.__instance
-
-
- class President(metaclass=SingletonMeta):
- """总统(单例类)"""
- pass
-
说明:上面加粗的字母放在一起称为面向对象的SOLID原则。
例子:可插拔的哈希算法。
- class StreamHasher():
- """哈希摘要生成器(策略模式)"""
-
- def __init__(self, alg='md5', size=4096):
- self.size = size
- alg = alg.lower()
- self.hasher = getattr(__import__('hashlib'), alg.lower())()
-
- def __call__(self, stream):
- return self.to_digest(stream)
-
- def to_digest(self, stream):
- """生成十六进制形式的摘要"""
- for buf in iter(lambda: stream.read(self.size), b''):
- self.hasher.update(buf)
- return self.hasher.hexdigest()
-
- def main():
- """主函数"""
- hasher1 = StreamHasher()
- with open('Python-3.7.1.tgz', 'rb') as stream:
- print(hasher1.to_digest(stream))
- hasher2 = StreamHasher('sha1')
- with open('Python-3.7.1.tgz', 'rb') as stream:
- print(hasher2(stream))
-
-
- if __name__ == '__main__':
- main()
-
- def fib(num):
- """生成器"""
- a, b = 0, 1
- for _ in range(num):
- a, b = b, a + b
- yield a
-
-
- class Fib(object):
- """迭代器"""
-
- def __init__(self, num):
- self.num = num
- self.a, self.b = 0, 1
- self.idx = 0
-
- def __iter__(self):
- return self
-
- def __next__(self):
- if self.idx < self.num:
- self.a, self.b = self.b, self.a + self.b
- self.idx += 1
- return self.a
- raise StopIteration()
-
Python中实现并发编程的三种方案:多线程、多进程和异步I/O。并发编程的好处在于可以提升程序的执行效率以及改善用户体验;坏处在于并发的程序不容易开发和调试,同时对其他程序来说它并不友好。
- """
- 面试题:进程和线程的区别和联系?
- 进程 - 操作系统分配内存的基本单位 - 一个进程可以包含一个或多个线程
- 线程 - 操作系统分配CPU的基本单位
- 并发编程(concurrent programming)
- 1. 提升执行性能 - 让程序中没有因果关系的部分可以并发的执行
- 2. 改善用户体验 - 让耗时间的操作不会造成程序的假死
- """
- import glob
- import os
- import threading
-
- from PIL import Image
-
- PREFIX = 'thumbnails'
-
-
- def generate_thumbnail(infile, size, format='PNG'):
- """生成指定图片文件的缩略图"""
- file, ext = os.path.splitext(infile)
- file = file[file.rfind('/') + 1:]
- outfile = f'{PREFIX}/{file}_{size[0]}_{size[1]}.{ext}'
- img = Image.open(infile)
- img.thumbnail(size, Image.ANTIALIAS)
- img.save(outfile, format)
-
-
- def main():
- """主函数"""
- if not os.path.exists(PREFIX):
- os.mkdir(PREFIX)
- for infile in glob.glob('images/*.png'):
- for size in (32, 64, 128):
- # 创建并启动线程
- threading.Thread(
- target=generate_thumbnail,
- args=(infile, (size, size))
- ).start()
-
-
- if __name__ == '__main__':
- main()
-
多个线程竞争资源的情况
- """
- 多线程程序如果没有竞争资源处理起来通常也比较简单
- 当多个线程竞争临界资源的时候如果缺乏必要的保护措施就会导致数据错乱
- 说明:临界资源就是被多个线程竞争的资源
- """
- import time
- import threading
-
- from concurrent.futures import ThreadPoolExecutor
-
-
- class Account(object):
- """银行账户"""
-
- def __init__(self):
- self.balance = 0.0
- self.lock = threading.Lock()
-
- def deposit(self, money):
- # 通过锁保护临界资源
- with self.lock:
- new_balance = self.balance + money
- time.sleep(0.001)
- self.balance = new_balance
-
-
- class AddMoneyThread(threading.Thread):
- """自定义线程类"""
-
- def __init__(self, account, money):
- self.account = account
- self.money = money
- # 自定义线程的初始化方法中必须调用父类的初始化方法
- super().__init__()
-
- def run(self):
- # 线程启动之后要执行的操作
- self.account.deposit(self.money)
-
- def main():
- """主函数"""
- account = Account()
- # 创建线程池
- pool = ThreadPoolExecutor(max_workers=10)
- futures = []
- for _ in range(100):
- # 创建线程的第1种方式
- # threading.Thread(
- # target=account.deposit, args=(1, )
- # ).start()
- # 创建线程的第2种方式
- # AddMoneyThread(account, 1).start()
- # 创建线程的第3种方式
- # 调用线程池中的线程来执行特定的任务
- future = pool.submit(account.deposit, 1)
- futures.append(future)
- # 关闭线程池
- pool.shutdown()
- for future in futures:
- future.result()
- print(account.balance)
-
-
- if __name__ == '__main__':
- main()
-
修改上面的程序,启动5个线程向账户中存钱,5个线程从账户中取钱,取钱时如果余额不足就暂停线程进行等待。为了达到上述目标,需要对存钱和取钱的线程进行调度,在余额不足时取钱的线程暂停并释放锁,而存钱的线程将钱存入后要通知取钱的线程,使其从暂停状态被唤醒。可以使用threading模块的Condition来实现线程调度,该对象也是基于锁来创建的,代码如下所示:
- """
- 多个线程竞争一个资源 - 保护临界资源 - 锁(Lock/RLock)
- 多个线程竞争多个资源(线程数>资源数) - 信号量(Semaphore)
- 多个线程的调度 - 暂停线程执行/唤醒等待中的线程 - Condition
- """
- from concurrent.futures import ThreadPoolExecutor
- from random import randint
- from time import sleep
-
- import threading
-
-
- class Account():
- """银行账户"""
-
- def __init__(self, balance=0):
- self.balance = balance
- lock = threading.Lock()
- self.condition = threading.Condition(lock)
-
- def withdraw(self, money):
- """取钱"""
- with self.condition:
- while money > self.balance:
- self.condition.wait()
- new_balance = self.balance - money
- sleep(0.001)
- self.balance = new_balance
-
- def deposit(self, money):
- """存钱"""
- with self.condition:
- new_balance = self.balance + money
- sleep(0.001)
- self.balance = new_balance
- self.condition.notify_all()
-
-
- def add_money(account):
- while True:
- money = randint(5, 10)
- account.deposit(money)
- print(threading.current_thread().name,
- ':', money, '====>', account.balance)
- sleep(0.5)
-
-
- def sub_money(account):
- while True:
- money = randint(10, 30)
- account.withdraw(money)
- print(threading.current_thread().name,
- ':', money, '<====', account.balance)
- sleep(1)
-
-
- def main():
- account = Account()
- with ThreadPoolExecutor(max_workers=10) as pool:
- for _ in range(5):
- pool.submit(add_money, account)
- pool.submit(sub_money, account)
-
-
- if __name__ == '__main__':
- main()
-
多进程可以有效的解决GIL的问题,实现多进程主要的类是Process,其他辅助的类跟threading模块中的类似,进程间共享数据可以使用管道、套接字等,在multiprocessing模块中有一个Queue类,它基于管道和锁机制提供了多个进程共享的队列。下面是官方文档上关于多进程和进程池的一个示例。
- """
- 多进程和进程池的使用
- 多线程因为GIL的存在不能够发挥CPU的多核特性
- 对于计算密集型任务应该考虑使用多进程
- time python3 example22.py
- real 0m11.512s
- user 0m39.319s
- sys 0m0.169s
- 使用多进程后实际执行时间为11.512秒,而用户时间39.319秒约为实际执行时间的4倍
- 这就证明我们的程序通过多进程使用了CPU的多核特性,而且这台计算机配置了4核的CPU
- """
- import concurrent.futures
- import math
-
- PRIMES = [
- 1116281,
- 1297337,
- 104395303,
- 472882027,
- 533000389,
- 817504243,
- 982451653,
- 112272535095293,
- 112582705942171,
- 112272535095293,
- 115280095190773,
- 115797848077099,
- 1099726899285419
- ] * 5
-
-
- def is_prime(n):
- """判断素数"""
- if n % 2 == 0:
- return False
-
- sqrt_n = int(math.floor(math.sqrt(n)))
- for i in range(3, sqrt_n + 1, 2):
- if n % i == 0:
- return False
- return True
-
-
- def main():
- """主函数"""
- with concurrent.futures.ProcessPoolExecutor() as executor:
- for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
- print('%d is prime: %s' % (number, prime))
-
-
- if __name__ == '__main__':
- main()
-
说明:多线程和多进程的比较。
以下情况需要使用多线程:
- 程序需要维护许多共享的状态(尤其是可变状态),Python中的列表、字典、集合都是线程安全的,所以使用线程而不是进程维护共享状态的代价相对较小。
- 程序会花费大量时间在I/O操作上,没有太多并行计算的需求且不需占用太多的内存。
以下情况需要使用多进程:
- 程序执行计算密集型任务(如:字节码操作、数据处理、科学计算)。
- 程序的输入可以并行的分成块,并且可以将运算结果合并。
- 程序在内存使用方面没有任何限制且不强依赖于I/O操作(如:读写文件、套接字等)。
从调度程序的任务队列中挑选任务,该调度程序以交叉的形式执行这些任务,我们并不能保证任务将以某种顺序去执行,因为执行顺序取决于队列中的一项任务是否愿意将CPU处理时间让位给另一项任务。异步任务通常通过多任务协作处理的方式来实现,由于执行时间和顺序的不确定,因此需要通过回调式编程或者future对象来获取任务执行的结果。Python 3通过asyncio模块和await和async关键字(在Python 3.7中正式被列为关键字)来支持异步处理。
- """
- 异步I/O - async / await
- """
- import asyncio
-
-
- def num_generator(m, n):
- """指定范围的数字生成器"""
- yield from range(m, n + 1)
-
-
- async def prime_filter(m, n):
- """素数过滤器"""
- primes = []
- for i in num_generator(m, n):
- flag = True
- for j in range(2, int(i ** 0.5 + 1)):
- if i % j == 0:
- flag = False
- break
- if flag:
- print('Prime =>', i)
- primes.append(i)
-
- await asyncio.sleep(0.001)
- return tuple(primes)
-
-
- async def square_mapper(m, n):
- """平方映射器"""
- squares = []
- for i in num_generator(m, n):
- print('Square =>', i * i)
- squares.append(i * i)
-
- await asyncio.sleep(0.001)
- return squares
-
-
- def main():
- """主函数"""
- loop = asyncio.get_event_loop()
- future = asyncio.gather(prime_filter(2, 100), square_mapper(1, 100))
- future.add_done_callback(lambda x: print(x.result()))
- loop.run_until_complete(future)
- loop.close()
-
-
- if __name__ == '__main__':
- main()
-
说明:上面的代码使用get_event_loop函数获得系统默认的事件循环,通过gather函数可以获得一个future对象,future对象的add_done_callback可以添加执行完成时的回调函数,loop对象的run_until_complete方法可以等待通过future对象获得协程执行结果。
Python 中有一个名为 aiohttp 的三方库,它提供了异步的HTTP客户端和服务器,这个三方库可以跟asyncio模块一起工作,并提供了对Future对象的支持。Python 3.6中引入了async和await来定义异步执行的函数以及创建异步上下文,在Python 3.7中它们正式成为了关键字。下面的代码异步的从5个URL中获取页面并通过正则表达式的命名捕获组提取了网站的标题。
- import asyncio
- import re
-
- import aiohttp
-
- PATTERN = re.compile(r'\<title\>(?P<title>.*)\<\/title\>')
-
-
- async def fetch_page(session, url):
- async with session.get(url, ssl=False) as resp:
- return await resp.text()
-
-
- async def show_title(url):
- async with aiohttp.ClientSession() as session:
- html = await fetch_page(session, url)
- print(PATTERN.search(html).group('title'))
-
-
- def main():
- urls = ('https://www.python.org/',
- 'https://git-scm.com/',
- 'https://www.jd.com/',
- 'https://www.taobao.com/',
- 'https://www.douban.com/')
- loop = asyncio.get_event_loop()
- tasks = [show_title(url) for url in urls]
- loop.run_until_complete(asyncio.wait(tasks))
- loop.close()
-
-
- if __name__ == '__main__':
- main()
-
说明:异步I/O与多进程的比较。
当程序不需要真正的并发性或并行性,而是更多的依赖于异步处理和回调时,asyncio就是一种很好的选择。如果程序中有大量的等待与休眠时,也应该考虑asyncio,它很适合编写没有实时数据处理需求的Web应用服务器。
Python还有很多用于处理并行任务的三方库,例如:joblib、PyMP等。实际开发中,要提升系统的可扩展性和并发性通常有垂直扩展(增加单个节点的处理能力)和水平扩展(将单个节点变成多个节点)两种做法。可以通过消息队列来实现应用程序的解耦合,消息队列相当于是多线程同步队列的扩展版本,不同机器上的应用程序相当于就是线程,而共享的分布式消息队列就是原来程序中的Queue。消息队列(面向消息的中间件)的最流行和最标准化的实现是AMQP(高级消息队列协议),AMQP源于金融行业,提供了排队、路由、可靠传输、安全等功能,最著名的实现包括:Apache的ActiveMQ、RabbitMQ等。
要实现任务的异步化,可以使用名为Celery的三方库。Celery是Python编写的分布式任务队列,它使用分布式消息进行工作,可以基于RabbitMQ或Redis来作为后端的消息代理。