[openamq-dev] Multiple consumers per session

Birju Prajapati Birju.Prajapati at schneidertrading.com
Fri Sep 28 16:58:42 CEST 2007


Here you go....
By the way, this is the first time that I've written a C program, so
please be kind!

#include "asl.h"
#include "amq_client_connection.h"
#include "amq_client_session.h"
int
main (int argc, char *argv [])
{
  amq_client_connection_t    *connection = NULL;
  amq_client_session_t       *session = NULL;
  amq_content_basic_t        *content;
  icl_longstr_t              *auth_data;
  byte                       *buffer;
  int                        num_waiting_content;
  int                        bsize;
  int                        *vals;
  //  Initialise iCL system
  icl_system_initialise (argc, argv);
  //  Open session to local server
  auth_data  = amq_client_connection_auth_plain ("guest", "guest");

  connection = amq_client_connection_new ( "172.25.110.14", //IP
					   "/",             //VHOST
					   auth_data,       //AUTH
					   "testclient",    //CLIENT ID
					   0,               //TRACE
					   30000);          //TIMEOUT

  if (!connection) icl_console_print("can't connect");
  icl_longstr_destroy (&auth_data);

  session = amq_client_session_new (connection);
  if (!session) icl_console_print("cant get session");

  amq_client_session_queue_declare ( session,
				     0,         //TICKET (NOT USED)
				     "queue1",  //QUEUE NAME
				     0,         //PASSIVE (0=CREATE)
				     0,         //DURABLE
				     0,         //EXCLUSIVE
				     0,         //AUTO_DELETE
				     NULL);     //OTHER ARGS
  amq_client_session_queue_declare ( session,
				     0,         //TICKET (NOT USED)
				     "queue2",  //QUEUE NAME
				     0,         //PASSIVE (0=CREATE)
				     0,         //DURABLE
				     0,         //EXCLUSIVE
				     0,         //AUTO_DELETE
				     NULL);     //OTHER ARGS
  amq_client_session_queue_declare ( session,
				     0,         //TICKET (NOT USED)
				     "queue3",  //QUEUE NAME
				     0,         //PASSIVE (0=CREATE)
				     0,         //DURABLE
				     0,         //EXCLUSIVE
				     0,         //AUTO_DELETE
				     NULL);     //OTHER ARGS
  amq_client_session_queue_declare ( session,
				     0,         //TICKET (NOT USED)
				     "queue4",  //QUEUE NAME
				     0,         //PASSIVE (0=CREATE)
				     0,         //DURABLE
				     0,         //EXCLUSIVE
				     0,         //AUTO_DELETE
				     NULL);     //OTHER ARGS

  amq_client_session_queue_bind( session, 0, "queue1", "amq.fanout", "",
NULL);
  amq_client_session_queue_bind( session, 0, "queue2", "amq.fanout", "",
NULL);
  amq_client_session_queue_bind( session, 0, "queue3", "amq.fanout", "",
NULL);
  amq_client_session_queue_bind( session, 0, "queue4", "amq.fanout", "",
NULL);

  icl_console_print("bound");
  
  amq_client_session_basic_consume 
    ( session, 0, "queue1", "testclient", FALSE, TRUE, FALSE, NULL);

  icl_console_print("consume1");

  amq_client_session_basic_consume 
    ( session, 0, "queue2", "testclient", FALSE, TRUE, FALSE, NULL);

  icl_console_print("consume2");  //ONLY REACHES HERE AFTER A TIMEOUT

  amq_client_session_basic_consume 
    ( session, 0, "queue3", "testclient", FALSE, TRUE, FALSE, NULL);

  icl_console_print("consume3");

  amq_client_session_basic_consume 
    ( session, 0, "queue4", "testclient", FALSE, TRUE, FALSE, NULL);

  icl_console_print("consume4");

  icl_console_print("consumed");

  //PROCESS MESSAGES
  while (1) {
    int res = amq_client_session_wait (session,1000);
    if (res) {
      icl_console_print("res=%d", res);
      if (res == -1) exit(1);
      continue;
    }
    else {
      num_waiting_content =
amq_client_session_get_basic_arrived_count(session);
      icl_console_print("num waiting=%d", num_waiting_content);
      if (num_waiting_content) {
	while( (content = amq_client_session_basic_arrived(session)) ) {
	  bsize = content->body_size;
	  icl_console_print("body size=%d", bsize);
	  buffer = (byte*)malloc(bsize);
	  amq_content_basic_get_body(content, buffer, bsize);
	  vals = (int*)buffer;
	  icl_console_print("2nd val=%d", *(vals+1));
	  free(buffer); 
	}
      }
    }

  }
  //  Shutdown connection and session
  amq_client_session_destroy    (&session);
  amq_client_connection_destroy (&connection);
  //  Terminate iCL system
  icl_system_terminate ();
  return (0);
}
  

