/*
 * Decompiled with CFR 0.152.
 */
package com.hummer.im._internals.mq;

import android.content.SharedPreferences;
import com.hummer.im.Error;
import com.hummer.im.HMR;
import com.hummer.im._internals.HMRContext;
import com.hummer.im._internals.Objects;
import com.hummer.im._internals.PrefStorage;
import com.hummer.im._internals.log.Log;
import com.hummer.im._internals.log.trace.Trace;
import com.hummer.im._internals.mq.RPCFetchPrivateMaxSeqId;
import com.hummer.im._internals.mq.RPCFetchPrivateSeqId;
import com.hummer.im._internals.mq.RPCFetchSharedMaxSeqId;
import com.hummer.im._internals.mq.RPCFetchSharedSeqId;
import com.hummer.im._internals.mq.RPCPullPrivateMessages;
import com.hummer.im._internals.mq.RPCPullSharedMessages;
import com.hummer.im._internals.mq.RPCReportPrivateSeqId;
import com.hummer.im._internals.mq.RPCReportSharedSeqId;
import com.hummer.im._internals.proto.Im;
import com.hummer.im._internals.proto.Push;
import com.hummer.im._internals.services.mq.RPCPullingResponse;
import com.hummer.im._internals.services.mq.Statistics;
import com.hummer.im.model.completion.CompletionUtils;
import com.hummer.im.model.completion.OnFailure;
import com.hummer.im.model.completion.OnSuccessArg;
import com.hummer.im.model.completion.RichCompletionArg;
import com.hummer.im.service.Channel;
import com.hummer.im.service.MQService;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Locale;

