mirror of
https://github.com/apirrone/Open_Duck_Mini_Runtime.git
synced 2025-09-05 13:37:55 +00:00
76 lines
1.9 KiB
Python
76 lines
1.9 KiB
Python
import socket
|
|
import time
|
|
import numpy as np
|
|
import pickle
|
|
from queue import Queue
|
|
from threading import Thread
|
|
from scipy.spatial.transform import Rotation as R
|
|
from FramesViewer.viewer import Viewer
|
|
import argparse
|
|
|
|
|
|
class IMUClient:
|
|
def __init__(self, host, port=1234, freq=30):
|
|
self.host = host
|
|
self.port = port
|
|
self.freq = freq
|
|
self.client_socket = socket.socket()
|
|
self.connected = False
|
|
while not self.connected:
|
|
try:
|
|
self.client_socket.connect((self.host, self.port))
|
|
self.connected = True
|
|
except Exception as e:
|
|
print(e)
|
|
time.sleep(0.5)
|
|
self.imu_queue = Queue(maxsize=1)
|
|
self.last_imu = [0, 0, 0, 0]
|
|
|
|
Thread(target=self.imu_worker, daemon=True).start()
|
|
|
|
def imu_worker(self):
|
|
while True:
|
|
try:
|
|
data = self.client_socket.recv(1024) # receive response
|
|
data = pickle.loads(data)
|
|
|
|
self.imu_queue.put(data)
|
|
except:
|
|
print("missed imu")
|
|
|
|
time.sleep(1 / self.freq)
|
|
|
|
def get_imu(self):
|
|
try:
|
|
self.last_imu = self.imu_queue.get(False)
|
|
except:
|
|
pass
|
|
|
|
return self.last_imu
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("--ip", type=str, required=True, help="IP address of the robot")
|
|
args = parser.parse_args()
|
|
|
|
client = IMUClient(args.ip)
|
|
|
|
fv = Viewer()
|
|
fv.start()
|
|
pose = np.eye(4)
|
|
pose[:3, 3] = [0.1, 0.1, 0.1]
|
|
try:
|
|
while True:
|
|
quat = client.get_imu()
|
|
try:
|
|
rot_mat = R.from_quat(quat).as_matrix()
|
|
pose[:3, :3] = rot_mat
|
|
fv.pushFrame(pose, "pose")
|
|
except Exception as e:
|
|
print("error", e)
|
|
pass
|
|
time.sleep(1 / 30)
|
|
except KeyboardInterrupt:
|
|
pass
|