Java / Java web · 2020年8月19日 0

HTML5服务器推送事件(Server-sent-event)

HTML5服务器推送事件(Server-sent-event)

在前端开发中,实现界面推送的方式,这里大概总结下三种方式

  1. 轮询(ajax),比较耗费服务器资源。COMET方式(COMET 技术并不是 HTML 5 )
  2. websocket 双向数据推送,灵活,功能强大
  3. Server-sent-event(简称SSE),单项数据推送(Server-sent Events 规范是 HTML 5 规范的一个组成部分)

这里我们只讨论SSE

SSE的本质:严格地说,HTTP协议无法做到服务器主动推送信息。但是有一种变通的发光法,就是服务器向客户端声明,接下来要发送的是流信息,也就是说,发送的不是一次性的数据包,而是一个数据流,会连续不断的发送过来。这是客户端不会关闭连接,会一直等待服务器发过来的数据流,视频播放就是这样的例子。本质上这种通信就是以流信息的方式,完成一次用时很长的下载。

SSE就是利用这种机制,使用流信息想浏览器推送信息。它基于HTTP协议,除了IE/Edge,其他浏览器都支持

闲谈

在Web开发中,浏览器和服务器之间使用请求/响应的交互模式。浏览器发出请求,服务器根据请求来生成响应。这种交互方式,服务器端产生数据变化后不能及时的通知给浏览,只能在浏览器下次请求的时候,才能获取(对于某些对数据实时性要求很高的应用,这种延迟是不能接受的)。

使用浏览器原生的EventSource对象的一个比较大的问题是IE并不支持(为实现在IE上COMET或轮询,第二种使用polyfill技术)

SSE的客户端API部署在EventSource对象上,使用之前检测浏览器是否支持SSE

if(typeof(EventSource)!=="undefined")
{
    // 浏览器支持 Server-Sent
    // 一些代码.....
}
else
{
    // 浏览器不支持 Server-Sent..
}
var source = new EventSource(url);
// url可以在当前网址同域,也可以跨域,跨域时可以指定第二个参数withCredentials 表示是否一起发送Cookies

EventSource的readyState 表明连接的当前状态,该属性只读

  • 0: 相当于常量EventSource.CONNECTIONG 表示连接还未建立,或者断线正在重连
  • 1:相当于常量EventSource.OPEN 表示连接已经建立,可以接受数据
  • 2:相当于常量EventSource.CLOSED 表示连接已断,且不会重连

Server-sent Events

Server-sent Events 规范是 HTML 5 规范的一个组成部分,该规范比较简单,主要由两部分组成:

第一部分:服务器端与浏览器之间的通讯协议

第二部分:浏览器端可以提供JavaScript中使用EventSource对象。

通讯协议是基于纯文本的简单协议

服务器端响应的内容类型为:text/event-stream,响应文本内容可以看成是一个事件流,有不同的事件组成。

每个事件由类型和数据两个部分组成,同时每个事件可以有一个可选的标识符。不同事件的内容之间通过仅包含回车符和换行符的空行来分隔。每个事件的数据可能由多行组成。

清单1

data: first event

data: second event
id: 100

event: myevent
data: third event
id: 101

: this is a comment
data: fourth event
data: fourth event continue

每个事件之间通过空行来分隔。对于每行来说,冒号(:)前面表示的该行的类型,冒号后面则是对应的值。

可能的类型包括:

  • 类型为空白,表示该行是注释,会在处理时被忽略
  • 类型为data,表示该行包含的是数据。以data开头的行可以出现多次。所有这些行都是该事件的数据。
  • 类型为event,表示该行用来声明事件的类型。浏览器在收到数据时,会产生对应类型的事件。
  • 类型 为id,表示该行用来声明事件的标识符
  • 类型为retry,表示该行用来声明浏览器在连接断开后检修 再次连接之前的等待时间

