?? avdtp.py
字號:
"""
Partial realization of AVDTP, A2DP and AVRCP profiles

* Copyright (C) 2006 Sergei Krivov <krivov@yahoo.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
"""

import ctypes
try:
    from ctypes import Structure, c_uint8, c_uint16, c_ulong, c_uint32, c_void_p
except ImportError:  # for old versions of ctypes
    from ctypes import Structure, c_ubyte, c_ushort, c_ulong, c_uint, c_void_p
    # The rest of the module uses the sized names, so alias them here.
    c_uint8 = c_ubyte
    c_uint16 = c_ushort
    c_uint32 = c_uint
import bluetooth
import struct
import socket

#//Signal ids
AVDTP_DISCOVER = 1
AVDTP_GET_CAPABILITIES = 2
AVDTP_SET_CONFIGURATION = 3
AVDTP_GET_CONFIGURATION = 4
AVDTP_RECONFIGURE = 5
AVDTP_OPEN = 6
AVDTP_START = 7
AVDTP_CLOSE = 8
AVDTP_SUSPEND = 9
AVDTP_ABORT = 10
AVDTP_SECURITY_CONTROL = 11

MEDIA_TRANSPORT_CATEGORY = 1
MEDIA_CODEC = 7

SBC_MEDIA_CODEC_TYPE = 0
AUDIO_MEDIA_TYPE = 0

#//Packet types
PACKET_TYPE_SINGLE = 0
PACKET_TYPE_START = 1
PACKET_TYPE_CONTINUE = 2
PACKET_TYPE_END = 3

#//Message Types
MESSAGE_TYPE_COMMAND = 0
MESSAGE_TYPE_ACCEPT = 2
MESSAGE_TYPE_REJECT = 3

MEDIA_PACKET_HEADER_LENGTH = 14

# sepd_resp reserves room for 1 + MAX_ADDITIONAL_CODEC stream end points.
MAX_ADDITIONAL_CODEC = 4

deb = 1   # debug level: non-zero prints progress, >1 also dumps packets
tran = 0  # next transaction label (4 bits), advanced by init_command_single


class message_header_single(Structure):
    """Two-byte signalling header used by every message this module sends."""
    _pack_ = 1
    _fields_ = [("message_type", c_uint8, 2), ("packet_type", c_uint8, 2),
                ("transaction_label", c_uint8, 4),
                ("signal_id", c_uint8, 6), ("rfa0", c_uint8, 2)]


class message_header_start(Structure):
    """Header of a 'start' packet of a fragmented message (unused here)."""
    _pack_ = 1
    # NOTE(review): "nsop" sits between packet_type and transaction_label, so
    # transaction_label does not share a byte with message_type/packet_type
    # as it does in message_header_single.  Field order is kept as found;
    # confirm against the AVDTP signal header format before relying on it.
    _fields_ = [("message_type", c_uint8, 2), ("packet_type", c_uint8, 2),
                ("nsop", c_uint8, 8),
                ("transaction_label", c_uint8, 4),
                ("signal_id", c_uint8, 6), ("rfa0", c_uint8, 2)]


class message_header_continue(Structure):
    """One-byte header of a 'continue' packet (unused here)."""
    _pack_ = 1
    _fields_ = [("message_type", c_uint8, 2), ("packet_type", c_uint8, 2),
                ("transaction_label", c_uint8, 4)]


class message_single(Structure):
    """Single-packet message: header followed by one ACP SEID byte."""
    _pack_ = 1
    _fields_ = [("header", message_header_single),
                ("rfa0", c_uint8, 2), ("acp_seid", c_uint8, 6)]


class sbc_codec_elements(Structure):
    """Four bytes of SBC codec information (bit masks plus bitpool range)."""
    _pack_ = 1
    _fields_ = [("channel_mode", c_uint8, 4), ("frequency", c_uint8, 4),
                ("allocation_method", c_uint8, 2), ("subbands", c_uint8, 2),
                ("block_length", c_uint8, 4),
                ("min_bitpool", c_uint8, 8), ("max_bitpool", c_uint8, 8)]


