mirror of
https://github.com/apirrone/Open_Duck_Mini_Runtime.git
synced 2025-09-05 13:37:55 +00:00
62 lines
1.6 KiB
Python
62 lines
1.6 KiB
Python
import socket
|
|
import time
|
|
import pickle
|
|
from mini_bdx_runtime.imu import Imu
|
|
from threading import Thread
|
|
import time
|
|
|
|
import argparse
|
|
|
|
|
|
class IMUServer:
|
|
def __init__(self, imu=None):
|
|
self.host = "0.0.0.0"
|
|
self.port = 1234
|
|
|
|
self.server_socket = socket.socket()
|
|
self.server_socket.setsockopt(
|
|
socket.SOL_SOCKET, socket.SO_REUSEADDR, 1
|
|
) # enable address reuse
|
|
|
|
self.server_socket.bind((self.host, self.port))
|
|
|
|
if imu is None:
|
|
self.imu = Imu(50, user_pitch_bias=args.pitch_bias, upside_down=False)
|
|
else:
|
|
self.imu = imu
|
|
self.stop = False
|
|
|
|
Thread(target=self.run, daemon=True).start()
|
|
|
|
def run(self):
|
|
while not self.stop:
|
|
self.server_socket.listen(1)
|
|
conn, address = self.server_socket.accept() # accept new connection
|
|
print("Connection from: " + str(address))
|
|
try:
|
|
while True:
|
|
data = self.imu.get_data()
|
|
data = pickle.dumps(data)
|
|
conn.send(data) # send data to the client
|
|
time.sleep(1 / 30)
|
|
except:
|
|
pass
|
|
|
|
self.server_socket.close()
|
|
print("thread closed")
|
|
time.sleep(1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("--pitch_bias", type=float, default=0, help="deg")
|
|
args = parser.parse_args()
|
|
imu_server = IMUServer()
|
|
try:
|
|
while True:
|
|
time.sleep(0.01)
|
|
except KeyboardInterrupt:
|
|
print("Closing server")
|
|
imu_server.stop = True
|
|
|
|
time.sleep(2)
|