websocket操作指南

发送事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package com.hollysys.hiadsp.websocket.test;

import java.util.UUID;

import com.hollysys.hiadsp.eventhub.util.HttpClientUtils;

import net.sf.json.JSONObject;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultHttpClient;

public class PublishClientTest {
public static void main(String[] args) {
String url;
// url = "http://59.110.14.235:18481/eventhub/publish/02/event_inner_04";
// url = "http://192.168.66.159:18481/eventhub/publish/04/event_inner_04";
// url = "http://eventhub-hollysys-project01.hiatest.tk:30312/eventhub/publish/000000/event_inner_000000";
// url = "http://workflow-eventhub-test-hollysys-project.hiadev.tk:80/eventhub/publish/04/event_inner_04";
url = "http://workflow-eventhub-test-hollysys-project.hiadev.tk:80/eventhub/rest/send";
String uuid = UUID.randomUUID().toString();
String body_json;
// body_json = "{\"eventName\":\" test" +
// "\",\"eventTime\":\"1489991893404\",\"source\":\"entityId\",\"sourceProperty\":\"\",\"eventData\":{\"vpredict\":\"29\",\"temp\":\"51\",\"vibration\":\"29\",\"collectTime\":\"1489727057183\",\"entityId\":\"111\",\"speed\":\"3363\",\"pres\":\"2\"}}";
// body_json = "{\"sourceId\":\"engetfb809d4d7c2844d2bfc6710a7753d23a\",\"eventName\":\"IOTData\",\"events\":[{\"heat\":80.0064720648321,\"temp\":30.0,\"light\":50.0,\"mov\":0,\"name\":\"IOT\"}]}";
String tenant_id = "000000";
String topicName = "sssssss";
body_json = "{\"eventName\": \"" + topicName + "\",\"eventTime\": 1517985178794,\"source\": \"\",\"sourceProperty\": \"\",\"eventData\": {\"tntId\":\"" + tenant_id + "\",\"sesssiong \":\"这是测试信息!!!!\"}}";
//body_json = "{\"eventData\":{\"dpsTime\":1543574185886,\"sesssiong \":\"订阅事件完成!\",\"tntId\":\""+tenant_id+"\"},\"eventName\":\""+topicName+"\",\"eventTime\":\"1543574184990\",\"source\":\" \",\"sourceProperty\":\" \"}";
body_json="{\"eventName\":\"sssssss\",\"eventTime\":\"1517985178794\",\"source\":\"\",\"sourceProperty\":\"\",\"eventData\":{\"tntId\":\"000000\",\"modelKey \":\"haier_temperature_ridge\",\"predictionData\":{\"data\":[]}}}";
JSONObject json = JSONObject.fromObject(body_json);
System.out.println(json);
JSONObject resultJson=null;
int i=0;
while (true){
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
resultJson = doPost(url, json);
System.out.println("消息"+i+"发送成功! " + resultJson.toString());
i++;
}



}
public static JSONObject doPost(String url, JSONObject json) {
DefaultHttpClient client = new DefaultHttpClient();
String surl = url.replaceAll(" ", "");
HttpPost post = new HttpPost(surl);
JSONObject response = null;
try {
StringEntity s = new StringEntity(json.toString());
s.setContentEncoding("UTF-8");
s.setContentType("application/json");// 发送json数据需要设置contentType
post.setEntity(s);
HttpResponse res = client.execute(post);
if (res.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
HttpEntity entity = res.getEntity();
String result = EntityUtils.toString(res.getEntity());// 返回json格式:
response = JSONObject.fromObject(result);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
return response;
}
}
接收数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package com.hollysys.hiadsp.websocket.test;

import java.lang.reflect.Type;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

import org.springframework.messaging.simp.stomp.StompFrameHandler;
import org.springframework.messaging.simp.stomp.StompHeaders;
import org.springframework.messaging.simp.stomp.StompSession;
import org.springframework.messaging.simp.stomp.StompSessionHandlerAdapter;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.messaging.WebSocketStompClient;
import org.springframework.web.socket.sockjs.client.SockJsClient;
import org.springframework.web.socket.sockjs.client.Transport;
import org.springframework.web.socket.sockjs.client.WebSocketTransport;
import org.springframework.web.socket.sockjs.frame.Jackson2SockJsMessageCodec;

public class StompSocketTest {
// {"eventName":" test","eventTime":"1489991893404","source":"entityId","sourceProperty":"","eventData":{"vpredict":"29","temp":"51","vibration":"29","collectTime":"1489727057183","entityId":"111","speed":"3363","pres":"2"}}
private static class newThred extends Thread {
private StompSession stompSession;
private String subscribeUrl;
private String websocketUrl;

public newThred(StompSession stompSession, String subscribeUrl, String websocketUrl) {
this.stompSession = stompSession;
this.subscribeUrl = subscribeUrl;
this.websocketUrl = websocketUrl;
}
@Override
public void run() {
System.out.println("事件监控开启!");
while (true) {
System.out.println(stompSession.isConnected());
try {
Thread.sleep(10000);

if (!stompSession.isConnected()) {
try {
stompSession = subscribe(websocketUrl, subscribeUrl);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
String websocketUrl = "http://workflow-eventhub-test-hollysys-project.hiadev.tk:80/eventhub/btuna-websocket";
String subscribeUrl = "/topic/000000/sssssss";
StompSocketTest obj = new StompSocketTest();
StompSession stompSession = obj.subscribe(websocketUrl, subscribeUrl);
System.out.println(websocketUrl + subscribeUrl);
new newThred(stompSession, subscribeUrl,websocketUrl).start();
Thread.sleep(12000000);
//{"eventData":{"sesssiong ":"重连完成!","tntId":"000000"},"eventName":"sssssss","eventTime":"1543573310269","modelKey":"22222","source":" ","sourceProperty":" "}
}


private static StompSession subscribe(String websocketUrl, String subscribeUrl) throws InterruptedException, ExecutionException {
Transport webSocketTransport = new WebSocketTransport(new StandardWebSocketClient());
List<Transport> transports = Collections.singletonList(webSocketTransport);
SockJsClient sockJsClient = new SockJsClient(transports);
sockJsClient.setMessageCodec(new Jackson2SockJsMessageCodec());
WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);
ListenableFuture<StompSession> lf = stompClient.connect(websocketUrl, new StompSessionHandlerAdapter() {
});
StompSession stompSession = lf.get();

stompSession.subscribe(subscribeUrl, new StompFrameHandler() {
//发送消息前
public Type getPayloadType(StompHeaders stompHeaders) {
return byte[].class;
}

//接收到消息时
public void handleFrame(StompHeaders stompHeaders, Object o) {
System.out.println("Received message: " + new String((byte[]) o));
}
});
return stompSession;
}
}
打赏
  • 版权声明: 本博客所有文章除特别声明外,著作权归作者所有。转载请注明出处!
  • Copyrights © 2018-2020 丁振莹
  • 访问人数: | 浏览次数:

你的每一分支持,是我努力下去的最大的力量 ٩(๑❛ᴗ❛๑)۶

支付宝
微信