From 00c9b306b037379ceb38bc28f71440e82ebcac12 Mon Sep 17 00:00:00 2001 From: Harald Hoyer Date: Thu, 20 Apr 2017 07:44:37 +0200 Subject: [PATCH] all in one --- PowerMeterTx.py | 68 --------------------------------------- constants.py | 20 ------------ iconsole.py | 84 +++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 84 insertions(+), 88 deletions(-) delete mode 100644 PowerMeterTx.py delete mode 100644 constants.py diff --git a/PowerMeterTx.py b/PowerMeterTx.py deleted file mode 100644 index c6e638b..0000000 --- a/PowerMeterTx.py +++ /dev/null @@ -1,68 +0,0 @@ -import sys -from ant.core import message -from ant.core.constants import * -from ant.core.exceptions import ChannelError - -from constants import * - -VPOWER_DEBUG = False -CHANNEL_PERIOD = 8182 - - -# Transmitter for Bicycle Power ANT+ sensor -class PowerMeterTx(object): - class PowerData: - def __init__(self): - self.eventCount = 0 - self.eventTime = 0 - self.cumulativePower = 0 - self.instantaneousPower = 0 - - def __init__(self, antnode, sensor_id): - self.antnode = antnode - - # Get the channel - self.channel = antnode.getFreeChannel() - try: - self.channel.name = 'C:POWER' - self.channel.assign('N:ANT+', CHANNEL_TYPE_TWOWAY_TRANSMIT) - self.channel.setID(POWER_DEVICE_TYPE, sensor_id, 0) - self.channel.setPeriod(8182) - self.channel.setFrequency(57) - except ChannelError as e: - print "Channel config error: "+e.message - self.powerData = PowerMeterTx.PowerData() - - def open(self): - self.channel.open() - - def close(self): - self.channel.close() - - def unassign(self): - self.channel.unassign() - - # Power was updated, so send out an ANT+ message - def update(self, power, cadence): - if VPOWER_DEBUG: print 'PowerMeterTx: update called with power ', power - self.powerData.eventCount = (self.powerData.eventCount + 1) & 0xff - if VPOWER_DEBUG: print 'eventCount ', self.powerData.eventCount - self.powerData.cumulativePower = (self.powerData.cumulativePower + int(power)) & 0xffff - if VPOWER_DEBUG: print 'cumulativePower ', self.powerData.cumulativePower - self.powerData.instantaneousPower = int(power) - if VPOWER_DEBUG: print 'instantaneousPower ', self.powerData.instantaneousPower - - payload = chr(0x10) # standard power-only message - payload += chr(self.powerData.eventCount) - payload += chr(0xFF) # Pedal power not used - payload += chr(cadence) - payload += chr(self.powerData.cumulativePower & 0xff) - payload += chr(self.powerData.cumulativePower >> 8) - payload += chr(self.powerData.instantaneousPower & 0xff) - payload += chr(self.powerData.instantaneousPower >> 8) - - ant_msg = message.ChannelBroadcastDataMessage(self.channel.number, data=payload) - #sys.stdout.write('+') - #sys.stdout.flush() - if VPOWER_DEBUG: print 'Write message to ANT stick on channel ' + repr(self.channel.number) - self.antnode.driver.write(ant_msg.encode()) diff --git a/constants.py b/constants.py deleted file mode 100644 index f4a344f..0000000 --- a/constants.py +++ /dev/null @@ -1,20 +0,0 @@ -SPEED_DEVICE_TYPE = 0x7B -CADENCE_DEVICE_TYPE = 0x7A -SPEED_CADENCE_DEVICE_TYPE = 0x79 -POWER_DEVICE_TYPE = 0x0B - - -# Get the serial number of Raspberry Pi -def getserial(): - # Extract serial from cpuinfo file - cpuserial = "0000000000000000" - try: - f = open('/proc/cpuinfo', 'r') - for line in f: - if line[0:6] == 'Serial': - cpuserial = line[10:26] - f.close() - except: - cpuserial = "ERROR000000000" - - return cpuserial diff --git a/iconsole.py b/iconsole.py index cc6d9ae..ccd0783 100644 --- a/iconsole.py +++ b/iconsole.py @@ -48,6 +48,9 @@ from ant.core import node from PowerMeterTx import PowerMeterTx from constants import * from bluetooth import * +from ant.core import message +from ant.core.constants import * +from ant.core.exceptions import ChannelError INIT_A0 = struct.pack('BBBBB', 0xf0, 0xa0, 0x02, 0x02, 0x94) PING = struct.pack('BBBBB', 0xf0, 0xa0, 0x01, 0x01, 0x92) @@ -64,6 +67,87 @@ LOG = None NETKEY = '\xB9\xA5\x21\xFB\xBD\x72\xC3\x45' power_meter = None +SPEED_DEVICE_TYPE = 0x7B +CADENCE_DEVICE_TYPE = 0x7A +SPEED_CADENCE_DEVICE_TYPE = 0x79 +POWER_DEVICE_TYPE = 0x0B + +VPOWER_DEBUG = False +CHANNEL_PERIOD = 8182 + +# Get the serial number of Raspberry Pi +def getserial(): + # Extract serial from cpuinfo file + cpuserial = "0000000000000000" + try: + f = open('/proc/cpuinfo', 'r') + for line in f: + if line[0:6] == 'Serial': + cpuserial = line[10:26] + f.close() + except: + cpuserial = "ERROR000000000" + + return cpuserial + +# Transmitter for Bicycle Power ANT+ sensor +class PowerMeterTx(object): + class PowerData: + def __init__(self): + self.eventCount = 0 + self.eventTime = 0 + self.cumulativePower = 0 + self.instantaneousPower = 0 + + def __init__(self, antnode, sensor_id): + self.antnode = antnode + + # Get the channel + self.channel = antnode.getFreeChannel() + try: + self.channel.name = 'C:POWER' + self.channel.assign('N:ANT+', CHANNEL_TYPE_TWOWAY_TRANSMIT) + self.channel.setID(POWER_DEVICE_TYPE, sensor_id, 0) + self.channel.setPeriod(8182) + self.channel.setFrequency(57) + except ChannelError as e: + print "Channel config error: "+e.message + self.powerData = PowerMeterTx.PowerData() + + def open(self): + self.channel.open() + + def close(self): + self.channel.close() + + def unassign(self): + self.channel.unassign() + + # Power was updated, so send out an ANT+ message + def update(self, power, cadence): + if VPOWER_DEBUG: print 'PowerMeterTx: update called with power ', power + self.powerData.eventCount = (self.powerData.eventCount + 1) & 0xff + if VPOWER_DEBUG: print 'eventCount ', self.powerData.eventCount + self.powerData.cumulativePower = (self.powerData.cumulativePower + int(power)) & 0xffff + if VPOWER_DEBUG: print 'cumulativePower ', self.powerData.cumulativePower + self.powerData.instantaneousPower = int(power) + if VPOWER_DEBUG: print 'instantaneousPower ', self.powerData.instantaneousPower + + payload = chr(0x10) # standard power-only message + payload += chr(self.powerData.eventCount) + payload += chr(0xFF) # Pedal power not used + payload += chr(cadence) + payload += chr(self.powerData.cumulativePower & 0xff) + payload += chr(self.powerData.cumulativePower >> 8) + payload += chr(self.powerData.instantaneousPower & 0xff) + payload += chr(self.powerData.instantaneousPower >> 8) + + ant_msg = message.ChannelBroadcastDataMessage(self.channel.number, data=payload) + #sys.stdout.write('+') + #sys.stdout.flush() + if VPOWER_DEBUG: print 'Write message to ANT stick on channel ' + repr(self.channel.number) + self.antnode.driver.write(ant_msg.encode()) + class IConsole(object): def __init__(self, got): gota = struct.unpack('BBBBBBBBBBBBBBBBBBBBB', got)