class acp_seid_info(Structure):
    """Two-byte description of one stream end point in a discover response."""
    _pack_ = 1
    _fields_ = [("rfa0", c_uint8, 1), ("inuse", c_uint8, 1),
                ("acp_seid", c_uint8, 6),
                ("rfa1", c_uint8, 3), ("tsep", c_uint8, 1),
                ("media_type", c_uint8, 4)]


class sepd_resp(Structure):
    """Discover response: header plus end-point slots n0 .. n<MAX>."""
    _pack_ = 1
    _fields_ = ([("header", message_header_single)]
                + [("n%i" % (i), acp_seid_info)
                   for i in range(1 + MAX_ADDITIONAL_CODEC)])


class sepd_reject(Structure):
    """Discover reject: header plus an error code."""
    _pack_ = 1
    _fields_ = [("header", message_header_single), ("error", c_uint8)]


class getcap_resp(Structure):
    """Get-capabilities response as parsed by this module."""
    _pack_ = 1
    _fields_ = ([("header", message_header_single)]
                + [(name, c_uint8, 8)
                   for name in ("serv_cat", "serv_cap_len", "cap_type",
                                "length", "media_type", "media_codec_type")]
                + [("sbc_elements", sbc_codec_elements)])


class set_sbc_req(Structure):
    """Set-configuration request carrying an SBC codec configuration."""
    _pack_ = 1
    _fields_ = ([("header", message_header_single),
                 ("rfa0", c_uint8, 2), ("acp_seid", c_uint8, 6),
                 ("rfa1", c_uint8, 2), ("int_seid", c_uint8, 6)]
                + [(name, c_uint8, 8)
                   for name in ("serv_cap", "serv_cap_len", "cap_type",
                                "length", "media_type", "media_codec_type")]
                + [("sbc_elements", sbc_codec_elements)])


class set_sbc_resp(Structure):
    """Set-configuration response."""
    _pack_ = 1
    _fields_ = [("header", message_header_single),
                ("serv_cap", c_uint8, 8), ("acp_seid", c_uint8, 8)]


class open_strm_resp(Structure):
    """Response to OPEN / CLOSE: header plus an error byte."""
    _pack_ = 1
    _fields_ = [("header", message_header_single), ("error", c_uint8, 8)]


class start_strm_resp(Structure):
    """Response to START / SUSPEND: header, ACP SEID and an error byte."""
    _pack_ = 1
    _fields_ = [("header", message_header_single),
                ("rfa0", c_uint8, 2), ("acp_seid", c_uint8, 6),
                ("error", c_uint8, 8)]


def print_fields(st, gap=''):
    """Print the fields of ctypes structure *st*, recursing into sub-structures.

    *gap* is the indentation prefix; it grows by one space per nesting level.
    """
    print(gap + str(st.__class__) + ':')
    for f in st._fields_:
        name = f[0]
        value = getattr(st, name)
        # Nested structures expose _fields_; plain integer fields do not.
        if hasattr(value, "_fields_"):
            print(name + ":")
            print_fields(value, gap + ' ')
        else:
            print("%s%s %s" % (gap, name, value))


def _struct_from_bytes(struct_class, data):
    """Return a new *struct_class* instance filled with a copy of *data*.

    The structure owns its memory, so modifying it never touches the
    (immutable) received byte string.  Input shorter than the structure
    leaves the remaining bytes zero; excess input is ignored.
    """
    obj = struct_class()
    count = min(len(data), ctypes.sizeof(struct_class))
    if count:
        ctypes.memmove(ctypes.addressof(obj), data, count)
    return obj


def avdtp_connect(dst, psm=25):
    """Open an L2CAP connection to *dst* on *psm* and return the socket.

    The socket is bound locally to the same PSM, connected, and its L2CAP
    MTU is set to 672.  If any of these steps fails the socket is closed
    before the exception propagates.
    """
    sock = bluetooth.BluetoothSocket(bluetooth.L2CAP)
    try:
        sock.bind(("", psm))
        sock.connect((dst, psm))
        if deb:
            print("connected to  %s %s" % (dst, psm))
        bluetooth.set_l2cap_mtu(sock, 672)
    except Exception:
        sock.close()
        raise
    return sock


def avdtp_disconnect(sock):
    """Close the connection *sock*."""
    sock.close()


def init_command_single(req):
    """Fill *req*.header for a single-packet command and return *req*.

    Uses the module-level transaction label ``tran`` and then advances it
    modulo 16 (the label field is 4 bits wide).
    """
    global tran
    req.header.packet_type = PACKET_TYPE_SINGLE
    req.header.message_type = MESSAGE_TYPE_COMMAND
    req.header.transaction_label = tran
    req.header.rfa0 = 0
    tran = (tran + 1) & 0xf
    return req


def send_packet(sock, packet):
    """Send the ctypes structure *packet* over *sock*.

    Raises IOError if the socket reports a different number of bytes sent
    than the structure size.
    """
    size = ctypes.sizeof(packet)
    if deb > 1:
        raw = ctypes.string_at(ctypes.addressof(packet), size)
        print("sending packet %s %s"
              % (size, struct.unpack("%iB" % (size), raw)))
        print_fields(packet)
    sent = sock.send(packet)
    if sent != size:
        raise IOError('transmission error')


def receive_response(sock, resp_class, resp_error_class=None):
    """Receive one packet from *sock* and return it as a *resp_class* instance.

    A packet shorter than *resp_class* is padded with zero bytes; a longer
    one is truncated to the structure size.  Both cases only produce a
    warning at debug level > 1.  *resp_error_class* is accepted but unused.
    """
    data = sock.recv(1024)
    size = ctypes.sizeof(resp_class)
    if len(data) > size:
        if deb > 1:
            print("warning, possibly wrong responce class  %s %s"
                  % (len(data), size))
    if len(data) < size:
        if deb > 1:
            print("warning, got partial responce, pad with zeros %s %s"
                  % (len(data), size))
        # "x" is struct's pad-byte code: produces the missing zero bytes.
        data += struct.pack("%ix" % (size - len(data)))
    resp = _struct_from_bytes(resp_class, data)
    if deb > 1:
        print("received %s" % (struct.unpack("%iB" % (len(data)), data),))
        print_fields(resp)
    return resp


def _stream_command(sock, seid, signal_id, resp_class):
    """Send a single-packet command for *seid* and return the parsed reply."""
    cmd = init_command_single(message_single())
    cmd.header.signal_id = signal_id
    cmd.acp_seid = seid
    send_packet(sock, cmd)
    return receive_response(sock, resp_class)


def avdtp_get_capabilities(sock, seid):
    """Send GET_CAPABILITIES for *seid* and return the getcap_resp."""
    return _stream_command(sock, seid, AVDTP_GET_CAPABILITIES, getcap_resp)


def avdtp_discover(sock):
    """Discover the remote stream end points and query their capabilities.

    Returns a list of ``(getcap_resp, seid)`` tuples, one per end-point slot
    of the discover response that carries a non-zero SEID.
    """
    cmd = init_command_single(message_single())
    cmd.header.signal_id = AVDTP_DISCOVER
    send_packet(sock, cmd)
    resp = receive_response(sock, sepd_resp)
    # Unused slots are zero (the response is zero padded), so skip them.
    lseid = [seid for seid in
             (getattr(resp, "n%i" % (i)).acp_seid
              for i in range(1 + MAX_ADDITIONAL_CODEC))
             if seid]
    if deb:
        print("got %i seid" % (len(lseid)))
    return [(avdtp_get_capabilities(sock, seid), seid) for seid in lseid]


def avdtp_discover_rsp(sock):
    """Receive a command from *sock* and answer it with a discover response.

    The reply reuses the transaction label of the received command and
    advertises a single end point: SEID 1, not in use, tsep=1, audio media.
    """
    global tran
    cmd = receive_response(sock, message_single)
    tran = cmd.header.transaction_label
    rsp = init_command_single(sepd_resp())
    rsp.header.message_type = MESSAGE_TYPE_ACCEPT
    rsp.header.signal_id = AVDTP_DISCOVER
    sep = rsp.n0  # view into rsp: assignments below modify rsp itself
    sep.acp_seid = 1
    sep.inuse = 0
    sep.tsep = 1
    sep.media_type = AUDIO_MEDIA_TYPE
    send_packet(sock, rsp)


def set_sbc_configuration(sock, capb_resp, seid, sbc_codec):
    """Configure end point *seid* for the SBC codec described by *sbc_codec*.

    The request starts from the capabilities in *capb_resp* and, for every
    parameter in ``sbc_codec.conf_dict``, replaces the capability mask with
    the single bit matching the codec's setting when the remote side
    advertises that bit.  Returns ``(seid, sbc_codec)``.

    Raises IOError if the response is not an ACCEPT, and KeyError if a
    codec setting has no entry in ``conf_dict``.
    """
    cmd = init_command_single(set_sbc_req())
    cmd.header.signal_id = AVDTP_SET_CONFIGURATION
    cmd.serv_cap = MEDIA_TRANSPORT_CATEGORY
    cmd.acp_seid = seid
    cmd.int_seid = 1
    cmd.cap_type = MEDIA_CODEC
    cmd.length = 6
    cmd.media_type = AUDIO_MEDIA_TYPE
    cmd.media_codec_type = SBC_MEDIA_CODEC_TYPE
    cmd.sbc_elements = capb_resp.sbc_elements
    elements = cmd.sbc_elements  # view into cmd: writes land in the request
    elements.allocation_method = 2
    # values of parameters in sbc routine and here are different
    for par, vals in sbc_codec.conf_dict.items():
        codec_att = getattr(sbc_codec.par, par)
        code = vals[codec_att]
        resp_att = getattr(capb_resp.sbc_elements, par)
        if resp_att & code:
            if deb:
                print('setting %s=%i, %i' % (par, code, codec_att))
            setattr(elements, par, code)
        else:
            if deb:
                print('can not set %s=%i, %i' % (par, code, codec_att))
    send_packet(sock, cmd)
    resp = receive_response(sock, set_sbc_resp)
    if resp.header.message_type != MESSAGE_TYPE_ACCEPT:
        raise IOError('Can not set SBC codec parameters')
    if deb:
        print("Successfully set SBC codec parameters")
    return seid, sbc_codec


def avdtp_set_configuration(sock, lsep, codecs):
    """Select a codec from the available end points and configure it.

    Only the SBC codec is implemented.  *lsep* is the list returned by
    avdtp_discover; *codecs* is indexed by SBC_MEDIA_CODEC_TYPE.
    Returns ``(seid, codec)``.

    Raises IOError when no end point offers SBC audio.
    """
    lsbc = [(resp, seid) for resp, seid in lsep
            if resp.header.message_type != MESSAGE_TYPE_REJECT
            and resp.media_codec_type == SBC_MEDIA_CODEC_TYPE
            and resp.media_type == AUDIO_MEDIA_TYPE]
    if not lsbc:
        raise IOError('ACP site dose not have SBC codec')
    if len(lsbc) > 1 and deb:
        # if ever possible
        print('ACP site has more then one SBC codec, take first')
    resp, seid = lsbc[0]
    if deb:
        print("seid= %s" % (seid,))
    return set_sbc_configuration(sock, resp, seid,
                                 codecs[SBC_MEDIA_CODEC_TYPE])


def avdtp_open(dst, sock, seid):
    """Open the audio stream on *seid*.

    On success a second connection to *dst* on PSM 25 is opened and its
    socket returned.  Raises IOError if the response carries an error.
    """
    resp = _stream_command(sock, seid, AVDTP_OPEN, open_strm_resp)
    if resp.error:
        raise IOError('Can not open stream')
    if deb:
        print("opened stream")
    return avdtp_connect(dst, 25)


def avdtp_start(sock, seid):
    """Start streaming on *seid*; raises IOError on an error response."""
    resp = _stream_command(sock, seid, AVDTP_START, start_strm_resp)
    if resp.error:
        raise IOError('Can not start stream')
    if deb:
        print("started stream")


def avdtp_suspend(sock, seid):
    """Suspend streaming on *seid*; raises IOError on an error response."""
    resp = _stream_command(sock, seid, AVDTP_SUSPEND, start_strm_resp)
    if resp.error:
        raise IOError('Can not stop stream')
    if deb:
        print("suspended stream")


