use bluez

This commit is contained in:
Harald Hoyer 2017-04-20 07:42:01 +02:00
parent 410e199723
commit 2d011987d7
2 changed files with 141 additions and 156 deletions

View file

@ -5,7 +5,7 @@ from ant.core.exceptions import ChannelError
from constants import * from constants import *
VPOWER_DEBUG = True VPOWER_DEBUG = False
CHANNEL_PERIOD = 8182 CHANNEL_PERIOD = 8182
@ -62,7 +62,7 @@ class PowerMeterTx(object):
payload += chr(self.powerData.instantaneousPower >> 8) payload += chr(self.powerData.instantaneousPower >> 8)
ant_msg = message.ChannelBroadcastDataMessage(self.channel.number, data=payload) ant_msg = message.ChannelBroadcastDataMessage(self.channel.number, data=payload)
sys.stdout.write('+') #sys.stdout.write('+')
sys.stdout.flush() #sys.stdout.flush()
if VPOWER_DEBUG: print 'Write message to ANT stick on channel ' + repr(self.channel.number) if VPOWER_DEBUG: print 'Write message to ANT stick on channel ' + repr(self.channel.number)
self.antnode.driver.write(ant_msg.encode()) self.antnode.driver.write(ant_msg.encode())

View file

@ -40,13 +40,14 @@
# 357.7 83 # 357.7 83
# 364.6 84 6.9 pro rpm # 364.6 84 6.9 pro rpm
import serial, struct, sys, hashlib import serial, struct, sys, hashlib, curses
from time import sleep from time import sleep
from binascii import hexlify from binascii import hexlify
from ant.core import driver from ant.core import driver
from ant.core import node from ant.core import node
from PowerMeterTx import PowerMeterTx from PowerMeterTx import PowerMeterTx
from constants import * from constants import *
from bluetooth import *
INIT_A0 = struct.pack('BBBBB', 0xf0, 0xa0, 0x02, 0x02, 0x94) INIT_A0 = struct.pack('BBBBB', 0xf0, 0xa0, 0x02, 0x02, 0x94)
PING = struct.pack('BBBBB', 0xf0, 0xa0, 0x01, 0x01, 0x92) PING = struct.pack('BBBBB', 0xf0, 0xa0, 0x01, 0x01, 0x92)
@ -61,45 +62,28 @@ POWER_SENSOR_ID = int(int(hashlib.md5(getserial()).hexdigest(), 16) & 0xfffe) +
DEBUG = False DEBUG = False
LOG = None LOG = None
NETKEY = '\xB9\xA5\x21\xFB\xBD\x72\xC3\x45' NETKEY = '\xB9\xA5\x21\xFB\xBD\x72\xC3\x45'
power_meter = None
import signal class IConsole(object):
port = serial.Serial('/dev/rfcomm0') def __init__(self, got):
gota = struct.unpack('BBBBBBBBBBBBBBBBBBBBB', got)
self.time_str = "%02d:%02d:%02d:%02d" % (gota[2]-1, gota[3]-1, gota[4]-1, gota[5]-1)
self.speed = ((100*(gota[6]-1) + gota[7] -1) / 10.0)
self.speed_str = "V: % 3.1f km/h" % self.speed
self.rpm = ((100*(gota[8]-1) + gota[9] -1))
self.rpm_str = "% 3d RPM" % self.rpm
self.distance = ((100*(gota[10]-1) + gota[11] -1) / 10.0)
self.distance_str = "D: % 3.1f km" % self.distance
self.calories = ((100*(gota[12]-1) + gota[13] -1))
self.calories_str = "% 3d kcal" % self.calories
self.hf = ((100*(gota[14]-1) + gota[15] -1))
self.hf_str = "HF % 3d" % self.hf
self.power = ((100*(gota[16]-1) + gota[17] -1) / 10.0)
self.power_str = "% 3.1f W" % self.power
self.lvl = gota[18] -1
self.lvl_str = "L: %d" % self.lvl
class GracefulInterruptHandler(object): def send_ack(packet, expect=None, plen=0):
def __init__(self, sig=signal.SIGINT):
self.sig = sig
def __enter__(self):
self.interrupted = False
self.released = False
self.original_handler = signal.getsignal(self.sig)
def handler(signum, frame):
self.release()
self.interrupted = True
signal.signal(self.sig, handler)
return self
def __exit__(self, type, value, tb):
self.release()
def release(self):
if self.released:
return False
signal.signal(self.sig, self.original_handler)
self.released = True
return True
def send_ack(sig, packet, expect=None, plen=0):
if expect == None: if expect == None:
expect = 0xb0 | (ord(packet[1]) & 0xF) expect = 0xb0 | (ord(packet[1]) & 0xF)
@ -109,19 +93,12 @@ def send_ack(sig, packet, expect=None, plen=0):
got = None got = None
while got == None: while got == None:
sleep(0.1) sleep(0.1)
if sig.interrupted: sock.sendall(packet)
exit_all(sig)
port.read_all()
port.write(packet)
port.flush()
#print "->" + hexlify(packet)
i = 0 i = 0
while got == None and i < 6: while got == None and i < 6:
i+=1 i+=1
sleep(0.1) sleep(0.1)
if sig.interrupted: got = sock.recv(plen)
exit_all(sig)
got = port.read_all()
if len(got) == plen: if len(got) == plen:
#print "<-" + hexlify(got) #print "<-" + hexlify(got)
pass pass
@ -134,136 +111,146 @@ def send_ack(sig, packet, expect=None, plen=0):
if got and len(got) >= 3 and got[0] == packet[0] and ord(got[1]) == expect: if got and len(got) >= 3 and got[0] == packet[0] and ord(got[1]) == expect:
break break
got = None got = None
print "---> Retransmit" #print "---> Retransmit"
return got return got
def send_level(sig, lvl): def send_level(lvl):
packet = struct.pack('BBBBBB', 0xf0, 0xa6, 0x01, 0x01, lvl+1, (0xf0+0xa6+3+lvl) & 0xFF) packet = struct.pack('BBBBBB', 0xf0, 0xa6, 0x01, 0x01, lvl+1, (0xf0+0xa6+3+lvl) & 0xFF)
got = send_ack(sig, packet) got = send_ack(packet)
return got return got
def exit_all(sig): def btcon():
sig.interrupted = False addr = None
send_ack(sig, STOP) devs = discover_devices(duration=2, lookup_names = True)
print "STOP done" for (addr, name) in devs:
send_ack(sig, PING) if name.startswith("i-CONSOLE"):
print "ping done" break
send_ack(sig, PING) addr = None
print "ping done"
port.close()
if power_meter:
print "Closing power meter"
power_meter.close()
power_meter.unassign()
if antnode:
print "Stopping ANT node"
antnode.stop()
sys.exit(0) if addr == None:
print("could not find i-CONSOLE bluetooth")
sys.exit(0)
power_meter = None service_matches = find_service( address = addr, uuid = SERIAL_PORT_CLASS )
antnode = None
if len(service_matches) == 0:
print("couldn't find i-Console serial port")
sys.exit(0)
first_match = service_matches[0]
port = first_match["port"]
name = first_match["name"]
host = first_match["host"]
print("connecting to \"%s\" on %s" % (name, host))
# Create the client socket
sock=BluetoothSocket( RFCOMM )
sock.connect((host, port))
return sock
def prints(w, s):
w.addstr(3, 0, s)
w.clrtoeol()
w.refresh()
#send_level(10) #send_level(10)
def main(win): def main(win):
curses.noecho()
curses.cbreak()
win.nodelay(True) win.nodelay(True)
win.keypad(1)
win.refresh() win.refresh()
prints(win, "OK")
i = 0
send_ack(PING)
prints(win, "ping done")
send_ack(INIT_A0, expect=0xb7, plen=6)
prints(win, "A0 done")
for i in range(0, 5):
send_ack(PING)
prints(win, "ping done")
send_ack(STATUS, plen=6)
prints(win, "status done")
send_ack(PING)
prints(win, "ping done")
send_ack(INIT_A3)
prints(win, "A3 done")
send_ack(INIT_A4)
prints(win, "A4 done")
send_ack(START)
prints(win, "START done")
level = 1
while True:
sleep(0.3)
while True:
key = win.getch()
if key == ord('q'):
return
elif key == ord('a') or key == curses.KEY_UP or key == curses.KEY_RIGHT:
if level < 31:
level += 1
prints(win, "Level: %d" % level)
send_level(level)
elif key == ord('y') or key == curses.KEY_DOWN or key == curses.KEY_LEFT:
if level > 1:
level -= 1
prints(win, "Level: %d" % level)
send_level(level)
elif key == -1:
break
got = send_ack(READ, plen=21)
if len(got) == 21:
ic = IConsole(got)
win.addstr(0,0, "%s - %s - %s - %s - %s - %s - %s - %s" % (ic.time_str,
ic.speed_str,
ic.rpm_str,
ic.distance_str,
ic.calories_str,
ic.hf_str,
ic.power_str,
ic.lvl_str))
win.clrtoeol()
win.refresh()
power_meter.update(power = ic.power, cadence = ic.rpm)
if __name__ =='__main__':
sock = btcon()
stick = driver.USB1Driver(device="/dev/ttyUSB0", log=LOG, debug=DEBUG) stick = driver.USB1Driver(device="/dev/ttyUSB0", log=LOG, debug=DEBUG)
antnode = node.Node(stick) antnode = node.Node(stick)
print "Starting ANT node" print("Starting ANT node")
antnode.start() antnode.start()
key = node.NetworkKey('N:ANT+', NETKEY) key = node.NetworkKey('N:ANT+', NETKEY)
antnode.setNetworkKey(0, key) antnode.setNetworkKey(0, key)
print "Starting power meter with ANT+ ID " + repr(POWER_SENSOR_ID) print("Starting power meter with ANT+ ID " + repr(POWER_SENSOR_ID))
try: try:
# Create the power meter object and open it # Create the power meter object and open it
power_meter = PowerMeterTx(antnode, POWER_SENSOR_ID) power_meter = PowerMeterTx(antnode, POWER_SENSOR_ID)
power_meter.open() power_meter.open()
except Exception as e: except Exception as e:
print "power_meter error: " + e.message print("power_meter error: " + e.message)
power_meter = None power_meter = None
print "OK" curses.wrapper(main)
i = 0
with GracefulInterruptHandler() as sig:
send_ack(sig, PING)
print "ping done"
send_ack(sig, INIT_A0, expect=0xb7, plen=6) if sock:
print "A0 done" send_ack(STOP)
send_ack(PING)
for i in range(0, 5): sock.close()
send_ack(sig, PING)
print "ping done"
send_ack(sig, STATUS, plen=6)
print "status done"
send_ack(sig, PING)
print "ping done"
send_ack(sig, INIT_A3)
print "A3 done"
send_ack(sig, INIT_A4)
print "A4 done"
send_ack(sig, START)
print "START done"
level = 1
while True:
if sig.interrupted:
exit_all(sig)
sleep(0.3)
try:
key = win.getch()
if key == 'q':
break
if key == 'a':
level += 1
print "Level: %d" % level
send_level(sig, level)
if key == 'y':
level -= 1
print "Level: %d" % level
send_level(sig, level)
except Exception as e:
pass
#i+=1
# if i % 20 == 2:
# send_level((i/20) +1)
got = send_ack(sig, READ, plen=21)
if len(got) == 21:
gota = struct.unpack('BBBBBBBBBBBBBBBBBBBBB', got)
time = "%02d:%02d:%02d:%02d" % (gota[2]-1, gota[3]-1, gota[4]-1, gota[5]-1)
speed = "V: % 3.1f km/h" % ((100*(gota[6]-1) + gota[7] -1) / 10.0)
rpm = "% 3d RPM" % ((100*(gota[8]-1) + gota[9] -1))
distance = "D: % 3.1f km" % ((100*(gota[10]-1) + gota[11] -1) / 10.0)
calories = "% 3d kcal" % ((100*(gota[12]-1) + gota[13] -1))
hf = "HF % 3d" % ((100*(gota[14]-1) + gota[15] -1))
watt = "% 3.1f W" % ((100*(gota[16]-1) + gota[17] -1) / 10.0)
lvl = "L: %d" % (gota[18] -1)
print "%s - %s - %s - %s - %s - %s - %s - %s" % (time, speed, rpm, distance, calories, hf, watt, lvl)
power_meter.update(power = ((100*(gota[16]-1) + gota[17] -1) / 10.0),
cadence = ((100*(gota[8]-1) + gota[9] -1)))
send_ack(sig, STOP)
print "STOP done"
for i in range(0, 5):
send_ack(sig, PING)
print "ping done"
port.close()
if power_meter: if power_meter:
print "Closing power meter" print "Closing power meter"
@ -273,5 +260,3 @@ def main(win):
print "Stopping ANT node" print "Stopping ANT node"
antnode.stop() antnode.stop()
import curses
curses.wrapper(main)