2
4
%   poolq: a small job scheduler built on library(thread_pool).
%
%   A dedicated scheduling thread receives submit/4, complete/2 and
%   finish messages on its message queue and runs jobs in a thread pool.
:- module(poolq,
          [ poolq_create/3,
            poolq_submit_job/5,
            poolq_wait/2
          ]).

:- use_module(library(thread_pool)).

%   Scheduler bookkeeping, kept as dynamic facts:
%
%     job_waiting(JobId, DepJobIds, Goal, Options)
%         a submitted job that has been postponed
%     job_running(JobId, Thread)
%         a job that was started on pool thread Thread
%     job_complete(JobId, Status)
%         a job for which a complete/2 message was processed
:- dynamic
    job_waiting/4,
    job_running/2,
    job_complete/2.
%!  poolq_create(-Scheduler, +Size, +Options) is det.
%
%   Start the scheduling thread. Scheduler is unified with the id of the
%   new thread, which runs init_scheduler(Size, Options). The thread
%   itself is created with an empty option list.
poolq_create(Scheduler, Size, Options) :-
    SchedulerGoal = init_scheduler(Size, Options),
    thread_create(SchedulerGoal, Scheduler, []),
    debug(poolq, "Started scheduling thread ~w", [Scheduler]).
21
%!  poolq_submit_job(+Scheduler, +Goal, +JobId, +JobDepIds, +Options) is det.
%
%   Post a submit(Goal, JobId, JobDepIds, Options) message to the message
%   queue of the scheduling thread Scheduler. This predicate only sends
%   the message; it does not wait for the job to start or finish.
poolq_submit_job(Scheduler, Goal, JobId, JobDepIds, Options) :-
    Request = submit(Goal, JobId, JobDepIds, Options),
    thread_send_message(Scheduler, Request),
    debug(poolq,
          "Sent message 'submit(~w,~w <-- ~w,~w)' to scheduling thread ~w",
          [Goal, JobId, JobDepIds, Options, Scheduler]).
25
%!  poolq_wait(+Scheduler, -Status) is det.
%
%   Send the `finish` message to the scheduling thread and block until
%   that thread has terminated. Status is the exit status reported by
%   thread_join/2 for the scheduling thread.
poolq_wait(Scheduler, Status) :-
    thread_send_message(Scheduler, finish),
    debug(poolq, "Sent message 'finish' to scheduling thread ~w",
          [Scheduler]),
    thread_join(Scheduler, Status),
    debug(poolq, "Scheduling thread ~w terminated", [Scheduler]).
31
%!  init_scheduler(+Size, +Options)
%
%   Entry point of the scheduling thread. Creates a thread pool that is
%   named after the calling thread's own id, passing Size and Options to
%   thread_pool_create/3, and then enters the message loop.
init_scheduler(Size, Options) :-
    debug(poolq, "Scheduler: initializing", []),
    thread_self(Pool),                      % the pool shares our thread id
    thread_pool_create(Pool, Size, Options),
    debug(poolq,
          "Scheduler: created thread pool ~w with ~w threads, options ~w",
          [Pool, Size, Options]),
    wait_for_message.
38
%!  wait_for_message
%
%   One step of the scheduler loop: take the next message from this
%   thread's queue and hand it to process_message/1, which decides
%   whether the loop continues.
wait_for_message :-
    receive_message(Incoming),
    process_message(Incoming).
42
%!  receive_message(-Msg) is det.
%
%   Block on the calling thread's message queue until a message arrives
%   and unify it with Msg, logging before and after on debug topic poolq.
receive_message(Msg) :-
    debug(poolq, "Scheduler: waiting for message", []),
    thread_get_message(Received),
    Msg = Received,
    debug(poolq, "Scheduler: received message '~w'", [Msg]).
47
%!  process_message(+Msg)
%
%   Dispatch a single scheduler message. The handlers are tried in a
%   fixed order and the first one that accepts the message wins:
%
%     1. a submit message      -> handled, keep looping
%     2. a completion message  -> handled, keep looping
%     3. `finish`              -> drain the queue and leave the loop
%     4. anything else         -> reported as an error, keep looping
process_message(Msg) :-
    (   process_submit_message(Msg)
    ->  wait_for_message
    ;   process_complete_message(Msg)
    ->  wait_for_message
    ;   Msg = finish
    ->  finish_queued_jobs
    ;   process_error(Msg)
    ->  wait_for_message
    ).
66
%!  process_submit_message(+Msg) is semidet.
%
%   Handle a submit(Goal, JobId, DepJobIds, Options) message; fails for
%   any other message shape. The job is started at once when
%   none_waiting_or_running/1 accepts its dependency list, otherwise it
%   is recorded as a job_waiting/4 fact for later.
process_submit_message(submit(Goal, JobId, DepJobIds, Options)) :-
    (   none_waiting_or_running(DepJobIds)
    ->  debug(poolq,
              "Scheduler: job ~w has no unmet dependencies, starting immediately",
              [JobId]),
        start_job(JobId, Goal, Options)
    ;   debug(poolq,
              "Scheduler: job ~w has dependencies ~w; postponing",
              [JobId, DepJobIds]),
        assertz(job_waiting(JobId, DepJobIds, Goal, Options))
    ).
77
%!  process_complete_message(+Msg) is semidet.
%
%   Handle a complete(JobId, JobStatus) message for a job that is
%   currently recorded in job_running/2; fails for any other message or
%   for an unknown JobId. Moves the job from job_running/2 to
%   job_complete/2, joins its thread and then tries to start postponed
%   jobs.
process_complete_message(complete(JobId, JobStatus)) :-
    once(job_running(JobId, Thread)),
    debug(poolq,
          "Scheduler: job ~w on thread ~w finished with status ~w",
          [JobId, Thread, JobStatus]),
    retract(job_running(JobId, Thread)),
    assertz(job_complete(JobId, JobStatus)),
    thread_join(Thread, ThreadStatus),
    debug(poolq,
          "Scheduler: job thread ~w terminated with status ~w",
          [Thread, ThreadStatus]),
    % A finished job may have unblocked jobs that were postponed.
    start_queued_jobs.
87
%!  process_error(+Msg) is det.
%
%   Report a message that no handler accepted by printing a one-line
%   error on the current output stream.
process_error(Unhandled) :-
    format("Error: couldn't process message '~w'~n", [Unhandled]).
90
%!  finish_queued_jobs is semidet.
%
%   Shut the scheduler down after a `finish` message: start whatever
%   postponed jobs are ready, wait until no job is running, and destroy
%   the thread pool named after this thread.
%
%   Fails if wait_for_queue/0 fails (it does so when postponed jobs had
%   to be abandoned). The pool is destroyed on every exit path - success,
%   failure or exception - so that an abandoned job cannot leak the pool.
finish_queued_jobs :-
    start_queued_jobs,
    % once/1 makes the goal deterministic so the cleanup handler runs
    % immediately on success as well as on failure or error.
    setup_call_cleanup(
        true,
        once(wait_for_queue),
        (   thread_self(Pool),
            thread_pool_destroy(Pool),
            debug(poolq, "Scheduler: destroyed thread pool", [])
        )).
97
%!  wait_for_queue is semidet.
%
%   Drain the scheduler after `finish` was requested.
%
%   While any job is recorded in job_running/2, block for the next
%   message; completion messages are processed (which may start further
%   postponed jobs), anything else is reported through process_error/1.
%   Once nothing is running, succeed if nothing is waiting either;
%   otherwise print a single warning listing every postponed job that
%   could not be started and fail.
wait_for_queue :-
    job_running(_, _),
    !,
    receive_message(Msg),
    % Commit to the completion handler once it has succeeded. A plain
    % disjunction would leave a choice point, and a later failure would
    % backtrack into process_error/1 for a message that was handled.
    (   process_complete_message(Msg)
    ->  true
    ;   process_error(Msg)
    ),
    wait_for_queue.
