1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.htmlunit.websocket;
16
17 import java.io.IOException;
18 import java.net.URI;
19 import java.nio.ByteBuffer;
20 import java.util.concurrent.Future;
21
22 import org.htmlunit.WebClient;
23 import org.htmlunit.WebClientOptions;
24 import org.htmlunit.jetty.util.ssl.SslContextFactory;
25 import org.htmlunit.jetty.websocket.api.Session;
26 import org.htmlunit.jetty.websocket.api.WebSocketPolicy;
27 import org.htmlunit.jetty.websocket.client.WebSocketClient;
28
29
30
31
32
33
34
35
36 public final class JettyWebSocketAdapter implements WebSocketAdapter {
37
38
39
40
41 public static final class JettyWebSocketAdapterFactory implements WebSocketAdapterFactory {
42
43
44
45 @Override
46 public WebSocketAdapter buildWebSocketAdapter(final WebClient webClient,
47 final WebSocketListener webSocketListener) {
48 return new JettyWebSocketAdapter(webClient, webSocketListener);
49 }
50 }
51
52 private final Object clientLock_ = new Object();
53 private WebSocketClient client_;
54 private WebSocketListener listener_;
55
56 private volatile Session incomingSession_;
57 private Session outgoingSession_;
58
59
60
61
62
63
64 public JettyWebSocketAdapter(final WebClient webClient, final WebSocketListener listener) {
65 super();
66 final WebClientOptions options = webClient.getOptions();
67
68 if (webClient.getOptions().isUseInsecureSSL()) {
69 client_ = new WebSocketClient(new SslContextFactory(true), null, null);
70
71
72
73 }
74 else {
75 client_ = new WebSocketClient();
76 }
77
78 listener_ = listener;
79
80
81 client_.setExecutor(webClient.getExecutor());
82
83 client_.getHttpClient().setCookieStore(new WebSocketCookieStore(webClient));
84
85 final WebSocketPolicy policy = client_.getPolicy();
86 int size = options.getWebSocketMaxBinaryMessageSize();
87 if (size > 0) {
88 policy.setMaxBinaryMessageSize(size);
89 }
90 size = options.getWebSocketMaxBinaryMessageBufferSize();
91 if (size > 0) {
92 policy.setMaxBinaryMessageBufferSize(size);
93 }
94 size = options.getWebSocketMaxTextMessageSize();
95 if (size > 0) {
96 policy.setMaxTextMessageSize(size);
97 }
98 size = options.getWebSocketMaxTextMessageBufferSize();
99 if (size > 0) {
100 policy.setMaxTextMessageBufferSize(size);
101 }
102 }
103
104
105
106
107 @Override
108 public void start() throws Exception {
109 synchronized (clientLock_) {
110 client_.start();
111 }
112 }
113
114
115
116
117 @Override
118 public void connect(final URI url) throws Exception {
119 synchronized (clientLock_) {
120 final Future<Session> connectFuture = client_.connect(new JettyWebSocketAdapterImpl(), url);
121 client_.getExecutor().execute(() -> {
122 try {
123 listener_.onWebSocketConnecting();
124 incomingSession_ = connectFuture.get();
125 }
126 catch (final Exception e) {
127 listener_.onWebSocketConnectError(e);
128 }
129 });
130 }
131 }
132
133
134
135
136 @Override
137 public void send(final Object content) throws IOException {
138 if (content instanceof String) {
139 outgoingSession_.getRemote().sendString((String) content);
140 }
141 else if (content instanceof ByteBuffer) {
142 outgoingSession_.getRemote().sendBytes((ByteBuffer) content);
143 }
144 else {
145 throw new IllegalStateException(
146 "Not Yet Implemented: WebSocket.send() was used to send non-string value");
147 }
148 }
149
150
151
152
153 @Override
154 public void closeIncommingSession() {
155 if (incomingSession_ != null) {
156 incomingSession_.close();
157 }
158 }
159
160
161
162
163 @Override
164 public void closeOutgoingSession() {
165 if (outgoingSession_ != null) {
166 outgoingSession_.close();
167 }
168 }
169
170
171
172
173 @Override
174 public void closeClient() throws Exception {
175 synchronized (clientLock_) {
176 if (client_ != null) {
177 client_.stop();
178 client_.destroy();
179
180
181 client_ = null;
182 }
183 }
184 }
185
186 private class JettyWebSocketAdapterImpl extends org.htmlunit.jetty.websocket.api.WebSocketAdapter {
187
188
189
190
191 JettyWebSocketAdapterImpl() {
192 super();
193 }
194
195
196
197
198 @Override
199 public void onWebSocketConnect(final Session session) {
200 super.onWebSocketConnect(session);
201 outgoingSession_ = session;
202
203 listener_.onWebSocketConnect();
204 }
205
206
207
208
209 @Override
210 public void onWebSocketClose(final int statusCode, final String reason) {
211 super.onWebSocketClose(statusCode, reason);
212 outgoingSession_ = null;
213
214 listener_.onWebSocketClose(statusCode, reason);
215 }
216
217
218
219
220 @Override
221 public void onWebSocketText(final String message) {
222 super.onWebSocketText(message);
223
224 listener_.onWebSocketText(message);
225 }
226
227
228
229
230 @Override
231 public void onWebSocketBinary(final byte[] data, final int offset, final int length) {
232 super.onWebSocketBinary(data, offset, length);
233
234 listener_.onWebSocketBinary(data, offset, length);
235 }
236
237
238
239
240 @Override
241 public void onWebSocketError(final Throwable cause) {
242 super.onWebSocketError(cause);
243 outgoingSession_ = null;
244
245 listener_.onWebSocketError(cause);
246 }
247 }
248 }