imap函数python imapexample

进程池Pool的imap方法解析

Python中,multiprocessing库中Pool类代表进程池,其对象有imap()和imap_unordered()方法。

在林周等地区,都构建了全面的区域性战略布局,加强发展的系统性、市场前瞻性、产品创新能力,以专注、极致的服务理念,为客户提供成都网站制作、成都网站设计 网站设计制作按需定制开发,公司网站建设,企业网站建设,品牌网站制作,全网营销推广,成都外贸网站建设,林周网站建设费用合理。

两者都用于对大量数据遍历多进程计算,返回一个迭代器(multiprocessing.pool.IMapIterator)。

imap返回结果顺序和输入相同,imap_unordered则为不保证顺序。

经过测试,发现Python多进程和imap()的一些特性:

1、 iter = pool.imap(fn, data) 一旦生成,无论使不使用iter,多进程计算都会开始。

计算结果会缓存在内存中,所以要注意内存用尽的问题。

2、fn,即执行函数,不可以是局部对象(不能嵌套在其他函数里),否则会报错:

AttributeError: Can't pickle local object 'fn_outer.locals.fn'

3、使用进程池map数据时,如果每次的运算量很小,最后的效率还不如单进程。这时多进程切换造成的开销已大于多进程计算提升的效率。

这时,可以将输入数据集分段,每次map,计算一段。具体分段多大时获得最佳效率,需要实际测试。

4、注意,Pool使用完毕后必须关闭,否则进程不会退出。

有两种写法,推荐第2种:

注意 ,第二种中,必须在with的块内使用iter。

Python判断列表是否已排序的各种方法及其性能

本节判断列表排序的函数名格式为IsListSorted_XXX()。为简洁起见,除代码片段及其输出外,一律以_XXX()指代。

2.1 guess

def IsListSorted_guess(lst):

listLen = len(lst) if listLen = 1: return True

#由首个元素和末尾元素猜测可能的排序规则

if lst[0] == lst[-1]: #列表元素相同

for elem in lst: if elem != lst[0]: return False

elif lst[0] lst[-1]: #列表元素升序

for i, elem in enumerate(lst[1:]): if elem lst[i]: return False

else: #列表元素降序

for i, elem in enumerate(lst[1:]): if elem lst[i]: return False

return True

_guess()是最通用的实现,几乎与语言无关。值得注意的是,该函数内会猜测给定列表可能的排序规则,因此无需外部调用者指明排序规则。

2.2 sorted

def IsListSorted_sorted(lst):

return sorted(lst) == lst or sorted(lst, reverse=True) == lst

_sorted()使用Python内置函数sorted()。由于sorted()会对未排序的列表排序,_sorted()函数主要适用于已排序列表。

若想判断列表未排序后再对其排序,不如直接调用列表的sort()方法,因为该方法内部会判断列表是否排序。对于已排序列表,该方法的时间复杂度为线性阶O(n)——判断为O(n)而排序为O(nlgn)。

2.3 for-loop

def IsListSorted_forloop(lst, key=lambda x, y: x = y):

for i, elem in enumerate(lst[1:]): #注意,enumerate默认迭代下标从0开始

if not key(lst[i], elem): #if elem lst[i]更快,但通用性差

return False

return True

无论列表是否已排序,本函数的时间复杂度均为线性阶O(n)。注意,参数key表明缺省的排序规则为升序。

2.4 all

def IsListSorted_allenumk(lst, key=lambda x, y: x = y):

return all(key(lst[i], elem) for i, elem in enumerate(lst[1:]))import operatordef IsListSorted_allenumo(lst, oCmp=operator.le):

return all(oCmp(lst[i], elem) for i, elem in enumerate(lst[1:]))def IsListSorted_allenumd(lst):

return all((lst[i] = elem) for i, elem in enumerate(lst[1:]))def IsListSorted_allxran(lst, key=lambda x,y: x = y):

return all(key(lst[i],lst[i+1]) for i in xrange(len(lst)-1))def IsListSorted_allzip(lst, key=lambda x,y: x = y):

