← BACK

Scheduler

Julian Rakuschek - 2023-04-11

Python

Scheduling

Multi-Threading

The script below implements a scheduler that processes a Redis-backed queue. It was written for Herkules, the FGP testing system, which required a more flexible solution than Python's standard thread pool.

import json
import os
import subprocess
import threading
from dataclasses import dataclass
from datetime import datetime
from time import sleep
from typing import Callable

import redis

SCHEDULER_LOG_FILE = "/path/to/logfile.log"
LOGGING_ENABLED = True

# Number of thread slots, i.e. the maximum number of Worker threads running at the same time.
# NOTE(review): this name was used below but never defined, so importing the module raised a
# NameError. 4 is a placeholder value -- adjust it to the deployment.
THREADS = 4

# Redis settings
QUEUE = "queue"
ANNOUNCE_CHANNEL = "announce"
CURRENTLY_EXECUTING = "currently_executing"

# Thread slots to simulate a thread pool
# False = Slot is free
# True = Slot is taken
thread_slots = [False] * THREADS
# Condition (with its own lock) on which request_thread_slot waits while all slots are taken
thread_slots_cv_lock = threading.Lock()
thread_slots_cv = threading.Condition(thread_slots_cv_lock)
# Guards every read and write of thread_slots
thread_slots_lock = threading.Lock()

# Change working directory to file directory
os.chdir(os.path.abspath(os.path.dirname(__file__)))
# Opened once in append mode and shared by all log() calls; closed in the __main__ block
log_file_global = open(SCHEDULER_LOG_FILE, "a")

# ----------------------------------------------------------------------------------------------------------------------
def log(log_file, type_, message):
    """
    Emits one timestamped line to the given file and to stdout; does nothing if LOGGING_ENABLED is false
    :param log_file: Already opened, writable text file object (it is flushed after the write)
    :param type_: Tag written in square brackets in front of the message
    :param message: Text to be logged
    """
    if not LOGGING_ENABLED:
        return
    # Build the line once and reuse it for both sinks
    line = f"[{datetime.now().isoformat()}] [{type_}] {message}"
    log_file.write(line + "\n")
    print(line)
    log_file.flush()


# ----------------------------------------------------------------------------------------------------------------------
def request_thread_slot():
    """
    Blocks until a slot in the global list thread_slots is free, marks it as taken (True)
    and returns its index
    => This index serves as the Thread ID used for the build directory
    :return: Index of the slot that was claimed
    """
    while True:
        # `with` releases the lock on every exit path, including the early return
        with thread_slots_lock:
            for slot_index, taken in enumerate(thread_slots):
                if not taken:
                    thread_slots[slot_index] = True
                    return slot_index
        # All slots are taken: wait for a notification from free_thread_slot().
        # thread_slots is guarded by a different lock than the condition, so a notification
        # sent between the scan above and this wait can be missed; the 1 second timeout
        # bounds that delay before the next scan.
        with thread_slots_cv:
            thread_slots_cv.wait(1)


def free_thread_slot(slot):
    """
    Marks a slot in the global list thread_slots as free (False) and wakes up one thread
    waiting on thread_slots_cv
    :param slot: Index of the slot to release
    :raises IndexError: if slot is not a valid index of thread_slots
    """
    # `with` guarantees the lock is released even if the index is invalid; with a bare
    # acquire()/release() pair an IndexError here would leave the lock held forever
    with thread_slots_lock:
        thread_slots[slot] = False
    with thread_slots_cv:
        thread_slots_cv.notify()


# ----------------------------------------------------------------------------------------------------------------------
def execute_worker_tasks(number):
    """
    Placeholder worker body: logs the start and the end of a worker run
    :param number: Number used in the THREAD-<number> tag of the log lines
    """
    tag = f"THREAD-{number}"
    log(log_file_global, tag, "Starting worker tasks")

    # ==> Execute your worker tasks here <==

    log(log_file_global, tag, "Finished worker tasks")