def avdtp_close(sock, seid):
    """Close the stream on *seid*; raises IOError on an error response."""
    resp = _stream_command(sock, seid, AVDTP_CLOSE, open_strm_resp)
    if resp.error:
        raise IOError('Can not close stream')
    if deb:
        print("closed steam")


class media_packet_header(Structure):
    """Twelve-byte header placed at the start of every media packet."""
    _pack_ = 1
    _fields_ = [("cc", c_uint8, 4), ("x", c_uint8, 1), ("p", c_uint8, 1),
                ("v", c_uint8, 2),
                ("pt", c_uint8, 7), ("m", c_uint8, 1),
                ("sequence_number", c_uint16), ("time_stamp", c_uint32),
                ("ssrc", c_uint32)]


class media_payload_header(Structure):
    """One-byte payload header: frame count and fragmentation flags."""
    _pack_ = 1
    _fields_ = [("frame_count", c_uint8, 4), ("rfa01", c_uint8, 1),
                ("is_last_fragment", c_uint8, 1),
                ("is_first_fragment", c_uint8, 1),
                ("is_fragmented", c_uint8, 1)]


def media_packet(data, timestamp, frame_count, seq_number):
    """Wrap *data* in a media packet structure and return it.

    *data* must be a ctypes object (its size is taken with ctypes.sizeof
    and it is assigned to a c_uint8 array field) -- presumably a c_uint8
    array of encoded frames; verify against the caller.

    Raises ValueError if the packet would exceed the 672-byte MTU.
    """
    mtu = 672
    overhead = (ctypes.sizeof(media_payload_header)
                + ctypes.sizeof(media_packet_header))
    if len(data) + overhead > mtu:
        raise ValueError('Media packet size >mtu')

    class _media_packet(Structure):
        _pack_ = 1
        _fields_ = [("media_packet_header", media_packet_header),
                    ("media_payload_header", media_payload_header),
                    ("data", c_uint8 * ctypes.sizeof(data))]

    packet = _media_packet()
    header = packet.media_packet_header
    header.v = 2
    header.pt = 1
    # NOTE(review): seq_number and timestamp are accepted but constant zeros
    # are sent instead, exactly as the code was found; confirm whether that
    # is intentional before wiring the parameters in.
    header.sequence_number = socket.htons(0)  # seq_number
    header.time_stamp = socket.htonl(0)  # timestamp
    header.ssrc = socket.htonl(1)
    payload = packet.media_payload_header
    payload.frame_count = frame_count
    payload.is_fragmented = 0
    payload.rfa01 = 0  # the reserved bit is named rfa01, not rfa
    packet.data = data
    return packet


##### AVCTP & AVRCP ##########################################################
## Message types
AVCTP_COMMAND_FRAME = 0
AVCTP_RESPONSE_FRAME = 1

CMD_PASSTHROUGH = 0
CMD_ACCEPTED = 9

PLAY_OP = 68   # 0x44
STOP_OP = 69   # 0x45
PAUSE_OP = 70  # 0x46
NEXT_OP = 75   # 0x4b
PREV_OP = 76   # 0x4c


class avctp_header(Structure):
    """Three-byte AVCTP header: flags/label byte followed by a 16-bit pid."""
    _pack_ = 1
    _fields_ = [("ipid", c_uint8, 1), ("cr", c_uint8, 1),
                ("packet_type", c_uint8, 2),
                ("transaction_label", c_uint8, 4), ("pid", c_uint16)]


class avctp_frame(Structure):
    """AVCTP header followed by a five-byte command frame."""
    _pack_ = 1
    _fields_ = [("header", avctp_header),
                ("ctype", c_uint8, 4), ("zeros", c_uint8, 4),
                ("subunit_id", c_uint8, 3), ("subunit_type", c_uint8, 5),
                ("opcode", c_uint8, 8),
                ("operand0", c_uint8, 8), ("operand1", c_uint8, 8)]


