first commit

Signed-off-by: Harald Hoyer <harald@profian.com>
This commit is contained in:
Harald Hoyer 2022-09-20 11:14:13 +02:00
commit 5a7a77b2c5
8 changed files with 316 additions and 0 deletions

2
.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
*.pyc
.idea/

74
Kettler.py Normal file
View file

@ -0,0 +1,74 @@
#!/usr/bin/env python2
import sys
import serial, hashlib
from time import sleep
from binascii import unhexlify
from ant.core import driver
from ant.core import node
from RowerTx import RowerTx
from antConst import *
DEBUG = False
LOG = None
rower_trainer = None
antnode = None
ROWER_SENSOR_ID = int(int(hashlib.md5(getserial()).hexdigest(), 16) & 0xfffe) + 7
if __name__ =='__main__':
while True:
try:
with serial.Serial('/dev/serial0', 9600, timeout=1) as ser:
ser.write(b"st\n")
line = ser.readline()
fields = line.split()
power = fields[7]
power = int(power)
except:
sleep(1)
print("Waiting for serial line!")
continue
break
NETKEY = unhexlify(sys.argv[1])
stick = driver.USB1Driver(device=sys.argv[2], log=LOG, debug=DEBUG)
antnode = node.Node(stick)
print("Starting ANT node on network %s" % sys.argv[1])
antnode.start()
key = node.NetworkKey('N:ANT+', NETKEY)
antnode.setNetworkKey(0, key)
print("Starting power meter with ANT+ ID " + repr(ROWER_SENSOR_ID))
try:
# Create the power meter object and open it
rower_trainer = RowerTx(antnode, ROWER_SENSOR_ID)
rower_trainer.open()
except Exception as e:
print("power_meter error: " + e.message)
rower_trainer = None
try:
with serial.Serial('/dev/serial0', 9600, timeout=1) as ser:
i = 0
while True:
i += 1
sleep(0.2)
ser.write(b"st\n")
line = ser.readline()
fields = line.split()
power = fields[7]
rower_trainer.update(power = int(power))
except:
pass
if rower_trainer:
print "Closing power meter"
rower_trainer.close()
rower_trainer.unassign()
if antnode:
print "Stopping ANT node"
antnode.stop()

21
LICENSE.txt Normal file
View file

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2017 Harald Hoyer
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

13
README.md Normal file
View file

@ -0,0 +1,13 @@
## Requirements
```
$ pip2 install --user requirements.txt
```
## Usage
```
$ python2 Kettler.py <ANT Network Key in ASCII Hex> <ANT USB device>
```
Use the ANT+ network key to record the data with ANT+ compatible devices, such as a bike computer or fitness watch.

138
RowerTx.py Normal file
View file

