Kill JsonIO and fix JSON driver caching issues (#2796)

* Kill JsonIO and fix JSON driver caching issues

Signed-off-by: Toby Harradine <tobyharradine@gmail.com>

* Ensure lock covers critical region in set()

Signed-off-by: Toby Harradine <tobyharradine@gmail.com>

* Make tests pass

Signed-off-by: Toby Harradine <tobyharradine@gmail.com>

* Use pickle over deepcopy in Config

Signed-off-by: Toby Harradine <tobyharradine@gmail.com>

* Fix temp instance creation

Signed-off-by: Toby Harradine <tobyharradine@gmail.com>

* Serialise value before doing anything in set()

Signed-off-by: Toby Harradine <tobyharradine@gmail.com>
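As a quick illustration of the two copy idioms this change standardises on — a pickle round-trip as a fast deep copy on reads, and a JSON round-trip as a copy-plus-serialisability check on writes — here is a minimal standalone sketch; the sample value and variable names are made up for illustration and are not part of the commit:

    import json
    import pickle

    value = {"prefixes": ["!", "?"], "mod_roles": [1234]}

    # Read path (get): a pickle round-trip is a faster deepcopy() for plain data.
    read_copy = pickle.loads(pickle.dumps(value, -1))
    assert read_copy == value and read_copy is not value

    # Write path (set): a JSON round-trip both copies the value and proves it is
    # JSON-serializable before it is stored in the cache or written to disk.
    write_copy = json.loads(json.dumps(value))

    try:
        json.dumps({"bad": object()})  # non-serializable values fail here, before any state changes
    except TypeError:
        pass  # rejected up front, so the in-memory data is never left half-updated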
Authored by Toby Harradine on 2019-06-28 04:49:45 +10:00; committed by Michael H
parent f3bbfdc64d
commit bff7e214ab
9 changed files with 133 additions and 157 deletions


@@ -1,9 +1,12 @@
-from pathlib import Path
-import copy
-import weakref
+import asyncio
+import json
 import logging
-from ..json_io import JsonIO
+import os
+import pickle
+import weakref
+from pathlib import Path
+from typing import Any, Dict
+from uuid import uuid4
 
 from .red_base import BaseDriver, IdentifierData
@@ -59,13 +62,10 @@ class JSON(BaseDriver):
             self.data_path = data_path_override
         else:
             self.data_path = Path.cwd() / "cogs" / ".data" / self.cog_name
         self.data_path.mkdir(parents=True, exist_ok=True)
         self.data_path = self.data_path / self.file_name
-        self.jsonIO = JsonIO(self.data_path)
+        self._lock = asyncio.Lock()
         self._load_data()
 
     async def has_valid_connection(self) -> bool:
@@ -90,10 +90,12 @@ class JSON(BaseDriver):
             return
 
         try:
-            self.data = self.jsonIO._load_json()
+            with self.data_path.open("r", encoding="utf-8") as fs:
+                self.data = json.load(fs)
         except FileNotFoundError:
             self.data = {}
-            self.jsonIO._save_json(self.data)
+            with self.data_path.open("w", encoding="utf-8") as fs:
+                json.dump(self.data, fs, indent=4)
 
     def migrate_identifier(self, raw_identifier: int):
         if self.unique_cog_identifier in self.data:
@@ -104,7 +106,7 @@ class JSON(BaseDriver):
             if ident in self.data:
                 self.data[self.unique_cog_identifier] = self.data[ident]
                 del self.data[ident]
-                self.jsonIO._save_json(self.data)
+                _save_json(self.data_path, self.data)
                 break
 
     async def get(self, identifier_data: IdentifierData):
@@ -112,18 +114,23 @@ class JSON(BaseDriver):
         full_identifiers = identifier_data.to_tuple()
         for i in full_identifiers:
             partial = partial[i]
-        return copy.deepcopy(partial)
+        return pickle.loads(pickle.dumps(partial, -1))
 
     async def set(self, identifier_data: IdentifierData, value=None):
         partial = self.data
         full_identifiers = identifier_data.to_tuple()
-        for i in full_identifiers[:-1]:
-            if i not in partial:
-                partial[i] = {}
-            partial = partial[i]
+        # This is both our deepcopy() and our way of making sure this value is actually JSON
+        # serializable.
+        value_copy = json.loads(json.dumps(value))
 
-        partial[full_identifiers[-1]] = copy.deepcopy(value)
-        await self.jsonIO._threadsafe_save_json(self.data)
+        async with self._lock:
+            for i in full_identifiers[:-1]:
+                if i not in partial:
+                    partial[i] = {}
+                partial = partial[i]
+
+            partial[full_identifiers[-1]] = value_copy
+            await self._save()
 
     async def clear(self, identifier_data: IdentifierData):
         partial = self.data
@@ -131,35 +138,87 @@ class JSON(BaseDriver):
         try:
             for i in full_identifiers[:-1]:
                 partial = partial[i]
-            del partial[full_identifiers[-1]]
         except KeyError:
             pass
         else:
-            await self.jsonIO._threadsafe_save_json(self.data)
+            async with self._lock:
+                try:
+                    del partial[full_identifiers[-1]]
+                except KeyError:
+                    pass
+                else:
+                    await self._save()
     async def import_data(self, cog_data, custom_group_data):
-        def update_write_data(identifier_data: IdentifierData, data):
+        def update_write_data(identifier_data: IdentifierData, _data):
             partial = self.data
             idents = identifier_data.to_tuple()
             for ident in idents[:-1]:
                 if ident not in partial:
                     partial[ident] = {}
                 partial = partial[ident]
-            partial[idents[-1]] = data
+            partial[idents[-1]] = _data
 
-        for category, all_data in cog_data:
-            splitted_pkey = self._split_primary_key(category, custom_group_data, all_data)
-            for pkey, data in splitted_pkey:
-                ident_data = IdentifierData(
-                    self.unique_cog_identifier,
-                    category,
-                    pkey,
-                    (),
-                    custom_group_data,
-                    is_custom=category in custom_group_data,
-                )
-                update_write_data(ident_data, data)
-        await self.jsonIO._threadsafe_save_json(self.data)
+        async with self._lock:
+            for category, all_data in cog_data:
+                splitted_pkey = self._split_primary_key(category, custom_group_data, all_data)
+                for pkey, data in splitted_pkey:
+                    ident_data = IdentifierData(
+                        self.unique_cog_identifier,
+                        category,
+                        pkey,
+                        (),
+                        custom_group_data,
+                        is_custom=category in custom_group_data,
+                    )
+                    update_write_data(ident_data, data)
+            await self._save()
+
+    async def _save(self) -> None:
+        loop = asyncio.get_running_loop()
+        await loop.run_in_executor(None, _save_json, self.data_path, self.data)
 
     def get_config_details(self):
         return
+
+
+def _save_json(path: Path, data: Dict[str, Any]) -> None:
+    """
+    This fsync stuff here is entirely necessary.
+
+    On Windows, it is not available in its entirety.
+    If a Windows user ends up with tons of temp files, they should consider hosting on
+    something POSIX compatible, or using the Mongo backend instead.
+
+    Most users won't encounter this issue, but with high write volumes,
+    without the fsync on both the temp file and, after the replace, on the directory,
+    there's no real durability or atomicity guarantee from the filesystem.
+
+    In-depth overview of the underlying reasons why this is needed:
+        https://lwn.net/Articles/457667/
+
+    Also see:
+        http://man7.org/linux/man-pages/man2/open.2.html#NOTES (synchronous I/O section)
+
+    And:
+        https://www.mjmwired.net/kernel/Documentation/filesystems/ext4.txt#310
+    """
+    filename = path.stem
+    tmp_file = "{}-{}.tmp".format(filename, uuid4().fields[0])
+    tmp_path = path.parent / tmp_file
+    with tmp_path.open(encoding="utf-8", mode="w") as fs:
+        json.dump(data, fs, indent=4)
+        fs.flush()  # This does get closed on context exit, ...
+        os.fsync(fs.fileno())  # but that needs to happen prior to this line
+
+    tmp_path.replace(path)
+
+    try:
+        flag = os.O_DIRECTORY  # pylint: disable=no-member
+    except AttributeError:
+        pass
+    else:
+        fd = os.open(path.parent, flag)
+        try:
+            os.fsync(fd)
+        finally:
+            os.close(fd)