数字中国决赛

依旧爆零还是很难受的,不过出人意料的是全场师傅们居然陪我一起爆零哈哈

题目

(pow部分已略过)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
from time import time
import random
import os
import sys
import hashlib

import random as py_random

def sample(tau,eta,lens,B,s=None):
lower = -B + tau * eta
upper = B - tau * eta
c_rows = []
for _ in range(lens):
row = [0] * 128
for idx in py_random.sample(range(128), tau):
row[idx] = py_random.choice((-1, 1))
c_rows.append(row)
c = matrix(ZZ, c_rows)

if s is None:
s = vector(ZZ, [py_random.randint(-eta, eta) for _ in range(128)])
cs = c * s

# If one coordinate can never satisfy the bound for any y in [-B, B],
# rejection sampling would not terminate.
count = 0
while True:
y = vector(ZZ, [py_random.randint(-B, B) for _ in range(lens)])
z = cs + y
if all(lower <= zi <= upper for zi in z):
return (c_rows, list(z)), (list(s), list(y))
count+=1
if count>10000:
raise Exception("Rejection sampling failed after 10000 attempts. Please check the parameters.")
exit(0)

tau=39
eta=2
B=2*tau*eta
s= vector(ZZ, [py_random.randint(-eta, eta) for _ in range(128)])

s[0]=eta
print("the first stage")
cs=[]
leaks=[]
count=0
while count<15000:
print("how many samples do you want to get? (0 to exit)")
try:
n=int(input())
except:
print("[-] Invalid input!")
continue

if n == 0:
break
pk,sk=sample(tau,eta,n,B,s)
cs+=pk[0]
leaks+=[bin(i&0xffffffff).count('1') for i in sk[1]]
print(pk[1])
count+=n
print("the second stage")
count=0
while count<5000:
print("which samples do you interest in more? (-1 to exit)")
try:
indices=input()
indices = [int(item.strip()) for item in indices.split(',')]
assert all(-1 <= idx < len(cs) for idx in indices)
except:
print("[-] Invalid input!")
continue
if -1 in indices:
break
ans=[(cs[i],leaks[i]) for i in indices]
print(ans)
count+=len(indices)

print("the third stage")
print("plz input the guess secret s:")
try:
guess_s = [int(item.strip()) for item in input().split(',')]
assert len(guess_s) == 128
except:
print("[-] Invalid input!")
sys.exit(1)
if guess_s == list(s):
flag=open("./flag.txt").read().strip()
print(f"[+] Congratulation! Here is your flag: {flag}")
else:
print("[-] Wrong guess! Better luck next time!")

分析

题目分析

这道题本质上是一个带泄露的稀疏秘密恢复问题,灵感来自 FLIP(Fast Lattice-based Instantiation and Primitive)加密方案。目标是通过服务端给的一些有限信息,把一个 128 维的秘密向量 s 给恢复出来。

参数设置

参数 含义
N 128 秘密向量的维度
tau 39 每行 c 的非零元素个数
eta 2 秘密 s 每个分量的范围 [-2, 2]
B 2×39×2 = 156 噪声 y 的范围 [-B, B]

秘密向量 s

1
2
s = vector(ZZ, [py_random.randint(-eta, eta) for _ in range(128)])
s[0] = eta # s[0] = 2,这是已知条件

s 是一个 128 维整数向量,每个分量取值于 {-2, -1, 0, 1, 2},而且 s[0] = 2 是写死的。这个信息后面验证的时候会用到。

稀疏矩阵 c

1
2
3
4
5
6
c_rows = []
for _ in range(lens):
row = [0] * 128
for idx in py_random.sample(range(128), tau): # 随机选 39 个位置
row[idx] = py_random.choice((-1, 1)) # 每个位置放 ±1
c_rows.append(row)

c 的每一行是 128 维向量,恰好有 39 个位置是 ±1,剩下 89 个位置全是 0。说白了就是一个稀疏二值矩阵

c·s

1
cs = c * s   # 矩阵乘法,得到一个 lens 维向量

这里有个关键的数学性质:c·s 的每个分量是有界的

(cis)jsupp(ci)cijsj39×1×2=78|(\mathbf{c}_i \cdot \mathbf{s})| \leq \sum_{j \in \text{supp}(\mathbf{c}_i)} |c_{ij}| \cdot |s_j| \leq 39 \times 1 \times 2 = 78

原因很简单:c 的每行只有 39 个非零元素(都是 ±1),s 的每个分量绝对值不超过 2,所以内积的绝对值最多就是 78。

噪声 y 与拒绝采样

1
2
3
4
5
6
7
lower = -B + tau * eta   # = -156 + 78 = -78
upper = B - tau * eta # = 156 - 78 = 78

y = vector(ZZ, [py_random.randint(-B, B) for _ in range(lens)]) # y ∈ [-156, 156]
z = cs + y
if all(lower <= zi <= upper for zi in z): # 要求 z ∈ [-78, 78]
return (c_rows, list(z)), (list(s), list(y))

所以最终每个样本满足:

zi=cis+yiz_i = \mathbf{c}_i \cdot \mathbf{s} + y_i

其中:

  • zi[78,78]z_i \in [-78, 78](拒绝采样保证)
  • yi[156,156]y_i \in [-156, 156](均匀随机生成)
  • cis[78,78]\mathbf{c}_i \cdot \mathbf{s} \in [-78, 78](稀疏性保证)

这里多说一句,为什么拒绝采样要限制 z ∈ [-78, 78]?

如果 z[78,78]z \in [-78, 78],那么 y=zcs[7878,78+78]=[156,156]y = z - \mathbf{c} \cdot \mathbf{s} \in [-78-78, 78+78] = [-156, 156],自动满足 y 的范围约束。换句话说,这个约束确保了:给定 z 之后,y 的有效范围是 [z78,z+78][z-78, z+78],宽度只有 156(而不是完整的 313)。当 |z| 比较小的时候,这个范围跟 [-156, 156] 的交集更小,HW 约束就更容易把 y 唯一确定下来。


交互逻辑

第一步:收集 z 值

你可以请求最多 15000 个样本。对于每个样本,服务端返回 z 值(也就是 cs+y\mathbf{c} \cdot \mathbf{s} + y),同时在后台偷偷记录了 HW32(y)(y 当作 32 位整数来看,二进制表示里 1 的个数)。

1
leaks += [bin(i & 0xffffffff).count('1') for i in sk[1]]

第二步:查询详细信息

你可以按索引查询最多 5000 个样本,服务端会返回 (c_row, leak),也就是稀疏行向量 ci\mathbf{c}_i 和对应的 HW32(yi)HW_{32}(y_i)

第三步:提交答案

猜出完整的 128 维秘密向量 s,全对就给 flag。

解题思路

我们已知:

  • zi=cis+yiz_i = \mathbf{c}_i \cdot \mathbf{s} + y_i
  • HW32(yi)HW_{32}(y_i)(y_i 的 32 位 Hamming Weight)
  • cis[78,78]\mathbf{c}_i \cdot \mathbf{s} \in [-78, 78]
  • yi[156,156]y_i \in [-156, 156]

目标: 恢复s

关键洞察:如果对某个样本,我们能唯一确定 cis\mathbf{c}_i \cdot \mathbf{s} 的值,那就得到了一个关于 s 的精确线性方程

怎么唯一确定 cis\mathbf{c}_i \cdot \mathbf{s}

已知 ziz_iHW32(yi)HW_{32}(y_i),我们可以:

  1. 枚举所有满足 HW32(y)=leakHW_{32}(y) = \text{leak}yy 值(范围 [156,156][-156, 156]
  2. 对每个候选 yy,计算 cis=ziy\mathbf{c}_i \cdot \mathbf{s} = z_i - y
  3. 检查 ziyz_i - y 是否在 [78,78][-78, 78] 范围内
  4. 如果只有一个候选满足条件,那 cis=rhs\mathbf{c}_i \cdot \mathbf{s} = \text{rhs} 就被唯一确定了

HW32(y) 的信息量

HW32(y)HW_{32}(y) 就是 y 的 32 位二进制表示中 1 的个数。对于 y[156,156]y \in [-156, 156](当作有符号 32 位整数来看),不同的 y 值可能有相同的 HW32,但这个约束已经大大缩小了候选空间。

举个例子,假设 z=3z = 3HW32(y)=15HW_{32}(y) = 15,那 y 必须同时满足:

  • y[378,3+78]=[75,81]y \in [3-78, 3+78] = [-75, 81]
  • HW32(y)=15HW_{32}(y) = 15

同时满足这两个条件的 y 可能就只有 1 个,这时候 cs=3y\mathbf{c} \cdot \mathbf{s} = 3 - y 就被唯一确定了。


这样一来,问题就变成了:

cis=ziyi\mathbf{c}_i \cdot \mathbf{s} = z_i - y_i

进一步选出 128 条线性无关方程做高斯消元解方程就好了

这里除了用高斯消元还可以用LLL恢复ss
利用s[0]=2,s[i][2,2]s[0]=2,s[i]\in[-2,2]cis=ziyi\mathbf{c}_i \cdot \mathbf{s} = z_i - y_i
造格求解:

(ci,(ziyi))(s1)=0\left( \mathbf{c}_i, -(z_i - y_i) \right) \begin{pmatrix} \mathbf{s} \\\\ 1 \end{pmatrix} = 0

也能出结果

exp1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
from sage.all import *
from pwn import *
import hashlib
import itertools
import string
import ast
from collections import defaultdict

# ==================== 配置 ====================

HOST = "sgctf.xyz"
PORT = 30152

N = 128
tau = 39
eta = 2
B = 2 * tau * eta # 156
CS_BOUND = tau * eta # 78

Z_NEED = 15000
QUERY_NEED = 5000
STAGE1_BATCH = 9
STAGE2_BATCH = 30


# ==================== 工具函数 ====================

def hw32(x):
return bin(int(x) & 0xffffffff).count("1")


def recv_python_list(io):
"""
从远端读取一行 Python list,并 ast.literal_eval 解析。
会自动跳过提示行、空行等非 list 行。
"""
while True:
line = io.recvline().strip()
# print("[DEBUG]", line)
if not line.startswith(b"["):
continue

try:
obj = ast.literal_eval(line.decode())
except Exception:
continue

if isinstance(obj, list):
return obj


def solve_pow(io):
"""
适配常见 CTF PoW:
sha256(XXXX + suffix) == target
如果题目本地无 PoW,可以直接注释掉 main 里的 solve_pow(p)。
"""
io.recvuntil(b"XXXX+")
suffix = io.recvuntil(b")").decode()[:-1]
io.recvuntil(b"== ")
target = io.recvline().strip().decode()

charset = string.ascii_letters + string.digits
for tup in itertools.product(charset, repeat=4):
prefix = "".join(tup)
digest = hashlib.sha256((prefix + suffix).encode()).hexdigest()
if digest == target:
io.sendline(prefix.encode())
return

raise RuntimeError("PoW failed")


# ==================== 第一阶段:收集 z ====================

def collect_z(io):
zs = []

io.recvuntil(b"(0 to exit)\n")

while len(zs) < Z_NEED:
n = min(STAGE1_BATCH, Z_NEED - len(zs))
io.sendline(str(n).encode())

vals = recv_python_list(io)
zs.extend(map(int, vals))

print(f"[+] collected z: {len(zs)}/{Z_NEED}")

if len(zs) < Z_NEED:
io.recvuntil(b"(0 to exit)\n")

return zs


# ==================== 第二阶段:查询 c_row 和 leak ====================

def query_samples(io, zs):
"""
选择 |z| 最小的 QUERY_NEED 个样本查询。
因为 |z| 越小,y 的候选范围越集中,HW32 更容易唯一确定 y / c*s。
"""
selected = sorted(range(len(zs)), key=lambda i: abs(zs[i]))[:QUERY_NEED]

c_rows = []
leaks = []

io.recvuntil(b"(-1 to exit)\n")

for off in range(0, QUERY_NEED, STAGE2_BATCH):
chunk = selected[off:off + STAGE2_BATCH]
io.sendline(",".join(map(str, chunk)).encode())

ans = recv_python_list(io)

for row, leak in ans:
c_rows.append(list(map(int, row)))
leaks.append(int(leak))

print(f"[+] queried samples: {len(c_rows)}/{QUERY_NEED}")

if off + STAGE2_BATCH < QUERY_NEED:
io.recvuntil(b"(-1 to exit)\n")

return selected, c_rows, leaks


# ==================== 利用 HW32 泄露构造精确方程 ====================

def build_exact_equations(zs, selected, c_rows, leaks):
"""
已知:
z = c*s + y
leak = HW32(y)

枚举 y in [-156,156] 且 HW32(y)=leak。
若满足 c*s = z-y in [-78,78] 的候选唯一,
则得到一条精确方程:
c*s = z-y
"""
hw_map = defaultdict(list)
for y in range(-B, B + 1):
hw_map[hw32(y)].append(y)

eq_rows = []
eq_rhs = []

for i in range(len(c_rows)):
z = int(zs[selected[i]])
leak = int(leaks[i])

candidates = []
for y in hw_map[leak]:
cs = z - y
if -CS_BOUND <= cs <= CS_BOUND:
candidates.append(cs)

if len(candidates) == 1:
eq_rows.append(c_rows[i])
eq_rhs.append(candidates[0])

A = Matrix(ZZ, eq_rows)
print(f"[+] exact equations = {len(eq_rows)}, rank = {A.rank()}")

return eq_rows, eq_rhs


# ==================== 高斯消元求解 ====================

def solve_by_gaussian(eq_rows, eq_rhs):
"""
从所有精确方程里挑出 128 条线性无关方程,
然后在 QQ 上精确高斯消元。
"""
if len(eq_rows) < N:
raise RuntimeError("not enough exact equations")

basis_rows = []
basis_rhs = []
cur_rank = 0

for row, rhs in zip(eq_rows, eq_rhs):
tmp = Matrix(QQ, basis_rows + [row])
new_rank = tmp.rank()

if new_rank > cur_rank:
basis_rows.append(row)
basis_rhs.append(rhs)
cur_rank = new_rank

if cur_rank == N:
break

print(f"[+] selected independent rows = {len(basis_rows)}, rank = {cur_rank}")

if cur_rank < N:
raise RuntimeError("rank not enough, cannot uniquely recover s")

A = Matrix(QQ, basis_rows)
b = vector(QQ, basis_rhs)

# solve_right 本质就是在精确域 QQ 上做高斯消元
sol = A.solve_right(b)

if not all(x.denominator() == 1 for x in sol):
raise RuntimeError("solution is not integral")

s = [int(x) for x in sol]

# 验证所有精确方程
A_check = Matrix(ZZ, eq_rows)
b_check = vector(ZZ, eq_rhs)
s_vec = vector(ZZ, s)

if s[0] != eta:
raise RuntimeError("s[0] check failed")

if not all(-eta <= x <= eta for x in s):
raise RuntimeError("range check failed")

if A_check * s_vec != b_check:
raise RuntimeError("equation check failed")

return s


# ==================== 主流程 ====================

def main():
p = remote(HOST, PORT)

solve_pow(p)

zs = collect_z(p)
selected, c_rows, leaks = query_samples(p, zs)

eq_rows, eq_rhs = build_exact_equations(zs, selected, c_rows, leaks)

s = solve_by_gaussian(eq_rows, eq_rhs)
print(f"[+] recovered s = {s}")

p.recvuntil(b"guess secret s:\n")
p.sendline(",".join(map(str, s)).encode())

print(p.recvall().decode(errors="ignore"))
p.close()


if __name__ == "__main__":
main()

exp2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
from sage.all import *
from pwn import *
import hashlib
import itertools
import string
import ast
from collections import defaultdict

# ==================== 配置 ====================

HOST = "sgctf.xyz"
PORT = 30102

N = 128
tau = 39
eta = 2
B = 2 * tau * eta # 156
CS_BOUND = tau * eta # 78

Z_NEED = 15000
QUERY_NEED = 5000
STAGE1_BATCH = 9
STAGE2_BATCH = 30


# ==================== 工具函数 ====================

def hw32(x):
return bin(int(x) & 0xffffffff).count("1")


def recv_python_list(io):
while True:
line = io.recvline().strip()
# print("[DEBUG]", line)
if not line.startswith(b"["):
continue

try:
obj = ast.literal_eval(line.decode())
except Exception:
continue

if isinstance(obj, list):
return obj


def solve_pow(io):
io.recvuntil(b"XXXX+")
suffix = io.recvuntil(b")").decode()[:-1]
io.recvuntil(b"== ")
target = io.recvline().strip().decode()

charset = string.ascii_letters + string.digits
for tup in itertools.product(charset, repeat=4):
prefix = "".join(tup)
digest = hashlib.sha256((prefix + suffix).encode()).hexdigest()
if digest == target:
io.sendline(prefix.encode())
return

raise RuntimeError("PoW failed")


# ==================== 第一阶段:收集 z ====================

def collect_z(io):
zs = []

io.recvuntil(b"(0 to exit)\n")

while len(zs) < Z_NEED:
n = min(STAGE1_BATCH, Z_NEED - len(zs))
io.sendline(str(n).encode())

vals = recv_python_list(io)
zs.extend(map(int, vals))

print(f"[+] collected z: {len(zs)}/{Z_NEED}")

if len(zs) < Z_NEED:
io.recvuntil(b"(0 to exit)\n")

return zs


# ==================== 第二阶段:查询 c_row 和 leak ====================

def query_samples(io, zs):
selected = sorted(range(len(zs)), key=lambda i: abs(zs[i]))[:QUERY_NEED]

c_rows = []
leaks = []

io.recvuntil(b"(-1 to exit)\n")

for off in range(0, QUERY_NEED, STAGE2_BATCH):
chunk = selected[off:off + STAGE2_BATCH]
io.sendline(",".join(map(str, chunk)).encode())

ans = recv_python_list(io)

for row, leak in ans:
c_rows.append(list(map(int, row)))
leaks.append(int(leak))

print(f"[+] queried samples: {len(c_rows)}/{QUERY_NEED}")

if off + STAGE2_BATCH < QUERY_NEED:
io.recvuntil(b"(-1 to exit)\n")

return selected, c_rows, leaks


# ==================== 利用 HW32 泄露构造精确方程 ====================

def build_exact_equations(zs, selected, c_rows, leaks):
hw_map = defaultdict(list)
for y in range(-B, B + 1):
hw_map[hw32(y)].append(y)

eq_rows = []
eq_rhs = []

for i in range(len(c_rows)):
z = int(zs[selected[i]])
leak = int(leaks[i])

candidates = []
for y in hw_map[leak]:
cs = z - y
if -CS_BOUND <= cs <= CS_BOUND:
candidates.append(cs)

if len(candidates) == 1:
eq_rows.append(c_rows[i])
eq_rhs.append(candidates[0])

A = Matrix(ZZ, eq_rows)
print(f"[+] exact equations = {len(eq_rows)}, rank(A) = {A.rank()}")

return eq_rows, eq_rhs


# ==================== LLL 求解 ====================

def solve_by_lll(eq_rows, eq_rhs):
"""
对每条方程:
c_i * s = rhs_i

构造扩展矩阵:
M_i = [c_i | -rhs_i]

则真实向量:
v = (s_0, s_1, ..., s_127, 1)

满足:
M * v = 0

因此 v 位于 M 的右核中。
求右核格后 LLL,短向量通常就是 ±k*(s,1)。
"""
M = Matrix(ZZ, [list(row) + [-int(rhs)] for row, rhs in zip(eq_rows, eq_rhs)])

print(f"[+] rank(M) = {M.rank()}, kernel dim over QQ = {N + 1 - M.rank()}")

# 先在 QQ 上求右核
Kq = M.change_ring(QQ).right_kernel().basis_matrix()

if Kq.nrows() == 0:
raise RuntimeError("empty kernel")

# 把有理基向量逐行清分母,转成整数格基
K_rows = []
for r in Kq.rows():
den = ZZ(1)
for x in r:
den = lcm(den, ZZ(x.denominator()))
K_rows.append([ZZ(x * den) for x in r])

K = Matrix(ZZ, K_rows)

print(f"[+] integer kernel basis shape = {K.nrows()} x {K.ncols()}")

# LLL 约化
K_red = K.LLL()

A_check = Matrix(ZZ, eq_rows)
b_check = vector(ZZ, eq_rhs)

# 直接检查 LLL 后的基向量
for v in K_red.rows():
last = ZZ(v[-1])
if last == 0:
continue

# v 应该是 k * (s,1),所以前 128 维都要能被 last 整除
ok = True
cand = []

for i in range(N):
if ZZ(v[i]) % last != 0:
ok = False
break
cand.append(int(ZZ(v[i]) // last))

if not ok:
continue

if cand[0] != eta:
continue

if not all(-eta <= x <= eta for x in cand):
continue

if A_check * vector(ZZ, cand) != b_check:
continue

return cand

raise RuntimeError("LLL failed to find s")


# ==================== 主流程 ====================

def main():
p = remote(HOST, PORT)

solve_pow(p)

zs = collect_z(p)
selected, c_rows, leaks = query_samples(p, zs)

eq_rows, eq_rhs = build_exact_equations(zs, selected, c_rows, leaks)

s = solve_by_lll(eq_rows, eq_rhs)
print(f"[+] recovered s = {s}")

p.recvuntil(b"guess secret s:\n")
p.sendline(",".join(map(str, s)).encode())

print(p.recvall().decode(errors="ignore"))
p.close()


if __name__ == "__main__":
main()

数字中国决赛
https://yourdomain.com/2026/04/26/数字中国决赛/
作者
ddanggui
发布于
2026年4月26日
许可协议