AMcoder - javascript, python, java, html, php, sql

Ինչպես կարգավորել RabbitMQ-ն, որպեսզի մի քանի սպառողներին հավասարապես սպասարկի բազմաթիվ հերթեր

Ես RabbitMQ բրոքեր ունեմ բազմաթիվ հերթերով: Հաճախորդի կողմից (Java) ես ունեմ բազմաթիվ սպառողներ, որոնք բոլորն էլ լսում են իրենց հերթը այսպես.

QUEUE_1 -> DataConsumer1; QUEUE_2 -> DataConsumer2 ...

Նրանք բոլորն օգտագործում են մեկ կապ, բայց տարբեր ալիք: Ինչ է տեղի ունենում, երբ ես բեռնում եմ բոլոր հերթերը և սկսում եմ, որ հավելվածի բրոքերն առաջին հերթին սպասարկում է մեկ հերթ, քան մյուսը և այլն: Այսպիսով, հաղորդագրությունները ստացվում են իրենց համապատասխան սպառողի կողմից յուրաքանչյուր հերթում: Կցանկանայի նաև նշել, որ ես օգտագործում եմ նախնական առբերման 1 թիվը՝ փորձելով հասնել սպառողների տրաֆիկի արդար բաշխման:

Ինչպես կարող եմ այնպես անել, որ բոլոր հերթերը սպասարկվեն հավասարապես:

Խմբագրել. Ահա այն կոդը, որը ստեղծում է սպառողներ (բավականին հիմնական)

import com.rabbitmq.client.*;

import org.apache.log4j.Logger;

import java.util.UUID;
import java.util.concurrent.TimeoutException;

 * Used for consuming and acknowledging messages from defined queue.
public class Consumer {
    private final static Logger logger = Logger.getLogger(Consumer.class);
    // Maximum number of messages that can be on the consumer at a time
    private static int prefetchCount = 1;

    // Internal enum which contains queue names and their exchange keys
    private Queue queue;
    private Channel channel;
    private String consumerTag;
    private String uuid = UUID.randomUUID().toString();
    private boolean subscribed = false;
    private DeliverCallback deliverCallback = this::handleDeliver;
    private CancelCallback cancelCallback = this::handleCancel;
    private ConsumerShutdownSignalCallback consumerShutdownSignalCallback = this::handleShutdown;