# ----------------------------------------------------------------------------------------------------------------------
class Scheduler(threading.Thread):
    """
    Thread that listens on a Redis pub/sub subscription and, for every message received,
    drains the Redis list QUEUE by starting one Worker thread per queued id
    """

    def __init__(self, announce_channel: str, redis_instance: redis.Redis,
                 thread_function: Callable = execute_worker_tasks):
        """
        Clears the scheduler's Redis keys and subscribes to the announce channel
        :param announce_channel: Channel pattern passed to psubscribe; its messages trigger queue processing
        :param redis_instance: Redis connection used for the queue, the executing set and pub/sub
        :param thread_function: Callable run by every Worker, called with the thread slot index
        """
        super().__init__()
        self.thread_function = thread_function
        self.redis = redis_instance
        # Remove the scheduler's keys so that nothing from an earlier run is picked up
        self.redis.delete(QUEUE, ANNOUNCE_CHANNEL, CURRENTLY_EXECUTING)
        self.pubsub = self.redis.pubsub()
        self.pubsub.psubscribe(announce_channel)
        log(log_file_global, "SCHEDULER", "Initialized")

    def run(self):
        """
        For each message from the subscription: pops ids from QUEUE until it is empty and
        starts a Worker for each one, blocking while no thread slot is free
        """
        # The message content is not used, a message only serves as a wake-up signal
        for _ in self.pubsub.listen():
            while True:
                # Check if the queue is empty before blocking on a thread slot
                if not self.redis.lrange(QUEUE, 0, 0):
                    break
                slot = request_thread_slot()
                item = self.redis.lpop(QUEUE)
                if item:
                    unique_id = item.decode('utf-8')
                    # Drop any further occurrences of the same id from the queue
                    self.redis.lrem(QUEUE, 0, unique_id)
                    log(log_file_global, f"THREAD-{slot}", f"handling request for {unique_id}")
                    self.redis.sadd(CURRENTLY_EXECUTING, unique_id)
                    # The Worker frees the slot once it is done
                    Worker(unique_id, slot, self.redis, self.thread_function).start()
                else:
                    # The queue was emptied between the check and the pop: give the slot back
                    free_thread_slot(slot)
                    break


# ----------------------------------------------------------------------------------------------------------------------
class Worker(threading.Thread):
    """
    Thread that runs a callable for one queued id inside a thread slot and releases the
    slot when the callable returns or raises
    """

    def __init__(self, unique_id: str, slot: int, redis: redis.Redis, func: Callable):
        """
        :param unique_id: Id of the request, removed from the CURRENTLY_EXECUTING set afterwards
        :param slot: Thread slot index reserved for this worker; passed to func
        :param redis: Redis connection used to update the CURRENTLY_EXECUTING set
        :param func: Callable to execute, called with the slot index as its only argument
        """
        super().__init__()
        self.unique_id = unique_id
        self.slot = slot
        self.redis = redis
        self.func = func

    def run(self):
        """
        Calls func with the slot index, then removes the id from CURRENTLY_EXECUTING and
        frees the slot
        """
        assert self.slot is not None
        try:
            self.func(self.slot)
        finally:
            # Clean up even if func raised, otherwise the slot would stay taken forever
            # and the id would remain marked as executing
            try:
                self.redis.srem(CURRENTLY_EXECUTING, self.unique_id)
            finally:
                # Free the slot even if the Redis call failed
                free_thread_slot(self.slot)


# ----------------------------------------------------------------------------------------------------------------------
if __name__ == "__main__":
    # Run the scheduler against the local Redis (db 1) until the process is interrupted
    try:
        connection = redis.Redis(host='localhost', port=6379, db=1)
        scheduler = Scheduler(ANNOUNCE_CHANNEL, connection)
        scheduler.start()
        scheduler.join()
    except KeyboardInterrupt:
        print('\033[31;1;4mInterrupted\033[0m')
        farewell = ("Received Keyboard Interrupt\n"
                    "--------------------------------------------------------")
        log(log_file_global, "SCHEDULER", farewell)
        log_file_global.close()
        # Terminate the process immediately -- presumably so that threads which are still
        # running do not keep it alive
        os._exit(0)