This script contains a database pool implementation. It can be used in Inspire Automation scripts.

This script contains a database pool implementation. It can be used in Inspire Automation scripts.


Instructions for use:
Import it in Script Library and edit the SqlPoolConstants part with the proper values.

 Then it is possible to call it from scripts like this:


  DbPoolHelper.withConnectionPool() { sql ->
      id=1234
     name="John"
     sql.execute("insert into PERSON (id, name) values ($id, $name)")
  }


class SqlPoolConstants {

    // how long to wait for free connection when acquiring a connection and the
    // pool is used to maximum (milliseconds).
    // After this time, TimeoutException is thrown.
    static long getDbConnWaitTimeout() { return 1000 * 30 }

    // How long should the free connections be idle in the pool.
    // After this timeout the idle connection is closed and removed from pool.
    static long getDbConnIdleTimeout() { return 1000 * 60 }

    // maximum number of kept connections for each database pool
    static int getDbPoolMaxConnections() { return 5 }
   
    static int getMaxRetryOnError() { return 3 }

    // Define alias and the connection parameters for each database pool.
    // The alias is then used when calling the runSql() method.
    static def DATABASES = [
        'db1': [
            "jdbc:postgresql://localhost:5432/tracdb",
            "tracuser",
            "tracpass",
            "org.postgresql.Driver"
        ],
        'db2': [
           "jdbc:jtds:sqlserver://t218:1450/polcak4",
            "date",
            "heslo",
            "net.sourceforge.jtds.jdbc.Driver"
        ]
    ]
}

import groovy.sql.*
import java.util.*
import java.util.concurrent.TimeoutException
import groovy.lang.Singleton;
import java.sql.SQLException

class DbPool {
    /**
     * Acquires connection to DB, runs given closure with this connection and frees
     * the connection automatically.
     */
    static void withConnection(String database, Closure closure) {
        int max = SqlPoolConstants.getMaxRetryOnError()
        for(retry in 1..max) {
            Sql sql = SqlPoolImpl.instance.getConnection(database)
            try {
                try {
                    closure.call(sql)
                    return
                } catch(SQLException e) {
                    SqlPoolImpl.log("Error running sql operation. Retry ${retry} of ${max}. Error: " + e)
                    if(retry >= max) {
                        throw e;
                    }
                    Thread.sleep(1000)
                }
            } finally {
                SqlPoolImpl.instance.freeConnection(sql)
            }
        }
    }
}

// ----------------------------------------------------------------
// Implementation classes. Shouldn't be used from
// outside of this script.
// ----------------------------------------------------------------

/**
 * This class implements the sql pool core functionality. Is not meant to be used directly.
 */
@Singleton(lazy = true, strict = false)
class SqlPoolImpl {

    /**
     * Map of database aliases and their pools.
     * We don't synchronize access to this map, because
     * it is filled in constructor and then never modified.
     */
    private Map<String, DbPoolImpl> _pools = [:]

    /** Helper map used to store information about used Sql objects */
    private def _sqlToConnInfo = [:]

    /** This timer is used to plan connections removal from the pool */
    private def _timer = new Timer()

    SqlPoolImpl() {
        for (e in SqlPoolConstants.DATABASES) {
            _pools.put(e.key, new DbPoolImpl(e.value));
        }
        // Schedule logging statistics in regular intervals
        _timer.schedule(statisticsLogTask, 1000, 60 * 1000)
        _timer.schedule(removeExpiredTask, 1000, 500)
    }

    def statisticsLogTask = new TimerTask() {
        void run() {
            net.gmc.lib.logging.Logger.info(SqlPoolImpl.class, getStatistics())
        }
    }
    def removeExpiredTask = new TimerTask() {
        void run() {
            for (pool in _pools.values()) {
                pool.removeExpired()
            }
        }
    }

    String getStatistics() {
        return "Statistics: " + _pools.collect { it.key + ": " + it.value.getStatistics() }
    }

    private Sql getConnection(String dbAlias) {
        def pool
        pool = _pools[dbAlias]
        if (pool == null) {
            throw new Exception("Unknown database alias '" + dbAlias + "'")
        }
        ConnInfo ci = pool.getConnection()
        synchronized (_sqlToConnInfo) {
            _sqlToConnInfo.put(ci._sql, ci)
        }
        return ci._sql
    }

    private void freeConnection(Sql sql) {
        ConnInfo ci = null
        synchronized (_sqlToConnInfo) {
            ci = _sqlToConnInfo.remove(sql);
        }
        if (ci == null) { return; }
        ci._dbPool.freeConnection(ci);
    }

    def close() {
        log("Closing SqlPool")
        for (pool in _pools.values()) {
            pool.close()
        }
        _timer.cancel();
    }

    static log(String msg) {
        net.gmc.lib.logging.Logger.debug(SqlPoolImpl.class, msg, null)
    }

}

class ConnInfo {
    Sql _sql;
    DbPoolImpl _dbPool;
    long expirationTime;
    boolean isExpired() { return System.currentTimeMillis() > expirationTime}
    boolean isWorking() {
        try {
            _sql.execute("select 1");
            return true
        } catch(Exception e) {
            return false
        }
    }
}

class DbPoolImpl {
    def _dbInfo
    def _freeConnections = []
    def _usedConnections = []
    int _connectingCount = 0
    boolean _closed = false

    public DbPoolImpl(List<String> dbInfo) {
        _dbInfo = dbInfo
    }

    synchronized String getStatistics() {
        return "[total=${_usedConnections.size() + _freeConnections.size()}, idle=${_freeConnections.size()}, busy=${_usedConnections.size()}]"
    }

    ConnInfo getConnection() {
        long start = System.currentTimeMillis()
        boolean waitingLogged=false
        while (true) {
            long timeout = start + SqlPoolConstants.getDbConnWaitTimeout()
            if (System.currentTimeMillis() > timeout) {
                SqlPoolImpl.log("Timeout when waiting for free connection to " + _dbInfo[0])
                throw new TimeoutException("Timeout when waiting for free connection to " + _dbInfo[0])
            }
            boolean willCreateNewConnection = false
            try {
                synchronized (this) {
                    if (_closed) {
                        throw new Exception("DB Pool is closed");
                    }
                    // if some connection in cache
                    if (!_freeConnections.isEmpty()) {
                        // free connection available in cache, reuse it.
                        def connInfo = _freeConnections.remove(0)
                        if(connInfo.isWorking()) {
                            _usedConnections.add(connInfo)
                            SqlPoolImpl.log("Reusing connection " + connInfo._sql)
                           return connInfo
                        } else {
                            SqlPoolImpl.log("Connection is broken, will throw it away: " + connInfo._sql)
                            closeConn(connInfo)
                            // DO NOTHING, will create new connection
                        }
                    }

                    // no free connection in cache. Check if we can create a new one
                    // Note: If broken connection was closed, it was removed from _freeConnections and so the
                    // limit will be not execeeded.
                    if ((_freeConnections.size() + _usedConnections.size() + _connectingCount) < SqlPoolConstants.getDbPoolMaxConnections()) {
                        // We don't want create new connection inside a synchronized section (to not block other threads)
                        // So we just set a flag and create the connection after synchronized section
                        _connectingCount++
                        willCreateNewConnection = true
                    } else {
                        // No free and reached maximum => wait a little bit and repeat.
                        if(!waitingLogged) {
                            SqlPoolImpl.log("Waiting for place in DbPool")
                            waitingLogged=true
                        }
                        this.wait(1000)
                    }
                }
                if (willCreateNewConnection) {
                    try {
                        // we can allocate new connection
                        ConnInfo connInfo = new ConnInfo()
                        SqlPoolImpl.log("Creating new connection to " + _dbInfo[0])
                        connInfo._sql = Sql.newInstance(_dbInfo[0], _dbInfo[1], _dbInfo[2], _dbInfo[3])
                        SqlPoolImpl.log("Connected to " + _dbInfo[0] + " - connection: " + connInfo._sql)
                        synchronized (this) {
                            connInfo._dbPool = this
                            _usedConnections.add(connInfo)
                            _connectingCount--
                            willCreateNewConnection = false
                            if (_closed) {
                                freeConnection(connInfo)
                                throw new Exception("DB Pool is closed");
                            }
                        }
                        return connInfo
                    } catch(Exception e) {
                        SqlPoolImpl.log("Cannot create new connection, will try again later. Error: " + e)
                        synchronized (this) {
                            willCreateNewConnection = false
                            _connectingCount--;
                            this.wait(1000)
                        }
                    }
                }
            } finally {
                // In case there was an exception while creating new connection,
                // we need decrease the counter again.
                if (willCreateNewConnection) {
                    _connectingCount--
                }
            }
        }
    }

    synchronized void freeConnection(ConnInfo ci) {
        _usedConnections.remove(ci);
        if (_closed) {
            closeConn(ci)
        } else {
            _freeConnections.add(ci);
            ci.expirationTime = System.currentTimeMillis() + SqlPoolConstants.getDbConnIdleTimeout()
            SqlPoolImpl.log("Released connection " + ci._sql + ", removal planned at " + new Date(System.currentTimeMillis() + SqlPoolConstants.getDbConnIdleTimeout()))
            notifyAll()
        }
    }

    private void closeConn(ConnInfo ci) {
        SqlPoolImpl.log("Closing and removing connection " + ci._sql)
        try {
            ci._sql.close()
        } catch(Exception e) {}
    }

    void removeExpired() {
        def toClose = []
        synchronized (this) {
            _freeConnections.removeAll() {
                if(it.isExpired()) { toClose.add(it); return true }
                return false
            }
        }
        for(ci in toClose) {
            closeConn(ci)
        }
    }

    synchronized void close() {
        // What to do with used connections? We preserve it since
        // when closing the SqlPool (on script library recompile)
        // some executors can be running
        _closed = true;
        for (ci in _freeConnections) {
            closeConn(ci)
        }
        _freeConnections.clear()
        for(ci in _usedConnections) {
            SqlPoolImpl.log("Not closing used connection. Will be closed immediately after releasing: " + ci._sql)
        }
    }
}

// When the script is thrown away, it must close the pool
static void onScriptClose() {
    SqlPoolImpl.instance.close()
}

Comments

Popular posts from this blog

8 Machine Learning Algorithms explained in Human language

Deep Learning Architectures explained in Human Language

How To Set Up Pokemon MMO: