springboot websocket 如何只对一个客户端发送数据?

场景:

目前需要实时在大屏上展示数据,且只有一个大屏客户端没有登陆页面就单独一个H5页面。

诉求:

由于H5没有登陆页,如何让socket只连接这个H5页面发送过来的请求,除这个页面以外的请求不连接。

下面的是我的代码。

pom.xml

        <dependency>
            <groupId>com.corundumstudio.socketio</groupId>
            <artifactId>netty-socketio</artifactId>
            <version>1.7.18</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

yml

ws:
  port: 8888
  title: socket服务器
  host: 0.0.0.0
  boss-count: 1
  work-count: 100
  allow-custom-requests: true
  upgrade-timeout: 10000
  ping-timeout: 60000
  ping-interval: 25000
@Data
@ConfigurationProperties("ws")
public class AppProperties {
    /**
     * title
     */
    private String title;

    /**
     * host
     */
    private String host;

    /**
     * port
     */
    private Integer port;

    /**
     * bossCount
     */
    private int bossCount;

    /**
     * workCount
     */
    private int workCount;

    /**
     * allowCustomRequests
     */
    private boolean allowCustomRequests;

    /**
     * upgradeTimeout
     */
    private int upgradeTimeout;

    /**
     * pingTimeout
     */
    private int pingTimeout;

    /**
     * pingInterval
     */
    private int pingInterval;
@Slf4j
@Configuration
@EnableConfigurationProperties({
        AppProperties.class,
})
public class AppConfiguration {

    @Bean
    public SocketIOServer socketIoServer(AppProperties appProperties) {
        SocketConfig socketConfig = new SocketConfig();
        socketConfig.setTcpNoDelay(true);
        socketConfig.setSoLinger(0);
        com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();
        config.setSocketConfig(socketConfig);
        config.setHostname(appProperties.getHost());
        config.setPort(appProperties.getPort());
        config.setBossThreads(appProperties.getBossCount());
        config.setWorkerThreads(appProperties.getWorkCount());
        config.setAllowCustomRequests(appProperties.isAllowCustomRequests());
        config.setUpgradeTimeout(appProperties.getUpgradeTimeout());
        config.setPingTimeout(appProperties.getPingTimeout());
        config.setPingInterval(appProperties.getPingInterval());
        return new SocketIOServer(config);
    }
}
@Configuration
public class WebSocketConfiguration {

    @Bean
    public ServerEndpointExporter handlerAdapter() {
        return new ServerEndpointExporter();
    }
}
@Slf4j
@ServerEndpoint("/ws/screen")
@Component
public class WebSocketServer {
   
    private static final ConcurrentMap<String, WebSocketServer> USER_CLIENT_MAP = new ConcurrentHashMap<>();
    private Session session;

    /**
     * 连接建立成功调用的方法
     *
     * @param session 链接session
     */
    @OnOpen
    public void onOpen(Session session) {
        USER_CLIENT_MAP.put(session.getId(), this);
        this.session = session;
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message 消息内容
     * @param session session
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        log.debug("websocket连接onMessage。{},{},{}", message, this.toString(), session.getId());
        String key = this.session.getId();
        if (!USER_CLIENT_MAP.containsKey(key)) {
            this.close();
            return;
        }
    }

    /**
     * 发生异常时的处理
     *
     * @param session   客户端session
     * @param throwable session
     */
    @OnError
    public void onError(Session session, Throwable throwable) {
        if (this.session != null && this.session.isOpen()) {
            log.error("websocket连接onError。inputSession:{}-localSession:{}", session.getId(), this.toString(), throwable);
            this.close();
        } else {
            log.debug("已经关闭的websocket连接发生异常!inputSession:{}-localSession:{}", session.getId(), this.toString(), throwable);
        }
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        log.debug("websocket连接onClose。{}", this.toString());
        //然后关闭
        this.close();
    }

    /**
     * 推送消息给客户端
     *
     * @param message 消息内容
     */
    private void sendMessage(String message) {
        try {
            //使用同步块和同步方法发送。看是否能防止产生IllegalStateException异常
            //modify by caoshuo at 200506
            synchronized (session) {
                session.getBasicRemote().sendText(message);
            }
        } catch (Exception e) {
            log.error("websocket连接发送客户端发送消息时异常!{}-{}", message, this.toString(), e);
        }
    }

    /**
     * 关闭session连接
     */
    private void close() {
        //从本机map中移除连接信息
        USER_CLIENT_MAP.remove(this.session.getId());
        if (session == null) {
            log.debug("websocket连接关闭完成。{}", this.toString());
            return;
        }

        try {
            if (session.isOpen()) {
                session.close();
            }
            log.info("连接已经关闭");
        } catch (IOException e) {
            log.error("websocket连接关闭时异常。{}", this.toString(), e);
        }
    }

    public void sendToAll(Map<String, Object> map) {
        USER_CLIENT_MAP.values().forEach(item -> {
            item.sendMessage(map.toString());
        });
    }
}
@Slf4j
@Service
@RequiredArgsConstructor
public class MsgSendService {

    private static final Map<UUID, SocketIOClient> CLIENT_MAP = new ConcurrentHashMap<>();
    private final SocketIOServer socketIOServer;


    /**
     * 启动的时候会被调用一次
     */
    @PostConstruct
    private void autoStart() {
        log.info("start ws");
        socketIOServer.addConnectListener(client -> {
            CLIENT_MAP.put(client.getSessionId(), client);
        });
        socketIOServer.addDisconnectListener(client -> {
            CLIENT_MAP.remove(client.getSessionId());
            client.disconnect();
            log.info("移除client:{}", client.getSessionId());
        });
        socketIOServer.start();
        log.info("start finish");
    }

    @PreDestroy
    private void onDestroy() {
        if (socketIOServer != null) {
            socketIOServer.stop();
        }
    }

    public int sendMsg(Object demo) {
        CLIENT_MAP.forEach((key, value) -> {
            value.sendEvent("server_event", demo);
            log.info("发送数据成功:{}", key);
        });
        return CLIENT_MAP.size();
    }
}
@RequiredArgsConstructor
@RestController
public class MsgController {

    private final MsgSendService msgSendService;
    private final WebSocketServer webSocketServer;


    @ApiOperation("发送消息")
    @GetMapping("send")
    public String send(String msg) {
        Map<String,Object> map=new HashMap<>();
        map.put("msg",msg);
        map.put("date", LocalDateTime.now().toString());
        int size = msgSendService.sendMsg(map);
        webSocketServer.sendToAll(map);
        return "发送成功" + size;
    }
}

这个代码应该怎么改 大佬们指点下

阅读 2.2k
2 个回答

我没太懂你的问题,哪个页面需要接收消息、就在哪个页面建立 ws 连接啊,其他页面都不需要收消息为啥还要引入 ws?

看你这个需求除了H5页面应该也不会有别的客户端建立连接吧
如果还是要区分可以让客户端增加自己的标识,比如url为ws://xxxx?client=H5,然后在服务端onOpen通过session.getRequestParameterMap()获取client值,如果不是H5可以直接关掉

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题