from itertools import izip #Python 3中zip返回生成器(generator),而izip被废弃

return all(key(a, b) for (a, b) in izip(lst[:-1],lst[1:]))

lambda表达式与operator运算符速度相当,前者简单灵活,后者略为高效(实测并不一定)。但两者速度均不如列表元素直接比较(可能存在调用开销)。亦即,_allenumd()快于_allenumo()快于_allenumk()。

若使用lambda表达式指示排序规则,更改规则时只需要改变x和y之间的比较运算符;若使用operator模块指示排序规则,更改规则时需要改变对象比较方法。具体地,lt(x, y)等效于x y,le(x, y)等效于x = y,eq(x, y)等效于x == y,ne(x, y)等效于x != y,gt(x, y)等效于x y,ge(x, y)等效于x = y。例如,_allenumo()函数若要严格升序可设置oCmp=operator.lt。

此外,由all()函数的帮助信息可知,_allenumk()其实是_forloop()的等效形式。

2.5 numpy

def IsListSorted_numpy(arr, key=lambda dif: dif = 0):

import numpy try: if arr.dtype.kind == 'u': #无符号整数数组执行np.diff时存在underflow风险

arr = numpy.int64(lst) except AttributeError: pass #无dtype属性,非数组

return (key(numpy.diff(arr))).all() #numpy.diff(x)返回相邻数组元素的差值构成的数组

NumPy是用于科学计算的Python基础包,可存储和处理大型矩阵。它包含一个强大的N维数组对象,比Python自身的嵌套列表结构(nested list structure)高效得多。第三节的实测数据表明,_numpy()处理大型列表时性能非常出色。

在Windows系统中可通过pip install numpy命令安装NumPy包,不建议登录官网下载文件自行安装。

2.6 reduce

def IsListSorted_reduce(iterable, key=lambda x, y: x = y):

cmpFunc = lambda x, y: y if key(x, y) else float('inf') return reduce(cmpFunc, iterable, .0) float('inf')

reduce实现是all实现的变体。累加器(accumulator)中仅存储最后一个检查的列表元素,或者Infinity(若任一元素小于前个元素值)。

前面2.1~2.5小节涉及下标操作的函数适用于列表等可迭代对象(Iterable)。对于通用迭代器(Iterator)对象,即可以作用于next()函数或方法的对象,可使用_reduce()及后面除_rand()外各小节的函数。迭代器的计算是惰性的,只有在需要返回下一个数据时才会计算,以避免不必要的计算。而且,迭代器方式无需像列表那样切片为两个迭代对象。

2.7 imap

def IsListSorted_itermap(iterable, key=lambda x, y: x = y):

from itertools import imap, tee

a, b = tee(iterable) #为单个iterable创建两个独立的iterator

next(b, None) return all(imap(key, a, b))

2.8 izip

def IsListSorted_iterzip(iterable, key=lambda x, y: x = y):

from itertools import tee, izip

a, b = tee(iterable) next(b, None) return all(key(x, y) for x, y in izip(a, b))def pairwise(iterable):

from itertools import tee, izip

a, b = tee(iterable) next(b, None) return izip(a, b) #"s - (s0,s1), (s1,s2), (s2, s3), ..."def IsListSorted_iterzipf(iterable, key=lambda x, y: x = y):

return all(key(a, b) for a, b in pairwise(iterable))

第三节的实测数据表明,虽然存在外部函数调用,_iterzipf()却比_iterzip()略为高效。

2.9 fast

def IsListSorted_fastd(lst):

it = iter(lst) try:

prev = it.next() except StopIteration: return True

for cur in it: if prev cur: return False

prev = cur return Truedef IsListSorted_fastk(lst, key=lambda x, y: x = y):

it = iter(lst) try:

prev = it.next() except StopIteration: return True

for cur in it: if not key(prev, cur): return False

prev = cur return True

_fastd()和_fastk()是Stack Overflow网站回答里据称执行最快的。实测数据表明,在列表未排序时,它们的性能表现确实优异。

2.10 random

