[openamq-dev] session->alive always 0

Jörg Fischer joerg.fischer at biologie.uni-freiburg.de
Thu Oct 25 11:15:32 CEST 2007


Hi,

I'm planning to use OpenAMQ in my current project and am currently testing it.
In my current test I use a modified version of the chat room example 
from the wiki.
Build environment is Win32 with Visual Studio 2005. I use the compiled 
libraries from http://wiki.openamq.org/package:win32-builds (version 
1.2c4). My source code is appended at the end of the mail.
I've got a few questions concerning this test:

1. The test is unable to detect disconnections from the server. To be precise, 
session->alive and connection->alive are always 0, even though sending and 
receiving work. The values do not change whether the connection is OK or not. 
What did I do wrong?
   ...
   if(!session->alive) // Why is session->alive always false?
           {
               icl_console_print ("SENDER E: session dead.");
               cleanup(connection, session);
               return -1;
           }
  ...

2. All connection properties I get have 4 leading bytes with value 0. Why?
  ...
  if (connection)
       icl_console_print ("I: connected to %s/%s - %s - %s",
       connection->server_product + 4, // Why are the first 4 bytes zero?
       connection->server_version + 4,
       connection->server_platform + 4,
       connection->server_information + 4);
  ...

Thanks,

Jörg

-- 

Dipl.-Inf. Joerg Fischer

e-mail: joerg.fischer at biologie.uni-freiburg.de
Tel: +49-761-203-2910

Institute for Biology I        Albert-Ludwigs University

Hauptstraße 1
79104 Freiburg
Germany

--- Source code:
#include "base.h"
#include "amq_client_connection.h"
#include "amq_client_session.h"
#include <iostream>
using namespace std;

//  Forward declaration: main() calls cleanup() before its definition appears.
void cleanup(amq_client_connection_t *connection, amq_client_session_t *session);

int main (int argc, char *argv [])
{
   char* name   = argv[1];
   char* server = argv[2];
   char* topic  = argv[3];

   amq_client_connection_t    *connection;
   amq_client_session_t *session;
   icl_longstr_t *auth_data;
   amq_content_basic_t    *content;
   size_t message_size;

   //  Initialise system
   icl_system_initialise (argc, argv);

   //  Open a connection
   auth_data = amq_client_connection_auth_plain ("guest", "guest");
   connection = amq_client_connection_new (server, "/cluster", 
auth_data, "receiver_client", 0, 30000);
   icl_longstr_destroy (&auth_data);

   if (connection)
       icl_console_print ("I: connected to %s/%s - %s - %s",
       connection->server_product + 4, // Why are the first 4 bytes zero?
       connection->server_version + 4,
       connection->server_platform + 4,
       connection->server_information + 4);
   else {
       icl_console_print ("E: could not connect to server");
       return (1);
   }

   //  Open a channel
   session = amq_client_session_new (connection);
   if(!session)
       icl_console_print ("E: could not create session");
   int result;

   //  Create a private queue
   result = amq_client_session_queue_declare (
       session,                        //  session
       0,                              //  ticket
       NULL,                           //  queue name
       FALSE,                          //  passive
       FALSE,                          //  durable
       TRUE,                           //  exclusive
       TRUE,                           //  auto-delete
       NULL);                          //  arguments
   if(result)
       icl_console_print ("E: could not declare queue");

   //  Bind the queue to the exchange
   result = amq_client_session_queue_bind (
       session,                        //  session
       0,                              //  ticket
       NULL,                           //  queue
       "amq.topic",                    //  exchange
       topic ,                         //  routing-key
       NULL);                          //  arguments
   if(result)
       icl_console_print ("E: could not bind queue");


   //  Consume from the queue
   result = amq_client_session_basic_consume (
       session,                        //  session
       0,                              //  ticket
       NULL,                           //  queue
       NULL,                           //  consumer-tag
       TRUE,                           //  no-local
       TRUE,                           //  no-ack
       TRUE,                           //  exclusive
       NULL);                          //  arguments
   if(result)
       icl_console_print ("E: could not register for consumation");

   while (1)
   {

       while (1)
       {
           icl_console_print ("RECEIVER %s I: waiting...", name);
           amq_client_session_wait (session, 0);
           if(!session->alive) // Why is session->alive always false?
           {
               // handle CTRL+C
               icl_console_print ("SENDER E: session dead.");
               cleanup(connection, session);
               return -1;
           }
           int numberMessages = 
amq_client_session_get_basic_arrived_count (session);
           if(numberMessages > 1)
           {
               icl_console_print ("RECEIVER %s I: got %d message(s)...", 
name, numberMessages);
           }
           for(int i = 0; i < numberMessages; i++)
           {
               content = amq_client_session_basic_arrived (session);
               if (content)
               {
                   //  Get the message body and write it to stdout
                   float* values =  (float*) 
malloc((size_t)content->body_size);
                   message_size = amq_content_basic_get_body 
(content,(byte*) values, (size_t)content->body_size);
                   // process content
                   icl_console_print ("I: got %f", values[0]);
                   // ...

                   free(values);
                   //  Destroy the message
                   amq_content_basic_unlink (&content);
               }
               else
               {
                   icl_console_print ("E: could not get content");
                   cleanup(connection, session);                      
return -1;
               }
           }
       }
       cleanup(connection, session);
       return 0;
   }
}

//  Releases everything main() set up, in reverse order of creation:
//  the session, then the connection, then the iCL runtime.
//  Both pointers are received by value, so the destroy calls operate on
//  this function's local copies; the caller's own variables are not
//  modified and must not be used afterwards.
void cleanup(amq_client_connection_t *connection, amq_client_session_t *session)
{
    amq_client_session_destroy (&session);          //  channel first
    amq_client_connection_destroy (&connection);    //  then its connection
    icl_system_terminate ();                        //  finally the runtime
}




More information about the openamq-dev mailing list