View source with formatted comments or as raw
    1:- encoding(utf8).
    2/*  Part of SWI-Prolog
    3
    4    Author:        Torbjörn Lager and Jan Wielemaker
    5    E-mail:        J.Wielemaker@vu.nl
    6    WWW:           http://www.swi-prolog.org
    7    Copyright (C): 2014-2024, Torbjörn Lager,
    8                              VU University Amsterdam
    9                              SWI-Prolog Solutions b.v.
   10    All rights reserved.
   11
   12    Redistribution and use in source and binary forms, with or without
   13    modification, are permitted provided that the following conditions
   14    are met:
   15
   16    1. Redistributions of source code must retain the above copyright
   17       notice, this list of conditions and the following disclaimer.
   18
   19    2. Redistributions in binary form must reproduce the above copyright
   20       notice, this list of conditions and the following disclaimer in
   21       the documentation and/or other materials provided with the
   22       distribution.
   23
   24    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   25    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   26    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   27    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   28    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   29    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   30    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   31    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   32    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   33    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   34    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   35    POSSIBILITY OF SUCH DAMAGE.
   36*/
   37
   38:- module(pengines,
   39          [ pengine_create/1,                   % +Options
   40            pengine_ask/3,                      % +Pengine, :Query, +Options
   41            pengine_next/2,                     % +Pengine, +Options
   42            pengine_stop/2,                     % +Pengine, +Options
   43            pengine_event/2,                    % -Event, +Options
   44            pengine_input/2,                    % +Prompt, -Term
   45            pengine_output/1,                   % +Term
   46            pengine_respond/3,                  % +Pengine, +Input, +Options
   47            pengine_debug/2,                    % +Format, +Args
   48            pengine_self/1,                     % -Pengine
   49            pengine_pull_response/2,            % +Pengine, +Options
   50            pengine_destroy/1,                  % +Pengine
   51            pengine_destroy/2,                  % +Pengine, +Options
   52            pengine_abort/1,                    % +Pengine
   53            pengine_application/1,              % +Application
   54            current_pengine_application/1,      % ?Application
   55            pengine_property/2,                 % ?Pengine, ?Property
   56            pengine_user/1,                     % -User
   57            pengine_event_loop/2,               % :Closure, +Options
   58            pengine_rpc/2,                      % +Server, :Goal
   59            pengine_rpc/3                       % +Server, :Goal, +Options
   60          ]).   61
   62/** <module> Pengines: Web Logic Programming Made Easy
   63
   64The library(pengines) provides an  infrastructure   for  creating Prolog
   65engines in a (remote) pengine server  and accessing these engines either
   66from Prolog or JavaScript.
   67
   68@author Torbjörn Lager and Jan Wielemaker
   69*/
   70
   71:- autoload(library(aggregate),[aggregate_all/3]).   72:- autoload(library(apply),[maplist/2,partition/4,exclude/3,maplist/3]).   73:- autoload(library(broadcast),[broadcast/1]).   74:- autoload(library(charsio),[open_chars_stream/2]).   75:- use_module(library(debug),[debug/1,debugging/1,debug/3,assertion/1]).   76:- autoload(library(error),
   77	    [ must_be/2,
   78	      existence_error/2,
   79	      permission_error/3,
   80	      domain_error/2
   81	    ]).   82:- autoload(library(filesex),[directory_file_path/3]).   83:- autoload(library(listing),[listing/1]).   84:- autoload(library(lists),[member/2,flatten/2,select/3,append/3]).   85:- autoload(library(modules),[in_temporary_module/3]).   86:- autoload(library(occurs),[sub_term/2]).   87:- autoload(library(option),
   88	    [select_option/3,option/2,option/3,select_option/4]).   89:- autoload(library(prolog_stack),[print_prolog_backtrace/2]).   90:- autoload(library(sandbox),[safe_goal/1]).   91:- autoload(library(statistics),[thread_statistics/2]).   92:- autoload(library(term_to_json),[term_to_json/2]).   93:- autoload(library(thread_pool),
   94	    [thread_pool_create/3,thread_create_in_pool/4]).   95:- autoload(library(time),[alarm/4,call_with_time_limit/2]).   96:- autoload(library(uri),
   97	    [ uri_components/2,
   98	      uri_query_components/2,
   99	      uri_data/3,
  100	      uri_data/4,
  101	      uri_encoded/3
  102	    ]).  103:- autoload(library(http/http_client),[http_read_data/3]).  104:- autoload(library(http/http_cors),[cors_enable/0,cors_enable/2]).  105:- autoload(library(http/http_dispatch),
  106	    [http_handler/3,http_404/2,http_reply_file/3]).  107:- autoload(library(http/http_open),[http_open/3]).  108:- autoload(library(http/http_parameters),[http_parameters/2]).  109:- autoload(library(http/http_stream),[is_cgi_stream/1]).  110:- autoload(library(http/http_wrapper),[http_peer/2]).  111
  112:- use_module(library(settings),[setting/2,setting/4]).  113:- use_module(library(http/http_json),
  114              [http_read_json_dict/2,reply_json_dict/1]).  115
  116:- if(exists_source(library(uuid))).  117:- autoload(library(uuid), [uuid/2]).  118:- endif.  119
  120
  121:- meta_predicate
  122    pengine_create(:),
  123    pengine_rpc(+, +, :),
  124    pengine_event_loop(1, +).  125
  126:- multifile
  127    write_result/3,                 % +Format, +Event, +Dict
  128    event_to_json/3,                % +Event, -JSON, +Format
  129    prepare_module/3,               % +Module, +Application, +Options
  130    prepare_goal/3,                 % +GoalIn, -GoalOut, +Options
  131    authentication_hook/3,          % +Request, +Application, -User
  132    not_sandboxed/2,                % +User, +App
  133    pengine_flush_output_hook/0.  134
  135:- predicate_options(pengine_create/1, 1,
  136                     [ id(-atom),
  137                       alias(atom),
  138                       application(atom),
  139                       destroy(boolean),
  140                       server(atom),
  141                       ask(compound),
  142                       template(compound),
  143                       chunk(integer;oneof([false])),
  144                       bindings(list),
  145                       src_list(list),
  146                       src_text(any),           % text
  147                       src_url(atom),
  148                       src_predicates(list)
  149                     ]).  150:- predicate_options(pengine_ask/3, 3,
  151                     [ template(any),
  152                       chunk(integer;oneof([false])),
  153                       bindings(list)
  154                     ]).  155:- predicate_options(pengine_next/2, 2,
  156                     [ chunk(integer),
  157                       pass_to(pengine_send/3, 3)
  158                     ]).  159:- predicate_options(pengine_stop/2, 2,
  160                     [ pass_to(pengine_send/3, 3)
  161                     ]).  162:- predicate_options(pengine_respond/3, 2,
  163                     [ pass_to(pengine_send/3, 3)
  164                     ]).  165:- predicate_options(pengine_rpc/3, 3,
  166                     [ chunk(integer;oneof([false])),
  167                       pass_to(pengine_create/1, 1)
  168                     ]).  169:- predicate_options(pengine_send/3, 3,
  170                     [ delay(number)
  171                     ]).  172:- predicate_options(pengine_event/2, 2,
  173                     [ listen(atom),
  174                       pass_to(system:thread_get_message/3, 3)
  175                     ]).  176:- predicate_options(pengine_pull_response/2, 2,
  177                     [ pass_to(http_open/3, 3)
  178                     ]).  179:- predicate_options(pengine_event_loop/2, 2,
  180                     []).                       % not yet implemented
  181
  182% :- debug(pengine(transition)).
  183:- debug(pengine(debug)).               % handle pengine_debug in pengine_rpc/3.
  184
  185goal_expansion(random_delay, Expanded) :-
  186    (   debugging(pengine(delay))
  187    ->  Expanded = do_random_delay
  188    ;   Expanded = true
  189    ).
  190
  191do_random_delay :-
  192    Delay is random(20)/1000,
  193    sleep(Delay).
  194
  195:- meta_predicate                       % internal meta predicates
  196    solve(+, ?, 0, +),
  197    findnsols_no_empty(+, ?, 0, -),
  198    pengine_event_loop(+, 1, +).  199
  200/**  pengine_create(:Options) is det.
  201
  202    Creates a new pengine. Valid options are:
  203
  204    * id(-ID)
  205      ID gets instantiated to the id of the created pengine.  ID is
  206      atomic.
  207
  208    * alias(+Name)
  209      The pengine is named Name (an atom). A slave pengine (child) can
  210      subsequently be referred to by this name.
  211
  212    * application(+Application)
  213      Application in which the pengine runs.  See pengine_application/1.
  214
  215    * server(+URL)
  216      The pengine will run in (and in the Prolog context of) the pengine
  217      server located at URL.
  218
  219    * src_list(+List_of_clauses)
  220      Inject a list of Prolog clauses into the pengine.
  221
  222    * src_text(+Atom_or_string)
  223      Inject the clauses specified by a source text into the pengine.
  224
  225    * src_url(+URL)
  226      Inject the clauses specified in the file located at URL into the
  227      pengine.
  228
  229    * src_predicates(+List)
  230      Send the local predicates denoted by List to the remote pengine.
  231      List is a list of predicate indicators.
  232
  233Remaining  options  are  passed  to  http_open/3  (meaningful  only  for
  234non-local pengines) and thread_create/3. Note   that for thread_create/3
  235only options changing the stack-sizes can be used. In particular, do not
  236pass the detached or alias options.
  237
  238Successful creation of a pengine will return an _event term_ of the
  239following form:
  240
  241    * create(ID, Term)
  242      ID is the id of the pengine that was created.
  243      Term is not used at the moment.
  244
  245An error will be returned if the pengine could not be created:
  246
  247    * error(ID, Term)
  248      ID is invalid, since no pengine was created.
  249      Term is the exception's error term.
  250*/
  251
  252
  253pengine_create(M:Options0) :-
  254    translate_local_sources(Options0, Options, M),
  255    (   select_option(server(BaseURL), Options, RestOptions)
  256    ->  remote_pengine_create(BaseURL, RestOptions)
  257    ;   local_pengine_create(Options)
  258    ).
  259
  260%!  translate_local_sources(+OptionsIn, -Options, +Module) is det.
  261%
  262%   Translate  the  `src_predicates`  and  `src_list`  options  into
  263%   `src_text`. We need to do that   anyway for remote pengines. For
  264%   local pengines, we could avoid  this   step,  but  there is very
  265%   little point in transferring source to a local pengine anyway as
  266%   local pengines can access any  Prolog   predicate  that you make
  267%   visible to the application.
  268%
  269%   Multiple sources are concatenated  to  end   up  with  a  single
  270%   src_text option.
  271
  272translate_local_sources(OptionsIn, Options, Module) :-
  273    translate_local_sources(OptionsIn, Sources, Options2, Module),
  274    (   Sources == []
  275    ->  Options = Options2
  276    ;   Sources = [Source]
  277    ->  Options = [src_text(Source)|Options2]
  278    ;   atomics_to_string(Sources, Source)
  279    ->  Options = [src_text(Source)|Options2]
  280    ).
  281
  282translate_local_sources([], [], [], _).
  283translate_local_sources([H0|T], [S0|S], Options, M) :-
  284    nonvar(H0),
  285    translate_local_source(H0, S0, M),
  286    !,
  287    translate_local_sources(T, S, Options, M).
  288translate_local_sources([H|T0], S, [H|T], M) :-
  289    translate_local_sources(T0, S, T, M).
  290
  291translate_local_source(src_predicates(PIs), Source, M) :-
  292    must_be(list, PIs),
  293    with_output_to(string(Source),
  294                   maplist(list_in_module(M), PIs)).
  295translate_local_source(src_list(Terms), Source, _) :-
  296    must_be(list, Terms),
  297    with_output_to(string(Source),
  298                   forall(member(Term, Terms),
  299                          format('~k .~n', [Term]))).
  300translate_local_source(src_text(Source), Source, _).
  301
  302list_in_module(M, PI) :-
  303    listing(M:PI).
  304
  305/**  pengine_send(+NameOrID, +Term) is det
  306
  307Same as pengine_send(NameOrID, Term, []).
  308*/
  309
  310pengine_send(Target, Event) :-
  311    pengine_send(Target, Event, []).
  312
  313
  314/**  pengine_send(+NameOrID, +Term, +Options) is det
  315
  316Succeeds immediately and  places  Term  in   the  queue  of  the pengine
  317NameOrID. Options is a list of options:
  318
  319   * delay(+Time)
  320     The actual sending is delayed by Time seconds. Time is an integer
  321     or a float.
  322
  323Any remaining options are passed to http_open/3.
  324*/
  325
  326pengine_send(Target, Event, Options) :-
  327    must_be(atom, Target),
  328    pengine_send2(Target, Event, Options).
  329
  330pengine_send2(self, Event, Options) :-
  331    !,
  332    thread_self(Queue),
  333    delay_message(queue(Queue), Event, Options).
  334pengine_send2(Name, Event, Options) :-
  335    child(Name, Target),
  336    !,
  337    delay_message(pengine(Target), Event, Options).
  338pengine_send2(Target, Event, Options) :-
  339    delay_message(pengine(Target), Event, Options).
  340
  341delay_message(Target, Event, Options) :-
  342    option(delay(Delay), Options),
  343    !,
  344    alarm(Delay,
  345          send_message(Target, Event, Options),
  346          _AlarmID,
  347          [remove(true)]).
  348delay_message(Target, Event, Options) :-
  349    random_delay,
  350    send_message(Target, Event, Options).
  351
  352send_message(queue(Queue), Event, _) :-
  353    thread_send_message(Queue, pengine_request(Event)).
  354send_message(pengine(Pengine), Event, Options) :-
  355    (   pengine_remote(Pengine, Server)
  356    ->  remote_pengine_send(Server, Pengine, Event, Options)
  357    ;   pengine_thread(Pengine, Thread)
  358    ->  thread_send_message(Thread, pengine_request(Event))
  359    ;   existence_error(pengine, Pengine)
  360    ).
  361
  362%!  pengine_request(-Request) is det.
  363%
  364%   To be used by a pengine to wait  for the next request. Such messages
  365%   are placed in the  queue  by   pengine_send/2.  Keeps  the thread in
  366%   normal state if an event arrives within a second. Otherwise it waits
  367%   for the `idle_limit` setting while   using  thread_idle/2 to minimis
  368%   resources.
  369
  370pengine_request(Request) :-
  371    thread_self(Me),
  372    thread_get_message(Me, pengine_request(Request), [timeout(1)]),
  373    !.
  374pengine_request(Request) :-
  375    pengine_self(Self),
  376    get_pengine_application(Self, Application),
  377    setting(Application:idle_limit, IdleLimit0),
  378    IdleLimit is IdleLimit0-1,
  379    thread_self(Me),
  380    (   thread_idle(thread_get_message(Me, pengine_request(Request),
  381                                       [timeout(IdleLimit)]),
  382                    long)
  383    ->  true
  384    ;   Request = destroy
  385    ).
  386
  387
  388%!  pengine_reply(+Event) is det.
  389%!  pengine_reply(+Queue, +Event) is det.
  390%
  391%   Reply Event to the parent of the   current  Pengine or the given
  392%   Queue.  Such  events  are  read   by    the   other   side  with
  393%   pengine_event/1.
  394%
  395%   If the message cannot be sent within the `idle_limit` setting of
  396%   the pengine, abort the pengine.
  397
  398pengine_reply(Event) :-
  399    pengine_parent(Queue),
  400    pengine_reply(Queue, Event).
  401
  402pengine_reply(_Queue, _Event0) :-
  403    nb_current(pengine_idle_limit_exceeded, true),
  404    !.
  405pengine_reply(Queue, Event0) :-
  406    arg(1, Event0, ID),
  407    wrap_first_answer(ID, Event0, Event),
  408    random_delay,
  409    debug(pengine(event), 'Reply to ~p: ~p', [Queue, Event]),
  410    (   pengine_self(ID),
  411        \+ pengine_detached(ID, _)
  412    ->  get_pengine_application(ID, Application),
  413        setting(Application:idle_limit, IdleLimit),
  414        debug(pengine(reply), 'Sending ~p, timeout: ~q', [Event, IdleLimit]),
  415        (   thread_send_message(Queue, pengine_event(ID, Event),
  416                                [ timeout(IdleLimit)
  417                                ])
  418        ->  true
  419        ;   thread_self(Me),
  420            debug(pengine(reply), 'pengine_reply: timeout for ~q (thread ~q)',
  421                  [ID, Me]),
  422            nb_setval(pengine_idle_limit_exceeded, true),
  423            thread_detach(Me),
  424            abort
  425        )
  426    ;   thread_send_message(Queue, pengine_event(ID, Event))
  427    ).
  428
  429wrap_first_answer(ID, Event0, CreateEvent) :-
  430    wrap_first_answer_in_create_event(CreateEvent, [answer(Event0)]),
  431    arg(1, CreateEvent, ID),
  432    !,
  433    retract(wrap_first_answer_in_create_event(CreateEvent, [answer(Event0)])).
  434wrap_first_answer(_ID, Event, Event).
  435
  436
  437empty_queue :-
  438    pengine_parent(Queue),
  439    empty_queue(Queue, 0, Discarded),
  440    debug(pengine(abort), 'Abort: discarded ~D messages', [Discarded]).
  441
  442empty_queue(Queue, C0, C) :-
  443    thread_get_message(Queue, _Term, [timeout(0)]),
  444    !,
  445    C1 is C0+1,
  446    empty_queue(Queue, C1, C).
  447empty_queue(_, C, C).
  448
  449
  450/** pengine_ask(+NameOrID, @Query, +Options) is det
  451
  452Asks pengine NameOrID a query Query.
  453
  454Options is a list of options:
  455
  456    * template(+Template)
  457      Template is a variable (or a term containing variables) shared
  458      with the query. By default, the template is identical to the
  459      query.
  460
  461    * chunk(+IntegerOrFalse)
  462      Retrieve solutions in chunks of Integer rather than one by one. 1
  463      means no chunking (default). Other integers indicate the maximum
  464      number of solutions to retrieve in one chunk.  If `false`, the
  465      Pengine goal is not executed using findall/3 and friends and
  466      we do not backtrack immediately over the goal.  As a result,
  467      changes to backtrackable global state are retained.  This is
  468      similar to using set_prolog_flag(toplevel_mode, recursive).
  469
  470    * bindings(+Bindings)
  471      Sets the global variable '$variable_names' to a list of
  472      `Name = Var` terms, providing access to the actual variable
  473      names.
  474
  475Any remaining options are passed to pengine_send/3.
  476
  477Note that the predicate pengine_ask/3 is deterministic, even for queries
  478that have more than one solution. Also,  the variables in Query will not
  479be bound. Instead, results will  be  returned   in  the  form  of _event
  480terms_.
  481
  482    * success(ID, Terms, Projection, Time, More)
  483      ID is the id of the pengine that succeeded in solving the query.
  484      Terms is a list holding instantiations of `Template`.  Projection
  485      is a list of variable names that should be displayed. Time is
  486      the CPU time used to produce the results and finally, More
  487      is either `true` or `false`, indicating whether we can expect the
  488      pengine to be able to return more solutions or not, would we call
  489      pengine_next/2.
  490
  491    * failure(ID)
  492      ID is the id of the pengine that failed for lack of solutions.
  493
  494    * error(ID, Term)
  495      ID is the id of the pengine throwing the exception.
  496      Term is the exception's error term.
  497
  498    * output(ID, Term)
  499      ID is the id of a pengine running the query that called
  500      pengine_output/1. Term is the term that was passed in the first
  501      argument of pengine_output/1 when it was called.
  502
  503    * prompt(ID, Term)
  504      ID is the id of the pengine that called pengine_input/2 and Term is
  505      the prompt.
  506
  507Defined in terms of pengine_send/3, like so:
  508
  509==
  510pengine_ask(ID, Query, Options) :-
  511    partition(pengine_ask_option, Options, AskOptions, SendOptions),
  512    pengine_send(ID, ask(Query, AskOptions), SendOptions).
  513==
  514*/
  515
  516pengine_ask(ID, Query, Options) :-
  517    partition(pengine_ask_option, Options, AskOptions, SendOptions),
  518    pengine_send(ID, ask(Query, AskOptions), SendOptions).
  519
  520
  521pengine_ask_option(template(_)).
  522pengine_ask_option(chunk(_)).
  523pengine_ask_option(bindings(_)).
  524pengine_ask_option(breakpoints(_)).
  525
  526
  527/** pengine_next(+NameOrID, +Options) is det
  528
  529Asks pengine NameOrID for the  next  solution   to  a  query  started by
  530pengine_ask/3. Defined options are:
  531
  532    * chunk(+Count)
  533    Modify the chunk-size to Count before asking the next set of
  534    solutions.  This may not be used if the goal was started with
  535    chunk(false).
  536
  537Remaining  options  are  passed  to    pengine_send/3.   The  result  of
  538re-executing the current goal is returned  to the caller's message queue
  539in the form of _event terms_.
  540
  541    * success(ID, Terms, Projection, Time, More)
  542      See pengine_ask/3.
  543
  544    * failure(ID)
  545      ID is the id of the pengine that failed for lack of more solutions.
  546
  547    * error(ID, Term)
  548      ID is the id of the pengine throwing the exception.
  549      Term is the exception's error term.
  550
  551    * output(ID, Term)
  552      ID is the id of a pengine running the query that called
  553      pengine_output/1. Term is the term that was passed in the first
  554      argument of pengine_output/1 when it was called.
  555
  556    * prompt(ID, Term)
  557      ID is the id of the pengine that called pengine_input/2 and Term
  558      is the prompt.
  559
  560Defined in terms of pengine_send/3, as follows:
  561
  562==
  563pengine_next(ID, Options) :-
  564    pengine_send(ID, next, Options).
  565==
  566
  567*/
  568
  569pengine_next(ID, Options) :-
  570    select_option(chunk(Count), Options, Options1),
  571    !,
  572    pengine_send(ID, next(Count), Options1).
  573pengine_next(ID, Options) :-
  574    pengine_send(ID, next, Options).
  575
  576
  577/** pengine_stop(+NameOrID, +Options) is det
  578
  579Tells pengine NameOrID to stop looking  for   more  solutions to a query
  580started by pengine_ask/3. Options are passed to pengine_send/3.
  581
  582Defined in terms of pengine_send/3, like so:
  583
  584==
  585pengine_stop(ID, Options) :-
  586    pengine_send(ID, stop, Options).
  587==
  588*/
  589
  590pengine_stop(ID, Options) :- pengine_send(ID, stop, Options).
  591
  592
  593/** pengine_abort(+NameOrID) is det
  594
  595Aborts the running query. The pengine goes   back  to state `2', waiting
  596for new queries.
  597
  598@see pengine_destroy/1.
  599*/
  600
  601pengine_abort(Name) :-
  602    (   child(Name, Pengine)
  603    ->  true
  604    ;   Pengine = Name
  605    ),
  606    (   pengine_remote(Pengine, Server)
  607    ->  remote_pengine_abort(Server, Pengine, [])
  608    ;   pengine_thread(Pengine, Thread),
  609        debug(pengine(abort), 'Signalling thread ~p', [Thread]),
  610        catch(thread_signal(Thread, throw(abort_query)), _, true)
  611    ).
  612
  613
  614/** pengine_destroy(+NameOrID) is det.
  615    pengine_destroy(+NameOrID, +Options) is det.
  616
  617Destroys the pengine NameOrID.  With the option force(true), the pengine
  618is killed using abort/0 and pengine_destroy/2 succeeds.
  619*/
  620
  621pengine_destroy(ID) :-
  622    pengine_destroy(ID, []).
  623
  624pengine_destroy(Name, Options) :-
  625    (   child(Name, ID)
  626    ->  true
  627    ;   ID = Name
  628    ),
  629    option(force(true), Options),
  630    !,
  631    (   pengine_thread(ID, Thread)
  632    ->  catch(thread_signal(Thread, abort),
  633              error(existence_error(thread, _), _), true)
  634    ;   true
  635    ).
  636pengine_destroy(ID, _) :-
  637    catch(pengine_send(ID, destroy),
  638          error(existence_error(pengine, ID), _),
  639          retractall(child(_,ID))).
  640
  641
  642/*================= pengines administration =======================
  643*/
  644
  645%!  current_pengine(?Id, ?Parent, ?Thread, ?URL, ?App, ?Destroy)
  646%
  647%   Dynamic predicate that registers our known pengines.  Id is
  648%   an atomic unique datatype.  Parent identifies our parent
  649%   pengine.  The location is described by Thread and URL:
  650%
  651%     - Thread is the thread of a local pengine, 0 for a remote one
  652%     - URL is the URL of the server for a remote pengine
  653
  654:- dynamic
  655    current_pengine/6,              % Id, ParentId, Thread, URL, App, Destroy
  656    pengine_queue/4,                % Id, Queue, TimeOut, Time
  657    output_queue/3,                 % Id, Queue, Time
  658    pengine_user/2,                 % Id, User
  659    pengine_data/2,                 % Id, Data
  660    pengine_detached/2.             % Id, Data
  661:- volatile
  662    current_pengine/6,
  663    pengine_queue/4,
  664    output_queue/3,
  665    pengine_user/2,
  666    pengine_data/2,
  667    pengine_detached/2.  668
  669:- thread_local
  670    child/2.                        % ?Name, ?Child
  671
  672%!  pengine_register_local(+Id, +Thread, +Queue, +URL, +App, +Destroy) is det.
  673%!  pengine_register_remote(+Id, +URL, +Queue, +App, +Destroy) is det.
  674%!  pengine_unregister(+Id) is det.
  675
  676pengine_register_local(Id, Thread, Queue, URL, Application, Destroy) :-
  677    asserta(current_pengine(Id, Queue, Thread, URL, Application, Destroy)).
  678
  679pengine_register_remote(Id, URL, Application, Destroy) :-
  680    thread_self(Queue),
  681    asserta(current_pengine(Id, Queue, 0, URL, Application, Destroy)).
  682
  683%!  pengine_unregister(+Id)
  684%
  685%   Called by the pengine thread  destruction.   If  we are a remote
  686%   pengine thread, our URL  equals  =http=   and  the  queue is the
  687%   message queue used to send events to the HTTP workers.
  688
  689pengine_unregister(Id) :-
  690    thread_self(Me),
  691    (   current_pengine(Id, Queue, Me, http, _, _)
  692    ->  with_mutex(pengine, sync_destroy_queue_from_pengine(Id, Queue))
  693    ;   true
  694    ),
  695    retractall(current_pengine(Id, _, Me, _, _, _)),
  696    retractall(pengine_user(Id, _)),
  697    retractall(pengine_data(Id, _)).
  698
  699pengine_unregister_remote(Id) :-
  700    retractall(current_pengine(Id, _Parent, 0, _, _, _)).
  701
  702%!  pengine_self(-Id) is det.
  703%
  704%   True if the current thread is a pengine with Id.
  705
  706pengine_self(Id) :-
  707    thread_self(Thread),
  708    current_pengine(Id, _Parent, Thread, _URL, _Application, _Destroy).
  709
  710pengine_parent(Parent) :-
  711    nb_getval(pengine_parent, Parent).
  712
  713pengine_thread(Pengine, Thread) :-
  714    current_pengine(Pengine, _Parent, Thread, _URL, _Application, _Destroy),
  715    Thread \== 0,
  716    !.
  717
  718pengine_remote(Pengine, URL) :-
  719    current_pengine(Pengine, _Parent, 0, URL, _Application, _Destroy).
  720
  721get_pengine_application(Pengine, Application) :-
  722    current_pengine(Pengine, _Parent, _, _URL, Application, _Destroy),
  723    !.
  724
  725get_pengine_module(Pengine, Pengine).
  726
  727:- if(current_predicate(uuid/2)).  728pengine_uuid(Id) :-
  729    uuid(Id, [version(4)]).             % Version 4 is random.
  730:- else.  731pengine_uuid(Id) :-
  732    (   current_prolog_flag(max_integer, Max1)
  733    ->  Max is Max1-1
  734    ;   Max is 1<<128
  735    ),
  736    random_between(0, Max, Num),
  737    atom_number(Id, Num).
  738:- endif.  739
  740%!  protect_pengine(+Id, :Goal) is semidet.
  741%
  742%   Run Goal while protecting the Pengine  Id from being destroyed. Used
  743%   by the HTTP  I/O  routines  to   avoid  that  the  Pengine's  module
  744%   disappears while I/O is in progress. We  use a pool of locks because
  745%   the lock may be held relatively long by output routines.
  746%
  747%   This also runs Goal if the Pengine no longer exists. This deals with
  748%   Pengines terminated through destroy_or_continue/1.
  749%
  750%   @bug After destroy_or_continue/1 takes the destroy route, the module
  751%   may drop-out at any point in time,   resulting  in a possible crash.
  752%   Seems the only safe way out is   to  do (de)serialization inside the
  753%   Pengine.
  754
  755:- meta_predicate protect_pengine(+, 0).  756
  757protect_pengine(Id, Goal) :-
  758    term_hash(Id, Hash),
  759    LockN is Hash mod 64,
  760    atom_concat(pengine_done_, LockN, Lock),
  761    with_mutex(Lock,
  762               (   pengine_thread(Id, _)
  763               ->  Goal
  764               ;   Goal
  765               )).
  766
  767
  768/** pengine_application(+Application) is det.
  769
  770Directive that must be used to declare a pengine application module. The
  771module must not be associated to any   file.  The default application is
  772=pengine_sandbox=.  The  example  below  creates    a   new  application
  773=address_book=  and  imports  the  API  defined    in  the  module  file
  774=adress_book_api.pl= into the application.
  775
  776  ==
  777  :- pengine_application(address_book).
  778  :- use_module(address_book:adress_book_api).
  779  ==
  780*/
  781
  782pengine_application(Application) :-
  783    throw(error(context_error(nodirective,
  784                             pengine_application(Application)), _)).
  785
  786:- multifile
  787    system:term_expansion/2,
  788    current_application/1.  789
  790%!  current_pengine_application(?Application) is nondet.
  791%
  792%   True when Application is a currently defined application.
  793%
  794%   @see pengine_application/1
  795
  796current_pengine_application(Application) :-
  797    current_application(Application).
  798
  799
% Default settings for all applications.  These are declared in the
% current module; the term expansion of pengine_application/1 declares
% a setting of the same name for each application whose default refers
% back to the value declared here (setting(pengines:Name)).

:- setting(thread_pool_size, integer, 100,
           'Maximum number of pengines this application can run.').
:- setting(thread_pool_stacks, list(compound), [],
           'Maximum stack sizes for pengines this application can run.').
:- setting(slave_limit, integer, 3,
           'Maximum number of slave pengines a master pengine can create.').
:- setting(time_limit, number, 300,
           'Maximum time to wait for output').
:- setting(idle_limit, number, 300,
           'Pengine auto-destroys when idle for this time').
:- setting(safe_goal_limit, number, 10,
           'Maximum time to try proving safety of the goal').
:- setting(program_space, integer, 100_000_000,
           'Maximum memory used by predicates').
:- setting(allow_from, list(atom), [*],
           'IP addresses from which remotes are allowed to connect').
:- setting(deny_from, list(atom), [],
           'IP addresses from which remotes are NOT allowed to connect').
:- setting(debug_info, boolean, false,
           'Keep information to support source-level debugging').

  823
  824system:term_expansion((:- pengine_application(Application)), Expanded) :-
  825    must_be(atom, Application),
  826    (   module_property(Application, file(_))
  827    ->  permission_error(create, pengine_application, Application)
  828    ;   true
  829    ),
  830    expand_term((:- setting(Application:thread_pool_size, integer,
  831                            setting(pengines:thread_pool_size),
  832                            'Maximum number of pengines this \c
  833                            application can run.')),
  834                ThreadPoolSizeSetting),
  835    expand_term((:- setting(Application:thread_pool_stacks, list(compound),
  836                            setting(pengines:thread_pool_stacks),
  837                            'Maximum stack sizes for pengines \c
  838                            this application can run.')),
  839                ThreadPoolStacksSetting),
  840    expand_term((:- setting(Application:slave_limit, integer,
  841                            setting(pengines:slave_limit),
  842                            'Maximum number of local slave pengines \c
  843                            a master pengine can create.')),
  844                SlaveLimitSetting),
  845    expand_term((:- setting(Application:time_limit, number,
  846                            setting(pengines:time_limit),
  847                            'Maximum time to wait for output')),
  848                TimeLimitSetting),
  849    expand_term((:- setting(Application:idle_limit, number,
  850                            setting(pengines:idle_limit),
  851                            'Pengine auto-destroys when idle for this time')),
  852                IdleLimitSetting),
  853    expand_term((:- setting(Application:safe_goal_limit, number,
  854                            setting(pengines:safe_goal_limit),
  855                            'Maximum time to try proving safety of the goal')),
  856                SafeGoalLimitSetting),
  857    expand_term((:- setting(Application:program_space, integer,
  858                            setting(pengines:program_space),
  859                            'Maximum memory used by predicates')),
  860                ProgramSpaceSetting),
  861    expand_term((:- setting(Application:allow_from, list(atom),
  862                            setting(pengines:allow_from),
  863                            'IP addresses from which remotes are allowed \c
  864                            to connect')),
  865                AllowFromSetting),
  866    expand_term((:- setting(Application:deny_from, list(atom),
  867                            setting(pengines:deny_from),
  868                            'IP addresses from which remotes are NOT \c
  869                            allowed to connect')),
  870                DenyFromSetting),
  871    expand_term((:- setting(Application:debug_info, boolean,
  872                            setting(pengines:debug_info),
  873                            'Keep information to support source-level \c
  874                            debugging')),
  875                DebugInfoSetting),
  876    flatten([ pengines:current_application(Application),
  877              ThreadPoolSizeSetting,
  878              ThreadPoolStacksSetting,
  879              SlaveLimitSetting,
  880              TimeLimitSetting,
  881              IdleLimitSetting,
  882              SafeGoalLimitSetting,
  883              ProgramSpaceSetting,
  884              AllowFromSetting,
  885              DenyFromSetting,
  886              DebugInfoSetting
  887            ], Expanded).
  888
% Register default application.  This directive is rewritten by the
% system:term_expansion/2 clause for pengine_application/1 in this
% file, so it declares pengine_sandbox and its settings rather than
% calling pengine_application/1 (which always throws).

:- pengine_application(pengine_sandbox).

  893
  894/** pengine_property(?Pengine, ?Property) is nondet.
  895
  896True when Property is a property of   the  given Pengine. Enumerates all
  897pengines  that  are  known  to  the   calling  Prolog  process.  Defined
  898properties are:
  899
  900  * self(ID)
  901    Identifier of the pengine.  This is the same as the first argument,
  902    and can be used to enumerate all known pengines.
  903  * alias(Name)
  904    Name is the alias name of the pengine, as provided through the
  905    `alias` option when creating the pengine.
  906  * thread(Thread)
  907    If the pengine is a local pengine, Thread is the Prolog thread
  908    identifier of the pengine.
  909  * remote(Server)
  910    If the pengine is remote, the URL of the server.
  911  * application(Application)
  912    Pengine runs the given application
  913  * module(Module)
  914    Temporary module used for running the Pengine.
  915  * destroy(Destroy)
  916    Destroy is =true= if the pengines is destroyed automatically
  917    after completing the query.
  918  * parent(Queue)
  919    Message queue to which the (local) pengine reports.
  920  * source(?SourceID, ?Source)
  921    Source is the source code with the given SourceID. May be present if
  922    the setting `debug_info` is present.
  923  * detached(?Time)
  924    Pengine was detached at Time.
  925*/
  926
  927
  928pengine_property(Id, Prop) :-
  929    nonvar(Id), nonvar(Prop),
  930    pengine_property2(Prop, Id),
  931    !.
  932pengine_property(Id, Prop) :-
  933    pengine_property2(Prop, Id).
  934
%   pengine_property2(?Property, ?Id)
%
%   Implementation of pengine_property/2 with the arguments swapped
%   such that the property is the first argument.  Most properties are
%   read from current_pengine/6, whose arguments are used here as
%   (Id, Parent, Thread, URL, Application, Destroy).

pengine_property2(self(Id), Id) :-
    current_pengine(Id, _Parent, _Thread, _URL, _Application, _Destroy).
pengine_property2(module(Id), Id) :-            % module name equals the id
    current_pengine(Id, _Parent, _Thread, _URL, _Application, _Destroy).
pengine_property2(alias(Alias), Id) :-
    child(Alias, Id),
    Alias \== Id.                               % only report a real alias
pengine_property2(thread(Thread), Id) :-
    current_pengine(Id, _Parent, Thread, _URL, _Application, _Destroy),
    Thread \== 0.                               % 0 is used for remote pengines
pengine_property2(remote(Server), Id) :-
    current_pengine(Id, _Parent, 0, Server, _Application, _Destroy).
pengine_property2(application(Application), Id) :-
    current_pengine(Id, _Parent, _Thread, _Server, Application, _Destroy).
pengine_property2(destroy(Destroy), Id) :-
    current_pengine(Id, _Parent, _Thread, _Server, _Application, Destroy).
pengine_property2(parent(Parent), Id) :-
    current_pengine(Id, Parent, _Thread, _URL, _Application, _Destroy).
pengine_property2(source(SourceID, Source), Id) :-
    pengine_data(Id, source(SourceID, Source)).
pengine_property2(detached(When), Id) :-
    pengine_detached(Id, When).
  957
/** pengine_output(+Term) is det

Sends Term to the parent pengine or thread. The event sent is
output(Self, Term), where Self is the identifier of the calling
pengine as returned by pengine_self/1.
*/

pengine_output(Term) :-
    pengine_self(Me),
    pengine_reply(output(Me, Term)).
  966
  967
  968/** pengine_debug(+Format, +Args) is det
  969
  970Create a message using format/3 from Format   and  Args and send this to
  971the    client.    The    default    JavaScript    client    will    call
  972=|console.log(Message)|=  if  there  is   a    console.   The  predicate
  973pengine_rpc/3 calls debug(pengine(debug), '~w',   [Message]).  The debug
  974topic pengine(debug) is enabled by default.
  975
  976@see debug/1 and nodebug/1 for controlling the pengine(debug) topic
  977@see format/2 for format specifications
  978*/
  979
  980pengine_debug(Format, Args) :-
  981    pengine_parent(Queue),
  982    pengine_self(Self),
  983    catch(safe_goal(format(atom(_), Format, Args)), E, true),
  984    (   var(E)
  985    ->  format(atom(Message), Format, Args)
  986    ;   message_to_string(E, Message)
  987    ),
  988    pengine_reply(Queue, debug(Self, Message)).
  989
  990
  991/*================= Local pengine =======================
  992*/
  993
%!  local_pengine_create(+Options)
%
%   Creates a local Pengine through create/5, reporting to the calling
%   thread.  The application is taken from the `application` option
%   and defaults to `pengine_sandbox`.  After creation the fact
%   child(Name, Child) is asserted, where Name is the value of the
%   `alias` option or the pengine identifier Child if no alias is
%   given.
%
%   NOTE(review): an older version of this comment described two id/2
%   predicates; the code below only records child/2.

local_pengine_create(Options) :-
    thread_self(Self),
    option(application(Application), Options, pengine_sandbox),
    create(Self, Child, Options, local, Application),
    option(alias(Name), Options, Child),
    assert(child(Name, Child)).
 1009
 1010
%!  thread_pool:create_pool(+Application) is det.
%
%   On demand creation of a thread pool for a pengine application.
%   The clause only applies to known applications (see
%   current_application/1).  The pool is named after the application
%   and created using the application's `thread_pool_size` and
%   `thread_pool_stacks` settings.

:- multifile thread_pool:create_pool/1.

thread_pool:create_pool(Application) :-
    current_application(Application),
    setting(Application:thread_pool_size, Size),
    setting(Application:thread_pool_stacks, Stacks),
    thread_pool_create(Application, Size, Stacks).
 1022
%!  create(+Queue, -Child, +Options, +URL, +Application) is det.
%
%   Create a new pengine thread.  For URL == `local`, errors raised
%   by create0/5 propagate to the caller.  Otherwise they are caught
%   and sent to Queue as an error(Child, Error) event using
%   create_error/3.
%
%   @arg Queue is the queue (or thread handle) to report to
%   @arg Child is the identifier of the created pengine.  If it is
%        unbound on entry a new identifier is generated.
%   @arg URL is one of =local= or =http=

create(Queue, Child, Options, local, Application) :-
    !,
    pengine_child_id(Child),
    create0(Queue, Child, Options, local, Application).
create(Queue, Child, Options, URL, Application) :-
    pengine_child_id(Child),
    catch(create0(Queue, Child, Options, URL, Application),
          Error,
          create_error(Queue, Child, Error)).
 1040
 1041pengine_child_id(Child) :-
 1042    (   nonvar(Child)
 1043    ->  true
 1044    ;   pengine_uuid(Child)
 1045    ).
 1046
%   create_error(+Queue, +Child, +Error)
%
%   Recovery goal used by create/5: report Error for the pengine
%   Child to Queue as the event error(Child, Error).

create_error(Queue, Child, Error) :-
    pengine_reply(Queue, error(Child, Error)).
 1049
%   create0(+Queue, +Child, +Options, +URL, +Application)
%
%   Do the actual work for create/5:
%
%     1. Verify Application exists, raising an existence error
%        otherwise.
%     2. Unless URL == `http`, count the child/2 facts and raise
%        resource_error(max_pengines) if this count has reached the
%        application's `slave_limit` setting.
%     3. Split Options into pengine options (see
%        pengine_create_option/1) and remaining options.  The latter
%        are passed to thread_create_in_pool/4.
%     4. Start the thread running pengine_main/3, register the
%        pengine and send the thread the message
%        pengine_registered(Child).
%     5. If Options contains id(Id), unify Id with Child.

create0(Queue, Child, Options, URL, Application) :-
    (  current_application(Application)
    -> true
    ;  existence_error(pengine_application, Application)
    ),
    (   URL \== http                    % pengine is _not_ a child of the
                                        % HTTP server thread
    ->  aggregate_all(count, child(_,_), Count),
        setting(Application:slave_limit, Max),
        (   Count >= Max
        ->  throw(error(resource_error(max_pengines), _))
        ;   true
        )
    ;   true
    ),
    partition(pengine_create_option, Options, PengineOptions, RestOptions),
    thread_create_in_pool(
        Application,
        pengine_main(Queue, PengineOptions, Application), ChildThread,
        [ at_exit(pengine_done)
        | RestOptions
        ]),
    option(destroy(Destroy), PengineOptions, true),
    pengine_register_local(Child, ChildThread, Queue, URL, Application, Destroy),
    % pengine_main/3 waits for this message to learn its own identifier
    thread_send_message(ChildThread, pengine_registered(Child)),
    (   option(id(Id), Options)
    ->  Id = Child
    ;   true
    ).
 1079
%   pengine_create_option(?Option)
%
%   True when Option is an option that is handled by the pengine
%   itself.  Used with partition/4 to separate these options from the
%   options that are passed on (e.g., to thread creation in create0/5).

pengine_create_option(src_text(_)).
pengine_create_option(src_url(_)).
pengine_create_option(application(_)).
pengine_create_option(destroy(_)).
pengine_create_option(ask(_)).
pengine_create_option(template(_)).
pengine_create_option(bindings(_)).
pengine_create_option(chunk(_)).
pengine_create_option(alias(_)).
pengine_create_option(user(_)).
 1090
 1091
%!  pengine_done is det.
%
%   Called from the pengine thread   `at_exit`  option. Destroys _child_
%   pengines  using  pengine_destroy/1.  Cleaning  up   the  Pengine  is
%   synchronised by the `pengine_done` mutex. See read_event/6.
%
%   If the thread status is exception('$aborted'), the thread is
%   detached and a destroy(Pengine, abort(Pengine)) event is sent to
%   the parent.  Errors of the shape error(_,_) raised while sending
%   this event are ignored.  Finally the pengine unregisters itself.

:- public
    pengine_done/0.

pengine_done :-
    thread_self(Me),
    (   thread_property(Me, status(exception('$aborted'))),
        thread_detach(Me),
        pengine_self(Pengine)
    ->  catch(pengine_reply(destroy(Pengine, abort(Pengine))),
              error(_,_), true)
    ;   true
    ),
    forall(child(_Name, Child),
           pengine_destroy(Child)),
    pengine_self(Id),
    protect_pengine(Id, pengine_unregister(Id)).
 1114
 1115
%!  pengine_main(+Parent, +Options, +Application)
%
%   Run a pengine main loop.  This is the goal of the pengine thread
%   (see create0/5).  It waits for the pengine_registered(Self)
%   message to learn its own identifier, stores Parent in the global
%   variable `pengine_parent` and then, inside a temporary module
%   named Self, prepares the source and runs
%   pengine_create_and_loop/3.  If the exception
%   `prepare_source_failed` is raised, the pengine is terminated
%   using pengine_terminate/1.

:- thread_local wrap_first_answer_in_create_event/2.

:- meta_predicate
    pengine_prepare_source(:, +).

pengine_main(Parent, Options, Application) :-
    fix_streams,
    thread_get_message(pengine_registered(Self)),
    nb_setval(pengine_parent, Parent),
    pengine_register_user(Options),
    set_prolog_flag(mitigate_spectre, true),
    catch(in_temporary_module(
              Self,
              pengine_prepare_source(Application, Options),
              pengine_create_and_loop(Self, Application, Options)),
          prepare_source_failed,
          pengine_terminate(Self)).
 1138
%   pengine_create_and_loop(+Self, +Application, +Options)
%
%   Acknowledge the creation of the pengine and enter the main loop.
%   The create event is create(Self, [slave_limit(Limit)|Extra]).
%
%     - Without an `ask` option, Extra is [] and the create event is
%       sent immediately.
%     - With an `ask` option, the create event is stored in the
%       thread-local wrap_first_answer_in_create_event/2 with Extra
%       left unbound and the query is submitted using pengine_ask/3.
%       If the query is a string it is first parsed using
%       ask_to_term/5; a parse error is sent to the parent and
%       `prepare_source_failed` is thrown.

pengine_create_and_loop(Self, Application, Options) :-
    setting(Application:slave_limit, SlaveLimit),
    CreateEvent = create(Self, [slave_limit(SlaveLimit)|Extra]),
    (   option(ask(Query0), Options)
    ->  asserta(wrap_first_answer_in_create_event(CreateEvent, Extra)),
        (   string(Query0)                      % string is not callable
        ->  (   option(template(TemplateS), Options)
            ->  Ask2 = Query0-TemplateS
            ;   Ask2 = Query0
            ),
            catch(ask_to_term(Ask2, Self, Query, Template, Bindings),
                  Error, true),
            (   var(Error)
            ->  true
            ;   send_error(Error),
                throw(prepare_source_failed)
            )
        ;   Query = Query0,
            option(template(Template), Options, Query),
            option(bindings(Bindings), Options, [])
        ),
        option(chunk(Chunk), Options, 1),
        pengine_ask(Self, Query,
                    [ template(Template),
                      chunk(Chunk),
                      bindings(Bindings)
                    ])
    ;   Extra = [],
        pengine_reply(CreateEvent)
    ),
    pengine_main_loop(Self).
 1170
 1171
%!  ask_to_term(+AskSpec, +Module, -Query, -Template, -Bindings) is det.
%
%   Translate the AskSpec into a query, template and bindings. The trick
%   is that we must parse using the  operator declarations of the source
%   and we must make sure  variable   sharing  between  query and answer
%   template are known.
%
%   @arg AskSpec is either `Ask-Template`, where both are text, or
%        just the text `Ask`.  In the first case query and template
%        are parsed as a single term such that they share variables
%        and Bindings holds the bindings for the variables that
%        appear in the template.  In the second case Bindings are the
%        query's variable bindings after removing those for which
%        anon/1 holds and Template is a dict with tag
%        `swish_default_template` created from these bindings.
%   @arg Module is the module passed to term_string/3 for parsing.

ask_to_term(Ask-Template, Module, Ask1, Template1, Bindings) :-
    !,
    format(string(AskTemplate), 't((~s),(~s))', [Template, Ask]),
    term_string(t(Template1,Ask1), AskTemplate,
                [ variable_names(Bindings0),
                  module(Module)
                ]),
    phrase(template_bindings(Template1, Bindings0), Bindings).
ask_to_term(Ask, Module, Ask1, Template, Bindings1) :-
    term_string(Ask1, Ask,
                [ variable_names(Bindings),
                  module(Module)
                ]),
    exclude(anon, Bindings, Bindings1),
    dict_create(Template, swish_default_template, Bindings1).
 1194
 1195template_bindings(Var, Bindings) -->
 1196    { var(Var) }, !,
 1197    (   { var_binding(Bindings, Var, Binding)
 1198        }
 1199    ->  [Binding]
 1200    ;   []
 1201    ).
 1202template_bindings([H|T], Bindings) -->
 1203    !,
 1204    template_bindings(H, Bindings),
 1205    template_bindings(T, Bindings).
 1206template_bindings(Compoound, Bindings) -->
 1207    { compound(Compoound), !,
 1208      compound_name_arguments(Compoound, _, Args)
 1209    },
 1210    template_bindings(Args, Bindings).
 1211template_bindings(_, _) --> [].
 1212
 1213var_binding(Bindings, Var, Binding) :-
 1214    member(Binding, Bindings),
 1215    arg(2, Binding, V),
 1216    V == Var, !.
 1217
 1218%!  fix_streams is det.
 1219%
 1220%   If we are a pengine that is   created  from a web server thread,
 1221%   the current output points to a CGI stream.
 1222
 1223fix_streams :-
 1224    fix_stream(current_output).
 1225
 1226fix_stream(Name) :-
 1227    is_cgi_stream(Name),
 1228    !,
 1229    debug(pengine(stream), '~w is a CGI stream!', [Name]),
 1230    set_stream(user_output, alias(Name)).
 1231fix_stream(_).
 1232
 1233%!  pengine_prepare_source(:Application, +Options) is det.
 1234%
 1235%   Load the source into the pengine's module.
 1236%
 1237%   @throws =prepare_source_failed= if it failed to prepare the
 1238%           sources.
 1239
 1240pengine_prepare_source(Module:Application, Options) :-
 1241    setting(Application:program_space, SpaceLimit),
 1242    set_module(Module:program_space(SpaceLimit)),
 1243    delete_import_module(Module, user),
 1244    add_import_module(Module, Application, start),
 1245    catch(prep_module(Module, Application, Options), Error, true),
 1246    (   var(Error)
 1247    ->  true
 1248    ;   send_error(Error),
 1249        throw(prepare_source_failed)
 1250    ).
 1251
%   prep_module(+Module, +Application, +Options)
%
%   Prepare Module for running the pengine: copy the `var_prefix`
%   flag from Application (if defined there), run all solutions of
%   the hook prepare_module/3 and process Options using
%   process_create_option/2 while Module is the source module.  The
%   old source module is restored afterwards.

prep_module(Module, Application, Options) :-
    maplist(copy_flag(Module, Application), [var_prefix]),
    forall(prepare_module(Module, Application, Options), true),
    setup_call_cleanup(
        '$set_source_module'(OldModule, Module),
        maplist(process_create_option(Module), Options),
        '$set_source_module'(OldModule)).
 1259
 1260copy_flag(Module, Application, Flag) :-
 1261    current_prolog_flag(Application:Flag, Value),
 1262    !,
 1263    set_prolog_flag(Module:Flag, Value).
 1264copy_flag(_, _, _).
 1265
 1266process_create_option(Application, src_text(Text)) :-
 1267    !,
 1268    pengine_src_text(Text, Application).
 1269process_create_option(Application, src_url(URL)) :-
 1270    !,
 1271    pengine_src_url(URL, Application).
 1272process_create_option(_, _).
 1273
 1274
 1275%!  prepare_module(+Module, +Application, +Options) is semidet.
 1276%
 1277%   Hook, called to initialize  the   temporary  private module that
 1278%   provides the working context of a pengine. This hook is executed
 1279%   by the pengine's thread.  Preparing the source consists of three
 1280%   steps:
 1281%
 1282%     1. Add Application as (first) default import module for Module
 1283%     2. Call this hook
%     3. Compile the source provided by the `src_text` and
 1285%        `src_url` options
 1286%
 1287%   @arg    Module is a new temporary module (see
 1288%           in_temporary_module/3) that may be (further) prepared
 1289%           by this hook.
 1290%   @arg    Application (also a module) associated to the pengine.
 1291%   @arg    Options is passed from the environment and should
 1292%           (currently) be ignored.
 1293
 1294
%   pengine_main_loop(+ID)
%
%   Run guarded_main_loop/1.  If the exception `abort_query` is
%   raised, handle this using pengine_aborted/1.

pengine_main_loop(ID) :-
    catch(guarded_main_loop(ID), abort_query, pengine_aborted(ID)).

%   pengine_aborted(+ID)
%
%   Called after the running query was aborted.  Calls empty_queue/0
%   and then either destroys the pengine or continues the main loop
%   after reporting the event abort(ID).

pengine_aborted(ID) :-
    thread_self(Self),
    debug(pengine(abort), 'Aborting ~p (thread ~p)', [ID, Self]),
    empty_queue,
    destroy_or_continue(abort(ID)).
 1303
 1304
%!  guarded_main_loop(+Pengine) is det.
%
%   Executes state `2' of  the  pengine,   where  it  waits  for two
%   events:
%
%     - destroy
%     Terminate the pengine
%     - ask(:Goal, +Options)
%     Solve Goal.
%
%   Any other request is answered with an
%   error(Pengine, error(protocol_error, _)) event, after which the
%   pengine waits for the next request.

guarded_main_loop(ID) :-
    pengine_request(Request),
    (   Request = destroy
    ->  debug(pengine(transition), '~q: 2 = ~q => 1', [ID, destroy]),
        pengine_terminate(ID)
    ;   Request = ask(Goal, Options)
    ->  debug(pengine(transition), '~q: 2 = ~q => 3', [ID, ask(Goal)]),
        ask(ID, Goal, Options)
    ;   debug(pengine(transition), '~q: 2 = ~q => 2', [ID, protocol_error]),
        pengine_reply(error(ID, error(protocol_error, _))),
        guarded_main_loop(ID)
    ).
 1327
 1328
%   pengine_terminate(+ID)
%
%   Report destroy(ID) to the parent and detach the current thread.

pengine_terminate(ID) :-
    pengine_reply(destroy(ID)),
    thread_self(Me),            % Make the thread silently disappear
    thread_detach(Me).
 1333
 1334
%!  solve(+Chunk, +Template, :Goal, +ID) is det.
%
%   Solve Goal. Note that because we can ask for a new goal in state
%   `6', we must provide for an ancestral cut (prolog_cut_to/1). We
%   need to be sure to  have  a   choice  point  before  we can call
%   prolog_current_choice/1. This is the reason   why this predicate
%   has two clauses.
%
%   @arg Chunk is either an integer (number of solutions per answer)
%        or `false`.  Anything else raises a domain error.
%   @arg Template is the answer template.  It may be replaced using
%        filter_template/3.
%   @arg ID is the pengine identifier.  It is also used as typein
%        module while running Goal.

solve(Chunk, Template, Goal, ID) :-
    prolog_current_choice(Choice),
    (   integer(Chunk)
    ->  State = count(Chunk)
    ;   Chunk == false
    ->  State = no_chunk
    ;   domain_error(chunk, Chunk)
    ),
    statistics(cputime, Epoch),
    Time = time(Epoch),                 % updated using nb_setarg/3 by
                                        % more_solutions/5
    nb_current('$variable_names', Bindings),
    filter_template(Template, Bindings, Template2),
    '$current_typein_module'(CurrTypeIn),
    (   '$set_typein_module'(ID),
        % query_done/2 binds Det to `true` when the cleanup runs.  Det
        % is thus unbound as long as Goal may have more solutions.
        call_cleanup(catch(findnsols_no_empty(State, Template2,
                                              set_projection(Goal, Bindings),
                                              Result),
                           Error, true),
                     query_done(Det, CurrTypeIn)),
        arg(1, Time, T0),
        statistics(cputime, T1),
        CPUTime is T1-T0,
        forall(pengine_flush_output_hook, true),
        (   var(Error)
        ->  projection(Projection),
            (   var(Det)
            ->  pengine_reply(success(ID, Result, Projection,
                                      CPUTime, true)),
                more_solutions(ID, Choice, State, Time)
            ;   !,                      % commit
                destroy_or_continue(success(ID, Result, Projection,
                                            CPUTime, false))
            )
        ;   !,                          % commit
            (   Error == abort_query
            ->  throw(Error)
            ;   destroy_or_continue(error(ID, Error))
            )
        )
    ;   !,                              % commit
        arg(1, Time, T0),
        statistics(cputime, T1),
        CPUTime is T1-T0,
        destroy_or_continue(failure(ID, CPUTime))
    ).
solve(_, _, _, _).                      % leave a choice point

%   query_done(-Det, +CurrTypeIn)
%
%   Cleanup handler for solve/4: signal completion by unifying Det
%   with `true` and restore the typein module.

query_done(true, CurrTypeIn) :-
    '$set_typein_module'(CurrTypeIn).
 1392
 1393
%!  set_projection(:Goal, +Bindings)
%
%   findnsols_no_empty/4  copies  its  goal  and    template   to  avoid
%   instantiation thereof when it stops after finding N solutions. Using
%   this helper we can get a renamed version of Bindings that we can
%   set as the value of the global variable `$variable_names` before
%   calling Goal.

set_projection(Goal, Bindings) :-
    b_setval('$variable_names', Bindings),
    call(Goal).
 1403
 1404projection(Projection) :-
 1405    nb_current('$variable_names', Bindings),
 1406    !,
 1407    maplist(var_name, Bindings, Projection).
 1408projection([]).
 1409
 1410%!  filter_template(+Template0, +Bindings, -Template) is det.
 1411%
 1412%   Establish the final template. This is   there  because hooks such as
 1413%   goal_expansion/2 and the SWISH query  hooks   can  modify the set of
 1414%   bindings.
 1415%
 1416%   @bug Projection and template handling is pretty messy.
 1417
 1418filter_template(Template0, Bindings, Template) :-
 1419    is_dict(Template0, swish_default_template),
 1420    !,
 1421    dict_create(Template, swish_default_template, Bindings).
 1422filter_template(Template, _Bindings, Template).
 1423
%   findnsols_no_empty(+State, +Template, :Goal, -List)
%
%   If State is `no_chunk`, call Goal directly and return each
%   solution as the one-element list [Template].  Otherwise collect
%   solutions using findnsols/4, failing instead of returning an
%   empty list.

findnsols_no_empty(no_chunk, Template, Goal, List) =>
    List = [Template],
    call(Goal).
findnsols_no_empty(State, Template, Goal, List) =>
    findnsols(State, Template, Goal, List),
    List \== [].
 1430
%   destroy_or_continue(+Event)
%
%   Report Event, whose first argument must be the pengine id.  If
%   the pengine has the property destroy(true), the thread is
%   detached and Event is sent wrapped as destroy(ID, Event).
%   Otherwise Event is sent as is and the pengine returns to
%   guarded_main_loop/1.

destroy_or_continue(Event) :-
    arg(1, Event, ID),
    (   pengine_property(ID, destroy(true))
    ->  thread_self(Me),
        thread_detach(Me),
        pengine_reply(destroy(ID, Event))
    ;   pengine_reply(Event),
        guarded_main_loop(ID)
    ).
 1440
 1441%!  more_solutions(+Pengine, +Choice, +State, +Time)
 1442%
 1443%   Called after a solution was found while  there can be more. This
 1444%   is state `6' of the state machine. It processes these events:
 1445%
 1446%     * stop
 1447%     Go back via state `7' to state `2' (guarded_main_loop/1)
 1448%     * next
 1449%     Fail.  This causes solve/3 to backtrack on the goal asked,
 1450%     providing at most the current `chunk` solutions.
 1451%     * next(Count)
 1452%     As `next`, but sets the new chunk-size to Count.
 1453%     * ask(Goal, Options)
 1454%     Ask another goal.  Note that we must commit the choice point
 1455%     of the previous goal asked for.
 1456
 1457more_solutions(ID, Choice, State, Time) :-
 1458    pengine_request(Event),
 1459    more_solutions(Event, ID, Choice, State, Time).
 1460
 1461more_solutions(stop, ID, _Choice, _State, _Time) :-
 1462    !,
 1463    debug(pengine(transition), '~q: 6 = ~q => 7', [ID, stop]),
 1464    destroy_or_continue(stop(ID)).
 1465more_solutions(next, ID, _Choice, _State, Time) :-
 1466    !,
 1467    debug(pengine(transition), '~q: 6 = ~q => 3', [ID, next]),
 1468    statistics(cputime, T0),
 1469    nb_setarg(1, Time, T0),
 1470    fail.
 1471more_solutions(next(Count), ID, _Choice, State, Time) :-
 1472    Count > 0,
 1473    State = count(_),                   % else fallthrough to protocol error
 1474    !,
 1475    debug(pengine(transition), '~q: 6 = ~q => 3', [ID, next(Count)]),
 1476    nb_setarg(1, State, Count),
 1477    statistics(cputime, T0),
 1478    nb_setarg(1, Time, T0),
 1479    fail.
 1480more_solutions(ask(Goal, Options), ID, Choice, _State, _Time) :-
 1481    !,
 1482    debug(pengine(transition), '~q: 6 = ~q => 3', [ID, ask(Goal)]),
 1483    prolog_cut_to(Choice),
 1484    ask(ID, Goal, Options).
 1485more_solutions(destroy, ID, _Choice, _State, _Time) :-
 1486    !,
 1487    debug(pengine(transition), '~q: 6 = ~q => 1', [ID, destroy]),
 1488    pengine_terminate(ID).
 1489more_solutions(Event, ID, Choice, State, Time) :-
 1490    debug(pengine(transition), '~q: 6 = ~q => 6', [ID, protocol_error(Event)]),
 1491    pengine_reply(error(ID, error(protocol_error, _))),
 1492    more_solutions(ID, Choice, State, Time).
 1493
%!  ask(+Pengine, :Goal, +Options)
%
%   Migrate from state `2' to `3'.  This predicate validates that it
%   is safe to call Goal using prepare_goal/4 and then calls solve/4
%   to prove the goal. It takes care of the chunk(N) option, which
%   defaults to 1, and the template(Template) option, which defaults
%   to Goal.  If preparing the goal raises an exception this is
%   reported as an error(Pengine, Error) event and the pengine
%   returns to guarded_main_loop/1.

ask(ID, Goal, Options) :-
    catch(prepare_goal(ID, Goal, Goal1, Options), Error, true),
    !,
    (   var(Error)
    ->  option(template(Template), Options, Goal),
        option(chunk(N), Options, 1),
        solve(N, Template, Goal1, ID)
    ;   pengine_reply(error(ID, Error)),
        guarded_main_loop(ID)
    ).
 1510
%!  prepare_goal(+Pengine, +GoalIn, -GoalOut, +Options) is det.
%
%   Prepare GoalIn for execution in Pengine.   This  implies we must
%   perform goal expansion and, if the   system  is sandboxed, check
%   the sandbox.
%
%   Note that expand_goal(Module:GoalIn, GoalOut) is  what we'd like
%   to write, but this does not work correctly if the user wishes to
%   expand `X:Y` while interpreting `X` not   as the module in which
%   to run `Y`. This happens in the  CQL package. Possibly we should
%   disallow this reinterpretation?
%
%   The steps are:
%
%     1. Set `$variable_names` from the `bindings` option.
%     2. Call the hook prepare_goal/3.  If it fails, GoalIn is used
%        unmodified.
%     3. Run expand_goal/2 with the pengine's module as source
%        module.
%     4. Unless pengine_not_sandboxed/1 holds, run safe_goal/1 under
%        the time limit of the application's `safe_goal_limit`
%        setting.
%
%   @throws error(sandbox(time_limit_exceeded, Limit), _) if
%           validating the goal takes longer than Limit.

prepare_goal(ID, Goal0, Module:Goal, Options) :-
    option(bindings(Bindings), Options, []),
    b_setval('$variable_names', Bindings),
    (   prepare_goal(Goal0, Goal1, Options)
    ->  true
    ;   Goal1 = Goal0
    ),
    get_pengine_module(ID, Module),
    setup_call_cleanup(
        '$set_source_module'(Old, Module),
        expand_goal(Goal1, Goal),
        '$set_source_module'(_, Old)),
    (   pengine_not_sandboxed(ID)
    ->  true
    ;   get_pengine_application(ID, App),
        setting(App:safe_goal_limit, Limit),
        catch(call_with_time_limit(
                  Limit,
                  safe_goal(Module:Goal)), E, true)
    ->  (   var(E)
        ->  true
        ;   E = time_limit_exceeded
        ->  throw(error(sandbox(time_limit_exceeded, Limit),_))
        ;   throw(E)
        )
    ).
 1549
 1550
 1551%%  prepare_goal(+Goal0, -Goal1, +Options) is semidet.
 1552%
 1553%   Pre-preparation hook for running Goal0. The hook runs in the context
 1554%   of the pengine. Goal is the raw   goal  given to _ask_. The returned
 1555%   Goal1 is subject  to  goal   expansion  (expand_goal/2)  and sandbox
 1556%   validation (safe_goal/1) prior to  execution.   If  this goal fails,
 1557%   Goal0 is used for further processing.
 1558%
 1559%   @arg Options provides the options as given to _ask_
 1560
 1561
%%  pengine_not_sandboxed(+Pengine) is semidet.
%
%   True when pengine does not operate in sandboxed mode. This implies a
%   user must be  registered  by   authentication_hook/3  and  the  hook
%   pengines:not_sandboxed(User, Application) must succeed.  The user
%   is looked up using pengine_user/2 and the application using the
%   `application` property of the pengine.

pengine_not_sandboxed(ID) :-
    pengine_user(ID, User),
    pengine_property(ID, application(App)),
    not_sandboxed(User, App),
    !.
 1573
 1574%%  not_sandboxed(+User, +Application) is semidet.
 1575%
 1576%   This hook is called to see whether the Pengine must be executed in a
 1577%   protected environment. It is only called after authentication_hook/3
%   has confirmed the authenticity of the current user.  If this hook
 1579%   succeeds, both loading the code and  executing the query is executed
 1580%   without enforcing sandbox security.  Typically, one should:
 1581%
 1582%     1. Provide a safe user authentication hook.
 1583%     2. Enable HTTPS in the server or put it behind an HTTPS proxy and
 1584%        ensure that the network between the proxy and the pengine
 1585%        server can be trusted.
 1586
 1587
 1588/** pengine_pull_response(+Pengine, +Options) is det
 1589
 1590Pulls a response (an event term) from the  slave Pengine if Pengine is a
 1591remote process, else does nothing at all.
 1592*/
 1593
 1594pengine_pull_response(Pengine, Options) :-
 1595    pengine_remote(Pengine, Server),
 1596    !,
 1597    remote_pengine_pull_response(Server, Pengine, Options).
 1598pengine_pull_response(_ID, _Options).
 1599
 1600
/** pengine_input(+Prompt, -Term) is det

Sends Prompt to the master (parent) pengine and waits for input. Note
that Prompt may be any term, compound as well as atomic.

The prompt is sent as the event prompt(Self, Prompt). The next request
must be input(Input), in which case Term is unified with Input. If the
request is `destroy`, the pengine calls abort/0. Any other request
raises error(protocol_error, _).
*/

pengine_input(Prompt, Term) :-
    pengine_self(Self),
    pengine_parent(Parent),
    pengine_reply(Parent, prompt(Self, Prompt)),
    pengine_request(Request),
    (   Request = input(Input)
    ->  Term = Input
    ;   Request == destroy
    ->  abort
    ;   throw(error(protocol_error,_))
    ).
 1618
 1619
/** pengine_respond(+Pengine, +Input, +Options) is det

Sends a response in the form of the term Input to a slave (child) pengine
that has prompted its master (parent) for input. The response is sent
as the event input(Input), which is the event pengine_input/2 waits
for.

Defined in terms of pengine_send/3, as follows:

==
pengine_respond(Pengine, Input, Options) :-
    pengine_send(Pengine, input(Input), Options).
==

*/

pengine_respond(Pengine, Input, Options) :-
    pengine_send(Pengine, input(Input), Options).
 1636
 1637
 %!  send_error(+Error) is det.
 %
 %   Report Error to my parent as error(Self, Error). Blobs that
 %   cannot be read back are first replaced using replace_blobs/2.
 %   When the error context carries a stack trace as a list of frames,
 %   the trace is printed to a string and that string is sent in place
 %   of the frame list.

 send_error(error(Formal0, context(prolog_stack(Frames), Msg0))) :-
     is_list(Frames),
     !,
     with_output_to(string(Trace),
                    print_prolog_backtrace(current_output, Frames)),
     replace_blobs(Formal0, Formal),
     replace_blobs(Msg0, Msg),
     Context = context(prolog_stack(Trace), Msg),
     pengine_self(Self),
     pengine_reply(error(Self, error(Formal, Context))).
 send_error(Error0) :-
     replace_blobs(Error0, Error),
     pengine_self(Self),
     pengine_reply(error(Self, Error)).
 1658
 %!  replace_blobs(+Term0, -Term) is det.
 %
 %   Term is a copy of Term0 in which every blob whose type is not
 %   `text` is replaced by an atom holding its `~p` printed form.
 %   Compound terms are rebuilt argument by argument; anything else is
 %   passed through unchanged. Needed for error terms that may hold
 %   streams and other handles to non-readable objects.

 replace_blobs(Term0, Term) :-
     (   blob(Term0, Type),
         Type \== text
     ->  format(atom(Term), '~p', [Term0])
     ;   compound(Term0)
     ->  compound_name_arguments(Term0, Name, Args0),
         maplist(replace_blobs, Args0, Args),
         compound_name_arguments(Term, Name, Args)
     ;   Term = Term0
     ).
 1676
 1677
 1678/*================= Remote pengines =======================
 1679*/
 1680
 1681
 %!  remote_pengine_create(+BaseURL, +Options) is det.
 %
 %   Create a pengine on the server at BaseURL. Options is split by
 %   pengine_create_option/1: the matching options are posted to the
 %   server as a dict, the remaining ones are handed to the HTTP layer
 %   through remote_post_rec/5. If an ask(Query) option is given
 %   without a template/1 option, the query itself is used as template.
 %
 %   The first argument of the reply is the pengine ID. It is unified
 %   with an id(ID) option if present and recorded as child(Alias, ID),
 %   where Alias comes from the alias/1 option and defaults to ID. The
 %   pengine is registered as remote only for `create` and `output`
 %   replies (the latter carry compiler messages). Finally the reply is
 %   forwarded to the queue of the calling thread.

 remote_pengine_create(BaseURL, Options) :-
     partition(pengine_create_option, Options, CreateOpts0, HTTPOpts),
     (   option(ask(Query), CreateOpts0),
         \+ option(template(_), CreateOpts0)
     ->  CreateOpts = [template(Query)|CreateOpts0]
     ;   CreateOpts = CreateOpts0
     ),
     options_to_dict(CreateOpts, PostData),
     remote_post_rec(BaseURL, create, PostData, Reply, HTTPOpts),
     arg(1, Reply, ID),
     (   option(id(Wanted), Options)
     ->  ID = Wanted
     ;   true
     ),
     option(alias(Alias), Options, ID),
     assert(child(Alias, ID)),
     (   (   functor(Reply, create, _)   % actually created
         ;   functor(Reply, output, _)   % compiler messages
         )
     ->  option(application(App), CreateOpts, pengine_sandbox),
         option(destroy(Destroy), CreateOpts, true),
         pengine_register_remote(ID, BaseURL, App, Destroy)
     ;   true
     ),
     thread_self(Me),
     pengine_reply(Me, Reply).
 1708
 %!  options_to_dict(+Options, -Dict) is det.
 %
 %   Translate a list of create options into the dict that is posted to
 %   the server. When both ask/1 and template/1 are present they are
 %   written together by ask_template_to_strings/4 so that they share
 %   variable names. That predicate calls numbervars/3; running it
 %   inside findall/3 keeps those bindings away from Ask and Template.
 %   All other options go through prolog_option/2.
 %
 %   @error domain_error(numbered_vars_free_term, Ask+Template) if the
 %          query or template already holds a '$VAR'(_) term.

 options_to_dict(Options, Dict) :-
     select_option(ask(Ask), Options, Options1),
     select_option(template(Template), Options1, Rest),
     !,
     no_numbered_var_in(Ask+Template),
     findall(A-T,
             ask_template_to_strings(Ask, Template, A, T),
             [AskString-TemplateString]),
     options_to_dict(Rest, Dict0),
     put_dict(_{ask:AskString, template:TemplateString}, Dict0, Dict).
 options_to_dict(Options, Dict) :-
     maplist(prolog_option, Options, Pairs),
     dict_create(Dict, _, Pairs).
 1722
 %!  no_numbered_var_in(+Term) is det.
 %
 %   Succeed if no sub term of Term is of the form '$VAR'(_).
 %
 %   @error domain_error(numbered_vars_free_term, Term) otherwise.

 no_numbered_var_in(Term) :-
     (   sub_term(Sub, Term),
         subsumes_term('$VAR'(_), Sub)
     ->  domain_error(numbered_vars_free_term, Term)
     ;   true
     ).
 1729
 %!  ask_template_to_strings(+Ask, +Template, -AskString, -TemplateString)
 %
 %   Write Ask and Template as strings that share variable names.
 %   Variables are numbered over Ask+Template together, both terms are
 %   written into one text separated by a newline using the options
 %   numbervars(true), ignore_ops(true) and quoted(true), and the text is
 %   split at the newline again. Note that this binds the variables of
 %   Ask and Template.

 ask_template_to_strings(Ask, Template, AskString, TemplateString) :-
     numbervars(Ask+Template, 0, _),
     WriteOpts = [ numbervars(true), ignore_ops(true), quoted(true) ],
     format(string(Both), '~W\n~W',
            [ Ask, WriteOpts, Template, WriteOpts ]),
     split_string(Both, "\n", "", [AskString, TemplateString]).
 1737
 %!  prolog_option(+Option0, -Option) is det.
 %
 %   Prepare a single create option for posting. Options that
 %   create_option_type/2 marks as `term` get their value replaced by
 %   its `~k` (canonical) text as a string; every other option is
 %   passed unchanged.

 prolog_option(Option0, Option) :-
     (   create_option_type(Option0, term)
     ->  Option0 =.. [Name, Value],
         format(string(Text), '~k', [Value]),
         Option =.. [Name, Text]
     ;   Option = Option0
     ).
 1745
 %!  create_option_type(?Option, ?Type) is nondet.
 %
 %   Table of create options whose value needs conversion when passed
 %   over HTTP. prolog_option/2 serialises `term` options to a string;
 %   pairs_create_options/3 turns the value of `atom` options into an
 %   atom.

 create_option_type(ask(_), term).
 create_option_type(template(_), term).
 create_option_type(application(_), atom).
 1749
 %!  remote_pengine_send(+BaseURL, +ID, +Event, +Options) is det.
 %
 %   Deliver Event to the remote pengine ID on the server BaseURL using
 %   the `send` action and forward the server's reply to the queue of
 %   the calling thread.

 remote_pengine_send(BaseURL, ID, Event, Options) :-
     Params = [event=Event],
     remote_send_rec(BaseURL, send, ID, Params, Reply, Options),
     thread_self(Me),
     pengine_reply(Me, Reply).
 1754
 %!  remote_pengine_pull_response(+BaseURL, +ID, +Options) is det.
 %
 %   Run the `pull_response` action for the remote pengine ID on the
 %   server BaseURL and forward the reply to the queue of the calling
 %   thread.

 remote_pengine_pull_response(BaseURL, ID, Options) :-
     Action = pull_response,
     remote_send_rec(BaseURL, Action, ID, [], Reply, Options),
     thread_self(Me),
     pengine_reply(Me, Reply).
 1759
 %!  remote_pengine_abort(+BaseURL, +ID, +Options) is det.
 %
 %   Run the `abort` action for the remote pengine ID on the server
 %   BaseURL and forward the reply to the queue of the calling thread.

 remote_pengine_abort(BaseURL, ID, Options) :-
     Action = abort,
     remote_send_rec(BaseURL, Action, ID, [], Reply, Options),
     thread_self(Me),
     pengine_reply(Me, Reply).
 1764
 %!  remote_send_rec(+Server, +Action, +ID, +Params, -Reply, +Options)
 %
 %   Issue an HTTP request for Action on Server and unify Reply with
 %   the replied term. When Params is exactly `[event=Event]`, Event is
 %   sent as a Prolog term in the body of a POST request and only the
 %   id is put in the URL. Otherwise `[id=ID|Params]` becomes the query
 %   string and the request is opened with the plain Options.
 %
 %   The connection is opened outside the cleanup handler on purpose:
 %   putting http_open/3 in setup_call_cleanup/3 makes it impossible to
 %   interrupt.

 remote_send_rec(Server, Action, ID, [event=Event], Reply, Options) :-
     !,
     server_url(Server, Action, [id=ID], URL),
     PostOptions = [post(prolog(Event))|Options],
     http_open(URL, In, PostOptions),
     call_cleanup(read_prolog_reply(In, Reply),
                  close(In)).
 remote_send_rec(Server, Action, ID, Params, Reply, Options) :-
     Search = [id=ID|Params],
     server_url(Server, Action, Search, URL),
     http_open(URL, In, Options),
     call_cleanup(read_prolog_reply(In, Reply),
                  close(In)).
 1786
 %!  remote_post_rec(+Server, +Action, +Data, -Reply, +Options)
 %
 %   POST Data as JSON to the Action location of Server and unify Reply
 %   with the replied term. The target is first probed using probe/2.
 %   The reply stream is closed when reading completes, whatever the
 %   outcome.

 remote_post_rec(Server, Action, Data, Reply, Options) :-
     server_url(Server, Action, [], URL),
     probe(Action, URL),
     PostOptions = [post(json(Data))|Options],
     http_open(URL, In, PostOptions),
     call_cleanup(read_prolog_reply(In, Reply),
                  close(In)).
 1797
 %!  probe(+Action, +URL) is det.
 %
 %   Probe the target with an HTTP OPTIONS request before a `create`
 %   action; do nothing for other actions. Probing is a good idea
 %   before posting a large document and being faced with an
 %   authentication challenge. Possibly this should become an option
 %   for simpler scenarios.

 probe(Action, URL) :-
     (   Action = create
     ->  http_open(URL, Stream, [method(options)]),
         close(Stream)
     ;   true
     ).
 1809
 %!  read_prolog_reply(+In, -Reply) is det.
 %
 %   Read one term from the stream In, which is switched to UTF-8
 %   encoding first, and hand it to rebind_cycles/2.

 read_prolog_reply(In, Reply) :-
     set_stream(In, encoding(utf8)),
     read(In, Raw),
     rebind_cycles(Raw, Reply).

 %!  rebind_cycles(+Reply0, -Reply) is det.
 %
 %   If Reply0 has the shape `@(Term, Bindings)` with Bindings a proper
 %   list, Reply is Term after executing each `Var = Value` element of
 %   Bindings as a unification. Any other term is returned as is.

 rebind_cycles(Reply0, Reply) :-
     (   Reply0 = @(Reply, Bindings),
         is_list(Bindings)
     ->  maplist(bind, Bindings)
     ;   Reply = Reply0
     ).

 %!  bind(+Binding) is semidet.
 %
 %   Unify both sides of Binding, which must be a `Var = Value` term.

 bind(Left = Right) :-
     Left = Right.
 1823
 %!  server_url(+Server, +Action, +Params, -URL) is det.
 %
 %   URL is the URL derived from Server by setting the path to
 %   `pengine/Action` and the query string to Params, both using
 %   uri_edit/3.

 server_url(Server, Action, Params, URL) :-
     atom_concat('pengine/', Action, Path),
     Edits = [path(Path), search(Params)],
     uri_edit(Edits, Server, URL).
 1829
 1830
 %!  pengine_event(?EventTerm) is det.
 %!  pengine_event(?EventTerm, +Options) is det.
 %
 %   Examine the event queue of the calling thread and, if necessary,
 %   block until a term that unifies with EventTerm arrives. The
 %   matching term is removed from the queue. Valid options are:
 %
 %     * timeout(+Time)
 %       Time is a float or integer and specifies the maximum time to
 %       wait in seconds. If no event has arrived before the time is
 %       up, EventTerm is bound to the atom =timeout=.
 %     * listen(+Id)
 %       Only listen to events from the pengine identified by Id.
 %
 %   After receiving the event, update_remote_destroy/1 is called on
 %   it.

 pengine_event(Event) :-
     pengine_event(Event, []).

 pengine_event(Event, Options) :-
     thread_self(Me),
     option(listen(Id), Options, _),
     Pattern = pengine_event(Id, Event),
     (   thread_get_message(Me, Pattern, Options)
     ->  true
     ;   Event = timeout
     ),
     update_remote_destroy(Event).
 1860
 %!  update_remote_destroy(+Event) is det.
 %
 %   If Event is a destroy event (see destroy_event/1) whose first
 %   argument names a pengine known to pengine_remote/2, remove that
 %   remote registration. Succeeds without action for other events.

 update_remote_destroy(Event) :-
     (   destroy_event(Event),
         arg(1, Event, Id),
         pengine_remote(Id, _)
     ->  pengine_unregister_remote(Id)
     ;   true
     ).
 1868
 %!  destroy_event(+Event) is semidet.
 %
 %   True when Event reports that a pengine is destroyed: destroy(Id),
 %   destroy(Id, Event), or a create(Id, Features) event whose
 %   answer(Answer) feature is bound to such an event.

 destroy_event(destroy(_)).
 destroy_event(destroy(_, _)).
 destroy_event(create(_, Features)) :-
     memberchk(answer(First), Features),
     !,
     nonvar(First),
     destroy_event(First).
 1876
 1877
 %!  pengine_event_loop(:Closure, +Options) is det.
 %
 %   Start an event loop accepting event terms sent to the current
 %   pengine or thread. For each such event E, calls
 %   ignore(call(Closure, E)). A closure thus acts as a _handler_ for
 %   the event. Some events are also treated specially:
 %
 %     * create(ID, Term)
 %       The ID is placed in a list of active pengines.
 %     * destroy(ID)
 %       The ID is removed from the list of active pengines. When the
 %       last pengine ID is removed, the loop terminates.
 %     * output(ID, Term)
 %       The predicate pengine_pull_response/2 is called.
 %
 %   Valid options are:
 %
 %     * autoforward(+To)
 %       Forwards received event terms to slaves. To is either =all=,
 %       =all_but_sender= or a Prolog list of NameOrIDs. Only =all= is
 %       implemented.
 %
 %   The loop runs as long as there is a child/2 fact.

 pengine_event_loop(Closure, Options) :-
     (   child(_, _)
     ->  pengine_event(Event),
         (   option(autoforward(all), Options) % TODO: Implement all_but_sender and list of IDs
         ->  forall(child(_, ID), pengine_send(ID, Event))
         ;   true
         ),
         pengine_event_loop(Event, Closure, Options)
     ;   true
     ).
 1914
 :- meta_predicate
     pengine_process_event(+, 1, -, +).

 %!  pengine_event_loop(+Event, :Closure, +Options) is det.
 %
 %   Process Event using pengine_process_event/4 and re-enter the
 %   loop pengine_event_loop/2 if that reports `true` for Continue.

 pengine_event_loop(Event, Closure, Options) :-
     pengine_process_event(Event, Closure, Continue, Options),
     (   Continue \== true
     ->  true
     ;   pengine_event_loop(Closure, Options)
     ).
 1924
 %!  pengine_process_event(+Event, :Closure, -Continue, +Options)
 %
 %   Handle a single Event for pengine_event_loop/2. In general the
 %   event is passed to Closure, ignoring failure, and Continue is
 %   `true`. In addition:
 %
 %     - create(ID, Features): an answer(First) feature is taken out
 %       before calling Closure and First is then processed as an
 %       event of its own.
 %     - output(ID, Msg) and debug(ID, Msg): the next response is
 %       pulled using pengine_pull_response/2.
 %     - success/5 and failure/2: Closure receives success(ID, Sol,
 %       More) and failure(ID), i.e. without projection and time.
 %     - error(ID, Error): if Closure fails, all children are
 %       destroyed and Error is re-thrown.
 %     - destroy(ID, Event): Event is processed first, then
 %       destroy(ID).
 %     - destroy(ID): the child/2 facts for ID are retracted.

 pengine_process_event(create(ID, Features), Closure, Continue, Options) :-
     debug(pengine(transition), '~q: 1 = /~q => 2', [ID, create(Features)]),
     (   select(answer(First), Features, Features1)
     ->  ignore(call(Closure, create(ID, Features1))),
         pengine_process_event(First, Closure, Continue, Options)
     ;   ignore(call(Closure, create(ID, Features))),
         Continue = true
     ).
 pengine_process_event(output(ID, Message), Closure, true, _Options) :-
     debug(pengine(transition), '~q: 3 = /~q => 4', [ID, output(Message)]),
     ignore(call(Closure, output(ID, Message))),
     pengine_pull_response(ID, []).
 pengine_process_event(debug(ID, Message), Closure, true, _Options) :-
     debug(pengine(transition), '~q: 3 = /~q => 4', [ID, debug(Message)]),
     ignore(call(Closure, debug(ID, Message))),
     pengine_pull_response(ID, []).
 pengine_process_event(prompt(ID, Prompt), Closure, true, _Options) :-
     debug(pengine(transition), '~q: 3 = /~q => 5', [ID, prompt(Prompt)]),
     ignore(call(Closure, prompt(ID, Prompt))).
 pengine_process_event(success(ID, Solutions, _Proj, _Time, More),
                       Closure, true, _Options) :-
     debug(pengine(transition), '~q: 3 = /~q => 6/2',
           [ID, success(Solutions, More)]),
     ignore(call(Closure, success(ID, Solutions, More))).
 pengine_process_event(failure(ID, _Time), Closure, true, _Options) :-
     debug(pengine(transition), '~q: 3 = /~q => 2', [ID, failure]),
     ignore(call(Closure, failure(ID))).
 pengine_process_event(error(ID, Error), Closure, Continue, _Options) :-
     debug(pengine(transition), '~q: 3 = /~q => 2', [ID, error(Error)]),
     (   call(Closure, error(ID, Error))
     ->  Continue = true
     ;   forall(child(_, Child), pengine_destroy(Child)),
         throw(Error)
     ).
 pengine_process_event(stop(ID), Closure, true, _Options) :-
     debug(pengine(transition), '~q: 7 = /~q => 2', [ID, stop]),
     ignore(call(Closure, stop(ID))).
 pengine_process_event(destroy(ID, Last), Closure, Continue, Options) :-
     pengine_process_event(Last, Closure, _, Options),
     pengine_process_event(destroy(ID), Closure, Continue, Options).
 pengine_process_event(destroy(ID), Closure, true, _Options) :-
     retractall(child(_, ID)),
     debug(pengine(transition), '~q: 1 = /~q => 0', [ID, destroy]),
     ignore(call(Closure, destroy(ID))).
 1967
 1968
 %!  pengine_rpc(+URL, +Query) is nondet.
 %!  pengine_rpc(+URL, +Query, +Options) is nondet.
 %
 %   Semantically equivalent to the sequence below, except that the
 %   query is executed in (and in the Prolog context of) the pengine
 %   server referred to by URL, rather than locally.
 %
 %   ==
 %     copy_term_nat(Query, Copy),  % attributes are not copied to the server
 %     call(Copy),                  % executed on server at URL
 %     Query = Copy.
 %   ==
 %
 %   Valid options are:
 %
 %     - chunk(+IntegerOrFalse)
 %       Can be used to reduce the number of network roundtrips being
 %       made. See pengine_ask/3.
 %     - timeout(+Time)
 %       Wait at most Time seconds for the next event from the server.
 %       The default is defined by the setting `pengines:time_limit`.
 %
 %   Remaining options (except the server option) are passed to
 %   pengine_create/1.

 pengine_rpc(URL, Query) :-
     pengine_rpc(URL, Query, []).

 pengine_rpc(URL, Query, M:Options0) :-
     translate_local_sources(Options0, Options1, M),
     (   option(timeout(_), Options1)
     ->  Options = Options1
     ;   setting(time_limit, Limit),
         Options = [timeout(Limit)|Options1]
     ),
     term_variables(Query, Vars),
     Template =.. [v|Vars],
     % State is updated destructively by process_event/4 when the
     % server reports that the pengine is already destroyed.
     State = destroy(true),
     CreateOptions = [ ask(Query),
                       template(Template),
                       server(URL),
                       id(Id)
                     | Options
                     ],
     setup_call_catcher_cleanup(
         pengine_create(CreateOptions),
         wait_event(Template, State, [listen(Id)|Options]),
         Why,
         pengine_destroy_and_wait(State, Id, Why)).
 2018
 %!  pengine_destroy_and_wait(+State, +Id, +Why) is det.
 %
 %   Cleanup handler of pengine_rpc/3. If State is still destroy(true)
 %   the pengine Id is destroyed and we wait for that to be confirmed
 %   using wait_destroy/2. Otherwise nothing is destroyed. Why is only
 %   used for debug messages.

 pengine_destroy_and_wait(State, Id, Why) :-
     (   State = destroy(true)
     ->  debug(pengine(rpc), 'Destroying RPC because of ~p', [Why]),
         pengine_destroy(Id),
         wait_destroy(Id, 10)
     ;   debug(pengine(rpc), 'Not destroying RPC (~p)', [Why])
     ).
 2026
 %!  wait_destroy(+Id, +Retries) is det.
 %
 %   Wait until pengine Id is gone. Succeeds immediately if there is
 %   no child/2 fact for Id. Otherwise events from Id are read with a
 %   10 second timeout. A destroy event removes the child/2 facts. For
 %   any other event (including `timeout`) we try again with Retries
 %   decremented; once Retries reaches 0 we give up, unregister the
 %   remote pengine and remove the child/2 facts.

 wait_destroy(Id, _) :-
     \+ child(_, Id),
     !.
 wait_destroy(Id, Retries) :-
     pengine_event(Event, [listen(Id), timeout(10)]),
     !,
     (   destroy_event(Event)
     ->  retractall(child(_, Id))
     ;   succ(Retries1, Retries)
     ->  wait_destroy(Id, Retries1)
     ;   debug(pengine(rpc), 'RPC did not answer to destroy ~p', [Id]),
         pengine_unregister_remote(Id),
         retractall(child(_, Id))
     ).
 2041
 %!  wait_event(?Template, +State, +Options) is nondet.
 %
 %   Wait for the next event using pengine_event/2 and hand it to
 %   process_event/4.

 wait_event(Template, State, Options) :-
     pengine_event(Received, Options),
     debug(pengine(event), 'Received ~p', [Received]),
     process_event(Received, Template, State, Options).
 2046
 %!  process_event(+Event, ?Template, +State, +Options) is nondet.
 %
 %   Handle Event for pengine_rpc/3:
 %
 %     - create(ID, Features): process the answer(First) feature.
 %     - error(ID, Error): throw Error.
 %     - failure(ID, Time): fail.
 %     - prompt(ID, Prompt): obtain a reply using
 %       pengine_rpc_prompt/3, send it as input and wait for the next
 %       event.
 %     - output(ID, Term) and debug(ID, Message): emit the term or
 %       message, pull the next response and wait for the next event.
 %     - success/5: enumerate Template over the solutions on
 %       backtracking. If more solutions may exist, ask for the next
 %       chunk after the current ones are exhausted.
 %     - destroy(ID, Event): the pengine is already gone. Remove the
 %       child/2 facts, set State to destroy(false) using nb_setarg/3
 %       and process the embedded Event.
 %     - success/4: older versions of the protocol, mapped to
 %       success/5.

 process_event(create(_ID, Features), Template, State, Options) :-
     memberchk(answer(First), Features),
     process_event(First, Template, State, Options).
 process_event(error(_ID, Error), _Template, _State, _Options) :-
     throw(Error).
 process_event(failure(_ID, _Time), _Template, _State, _Options) :-
     fail.
 process_event(prompt(ID, Prompt), Template, State, Options) :-
     pengine_rpc_prompt(ID, Prompt, Reply),
     pengine_send(ID, input(Reply)),
     wait_event(Template, State, Options).
 process_event(output(ID, Term), Template, State, Options) :-
     pengine_rpc_output(ID, Term),
     pengine_pull_response(ID, Options),
     wait_event(Template, State, Options).
 process_event(debug(ID, Message), Template, State, Options) :-
     debug(pengine(debug), '~w', [Message]),
     pengine_pull_response(ID, Options),
     wait_event(Template, State, Options).
 process_event(success(_ID, Solutions, _Proj, _Time, false),
               Template, _State, _Options) :-
     !,
     member(Template, Solutions).
 process_event(success(ID, Solutions, _Proj, _Time, true),
               Template, State, Options) :-
     (   member(Template, Solutions)
     ;   pengine_next(ID, Options),
         wait_event(Template, State, Options)
     ).
 process_event(destroy(ID, Last), Template, State, Options) :-
     !,
     retractall(child(_, ID)),
     nb_setarg(1, State, false),
     debug(pengine(destroy), 'State: ~p~n', [State]),
     process_event(Last, Template, State, Options).
 % compatibility with older versions of the protocol.
 process_event(success(ID, Solutions, Time, More),
               Template, State, Options) :-
     process_event(success(ID, Solutions, _Proj, Time, More),
                   Template, State, Options).
 2087
 2088
 %!  pengine_rpc_prompt(+ID, +Prompt, -Term) is semidet.
 %
 %   Obtain the answer to a prompt from the remote pengine ID. The
 %   hook prompt/3 is tried first. If it fails, Term is read from the
 %   current input with Prompt temporarily installed as prompt.

 pengine_rpc_prompt(ID, Prompt, Term) :-
     (   prompt(ID, Prompt, Term0)
     ->  Term = Term0
     ;   setup_call_cleanup(
             prompt(Old, Prompt),
             read(Term),
             prompt(_, Old))
     ).
 2098
 %!  pengine_rpc_output(+ID, +Term) is det.
 %
 %   Handle output from the remote pengine ID: call the hook output/2
 %   and, if that fails, print Term using print/1.

 pengine_rpc_output(ID, Term) :-
     (   output(ID, Term)
     ->  true
     ;   print(Term)
     ).
 2104
 %%  prompt(+ID, +Prompt, -Term) is semidet.
 %
 %   Multifile hook that handles pengine_input/2 from the remote
 %   pengine. If the hook fails, pengine_rpc/3 calls read/1 with
 %   Prompt installed as the prompt.

 :- multifile
     prompt/3.

 %%  output(+ID, +Term) is semidet.
 %
 %   Multifile hook that handles pengine_output/1 from the remote
 %   pengine. If the hook fails, print/1 is called on Term.

 :- multifile
     output/2.
 2119
 2120/*================= HTTP handlers =======================
 2121*/
 2122
 %   Declare the HTTP locations we serve and how. Note that we use
 %   time_limit(infinite) because pengines have their own timeout. Also
 %   note that we use spawn. This is needed because we can easily get
 %   many clients waiting for some action on a pengine to complete.
 %   Without spawning, we would quickly exhaust the worker pool of the
 %   HTTP server.
 %
 %   FIXME: probably we should wait for a short time for the pengine on
 %   the default worker thread. Only if that time has expired, we can
 %   call http_spawn/2 to continue waiting on a new thread. That would
 %   improve the performance and reduce the usage of threads.

 :- http_handler(root(pengine),
                 http_404([]),
                 [ id(pengines) ]).
 :- http_handler(root(pengine/create),
                 http_pengine_create,
                 [ time_limit(infinite), spawn([]) ]).
 :- http_handler(root(pengine/send),
                 http_pengine_send,
                 [ time_limit(infinite), spawn([]) ]).
 :- http_handler(root(pengine/pull_response),
                 http_pengine_pull_response,
                 [ time_limit(infinite), spawn([]) ]).
 :- http_handler(root(pengine/abort),       http_pengine_abort,       []).
 :- http_handler(root(pengine/detach),      http_pengine_detach,      []).
 :- http_handler(root(pengine/list),        http_pengine_list,        []).
 :- http_handler(root(pengine/ping),        http_pengine_ping,        []).
 :- http_handler(root(pengine/destroy_all), http_pengine_destroy_all, []).

 :- http_handler(root(pengine/'pengines.js'),
                 http_reply_file(library('http/web/js/pengines.js'), []),
                 []).
 :- http_handler(root(pengine/'plterm.css'),
                 http_reply_file(library('http/web/css/plterm.css'), []),
                 []).
 2154
 %%  http_pengine_create(+Request)
 %
 %   HTTP POST handler for =/pengine/create=. This API accepts the
 %   pengine creation parameters both as =application/json= and as
 %   =www-form-encoded=. Accepted parameters:
 %
 %     | **Parameter** | **Default**       | **Comment**                |
 %     |---------------|-------------------|----------------------------|
 %     | format        | `prolog`          | Output format              |
 %     | application   | `pengine_sandbox` | Pengine application        |
 %     | chunk         | 1                 | Chunk-size for results     |
 %     | collate       | 0 (off)           | Join output events         |
 %     | solutions     | chunked           | If `all`, emit all results |
 %     | ask           | -                 | The query                  |
 %     | template      | -                 | Output template            |
 %     | src_text      | ""                | Program                    |
 %     | src_url       | -                 | Program to download        |
 %     | disposition   | -                 | Download location          |
 %
 %     Note that solutions=all internally uses chunking to obtain the
 %     results from the pengine, but the results are combined in a
 %     single HTTP reply. This is currently only implemented by the CSV
 %     backend that is part of SWISH for downloading unbounded result
 %     sets with limited memory resources.
 %
 %     Using `chunk=false` simulates the _recursive toplevel_. See
 %     pengine_ask/3.
 %
 %   The first clause lets reply_options/2 answer the request if it
 %   can. The second handles a JSON body and the third reads the
 %   parameters using http_parameters/2.

 http_pengine_create(Request) :-
     reply_options(Request, [post]),
     !.
 http_pengine_create(Request) :-
     memberchk(content_type(ContentType), Request),
     sub_atom(ContentType, 0, _, _, 'application/json'),
     !,
     http_read_json_dict(Request, Dict),
     dict_atom_option(format, Dict, Format, prolog),
     dict_atom_option(application, Dict, Application, pengine_sandbox),
     http_pengine_create(Request, Application, Format, Dict).
 http_pengine_create(Request) :-
     Form = [ format(Format, [default(prolog)]),
              application(Application, [default(pengine_sandbox)]),
              chunk(_, [nonneg;oneof([false]), default(1)]),
              collate(_, [number, default(0)]),
              solutions(_, [oneof([all,chunked]), default(chunked)]),
              ask(_, [string, optional(true)]),
              template(_, [string, optional(true)]),
              src_text(_, [string, optional(true)]),
              disposition(_, [string, optional(true)]),
              src_url(_, [optional(true)])
            ],
     http_parameters(Request, Form),
     form_dict(Form, Dict),
     http_pengine_create(Request, Application, Format, Dict).
 2211
 %!  dict_atom_option(+Key, +Dict, -Atom, +Default) is det.
 %
 %   Atom is the value for Key in Dict converted to an atom, or
 %   Default if Dict has no such key.

 dict_atom_option(Key, Dict, Atom, _Default) :-
     get_dict(Key, Dict, Text),
     !,
     atom_string(Atom, Text).
 dict_atom_option(_Key, _Dict, Atom, Default) :-
     Atom = Default.
 2217
 %!  form_dict(+Form, -Dict) is det.
 %
 %   Dict holds Name:Value for each Name(Value, ...) element of Form
 %   whose Value is bound.

 form_dict(Form, Dict) :-
     form_values(Form, Pairs),
     dict_pairs(Dict, _, Pairs).

 %!  form_values(+Form, -Pairs) is det.
 %
 %   Pairs is a list Name-Value for the elements of Form whose first
 %   argument is bound. Elements with an unbound first argument are
 %   skipped.

 form_values([], []).
 form_values([Field|Fields], Pairs) :-
     (   arg(1, Field, Value),
         nonvar(Value)
     ->  functor(Field, Name, _),
         Pairs = [Name-Value|Pairs1]
     ;   Pairs = Pairs1
     ),
     form_values(Fields, Pairs1).
 2232
 %!  http_pengine_create(+Request, +Application, +Format, +OptionsDict)
 %
 %   Create a pengine for Application and reply with its first result
 %   in Format. For a known application the request is checked using
 %   allowed/2 and authenticate/3, a bounded message queue (at most 25
 %   messages) is created and recorded in pengine_queue/4, creation
 %   is broadcasted, the pengine is started and we wait for its first
 %   output. Finally abandoned queues are collected. For an unknown
 %   application an existence error is sent as the result.

 http_pengine_create(Request, Application, Format, Dict) :-
     (   current_application(Application)
     ->  allowed(Request, Application),
         authenticate(Request, Application, UserOptions),
         dict_to_options(Dict, Application, DictOptions),
         append(UserOptions, DictOptions, CreateOptions),
         pengine_uuid(Pengine),
         message_queue_create(Queue, [max_size(25)]),
         setting(Application:time_limit, TimeLimit),
         get_time(Now),
         asserta(pengine_queue(Pengine, Queue, TimeLimit, Now)),
         broadcast(pengine(create(Pengine, Application, CreateOptions))),
         create(Queue, Pengine, CreateOptions, http, Application),
         create_wait_and_output_result(Pengine, Queue, Format,
                                       TimeLimit, Dict),
         gc_abandoned_queues
     ;   Error = existence_error(pengine_application, Application),
         pengine_uuid(ID),
         output_result(ID, Format, error(ID, error(Error, _)))
     ).
 2257
 2258
 %!  dict_to_options(+Dict, +Application, -CreateOptions) is det.
 %
 %   CreateOptions is the list of create options found in Dict, as
 %   selected by pairs_create_options/3.

 dict_to_options(Dict, Application, CreateOptions) :-
     dict_pairs(Dict, _Tag, KeyValues),
     pairs_create_options(KeyValues, Application, CreateOptions).
 2262
 %!  pairs_create_options(+Pairs, +Application, -Options) is det.
 %
 %   Translate Name-Value pairs into create options. A pair is kept
 %   only if Name(_) is accepted by pengine_create_option/1 and Name is
 %   not `user`; all other pairs are dropped. Values of options typed
 %   `atom` by create_option_type/2 are converted to an atom; other
 %   values are passed as is. Term creation must be done later, once
 %   we created the source and know the operators.

 pairs_create_options([], _, []) :- !.
 pairs_create_options([Name-Value0|Pairs], App, [Option|Options]) :-
     Option =.. [Name, Value],
     pengine_create_option(Option),
     Name \== user,
     !,
     (   create_option_type(Option, atom)
     ->  atom_string(Value, Value0)
     ;   Value = Value0
     ),
     pairs_create_options(Pairs, App, Options).
 pairs_create_options([_|Pairs], App, Options) :-
     pairs_create_options(Pairs, App, Options).
 2275
 %!  wait_and_output_result(+Pengine, +Queue,
 %!                         +Format, +TimeLimit, +Collate) is det.
 %
 %   Wait for a message on the Pengine's Queue and send it to the
 %   requester using output_result/3.
 %
 %     - If the event is a collating event (see collating_event/2),
 %       further events are collected using collect_events/6 and the
 %       list of events is sent. The collate time is at most one tenth
 %       of TimeLimit.
 %     - If waiting raises an exception, died(Pengine) is sent.
 %     - If nothing arrives within TimeLimit seconds,
 %       time_limit_exceeded/2 is called.
 %
 %   Before sending, destroy_queue_from_http/3 gets a chance to
 %   destroy the queue.

 wait_and_output_result(Pengine, Queue, Format, TimeLimit, Collate0) :-
     Collate is min(Collate0, TimeLimit/10),
     get_time(Start),
     (   catch(thread_get_message(Queue, pengine_event(_, Event),
                                  [ timeout(TimeLimit)
                                  ]),
               Error, true)
     ->  (   var(Error)
         ->  debug(pengine(wait), 'Got ~q from ~q', [Event, Queue]),
             (   collating_event(Collate, Event)
             ->  Deadline is Start+TimeLimit,
                 collect_events(Pengine, Collate, Queue, Deadline, 100, More),
                 Result = [Event|More]
             ;   Result = Event
             ),
             ignore(destroy_queue_from_http(Pengine, Result, Queue)),
             protect_pengine(Pengine, output_result(Pengine, Format, Result))
         ;   output_result(Pengine, Format, died(Pengine))
         )
     ;   time_limit_exceeded(Pengine, Format)
     ).
 2307
 %!  collect_events(+Pengine, +CollateTime, +Queue, +Deadline, +Max, -Events)
 %
 %   Collect more events as long as they are not separated by more
 %   than CollateTime seconds and collect at most Max. Collection stops
 %   after the first event that is not a collating event. If waiting
 %   raises an exception Events is `[died(Pengine)]`. If the wait times
 %   out after Deadline has passed, Events is the time limit event for
 %   Pengine; on any other timeout it is `[]`.

 collect_events(_Pengine, _Collate, _Queue, _Deadline, 0, []) :-
     !.
 collect_events(Pengine, Collate, Queue, Deadline, Max, Events) :-
     debug(pengine(wait), 'Waiting to collate events', []),
     (   catch(thread_get_message(Queue, pengine_event(_, Event),
                                  [ timeout(Collate)
                                  ]),
               Error, true)
     ->  (   var(Error)
         ->  debug(pengine(wait), 'Got ~q from ~q', [Event, Queue]),
             Events = [Event|Rest],
             (   collating_event(Collate, Event)
             ->  Left is Max - 1,
                 collect_events(Pengine, Collate, Queue, Deadline, Left, Rest)
             ;   Rest = []
             )
         ;   Events = [died(Pengine)]
         )
     ;   get_time(Now),
         Now > Deadline
     ->  time_limit_event(Pengine, LimitEvent),
         Events = [LimitEvent]
     ;   Events = []
     ).
 2337
 %!  collating_event(+Collate, +Event) is semidet.
 %
 %   True when Event must be joined with subsequent events, which is
 %   the case for output(_,_) events unless collation is switched off.
 %   Collation is off when Collate is numerically zero. The test is
 %   arithmetic rather than a unification with the integer 0 because
 %   the collate time may also arrive as the float 0.0, which must
 %   disable collation just the same. Fails if Collate is unbound.

 collating_event(Collate, output(_,_)) :-
     nonvar(Collate),
     \+ ( number(Collate),
          Collate =:= 0
        ).
 2342
 %!  create_wait_and_output_result(+Pengine, +Queue, +Format,
 %!                                +TimeLimit, +Dict) is det.
 %
 %   Intercepts the `solutions=all' case used for downloading results.
 %   Dict may contain a `disposition` key to denote the download
 %   location.
 %
 %   For `solutions=all` this is a failure driven loop over pages
 %   1, 2, ... Each page waits for an event from Queue:
 %
 %     - If destroy_queue_from_http/3 succeeds on the event, the page
 %       is sent and the loop ends.
 %     - If the event announces more solutions, the pengine is asked
 %       for the next chunk, the page is sent and the loop continues.
 %     - Any other event is sent and ends the loop.
 %     - An exception while waiting sends died(Pengine); a timeout
 %       calls time_limit_exceeded/2. Both end the loop.
 %
 %   Otherwise we call wait_and_output_result/5 using the `collate`
 %   key from Dict, which defaults to 0.

 create_wait_and_output_result(Pengine, Queue, Format, TimeLimit, Dict) :-
     get_dict(solutions, Dict, all),
     !,
     between(1, infinite, PageNo),
     (   catch(thread_get_message(Queue, pengine_event(_, Event),
                                  [ timeout(TimeLimit)
                                  ]),
               Error, true)
     ->  (   var(Error)
         ->  debug(pengine(wait), 'Page ~D: got ~q from ~q', [PageNo, Event, Queue]),
             (   destroy_queue_from_http(Pengine, Event, Queue)
             ->  !,
                 protect_pengine(Pengine,
                                 output_result_2(Format, page(PageNo, Event), Dict))
             ;   is_more_event(Event)
             ->  pengine_thread(Pengine, Worker),
                 thread_send_message(Worker, pengine_request(next)),
                 protect_pengine(Pengine,
                                 output_result_2(Format, page(PageNo, Event), Dict)),
                 fail
             ;   !,
                 protect_pengine(Pengine,
                                 output_result_2(Format, page(PageNo, Event), Dict))
             )
         ;   !, output_result(Pengine, Format, died(Pengine))
         )
     ;   !, time_limit_exceeded(Pengine, Format)
     ),
     !.
 create_wait_and_output_result(Pengine, Queue, Format, TimeLimit, Dict) :-
     (   get_dict(collate, Dict, Collate)
     ->  true
     ;   Collate = 0
     ),
     wait_and_output_result(Pengine, Queue, Format, TimeLimit, Collate).
 2381
 %!  is_more_event(+Event) is semidet.
 %
 %   True when Event is a success/5 event whose last argument is
 %   `true`, or a create/2 event whose answer(Event) option is such an
 %   event.

 is_more_event(success(_Id, _Answers, _Projection, _Time, true)).
 is_more_event(create(_Id, Features)) :-
     memberchk(answer(First), Features),
     is_more_event(First).
 2386
 2387
 2388
 %!  time_limit_exceeded(+Pengine, +Format)
 %
 %   The Pengine did not reply within its time limit. Forcefully
 %   destroy the Pengine and send the time limit event (see
 %   time_limit_event/2) to the client in the requested format. The
 %   reply is sent from the cleanup handler and thus regardless of
 %   how destroying the Pengine ends.
 %
 %   @bug Ideally, if the Pengine has `destroy` set to `false`, we
 %   should get the Pengine back to its main loop. Unfortunately we
 %   only have normal exceptions that may be caught by the Pengine and
 %   `abort` which cannot be caught and thus destroys the Pengine.

 time_limit_exceeded(Pengine, Format) :-
     time_limit_event(Pengine, LimitEvent),
     Destroy = pengine_destroy(Pengine, [force(true)]),
     Report  = output_result(Pengine, Format, LimitEvent),
     call_cleanup(Destroy, Report).
 2404
 %   time_limit_event(?Pengine, ?Event)
 %
 %   Event is the destroy event that reports a time limit violation of
 %   Pengine.

 time_limit_event(Pengine, Event) :-
     Event = destroy(Pengine, error(Pengine, time_limit_exceeded)).
 2407
 %   destroy_pengine_after_output(+Pengine, +Events)
 %
 %   If Events is a list whose last element is the time limit event of
 %   Pengine, forcefully destroy Pengine. Failure of pengine_destroy/2
 %   and error(_,_) exceptions are ignored. In all other cases this
 %   predicate simply succeeds.

 destroy_pengine_after_output(Pengine, Events) :-
     (   is_list(Events),
         last(Events, Final),
         time_limit_event(Pengine, Final)
     ->  catch(ignore(pengine_destroy(Pengine, [force(true)])),
               error(_,_),
               true)
     ;   true
     ).
 2415
 2416
 %!  destroy_queue_from_http(+Pengine, +Event, +Queue) is semidet.
 %
 %   Decide whether the output queue of Pengine can go after Event has
 %   been sent to the HTTP client. The predicate succeeds if
 %
 %     - output_queue/3 is registered for Pengine and Queue (the pengine
 %       already died); the queue is destroyed if it is empty.
 %     - Event is a final (destroy) event; destruction is then arranged
 %       while holding the mutex `pengine`.
 %
 %   It fails in all other cases.
 %
 %   @tbd    If the client did not request all output, the queue will
 %           not be destroyed.  We need some timeout and GC for that.

 destroy_queue_from_http(ID, Event, Queue) :-
     (   output_queue(ID, Queue, _)
     ->  destroy_queue_if_empty(Queue)
     ;   debug(pengine(destroy), 'DESTROY? ~p', [Event]),
         is_destroy_event(Event)
     ->  message_queue_property(Queue, size(Waiting)),
         debug(pengine(destroy), 'Destroy ~p (waiting ~D)', [Queue, Waiting]),
         with_mutex(pengine, sync_destroy_queue_from_http(ID, Queue))
     ).
 2440
 %   is_destroy_event(+Event) is semidet.
 %
 %   True when Event is a destroy/1 or destroy/2 event, or a create/2
 %   event whose option list carries a destroy event as answer(Event).

 is_destroy_event(destroy(_Pengine)).
 is_destroy_event(destroy(_Pengine, _Data)).
 is_destroy_event(create(_Pengine, Features)) :-
     memberchk(answer(Answer), Features),
     is_destroy_event(Answer).
 2446
 %   destroy_queue_if_empty(+Queue)
 %
 %   Leave Queue alone if it still holds a message. Otherwise remove its
 %   output_queue/3 registration and destroy the queue.

 destroy_queue_if_empty(Queue) :-
     (   thread_peek_message(Queue, _)
     ->  true
     ;   retractall(output_queue(_, Queue, _)),
         message_queue_destroy(Queue)
     ).
 2453
 %!  gc_abandoned_queues
 %
 %   Check whether there are queues  that  have been abandoned. This
 %   happens if the stream contains output events and not all of them
 %   are read by the client.
 %
 %   A sweep only runs if consider_queue_gc/0 succeeds. It destroys
 %   every queue whose output_queue/3 time stamp is more than 15
 %   minutes old and then records the time of the sweep in last_gc/1.

 :- dynamic
     last_gc/1.                      % last_gc(Time): time of the last sweep

 gc_abandoned_queues :-
     consider_queue_gc,
     !,
     get_time(Now),
     (   output_queue(_, Queue, Time),
         Now-Time > 15*60,           % registered more than 15 minutes ago
         retract(output_queue(_, Queue, Time)),
         message_queue_destroy(Queue),
         fail                        % failure-driven loop over all queues
     ;   retractall(last_gc(_)),     % loop exhausted: remember this sweep
         asserta(last_gc(Now))
     ).
 gc_abandoned_queues.                % no sweep due: succeed silently
 2476
 %   consider_queue_gc is semidet.
 %
 %   True if it is time to sweep abandoned queues: output_queue/3 has
 %   more than 100 clauses and either no sweep has been recorded yet or
 %   a recorded sweep is more than 5 minutes old.

 consider_queue_gc :-
     predicate_property(output_queue(_,_,_), number_of_clauses(Count)),
     Count > 100,
     (   \+ last_gc(_)
     ->  true
     ;   last_gc(Last),
         get_time(Now),
         Now-Last > 5*60
     ->  true
     ).
 2486
 %!  sync_destroy_queue_from_http(+Pengine, +Queue) is det.
 %!  sync_destroy_queue_from_pengine(+Pengine, +Queue) is det.
 %
 %   Handle destruction of the message queue connecting the HTTP side
 %   to the pengine. We cannot delete the queue when the pengine dies
 %   because the queue may contain output  events. Termination of the
 %   pengine and finishing the  HTTP  exchange   may  happen  in both
 %   orders. This means we need to handle this using synchronization.
 %
 %     * sync_destroy_queue_from_pengine(+Pengine, +Queue)
 %     Called (indirectly) from pengine_done/1 if the pengine's
 %     thread dies.
 %     * sync_destroy_queue_from_http(+Pengine, +Queue)
 %     Called from destroy_queue_from_http/3, i.e., from the HTTP
 %     side, while holding the mutex `pengine`.

 :- dynamic output_queue_destroyed/1.

 sync_destroy_queue_from_http(ID, Queue) :-
     (   output_queue(ID, Queue, _)  % already registered as output queue
     ->  destroy_queue_if_empty(Queue)
     ;   thread_peek_message(Queue, pengine_event(_, output(_,_)))
     ->  debug(pengine(destroy), 'Delay destruction of ~p because of output',
               [Queue]),
         get_time(Now),
         asserta(output_queue(ID, Queue, Now))
     ;   message_queue_destroy(Queue),
         % tell sync_destroy_queue_from_pengine/2 that the queue is gone
         asserta(output_queue_destroyed(Queue))
     ).
 2516
 %!  sync_destroy_queue_from_pengine(+Pengine, +Queue)
 %
 %   Called  from  pengine_unregister/1  when   the  pengine  thread
 %   terminates. It is called while the mutex `pengine` is held.
 %
 %   If the HTTP side already destroyed Queue, the
 %   output_queue_destroyed/1 marker is consumed. Otherwise Queue is
 %   registered with the current time in output_queue/3. In both cases
 %   the pengine_queue/4 registration is removed.

 sync_destroy_queue_from_pengine(ID, Queue) :-
     (   retract(output_queue_destroyed(Queue))
     ->  true                        % HTTP side destroyed the queue already
     ;   get_time(Now),
         asserta(output_queue(ID, Queue, Now))
     ),
     retractall(pengine_queue(ID, Queue, _, _)).
 2529
 2530
 %   http_pengine_send(+Request)
 %
 %   HTTP handler that reads an event from Request, sends it to the
 %   pengine thread as pengine_request(Event) and replies with the
 %   result. OPTIONS requests are answered by reply_options/2.
 %   NOTE(review): presumably mounted at /pengine/send; the handler
 %   declaration is not in this part of the file.
 %
 %   Error replies:
 %     - Reading the event raised existence_error(pengine, ID), or the
 %       pengine has no thread and ID is an atom: pengine_died/2.
 %     - Any other exception while reading: an error(ID, Error) event.

 http_pengine_send(Request) :-
     reply_options(Request, [get,post]),
     !.
 http_pengine_send(Request) :-
     http_parameters(Request,
                     [ id(ID, [ type(atom) ]),
                       event(EventString, [optional(true)]),
                       collate(Collate, [number, default(0)]),
                       format(Format, [default(prolog)])
                     ]),
     % Error stays unbound if the event was read successfully.
     catch(read_event(ID, Request, Format, EventString, Event),
           Error,
           true),
     (   var(Error)
     ->  debug(pengine(event), 'HTTP send: ~p', [Event]),
         (   pengine_thread(ID, Thread)
         ->  pengine_queue(ID, Queue, TimeLimit, _),
             random_delay,
             broadcast(pengine(send(ID, Event))),
             thread_send_message(Thread, pengine_request(Event)),
             wait_and_output_result(ID, Queue, Format, TimeLimit, Collate)
         ;   atom(ID)
         ->  pengine_died(Format, ID)
         ;   http_404([], Request)
         )
     ;   Error = error(existence_error(pengine, ID), _)
     ->  pengine_died(Format, ID)
     ;   output_result(ID, Format, error(ID, Error))
     ).
 2560
 %   pengine_died(+Format, +Pengine)
 %
 %   Reply in Format with an error event carrying
 %   existence_error(pengine, Pengine).

 pengine_died(Format, Pengine) :-
     Error = error(existence_error(pengine, Pengine), _),
     output_result(Pengine, Format, error(Pengine, Error)).
 2564
 2565
 %!  read_event(+Pengine, +Request, +Format, +EventString, -Event) is det
 %
 %   Read an event on behalf of Pengine.  Note that the pengine's module
 %   should not be  deleted  while  we   are  reading  using  its  syntax
 %   (module). This is ensured using the `pengine_done` mutex.
 %
 %   If reading under protect_pengine/2 does not succeed, the posted
 %   data is discarded and existence_error(pengine, Pengine) is raised.
 %
 %   @see pengine_done/0.

 read_event(Pengine, Request, Format, EventString, Event) :-
     protect_pengine(
         Pengine,
         ( get_pengine_module(Pengine, Module),
           read_event_2(Request, EventString, Module, Event0, Bindings)
         )),
     !,
     fix_bindings(Format, Event0, Bindings, Event).
 read_event(Pengine, Request, _Format, _EventString, _Event) :-
     debug(pengine(event), 'Pengine ~q vanished', [Pengine]),
     discard_post_data(Request),     % consume the body before raising
     existence_error(pengine, Pengine).
 2586
 2587
 %%  read_event_2(+Request, +EventString, +Module, -Event, -Bindings)
 %
 %   Read the sent event. The event is a   Prolog  term that is either in
 %   the =event= parameter or as a posted document. The term is read
 %   using the syntax of Module and Bindings is unified with its
 %   variable names. Fails if there is no EventString and the request
 %   is not a POST.

 read_event_2(_Request, EventString, Module, Event, Bindings) :-
     nonvar(EventString),            % the =event= parameter was passed
     !,
     term_string(Event, EventString,
                 [ variable_names(Bindings),
                   module(Module)
                 ]).
 read_event_2(Request, _EventString, Module, Event, Bindings) :-
     option(method(post), Request),
     http_read_data(Request,     Event,
                    [ content_type('application/x-prolog'),
                      module(Module),
                      variable_names(Bindings)
                    ]).
 2607
 %%  discard_post_data(+Request) is det.
 %
 %   Read and drop the request body if Request is a POST request by
 %   sending it to a null stream. Other requests are left untouched.

 discard_post_data(Request) :-
     (   option(method(post), Request)
     ->  setup_call_cleanup(
             open_null_stream(Sink),
             http_read_data(Request, _, [to(stream(Sink))]),
             close(Sink))
     ;   true
     ).
 2620
 %!  fix_bindings(+Format, +EventIn, +Bindings, -Event) is det.
 %
 %   For a JSON Format and an ask(Goal, Options) event, extend the
 %   options with a template, the chunk size (default 1) and the named
 %   bindings. Bindings for which anon/1 holds (an underscore followed
 %   by a variable start character) are left out. All other events are
 %   passed unchanged.

 fix_bindings(Format,
              ask(Goal, AskOptions), Bindings,
              ask(Goal, FixedOptions)) :-
     json_lang(Format),
     !,
     exclude(anon, Bindings, Named),
     template(Named, Template, AskOptions, NoTemplate),
     select_option(chunk(ChunkSize), NoTemplate, Remaining, 1),
     FixedOptions = [ template(Template),
                      chunk(ChunkSize),
                      bindings(Named)
                    | Remaining
                    ].
 fix_bindings(_Format, Event, _Bindings, Event).
 2641
 %   template(+Bindings, -Template, +Options0, -Options)
 %
 %   Take Template from a template(Template) option, removing it from
 %   the options. Without such an option, Template is a dict with tag
 %   `swish_default_template` built from Bindings and the options are
 %   returned as they are.

 template(Bindings, Template, Options0, Options) :-
     (   select_option(template(Template), Options0, Options)
     ->  true
     ;   Options = Options0,
         dict_create(Template, swish_default_template, Bindings)
     ).
 2647
 %   anon(+Binding) is semidet.
 %
 %   True if Binding is Name=_ and Name starts with an underscore that
 %   is followed by a character that can start a Prolog variable.

 anon(Name=_) :-
     sub_atom(Name, 0, 2, _, Prefix),
     atom_chars(Prefix, ['_', Second]),
     char_type(Second, prolog_var_start).
 2652
 %   var_name(+Binding, -Name): Name is the left side of a Name=Value term.
 var_name(Binding, Name) :-
     Binding = (Name=_).
 2654
 2655
 %!  json_lang(+Format) is semidet.
 %
 %   True if Format is `json` or an atom that starts with `json-`.

 json_lang(Format) :-
     (   Format = json
     ->  true
     ;   sub_atom(Format, 0, _, _, 'json-')
     ).
 2663
 %!  http_pengine_pull_response(+Request)
 %
 %   HTTP handler for /pengine/pull_response.  Pulls possible pending
 %   messages from the pengine. The queue is taken from pengine_queue/4
 %   or, if that is not registered, from output_queue/3 using a time
 %   limit of 0. Replies 404 if neither is known.

 http_pengine_pull_response(Request) :-
     reply_options(Request, [get]),
     !.
 http_pengine_pull_response(Request) :-
     http_parameters(Request,
                     [ id(ID, []),
                       format(Format, [default(prolog)]),
                       collate(Collate, [number, default(0)])
                     ]),
     reattach(ID),
     (   pengine_queue(ID, Queue, TimeLimit, _)
     ->  wait_and_output_result(ID, Queue, Format, TimeLimit, Collate)
     ;   output_queue(ID, Queue, _)
     ->  TimeLimit = 0,
         wait_and_output_result(ID, Queue, Format, TimeLimit, Collate)
     ;   http_404([], Request)
     ).
 2687
 %!  http_pengine_abort(+Request)
 %
 %   HTTP handler for /pengine/abort. An abort may arrive at any time
 %   and its reply may be handled by a pull_response. In that case our
 %   pengine has already died before we get to
 %   wait_and_output_result/5. Replies 404 if the pengine has no
 %   thread.

 http_pengine_abort(Request) :-
     (   reply_options(Request, [get,post])
     ->  true
     ;   http_parameters(Request, [id(ID, [])]),
         (   pengine_thread(ID, _)
         ->  broadcast(pengine(abort(ID))),
             abort_pending_output(ID),
             pengine_abort(ID),
             reply_json_dict(true)
         ;   http_404([], Request)
         )
     ).
 2709
 %!  http_pengine_detach(+Request)
 %
 %   Detach a Pengine while keeping it running.  This has the following
 %   consequences:
 %
 %     - `/destroy_all` including the id of this pengine is ignored.
 %     - Output from the pengine is stored in the queue without
 %       waiting for the queue to drain.
 %     - The Pengine becomes available through `/list`
 %
 %   The request body is read as a JSON dict and stored, extended with
 %   the current time, in pengine_detached/2. Replies 404 unless the
 %   pengine has an application for which allowed/2 and authenticate/3
 %   succeed.

 http_pengine_detach(Request) :-
     reply_options(Request, [post]),
     !.
 http_pengine_detach(Request) :-
     http_parameters(Request,
                     [ id(ID, [])
                     ]),
     http_read_json_dict(Request, ClientData),
     (   pengine_property(ID, application(Application)),
         allowed(Request, Application),
         authenticate(Request, Application, _UserOptions)
     ->  broadcast(pengine(detach(ID))),
         get_time(Now),
         assertz(pengine_detached(ID, ClientData.put(time, Now))),
         pengine_queue(ID, Queue, _TimeLimit, _Now),
         message_queue_set(Queue, max_size(1000)),   % enlarge the queue
         pengine_reply(Queue, detached(ID)),
         reply_json_dict(true)
     ;   http_404([], Request)
     ).
 2740
 %   reattach(+ID)
 %
 %   If pengine ID is registered as detached, drop that registration
 %   and set the maximum size of its queue to 25. Succeeds without
 %   further action if the pengine is not detached or has no queue.

 reattach(ID) :-
     retract(pengine_detached(ID, _)),
     pengine_queue(ID, Queue, _, _),
     !,
     message_queue_set(Queue, max_size(25)).
 reattach(_).
 2747
 2748
 %!  http_pengine_destroy_all(+Request)
 %
 %   Destroy the pengines whose comma-separated ids are passed in the
 %   `ids` parameter, skipping detached pengines. Normally called by
 %   pengines.js if the browser window is closed.

 http_pengine_destroy_all(Request) :-
     (   reply_options(Request, [get,post])
     ->  true
     ;   http_parameters(Request, [ids(IDsAtom, [])]),
         atomic_list_concat(IDs, ',', IDsAtom),
         forall(( member(ID, IDs),
                  \+ pengine_detached(ID, _)
                ),
                pengine_destroy(ID, [force(true)])),
         reply_json_dict("ok")
     ).
 2767
 %!  http_pengine_ping(+Request)
 %
 %   HTTP handler for /pengine/ping.  If the requested Pengine has a
 %   thread whose statistics can be fetched, the event
 %   ping(Pengine, Stats) is sent, where `Stats` is the return of
 %   thread_statistics/2. Otherwise the event died(Pengine) is sent.

 http_pengine_ping(Request) :-
     reply_options(Request, [get]),
     !.
 http_pengine_ping(Request) :-
     http_parameters(Request,
                     [ id(Pengine, []),
                       format(Format, [default(prolog)])
                     ]),
     (   pengine_thread(Pengine, Thread),
         catch(thread_statistics(Thread, Stats), error(_,_), fail)
     ->  Event = ping(Pengine, Stats)
     ;   Event = died(Pengine)
     ),
     output_result(Pengine, Format, Event).
 2788
 %!  http_pengine_list(+Request)
 %
 %   HTTP handler for `/pengine/list`. Replies with a JSON object whose
 %   `pengines` field lists the pengines of the requested application
 %   (default `pengine_sandbox`) with the requested status (only
 %   `detached` is accepted).
 %
 %   @tbd Only list detached Pengines associated to the logged in user.

 http_pengine_list(Request) :-
     (   reply_options(Request, [get])
     ->  true
     ;   http_parameters(Request,
                         [ status(Status, [default(detached), oneof([detached])]),
                           application(Application, [default(pengine_sandbox)])
                         ]),
         allowed(Request, Application),
         authenticate(Request, Application, _),
         findall(State, listed_pengine(Application, Status, State), States),
         reply_json_dict(json{pengines: States})
     ).
 2808
 %   listed_pengine(+Application, +Status, -State) is nondet.
 %
 %   State is a dict describing a detached pengine of Application: its
 %   id, the time it was detached, the number of queued messages and
 %   the statistics of its thread. The statistics are
 %   thread{status:died} if they cannot be fetched.

 listed_pengine(Application, detached, State) :-
     pengine_property(Id, application(Application)),
     pengine_property(Id, detached(Since)),
     pengine_queue(Id, Queue, _, _),
     message_queue_property(Queue, size(Pending)),
     (   pengine_thread(Id, Thread),
         catch(thread_statistics(Thread, Stats), _, fail)
     ->  true
     ;   Stats = thread{status:died}
     ),
     State = pengine{id:Id,
                     detached:Since,
                     queued:Pending,
                     stats:Stats}.
 2824
 2825
 %!  output_result(+Pengine, +Format, +EventTerm) is det.
 %!  output_result(+Pengine, +Format, +EventTerm, +OptionsDict) is det.
 %
 %   Formulate an HTTP response from a pengine event term. Format is
 %   one of =prolog=, =json= or =json-s=.
 %
 %   While the reply is written, the replying thread is registered in
 %   pengine_replying/2 such that abort_pending_output/1 can interrupt
 %   it by raising `pengine_abort_output`, which is silently caught
 %   here.
 %
 %   @arg EventTerm is either a single event or a list of events.

 :- dynamic
     pengine_replying/2.             % +Pengine, +Thread

 output_result(Pengine, Format, Event) :-
     thread_self(Thread),
     cors_enable,            % contingent on http:cors setting
     disable_client_cache,
     setup_call_cleanup(
         asserta(pengine_replying(Pengine, Thread), Ref),
         catch(output_result_2(Format, Event, _{}),
               pengine_abort_output, % raised by abort_output_thread/1
               true),
         erase(Ref)),
     % destroys the pengine if the last event reports a time limit
     destroy_pengine_after_output(Pengine, Event).
 2848
 %   output_result_2(+Lang, +Event, +Dict)
 %
 %   Write Event as the HTTP reply. The hook write_result/3 is tried
 %   first. Otherwise `prolog` writes the event as a quoted Prolog term
 %   and the JSON dialects use event_term_to_json_data/3 and
 %   reply_json_dict/1. Any other Lang raises a
 %   domain_error(pengine_format, Lang).

 output_result_2(Lang, Event, Dict) :-
     write_result(Lang, Event, Dict),        % hook
     !.
 output_result_2(prolog, Event, _) :-
     !,
     format('Content-type: text/x-prolog; charset=UTF-8~n~n'),
     write_term(Event,
                [ quoted(true),
                  ignore_ops(true),
                  fullstop(true),
                  blobs(portray),
                  portray_goal(portray_blob),
                  nl(true)
                ]).
 output_result_2(Lang, Event, _) :-
     json_lang(Lang),
     !,
     (   event_term_to_json_data(Event, JSON, Lang)
     ->  reply_json_dict(JSON)
         % conversion failed: re-run it inside assertion/1 to report it
     ;   assertion(event_term_to_json_data(Event, _, Lang))
     ).
 output_result_2(Lang, _Event, _) :-    % FIXME: allow for non-JSON format
     domain_error(pengine_format, Lang).
 2872
 %!  portray_blob(+Blob, +Options) is det.
 %
 %   Write a blob that appears in an output term as `'$BLOB'(Type)`.
 %   Such terms should really be avoided as they are meaningless
 %   outside the process, but the error they would otherwise cause is
 %   hard to debug. Future versions may include more info, depending
 %   on `Type`.

 :- public portray_blob/2.               % called from write-term
 portray_blob(Blob, _Options) :-
     blob(Blob, BlobType),
     Placeholder = '$BLOB'(BlobType),
     writeq(Placeholder).
 2885
 %!  abort_pending_output(+Pengine) is det.
 %
 %   An abort may arrive while threads are producing output for the
 %   client. Signal every thread registered in pengine_replying/2 for
 %   Pengine to stop doing so.

 abort_pending_output(Pengine) :-
     \+ ( pengine_replying(Pengine, Thread),
          \+ abort_output_thread(Thread)
        ).
 2894
 %   abort_output_thread(+Thread)
 %
 %   Make Thread raise `pengine_abort_output`. It is not an error if
 %   Thread no longer exists.

 abort_output_thread(Thread) :-
     Signal = throw(pengine_abort_output),
     catch(thread_signal(Thread, Signal),
           error(existence_error(thread, _), _),
           true).
 2899
 2900%!  write_result(+Lang, +Event, +Dict) is semidet.
 2901%
 2902%   Hook that allows for different output formats. The core Pengines
 2903%   library supports `prolog` and various   JSON  dialects. The hook
 2904%   event_to_json/3 can be used to refine   the  JSON dialects. This
 2905%   hook must be used if  a   completely  different output format is
 2906%   desired.
 2907
 %!  disable_client_cache
 %
 %   Make sure the client will not cache our page by writing the
 %   `Cache-Control`, `Pragma` and `Expires` header lines to the
 %   current output.
 %
 %   @see http://stackoverflow.com/questions/49547/making-sure-a-web-page-is-not-cached-across-all-browsers

 disable_client_cache :-
     % \c continues the quoted atom on the next line
     format('Cache-Control: no-cache, no-store, must-revalidate\r\n\c
             Pragma: no-cache\r\n\c
             Expires: 0\r\n').
 2918
 %   event_term_to_json_data(+Event, -JSON, +Lang) is semidet.
 %
 %   Translate a pengine event term, or a list of them, into data for
 %   reply_json_dict/1. For a single event the hook event_to_json/3 is
 %   tried first. The other clauses are the default translation:
 %
 %     - success/5 is only handled here if Lang is `json`.
 %     - destroy/2 and the answer(Event) feature of create/2 translate
 %       their embedded event recursively.
 %     - error/2 holds the printed message as `data`, extended by
 %       add_error_details/3.
 %     - Other events of arity 1 or 2 become `{event, id}` or
 %       `{event, id, data}`.

 event_term_to_json_data(Events, JSON, Lang) :-
     is_list(Events),
     !,
     events_to_json_data(Events, JSON, Lang).
 event_term_to_json_data(Event, JSON, Lang) :-
     event_to_json(Event, JSON, Lang),       % hook
     !.
 event_term_to_json_data(success(ID, Bindings0, Projection, Time, More),
                         json{event:success, id:ID, time:Time,
                              data:Bindings, more:More, projection:Projection},
                         json) :-
     !,
     term_to_json(Bindings0, Bindings).
 event_term_to_json_data(destroy(ID, Event),
                         json{event:destroy, id:ID, data:JSON},
                         Style) :-
     !,
     event_term_to_json_data(Event, JSON, Style).
 event_term_to_json_data(create(ID, Features0), JSON, Style) :-
     !,
     (   select(answer(First0), Features0, Features1)
     ->  event_term_to_json_data(First0, First, Style),
         Features = [answer(First)|Features1]
     ;   Features = Features0
     ),
     dict_create(JSON, json, [event(create), id(ID)|Features]).
 event_term_to_json_data(error(ID, ErrorTerm), Error, _Style) :-
     !,
     Error0 = json{event:error, id:ID, data:Message},
     add_error_details(ErrorTerm, Error0, Error),
     message_to_string(ErrorTerm, Message).  % fills `data` in both dicts
 event_term_to_json_data(failure(ID, Time),
                         json{event:failure, id:ID, time:Time}, _) :-
     !.
 event_term_to_json_data(EventTerm, json{event:F, id:ID}, _) :-
     functor(EventTerm, F, 1),
     !,
     arg(1, EventTerm, ID).
 event_term_to_json_data(EventTerm, json{event:F, id:ID, data:JSON}, _) :-
     functor(EventTerm, F, 2),
     arg(1, EventTerm, ID),
     arg(2, EventTerm, Data),
     term_to_json(Data, JSON).
 2966
 %   events_to_json_data(+Events, -JSONList, +Lang)
 %
 %   Translate a list of events element by element using
 %   event_term_to_json_data/3.

 events_to_json_data([], [], _Lang).
 events_to_json_data([Event|Events], [JSON|JSONs], Lang) :-
     event_term_to_json_data(Event, JSON, Lang),
     events_to_json_data(Events, JSONs, Lang).
 2971
 :- public add_error_details/3.

 %%  add_error_details(+Error, +JSON0, -JSON)
 %
 %   Extend the dict JSON0 with the error code (add_error_code/3) and
 %   the source location (add_error_location/3) of Error. Also used by
 %   pengines_io.pl.

 add_error_details(Error, JSON0, JSON) :-
     add_error_code(Error, JSON0, WithCode),
     add_error_location(Error, WithCode, JSON).
 2982
 %%  add_error_code(+Error, +JSON0, -JSON) is det.
 %
 %   Add a =code= field to JSON0 if Error is an error(Formal, _) term
 %   with callable Formal. The code is the functor name of Formal, e.g.,
 %   =syntax_error=, =type_error=, etc. Some errors carry more
 %   information:
 %
 %     - existence_error(Type, Obj)
 %     {arg1:Type, arg2:Obj}, where Obj is stringified if it is not
 %     atomic.
 %
 %   JSON0 is returned unchanged for any other Error.

 add_error_code(error(existence_error(Type, Obj), _), Error0, Error) :-
     atom(Type),
     !,
     to_atomic(Obj, Value),
     put_dict(_{code:existence_error, arg1:Type, arg2:Value}, Error0, Error).
 add_error_code(error(Formal, _), Error0, Error) :-
     callable(Formal),
     !,
     functor(Formal, Code, _),
     put_dict(code, Error0, Code, Error).
 add_error_code(_, Error, Error).
 3005
 %   to_atomic(+Obj, -Atomic)
 %
 %   Atoms, numbers and strings are passed as they are. Any other term
 %   is converted to a string using term_string/2.
 %   What to do with large integers?

 to_atomic(Obj, Atomic) :-
     (   (   atom(Obj)
         ;   number(Obj)
         ;   string(Obj)
         )
     ->  Atomic = Obj
     ;   term_string(Obj, Atomic)
     ).
 3011
 3012
 %%  add_error_location(+Error, +JSON0, -JSON) is det.
 %
 %   Add a =location= property if the context of Error is a
 %   file(Path, Line, Ch, _) term with an atom Path and an integer
 %   Line. The location is a dict with the keys =file= and =line= and,
 %   unless Ch is -1, =ch= for the character position in the line.
 %   JSON0 is returned unchanged for any other Error.

 add_error_location(error(_, file(Path, Line, -1, _)), Term0, Term) :-
     atom(Path),
     integer(Line),
     !,
     put_dict(_{location:_{file:Path, line:Line}}, Term0, Term).
 add_error_location(error(_, file(Path, Line, Ch, _)), Term0, Term) :-
     atom(Path),
     integer(Line),
     integer(Ch),
     !,
     put_dict(_{location:_{file:Path, line:Line, ch:Ch}}, Term0, Term).
 add_error_location(_, Term, Term).
 3028
 3029
 3030%!  event_to_json(+Event, -JSONTerm, +Lang) is semidet.
 3031%
 3032%   Hook that translates a Pengine event  structure into a term suitable
 3033%   for reply_json_dict/1, according to the language specification Lang.
 3034%   This can be used to massage general Prolog terms, notably associated
 3035%   with `success(ID, Bindings, Projection, Time, More)` and `output(ID,
 3036%   Term)` into a format suitable for processing at the client side.
 3037
 3038%:- multifile pengines:event_to_json/3.
 3039
 3040
 3041                 /*******************************
 3042                 *        ACCESS CONTROL        *
 3043                 *******************************/
 3044
 %!  allowed(+Request, +Application) is det.
 %
 %   Check whether the peer is allowed to connect: it must match the
 %   `allow_from` setting of Application and must not match its
 %   `deny_from` setting. Otherwise a =forbidden= reply for the request
 %   URI is raised.

 allowed(Request, Application) :-
     (   setting(Application:allow_from, Allow),
         match_peer(Request, Allow),
         setting(Application:deny_from, Deny),
         \+ match_peer(Request, Deny)
     ->  true
     ;   memberchk(request_uri(Here), Request),
         throw(http_reply(forbidden(Here)))
     ).
 3059
 %   match_peer(+Request, +Allowed) is semidet.
 %
 %   True if the list Allowed contains `*`, the peer of Request itself
 %   or a pattern that matches the peer according to
 %   match_peer_pattern/2. An empty list matches nothing.

 match_peer(Request, Allowed) :-
     (   memberchk(*, Allowed)
     ->  true
     ;   Allowed == []
     ->  fail
     ;   http_peer(Request, Peer),
         debug(pengine(allow), 'Peer: ~q, Allow: ~q', [Peer, Allowed]),
         (   memberchk(Peer, Allowed)
         ->  true
         ;   member(Pattern, Allowed),
             match_peer_pattern(Pattern, Peer)
         )
     ).
 3072
 %   match_peer_pattern(+Pattern, +Peer) is semidet.
 %
 %   True if Pattern and Peer can be parsed by ip_term/2 into the same
 %   term.

 match_peer_pattern(Pattern, Peer) :-
     once(( ip_term(Pattern, IP),
            ip_term(Peer, IP)
          )).
 3077
 %   ip_term(+Text, -Parts) is semidet.
 %
 %   Parse a dotted address or address pattern into a list of numbers.
 %   If the last component is `*`, the list ends in an unbound tail
 %   such that the pattern unifies with any address that has the same
 %   prefix, e.g. '192.168.*' yields [192,168|_].

 ip_term(Peer, Pattern) :-
     split_string(Peer, ".", "", PartStrings),
     ip_pattern(PartStrings, Pattern).

 %   ip_pattern(+Parts, -Pattern) is semidet.
 %
 %   Parts are the components produced by split_string/4. These are
 %   strings, so the wildcard must be recognised as the string "*".
 %   Matching only the atom `*` made every wildcard pattern fail in
 %   number_string/2.

 ip_pattern([], []).
 ip_pattern([Last], _) :-
     (   Last == (*)                     % atom form
     ;   string(Last),
         atom_string(*, Last)            % string form from split_string/4
     ),
     !.
 ip_pattern([S|T0], [N|T]) :-
     number_string(N, S),
     ip_pattern(T0, T).
 3087
 3088
 %%  authenticate(+Request, +Application, -UserOptions:list) is det.
 %
 %   Call authentication_hook/3. UserOptions is `[user(User)]` if the
 %   hook succeeds with a ground User and `[]` if it fails. Exceptions
 %   from the hook are passed on.

 authenticate(Request, Application, UserOptions) :-
     (   authentication_hook(Request, Application, User)
     ->  must_be(ground, User),
         UserOptions = [user(User)]
     ;   UserOptions = []
     ).
 3100
 3101%%  authentication_hook(+Request, +Application, -User) is semidet.
 3102%
 3103%   This hook is called  from  the   =/pengine/create=  HTTP  handler to
 3104%   discover whether the server is accessed   by  an authorized user. It
 3105%   can react in three ways:
 3106%
 %     - Succeed, binding User to a ground term.  The identity of the
 %       user is available through pengine_user/1.
 3109%     - Fail.  The =/create= succeeds, but the pengine is not associated
 3110%       with a user.
 3111%     - Throw an exception to prevent creation of the pengine.  Two
 3112%       meaningful exceptions are:
 3113%         - throw(http_reply(authorise(basic(Realm))))
 3114%         Start a normal HTTP login challenge (reply 401)
 %         - throw(http_reply(forbidden(Path)))
 %         Reject the request using a 403 reply.
 3117%
 3118%   @see http_authenticate/3 can be used to implement this hook using
 3119%        default HTTP authentication data.
 3120
 %   pengine_register_user(+Options)
 %
 %   If Options contains user(User), record User for the current
 %   pengine in pengine_user/2. Succeeds without action otherwise.

 pengine_register_user(Options) :-
     (   option(user(User), Options)
     ->  pengine_self(Me),
         asserta(pengine_user(Me, User))
     ;   true
     ).
 3127
 3128
 %%  pengine_user(-User) is semidet.
 %
 %   True when the current pengine was created by an HTTP request that
 %   authorized User.
 %
 %   @see authentication_hook/3 can be used to extract authorization from
 %        the HTTP header.

 pengine_user(User) :-
     pengine_self(Self),
     pengine_user(Self, User).
 3140
 %!  reply_options(+Request, +Methods) is semidet.
 %
 %   Answer an HTTP OPTIONS request with the CORS headers for Methods
 %   and an empty plain text document. Fails for any other request
 %   method.

 reply_options(Request, Methods) :-
     option(method(options), Request),
     !,
     cors_enable(Request, [methods(Methods)]),
     format('Content-type: text/plain\r\n'),
     format('~n').                   % end of header, empty body
 3153
 3154
 3155                 /*******************************
 3156                 *        COMPILE SOURCE        *
 3157                 *******************************/
 3158
 %!  pengine_src_text(+SrcText, +Module) is det
 %
 %   Load the clauses in SrcText into Module, the private database of
 %   the current Pengine, under the id `pengine://<Pengine>/src`. The
 %   source text is kept if keep_source/3 decides so. This predicate
 %   processes the `src_text` option of pengine_create/1.

 pengine_src_text(Src, Module) :-
     pengine_self(Self),
     format(atom(ID), 'pengine://~w/src', [Self]),
     extra_load_options(Self, Options),
     LoadOptions = [ stream(Stream),
                     module(Module),
                     silent(true)
                   | Options
                   ],
     setup_call_cleanup(
         open_chars_stream(Src, Stream),
         load_files(Module:ID, LoadOptions),
         close(Stream)),
     keep_source(Self, ID, Src).
 3180
 %   system:'#file'(+File, +Line)
 %
 %   Set the file name of the stream that is currently being loaded to
 %   File. Line is ignored.
 %   NOTE(review): switching record_position off and on again
 %   presumably resets the recorded position of the stream -- confirm.

 system:'#file'(File, _Line) :-
     prolog_load_context(stream, Stream),
     set_stream(Stream, file_name(File)),
     set_stream(Stream, record_position(false)),
     set_stream(Stream, record_position(true)).
 3186
 %%   pengine_src_url(+URL, +Module) is det
 %
 %    Asserts the clauses defined in URL in   the private database of the
 %    current Pengine. This predicate processes   the `src_url` option of
 %    pengine_create/1.
 %
 %    If the `debug_info` setting of the application is `false`, the
 %    source is loaded directly from the HTTP stream. Otherwise it is
 %    first read into a string, loaded from there and passed to
 %    keep_source/3.
 %
 %    @tbd: make a sensible guess at the encoding.

 pengine_src_url(URL, Module) :-
     pengine_self(Self),
     uri_encoded(path, URL, Path),
     format(atom(ID), 'pengine://~w/url/~w', [Self, Path]),
     extra_load_options(Self, Options),
     (   get_pengine_application(Self, Application),
         setting(Application:debug_info, false)
     ->  setup_call_cleanup(             % load straight from the network
             http_open(URL, Stream, []),
             ( set_stream(Stream, encoding(utf8)),
               load_files(Module:ID,
                          [ stream(Stream),
                            module(Module)
                          | Options
                          ])
             ),
             close(Stream))
     ;   setup_call_cleanup(             % fetch the text first
             http_open(URL, TempStream, []),
             ( set_stream(TempStream, encoding(utf8)),
               read_string(TempStream, _, Src)
             ),
             close(TempStream)),
         setup_call_cleanup(
             open_chars_stream(Src, Stream),
             load_files(Module:ID,
                        [ stream(Stream),
                          module(Module)
                        | Options
                        ]),
             close(Stream)),
         keep_source(Self, ID, Src)
     ).
 3228
 3229
 %   extra_load_options(+Pengine, -Options)
 %
 %   Options is `[]` if pengine_not_sandboxed/1 holds for Pengine and
 %   `[sandboxed(true)]` otherwise.

 extra_load_options(Pengine, Options) :-
     (   pengine_not_sandboxed(Pengine)
     ->  Options = []
     ;   Options = [sandboxed(true)]
     ).
 3235
 3236
 %   keep_source(+Pengine, +ID, +SrcText)
 %
 %   If the `debug_info` setting of the application of Pengine is
 %   `true`, store SrcText as a string in
 %   pengine_data(Pengine, source(ID, String)). Succeeds without action
 %   otherwise.

 keep_source(Pengine, ID, SrcText) :-
     (   get_pengine_application(Pengine, Application),
         setting(Application:debug_info, true)
     ->  to_string(SrcText, SrcString),
         assertz(pengine_data(Pengine, source(ID, SrcString)))
     ;   true
     ).
 3244
 %   to_string(+Text, -String)
 %
 %   A string is passed as it is. Anything else is converted using
 %   atom_string/2.

 to_string(Text, String) :-
     (   string(Text)
     ->  String = Text
     ;   atom_string(Text, String)
     ).
 3251
 3252		 /*******************************
 3253		 *            SANDBOX		*
 3254		 *******************************/
 3255
 % Declare the pengine I/O and debug predicates below as safe
 % primitives through the multifile hook sandbox:safe_primitive/1.

 :- multifile
     sandbox:safe_primitive/1.

 sandbox:safe_primitive(pengines:pengine_input(_, _)).
 sandbox:safe_primitive(pengines:pengine_output(_)).
 sandbox:safe_primitive(pengines:pengine_debug(_,_)).
 3262
 3263
 3264                 /*******************************
 3265                 *            MESSAGES          *
 3266                 *******************************/
 3267
 3268prolog:error_message(sandbox(time_limit_exceeded, Limit)) -->
 3269    [ 'Could not prove safety of your goal within ~f seconds.'-[Limit], nl,
 3270      'This is normally caused by an insufficiently instantiated'-[], nl,
 3271      'meta-call (e.g., call(Var)) for which it is too expensive to'-[], nl,
 3272      'find all possible instantations of Var.'-[]
 3273    ]