package com.taobao.qianniu.android.newrainbow.core.channel;

import android.net.TrafficStats;
import android.os.Process;
import com.alibaba.com.fastipc.base.Config;
import com.taobao.codetrack.sdk.util.ReportUtil;
import com.taobao.qianniu.android.newrainbow.base.biz.RainbowPacketReader;
import com.taobao.qianniu.android.newrainbow.base.config.CConfig;
import com.taobao.qianniu.android.newrainbow.base.exception.ChannelReadException;
import com.taobao.qianniu.android.newrainbow.base.util.ResizeByteBufferWrapper;
import com.taobao.qianniu.android.newrainbow.base.util.Utils;
import java.io.InputStream;
import java.net.Socket;
import java.net.SocketException;
import java.nio.ByteBuffer;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: classes5.dex */
public class EReader implements IConfigConsumer {
    private static final String TAG = "EReader";
    private CConfig cConfig;
    private volatile InputStream inputStream;
    private Observable<byte[][]> observable;
    private RainbowPacketReader packetReader = new RainbowPacketReader();
    private Socket socket;
    private volatile Subscriber<byte[][]> subscriber;

    static {
        ReportUtil.by(1067788216);
        ReportUtil.by(659668049);
    }

    public EReader(CConfig cConfig, InputStream inputStream) {
        this.cConfig = cConfig;
        this.inputStream = inputStream;
        init();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public int doRead(byte[] bArr) throws ChannelReadException {
        try {
            if (this.inputStream != null) {
                return this.inputStream.read(bArr);
            }
            throw new ChannelReadException("reader was released");
        } catch (Exception e) {
            throw new ChannelReadException(e.getMessage(), e);
        }
    }

    private void init() {
        if (Utils.DEBUG) {
            Utils.logD(TAG, "init  ");
        }
        this.observable = Observable.a((Observable.OnSubscribe) new Observable.OnSubscribe<byte[][]>() { // from class: com.taobao.qianniu.android.newrainbow.core.channel.EReader.1
            @Override // rx.functions.Action1
            public void call(Subscriber<? super byte[][]> subscriber) {
                ByteBuffer put;
                if (Utils.DEBUG) {
                    Utils.logD(EReader.TAG, "call  ");
                }
                int threadPriority = Process.getThreadPriority(Process.myTid());
                boolean trySetThreadNiceValue = EReader.this.trySetThreadNiceValue();
                TrafficStats.setThreadStatsTag(Source.trafficTag);
                try {
                    if (EReader.this.socket != null) {
                        TrafficStats.tagSocket(EReader.this.socket);
                    }
                } catch (SocketException e) {
                    e.printStackTrace();
                }
                try {
                    try {
                        byte[] bArr = new byte[512];
                        ResizeByteBufferWrapper allocate = ResizeByteBufferWrapper.allocate(Config.PEICE_MAX_SIZE);
                        while (true) {
                            int doRead = EReader.this.doRead(bArr);
                            if (doRead > 0) {
                                put = allocate.put(bArr, 0, doRead);
                                put.flip();
                                while (true) {
                                    Object nextPacket = EReader.this.packetReader.nextPacket(put);
                                    if (nextPacket != null) {
                                        if (EReader.this.inputStream == null) {
                                            throw new ChannelReadException("reader was released");
                                        }
                                        subscriber.onNext(nextPacket);
                                    }
                                }
                            } else {
                                if (Utils.DEBUG) {
                                    Utils.logE(EReader.TAG, "end  ,cause read failed");
                                }
                                if (EReader.this.inputStream != null) {
                                    subscriber.onError(new ChannelReadException("channel broken"));
                                }
                                if (!trySetThreadNiceValue) {
                                    return;
                                }
                            }
                            put.compact();
                        }
                    } catch (Throwable th) {
                        if (EReader.this.inputStream != null) {
                            subscriber.onError(0 == 0 ? new ChannelReadException("channel broken") : null);
                        }
                        if (trySetThreadNiceValue) {
                            Process.setThreadPriority(threadPriority);
                        }
                        throw th;
                    }
                } catch (Throwable th2) {
                    Throwable channelReadException = th2 instanceof ChannelReadException ? th2 : new ChannelReadException(th2.getMessage(), th2);
                    if (Utils.DEBUG) {
                        Utils.logE(EReader.TAG, "end  ,cause " + th2.getMessage());
                    }
                    if (EReader.this.inputStream != null) {
                        if (channelReadException == null) {
                            channelReadException = new ChannelReadException("channel broken");
                        }
                        subscriber.onError(channelReadException);
                    }
                    if (!trySetThreadNiceValue) {
                        return;
                    }
                }
                Process.setThreadPriority(threadPriority);
            }
        }).c(Schedulers.m()).a(Schedulers.l());
        this.observable.b((Subscriber<? super byte[][]>) new Subscriber<byte[][]>() { // from class: com.taobao.qianniu.android.newrainbow.core.channel.EReader.2
            @Override // rx.Observer
            public void onCompleted() {
                Subscriber subscriber;
                if (EReader.this.inputStream == null || (subscriber = EReader.this.subscriber) == null || subscriber.isUnsubscribed()) {
                    return;
                }
                subscriber.onCompleted();
            }

            @Override // rx.Observer
            public void onError(Throwable th) {
                Subscriber subscriber;
                if (EReader.this.inputStream == null || (subscriber = EReader.this.subscriber) == null || subscriber.isUnsubscribed()) {
                    return;
                }
                subscriber.onError(th);
            }

            @Override // rx.Observer
            public void onNext(byte[][] bArr) {
                Subscriber subscriber;
                if (EReader.this.inputStream == null || (subscriber = EReader.this.subscriber) == null || subscriber.isUnsubscribed()) {
                    return;
                }
                if (Utils.DEBUG) {
                    Utils.logD(EReader.TAG, "read packet,header " + bArr[0].length + " body " + bArr[1].length);
                }
                subscriber.onNext(bArr);
            }

            @Override // rx.Subscriber
            public void onStart() {
                Subscriber subscriber;
                if (EReader.this.inputStream == null || (subscriber = EReader.this.subscriber) == null || subscriber.isUnsubscribed()) {
                    return;
                }
                subscriber.onStart();
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean trySetThreadNiceValue() {
        int rainbowChannelReaderThreadNiceValue;
        CConfig cConfig = this.cConfig;
        if (cConfig == null || (rainbowChannelReaderThreadNiceValue = cConfig.getRainbowChannelReaderThreadNiceValue()) == Process.getThreadPriority(Process.myTid()) || rainbowChannelReaderThreadNiceValue < -20 || rainbowChannelReaderThreadNiceValue > 19) {
            return false;
        }
        Process.setThreadPriority(rainbowChannelReaderThreadNiceValue);
        return true;
    }

    @Override // com.taobao.qianniu.android.newrainbow.core.channel.IConfigConsumer
    public boolean consume(int i, CConfig cConfig) {
        return (this.cConfig.updateFrom(i, cConfig) & 64) == 0;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void release() {
        if (Utils.DEBUG) {
            Utils.logD(TAG, "release  ");
        }
        this.observable = null;
        this.inputStream = null;
    }

    public void setSocket(Socket socket) {
        this.socket = socket;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setSubscriber(Subscriber<byte[][]> subscriber) {
        this.subscriber = subscriber;
    }
}
