这篇教程python常用小脚本实例总结写得很实用,希望能帮到您。
前言日常生活中常会遇到一些小任务,如果人工处理会很麻烦。 用python做些小脚本处理,能够提高不少效率。或者可以把python当工具使用,辅助提高一下办公效率。(比如我常拿python当计算器,计算和字符转换用) 以下总结下个人用到的一些python小脚本留作备忘。
打印16进制字符串用途:通信报文中的hex数据不好看,可以打印为16进制的字符串显示出来。 #coding=utf-8#name: myutil.pydef print_hex1(s,prev='0x'): for c in s: print '%s%02x' %(prev,ord(c)), printdef print_hex(s): for c in s: print '%02x' %(ord(c)), print print 'myutil' def print_hex3(s,prev='0x'): i = 0 for c in s: print '%s%s,' %(prev,s[i:i+2]), i += 2 print
文件合并之前搞单片机时生成的hex应用程序文件不能直接刷到单片机里,还需要把iap程序合并成一个文件才能烧写到单片机。每次打包麻烦,做个脚本处理: #path='C://Users//test//IAP_CZ_v204w.hex'#file=open(path,'r')#for ll in file.readlines()# print ll#coding=gb18030import timeimport osdef prr(): print 'file combination begin..' path0=os.getcwd()print path0path=path0#path1=path0path2=path0path+='//IAP_CZ_v204w.hex'#path1+='//NC_armStaSystem.hex'path2+='//'print paths=raw_input('enter file path:')path1=s#path1+='//NC_armStaSystem.hex'print path1s=raw_input('enter file name:')path2+=spath2+=time.strftime('_%y%m%d%H%M%S')path2+='.hex'print path2prr()try: f1=open(path,'r') count=0 for l in f1.readlines(): # print l count+=1 #print count f1.close() f1=open(path,'r') f2=open(path1,'r') f3=open(path2,'w') while(count>1): l=f1.readline() # print l f3.write(l) count-=1 # print count f3.flush() for l in f2.readlines(): f3.write(l) f3.flush() f3.close() print 'combination success!'except Exception,ex: print 'excettion occured!' print ex s=raw_input('press any key to continue...') finally: f1.close() f2.close() s=raw_input('press any key to continue...')
多线程下载图集网上好看的动漫图集,如果手工下载太费时了。简单分析下网页地址规律,写个多线程脚本搞定。 #!/usr/bin/python# -*- coding: utf-8 -*-# filename: paxel.py '''It is a multi-thread downloading tool It was developed follow axel. Author: volans E-mail: volansw [at] gmail.com''' import sysimport osimport timeimport urllibfrom threading import Thread local_proxies = {'http': 'http://131.139.58.200:8080'} class AxelPython(Thread, urllib.FancyURLopener): '''Multi-thread downloading class. run() is a vitural method of Thread. ''' def __init__(self, threadname, url, filename, ranges=0, proxies={}): Thread.__init__(self, name=threadname) urllib.FancyURLopener.__init__(self, proxies) self.name = threadname self.url = url self.filename = filename self.ranges = ranges self.downloaded = 0 def run(self): '''vertual function in Thread''' try: self.downloaded = os.path.getsize( self.filename ) except OSError: #print 'never downloaded' self.downloaded = 0 # rebuild start poind self.startpoint = self.ranges[0] + self.downloaded # This part is completed if self.startpoint >= self.ranges[1]: print 'Part %s has been downloaded over.' % self.filename return self.oneTimeSize = 16384 #16kByte/time print 'task %s will download from %d to %d' % (self.name, self.startpoint, self.ranges[1]) self.addheader("Range", "bytes=%d-%d" % (self.startpoint, self.ranges[1])) self.urlhandle = self.open( self.url ) data = self.urlhandle.read( self.oneTimeSize ) while data: filehandle = open( self.filename, 'ab+' ) filehandle.write( data ) filehandle.close() self.downloaded += len( data ) #print "%s" % (self.name) #progress = u'/r...' data = self.urlhandle.read( self.oneTimeSize ) def GetUrlFileSize(url, proxies={}): urlHandler = urllib.urlopen( url, proxies=proxies ) headers = urlHandler.info().headers length = 0 for header in headers: if header.find('Length') != -1: length = header.split(':')[-1].strip() length = int(length) return length def SpliteBlocks(totalsize, blocknumber): blocksize = totalsize/blocknumber ranges = [] for i in range(0, blocknumber-1): ranges.append((i*blocksize, i*blocksize +blocksize - 1)) ranges.append(( blocksize*(blocknumber-1), totalsize -1 )) return rangesdef islive(tasks): for task in tasks: if task.isAlive(): return True return False def paxel(url, output, blocks=6, proxies=local_proxies): ''' paxel ''' size = GetUrlFileSize( url, proxies ) ranges = SpliteBlocks( size, blocks ) threadname = [ "thread_%d" % i for i in range(0, blocks) ] filename = [ "tmpfile_%d" % i for i in range(0, blocks) ] tasks = [] for i in range(0,blocks): task = AxelPython( threadname[i], url, filename[i], ranges[i] ) task.setDaemon( True ) task.start() tasks.append( task ) time.sleep( 2 ) while islive(tasks): downloaded = sum( [task.downloaded for task in tasks] ) process = downloaded/float(size)*100 show = u'/rFilesize:%d Downloaded:%d Completed:%.2f%%' % (size, downloaded, process) sys.stdout.write(show) sys.stdout.flush() time.sleep( 0.5 ) filehandle = open( output, 'wb+' ) for i in filename: f = open( i, 'rb' ) filehandle.write( f.read() ) f.close() try: os.remove(i) pass except: pass filehandle.close() if __name__ == '__main__': url = "http://xz1.mm667.com/xz84/images/001.jpg" output = '001.jpg' paxel( url, output, blocks=4, proxies={} )
多线程下载图片多线程下载图片并存储到指定目录中,若目录不存在则自动创建。 # -*- coding: UTF-8 -*-'''import reimport urlliburls='http://xz5.mm667.com/xz82/images/01.jpg'def getHtml(url): page = urllib.urlopen(url) html = page.read() return htmldef getImg(html): reg = r'src="(.+?/.jpg)" pic_ext' imgre = re.compile(reg) imglist = imgre.findall(html) x = 0 for imgurl in imglist: urllib.urlretrieve(imgurl,'%s.jpg' % x) x = x + 1html = getHtml("http://tieba.baidu.com/p/2460150866")getImg(html)'''import reimport urllibimport threadingimport timeimport socketsocket.setdefaulttimeout(30)urls=[]j=0for i in xrange(1,81): if (i-1)%4 == 0: j += 1 if ((j-1)%5) == 0 : j=1 site='http://xz%d.mm667.com/xz%02d/images/' %(j,i) urls.append(site) print urls[i-1]#print urls''' urls.append('http://xz1.mm667.com/xz01/images/')urls.append('http://xz1.mm667.com/xz02/images/')urls.append('http://xz1.mm667.com/xz03/images/')urls.append('http://xz1.mm667.com/xz04/images/')urls.append('http://xz1.mm667.com/xz84/images/')urls.append('http://xz2.mm667.com/xz85/images/')urls.append('http://xz3.mm667.com/xz86/images/')urls.append('http://xz1.mm667.com/s/')urls.append('http://xz1.mm667.com/p/')'''def mkdir(path): # 引入模块 import os # 去除首位空格 path=path.strip() # 去除尾部 / 符号 path=path.rstrip("//") # 判断路径是否存在 # 存在 True # 不存在 False isExists=os.path.exists(path) # 判断结果 if not isExists: # 如果不存在则创建目录 print path+u' 创建成功' # 创建目录操作函数 os.makedirs(path) return True else: # 如果目录存在则不创建,并提示目录已存在 print path+u' 目录已存在' return False def cbk(a,b,c): '''''回调函数 @a: 已经下载的数据块 @b: 数据块的大小 @c: 远程文件的大小 ''' per = 100.0 * a * b / c if per > 100: per = 100 print '%.2f%%' % per #url = 'http://www.sina.com.cn'local = 'd://mysite//pic1//'d=0mutex = threading.Lock()# mutex1 = threading.Lock()class MyThread(threading.Thread): def __init__(self, url, name): threading.Thread.__init__(self) self.url=url self.name=name def run(self): mutex.acquire() print print 'down from %s' % self.url time.sleep(1) mutex.release() try: urllib.urlretrieve(self.url, self.name) except Exception,e: print e time.sleep(1) urllib.urlretrieve(self.url, self.name)threads=[] for u in urls[84:]: d += 1 local = 'd://mysite//pic1//%d//' %d mkdir(local) print 'download begin...' for i in xrange(40): lcal = local url=u url += '%03d.jpg' %i lcal += '%03d.jpg' %i th = MyThread(url,lcal) threads.append(th) th.start()# for t in threads:# t.join()print 'over! download finished'
爬虫抓取信息#!/usr/bin/env python# -*- coding:utf-8 -*-"""Python爬虫,抓取一卡通相关企业信息Anthor: yangyongzhenVersion: 0.0.2Date: 2014-12-14Language: Python2.7.5Editor: Sublime Text2""" import urllib2, re, stringimport threading, Queue, timeimport sysimport osfrom bs4 import BeautifulSoup#from pprint import pprint reload(sys)sys.setdefaultencoding('utf8')_DATA = []FILE_LOCK = threading.Lock()SHARE_Q = Queue.Queue() #构造一个不限制大小的的队列_WORKER_THREAD_NUM = 3 #设置线程的个数 _Num = 0 #总条数class MyThread(threading.Thread) : def __init__(self, func,num) : super(MyThread, self).__init__() #调用父类的构造函数 self.func = func #传入线程函数逻辑 self.thread_num = num def run(self) : self.func() #print u'线程ID:',self.thread_num def worker() : global SHARE_Q while not SHARE_Q.empty(): url = SHARE_Q.get() #获得任务 my_page = get_page(url) find_data(my_page) #获得当前页面的数据 #write_into_file(temp_data) time.sleep(1) SHARE_Q.task_done() def get_page(url) : """ 根据所给的url爬取网页HTML Args: url: 表示当前要爬取页面的url Returns: 返回抓取到整个页面的HTML(unicode编码) Raises: URLError:url引发的异常 """ try : html = urllib2.urlopen(url).read() my_page = html.decode("gbk",'ignore') #my_page = unicode(html,'utf-8','ignore').encode('utf-8','ignore') #my_page = urllib2.urlopen(url).read().decode("utf8") except urllib2.URLError, e : if hasattr(e, "code"): print "The server couldn't fulfill the request." print "Error code: %s" % e.code elif hasattr(e, "reason"): print "We failed to reach a server. Please check your url and read the Reason" print "Reason: %s" % e.reason return my_page def find_data(my_page) : """ 通过返回的整个网页HTML, 正则匹配名称 Args: my_page: 传入页面的HTML文本用于正则匹配 """ global _Num temp_data = [] items = BeautifulSoup(my_page).find_all("div", style="width:96%;margin:10px;border-bottom:1px #CCC dashed;padding-bottom:10px;") for index, item in enumerate(items) : #print item #print item.h1 #print h.group() #temp_data.append(item) #print item.find(re.compile("^a")) href = item.find(re.compile("^a")) #soup = BeautifulSoup(item) #公司名称 if item.a: data = item.a.string.encode("gbk","ignore") print data temp_data.append(data) goods = item.find_all("div", style="font-size:12px;") #经营产品与联系方式 for i in goods: data = i.get_text().encode("gbk","ignore") temp_data.append(data) print data #b = item.find_all("b") #print b #链接地址 pat = re.compile(r'href="([^"]*)"') h = pat.search(str(item)) if h: #print h.group(0) href = h.group(1) print href temp_data.append(h.group(1)) _Num += 1 #b = item.find_all(text=re.compile("Dormouse")) #pprint(goods) #print href #pat = re.compile(r'title="([^"]*)"') #h = pat.search(str(href)) #if h: #print h.group(1) #temp_data.append(h.group(1)) _DATA.append(temp_data) #headers = {'User-Agent':"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/22.0.1207.1 Safari/537.1"}##浏览器请求头(大部分网站没有这个请求头会报错、请务必加上哦)#all_url = 'http://www.mzitu.com/all' ##开始的URL地址#start_html = requests.get(all_url, headers=headers) ##使用requests中的get方法来获取all_url(就是:http://www.mzitu.com/all这个地址)的内容 headers为上面设置的请求头、请务必参考requests官方文档解释#print(start_html.text) ##打印出start_html (请注意,concent是二进制的数据,一般用于下载图片、视频、音频、等多媒体内容是才使用concent, 对于打印网页内容请使用text) def main() : global SHARE_Q threads = [] start = time.clock() douban_url = "http://company.yktworld.com/comapny_search.asp?page={page}" #向队列中放入任务, 真正使用时, 应该设置为可持续的放入任务 for index in xrange(20) : SHARE_Q.put(douban_url.format(page = index * 1)) for i in xrange(_WORKER_THREAD_NUM) : thread = MyThread(worker,i) thread.start() #线程开始处理任务 threads.append(thread) for thread in threads : thread.join() SHARE_Q.join() i = 0 with open("down.txt", "w+") as my_file : for page in _DATA : i += 1 for name in page: my_file.write(name + "/n") print "Spider Successful!!!" end = time.clock() print u'抓取完成!' print u'总页数:',i print u'总条数:',_Num print u'一共用时:',end-start,u'秒' if __name__ == '__main__': main()
爬虫多线程下载电影名称#!/usr/bin/env python# -*- coding:utf-8 -*-"""Python爬虫Anthor: yangyongzhenVersion: 0.0.2Date: 2014-12-14Language: Python2.7.8Editor: Sublime Text2""" import urllib2, re, stringimport threading, Queue, timeimport sysimport osfrom bs4 import BeautifulSoup reload(sys)sys.setdefaultencoding('utf8')_DATA = []FILE_LOCK = threading.Lock()SHARE_Q = Queue.Queue() #构造一个不限制大小的的队列_WORKER_THREAD_NUM = 3 #设置线程的个数 rootpath = os.getcwd()+u'/抓取的内容/' def makedir(path): if not os.path.isdir(path): os.makedirs(path) #创建抓取的根目录#makedir(rootpath)#显示下载进度def Schedule(a,b,c): ''''' a:已经下载的数据块 b:数据块的大小 c:远程文件的大小 ''' per = 100.0 * a * b / c if per > 100 : per = 100 print '%.2f%%' % perclass MyThread(threading.Thread) : def __init__(self, func) : super(MyThread, self).__init__() #调用父类的构造函数 self.func = func #传入线程函数逻辑 def run(self) : self.func() def worker() : print 'work thread start.../n' global SHARE_Q while not SHARE_Q.empty(): url = SHARE_Q.get() #获得任务 my_page = get_page(url) find_title(my_page) #获得当前页面的电影名 #write_into_file(temp_data) time.sleep(1) SHARE_Q.task_done() def get_page(url) : """ 根据所给的url爬取网页HTML Args: url: 表示当前要爬取页面的url Returns: 返回抓取到整个页面的HTML(unicode编码) Raises: URLError:url引发的异常 """ try : html = urllib2.urlopen(url).read() my_page = html.decode("utf8") #my_page = unicode(html,'utf-8','ignore').encode('utf-8','ignore') #my_page = urllib2.urlopen(url).read().decode("utf8") except urllib2.URLError, e : if hasattr(e, "code"): print "The server couldn't fulfill the request." print "Error code: %s" % e.code elif hasattr(e, "reason"): print "We failed to reach a server. Please check your url and read the Reason" print "Reason: %s" % e.reason return my_page def find_title(my_page) : """ 通过返回的整个网页HTML, 正则匹配前100的电影名称 Args: my_page: 传入页面的HTML文本用于正则匹配 """ temp_data = [] movie_items = BeautifulSoup(my_page).findAll('h1') for index, item in enumerate(movie_items) : #print item #print item.h1 pat = re.compile(r'href="([^"]*)"') h = pat.search(str(item)) if h: #print h.group(0) href = h.group(1) print href temp_data.append(h.group(1)) #print h.group() #temp_data.append(item) #print item.find(re.compile("^a")) href = item.find(re.compile("^a")) #soup = BeautifulSoup(item) if item.a: #print item.a.string temp_data.append(item.a.string) #print href #pat = re.compile(r'title="([^"]*)"') #h = pat.search(str(href)) #if h: #print h.group(1) #temp_data.append(h.group(1)) _DATA.append(temp_data) def main() : global SHARE_Q threads = [] start = time.clock() douban_url = "http://movie.misszm.com/page/{page}" #向队列中放入任务, 真正使用时, 应该设置为可持续的放入任务 for index in xrange(5) : SHARE_Q.put(douban_url.format(page = index * 1)) for i in xrange(_WORKER_THREAD_NUM) : thread = MyThread(worker) thread.start() #线程开始处理任务 threads.append(thread) for thread in threads : thread.join() SHARE_Q.join() with open("movie.txt", "w+") as my_file : for page in _DATA : for movie_name in page: my_file.write(movie_name + "/n") print "Spider Successful!!!" end = time.clock() print u'抓取完成!' print u'一共用时:',end-start,u'秒' if __name__ == '__main__': main()
串口转tcp工具#coding=utf-8#author:yangyongzhen#QQ:534117529#'CardTest TcpServer - Simple Test Card Tool 1.00' import sys,threading,time;import serial;import binascii,encodings;import re;import os;from socket import *from struct import *;#from myutil import *;#name: myutil.py mylock = threading.RLock() Server_IP = ''Srever_Port = '' def print_hex1(s,prev='0x'): for c in s: print '%s%02x' %(prev,ord(c)), printdef print_hex(s): for c in s: print '%02x' %(ord(c)), print def hexto_str(s): r ='' for c in s: r += '%02x' %(ord(c)) return rdef strto_hex(s): r = s.decode('hex') return r#''代表服务器为localhost #在一个非保留端口号上进行监听 class ComThread: def __init__(self, Port=0): self.l_serial = None; self.alive = False; self.waitEnd = None; self.port = Port; #TCP部分 #self.sockobj = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.connection = None #数据 self.snddata = '' self.rcvdata = '' def waiting(self): if not self.waitEnd is None: self.waitEnd.wait(); def SetStopEvent(self): if not self.waitEnd is None: self.waitEnd.set(); self.alive = False; self.stop(); def start(self): self.l_serial = serial.Serial(); self.l_serial.port = self.port; self.l_serial.baudrate = 115200; self.l_serial.timeout = 2; #秒 self.l_serial.open(); if self.l_serial.isOpen(): self.waitEnd = threading.Event(); self.alive = True; print 'open serial port %d ok!/n' %(self.port+1) print 'baudrate:115200 /n' self.thread_read = None; self.thread_read = threading.Thread(target=self.FirstReader); self.thread_read.setDaemon(1); self.thread_read.start(); self.thread_write = None; self.thread_write = threading.Thread(target=self.FirstWriter); self.thread_write.setDaemon(1); self.thread_write.start(); #TCP部分 self.thread_TcpClient = None; self.thread_TcpClient = threading.Thread(target=self.TcpClient); self.thread_TcpClient.setDaemon(1); self.thread_TcpClient.start(); self.thread_TcpSend = None; self.thread_TcpSend = threading.Thread(target=self.TcpSend); self.thread_TcpSend.setDaemon(1); self.thread_TcpSend.start(); return True; else: return False; def FirstReader(self): while self.alive: # 接收间隔 time.sleep(0.1); try: data = ''; n = self.l_serial.inWaiting(); if n: data = data+self.l_serial.read(n); #for l in xrange(len(data)): #print '%02X' % ord(data[l]), # 发送数据 print u'->请求:' print data; mylock.acquire() self.snddata = data mylock.release() #print_hex(data); # 判断结束 except Exception, ex: print str(ex); self.waitEnd.set(); self.alive = False; def FirstWriter(self): while self.alive: # 接收间隔 time.sleep(0.1); try: #snddata = raw_input('/nenter data send:/n') if self.rcvdata!='': self.l_serial.write(self.rcvdata); print u'-<应答:' print self.rcvdata; mylock.acquire() self.rcvdata = ''; mylock.release() #print_hex(snddata); except Exception, ex: print str(ex); self.waitEnd.set(); self.alive = False; def TcpClient(self): while True: # 接收间隔 time.sleep(0.1); self.connection = socket(AF_INET, SOCK_STREAM); self.connection.connect((Server_IP, int(Server_Port))); print 'Connect to Server OK!'; self.snddata = '' self.rcvdata = '' while True: #读取客户端套接字的下一行 data = self.connection.recv(1024) #如果没有数量的话,那么跳出循环 if not data: break #发送一个回复至客户端 mylock.acquire() self.snddata = '' self.rcvdata = data mylock.release() #connection.send('Echo=>' + data) self.connection.close() self.waitEnd.set(); self.alive = False; def TcpSend(self): while True: # 接收间隔 time.sleep(0.1); while True: time.sleep(0.1); try: if not self.connection is None: if self.snddata != '': self.connection.send(self.snddata) mylock.acquire() self.rcvdata = '' self.snddata = '' mylock.release() except Exception, ex: pass def stop(self): self.alive = False; self.thread_read.join(); if self.l_serial.isOpen(): self.l_serial.close(); #测试用部分if __name__ == '__main__': print 'Serial to Tcp Tool 1.00/n' print 'Author:yangyongzhen/n' print 'QQ:534117529/n' print 'Copyright (c) **cap 2015-2016./n' Server_IP = raw_input('please enter ServerIP:') print 'Server_IP: %s' %(Server_IP) Server_Port = raw_input('please enter ServerPort:') print 'Server_Port: %s' %(Server_Port) com =raw_input('please enter com port(1-9):') rt = ComThread(int(com)-1); try: if rt.start(): rt.waiting(); rt.stop(); else: pass; except Exception,se: print str(se); if rt.alive: rt.stop(); os.system("pause") print ''; print 'End OK .'; del rt;
远程读卡器server端很早之前做过一个远程读卡器工具,原理就是在现场客服电脑上装个python做的tcpserver服务端,操控现场的读卡器。在公司内部做个客户端连接过去,这样实现在公司调试现场的卡片业务。 这个就是服务端工具的实现: #coding=utf-8#author:yangyongzhen#QQ:534117529#'CardTest TcpServer - Simple Test Card Tool 1.00' import sys,threading,time;import serial;import binascii,encodings;import re;import os;from socket import *from struct import *;#from myutil import *;#name: myutil.py mylock = threading.RLock() def print_hex1(s,prev='0x'): for c in s: print '%s%02x' %(prev,ord(c)), printdef print_hex(s): for c in s: print '%02x' %(ord(c)), print def hexto_str(s): r ='' for c in s: r += '%02x' %(ord(c)) return rdef strto_hex(s): r = s.decode('hex') return r#''代表服务器为localhost #在一个非保留端口号上进行监听 class ComThread: def __init__(self, Port=0): self.l_serial = None; self.alive = False; self.waitEnd = None; self.port = Port; #TCP部分 self.myHost = '' self.myPort = 5050 self.sockobj = socket(AF_INET, SOCK_STREAM) self.connection = None #数据 self.snddata = '' self.rcvdata = '' def waiting(self): if not self.waitEnd is None: self.waitEnd.wait(); def SetStopEvent(self): if not self.waitEnd is None: self.waitEnd.set(); self.alive = False; self.stop(); def start(self): self.l_serial = serial.Serial(); self.l_serial.port = self.port; self.l_serial.baudrate = 115200; self.l_serial.timeout = 2; #秒 self.l_serial.open(); if self.l_serial.isOpen(): self.waitEnd = threading.Event(); self.alive = True; print 'open serial port %d ok!/n' %(self.port+1) print 'baudrate:115200 /n' self.thread_read = None; self.thread_read = threading.Thread(target=self.FirstReader); self.thread_read.setDaemon(1); self.thread_read.start(); self.thread_write = None; self.thread_write = threading.Thread(target=self.FirstWriter); self.thread_write.setDaemon(1); self.thread_write.start(); #TCP部分 self.thread_TcpServer = None; self.thread_TcpServer = threading.Thread(target=self.TcpServer); self.thread_TcpServer.setDaemon(1); self.thread_TcpServer.start(); self.thread_TcpSend = None; self.thread_TcpSend = threading.Thread(target=self.TcpSend); self.thread_TcpSend.setDaemon(1); self.thread_TcpSend.start(); return True; else: return False; def FirstReader(self): while self.alive: # 接收间隔 time.sleep(0.1); try: data = ''; n = self.l_serial.inWaiting(); if n: data = data+self.l_serial.read(n); #for l in xrange(len(data)): #print '%02X' % ord(data[l]), # 发送数据 print 'serial recv:' print data; mylock.acquire() self.snddata = data mylock.release() #print_hex(data); # 判断结束 except Exception, ex: print str(ex); self.waitEnd.set(); self.alive = False; def FirstWriter(self): while self.alive: # 接收间隔 time.sleep(0.1); try: #snddata = raw_input('/nenter data send:/n') if self.rcvdata!='': self.l_serial.write(self.rcvdata); print 'serial send:' print self.rcvdata; mylock.acquire() self.rcvdata = ''; mylock.release() #print_hex(snddata); except Exception, ex: print str(ex); self.waitEnd.set(); self.alive = False; def TcpServer(self): self.sockobj.bind((self.myHost, self.myPort)) self.sockobj.listen(10) print 'TcpServer listen at 5050 oK!/n' print 'Waiting for connect.../n' while True: # 接收间隔 time.sleep(0.1); self.connection, address = self.sockobj.accept() print 'Server connected by', address self.snddata = '' self.rcvdata = '' try: while True: #读取客户端套接字的下一行 data = self.connection.recv(1024) #如果没有数量的话,那么跳出循环 if not data: break #发送一个回复至客户端 mylock.acquire() self.snddata = '' self.rcvdata = data mylock.release() #connection.send('Echo=>' + data) self.connection.close() except Exception, ex: self.connection.close() self.waitEnd.set(); self.alive = False; def TcpSend(self): while True: # 接收间隔 time.sleep(0.1); while True: time.sleep(0.1); try: if not self.connection is None: if self.snddata != '': self.connection.send(self.snddata) mylock.acquire() self.rcvdata = '' self.snddata = '' mylock.release() except Exception, ex: pass def stop(self): self.alive = False; self.thread_read.join(); if self.l_serial.isOpen(): self.l_serial.close(); #测试用部分if __name__ == '__main__': print 'CardTest TcpServer - Simple Test Card Tool 1.00/n' print 'Author:yangyongzhen/n' print 'QQ:534117529/n' print 'Copyright (c) **** 2015-2016./n' com =raw_input('please enter com port(1-9):') rt = ComThread(int(com)-1); try: if rt.start(): rt.waiting(); rt.stop(); else: pass; except Exception,se: print str(se); if rt.alive: rt.stop(); os.system("pause") print ''; print 'End OK .'; del rt;
黑客rtcp反向链接# -*- coding: utf-8 -*- '''filename:rtcp.py@desc:利用python的socket端口转发,用于远程维护如果连接不到远程,会sleep 36s,最多尝试200(即两小时)@usage:./rtcp.py stream1 stream2stream为:l:port或c:host:portl:port表示监听指定的本地端口c:host:port表示监听远程指定的端口@author: watercloud, zd, knownsec team@web: www.knownsec.com, blog.knownsec.com@date: 2009-7''' import socketimport sysimport threadingimport time streams = [None, None] # 存放需要进行数据转发的两个数据流(都是SocketObj对象)debug = 1 # 调试状态 0 or 1 def print_hex(s): for c in s: print '%02x' %(ord(c)), printdef _usage(): print 'Usage: ./rtcp.py stream1 stream2/nstream : L:port or C:host:port' def _get_another_stream(num): ''' 从streams获取另外一个流对象,如果当前为空,则等待 ''' if num == 0: num = 1 elif num == 1: num = 0 else: raise "ERROR" while True: if streams[num] == 'quit': print("can't connect to the target, quit now!") sys.exit(1) if streams[num] != None: return streams[num] else: time.sleep(1) def _xstream(num, s1, s2): ''' 交换两个流的数据 num为当前流编号,主要用于调试目的,区分两个回路状态用。 ''' try: while True: #注意,recv函数会阻塞,直到对端完全关闭(close后还需要一定时间才能关闭,最快关闭方法是shutdow) buff = s1.recv(1024) if debug > 0: print num,"recv" if len(buff) == 0: #对端关闭连接,读不到数据 print num,"one closed" break s2.sendall(buff) if debug > 0: print num,"sendall" print_hex(buff) except : print num,"one connect closed." try: s1.shutdown(socket.SHUT_RDWR) s1.close() except: pass try: s2.shutdown(socket.SHUT_RDWR) s2.close() except: pass streams[0] = None streams[1] = None print num, "CLOSED" def _server(port, num): ''' 处理服务情况,num为流编号(第0号还是第1号) ''' srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) srv.bind(('0.0.0.0', port)) srv.listen(1) #print 'local listening at port %d' (%(port)) while True: conn, addr = srv.accept() print "connected from:", addr streams[num] = conn # 放入本端流对象 s2 = _get_another_stream(num) # 获取另一端流对象 _xstream(num, conn, s2) def _connect(host, port, num): ''' 处理连接,num为流编号(第0号还是第1号) @note: 如果连接不到远程,会sleep 36s,最多尝试200(即两小时) ''' not_connet_time = 0 wait_time = 36 try_cnt = 199 while True: if not_connet_time > try_cnt: streams[num] = 'quit' print('not connected') return None conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: conn.connect((host, port)) except Exception, e: print ('can not connect %s:%s!' % (host, port)) not_connet_time += 1 time.sleep(wait_time) continue print "connected to %s:%i" % (host, port) streams[num] = conn #放入本端流对象 s2 = _get_another_stream(num) #获取另一端流对象 _xstream(num, conn, s2) if __name__ == '__main__': print 'Tcp to Tcp Tool 1.00/n' print 'Author:yangyongzhen/n' print 'QQ:534117529/n' print 'Copyright (c) Newcapec 2015-2016./n' Server_IP = raw_input('please enter Server IP:') print 'Server_IP: %s' %(Server_IP) Server_Port = raw_input('please enter Server Port:') print 'Server_Port: %s' %(Server_Port) com =raw_input('please enter Local Port:') tlist = [] # 线程列表,最终存放两个线程对象 #targv = [sys.argv[1], sys.argv[2] ] t = threading.Thread(target=_server, args=(int(com), 0)) tlist.append(t) t = threading.Thread(target=_connect, args=(Server_IP, int(Server_Port), 1)) tlist.append(t) for t in tlist: t.start() for t in tlist: t.join() sys.exit(0)
调用c的动态库示例# -*- coding:utf8 -*-from ctypes import *from binascii import unhexlify as unheximport osdll = cdll.LoadLibrary('mydll.dll'); print 'begin load mydll..'#key#str1='/x9B/xED/x98/x89/x15/x80/xC3/xB2'str1=unhex('0000556677222238')#datastr2=unhex('002d2000000100015566772222383CD881604D0D286A556677222238000020141214181427') #outputstr3='/x12/x34/x56/x78/x12/x34/x56/x78'pstr1=c_char_p()pstr2=c_char_p()pstr3=c_char_p()pstr1.value=str1pstr2.value=str2pstr3.value=str3dll.CurCalc_DES_MAC64(805306481,pstr1,0,pstr2,13,pstr3)print pstr1print pstr2print pstr3stro= pstr3.valueprint strostrtemp=''for c in stro: print "%02x" % (ord(c)) strtemp+="{0:02x}".format(ord(c))print strtempos.execlp("E://RSA.exe",'')s=raw_input('press any key to continue...')
tcp的socket连接报文测试工具# -*- coding: utf-8 -*-import socketfrom myutil import *from binascii import unhexlify as unhexfrom ctypes import *dll = cdll.LoadLibrary('mydll.dll')print 'begin load mydll..'HOST, PORT = "192.168.51.28", 5800sd ="1234567812345678"# Create a socket (SOCK_STREAM means a TCP socket)sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)try: # Connect to server and send data sock.connect((HOST, int(PORT)) print "Sent1 OK:" print sd # Receive data from the server and shut down received = sock.recv(1024) print "Received:" print_hex(received) print 'received len is 0x%02x' %(len(received)) print 'received data analysis...' re1=received[0:4] print_hex(re1) re1=received[4:6] print_hex(re1) re1=received[6:10] print_hex(re1) re1=received[10:16] print_hex(re1) #pack2 send sock.send(sd2.decode('hex')) print "Sent2 OK:" print sd2 # Receive data from the server and shut down received1 = sock.recv(1024) print "Received1:" print_hex(received1) print 'received1 len is 0x%02x' %(len(received1)) finally: sock.close() s=raw_input('press any key to continue...')
报文拼接与加解密测试# -*- coding: gb2312 -*-import socketfrom myutil import *from binascii import unhexlify as unhexfrom ctypes import *dll = cdll.LoadLibrary('mydll.dll')print 'begin load mydll..'#keykey='/xF1/xE2/xD3/xC4/xF1/xE2/xD3/xC4'#output MACmac='/x00'*8data='/x00'*8pkey=c_char_p()pdata=c_char_p()pmac=c_char_p()pkey.value=keypdata.value=datapmac.value=mac#pack1class pack: passpk=pack()pk.len='00000032'pk.ID='0001'pk.slnum='00000004'pk.poscode='123456781234'pk.rand='1122334455667788'pk.psam='313233343536'pk.kind='0000'pk.ver='000001'pk.time='20140805135601'pk.mac='06cc571e6d96e12d' data=unhex(pk.len+pk.ID+pk.slnum+pk.poscode+pk.rand+pk.psam+pk.kind+pk.ver+pk.time)#print_hex(data)pdata.value=data#cacl MACdll.CurCalc_DES_MAC64(805306481,pkey,0,pdata,42,pmac)stro= pmac.valuestrtemp=''for c in stro: strtemp+="{0:02x}".format(ord(c))#print strtemppk.mac=strtemp#data to sendsd=pk.len+pk.ID+pk.slnum+pk.poscode+pk.rand+pk.psam+pk.kind+pk.ver+pk.time+pk.macprint 'send1 len is 0x%02x' %(len(sd)/2)print sd#pack2class pack2: passpk2=pack2()pk2.len='0000006E'pk2.ID='0012'pk2.slnum='00000005'pk2.fatCode='00'pk2.cardASN='0000000000000000'pk2.cardType='00'pk2.userNO= '0000000000000000' pk2.fileName1='00000000000000000000000000000015'pk2.dataLen1='00'pk2.dataArea1='00000000000000319999990800FB2014080620240806FFFFFFFFFFFFFFFFFFFF'pk2.fileName2='00000000000000000000000000000016'pk2.dataLen2='00'pk2.dataArea2='000003E800FFFF16'pk2.mac='06cc571e6d96e12d' data2=unhex(pk2.len+pk2.ID+pk2.slnum+pk2.fatCode+pk2.cardASN+pk2.cardType+pk2.userNO+pk2.fileName1+pk2.dataLen1+pk2.dataArea1+pk2.fileName2+pk2.dataLen2+pk2.dataArea2) pdata.value=data2#cacl MACdll.CurCalc_DES_MAC64(805306481,pkey,0,pdata,102,pmac)stro= pmac.valuestrtemp=''for c in stro: strtemp+="{0:02x}".format(ord(c))#print strtemppk2.mac=strtemp#data to sendsd2=pk2.len+pk2.ID+pk2.slnum+pk2.fatCode+pk2.cardASN+pk2.cardType+pk2.userNO+pk2.fileName1+pk2.dataLen1+pk2.dataArea1+pk2.fileName2+pk2.dataLen2+pk2.dataArea2+pk2.mac print 'send2 len is 0x%02x' %(len(sd2)/2)print sd2 #PORT="192.168.60.37"#PORT="localhost"HOST, PORT = "192.168.51.28", 5800# Create a socket (SOCK_STREAM means a TCP socket)sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)try: # Connect to server and send data sock.connect((HOST, int(PORT)) #data= "123456789" #s = struct.pack('bbb',1,2,3) sock.send(sd.decode('hex')) print "Sent1 OK:" print sd # Receive data from the server and shut down received = sock.recv(1024) print "Received:" print_hex(received) print 'received len is 0x%02x' %(len(received)) print 'received data analysis...' re1=received[0:4] print_hex(re1) re1=received[4:6] print_hex(re1) re1=received[6:10] print_hex(re1) re1=received[10:16] print_hex(re1) #pack2 send sock.send(sd2.decode('hex')) print "Sent2 OK:" print sd2 # Receive data from the server and shut down received1 = sock.recv(1024) print "Received1:" print_hex(received1) print 'received1 len is 0x%02x' %(len(received1)) finally: sock.close() s=raw_input('press any key to continue...')
二进制文件解析工具# -*- coding: utf-8 -*-from myutil import *from binascii import unhexlify as unheximport ospath=os.getcwd() path+='//rec04.bin'#print pathprint "begin ans......" f1=open(path,'rb')for i in range(1,35): s=f1.read(280) print "data:",i print_hex(s) print 'read data is:'print_hex(s) recstatadd = 187print "终端编号:"print_hex(s[recstatadd:recstatadd+10])print "卡号长度:"print_hex(s[10])print "卡号: "print_hex(s[11:11+10])print "持卡序号1+所属地城市代码2+交易地城市代码2"print_hex(s[recstatadd+22:recstatadd+22+5])print "应用交易计数器"print_hex(s[92:92+2])print "交易前余额4,交易金额3"print_hex(s[recstatadd+29:recstatadd+29+7])print "交易日期:"print_hex(s[99:99+3])print "交易时间:"print_hex(s[44:44+3])print "终端编号"print_hex(s[21:21+8])print "商户编号"print_hex(s[21+8:21+8+15])print "批次号"print_hex(s[5:5+3]) print "应用密文"print_hex(s[47:47+8])print "授权金额"print_hex(s[103:103+6])print "其他金额"print_hex(s[115:115+6])print "终端验证结果"print_hex(s[94:5+94])print "应用交易计数器"print_hex(s[92:92+4])print "卡片验证结果"print_hex(s[56:56+32])print "卡片序列号:"print_hex(s[131])f1.close()
抓取动漫图片# -*- coding:utf8 -*-# 2013.12.36 19:41# 抓取dbmei.com的图片。 from bs4 import BeautifulSoupimport os, sys, urllib2,time,random # 创建文件夹path = os.getcwd() # 获取此脚本所在目录new_path = os.path.join(path,u'暴走漫画')if not os.path.isdir(new_path): os.mkdir(new_path) def page_loop(page=1): url = 'http://baozoumanhua.com/all/hot/page/%s?sv=1389537379' % page content = urllib2.urlopen(url) soup = BeautifulSoup(content) my_girl = soup.find_all('div',class_='img-wrap') for girl in my_girl: jokes = girl.find('img') link = jokes.get('src') flink = link print flink content2 = urllib2.urlopen(flink).read() #with open(u'暴走漫画'+'/'+time.strftime('%H-%M-%S')+random.choice('qwertyuiopasdfghjklzxcvbnm')+flink[-5:],'wb') as code: #在OSC上现学的 with open(u'暴走漫画'+'/'+flink[-11:],'wb') as code: code.write(content2) page = int(page) + 1 print u'开始抓取下一页' print 'the %s page' % page page_loop(page) page_loop()
抓取网站模板#!/usr/bin/env python# -*- coding: utf-8 -*-# by yangyongzhen# 2016-12-06 from bs4 import BeautifulSoupimport urllib,urllib2,os,timeimport re rootpath = os.getcwd()+u'/抓取的模板/' def makedir(path): if not os.path.isdir(path): os.makedirs(path) #创建抓取的根目录makedir(rootpath)#显示下载进度def Schedule(a,b,c): ''''' a:已经下载的数据块 b:数据块的大小 c:远程文件的大小 ''' per = 100.0 * a * b / c if per > 100 : per = 100 print '%.2f%%' % per def grabHref(url,listhref,localfile): html = urllib2.urlopen(url).read() html = unicode(html,'gb2312','ignore').encode('utf-8','ignore') content = BeautifulSoup(html).findAll('link') myfile = open(localfile,'w') pat = re.compile(r'href="([^"]*)"') pat2 = re.compile(r'http') for item in content: h = pat.search(str(item)) href = h.group(1) if pat2.search(href): ans = href else: ans = url+href listhref.append(ans) myfile.write(ans) myfile.write('/r/n') print ans content = BeautifulSoup(html).findAll('script') pat = re.compile(r'src="([^"]*)"') pat2 = re.compile(r'http') for item in content: h = pat.search(str(item)) if h: href = h.group(1) if pat2.search(href): ans = href else: ans = url+href listhref.append(ans) myfile.write(ans) myfile.write('/r/n') print ans content = BeautifulSoup(html).findAll('a') pat = re.compile(r'href="([^"]*)"') pat2 = re.compile(r'http') for item in content: h = pat.search(str(item)) if h: href = h.group(1) if pat2.search(href): ans = href else: ans = url+href listhref.append(ans) myfile.write(ans) myfile.write('/r/n') print ans myfile.close() def main(): url = "http://192.168.72.140/qdkj/" #采集网页的地址 listhref =[] #链接地址 localfile = 'ahref.txt' #保存链接地址为本地文件,文件名 grabHref(url,listhref,localfile) listhref = list(set(listhref)) #去除链接中的重复地址 curpath = rootpath start = time.clock() for item in listhref: curpath = rootpath name = item.split('/')[-1] fdir = item.split('/')[3:-1] for i in fdir: curpath += i curpath += '/' print curpath makedir(curpath) local = curpath+name urllib.urlretrieve(item, local,Schedule) # 远程保存函数 end = time.clock() print u'模板抓取完成!' print u'一共用时:',end-start,u'秒' if __name__=="__main__": main()
总结到此这篇关于python常用小脚本的文章就介绍到这了,更多相关python常用小脚本内容请搜索wanshiok.com以前的文章或继续浏览下面的相关文章希望大家以后多多支持wanshiok.com! 详解Python单元测试的两种写法 如何使用python对图片进行批量压缩详解 |