2
3:- module(queue,
4 [
5 queue_engine/1,
6 init_queue/2,
7 release_queue/1,
8 write_script_file/4,
9 write_script_file/7,
10 run_execs_in_queue/4,
11 flush_queue_recursive/2
12 ]). 13
14:- use_module(library(readutil)). 15
16:- use_module(library(biomake/biomake)). 17:- use_module(library(biomake/utils)). 18
19:- discontiguous queue_engine/1. 20:- discontiguous init_queue/2. 21:- discontiguous release_queue/1. 22:- discontiguous run_execs_in_queue/4. 23:- discontiguous default_qsub_exec/2. 24:- discontiguous default_qdel_exec/2. 25:- discontiguous qsub_output_arg/3. 26:- discontiguous qsub_error_arg/3. 27:- discontiguous qsub_dep_arg/3. 28:- discontiguous qsub_dep_arg_prefix/2. 29:- discontiguous qsub_dep_prefix/2. 30:- discontiguous qsub_dep_separator/2. 31:- discontiguous qsub_extra_args/2. 32:- discontiguous qsub_script_headers/4. 33:- discontiguous qsub_job_id/3. 34
38
42
43run_execs_with_qsub(Engine,Rule,SL,Opts) :-
44 rule_target(Rule,T,Opts),
45 rule_dependencies(Rule,DL,Opts),
46 qsub_kill(Engine,T,SL,Opts),
47 (get_opt(qsub_exec,QsubExec,Opts); default_qsub_exec(Engine,QsubExec)),
48 (get_opt(qsub_args,QsubArgs,Opts); QsubArgs = ""),
49 (get_opt(queue_args,QArgs,Opts); QArgs = ""),
50 (get_opt(qsub_header,QsubHeader,Opts); QsubHeader = ""),
51 (get_opt(qsub_header_file,QsubHeaderFile,Opts)
52 -> file_contents(QsubHeaderFile,QsubHeaderFileContents)
53 ; QsubHeaderFileContents = ""),
54 qsub_extra_args(Engine,ExtraArgs),
55 bindvar_rule('QsubArgs',Rule,Opts,RuleQsubArgs),
56 bindvar_rule('QsubHeader',Rule,Opts,RuleQsubHeader),
57 bindvar_rule('QsubHeaderFile',Rule,Opts,RuleQsubHeaderFile),
58 file_contents(RuleQsubHeaderFile,RuleQsubHeaderFileContents),
59 biomake_private_filename_dir_exists(T,[Engine,"job"],JobFilename),
60 format(string(RemoveJobFile),"rm ~w",[JobFilename]),
61 qsub_rule_execs(Rule,Es,Opts),
62 qsub_job_ids(Engine,DL,DepJobs),
63 qsub_script_headers(Engine,DepJobs,Opts,Headers),
64 append(Headers,["","# Main script"],HeadersWithComment),
65 qsub_dep_arg(Engine,DepJobs,DepArg),
66 biomake_private_filename_dir_exists(T,[Engine,"out"],OutPath),
67 qsub_output_arg(Engine,OutPath,OutArg),
68 biomake_private_filename_dir_exists(T,[Engine,"err"],ErrPath),
69 qsub_error_arg(Engine,ErrPath,ErrArg),
70 !,
71 write_script_file(T,
72 ["# Header from QsubHeader variable",
73 RuleQsubHeader,
74 "# Header from --qsub-header",
75 QsubHeader,
76 "# Contents of header file specified by QsubHeaderFile variable",
77 RuleQsubHeaderFileContents,
78 "# Contents of header file specified by --qsub-header-file",
79 QsubHeaderFileContents,
80 "# Generic headers" | HeadersWithComment],
81 Es,
82 [RemoveJobFile],
83 Opts,
84 [Engine],
85 ScriptFilename),
86 format(string(QsubCmd),"~w ~w ~w ~w ~w ~w ~w ~w ~w >~w",[QsubExec,OutArg,ErrArg,QArgs,QsubArgs,RuleQsubArgs,DepArg,ExtraArgs,ScriptFilename,JobFilename]),
87 report("Submitting job: ~w",[QsubCmd],Opts),
88 shell(QsubCmd).
89
90qsub_rule_execs(Rule,[Chdir,Biomake],Opts) :-
91 qsub_use_biomake(Opts),
92 !,
93 rule_target(Rule,T,Opts),
94 get_opt(biomake_cwd,Dir,Opts),
95 get_opt(biomake_prog,Prog,Opts),
96 get_opt(biomake_args,Args,Opts),
97 get_opt(qsub_biomake_args,QArgs,Opts),
98 format(string(Chdir),"cd ~w",[Dir]), 99 format(string(Biomake),"~w ~w ~w ~w",[Prog,Args,QArgs,T]).
100
101qsub_rule_execs(Rule,Es,Opts) :-
102 rule_execs(Rule,Es,Opts).
103
104file_contents(Filename,String) :-
105 Filename \= '',
106 exists_file(Filename),
107 read_file_to_codes(Filename,Codes,[]),
108 string_codes(String,Codes),
109 !.
110file_contents(_,"").
111
112qsub_use_biomake(Opts) :-
113 get_opt(qsub_use_biomake,true,Opts).
114
115qsub_job_ids(Engine,[D|Ds],[N|Ns]) :-
116 qsub_job_id(Engine,D,N),
117 !,
118 qsub_job_ids(Engine,Ds,Ns).
119qsub_job_ids(Engine,[_|Ds],Ns) :-
120 !,
121 qsub_job_ids(Engine,Ds,Ns).
122qsub_job_ids(_,[],[]).
123
124qsub_numeric_job_id(Engine,T,Id) :-
125 biomake_private_filename(T,[Engine,"job"],JobFilename),
126 exists_file(JobFilename),
127 phrase_from_file(first_int(Id),JobFilename).
128
129first_int(N) --> before_first_int(Cs), !, {number_codes(N,Cs)}.
130before_first_int([C|Cs]) --> parse_num_code(C), !, first_int_codes(Cs).
131before_first_int(Cs) --> [_], before_first_int(Cs).
132first_int_codes([C|Cs]) --> parse_num_code(C), !, first_int_codes(Cs).
133first_int_codes([]) --> [_], first_int_codes([]).
134first_int_codes([]) --> [].
135
136qsub_make_dep_arg(_,[],"").
137qsub_make_dep_arg(Engine,DepJobs,DepArg) :-
138 qsub_dep_arg_prefix(Engine,DepArgPrefix),
139 qsub_dep_prefix(Engine,DepPrefix),
140 qsub_dep_separator(Engine,DepSep),
141 string_concat(DepSep,DepPrefix,SepPrefix),
142 atomic_list_concat(DepJobs,SepPrefix,DepJobStr),
143 format(string(DepArg),"~w~w~w",[DepArgPrefix,DepPrefix,DepJobStr]).
144
148
149qsub_kill(Engine,T,SL,Opts) :-
150 qsub_job_id(Engine,T,Id),
151 (get_opt(qdel_exec,QdelExec,Opts); default_qdel_exec(Engine,QdelExec)),
152 (get_opt(qdel_args,QdelArgs,Opts); QdelArgs = ""),
153 (get_opt(queue_args,QArgs,Opts); QArgs = ""),
154 format(string(QdelCmd),"~w ~w ~w ~w",[QdelExec,QArgs,QdelArgs,Id]),
155 verbose_report("Killing previous job: ~w",[QdelCmd],SL,Opts),
156 (shell(QdelCmd); true),
157 biomake_private_filename(T,[Engine,"job"],JobFilename),
158 (exists_file(JobFilename) -> delete_file(JobFilename); true).
159qsub_kill(_,_,_,_).
160
161flush_queue_recursive(Dir,Opts) :-
162 get_opt(queue,Engine,Opts),
163 absolute_file_name(Dir,AbsDir), 164 flush_queue_recursive(Engine,AbsDir,[],Opts).
165
166flush_queue_recursive(_,X,_,_) :-
167 atom_chars(X,['.'|_]),
168 !.
169flush_queue_recursive(Engine,Dir,SL,Opts) :-
170 exists_directory(Dir),
171 !,
172 verbose_report("Scanning ~w",[Dir],SL,Opts),
173 directory_files(Dir,Files),
174 forall(member(File,Files),
175 flush_queue_recursive(Engine,File,[Dir|SL],Opts)).
176flush_queue_recursive(Engine,File,SL,Opts) :-
177 qsub_kill(Engine,File,SL,Opts),
178 !.
179flush_queue_recursive(_,_,_,_).
180
184
185write_script_file(T,Es,Opts,ScriptFilename) :-
186 write_script_file(T,[],Es,[],Opts,[],ScriptFilename).
187
188write_script_file(T,Headers,Es,Cleanup,Opts,Subdirs,ScriptFilename) :-
189 (get_opt(oneshell,true,Opts)
190 -> maplist(echo_wrap,Es,ShellExecs)
191 ; maplist(shell_echo_wrap,Es,ShellExecs)),
192 maplist(shell_wrap,Cleanup,ShellCleanup),
193 append(ShellExecs,ShellCleanup,ExecsWithCleanup),
194 write_script_file_contents(T,Headers,ExecsWithCleanup,Opts,Subdirs,ScriptFilename).
195
196write_script_file_contents(T,Headers,Execs,Opts,Subdirs,ScriptFilename) :-
197 working_directory(CWD,CWD),
198 open_script_file(T,Subdirs,ScriptFilename,IO),
199 shell_path(Sh),
200 wrap_shell_execs(T,Execs,Opts,Subdirs,ExecStr),
201 concat_string_list(Headers,HeaderStr,"\n"),
202 format(IO,"#!~w~n~w~ncd ~w~n~w~n",[Sh,HeaderStr,CWD,ExecStr]),
203 close(IO),
204 format(string(Chmod),"chmod +x ~w",[ScriptFilename]),
205 shell(Chmod).
206
207wrap_shell_execs(T,Execs,_Opts,Subdirs,ShellFilename) :-
208 shell_var_specified(Sh),
209 !,
210 open_shell_file(T,Subdirs,ShellFilename,IO),
211 concat_string_list(Execs,ExecStr,"\n"),
212 format(IO,"#!~w~n~w~n",[Sh,ExecStr]),
213 close(IO),
214 format(string(Chmod),"chmod +x ~w",[ShellFilename]),
215 shell(Chmod).
216
217wrap_shell_execs(_T,Execs,_Opts,_Subdirs,ExecStr) :-
218 !,
219 join_with_ands(Execs,ExecStr).
220
221join_with_ands(List,Str) :-
222 !,
223 concat_string_list(List,Str," &&\n").
224
225open_script_file(Target,Subdirs,Filename,Stream) :-
226 append(Subdirs,["script"],SubdirsScript),
227 open_biomake_private_file(Target,SubdirsScript,Filename,Stream).
228
229open_shell_file(Target,Subdirs,Filename,Stream) :-
230 append(Subdirs,["shell"],SubdirsShell),
231 open_biomake_private_file(Target,SubdirsShell,Filename,Stream).
232
236
237queue_engine(none).
238init_queue(none,_).
239release_queue(none).
240
241run_execs_in_queue(none,Rule,SL,Opts) :-
242 run_execs_now(Rule,SL,Opts).
243
247
248queue_engine(test).
249init_queue(test,_).
250release_queue(test).
251
252run_execs_in_queue(test,Rule,SL,Opts) :-
253 rule_target(Rule,T,Opts),
254 rule_dependencies(Rule,DL,Opts),
255 qsub_rule_execs(Rule,Es,Opts),
256 write_script_file(T,Es,Opts,Script),
257 report_run_exec(Script,T,SL,Opts),
258 update_hash(T,DL,Opts).
259
263
264queue_engine(sge).
265init_queue(sge,_).
266release_queue(sge).
267
268run_execs_in_queue(sge,Rule,SL,Opts) :-
269 run_execs_with_qsub(sge,Rule,SL,Opts).
270
271default_qsub_exec(sge,"qsub").
272default_qdel_exec(sge,"qdel").
273qsub_output_arg(sge,F,S) :- format(string(S),"-o ~w",[F]).
274qsub_error_arg(sge,F,S) :- format(string(S),"-e ~w",[F]).
275qsub_dep_arg_prefix(sge,"-hold_jid ").
276qsub_dep_prefix(sge,"").
277qsub_dep_separator(sge,",").
278qsub_dep_arg(sge,DepJobs,Arg) :- qsub_make_dep_arg(sge,DepJobs,Arg).
(sge,"").
(sge,_,_,[]).
281qsub_job_id(sge,T,N) :- qsub_numeric_job_id(sge,T,N).
282
286
287queue_engine(pbs).
288init_queue(pbs,_).
289release_queue(pbs).
290
291run_execs_in_queue(pbs,Rule,SL,Opts) :-
292 run_execs_with_qsub(pbs,Rule,SL,Opts).
293
294default_qsub_exec(pbs,"qsub").
295default_qdel_exec(pbs,"qdel").
296qsub_output_arg(pbs,F,S) :- format(string(S),"-o ~w",[F]).
297qsub_error_arg(pbs,F,S) :- format(string(S),"-e ~w",[F]).
298qsub_dep_arg_prefix(pbs,"-W depend=").
299qsub_dep_prefix(pbs,"afterok:").
300qsub_dep_separator(pbs,",").
301qsub_dep_arg(pbs,DepJobs,Arg) :- qsub_make_dep_arg(pbs,DepJobs,Arg).
302qsub_extra_args(pbs,"").
303qsub_script_headers(pbs,_,_,[]).
304qsub_job_id(pbs,T,N) :- qsub_numeric_job_id(pbs,T,N).
305
309
310queue_engine(slurm).
311init_queue(slurm,_).
312release_queue(slurm).
313
314run_execs_in_queue(slurm,Rule,SL,Opts) :-
315 run_execs_with_qsub(slurm,Rule,SL,Opts).
316
317default_qsub_exec(slurm,"sbatch").
318default_qdel_exec(slurm,"scancel").
319qsub_output_arg(slurm,F,S) :- format(string(S),"-o ~w",[F]).
320qsub_error_arg(slurm,F,S) :- format(string(S),"-e ~w",[F]).
321qsub_dep_arg_prefix(slurm,"--dependency=").
322qsub_dep_prefix(slurm,"afterok:").
323qsub_dep_separator(slurm,",").
324qsub_dep_arg(slurm,DepJobs,Arg) :- qsub_make_dep_arg(slurm,DepJobs,Arg).
325qsub_extra_args(slurm,"--parsable").
326qsub_script_headers(slurm,_,_,[]).
327qsub_job_id(slurm,T,N) :- qsub_numeric_job_id(slurm,T,N).
328
332
333:- dynamic poolq_scheduler/1. 334
335default_poolq_threads(4).
336
337queue_engine(poolq).
338init_queue(poolq,Opts) :-
339 ensure_loaded(library(poolq/poolq)),
340 (get_opt(poolq_threads,Size,Opts) ; default_poolq_threads(Size)),
341 poolq_create(Scheduler,Size,[]),
342 assert(poolq_scheduler(Scheduler)).
343release_queue(poolq) :-
344 poolq_scheduler(Scheduler),
345 poolq_wait(Scheduler,_Status).
346
347run_execs_in_queue(poolq,Rule,SL,Opts) :-
348 poolq_scheduler(Scheduler),
349 rule_target(Rule,T,Opts),
350 rule_dependencies(Rule,DL,Opts),
351 include(not_always_make_or_queue,Opts,Opts2),
352 poolq_submit_job(Scheduler,build(T,SL,[no_deps(true)|Opts2]),T,DL,[]).
353
354not_always_make_or_queue(always_make(true)) :- !, fail.
355not_always_make_or_queue(queue(_)) :- !, fail.
356not_always_make_or_queue(_)