def avrcp_accept_connection():
    """Listen on L2CAP PSM 23, accept one connection and return its socket.

    The listening socket is closed once a client has been accepted (or if
    accepting fails).
    """
    sock = bluetooth.BluetoothSocket(bluetooth.L2CAP)
    try:
        port = 23
        sock.bind(("", port))
        sock.listen(1)
        client_sock, address = sock.accept()
    finally:
        sock.close()
    if deb:
        print("Accepted connection from  %s" % (address,))
    return client_sock


def avrcp_command(sock, data, callback):
    """Handle one received AVCTP frame *data* and acknowledge it on *sock*.

    For a pass-through command ``callback(operand0)`` is invoked.  The frame
    is then turned into an ACCEPTED response and sent back.

    Raises ValueError if the frame is not a single packet.
    """
    # Work on a private copy: the received string must not be written to,
    # and a short frame must not be read past its end.
    cmd = _struct_from_bytes(avctp_frame, data)
    if cmd.header.packet_type != PACKET_TYPE_SINGLE:
        raise ValueError('packet type != PACKET_TYPE_SINGLE')
    if cmd.ctype == CMD_PASSTHROUGH:
        callback(cmd.operand0)
    # use the same packet for response
    cmd.header.ipid = 0
    cmd.header.cr = AVCTP_RESPONSE_FRAME
    cmd.header.packet_type = PACKET_TYPE_SINGLE
    cmd.ctype = CMD_ACCEPTED
    send_packet(sock, cmd)


def avrcp_receive_commands(sock, callback):
    """Serve commands from *sock* until the peer closes the connection.

    Each received packet is passed to avrcp_command.  The socket is closed
    when the loop ends, including when a command handler raises.
    """
    try:
        while True:
            data = sock.recv(1024)
            if len(data) == 0:
                break
            avrcp_command(sock, data, callback)
    finally:
        sock.close()


##### SBC codec, libsbc ######################################################
class sbc_struct(Structure):
    """Parameter block passed by reference to the libsbc functions."""
    _fields_ = [("flags", c_ulong), ("frequency", c_uint32),
                ("channel_mode", c_uint32), ("joint", c_uint32),
                ('block_length', c_uint32), ('subbands', c_uint32),
                ('bitpool', c_uint32),
                ('data', c_void_p), ('size', c_uint32), ('len', c_uint32),
                ('duration', c_ulong), ('priv', c_void_p)]


class sbc:
    """Thin wrapper around the SBC encoder in ``libsbc.so``."""

    def __init__(self, *arg, **kwd):
        """Load libsbc, initialise the encoder and apply configure(*arg, **kwd).

        An initialisation error is only reported on stdout.
        """
        self.par = sbc_struct()
        self.libsbc = ctypes.CDLL('libsbc.so')
        err = self.libsbc.sbc_init(ctypes.byref(self.par), c_ulong(1))
        if err:
            print('error initializing sbc coder %s' % (err,))
        self.configure(*arg, **kwd)

    def configure(self, frequency=48000, channels=2, bitpool=32):
        """Set the encoder parameters (8 subbands, block length 16).

        Also builds ``conf_dict``, which maps each encoder setting to the
        bit used for it in sbc_codec_elements.
        """
        self.par.subbands = 8
        self.par.block_length = 16
        self.par.bitpool = bitpool
        self.par.frequency = frequency
        self.par.channel_mode = channels
        self.conf_dict = {
            'frequency': {48000: 1, 44100: 2, 32000: 4, 16000: 8},
            'subbands': {8: 1, 4: 2},
            'block_length': {16: 1, 12: 2, 8: 4, 4: 8},
            'channel_mode': {2: 2, 1: 1},
        }

    def encode(self, data):
        """Encode *data* with sbc_encode.

        Returns ``(result, par.data, par.len, par.duration)`` where *result*
        is the value returned by sbc_encode, or four zeros for empty input.
        """
        if len(data) == 0:
            return 0, 0, 0, 0
        result = self.libsbc.sbc_encode(ctypes.byref(self.par), data,
                                        len(data))
        return result, self.par.data, self.par.len, self.par.duration

    def fin(self):
        """Release the encoder via sbc_finish."""
        self.libsbc.sbc_finish(ctypes.byref(self.par))
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -