tr_queue

Persistent message queue with at-least-once semantics, built on timeranger2. Used by the MQTT broker for durable subscriptions.
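
The at-least-once behaviour can be pictured with a toy in-memory model. This is only a sketch under stated assumptions: toy_queue_t, toy_append(), toy_pending_count() and toy_ack() are hypothetical names that are not part of tr_queue, and nothing here touches timeranger2 or the disk. It shows the core idea only: a message stays pending until the consumer acknowledges it, so anything left unacknowledged is still available to be delivered again.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define TOY_CAPACITY 8

/* Hypothetical toy types, NOT the tr_queue structures. */
typedef struct {
    int  payload;
    bool pending;   /* true until the consumer acknowledges the message */
} toy_msg_t;

typedef struct {
    toy_msg_t msgs[TOY_CAPACITY];
    size_t    count;
} toy_queue_t;

/* Append a message; returns its 1-based rowid, or 0 if the toy queue is full. */
size_t toy_append(toy_queue_t *q, int payload)
{
    assert(q != NULL);
    if(q->count == TOY_CAPACITY) {
        return 0;
    }
    q->msgs[q->count].payload = payload;
    q->msgs[q->count].pending = true;
    q->count++;
    return q->count;
}

/* Count the messages that would have to be delivered again. */
size_t toy_pending_count(const toy_queue_t *q)
{
    assert(q != NULL);
    size_t n = 0;
    for(size_t i = 0; i < q->count; i++) {
        if(q->msgs[i].pending) {
            n++;
        }
    }
    return n;
}

/* Acknowledge a message; returns 0, or -1 if the rowid is unknown. */
int toy_ack(toy_queue_t *q, size_t rowid)
{
    assert(q != NULL);
    if(rowid == 0 || rowid > q->count) {
        return -1;
    }
    q->msgs[rowid - 1].pending = false;
    return 0;
}
```

In the real API the corresponding steps appear to be trq_append2() for appending, trq_load() for reloading pending messages, and trq_unload_msg() once a message has been handled.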

trq_answer()

trq_answer() extracts and returns a new JSON object containing only the metadata from the given message.

json_t *trq_answer(
    json_t *jn_message,  // not owned, Gps message, to get only __MD_TRQ__
    int     result
);

Parameters

| Key | Type | Description |
|---|---|---|
| jn_message | json_t * | A JSON object representing the original message. The ownership of this object is not transferred. |
| result | int | An integer result code that may be included in the returned metadata. |

Returns

A new JSON object containing only the metadata extracted from jn_message. The caller assumes ownership of the returned object.

Notes

The function is specifically designed to extract the __MD_TRQ__ metadata field from the input message.


trq_check_backup()

trq_check_backup() performs a backup operation on the queue if necessary.

int trq_check_backup(
    tr_queue_t * trq
);

Parameters

| Key | Type | Description |
|---|---|---|
| trq | tr_queue_t * | The queue instance to check and perform backup if needed. |

Returns

Returns 0 on success, or a negative value if an error occurs.

Notes

This function ensures that the queue’s backup mechanism is triggered when required.


trq_check_pending_rowid()

trq_check_pending_rowid() checks the pending status of a message identified by its rowid in the queue.

int trq_check_pending_rowid(
    tr_queue_t * trq,
    uint64_t __t__,
    uint64_t rowid
);

Parameters

| Key | Type | Description |
|---|---|---|
| trq | tr_queue_t * | The queue instance to check. |
| __t__ | uint64_t | Time value of the message. |
| rowid | uint64_t | The unique row identifier of the message. |

Returns

Returns -1 if the rowid does not exist, 1 if the message is pending, and 0 if it is not pending.
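
Because the result has three states, a plain truth test such as if(rc) would treat "rowid does not exist" (-1) the same as "pending" (1). A minimal sketch of one way to keep the cases apart follows; pending_status_t and classify_pending_rc() are hypothetical helpers for illustration, not part of tr_queue.

```c
/* Hypothetical helper type, not part of tr_queue. */
typedef enum {
    PENDING_ROWID_NOT_FOUND,   /* trq_check_pending_rowid() returned -1 */
    PENDING_YES,               /* returned 1 */
    PENDING_NO,                /* returned 0 */
    PENDING_UNEXPECTED         /* any value this page does not document */
} pending_status_t;

/* Map the documented return codes onto explicit states. */
pending_status_t classify_pending_rc(int rc)
{
    switch(rc) {
        case -1:
            return PENDING_ROWID_NOT_FOUND;
        case 1:
            return PENDING_YES;
        case 0:
            return PENDING_NO;
        default:
            return PENDING_UNEXPECTED;
    }
}
```

A caller would pass the value returned by trq_check_pending_rowid() straight into classify_pending_rc() and switch on the result.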

Notes

This function provides a low-level check for message status in the queue.


trq_close()

Closes the given tr_queue and releases its associated resources. After calling trq_close(), call tranger2_shutdown() if no other queues are still using the TimeRanger instance.

void trq_close(
    tr_queue_t * trq
);

Parameters

| Key | Type | Description |
|---|---|---|
| trq | tr_queue_t * | The queue instance to be closed. |

Returns

This function does not return a value.

Notes

Ensure that trq_close() is called before shutting down the underlying TimeRanger instance with tranger2_shutdown().


trq_get_by_rowid()

trq_get_by_rowid() retrieves a message from the queue iterator using its row ID.

q_msg_t * trq_get_by_rowid(
    tr_queue_t * trq,
    uint64_t rowid
);

Parameters

| Key | Type | Description |
|---|---|---|
| trq | tr_queue_t * | The queue instance from which to retrieve the message. |
| rowid | uint64_t | The row ID of the message to retrieve. |

Returns

Returns a q_msg_t * handle to the retrieved message, or NULL if the message is not found.

Notes

The returned message remains owned by the queue and should not be freed manually.


trq_get_metadata()

Retrieves the metadata associated with a given JSON object. The returned JSON object is not owned by the caller.

json_t *trq_get_metadata(
    json_t *kw
);

Parameters

| Key | Type | Description |
|---|---|---|
| kw | json_t * | The JSON object containing metadata. |

Returns

A pointer to a JSON object containing the metadata. The returned JSON object is not owned by the caller and should not be modified or freed.

Notes

The returned JSON object is a reference and should not be altered or deallocated by the caller.


trq_load()

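trq_load() loads the queue's pending messages into memory so that they can be traversed.

int trq_load(
    tr_queue_t * trq
);

Parameters

| Key | Type | Description |
|---|---|---|
| trq | tr_queue_t * | The queue instance from which pending messages will be loaded. |

Returns

Returns an int status code; no iterator handle is returned. By the convention of the other int-returning functions on this page, 0 presumably means success and a negative value an error. The loaded messages stay owned by the queue instance and are reached through it, for example with trq_get_by_rowid().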

Notes

Use trq_load_all() to load all messages, including non-pending ones.


trq_load_all()

trq_load_all() loads all messages from the queue within the specified rowid range, pending or not.

int trq_load_all(
    tr_queue_t * trq,
    int64_t from_rowid,
    int64_t to_rowid
);

Parameters

| Key | Type | Description |
|---|---|---|
| trq | tr_queue_t * | The queue instance from which messages will be loaded. |
| from_rowid | int64_t | The starting rowid for loading messages. |
| to_rowid | int64_t | The ending rowid for loading messages. |

Returns

Returns an int status code; no iterator handle is returned. By the convention of the other int-returning functions on this page, 0 presumably means success and a negative value an error.

Notes

Use trq_load_all() to retrieve messages efficiently within a specific rowid range.


trq_msg_json()

trq_msg_json() retrieves the JSON representation of a queue message. The returned JSON object is not owned by the caller and must not be modified or freed.

json_t *trq_msg_json(
    q_msg_t *msg
);

Parameters

| Key | Type | Description |
|---|---|---|
| msg | q_msg_t * | The queue message whose JSON representation is to be retrieved. |

Returns

A pointer to a json_t object representing the message. The returned JSON object is not owned by the caller.

Notes

The returned JSON object must not be modified or freed by the caller.


trq_open()

trq_open() initializes and opens a persistent queue using the specified tranger instance and topic configuration.

tr_queue_t *trq_open(
    json_t *tranger,
    const char *topic_name,
    const char *tkey,
    system_flag2_t system_flag,
    size_t backup_queue_size
);

Parameters

| Key | Type | Description |
|---|---|---|
| tranger | json_t * | Pointer to the tranger instance managing the queue. |
| topic_name | const char * | Name of the topic associated with the queue. |
| tkey | const char * | Time key used for ordering messages in the queue. |
| system_flag | system_flag2_t | System flags controlling queue behavior. |
| backup_queue_size | size_t | Maximum number of messages to retain in the backup queue. |

Returns

Returns a tr_queue_t * handle representing the opened queue, or NULL on failure.

Notes

Ensure that tranger2_startup() is called before invoking trq_open().


trq_set_hard_flag()

trq_set_hard_flag() marks a message with a hard flag, allowing it to be recovered in the next queue open if the flag is used in trq_load().

int trq_set_hard_flag(
    q_msg_t *msg,
    uint16_t hard_mark,
    BOOL set
);

Parameters

| Key | Type | Description |
|---|---|---|
| msg | q_msg_t * | The message to be marked. |
| hard_mark | uint16_t | The hard flag to set on the message. |
| set | BOOL | If TRUE, the flag is set; if FALSE, the flag is cleared. |

Returns

Returns 0 on success, or a negative value on failure.

Notes

A message must be flagged after being appended to the queue if it needs to be recovered in the next queue open using trq_load().


trq_set_metadata()

trq_set_metadata() sets a metadata key-value pair in the given JSON object.

int trq_set_metadata(
    json_t  *kw,
    const char *key,
    json_t  *jn_value  // owned
);

Parameters

| Key | Type | Description |
|---|---|---|
| kw | json_t * | The JSON object where the metadata will be stored. |
| key | const char * | The key under which the metadata value will be stored. |
| jn_value | json_t * | The JSON value to be stored as metadata. Ownership is transferred. |

Returns

Returns 0 on success, or a negative value on failure.

Notes

The caller must ensure that kw is a valid JSON object before calling trq_set_metadata().


trq_set_soft_mark()

trq_set_soft_mark() sets or clears a soft mark on a given queue message.

uint64_t trq_set_soft_mark(
    q_msg_t *msg,
    uint64_t soft_mark,
    BOOL set
);

Parameters

| Key | Type | Description |
|---|---|---|
| msg | q_msg_t * | The queue message on which the soft mark is to be set or cleared. |
| soft_mark | uint64_t | The soft mark value to be applied to the message. |
| set | BOOL | If TRUE, the soft mark is set; if FALSE, the soft mark is cleared. |

Returns

Returns the updated soft mark value of the message.

Notes

Soft marks are used for temporary message state tracking and do not persist across queue restarts.
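
The set/clear behaviour and the "updated soft mark value" can be sketched as plain bit arithmetic. This assumes that soft marks are bit flags combined with OR and cleared with AND-NOT, which this page does not state. apply_soft_mark() and the TOY_MARK_* constants are hypothetical names for illustration, and the function works on a bare uint64_t rather than on a q_msg_t.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical mark bits, chosen only for this example. */
#define TOY_MARK_SENT   UINT64_C(0x01)
#define TOY_MARK_RETRY  UINT64_C(0x02)

/*
 * Assumed semantics: setting ORs the bits in, clearing masks them out.
 * Returns the updated mark value, as trq_set_soft_mark() is documented to do.
 */
uint64_t apply_soft_mark(uint64_t current, uint64_t soft_mark, bool set)
{
    if(set) {
        return current | soft_mark;
    }
    return current & ~soft_mark;
}
```

Under this assumption, setting a mark that is already set leaves the value unchanged, and clearing one mark leaves the other bits untouched.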


trq_unload_msg()

The trq_unload_msg() function unloads a message from the queue iterator, removing it from memory.

void trq_unload_msg(
    q_msg_t *msg,
    int32_t result
);

Parameters

| Key | Type | Description |
|---|---|---|
| msg | q_msg_t * | The message to be unloaded from the queue iterator. |
| result | int32_t | The result code associated with the message unloading operation. |

Returns

This function does not return a value.

Notes

Use trq_unload_msg() to free a message from the queue iterator after processing it.


trq_append2()

Appends a new message to the queue with an explicit timestamp and optional user flags.

q_msg_t * trq_append2(
    tr_queue_t * trq,
    json_int_t t,
    json_t *kw,
    uint16_t user_flag
);

Parameters

| Key | Type | Description |
|---|---|---|
| trq | tr_queue_t * | The queue instance to append the message to. |
| t | json_int_t | Timestamp for the message. Pass 0 to use the current time. |
| kw | json_t * | The JSON payload of the message. Ownership is transferred to the queue. |
| user_flag | uint16_t | Optional user-defined flags to associate with the message. |

Returns

Returns a q_msg_t * handle to the appended message, or NULL on failure.


trq_load_all_by_time()

Loads all messages from the queue within a specified time range.

int trq_load_all_by_time(
    tr_queue_t * trq,
    int64_t from_t,
    int64_t to_t
);

Parameters

| Key | Type | Description |
|---|---|---|
| trq | tr_queue_t * | The queue instance from which messages will be loaded. |
| from_t | int64_t | The starting timestamp for the time range. |
| to_t | int64_t | The ending timestamp for the time range. |

Returns

Returns 0 on success, or a negative value on error.


trq_msg_md()

Retrieves the metadata record associated with a queue message.

md2_record_ex_t *trq_msg_md(
    q_msg_t *msg
);

Parameters

| Key | Type | Description |
|---|---|---|
| msg | q_msg_t * | The queue message whose metadata is to be retrieved. |

Returns

Returns a md2_record_ex_t * pointer to the internal metadata record.