diff --git a/TODO b/TODO new file mode 100644 index 0000000..42b7340 --- /dev/null +++ b/TODO @@ -0,0 +1,71 @@ +1. +Gracefull Shutdown + +2. +base port is return port +base port + 1 is port mapping port +base port + 2 is first queue port + +server creates port mapping ON START, and moves jobs started more than 24 hours prior to now into QUEUE from RUNNING + +server supplies an infinite push of port mapping on base_port + 1 +worker gets port mapping from server using pull on base_port + 1 + +QueuePort bucket + key: PortNum + Value: QueueName + + get access: + Cursor iterates over Key/Value pairs + + set access: + Ports are never overwritten, but new ports can be added + + interaction: + base_port: usize, + queues: BTreeSet + + queue_port_bucket = store.bucket(); + + store = store.write_lock(); + + let read_txn = store.read_txn(); + let write_txn = store.write_txn(); + + let queue_map = lock(write_txn, |write_txn| { + let cursor = read_txn.cursor(); + + let (unused_queues, queue_map) = cursor + .iter() + .fold(Ok((queues, BTreeMap::new())), |acc, (port, queue)| { + acc.and_then(|(mut queues, mut bm)| { + port = parse_port(port)?; + queue = parse_queue(queue)?; + + queues.remove(queue); + bm.insert(queue, port); + + Ok((queues, bm)) + }) + })?; + + let start_port = queue_map.iter().max_by(|(_, v)| v).unwrap_or(base_port + 2); + + let (queue_map, _) = unused_queues + .into_iter() + .fold(Ok((queue_map, start_port)), |acc, queue_name| { + acc.and_then(|(mut bm, port_num)| { + write_txn.set(port_num, queue_name)?; + bm.insert(queue_name, port_num); + + Ok((bm, port_num + 1)) + }) + })?; + + Ok(queue_map) + })?; + + write_txn.commit(); + read_txn.commit(); + + Ok(queue_map)