wait_for_queue :-
    job_waiting(_, _, _, _),
    !,
    % findall/3 rather than bagof/3: bagof would treat the anonymous
    % arguments as free variables and enumerate one partial list per
    % distinct combination instead of one list of all waiting jobs.
    findall(JobId, job_waiting(JobId, _, _, _), AbandonedJobs),
    format("Warning: unprocessed jobs ~w~n", [AbandonedJobs]),
    fail.
wait_for_queue.
111
%!  start_queued_jobs is det.
%
%   Start postponed jobs one at a time, for as long as
%   start_queued_job/0 finds one that can run. When it finds none, log
%   whether jobs are still waiting or the waiting list is empty. Always
%   succeeds.
start_queued_jobs :-
    debug(poolq, "Scheduler: looking for postponed jobs", []),
    (   start_queued_job
    ->  start_queued_jobs
    ;   job_waiting(_, _, _, _)
    ->  debug(poolq, "Scheduler: no jobs ready to run", [])
    ;   debug(poolq, "Scheduler: no jobs waiting", [])
    ).
123
%!  start_queued_job is nondet.
%
%   Pick a postponed job whose dependency list passes
%   none_waiting_or_running/1, remove it from job_waiting/4 and start
%   it. Fails when no postponed job qualifies.
start_queued_job :-
    Waiting = job_waiting(JobId, DepJobIds, Goal, Options),
    call(Waiting),                          % enumerate postponed jobs
    none_waiting_or_running(DepJobIds),
    retract(Waiting),
    start_job(JobId, Goal, Options).
129
%!  start_job(+JobId, +Goal, +Options) is det.
%
%   Run Goal as job JobId on a new thread taken from the pool that is
%   named after the calling (scheduler) thread, and record the job in
%   job_running/2. Options are passed to thread_create_in_pool/4.
%
%   The worker runs run_job/3, which is given the scheduler's thread id
%   so that it can post the complete/2 message back to it.
start_job(JobId, Goal, Options) :-
    % Log the job id too, so that this line can be matched against the
    % "job ~w ... finished" line written when the job completes.
    debug(poolq, "Scheduler: starting job ~w: ~w, options ~w",
          [JobId, Goal, Options]),
    thread_self(Self),
    thread_create_in_pool(Self, run_job(Self, JobId, Goal), Thread, Options),
    assertz(job_running(JobId, Thread)).
135
%!  run_job(+Scheduler, +JobId, +Goal) is det.
%
%   Body of a worker thread: evaluate Goal through job_status/2 and
%   report the outcome to the scheduling thread as a
%   complete(JobId, Status) message.
run_job(Scheduler, JobId, Goal) :-
    job_status(Goal, Status),
    Report = complete(JobId, Status),
    thread_send_message(Scheduler, Report),
    debug(poolq,
          "Job ~w: sent message 'complete(~w,~w)' to scheduling thread ~w",
          [JobId, JobId, Status, Scheduler]).
140
%!  job_status(+Goal, -Status) is det.
%
%   Run Goal once and describe how it ended:
%
%     - `true`             Goal succeeded (only its first solution is used)
%     - `false`            Goal failed
%     - `exception(Error)` Goal raised Error
%
%   Exceptions are caught rather than propagated because run_job/3 must
%   always get a status to report: an uncaught error would end the worker
%   before the complete/2 message is sent, leaving the job recorded as
%   running with nothing to ever clear it.
job_status(Goal, Status) :-
    catch(( call(Goal) -> Outcome = true ; Outcome = false ),
          Error,
          Outcome = exception(Error)),
    Status = Outcome.
143
144none_waiting_or_running(JobIds) :-
145 forall(member(JobId,JobIds),
146 \+ (job_waiting(JobId,_,_,_)
147 ; job_running(JobId,_)))