1/* File: canny/redis.pl 2 Author: Roy Ratcliffe 3 Created: Sep 24 2022 4 Purpose: Canny Redis 5 6Copyright (c) 2022, Roy Ratcliffe, Northumberland, United Kingdom 7 8Permission is hereby granted, free of charge, to any person obtaining a 9copy of this software and associated documentation files (the 10"Software"), to deal in the Software without restriction, including 11without limitation the rights to use, copy, modify, merge, publish, 12distribute, sublicense, and/or sell copies of the Software, and to 13permit persons to whom the Software is furnished to do so, subject to 14the following conditions: 15 16 The above copyright notice and this permission notice shall be 17 included in all copies or substantial portions of the Software. 18 19THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS 20OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 21MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 22IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY 23CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, 24TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE 25SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 26 27*/ 28 29:- module(canny_redis, 30 [ redis_last_streams/2, % +Reads,-Streams:list 31 redis_last_streams/3, % +Reads,?Tag,-Streams:dict 32 redis_last_stream_entry/3, % +Streams,-StreamId,-Fields 33 redis_last_stream_entry/4, % +Streams,-StreamId,?Tag,-Fields 34 redis_keys_and_stream_ids/4, % +Streams,?Tag,-Keys,-StreamIds 35 redis_keys_and_stream_ids/3, % +Pairs,-Keys,-StreamIds, 36 redis_stream_read/4, % +Reads,-Key,-StreamId,-Fields 37 redis_stream_read/5, % +Reads,-Key,-StreamId,?Tag,-Fields 38 redis_stream_entry/3, % +Entries,-StreamId,-Fields 39 redis_stream_entry/4, % +Entries,-StreamId,?Tag,-Fields 40 redis_stream_id/1, % ?RedisTimeSeqPair 41 redis_stream_id/2, % ?StreamId,?RedisTimeSeqPair 42 redis_stream_id/3, % ?StreamId,?RedisTime,?Seq 43 redis_time/1, % +RedisTime 44 redis_date_time/3 % +RedisTime,-DateTime,+TimeZone 45 ]). 46:- autoload(library(lists), [member/2]). 47:- autoload(library(redis), [redis_array_dict/3]). 48:- autoload(library(apply), [maplist/3]). 49 50 /******************************* 51 * S t r e a m s * 52 *******************************/
62redis_last_streams(Reads, Streams) :- 63 maplist(redis_last_stream, Reads, Streams). 64 65redis_last_stream([Key, Entries], Key-StreamId) :- 66 redis_last_stream_entry(Entries, StreamId, _). 67 68redis_last_streams(Reads, Tag, Streams) :- 69 redis_last_streams(Reads, Streams0), 70 dict_create(Streams, Tag, Streams0).
79redis_last_stream_entry([[StreamId, Fields]], StreamId, Fields) :- 80 !. 81redis_last_stream_entry([_|Entries], StreamId, Fields) :- 82 redis_last_stream_entry(Entries, StreamId, Fields). 83 84redis_last_stream_entry(Entries, StreamId, Tag, Fields) :- 85 redis_last_stream_entry(Entries, StreamId, Fields0), 86 redis_array_dict(Fields0, Tag, Fields).
104redis_keys_and_stream_ids(Streams, Tag, Keys, StreamIds) :- 105 dict_pairs(Streams, Tag, Pairs), 106 redis_keys_and_stream_ids(Pairs, Keys, StreamIds). 107 108redis_keys_and_stream_ids([], [], []). 109redis_keys_and_stream_ids([Key-StreamId0|T0], [Key|T1], [RedisTime-Seq|T]) :- 110 redis_stream_id(StreamId0, RedisTime, Seq), 111 redis_keys_and_stream_ids(T0, T1, T).
122redis_stream_read(Reads, Key, StreamId, Fields) :- 123 member([Key, Entries], Reads), 124 redis_stream_entry(Entries, StreamId, Fields). 125 126redis_stream_read(Reads, Key, StreamId, Tag, Fields) :- 127 member([Key, Entries], Reads), 128 redis_stream_entry(Entries, StreamId, Tag, Fields).
144redis_stream_entry(Entries, StreamId, Fields) :- 145 member([StreamId0, Fields], Entries), 146 redis_stream_id(StreamId0, StreamId). 147 148redis_stream_entry(Entries, StreamId, Tag, Fields) :- 149 redis_stream_entry(Entries, StreamId, Fields0), 150 redis_array_dict(Fields0, Tag, Fields).
Deliberately validates incoming Redis time and sequence numbers. Both must be integers and both must be zero or more. The predicates fail otherwise. Internally, Redis stores stream identifiers as 128-bit unsigned integers split in half for the time and sequence values, each of 64 bits.
The 3-arity version of the predicate handles extraction of time and sequence integers from arbitrary stream identifiers: text or compound terms, including implied zero-sequence stream identifier with a single non-negative integer representing a millisecond Unix time.
180redis_stream_id(RedisTime-Seq) :- 181 redis_time(RedisTime), 182 integer(Seq), 183 Seq >= 0. 184 185redis_stream_id(StreamId, RedisTime-Seq) :- 186 var(StreamId), 187 !, 188 redis_stream_id(RedisTime-Seq), 189 atomic_list_concat([RedisTime, Seq], -, StreamId). 190redis_stream_id(StreamId, RedisTime-Seq) :- 191 ( atom(StreamId) 192 -> true 193 ; string(StreamId) 194 ), 195 split_string(StreamId, -, '', [RedisTime0, Seq0]), 196 number_string(RedisTime, RedisTime0), 197 number_string(Seq, Seq0), 198 redis_stream_id(RedisTime-Seq). 199 200redis_stream_id(RedisTime-Seq, RedisTime, Seq) :- 201 redis_stream_id(RedisTime-Seq), 202 !. 203redis_stream_id(RedisTime, RedisTime, 0) :- 204 redis_time(RedisTime), 205 !. 206redis_stream_id(StreamId, RedisTime, Seq) :- 207 redis_stream_id(StreamId, RedisTime-Seq).
216redis_time(RedisTime) :-
217 integer(RedisTime),
218 RedisTime >= 0.
224redis_date_time(RedisTime, DateTime, TimeZone) :-
225 Stamp is RedisTime / 1000,
226 stamp_date_time(Stamp, DateTime, TimeZone)