清单1,第一个事件只包含数据“first event”,会产生默认的事件,第二个事件的标识符是100,数据为“secondEvent”,第三个事件会产生类型为“myevent”的事件,最后一个事件的数据为“fourthevent \n fourth event continue”,当有多行数据时,实际的数据由每行数据以换行符连接而成

如果服务器端返回的数据中包含了事件的标识符,浏览器会记录最后一次接收到的事件的标识符,如果与服务器多连接中断,当浏览器端再次进行连接时,会通过HTTP头“Last-Event-ID”来声明最后一次接收到的事件的标识符,服务器端可以通过浏览器发送的事件标识符来确定从哪个事件开始来继续连接。

对于服务器端返回的响应,浏览器端需要通过JavaScript的EventSource对象来 处理,EventSource使用的是标准的事件监听器方式,只需要在对象上添加相应的事件处理方式即可。

EventSource对象提供的标准事件

名称 说明 事件处理方法
open 当成功与服务器建立连接时产生 onopen
message 当收到服务器发送的事件时产生 onmessage
error 当出现错误时产生 onerror

服务器端可以返回自定义类型的事件,可以使用addEventListenner方法来添加相应的事件处理方法

var  es = new EventSource('events');
es.onmessage = function(e){
    console.log(e.data);
}
es.addEventListener('myevent',function(e){
    console.log(e.data);
})

在指定URL创建出EventSource对象之后,可以通过onmessage和addEventListener方法来添加事件处理方法,当服务器端有新的事件产生,相应的事件处理方法会被调用。EventSource对象的onmessage属性的作用类似于addEventListerner('message'),不过onmessage属性只支持一个事件处理方法

默认情况下,服务器发来的数据,总是触发浏览器EventSource实例的message事件。

常见实现方式对比

(短)轮询 长轮询/Comet SSE WebSocket
浏览器支持 全部 全部 除IE/Edge 现代浏览器
是否独立协议 HTTP HTTP HTTP WS
是否轻量
断线重连
负载压力 占用内存/请求数 同(短)轮询 一般 一般
数据延迟 取决于请求间隔 同(短)轮询 实时 实时

Demo 实现方式一(Spring boot)

踩坑总结

  • 在连接的时候,前端连接是否跨域了,如果跨域先解决跨域

  • 如果服务器端调用sseEmitter.complete();前端就执行连接关闭,再次发送前端数据的时候,重新连接回打开(一般我们处理完成的时候执行)

  • 同一个用户在连接到后台的时候,后台的SseEmitter对象需要移除在重新添加,否则后台推送不过来

image-20200819175021176

精简版

服务器代码

package com.zdltech.javaexercise.ssedemo.controller;

import org.springframework.http.MediaType;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

@Controller
@CrossOrigin
public class IndexController {
    private Map<String,SseEmitter>  pushSseEmitterMap = new HashMap<>();

