您当前的位置:首页 > IT编程 > python
| C语言 | Java | VB | VC | python | Android | TensorFlow | C++ | oracle | 学术与代码 | cnn卷积神经网络 | gnn | 图像修复 | Keras | 数据集 | Neo4j | 自然语言处理 | 深度学习 | 医学CAD | 医学影像 | 超参数 | pointnet | pytorch |

自学教程:python常用小脚本实例总结

51自学网 2022-07-22 18:48:22
  python
这篇教程python常用小脚本实例总结写得很实用,希望能帮到您。

前言

日常生活中常会遇到一些小任务,如果人工处理会很麻烦。

用python做些小脚本处理,能够提高不少效率。或者可以把python当工具使用,辅助提高一下办公效率。(比如我常拿python当计算器,计算和字符转换用)

以下总结下个人用到的一些python小脚本留作备忘。

打印16进制字符串

用途:通信报文中的hex数据不好看,可以打印为16进制的字符串显示出来。

# coding=utf-8
# name: myutil.py
"""Helpers that print raw byte strings as readable hexadecimal text."""


def _byte_values(s):
    """Return the integer value of every element of *s*.

    Iterating ``bytes`` on Python 3 already yields ints; a Python 2 ``str``
    (or a Python 3 text string) yields 1-character strings that need
    ``ord()``.
    """
    return [c if isinstance(c, int) else ord(c) for c in s]


def print_hex1(s, prev='0x'):
    """Print each byte of *s* as two hex digits, each prefixed with *prev*.

    The values are separated by single spaces and followed by a newline,
    e.g. ``print_hex1('ab')`` prints ``0x61 0x62``.
    """
    print(' '.join('%s%02x' % (prev, v) for v in _byte_values(s)))


def print_hex(s):
    """Print each byte of *s* as two hex digits separated by spaces.

    The stray ``print 'myutil'`` debug line that used to follow the dump has
    been dropped so the helper only prints the data it was given.
    """
    print(' '.join('%02x' % v for v in _byte_values(s)))


def print_hex3(s, prev='0x'):
    """Print a hex *text* string as comma-terminated two-character groups.

    ``print_hex3('a1b2')`` prints ``0xa1, 0xb2,``.  The string is walked in
    steps of two characters; the previous version looped once per character
    while advancing by two, which printed an empty group for every second
    character.
    """
    pairs = [s[i:i + 2] for i in range(0, len(s), 2)]
    print(' '.join('%s%s,' % (prev, pair) for pair in pairs))

文件合并

之前搞单片机时,生成的hex应用程序文件不能直接刷到单片机里,还需要先与iap程序合并成一个文件才能烧写到单片机。每次手工打包很麻烦,于是做了个脚本来处理:

# coding=gb18030
"""Merge an IAP hex image with an application hex image into one file.

The IAP image ``IAP_CZ_v204w.hex`` is read from the current working
directory.  Every line of it except the last one is copied to the output,
then the whole application hex file is appended.  The output file is
written to the current working directory and gets a timestamp suffix.
"""
import os
import time

try:
    read_line = raw_input  # Python 2
except NameError:
    read_line = input  # Python 3

IAP_FILE_NAME = 'IAP_CZ_v204w.hex'


def prr():
    """Announce that the merge is starting."""
    print('file combination begin..')


def combine_hex(iap_path, app_path, out_path):
    """Write *iap_path* minus its last line, followed by *app_path*, to *out_path*.

    Args:
        iap_path: path of the IAP hex file; its final line is dropped.
        app_path: path of the application hex file; copied completely.
        out_path: path of the merged file to create (overwritten if present).

    Raises:
        IOError/OSError: if any of the files cannot be opened.  The output
        file is only created after both inputs were opened successfully.
    """
    with open(iap_path, 'r') as iap_file:
        iap_lines = iap_file.readlines()
    with open(app_path, 'r') as app_file, open(out_path, 'w') as out_file:
        # An empty or single-line IAP file contributes nothing.
        out_file.writelines(iap_lines[:-1])
        out_file.writelines(app_file)


def main():
    """Prompt for the input/output names and run the merge."""
    cwd = os.getcwd()
    print(cwd)
    iap_path = os.path.join(cwd, IAP_FILE_NAME)
    print(iap_path)

    app_path = read_line('enter file path:')
    print(app_path)

    out_name = read_line('enter file name:')
    out_path = os.path.join(
        cwd, out_name + time.strftime('_%y%m%d%H%M%S') + '.hex')
    print(out_path)

    prr()
    try:
        combine_hex(iap_path, app_path, out_path)
        print('combination success!')
    except Exception as ex:
        # Top-level boundary of an interactive tool: report and fall through
        # to the pause below instead of showing a traceback.
        print('excettion occured!')
        print(ex)
    finally:
        # Keep the console window open; asked once whether or not the merge
        # succeeded (the old script asked twice after a failure).
        read_line('press any key to continue...')


if __name__ == '__main__':
    main()

多线程下载图集

网上好看的动漫图集,如果手工下载太费时了。简单分析下网页地址规律,写个多线程脚本搞定。

#!/usr/bin/python
# -*- coding: utf-8 -*-
# filename: paxel.py

'''It is a multi-thread downloading tool

    It was developed follow axel.
        Author: volans
        E-mail: volansw [at] gmail.com
'''

import sys
import os
import time
import urllib
from threading import Thread

# NOTE: targets Python 2 (urllib.FancyURLopener / urllib.urlopen(proxies=...)).
local_proxies = {'http': 'http://131.139.58.200:8080'}


class AxelPython(Thread, urllib.FancyURLopener):
    '''Thread that downloads one inclusive byte range of a URL into a file.

    run() is the Thread entry point.  ``downloaded`` holds the number of
    bytes of this part that are on disk, including bytes left over from an
    earlier, interrupted run.
    '''

    def __init__(self, threadname, url, filename, ranges=0, proxies=None):
        '''Create the worker.

        threadname: name of the thread.
        url:        URL to download from.
        filename:   part file the range is appended to.
        ranges:     (first_byte, last_byte) tuple, both inclusive.
        proxies:    proxy mapping for FancyURLopener; None means "no proxy",
                    which is what the former ``{}`` default meant.
        '''
        Thread.__init__(self, name=threadname)
        urllib.FancyURLopener.__init__(
            self, {} if proxies is None else proxies)
        self.name = threadname
        self.url = url
        self.filename = filename
        self.ranges = ranges
        self.downloaded = 0

    def run(self):
        '''Download the still-missing bytes of this part.'''
        try:
            self.downloaded = os.path.getsize(self.filename)
        except OSError:
            # Part file does not exist yet: nothing downloaded so far.
            self.downloaded = 0

        # Resume behind whatever is already on disk.
        self.startpoint = self.ranges[0] + self.downloaded

        # The range is inclusive, so the part is complete only once the start
        # point has moved *past* the last byte.  The former ``>=`` test
        # treated a part that still lacked its final byte as finished.
        if self.startpoint > self.ranges[1]:
            print('Part %s has been downloaded over.' % self.filename)
            return

        self.oneTimeSize = 16384  # 16 KiB per read
        print('task %s will download from %d to %d'
              % (self.name, self.startpoint, self.ranges[1]))

        self.addheader(
            "Range", "bytes=%d-%d" % (self.startpoint, self.ranges[1]))
        self.urlhandle = self.open(self.url)
        try:
            # Open the part file once instead of once per chunk.
            with open(self.filename, 'ab') as part_file:
                while True:
                    data = self.urlhandle.read(self.oneTimeSize)
                    if not data:
                        break
                    part_file.write(data)
                    self.downloaded += len(data)
        finally:
            self.urlhandle.close()


