package com.techcenter.msgqueue;

import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/techcenter/msgqueue/GroupActiveMutliThreadMsgDeal.class */
public abstract class GroupActiveMutliThreadMsgDeal<M, T> extends SimpleMultiThreadMsgChain<T> implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(GroupActiveMutliThreadMsgDeal.class);
    private IMsgQueue<M> receiveQueue;
    private int groupSize = 100;
    private long maxReceiveTime = 0;
    private long noDataSleepTime = 2000;
    private boolean runFlag = true;
    private byte[] lock = new byte[0];

    @Override // com.techcenter.msgqueue.SimpleMultiThreadMsgChain
    public void init() {
        this.runFlag = true;
        super.init();
        new Thread(this).start();
    }

    @Override // com.techcenter.msgqueue.SimpleMultiThreadMsgChain
    public void destroy() {
        this.runFlag = false;
        try {
            synchronized (this.lock) {
                this.lock.notify();
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
        super.destroy();
    }

    @Override // java.lang.Runnable
    public void run() {
        logger.info("主动从队列获取消息线程开始");
        while (this.runFlag) {
            try {
                List<M> activeReceive = activeReceive();
                if (activeReceive == null || activeReceive.size() == 0) {
                    doWait(this.noDataSleepTime);
                } else {
                    if (logger.isDebugEnabled()) {
                        logger.debug("从队列中批量接收到{}个数据：{}", Integer.valueOf(activeReceive.size()), activeReceive.get(0));
                    }
                    chainExecute(convert2ContextMsg(activeReceive));
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }
        logger.info("主动从队列获取消息线程结束");
    }

    private List<M> activeReceive() {
        M receive;
        ArrayList arrayList = new ArrayList();
        long currentTimeMillis = System.currentTimeMillis();
        for (int i = 0; i < this.groupSize && (receive = this.receiveQueue.receive()) != null; i++) {
            arrayList.add(receive);
            if (this.maxReceiveTime >= 0 && System.currentTimeMillis() - currentTimeMillis > this.maxReceiveTime) {
                break;
            }
        }
        return arrayList;
    }

    private void doWait(long j) {
        try {
            synchronized (this.lock) {
                this.lock.wait(j);
            }
        } catch (Exception e) {
        }
    }

    public abstract T convert2ContextMsg(List<M> list);

    public void setReceiveQueue(IMsgQueue<M> iMsgQueue) {
        this.receiveQueue = iMsgQueue;
    }

    public void setGroupSize(int i) {
        this.groupSize = i;
    }

    public void setMaxReceiveTime(long j) {
        this.maxReceiveTime = j;
    }

    public void setNoDataSleepTime(long j) {
        this.noDataSleepTime = j;
    }
}
