background-jobs/TODO
2018-11-10 16:51:21 -06:00

71 lines
2 KiB
Text

1.
Graceful Shutdown
2.
base port is return port
base port + 1 is port mapping port
base port + 2 is first queue port
server creates port mapping ON START, and moves jobs that started more than 24 hours ago from RUNNING back into QUEUE
server supplies an infinite push of port mapping on base_port + 1
worker gets port mapping from server using pull on base_port + 1
QueuePort bucket
Key: PortNum
Value: QueueName
get access:
Cursor iterates over Key/Value pairs
set access:
Ports are never overwritten, but new ports can be added
interaction:
base_port: usize,
queues: BTreeSet<String>
queue_port_bucket = store.bucket();
store = store.write_lock();
let read_txn = store.read_txn();
let write_txn = store.write_txn();
let queue_map = lock(write_txn, |write_txn| {
let cursor = read_txn.cursor();
let (unused_queues, queue_map) = cursor
.iter()
.fold(Ok((queues, BTreeMap::new())), |acc, (port, queue)| {
acc.and_then(|(mut queues, mut bm)| {
port = parse_port(port)?;
queue = parse_queue(queue)?;
queues.remove(queue);
bm.insert(queue, port);
Ok((queues, bm))
})
})?;
let start_port = queue_map.iter().max_by(|(_, v)| v).unwrap_or(base_port + 2);
let (queue_map, _) = unused_queues
.into_iter()
.fold(Ok((queue_map, start_port)), |acc, queue_name| {
acc.and_then(|(mut bm, port_num)| {
write_txn.set(port_num, queue_name)?;
bm.insert(queue_name, port_num);
Ok((bm, port_num + 1))
})
})?;
Ok(queue_map)
})?;
write_txn.commit();
read_txn.commit();
Ok(queue_map)