我正在尝试使用mosquitto接收数据,并使用python pandas将其保存为csv文件。在我停止脚本之前,数据是连续的。
mqtt\U发布。py代码>
import paho.mqtt.client as mqtt
import random
import schedule
import time
mqttc = mqtt.Client("python_pub")
mqttc.connect("localhost", 1883)
def job():
mqttc.publish("hello/world", random.randint(1, 10))
schedule.every(1).seconds.do(job)
while True:
schedule.run_pending()
time.sleep(1)
mqttc.loop(2)
mqtt_sub.py代码
import paho.mqtt.client as mqtt
import pandas as pd
def on_connect(client, userdata, rc):
print("Connected with result code "+str(rc))
client.subscribe("hello/world")
def on_message(client, userdata, msg):
datas = map(int, msg.payload)
for num in datas:
df = pd.DataFrame(data=datas, columns=['the_number'])
df.to_csv("testing.csv")
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("localhost", 1883, 60)
client.loop_forever()
从上面的脚本中,我得到了测试。csv
| the _number
0 | 2
2是我在停止脚本之前收到的最后一个数字
Connected with result code 0
[3]
[9]
[5]
[3]
[7]
[2]
...
...
KeyboardInterrupt
我希望得到测试。像这样的csv
| the_number
0 | 3
1 | 9
2 | 5
...
...
5 | 2
为了实现这一点,我尝试将以下df=pd. DataFrame(data=datas,列=['the_number'])
更改为df=pd. DataFrame(data=num,列=['the_number'])
并发生以下错误
pandas.core.common.PandasError: DataFrame constructor not properly called!
有人知道如何解决这个错误吗?我也觉得我在这里没有正确使用for循环。
谢谢你的建议和帮助。
[更新]
我在on_message
方法中添加/更改以下行
def on_message(client, userdata, msg):
datas = map(int, msg.payload)
df = pd.DataFrame(data=datas, columns=['the_number'])
f = open("test.csv", 'a')
df.to_csv(f)
f.close()
在Nulljack的帮助下,我能够在我的CSV文件中得到这样的结果
| the_number
0 | 3
| the_number
0 | 9
| the_number
0 | 5
| the_number
0 | 3
| the_number
0 | 7
我的目标是在CSV文件中实现类似的功能
| the_number
0 | 3
1 | 9
2 | 5
3 | 3
4 | 7
我之前从未使用过mosquitto,如果我的理解有误,我道歉。
在我看来on_message方法在您的mqtt_sub.py是运行每次您的mqtt_pub.py发布消息(即每一秒)这将导致您的testing.csv文件被覆盖每次您发布消息
为了解决这个问题,我将在on\u connect方法中初始化一个数据帧,然后在on\u消息中通过df向数据帧添加新值。追加
至于你终止后给CSV写信,我不确定。
希望这有帮助
其他帖子很拥挤,所以我把我的回复移到这里
尝试使用下面的代码
import paho.mqtt.client as mqtt
import pandas as pd
# Move df here
df = pd.DataFrame(columns=['the_number'])
def on_connect(client, userdata, rc):
print("Connected with result code "+str(rc))
client.subscribe("hello/world")
def on_message(client, userdata, msg):
datas = map(int, msg.payload)
# this adds the data to the dataframe at the correct index
df.iloc[df.size] = datas
# I reverted this line back to what you originally had
# This will overwrite the testing.csv file every time your subscriber
# receives a message, but since the dataframe is formatted like you want
# it shouldn't matter
df.to_csv("testing.csv")
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("localhost", 1883, 60)
client.loop_forever()