import randomdef IsListSorted_rand(lst, randNum=3, randLen=100):

listLen = len(lst) if listLen = 1: return True

#由首个元素和末尾元素猜测可能的排序规则

if lst[0] lst[-1]: #列表元素升序

key = lambda dif: dif = 0

else: #列表元素降序或相等

key = lambda dif: dif = 0

threshold, sortedFlag = 10000, True

import numpy if listLen = threshold or listLen = randLen*2 or not randNum: return (key(numpy.diff(numpy.array(lst)))).all() from random import sample for i in range(randNum):

sortedRandList = sorted(sample(xrange(listLen), randLen))

flag = (key(numpy.diff(numpy.array([lst[x] for x in sortedRandList])))).all()

sortedFlag = sortedFlag and flag return sortedFlag

_rand()借助随机采样降低运算规模,并融入其他判断函数的优点。例如,猜测列表可能的排序规则,并在随机采样不适合时使用相对快速的判断方式,如NumPy。

通过line_profiler分析可知,第20行和第21行与randLen有关,但两者耗时接近。因此randLen应小于listLen的一半,以抵消sorted开销。除内部限制外,用户可以调节随机序列个数和长度,如定制单个但较长的序列。

注意,_rand()不适用于存在微量异常数据的长列表。因为这些数据很可能被随机采样遗漏,从而影响判断结果的准确性。

我用python通过imap收取邮件时为什么能收取

想用python做一个很简单的接收邮件的功能,只看python的官方doc()真的很不好懂,经过google之,探索之,稍微总结一下:

要使用imap接收邮件,当然要导入imaplib拉.

import imaplib

然后按常规的,建立链接→登录

conn = imaplib.IMAP4("imap.xxx.com",143)

conn.login("userName","password")

然后我想查看收件箱的邮件,咋办呢?要先选择一个目录,收件箱默认名称是"INBOX",IMAP是支持创建文件夹,查看其它文件夹的,如果是自己新建的文件夹,那么名称一般会是"INBOX.新建文件夹",不同的邮箱可能表示方式不一样,如果你不知道的话,那运行conn.list()查看所有的文件夹.

conn.select("INBOX")

选择后,然后查看文件夹,注意,IMAP的查看其实是一个搜索的过程,IMAP的原始命令是search all(大概的),在python里这么用:

type, data = conn.search(None, 'ALL')

然后返回的是这个收件箱里所有邮件的编号,按接收时间升序排列,最后的表示最近.

search这个很鬼麻烦,因为官方文档里没讲这个函数的第二个参数怎么用,于是找了下,可以填的命令有:

于是如果我想找Essh邮件的话,使用

type, data = conn.search(None, '(SUBJECT "Essh")')

里面要用一个括号,代表是一个查询条件,可以同时指定多个查询条件,例如FROM xxxx SUBJECT "aaa",注意,命令要用括号罩住(痛苦的尝试)

search第一个参数是charset的意思,填None表示用默认ASCII,

data里获取到的是一个只有一个字符串元素的数组,包含很多数字,用空格隔开

['1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103'

于是想获取最后一封的做法是:

msgList = data[0].split()

last = msgList[len(msgList) - 1]

然后把那个邮件获取回来,用fetch函数

例子:

conn.fetch(last, '(RFC822.SIZE BODY[HEADER.FIELDS (SUBJECT)])')

但是返回的是一串MIME编码的东东,看不懂,如果能像eml那一操作一封邮件就好了.

方法是有的,用email库.

import email

然后以RFC822获取邮件格式,再用email.message_from_string转换为message对象.就可以当message操作了,()

type,data=connect.fetch(msgList[len(msgList)-1],'(RFC822)')

msg=email.message_from_string(data[0][1])

content=msg.get_payload(decode=True)

最后content得到就是邮件的内容了

IMAP 电子邮件系统函数库中的 $forwarded 命令 IMAP 命令

您好,

imap_append : 附加字符串到指定的邮箱中。

imap_base64 : 解 BASE64 编码。

imap_body : 读信的内文。

imap_check : 返回邮箱信息。

: 关闭 IMAP 链接。

imap_createmailbox : 建立新的信箱。

imap_delete : 标记欲删除邮件。

imap_deletemailbox : 删除既有信箱。

imap_expunge : 删除已标记的邮件。

imap_fetchbody : 从信件内文取出指定部分。

imap_fetchstructure : 获取某信件的结构信息。

imap_header : 获取某信件的标头信息。

imap_headers : 获取全部信件的标头信息。

imap_listmailbox : 获取邮箱列示。

imap_listsubscribed : 获取订阅邮箱列示。

imap_mail_copy : 复制指定信件到它处邮箱。

imap_mail_move : 移动指定信件到它处邮箱。

imap_num_msg : 取得信件数。

imap_num_recent : 取得新进信件数。

imap_open : 打开 IMAP 链接。

imap_ping : 检查 IMAP 是否连接。

imap_renamemailbox : 更改邮箱名字。

imap_reopen : 重开 IMAP 链接。

imap_subscribe : 订阅邮箱。

imap_undelete : 取消删除邮件标记。

imap_unsubscribe : 取消订阅邮箱。

imap_qprint : 将 qp 编码转成八位。

imap_8bit : 将八位转成 qp 编码。

imap_binary : 将八位转成 base64 编码。

imap_scanmailbox : 寻找信件有无特定字符串。

imap_mailboxmsginfo : 取得目前邮箱的信息。

imap_rfc822_write_address : 电子邮件位址标准化。

imap_rfc822_parse_adrlist : 解析电子邮件位址。

imap_setflag_full : 配置信件标志。

imap_clearflag_full : 清除信件标志。

imap_sort : 将信件标头排序。

imap_fetchheader : 取得原始标头。

imap_uid : 取得信件 UID。

imap_getmailboxes : 取得全部信件详细信息。

imap_getsubscribed : 列出所有订阅邮箱。

imap_msgno : 列出 UID 的连续信件。

imap_search : 搜寻指定标准的信件。

imap_last_error : 最后的错误信息。

imap_errors : 所有的错误信息。

imap_alerts : 所有的警告信息。

imap_status : 目前的状态信息。

Python 多进程内存占用问题

当我们有一个很长很长的任务队列(mission_list)和阈值对应的一个处理函数(missionFunction)时,我们一般采用如下的方式进行处理:

但是,如果这任务列表很长很长,处理函数很复杂(占用cpu)时,单核往往需要很长的时间进行处理,此时,Multiprocess便可以极大的提高我们程序的运行速度,相关内容请借鉴 multiprocessing --- 基于进程的并行 — Python 3.10.4 文档。

以上这种场景下,推荐大家采用最简单的进程池+map的方法进行处理,标准的写法, chunksize要借鉴官方的说法,最好大一点 :

但是!!!! 如果我们的任务列表非常的长,这会导致多进程还没跑起来之前,内存已经撑爆了,任务自然没法完成,此时我们有几种办法进行优化:

进程的启动方法有三种,可参考官方文档:

[图片上传失败...(image-48cd3c-1650511153989)]

在linux环境下,使用forkserver可以节省很多的内存空间, 因为进程启动的是一个服务,不会把主进程的数据全部复制

采用imap会极大的节省空间,它返回的是一个迭代器,也就是结果列表:

但注意,以上写法中,你写的结果迭代部分必须写在with下面。或者采用另一种写法:

还有最后一种,当你的mission list实在太大了,导致你在生成 mission list的时候已经把内存撑爆了,这个时候就得优化 mission_list了,如果你的mission_list是通过一个for循环生成的,你可以使用yield字段,将其封装为一个迭代器,传入进程池:

这样子,我们就封装好了mission_list,它是一个可迭代对象,在取数据的时候才会将数据拉到内存

我在项目中结合了后两种方法,原本256G的内存都不够用,但在修改后内存只占用了不到10G。希望能够帮助到你


网站名称:imap函数python imapexample
URL地址:http://ybzwz.com/article/hijdcj.html