File size: 7,624 Bytes
b2d9e47
 
169f06f
b2d9e47
 
 
4d5727a
 
 
b2d9e47
a7a7e59
b2d9e47
 
 
e24267e
169f06f
4d5727a
b2d9e47
 
169f06f
b2d9e47
 
 
 
 
169f06f
 
 
4d5727a
b2d9e47
169f06f
 
 
 
 
 
 
 
 
4d5727a
 
 
a7a7e59
b2d9e47
 
 
4d5727a
169f06f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4d5727a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
169f06f
4d5727a
 
 
169f06f
4d5727a
 
 
 
 
 
 
 
 
 
 
 
169f06f
b2d9e47
 
 
 
 
169f06f
 
 
b2d9e47
169f06f
b2d9e47
169f06f
 
b2d9e47
169f06f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4d5727a
169f06f
 
 
 
 
 
 
 
b2d9e47
4d5727a
b2d9e47
 
 
 
 
169f06f
 
 
 
 
4d5727a
 
 
 
 
 
 
 
 
 
a7a7e59
b2d9e47
 
 
 
 
 
 
 
4d5727a
 
 
b2d9e47
 
169f06f
e24267e
 
169f06f
 
 
 
 
 
 
 
 
e24267e
 
 
 
 
 
 
 
4d5727a
 
 
 
 
 
 
 
e24267e
 
4d5727a
 
e24267e
 
b2d9e47
4d5727a
b2d9e47
 
 
 
 
 
 
 
 
4d5727a
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
#!/usr/bin/env python3
"""
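Sync agentmemory data (the SQLite DB, the .hmac file and the second-brain/
directory) to/from a private Hugging Face Dataset repo.

Usage:
  python3 sync.py restore   -- download the synced files from HF on startup
  python3 sync.py backup    -- upload the synced files to HF (called in a loop;
                               also the default when no command is given)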

C4.1: Uses audit log high-water mark instead of mtime-based change detection.
C4.2: Exposes last sync status via a .sync_state JSON file read by /health.
"""
import json
import os
import sys
import shutil
import tempfile
import time
import sqlite3

try:
    from huggingface_hub import HfApi, snapshot_download, hf_hub_download
    from huggingface_hub.utils import EntryNotFoundError, RepositoryNotFoundError
except ImportError:
    print("[sync] huggingface_hub not installed, skipping sync")
    sys.exit(0)

HF_TOKEN = os.environ.get("HF_TOKEN", "")
REPO_ID = os.environ.get(
    "AGENTMEMORY_DATASET_REPO", "Yash030/agentmemory-python-data"
)

# Local state directory and the SQLite database that lives inside it.
DATA_DIR = os.path.expanduser("~/.agentmemory")
DB_PATH = os.path.join(DATA_DIR, "agentmemory.db")

# Allow-list of what is backed up / restored, relative to DATA_DIR:
# individual files, plus whole directory trees. Everything else under
# DATA_DIR is considered ephemeral.
SYNC_FILES = ["agentmemory.db", ".hmac"]
SYNC_DIRS = ["second-brain"]

# C4.1/C4.2: JSON file holding the audit-log high-water mark of the last
# successful backup plus the last sync status (replaces the old mtime state).
STATE_FILE = os.path.join(DATA_DIR, ".sync_state")


def get_api():
    """Return an ``HfApi`` client configured with the module-level HF_TOKEN."""
    client = HfApi(token=HF_TOKEN)
    return client


def _collect_sync_targets():
    """Enumerate the local files that a backup should upload.

    Returns:
        A list of ``(abs_path, repo_rel_path)`` tuples. Entries from
        SYNC_FILES come first, and only those that exist as regular files.
        They are followed by every file found recursively under each existing
        SYNC_DIRS directory. ``repo_rel_path`` is relative to DATA_DIR, with
        backslashes normalised to forward slashes for use as a repo path.
    """
    pairs = [
        (os.path.join(DATA_DIR, name), name)
        for name in SYNC_FILES
        if os.path.isfile(os.path.join(DATA_DIR, name))
    ]
    for dir_name in SYNC_DIRS:
        top = os.path.join(DATA_DIR, dir_name)
        if not os.path.isdir(top):
            continue
        for walk_root, _subdirs, file_names in os.walk(top):
            for file_name in file_names:
                abs_path = os.path.join(walk_root, file_name)
                rel_path = os.path.relpath(abs_path, DATA_DIR)
                pairs.append((abs_path, rel_path.replace("\\", "/")))
    return pairs


def _get_audit_high_water_mark() -> int:
    """C4.1: Read the newest ``audit_log`` row id, used as the change marker.

    Returns:
        ``MAX(id)`` from the ``audit_log`` table as an int. Returns 0 when the
        DB file does not exist or the table has no rows. Any exception raised
        while connecting or querying (e.g. missing table, lock timeout) is
        swallowed and also reported as 0.
    """
    try:
        if not os.path.exists(DB_PATH):
            return 0
        conn = sqlite3.connect(DB_PATH, check_same_thread=False, timeout=5)
        try:
            cursor = conn.execute("SELECT MAX(id) FROM audit_log")
            row = cursor.fetchone()
        finally:
            conn.close()
        # MAX() over an empty table yields a single NULL.
        if not row or row[0] is None:
            return 0
        return int(row[0])
    except Exception:
        return 0


def _load_sync_state() -> dict:
    """C4.1: Load the persisted sync state dict."""
    if os.path.exists(STATE_FILE):
        try:
            with open(STATE_FILE, "r", encoding="utf-8") as f:
                return json.load(f)
        except Exception:
            pass
    return {"last_synced_audit_id": 0, "last_sync_at": None, "sync_status": "never"}


def _save_sync_state(state: dict) -> None:
    """C4.1/C4.2: Persist the sync state dict."""
    try:
        os.makedirs(DATA_DIR, exist_ok=True)
        with open(STATE_FILE, "w", encoding="utf-8") as f:
            json.dump(state, f)
    except Exception as e:
        print(f"[sync] failed to save sync state: {e}")


def restore():
    """Download the synced files from the HF dataset repo into DATA_DIR.

    Returns early with a log line in three cases: HF_TOKEN is unset, the
    dataset repo does not exist, or the repo check fails for another reason.
    Otherwise it fetches every SYNC_FILES entry plus every repo file under
    ``second-brain/`` with ``hf_hub_download``. Files the repo does not
    contain are skipped silently. Any other per-file error is logged, and the
    remaining files are still attempted.
    """
    if not HF_TOKEN:
        print("[sync] No HF_TOKEN — skipping restore")
        return
    os.makedirs(DATA_DIR, exist_ok=True)
    api = get_api()

    # Bail out if the repo is missing (first run) or cannot be reached.
    try:
        api.repo_info(REPO_ID, repo_type="dataset")
    except RepositoryNotFoundError:
        print(f"[sync] Dataset repo {REPO_ID} not found — fresh start")
        return
    except Exception as e:
        print(f"[sync] restore repo check error: {e}")
        return

    # Fixed files first, then whatever the repo currently holds in second-brain/.
    wanted = list(SYNC_FILES)
    wanted.extend(_list_repo_prefix(api, "second-brain/"))

    # Only reachable if SYNC_FILES is empty and nothing is under second-brain/.
    if not wanted:
        print("[sync] Dataset empty — fresh start")
        return

    for repo_path in wanted:
        try:
            target = os.path.join(DATA_DIR, repo_path)
            os.makedirs(os.path.dirname(target), exist_ok=True)
            hf_hub_download(
                repo_id=REPO_ID,
                filename=repo_path,
                repo_type="dataset",
                token=HF_TOKEN,
                local_dir=DATA_DIR,
                local_dir_use_symlinks=False,
            )
            print(f"[sync] restored {repo_path}")
        except EntryNotFoundError:
            continue  # not uploaded yet, nothing to restore
        except Exception as e:
            print(f"[sync] restore {repo_path} error: {e}")

    print("[sync] restore complete")


def _list_repo_prefix(api, prefix):
    """List the files in the dataset repo whose path starts with *prefix*.

    Args:
        api: ``HfApi`` client (as returned by ``get_api()``) used for the
            listing. It carries the auth token.
        prefix: Repo-relative path prefix, e.g. ``"second-brain/"``.

    Returns:
        The matching repo paths, or an empty list if the listing fails. The
        failure is logged rather than raised, so the caller can still fetch
        its fixed set of files.
    """
    try:
        repo_files = api.list_repo_files(REPO_ID, repo_type="dataset")
    except Exception as e:
        print(f"[sync] could not list {prefix!r} in {REPO_ID}: {e}")
        return []
    return [path for path in repo_files if path.startswith(prefix)]


def _record_sync_error(state: dict) -> None:
    """C4.2: Mark the last sync attempt as failed and persist the state."""
    state["sync_status"] = "error"
    _save_sync_state(state)


def _stage_sqlite_db(src: str, dest: str) -> None:
    """Write a consistent snapshot of the SQLite database *src* to *dest*.

    Uses sqlite3's online backup API instead of a byte copy. A snapshot taken
    while another process is writing is then transactionally consistent, and
    it includes committed data that may still sit in a WAL file.
    """
    src_conn = sqlite3.connect(src, timeout=5)
    try:
        dest_conn = sqlite3.connect(dest)
        try:
            src_conn.backup(dest_conn)
        finally:
            dest_conn.close()
    finally:
        src_conn.close()


def _stage_targets(targets, staging: str) -> list:
    """Copy each ``(abs_path, repo_rel_path)`` target into *staging*.

    DB_PATH is snapshotted via ``_stage_sqlite_db``. If that fails it falls
    back to a plain file copy. Every other file is copied with ``shutil.copy2``.

    Returns:
        The repo-relative paths that could NOT be staged (errors are logged).
    """
    failed = []
    for full, rel in targets:
        dest = os.path.join(staging, rel.replace("/", os.sep))
        try:
            os.makedirs(os.path.dirname(dest), exist_ok=True)
            if full == DB_PATH:
                try:
                    _stage_sqlite_db(full, dest)
                except sqlite3.Error as e:
                    print(f"[sync] sqlite snapshot of {rel} failed ({e}); copying file instead")
                    shutil.copy2(full, dest)
            else:
                shutil.copy2(full, dest)
        except Exception as e:
            print(f"[sync] stage {rel} error: {e}")
            failed.append(rel)
    return failed


def backup():
    """Upload the synced files to the HF dataset repo when the audit log moved.

    Does nothing when HF_TOKEN is unset or there are no local targets. The
    current audit_log high-water mark (C4.1) is compared with the mark stored
    after the last successful upload, and only a change triggers an upload.
    A drop counts as a change: it happens when the DB is recreated or rolled
    back, and that DB still needs backing up. The dataset repo is created
    (private) if it does not exist. Targets are staged into a temporary
    directory, with the SQLite DB as a consistent snapshot, and uploaded with
    ``upload_folder``.

    C4.2: the outcome is persisted in the sync-state file.
      * Full success stores the new mark, a UTC timestamp and
        ``sync_status="ok"``.
      * Any failure (repo check/creation, staging, upload) stores
        ``sync_status="error"`` and keeps the old mark, so the next run
        retries.
    """
    if not HF_TOKEN:
        return
    api = get_api()

    targets = _collect_sync_targets()
    if not targets:
        print("[sync] nothing to backup")
        return

    # C4.1: compare the audit log high-water mark instead of an mtime fingerprint.
    current_hwm = _get_audit_high_water_mark()
    state = _load_sync_state()
    last_hwm = state.get("last_synced_audit_id", 0)

    # "==" rather than "<=": a mark that went *down* (DB reset/restored) would
    # otherwise block backups until the new ids overtook the old mark.
    if current_hwm == last_hwm:
        print(f"[sync] no new audit entries (hwm={current_hwm}) — skipping backup")
        return

    print(f"[sync] audit HWM changed ({last_hwm} -> {current_hwm}) — backing up...")

    # Ensure the repo exists; any failure here is recorded for /health.
    try:
        api.repo_info(REPO_ID, repo_type="dataset")
    except RepositoryNotFoundError:
        print(f"[sync] Creating dataset repo {REPO_ID}")
        try:
            api.create_repo(REPO_ID, repo_type="dataset", private=True, exist_ok=True)
        except Exception as e:
            print(f"[sync] create_repo error: {e}")
            _record_sync_error(state)
            return
    except Exception as e:
        print(f"[sync] repo_info error: {e}")
        _record_sync_error(state)
        return

    staging = tempfile.mkdtemp(prefix="agentmemory_sync_")
    try:
        failed = _stage_targets(targets, staging)
        staged_count = len(targets) - len(failed)
        if staged_count == 0:
            print("[sync] no files could be staged — aborting backup")
            _record_sync_error(state)
            return

        print(f"[sync] uploading {staged_count} files to {REPO_ID}...")
        api.upload_folder(
            folder_path=staging,
            repo_id=REPO_ID,
            repo_type="dataset",
            token=HF_TOKEN,
            commit_message="sync: periodic backup",
        )
        print("[sync] backup complete")

        if failed:
            # Part of the data is missing from this commit: keep the old mark
            # so the next run retries, and surface the problem via the state.
            print(f"[sync] {len(failed)} file(s) failed to stage; will retry next run")
            _record_sync_error(state)
            return

        # C4.1/C4.2: record the new mark and a UTC timestamp in the same
        # "YYYY-MM-DDTHH:MM:SS[.ffffff]Z" format as before.
        import datetime
        now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        state["last_synced_audit_id"] = current_hwm
        state["last_sync_at"] = now_utc.isoformat() + "Z"
        state["sync_status"] = "ok"
        _save_sync_state(state)

    except Exception as e:
        print(f"[sync] backup error: {e}")
        _record_sync_error(state)
    finally:
        shutil.rmtree(staging, ignore_errors=True)


if __name__ == "__main__":
    # CLI entry point: "restore" or "backup" (the default when no argument).
    _commands = {"restore": restore, "backup": backup}
    cmd = sys.argv[1] if len(sys.argv) > 1 else "backup"
    handler = _commands.get(cmd)
    if handler is None:
        print(f"[sync] unknown command: {cmd}")
        sys.exit(1)
    handler()


# (Configuration constants are defined once, near the top of this module.)