[openamq-dev] session->alive always 0
Jörg Fischer
joerg.fischer at biologie.uni-freiburg.de
Thu Oct 25 11:15:32 CEST 2007
Hi,
I'm planning to use OpenAMQ in my current project and am currently testing it.
In my current test I use a modified version of the chat room example
from the wiki.
Build environment is Win32 with Visual Studio 2005. I use the compiled
libraries from http://wiki.openamq.org/package:win32-builds (version
1.2c4). My source code is appended at the end of the mail.
I've got a few questions concerning this test:
1. The test is unable to detect disconnections from the server. To be precise:
session->alive and connection->alive are always 0, whether the connection is
up or not. Sending and receiving both work. What did I do wrong?
...
if(!session->alive) // Why is session->alive always false?
{
    icl_console_print ("SENDER E: session dead.");
    cleanup(connection, session);
    return -1;
}
...
2. All connection properties I get have 4 leading bytes with value 0. Why?
...
if (connection)
    icl_console_print ("I: connected to %s/%s - %s - %s",
        connection->server_product + 4, // Why are the first 4 bytes zero?
        connection->server_version + 4,
        connection->server_platform + 4,
        connection->server_information + 4);
...
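To make the leading zero bytes visible instead of just skipping them, the first bytes of a property can be dumped in hex. The following is only a minimal sketch: hex_prefix is a hypothetical helper and not part of OpenAMQ, and it assumes the property points at a buffer of at least as many bytes as are requested.

```cpp
#include <cstddef>
#include <cstdio>
#include <string>

// Hypothetical diagnostic helper (not an OpenAMQ function): formats the
// first `count` bytes at `data` as space-separated two-digit hex values.
// The caller must make sure that `count` bytes are readable.
std::string hex_prefix(const void *data, std::size_t count)
{
    const unsigned char *bytes = static_cast<const unsigned char *>(data);
    std::string out;
    char pair[4];
    for (std::size_t i = 0; i < count; ++i) {
        std::snprintf(pair, sizeof pair, "%02x", bytes[i]);
        if (i > 0)
            out += ' ';
        out += pair;
    }
    return out;
}
```

It could be called, for example, as icl_console_print ("I: product bytes: %s", hex_prefix (connection->server_product, 8).c_str ()); to see exactly what precedes the readable text.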
Thanks,
Jörg
--
Dipl.-Inf. Joerg Fischer
e-mail: joerg.fischer at biologie.uni-freiburg.de
Tel: +49-761-203-2910
Institute for Biology I Albert-Ludwigs University
Hauptstraße 1
79104 Freiburg
Germany
--- Source code:
#include "base.h"
#include "amq_client_connection.h"
#include "amq_client_session.h"
#include <iostream>
using namespace std;

void cleanup(amq_client_connection_t *connection,
             amq_client_session_t *session);

int main (int argc, char *argv [])
{
    char* name = argv[1];
    char* server = argv[2];
    char* topic = argv[3];
    amq_client_connection_t *connection;
    amq_client_session_t *session;
    icl_longstr_t *auth_data;
    amq_content_basic_t *content;
    size_t message_size;

    // Initialise system
    icl_system_initialise (argc, argv);

    // Open a connection
    auth_data = amq_client_connection_auth_plain ("guest", "guest");
    connection = amq_client_connection_new (server, "/cluster",
        auth_data, "receiver_client", 0, 30000);
    icl_longstr_destroy (&auth_data);
    if (connection)
        icl_console_print ("I: connected to %s/%s - %s - %s",
            connection->server_product + 4, // Why are the first 4 bytes zero?
            connection->server_version + 4,
            connection->server_platform + 4,
            connection->server_information + 4);
    else {
        icl_console_print ("E: could not connect to server");
        return (1);
    }

    // Open a channel
    session = amq_client_session_new (connection);
    if(!session) {
        // Without a session none of the calls below can work, so stop here
        icl_console_print ("E: could not create session");
        amq_client_connection_destroy (&connection);
        icl_system_terminate ();
        return (1);
    }

    int result;

    // Create a private queue
    result = amq_client_session_queue_declare (
        session,        // session
        0,              // ticket
        NULL,           // queue name
        FALSE,          // passive
        FALSE,          // durable
        TRUE,           // exclusive
        TRUE,           // auto-delete
        NULL);          // arguments
    if(result)
        icl_console_print ("E: could not declare queue");

    // Bind the queue to the exchange
    result = amq_client_session_queue_bind (
        session,        // session
        0,              // ticket
        NULL,           // queue
        "amq.topic",    // exchange
        topic,          // routing-key
        NULL);          // arguments
    if(result)
        icl_console_print ("E: could not bind queue");

    // Consume from the queue
    result = amq_client_session_basic_consume (
        session,        // session
        0,              // ticket
        NULL,           // queue
        NULL,           // consumer-tag
        TRUE,           // no-local
        TRUE,           // no-ack
        TRUE,           // exclusive
        NULL);          // arguments
    if(result)
        icl_console_print ("E: could not register consumer");

    while (1)
    {
        while (1)
        {
            icl_console_print ("RECEIVER %s I: waiting...", name);
            amq_client_session_wait (session, 0);
            if(!session->alive) // Why is session->alive always false?
            {
                // handle CTRL+C
                icl_console_print ("SENDER E: session dead.");
                cleanup(connection, session);
                return -1;
            }
            int numberMessages =
                amq_client_session_get_basic_arrived_count (session);
            if(numberMessages > 1)
            {
                icl_console_print ("RECEIVER %s I: got %d message(s)...",
                    name, numberMessages);
            }
            for(int i = 0; i < numberMessages; i++)
            {
                content = amq_client_session_basic_arrived (session);
                if (content)
                {
                    // Get the message body and write it to stdout
                    float* values = (float*)
                        malloc((size_t)content->body_size);
                    message_size = amq_content_basic_get_body
                        (content, (byte*) values, (size_t)content->body_size);
                    // process content
                    icl_console_print ("I: got %f", values[0]);
                    // ...
                    free(values);
                    // Destroy the message
                    amq_content_basic_unlink (&content);
                }
                else
                {
                    icl_console_print ("E: could not get content");
                    cleanup(connection, session);
                    return -1;
                }
            }
        }
        cleanup(connection, session);
        return 0;
    }
}

void cleanup(amq_client_connection_t *connection,
             amq_client_session_t *session)
{
    amq_client_session_destroy (&session);
    amq_client_connection_destroy (&connection);
    icl_system_terminate ();
}
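
A side note on the body-decoding step in the loop above: it prints values[0] without checking that the body really holds at least one float. A guarded variant might look like the sketch below. first_float is a hypothetical helper, not part of OpenAMQ, and it assumes the sender wrote raw floats in the receiver's native byte order.

```cpp
#include <cstddef>
#include <cstring>

// Hypothetical helper (not an OpenAMQ function): copies the first float out
// of a raw message body. Returns false if either pointer is NULL or the body
// is shorter than sizeof(float); in that case *out is left unchanged.
bool first_float(const void *body, std::size_t body_size, float *out)
{
    if (body == NULL || out == NULL || body_size < sizeof(float))
        return false;
    // memcpy avoids any alignment assumptions about the body buffer
    std::memcpy(out, body, sizeof(float));
    return true;
}
```

In the receive loop this would be called with the buffer filled by amq_content_basic_get_body and the number of bytes actually copied, and the "I: got %f" line would only be printed when it returns true.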