     * The constructors sets the channel to RabbitMQ broker for the specified queue.
     * Callback for events are set to their default implementation.
     * @param queue RabbitMQ queue - this consumer will be assigned to this queue and will only be able to consume from it.
     * @see #setDeliverCallback(DeliverCallback)
     * @see #setCancelCallback(CancelCallback)
     * @see #setConsumerShutdownSignalCallback(ConsumerShutdownSignalCallback)
    public Consumer(Queue queue) {
        this.queue = queue;

        try {

        } catch (IOException e) {


    public Class getEntityClass() {
        return Queue.getEntityClassForQueue(queue);

    public String getUuid() {
        return uuid;

    public boolean isSubscribed() {
        return subscribed;

    public DeliverCallback getDeliverCallback() {
        return deliverCallback;

    public void setDeliverCallback(DeliverCallback deliverCallback) {
        this.deliverCallback = deliverCallback;

    public CancelCallback getCancelCallback() {
        return cancelCallback;

    public void setCancelCallback(CancelCallback cancelCallback) {
        this.cancelCallback = cancelCallback;

    public ConsumerShutdownSignalCallback getConsumerShutdownSignalCallback() {
        return consumerShutdownSignalCallback;

    public void setConsumerShutdownSignalCallback(ConsumerShutdownSignalCallback consumerShutdownSignalCallback) {
        this.consumerShutdownSignalCallback = consumerShutdownSignalCallback;

     * <p>
     * Subscribes to the set queue. The subscription can be cancelled using
     * Checks if the queue is set up properly.
     * </p>
     * <p>
     * Note: this is a non-blocking operation. The client will listen for incoming messages and handle them using
     * the provided DeliveryCallback function but the execution of this operation will be on another thread.
     * </p>
     * @throws IOException if I/O problem is encountered.
    public void subscribeToQueue() throws IOException {
        if (channel != null) {
            consumerTag = channel.basicConsume(
            subscribed = true;

        } else {
            logger.error("Channel does not exist. Unable to consume message.");


     * Confirms the message has been successfully processed.
     * @param deliveryTag Unique message tag generated by the server.
     * @throws IOException if I/O problem is encountered.
    public void acknowledgeMessageReceived(long deliveryTag) throws IOException {
        if (channel != null) {
            channel.basicAck(deliveryTag, false);

        } else {
            logger.error("Channel does not exist. Unable to acknowledge message delivery.");


     * Sends a negative acknowledgement to RabbitMQ without re-queueing the message.
     * @param deliveryTag Unique message tag generated by the server.
     * @throws IOException if I/O problem is encountered.
    public void rejectMessage(long deliveryTag) throws IOException {
        if (channel != null) {
            channel.basicReject(deliveryTag, false);

        } else {
            logger.error("Channel does not exist. Unable to reject message delivery.");


     * Cancels consumer subscription to the queue.
     * The consumer can be used for acknowledging messages, but will not receive new messages.
     * This does not close the underlying channel. To close the channel use closeChannel() method.
     * @throws IOException
     * @see #subscribeToQueue()
     * @see #closeChannel()
    public void cancelSubscription() throws IOException {
        if (channel != null) {
            subscribed = false;

        } else {
            logger.error("Channel does not exist. Unable to cancel consumer subscription.");

     * Explicitly closes channel to the queue.
     * After doing this you will not be able to use any of the methods of this class.
     * @throws IOException      if I/O problem is encountered.
     * @throws TimeoutException if connection problem occurs.
    public void closeChannel() throws IOException, TimeoutException {
        if (channel != null) {
            channel = null;
  "Closing RabbitMQ consumer channel...");

        } else {
            logger.error("Channel already closed.");


     * Checks if the queue exists and creates the channel.
     * If the queue does not exist channel is set to null and cannot be used.
     * @throws IOException if I/O problem is encountered.
    private void setUpChannel() throws IOException {

        channel = ChannelFactory.getInstance().createChannel();
        try {

        } catch (IOException e) {
            // When this exception occurs it renders the channel unusable so it's best set to null.
            channel = null;

            logger.error(String.format("Queue %s does not exist [%s]", queue.getQueueName(), e.getMessage()));

        }"Setting up RabbitMQ consumer channel. Channel successfully initialized: " + (channel != null));

     * Callback called when a message is delivered to the client.
     * Default implementation. Callback acknowledges message received and does nothing with it.
     * To use custom implementation use setDeliverCallback method.
     * @param consumerTag The consumer tag associated with the consumer.
     * @param message     Message object.
     * @see #setDeliverCallback(DeliverCallback)
    private void handleDeliver(String consumerTag, Delivery message) {
        Envelope envelope = message.getEnvelope();
        long deliveryTag = envelope.getDeliveryTag();"Message delivered: " + deliveryTag);

        try {
            channel.basicAck(deliveryTag, false);

        } catch (IOException e) {


     * Callback called when a service is cancelled.
     * Default implementation. To use custom implementation specify it in the constructor.
     * @param consumerTag The consumer tag associated with the consumer.
    private void handleCancel(String consumerTag) {"Consumer (" + consumerTag + ") cancelled: ");

     * Called when the consumer is abruptly shutdown due to termination of the underlying connection or channel.
     * Default implementation. To use custom implementation specify it in the constructor.
     * @param consumerTag The consumer tag associated with the consumer.
     * @param exception   Shutdown reason.
    private void handleShutdown(String consumerTag, ShutdownSignalException exception) {"Consumer (%s) shutdown. Reason: %s", consumerTag, exception.getMessage()));;

  • Ի՞նչ նկատի ունեք բոլոր հերթերի բեռնում ասելով: Ինչպե՞ս են հաղորդագրությունները ուղղվում դեպի հերթեր: 23.07.2019
  • @LutzHorn դա կարևոր է: Կա ուղիղ փոխանակում, որը ուղարկում է հաղորդագրություններ դեպի հերթ իր անունով: Բեռ ասելով նկատի ունեմ, որ յուրաքանչյուր հերթ ուներ մոտ 1000 հաղորդագրություն, և դրանք սպառվում էին 1000 մի հերթից, հետո 1000 մյուսից և այլն։ 23.07.2019
  • Դժվար է ասել, թե ինչ է տեղի ունենում առանց ձեր կոդը տեսնելու: Խնդրում ենք խմբագրել ձեր հարցը ներառել այն կոդը, որը ստեղծում է սպառողներին: 23.07.2019



ԹԱՐՄԱՑՈՒՄ. լուծված է, ըստ երևույթին իմ նախնական առբերման քանակը չէր սահմանվում, ուստի այն անսահմանափակ էր: Այդ իսկ պատճառով երթեւեկությունը փակվել է մեկ ալիքով, քանի դեռ հերթը չի սպառվել։

Նոր նյութեր

Օգտագործելով Fetch Vs Axios.Js-ը՝ HTTP հարցումներ կատարելու համար
JavaScript-ը կարող է ցանցային հարցումներ ուղարկել սերվեր և բեռնել նոր տեղեկատվություն, երբ դա անհրաժեշտ լինի: Օրինակ, մենք կարող ենք օգտագործել ցանցային հարցումը պատվեր ներկայացնելու,..

Տիրապետել հանգստության արվեստին. մշակողի ուղեցույց՝ ճնշման տակ ծաղկելու համար
Տիրապետել հանգստության արվեստին. մշակողի ուղեցույց՝ ճնշման տակ ծաղկելու համար Ինչպե՞ս հանգստացնել ձեր միտքը և աշխատեցնել ձեր պրոցեսորը: Ինչպես մնալ հանգիստ և զարգանալ ճնշման տակ...

Մեքենայի ուսուցում բանկային և ֆինանսների ոլորտում
Բարդ, խելացի անվտանգության համակարգերը և հաճախորդների սպասարկման պարզեցված ծառայությունները բիզնեսի հաջողության բանալին են: Ֆինանսական հաստատությունները, մասնավորապես, պետք է առաջ մնան կորի..

Ես AI-ին հարցրի կյանքի իմաստը, այն ինչ ասում էր, ցնցող էր:
Այն պահից ի վեր, երբ ես իմացա Արհեստական ​​ինտելեկտի մասին, ես հիացած էի այն բանով, թե ինչպես է այն կարողանում հասկանալ մարդկային նորմալ տեքստը, և այն կարող է առաջացնել իր սեփական արձագանքը դրա..

Ինչպես սովորել կոդավորումը Python-ում վագրի պես:
Սովորելու համար ծրագրավորման նոր լեզու ընտրելը բարդ է: Անկախ նրանից, թե դուք սկսնակ եք, թե առաջադեմ, դա օգնում է իմանալ, թե ինչ թեմաներ պետք է սովորել: Ծրագրավորման լեզվի հիմունքները, դրա..

C++-ի օրական բիթ(ե) | Ամենաերկար պալինդրոմային ենթաշարը
C++ #198-ի ամենօրյա բիթ(ե), Ընդհանուր հարցազրույցի խնդիր. Ամենաերկար պալինդրոմային ենթատող: Այսօր մենք կանդրադառնանք հարցազրույցի ընդհանուր խնդրին. Ամենաերկար palindromic substring...

Kydavra ICAReducer՝ ձեր տվյալների ծավալայինությունը նվազեցնելու համար
Ի՞նչ է ICAReducer-ը: ICAReducer-ն աշխատում է հետևյալ կերպ. այն նվազեցնում է նրանց միջև բարձր փոխկապակցված հատկանիշները մինչև մեկ սյունակ: Բավականին նման է PCAreducer-ին, չնայած այն..