/*
 * Decompiled with CFR 0.152.
 */
package sfa;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import sfa.SFAWordsTest;
import sfa.index.SFATrie;
import sfa.index.SFATrieTest;
import sfa.index.SortedListMap;
import sfa.timeseries.TimeSeries;
import sfa.timeseries.TimeSeriesLoader;
import sfa.transformation.SFA;

@RunWith(value=JUnit4.class)
public class SFABulkLoadTest {
    /** Directory that receives the {@code *.bucket} files; assigned in {@link #setUpBucketDir()}. */
    static File tempDir = null;
    /** Two-thread pool that runs the disk-write tasks submitted by {@code SerializedStreams.addToPartition}. */
    static ExecutorService serializerExec = Executors.newFixedThreadPool(2);
    /** Four-thread pool; in the code shown here it is only shut down, never given work. */
    static ExecutorService transformExec = Executors.newFixedThreadPool(4);
    /** Handles of submitted disk-write tasks; each yields the running byte total reported by the task. */
    static LinkedList<Future<Long>> futures = new LinkedList<>();
    /** Passed to {@code SFA.fitWindowing} and the {@code SFATrie} constructor (presumably the SFA word length). */
    static int l = 16;
    /** Passed to the {@code SFATrie} constructor (presumably the maximum leaf size before a split). */
    static int leafThreshold = 100;
    /** Passed to {@code SFA.fitWindowing} (presumably the alphabet size; the partitioning code uses base 8). */
    static byte symbols = (byte)8;
    /** Used for the rough memory estimate printed by the subsequence-matching test. */
    static Runtime runtime = Runtime.getRuntime();

    /**
     * Prepares the fixtures for one test method: makes sure both worker pools accept tasks
     * and creates a fresh temporary directory for the bucket files.
     *
     * <p>Fails the test if the temporary directory cannot be created.
     */
    @Before
    public void setUpBucketDir() {
        // tearDown() shuts the static pools down after every test method. A pool that has been
        // shut down rejects new tasks, so a later test in the same run needs fresh pools.
        if (serializerExec.isShutdown()) {
            serializerExec = Executors.newFixedThreadPool(2);
        }
        if (transformExec.isShutdown()) {
            transformExec = Executors.newFixedThreadPool(4);
        }
        try {
            tempDir = Files.createTempDirectory("tmp").toFile();
            System.out.println("Created temp directory at " + tempDir.getAbsolutePath());
            tempDir.deleteOnExit();
        }
        catch (IOException e) {
            Assert.fail("Unable to create temp directory: " + e.getMessage());
        }
    }

    /**
     * Placeholder for the whole-matching bulk-load test. The body is empty, so this test
     * performs no checks of its own; only the {@code @Before}/{@code @After} hooks run.
     */
    @Test
    public void testBulkLoadWholeMatching() throws IOException {
    }

    /**
     * Produces a reproducible random-walk series.
     *
     * @param i seed for the random number generator
     * @param n forwarded as the first argument of
     *          {@code TimeSeriesLoader.generateRandomWalkData} (presumably the series length)
     * @return the generated series
     */
    private TimeSeries getTimeSeries(int i, int n) {
        Random seeded = new Random(i);
        TimeSeries walk = TimeSeriesLoader.generateRandomWalkData(n, seeded);
        return walk;
    }

    /**
     * Chooses how many leading letters are used to partition the data into buckets:
     * the base-8 logarithm of the number of chunks, rounded to the nearest integer.
     *
     * <p>The result is never below 1. The partitioning code always indexes a bucket by at
     * least the first letter, so a depth of 0 would allocate a single bucket while bucket
     * ids still range over the first letter's values.
     *
     * @param count     total number of elements to index
     * @param chunkSize number of elements per chunk
     * @return the trie depth to use, at least 1
     */
    private int getBestDepth(int count, int chunkSize) {
        // Floating-point division: the integer quotient truncates and becomes 0 when
        // count < chunkSize, which would feed log(0) into the computation.
        double chunks = (double)count / chunkSize;
        int rounded = (int)Math.round(Math.log(chunks) / Math.log(8.0));
        int trieDepth = Math.max(1, rounded);
        System.out.println("Using trie depth:\t" + trieDepth + " (" + (int)Math.pow(8.0, trieDepth) + " buckets)");
        return trieDepth;
    }

    /**
     * Bulk-loads an SFA trie for subsequence matching and checks its 1-NN answers against a
     * sequential scan.
     *
     * <p>Steps: generate a random-walk series, fit the SFA transformation on it, transform the
     * series chunk by chunk while spilling the approximations into per-prefix bucket files,
     * build and merge one trie per bucket, then for every query series compare the distance
     * reported by the index with the smallest distance found by scanning all offsets.
     *
     * @throws IOException declared by the original signature
     */
    @Test
    public void testBulkLoadSubsequenceMatching() throws IOException {
        int N = 2000000;
        System.out.println("Loading/generating Time Series of queryLength " + N + "...");
        TimeSeries timeSeries = this.getTimeSeries(1, N);
        System.out.println("Sample DS size:\t" + N);

        // Fail with a readable message instead of a NullPointerException when the dataset is missing.
        ClassLoader classLoader = SFAWordsTest.class.getClassLoader();
        java.net.URL queryResource = classLoader.getResource("datasets/indexing/query_lightcurves.txt");
        Assert.assertNotNull("Query dataset not found on the classpath", queryResource);
        TimeSeries[] timeSeries2 = TimeSeriesLoader.readSamplesQuerySeries(queryResource.getFile());
        int n = timeSeries2[0].getLength();
        System.out.println("Query DS size:\t" + n);

        long mem = runtime.totalMemory();
        SFA sfa = new SFA(SFA.HistogramType.EQUI_FREQUENCY);
        sfa.fitWindowing(new TimeSeries[]{timeSeries}, n, l, symbols, true, true);

        int chunkSize = 100000;
        System.out.println("Chunk size:\t" + chunkSize);
        int trieDepth = this.getBestDepth(N, chunkSize);
        SerializedStreams dataStream = new SerializedStreams(trieDepth);

        long time = System.currentTimeMillis();
        // The write tasks report a running total, so keep the largest value seen so far;
        // a chunk that triggers no flush must not reset the reported average to 0.
        long bytesWritten = 0L;
        for (int i = 0; i < timeSeries.getLength(); i += chunkSize) {
            // Each chunk is extended by n - 1 values so windows starting near its end are complete.
            TimeSeries subsequence = timeSeries.getSubsequence(i, chunkSize + n - 1);
            double[][] words = sfa.transformWindowingDouble(subsequence);
            for (int pos = 0; pos < words.length; ++pos) {
                byte[] w = sfa.quantizationByte(words[pos]);
                dataStream.addToPartition(w, words[pos], i + pos, trieDepth);
            }
            // Wait for all writes triggered by this chunk before transforming the next one.
            while (!futures.isEmpty()) {
                try {
                    bytesWritten = Math.max(bytesWritten, futures.remove().get());
                }
                catch (Exception e) {
                    Assert.fail(e.getMessage());
                }
            }
            // Guard against a zero-millisecond interval, which would divide by zero.
            long elapsedMs = Math.max(1L, System.currentTimeMillis() - time);
            System.out.println("\tavg write speed: " + bytesWritten / elapsedMs + " kb/s");
        }
        dataStream.setFinished();

        SFATrie index = this.buildSFATrie(l, leafThreshold, n, trieDepth, sfa);
        Assert.assertNotNull("No non-empty bucket file was found, so no index was built", index);
        index.initializeSubsequenceMatching(timeSeries, n);
        this.performGC();
        System.out.println("Memory: " + (runtime.totalMemory() - mem) / 0x100000L + " MB (rough estimate)");

        int k = 1;
        int size = timeSeries.getData().length - n + 1;
        double[] means = index.means;
        double[] stds = index.stddev;
        for (int i2 = 0; i2 < timeSeries2.length; ++i2) {
            System.out.println(i2 + 1 + ". Query");
            TimeSeries query = timeSeries2[i2];

            // Nearest neighbour via the index.
            time = System.currentTimeMillis();
            SortedListMap<Double, Integer> result = index.searchNearestNeighbor(query, k);
            time = System.currentTimeMillis() - time;
            System.out.println("\tSFATree:" + (double)time / 1000.0 + "s");
            List<Double> distances = result.keys();
            index.resetIoCosts();

            // Reference answer: scan every offset, passing the best distance so far to the helper.
            time = System.currentTimeMillis();
            double resultDistance = Double.MAX_VALUE;
            for (int ww = 0; ww < size; ++ww) {
                double distance = SFATrieTest.getEuclideanDistance(timeSeries, query, means[ww], stds[ww], resultDistance, ww);
                resultDistance = Math.min(distance, resultDistance);
            }
            time = System.currentTimeMillis() - time;
            System.out.println("\tEuclidean:" + (double)time / 1000.0 + "s");
            Assert.assertEquals("Distances do not match: " + resultDistance + "\t" + distances.get(0), distances.get(0), resultDistance, 0.003);
        }
        System.out.println("All ok...");
    }

    /**
     * Builds one trie per bucket file found in the temp directory and merges them into a
     * single index, which is compressed before it is returned.
     *
     * <p>Only regular files whose name contains {@code "bucket"} are considered; bucket files
     * that yield no data are skipped. Fails the test if the directory cannot be listed.
     *
     * @param l             forwarded to the {@code SFATrie} constructor
     * @param leafThreshold forwarded to the {@code SFATrie} constructor
     * @param windowLength  not used by this method
     * @param trieDepth     forwarded to {@code SFATrie.buildIndex}
     * @param sfa           the fitted transformation, forwarded to the {@code SFATrie} constructor
     * @return the merged index, or {@code null} if no bucket file contained data
     */
    protected SFATrie buildSFATrie(int l, int leafThreshold, int windowLength, int trieDepth, SFA sfa) {
        SFATrie index = null;
        System.out.println("Building and merging Trees:");
        File directory = tempDir;
        // listFiles() returns null when the path is not a directory or an I/O error occurs.
        File[] candidates = directory.listFiles();
        Assert.assertNotNull("Unable to list bucket directory: " + directory, candidates);
        for (File bucket : candidates) {
            if (!bucket.isFile() || !bucket.getName().contains("bucket")) {
                continue;
            }
            long time = System.currentTimeMillis();
            List<SFATrie.Approximation[]> windows = this.readFromFile(bucket);
            if (windows.isEmpty()) {
                continue;
            }
            SFATrie trie = new SFATrie(l, leafThreshold, sfa);
            trie.buildIndex(windows, trieDepth);
            // The first trie becomes the index; every later one is merged into it.
            if (index == null) {
                index = trie;
            } else {
                index.mergeTrees(trie);
            }
            System.out.println("Merging done in " + (System.currentTimeMillis() - time) + " ms. \t Elements: " + index.getSize() + "\t Height: " + index.getHeight());
        }
        if (index != null) {
            index.compress(true);
        }
        return index;
    }

    /**
     * Requests a garbage collection and pauses for 10 ms afterwards.
     *
     * <p>If the pause is interrupted, the thread's interrupt flag is set again so that callers
     * can still observe the interruption.
     */
    public void performGC() {
        try {
            System.gc();
            Thread.sleep(10L);
        }
        catch (InterruptedException e) {
            // Thread.sleep clears the flag when it throws; restore it instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Reads every {@code SFATrie.Approximation[]} batch stored in a bucket file.
     *
     * <p>Batches are read until the stream yields {@code null} or reports end of file. Any
     * other exception fails the test with the exception's message.
     *
     * @param name the bucket file to read
     * @return the batches in the order they were read; empty if none could be read
     */
    protected List<SFATrie.Approximation[]> readFromFile(File name) {
        System.out.println("Reading from : " + name.toString());
        ArrayList<SFATrie.Approximation[]> batches = new ArrayList<SFATrie.Approximation[]>();
        long total = 0L;
        try (ObjectInputStream stream = new ObjectInputStream(new BufferedInputStream(new FileInputStream(name)))) {
            for (Object next = stream.readObject(); next != null; next = stream.readObject()) {
                SFATrie.Approximation[] batch = (SFATrie.Approximation[])next;
                batches.add(batch);
                total += (long)batch.length;
            }
        }
        catch (EOFException endOfStream) {
            // Expected: ObjectInputStream signals the end of the file with this exception.
        }
        catch (Exception e) {
            Assert.fail(e.getMessage());
        }
        System.out.println("\t" + total + " time series read.");
        return batches;
    }

    /**
     * Runs after each test method and initiates an orderly shutdown of both static worker
     * pools; it does not wait for already submitted tasks to finish.
     *
     * @throws Exception declared by the original signature
     */
    @After
    public void tearDown() throws Exception {
        transformExec.shutdown();
        serializerExec.shutdown();
    }

    static class SerializedStreams {
        // One bounded in-memory queue per partition; filled by addToPartition, drained when flushed.
        LinkedBlockingQueue<SFATrie.Approximation>[] wordPartitions;
        // One output stream per partition, opened by writeToDisk on the first write; null until then.
        ObjectOutputStream[] partitionsStream;
        // Queue size at which a partition is flushed to disk.
        // NOTE(review): addToPartition compares against the literal 100000 rather than this constant.
        static final int minWriteToDiskLimit = 100000;
        // Number of approximations written so far, per partition.
        long[] writtenSamples;
        // Running byte estimate maintained by the asynchronous write tasks.
        long totalBytes = 0L;
        // Set to the construction timestamp (ms); not read anywhere in the code shown here.
        double time = 0.0;

        public SerializedStreams(int useLetters) {
            int count = (int)Math.pow(8.0, useLetters);
            this.wordPartitions = new LinkedBlockingQueue[count];
            this.partitionsStream = new ObjectOutputStream[count];
            this.writtenSamples = new long[count];
            this.time = System.currentTimeMillis();
            for (int i = 0; i < this.wordPartitions.length; ++i) {
                this.wordPartitions[i] = new LinkedBlockingQueue(200000);
                this.writtenSamples[i] = 0L;
            }
        }

        /**
         * Completes the bulk write: waits for all pending asynchronous writes, flushes what is
         * still queued in memory, closes every opened partition stream and prints the total
         * number of approximations written.
         *
         * <p>Any failure while waiting, flushing or closing fails the test.
         */
        public void setFinished() {
            // Wait for the asynchronous writes first. The flush below writes to the same
            // per-partition streams on the calling thread, and a stream must not be written
            // from two threads at once.
            while (!futures.isEmpty()) {
                try {
                    futures.remove().get();
                }
                catch (Exception e) {
                    Assert.fail(e.toString());
                }
            }
            // Flush whatever stayed below the write threshold.
            for (int i = 0; i < this.wordPartitions.length; ++i) {
                try {
                    List<SFATrie.Approximation> current = new ArrayList<SFATrie.Approximation>(this.wordPartitions[i].size());
                    this.wordPartitions[i].drainTo(current);
                    this.writeToDisk(current, i);
                }
                catch (Exception e) {
                    Assert.fail(e.getMessage());
                }
            }
            // Close every stream even if one close fails, then report the first failure.
            long totalTSwritten = 0L;
            String closeFailure = null;
            for (int i = 0; i < this.partitionsStream.length; ++i) {
                if (this.partitionsStream[i] == null) {
                    continue;
                }
                try {
                    this.partitionsStream[i].close();
                    totalTSwritten += this.writtenSamples[i];
                }
                catch (IOException e) {
                    if (closeFailure == null) {
                        closeFailure = "Unable to close partition " + i + ": " + e;
                    }
                }
            }
            if (closeFailure != null) {
                Assert.fail(closeFailure);
            }
            System.out.println("Time series written:" + totalTSwritten);
        }

        /**
         * Queues one approximation in the partition selected by its word prefix and, once that
         * partition holds at least {@code minWriteToDiskLimit} entries, hands the queued entries
         * to the serializer pool to be written to disk.
         *
         * <p>The future of each submitted write is appended to the shared {@code futures} list;
         * it yields the running byte estimate after that write. Any exception fails the test.
         *
         * @param words        the quantized word; its leading letters select the partition
         * @param data         the unquantized values stored alongside the word
         * @param offset       stored with the approximation (the caller passes the window's position)
         * @param prefixLength number of leading letters used to select the partition
         */
        public void addToPartition(byte[] words, double[] data, int offset, int prefixLength) {
            try {
                final int partition = this.getPrefix(words, prefixLength);
                final LinkedBlockingQueue<SFATrie.Approximation> queue = this.wordPartitions[partition];
                queue.put(new SFATrie.Approximation(data, words, offset));
                synchronized (queue) {
                    if (queue.size() >= minWriteToDiskLimit) {
                        final List<SFATrie.Approximation> current = new ArrayList<SFATrie.Approximation>(queue.size());
                        queue.drainTo(current);
                        futures.add(serializerExec.submit(new Callable<Long>(){

                            @Override
                            public Long call() throws Exception {
                                // Inside the anonymous class a plain "this" is the Callable,
                                // so the enclosing instance has to be named explicitly.
                                SerializedStreams.this.writeToDisk(current, partition);
                                // Two serializer threads may finish at the same time; the
                                // read-modify-write of the shared total must not interleave.
                                synchronized (SerializedStreams.this) {
                                    // NOTE(review): 20 * 8 bytes per entry is taken over unchanged;
                                    // presumably 20 doubles per approximation - confirm.
                                    totalBytes += (long)current.size() * 20 * 8;
                                    return totalBytes;
                                }
                            }
                        }));
                    }
                }
            }
            catch (Exception e) {
                Assert.fail(e.getMessage());
            }
        }

        /**
         * Maps the leading letters of a word to a partition id by reading them as base-8 digits.
         *
         * <p>At least one and at most three letters are used, whatever {@code useLetters} says.
         *
         * @param word       the quantized word; must contain as many letters as are used
         * @param useLetters requested number of leading letters
         * @return the partition id
         */
        protected int getPrefix(byte[] word, int useLetters) {
            int letters = Math.min(useLetters, 3);
            int bucket = word[0];
            for (int pos = 1; pos < letters; ++pos) {
                bucket = 8 * bucket + word[pos];
            }
            return bucket;
        }

        /**
         * Appends a batch of approximations, as a single array, to the bucket file of a partition.
         *
         * <p>The file {@code <letter>.bucket} in the temp directory is created (overwriting any
         * existing file) on the first write to the partition. An empty batch is ignored.
         *
         * @param current the approximations to write
         * @param letter  the partition id
         * @throws IOException if the file cannot be opened or written
         */
        protected void writeToDisk(List<SFATrie.Approximation> current, int letter) throws IOException {
            if (current.isEmpty()) {
                return;
            }
            if (this.partitionsStream[letter] == null) {
                String fileName = tempDir.getAbsolutePath() + File.separator + letter + ".bucket";
                File file = new File(fileName);
                file.deleteOnExit();
                // 0x800000 = 8 MiB write buffer.
                this.partitionsStream[letter] = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(file, false), 0x800000));
            }
            ObjectOutputStream out = this.partitionsStream[letter];
            out.writeUnshared(current.toArray(new SFATrie.Approximation[0]));
            // Drop the stream's record of already written objects so it does not keep them alive.
            out.reset();
            try {
                // NOTE(review): the reason for this 100 ms pause is not evident from the code; kept as is.
                Thread.sleep(100L);
            }
            catch (InterruptedException e) {
                // Restore the flag cleared by Thread.sleep before failing the test.
                Thread.currentThread().interrupt();
                Assert.fail(e.getMessage());
            }
            this.writtenSamples[letter] += (long)current.size();
        }
    }
}