    @GetMapping(path = "/push/{id}",produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter  pushSse(@PathVariable("id") String uid){
        System.out.println("pushSse is run");
        SseEmitter sseEmitter = new SseEmitter(0L);
        sseEmitter.onCompletion(()->{pushSseEmitterMap.remove(uid);});
        sseEmitter.onTimeout(()->{pushSseEmitterMap.remove(uid);});
        sseEmitter.onError(throwable->{
            System.out.println("onError is run");
            throwable.printStackTrace();
            pushSseEmitterMap.remove(uid);
        });
        if (!pushSseEmitterMap.containsKey(uid)){
            pushSseEmitterMap.remove(uid);
        }
        pushSseEmitterMap.put(uid,sseEmitter);
        return sseEmitter;
    }

//    @Scheduled(fixedDelay = 2*1000)
//    public void scheduledMsgEmitter() throws IOException
//    {
//        pushSseEmitterMap.keySet().forEach(key -> {
//           SseEmitter emitter = pushSseEmitterMap.get(key);
//            if (null != emitter){
//                try {
//                    System.out.println("Timeout : "+ emitter.getTimeout());
//                    emitter.send(": " + Calendar.getInstance().getTime());
//                } catch (IOException e) {
//                    e.printStackTrace();
//                }
//            }
//
//        });
//    }

    @RequestMapping("/push/send")
    @ResponseBody
    public String push(@RequestParam String uid,@RequestParam String value){
        System.out.println("push is run");
        SseEmitter  sseEmitter = pushSseEmitterMap.get(uid);
        if (sseEmitter!=null){
            try {
                sseEmitter.send(value,MediaType.APPLICATION_JSON);
            } catch (IOException e) {
                e.printStackTrace();
                return "fail";
            }
        }

        return "ok";
    }

    @RequestMapping("/push/finish")
    @ResponseBody
    public String finish(@RequestParam String uid,@RequestParam String value){
        System.out.println("finish is run");
        SseEmitter  sseEmitter = pushSseEmitterMap.get(uid);
        if (sseEmitter!=null){
            try {
                sseEmitter.send(value,MediaType.APPLICATION_JSON);
                sseEmitter.complete();
            } catch (IOException e) {
                e.printStackTrace();
                return "fail";
            }
        }

        return "ok";
    }
}

前端:

<!DOCTYPE html>
<html>
<head>
  <meta charset="utf-8">
  <meta name="viewport" content="width=device-width">
  <title>JS Bin</title>
</head>
<body>
<div id="example"></div>
<script>
  var source = new EventSource('http://127.0.0.1:8844/push/a123456');
// var source = new EventSource('http://127.0.0.1:8844/sse/connect/a123456');
  var div = document.getElementById('example');
  
  source.onopen = function (event) {
    div.innerHTML += '<p>Connection open ...</p>';
  };
  
  source.onerror = function (event) {
    div.innerHTML += '<p>Connection close.</p>';
  };
  
  source.addEventListener('connecttime', function (event) {
    div.innerHTML += ('<p>Start time: ' + event.data + '</p>');
  }, false);
  
  source.onmessage = function (event) {
    div.innerHTML += ('<p>Ping: ' + event.data + '</p>');
  };
  
</script>
</body>
</html>

执行结果:

image-20200819175320871

简单封装版

SseEmitter

SseEmitter是SpringMVC(4.2+)提供的一种技术,它是基于Http协议的,相比WebSocket,它更轻量,但是它只能从服务端向客户端单向发送信息。在SpringBoot中我们无需引用其他jar就可以使用

  • 创建AtomicInteger用于记录连接数
  • 创建ConcurrentHashMap用于存放连接信息
  • 建立连接:创建并返回一个带有超时时间的SseEmitter给前端。超时间设为0表示永不过期
  • 设置连接结束的回调方法completionCallBack
  • 设置连接超时的回调方法timeoutCallBack
  • 设置连接异常的回调方法errorCallBack
  • 创建推送信息的方法SseEmitter.send()
  • 创建移除连接的方法

服务器端

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@RestController
@RequestMapping("/sse")
@CrossOrigin
public class SseEmitterController {

    /**
     * 用于创建连接
     */
    @GetMapping("/connect/{userId}")
    public SseEmitter connect(@PathVariable String userId) {
        return SseEmitterServer.connect(userId);
    }

    @GetMapping("/push/{message}")
    public ResponseEntity<String> push(@PathVariable(name = "message") String message) {
        SseEmitterServer.batchSendMessage(message);
        return ResponseEntity.ok("WebSocket 推送消息给所有人");
    }

}


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;


public class SseEmitterServer {

    private static final Logger logger = LoggerFactory.getLogger(SseEmitterServer.class);

    /**
     * 当前连接数
     */
    private static AtomicInteger count = new AtomicInteger(0);

    /**
     * 使用map对象,便于根据userId来获取对应的SseEmitter,或者放redis里面
     */
    private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();

    /**
     * 创建用户连接并返回 SseEmitter
     *
     * @param userId 用户ID
     * @return SseEmitter
     */
    public static SseEmitter connect(String userId) {
        if (sseEmitterMap.containsKey(userId)){//在连接的时候,如果存在就移除原来连接,生产新的连接
            removeUser(userId);
        }
        // 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException
        SseEmitter sseEmitter = new SseEmitter(0L);
        // 注册回调
        sseEmitter.onCompletion(completionCallBack(userId));
        sseEmitter.onError(errorCallBack(userId));
        sseEmitter.onTimeout(timeoutCallBack(userId));
        sseEmitterMap.put(userId, sseEmitter);
        // 数量+1
        count.getAndIncrement();
        logger.info("创建新的sse连接,当前用户:{}", userId);
        return sseEmitter;
    }

    /**
     * 给指定用户发送信息
     */
    public static void sendMessage(String userId, String message) {
        if (sseEmitterMap.containsKey(userId)) {
            try {
                // sseEmitterMap.get(userId).send(message, MediaType.APPLICATION_JSON);
                sseEmitterMap.get(userId).send(message);
            } catch (IOException e) {
                logger.error("用户[{}]推送异常:{}", userId, e.getMessage());
                removeUser(userId);
            }
        }
    }

    /**
     * 群发消息
     */
    public static void batchSendMessage(String wsInfo, List<String> ids) {
        ids.forEach(userId -> sendMessage(wsInfo, userId));
    }

    /**
     * 群发所有人
     */
    public static void batchSendMessage(String wsInfo) {
        sseEmitterMap.forEach((k, v) -> {
            try {
                v.send(wsInfo, MediaType.APPLICATION_JSON);
            } catch (IOException e) {
                logger.error("用户[{}]推送异常:{}", k, e.getMessage());
                removeUser(k);
            }
        });
    }

    /**
     * 移除用户连接
     */
    public static void removeUser(String userId) {
        sseEmitterMap.remove(userId);
        // 数量-1
        count.getAndDecrement();
        logger.info("移除用户:{}", userId);
    }

    /**
     * 获取当前连接信息
     */
    public static List<String> getIds() {
        return new ArrayList<>(sseEmitterMap.keySet());
    }

    /**
     * 获取当前连接数量
     */
    public static int getUserCount() {
        return count.intValue();
    }

    private static Runnable completionCallBack(String userId) {
        return () -> {
            logger.info("结束连接:{}", userId);
            removeUser(userId);
        };
    }

    private static Runnable timeoutCallBack(String userId) {
        return () -> {
            logger.info("连接超时:{}", userId);
            removeUser(userId);
        };
    }

    private static Consumer<Throwable> errorCallBack(String userId) {
        return throwable -> {
            logger.info("连接异常:{}", userId);
            removeUser(userId);
        };
    }

}

前端二:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>SseEmitter</title>
</head>
<body>
<button onclick="closeSse()">关闭连接</button>
<div id="message"></div>
</body>
<script>
    let source = null;

    // 用时间戳模拟登录用户
    const userId = new Date().getTime();

    if (!!window.EventSource) {

        // 建立连接
        source = new EventSource('http://127.0.0.1:8844/sse/connect/' + userId);

        /**
         * 连接一旦建立,就会触发open事件
         * 另一种写法:source.onopen = function (event) {}
         */
        source.addEventListener('open', function (e) {
            setMessageInnerHTML("建立连接。。。");
        }, false);

        /**
         * 客户端收到服务器发来的数据
         * 另一种写法:source.onmessage = function (event) {}
         */
        source.addEventListener('message', function (e) {
            setMessageInnerHTML(e.data);
        });


        /**
         * 如果发生通信错误(比如连接中断),就会触发error事件
         * 或者:
         * 另一种写法:source.onerror = function (event) {}
         */
        source.addEventListener('error', function (e) {
            if (e.readyState === EventSource.CLOSED) {
                setMessageInnerHTML("连接关闭");
            } else {
                console.log(e);
            }
        }, false);

    } else {
        setMessageInnerHTML("你的浏览器不支持SSE");
    }

    // 监听窗口关闭事件,主动去关闭sse连接,如果服务端设置永不过期,浏览器关闭后手动清理服务端数据
    window.onbeforeunload = function () {
        closeSse();
    };

    // 关闭Sse连接
    function closeSse() {
        source.close();
        const httpRequest = new XMLHttpRequest();
        httpRequest.open('GET', 'http://127.0.0.1:8844/sse/close/' + userId, true);
        httpRequest.send();
        console.log("close");
    }

    // 将消息显示在网页上
    function setMessageInnerHTML(innerHTML) {
        document.getElementById('message').innerHTML += innerHTML + '<br/>';
    }
</script>
</html>

Demo实现二(Node)

Node 服务器实例(如果自己想实现SSE服务端参考这个,主要是实现协议)

SSE要求服务器与浏览器保持连接,对于不同的服务器软件来说,所消耗的资源也不一样的。Node则是所有连接都使用同一个线程,因此消耗的资源会小很多,但是这个要求每个连接不能包含很耗时的操作(比如磁盘IO的读写)

var http = require("http");

http.createServer(function(req,res){
  var fileName = "."+req.url;
  if(fileName==="./stream"){
    res.writeHead(200,{
      "Content-Type":"text/event-stream",
      "Cache-Control":"no-cache",
      "Connection":"keep-alive",
      "Access-Control-Allow-Origin":"*"
    });
    res.write("retry:10000\n");
    res.write("event:connecttime\n");
    res.write("data:"+(new Date())+"\n\n");
    res.write("data:"+(new Date())+"\n\n");
    interval = setInterval(function(){
      res.write("data:"+(new Date())+"\n\n");
    }
    );
    req.connection.addListener("close",function(){
      clearInterval(interval);
    },fase);
  }
}).listen(8888,"127.0.0.1");

前端代码

<!DOCTYPE html>
<html>
<head>
  <meta charset="utf-8">
  <meta name="viewport" content="width=device-width">
  <title>JS Bin</title>
</head>
<body>
<div id="example"></div>
<script>
  var source = new EventSource('http://127.0.0.1:8844/stream');
  var div = document.getElementById('example');
  
  source.onopen = function (event) {
    div.innerHTML += '<p>Connection open ...</p>';
  };
  
  source.onerror = function (event) {
    div.innerHTML += '<p>Connection close.</p>';
  };
  
  source.addEventListener('connecttime', function (event) {
    div.innerHTML += ('<p>Start time: ' + event.data + '</p>');
  }, false);
  
  source.onmessage = function (event) {
    div.innerHTML += ('<p>Ping: ' + event.data + '</p>');
  };
  
</script>
</body>
</html>

Demo实现三(PHP)

Php实现SSE

<?php
date_default_timezone_set("America/New_York");
header("Content-Type: text/event-stream");

counter = rand(1, 10); // a random counter
while (1) {
// 1 is always true, so repeat the while loop forever (aka event-loop)curDate = date(DATE_ISO8601);
  echo "event: ping\n",
       'data: {"time": "' . curDate . '"}', "\n\n";

  // Send a simple message at random intervals.counter--;

  if (!counter) {
    echo 'data: This is a message at time ' .curDate, "\n\n";
    $counter = rand(1, 10); // reset random counter
  }

  // flush the output buffer and send echoed messages to the browser

  while (ob_get_level() > 0) {
    ob_end_flush();
  }
  flush();

  // break the loop if the client aborted the connection (closed the page)
  
  if ( connection_aborted() ) break;

  // sleep for 1 second before running the loop again
  
  sleep(1);

}

Demo实现四

服务器端实现

服务器端实现Server-sent Events,服务器端的 实现由两部分组成:一部分用来产生数据的org.eclipse.jetty.servlets.EventSource,另一部分作为浏览器访问端点的继承自org.eclipse.jetty.servlet.EventSourceServlet类的Servlet实现。

实现EventsSource接口MovenmentEventSource

public    class MovenmentEventSource implements EventSource{
  private int width =800;
  private int height = 600;
  private int stepMax =5;
  private int x=0;
  private int y = 0;
  private Random random = new Random();
  public MovennmentEventSource(int width,int height,int stepMax){
    this.width = width;
    this.height = height;
    this.stepMax = stepMax;
    this.x = random.nextInt(width);
    this.y = random.nextInt(height);
  }
  @Override
  public void onOpen(Emitter emitter) throws IOException{
    query(emitter);//开始生产位置信息
  }
  
