?? serialposix.py
字號(hào):
raise ValueError,'invalid stopit specification:'+str(stopbits)
#setup parity
self.iflag = self.iflag & ~(TERMIOS.INPCK|TERMIOS.ISTRIP)
if parity == PARITY_NONE:
self.cflag = self.cflag & ~(TERMIOS.PARENB|TERMIOS.PARODD)
elif parity == PARITY_EVEN:
self.cflag = self.cflag & ~(TERMIOS.PARODD)
self.cflag = self.cflag | (TERMIOS.PARENB)
elif parity == PARITY_ODD:
self.cflag = self.cflag | (TERMIOS.PARENB|TERMIOS.PARODD)
else:
raise ValueError,'invalid parity: '+str(par)
#setup flow control
#xonxoff
if hasattr(TERMIOS, 'IXANY'):
if xonxoff:
self.iflag = self.iflag | (TERMIOS.IXON|TERMIOS.IXOFF|TERMIOS.IXANY)
else:
self.iflag = self.iflag & ~(TERMIOS.IXON|TERMIOS.IXOFF|TERMIOS.IXANY)
else:
if xonxoff:
self.iflag = self.iflag | (TERMIOS.IXON|TERMIOS.IXOFF)
else:
self.iflag = self.iflag & ~(TERMIOS.IXON|TERMIOS.IXOFF)
#rtscts
if hasattr(TERMIOS, 'CRTSCTS'):
if rtscts:
self.cflag = self.cflag | (TERMIOS.CRTSCTS)
else:
self.cflag = self.cflag & ~(TERMIOS.CRTSCTS)
elif hasattr(TERMIOS, 'CNEW_RTSCTS'): #try it with alternate constant name
if rtscts:
self.cflag = self.cflag | (TERMIOS.CNEW_RTSCTS)
else:
self.cflag = self.cflag & ~(TERMIOS.CNEW_RTSCTS)
#XXX should there be a warning if setting up rtscts (and xonxoff etc) fails??
#buffer
#vmin "minimal number of characters to be read. = for non blocking"
if vmin<0 or vmin>255:
raise ValueError,'invalid vmin: '+str(vmin)
self.cc[TERMIOS.VMIN] = vmin
#vtime
if vtime<0 or vtime>255:
raise ValueError,'invalid vtime: '+str(vtime)
self.cc[TERMIOS.VTIME] = vtime
#activate settings
self.__tcsetattr()
def __tcsetattr(self):
"""internal function to set port attributes"""
termios.tcsetattr(self.fd, TERMIOS.TCSANOW, [self.iflag,self.oflag,self.cflag,self.lflag,self.ispeed,self.ospeed,self.cc])
def __tcgetattr(self):
"""internal function to get port attributes"""
self.iflag,self.oflag,self.cflag,self.lflag,self.ispeed,self.ospeed,self.cc = termios.tcgetattr(self.fd)
def close(self):
"""close port"""
if self.fd:
os.close(self.fd)
self.fd = None
def setBaudrate(self, baudrate):
"""change baudrate after port is open"""
if not self.fd: raise portNotOpenError
self.__tcgetattr() #read current settings
#setup baudrate
try:
self.ispeed = self.ospeed = baudIntToEnum[baudrate]
except:
raise ValueError,'invalid baud rate: %s' % baudrate
self.__tcsetattr()
def inWaiting(self):
"""how many character are in the input queue"""
#~ s = fcntl.ioctl(self.fd, TERMIOS.FIONREAD, TIOCM_zero_str)
s = fcntl.ioctl(self.fd, TIOCINQ, TIOCM_zero_str)
return struct.unpack('I',s)[0]
def write(self, data):
"""write a string to the port"""
if not self.fd: raise portNotOpenError
t = len(data)
d = data
while t>0:
n = os.write(self.fd, d)
d = d[n:]
t = t - n
def read(self, size=1):
"""read a number of bytes from the port.
the default is one (unlike files)"""
if not self.fd: raise portNotOpenError
read = ''
inp = None
if size > 0:
while len(read) < size:
#print "\tread(): size",size, "have", len(read) #debug
ready,_,_ = select.select([self.fd],[],[], self.timeout)
if not ready:
break #timeout
buf = os.read(self.fd, size-len(read))
read = read + buf
if self.timeout >= 0 and not buf:
break #early abort on timeout
return read
def flushInput(self):
"""clear input queue"""
if not self.fd:
raise portNotOpenError
termios.tcflush(self.fd, TERMIOS.TCIFLUSH)
def flushOutput(self):
"""flush output"""
if not self.fd:
raise portNotOpenError
termios.tcflush(self.fd, TERMIOS.TCOFLUSH)
def sendBreak(self):
"""send break signal"""
if not self.fd:
raise portNotOpenError
termios.tcsendbreak(self.fd, 0)
def drainOutput(self):
"""internal - not portable!"""
if not self.fd: raise portNotOpenError
termios.tcdrain(self.fd)
def nonblocking(self):
"""internal - not portable!"""
if not self.fd:
raise portNotOpenError
fcntl.fcntl(self.fd, FCNTL.F_SETFL, FCNTL.O_NONBLOCK)
def getDSR(self):
"""read terminal status line"""
if not self.fd: raise portNotOpenError
s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str)
return struct.unpack('I',s)[0] & TIOCM_DSR
def getCD(self):
"""read terminal status line"""
if not self.fd: raise portNotOpenError
s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str)
return struct.unpack('I',s)[0] & TIOCM_CD
def getRI(self):
"""read terminal status line"""
if not self.fd: raise portNotOpenError
s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str)
return struct.unpack('I',s)[0] & TIOCM_RI
def getCTS(self):
"""read terminal status line"""
if not self.fd: raise portNotOpenError
s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str)
return struct.unpack('I',s)[0] & TIOCM_CTS
def setDTR(self,on=1):
"""set terminal status line"""
if not self.fd: raise portNotOpenError
if on:
fcntl.ioctl(self.fd, TIOCMBIS, TIOCM_DTR_str)
else:
fcntl.ioctl(self.fd, TIOCMBIC, TIOCM_DTR_str)
def setRTS(self,on=1):
"""set terminal status line"""
if not self.fd: raise portNotOpenError
if on:
fcntl.ioctl(self.fd, TIOCMBIS, TIOCM_RTS_str)
else:
fcntl.ioctl(self.fd, TIOCMBIC, TIOCM_RTS_str)
if __name__ == '__main__':
s = Serial(0,
baudrate=19200, #baudrate
bytesize=EIGHTBITS, #number of databits
parity=PARITY_EVEN, #enable parity checking
stopbits=STOPBITS_ONE, #number of stopbits
timeout=3, #set a timeout value, None for waiting forever
xonxoff=0, #enable software flow control
rtscts=0, #enable RTS/CTS flow control
)
s.setDTR(1)
s.flushInput()
s.flushOutput()
s.write('hello')
print repr(s.read(5))
print s.inWaiting()
del s
?? 快捷鍵說(shuō)明
復(fù)制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號(hào)
Ctrl + =
減小字號(hào)
Ctrl + -