BigFileReader.java

package edu.jiangxin.apktoolbox.file.password.recovery.category.dictionary.multithread;

import edu.jiangxin.apktoolbox.file.core.EncoderDetector;
import edu.jiangxin.apktoolbox.file.password.recovery.RecoveryPanel;
import edu.jiangxin.apktoolbox.file.password.recovery.State;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.*;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class BigFileReader {
    private static final Logger logger = LogManager.getLogger(BigFileReader.class.getSimpleName());

    private static final int DEFAULT_BUFFER_SIZE = 1024 * 1024;

    private static final int PROCESSOR_COUNT = Runtime.getRuntime().availableProcessors();

    private final String charset;
    private final int bufferSize;
    private final ScheduledThreadPoolExecutor executorService;
    private final long fileLength;
    private RandomAccessFile rAccessFile;
    private final Set<StartEndPair> startEndPairs;
    private CyclicBarrier cyclicBarrier;
    private final AtomicInteger counter = new AtomicInteger(0);
    private final CompleteCallback callback;

    private final AtomicBoolean success = new AtomicBoolean(false);

    private final RecoveryPanel panel;

    public BigFileReader(CompleteCallback callback, RecoveryPanel panel) {
        this.panel = panel;
        File file = panel.getDictionaryFile();
        if (!file.exists()) {
            throw new IllegalArgumentException("文件不存在!");
        }
        this.fileLength = file.length();
        this.charset = EncoderDetector.judgeFile(file.getAbsolutePath());
        this.bufferSize = DEFAULT_BUFFER_SIZE;
        try {
            this.rAccessFile = new RandomAccessFile(file, "r");
        } catch (FileNotFoundException e) {
            logger.error("BigFileReader FileNotFoundException");
        }
        this.executorService = new ScheduledThreadPoolExecutor(PROCESSOR_COUNT);
        this.startEndPairs = new HashSet<>();
        this.callback = callback;
    }

    public void start() {
        long everySize = fileLength / PROCESSOR_COUNT;
        try {
            calculateStartEnd(0, everySize);
        } catch (IOException e) {
            logger.error("start", e);
            return;
        }

        final long startTime = System.currentTimeMillis();
        int parties = startEndPairs.size();
        logger.info("[TaskTracing]Parties: " + parties);
        cyclicBarrier = new CyclicBarrier(parties, () -> {
            logger.info("use time: " + (System.currentTimeMillis() - startTime) + "ms");
            logger.info("all line: " + counter.get());
            callback.onComplete(null);
        });
        for (StartEndPair pair : startEndPairs) {
            logger.info("pair: " + pair);
            executorService.execute(new SliceReaderTask(pair));
        }
    }

    private void calculateStartEnd(long start, long size) throws IOException {
        if (start > fileLength - 1) {
            return;
        }
        StartEndPair pair = new StartEndPair();
        pair.start = start;
        long endPosition = start + size - 1;
        if (endPosition >= fileLength - 1) {
            pair.end = fileLength - 1;
            startEndPairs.add(pair);
            return;
        }

        rAccessFile.seek(endPosition);
        byte tmp = (byte) rAccessFile.read();
        while (tmp != '\n' && tmp != '\r') {
            endPosition++;
            if (endPosition >= fileLength - 1) {
                endPosition = fileLength - 1;
                break;
            }
            rAccessFile.seek(endPosition);
            tmp = (byte) rAccessFile.read();
        }
        pair.end = endPosition;
        startEndPairs.add(pair);

        calculateStartEnd(endPosition + 1, size);
    }

    public void shutdown() {
        try {
            rAccessFile.close();
        } catch (IOException e) {
            logger.error("shutdown IOException");
        }
        executorService.shutdown();
        logger.info("shutdown executorService");
    }

    private void handle(byte[] bytes) throws UnsupportedEncodingException {
        if (success.compareAndSet(true, true) || panel.getCurrentState() != State.WORKING) {
            return;
        }

        String line;
        if (charset == null) {
            line = new String(bytes);
        } else {
            line = new String(bytes, charset);
        }

        panel.setCurrentPassword(line);
        panel.increaseProgressBarValue();
        counter.decrementAndGet();

        if (panel.getCurrentFileChecker().checkPassword(line)) {
            if (success.compareAndSet(false, true)) {
                logger.info("find password: {}", line);
                callback.onComplete(line);
            }
        } else {
            if (!success.compareAndSet(true, true) && panel.getCurrentState() == State.WORKING) {
                logger.info("try password[{}] failed", line);
            }
        }
    }

    private static class StartEndPair {
        public long start;
        public long end;

        @Override
        public String toString() {
            return "star=" + start + ";end=" + end;
        }

        @Override
        public int hashCode() {
            final int prime = 31;
            int result = 1;
            result = prime * result + (int) (end ^ (end >>> 32));
            result = prime * result + (int) (start ^ (start >>> 32));
            return result;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj)
                return true;
            if (obj == null)
                return false;
            if (getClass() != obj.getClass())
                return false;
            StartEndPair other = (StartEndPair) obj;
            if (end != other.end)
                return false;
            return start == other.start;
        }

    }

    private class SliceReaderTask implements Runnable {
        private final long start;
        private final long sliceSize;
        private final byte[] readBuff;

        public SliceReaderTask(StartEndPair pair) {
            this.start = pair.start;
            this.sliceSize = pair.end - pair.start + 1;
            this.readBuff = new byte[bufferSize];
        }

        @Override
        public void run() {
            try {
                MappedByteBuffer mapBuffer = rAccessFile.getChannel().map(FileChannel.MapMode.READ_ONLY, start, this.sliceSize);
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                for (int offset = 0; offset < sliceSize; offset += bufferSize) {
                    int readLength;
                    if (offset + bufferSize <= sliceSize) {
                        readLength = bufferSize;
                    } else {
                        readLength = (int) (sliceSize - offset);
                    }
                    mapBuffer.get(readBuff, 0, readLength);
                    for (int i = 0; i < readLength; i++) {
                        byte tmp = readBuff[i];
                        if (tmp == '\n' || tmp == '\r') {
                            if (bos.size() > 0) {
                                handle(bos.toByteArray());
                            }
                            bos.reset();
                        } else {
                            bos.write(tmp);
                        }
                    }
                }
                if (bos.size() > 0) {
                    handle(bos.toByteArray());
                }
                logger.info("[TaskTracing]Waiting number: " + cyclicBarrier.getNumberWaiting());
            } catch (Exception e) {
                logger.error("run Exception" + e.getMessage());
            }
            try {
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                logger.error("await InterruptedException");
                Thread.currentThread().interrupt();
            } catch (BrokenBarrierException e) {
                logger.error("await BrokenBarrierException");
            }
        }

    }
}