def GetUrlFileSize(url, proxies=None):
    '''Return the Content-Length announced for *url*, or 0 if there is none.

    proxies: proxy mapping passed to urllib.urlopen; None means "no proxy".
    '''
    urlHandler = urllib.urlopen(
        url, proxies={} if proxies is None else proxies)
    try:
        headers = urlHandler.info().headers
    finally:
        urlHandler.close()

    length = 0
    for header in headers:
        name, _, value = header.partition(':')
        # Match the header name exactly rather than any line that merely
        # contains the word "Length".
        if name.strip().lower() == 'content-length':
            length = int(value.strip())
    return length


def SpliteBlocks(totalsize, blocknumber):
    '''Split ``totalsize`` bytes into ``blocknumber`` inclusive (first, last) ranges.

    All blocks have the same size except the last one, which also takes the
    remainder.
    '''
    blocksize = totalsize // blocknumber
    ranges = [(i * blocksize, (i + 1) * blocksize - 1)
              for i in range(blocknumber - 1)]
    ranges.append((blocksize * (blocknumber - 1), totalsize - 1))
    return ranges


def islive(tasks):
    '''Return True while at least one of the threads in *tasks* is running.'''
    return any(task.is_alive() for task in tasks)


def paxel(url, output, blocks=6, proxies=local_proxies):
    '''Download *url* into *output* using *blocks* parallel range requests.

    Each block is stored in ``tmpfile_<n>`` in the current directory, the
    parts are concatenated into *output* once all threads ended, and the part
    files are removed afterwards.

    Raises:
        ValueError: if the server does not announce a positive size, because
            the file cannot be split into ranges then.
    '''
    size = GetUrlFileSize(url, proxies)
    if size <= 0:
        raise ValueError('remote file size is unknown or zero: %s' % url)
    ranges = SpliteBlocks(size, blocks)

    threadname = ["thread_%d" % i for i in range(blocks)]
    filename = ["tmpfile_%d" % i for i in range(blocks)]

    tasks = []
    for i in range(blocks):
        # Hand the proxies to the workers too; they used to ignore them.
        task = AxelPython(threadname[i], url, filename[i], ranges[i], proxies)
        task.daemon = True
        task.start()
        tasks.append(task)

    time.sleep(2)
    while islive(tasks):
        downloaded = sum(task.downloaded for task in tasks)
        process = downloaded / float(size) * 100
        # '\r' returns to the start of the line so the progress overwrites itself.
        show = u'\rFilesize:%d Downloaded:%d Completed:%.2f%%' % (
            size, downloaded, process)
        sys.stdout.write(show)
        sys.stdout.flush()
        time.sleep(0.5)

    with open(output, 'wb') as merged:
        for part_name in filename:
            with open(part_name, 'rb') as part:
                merged.write(part.read())
            try:
                os.remove(part_name)
            except OSError:
                # Best effort: a part file that cannot be removed is left behind.
                pass


if __name__ == '__main__':
    url = "http://xz1.mm667.com/xz84/images/001.jpg"
    output = '001.jpg'
    paxel(url, output, blocks=4, proxies={})

多线程下载图片

多线程下载图片并存储到指定目录中,若目录不存在则自动创建。

# -*- coding: UTF-8 -*-
"""Download numbered images from a list of gallery directories with threads.

For every gallery URL a numbered sub-directory is created below LOCAL_ROOT
and the images 000.jpg .. 039.jpg of that gallery are fetched, one thread per
image.
"""
import os
import re
import socket
import threading
import time
import urllib

try:
    from urllib import urlretrieve  # Python 2
except ImportError:
    from urllib.request import urlretrieve  # Python 3

socket.setdefaulttimeout(30)

# Destination root; the original used the Windows path d:\mysite\pic1\.
LOCAL_ROOT = 'd:\\mysite\\pic1'
# Images per gallery directory: 000.jpg .. 039.jpg.
IMAGES_PER_DIR = 40
# Index of the first gallery to download.  The script used to slice
# ``urls[84:]``, which is empty for the 80 generated URLs, so it downloaded
# nothing; raise this value to resume a partially finished run.
START_INDEX = 0

# Serialises the start-up message (and 1 s pause) of the worker threads.
mutex = threading.Lock()


