Commit b083d2ec by 周田

feat: 将发送过来的数据添加到 TDengine 中

parent 87e3bad1
......@@ -26,6 +26,20 @@
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>3.2.7</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.9</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
......@@ -3,12 +3,17 @@ package org.linkor;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.linkor.callback.OnMessageCallback;
import org.linkor.db.TDengineUtils;
import org.linkor.setting.Setting;
import java.sql.SQLException;
public class Main {
public static void main(String[] args) {
public static void main(String[] args) throws SQLException, InterruptedException {
Setting.init();
TDengineUtils.init();
try {
// host为主机名,test为 clientId 即连接MQTT的客户端ID,一般以客户端唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
MqttClient client = new MqttClient(Setting.BROKER, Setting.CLIENT_ID, new MemoryPersistence());
......@@ -37,7 +42,6 @@ public class Main {
// client.subscribe("ModbusTcp/数据名字", Setting.QOS);
System.out.println("开始监听: " + Setting.TOPIC);
System.out.println("==========================================");
client.publish("ModbusTcp/压力", new MqttMessage("400".getBytes()));
}
catch (Exception e) {
e.printStackTrace();
......
package org.linkor.callback;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.linkor.db.TDengineUtils;
import java.util.Objects;
/**
* mqtt 回调函数
......@@ -19,6 +24,14 @@ public class OnMessageCallback implements MqttCallback {
public void messageArrived(String topic, MqttMessage message) {
// subscribe后得到的消息会执行到这里面
System.out.println("topic: " + topic + ", content: " + message.toString());
// JsonObject data = JsonParser.parseString(message.toString()).getAsJsonObject();
// String location = data.get("__name").getAsString();
// boolean state = Objects.equals(data.get("开关状态").getAsString(), "true");
// int temperature = data.get("温度").getAsInt();
// int press = data.get("压力").getAsInt();
// // 存数据
// TDengineUtils.insertData(location, state, temperature, press);
}
@Override
......
package org.linkor.db;
import com.taosdata.jdbc.TSDBDriver;
import com.taosdata.jdbc.TSDBPreparedStatement;
import java.sql.*;
import java.util.Properties;
/**
* TDengine 数据库相关操作
*/
public class TDengineUtils {
private static Connection conn = null;
public static void init() throws SQLException {
String jdbcUrl = "jdbc:TAOS://192.168.0.176:6030?user=root&password=taosdata";
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
conn = DriverManager.getConnection(jdbcUrl, properties);
System.out.println("Connected to TDengine");
try (Statement stmt = conn.createStatement()) {
stmt.execute("CREATE DATABASE if not exists power KEEP 3650");
stmt.executeUpdate("use power");
stmt.execute("CREATE STABLE if not exists o2oa (ts TIMESTAMP, st BOOL, temperature INT, press INT) " +
"TAGS (location BINARY(64))");
System.out.println("Created table o2oa");
}
}
public static void insertData(String location, boolean state, int temperature, int press){
String psql = "INSERT INTO ? USING power.o2oa TAGS(?) VALUES (?, ?, ?, ?)";
assert conn != null;
// try(TSDBPreparedStatement pst = (TSDBPreparedStatement) conn.prepareStatement(psql)) {
// pst.setTableName("d1001");
// pst.setTagString(0, location);
//
// System.out.println(333);
// pst.setTimestamp(0, new Timestamp(System.currentTimeMillis()));
// pst.setBoolean(1, state);
// pst.setInt(2, temperature);
// pst.setInt(3, press);
//
// pst.executeUpdate(psql);
// System.out.println(444);
// } catch (SQLException e) {
// e.printStackTrace();
// }
try (Statement stmt = conn.createStatement()) {
String sql = getRawData(location, state, temperature, press);
int rowCount = stmt.executeUpdate(sql);
}catch (SQLException e){
e.printStackTrace();
}
}
public static String getRawData (String location, boolean state, int temperature, int press){
StringBuilder sb = new StringBuilder("INSERT INTO power.s USING power.o2oa TAGS(");
sb.append('\'').append(location).append('\'') // tag: location
.append(") VALUES(")
.append('\'').append(new Timestamp(System.currentTimeMillis())).append('\'').append(",") // ts
.append(state).append(",") // state
.append(temperature).append(",") // temperature
.append(press).append(") "); // press
return sb.toString();
}
}
qos=1
# broker=tcp://192.168.0.176:1883
broker=tcp://127.0.0.1:521
topic=ModbusTcp/#
broker=tcp://192.168.0.176:1883
#broker=tcp://127.0.0.1:521
topic=ModbusTcp
username=admin
password=123456
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment