当前位置: 首页 > news >正文

最后的并行查询加载模块BatchQueryLoader直接就是调用上面的异步并行查询执行器BatchQueryExecutor,完成不同数据源的数据并行异步加载,代码如下

  1. ** * @filename:BatchQueryLoader.java * * Newland Co. Ltd. All rights reserved. * * @Description:并行查询加载模块 * @author tangjie * @version 1.0 * */ package newlandframework.batchtask.parallel; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Iterator; import java.util.List; public class BatchQueryLoader { private final Collection<StatementWrapper> statements = new ArrayList<StatementWrapper>(); public void attachLoadEnv(final String sql, final Statement statement, final Connection con) { statements.add(new StatementWrapper(sql, statement, con)); } public Collection<StatementWrapper> getStatements() { return statements; } public void close() throws SQLException { Iterator<StatementWrapper> iter = statements.iterator(); while (iter.hasNext()) { iter.next().getCon().close(); } } public List<ResultSet> executeQuery() throws SQLException { List<ResultSet> result; if (1 == statements.size()) { StatementWrapper entity = statements.iterator().next(); result = Arrays.asList(entity.getStatement().executeQuery( entity.getSql())); return result; } else { BatchQueryExecutor query = new BatchQueryExecutor(); result = query.executeQuery(statements, new BatchQuery<StatementWrapper, ResultSet>() { @Override public ResultSet query(final StatementWrapper input) throws Exception { return input.getStatement().executeQuery( input.getSql()); } }); return result; } } }

  2. 批量处理线程池运行参数配置加载BatchTaskConfigurationLoader模块,主要从负责从batchtask-configuration.xml中加载线程池的运行参数。BatchTaskConfiguration批处理线程池运行参数对应的JavaBean结构

    /** * @filename:BatchTaskConfiguration.java * * Newland Co. Ltd. All rights reserved. * * @Description:批处理线程池参数配置 * @author tangjie * @version 1.0 * */ package newlandframework.batchtask.parallel; import org.apache.commons.lang.builder.EqualsBuilder; import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.commons.lang.builder.ToStringBuilder; import org.apache.commons.lang.builder.ToStringStyle; public class BatchTaskConfiguration { private String name; private int corePoolSize; private int maxPoolSize; private int keepAliveTime; private int workQueueSize; public void setName(String name) { this.name = name; } public String getName() { return this.name; } public int getCorePoolSize() { return corePoolSize; } public void setCorePoolSize(int corePoolSize) { this.corePoolSize = corePoolSize; } public int getMaxPoolSize() { return maxPoolSize; } public void setMaxPoolSize(int maxPoolSize) { this.maxPoolSize = maxPoolSize; } public int getKeepAliveTime() { return keepAliveTime; } public void setKeepAliveTime(int keepAliveTime) { this.keepAliveTime = keepAliveTime; } public int getWorkQueueSize() { return workQueueSize; } public void setWorkQueueSize(int workQueueSize) { this.workQueueSize = workQueueSize; } public int hashCode() { return new HashCodeBuilder(1, 31).append(name).toHashCode(); } public String toString() { return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) .append("name", name).append("corePoolSize", corePoolSize) .append("maxPoolSize", maxPoolSize) .append("keepAliveTime", keepAliveTime) .append("workQueueSize", workQueueSize).toString(); } public boolean equals(Object o) { boolean res = false; if (o != null && BatchTaskConfiguration.class.isAssignableFrom(o.getClass())) { BatchTaskConfiguration s = (BatchTaskConfiguration) o; res = new EqualsBuilder().append(name, s.getName()).isEquals(); } return res; } }

    当然了,你进行参数配置的时候,还可以指定多个线程池,于是要设计一个:批处理线程池工厂类BatchTaskThreadFactoryConfiguration,来依次循环保存若干个线程池的参数配置

    /** * @filename:BatchTaskThreadFactoryConfiguration.java * * Newland Co. Ltd. All rights reserved. * * @Description:线程池参数配置工厂 * @author tangjie * @version 1.0 * */ package newlandframework.batchtask.parallel; import java.util.Map; import java.util.HashMap; public class BatchTaskThreadFactoryConfiguration { // 批处理线程池参数配置 private Map<String, BatchTaskConfiguration> batchTaskMap = new HashMap<String, BatchTaskConfiguration>(); public BatchTaskThreadFactoryConfiguration() { } public void joinBatchTaskConfiguration(BatchTaskConfiguration batchTaskConfiguration) { if (batchTaskMap.containsKey(batchTaskConfiguration.getName())) { return; }else{ batchTaskMap.put(batchTaskConfiguration.getName(), batchTaskConfiguration); } } public Map<String, BatchTaskConfiguration> getBatchTaskMap() { return batchTaskMap; } }

    剩下的是,加载运行时参数配置模块BatchTaskConfigurationLoader

    /** * @filename:BatchTaskConfigurationLoader.java * * Newland Co. Ltd. All rights reserved. * * @Description:线程池参数配置加载 * @author tangjie * @version 1.0 * */ package newlandframework.batchtask.parallel; import java.io.InputStream; import org.apache.commons.digester.Digester; public final class BatchTaskConfigurationLoader { private static final String BATCHTASK_THREADPOOL_CONFIG = "./newlandframework/batchtask/parallel/batchtask-configuration.xml"; private static BatchTaskThreadFactoryConfiguration config = null; private BatchTaskConfigurationLoader() { } // 单例模式为了控制并发要进行同步控制 public static BatchTaskThreadFactoryConfiguration getConfig() { if (config == null) { synchronized (BATCHTASK_THREADPOOL_CONFIG) { if (config == null) { try { InputStream is = getInputStream(); config = (BatchTaskThreadFactoryConfiguration) getDigester().parse(getInputStream()); } catch (Exception e) { e.printStackTrace(); } } } } return config; } private static InputStream getInputStream() { return BatchTaskConfigurationLoader.class.getClassLoader() .getResourceAsStream(BATCHTASK_THREADPOOL_CONFIG); } private static Digester getDigester() { Digester digester = new Digester(); digester.setValidating(false); digester.addObjectCreate("batchtask", BatchTaskThreadFactoryConfiguration.class); // 加载批处理异步批处理线程池参数配置 digester.addObjectCreate("*/jobpool", BatchTaskConfiguration.class); digester.addSetProperties("*/jobpool"); digester.addSetProperty("*/jobpool/attribute", "name", "value"); digester.addSetNext("*/jobpool", "joinBatchTaskConfiguration"); return digester; } }

    上面的这些模块主要是针对线程池的运行参数可以调整而设计准备的。

  3. 并行异步批处理模块BatchTaskReactor主要类图结构如下

    BatchTaskRunner这个接口,主要定义了批处理框架要初始化和回收资源的动作。

    /** * @filename:BatchTaskRunner.java * * Newland Co. Ltd. All rights reserved. * * @Description:批处理资源管理定义接口 * @author tangjie * @version 1.0 * */ package newlandframework.batchtask.parallel; import java.io.Closeable; public interface BatchTaskRunner extends Closeable { public void initialize(); public void close(); }

    我们还要重新实现一个线程工厂类BatchTaskThreadFactory,用来管理我们线程池当中的线程。我们可以把线程池当中的线程放到线程组里面,进行统一管理。比如线程池中的线程,它的运行状态监控等等处理,你可以通过重新生成一个监控线程,
    来运行、跟踪线程组里面线程的运行情况。当然你还可以重新封装一个JMX(Java Management Extensions)的MBean对象,通过JMX方式对线程池进行监控处理,本文的后面,有给出运用JMX技术,进行批处理线程池任务完成情况监控的实现,实现线程池中线程运行状态的监控可以参考一下。这里就不具体给出,线程池线程状态监控的JMX模块代码了。言归正传,线程工厂类BatchTaskThreadFactory的实现如下

    /** * @filename:BatchTaskThreadFactory.java * * Newland Co. Ltd. All rights reserved. * * @Description:线程池工厂 * @author tangjie * @version 1.0 * */ package newlandframework.batchtask.parallel; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.ThreadFactory; public class BatchTaskThreadFactory implements ThreadFactory { final private static String BATCHTASKFACTORYNAME = "batchtask-pool"; final private String name; final private ThreadGroup threadGroup; final private AtomicInteger threadNumber = new AtomicInteger(0); public BatchTaskThreadFactory() { this(BATCHTASKFACTORYNAME); } public BatchTaskThreadFactory(String name) { this.name = name; SecurityManager security = System.getSecurityManager(); threadGroup = (security != null) ? security.getThreadGroup() : Thread.currentThread().getThreadGroup(); } @Override public Thread newThread(Runnable runnable) { Thread thread = new Thread(threadGroup, runnable); thread.setName(String.format("BatchTask[%s-%d]", threadGroup.getName(), threadNumber.incrementAndGet())); System.out.println(String.format("BatchTask[%s-%d]", threadGroup.getName(), threadNumber.incrementAndGet())); if (thread.isDaemon()) { thread.setDaemon(false); } if (thread.getPriority() != Thread.NORM_PRIORITY) { thread.setPriority(Thread.NORM_PRIORITY); } return thread; } }

    下面是关键模块:并行异步批处理模块BatchTaskReactor的实现代码,主要还是对ThreadPoolExecutor进行地封装,考虑使用有界的数组阻塞队列ArrayBlockingQueue,还是为了防止:生产者无休止的请求服务,导致内存崩溃,最终做到内存使用可控
    采取的措施。

    /** * @filename:BatchTaskReactor.java * * Newland Co. Ltd. All rights reserved. * * @Description:批处理并行异步线程池处理模块 * @author tangjie * @version 1.0 * */ package newlandframework.batchtask.parallel; import java.util.Set; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public final class BatchTaskReactor implements BatchTaskRunner { private Map<String, ExecutorService> threadPools = new ConcurrentHashMap<String, ExecutorService>(); private static BatchTaskReactor context; private static Lock REACTORLOCK = new ReentrantLock(); public static final String BATCHTASK_THREADPOOL_NAME = "newlandframework_batchtask"; private BatchTaskReactor() { initialize(); } // 防止并发重复创建批处理反应器对象 public static BatchTaskReactor getReactor() { if (context == null) { try { REACTORLOCK.lock(); if (context == null) { context = new BatchTaskReactor(); } } finally { REACTORLOCK.unlock(); } } return context; } public ExecutorService getBatchTaskThreadPoolName() { return getBatchTaskThreadPool(BATCHTASK_THREADPOOL_NAME); } public ExecutorService getBatchTaskThreadPool(String poolName) { if (!threadPools.containsKey(poolName)) { throw new IllegalArgumentException(String.format( "批处理线程池名称:[%s]参数配置不存在", poolName)); } return threadPools.get(poolName); } public Set<String> getBatchTaskThreadPoolNames() { return threadPools.keySet(); } // 关闭线程池,同时等待异步执行的任务返回执行结果 public void close() { for (Entry<String, ExecutorService> entry : threadPools.entrySet()) { entry.getValue().shutdown(); System.out.println(String.format("关闭批处理线程池:[%s]成功", entry.getKey())); } threadPools.clear(); } // 初始化批处理线程池 public void initialize() { BatchTaskThreadFactoryConfiguration poolFactoryConfig = BatchTaskConfigurationLoader.getConfig(); if (poolFactoryConfig != null) { initThreadPool(poolFactoryConfig); } } private void initThreadPool(BatchTaskThreadFactoryConfiguration poolFactoryConfig) { for (Entry<String, BatchTaskConfiguration> entry : poolFactoryConfig.getBatchTaskMap().entrySet()) { BatchTaskConfiguration config = entry.getValue(); // 使用有界的阻塞队列,考虑为了防止生产者无休止的请求服务,导致内存崩溃,最终做到内存使用可控 BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(config.getWorkQueueSize()); ThreadPoolExecutor threadPool = new ThreadPoolExecutor( config.getCorePoolSize(), config.getMaxPoolSize(), config.getKeepAliveTime(), TimeUnit.SECONDS, queue, new BatchTaskThreadFactory(entry.getKey()),new ThreadPoolExecutor.CallerRunsPolicy()); threadPools.put(entry.getKey(), threadPool); System.out.println(String.format("批处理线程池:[%s]创建成功",config.toString())); } } }

http://www.gsyq.cn/news/1618147.html

相关文章:

  • URL 使用规范
  • Pikachu靶场从入门到精通(五):RCE、XXE、SSRF与反序列化漏洞实战
  • 硬件学习笔记
  • Go escape逃逸分析
  • 孤能子视角:Karpathy LLM Wiki,一个人工观察符自动编织系统
  • 第4章 RAG 检索增强生成全链路架构《AI Agent 开发平台资深技术专家 AI Agent 应用架构师 CTO 面试题库详解》
  • 生成式引擎优化(GEO)在酒店民宿行业的落地实践:对抗 OTA 流量截流
  • 智能合约开发中的威胁建模:代码生成前的安全基线构建
  • AI 编译优化入门:算子融合不是为了少写几行代码
  • Kiran Biometrics:开源生物识别认证系统的完整指南
  • ActiveReports for .NET 20.0J SP1-AIレポートウィザードがさらに進化
  • c++复习自存
  • Cursor Free VIP破解工具:3分钟解除AI编程助手试用限制的终极指南
  • 西安共享茶室平台开发?时段预约锁房技术源码讲解
  • 【小白也能轻松玩转龙虾】虾壳云一键部署入门攻略,分步搭建桌面端 OpenClaw v2.7.9(附最新安装包)
  • AI 辅助:独立创作:工具应放大作者,而不是替代作者
  • 后端开发者转型AI大模型的必备技能与实战指南
  • AI 辅助:少说漂亮话:基础设施要用事故假设来设计
  • 5个场景化解决方案:用taskt告别重复劳动,实现桌面自动化革命
  • Harness Engineering(驾驭工程)简单的演化过程
  • 那些与量子纠缠有关的物理概念和现象
  • “借道”MoP封装,AMD打破“存储墙”与“空间锁”
  • 2.4 中间层:底层驱动与标准库——固收与负债的“稳态输出”
  • 一张图讲清楚:MCP边界
  • 子任务想换个便宜模型跑?Sub-Agent 这样设计
  • 语音一键转文字超简单!2026多款免费软件详细步骤,新手一看就会
  • 开源视频生成模型选择
  • SpringBoot+Vue 私人西服定制_leabo管理平台源码【适合毕设/课设/学习】Java+MySQL
  • 用最新 GPT-5.6 润色论文是一种怎么样的体验?
  • 一张图讲清楚:Codex上下文