def build_urls(first=1, last=80):
    """Return the gallery URLs for directories ``xz<first>`` .. ``xz<last>``.

    Directory number i is served by host ``xz<j>`` where j cycles 1..5 and
    advances after every four directories.
    """
    return ['http://xz%d.mm667.com/xz%02d/images/' % ((i - 1) // 4 % 5 + 1, i)
            for i in range(first, last + 1)]


urls = build_urls()


def mkdir(path):
    """Create directory *path* (with parents) unless it already exists.

    Surrounding whitespace and trailing path separators are stripped first.

    Returns:
        True if the directory was created, False if it already existed.
    """
    path = path.strip().rstrip('\\/')
    if os.path.exists(path):
        print(path + u' 目录已存在')
        return False
    # Create first, report afterwards, so a failure is not announced as success.
    os.makedirs(path)
    print(path + u' 创建成功')
    return True


def cbk(a, b, c):
    """Progress callback in ``urlretrieve`` reporthook form.

    a: number of blocks transferred so far
    b: size of one block
    c: size of the remote file; nothing is printed when it is not positive
       (size unknown), which used to raise ZeroDivisionError for 0.
    """
    if c <= 0:
        return
    per = min(100.0 * a * b / c, 100)
    print('%.2f%%' % per)


class MyThread(threading.Thread):
    """Thread that downloads a single URL into a local file."""

    def __init__(self, url, name):
        """url: address to fetch; name: local file path (also the thread name)."""
        threading.Thread.__init__(self)
        self.url = url
        self.name = name

    def run(self):
        """Fetch the URL, retrying once after a one-second pause on failure."""
        with mutex:
            print('')
            print('down from %s' % self.url)
            time.sleep(1)
        try:
            urlretrieve(self.url, self.name)
        except Exception as e:
            print(e)
            time.sleep(1)
            # Second attempt; if it fails too the error ends this thread.
            urlretrieve(self.url, self.name)


def main():
    """Download every gallery starting at START_INDEX, one directory at a time."""
    for url in urls:
        print(url)

    for d, u in enumerate(urls[START_INDEX:], 1):
        local = os.path.join(LOCAL_ROOT, '%d' % d)
        mkdir(local)
        print('download begin...')
        threads = []
        for i in range(IMAGES_PER_DIR):
            image_name = '%03d.jpg' % i
            th = MyThread(u + image_name, os.path.join(local, image_name))
            threads.append(th)
            th.start()
        # Wait for this directory before starting the next one, so that the
        # final message is true and the thread count stays bounded.
        for th in threads:
            th.join()
    print('over! download finished')


if __name__ == '__main__':
    main()

爬虫抓取信息

#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""
Python crawler that collects company listings from company.yktworld.com.
Author: yangyongzhen
Version: 0.0.2
Date: 2014-12-14
Language: Python2.7.5
Editor: Sublime Text2
"""

import urllib2
import re
import string
import threading
import Queue
import time
import sys
import os
from bs4 import BeautifulSoup

# Python 2 only: make implicit str/unicode conversions use UTF-8.
reload(sys)
sys.setdefaultencoding('utf8')

_DATA = []  # one list of extracted strings per crawled page
FILE_LOCK = threading.Lock()  # guards the shared counter _Num
SHARE_Q = Queue.Queue()  # unbounded queue of page URLs to crawl
_WORKER_THREAD_NUM = 3  # number of worker threads
_Num = 0  # total number of company entries found

_HREF_PATTERN = re.compile(r'href="([^"]*)"')
# Inline style that identifies one company entry on the result page.
_ITEM_STYLE = ("width:96%;margin:10px;border-bottom:1px #CCC dashed;"
               "padding-bottom:10px;")


class MyThread(threading.Thread):
    """Thread that runs the callable it was given."""

    def __init__(self, func, num):
        super(MyThread, self).__init__()  # initialise the base Thread
        self.func = func  # work function executed by run()
        self.thread_num = num

    def run(self):
        self.func()


def worker():
    """Crawl URLs from SHARE_Q until the queue is empty."""
    while True:
        # get_nowait() avoids the empty()/get() race in which another worker
        # takes the last item and this one blocks forever.
        try:
            url = SHARE_Q.get_nowait()
        except Queue.Empty:
            return
        try:
            my_page = get_page(url)
            if my_page is not None:
                find_data(my_page)  # extract the entries of this page
            time.sleep(1)
        finally:
            # Always acknowledge the task, otherwise SHARE_Q.join() hangs.
            SHARE_Q.task_done()


def get_page(url):
    """Fetch *url* and return its HTML decoded from GBK.

    Args:
        url: address of the page to fetch.
    Returns:
        The page as unicode text, or None if the request failed with a
        URLError (the error is printed).
    """
    try:
        html = urllib2.urlopen(url).read()
    except urllib2.URLError as e:
        if hasattr(e, "code"):
            print("The server couldn't fulfill the request.")
            print("Error code: %s" % e.code)
        elif hasattr(e, "reason"):
            print("We failed to reach a server. Please check your url and read the Reason")
            print("Reason: %s" % e.reason)
        return None
    return html.decode("gbk", 'ignore')


def find_data(my_page):
    """Extract name, product/contact text and link of every company entry.

    Args:
        my_page: HTML text of one result page.

    The extracted strings of the page are appended to _DATA as one list and
    _Num is increased by the number of entries found.
    """
    global _Num
    temp_data = []
    items = BeautifulSoup(my_page).find_all("div", style=_ITEM_STYLE)
    for item in items:
        # Company name; .string is None when the link holds nested markup.
        if item.a and item.a.string is not None:
            data = item.a.string.encode("gbk", "ignore")
            print(data)
            temp_data.append(data)

        # Products and contact details.
        for goods in item.find_all("div", style="font-size:12px;"):
            data = goods.get_text().encode("gbk", "ignore")
            temp_data.append(data)
            print(data)

        # Link address: first href found in the entry's markup.
        match = _HREF_PATTERN.search(str(item))
        if match:
            href = match.group(1)
            print(href)
            temp_data.append(href)

    # Several workers update the counter, so do it under the lock.
    with FILE_LOCK:
        _Num += len(items)
    _DATA.append(temp_data)


def main():
    """Queue the result pages, crawl them with worker threads, write down.txt."""
    threads = []
    start = time.time()  # wall-clock time for the elapsed-time report
    search_url = "http://company.yktworld.com/comapny_search.asp?page={page}"
    # Queue all tasks up front; a long-running crawler would feed the queue
    # continuously instead.
    for index in xrange(20):
        SHARE_Q.put(search_url.format(page=index))
    for i in xrange(_WORKER_THREAD_NUM):
        thread = MyThread(worker, i)
        thread.start()
        threads.append(thread)
    for thread in threads:
        thread.join()
    SHARE_Q.join()

    page_count = 0
    with open("down.txt", "w+") as my_file:
        for page in _DATA:
            page_count += 1
            for name in page:
                my_file.write(name + "\n")
    # Reported once, after the file is written (it used to print per page).
    print("Spider Successful!!!")
    end = time.time()
    print(u'抓取完成!')
    print(u'总页数: %d' % page_count)
    print(u'总条数: %d' % _Num)
    print(u'一共用时: %s 秒' % (end - start))


if __name__ == '__main__':
    main()

爬虫多线程下载电影名称

#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""
Python crawler that collects movie links and titles from movie.misszm.com.
Author: yangyongzhen
Version: 0.0.2
Date: 2014-12-14
Language: Python2.7.8
Editor: Sublime Text2
"""

import urllib2
import re
import string
import threading
import Queue
import time
import sys
import os
from bs4 import BeautifulSoup

# Python 2 only: make implicit str/unicode conversions use UTF-8, which the
# unicode titles written to movie.txt rely on.
reload(sys)
sys.setdefaultencoding('utf8')

_DATA = []  # one list of extracted strings per crawled page
FILE_LOCK = threading.Lock()
SHARE_Q = Queue.Queue()  # unbounded queue of page URLs to crawl
_WORKER_THREAD_NUM = 3  # number of worker threads

rootpath = os.getcwd() + u'/抓取的内容/'

_HREF_PATTERN = re.compile(r'href="([^"]*)"')


def makedir(path):
    """Create directory *path* (with parents) if it does not exist yet."""
    if not os.path.isdir(path):
        os.makedirs(path)


def Schedule(a, b, c):
    """Print download progress as a percentage.

    a: number of blocks transferred so far
    b: size of one block
    c: size of the remote file; nothing is printed when it is not positive,
       which used to raise ZeroDivisionError for 0.
    """
    if c <= 0:
        return
    per = min(100.0 * a * b / c, 100)
    print('%.2f%%' % per)


class MyThread(threading.Thread):
    """Thread that runs the callable it was given."""

    def __init__(self, func):
        super(MyThread, self).__init__()  # initialise the base Thread
        self.func = func  # work function executed by run()

    def run(self):
        self.func()


def worker():
    """Crawl URLs from SHARE_Q until the queue is empty."""
    print('work thread start...\n')
    while True:
        # get_nowait() avoids the empty()/get() race in which another worker
        # takes the last item and this one blocks forever.
        try:
            url = SHARE_Q.get_nowait()
        except Queue.Empty:
            return
        try:
            my_page = get_page(url)
            if my_page is not None:
                find_title(my_page)  # extract the titles of this page
            time.sleep(1)
        finally:
            # Always acknowledge the task, otherwise SHARE_Q.join() hangs.
            SHARE_Q.task_done()


def get_page(url):
    """Fetch *url* and return its HTML decoded from UTF-8.

    Args:
        url: address of the page to fetch.
    Returns:
        The page as unicode text, or None if the request failed with a
        URLError (the error is printed).
    """
    try:
        html = urllib2.urlopen(url).read()
    except urllib2.URLError as e:
        if hasattr(e, "code"):
            print("The server couldn't fulfill the request.")
            print("Error code: %s" % e.code)
        elif hasattr(e, "reason"):
            print("We failed to reach a server. Please check your url and read the Reason")
            print("Reason: %s" % e.reason)
        return None
    return html.decode("utf8")


def find_title(my_page):
    """Collect the link and the title from every <h1> element of the page.

    Args:
        my_page: HTML text of one listing page.

    The extracted strings of the page are appended to _DATA as one list.
    """
    temp_data = []
    for item in BeautifulSoup(my_page).findAll('h1'):
        match = _HREF_PATTERN.search(str(item))
        if match:
            href = match.group(1)
            print(href)
            temp_data.append(href)
        # .string is None when the link holds nested markup; skipping it keeps
        # None out of the list that main() later writes to the file.
        if item.a and item.a.string is not None:
            temp_data.append(item.a.string)
    _DATA.append(temp_data)


def main():
    """Queue the listing pages, crawl them with worker threads, write movie.txt."""
    threads = []
    start = time.time()  # wall-clock time for the elapsed-time report
    page_url = "http://movie.misszm.com/page/{page}"
    # Queue all tasks up front; a long-running crawler would feed the queue
    # continuously instead.
    for index in xrange(5):
        SHARE_Q.put(page_url.format(page=index))
    for i in xrange(_WORKER_THREAD_NUM):
        thread = MyThread(worker)
        thread.start()
        threads.append(thread)
    for thread in threads:
        thread.join()
    SHARE_Q.join()

    with open("movie.txt", "w+") as my_file:
        for page in _DATA:
            for movie_name in page:
                my_file.write(movie_name + "\n")
    print("Spider Successful!!!")
    end = time.time()
    print(u'抓取完成!')
    print(u'一共用时: %s 秒' % (end - start))


if __name__ == '__main__':
    main()

串口转tcp工具

# coding=utf-8
# author:yangyongzhen
# QQ:534117529
# 'CardTest TcpServer  - Simple Test Card Tool 1.00'
"""Serial-to-TCP bridge (Python 2, pyserial).

Bytes read from a serial port are forwarded to a TCP server, and bytes
received from the server are written back to the serial port.
"""

import sys
import threading
import time
import serial
import binascii
import encodings
import re
import os
import struct
from socket import socket, AF_INET, SOCK_STREAM
from socket import error as socket_error

# Guards ComThread.snddata / ComThread.rcvdata, which are shared by the
# serial and TCP threads.
mylock = threading.RLock()

# Filled in from user input in the __main__ section below.
Server_IP = ''
Server_Port = ''  # was misspelled "Srever_Port", leaving Server_Port undefined


def print_hex1(s, prev='0x'):
    """Print each byte of *s* as two hex digits, each prefixed with *prev*."""
    print(' '.join('%s%02x' % (prev, b) for b in bytearray(s)))


def print_hex(s):
    """Print each byte of *s* as two hex digits separated by spaces."""
    print(' '.join('%02x' % b for b in bytearray(s)))


def hexto_str(s):
    """Return the lowercase hex representation of the byte string *s*."""
    return binascii.hexlify(s)


def strto_hex(s):
    """Return the byte string encoded by the hex text *s*."""
    return binascii.unhexlify(s)


class ComThread:
    """Bridges one serial port and one outgoing TCP connection.

    Four daemon threads cooperate through two buffers:
    ``snddata`` holds the latest serial data waiting to go to the server and
    ``rcvdata`` holds the latest server data waiting to go to the serial port.
    """

    def __init__(self, Port=0):
        """Port: value assigned to ``serial.Serial().port`` (the __main__
        section passes the COM number minus one)."""
        self.l_serial = None
        self.alive = False
        self.waitEnd = None
        self.port = Port
        self.thread_read = None
        self.thread_write = None
        self.thread_TcpClient = None
        self.thread_TcpSend = None

        # TCP part
        self.connection = None  # connected socket, or None while disconnected
        # data buffers
        self.snddata = ''
        self.rcvdata = ''

    def waiting(self):
        """Block until one of the serial threads signals that it has ended."""
        if self.waitEnd is not None:
            self.waitEnd.wait()

    def SetStopEvent(self):
        """Release waiting() and stop the bridge."""
        if self.waitEnd is not None:
            self.waitEnd.set()
        self.alive = False
        self.stop()

    def _spawn(self, target):
        """Start *target* in a daemon thread and return the thread."""
        thread = threading.Thread(target=target)
        thread.setDaemon(1)
        thread.start()
        return thread

    def start(self):
        """Open the serial port at 115200 baud and start the worker threads.

        Returns:
            True if the port is open and the threads were started, else False.
        """
        self.l_serial = serial.Serial()
        self.l_serial.port = self.port
        self.l_serial.baudrate = 115200
        self.l_serial.timeout = 2  # seconds
        self.l_serial.open()

        if not self.l_serial.isOpen():
            return False

        self.waitEnd = threading.Event()
        self.alive = True
        print('open serial port %d ok!\n' % (self.port + 1))
        print('baudrate:115200 \n')

        self.thread_read = self._spawn(self.FirstReader)
        self.thread_write = self._spawn(self.FirstWriter)
        # TCP part
        self.thread_TcpClient = self._spawn(self.TcpClient)
        self.thread_TcpSend = self._spawn(self.TcpSend)
        return True

    def FirstReader(self):
        """Poll the serial port and publish what was read in ``snddata``."""
        while self.alive:
            time.sleep(0.1)  # polling interval
            try:
                n = self.l_serial.inWaiting()
                if n:
                    data = self.l_serial.read(n)
                    print(u'->请求:')
                    print(data)
                    with mylock:
                        self.snddata = data
            except Exception as ex:
                print(str(ex))

        self.waitEnd.set()
        self.alive = False

    def FirstWriter(self):
        """Write server data from ``rcvdata`` to the serial port."""
        while self.alive:
            time.sleep(0.1)  # polling interval
            try:
                # Take and clear the buffer in one locked step, so data that
                # arrives while the write is in progress is not wiped out.
                with mylock:
                    data, self.rcvdata = self.rcvdata, ''
                if data != '':
                    self.l_serial.write(data)
                    print(u'-<应答:')
                    print(data)
            except Exception as ex:
                print(str(ex))
        self.waitEnd.set()
        self.alive = False

    def TcpClient(self):
        """Keep a connection to the server and store received data in ``rcvdata``.

        Reconnects after the server closes the connection or a socket error
        occurs (either used to end this thread for good).
        """
        while self.alive:
            time.sleep(0.1)
            conn = socket(AF_INET, SOCK_STREAM)
            try:
                conn.connect((Server_IP, int(Server_Port)))
            except socket_error as ex:
                print(str(ex))
                conn.close()
                time.sleep(1)  # back off before the next attempt
                continue
            print('Connect to Server OK!')
            with mylock:
                # Start every connection with empty buffers.
                self.snddata = ''
                self.rcvdata = ''
            # Publish the socket only once it is connected.
            self.connection = conn
            try:
                while True:
                    data = conn.recv(1024)
                    # An empty read means the server closed the connection.
                    if not data:
                        break
                    with mylock:
                        self.snddata = ''
                        self.rcvdata = data
            except socket_error as ex:
                print(str(ex))
            finally:
                # Do not leave a closed socket visible to TcpSend.
                self.connection = None
                conn.close()

        self.waitEnd.set()
        self.alive = False

    def TcpSend(self):
        """Send serial data from ``snddata`` to the server."""
        while self.alive:
            time.sleep(0.1)
            conn = self.connection
            if conn is None:
                continue
            # Take and clear the buffers in one locked step, so data that
            # arrives while sending is not wiped out.
            with mylock:
                data = self.snddata
                if data != '':
                    self.snddata = ''
                    self.rcvdata = ''
            if data == '':
                continue
            try:
                # sendall() retries until everything is written; send() may
                # transmit only part of the data.
                conn.sendall(data)
            except socket_error as ex:
                print(str(ex))

    def stop(self):
        """Stop the serial threads and close the serial port."""
        self.alive = False
        for thread in (self.thread_read, self.thread_write):
            # Threads only exist after a successful start().
            if thread is not None and thread is not threading.current_thread():
                thread.join()
        if self.l_serial is not None and self.l_serial.isOpen():
            self.l_serial.close()


# test / command line section
if __name__ == '__main__':
    print('Serial to Tcp Tool 1.00\n')
    print('Author:yangyongzhen\n')
    print('QQ:534117529\n')
    print('Copyright (c) **cap 2015-2016.\n')

    Server_IP = raw_input('please enter ServerIP:')
    print('Server_IP: %s' % (Server_IP))
    Server_Port = raw_input('please enter ServerPort:')
    print('Server_Port: %s' % (Server_Port))
    com = raw_input('please enter com port(1-9):')
    rt = ComThread(int(com) - 1)
    try:
        if rt.start():
            rt.waiting()
            rt.stop()
    except Exception as se:
        print(str(se))

    if rt.alive:
        rt.stop()
    os.system("pause")

    print('')
    print('End OK .')
    del rt

远程读卡器server端

很早之前做过一个远程读卡器工具,原理就是在现场客服电脑上装个python做的tcpserver服务端,操控现场的读卡器。在公司内部做个客户端连接过去,这样实现在公司调试现场的卡片业务。

这个就是服务端工具的实现:

#coding=utf-8
#author:yangyongzhen
#QQ:534117529
#'CardTest TcpServer  - Simple Test Card Tool 1.00'
#
# Bridges a local serial port and a TCP server socket: bytes read from the
# serial port are forwarded to the connected TCP client, and bytes received
# from the TCP client are written to the serial port.  (Python 2 script.)

import sys
import threading
import time
import serial
import binascii
import encodings
import re
import os
from socket import *
from struct import *
#from myutil import *;
#name: myutil.py

# Guards ComThread.snddata / ComThread.rcvdata, which are shared by all
# worker threads.
mylock = threading.RLock()


def print_hex1(s, prev='0x'):
    """Print each byte of *s* as '<prev>hh', space separated, on one line."""
    print(' '.join('%s%02x' % (prev, ord(c)) for c in s))


def print_hex(s):
    """Print each byte of *s* as two hex digits, space separated, on one line."""
    print(' '.join('%02x' % (ord(c)) for c in s))


def hexto_str(s):
    """Return the bytes of *s* rendered as a lowercase hex string."""
    return ''.join('%02x' % (ord(c)) for c in s)


def strto_hex(s):
    """Return the raw bytes described by the hex string *s*."""
    return binascii.unhexlify(s)


class ComThread(object):
    """Relay data between one serial port and one TCP client.

    Four daemon threads cooperate through two shared buffers:
    ``snddata`` (serial -> TCP) and ``rcvdata`` (TCP -> serial).
    """

    def __init__(self, Port=0):
        """Prepare the relay for serial port index *Port* (0 based)."""
        self.l_serial = None
        self.alive = False
        self.waitEnd = None
        self.port = Port

        # Worker threads; created in start().
        self.thread_read = None
        self.thread_write = None
        self.thread_TcpServer = None
        self.thread_TcpSend = None

        # TCP part.  An empty host binds to all local interfaces; the port is
        # a non-reserved one.
        self.myHost = ''
        self.myPort = 5050
        self.sockobj = socket(AF_INET, SOCK_STREAM)
        self.connection = None

        # Shared buffers.
        self.snddata = ''
        self.rcvdata = ''

    def waiting(self):
        """Block until one of the serial workers signals that it has ended."""
        if self.waitEnd is not None:
            self.waitEnd.wait()

    def SetStopEvent(self):
        """Signal the end event and stop the relay."""
        if self.waitEnd is not None:
            self.waitEnd.set()
        self.alive = False
        self.stop()

    def _spawn(self, target):
        """Start *target* in a daemon thread and return the thread object."""
        worker = threading.Thread(target=target)
        worker.setDaemon(1)
        worker.start()
        return worker

    def start(self):
        """Open the serial port and start all workers.

        Returns True when the port is open and the workers are running,
        False otherwise.  Errors raised by pyserial while opening propagate.
        """
        self.l_serial = serial.Serial()
        self.l_serial.port = self.port
        self.l_serial.baudrate = 115200
        self.l_serial.timeout = 2  # seconds
        self.l_serial.open()

        if not self.l_serial.isOpen():
            return False

        self.waitEnd = threading.Event()
        self.alive = True
        print('open serial port %d ok!\n' % (self.port + 1))
        print('baudrate:115200 \n')

        self.thread_read = self._spawn(self.FirstReader)
        self.thread_write = self._spawn(self.FirstWriter)
        # TCP part
        self.thread_TcpServer = self._spawn(self.TcpServer)
        self.thread_TcpSend = self._spawn(self.TcpSend)
        return True

    def FirstReader(self):
        """Poll the serial port and publish whatever arrived in ``snddata``."""
        while self.alive:
            time.sleep(0.1)  # polling interval
            try:
                n = self.l_serial.inWaiting()
                if n:
                    data = self.l_serial.read(n)
                    print('serial recv:')
                    print(data)
                    with mylock:
                        self.snddata = data
            except Exception as ex:
                print(str(ex))

        self.waitEnd.set()
        self.alive = False

    def FirstWriter(self):
        """Write pending TCP data (``rcvdata``) to the serial port."""
        while self.alive:
            time.sleep(0.1)  # polling interval
            try:
                with mylock:
                    pending = self.rcvdata
                if pending != '':
                    self.l_serial.write(pending)
                    print('serial send:')
                    print(pending)
                    with mylock:
                        # Only clear what was actually written; data that
                        # arrived during the write must not be dropped.
                        if self.rcvdata is pending:
                            self.rcvdata = ''
            except Exception as ex:
                print(str(ex))

        self.waitEnd.set()
        self.alive = False

    def TcpServer(self):
        """Accept one client at a time and store what it sends in ``rcvdata``."""
        self.sockobj.bind((self.myHost, self.myPort))
        self.sockobj.listen(10)
        print('TcpServer listen at %d oK!\n' % self.myPort)
        print('Waiting for connect...\n')
        while self.alive:
            time.sleep(0.1)
            conn, address = self.sockobj.accept()
            print('Server connected by %s' % (address,))
            with mylock:
                # A new client must not see data left over from the last one.
                self.snddata = ''
                self.rcvdata = ''
                self.connection = conn
            try:
                while True:
                    data = conn.recv(1024)
                    if not data:
                        # Empty read: the client closed the connection.
                        break
                    with mylock:
                        self.snddata = ''
                        self.rcvdata = data
            except Exception as ex:
                print(str(ex))
            finally:
                conn.close()
                # Stop TcpSend from writing to the closed socket.
                self.connection = None

        self.waitEnd.set()
        self.alive = False

    def TcpSend(self):
        """Forward pending serial data (``snddata``) to the connected client."""
        while self.alive:
            time.sleep(0.1)  # polling interval
            with mylock:
                payload = self.snddata
            conn = self.connection
            if conn is None or payload == '':
                continue

            try:
                conn.sendall(payload)
                sent = True
            except Exception as ex:
                # The payload is dropped below: retrying on a dead socket
                # would fail forever, and TcpServer discards stale data on
                # the next accept anyway.
                print('tcp send failed: %s' % ex)
                sent = False

            with mylock:
                if self.snddata is payload:
                    self.snddata = ''
                if sent:
                    # NOTE(review): the original also clears the pending
                    # TCP->serial data after a successful send; kept as is,
                    # confirm this is intended.
                    self.rcvdata = ''

    def stop(self):
        """Stop the workers and close the serial port."""
        self.alive = False
        if self.thread_read is not None:
            self.thread_read.join()
        if self.l_serial is not None and self.l_serial.isOpen():
            self.l_serial.close()


# Test / command-line entry point.
if __name__ == '__main__':
    print('CardTest TcpServer  - Simple Test Card Tool 1.00\n')
    print('Author:yangyongzhen\n')
    print('QQ:534117529\n')
    print('Copyright (c) **** 2015-2016.\n')

    com = raw_input('please enter com port(1-9):')
    rt = ComThread(int(com) - 1)
    try:
        if rt.start():
            rt.waiting()
            rt.stop()
    except Exception as se:
        print(str(se))

    if rt.alive:
        rt.stop()
    os.system("pause")

    print('')
    print('End OK .')
    del rt

黑客rtcp反向链接

# -*- coding: utf-8 -*-
'''
filename: rtcp.py
@desc: TCP port forwarding built on Python sockets, meant for remote
       maintenance.  If the remote end cannot be reached the script sleeps
       36 s and retries, at most 200 times (about two hours).
@usage: ./rtcp.py stream1 stream2
        A stream is either l:port or c:host:port
        l:port       listen on the given local port
        c:host:port  connect to the given remote host and port
        NOTE: the entry point below ignores the command line and asks for
        the server address and the local port interactively.
@author: watercloud, zd, knownsec team
@web: www.knownsec.com, blog.knownsec.com
@date: 2009-7
'''

import socket
import sys
import threading
import time

streams = [None, None]  # the two socket objects whose data is relayed to each other
debug = 1  # debug output: 0 = off, 1 = on


def print_hex(s):
    '''Print each byte of *s* as two hex digits, space separated, on one line.'''
    print(' '.join('%02x' % (ord(c)) for c in s))


def _usage():
    '''Print the command-line usage text.'''
    print('Usage: ./rtcp.py stream1 stream2\nstream : L:port  or C:host:port')


def _get_another_stream(num):
    '''
    Return the stream object of the other side, waiting until it exists.

    num is the caller's own stream number (0 or 1); any other value raises
    ValueError.  If the other side has given up ('quit'), the calling thread
    exits via sys.exit(1).
    '''
    if num not in (0, 1):
        raise ValueError("ERROR")
    other = 1 - num

    while True:
        peer = streams[other]
        if peer == 'quit':
            print("can't connect to the target, quit now!")
            sys.exit(1)

        if peer is not None:
            return peer
        time.sleep(1)


def _close_quietly(sock):
    '''Shut down and close *sock*, ignoring errors (best-effort cleanup).'''
    try:
        sock.shutdown(socket.SHUT_RDWR)
    except Exception:
        # Already disconnected; close() below must still run to free the fd.
        pass
    try:
        sock.close()
    except Exception:
        pass


def _xstream(num, s1, s2):
    '''
    Copy everything read from s1 to s2 until s1 is closed or an error occurs,
    then close both sockets and clear both stream slots.

    num is the current stream number; it is only used to tell the two
    directions apart in the log output.
    '''
    try:
        while True:
            # recv blocks until the peer has fully closed; shutdown is the
            # fastest way to make that happen.
            buff = s1.recv(1024)
            if debug > 0:
                print('%s recv' % num)
            if len(buff) == 0:  # peer closed the connection, nothing to read
                print('%s one closed' % num)
                break
            s2.sendall(buff)
            if debug > 0:
                print('%s sendall' % num)
                print_hex(buff)
    except Exception:
        print('%s one connect closed.' % num)

    _close_quietly(s1)
    _close_quietly(s2)

    streams[0] = None
    streams[1] = None
    print('%s CLOSED' % num)


def _server(port, num):
    '''
    Listen on *port* and relay every accepted connection to the other stream.
    num is this side's stream number (0 or 1).
    '''
    srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    srv.bind(('0.0.0.0', port))
    srv.listen(1)
    while True:
        conn, addr = srv.accept()
        print('connected from: %s' % (addr,))
        streams[num] = conn  # publish this side's stream
        s2 = _get_another_stream(num)  # wait for the other side's stream
        _xstream(num, conn, s2)


def _connect(host, port, num):
    '''
    Connect to host:port and relay the connection to the other stream.
    num is this side's stream number (0 or 1).
    @note: on failure sleep 36 s and retry, at most 200 times (about two
           hours); after that the slot is marked 'quit' and None is returned.
    '''
    not_connet_time = 0
    wait_time = 36
    try_cnt = 199
    while True:
        if not_connet_time > try_cnt:
            streams[num] = 'quit'
            print('not connected')
            return None

        conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            conn.connect((host, port))
        except Exception:
            print('can not connect %s:%s!' % (host, port))
            conn.close()  # do not leak one socket per failed attempt
            not_connet_time += 1
            time.sleep(wait_time)
            continue

        print("connected to %s:%i" % (host, port))
        streams[num] = conn  # publish this side's stream
        s2 = _get_another_stream(num)  # wait for the other side's stream
        _xstream(num, conn, s2)


if __name__ == '__main__':
    print('Tcp to Tcp Tool 1.00\n')
    print('Author:yangyongzhen\n')
    print('QQ:534117529\n')
    print('Copyright (c) Newcapec 2015-2016.\n')

    Server_IP = raw_input('please enter Server IP:')
    print('Server_IP: %s' % (Server_IP))
    Server_Port = raw_input('please enter Server Port:')
    print('Server_Port: %s' % (Server_Port))
    com = raw_input('please enter Local Port:')

    # One thread per direction: stream 0 listens locally, stream 1 connects
    # to the remote server.
    tlist = [
        threading.Thread(target=_server, args=(int(com), 0)),
        threading.Thread(target=_connect, args=(Server_IP, int(Server_Port), 1)),
    ]

    for t in tlist:
        t.start()
    for t in tlist:
        t.join()
    sys.exit(0)

调用c的动态库示例

# -*- coding:utf8 -*-
# Example: call a function exported by a C dynamic library through ctypes.
# (Python 2 script.)
from ctypes import *
from binascii import unhexlify as unhex
import os
import subprocess

dll = cdll.LoadLibrary('mydll.dll')
print('begin load mydll..')

# key (alternative value kept from the original:
#      '\x9B\xED\x98\x89\x15\x80\xC3\xB2')
str1 = unhex('0000556677222238')
# data
str2 = unhex('002d2000000100015566772222383CD881604D0D286A556677222238000020141214181427')

# Inputs are only read by the DLL, so pointers to the Python strings suffice.
pstr1 = c_char_p(str1)
pstr2 = c_char_p(str2)
# output: the DLL writes into this buffer, so it must be mutable memory and
# not a Python string (strings are immutable and may be shared).
pstr3 = create_string_buffer('\x12\x34\x56\x78\x12\x34\x56\x78', 8)

dll.CurCalc_DES_MAC64(805306481, pstr1, 0, pstr2, 13, pstr3)
print(pstr1)
print(pstr2)
print(repr(pstr3.raw))

# .raw returns all 8 bytes; .value would stop at the first NUL byte and
# truncate a result that happens to contain 0x00.
# NOTE(review): assumes the DLL fills the whole 8-byte buffer -- confirm
# against the DLL's documentation.
stro = pstr3.raw
print(stro)
hex_parts = []
for c in stro:
    print("%02x" % (ord(c)))
    hex_parts.append("{0:02x}".format(ord(c)))
strtemp = ''.join(hex_parts)
print(strtemp)

# os.execlp would replace this process, so the prompt below would never be
# shown; run the tool as a child process and wait for it instead.
subprocess.call(['E:\\RSA.exe'])
s = raw_input('press any key to continue...')

tcp的socket连接报文测试工具

# -*- coding: utf-8 -*-
# TCP message test tool: send two hex-encoded packets to a server and dump
# the replies.  (Python 2 script.)
import socket
from myutil import *
from binascii import unhexlify as unhex
from ctypes import *

dll = cdll.LoadLibrary('mydll.dll')
print('begin load mydll..')

HOST, PORT = "192.168.51.28", 5800
sd = "1234567812345678"
# NOTE(review): the original used sd2 without ever defining it (NameError).
# The first payload is reused so the script runs; replace it with the real
# second packet.
sd2 = sd

# Create a socket (SOCK_STREAM means a TCP socket)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    # Connect to server and send data
    sock.connect((HOST, int(PORT)))
    sock.sendall(unhex(sd))
    print("Sent1 OK:")
    print(sd)

    # Receive the first reply and dump it
    received = sock.recv(1024)
    print("Received:")
    print_hex(received)
    print('received len is 0x%02x' % (len(received)))
    print('received data analysis...')
    # Byte ranges of the reply that are dumped one per line.
    for begin, end in ((0, 4), (4, 6), (6, 10), (10, 16)):
        print_hex(received[begin:end])

    # pack2 send
    sock.sendall(unhex(sd2))
    print("Sent2 OK:")
    print(sd2)

    # Receive the second reply and dump it
    received1 = sock.recv(1024)
    print("Received1:")
    print_hex(received1)
    print('received1 len is 0x%02x' % (len(received1)))
finally:
    sock.close()

s = raw_input('press any key to continue...')

报文拼接与加解密测试

# -*- coding: gb2312 -*-
# Packet assembly and MAC test: build two hex packets, let mydll compute the
# MAC of each, append it, send both packets over TCP and dump the replies.
# (Python 2 script.)
import socket
from myutil import *
from binascii import unhexlify as unhex
from ctypes import *

dll = cdll.LoadLibrary('mydll.dll')
print('begin load mydll..')

# key
key = '\xF1\xE2\xD3\xC4\xF1\xE2\xD3\xC4'
pkey = c_char_p(key)
# Output MAC buffer, 8 bytes, initially zero.  It has to be mutable memory
# because the DLL writes into it; the same buffer is reused for both
# calculations, as in the original.
pmac = create_string_buffer(8)


def calc_mac_hex(payload, length):
    """Return the MAC the DLL computes over *length* bytes of *payload*, as lowercase hex."""
    dll.CurCalc_DES_MAC64(805306481, pkey, 0, c_char_p(payload), length, pmac)
    # .raw keeps all 8 bytes; .value would stop at the first NUL byte.
    return ''.join("{0:02x}".format(ord(c)) for c in pmac.raw)


# pack1
class pack:
    pass

pk = pack()
pk.len = '00000032'
pk.ID = '0001'
pk.slnum = '00000004'
pk.poscode = '123456781234'
pk.rand = '1122334455667788'
pk.psam = '313233343536'
pk.kind = '0000'
pk.ver = '000001'
pk.time = '20140805135601'
pk.mac = '06cc571e6d96e12d'

body1 = (pk.len + pk.ID + pk.slnum + pk.poscode + pk.rand + pk.psam +
         pk.kind + pk.ver + pk.time)
# calc MAC
pk.mac = calc_mac_hex(unhex(body1), 42)
# data to send
sd = body1 + pk.mac
print('send1 len is 0x%02x' % (len(sd) // 2))
print(sd)


# pack2
class pack2:
    pass

pk2 = pack2()
pk2.len = '0000006E'
pk2.ID = '0012'
pk2.slnum = '00000005'
pk2.fatCode = '00'
pk2.cardASN = '0000000000000000'
pk2.cardType = '00'
pk2.userNO = '0000000000000000'
pk2.fileName1 = '00000000000000000000000000000015'
pk2.dataLen1 = '00'
pk2.dataArea1 = '00000000000000319999990800FB2014080620240806FFFFFFFFFFFFFFFFFFFF'
pk2.fileName2 = '00000000000000000000000000000016'
pk2.dataLen2 = '00'
pk2.dataArea2 = '000003E800FFFF16'
pk2.mac = '06cc571e6d96e12d'

body2 = (pk2.len + pk2.ID + pk2.slnum + pk2.fatCode + pk2.cardASN +
         pk2.cardType + pk2.userNO + pk2.fileName1 + pk2.dataLen1 +
         pk2.dataArea1 + pk2.fileName2 + pk2.dataLen2 + pk2.dataArea2)
# calc MAC
pk2.mac = calc_mac_hex(unhex(body2), 102)
# data to send
sd2 = body2 + pk2.mac
print('send2 len is 0x%02x' % (len(sd2) // 2))
print(sd2)

#PORT="192.168.60.37"
#PORT="localhost"
HOST, PORT = "192.168.51.28", 5800
# Create a socket (SOCK_STREAM means a TCP socket)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    # Connect to server and send data
    sock.connect((HOST, int(PORT)))
    sock.sendall(unhex(sd))
    print("Sent1 OK:")
    print(sd)

    # Receive the first reply and dump it
    received = sock.recv(1024)
    print("Received:")
    print_hex(received)
    print('received len is 0x%02x' % (len(received)))
    print('received data analysis...')
    # Byte ranges of the reply that are dumped one per line.
    for begin, end in ((0, 4), (4, 6), (6, 10), (10, 16)):
        print_hex(received[begin:end])

    # pack2 send
    sock.sendall(unhex(sd2))
    print("Sent2 OK:")
    print(sd2)

    # Receive the second reply and dump it
    received1 = sock.recv(1024)
    print("Received1:")
    print_hex(received1)
    print('received1 len is 0x%02x' % (len(received1)))
finally:
    sock.close()

s = raw_input('press any key to continue...')

二进制文件解析工具

# -*- coding: utf-8 -*-
# Binary file parser: dump the 280-byte records of rec04.bin as hex, then
# print selected byte ranges of the last record read.  (Python 2 script.)
from myutil import *
from binascii import unhexlify as unhex
import os

RECORD_SIZE = 280    # bytes per record
RECORD_COUNT = 34    # maximum number of records to read
recstatadd = 187     # base offset used by several of the fields below

# (label, start, end) byte ranges printed for the last record, in output order.
FIELDS = [
    ("终端编号:", recstatadd, recstatadd + 10),
    ("卡号长度:", 10, 11),
    ("卡号:    ", 11, 11 + 10),
    ("持卡序号1+所属地城市代码2+交易地城市代码2", recstatadd + 22, recstatadd + 22 + 5),
    ("应用交易计数器", 92, 92 + 2),
    ("交易前余额4,交易金额3", recstatadd + 29, recstatadd + 29 + 7),
    ("交易日期:", 99, 99 + 3),
    ("交易时间:", 44, 44 + 3),
    ("终端编号", 21, 21 + 8),
    ("商户编号", 21 + 8, 21 + 8 + 15),
    ("批次号", 5, 5 + 3),
    ("应用密文", 47, 47 + 8),
    ("授权金额", 103, 103 + 6),
    ("其他金额", 115, 115 + 6),
    ("终端验证结果", 94, 5 + 94),
    ("应用交易计数器", 92, 92 + 4),
    ("卡片验证结果", 56, 56 + 32),
    ("卡片序列号:", 131, 132),
]

path = os.path.join(os.getcwd(), 'rec04.bin')
#print  path
print("begin ans......")

s = ''
# 'with' closes the file even when reading or printing fails.
with open(path, 'rb') as f1:
    for i in range(1, RECORD_COUNT + 1):
        chunk = f1.read(RECORD_SIZE)
        if not chunk:
            # End of file: keep the last record that was actually read
            # instead of overwriting it with an empty string.
            break
        s = chunk
        print('data: %s' % i)
        print_hex(s)

if not s:
    print('no data read from %s' % path)
else:
    print('read data is:')
    print_hex(s)
    for label, begin, end in FIELDS:
        print(label)
        # Slices never raise on a short record, unlike single-byte indexing.
        print_hex(s[begin:end])

抓取动漫图片

# -*- coding:utf8 -*-
# 2013.12.36 19:41
# Download the images of the baozoumanhua.com "hot" pages.
# (The original header named dbmei.com; the URL below is what is fetched.)
# (Python 2 script.)

from bs4 import BeautifulSoup
import os
import sys
import urllib2
import time
import random

# Create the output folder inside the current working directory.
path = os.getcwd()
new_path = os.path.join(path, u'暴走漫画')
if not os.path.isdir(new_path):
    os.mkdir(new_path)


def page_loop(page=1):
    """Download every image of page *page* and of all following pages.

    Stops at the first page that contains no 'img-wrap' element.  The loop
    replaces the original self-recursion, which never terminated on its own
    and would eventually hit the interpreter's recursion limit.
    """
    page = int(page)
    while True:
        url = 'http://baozoumanhua.com/all/hot/page/%s?sv=1389537379' % page
        soup = BeautifulSoup(urllib2.urlopen(url))

        wrappers = soup.find_all('div', class_='img-wrap')
        if not wrappers:
            break

        for wrapper in wrappers:
            img = wrapper.find('img')
            link = img.get('src') if img is not None else None
            if not link:
                # A wrapper without an <img src=...> has nothing to download.
                continue
            print(link)
            data = urllib2.urlopen(link).read()

            # The last 11 characters of the URL serve as the file name.
            with open(os.path.join(new_path, link[-11:]), 'wb') as out:
                out.write(data)

        page += 1
        print(u'开始抓取下一页')
        print('the %s page' % page)


page_loop()

抓取网站模板

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# by yangyongzhen
# 2016-12-06
#
# Grab a website template: collect the link/script/a URLs of one page and
# download each of them below rootpath.  (Python 2 script.)

from bs4 import BeautifulSoup
import urllib
import urllib2
import os
import time
import re

rootpath = os.getcwd()+u'/抓取的模板/'

# A reference containing 'http' anywhere is treated as absolute.
_ABSOLUTE = re.compile(r'http')


def makedir(path):
    """Create directory *path* (including parents) unless it already exists."""
    if not os.path.isdir(path):
        os.makedirs(path)


# Create the root directory for the downloaded files.
makedir(rootpath)


def Schedule(a, b, c):
    """Progress hook for urllib.urlretrieve: print the percentage done.

    a: number of blocks downloaded so far
    b: size of one block
    c: size of the remote file
    """
    if c <= 0:
        # Size unknown or empty file: no percentage can be computed.
        return
    per = min(100.0 * a * b / c, 100)
    print('%.2f%%' % per)


def _collect(soup, tag, attr, url, listhref, out):
    """Append the *attr* URL of every *tag* element to listhref and to file *out*."""
    pat = re.compile(r'%s="([^"]*)"' % attr)
    for item in soup.findAll(tag):
        found = pat.search(str(item))
        if found is None:
            # Element without that attribute: skip it rather than crash or
            # reuse the URL of the previous element.
            continue
        href = found.group(1)
        ans = href if _ABSOLUTE.search(href) else url + href
        listhref.append(ans)
        out.write(ans)
        out.write('\r\n')
        print(ans)


def grabHref(url, listhref, localfile):
    """Collect the resource URLs of page *url*.

    Every href of <link> and <a> and every src of <script> is appended to
    *listhref* (relative references are prefixed with *url*), written to
    *localfile* one per line, and printed.
    """
    html = urllib2.urlopen(url).read()
    html = unicode(html, 'gb2312', 'ignore').encode('utf-8', 'ignore')
    soup = BeautifulSoup(html)

    with open(localfile, 'w') as myfile:
        _collect(soup, 'link', 'href', url, listhref, myfile)
        _collect(soup, 'script', 'src', url, listhref, myfile)
        _collect(soup, 'a', 'href', url, listhref, myfile)


def main():
    """Collect the URLs of the configured page and download all of them."""
    url = "http://192.168.72.140/qdkj/"   # page to grab
    listhref = []                         # collected URLs
    localfile = 'ahref.txt'               # file the URL list is saved to
    grabHref(url, listhref, localfile)
    listhref = list(set(listhref))        # drop duplicate URLs

    start = time.clock()
    for item in listhref:
        parts = item.split('/')
        # A URL that ends in '/' has no file name; without a fallback the
        # download target would be the directory itself.
        name = parts[-1] or 'index.html'
        # Mirror the URL's directory part (after scheme and host) below rootpath.
        curpath = rootpath + ''.join(part + '/' for part in parts[3:-1])
        print(curpath)
        makedir(curpath)
        local = curpath + name
        try:
            urllib.urlretrieve(item, local, Schedule)  # download to disk
        except IOError as ex:
            # One broken link must not abort the remaining downloads.
            print('download failed: %s (%s)' % (item, ex))

    end = time.clock()
    print(u'模板抓取完成!')
    print(u'一共用时: %s 秒' % (end - start))


if __name__ == "__main__":
    main()

总结

到此这篇关于python常用小脚本的文章就介绍到这了,更多相关python常用小脚本内容请搜索wanshiok.com以前的文章或继续浏览下面的相关文章,希望大家以后多多支持wanshiok.com!


详解Python单元测试的两种写法
如何使用python对图片进行批量压缩详解
51自学网,即我要自学网,自学EXCEL、自学PS、自学CAD、自学C语言、自学css3实例,是一个通过网络自主学习工作技能的自学平台,网友喜欢的软件自学网站。
京ICP备13026421号-1