[openamq-dev] Help on OpenAMQ (fanout exchange type) required

Mousami Kelkar mousami.kelkar at ftindia.com
Tue Aug 21 14:30:06 CEST 2007


Help on OpenAMQ (fanout exchange type) requiredHi,

I have followed the guidelines mentioned in Advanced Message Queuing Protocol pdf to declare and bind fanout type of exchange and trying to publish and consume messages but unable to do so. Please find the test application for the same.

sender.c is as follows:

#include <asl.h>

#include <amq_client_connection.h>

#include <amq_client_session.h>

int main(int argc, char** argv)

{

        amq_client_connection_t * connection = NULL;

        amq_client_session_t * session = NULL;

        amq_content_basic_t * content = NULL;

        icl_longstr_t *auth_data;

        char *message_body;

        char message_text[1024];

                

        icl_system_initialise(argc, argv);

        auth_data = amq_client_connection_auth_plain("guest","guest");

        connection = amq_client_connection_new(

                        "127.0.0.1:5672",

                        "/",

                        auth_data,

                        "sender",

                        0,

                        30000);

        

        if(!connection)

        {

                return 1;

                printf("Connection failed\n");

        }

        icl_longstr_destroy(&auth_data);

        session = amq_client_session_new(connection);

        if(!session)

        {       

                printf("session failed\n");

                return 1;

        }

        amq_client_session_exchange_declare(

        session, 

        0,

        "amq.fanout",

        "fanout",

        0,

        1,

        0,

        0,

        NULL);

        

        /* Create a message*/

        

        while(1)

        {

           while(1)

           {

                

                

                /*Create message body*/

                fgets(message_text, sizeof(message_text), stdin);

                message_body = malloc(strlen(message_text) + 1);

                assert(message_body);

                if(!message_body)

                {

                        printf("message body failed\n");

                        return 1;

                }

                sprintf(message_body,"%s",message_text);

                

                content =amq_content_basic_new();

                //Create message        

                amq_content_basic_set_body(

                content,

                message_body,

                strlen(message_body),

                free);

                amq_content_basic_set_message_id(content, "ID0001");

        

                amq_client_session_basic_publish(

                session,

                content,

                0,

                "amq.fanout",

                "amq.myqueue",//routing key

                FALSE,

                FALSE);

                

                amq_content_basic_unlink(&content);

                }

        }

        amq_client_session_destroy(&session);

        amq_client_connection_destroy(&connection);

        icl_system_terminate();

        return 0;

        

}

receiver.c is as follows:

#include <asl.h>

#include <amq_client_connection.h>

#include <amq_client_session.h>

        

int main(int argc, char **argv)

{

        

        amq_client_connection_t *connection = NULL;

        amq_client_session_t *session = NULL;

        icl_longstr_t *auth_data = NULL;

        amq_content_basic_t *content = NULL;

        char message_text[1024];

        size_t message_size;

        

        

        icl_system_initialise(argc, argv);

        auth_data = amq_client_connection_auth_plain("guest", "guest");

        connection = amq_client_connection_new(

        "127.0.0.1:5672",

        "/",

        auth_data,

        "receiver",

        0,

        30000

        );

        

        if(!connection)

                 return 1;

        icl_longstr_destroy(&auth_data);

        session = amq_client_session_new(connection);

        

        if(!session)

                return 1;

        

        amq_client_session_queue_declare(

        session,

        0,

        "amq.myqueue",

        0,

        1,

        0,

        0,

        NULL);

        amq_client_session_queue_bind(

        session,

        0,

        "amq.myqueue",

        "",

        "amq.myqueue",

        NULL);

        

        amq_client_session_basic_consume(

        session,

        0,

        "amq.myqueue",

        NULL,

        0,

        0,

        0,

        NULL);

        

        while(1)

        {       

                while(1)

                {

                        amq_client_session_wait(session, 5000);

                        

                        if (amq_client_session_get_basic_arrived_count (session)) {

                            icl_console_print ("I: have messages to process...");

                            content = amq_client_session_basic_arrived (session);

                            while (content) {

                        //  process content

                                content = amq_client_session_basic_arrived (session);

                            }

                        }

                        if(!content)

                        {

                                printf("content not received\n");

                                return 1;

                        }

                        message_size = amq_content_basic_get_body(content,(byte*) message_text,                                                         sizeof(message_text));

                        if(message_size)

                        {

                                message_text[message_size] = '\0';

                                printf("%s\n", message_text);

                        }       

                                

                        if(!connection)

                                return 1;

                

                        //destroy the message

                        amq_content_basic_unlink(&content);     

                }       

                //wait for next message

                amq_client_session_wait(session,0);

                

                if(!connection)

                        return 1;

        

        }

        

        amq_client_session_destroy(&session);

        amq_client_connection_destroy(&connection);

        

        icl_system_terminate();

        

        return 0;

                

}

Can you please help me to solve the problem.Awaiting your reply.

Routing key explaination given in Advanced Message Queuing Protocol pdf mentions that For topic pub-sub routing, the routing key is the topic hierarchy value. - what do you mean by topic hierarchy value?

Thanking you in advance.

Thanks & best regards,

Mousami Kelkar       

Board No: 022-66494000     Extn: 256

Fax: 022-67099100     

mousami.kelkar at ftindia.com

www.ftindia.com 




Disclaimer:

The information in this E-mail (which includes any files transmitted with it) is CONFIDENTIAL and may be legally PRIVILEGED. It is intended solely for the addressee and access to this email by anyone else is unauthorized. If you have received it in error, please destroy any copies of this message, including any attachments, and delete it from your system notifying the sender immediately. Any disclosure, copying, distribution, dissemination, forwarding, printing or any action taken or omitted to be taken in reliance on it or utilising the same for any purpose other than what it is intended for, is prohibited and may be unlawful.

-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.openamq.org/pipermail/openamq-dev/attachments/20070821/586e72c3/attachment-0001.htm 


More information about the openamq-dev mailing list