요청문>
json화일에서 Time MType Type FCnt FPort MHz DR SNR RSSI DevAddr 항목들을 나열하여 웹으로 보여주는 python 코드를 짜서 실행해줘
결과>
업로드된 JSON 파일에서 필요한 항목(Time, MType, Type, FCnt, FPort, MHz, DR, SNR, RSSI, DevAddr)을 추출하여 웹 페이지로 표시하는 Python 코드를 작성하겠습니다.
아래 Flask 기반 웹 애플리케이션을 실행하면 JSON 데이터를 테이블 형식으로 보여줍니다.
이제 이 코드를 실행하면 JSON 데이터를 웹 브라우저에서 테이블 형태로 확인할 수 있습니다. Flask 서버를 실행한 후, 브라우저에서 http://localhost:5000 또는 http://<서버 IP>:5000에 접속하면 데이터를 확인할 수 있습니다.
추가 기능이 필요하거나 다른 방식으로 데이터를 가공하고 싶다면 말씀해주세요! 🚀
요청문>
File "show.py", line 5 SyntaxError: Non-ASCII character '\xed' in file show.py on line 5, but no encoding declared; see http://python.org/dev/peps/pep-0263/ for details
결과>
Python에서 한글(또는 다른 비ASCII 문자)이 포함된 코드를 실행하려면, 파일의 첫 번째 줄에 UTF-8 인코딩 선언을 추가해야 합니다.
🔧 해결 방법:
show.py 파일의 첫 번째 줄에 다음 코드를 추가하세요.
# -*- coding: utf-8 -*-
요청문>
json 관련 프로그램에서 Time을 GMT+9으로 계산해줘
결과>
from datetime import datetime, timezone, timedelta
# datetime.fromisoformat() 메서드는 Python 3.7 이상에서 지원
# 일부 JSON 시간 형식에서는 Z(Zulu time, UTC)를 포함하여 문제가 발생할 수 있습니다.
# 대체 방법: datetime.strptime()을 사용하여 시간을 변환하도록 수정
def convert_to_gmt9(utc_time_str):
try:
# 마이크로초 길이가 길 경우, 6자리까지만 유지
if "." in utc_time_str:
utc_time_str = utc_time_str.split(".")[0] + "." + utc_time_str.split(".")[1][:6]
#utc_time = datetime.fromisoformat(utc_time_str.replace("Z", "+00:00"))
utc_time = datetime.strptime(utc_time_str.replace("Z", ""), "%Y-%m-%dT%H:%M:%S.%f")
utc_time = utc_time.replace(tzinfo=timezone.utc)
gmt9_time = utc_time.astimezone(timezone(timedelta(hours=9)))
return gmt9_time.strftime("%Y-%m-%d %H:%M:%S")
except Exception:
return "Invalid GMT9 Time:" + utc_time_str
설치>
apt install python3-pip -y
pip3 --version
pip3 install flask
[/home/firstpw/PY/AIOT] python3 -c "import flask; print(flask.__version__)"
2.0.3
nameserver 8.8.8.8 to the /etc/resolv.conf file
# -*- coding: utf-8 -*-
from flask import Flask, render_template_string, request, json
import os
import base64
from datetime import datetime, timezone, timedelta
app = Flask(__name__)
UPLOAD_FOLDER = '/mnt/data/'
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
data = []
title = "JSON Data Display"
description = "Upload a JSON file to display its contents."
# HTML 템플릿
template = """
<!DOCTYPE html>
<html>
<head>
<title>{{ title }}</title>
<style>
table { width: 100%; border-collapse: collapse; }
th, td { border: 1px solid black; padding: 8px; text-align: left; }
th { background-color: #f2f2f2; }
.up { color: blue; }
.down { color: red; }
</style>
</head>
<body>
<h2>{{ title }}</h2>
<p>{{ description }}</p>
<form action="/" method="post" enctype="multipart/form-data">
<label for="title">Title:</label>
<input type="text" name="title" required><br><br>
<label for="description">Description:</label>
<input type="text" name="description" required><br><br>
<input type="file" name="file" accept=".json" required>
<input type="submit" value="Upload JSON">
</form>
{% if data %}
<table>
<tr>
<th>Time</th>
<th>MType</th>
<th>Type</th>
<th>FCnt</th>
<th>FPort</th>
<th>MHz</th>
<th>SF(12-DR)</th>
<th>SNR</th>
<th>RSSI</th>
<th>DevAddr</th>
<th>Payload</th>
</tr>
{% for item in data %}
<tr>
<td>{{ item.Time }}</td>
<td class="{{ 'up' if item.MType == 'UP' else 'down' }}">
{{ 'UP ⬆️' if item.MType == 'UP' else 'DN ⬇️' }}
</td>
<td>{{ item.Type }}</td>
<td>{{ item.FCnt }}</td>
<td>{{ item.FPort }}</td>
<td>{{ item.MHz }}</td>
<td>{{ item.DR }}</td>
<td>{{ item.SNR }}</td>
<td>{{ item.RSSI }}</td>
<td>{{ item.DevAddr }}</td>
<td>{{ item.Payload }}</td>
</tr>
{% endfor %}
</table>
{% endif %}
</body>
</html>
"""
# datetime.fromisoformat() 메서드는 Python 3.7 이상에서 지원
# 일부 JSON 시간 형식에서는 Z(Zulu time, UTC)를 포함하여 문제가 발생할 수 있습니다.
# 대체 방법: datetime.strptime()을 사용하여 시간을 변환하도록 수정
def convert_to_gmt9(utc_time_str):
try:
# 마이크로초 길이가 길 경우, 6자리까지만 유지
if "." in utc_time_str:
utc_time_str = utc_time_str.split(".")[0] + "." + utc_time_str.split(".")[1][:6]
#utc_time = datetime.fromisoformat(utc_time_str.replace("Z", "+00:00"))
utc_time = datetime.strptime(utc_time_str.replace("Z", ""), "%Y-%m-%dT%H:%M:%S.%f")
utc_time = utc_time.replace(tzinfo=timezone.utc)
gmt9_time = utc_time.astimezone(timezone(timedelta(hours=9)))
return gmt9_time.strftime("%Y-%m-%d %H:%M:%S")
except Exception:
return "Invalid GMT9 Time:" + utc_time_str
def decode_base64(base64_data):
try:
decoded_bytes = base64.b64decode(base64_data)
return decoded_bytes.hex().upper()
except Exception:
return "-"
def extract_common_data(entry):
device_ids = entry.get("identifiers", [{}])[0].get("device_ids", {})
return({
#"MType": entry.get("name", "-"),
"Time": convert_to_gmt9(entry.get("time", "-")).split(".")[0].replace("T", " "),
#"App": device_ids.get("application_ids", {}).get("application_id", "-"),
"DevEUI": device_ids.get("dev_eui", "-"),
"DevAddr": device_ids.get("dev_addr", "-"),
})
def extract_uplink_data(entry, uplink):
metadata = uplink.get("rx_metadata", [{}])[0] # 첫 번째 메타데이터 사용
f_opts = decode_base64(uplink.get("f_opts", ""))
f_opts = ("[Opt] " + f_opts) if f_opts else f_opts
payload = decode_base64(uplink.get("frm_payload", ""))
payload = ("[Pay] " if payload else '') + payload
return({
"MType": "UP",
"Type": 'C' if uplink.get("confirmed", False) else 'U',
"FCnt": uplink.get("f_cnt", "-"),
"FPort": uplink.get("f_port", "-"),
"MHz": round(float(uplink.get("settings", {}).get("frequency", 0)) / 1_000_000, 1),
"DR": 12 - uplink.get("settings", {}).get("data_rate", {}).get("lora", {}).get("spreading_factor", "-"),
"SNR": metadata.get("snr", "-"),
"RSSI": metadata.get("rssi", "-"),
"Payload": f_opts + (" / " if f_opts and payload else '') + payload
})
def extract_downlink_data(entry, payload_item, request_item):
device_ids = entry.get("identifiers", [{}])[0].get("device_ids", {})
f_hdr = payload_item.get("mac_payload", {}).get("f_hdr", {})
f_opts = decode_base64(f_hdr.get("f_opts", ""))
f_opts = ("[Opt] " if f_opts else '') + f_opts
payload = decode_base64(entry.get("raw_payload", ""))
payload = ("[Pay] " if payload else '') + payload
return({
"MType": "DN",
"Type": 'U' if payload_item.get("m_hdr", {}).get("m_type", "") == "UNCONFIRMED_DOWN" else 'C',
"FCnt": f_hdr.get("f_cnt", "-"),
"FPort": device_ids.get("application_ids", {}).get("application_id", "-"),
"MHz": round(float(request_item.get("rx1_frequency", 0)) / 1_000_000, 1),
"DR": 12 - request_item.get("rx1_data_rate", {}).get("lora", {}).get("spreading_factor", 0),
"SNR": "-",
"RSSI": "-",
"Payload": f_opts + (" / " if f_opts and payload else '') + payload
})
def extract_data(json_data):
extracted = []
for entry in json_data:
common_data = extract_common_data(entry)
updownlink = entry.get("data", {}).get("uplink_message", {})
if updownlink:
updown_data = extract_uplink_data(entry, updownlink)
else:
request_item = entry.get("data", {}).get("request", {})
payload_item = entry.get("data", {}).get("payload", {})
updown_data = extract_downlink_data(entry, payload_item, request_item)
extracted.append({
**common_data,
**updown_data
})
return extracted
@app.route('/', methods=['GET', 'POST'])
def index():
global data, title, description
if request.method == 'POST':
file = request.files['file']
title = request.form['title']
description = request.form['description']
if file and file.filename.endswith('.json'):
filepath = os.path.join(app.config['UPLOAD_FOLDER'], file.filename)
file.save(filepath)
with open(filepath, "r", encoding="utf-8") as f:
json_data = json.load(f)
data = extract_data(json_data)
return render_template_string(template, data=data, title=title, description=description)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True)