  @Override
  public void onResume(Emitter emitter,String lastEventId){
    updatePosition(lastEventId);//更新起始位置
    query(emitter);//开始生成位置信息 
  }
  
  //根据Last-Event-Id来更新起始位置
  private void updatePosition(String id){
    if(id!=null){
      String[] pos = id.split(",");
      if(pos.length>1){
        int xPos =-1,yPos = -1;
        try{
          xPos = Integer.parseInt(pos[0],10);
          yPos = Integer.parseInt(pos[1],10);
        }catch(NumberFormatException e){
          
        }
        if(isValidMove(xPos,yPos)){
          xPos = xPos;
          yPos = yPos;
        }
      }
    }
  }
  
  private void query(Emitter emitter) throws IOException{
    emitter.comment("Start sending movement information.");
    while(true){
      emitter.comment("");
      move();//移动位置
      String id = String.format("%s,%s",x,y);
      emitter.id(id);//根据位置生成时间 标识符
      emitter.data(id);//发送位置信息数据
      try{
        Thread.sleep(2000);
      }catch(InterruptedException e){
        break;
      }
    }
  }
  
  @Override
  public void onClose(){
    
  }
  
  //获取下一个合法的移动位置
  private void move(){
    while(true){
      int[] move = getMove();
      int xNext = x+move[0];
      int yNext = y+move[1];
      if(isValidMove(xNext,yNext)){
        x = xNext;
        y = yNext;
        break;
      }
    }
  }
  
  //判断当前的移动位置是否合法
  private boolean inValidMove(int x,int y){
    return x >=0 && x<=width && y>=0 && y <= height;
  }
  
  //随机生成下一个移动位置
  private int[] getMove(){
    int[] xDir = new int[]{-1,0,1,0};
    int[] yDir = new int[]{0,-1,0,1};
    int dir = random,nextInt(4);
    return new int[]{xDir[dir]*random.nextInt(stepMax),yDir[dir]*random.nextInt(stepMax)};
  }
}

onOpen方法在浏览器端的连接打开的时候被调用,onResume方法在浏览器重新建立连接的时候被调用,onClose方法则在浏览器关闭连接的时候被调用。

pubic class MovementServlet extends EventSourceServlet{
   @Override
   protected Event Source newEventSource(HttpServletRequest request,String clientId){
     return new MovenmentEventSource(800,600,20);
   }
}

浏览器端实现

浏览器端的实现比较简单,只需要创建出EventSource对象,并添加相应的事件处理方法即可。

var es = new EventSource('sse/movement');
es.addEventListener('message',function(e){
  var pos = e.data.split(','),x = pos[0], y = pos[1];
  $("#box").css(
      left: x+'px',
      top: y+'px'
  );
});

参考

https://www.ibm.com/developerworks/cn/web/1307_chengfu_serversentevent/index.html

http://www.ruanyifeng.com/blog/2017/05/server-sent_events.html

https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events

https://blog.csdn.net/superylcfly/article/details/103979799

Share this: