[openamq-dev] amq_content_basic... destroy vs unlink

Martin Sustrik sustrik at imatix.com
Fri Oct 26 08:39:19 CEST 2007


Hello Dmitri,

Pieter outlined some basic reference-counting guidelines. However, I had 
a look at your example, and the problem you have is a bit different.

Basically, each message (content) consists of three elements: "properties", 
"headers" and "body". "Properties" are things like "message_id", "reply_to" 
etc. "Headers" is a field table (a generic untyped container) you can use 
to send your data in a structured way that can be used for content-based 
routing. "Body" is opaque binary data.

Now, let's suppose you are not interested in properties and headers and 
want to use opaque body only. Even in that case, message creation is 
still a two-step process. First you create the message object itself 
(content), then you attach a binary BLOB to it. Note that the lifetimes of 
the content and the BLOB are not necessarily the same!

Firstly, you create a content (message) object. This is a refcounted 
object and all the rules Pieter outlined apply. So you call 
amq_content_basic_new to create it and amq_content_basic_unlink to 
release your reference after publishing the message.

Secondly, the BLOB is a raw data chunk. You allocate it using malloc, 
icl_mem_alloc or any other allocation mechanism available; it may even 
be a static buffer. However, the openAMQ client uses zero-copy techniques, 
so when you send the message, it doesn't make a copy of the BLOB. Instead, 
it remembers the pointer to the data. This means you cannot deallocate 
the BLOB yourself: the openAMQ client may still hold a pointer to it and 
be using it. That's why you have to supply the BLOB deallocation function 
in amq_content_basic_set_body. The openAMQ client uses the BLOB and, once 
it is done with it, calls the supplied deallocation function to get rid 
of it.
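
To make the ownership rule concrete, here is a minimal stand-alone sketch 
of a refcounted object that borrows a body pointer and calls a 
caller-supplied deallocator when the last reference goes away. It is only 
a model: all demo_* names are hypothetical, and it is not how the openAMQ 
client is actually implemented.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical model only; none of these names exist in OpenAMQ. */
typedef void (*demo_free_fn) (void *data);

typedef struct {
    int refs;               /* number of outstanding references      */
    void *body;             /* borrowed pointer, never copied        */
    size_t size;            /* size of the body in bytes             */
    demo_free_fn free_fn;   /* called on the body, or NULL if static */
} demo_content_t;

int demo_frees = 0;         /* counts calls to demo_counting_free    */

/* A 'free' wrapper that lets us observe when the body is released */
void demo_counting_free (void *data)
{
    free (data);
    demo_frees++;
}

/* Create a content object holding one reference (the caller's) */
demo_content_t *demo_content_new (void)
{
    demo_content_t *self = calloc (1, sizeof *self);
    assert (self);
    self->refs = 1;
    return self;
}

/* Attach a body without copying it; remember how to release it */
void demo_content_set_body (demo_content_t *self, void *body,
                            size_t size, demo_free_fn free_fn)
{
    self->body = body;
    self->size = size;
    self->free_fn = free_fn;
}

/* Take an additional reference, as an outgoing queue might do */
demo_content_t *demo_content_link (demo_content_t *self)
{
    self->refs++;
    return self;
}

/* Drop one reference; the last one releases the body and the object */
void demo_content_unlink (demo_content_t **self_p)
{
    demo_content_t *self = *self_p;
    if (self) {
        if (--self->refs == 0) {
            if (self->free_fn)
                self->free_fn (self->body);
            free (self);
        }
        *self_p = NULL;
    }
}
```

In this model, if a queue has taken its own reference with 
demo_content_link, the caller's demo_content_unlink only drops the count 
and nulls the caller's pointer. The body is released later, by whoever 
drops the last reference.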

Here's an example of sending a static buffer:

//  Allocation of the message itself
amq_content_basic_t *content = amq_content_basic_new ();

//  Allocation of the BLOB to send
static char *body = "This body is always the same";

// Attach the BLOB to the message (note NULL deallocation function
// - static data don't have to be deallocated)
amq_content_basic_set_body(content, body, strlen (body), NULL);

//  Publishing the message
amq_client_session_basic_publish (session, content, 0, "amq.direct", 
"rk", FALSE, FALSE);

//  Release the reference to the content object
amq_content_basic_unlink (&content);

Now, here is how to allocate the BLOB via malloc:

//  Allocation of the message itself
amq_content_basic_t *content = amq_content_basic_new ();

//  Allocation of the BLOB to send
char *body = malloc (10);
memset (body, 'A', 10);

// Attach the BLOB to the message (note 'free' function
// supplied for BLOB deallocation)
amq_content_basic_set_body(content, body, 10, free);

//  Publishing the message
amq_client_session_basic_publish (session, content, 0, "amq.direct", 
"rk", FALSE, FALSE);

//  Release the reference to the content object
amq_content_basic_unlink (&content);

By replacing 'malloc' and 'free' with 'icl_mem_alloc' and 'icl_mem_free' 
you can use openAMQ's native memory allocation mechanism. By supplying 
your own functions, you can use any memory allocation mechanism you choose.
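
As an illustration of "your own functions", here is a rough sketch of a 
tiny fixed-size buffer pool, similar in spirit to the cache in your 
sample. The names pool_alloc, pool_free and pool_in_use are made up for 
this example, and I am assuming the deallocation callback takes a single 
void pointer, the same way 'free' does.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical fixed-size buffer pool; not part of OpenAMQ. */
#define POOL_SLOTS      4
#define POOL_SLOT_SIZE  64

static char pool_data [POOL_SLOTS][POOL_SLOT_SIZE];
static int  pool_used [POOL_SLOTS];

/* Hand out the first unused slot, or NULL if the pool is exhausted */
void *pool_alloc (void)
{
    for (int i = 0; i < POOL_SLOTS; i++) {
        if (!pool_used [i]) {
            pool_used [i] = 1;
            return pool_data [i];
        }
    }
    return NULL;
}

/* 'free'-compatible function: marks the slot as reusable.
   Must only be passed pointers obtained from pool_alloc. */
void pool_free (void *data)
{
    ptrdiff_t offset = (char *) data - &pool_data [0][0];
    assert (offset >= 0 && offset < (ptrdiff_t) sizeof pool_data);
    assert (offset % POOL_SLOT_SIZE == 0);
    assert (pool_used [offset / POOL_SLOT_SIZE]);
    pool_used [offset / POOL_SLOT_SIZE] = 0;
}

/* Number of slots currently handed out */
int pool_in_use (void)
{
    int count = 0;
    for (int i = 0; i < POOL_SLOTS; i++)
        count += pool_used [i];
    return count;
}
```

With such a pair, the buffer would come from pool_alloc and pool_free 
would be passed as the last argument of amq_content_basic_set_body, in 
the same position as 'free' in the malloc example. The slot then becomes 
reusable only after the client has finished with the data.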

Hope this helps.

Martin

Dmitri Tsyganov wrote:
> Hello,
> 
> I have an application that publishes content_basic Open AMQ messages in
> a very rapid order. (who does not :))
> 
> The external level of my application is responsible for creating actual
> content (data). 
> 
> I have noticed some discrepancies with my data and need some
> clarification from the experts. I looked at the code and I have some
> ideas, but I want to make sure that my assumptions are correct.
> 
> According to the documentation I should be using
> amq_content_basic_destroy on the content created with
> amq_content_basic_new.
> 
> Sample code with my comments is attached.
> 
> Here are my questions:
> 
> 1. amq_content_basic_set_body does not copy content, only content
> pointer (although it duplicates exchange and routing key strings).
> amq_client_session_basic_publish is an asynchronous call. So it is not
> safe to destroy message body right after the call, right? Is it safe to
> call amq_content_basic_destroy? 
> 
> 2. Should I be calling unlink instead? Will it free the content memory?
> 
> 3. When I pass "mem free" callback to ..._set_body and then, after
> calling publish, I call amq_content_basic_destroy - I get the following
> assertion:
> 
> ipr_bucket_list.c:1203: ipr_bucket_list_first_: Assertion `self' failed.
> 
> It does not assert if I call unlink instead. But... see question #2
> 
> Thank you very much in advance!
> 
> Dmitri
> 
> 
> 
> 
> ------------------------------------------------------------------------
> 
> 
> 
> // External layer provides message content
> const MessageContent* message = pOutbox->newMessage();
> 
> if( message && message->nSize )
> {
>     //  Create new content and publish it
>     amq_content_basic_t *content = amq_content_basic_new ();
> 
>     // allocate temp content buffer (from my cache) 
>     char* cache = content_new(); 
> 
>     if( content && cache )
>     {
>         // copy message content into temp buffer
>         if(message->nSize <= MC_MAX_CONTENT)
>         {
>             memcpy(cache, message->pContent, message->nSize);
>         }
> 
>         // *** was using this,... but I need to know when it is safe to clear content
>         // amq_content_basic_set_body(content, cache, message->nSize, NULL);
> 
>         // content_destroy is a pointer to an icl_mem_free compatible function
>         amq_content_basic_set_body(content, cache, message->nSize, content_destroy);
> 
>         amq_content_basic_set_message_id(content, message->szID);
> 
>         // publish content 
>         nError = amq_client_session_basic_publish(m_pSession, // session
>                                                   content, // content reference
>                                                   0, // access ticket (unused - use 0)
>                                                   message->szExchange, // exchange name
>                                                   message->szKey, // routing key
>                                                   0, // if 1, must be routable
>                                                   0); // if 1, must be deliverable
> 
>         // when use destroy - receive asserts in 
>         //
>         // ipr_bucket_list.c:1203: ipr_bucket_list_first_: Assertion `self' failed.
>         //
>         // it asserts ONLY when I provide callback in ..._set_body 
>         // amq_content_basic_destroy (&content);
> 
>         // OK when using unlink... but will it free the content memory
>         amq_content_basic_unlink (&content);
>     }
> 	
>     // notify external layer that I am done with message content and it can be safely
>     // destroyed/reused
>     pOutbox->freeMessage(message);
> }
> 
> 
> ------------------------------------------------------------------------
