package com.toasttab.network.domain.rabbitmq;

import android.content.Context;
import com.google.common.annotations.VisibleForTesting;
import com.google.gson.JsonParser;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.impl.AMQCommand;
import com.toasttab.common.R;
import com.toasttab.dataload.api.DataLoadService;
import com.toasttab.network.api.rabbitmq.RabbitMQDisconnectedEvent;
import com.toasttab.network.api.rabbitmq.RabbitMQReconnectedEvent;
import com.toasttab.pos.api.threading.ThreadFactoryBuilder;
import com.toasttab.pos.metrics.ToastMetricRegistry;
import com.toasttab.service.client.TlsSocketFactory;
import com.toasttab.sync.Message;
import com.toasttab.sync.MessageConsumerListener;
import com.toasttab.sync.MessageParseException;
import com.toasttab.sync.MessageReceiver;
import com.toasttab.sync.MessagingException;
import com.toasttab.sync.rabbitmq.AbstractRabbitMQClient;
import com.toasttab.sync.rabbitmq.ImmutableKeyStoreConfig;
import com.toasttab.sync.rabbitmq.ImmutableRabbitMQClientConfig;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.net.SocketTimeoutException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.zip.GZIPInputStream;
import javax.net.ssl.SSLSocketFactory;
import org.apache.commons.io.IOUtils;
import org.greenrobot.eventbus.EventBus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: classes.dex */
public class RabbitMQMessageConsumerImpl extends AbstractRabbitMQClient implements RabbitMQMessageConsumer {
    private static final int shutdownTimeoutMs = 10000;
    private Set<String> bindings;
    protected QueueingConsumer consumer;
    private final DataLoadService dataLoadService;
    private final EventBus eventBus;
    private String exchangeName;
    private long expirationInMilliseconds;
    private final ToastMetricRegistry metricRegistry;
    private String queueName;
    protected MessageReceiver receiver;
    private MessageQueueThread runThread;
    private static final int CONNECTION_TIMEOUT_MS = (int) TimeUnit.SECONDS.toMillis(10);
    private static final int SOCKET_TIMEOUT_MS = (int) TimeUnit.SECONDS.toMillis(10);
    private static final int HEARTBEAT_S = (int) TimeUnit.SECONDS.toSeconds(30);
    private static final int RPC_TIMEOUT_MS = (int) TimeUnit.SECONDS.toMillis(60);
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) RabbitMQMessageConsumer.class);
    private static final ExecutorService consumerExecutorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().staticThreadName("rabbitmq-internal-consumer-work-service").build());

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: classes5.dex */
    public interface Checker {
        void check(Channel channel) throws IOException;
    }

    /* loaded from: classes5.dex */
    public static class MessageParser {
        @VisibleForTesting
        public Message parseMessage(byte[] bArr) throws IOException, MessageParseException {
            Throwable th;
            InputStreamReader inputStreamReader;
            try {
                inputStreamReader = new InputStreamReader(new GZIPInputStream(new ByteArrayInputStream(bArr)), "UTF-8");
            } catch (Throwable th2) {
                th = th2;
                inputStreamReader = null;
            }
            try {
                Message fromJson = Message.fromJson(new JsonParser().parse(inputStreamReader).getAsJsonObject());
                if (RabbitMQMessageConsumerImpl.logger.isDebugEnabled()) {
                    Logger logger = RabbitMQMessageConsumerImpl.logger;
                    Object[] objArr = new Object[4];
                    objArr[0] = fromJson.type;
                    objArr[1] = fromJson.restaurantUuid;
                    objArr[2] = fromJson.routingKey;
                    objArr[3] = fromJson.command == null ? fromJson.entityType : fromJson.command;
                    logger.debug("Received {} message for restaurant {} with routing key {}. Entity type / command = {}", objArr);
                }
                if (RabbitMQMessageConsumerImpl.logger.isTraceEnabled()) {
                    RabbitMQMessageConsumerImpl.logger.trace("Full message:\n{}", fromJson.serialize());
                }
                IOUtils.closeQuietly((Reader) inputStreamReader);
                return fromJson;
            } catch (Throwable th3) {
                th = th3;
                IOUtils.closeQuietly((Reader) inputStreamReader);
                throw th;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: classes5.dex */
    public class MessageQueueThread extends Thread {
        MessageParser messageParser;
        private boolean running;

        public MessageQueueThread() {
            super("toast-rabbitmq-message-consumer");
            this.running = false;
            this.messageParser = new MessageParser();
        }

        private void ack(QueueingConsumer.Delivery delivery) {
            try {
                if (RabbitMQMessageConsumerImpl.this.channel == null || !RabbitMQMessageConsumerImpl.this.channel.isOpen()) {
                    return;
                }
                RabbitMQMessageConsumerImpl.this.channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            } catch (IOException e) {
                RabbitMQMessageConsumerImpl.logger.warn("Failed basicAck: " + e.getMessage());
            } catch (Exception e2) {
                RabbitMQMessageConsumerImpl.logger.warn("Failed basicAck: " + e2.getMessage());
            }
        }

        private void nack(QueueingConsumer.Delivery delivery) {
            if (delivery != null) {
                try {
                    if (RabbitMQMessageConsumerImpl.this.channel == null || !RabbitMQMessageConsumerImpl.this.channel.isOpen()) {
                        return;
                    }
                    RabbitMQMessageConsumerImpl.this.channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
                } catch (IOException e) {
                    RabbitMQMessageConsumerImpl.logger.error("Failed basicAck: " + e.getMessage(), (Throwable) e);
                } catch (Exception e2) {
                    RabbitMQMessageConsumerImpl.logger.error("Failed basicAck: " + e2.getMessage(), (Throwable) e2);
                }
            }
        }

        public void kill() {
            this.running = false;
            interrupt();
        }

        /* JADX WARN: Code restructure failed: missing block: B:40:0x001b, code lost:
        
            com.toasttab.network.domain.rabbitmq.RabbitMQMessageConsumerImpl.logger.warn("Received Message but running boolean is false");
         */
        @Override // java.lang.Thread, java.lang.Runnable
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        public void run() {
            /*
                Method dump skipped, instructions count: 375
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: com.toasttab.network.domain.rabbitmq.RabbitMQMessageConsumerImpl.MessageQueueThread.run():void");
        }

        @Override // java.lang.Thread
        public synchronized void start() {
            this.running = true;
            super.start();
        }
    }

    public RabbitMQMessageConsumerImpl(String str, Context context, DataLoadService dataLoadService, EventBus eventBus, ToastMetricRegistry toastMetricRegistry) throws MessagingException {
        super(ImmutableRabbitMQClientConfig.builder().mqUrl(str).keyStore(ImmutableKeyStoreConfig.builder().store(context.getResources().openRawResource(R.raw.rabbitmq_truststore)).passphrase("optitab".toCharArray()).format("BKS").build()).connectionTimeoutMs(CONNECTION_TIMEOUT_MS).socketTimeoutMs(SOCKET_TIMEOUT_MS).socketKeepalive(true).rpcTimeoutMs(RPC_TIMEOUT_MS).heartbeatS(HEARTBEAT_S).useNativeRecovery(false).channelShouldCheckRpcResponseType(true).build());
        this.bindings = new HashSet();
        this.dataLoadService = dataLoadService;
        this.eventBus = eventBus;
        this.metricRegistry = toastMetricRegistry;
        initConnectionFactory();
    }

    private void checkConnectionStatus() throws MessagingException {
        if (this.connection == null || this.channel == null) {
            throw new MessagingException("Connection to Toast messaging service lost");
        }
    }

    private void createQueue() throws IOException {
        HashMap hashMap = new HashMap();
        hashMap.put("x-expires", Long.valueOf(this.expirationInMilliseconds));
        Channel channel = this.channel;
        if (channel != null) {
            logger.info("Creating new non-durable queue '{}' in exchange '{}'", this.queueName, this.exchangeName);
            channel.queueDeclare(this.queueName, false, false, false, hashMap);
        }
    }

    private boolean createQueueAndConsumer() {
        try {
            if (this.queueName == null) {
                return false;
            }
            synchronized (this.rabbitMQStateChangeMutex) {
                if (this.channel != null && this.connection != null) {
                    if (!queueExists(this.connection, this.queueName)) {
                        this.dataLoadService.addDataLoadRequest("createqueue", DataLoadService.SyncType.DELTA);
                        createQueue();
                    }
                    this.consumer = new QueueingConsumer(this.channel);
                    this.channel.basicConsume(this.queueName, false, this.consumer);
                    return true;
                }
                return false;
            }
        } catch (ShutdownSignalException e) {
            logger.warn("ShutdownSignalException creating Rabbit MQ queue: " + e.getMessage());
            return false;
        } catch (IOException e2) {
            logger.warn("IOException creating Rabbit MQ queue: " + e2.getMessage());
            return false;
        } catch (TimeoutException e3) {
            logger.warn("TimeoutException creating Rabbit MQ queue " + e3.getMessage());
            return false;
        }
    }

    private static boolean exists(Connection connection, Checker checker) throws IOException, TimeoutException {
        try {
            Channel createChannel = connection.createChannel();
            checker.check(createChannel);
            createChannel.close();
            return true;
        } catch (IOException e) {
            if (e.getCause() instanceof ShutdownSignalException) {
                ShutdownSignalException shutdownSignalException = (ShutdownSignalException) e.getCause();
                if (!shutdownSignalException.isHardError()) {
                    Object reason = shutdownSignalException.getReason();
                    AMQP.Channel.Close close = null;
                    if (reason instanceof AMQP.Channel.Close) {
                        close = (AMQP.Channel.Close) shutdownSignalException.getReason();
                    } else if (reason instanceof AMQCommand) {
                        AMQCommand aMQCommand = (AMQCommand) reason;
                        if (aMQCommand.getMethod() instanceof AMQP.Channel.Close) {
                            close = (AMQP.Channel.Close) aMQCommand.getMethod();
                        }
                    }
                    if (close != null && close.getReplyCode() == 404) {
                        return false;
                    }
                }
            }
            throw e;
        }
    }

    public static String getExchangeName(String str) {
        return "restaurant_" + str;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleConnectionFailure(String str) {
        pause(false);
        RabbitMQDisconnectedEvent.send(this.eventBus);
    }

    private void handleConnectionReestablished() {
        RabbitMQReconnectedEvent.send(this.eventBus);
    }

    private boolean queueExists(Connection connection, final String str) throws IOException, TimeoutException {
        return str != null && exists(connection, new Checker() { // from class: com.toasttab.network.domain.rabbitmq.RabbitMQMessageConsumerImpl.1
            @Override // com.toasttab.network.domain.rabbitmq.RabbitMQMessageConsumerImpl.Checker
            public void check(Channel channel) throws IOException {
                channel.queueDeclarePassive(str);
            }
        });
    }

    private boolean rebindQueue() {
        if (this.bindings.size() <= 0) {
            return true;
        }
        try {
            for (String str : this.bindings) {
                logger.debug("Rebinding to '{}' in exchange '{}'", str, this.exchangeName);
                Channel channel = this.channel;
                if (channel != null) {
                    channel.queueBind(this.queueName, this.exchangeName, str);
                }
            }
            return true;
        } catch (ShutdownSignalException e) {
            logger.error("Error rebinding to queue", (Throwable) e);
            return false;
        } catch (IOException e2) {
            logger.error("Error rebinding to queue", (Throwable) e2);
            return false;
        }
    }

    private void stopRunThread(boolean z) {
        MessageQueueThread messageQueueThread = this.runThread;
        if (messageQueueThread != null) {
            messageQueueThread.kill();
            if (z) {
                try {
                    this.runThread.join();
                } catch (InterruptedException e) {
                    logger.error(e.getMessage(), (Throwable) e);
                }
            }
            this.runThread = null;
        }
    }

    @Override // com.toasttab.sync.MessageConsumer
    @Deprecated
    public void addListener(MessageConsumerListener messageConsumerListener) {
    }

    @Override // com.toasttab.sync.MessageConsumer
    public void bind(String str) throws MessagingException {
        this.bindings.add(str);
        try {
            checkConnectionStatus();
            this.channel.queueBind(this.queueName, this.exchangeName, str);
        } catch (ShutdownSignalException e) {
            logger.error("ShutdownSignalException binding Message Consumer: " + e.getMessage(), (Throwable) e);
            throw new MessagingException(e);
        } catch (IOException e2) {
            logger.error("IOException binding Message Consumer: " + e2.getMessage(), (Throwable) e2);
            throw new MessagingException(e2);
        }
    }

    @Override // com.toasttab.sync.MessageConsumer
    public void clearQueue() throws MessagingException {
        try {
            checkConnectionStatus();
            logger.info("clearQueue - deleting and creating queue and consumer");
            deleteQueue();
            createQueue();
            rebindQueue();
            if (this.channel != null) {
                this.consumer = new QueueingConsumer(this.channel);
                this.channel.basicConsume(this.queueName, false, this.consumer);
            }
            this.receiver.clearLocalQueue();
        } catch (ShutdownSignalException e) {
            throw new MessagingException(e);
        } catch (IOException e2) {
            throw new MessagingException(e2);
        }
    }

    @Override // com.toasttab.sync.rabbitmq.AbstractRabbitMQClient
    protected Connection createConnection() throws IOException, TimeoutException {
        try {
            return this.connectionFactory.newConnection(consumerExecutorService);
        } catch (IOException e) {
            if (e instanceof SocketTimeoutException) {
                logger.warn("Socket timed out establishing connection to Rabbit MQ");
            }
            throw e;
        }
    }

    @Override // com.toasttab.sync.rabbitmq.AbstractRabbitMQClient
    protected SSLSocketFactory decorateSocketFactory(SSLSocketFactory sSLSocketFactory) {
        return new TlsSocketFactory(sSLSocketFactory);
    }

    @Override // com.toasttab.sync.MessageConsumer
    public void deleteQueue() throws MessagingException {
        try {
            pause(false);
            this.receiver.clearLocalQueue();
            Channel channel = this.channel;
            if (channel != null) {
                channel.queueDelete(this.queueName);
            }
        } catch (ShutdownSignalException e) {
            logger.debug("deleteQueue ShutdownSignalException");
            throw new MessagingException(e);
        } catch (IOException e2) {
            if (!(e2.getCause() instanceof ShutdownSignalException)) {
                throw new MessagingException(e2);
            }
            if (!((ShutdownSignalException) e2.getCause()).isInitiatedByApplication()) {
                throw new MessagingException(e2);
            }
        }
    }

    public synchronized String getIsStartedDebugInfo() {
        if (this.runThread == null) {
            return "null-run-thread";
        }
        StringBuilder sb = new StringBuilder();
        sb.append("alive:");
        sb.append(this.runThread.isAlive());
        sb.append(",running:");
        sb.append(this.runThread.running);
        sb.append(",connection:");
        boolean z = true;
        sb.append(this.connection != null);
        sb.append(",channel:");
        if (this.channel == null) {
            z = false;
        }
        sb.append(z);
        return sb.toString();
    }

    @Override // com.toasttab.sync.MessageConsumer
    public void init(String str, String str2, MessageReceiver messageReceiver, long j) throws MessagingException {
        this.exchangeName = str;
        this.queueName = str2;
        this.receiver = messageReceiver;
        this.expirationInMilliseconds = j;
        if (!connect(false)) {
            throw new MessagingException("Cannot connect to queue");
        }
        if (this.channel.isOpen() && createQueueAndConsumer()) {
            return;
        }
        throw new MessagingException("Cannot bind to queue \"" + str2 + "\" in exchange \"" + str + "\"");
    }

    @Override // com.toasttab.sync.MessageConsumer
    public synchronized boolean isStarted() {
        boolean z;
        if (this.runThread != null && this.runThread.isAlive() && this.runThread.running && this.connection != null) {
            z = this.channel != null;
        }
        return z;
    }

    @Override // com.toasttab.sync.rabbitmq.AbstractRabbitMQClient
    protected void onConnectionLost() {
    }

    @Override // com.toasttab.sync.rabbitmq.AbstractRabbitMQClient
    public void onReconnected() throws MessagingException {
        boolean z;
        boolean z2 = false;
        if (createQueueAndConsumer()) {
            z2 = true;
            z = rebindQueue();
        } else {
            z = false;
        }
        if (z) {
            handleConnectionReestablished();
        } else {
            if (!z2) {
                throw new MessagingException("error creating queue and consumer");
            }
            throw new MessagingException("rebinding to the queue was unsuccessful");
        }
    }

    @Override // com.toasttab.sync.MessageConsumer
    public synchronized void pause(boolean z) {
        stopRunThread(z);
    }

    @Override // com.toasttab.sync.rabbitmq.AbstractRabbitMQClient, com.toasttab.network.domain.rabbitmq.RabbitMQMessageConsumer
    public boolean reconnect() {
        return super.reconnect();
    }

    @Override // com.toasttab.sync.MessageConsumer
    @Deprecated
    public void removeListener(MessageConsumerListener messageConsumerListener) {
    }

    @Override // com.toasttab.sync.rabbitmq.AbstractRabbitMQClient
    protected void scheduleReconnect() {
        handleConnectionFailure("Error connecting to Rabbit MQ - shutting down Rabbit MQ");
    }

    @Override // com.toasttab.sync.rabbitmq.AbstractRabbitMQClient, com.toasttab.sync.MessageConsumer
    public void shutdown() throws MessagingException {
        try {
            if (this.connection == null || !this.connection.isOpen()) {
                return;
            }
            this.shuttingDown = true;
            this.connection.close(10000);
            logger.info("Rabbit MQ connection closed successfully");
        } catch (ShutdownSignalException e) {
            throw new MessagingException("shutdown signal when trying to shutting down rabbitmq client", e);
        } catch (IOException e2) {
            throw new MessagingException("Error shutting down rabbitmq client", e2);
        }
    }

    @Override // com.toasttab.sync.MessageConsumer
    public synchronized void start() {
        if (this.runThread == null || !this.runThread.isAlive()) {
            try {
                logger.debug("Starting to read messages from the queue");
                this.runThread = new MessageQueueThread();
                this.runThread.start();
            } catch (Throwable unused) {
                start();
            }
        }
    }

    @Override // com.toasttab.sync.MessageConsumer
    public void unbind(String str) throws MessagingException {
        this.bindings.remove(str);
        try {
            checkConnectionStatus();
            this.channel.queueUnbind(this.queueName, this.exchangeName, str);
            logger.debug("Unbinding from '{}' in exchange '{}'", str, this.exchangeName);
        } catch (ShutdownSignalException e) {
            throw new MessagingException(e);
        } catch (IOException e2) {
            throw new MessagingException(e2);
        }
    }
}