-----Original Message-----
From: openamq-dev-bounces at lists.openamq.org
[mailto:openamq-dev-bounces at lists.openamq.org] On Behalf Of Martin
Sustrik
Sent: 28 September 2007 15:46
To: OpenAMQ development discussion
Subject: Re: [openamq-dev] Multiple consumers per session

Birju,

I've tried and I can open several consumers from single session with no
problem.

Can you send us your test program that exhibits the behaviour?

Thanks.
Martin

Birju Prajapati wrote:
> Hi,
> Am I only allowed one consumer per session? From one thread I'm trying

> to consume from 4 queues, starting 4 consumers from the same session, 
> and it hangs on the call to start the second consumer.
> 
> The server warns of:
> cannot create consumer - too many consumers?
> 
> The only statement in the spec that I can see bearing any resemblance 
> to my problem is in section 3.1.6, but this does not make it explicit 
> that I cannot have more than one consumer per session - it just states

> that a consumer cannot be shared between channels (which the WireAPI 
> doesn't let you do anyway).
> I also see in the spec (section 2.2.5) that it is 'encouraged' to use 
> a channel per thread. However in OpenAMQ, this means that it is 
> encouraged to use a connection per thread (as you can only have one 
> channel per connection).
> Do I have to start a new session for each new consumer, and hence have

> a different connection/thread/process for each?
> 
> If that is the case, then I think it should be made explicit in the 
> spec.
> 
> Thanks for your help,
> Birju
>  
> 
>  
> Schneider Trading
> 4th Floor,
> 25 Copthall Avenue
> London EC2R 7BP
> 
> --------------------------------------------------------
> 
> 
> 
> Message from: Birju.Prajapati at schneidertrading.com
> Message to: openamq-dev at lists.openamq.org Attached files: 0 This 
> e-mail and any files transmitted with it are confidential, may be
legally privileged, and are for the sole use of the intended recipient.
Copyright in this e-mail and any accompanying document created by us is
owned by us. If you are not the intended recipient of this e-mail or any
part of it please telephone our IT Department at the number shown above
or by e-mail at info at schneidertrading.com.
> You should not use or disclose to any other person the contents of
this e-mail or its attachments (if any), nor take copies. This e-mail is
not a representation or warranty and is not intended nor should it be
taken to create any legal relations, contractual or otherwise.
> Schneider Trading Associates Ltd (Registration No. 3692131) is 
> incorporated in England and Wales and the registered office is at 25 
> Copthall Avenue, London EC2R 7BP. The company is authorised and 
> regulated by the Financial Services Authority 
> _______________________________________________
> openamq-dev mailing list
> openamq-dev at lists.openamq.org
> http://lists.openamq.org/mailman/listinfo/openamq-dev

_______________________________________________
openamq-dev mailing list
openamq-dev at lists.openamq.org
http://lists.openamq.org/mailman/listinfo/openamq-dev
 

 
Schneider Trading
4th Floor,
25 Copthall Avenue
London EC2R 7BP

--------------------------------------------------------



Message from: Birju.Prajapati at schneidertrading.com
Message to: openamq-dev at lists.openamq.org
Attached files: 0
This e-mail and any files transmitted with it are confidential, may be legally privileged, and are for the sole use of the intended recipient. Copyright in this e-mail and any accompanying document created by us is owned by us. If you are not the intended recipient of this e-mail or any part of it please telephone our IT Department at the number shown above or by e-mail at info at schneidertrading.com. 
You should not use or disclose to any other person the contents of this e-mail or its attachments (if any), nor take copies. This e-mail is not a representation or warranty and is not intended nor should it be taken to create any legal relations, contractual or otherwise.
Schneider Trading Associates Ltd (Registration No. 3692131) is incorporated in England and Wales and the registered office is at 25 Copthall Avenue, London EC2R 7BP. The company is authorised and regulated by the Financial Services Authority


More information about the openamq-dev mailing list