[openamq-dev] Multiple consumers per session
Martin Sustrik
sustrik at imatix.com
Sat Sep 29 11:24:46 CEST 2007
Birju,
The problem is that you are using same consumer tag for all the
consumers ("testclient"). Tags should be different - ideally pass NULL
for the argument and the broker will generate unique consumer name for
you. This way, if you don't want to cancel consumer, you don't even have
to care about consumer tags.
However, the server should report an error and close the connection in
case of duplicate consumer tags and that's not what's happening. We
consider this to be a bug and will fix it in the next release.
Martin
Birju Prajapati wrote:
> Here you go....
> By the way, this is the first time that I've written a C program, so
> please be kind!
>
> #include "asl.h"
> #include "amq_client_connection.h"
> #include "amq_client_session.h"
> int
> main (int argc, char *argv [])
> {
> amq_client_connection_t *connection = NULL;
> amq_client_session_t *session = NULL;
> amq_content_basic_t *content;
> icl_longstr_t *auth_data;
> byte *buffer;
> int num_waiting_content;
> int bsize;
> int *vals;
> // Initialise iCL system
> icl_system_initialise (argc, argv);
> // Open session to local server
> auth_data = amq_client_connection_auth_plain ("guest", "guest");
>
> connection = amq_client_connection_new ( "172.25.110.14", //IP
> "/", //VHOST
> auth_data, //AUTH
> "testclient", //CLIENT ID
> 0, //TRACE
> 30000); //TIMEOUT
>
> if (!connection) icl_console_print("can't connect");
> icl_longstr_destroy (&auth_data);
>
> session = amq_client_session_new (connection);
> if (!session) icl_console_print("cant get session");
>
> amq_client_session_queue_declare ( session,
> 0, //TICKET (NOT USED)
> "queue1", //QUEUE NAME
> 0, //PASSIVE (0=CREATE)
> 0, //DURABLE
> 0, //EXCLUSIVE
> 0, //AUTO_DELETE
> NULL); //OTHER ARGS
> amq_client_session_queue_declare ( session,
> 0, //TICKET (NOT USED)
> "queue2", //QUEUE NAME
> 0, //PASSIVE (0=CREATE)
> 0, //DURABLE
> 0, //EXCLUSIVE
> 0, //AUTO_DELETE
> NULL); //OTHER ARGS
> amq_client_session_queue_declare ( session,
> 0, //TICKET (NOT USED)
> "queue3", //QUEUE NAME
> 0, //PASSIVE (0=CREATE)
> 0, //DURABLE
> 0, //EXCLUSIVE
> 0, //AUTO_DELETE
> NULL); //OTHER ARGS
> amq_client_session_queue_declare ( session,
> 0, //TICKET (NOT USED)
> "queue4", //QUEUE NAME
> 0, //PASSIVE (0=CREATE)
> 0, //DURABLE
> 0, //EXCLUSIVE
> 0, //AUTO_DELETE
> NULL); //OTHER ARGS
>
> amq_client_session_queue_bind( session, 0, "queue1", "amq.fanout", "",
> NULL);
> amq_client_session_queue_bind( session, 0, "queue2", "amq.fanout", "",
> NULL);
> amq_client_session_queue_bind( session, 0, "queue3", "amq.fanout", "",
> NULL);
> amq_client_session_queue_bind( session, 0, "queue4", "amq.fanout", "",
> NULL);
>
> icl_console_print("bound");
>
> amq_client_session_basic_consume
> ( session, 0, "queue1", "testclient", FALSE, TRUE, FALSE, NULL);
>
> icl_console_print("consume1");
>
> amq_client_session_basic_consume
> ( session, 0, "queue2", "testclient", FALSE, TRUE, FALSE, NULL);
>
> icl_console_print("consume2"); //ONLY REACHES HERE AFTER A TIMEOUT
>
> amq_client_session_basic_consume
> ( session, 0, "queue3", "testclient", FALSE, TRUE, FALSE, NULL);
>
> icl_console_print("consume3");
>
> amq_client_session_basic_consume
> ( session, 0, "queue4", "testclient", FALSE, TRUE, FALSE, NULL);
>
> icl_console_print("consume4");
>
> icl_console_print("consumed");
>
> //PROCESS MESSAGES
> while (1) {
> int res = amq_client_session_wait (session,1000);
> if (res) {
> icl_console_print("res=%d", res);
> if (res == -1) exit(1);
> continue;
> }
> else {
> num_waiting_content =
> amq_client_session_get_basic_arrived_count(session);
> icl_console_print("num waiting=%d", num_waiting_content);
> if (num_waiting_content) {
> while( (content = amq_client_session_basic_arrived(session)) ) {
> bsize = content->body_size;
> icl_console_print("body size=%d", bsize);
> buffer = (byte*)malloc(bsize);
> amq_content_basic_get_body(content, buffer, bsize);
> vals = (int*)buffer;
> icl_console_print("2nd val=%d", *(vals+1));
> free(buffer);
> }
> }
> }
>
> }
> // Shutdown connection and session
> amq_client_session_destroy (&session);
> amq_client_connection_destroy (&connection);
> // Terminate iCL system
> icl_system_terminate ();
> return (0);
> }
>
>
> -----Original Message-----
> From: openamq-dev-bounces at lists.openamq.org
> [mailto:openamq-dev-bounces at lists.openamq.org] On Behalf Of Martin
> Sustrik
> Sent: 28 September 2007 15:46
> To: OpenAMQ development discussion
> Subject: Re: [openamq-dev] Multiple consumers per session
>
> Birju,
>
> I've tried and I can open several consumers from single session with no
> problem.
>
> Can you send us your test program that exhibits the behaviour?
>
> Thanks.
> Martin
>
> Birju Prajapati wrote:
>> Hi,
>> Am I only allowed one consumer per session? From one thread I'm trying
>
>> to consume from 4 queues, starting 4 consumers from the same session,
>> and it hangs on the call to start the second consumer.
>>
>> The server warns of:
>> cannot create consumer - too many consumers?
>>
>> The only statement in the spec that I can see bearing any resemblance
>> to my problem is in section 3.1.6, but this does not make it explicit
>> that I cannot have more than one consumer per session - it just states
>
>> that a consumer cannot be shared between channels (which the WireAPI
>> doesn't let you do anyway).
>> I also see in the spec (section 2.2.5) that it is 'encouraged' to use
>> a channel per thread. However in OpenAMQ, this means that it is
>> encouraged to use a connection per thread (as you can only have one
>> channel per connection).
>> Do I have to start a new session for each new consumer, and hence have
>
>> a different connection/thread/process for each?
>>
>> If that is the case, then I think it should be made explicit in the
>> spec.
>>
>> Thanks for your help,
>> Birju
>>
>>
>>
>> Schneider Trading
>> 4th Floor,
>> 25 Copthall Avenue
>> London EC2R 7BP
>>
>> --------------------------------------------------------
>>
>>
>>
>> Message from: Birju.Prajapati at schneidertrading.com
>> Message to: openamq-dev at lists.openamq.org Attached files: 0 This
>> e-mail and any files transmitted with it are confidential, may be
> legally privileged, and are for the sole use of the intended recipient.
> Copyright in this e-mail and any accompanying document created by us is
> owned by us. If you are not the intended recipient of this e-mail or any
> part of it please telephone our IT Department at the number shown above
> or by e-mail at info at schneidertrading.com.
>> You should not use or disclose to any other person the contents of
> this e-mail or its attachments (if any), nor take copies. This e-mail is
> not a representation or warranty and is not intended nor should it be
> taken to create any legal relations, contractual or otherwise.
>> Schneider Trading Associates Ltd (Registration No. 3692131) is
>> incorporated in England and Wales and the registered office is at 25
>> Copthall Avenue, London EC2R 7BP. The company is authorised and
>> regulated by the Financial Services Authority
>> _______________________________________________
>> openamq-dev mailing list
>> openamq-dev at lists.openamq.org
>> http://lists.openamq.org/mailman/listinfo/openamq-dev
>
> _______________________________________________
> openamq-dev mailing list
> openamq-dev at lists.openamq.org
> http://lists.openamq.org/mailman/listinfo/openamq-dev
>
>
>
> Schneider Trading
> 4th Floor,
> 25 Copthall Avenue
> London EC2R 7BP
>
> --------------------------------------------------------
>
>
>
> Message from: Birju.Prajapati at schneidertrading.com
> Message to: openamq-dev at lists.openamq.org
> Attached files: 0
> This e-mail and any files transmitted with it are confidential, may be legally privileged, and are for the sole use of the intended recipient. Copyright in this e-mail and any accompanying document created by us is owned by us. If you are not the intended recipient of this e-mail or any part of it please telephone our IT Department at the number shown above or by e-mail at info at schneidertrading.com.
> You should not use or disclose to any other person the contents of this e-mail or its attachments (if any), nor take copies. This e-mail is not a representation or warranty and is not intended nor should it be taken to create any legal relations, contractual or otherwise.
> Schneider Trading Associates Ltd (Registration No. 3692131) is incorporated in England and Wales and the registered office is at 25 Copthall Avenue, London EC2R 7BP. The company is authorised and regulated by the Financial Services Authority
> _______________________________________________
> openamq-dev mailing list
> openamq-dev at lists.openamq.org
> http://lists.openamq.org/mailman/listinfo/openamq-dev
More information about the openamq-dev
mailing list