public final class Source
implements MQService.Source {
    /** Strategy object providing this source's name, seq-id persistence, pulling RPCs and push subscription. */
    private final Mode mode;
    /** Current sequence id; {@code null} until loaded in performStarting and again after performStopping. */
    private Long seqId;
    /** {@code System.currentTimeMillis()} of the last successful pull or accepted push; {@code null} means none since start/reconnect. */
    private Long lastPullAt;
    /** Set while a drain started by drainMessagesIfNeeded has not yet run its completion callback. */
    private boolean isDraining;
    /** Set when a push notification indicates messages must be pulled; cleared when a drain begins. */
    private boolean sourceChanged;
    /** 60 seconds in milliseconds. NOTE(review): the same value appears as a literal in onTimerPulse (likely inlined by the decompiler). */
    private static final long TimeoutDuration = 60000L;
    /** 15 minutes in milliseconds. NOTE(review): the Mode constructors use the literal 900000L rather than this constant. */
    private static final long DEFAULT_PULLING_PERIOD = 900000L;

    /**
     * Creates a source driven by the given mode.
     *
     * @param mode the mode this source delegates naming, persistence and RPC creation to;
     *             it is stored as-is (no null check is performed here)
     */
    public Source(Mode mode) {
        this.mode = mode;
    }

    /**
     * @return the mode passed to the constructor
     */
    public Mode getMode() {
        return this.mode;
    }

    /**
     * Timer tick. On the work queue: if no pull has been recorded for roughly 60 s, a
     * still-set draining flag is reported as {@code NotResetIsDraining} and cleared;
     * afterwards a drain is attempted if one is needed.
     *
     * @param messagesDispatcher receiver for any messages pulled as a result of this tick
     */
    @Override
    public void onTimerPulse(final MQService.MessagesDispatcher messagesDispatcher) {
        final String taskName = "Source::onTimerPulse:" + this.mode.sourceName();
        final Runnable pulse = new Runnable(){

            @Override
            public void run() {
                final boolean stale = Source.this.isOverdue(60000L);
                if (stale && Source.this.isDraining) {
                    // A drain has been "in flight" past the timeout: report the stuck flag.
                    Statistics.report(Statistics.Codes.NotResetIsDraining, new Statistics.Fields());
                }
                if (stale) {
                    Source.this.isDraining = false;
                }
                Source.this.drainMessagesIfNeeded(messagesDispatcher);
            }
        };
        HMRContext.work.async(taskName, pulse);
    }

    /**
     * Explicit pull request: schedules a drain attempt on the work queue.
     *
     * @param messagesDispatcher receiver for any messages pulled
     */
    @Override
    public void onManualPullingRequest(final MQService.MessagesDispatcher messagesDispatcher) {
        final Runnable pullTask = new Runnable(){

            @Override
            public void run() {
                Source.this.drainMessagesIfNeeded(messagesDispatcher);
            }
        };
        HMRContext.work.async("Source::onManualPullingRequest", pullTask);
    }

    /**
     * Network came back: on the work queue, forgets the last pull time (which makes the
     * source count as overdue) and attempts a drain.
     *
     * @param messagesDispatcher receiver for any messages pulled
     */
    @Override
    public void onNetworkReconnected(final MQService.MessagesDispatcher messagesDispatcher) {
        final Runnable reconnectTask = new Runnable(){

            @Override
            public void run() {
                // A null lastPullAt makes isOverdue() return true.
                Source.this.lastPullAt = null;
                Source.this.drainMessagesIfNeeded(messagesDispatcher);
            }
        };
        HMRContext.work.async("Source::onNetworkReconnected", reconnectTask);
    }

    /**
     * Schedules {@code performStarting} on the work queue.
     *
     * @param messagesDispatcher receiver for messages delivered by this source
     */
    @Override
    public void start(final MQService.MessagesDispatcher messagesDispatcher) {
        final Runnable startTask = new Runnable(){

            @Override
            public void run() {
                Source.this.performStarting(messagesDispatcher);
            }
        };
        HMRContext.work.async("Source::start", startTask);
    }

    /**
     * Schedules {@code performStopping} on the work queue.
     */
    @Override
    public void stop() {
        final Runnable stopTask = new Runnable(){

            @Override
            public void run() {
                Source.this.performStopping();
            }
        };
        HMRContext.work.async("Source::stop", stopTask);
    }

    /**
     * @return the mode's source name, which is also used as the log tag in this class
     */
    @Override
    public String toString() {
        final String name = this.mode.sourceName();
        return name;
    }

    /**
     * Starts this source: resets the drain state, subscribes to the mode's push
     * notifications and asynchronously loads the starting sequence id, attempting a
     * drain once it is known.
     *
     * <p>Sequence-id outcomes:
     * <ul>
     *   <li>loaded value present: adopted; persisted only if no seq id was set yet;</li>
     *   <li>loaded value {@code null}: logged as an error and treated like a failure
     *       (previously this unboxed {@code null} and threw on the work queue);</li>
     *   <li>failure: seq id is set to 0 without persisting.</li>
     * </ul>
     *
     * @param messagesDispatcher receiver for pushed and pulled messages
     */
    private void performStarting(final MQService.MessagesDispatcher messagesDispatcher) {
        Log.i(this.mode.sourceName(), Trace.once().method("performStarting"));
        this.isDraining = false;
        this.lastPullAt = null;
        this.mode.start(new Mode.NotifyHandler(){

            @Override
            public void onNotify(final long prevSeqId, final Im.Msg msg, final long seq) {
                // Handle the notification on the work queue, like every other entry point of this class.
                HMRContext.work.async("", new Runnable(){

                    @Override
                    public void run() {
                        Source.this.handlePushNotify(messagesDispatcher, prevSeqId, msg, seq);
                    }
                });
            }
        });
        this.mode.loadSeqId(this.getStrategy(this.mode), new RichCompletionArg<Long>("Source::loadSeqId:" + this.mode.sourceName()).onSuccess(new OnSuccessArg<Long>(){

            @Override
            public void onSuccess(final Long loaded) {
                if (loaded == null) {
                    Log.e("Source", Trace.once().method("performStarting").msg("loadedSeqId is <null>"));
                }
                HMRContext.work.async(Source.this.mode.sourceName() + "::loadSeqIdSuccess", new Runnable(){

                    @Override
                    public void run() {
                        if (loaded == null) {
                            // No usable value: same fallback as the failure path, never unbox null.
                            Source.this.setSeqId(0L, false);
                        } else {
                            Source.this.setSeqId(loaded, Source.this.seqId == null);
                        }
                        Source.this.drainMessagesIfNeeded(messagesDispatcher);
                    }
                });
            }
        }).onFailure(new OnFailure(){

            @Override
            public void onFailure(Error error) {
                HMRContext.work.async(Source.this.mode.sourceName() + "::loadSeqIdFailure", new Runnable(){

                    @Override
                    public void run() {
                        Source.this.setSeqId(0L, false);
                        Source.this.drainMessagesIfNeeded(messagesDispatcher);
                    }
                });
            }
        }));
    }

    /**
     * Processes one push notification (called on the work queue).
     *
     * <p>A notification without a message only marks the source as changed and triggers a
     * drain. A notification carrying a message is accepted directly when its previous seq id
     * equals the local seq id; otherwise the gap is filled by a drain. Notifications are
     * ignored while the seq id is unknown, a drain is in flight, or {@code prevSeqId} is 0.
     */
    private void handlePushNotify(MQService.MessagesDispatcher messagesDispatcher, long prevSeqId, Im.Msg msg, long seq) {
        if (msg == null) {
            this.sourceChanged = true;
            this.drainMessagesIfNeeded(messagesDispatcher);
            return;
        }
        Log.i(this.mode.sourceName(), Trace.once().method("onNotify").info("prevSeqId", prevSeqId).info("seq", seq).info("draining", this.isDraining).info("localSeqId", this.seqId));
        if (this.seqId == null || this.isDraining || prevSeqId == 0L) {
            return;
        }
        if (prevSeqId == this.seqId) {
            // The pushed message directly follows what we already have: deliver it without pulling.
            Log.i(this.mode.sourceName(), Trace.once().method("onNotify: Accept msg"));
            ArrayList<Im.Msg> single = new ArrayList<Im.Msg>();
            single.add(msg);
            messagesDispatcher.dispatch(single, this);
            this.lastPullAt = System.currentTimeMillis();
            this.setSeqId(seq, true);
        } else {
            Log.i(this.mode.sourceName(), Trace.once().method("onNotify: Pull missing msgs"));
            this.sourceChanged = true;
            this.drainMessagesIfNeeded(messagesDispatcher);
        }
    }

    /**
     * Resolves the fetch strategy to use: the mode's own strategy if set, otherwise the
     * strategy configured on {@code MQService}, otherwise {@code Continuously}.
     *
     * @param mode the mode whose strategy takes precedence
     * @return a non-null fetch strategy
     */
    private MQService.FetchStrategy getStrategy(Mode mode) {
        final MQService.FetchStrategy modeStrategy = mode.getStrategy();
        if (modeStrategy != null) {
            return modeStrategy;
        }
        final MQService.FetchStrategy serviceStrategy = HMR.getService(MQService.class).getFetchStrategy();
        return serviceStrategy != null ? serviceStrategy : MQService.FetchStrategy.Continuously;
    }

    /**
     * Stops the mode and clears all per-run state (seq id, last pull time, draining flag).
     */
    private void performStopping() {
        final String tag = this.mode.sourceName();
        Log.i(tag, Trace.once().method("performStopping"));
        this.mode.stop();
        this.seqId = null;
        this.lastPullAt = null;
        this.isDraining = false;
    }

    /**
     * Tells whether the last pull is older than the given period, with a tolerance of
     * one thirtieth of the period (so a 60000 ms period triggers at 58000 ms).
     *
     * @param l the period in milliseconds
     * @return {@code true} if nothing was pulled yet or the elapsed time reaches
     *         {@code l - l / 30}
     */
    private boolean isOverdue(long l) {
        if (this.lastPullAt == null) {
            return true;
        }
        final long threshold = l - l / 30L;
        final long elapsed = System.currentTimeMillis() - this.lastPullAt;
        return elapsed >= threshold;
    }

    /**
     * Starts a drain unless the seq id is unknown, a drain is already in flight, or the
     * source is neither marked changed nor overdue for its pulling period.
     *
     * @param messagesDispatcher receiver for pulled messages
     */
    private void drainMessagesIfNeeded(MQService.MessagesDispatcher messagesDispatcher) {
        if (this.seqId == null || this.isDraining) {
            return;
        }
        if (!this.sourceChanged && !this.isOverdue(this.mode.getPullingPeriod())) {
            return;
        }
        Log.i(this.mode.sourceName(), Trace.once().method("drainMessagesIfNeeded").info("seqId", this.seqId).info("changed", this.sourceChanged).info("draining", this.isDraining).info("overdue", this.isOverdue(this.mode.getPullingPeriod())));
        this.sourceChanged = false;
        this.isDraining = true;
        // Runs once the drain chain ends, whether it succeeded or failed.
        final Runnable onDrained = new Runnable(){

            @Override
            public void run() {
                Source.this.isDraining = false;
            }
        };
        final boolean firstDrain = this.lastPullAt == null;
        this.drainMessages(messagesDispatcher, this.seqId, firstDrain, onDrained);
    }

    /**
     * Pulls one page of messages starting at {@code l} and keeps pulling while the
     * response reports more, then schedules {@code runnable} on the work queue.
     *
     * <p>Every terminal outcome (no more messages, inconsistent response, RPC failure)
     * schedules {@code runnable} exactly once; only the "has more with a max seq id"
     * branch recurses instead.
     *
     * @param messagesDispatcher receiver for each non-empty page of messages
     * @param l                  sequence id to pull from
     * @param bl                 first-drain flag forwarded unchanged to the pulling request
     *                           (the caller passes {@code lastPullAt == null})
     * @param runnable           completion callback, run when the drain chain ends
     */
    private void drainMessages(final MQService.MessagesDispatcher messagesDispatcher, final long l, final boolean bl, final Runnable runnable) {
        // NOTE(review): this logs the seqId field, not the parameter l. On a recursive call the
        // field has not been updated yet (setSeqId runs after the recursive call below), so the
        // logged value can lag behind the actual pull position.
        Log.i(this.mode.sourceName(), Trace.once().method("drainMessages").info("fromSeqId", this.seqId).info("isFirstDrain", bl));
        HMR.getService(Channel.class).run(this.mode.createPullingRequest(l, bl, new RichCompletionArg<RPCPullingResponse>("Source::createPullingRequest:" + this.mode.sourceName()).onSuccess(new OnSuccessArg<RPCPullingResponse>(){

            @Override
            public void onSuccess(RPCPullingResponse rPCPullingResponse) {
                if (rPCPullingResponse.messages.size() > 0) {
                    try {
                        messagesDispatcher.dispatch(rPCPullingResponse.messages, Source.this);
                    }
                    catch (Throwable throwable) {
                        // A failing dispatcher must not break the drain: log, report the stack trace, continue.
                        Log.e(Source.this.mode.sourceName(), Trace.once().method("handleDrainingSuccess").msg("Exception while dispatching messages").info("exception", throwable.getLocalizedMessage()));
                        Statistics.report(Statistics.Codes.ExceptionalDispatch, new Statistics.Fields(){
                            {
                                StringWriter stringWriter = new StringWriter();
                                throwable.printStackTrace(new PrintWriter(stringWriter));
                                this.errInfo = stringWriter.toString();
                            }
                        });
                    }
                }
                Source.this.lastPullAt = System.currentTimeMillis();
                Log.i(Source.this.mode.sourceName(), Trace.once().method("handleDrainingSuccess").msg("lastPullAt -> " + Source.this.lastPullAt));
                if (rPCPullingResponse.maxSeqId != null) {
                    if (rPCPullingResponse.hasMore) {
                        // More pages available: continue from the highest seq id of this page.
                        Source.this.drainMessages(messagesDispatcher, rPCPullingResponse.maxSeqId, bl, runnable);
                    } else {
                        HMRContext.work.async("Source::RPCPullingResponse:EOF", runnable);
                    }
                    // Adopt and persist the new position (the dispatch above has already been attempted).
                    Source.this.setSeqId(rPCPullingResponse.maxSeqId, true);
                } else if (!rPCPullingResponse.hasMore) {
                    HMRContext.work.async("Source::RPCPullingResponse:noMore", runnable);
                } else {
                    // Inconsistent response. The log/report text below says (translated): "unknown
                    // scenario: hasMore is true but maxSeqId is null, cannot determine where the next
                    // pull should start". The drain is ended rather than retried.
                    Log.e(Source.this.mode.sourceName(), Trace.once().method("handleDrainingSuccess").msg("BUGGY!! \u672a\u77e5\u573a\u666f\uff0chasMore\u4e3aTrue, \u4f46maxSeqId\u4e3anull\uff0c\u65e0\u6cd5\u786e\u5b9a\u4e0b\u6b21\u62c9\u53d6\u4ece\u54ea\u5f00\u59cb"));
                    HMRContext.work.async("Source::RPCPullingResponse:BUGGY", runnable);
                    Statistics.report(Statistics.Codes.ImpossibleScene, new Statistics.Fields(){
                        {
                            this.errInfo = String.format(Locale.US, "hasMore\u4e3aTrue, \u4f46maxSeqId\u4e3anull\uff0c\u65e0\u6cd5\u786e\u5b9a\u4e0b\u6b21\u62c9\u53d6\u4ece\u54ea\u5f00\u59cb\u3002fromSeqId: %d", l);
                        }
                    });
                }
            }
        }).onFailure(new OnFailure(){

            @Override
            public void onFailure(Error error) {
                // The error itself is not inspected here; the drain simply ends.
                HMRContext.work.async("Source::RPCPullingResponse:failure", runnable);
            }
        })));
    }

    /**
     * Updates the local sequence id, optionally persisting it through the mode.
     * Does nothing when the value is unchanged.
     *
     * @param l  the new sequence id
     * @param bl whether to also call {@code mode.storeSeqId(l)}
     */
    private void setSeqId(long l, boolean bl) {
        final boolean unchanged = this.seqId != null && this.seqId == l;
        if (unchanged) {
            return;
        }
        Log.i(this.mode.sourceName(), Trace.once().method("setSeqId").msg("%d -> %d", this.seqId, l));
        if (bl) {
            this.mode.storeSeqId(l);
        }
        this.seqId = l;
    }

    /**
     * Two sources are equal when they are of the same class and their modes are equal.
     */
    @Override
    public boolean equals(Object object) {
        if (object == this) {
            return true;
        }
        if (object == null) {
            return false;
        }
        if (object.getClass() != this.getClass()) {
            return false;
        }
        return this.mode.equals(((Source)object).mode);
    }

    /**
     * @return the mode's hash code, consistent with {@code equals}
     */
    @Override
    public int hashCode() {
        final int modeHash = this.mode.hashCode();
        return modeHash;
    }

    public static class Shared
    implements Mode {
        /** Suffix of the preference key under which the seq id is stored (see prefKey()). */
        private static final String PrefKeySeqID = "_group_sys_seqid";
        /** Group whose shared queue this mode reads. */
        private final long groupId;
        /** Topic name; never null (a null constructor argument becomes ""). */
        private final String topic;
        /** Notification handler registered in start() and removed in stop(). */
        private Channel.NotificationHandler sharedHandler;
        /** Per-mode fetch strategy; may be null, in which case Source falls back to other defaults. */
        private MQService.FetchStrategy strategy;
        /** Pulling period in milliseconds; always at least 1. */
        private long pullingPeriod;

        /**
         * Creates a shared mode with no explicit fetch strategy and the default pulling period.
         *
         * @param l      the group id
         * @param string the topic; null is treated as ""
         */
        public Shared(long l, String string) {
            this(l, string, null);
        }

        /**
         * Creates a shared mode with the default pulling period of 900000 ms.
         *
         * @param l             the group id
         * @param string        the topic; null is treated as ""
         * @param fetchStrategy the fetch strategy, or null for none
         */
        public Shared(long l, String string, MQService.FetchStrategy fetchStrategy) {
            this(l, string, fetchStrategy, 900000L);
        }

        /**
         * Creates a shared mode.
         *
         * @param l             the group id
         * @param string        the topic; null is treated as ""
         * @param fetchStrategy the fetch strategy, or null for none
         * @param l2            the pulling period in milliseconds; values below 1 are
         *                      replaced by 900000
         */
        public Shared(long l, String string, MQService.FetchStrategy fetchStrategy, long l2) {
            this.groupId = l;
            if (string == null) {
                this.topic = "";
            } else {
                this.topic = string;
            }
            this.strategy = fetchStrategy;
            if (l2 < 1L) {
                this.pullingPeriod = 900000L;
            } else {
                this.pullingPeriod = l2;
            }
        }

        /**
         * @return the fetch strategy given at construction; may be null
         */
        @Override
        public MQService.FetchStrategy getStrategy() {
            return this.strategy;
        }

        /**
         * @return the pulling period in milliseconds
         */
        @Override
        public long getPullingPeriod() {
            return this.pullingPeriod;
        }

        /**
         * @return a display name of the form {@code SharedSource(<groupId>,<topic>)}
         */
        @Override
        public String sourceName() {
            return "SharedSource(" + this.groupId + "," + this.topic + ")";
        }

        /**
         * @return the topic (never null, possibly empty)
         */
        @Override
        public String topicName() {
            return this.topic;
        }

        /**
         * Determines the starting sequence id according to the fetch strategy.
         *
         * <ul>
         *   <li>{@code Continuously}: the locally stored value, if any;</li>
         *   <li>{@code ReloadHistories}: 0;</li>
         *   <li>no local value and {@code IgnoreBefore}: fetched via fetchMaxSeqId;</li>
         *   <li>no local value otherwise: fetched via fetchSeqId.</li>
         * </ul>
         *
         * @param fetchStrategy     the strategy to apply
         * @param richCompletionArg receives the seq id or the fetch failure
         */
        @Override
        public void loadSeqId(MQService.FetchStrategy fetchStrategy, RichCompletionArg<Long> richCompletionArg) {
            Long localSeqId = null;
            if (fetchStrategy == MQService.FetchStrategy.Continuously) {
                localSeqId = PrefStorage.storage().execute(new PrefStorage.Query<Long>(){

                    @Override
                    public Long run(SharedPreferences sharedPreferences) {
                        if (sharedPreferences == null) {
                            return null;
                        }
                        // -1 is the "nothing stored" sentinel.
                        final long stored = sharedPreferences.getLong(Shared.this.prefKey(), -1L);
                        return stored == -1L ? null : Long.valueOf(stored);
                    }
                });
            } else if (fetchStrategy == MQService.FetchStrategy.ReloadHistories) {
                localSeqId = 0L;
            }
            Log.i(this.sourceName(), Trace.once().method("loadSeqId").info("seqId", localSeqId));
            if (localSeqId != null) {
                CompletionUtils.dispatchSuccess(richCompletionArg, localSeqId);
                return;
            }
            if (fetchStrategy == MQService.FetchStrategy.IgnoreBefore) {
                this.fetchMaxSeqId(3, this.topic, richCompletionArg);
            } else {
                this.fetchSeqId(3, this.topic, richCompletionArg);
            }
        }

        /**
         * Fetches the seq id from the server via {@code RPCFetchSharedSeqId}, retrying
         * after 3000 ms when the error is null or has code 1005.
         *
         * @param n                 remaining attempts; when not positive the completion fails with code 1005
         * @param string            the topic
         * @param richCompletionArg receives the fetched seq id or the final error
         */
        private void fetchSeqId(final int n, final String string, final RichCompletionArg<Long> richCompletionArg) {
            Log.i(this.sourceName(), Trace.once().method("Shared fetchSeqId"));
            if (n <= 0) {
                CompletionUtils.dispatchFailure(richCompletionArg, new Error(1005, "Shared Failed fetching seqId: " + string));
                return;
            }
            final OnSuccessArg<Long> forwardSuccess = new OnSuccessArg<Long>(){

                @Override
                public void onSuccess(Long l) {
                    CompletionUtils.dispatchSuccess(richCompletionArg, l);
                }
            };
            final OnFailure retryOrFail = new OnFailure(){

                @Override
                public void onFailure(Error error) {
                    final boolean retryable = error == null || error.code == 1005;
                    if (!retryable) {
                        CompletionUtils.dispatchFailure(richCompletionArg, error);
                        return;
                    }
                    HMRContext.work.asyncAfter("Source::repeatFetchSeqId", 3000, new Runnable(){

                        @Override
                        public void run() {
                            Shared.this.fetchSeqId(n - 1, string, richCompletionArg);
                        }
                    });
                }
            };
            HMR.getService(Channel.class).run(new RPCFetchSharedSeqId(string, this.groupId, new RichCompletionArg<Long>("Source::RPCFetchSharedSeqId" + string).onSuccess(forwardSuccess).onFailure(retryOrFail)));
        }

        /**
         * Fetches the maximum seq id from the server via {@code RPCFetchSharedMaxSeqId},
         * retrying after 3000 ms when the error is null or has code 1005.
         *
         * <p>The retry re-invokes this method. It previously called {@code fetchSeqId},
         * which made a retried request return a different kind of seq id than the one
         * originally asked for.
         *
         * @param n                 remaining attempts; when not positive the completion fails with code 1005
         * @param string            the topic
         * @param richCompletionArg receives the fetched max seq id or the final error
         */
        private void fetchMaxSeqId(final int n, final String string, final RichCompletionArg<Long> richCompletionArg) {
            Log.i(this.sourceName(), Trace.once().method("Shared fetchMaxSeqId"));
            if (n <= 0) {
                CompletionUtils.dispatchFailure(richCompletionArg, new Error(1005, "Shared Failed fetching maxSeqId: " + string));
                return;
            }
            HMR.getService(Channel.class).run(new RPCFetchSharedMaxSeqId(string, this.groupId, new RichCompletionArg<Long>("Source::RPCFetchSharedMaxSeqId" + string).onSuccess(new OnSuccessArg<Long>(){

                @Override
                public void onSuccess(Long l) {
                    CompletionUtils.dispatchSuccess(richCompletionArg, l);
                }
            }).onFailure(new OnFailure(){

                @Override
                public void onFailure(Error error) {
                    if (error != null && error.code != 1005) {
                        // Not a retryable error: report it as-is.
                        CompletionUtils.dispatchFailure(richCompletionArg, error);
                    } else {
                        HMRContext.work.asyncAfter("Source::repeatFetchMaxSeqId", 3000, new Runnable(){

                            @Override
                            public void run() {
                                // Retry the same request kind with one attempt fewer.
                                Shared.this.fetchMaxSeqId(n - 1, string, richCompletionArg);
                            }
                        });
                    }
                }
            })));
        }

        /**
         * Persists the seq id under prefKey() and reports it to the server via
         * {@code RPCReportSharedSeqId} (no completion is attached to the report).
         *
         * @param l the seq id to store
         */
        @Override
        public void storeSeqId(final long l) {
            final PrefStorage.Edit writeSeqId = new PrefStorage.Edit(){

                @Override
                public void run(SharedPreferences.Editor editor) {
                    editor.putLong(Shared.this.prefKey(), l);
                }
            };
            PrefStorage.storage().execute(writeSeqId);
            HMR.getService(Channel.class).run(new RPCReportSharedSeqId(this.topic, this.groupId, l, null));
        }

        /**
         * Builds the RPC that pulls shared messages for this group and topic.
         *
         * @param l                 seq id to pull from
         * @param bl                first-drain flag supplied by Source
         * @param richCompletionArg receives the pulling response
         * @return an {@code RPCPullSharedMessages} built with the fixed argument 200
         *         (presumably the page size; confirm against RPCPullSharedMessages)
         */
        @Override
        public Channel.RPC createPullingRequest(long l, boolean bl, RichCompletionArg<RPCPullingResponse> richCompletionArg) {
            return new RPCPullSharedMessages(this.groupId, this.topic, l, 200, bl, richCompletionArg);
        }

        /**
         * Registers a notification handler for group system-message pushes and subscribes
         * to this mode's groupcast.
         *
         * <p>Only notifications whose {@code service/function} equals
         * {@code service_api_gateway/cim.proto.PushService.IMPushGroupSysMsg} are handled.
         * They are parsed on the work queue, dropped when they carry an environment name
         * that differs from the local region, and forwarded to {@code notifyHandler} when
         * topic and group id match this mode.
         *
         * @param notifyHandler receives (prevSeqId, msg, seqId) for each matching push
         */
        @Override
        public void start(final Mode.NotifyHandler notifyHandler) {
            Log.i(this.sourceName(), Trace.once().method("start"));
            this.sharedHandler = new Channel.NotificationHandler(){

                @Override
                public void onNotify(String string, String string2, final byte[] byArray) {
                    String expected = "service_api_gateway/cim.proto.PushService.IMPushGroupSysMsg";
                    String actual = string + '/' + string2;
                    if (!expected.equals(actual)) {
                        return;
                    }
                    HMRContext.work.async("SharedSource::onNotify", new Runnable(){

                        @Override
                        public void run() {
                            // Unqualified call: handleNotify belongs to the enclosing handler,
                            // not to this Runnable, so "this.handleNotify" would not compile.
                            handleNotify(byArray);
                        }
                    });
                }

                @Override
                public String toString() {
                    return Shared.this.sourceName();
                }

                private void handleNotify(byte[] byArray) {
                    Push.IMPushGroupSysMsgRequest request;
                    try {
                        request = (Push.IMPushGroupSysMsgRequest)((Push.IMPushGroupSysMsgRequest.Builder)Push.IMPushGroupSysMsgRequest.newBuilder().mergeFrom(byArray)).build();
                    }
                    catch (Throwable throwable) {
                        Log.e(Shared.this.sourceName(), Trace.once("Failed parsing IMPushGroupSysMsgRequest").info("Exception", throwable));
                        return;
                    }
                    if (request.getEnvName() != null && !request.getEnvName().isEmpty()) {
                        // The push names an environment: accept it only when it matches ours.
                        if (HMRContext.region == null) {
                            Log.w(Shared.this.sourceName(), Trace.once().method("onNotify").msg("localEnv is null"));
                            return;
                        }
                        HMRContext.Region notifyRegion = HMRContext.Region.make(request.getRegion() + "/" + request.getEnvType() + "/" + request.getEnvName());
                        String localEnv = HMRContext.region.toString();
                        String notifyEnv = notifyRegion.toString();
                        if (!Objects.equals(notifyEnv, localEnv)) {
                            Log.i(Shared.this.sourceName(), Trace.once().method("onNotify").msg("Ignored notify of different env").info("localEnv", localEnv).info("notifyEnv", notifyEnv));
                            return;
                        }
                    }
                    if (Objects.equals(Shared.this.topic, request.getTopic()) && Objects.equals(Shared.this.groupId, request.getGroupId())) {
                        Log.i(Shared.this.sourceName(), Trace.once().method("onSharedSourceChanged").info("topic", request.getTopic()).info("groupId", request.getGroupId()).info("seqId", request.getSeqId()));
                        notifyHandler.onNotify(request.getPrevSeqId(), request.getMsg(), request.getSeqId());
                    }
                }
            };
            HMR.getService(Channel.class).addNotificationHandler(this.sharedHandler);
            HMR.getService(Channel.class).subscribeGroupcast(this.getGroup(), null);
        }

        /**
         * On the work queue, removes the notification handler registered by start() and
         * unsubscribes from this mode's groupcast.
         */
        @Override
        public void stop() {
            Log.i(this.sourceName(), Trace.once().method("stop"));
            final Runnable teardown = new Runnable(){

                @Override
                public void run() {
                    HMR.getService(Channel.class).removeNotificationHandler(Shared.this.sharedHandler);
                    HMR.getService(Channel.class).unSubscribeGroupcast(Shared.this.getGroup(), null);
                }
            };
            HMRContext.work.async("SharedSource::stop", teardown);
        }

        /**
         * @return the groupcast name {@code hummer:<appId>:<topic>:<groupId>} used for
         *         subscribing and unsubscribing
         */
        private String getGroup() {
            return String.format(Locale.US, "hummer:%d:%s:%d", HMRContext.getAppId(), this.topic, this.groupId);
        }

        /**
         * @return the preference key for the stored seq id: {@code <groupId>_group_sys_seqid}
         *         for an empty topic, otherwise {@code <groupId>_<topic>_group_sys_seqid}
         */
        private String prefKey() {
            final String scope = this.topic.isEmpty()
                    ? String.valueOf(this.groupId)
                    : this.groupId + "_" + this.topic;
            return scope + PrefKeySeqID;
        }

        /**
         * Two shared modes are equal when they have the same class, group id and topic.
         */
        @Override
        public boolean equals(Object object) {
            if (object == this) {
                return true;
            }
            if (object == null || object.getClass() != this.getClass()) {
                return false;
            }
            final Shared other = (Shared)object;
            if (this.groupId != other.groupId) {
                return false;
            }
            return this.topic.equals(other.topic);
        }

        /**
         * @return the topic's hash XOR-ed with the low 32 bits of the group id
         */
        @Override
        public int hashCode() {
            final int groupBits = (int)this.groupId;
            return groupBits ^ this.topic.hashCode();
        }
    }

    public static class Private
    implements Mode {
        /** Prefix (and, for an empty topic, the whole) preference key for the stored seq id. */
        private static final String StorageKeySeqIDPrefix = "local_sequence_id";
        /** Topic name; never null (a null constructor argument becomes ""). */
        private final String topic;
        /** Notification handler registered in start() and removed in stop(). */
        private Channel.NotificationHandler changeHandler;
        /** Per-mode fetch strategy; may be null, in which case Source falls back to other defaults. */
        private MQService.FetchStrategy strategy;
        /** Pulling period in milliseconds; always at least 1. */
        private long pullingPeriod;

        /**
         * Creates a private mode with no explicit fetch strategy and the default pulling period.
         *
         * @param string the topic; null is treated as ""
         */
        public Private(String string) {
            this(string, null);
        }

        /**
         * Creates a private mode with the default pulling period of 900000 ms.
         *
         * @param string        the topic; null is treated as ""
         * @param fetchStrategy the fetch strategy, or null for none
         */
        public Private(String string, MQService.FetchStrategy fetchStrategy) {
            this(string, fetchStrategy, 900000L);
        }

        /**
         * Creates a private mode.
         *
         * @param string        the topic; null is treated as ""
         * @param fetchStrategy the fetch strategy, or null for none
         * @param l             the pulling period in milliseconds; values below 1 are
         *                      replaced by 900000
         */
        public Private(String string, MQService.FetchStrategy fetchStrategy, long l) {
            if (string == null) {
                this.topic = "";
            } else {
                this.topic = string;
            }
            this.strategy = fetchStrategy;
            if (l < 1L) {
                this.pullingPeriod = 900000L;
            } else {
                this.pullingPeriod = l;
            }
        }

        /**
         * @return the fetch strategy given at construction; may be null
         */
        @Override
        public MQService.FetchStrategy getStrategy() {
            return this.strategy;
        }

        /**
         * @return the pulling period in milliseconds
         */
        @Override
        public long getPullingPeriod() {
            return this.pullingPeriod;
        }

        /**
         * @return a display name of the form {@code PrivateSource(<topic>)}
         */
        @Override
        public String sourceName() {
            return "PrivateSource(" + this.topic + ")";
        }

        /**
         * @return the topic (never null, possibly empty)
         */
        @Override
        public String topicName() {
            return this.topic;
        }

        /**
         * Determines the starting sequence id according to the fetch strategy.
         *
         * <ul>
         *   <li>{@code Continuously}: the locally stored value, if any;</li>
         *   <li>{@code ReloadHistories}: 0;</li>
         *   <li>no local value and {@code IgnoreBefore}: fetched via fetchMaxSeqId;</li>
         *   <li>no local value otherwise: fetched via fetchSeqId.</li>
         * </ul>
         *
         * @param fetchStrategy     the strategy to apply
         * @param richCompletionArg receives the seq id or the fetch failure
         */
        @Override
        public void loadSeqId(MQService.FetchStrategy fetchStrategy, RichCompletionArg<Long> richCompletionArg) {
            Long localSeqId = null;
            if (fetchStrategy == MQService.FetchStrategy.Continuously) {
                localSeqId = PrefStorage.storage().execute(new PrefStorage.Query<Long>(){

                    @Override
                    public Long run(SharedPreferences sharedPreferences) {
                        if (sharedPreferences == null) {
                            return null;
                        }
                        // -1 is the "nothing stored" sentinel.
                        final long stored = sharedPreferences.getLong(Private.this.prefKey(), -1L);
                        return stored == -1L ? null : Long.valueOf(stored);
                    }
                });
            } else if (fetchStrategy == MQService.FetchStrategy.ReloadHistories) {
                localSeqId = 0L;
            }
            Log.i(this.sourceName(), Trace.once().method("loadSeqId").info("seqId", localSeqId));
            if (localSeqId != null) {
                CompletionUtils.dispatchSuccess(richCompletionArg, localSeqId);
                return;
            }
            if (fetchStrategy == MQService.FetchStrategy.IgnoreBefore) {
                this.fetchMaxSeqId(3, this.topic, richCompletionArg);
            } else {
                this.fetchSeqId(3, this.topic, richCompletionArg);
            }
        }

        /**
         * Persists the seq id under prefKey() and reports it to the server via
         * {@code RPCReportPrivateSeqId} (no completion is attached to the report).
         *
         * @param l the seq id to store
         */
        @Override
        public void storeSeqId(final long l) {
            final PrefStorage.Edit writeSeqId = new PrefStorage.Edit(){

                @Override
                public void run(SharedPreferences.Editor editor) {
                    editor.putLong(Private.this.prefKey(), l);
                }
            };
            PrefStorage.storage().execute(writeSeqId);
            HMR.getService(Channel.class).run(new RPCReportPrivateSeqId(this.topic, l, null));
        }

        /**
         * Builds the RPC that pulls private messages for this topic.
         *
         * @param l                 seq id to pull from
         * @param bl                first-drain flag supplied by Source
         * @param richCompletionArg receives the pulling response
         * @return an {@code RPCPullPrivateMessages} built with the fixed arguments 200 and
         *         null (presumably page size and an unused filter; confirm against
         *         RPCPullPrivateMessages)
         */
        @Override
        public Channel.RPC createPullingRequest(long l, boolean bl, RichCompletionArg<RPCPullingResponse> richCompletionArg) {
            return new RPCPullPrivateMessages(this.topic, bl, l, 200, null, richCompletionArg);
        }

        /**
         * Registers a notification handler for private message pushes.
         *
         * <p>Only notifications whose {@code service/function} equals
         * {@code service_api_gateway/cim.proto.PushService.IMPushMsg} are handled. They are
         * parsed on the work queue, dropped when they carry an environment name that differs
         * from the local region, and, when the topic matches, forwarded to
         * {@code notifyHandler} as a change signal: prevSeqId 0 and a null message.
         *
         * @param notifyHandler receives (0, null, seqId) for each matching push
         */
        @Override
        public void start(final Mode.NotifyHandler notifyHandler) {
            Log.i(this.sourceName(), Trace.once().method("start"));
            this.changeHandler = new Channel.NotificationHandler(){

                @Override
                public void onNotify(String string, String string2, final byte[] byArray) {
                    String expected = "service_api_gateway/cim.proto.PushService.IMPushMsg";
                    String actual = string + '/' + string2;
                    if (!expected.equals(actual)) {
                        return;
                    }
                    HMRContext.work.async("Source::onNotify", new Runnable(){

                        @Override
                        public void run() {
                            // Unqualified call: handleNotify belongs to the enclosing handler,
                            // not to this Runnable, so "this.handleNotify" would not compile.
                            handleNotify(byArray);
                        }
                    });
                }

                @Override
                public String toString() {
                    return Private.this.sourceName();
                }

                private void handleNotify(byte[] byArray) {
                    try {
                        Push.IMPushMsgRequest request = (Push.IMPushMsgRequest)((Push.IMPushMsgRequest.Builder)Push.IMPushMsgRequest.newBuilder().mergeFrom(byArray)).build();
                        if (request.getEnvName() != null && !request.getEnvName().isEmpty()) {
                            // The push names an environment: accept it only when it matches ours.
                            if (HMRContext.region == null) {
                                Log.w(Private.this.sourceName(), Trace.once().method("onNotify").msg("localEnv is null"));
                                return;
                            }
                            HMRContext.Region notifyRegion = HMRContext.Region.make(request.getRegion() + "/" + request.getEnvType() + "/" + request.getEnvName());
                            String localEnv = HMRContext.region.toString();
                            String remoteEnv = notifyRegion.toString();
                            if (!Objects.equals(remoteEnv, localEnv)) {
                                Log.i(Private.this.sourceName(), Trace.once().method("onNotify").msg("Ignored notify of different env").info("localEnv", localEnv).info("remoteEnv", remoteEnv));
                                return;
                            }
                        }
                        if (Objects.equals(Private.this.topic, request.getTopic())) {
                            Log.i(Private.this.sourceName(), Trace.once().method("onPrivateSourceChanged").info("topic", request.getTopic()).info("seqId", request.getSeqId()));
                            notifyHandler.onNotify(0L, null, request.getSeqId());
                        }
                    }
                    catch (Throwable throwable) {
                        // NOTE(review): this also catches anything thrown by notifyHandler.onNotify,
                        // which is then logged under the "Failed parsing" message.
                        Log.e(Private.this.sourceName(), Trace.once("Failed parsing IMPushMsgRequest").info("Exception", throwable));
                    }
                }
            };
            HMR.getService(Channel.class).addNotificationHandler(this.changeHandler);
        }

        /**
         * On the work queue, removes the notification handler registered by start().
         */
        @Override
        public void stop() {
            Log.i(this.sourceName(), Trace.once().method("stop"));
            final Runnable teardown = new Runnable(){

                @Override
                public void run() {
                    HMR.getService(Channel.class).removeNotificationHandler(Private.this.changeHandler);
                }
            };
            HMRContext.work.async("Source::stop", teardown);
        }

        /**
         * Fetches the seq id from the server via {@code RPCFetchPrivateSeqId}, retrying
         * after 3000 ms when the error is null or has code 1005.
         *
         * @param n                 remaining attempts; when not positive the completion fails with code 1005
         * @param string            the topic
         * @param richCompletionArg receives the fetched seq id or the final error
         */
        private void fetchSeqId(final int n, final String string, final RichCompletionArg<Long> richCompletionArg) {
            Log.i(this.sourceName(), Trace.once().method("fetchSeqId"));
            if (n <= 0) {
                CompletionUtils.dispatchFailure(richCompletionArg, new Error(1005, "Failed fetching seqId: " + string));
                return;
            }
            final OnSuccessArg<Long> forwardSuccess = new OnSuccessArg<Long>(){

                @Override
                public void onSuccess(Long l) {
                    CompletionUtils.dispatchSuccess(richCompletionArg, l);
                }
            };
            final OnFailure retryOrFail = new OnFailure(){

                @Override
                public void onFailure(Error error) {
                    final boolean retryable = error == null || error.code == 1005;
                    if (!retryable) {
                        CompletionUtils.dispatchFailure(richCompletionArg, error);
                        return;
                    }
                    HMRContext.work.asyncAfter("Source::repeatFetch", 3000, new Runnable(){

                        @Override
                        public void run() {
                            Private.this.fetchSeqId(n - 1, string, richCompletionArg);
                        }
                    });
                }
            };
            HMR.getService(Channel.class).run(new RPCFetchPrivateSeqId(string, new RichCompletionArg<Long>("Source::RPCFetchPrivateSeqId:" + string).onSuccess(forwardSuccess).onFailure(retryOrFail)));
        }

        /**
         * Fetches the maximum seq id from the server via {@code RPCFetchPrivateMaxSeqId},
         * retrying after 3000 ms when the error is null or has code 1005.
         *
         * <p>The retry re-invokes this method. It previously called {@code fetchSeqId},
         * which made a retried request return a different kind of seq id than the one
         * originally asked for. The exhausted-retries message now names maxSeqId.
         *
         * @param n                 remaining attempts; when not positive the completion fails with code 1005
         * @param string            the topic
         * @param richCompletionArg receives the fetched max seq id or the final error
         */
        private void fetchMaxSeqId(final int n, final String string, final RichCompletionArg<Long> richCompletionArg) {
            Log.i(this.sourceName(), Trace.once().method("fetchMaxSeqId"));
            if (n <= 0) {
                CompletionUtils.dispatchFailure(richCompletionArg, new Error(1005, "Failed fetching maxSeqId: " + string));
                return;
            }
            HMR.getService(Channel.class).run(new RPCFetchPrivateMaxSeqId(string, new RichCompletionArg<Long>("Source::RPCFetchPrivateMaxSeqId:" + string).onSuccess(new OnSuccessArg<Long>(){

                @Override
                public void onSuccess(Long l) {
                    CompletionUtils.dispatchSuccess(richCompletionArg, l);
                }
            }).onFailure(new OnFailure(){

                @Override
                public void onFailure(Error error) {
                    if (error != null && error.code != 1005) {
                        // Not a retryable error: report it as-is.
                        CompletionUtils.dispatchFailure(richCompletionArg, error);
                    } else {
                        HMRContext.work.asyncAfter("Source::repeatFetch", 3000, new Runnable(){

                            @Override
                            public void run() {
                                // Retry the same request kind with one attempt fewer.
                                Private.this.fetchMaxSeqId(n - 1, string, richCompletionArg);
                            }
                        });
                    }
                }
            })));
        }

        /**
         * @return the preference key for the stored seq id: {@code local_sequence_id} for
         *         an empty topic, otherwise {@code local_sequence_id_<topic>}
         */
        private String prefKey() {
            if (this.topic.isEmpty()) {
                return StorageKeySeqIDPrefix;
            }
            return StorageKeySeqIDPrefix + "_" + this.topic;
        }

        /**
         * Two private modes are equal when they have the same class and topic.
         */
        @Override
        public boolean equals(Object object) {
            if (object == this) {
                return true;
            }
            if (object == null || object.getClass() != this.getClass()) {
                return false;
            }
            final Private other = (Private)object;
            return this.topic.equals(other.topic);
        }

        /**
         * @return the topic's hash code, consistent with {@code equals}
         */
        @Override
        public int hashCode() {
            final int topicHash = this.topic.hashCode();
            return topicHash;
        }
    }

    /**
     * Abstraction over the kind of queue a {@code Source} reads. The implementations in
     * this file are {@code Shared} (group queue) and {@code Private} (per-user queue).
     */
    public static interface Mode {
        /** @return a display name; Source uses it as log tag and in task names */
        public String sourceName();

        /** @return the topic of this mode */
        public String topicName();

        /** @return the mode-specific fetch strategy, or null to let Source pick a default */
        public MQService.FetchStrategy getStrategy();

        /** @return the pulling period in milliseconds, compared by Source against elapsed wall-clock time */
        public long getPullingPeriod();

        /**
         * Resolves the starting sequence id.
         *
         * @param var1 the effective fetch strategy
         * @param var2 completion receiving the seq id or a failure
         */
        public void loadSeqId(MQService.FetchStrategy var1, RichCompletionArg<Long> var2);

        /**
         * Persists the given sequence id.
         *
         * @param var1 the seq id to store
         */
        public void storeSeqId(long var1);

        /**
         * Creates the RPC that pulls messages.
         *
         * @param var1 seq id to pull from
         * @param var3 first-drain flag (Source passes {@code lastPullAt == null})
         * @param var4 completion receiving the pulling response
         * @return the RPC, which Source runs on the Channel service
         */
        public Channel.RPC createPullingRequest(long var1, boolean var3, RichCompletionArg<RPCPullingResponse> var4);

        /**
         * Begins listening for push notifications.
         *
         * @param var1 handler to invoke for each relevant notification
         */
        public void start(NotifyHandler var1);

        /** Stops listening for push notifications. */
        public void stop();

        /** Callback through which a mode reports push notifications to its Source. */
        public static interface NotifyHandler {
            /**
             * @param var1 previous seq id carried by the push (Private always passes 0)
             * @param var3 the pushed message, or null when the push only signals a change
             * @param var4 seq id carried by the push
             */
            public void onNotify(long var1, Im.Msg var3, long var4);
        }
    }
}