@ -0,0 +1,138 @@
from ant.core import message
from ant.core.constants import *
from ant.core.exceptions import ChannelError
import thread
# from binascii import hexlify
import struct
CHANNEL_PERIOD = 8182
# Transmitter for Rower Power ANT+ sensor
class RowerTx(object):
data_lock = thread.allocate_lock()
class RowerData:
def __init__(self):
self.instantaneousPower = 0
self.distance = 0
self.i = 0
def __init__(self, antnode, sensor_id):
self.antnode = antnode
self.power = 0
self.sensor_id = sensor_id
# Get the channel
self.channel = antnode.getFreeChannel()
try:
self.channel.name = 'C:POWER'
self.channel.assign('N:ANT+', CHANNEL_TYPE_TWOWAY_TRANSMIT)
self.channel.setID(0x11, sensor_id & 0xFFFF, 5)
self.channel.setPeriod(8182)
self.channel.setFrequency(57)
except ChannelError as e:
print "Channel config error: " + e.message
self.powerData = RowerTx.RowerData()
self.channel.registerCallback(self)
def open(self):
self.channel.open()
def close(self):
self.channel.close()
def unassign(self):
self.channel.unassign()
def update(self, power):
self.data_lock.acquire()
self.power = power
self.data_lock.release()
def process(self, msg):
if isinstance(msg, message.ChannelEventMessage) and \
msg.getMessageID() == 1 and \
msg.getMessageCode() == EVENT_TX:
self.broadcast()
elif isinstance(msg, message.ChannelAcknowledgedDataMessage):
payload = msg.getPayload()
a, page, id_ = struct.unpack('BBB', payload[:3])
if a == 0 and page == 1 and id_ == 0xAA:
# print ("ChannelAcknowledgedDataMessage: " + hexlify(payload))
payload = chr(0x01)
payload += chr(0xAC)
payload += chr(0xFF)
payload += chr(0xFF)
payload += chr(0xFF)
payload += chr(0xFF)
payload += chr(0x00)
payload += chr(0x00)
ant_msg = message.ChannelBroadcastDataMessage(self.channel.number, data=payload)
self.antnode.driver.write(ant_msg.encode())
else:
print("Message ID %d Code %d" % (msg.getMessageID(), msg.getMessageCode()))
# Power was updated, so send out an ANT+ message
def broadcast(self):
self.powerData.i += 1
self.data_lock.acquire()
power = self.power
self.data_lock.release()
if power == 0:
speed_bytes = 0
distance_delta = 0
else:
pace = pow(2.80/float(power), 1.0/3.0)
speed = 1.0 / pace
speed_bytes = int(1000.0 / pace)
distance_delta = speed / 4.0
self.powerData.distance += distance_delta
if self.powerData.i % 132 == 64 or self.powerData.i % 132 == 65:
# page 80
payload = chr(0x50) # Manufacturer's Info
payload += chr(0xFF)
payload += chr(0xFF)
payload += chr(0x01) # HW Rev
payload += chr(0xFF) # MID LSB
payload += chr(0x00) # MID MSB
payload += chr(0x01) # Model LSB
payload += chr(0x00) # Model MSB
elif self.powerData.i % 132 == 130 or self.powerData.i % 132 == 131:
# page 81
payload = chr(0x51) # Product Info
payload += chr(0xFF)
payload += chr(0xFF) # SW Rev Supp
payload += chr(0x01) # SW Rev Main
payload += chr((self.sensor_id >> 0) & 0xFF) # Serial 0-7
payload += chr((self.sensor_id >> 8) & 0xFF) # Serial 8-15
payload += chr((self.sensor_id >> 16) & 0xFF) # Serial 16-23
payload += chr((self.sensor_id >> 24) & 0xFF) # Serial 24-31
elif self.powerData.i % 4 == 0 or self.powerData.i % 4 == 1:
# page 16
payload = chr(0x10) # standard fitness equipment page
payload += chr(22) # equipment type field / rower
payload += chr(self.powerData.i % 0xFF) # Elapsed Time
payload += chr(int(self.powerData.distance) % 0xFF) # Distance travelled accumulated
payload += chr(speed_bytes % 0xFF) # Speed LSB
payload += chr(speed_bytes >> 8) # Speed MSB
payload += chr(0xFF) # Heart rate
payload += chr((1<<2) | (1<<3)) # capabilities / FE state (transmit distance | virtual speed)
elif self.powerData.i % 4 == 2 or self.powerData.i % 4 == 3:
# page 22
self.powerData.instantaneousPower = int(power)
payload = chr(0x16) # specific rower data
payload += chr(0xFF)
payload += chr(0xFF)
payload += chr(0) # stroke count
payload += chr(0xFF) # stroke cadence
payload += chr(self.powerData.instantaneousPower & 0xff)
payload += chr(self.powerData.instantaneousPower >> 8)
payload += chr(0) # capabilities / FE state
ant_msg = message.ChannelBroadcastDataMessage(self.channel.number, data=payload)
self.antnode.driver.write(ant_msg.encode())

17
antConst.py Normal file
View file

@ -0,0 +1,17 @@
CADENCE_DEVICE_TYPE = 0x7A
SPEED2_DEVICE_TYPE = 0x0F
SPEED_DEVICE_TYPE = 0x7B
SPEED_CADENCE_DEVICE_TYPE = 0x79
POWER_DEVICE_TYPE = 0x0B
# Get the serial number of Raspberry Pi
def getserial():
machineid = "0000000000000000"
try:
f = open('/etc/machine-id', 'r')
machineid = f.readline()
f.close()
except:
machineid = "ERROR000000000"
return machineid

2
requirements.txt Normal file
View file

@ -0,0 +1,2 @@
git+https://github.com/baderj/python-ant.git
pybluez

49
testrower.py Normal file
View file

@ -0,0 +1,49 @@
import struct, sys, hashlib, curses
from time import sleep
from binascii import hexlify,unhexlify
from ant.core import driver
from ant.core import node
from RowerTx import RowerTx
from iConst import *
rower = None
ROWER_SENSOR_ID = int(int(hashlib.md5(getserial()).hexdigest(), 16) & 0xFFFFfffe) + 1
if __name__ =='__main__':
NETKEY = unhexlify(sys.argv[1])
stick = driver.USB1Driver(device=sys.argv[2], log=None, debug=True)
antnode = node.Node(stick)
print("Starting ANT node on network %s" % sys.argv[1])
antnode.start()
key = node.NetworkKey('N:ANT+', NETKEY)
antnode.setNetworkKey(0, key)
print("Starting power meter with ANT+ ID " + repr(ROWER_SENSOR_ID))
try:
# Create the power meter object and open it
rower = RowerTx(antnode, ROWER_SENSOR_ID)
rower.open()
except Exception as e:
print("power_meter error: " + e.message)
rower = None
i = 0
while True:
try:
sleep(1)
except:
break
rower.update(power = 179.2)
i += 1
if (i > 200):
break
if rower:
print "Closing power meter"
rower.close()
rower.unassign()
if antnode:
print "Stopping ANT node"
antnode.stop()