大数据正式京淘13 青旅半醒 2022-06-01 12:43 78阅读 0赞 # 大数据正式13 # ### 定时任务 ### * 防止恶意订单 * 在订单提交之后,没有支付,但是订单没有生成效益,却减少了库存,如果大量生成这种订单,库存到0,无法继续购买 * 解决方案 * 虚拟商品数量:这个一直减,不是太好--适合紧急解决 * 引入定时任务,超时未支付订单自动回库,库存自动回退 * 电商:一天 * 解决技术 * Timer的API * 插件:石英钟 * 图解关系 * ![vyeDxNb.png][] * 使用 1. 本身就是一个jar包 2. 核心组件【Job,JobDetail具体处理,Scheduler调度器,Trigger触发器】 * JobDetail+Job * 继承自石英钟的父类,启动容器后,一旦加载JobDetail的实例,其中JobDetail下的多个job逻辑需要编写代码 * 调度器:负责调用一个JobDetail的的时间触发器 * 触发器:管理触发当前一个石英钟逻辑的JobDetail的组件,时间计算表达式,任何触发任务执行是由触发器计算管理的 3. 简单解释 1. Job * 表示一个工作,要执行的具体内容。 * 此接口中只有一个方法void execute(JobExecutionContext context) 2. JobDetail * JobDetail表示一个具体的可执行的调度程序,Job是这个可执行程调度程序所要执行的内容,另外JobDetail还包含了这个任务调度的方案和策略。 3. Trigger * 代表一个调度参数的配置,什么时候去调。 4. Scheduler * 代表一个调度容器,一个调度容器中可以注册多个JobDetail和Trigger。当Trigger与JobDetail组合,就可以被Scheduler容器调度了。 * 触发器的分类 * 简单触发器【Simple】 * 只能完成一些简单的circle时间逻辑,每隔一段时间,进行任务触发 * 复杂计算器【cron】 * 可以定在任意时间点进行事件的触发 * Second:秒 * Minute:分 * Hour:时 * Day-of-month:月中的天 * Month:月 * Day-of-work:周中的天 * Year:年 * 石英钟使用过程 * 创建JobDetail实例 * 注册调度器 * 调度触发器 * 计算时间触发时间 * 触发任务代码 * 执行job代码 * 工作原理 1. scheduler是一个计划调度器容器(总部),容器里面可以盛放众多的JobDetail和trigger,当容器启动后,里面的每个JobDetail都会根据trigger按部就班自动去执行。 2. JobDetail是一个可执行的工作,它本身可能是有状态的。 3. Trigger代表一个调度参数的配置,什么时候去调。 4. 当JobDetail和Trigger在scheduler容器上注册后,形成了装配好的作业(JobDetail和Trigger所组成的一对儿),就可以伴随容器启动而调度执行了。 5. scheduler是个容器,容器中有一个线程池,用来并行调度执行每个作业,这样可以提高容器效率。 6. 内部结构 * ![NMf2nFT.png][] ### 将石英钟添加到京淘 ### * 代码逻辑 1. 设置支付超时 2. 判断是否超时:【当前的时间 - 创建的时间】结合【status的状态】 3. 如果超时:归还库存 * 依赖 # # <!-- 石英钟任务 --> <dependency> <groupId>org.quartz-scheduler</groupId> <artifactId>quartz</artifactId> <version>2.2.1</version> </dependency> * 配置文件:applicationContext-scheduler.xml # # <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.0.xsd"> <!-- 定义任务bean --> <bean name="paymentOrderJobDetail" class="org.springframework.scheduling.quartz.JobDetailFactoryBean"> <!-- 指定具体的job类 --> <property name="jobClass" value="com.peng.job.PayMentOrderJob" /> <!-- 指定job的名称 --> <property name="name" value="paymentOrder" /> <!-- 指定job的分组 --> <property name="group" value="Order" /> <!-- 必须设置为true,如果为false,当没有活动的触发器与之关联时会在调度器中删除该任务 --> <property name="durability" value="true" /> <!-- 指定spring容器的key,如果不设定在job中的jobmap中是获取不到spring容器的 --> <property name="applicationContextJobDataKey" value="applicationContext" /> </bean> <!-- 定义触发器 --> <bean id="cronTrigger" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean"> <property name="jobDetail" ref="paymentOrderJobDetail" /> <!-- 每一天执行一次 --> <property name="cronExpression" value="0 0 0/23 * * ?" /> </bean> <!-- 定义调度器 --> <bean class="org.springframework.scheduling.quartz.SchedulerFactoryBean"> <property name="triggers"> <list> <ref bean="cronTrigger" /> </list> </property> </bean> </beans> * java代码:PayMentOrderJob.java # # package com.peng.job; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.springframework.context.ApplicationContext; import org.springframework.scheduling.quartz.QuartzJobBean; public class PayMentOrderJob extends QuartzJobBean { @Override protected void executeInternal(JobExecutionContext context) throws JobExecutionException { ApplicationContext applicationContrext = (ApplicationContext) context.getJobDetail().getJobDataMap() .get("applicationContext"); System.err.println("定时任务执行中~~"); } } # 消息队列 # ### RabbitMQ ### * 引入 * 当前京淘的架构性能提升点 1. NGINX高并发 2. Redis内存缓存数据库(非结构数据) 3. amoeba提升数据最后关卡的性能 * 超负荷的请求,以上三个技术无法处理 * 当请求来到时,如果并发量太大,就让请求排成队列 * 基于erlang语言 * 消息队列分类 * simple简单队列【先后顺序】 * work工作模式【资源竞争】--红包 * publish/subscribe发布订阅【共享资源】:引入交换机--邮件的群发、群聊天、广播 * 路由模式:消息的生产者发送给交换机,通过路由判断key值发送到相应的队列--error通知 * topic主题模式(路由模式的一种):通过表达式进行判断--\*代表多个单词,\#号代表一个单词 * 注意:别名 * publish:fanout * routing:direct * topic:topic ### 使用 ### * 依赖 # # <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.5.1</version> </dependency> * 流程 1. 创建连接工厂 2. 从连接工厂获取connection 3. 从连接获取channel 4. 从channel获取绑定的queue 5. 生产者生产消息放入队列 6. 释放资源 ### RabbitMQ的工作原理 ### * 单发送,单接收 * 使用场景:简单的发送与接收,没有设么特别的处理 * ![0QPW6QC.png][] * 示例【生产者】 # # public class Send { private final static String QUEUE_NAME = "hello";//队列的名称 public static void main(String[] argv) throws Exception { // 获取连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 设置主机IP factory.setHost("localhost"); // 获取连接 Connection connection = factory.newConnection(); // 创建通道 Channel channel = connection.createChannel(); // 通道找到队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; // 发送消息给队列 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); // 关闭连接 channel.close(); connection.close(); } } * 示例【消费者】 # # public class Recv { // 队列名称 private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { // 获得连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 设置主机IP factory.setHost("localhost"); // 获得连接 Connection connection = factory.newConnection(); // 创建通道 Channel channel = connection.createChannel(); // 通道连接队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // 接收队列 QueueingConsumer consumer = new QueueingConsumer(channel); // 执行 channel.basicConsume(QUEUE_NAME, true, consumer); // 遍历队列消息 while (true) { // 传送队列信息 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } } } * 单发送多接收 * 使用场景:一个发送端,多个接收端,如分布式的任务派发。为了保证消息发送的可靠性,不丢失消息,使消息持久化了。同时为了防止接收端在处理消息时down掉,只有在消息处理完成后才发送ack消息。 * ![vt95qL1.png][] * 示例【生产者】 # # public class NewTask { // 队列的名称 private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { // 连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 设置主机IP factory.setHost("localhost"); // 获取连接 Connection connection = factory.newConnection(); // 创建通道 Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String message = getMessage(argv); // PERSISTENT_TEXT_PLAIN:消息的持久化 channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } private static String getMessage(String[] strings){ if (strings.length < 1) return "Hello World!"; return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) return ""; StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } } * 示例【消费者】 # # public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // 使用了channel.basicQos(1)保证在接收端一个消息没有处理完时不会接收另一个消息,即接收端发送了ack后才会接收下一个消息。在这种情况下发送端会尝试把消息发送给下一个not busy的接收端 channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(TASK_QUEUE_NAME, false, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); doWork(message); System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } private static void doWork(String task) throws InterruptedException { for (char ch: task.toCharArray()) { if (ch == '.') Thread.sleep(1000); } } } * Publish/Subscribe * 使用场景:发布、订阅模式,发送端发送广播消息,多个接收端接收 * ![1jRyhLo.png][] * 示例【生产者】 # # public class EmitLog { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } private static String getMessage(String[] strings){ if (strings.length < 1) return "info: Hello World!"; return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) return ""; StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } } * 示例【消费者】 # # public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } } } * Routing (按路线发送接收) * 使用场景:发送端按routing key发送消息,不同的接收端按不同的routing key接收消息 * ![fzDXMDh.png][] * 示例【生产者】 # # public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String severity = getSeverity(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes()); System.out.println(" [x] Sent '" + severity + "':'" + message + "'"); channel.close(); connection.close(); } private static String getSeverity(String[] strings){ if (strings.length < 1) return "info"; return strings[0]; } private static String getMessage(String[] strings){ if (strings.length < 2) return "Hello World!"; return joinStrings(strings, " ", 1); } private static String joinStrings(String[] strings, String delimiter, int startIndex) { int length = strings.length; if (length == 0 ) return ""; if (length < startIndex ) return ""; StringBuilder words = new StringBuilder(strings[startIndex]); for (int i = startIndex + 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } } * 示例【消费者】 # # public class ReceiveLogsDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1){ System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]"); System.exit(1); } for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, severity); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(" [x] Received '" + routingKey + "':'" + message + "'"); } } } * Topics (按topic发送接收) * 使用场景:发送端不只按固定的routing key发送消息,而是按字符串“匹配”发送,接收端同样如此。 * ![aUuVgIP.png][] * 示例【生产者】 # # public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) { Connection connection = null; Channel channel = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String routingKey = getRouting(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes()); System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'"); } catch (Exception e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (Exception ignore) {} } } } private static String getRouting(String[] strings){ if (strings.length < 1) return "anonymous.info"; return strings[0]; } private static String getMessage(String[] strings){ if (strings.length < 2) return "Hello World!"; return joinStrings(strings, " ", 1); } private static String joinStrings(String[] strings, String delimiter, int startIndex) { int length = strings.length; if (length == 0 ) return ""; if (length < startIndex ) return ""; StringBuilder words = new StringBuilder(strings[startIndex]); for (int i = startIndex + 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } } * 示例【消费者】 # # public class ReceiveLogsTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) { Connection connection = null; Channel channel = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1){ System.err.println("Usage: ReceiveLogsTopic [binding_key]..."); System.exit(1); } for(String bindingKey : argv){ channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(" [x] Received '" + routingKey + "':'" + message + "'"); } } catch (Exception e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (Exception ignore) {} } } } } # 秒杀 # ### 业务场景分析 ### * 并发量很高的时间段--抢商品 * 队列中的消息可以是什么 1. 电话号码 2. username 3. ticket 4. ...... * 做法 * 调用SSO查询用户信息,把前n个消息获取到,后面的放入rabbitmq的垃圾桶 * 更高的并发可以考虑分布式的队列 * 文件位置 * 生产者:后台 * 消费者:前台 * 秒杀设计 * ![KoWfMTK.png][] * 未完待续。。。 ### 注:参考文章 ### * RabbitMQ的几种典型使用场景:https://www.rabbitmq.com/getstarted.html [vyeDxNb.png]: https://i.imgur.com/vyeDxNb.png [NMf2nFT.png]: https://i.imgur.com/NMf2nFT.png [0QPW6QC.png]: https://i.imgur.com/0QPW6QC.png [vt95qL1.png]: https://i.imgur.com/vt95qL1.png [1jRyhLo.png]: https://i.imgur.com/1jRyhLo.png [fzDXMDh.png]: https://i.imgur.com/fzDXMDh.png [aUuVgIP.png]: https://i.imgur.com/aUuVgIP.png [KoWfMTK.png]: https://i.imgur.com/KoWfMTK.png
相关 大数据正式京淘3 大数据正式京淘3 EasyUI简介 文档 每个组件的easyui有属性、方法和事件。用户可以方便地扩展。 属性 Dear 丶/ 2022年06月02日 12:51/ 0 赞/ 248 阅读
相关 大数据正式京淘2 大数据正式京淘2 项目统一 编码:UTF-8 环境:JDK1.8 Maven:3.5 数据库:5.5 项目支撑系统搭建 新建w 末蓝、/ 2022年06月02日 12:27/ 0 赞/ 223 阅读
相关 大数据正式京淘正式14 大数据正式京淘正式14 传统的检索方式 1.文本检索/windows检索 全文检索、全文遍历 加载到内存中 缺点:数据一多,无法高效查询 蔚落/ 2022年06月01日 13:54/ 0 赞/ 199 阅读
相关 大数据正式京淘13 大数据正式13 定时任务 防止恶意订单 在订单提交之后,没有支付,但是订单没有生成效益,却减少了库存,如果大量生成这种订单,库存到0, 青旅半醒/ 2022年06月01日 12:43/ 0 赞/ 79 阅读
相关 大数据正式京淘10 大数据正式京淘10 数据库的读写分离 电商项目京淘项目的瓶颈有哪些 1. 数据库瓶颈 2. IO【图片(文件)的上传】 我就是我/ 2022年06月01日 07:19/ 0 赞/ 241 阅读
相关 大数据正式京淘9 大数据正式京淘9 redis集群总结 引入槽道:14384个虚拟槽道,扩展节点,无需修改代码 删除节点 1. 线路割接 2. 心已赠人/ 2022年06月01日 06:23/ 0 赞/ 266 阅读
相关 大数据正式京淘7 大数据正式京淘7 解决入口流量--NGINX的集群分配问题 解决:配置多台DNS域名解析器 图解 ![lzglcWQ.png][ ╰半橙微兮°/ 2022年06月01日 04:37/ 0 赞/ 191 阅读
相关 大数据正式京淘6 大数据正式京淘6 Redis基础命令 set key value【修改值】 incr key【自增】 decr key【自减】 incrby 秒速五厘米/ 2022年06月01日 02:30/ 0 赞/ 214 阅读
相关 大数据正式京淘4 大数据正式京淘4 数据库性能 数据库需要维护外键的内部关联(if语句,用代码关联) 涉及外键的操作增删改查,判断外键消耗资源 外键存在导致数据库的 冷不防/ 2022年06月01日 00:23/ 0 赞/ 97 阅读
还没有评论,来说两句吧...